stream.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. package quic
  2. import (
  3. "net"
  4. "os"
  5. "sync"
  6. "time"
  7. "github.com/Psiphon-Labs/quic-go/internal/ackhandler"
  8. "github.com/Psiphon-Labs/quic-go/internal/flowcontrol"
  9. "github.com/Psiphon-Labs/quic-go/internal/protocol"
  10. "github.com/Psiphon-Labs/quic-go/internal/wire"
  11. )
  12. type deadlineError struct{}
  13. func (deadlineError) Error() string { return "deadline exceeded" }
  14. func (deadlineError) Temporary() bool { return true }
  15. func (deadlineError) Timeout() bool { return true }
  16. func (deadlineError) Unwrap() error { return os.ErrDeadlineExceeded }
  17. var errDeadline net.Error = &deadlineError{}
  18. // The streamSender is notified by the stream about various events.
  19. type streamSender interface {
  20. queueControlFrame(wire.Frame)
  21. onHasStreamData(protocol.StreamID)
  22. // must be called without holding the mutex that is acquired by closeForShutdown
  23. onStreamCompleted(protocol.StreamID)
  24. }
  25. // Each of the both stream halves gets its own uniStreamSender.
  26. // This is necessary in order to keep track when both halves have been completed.
  27. type uniStreamSender struct {
  28. streamSender
  29. onStreamCompletedImpl func()
  30. }
  31. func (s *uniStreamSender) queueControlFrame(f wire.Frame) {
  32. s.streamSender.queueControlFrame(f)
  33. }
  34. func (s *uniStreamSender) onHasStreamData(id protocol.StreamID) {
  35. s.streamSender.onHasStreamData(id)
  36. }
  37. func (s *uniStreamSender) onStreamCompleted(protocol.StreamID) {
  38. s.onStreamCompletedImpl()
  39. }
  40. var _ streamSender = &uniStreamSender{}
  41. type streamI interface {
  42. Stream
  43. closeForShutdown(error)
  44. // for receiving
  45. handleStreamFrame(*wire.StreamFrame) error
  46. handleResetStreamFrame(*wire.ResetStreamFrame) error
  47. getWindowUpdate() protocol.ByteCount
  48. // for sending
  49. hasData() bool
  50. handleStopSendingFrame(*wire.StopSendingFrame)
  51. popStreamFrame(maxBytes protocol.ByteCount) (*ackhandler.Frame, bool)
  52. updateSendWindow(protocol.ByteCount)
  53. }
  54. var (
  55. _ receiveStreamI = (streamI)(nil)
  56. _ sendStreamI = (streamI)(nil)
  57. )
  58. // A Stream assembles the data from StreamFrames and provides a super-convenient Read-Interface
  59. //
  60. // Read() and Write() may be called concurrently, but multiple calls to Read() or Write() individually must be synchronized manually.
  61. type stream struct {
  62. receiveStream
  63. sendStream
  64. completedMutex sync.Mutex
  65. sender streamSender
  66. receiveStreamCompleted bool
  67. sendStreamCompleted bool
  68. version protocol.VersionNumber
  69. }
  70. var _ Stream = &stream{}
  71. // newStream creates a new Stream
  72. func newStream(streamID protocol.StreamID,
  73. sender streamSender,
  74. flowController flowcontrol.StreamFlowController,
  75. version protocol.VersionNumber,
  76. ) *stream {
  77. s := &stream{sender: sender, version: version}
  78. senderForSendStream := &uniStreamSender{
  79. streamSender: sender,
  80. onStreamCompletedImpl: func() {
  81. s.completedMutex.Lock()
  82. s.sendStreamCompleted = true
  83. s.checkIfCompleted()
  84. s.completedMutex.Unlock()
  85. },
  86. }
  87. s.sendStream = *newSendStream(streamID, senderForSendStream, flowController, version)
  88. senderForReceiveStream := &uniStreamSender{
  89. streamSender: sender,
  90. onStreamCompletedImpl: func() {
  91. s.completedMutex.Lock()
  92. s.receiveStreamCompleted = true
  93. s.checkIfCompleted()
  94. s.completedMutex.Unlock()
  95. },
  96. }
  97. s.receiveStream = *newReceiveStream(streamID, senderForReceiveStream, flowController, version)
  98. return s
  99. }
  100. // need to define StreamID() here, since both receiveStream and readStream have a StreamID()
  101. func (s *stream) StreamID() protocol.StreamID {
  102. // the result is same for receiveStream and sendStream
  103. return s.sendStream.StreamID()
  104. }
  105. func (s *stream) Close() error {
  106. return s.sendStream.Close()
  107. }
  108. func (s *stream) SetDeadline(t time.Time) error {
  109. _ = s.SetReadDeadline(t) // SetReadDeadline never errors
  110. _ = s.SetWriteDeadline(t) // SetWriteDeadline never errors
  111. return nil
  112. }
  113. // CloseForShutdown closes a stream abruptly.
  114. // It makes Read and Write unblock (and return the error) immediately.
  115. // The peer will NOT be informed about this: the stream is closed without sending a FIN or RST.
  116. func (s *stream) closeForShutdown(err error) {
  117. s.sendStream.closeForShutdown(err)
  118. s.receiveStream.closeForShutdown(err)
  119. }
  120. // checkIfCompleted is called from the uniStreamSender, when one of the stream halves is completed.
  121. // It makes sure that the onStreamCompleted callback is only called if both receive and send side have completed.
  122. func (s *stream) checkIfCompleted() {
  123. if s.sendStreamCompleted && s.receiveStreamCompleted {
  124. s.sender.onStreamCompleted(s.StreamID())
  125. }
  126. }