receive_stream.go 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330
  1. package quic
  2. import (
  3. "fmt"
  4. "io"
  5. "sync"
  6. "time"
  7. "github.com/Psiphon-Labs/quic-go/internal/flowcontrol"
  8. "github.com/Psiphon-Labs/quic-go/internal/protocol"
  9. "github.com/Psiphon-Labs/quic-go/internal/utils"
  10. "github.com/Psiphon-Labs/quic-go/internal/wire"
  11. )
// receiveStreamI is the package-internal interface of a receive stream: the
// exported ReceiveStream API plus unexported methods for handling incoming
// frames, shutting the stream down and querying flow-control window updates.
type receiveStreamI interface {
	ReceiveStream
	// handleStreamFrame processes a STREAM frame received for this stream.
	handleStreamFrame(*wire.StreamFrame) error
	// handleResetStreamFrame processes a RESET_STREAM frame received for this stream.
	handleResetStreamFrame(*wire.ResetStreamFrame) error
	// closeForShutdown closes the stream abruptly with the given error.
	closeForShutdown(error)
	// getWindowUpdate returns the value reported by the stream's flow
	// controller for the next window update.
	getWindowUpdate() protocol.ByteCount
}
  19. type receiveStream struct {
  20. mutex sync.Mutex
  21. streamID protocol.StreamID
  22. sender streamSender
  23. frameQueue *frameSorter
  24. readOffset protocol.ByteCount
  25. finalOffset protocol.ByteCount
  26. currentFrame []byte
  27. currentFrameDone func()
  28. currentFrameIsLast bool // is the currentFrame the last frame on this stream
  29. readPosInFrame int
  30. closeForShutdownErr error
  31. cancelReadErr error
  32. resetRemotelyErr StreamError
  33. closedForShutdown bool // set when CloseForShutdown() is called
  34. finRead bool // set once we read a frame with a FinBit
  35. canceledRead bool // set when CancelRead() is called
  36. resetRemotely bool // set when HandleResetStreamFrame() is called
  37. readChan chan struct{}
  38. deadline time.Time
  39. flowController flowcontrol.StreamFlowController
  40. version protocol.VersionNumber
  41. }
  42. var _ ReceiveStream = &receiveStream{}
  43. var _ receiveStreamI = &receiveStream{}
  44. func newReceiveStream(
  45. streamID protocol.StreamID,
  46. sender streamSender,
  47. flowController flowcontrol.StreamFlowController,
  48. version protocol.VersionNumber,
  49. ) *receiveStream {
  50. return &receiveStream{
  51. streamID: streamID,
  52. sender: sender,
  53. flowController: flowController,
  54. frameQueue: newFrameSorter(),
  55. readChan: make(chan struct{}, 1),
  56. finalOffset: protocol.MaxByteCount,
  57. version: version,
  58. }
  59. }
// StreamID returns the ID of this stream.
func (s *receiveStream) StreamID() protocol.StreamID {
	return s.streamID
}
  63. // Read implements io.Reader. It is not thread safe!
  64. func (s *receiveStream) Read(p []byte) (int, error) {
  65. s.mutex.Lock()
  66. completed, n, err := s.readImpl(p)
  67. s.mutex.Unlock()
  68. if completed {
  69. s.sender.onStreamCompleted(s.streamID)
  70. }
  71. return n, err
  72. }
  73. func (s *receiveStream) readImpl(p []byte) (bool /*stream completed */, int, error) {
  74. if s.finRead {
  75. return false, 0, io.EOF
  76. }
  77. if s.canceledRead {
  78. return false, 0, s.cancelReadErr
  79. }
  80. if s.resetRemotely {
  81. return false, 0, s.resetRemotelyErr
  82. }
  83. if s.closedForShutdown {
  84. return false, 0, s.closeForShutdownErr
  85. }
  86. bytesRead := 0
  87. for bytesRead < len(p) {
  88. if s.currentFrame == nil || s.readPosInFrame >= len(s.currentFrame) {
  89. s.dequeueNextFrame()
  90. }
  91. if s.currentFrame == nil && bytesRead > 0 {
  92. return false, bytesRead, s.closeForShutdownErr
  93. }
  94. var deadlineTimer *utils.Timer
  95. for {
  96. // Stop waiting on errors
  97. if s.closedForShutdown {
  98. return false, bytesRead, s.closeForShutdownErr
  99. }
  100. if s.canceledRead {
  101. return false, bytesRead, s.cancelReadErr
  102. }
  103. if s.resetRemotely {
  104. return false, bytesRead, s.resetRemotelyErr
  105. }
  106. deadline := s.deadline
  107. if !deadline.IsZero() {
  108. if !time.Now().Before(deadline) {
  109. return false, bytesRead, errDeadline
  110. }
  111. if deadlineTimer == nil {
  112. deadlineTimer = utils.NewTimer()
  113. }
  114. deadlineTimer.Reset(deadline)
  115. }
  116. if s.currentFrame != nil || s.currentFrameIsLast {
  117. break
  118. }
  119. s.mutex.Unlock()
  120. if deadline.IsZero() {
  121. <-s.readChan
  122. } else {
  123. select {
  124. case <-s.readChan:
  125. case <-deadlineTimer.Chan():
  126. deadlineTimer.SetRead()
  127. }
  128. }
  129. s.mutex.Lock()
  130. if s.currentFrame == nil {
  131. s.dequeueNextFrame()
  132. }
  133. }
  134. // [Psiphon]
  135. // Stop timer to immediately release resources
  136. if deadlineTimer != nil {
  137. deadlineTimer.Reset(time.Time{})
  138. }
  139. if bytesRead > len(p) {
  140. return false, bytesRead, fmt.Errorf("BUG: bytesRead (%d) > len(p) (%d) in stream.Read", bytesRead, len(p))
  141. }
  142. if s.readPosInFrame > len(s.currentFrame) {
  143. return false, bytesRead, fmt.Errorf("BUG: readPosInFrame (%d) > frame.DataLen (%d) in stream.Read", s.readPosInFrame, len(s.currentFrame))
  144. }
  145. s.mutex.Unlock()
  146. m := copy(p[bytesRead:], s.currentFrame[s.readPosInFrame:])
  147. s.readPosInFrame += m
  148. bytesRead += m
  149. s.readOffset += protocol.ByteCount(m)
  150. s.mutex.Lock()
  151. // when a RESET_STREAM was received, the was already informed about the final byteOffset for this stream
  152. if !s.resetRemotely {
  153. s.flowController.AddBytesRead(protocol.ByteCount(m))
  154. }
  155. if s.readPosInFrame >= len(s.currentFrame) && s.currentFrameIsLast {
  156. s.finRead = true
  157. return true, bytesRead, io.EOF
  158. }
  159. }
  160. return false, bytesRead, nil
  161. }
  162. func (s *receiveStream) dequeueNextFrame() {
  163. var offset protocol.ByteCount
  164. // We're done with the last frame. Release the buffer.
  165. if s.currentFrameDone != nil {
  166. s.currentFrameDone()
  167. }
  168. offset, s.currentFrame, s.currentFrameDone = s.frameQueue.Pop()
  169. s.currentFrameIsLast = offset+protocol.ByteCount(len(s.currentFrame)) >= s.finalOffset
  170. s.readPosInFrame = 0
  171. }
  172. func (s *receiveStream) CancelRead(errorCode protocol.ApplicationErrorCode) {
  173. s.mutex.Lock()
  174. completed := s.cancelReadImpl(errorCode)
  175. s.mutex.Unlock()
  176. if completed {
  177. s.flowController.Abandon()
  178. s.sender.onStreamCompleted(s.streamID)
  179. }
  180. }
  181. func (s *receiveStream) cancelReadImpl(errorCode protocol.ApplicationErrorCode) bool /* completed */ {
  182. if s.finRead || s.canceledRead || s.resetRemotely {
  183. return false
  184. }
  185. s.canceledRead = true
  186. s.cancelReadErr = fmt.Errorf("Read on stream %d canceled with error code %d", s.streamID, errorCode)
  187. s.signalRead()
  188. s.sender.queueControlFrame(&wire.StopSendingFrame{
  189. StreamID: s.streamID,
  190. ErrorCode: errorCode,
  191. })
  192. // We're done with this stream if the final offset was already received.
  193. return s.finalOffset != protocol.MaxByteCount
  194. }
  195. func (s *receiveStream) handleStreamFrame(frame *wire.StreamFrame) error {
  196. s.mutex.Lock()
  197. completed, err := s.handleStreamFrameImpl(frame)
  198. s.mutex.Unlock()
  199. if completed {
  200. s.flowController.Abandon()
  201. s.sender.onStreamCompleted(s.streamID)
  202. }
  203. return err
  204. }
  205. func (s *receiveStream) handleStreamFrameImpl(frame *wire.StreamFrame) (bool /* completed */, error) {
  206. maxOffset := frame.Offset + frame.DataLen()
  207. if err := s.flowController.UpdateHighestReceived(maxOffset, frame.FinBit); err != nil {
  208. return false, err
  209. }
  210. var newlyRcvdFinalOffset bool
  211. if frame.FinBit {
  212. newlyRcvdFinalOffset = s.finalOffset == protocol.MaxByteCount
  213. s.finalOffset = maxOffset
  214. }
  215. if s.canceledRead {
  216. return newlyRcvdFinalOffset, nil
  217. }
  218. if err := s.frameQueue.Push(frame.Data, frame.Offset, frame.PutBack); err != nil {
  219. return false, err
  220. }
  221. s.signalRead()
  222. return false, nil
  223. }
  224. func (s *receiveStream) handleResetStreamFrame(frame *wire.ResetStreamFrame) error {
  225. s.mutex.Lock()
  226. completed, err := s.handleResetStreamFrameImpl(frame)
  227. s.mutex.Unlock()
  228. if completed {
  229. s.flowController.Abandon()
  230. s.sender.onStreamCompleted(s.streamID)
  231. }
  232. return err
  233. }
  234. func (s *receiveStream) handleResetStreamFrameImpl(frame *wire.ResetStreamFrame) (bool /*completed */, error) {
  235. if s.closedForShutdown {
  236. return false, nil
  237. }
  238. if err := s.flowController.UpdateHighestReceived(frame.ByteOffset, true); err != nil {
  239. return false, err
  240. }
  241. newlyRcvdFinalOffset := s.finalOffset == protocol.MaxByteCount
  242. s.finalOffset = frame.ByteOffset
  243. // ignore duplicate RESET_STREAM frames for this stream (after checking their final offset)
  244. if s.resetRemotely {
  245. return false, nil
  246. }
  247. s.resetRemotely = true
  248. s.resetRemotelyErr = streamCanceledError{
  249. errorCode: frame.ErrorCode,
  250. error: fmt.Errorf("stream %d was reset with error code %d", s.streamID, frame.ErrorCode),
  251. }
  252. s.signalRead()
  253. return newlyRcvdFinalOffset, nil
  254. }
  255. func (s *receiveStream) CloseRemote(offset protocol.ByteCount) {
  256. s.handleStreamFrame(&wire.StreamFrame{FinBit: true, Offset: offset})
  257. }
// SetReadDeadline sets the deadline for Read calls; the zero time disables
// the deadline. It always returns nil.
func (s *receiveStream) SetReadDeadline(t time.Time) error {
	s.mutex.Lock()
	s.deadline = t
	s.mutex.Unlock()
	// Wake up a blocked Read so that it picks up the new deadline.
	s.signalRead()
	return nil
}
  265. // CloseForShutdown closes a stream abruptly.
  266. // It makes Read unblock (and return the error) immediately.
  267. // The peer will NOT be informed about this: the stream is closed without sending a FIN or RESET.
  268. func (s *receiveStream) closeForShutdown(err error) {
  269. s.mutex.Lock()
  270. s.closedForShutdown = true
  271. s.closeForShutdownErr = err
  272. s.mutex.Unlock()
  273. s.signalRead()
  274. }
// getWindowUpdate returns the result of the flow controller's GetWindowUpdate.
func (s *receiveStream) getWindowUpdate() protocol.ByteCount {
	return s.flowController.GetWindowUpdate()
}
// signalRead performs a non-blocking send on the readChan
func (s *receiveStream) signalRead() {
	select {
	case s.readChan <- struct{}{}:
	default:
		// A signal is already pending in the channel; nothing to do.
	}
}