receive_stream.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389
  1. package quic
  2. import (
  3. "fmt"
  4. "io"
  5. "sync"
  6. "time"
  7. "github.com/Psiphon-Labs/quic-go/internal/ackhandler"
  8. "github.com/Psiphon-Labs/quic-go/internal/flowcontrol"
  9. "github.com/Psiphon-Labs/quic-go/internal/protocol"
  10. "github.com/Psiphon-Labs/quic-go/internal/qerr"
  11. "github.com/Psiphon-Labs/quic-go/internal/utils"
  12. "github.com/Psiphon-Labs/quic-go/internal/wire"
  13. )
  14. type receiveStreamI interface {
  15. ReceiveStream
  16. handleStreamFrame(*wire.StreamFrame, time.Time) error
  17. handleResetStreamFrame(*wire.ResetStreamFrame, time.Time) error
  18. closeForShutdown(error)
  19. }
  20. type receiveStream struct {
  21. mutex sync.Mutex
  22. streamID protocol.StreamID
  23. sender streamSender
  24. frameQueue *frameSorter
  25. finalOffset protocol.ByteCount
  26. currentFrame []byte
  27. currentFrameDone func()
  28. readPosInFrame int
  29. currentFrameIsLast bool // is the currentFrame the last frame on this stream
  30. queuedStopSending bool
  31. queuedMaxStreamData bool
  32. // Set once we read the io.EOF or the cancellation error.
  33. // Note that for local cancellations, this doesn't necessarily mean that we know the final offset yet.
  34. errorRead bool
  35. completed bool // set once we've called streamSender.onStreamCompleted
  36. cancelledRemotely bool
  37. cancelledLocally bool
  38. cancelErr *StreamError
  39. closeForShutdownErr error
  40. readChan chan struct{}
  41. readOnce chan struct{} // cap: 1, to protect against concurrent use of Read
  42. deadline time.Time
  43. flowController flowcontrol.StreamFlowController
  44. }
  45. var (
  46. _ ReceiveStream = &receiveStream{}
  47. _ receiveStreamI = &receiveStream{}
  48. _ streamControlFrameGetter = &receiveStream{}
  49. )
  50. func newReceiveStream(
  51. streamID protocol.StreamID,
  52. sender streamSender,
  53. flowController flowcontrol.StreamFlowController,
  54. ) *receiveStream {
  55. return &receiveStream{
  56. streamID: streamID,
  57. sender: sender,
  58. flowController: flowController,
  59. frameQueue: newFrameSorter(),
  60. readChan: make(chan struct{}, 1),
  61. readOnce: make(chan struct{}, 1),
  62. finalOffset: protocol.MaxByteCount,
  63. }
  64. }
  65. func (s *receiveStream) StreamID() protocol.StreamID {
  66. return s.streamID
  67. }
  68. // Read implements io.Reader. It is not thread safe!
  69. func (s *receiveStream) Read(p []byte) (int, error) {
  70. // Concurrent use of Read is not permitted (and doesn't make any sense),
  71. // but sometimes people do it anyway.
  72. // Make sure that we only execute one call at any given time to avoid hard to debug failures.
  73. s.readOnce <- struct{}{}
  74. defer func() { <-s.readOnce }()
  75. s.mutex.Lock()
  76. queuedStreamWindowUpdate, queuedConnWindowUpdate, n, err := s.readImpl(p)
  77. completed := s.isNewlyCompleted()
  78. s.mutex.Unlock()
  79. if completed {
  80. s.sender.onStreamCompleted(s.streamID)
  81. }
  82. if queuedStreamWindowUpdate {
  83. s.sender.onHasStreamControlFrame(s.streamID, s)
  84. }
  85. if queuedConnWindowUpdate {
  86. s.sender.onHasConnectionData()
  87. }
  88. return n, err
  89. }
  90. func (s *receiveStream) isNewlyCompleted() bool {
  91. if s.completed {
  92. return false
  93. }
  94. // We need to know the final offset (either via FIN or RESET_STREAM) for flow control accounting.
  95. if s.finalOffset == protocol.MaxByteCount {
  96. return false
  97. }
  98. // We're done with the stream if it was cancelled locally...
  99. if s.cancelledLocally {
  100. s.completed = true
  101. return true
  102. }
  103. // ... or if the error (either io.EOF or the reset error) was read
  104. if s.errorRead {
  105. s.completed = true
  106. return true
  107. }
  108. return false
  109. }
  110. func (s *receiveStream) readImpl(p []byte) (hasStreamWindowUpdate bool, hasConnWindowUpdate bool, _ int, _ error) {
  111. if s.currentFrameIsLast && s.currentFrame == nil {
  112. s.errorRead = true
  113. return false, false, 0, io.EOF
  114. }
  115. if s.cancelledRemotely || s.cancelledLocally {
  116. s.errorRead = true
  117. return false, false, 0, s.cancelErr
  118. }
  119. if s.closeForShutdownErr != nil {
  120. return false, false, 0, s.closeForShutdownErr
  121. }
  122. var bytesRead int
  123. var deadlineTimer *utils.Timer
  124. for bytesRead < len(p) {
  125. if s.currentFrame == nil || s.readPosInFrame >= len(s.currentFrame) {
  126. s.dequeueNextFrame()
  127. }
  128. if s.currentFrame == nil && bytesRead > 0 {
  129. return hasStreamWindowUpdate, hasConnWindowUpdate, bytesRead, s.closeForShutdownErr
  130. }
  131. for {
  132. // Stop waiting on errors
  133. if s.closeForShutdownErr != nil {
  134. return hasStreamWindowUpdate, hasConnWindowUpdate, bytesRead, s.closeForShutdownErr
  135. }
  136. if s.cancelledRemotely || s.cancelledLocally {
  137. s.errorRead = true
  138. return hasStreamWindowUpdate, hasConnWindowUpdate, 0, s.cancelErr
  139. }
  140. deadline := s.deadline
  141. if !deadline.IsZero() {
  142. if !time.Now().Before(deadline) {
  143. return hasStreamWindowUpdate, hasConnWindowUpdate, bytesRead, errDeadline
  144. }
  145. if deadlineTimer == nil {
  146. deadlineTimer = utils.NewTimer()
  147. defer deadlineTimer.Stop()
  148. }
  149. deadlineTimer.Reset(deadline)
  150. }
  151. if s.currentFrame != nil || s.currentFrameIsLast {
  152. break
  153. }
  154. s.mutex.Unlock()
  155. if deadline.IsZero() {
  156. <-s.readChan
  157. } else {
  158. select {
  159. case <-s.readChan:
  160. case <-deadlineTimer.Chan():
  161. deadlineTimer.SetRead()
  162. }
  163. }
  164. s.mutex.Lock()
  165. if s.currentFrame == nil {
  166. s.dequeueNextFrame()
  167. }
  168. }
  169. if bytesRead > len(p) {
  170. return hasStreamWindowUpdate, hasConnWindowUpdate, bytesRead, fmt.Errorf("BUG: bytesRead (%d) > len(p) (%d) in stream.Read", bytesRead, len(p))
  171. }
  172. if s.readPosInFrame > len(s.currentFrame) {
  173. return hasStreamWindowUpdate, hasConnWindowUpdate, bytesRead, fmt.Errorf("BUG: readPosInFrame (%d) > frame.DataLen (%d) in stream.Read", s.readPosInFrame, len(s.currentFrame))
  174. }
  175. m := copy(p[bytesRead:], s.currentFrame[s.readPosInFrame:])
  176. s.readPosInFrame += m
  177. bytesRead += m
  178. // when a RESET_STREAM was received, the flow controller was already
  179. // informed about the final byteOffset for this stream
  180. if !s.cancelledRemotely {
  181. hasStream, hasConn := s.flowController.AddBytesRead(protocol.ByteCount(m))
  182. if hasStream {
  183. s.queuedMaxStreamData = true
  184. hasStreamWindowUpdate = true
  185. }
  186. if hasConn {
  187. hasConnWindowUpdate = true
  188. }
  189. }
  190. if s.readPosInFrame >= len(s.currentFrame) && s.currentFrameIsLast {
  191. s.currentFrame = nil
  192. if s.currentFrameDone != nil {
  193. s.currentFrameDone()
  194. }
  195. s.errorRead = true
  196. return hasStreamWindowUpdate, hasConnWindowUpdate, bytesRead, io.EOF
  197. }
  198. }
  199. return hasStreamWindowUpdate, hasConnWindowUpdate, bytesRead, nil
  200. }
  201. func (s *receiveStream) dequeueNextFrame() {
  202. var offset protocol.ByteCount
  203. // We're done with the last frame. Release the buffer.
  204. if s.currentFrameDone != nil {
  205. s.currentFrameDone()
  206. }
  207. offset, s.currentFrame, s.currentFrameDone = s.frameQueue.Pop()
  208. s.currentFrameIsLast = offset+protocol.ByteCount(len(s.currentFrame)) >= s.finalOffset
  209. s.readPosInFrame = 0
  210. }
  211. func (s *receiveStream) CancelRead(errorCode StreamErrorCode) {
  212. s.mutex.Lock()
  213. queuedNewControlFrame := s.cancelReadImpl(errorCode)
  214. completed := s.isNewlyCompleted()
  215. s.mutex.Unlock()
  216. if queuedNewControlFrame {
  217. s.sender.onHasStreamControlFrame(s.streamID, s)
  218. }
  219. if completed {
  220. s.flowController.Abandon()
  221. s.sender.onStreamCompleted(s.streamID)
  222. }
  223. }
  224. func (s *receiveStream) cancelReadImpl(errorCode qerr.StreamErrorCode) (queuedNewControlFrame bool) {
  225. if s.cancelledLocally { // duplicate call to CancelRead
  226. return false
  227. }
  228. if s.closeForShutdownErr != nil {
  229. return false
  230. }
  231. s.cancelledLocally = true
  232. if s.errorRead || s.cancelledRemotely {
  233. return false
  234. }
  235. s.queuedStopSending = true
  236. s.cancelErr = &StreamError{StreamID: s.streamID, ErrorCode: errorCode, Remote: false}
  237. s.signalRead()
  238. return true
  239. }
  240. func (s *receiveStream) handleStreamFrame(frame *wire.StreamFrame, now time.Time) error {
  241. s.mutex.Lock()
  242. err := s.handleStreamFrameImpl(frame, now)
  243. completed := s.isNewlyCompleted()
  244. s.mutex.Unlock()
  245. if completed {
  246. s.flowController.Abandon()
  247. s.sender.onStreamCompleted(s.streamID)
  248. }
  249. return err
  250. }
  251. func (s *receiveStream) handleStreamFrameImpl(frame *wire.StreamFrame, now time.Time) error {
  252. maxOffset := frame.Offset + frame.DataLen()
  253. if err := s.flowController.UpdateHighestReceived(maxOffset, frame.Fin, now); err != nil {
  254. return err
  255. }
  256. if frame.Fin {
  257. s.finalOffset = maxOffset
  258. }
  259. if s.cancelledLocally {
  260. return nil
  261. }
  262. if err := s.frameQueue.Push(frame.Data, frame.Offset, frame.PutBack); err != nil {
  263. return err
  264. }
  265. s.signalRead()
  266. return nil
  267. }
  268. func (s *receiveStream) handleResetStreamFrame(frame *wire.ResetStreamFrame, now time.Time) error {
  269. s.mutex.Lock()
  270. err := s.handleResetStreamFrameImpl(frame, now)
  271. completed := s.isNewlyCompleted()
  272. s.mutex.Unlock()
  273. if completed {
  274. s.sender.onStreamCompleted(s.streamID)
  275. }
  276. return err
  277. }
  278. func (s *receiveStream) handleResetStreamFrameImpl(frame *wire.ResetStreamFrame, now time.Time) error {
  279. if s.closeForShutdownErr != nil {
  280. return nil
  281. }
  282. if err := s.flowController.UpdateHighestReceived(frame.FinalSize, true, now); err != nil {
  283. return err
  284. }
  285. s.finalOffset = frame.FinalSize
  286. // ignore duplicate RESET_STREAM frames for this stream (after checking their final offset)
  287. if s.cancelledRemotely {
  288. return nil
  289. }
  290. s.flowController.Abandon()
  291. // don't save the error if the RESET_STREAM frames was received after CancelRead was called
  292. if s.cancelledLocally {
  293. return nil
  294. }
  295. s.cancelledRemotely = true
  296. s.cancelErr = &StreamError{StreamID: s.streamID, ErrorCode: frame.ErrorCode, Remote: true}
  297. s.signalRead()
  298. return nil
  299. }
  300. func (s *receiveStream) getControlFrame(now time.Time) (_ ackhandler.Frame, ok, hasMore bool) {
  301. s.mutex.Lock()
  302. defer s.mutex.Unlock()
  303. if !s.queuedStopSending && !s.queuedMaxStreamData {
  304. return ackhandler.Frame{}, false, false
  305. }
  306. if s.queuedStopSending {
  307. s.queuedStopSending = false
  308. return ackhandler.Frame{
  309. Frame: &wire.StopSendingFrame{StreamID: s.streamID, ErrorCode: s.cancelErr.ErrorCode},
  310. }, true, s.queuedMaxStreamData
  311. }
  312. s.queuedMaxStreamData = false
  313. return ackhandler.Frame{
  314. Frame: &wire.MaxStreamDataFrame{
  315. StreamID: s.streamID,
  316. MaximumStreamData: s.flowController.GetWindowUpdate(now),
  317. },
  318. }, true, false
  319. }
  320. func (s *receiveStream) SetReadDeadline(t time.Time) error {
  321. s.mutex.Lock()
  322. s.deadline = t
  323. s.mutex.Unlock()
  324. s.signalRead()
  325. return nil
  326. }
  327. // CloseForShutdown closes a stream abruptly.
  328. // It makes Read unblock (and return the error) immediately.
  329. // The peer will NOT be informed about this: the stream is closed without sending a FIN or RESET.
  330. func (s *receiveStream) closeForShutdown(err error) {
  331. s.mutex.Lock()
  332. s.closeForShutdownErr = err
  333. s.mutex.Unlock()
  334. s.signalRead()
  335. }
  336. // signalRead performs a non-blocking send on the readChan
  337. func (s *receiveStream) signalRead() {
  338. select {
  339. case s.readChan <- struct{}{}:
  340. default:
  341. }
  342. }