receive_stream.go 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331
  1. package quic
  2. import (
  3. "fmt"
  4. "io"
  5. "sync"
  6. "time"
  7. "github.com/Psiphon-Labs/quic-go/internal/flowcontrol"
  8. "github.com/Psiphon-Labs/quic-go/internal/protocol"
  9. "github.com/Psiphon-Labs/quic-go/internal/qerr"
  10. "github.com/Psiphon-Labs/quic-go/internal/utils"
  11. "github.com/Psiphon-Labs/quic-go/internal/wire"
  12. )
  13. type receiveStreamI interface {
  14. ReceiveStream
  15. handleStreamFrame(*wire.StreamFrame) error
  16. handleResetStreamFrame(*wire.ResetStreamFrame) error
  17. closeForShutdown(error)
  18. getWindowUpdate() protocol.ByteCount
  19. }
  20. type receiveStream struct {
  21. mutex sync.Mutex
  22. streamID protocol.StreamID
  23. sender streamSender
  24. frameQueue *frameSorter
  25. finalOffset protocol.ByteCount
  26. currentFrame []byte
  27. currentFrameDone func()
  28. currentFrameIsLast bool // is the currentFrame the last frame on this stream
  29. readPosInFrame int
  30. closeForShutdownErr error
  31. cancelReadErr error
  32. resetRemotelyErr *StreamError
  33. closedForShutdown bool // set when CloseForShutdown() is called
  34. finRead bool // set once we read a frame with a Fin
  35. canceledRead bool // set when CancelRead() is called
  36. resetRemotely bool // set when HandleResetStreamFrame() is called
  37. readChan chan struct{}
  38. readOnce chan struct{} // cap: 1, to protect against concurrent use of Read
  39. deadline time.Time
  40. flowController flowcontrol.StreamFlowController
  41. version protocol.VersionNumber
  42. }
  43. var (
  44. _ ReceiveStream = &receiveStream{}
  45. _ receiveStreamI = &receiveStream{}
  46. )
  47. func newReceiveStream(
  48. streamID protocol.StreamID,
  49. sender streamSender,
  50. flowController flowcontrol.StreamFlowController,
  51. version protocol.VersionNumber,
  52. ) *receiveStream {
  53. return &receiveStream{
  54. streamID: streamID,
  55. sender: sender,
  56. flowController: flowController,
  57. frameQueue: newFrameSorter(),
  58. readChan: make(chan struct{}, 1),
  59. readOnce: make(chan struct{}, 1),
  60. finalOffset: protocol.MaxByteCount,
  61. version: version,
  62. }
  63. }
  64. func (s *receiveStream) StreamID() protocol.StreamID {
  65. return s.streamID
  66. }
  67. // Read implements io.Reader. It is not thread safe!
  68. func (s *receiveStream) Read(p []byte) (int, error) {
  69. // Concurrent use of Read is not permitted (and doesn't make any sense),
  70. // but sometimes people do it anyway.
  71. // Make sure that we only execute one call at any given time to avoid hard to debug failures.
  72. s.readOnce <- struct{}{}
  73. defer func() { <-s.readOnce }()
  74. s.mutex.Lock()
  75. completed, n, err := s.readImpl(p)
  76. s.mutex.Unlock()
  77. if completed {
  78. s.sender.onStreamCompleted(s.streamID)
  79. }
  80. return n, err
  81. }
  82. func (s *receiveStream) readImpl(p []byte) (bool /*stream completed */, int, error) {
  83. if s.finRead {
  84. return false, 0, io.EOF
  85. }
  86. if s.canceledRead {
  87. return false, 0, s.cancelReadErr
  88. }
  89. if s.resetRemotely {
  90. return false, 0, s.resetRemotelyErr
  91. }
  92. if s.closedForShutdown {
  93. return false, 0, s.closeForShutdownErr
  94. }
  95. var bytesRead int
  96. var deadlineTimer *utils.Timer
  97. for bytesRead < len(p) {
  98. if s.currentFrame == nil || s.readPosInFrame >= len(s.currentFrame) {
  99. s.dequeueNextFrame()
  100. }
  101. if s.currentFrame == nil && bytesRead > 0 {
  102. return false, bytesRead, s.closeForShutdownErr
  103. }
  104. for {
  105. // Stop waiting on errors
  106. if s.closedForShutdown {
  107. return false, bytesRead, s.closeForShutdownErr
  108. }
  109. if s.canceledRead {
  110. return false, bytesRead, s.cancelReadErr
  111. }
  112. if s.resetRemotely {
  113. return false, bytesRead, s.resetRemotelyErr
  114. }
  115. deadline := s.deadline
  116. if !deadline.IsZero() {
  117. if !time.Now().Before(deadline) {
  118. return false, bytesRead, errDeadline
  119. }
  120. if deadlineTimer == nil {
  121. deadlineTimer = utils.NewTimer()
  122. defer deadlineTimer.Stop()
  123. }
  124. deadlineTimer.Reset(deadline)
  125. }
  126. if s.currentFrame != nil || s.currentFrameIsLast {
  127. break
  128. }
  129. s.mutex.Unlock()
  130. if deadline.IsZero() {
  131. <-s.readChan
  132. } else {
  133. select {
  134. case <-s.readChan:
  135. case <-deadlineTimer.Chan():
  136. deadlineTimer.SetRead()
  137. }
  138. }
  139. s.mutex.Lock()
  140. if s.currentFrame == nil {
  141. s.dequeueNextFrame()
  142. }
  143. }
  144. if bytesRead > len(p) {
  145. return false, bytesRead, fmt.Errorf("BUG: bytesRead (%d) > len(p) (%d) in stream.Read", bytesRead, len(p))
  146. }
  147. if s.readPosInFrame > len(s.currentFrame) {
  148. return false, bytesRead, fmt.Errorf("BUG: readPosInFrame (%d) > frame.DataLen (%d) in stream.Read", s.readPosInFrame, len(s.currentFrame))
  149. }
  150. m := copy(p[bytesRead:], s.currentFrame[s.readPosInFrame:])
  151. s.readPosInFrame += m
  152. bytesRead += m
  153. // when a RESET_STREAM was received, the was already informed about the final byteOffset for this stream
  154. if !s.resetRemotely {
  155. s.flowController.AddBytesRead(protocol.ByteCount(m))
  156. }
  157. if s.readPosInFrame >= len(s.currentFrame) && s.currentFrameIsLast {
  158. s.finRead = true
  159. return true, bytesRead, io.EOF
  160. }
  161. }
  162. return false, bytesRead, nil
  163. }
  164. func (s *receiveStream) dequeueNextFrame() {
  165. var offset protocol.ByteCount
  166. // We're done with the last frame. Release the buffer.
  167. if s.currentFrameDone != nil {
  168. s.currentFrameDone()
  169. }
  170. offset, s.currentFrame, s.currentFrameDone = s.frameQueue.Pop()
  171. s.currentFrameIsLast = offset+protocol.ByteCount(len(s.currentFrame)) >= s.finalOffset
  172. s.readPosInFrame = 0
  173. }
  174. func (s *receiveStream) CancelRead(errorCode StreamErrorCode) {
  175. s.mutex.Lock()
  176. completed := s.cancelReadImpl(errorCode)
  177. s.mutex.Unlock()
  178. if completed {
  179. s.flowController.Abandon()
  180. s.sender.onStreamCompleted(s.streamID)
  181. }
  182. }
  183. func (s *receiveStream) cancelReadImpl(errorCode qerr.StreamErrorCode) bool /* completed */ {
  184. if s.finRead || s.canceledRead || s.resetRemotely {
  185. return false
  186. }
  187. s.canceledRead = true
  188. s.cancelReadErr = fmt.Errorf("Read on stream %d canceled with error code %d", s.streamID, errorCode)
  189. s.signalRead()
  190. s.sender.queueControlFrame(&wire.StopSendingFrame{
  191. StreamID: s.streamID,
  192. ErrorCode: errorCode,
  193. })
  194. // We're done with this stream if the final offset was already received.
  195. return s.finalOffset != protocol.MaxByteCount
  196. }
  197. func (s *receiveStream) handleStreamFrame(frame *wire.StreamFrame) error {
  198. s.mutex.Lock()
  199. completed, err := s.handleStreamFrameImpl(frame)
  200. s.mutex.Unlock()
  201. if completed {
  202. s.flowController.Abandon()
  203. s.sender.onStreamCompleted(s.streamID)
  204. }
  205. return err
  206. }
  207. func (s *receiveStream) handleStreamFrameImpl(frame *wire.StreamFrame) (bool /* completed */, error) {
  208. maxOffset := frame.Offset + frame.DataLen()
  209. if err := s.flowController.UpdateHighestReceived(maxOffset, frame.Fin); err != nil {
  210. return false, err
  211. }
  212. var newlyRcvdFinalOffset bool
  213. if frame.Fin {
  214. newlyRcvdFinalOffset = s.finalOffset == protocol.MaxByteCount
  215. s.finalOffset = maxOffset
  216. }
  217. if s.canceledRead {
  218. return newlyRcvdFinalOffset, nil
  219. }
  220. if err := s.frameQueue.Push(frame.Data, frame.Offset, frame.PutBack); err != nil {
  221. return false, err
  222. }
  223. s.signalRead()
  224. return false, nil
  225. }
  226. func (s *receiveStream) handleResetStreamFrame(frame *wire.ResetStreamFrame) error {
  227. s.mutex.Lock()
  228. completed, err := s.handleResetStreamFrameImpl(frame)
  229. s.mutex.Unlock()
  230. if completed {
  231. s.flowController.Abandon()
  232. s.sender.onStreamCompleted(s.streamID)
  233. }
  234. return err
  235. }
  236. func (s *receiveStream) handleResetStreamFrameImpl(frame *wire.ResetStreamFrame) (bool /*completed */, error) {
  237. if s.closedForShutdown {
  238. return false, nil
  239. }
  240. if err := s.flowController.UpdateHighestReceived(frame.FinalSize, true); err != nil {
  241. return false, err
  242. }
  243. newlyRcvdFinalOffset := s.finalOffset == protocol.MaxByteCount
  244. s.finalOffset = frame.FinalSize
  245. // ignore duplicate RESET_STREAM frames for this stream (after checking their final offset)
  246. if s.resetRemotely {
  247. return false, nil
  248. }
  249. s.resetRemotely = true
  250. s.resetRemotelyErr = &StreamError{
  251. StreamID: s.streamID,
  252. ErrorCode: frame.ErrorCode,
  253. }
  254. s.signalRead()
  255. return newlyRcvdFinalOffset, nil
  256. }
  257. func (s *receiveStream) CloseRemote(offset protocol.ByteCount) {
  258. s.handleStreamFrame(&wire.StreamFrame{Fin: true, Offset: offset})
  259. }
  260. func (s *receiveStream) SetReadDeadline(t time.Time) error {
  261. s.mutex.Lock()
  262. s.deadline = t
  263. s.mutex.Unlock()
  264. s.signalRead()
  265. return nil
  266. }
  267. // CloseForShutdown closes a stream abruptly.
  268. // It makes Read unblock (and return the error) immediately.
  269. // The peer will NOT be informed about this: the stream is closed without sending a FIN or RESET.
  270. func (s *receiveStream) closeForShutdown(err error) {
  271. s.mutex.Lock()
  272. s.closedForShutdown = true
  273. s.closeForShutdownErr = err
  274. s.mutex.Unlock()
  275. s.signalRead()
  276. }
  277. func (s *receiveStream) getWindowUpdate() protocol.ByteCount {
  278. return s.flowController.GetWindowUpdate()
  279. }
  280. // signalRead performs a non-blocking send on the readChan
  281. func (s *receiveStream) signalRead() {
  282. select {
  283. case s.readChan <- struct{}{}:
  284. default:
  285. }
  286. }