send_stream.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410
  1. package quic
  2. import (
  3. "context"
  4. "fmt"
  5. "sync"
  6. "time"
  7. "github.com/Psiphon-Labs/quic-go/internal/ackhandler"
  8. "github.com/Psiphon-Labs/quic-go/internal/flowcontrol"
  9. "github.com/Psiphon-Labs/quic-go/internal/protocol"
  10. "github.com/Psiphon-Labs/quic-go/internal/utils"
  11. "github.com/Psiphon-Labs/quic-go/internal/wire"
  12. )
  13. type sendStreamI interface {
  14. SendStream
  15. handleStopSendingFrame(*wire.StopSendingFrame)
  16. hasData() bool
  17. popStreamFrame(maxBytes protocol.ByteCount) (*ackhandler.Frame, bool)
  18. closeForShutdown(error)
  19. handleMaxStreamDataFrame(*wire.MaxStreamDataFrame)
  20. }
  21. type sendStream struct {
  22. mutex sync.Mutex
  23. numOutstandingFrames int64
  24. retransmissionQueue []*wire.StreamFrame
  25. ctx context.Context
  26. ctxCancel context.CancelFunc
  27. streamID protocol.StreamID
  28. sender streamSender
  29. writeOffset protocol.ByteCount
  30. cancelWriteErr error
  31. closeForShutdownErr error
  32. closedForShutdown bool // set when CloseForShutdown() is called
  33. finishedWriting bool // set once Close() is called
  34. canceledWrite bool // set when CancelWrite() is called, or a STOP_SENDING frame is received
  35. finSent bool // set when a STREAM_FRAME with FIN bit has been sent
  36. completed bool // set when this stream has been reported to the streamSender as completed
  37. dataForWriting []byte
  38. writeChan chan struct{}
  39. deadline time.Time
  40. flowController flowcontrol.StreamFlowController
  41. version protocol.VersionNumber
  42. }
  43. var _ SendStream = &sendStream{}
  44. var _ sendStreamI = &sendStream{}
  45. func newSendStream(
  46. streamID protocol.StreamID,
  47. sender streamSender,
  48. flowController flowcontrol.StreamFlowController,
  49. version protocol.VersionNumber,
  50. ) *sendStream {
  51. s := &sendStream{
  52. streamID: streamID,
  53. sender: sender,
  54. flowController: flowController,
  55. writeChan: make(chan struct{}, 1),
  56. version: version,
  57. }
  58. s.ctx, s.ctxCancel = context.WithCancel(context.Background())
  59. return s
  60. }
  61. func (s *sendStream) StreamID() protocol.StreamID {
  62. return s.streamID // same for receiveStream and sendStream
  63. }
  64. func (s *sendStream) Write(p []byte) (int, error) {
  65. s.mutex.Lock()
  66. defer s.mutex.Unlock()
  67. if s.finishedWriting {
  68. return 0, fmt.Errorf("write on closed stream %d", s.streamID)
  69. }
  70. if s.canceledWrite {
  71. return 0, s.cancelWriteErr
  72. }
  73. if s.closeForShutdownErr != nil {
  74. return 0, s.closeForShutdownErr
  75. }
  76. if !s.deadline.IsZero() && !time.Now().Before(s.deadline) {
  77. return 0, errDeadline
  78. }
  79. if len(p) == 0 {
  80. return 0, nil
  81. }
  82. s.dataForWriting = p
  83. var (
  84. deadlineTimer *utils.Timer
  85. bytesWritten int
  86. notifiedSender bool
  87. )
  88. for {
  89. bytesWritten = len(p) - len(s.dataForWriting)
  90. deadline := s.deadline
  91. if !deadline.IsZero() {
  92. if !time.Now().Before(deadline) {
  93. s.dataForWriting = nil
  94. return bytesWritten, errDeadline
  95. }
  96. if deadlineTimer == nil {
  97. deadlineTimer = utils.NewTimer()
  98. }
  99. deadlineTimer.Reset(deadline)
  100. }
  101. if s.dataForWriting == nil || s.canceledWrite || s.closedForShutdown {
  102. break
  103. }
  104. s.mutex.Unlock()
  105. if !notifiedSender {
  106. s.sender.onHasStreamData(s.streamID) // must be called without holding the mutex
  107. notifiedSender = true
  108. }
  109. if deadline.IsZero() {
  110. <-s.writeChan
  111. } else {
  112. select {
  113. case <-s.writeChan:
  114. case <-deadlineTimer.Chan():
  115. deadlineTimer.SetRead()
  116. }
  117. }
  118. s.mutex.Lock()
  119. }
  120. // [Psiphon]
  121. // Stop timer to immediately release resources
  122. if deadlineTimer != nil {
  123. deadlineTimer.Reset(time.Time{})
  124. }
  125. if s.closeForShutdownErr != nil {
  126. return bytesWritten, s.closeForShutdownErr
  127. } else if s.cancelWriteErr != nil {
  128. return bytesWritten, s.cancelWriteErr
  129. }
  130. return bytesWritten, nil
  131. }
  132. // popStreamFrame returns the next STREAM frame that is supposed to be sent on this stream
  133. // maxBytes is the maximum length this frame (including frame header) will have.
  134. func (s *sendStream) popStreamFrame(maxBytes protocol.ByteCount) (*ackhandler.Frame, bool /* has more data to send */) {
  135. s.mutex.Lock()
  136. f, hasMoreData := s.popNewOrRetransmittedStreamFrame(maxBytes)
  137. if f != nil {
  138. s.numOutstandingFrames++
  139. }
  140. s.mutex.Unlock()
  141. if f == nil {
  142. return nil, hasMoreData
  143. }
  144. return &ackhandler.Frame{Frame: f, OnLost: s.queueRetransmission, OnAcked: s.frameAcked}, hasMoreData
  145. }
  146. func (s *sendStream) popNewOrRetransmittedStreamFrame(maxBytes protocol.ByteCount) (*wire.StreamFrame, bool /* has more data to send */) {
  147. if len(s.retransmissionQueue) > 0 {
  148. f, hasMoreRetransmissions := s.maybeGetRetransmission(maxBytes)
  149. if f != nil || hasMoreRetransmissions {
  150. if f == nil {
  151. return nil, true
  152. }
  153. // We always claim that we have more data to send.
  154. // This might be incorrect, in which case there'll be a spurious call to popStreamFrame in the future.
  155. return f, true
  156. }
  157. }
  158. f := wire.GetStreamFrame()
  159. f.FinBit = false
  160. f.StreamID = s.streamID
  161. f.Offset = s.writeOffset
  162. f.DataLenPresent = true
  163. f.Data = f.Data[:0]
  164. hasMoreData := s.popNewStreamFrame(f, maxBytes)
  165. if len(f.Data) == 0 && !f.FinBit {
  166. f.PutBack()
  167. return nil, hasMoreData
  168. }
  169. return f, hasMoreData
  170. }
  171. func (s *sendStream) popNewStreamFrame(f *wire.StreamFrame, maxBytes protocol.ByteCount) bool {
  172. if s.canceledWrite || s.closeForShutdownErr != nil {
  173. return false
  174. }
  175. maxDataLen := f.MaxDataLen(maxBytes, s.version)
  176. if maxDataLen == 0 { // a STREAM frame must have at least one byte of data
  177. return s.dataForWriting != nil
  178. }
  179. s.getDataForWriting(f, maxDataLen)
  180. if len(f.Data) == 0 && !f.FinBit {
  181. // this can happen if:
  182. // - popStreamFrame is called but there's no data for writing
  183. // - there's data for writing, but the stream is stream-level flow control blocked
  184. // - there's data for writing, but the stream is connection-level flow control blocked
  185. if s.dataForWriting == nil {
  186. return false
  187. }
  188. if isBlocked, offset := s.flowController.IsNewlyBlocked(); isBlocked {
  189. s.sender.queueControlFrame(&wire.StreamDataBlockedFrame{
  190. StreamID: s.streamID,
  191. DataLimit: offset,
  192. })
  193. return false
  194. }
  195. return true
  196. }
  197. if f.FinBit {
  198. s.finSent = true
  199. }
  200. return s.dataForWriting != nil
  201. }
  202. func (s *sendStream) maybeGetRetransmission(maxBytes protocol.ByteCount) (*wire.StreamFrame, bool /* has more retransmissions */) {
  203. f := s.retransmissionQueue[0]
  204. newFrame, needsSplit := f.MaybeSplitOffFrame(maxBytes, s.version)
  205. if needsSplit {
  206. return newFrame, true
  207. }
  208. s.retransmissionQueue = s.retransmissionQueue[1:]
  209. return f, len(s.retransmissionQueue) > 0
  210. }
  211. func (s *sendStream) hasData() bool {
  212. s.mutex.Lock()
  213. hasData := len(s.dataForWriting) > 0
  214. s.mutex.Unlock()
  215. return hasData
  216. }
  217. func (s *sendStream) getDataForWriting(f *wire.StreamFrame, maxBytes protocol.ByteCount) {
  218. if s.dataForWriting == nil {
  219. f.FinBit = s.finishedWriting && !s.finSent
  220. return
  221. }
  222. maxBytes = utils.MinByteCount(maxBytes, s.flowController.SendWindowSize())
  223. if maxBytes == 0 {
  224. return
  225. }
  226. if protocol.ByteCount(len(s.dataForWriting)) > maxBytes {
  227. f.Data = f.Data[:maxBytes]
  228. copy(f.Data, s.dataForWriting)
  229. s.dataForWriting = s.dataForWriting[maxBytes:]
  230. } else {
  231. f.Data = f.Data[:len(s.dataForWriting)]
  232. copy(f.Data, s.dataForWriting)
  233. s.dataForWriting = nil
  234. s.signalWrite()
  235. }
  236. s.writeOffset += f.DataLen()
  237. s.flowController.AddBytesSent(f.DataLen())
  238. f.FinBit = s.finishedWriting && s.dataForWriting == nil && !s.finSent
  239. }
  240. func (s *sendStream) frameAcked(f wire.Frame) {
  241. f.(*wire.StreamFrame).PutBack()
  242. s.mutex.Lock()
  243. s.numOutstandingFrames--
  244. if s.numOutstandingFrames < 0 {
  245. panic("numOutStandingFrames negative")
  246. }
  247. newlyCompleted := s.isNewlyCompleted()
  248. s.mutex.Unlock()
  249. if newlyCompleted {
  250. s.sender.onStreamCompleted(s.streamID)
  251. }
  252. }
  253. func (s *sendStream) isNewlyCompleted() bool {
  254. completed := (s.finSent || s.canceledWrite) && s.numOutstandingFrames == 0 && len(s.retransmissionQueue) == 0
  255. if completed && !s.completed {
  256. s.completed = true
  257. return true
  258. }
  259. return false
  260. }
  261. func (s *sendStream) queueRetransmission(f wire.Frame) {
  262. sf := f.(*wire.StreamFrame)
  263. sf.DataLenPresent = true
  264. s.mutex.Lock()
  265. s.retransmissionQueue = append(s.retransmissionQueue, sf)
  266. s.numOutstandingFrames--
  267. if s.numOutstandingFrames < 0 {
  268. panic("numOutStandingFrames negative")
  269. }
  270. s.mutex.Unlock()
  271. s.sender.onHasStreamData(s.streamID)
  272. }
  273. func (s *sendStream) Close() error {
  274. s.mutex.Lock()
  275. if s.canceledWrite {
  276. s.mutex.Unlock()
  277. return fmt.Errorf("Close called for canceled stream %d", s.streamID)
  278. }
  279. s.ctxCancel()
  280. s.finishedWriting = true
  281. s.mutex.Unlock()
  282. s.sender.onHasStreamData(s.streamID) // need to send the FIN, must be called without holding the mutex
  283. return nil
  284. }
  285. func (s *sendStream) CancelWrite(errorCode protocol.ApplicationErrorCode) {
  286. s.cancelWriteImpl(errorCode, fmt.Errorf("Write on stream %d canceled with error code %d", s.streamID, errorCode))
  287. }
  288. // must be called after locking the mutex
  289. func (s *sendStream) cancelWriteImpl(errorCode protocol.ApplicationErrorCode, writeErr error) {
  290. s.mutex.Lock()
  291. if s.canceledWrite {
  292. s.mutex.Unlock()
  293. return
  294. }
  295. s.ctxCancel()
  296. s.canceledWrite = true
  297. s.cancelWriteErr = writeErr
  298. newlyCompleted := s.isNewlyCompleted()
  299. s.mutex.Unlock()
  300. s.signalWrite()
  301. s.sender.queueControlFrame(&wire.ResetStreamFrame{
  302. StreamID: s.streamID,
  303. ByteOffset: s.writeOffset,
  304. ErrorCode: errorCode,
  305. })
  306. if newlyCompleted {
  307. s.sender.onStreamCompleted(s.streamID)
  308. }
  309. }
  310. func (s *sendStream) handleMaxStreamDataFrame(frame *wire.MaxStreamDataFrame) {
  311. s.mutex.Lock()
  312. hasStreamData := s.dataForWriting != nil
  313. s.mutex.Unlock()
  314. s.flowController.UpdateSendWindow(frame.ByteOffset)
  315. if hasStreamData {
  316. s.sender.onHasStreamData(s.streamID)
  317. }
  318. }
  319. func (s *sendStream) handleStopSendingFrame(frame *wire.StopSendingFrame) {
  320. writeErr := streamCanceledError{
  321. errorCode: frame.ErrorCode,
  322. error: fmt.Errorf("stream %d was reset with error code %d", s.streamID, frame.ErrorCode),
  323. }
  324. s.cancelWriteImpl(frame.ErrorCode, writeErr)
  325. }
  326. func (s *sendStream) Context() context.Context {
  327. return s.ctx
  328. }
  329. func (s *sendStream) SetWriteDeadline(t time.Time) error {
  330. s.mutex.Lock()
  331. s.deadline = t
  332. s.mutex.Unlock()
  333. s.signalWrite()
  334. return nil
  335. }
  336. // CloseForShutdown closes a stream abruptly.
  337. // It makes Write unblock (and return the error) immediately.
  338. // The peer will NOT be informed about this: the stream is closed without sending a FIN or RST.
  339. func (s *sendStream) closeForShutdown(err error) {
  340. s.mutex.Lock()
  341. s.ctxCancel()
  342. s.closedForShutdown = true
  343. s.closeForShutdownErr = err
  344. s.mutex.Unlock()
  345. s.signalWrite()
  346. }
  347. // signalWrite performs a non-blocking send on the writeChan
  348. func (s *sendStream) signalWrite() {
  349. select {
  350. case s.writeChan <- struct{}{}:
  351. default:
  352. }
  353. }