stream.go 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. package gquic
  2. import (
  3. "net"
  4. "sync"
  5. "time"
  6. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/quic/gquic-go/internal/flowcontrol"
  7. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/quic/gquic-go/internal/protocol"
  8. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/quic/gquic-go/internal/wire"
  9. )
  10. const (
  11. errorCodeStopping protocol.ApplicationErrorCode = 0
  12. errorCodeStoppingGQUIC protocol.ApplicationErrorCode = 7
  13. )
  14. // The streamSender is notified by the stream about various events.
  15. type streamSender interface {
  16. queueControlFrame(wire.Frame)
  17. onHasStreamData(protocol.StreamID)
  18. // must be called without holding the mutex that is acquired by closeForShutdown
  19. onStreamCompleted(protocol.StreamID)
  20. }
  21. // Each of the both stream halves gets its own uniStreamSender.
  22. // This is necessary in order to keep track when both halves have been completed.
  23. type uniStreamSender struct {
  24. streamSender
  25. onStreamCompletedImpl func()
  26. }
  27. func (s *uniStreamSender) queueControlFrame(f wire.Frame) {
  28. s.streamSender.queueControlFrame(f)
  29. }
  30. func (s *uniStreamSender) onHasStreamData(id protocol.StreamID) {
  31. s.streamSender.onHasStreamData(id)
  32. }
  33. func (s *uniStreamSender) onStreamCompleted(protocol.StreamID) {
  34. s.onStreamCompletedImpl()
  35. }
  36. var _ streamSender = &uniStreamSender{}
  37. type streamI interface {
  38. Stream
  39. closeForShutdown(error)
  40. // for receiving
  41. handleStreamFrame(*wire.StreamFrame) error
  42. handleRstStreamFrame(*wire.RstStreamFrame) error
  43. getWindowUpdate() protocol.ByteCount
  44. // for sending
  45. hasData() bool
  46. handleStopSendingFrame(*wire.StopSendingFrame)
  47. popStreamFrame(maxBytes protocol.ByteCount) (*wire.StreamFrame, bool)
  48. handleMaxStreamDataFrame(*wire.MaxStreamDataFrame)
  49. }
  50. var _ receiveStreamI = (streamI)(nil)
  51. var _ sendStreamI = (streamI)(nil)
  52. // A Stream assembles the data from StreamFrames and provides a super-convenient Read-Interface
  53. //
  54. // Read() and Write() may be called concurrently, but multiple calls to Read() or Write() individually must be synchronized manually.
  55. type stream struct {
  56. receiveStream
  57. sendStream
  58. completedMutex sync.Mutex
  59. sender streamSender
  60. receiveStreamCompleted bool
  61. sendStreamCompleted bool
  62. version protocol.VersionNumber
  63. }
  64. var _ Stream = &stream{}
  65. type deadlineError struct{}
  66. func (deadlineError) Error() string { return "deadline exceeded" }
  67. func (deadlineError) Temporary() bool { return true }
  68. func (deadlineError) Timeout() bool { return true }
  69. var errDeadline net.Error = &deadlineError{}
  70. type streamCanceledError struct {
  71. error
  72. errorCode protocol.ApplicationErrorCode
  73. }
  74. func (streamCanceledError) Canceled() bool { return true }
  75. func (e streamCanceledError) ErrorCode() protocol.ApplicationErrorCode { return e.errorCode }
  76. var _ StreamError = &streamCanceledError{}
  77. // newStream creates a new Stream
  78. func newStream(streamID protocol.StreamID,
  79. sender streamSender,
  80. flowController flowcontrol.StreamFlowController,
  81. version protocol.VersionNumber,
  82. ) *stream {
  83. s := &stream{sender: sender, version: version}
  84. senderForSendStream := &uniStreamSender{
  85. streamSender: sender,
  86. onStreamCompletedImpl: func() {
  87. s.completedMutex.Lock()
  88. s.sendStreamCompleted = true
  89. s.checkIfCompleted()
  90. s.completedMutex.Unlock()
  91. },
  92. }
  93. s.sendStream = *newSendStream(streamID, senderForSendStream, flowController, version)
  94. senderForReceiveStream := &uniStreamSender{
  95. streamSender: sender,
  96. onStreamCompletedImpl: func() {
  97. s.completedMutex.Lock()
  98. s.receiveStreamCompleted = true
  99. s.checkIfCompleted()
  100. s.completedMutex.Unlock()
  101. },
  102. }
  103. s.receiveStream = *newReceiveStream(streamID, senderForReceiveStream, flowController, version)
  104. return s
  105. }
  106. // need to define StreamID() here, since both receiveStream and readStream have a StreamID()
  107. func (s *stream) StreamID() protocol.StreamID {
  108. // the result is same for receiveStream and sendStream
  109. return s.sendStream.StreamID()
  110. }
  111. func (s *stream) Close() error {
  112. if err := s.sendStream.Close(); err != nil {
  113. return err
  114. }
  115. // in gQUIC, we need to send a RST_STREAM with the final offset if CancelRead() was called
  116. s.receiveStream.onClose(s.sendStream.getWriteOffset())
  117. return nil
  118. }
  119. func (s *stream) SetDeadline(t time.Time) error {
  120. _ = s.SetReadDeadline(t) // SetReadDeadline never errors
  121. _ = s.SetWriteDeadline(t) // SetWriteDeadline never errors
  122. return nil
  123. }
  124. // CloseForShutdown closes a stream abruptly.
  125. // It makes Read and Write unblock (and return the error) immediately.
  126. // The peer will NOT be informed about this: the stream is closed without sending a FIN or RST.
  127. func (s *stream) closeForShutdown(err error) {
  128. s.sendStream.closeForShutdown(err)
  129. s.receiveStream.closeForShutdown(err)
  130. }
  131. func (s *stream) handleRstStreamFrame(frame *wire.RstStreamFrame) error {
  132. if err := s.receiveStream.handleRstStreamFrame(frame); err != nil {
  133. return err
  134. }
  135. if !s.version.UsesIETFFrameFormat() {
  136. s.handleStopSendingFrame(&wire.StopSendingFrame{
  137. StreamID: s.StreamID(),
  138. ErrorCode: frame.ErrorCode,
  139. })
  140. }
  141. return nil
  142. }
  143. // checkIfCompleted is called from the uniStreamSender, when one of the stream halves is completed.
  144. // It makes sure that the onStreamCompleted callback is only called if both receive and send side have completed.
  145. func (s *stream) checkIfCompleted() {
  146. if s.sendStreamCompleted && s.receiveStreamCompleted {
  147. s.sender.onStreamCompleted(s.StreamID())
  148. }
  149. }