framer.go 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171
  1. package quic
  2. import (
  3. "errors"
  4. "sync"
  5. "github.com/Psiphon-Labs/quic-go/internal/ackhandler"
  6. "github.com/Psiphon-Labs/quic-go/internal/protocol"
  7. "github.com/Psiphon-Labs/quic-go/internal/wire"
  8. "github.com/Psiphon-Labs/quic-go/quicvarint"
  9. )
  10. type framer interface {
  11. HasData() bool
  12. QueueControlFrame(wire.Frame)
  13. AppendControlFrames([]ackhandler.Frame, protocol.ByteCount) ([]ackhandler.Frame, protocol.ByteCount)
  14. AddActiveStream(protocol.StreamID)
  15. AppendStreamFrames([]ackhandler.Frame, protocol.ByteCount) ([]ackhandler.Frame, protocol.ByteCount)
  16. Handle0RTTRejection() error
  17. }
  18. type framerI struct {
  19. mutex sync.Mutex
  20. streamGetter streamGetter
  21. version protocol.VersionNumber
  22. activeStreams map[protocol.StreamID]struct{}
  23. streamQueue []protocol.StreamID
  24. controlFrameMutex sync.Mutex
  25. controlFrames []wire.Frame
  26. }
  27. var _ framer = &framerI{}
  28. func newFramer(
  29. streamGetter streamGetter,
  30. v protocol.VersionNumber,
  31. ) framer {
  32. return &framerI{
  33. streamGetter: streamGetter,
  34. activeStreams: make(map[protocol.StreamID]struct{}),
  35. version: v,
  36. }
  37. }
  38. func (f *framerI) HasData() bool {
  39. f.mutex.Lock()
  40. hasData := len(f.streamQueue) > 0
  41. f.mutex.Unlock()
  42. if hasData {
  43. return true
  44. }
  45. f.controlFrameMutex.Lock()
  46. hasData = len(f.controlFrames) > 0
  47. f.controlFrameMutex.Unlock()
  48. return hasData
  49. }
  50. func (f *framerI) QueueControlFrame(frame wire.Frame) {
  51. f.controlFrameMutex.Lock()
  52. f.controlFrames = append(f.controlFrames, frame)
  53. f.controlFrameMutex.Unlock()
  54. }
  55. func (f *framerI) AppendControlFrames(frames []ackhandler.Frame, maxLen protocol.ByteCount) ([]ackhandler.Frame, protocol.ByteCount) {
  56. var length protocol.ByteCount
  57. f.controlFrameMutex.Lock()
  58. for len(f.controlFrames) > 0 {
  59. frame := f.controlFrames[len(f.controlFrames)-1]
  60. frameLen := frame.Length(f.version)
  61. if length+frameLen > maxLen {
  62. break
  63. }
  64. frames = append(frames, ackhandler.Frame{Frame: frame})
  65. length += frameLen
  66. f.controlFrames = f.controlFrames[:len(f.controlFrames)-1]
  67. }
  68. f.controlFrameMutex.Unlock()
  69. return frames, length
  70. }
  71. func (f *framerI) AddActiveStream(id protocol.StreamID) {
  72. f.mutex.Lock()
  73. if _, ok := f.activeStreams[id]; !ok {
  74. f.streamQueue = append(f.streamQueue, id)
  75. f.activeStreams[id] = struct{}{}
  76. }
  77. f.mutex.Unlock()
  78. }
  79. func (f *framerI) AppendStreamFrames(frames []ackhandler.Frame, maxLen protocol.ByteCount) ([]ackhandler.Frame, protocol.ByteCount) {
  80. var length protocol.ByteCount
  81. var lastFrame *ackhandler.Frame
  82. f.mutex.Lock()
  83. // pop STREAM frames, until less than MinStreamFrameSize bytes are left in the packet
  84. numActiveStreams := len(f.streamQueue)
  85. for i := 0; i < numActiveStreams; i++ {
  86. if protocol.MinStreamFrameSize+length > maxLen {
  87. break
  88. }
  89. id := f.streamQueue[0]
  90. f.streamQueue = f.streamQueue[1:]
  91. // This should never return an error. Better check it anyway.
  92. // The stream will only be in the streamQueue, if it enqueued itself there.
  93. str, err := f.streamGetter.GetOrOpenSendStream(id)
  94. // The stream can be nil if it completed after it said it had data.
  95. if str == nil || err != nil {
  96. delete(f.activeStreams, id)
  97. continue
  98. }
  99. remainingLen := maxLen - length
  100. // For the last STREAM frame, we'll remove the DataLen field later.
  101. // Therefore, we can pretend to have more bytes available when popping
  102. // the STREAM frame (which will always have the DataLen set).
  103. remainingLen += quicvarint.Len(uint64(remainingLen))
  104. frame, hasMoreData := str.popStreamFrame(remainingLen)
  105. if hasMoreData { // put the stream back in the queue (at the end)
  106. f.streamQueue = append(f.streamQueue, id)
  107. } else { // no more data to send. Stream is not active any more
  108. delete(f.activeStreams, id)
  109. }
  110. // The frame can be nil
  111. // * if the receiveStream was canceled after it said it had data
  112. // * the remaining size doesn't allow us to add another STREAM frame
  113. if frame == nil {
  114. continue
  115. }
  116. frames = append(frames, *frame)
  117. length += frame.Length(f.version)
  118. lastFrame = frame
  119. }
  120. f.mutex.Unlock()
  121. if lastFrame != nil {
  122. lastFrameLen := lastFrame.Length(f.version)
  123. // account for the smaller size of the last STREAM frame
  124. lastFrame.Frame.(*wire.StreamFrame).DataLenPresent = false
  125. length += lastFrame.Length(f.version) - lastFrameLen
  126. }
  127. return frames, length
  128. }
  129. func (f *framerI) Handle0RTTRejection() error {
  130. f.mutex.Lock()
  131. defer f.mutex.Unlock()
  132. f.controlFrameMutex.Lock()
  133. f.streamQueue = f.streamQueue[:0]
  134. for id := range f.activeStreams {
  135. delete(f.activeStreams, id)
  136. }
  137. var j int
  138. for i, frame := range f.controlFrames {
  139. switch frame.(type) {
  140. case *wire.MaxDataFrame, *wire.MaxStreamDataFrame, *wire.MaxStreamsFrame:
  141. return errors.New("didn't expect MAX_DATA / MAX_STREAM_DATA / MAX_STREAMS frame to be sent in 0-RTT")
  142. case *wire.DataBlockedFrame, *wire.StreamDataBlockedFrame, *wire.StreamsBlockedFrame:
  143. continue
  144. default:
  145. f.controlFrames[j] = f.controlFrames[i]
  146. j++
  147. }
  148. }
  149. f.controlFrames = f.controlFrames[:j]
  150. f.controlFrameMutex.Unlock()
  151. return nil
  152. }