framer.go 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291
  1. package quic
  2. import (
  3. "slices"
  4. "sync"
  5. "time"
  6. "github.com/Psiphon-Labs/quic-go/internal/ackhandler"
  7. "github.com/Psiphon-Labs/quic-go/internal/flowcontrol"
  8. "github.com/Psiphon-Labs/quic-go/internal/protocol"
  9. "github.com/Psiphon-Labs/quic-go/internal/utils/ringbuffer"
  10. "github.com/Psiphon-Labs/quic-go/internal/wire"
  11. "github.com/Psiphon-Labs/quic-go/quicvarint"
  12. )
  13. const (
  14. maxPathResponses = 256
  15. maxControlFrames = 16 << 10
  16. )
  17. // This is the largest possible size of a stream-related control frame
  18. // (which is the RESET_STREAM frame).
  19. const maxStreamControlFrameSize = 25
  20. type streamControlFrameGetter interface {
  21. getControlFrame(time.Time) (_ ackhandler.Frame, ok, hasMore bool)
  22. }
  23. type framer struct {
  24. mutex sync.Mutex
  25. activeStreams map[protocol.StreamID]sendStreamI
  26. streamQueue ringbuffer.RingBuffer[protocol.StreamID]
  27. streamsWithControlFrames map[protocol.StreamID]streamControlFrameGetter
  28. controlFrameMutex sync.Mutex
  29. controlFrames []wire.Frame
  30. pathResponses []*wire.PathResponseFrame
  31. connFlowController flowcontrol.ConnectionFlowController
  32. queuedTooManyControlFrames bool
  33. }
  34. func newFramer(connFlowController flowcontrol.ConnectionFlowController) *framer {
  35. return &framer{
  36. activeStreams: make(map[protocol.StreamID]sendStreamI),
  37. streamsWithControlFrames: make(map[protocol.StreamID]streamControlFrameGetter),
  38. connFlowController: connFlowController,
  39. }
  40. }
  41. func (f *framer) HasData() bool {
  42. f.mutex.Lock()
  43. hasData := !f.streamQueue.Empty()
  44. f.mutex.Unlock()
  45. if hasData {
  46. return true
  47. }
  48. f.controlFrameMutex.Lock()
  49. defer f.controlFrameMutex.Unlock()
  50. return len(f.streamsWithControlFrames) > 0 || len(f.controlFrames) > 0 || len(f.pathResponses) > 0
  51. }
  52. func (f *framer) QueueControlFrame(frame wire.Frame) {
  53. f.controlFrameMutex.Lock()
  54. defer f.controlFrameMutex.Unlock()
  55. if pr, ok := frame.(*wire.PathResponseFrame); ok {
  56. // Only queue up to maxPathResponses PATH_RESPONSE frames.
  57. // This limit should be high enough to never be hit in practice,
  58. // unless the peer is doing something malicious.
  59. if len(f.pathResponses) >= maxPathResponses {
  60. return
  61. }
  62. f.pathResponses = append(f.pathResponses, pr)
  63. return
  64. }
  65. // This is a hack.
  66. if len(f.controlFrames) >= maxControlFrames {
  67. f.queuedTooManyControlFrames = true
  68. return
  69. }
  70. f.controlFrames = append(f.controlFrames, frame)
  71. }
  72. func (f *framer) Append(
  73. frames []ackhandler.Frame,
  74. streamFrames []ackhandler.StreamFrame,
  75. maxLen protocol.ByteCount,
  76. now time.Time,
  77. v protocol.Version,
  78. ) ([]ackhandler.Frame, []ackhandler.StreamFrame, protocol.ByteCount) {
  79. f.controlFrameMutex.Lock()
  80. frames, controlFrameLen := f.appendControlFrames(frames, maxLen, now, v)
  81. maxLen -= controlFrameLen
  82. var lastFrame ackhandler.StreamFrame
  83. var streamFrameLen protocol.ByteCount
  84. f.mutex.Lock()
  85. // pop STREAM frames, until less than 128 bytes are left in the packet
  86. numActiveStreams := f.streamQueue.Len()
  87. for i := 0; i < numActiveStreams; i++ {
  88. if protocol.MinStreamFrameSize > maxLen {
  89. break
  90. }
  91. sf, blocked := f.getNextStreamFrame(maxLen, v)
  92. if sf.Frame != nil {
  93. streamFrames = append(streamFrames, sf)
  94. maxLen -= sf.Frame.Length(v)
  95. lastFrame = sf
  96. streamFrameLen += sf.Frame.Length(v)
  97. }
  98. // If the stream just became blocked on stream flow control, attempt to pack the
  99. // STREAM_DATA_BLOCKED into the same packet.
  100. if blocked != nil {
  101. l := blocked.Length(v)
  102. // In case it doesn't fit, queue it for the next packet.
  103. if maxLen < l {
  104. f.controlFrames = append(f.controlFrames, blocked)
  105. break
  106. }
  107. frames = append(frames, ackhandler.Frame{Frame: blocked})
  108. maxLen -= l
  109. controlFrameLen += l
  110. }
  111. }
  112. // The only way to become blocked on connection-level flow control is by sending STREAM frames.
  113. if isBlocked, offset := f.connFlowController.IsNewlyBlocked(); isBlocked {
  114. blocked := &wire.DataBlockedFrame{MaximumData: offset}
  115. l := blocked.Length(v)
  116. // In case it doesn't fit, queue it for the next packet.
  117. if maxLen >= l {
  118. frames = append(frames, ackhandler.Frame{Frame: blocked})
  119. controlFrameLen += l
  120. } else {
  121. f.controlFrames = append(f.controlFrames, blocked)
  122. }
  123. }
  124. f.mutex.Unlock()
  125. f.controlFrameMutex.Unlock()
  126. if lastFrame.Frame != nil {
  127. // account for the smaller size of the last STREAM frame
  128. streamFrameLen -= lastFrame.Frame.Length(v)
  129. lastFrame.Frame.DataLenPresent = false
  130. streamFrameLen += lastFrame.Frame.Length(v)
  131. }
  132. return frames, streamFrames, controlFrameLen + streamFrameLen
  133. }
  134. func (f *framer) appendControlFrames(
  135. frames []ackhandler.Frame,
  136. maxLen protocol.ByteCount,
  137. now time.Time,
  138. v protocol.Version,
  139. ) ([]ackhandler.Frame, protocol.ByteCount) {
  140. var length protocol.ByteCount
  141. // add a PATH_RESPONSE first, but only pack a single PATH_RESPONSE per packet
  142. if len(f.pathResponses) > 0 {
  143. frame := f.pathResponses[0]
  144. frameLen := frame.Length(v)
  145. if frameLen <= maxLen {
  146. frames = append(frames, ackhandler.Frame{Frame: frame})
  147. length += frameLen
  148. f.pathResponses = f.pathResponses[1:]
  149. }
  150. }
  151. // add stream-related control frames
  152. for id, str := range f.streamsWithControlFrames {
  153. start:
  154. remainingLen := maxLen - length
  155. if remainingLen <= maxStreamControlFrameSize {
  156. break
  157. }
  158. fr, ok, hasMore := str.getControlFrame(now)
  159. if !hasMore {
  160. delete(f.streamsWithControlFrames, id)
  161. }
  162. if !ok {
  163. continue
  164. }
  165. frames = append(frames, fr)
  166. length += fr.Frame.Length(v)
  167. if hasMore {
  168. // It is rare that a stream has more than one control frame to queue.
  169. // We don't want to spawn another loop for just to cover that case.
  170. goto start
  171. }
  172. }
  173. for len(f.controlFrames) > 0 {
  174. frame := f.controlFrames[len(f.controlFrames)-1]
  175. frameLen := frame.Length(v)
  176. if length+frameLen > maxLen {
  177. break
  178. }
  179. frames = append(frames, ackhandler.Frame{Frame: frame})
  180. length += frameLen
  181. f.controlFrames = f.controlFrames[:len(f.controlFrames)-1]
  182. }
  183. return frames, length
  184. }
  185. // QueuedTooManyControlFrames says if the control frame queue exceeded its maximum queue length.
  186. // This is a hack.
  187. // It is easier to implement than propagating an error return value in QueueControlFrame.
  188. // The correct solution would be to queue frames with their respective structs.
  189. // See https://github.com/quic-go/quic-go/issues/4271 for the queueing of stream-related control frames.
  190. func (f *framer) QueuedTooManyControlFrames() bool {
  191. return f.queuedTooManyControlFrames
  192. }
  193. func (f *framer) AddActiveStream(id protocol.StreamID, str sendStreamI) {
  194. f.mutex.Lock()
  195. if _, ok := f.activeStreams[id]; !ok {
  196. f.streamQueue.PushBack(id)
  197. f.activeStreams[id] = str
  198. }
  199. f.mutex.Unlock()
  200. }
  201. func (f *framer) AddStreamWithControlFrames(id protocol.StreamID, str streamControlFrameGetter) {
  202. f.controlFrameMutex.Lock()
  203. if _, ok := f.streamsWithControlFrames[id]; !ok {
  204. f.streamsWithControlFrames[id] = str
  205. }
  206. f.controlFrameMutex.Unlock()
  207. }
  208. // RemoveActiveStream is called when a stream completes.
  209. func (f *framer) RemoveActiveStream(id protocol.StreamID) {
  210. f.mutex.Lock()
  211. delete(f.activeStreams, id)
  212. // We don't delete the stream from the streamQueue,
  213. // since we'd have to iterate over the ringbuffer.
  214. // Instead, we check if the stream is still in activeStreams when appending STREAM frames.
  215. f.mutex.Unlock()
  216. }
  217. func (f *framer) getNextStreamFrame(maxLen protocol.ByteCount, v protocol.Version) (ackhandler.StreamFrame, *wire.StreamDataBlockedFrame) {
  218. id := f.streamQueue.PopFront()
  219. // This should never return an error. Better check it anyway.
  220. // The stream will only be in the streamQueue, if it enqueued itself there.
  221. str, ok := f.activeStreams[id]
  222. // The stream might have been removed after being enqueued.
  223. if !ok {
  224. return ackhandler.StreamFrame{}, nil
  225. }
  226. // For the last STREAM frame, we'll remove the DataLen field later.
  227. // Therefore, we can pretend to have more bytes available when popping
  228. // the STREAM frame (which will always have the DataLen set).
  229. maxLen += protocol.ByteCount(quicvarint.Len(uint64(maxLen)))
  230. frame, blocked, hasMoreData := str.popStreamFrame(maxLen, v)
  231. if hasMoreData { // put the stream back in the queue (at the end)
  232. f.streamQueue.PushBack(id)
  233. } else { // no more data to send. Stream is not active
  234. delete(f.activeStreams, id)
  235. }
  236. // Note that the frame.Frame can be nil:
  237. // * if the stream was canceled after it said it had data
  238. // * the remaining size doesn't allow us to add another STREAM frame
  239. return frame, blocked
  240. }
  241. func (f *framer) Handle0RTTRejection() {
  242. f.mutex.Lock()
  243. defer f.mutex.Unlock()
  244. f.controlFrameMutex.Lock()
  245. defer f.controlFrameMutex.Unlock()
  246. f.streamQueue.Clear()
  247. for id := range f.activeStreams {
  248. delete(f.activeStreams, id)
  249. }
  250. var j int
  251. for i, frame := range f.controlFrames {
  252. switch frame.(type) {
  253. case *wire.MaxDataFrame, *wire.MaxStreamDataFrame, *wire.MaxStreamsFrame,
  254. *wire.DataBlockedFrame, *wire.StreamDataBlockedFrame, *wire.StreamsBlockedFrame:
  255. continue
  256. default:
  257. f.controlFrames[j] = f.controlFrames[i]
  258. j++
  259. }
  260. }
  261. f.controlFrames = slices.Delete(f.controlFrames, j, len(f.controlFrames))
  262. }