stream.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. package quic
  2. import (
  3. "context"
  4. "net"
  5. "os"
  6. "sync"
  7. "time"
  8. "github.com/Psiphon-Labs/quic-go/internal/ackhandler"
  9. "github.com/Psiphon-Labs/quic-go/internal/flowcontrol"
  10. "github.com/Psiphon-Labs/quic-go/internal/protocol"
  11. "github.com/Psiphon-Labs/quic-go/internal/wire"
  12. )
  13. type deadlineError struct{}
  14. func (deadlineError) Error() string { return "deadline exceeded" }
  15. func (deadlineError) Temporary() bool { return true }
  16. func (deadlineError) Timeout() bool { return true }
  17. func (deadlineError) Unwrap() error { return os.ErrDeadlineExceeded }
  18. var errDeadline net.Error = &deadlineError{}
  19. // The streamSender is notified by the stream about various events.
  20. type streamSender interface {
  21. onHasConnectionData()
  22. onHasStreamData(protocol.StreamID, sendStreamI)
  23. onHasStreamControlFrame(protocol.StreamID, streamControlFrameGetter)
  24. // must be called without holding the mutex that is acquired by closeForShutdown
  25. onStreamCompleted(protocol.StreamID)
  26. }
  27. // Each of the both stream halves gets its own uniStreamSender.
  28. // This is necessary in order to keep track when both halves have been completed.
  29. type uniStreamSender struct {
  30. streamSender
  31. onStreamCompletedImpl func()
  32. onHasStreamControlFrameImpl func(protocol.StreamID, streamControlFrameGetter)
  33. }
  34. func (s *uniStreamSender) onHasStreamData(id protocol.StreamID, str sendStreamI) {
  35. s.streamSender.onHasStreamData(id, str)
  36. }
  37. func (s *uniStreamSender) onStreamCompleted(protocol.StreamID) { s.onStreamCompletedImpl() }
  38. func (s *uniStreamSender) onHasStreamControlFrame(id protocol.StreamID, str streamControlFrameGetter) {
  39. s.onHasStreamControlFrameImpl(id, str)
  40. }
  41. var _ streamSender = &uniStreamSender{}
  42. type streamI interface {
  43. Stream
  44. closeForShutdown(error)
  45. // for receiving
  46. handleStreamFrame(*wire.StreamFrame, time.Time) error
  47. handleResetStreamFrame(*wire.ResetStreamFrame, time.Time) error
  48. // for sending
  49. hasData() bool
  50. handleStopSendingFrame(*wire.StopSendingFrame)
  51. popStreamFrame(protocol.ByteCount, protocol.Version) (_ ackhandler.StreamFrame, _ *wire.StreamDataBlockedFrame, hasMore bool)
  52. updateSendWindow(protocol.ByteCount)
  53. }
  54. var (
  55. _ receiveStreamI = (streamI)(nil)
  56. _ sendStreamI = (streamI)(nil)
  57. )
  58. // A Stream assembles the data from StreamFrames and provides a super-convenient Read-Interface
  59. //
  60. // Read() and Write() may be called concurrently, but multiple calls to Read() or Write() individually must be synchronized manually.
  61. type stream struct {
  62. receiveStream
  63. sendStream
  64. completedMutex sync.Mutex
  65. sender streamSender
  66. receiveStreamCompleted bool
  67. sendStreamCompleted bool
  68. }
  69. var (
  70. _ Stream = &stream{}
  71. _ streamControlFrameGetter = &receiveStream{}
  72. )
  73. // newStream creates a new Stream
  74. func newStream(
  75. ctx context.Context,
  76. streamID protocol.StreamID,
  77. sender streamSender,
  78. flowController flowcontrol.StreamFlowController,
  79. ) *stream {
  80. s := &stream{sender: sender}
  81. senderForSendStream := &uniStreamSender{
  82. streamSender: sender,
  83. onStreamCompletedImpl: func() {
  84. s.completedMutex.Lock()
  85. s.sendStreamCompleted = true
  86. s.checkIfCompleted()
  87. s.completedMutex.Unlock()
  88. },
  89. onHasStreamControlFrameImpl: func(id protocol.StreamID, str streamControlFrameGetter) {
  90. sender.onHasStreamControlFrame(streamID, s)
  91. },
  92. }
  93. s.sendStream = *newSendStream(ctx, streamID, senderForSendStream, flowController)
  94. senderForReceiveStream := &uniStreamSender{
  95. streamSender: sender,
  96. onStreamCompletedImpl: func() {
  97. s.completedMutex.Lock()
  98. s.receiveStreamCompleted = true
  99. s.checkIfCompleted()
  100. s.completedMutex.Unlock()
  101. },
  102. onHasStreamControlFrameImpl: func(id protocol.StreamID, str streamControlFrameGetter) {
  103. sender.onHasStreamControlFrame(streamID, s)
  104. },
  105. }
  106. s.receiveStream = *newReceiveStream(streamID, senderForReceiveStream, flowController)
  107. return s
  108. }
  109. // need to define StreamID() here, since both receiveStream and readStream have a StreamID()
  110. func (s *stream) StreamID() protocol.StreamID {
  111. // the result is same for receiveStream and sendStream
  112. return s.sendStream.StreamID()
  113. }
  114. func (s *stream) Close() error {
  115. return s.sendStream.Close()
  116. }
  117. func (s *stream) getControlFrame(now time.Time) (_ ackhandler.Frame, ok, hasMore bool) {
  118. f, ok, _ := s.sendStream.getControlFrame(now)
  119. if ok {
  120. return f, true, true
  121. }
  122. return s.receiveStream.getControlFrame(now)
  123. }
  124. func (s *stream) SetDeadline(t time.Time) error {
  125. _ = s.SetReadDeadline(t) // SetReadDeadline never errors
  126. _ = s.SetWriteDeadline(t) // SetWriteDeadline never errors
  127. return nil
  128. }
  129. // CloseForShutdown closes a stream abruptly.
  130. // It makes Read and Write unblock (and return the error) immediately.
  131. // The peer will NOT be informed about this: the stream is closed without sending a FIN or RST.
  132. func (s *stream) closeForShutdown(err error) {
  133. s.sendStream.closeForShutdown(err)
  134. s.receiveStream.closeForShutdown(err)
  135. }
  136. // checkIfCompleted is called from the uniStreamSender, when one of the stream halves is completed.
  137. // It makes sure that the onStreamCompleted callback is only called if both receive and send side have completed.
  138. func (s *stream) checkIfCompleted() {
  139. if s.sendStreamCompleted && s.receiveStreamCompleted {
  140. s.sender.onStreamCompleted(s.StreamID())
  141. }
  142. }