framer.go 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. package quic
  2. import (
  3. "errors"
  4. "sync"
  5. "github.com/Psiphon-Labs/quic-go/internal/ackhandler"
  6. "github.com/Psiphon-Labs/quic-go/internal/protocol"
  7. "github.com/Psiphon-Labs/quic-go/internal/wire"
  8. "github.com/Psiphon-Labs/quic-go/quicvarint"
  9. )
  10. type framer interface {
  11. HasData() bool
  12. QueueControlFrame(wire.Frame)
  13. AppendControlFrames([]*ackhandler.Frame, protocol.ByteCount, protocol.VersionNumber) ([]*ackhandler.Frame, protocol.ByteCount)
  14. AddActiveStream(protocol.StreamID)
  15. AppendStreamFrames([]*ackhandler.Frame, protocol.ByteCount, protocol.VersionNumber) ([]*ackhandler.Frame, protocol.ByteCount)
  16. Handle0RTTRejection() error
  17. }
  18. type framerI struct {
  19. mutex sync.Mutex
  20. streamGetter streamGetter
  21. activeStreams map[protocol.StreamID]struct{}
  22. streamQueue []protocol.StreamID
  23. controlFrameMutex sync.Mutex
  24. controlFrames []wire.Frame
  25. }
  26. var _ framer = &framerI{}
  27. func newFramer(streamGetter streamGetter) framer {
  28. return &framerI{
  29. streamGetter: streamGetter,
  30. activeStreams: make(map[protocol.StreamID]struct{}),
  31. }
  32. }
  33. func (f *framerI) HasData() bool {
  34. f.mutex.Lock()
  35. hasData := len(f.streamQueue) > 0
  36. f.mutex.Unlock()
  37. if hasData {
  38. return true
  39. }
  40. f.controlFrameMutex.Lock()
  41. hasData = len(f.controlFrames) > 0
  42. f.controlFrameMutex.Unlock()
  43. return hasData
  44. }
  45. func (f *framerI) QueueControlFrame(frame wire.Frame) {
  46. f.controlFrameMutex.Lock()
  47. f.controlFrames = append(f.controlFrames, frame)
  48. f.controlFrameMutex.Unlock()
  49. }
  50. func (f *framerI) AppendControlFrames(frames []*ackhandler.Frame, maxLen protocol.ByteCount, v protocol.VersionNumber) ([]*ackhandler.Frame, protocol.ByteCount) {
  51. var length protocol.ByteCount
  52. f.controlFrameMutex.Lock()
  53. for len(f.controlFrames) > 0 {
  54. frame := f.controlFrames[len(f.controlFrames)-1]
  55. frameLen := frame.Length(v)
  56. if length+frameLen > maxLen {
  57. break
  58. }
  59. af := ackhandler.GetFrame()
  60. af.Frame = frame
  61. frames = append(frames, af)
  62. length += frameLen
  63. f.controlFrames = f.controlFrames[:len(f.controlFrames)-1]
  64. }
  65. f.controlFrameMutex.Unlock()
  66. return frames, length
  67. }
  68. func (f *framerI) AddActiveStream(id protocol.StreamID) {
  69. f.mutex.Lock()
  70. if _, ok := f.activeStreams[id]; !ok {
  71. f.streamQueue = append(f.streamQueue, id)
  72. f.activeStreams[id] = struct{}{}
  73. }
  74. f.mutex.Unlock()
  75. }
  76. func (f *framerI) AppendStreamFrames(frames []*ackhandler.Frame, maxLen protocol.ByteCount, v protocol.VersionNumber) ([]*ackhandler.Frame, protocol.ByteCount) {
  77. var length protocol.ByteCount
  78. var lastFrame *ackhandler.Frame
  79. f.mutex.Lock()
  80. // pop STREAM frames, until less than MinStreamFrameSize bytes are left in the packet
  81. numActiveStreams := len(f.streamQueue)
  82. for i := 0; i < numActiveStreams; i++ {
  83. if protocol.MinStreamFrameSize+length > maxLen {
  84. break
  85. }
  86. id := f.streamQueue[0]
  87. f.streamQueue = f.streamQueue[1:]
  88. // This should never return an error. Better check it anyway.
  89. // The stream will only be in the streamQueue, if it enqueued itself there.
  90. str, err := f.streamGetter.GetOrOpenSendStream(id)
  91. // The stream can be nil if it completed after it said it had data.
  92. if str == nil || err != nil {
  93. delete(f.activeStreams, id)
  94. continue
  95. }
  96. remainingLen := maxLen - length
  97. // For the last STREAM frame, we'll remove the DataLen field later.
  98. // Therefore, we can pretend to have more bytes available when popping
  99. // the STREAM frame (which will always have the DataLen set).
  100. remainingLen += quicvarint.Len(uint64(remainingLen))
  101. frame, hasMoreData := str.popStreamFrame(remainingLen, v)
  102. if hasMoreData { // put the stream back in the queue (at the end)
  103. f.streamQueue = append(f.streamQueue, id)
  104. } else { // no more data to send. Stream is not active any more
  105. delete(f.activeStreams, id)
  106. }
  107. // The frame can be nil
  108. // * if the receiveStream was canceled after it said it had data
  109. // * the remaining size doesn't allow us to add another STREAM frame
  110. if frame == nil {
  111. continue
  112. }
  113. frames = append(frames, frame)
  114. length += frame.Length(v)
  115. lastFrame = frame
  116. }
  117. f.mutex.Unlock()
  118. if lastFrame != nil {
  119. lastFrameLen := lastFrame.Length(v)
  120. // account for the smaller size of the last STREAM frame
  121. lastFrame.Frame.(*wire.StreamFrame).DataLenPresent = false
  122. length += lastFrame.Length(v) - lastFrameLen
  123. }
  124. return frames, length
  125. }
  126. func (f *framerI) Handle0RTTRejection() error {
  127. f.mutex.Lock()
  128. defer f.mutex.Unlock()
  129. f.controlFrameMutex.Lock()
  130. f.streamQueue = f.streamQueue[:0]
  131. for id := range f.activeStreams {
  132. delete(f.activeStreams, id)
  133. }
  134. var j int
  135. for i, frame := range f.controlFrames {
  136. switch frame.(type) {
  137. case *wire.MaxDataFrame, *wire.MaxStreamDataFrame, *wire.MaxStreamsFrame:
  138. return errors.New("didn't expect MAX_DATA / MAX_STREAM_DATA / MAX_STREAMS frame to be sent in 0-RTT")
  139. case *wire.DataBlockedFrame, *wire.StreamDataBlockedFrame, *wire.StreamsBlockedFrame:
  140. continue
  141. default:
  142. f.controlFrames[j] = f.controlFrames[i]
  143. j++
  144. }
  145. }
  146. f.controlFrames = f.controlFrames[:j]
  147. f.controlFrameMutex.Unlock()
  148. return nil
  149. }