stream.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. package quic
  2. import (
  3. "net"
  4. "os"
  5. "sync"
  6. "time"
  7. "github.com/Psiphon-Labs/quic-go/internal/ackhandler"
  8. "github.com/Psiphon-Labs/quic-go/internal/flowcontrol"
  9. "github.com/Psiphon-Labs/quic-go/internal/protocol"
  10. "github.com/Psiphon-Labs/quic-go/internal/wire"
  11. )
  12. type deadlineError struct{}
  13. func (deadlineError) Error() string { return "deadline exceeded" }
  14. func (deadlineError) Temporary() bool { return true }
  15. func (deadlineError) Timeout() bool { return true }
  16. func (deadlineError) Unwrap() error { return os.ErrDeadlineExceeded }
  17. var errDeadline net.Error = &deadlineError{}
  18. // The streamSender is notified by the stream about various events.
  19. type streamSender interface {
  20. queueControlFrame(wire.Frame)
  21. onHasStreamData(protocol.StreamID)
  22. // must be called without holding the mutex that is acquired by closeForShutdown
  23. onStreamCompleted(protocol.StreamID)
  24. }
  25. // Each of the both stream halves gets its own uniStreamSender.
  26. // This is necessary in order to keep track when both halves have been completed.
  27. type uniStreamSender struct {
  28. streamSender
  29. onStreamCompletedImpl func()
  30. }
  31. func (s *uniStreamSender) queueControlFrame(f wire.Frame) {
  32. s.streamSender.queueControlFrame(f)
  33. }
  34. func (s *uniStreamSender) onHasStreamData(id protocol.StreamID) {
  35. s.streamSender.onHasStreamData(id)
  36. }
  37. func (s *uniStreamSender) onStreamCompleted(protocol.StreamID) {
  38. s.onStreamCompletedImpl()
  39. }
  40. var _ streamSender = &uniStreamSender{}
  41. type streamI interface {
  42. Stream
  43. closeForShutdown(error)
  44. // for receiving
  45. handleStreamFrame(*wire.StreamFrame) error
  46. handleResetStreamFrame(*wire.ResetStreamFrame) error
  47. getWindowUpdate() protocol.ByteCount
  48. // for sending
  49. hasData() bool
  50. handleStopSendingFrame(*wire.StopSendingFrame)
  51. popStreamFrame(maxBytes protocol.ByteCount, v protocol.VersionNumber) (ackhandler.StreamFrame, bool, bool)
  52. updateSendWindow(protocol.ByteCount)
  53. }
  54. var (
  55. _ receiveStreamI = (streamI)(nil)
  56. _ sendStreamI = (streamI)(nil)
  57. )
  58. // A Stream assembles the data from StreamFrames and provides a super-convenient Read-Interface
  59. //
  60. // Read() and Write() may be called concurrently, but multiple calls to Read() or Write() individually must be synchronized manually.
  61. type stream struct {
  62. receiveStream
  63. sendStream
  64. completedMutex sync.Mutex
  65. sender streamSender
  66. receiveStreamCompleted bool
  67. sendStreamCompleted bool
  68. }
  69. var _ Stream = &stream{}
  70. // newStream creates a new Stream
  71. func newStream(streamID protocol.StreamID,
  72. sender streamSender,
  73. flowController flowcontrol.StreamFlowController,
  74. ) *stream {
  75. s := &stream{sender: sender}
  76. senderForSendStream := &uniStreamSender{
  77. streamSender: sender,
  78. onStreamCompletedImpl: func() {
  79. s.completedMutex.Lock()
  80. s.sendStreamCompleted = true
  81. s.checkIfCompleted()
  82. s.completedMutex.Unlock()
  83. },
  84. }
  85. s.sendStream = *newSendStream(streamID, senderForSendStream, flowController)
  86. senderForReceiveStream := &uniStreamSender{
  87. streamSender: sender,
  88. onStreamCompletedImpl: func() {
  89. s.completedMutex.Lock()
  90. s.receiveStreamCompleted = true
  91. s.checkIfCompleted()
  92. s.completedMutex.Unlock()
  93. },
  94. }
  95. s.receiveStream = *newReceiveStream(streamID, senderForReceiveStream, flowController)
  96. return s
  97. }
  98. // need to define StreamID() here, since both receiveStream and readStream have a StreamID()
  99. func (s *stream) StreamID() protocol.StreamID {
  100. // the result is same for receiveStream and sendStream
  101. return s.sendStream.StreamID()
  102. }
  103. func (s *stream) Close() error {
  104. return s.sendStream.Close()
  105. }
  106. func (s *stream) SetDeadline(t time.Time) error {
  107. _ = s.SetReadDeadline(t) // SetReadDeadline never errors
  108. _ = s.SetWriteDeadline(t) // SetWriteDeadline never errors
  109. return nil
  110. }
  111. // CloseForShutdown closes a stream abruptly.
  112. // It makes Read and Write unblock (and return the error) immediately.
  113. // The peer will NOT be informed about this: the stream is closed without sending a FIN or RST.
  114. func (s *stream) closeForShutdown(err error) {
  115. s.sendStream.closeForShutdown(err)
  116. s.receiveStream.closeForShutdown(err)
  117. }
  118. // checkIfCompleted is called from the uniStreamSender, when one of the stream halves is completed.
  119. // It makes sure that the onStreamCompleted callback is only called if both receive and send side have completed.
  120. func (s *stream) checkIfCompleted() {
  121. if s.sendStreamCompleted && s.receiveStreamCompleted {
  122. s.sender.onStreamCompleted(s.StreamID())
  123. }
  124. }