framer.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. package quic
  2. import (
  3. "errors"
  4. "sync"
  5. "github.com/Psiphon-Labs/quic-go/internal/ackhandler"
  6. "github.com/Psiphon-Labs/quic-go/internal/protocol"
  7. "github.com/Psiphon-Labs/quic-go/internal/utils/ringbuffer"
  8. "github.com/Psiphon-Labs/quic-go/internal/wire"
  9. "github.com/Psiphon-Labs/quic-go/quicvarint"
  10. )
// framer collects the frames that are waiting to be packed into outgoing
// packets: queued control frames and STREAM frames of streams that registered
// themselves as active. The per-method notes below describe the behavior of
// framerI, the implementation in this file.
type framer interface {
	// HasData reports whether a stream is queued or a control frame is pending.
	HasData() bool

	// QueueControlFrame enqueues a control frame for a later AppendControlFrames call.
	QueueControlFrame(wire.Frame)
	// AppendControlFrames appends queued control frames to the given slice
	// without exceeding the given byte budget. It returns the extended slice
	// and the number of bytes the appended frames occupy.
	AppendControlFrames([]ackhandler.Frame, protocol.ByteCount, protocol.VersionNumber) ([]ackhandler.Frame, protocol.ByteCount)

	// AddActiveStream marks the stream as having data to send, queueing it
	// unless it is already queued.
	AddActiveStream(protocol.StreamID)
	// AppendStreamFrames appends STREAM frames popped from the queued streams
	// to the given slice, bounded by the given byte budget. It returns the
	// extended slice and the number of bytes the appended frames occupy.
	AppendStreamFrames([]ackhandler.StreamFrame, protocol.ByteCount, protocol.VersionNumber) ([]ackhandler.StreamFrame, protocol.ByteCount)

	// Handle0RTTRejection forgets all queued streams and removes queued
	// DATA_BLOCKED / STREAM_DATA_BLOCKED / STREAMS_BLOCKED frames. It returns
	// an error if a MAX_DATA / MAX_STREAM_DATA / MAX_STREAMS frame is queued.
	Handle0RTTRejection() error
}
// maxPathResponses caps the number of PATH_RESPONSE frames held in the queue.
// QueueControlFrame silently drops any PATH_RESPONSE queued beyond this limit.
const maxPathResponses = 256
// framerI is the framer implementation of this file. Stream bookkeeping and
// control-frame bookkeeping are protected by two separate mutexes.
type framerI struct {
	// mutex guards activeStreams and streamQueue.
	mutex sync.Mutex

	// streamGetter resolves a queued stream ID to its send stream.
	streamGetter streamGetter

	// activeStreams is the set of stream IDs that are currently queued; it
	// keeps AddActiveStream from queueing the same stream twice.
	activeStreams map[protocol.StreamID]struct{}
	// streamQueue holds the queued stream IDs in the order they are served.
	streamQueue ringbuffer.RingBuffer[protocol.StreamID]

	// controlFrameMutex guards controlFrames and pathResponses.
	controlFrameMutex sync.Mutex
	// controlFrames holds the queued control frames other than PATH_RESPONSE.
	controlFrames []wire.Frame
	// pathResponses holds the queued PATH_RESPONSE frames (at most maxPathResponses).
	pathResponses []*wire.PathResponseFrame
}
  29. var _ framer = &framerI{}
  30. func newFramer(streamGetter streamGetter) framer {
  31. return &framerI{
  32. streamGetter: streamGetter,
  33. activeStreams: make(map[protocol.StreamID]struct{}),
  34. }
  35. }
  36. func (f *framerI) HasData() bool {
  37. f.mutex.Lock()
  38. hasData := !f.streamQueue.Empty()
  39. f.mutex.Unlock()
  40. if hasData {
  41. return true
  42. }
  43. f.controlFrameMutex.Lock()
  44. defer f.controlFrameMutex.Unlock()
  45. return len(f.controlFrames) > 0 || len(f.pathResponses) > 0
  46. }
  47. func (f *framerI) QueueControlFrame(frame wire.Frame) {
  48. f.controlFrameMutex.Lock()
  49. defer f.controlFrameMutex.Unlock()
  50. if pr, ok := frame.(*wire.PathResponseFrame); ok {
  51. // Only queue up to maxPathResponses PATH_RESPONSE frames.
  52. // This limit should be high enough to never be hit in practice,
  53. // unless the peer is doing something malicious.
  54. if len(f.pathResponses) >= maxPathResponses {
  55. return
  56. }
  57. f.pathResponses = append(f.pathResponses, pr)
  58. return
  59. }
  60. f.controlFrames = append(f.controlFrames, frame)
  61. }
  62. func (f *framerI) AppendControlFrames(frames []ackhandler.Frame, maxLen protocol.ByteCount, v protocol.VersionNumber) ([]ackhandler.Frame, protocol.ByteCount) {
  63. f.controlFrameMutex.Lock()
  64. defer f.controlFrameMutex.Unlock()
  65. var length protocol.ByteCount
  66. // add a PATH_RESPONSE first, but only pack a single PATH_RESPONSE per packet
  67. if len(f.pathResponses) > 0 {
  68. frame := f.pathResponses[0]
  69. frameLen := frame.Length(v)
  70. if frameLen <= maxLen {
  71. frames = append(frames, ackhandler.Frame{Frame: frame})
  72. length += frameLen
  73. f.pathResponses = f.pathResponses[1:]
  74. }
  75. }
  76. for len(f.controlFrames) > 0 {
  77. frame := f.controlFrames[len(f.controlFrames)-1]
  78. frameLen := frame.Length(v)
  79. if length+frameLen > maxLen {
  80. break
  81. }
  82. frames = append(frames, ackhandler.Frame{Frame: frame})
  83. length += frameLen
  84. f.controlFrames = f.controlFrames[:len(f.controlFrames)-1]
  85. }
  86. return frames, length
  87. }
  88. func (f *framerI) AddActiveStream(id protocol.StreamID) {
  89. f.mutex.Lock()
  90. if _, ok := f.activeStreams[id]; !ok {
  91. f.streamQueue.PushBack(id)
  92. f.activeStreams[id] = struct{}{}
  93. }
  94. f.mutex.Unlock()
  95. }
  96. func (f *framerI) AppendStreamFrames(frames []ackhandler.StreamFrame, maxLen protocol.ByteCount, v protocol.VersionNumber) ([]ackhandler.StreamFrame, protocol.ByteCount) {
  97. startLen := len(frames)
  98. var length protocol.ByteCount
  99. f.mutex.Lock()
  100. // pop STREAM frames, until less than MinStreamFrameSize bytes are left in the packet
  101. numActiveStreams := f.streamQueue.Len()
  102. for i := 0; i < numActiveStreams; i++ {
  103. if protocol.MinStreamFrameSize+length > maxLen {
  104. break
  105. }
  106. id := f.streamQueue.PopFront()
  107. // This should never return an error. Better check it anyway.
  108. // The stream will only be in the streamQueue, if it enqueued itself there.
  109. str, err := f.streamGetter.GetOrOpenSendStream(id)
  110. // The stream can be nil if it completed after it said it had data.
  111. if str == nil || err != nil {
  112. delete(f.activeStreams, id)
  113. continue
  114. }
  115. remainingLen := maxLen - length
  116. // For the last STREAM frame, we'll remove the DataLen field later.
  117. // Therefore, we can pretend to have more bytes available when popping
  118. // the STREAM frame (which will always have the DataLen set).
  119. remainingLen += quicvarint.Len(uint64(remainingLen))
  120. frame, ok, hasMoreData := str.popStreamFrame(remainingLen, v)
  121. if hasMoreData { // put the stream back in the queue (at the end)
  122. f.streamQueue.PushBack(id)
  123. } else { // no more data to send. Stream is not active
  124. delete(f.activeStreams, id)
  125. }
  126. // The frame can be "nil"
  127. // * if the receiveStream was canceled after it said it had data
  128. // * the remaining size doesn't allow us to add another STREAM frame
  129. if !ok {
  130. continue
  131. }
  132. frames = append(frames, frame)
  133. length += frame.Frame.Length(v)
  134. }
  135. f.mutex.Unlock()
  136. if len(frames) > startLen {
  137. l := frames[len(frames)-1].Frame.Length(v)
  138. // account for the smaller size of the last STREAM frame
  139. frames[len(frames)-1].Frame.DataLenPresent = false
  140. length += frames[len(frames)-1].Frame.Length(v) - l
  141. }
  142. return frames, length
  143. }
  144. func (f *framerI) Handle0RTTRejection() error {
  145. f.mutex.Lock()
  146. defer f.mutex.Unlock()
  147. f.controlFrameMutex.Lock()
  148. f.streamQueue.Clear()
  149. for id := range f.activeStreams {
  150. delete(f.activeStreams, id)
  151. }
  152. var j int
  153. for i, frame := range f.controlFrames {
  154. switch frame.(type) {
  155. case *wire.MaxDataFrame, *wire.MaxStreamDataFrame, *wire.MaxStreamsFrame:
  156. return errors.New("didn't expect MAX_DATA / MAX_STREAM_DATA / MAX_STREAMS frame to be sent in 0-RTT")
  157. case *wire.DataBlockedFrame, *wire.StreamDataBlockedFrame, *wire.StreamsBlockedFrame:
  158. continue
  159. default:
  160. f.controlFrames[j] = f.controlFrames[i]
  161. j++
  162. }
  163. }
  164. f.controlFrames = f.controlFrames[:j]
  165. f.controlFrameMutex.Unlock()
  166. return nil
  167. }