receive_stream.go 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325
  1. package gquic
  2. import (
  3. "fmt"
  4. "io"
  5. "sync"
  6. "time"
  7. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/quic/gquic-go/internal/flowcontrol"
  8. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/quic/gquic-go/internal/protocol"
  9. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/quic/gquic-go/internal/utils"
  10. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/quic/gquic-go/internal/wire"
  11. )
  12. type receiveStreamI interface {
  13. ReceiveStream
  14. handleStreamFrame(*wire.StreamFrame) error
  15. handleRstStreamFrame(*wire.RstStreamFrame) error
  16. closeForShutdown(error)
  17. getWindowUpdate() protocol.ByteCount
  18. }
  19. type receiveStream struct {
  20. mutex sync.Mutex
  21. streamID protocol.StreamID
  22. sender streamSender
  23. frameQueue *frameSorter
  24. readOffset protocol.ByteCount
  25. currentFrame []byte
  26. currentFrameIsLast bool // is the currentFrame the last frame on this stream
  27. readPosInFrame int
  28. closeForShutdownErr error
  29. cancelReadErr error
  30. resetRemotelyErr StreamError
  31. closedForShutdown bool // set when CloseForShutdown() is called
  32. finRead bool // set once we read a frame with a FinBit
  33. canceledRead bool // set when CancelRead() is called
  34. resetRemotely bool // set when HandleRstStreamFrame() is called
  35. readChan chan struct{}
  36. deadline time.Time
  37. flowController flowcontrol.StreamFlowController
  38. version protocol.VersionNumber
  39. }
  40. var _ ReceiveStream = &receiveStream{}
  41. var _ receiveStreamI = &receiveStream{}
  42. func newReceiveStream(
  43. streamID protocol.StreamID,
  44. sender streamSender,
  45. flowController flowcontrol.StreamFlowController,
  46. version protocol.VersionNumber,
  47. ) *receiveStream {
  48. return &receiveStream{
  49. streamID: streamID,
  50. sender: sender,
  51. flowController: flowController,
  52. frameQueue: newFrameSorter(),
  53. readChan: make(chan struct{}, 1),
  54. version: version,
  55. }
  56. }
  57. func (s *receiveStream) StreamID() protocol.StreamID {
  58. return s.streamID
  59. }
  60. // Read implements io.Reader. It is not thread safe!
  61. func (s *receiveStream) Read(p []byte) (int, error) {
  62. completed, n, err := s.readImpl(p)
  63. if completed {
  64. s.sender.onStreamCompleted(s.streamID)
  65. }
  66. return n, err
  67. }
  68. func (s *receiveStream) readImpl(p []byte) (bool /*stream completed */, int, error) {
  69. s.mutex.Lock()
  70. defer s.mutex.Unlock()
  71. if s.finRead {
  72. return false, 0, io.EOF
  73. }
  74. if s.canceledRead {
  75. return false, 0, s.cancelReadErr
  76. }
  77. if s.resetRemotely {
  78. return false, 0, s.resetRemotelyErr
  79. }
  80. if s.closedForShutdown {
  81. return false, 0, s.closeForShutdownErr
  82. }
  83. bytesRead := 0
  84. for bytesRead < len(p) {
  85. if s.currentFrame == nil || s.readPosInFrame >= len(s.currentFrame) {
  86. s.dequeueNextFrame()
  87. }
  88. if s.currentFrame == nil && bytesRead > 0 {
  89. return false, bytesRead, s.closeForShutdownErr
  90. }
  91. var deadlineTimer *utils.Timer
  92. for {
  93. // Stop waiting on errors
  94. if s.closedForShutdown {
  95. return false, bytesRead, s.closeForShutdownErr
  96. }
  97. if s.canceledRead {
  98. return false, bytesRead, s.cancelReadErr
  99. }
  100. if s.resetRemotely {
  101. return false, bytesRead, s.resetRemotelyErr
  102. }
  103. deadline := s.deadline
  104. if !deadline.IsZero() {
  105. if !time.Now().Before(deadline) {
  106. return false, bytesRead, errDeadline
  107. }
  108. if deadlineTimer == nil {
  109. deadlineTimer = utils.NewTimer()
  110. }
  111. deadlineTimer.Reset(deadline)
  112. }
  113. if s.currentFrame != nil || s.currentFrameIsLast {
  114. break
  115. }
  116. s.mutex.Unlock()
  117. if deadline.IsZero() {
  118. <-s.readChan
  119. } else {
  120. select {
  121. case <-s.readChan:
  122. case <-deadlineTimer.Chan():
  123. deadlineTimer.SetRead()
  124. }
  125. }
  126. s.mutex.Lock()
  127. if s.currentFrame == nil {
  128. s.dequeueNextFrame()
  129. }
  130. }
  131. // [Psiphon]
  132. // Stop timer to immediately release resources
  133. if deadlineTimer != nil {
  134. deadlineTimer.Reset(time.Time{})
  135. }
  136. if bytesRead > len(p) {
  137. return false, bytesRead, fmt.Errorf("BUG: bytesRead (%d) > len(p) (%d) in stream.Read", bytesRead, len(p))
  138. }
  139. if s.readPosInFrame > len(s.currentFrame) {
  140. return false, bytesRead, fmt.Errorf("BUG: readPosInFrame (%d) > frame.DataLen (%d) in stream.Read", s.readPosInFrame, len(s.currentFrame))
  141. }
  142. s.mutex.Unlock()
  143. m := copy(p[bytesRead:], s.currentFrame[s.readPosInFrame:])
  144. s.readPosInFrame += m
  145. bytesRead += m
  146. s.readOffset += protocol.ByteCount(m)
  147. s.mutex.Lock()
  148. // when a RST_STREAM was received, the was already informed about the final byteOffset for this stream
  149. if !s.resetRemotely {
  150. s.flowController.AddBytesRead(protocol.ByteCount(m))
  151. }
  152. // increase the flow control window, if necessary
  153. if s.streamID != s.version.CryptoStreamID() {
  154. s.flowController.MaybeQueueWindowUpdate()
  155. }
  156. if s.readPosInFrame >= len(s.currentFrame) && s.currentFrameIsLast {
  157. s.finRead = true
  158. return true, bytesRead, io.EOF
  159. }
  160. }
  161. return false, bytesRead, nil
  162. }
  163. func (s *receiveStream) dequeueNextFrame() {
  164. s.currentFrame, s.currentFrameIsLast = s.frameQueue.Pop()
  165. s.readPosInFrame = 0
  166. }
  167. func (s *receiveStream) CancelRead(errorCode protocol.ApplicationErrorCode) error {
  168. s.mutex.Lock()
  169. defer s.mutex.Unlock()
  170. if s.finRead {
  171. return nil
  172. }
  173. if s.canceledRead {
  174. return nil
  175. }
  176. s.canceledRead = true
  177. s.cancelReadErr = fmt.Errorf("Read on stream %d canceled with error code %d", s.streamID, errorCode)
  178. s.signalRead()
  179. if s.version.UsesIETFFrameFormat() {
  180. s.sender.queueControlFrame(&wire.StopSendingFrame{
  181. StreamID: s.streamID,
  182. ErrorCode: errorCode,
  183. })
  184. }
  185. return nil
  186. }
  187. func (s *receiveStream) handleStreamFrame(frame *wire.StreamFrame) error {
  188. maxOffset := frame.Offset + frame.DataLen()
  189. if err := s.flowController.UpdateHighestReceived(maxOffset, frame.FinBit); err != nil {
  190. return err
  191. }
  192. s.mutex.Lock()
  193. defer s.mutex.Unlock()
  194. if err := s.frameQueue.Push(frame.Data, frame.Offset, frame.FinBit); err != nil {
  195. return err
  196. }
  197. s.signalRead()
  198. return nil
  199. }
  200. func (s *receiveStream) handleRstStreamFrame(frame *wire.RstStreamFrame) error {
  201. completed, err := s.handleRstStreamFrameImpl(frame)
  202. if completed {
  203. s.sender.onStreamCompleted(s.streamID)
  204. }
  205. return err
  206. }
  207. func (s *receiveStream) handleRstStreamFrameImpl(frame *wire.RstStreamFrame) (bool /*completed */, error) {
  208. s.mutex.Lock()
  209. defer s.mutex.Unlock()
  210. if s.closedForShutdown {
  211. return false, nil
  212. }
  213. if err := s.flowController.UpdateHighestReceived(frame.ByteOffset, true); err != nil {
  214. return false, err
  215. }
  216. // In gQUIC, error code 0 has a special meaning.
  217. // The peer will reliably continue transmitting, but is not interested in reading from the stream.
  218. // We should therefore just continue reading from the stream, until we encounter the FIN bit.
  219. if !s.version.UsesIETFFrameFormat() && frame.ErrorCode == 0 {
  220. return false, nil
  221. }
  222. // ignore duplicate RST_STREAM frames for this stream (after checking their final offset)
  223. if s.resetRemotely {
  224. return false, nil
  225. }
  226. s.resetRemotely = true
  227. s.resetRemotelyErr = streamCanceledError{
  228. errorCode: frame.ErrorCode,
  229. error: fmt.Errorf("Stream %d was reset with error code %d", s.streamID, frame.ErrorCode),
  230. }
  231. s.signalRead()
  232. return true, nil
  233. }
  234. func (s *receiveStream) CloseRemote(offset protocol.ByteCount) {
  235. s.handleStreamFrame(&wire.StreamFrame{FinBit: true, Offset: offset})
  236. }
  237. func (s *receiveStream) onClose(offset protocol.ByteCount) {
  238. // [Psiphon]
  239. // Fix race condition.
  240. s.mutex.Lock()
  241. canceledRead := s.canceledRead
  242. s.mutex.Unlock()
  243. // [Psiphon]
  244. if canceledRead && !s.version.UsesIETFFrameFormat() {
  245. s.sender.queueControlFrame(&wire.RstStreamFrame{
  246. StreamID: s.streamID,
  247. ByteOffset: offset,
  248. ErrorCode: 0,
  249. })
  250. }
  251. }
  252. func (s *receiveStream) SetReadDeadline(t time.Time) error {
  253. s.mutex.Lock()
  254. s.deadline = t
  255. s.mutex.Unlock()
  256. s.signalRead()
  257. return nil
  258. }
  259. // CloseForShutdown closes a stream abruptly.
  260. // It makes Read unblock (and return the error) immediately.
  261. // The peer will NOT be informed about this: the stream is closed without sending a FIN or RST.
  262. func (s *receiveStream) closeForShutdown(err error) {
  263. s.mutex.Lock()
  264. s.closedForShutdown = true
  265. s.closeForShutdownErr = err
  266. s.mutex.Unlock()
  267. s.signalRead()
  268. }
  269. func (s *receiveStream) getWindowUpdate() protocol.ByteCount {
  270. return s.flowController.GetWindowUpdate()
  271. }
  272. // signalRead performs a non-blocking send on the readChan
  273. func (s *receiveStream) signalRead() {
  274. select {
  275. case s.readChan <- struct{}{}:
  276. default:
  277. }
  278. }