framer.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. package quic
  2. import (
  3. "sync"
  4. "github.com/Psiphon-Labs/quic-go/internal/ackhandler"
  5. "github.com/Psiphon-Labs/quic-go/internal/protocol"
  6. "github.com/Psiphon-Labs/quic-go/internal/utils"
  7. "github.com/Psiphon-Labs/quic-go/internal/wire"
  8. )
  9. type framer interface {
  10. QueueControlFrame(wire.Frame)
  11. AppendControlFrames([]ackhandler.Frame, protocol.ByteCount) ([]ackhandler.Frame, protocol.ByteCount)
  12. AddActiveStream(protocol.StreamID)
  13. AppendStreamFrames([]ackhandler.Frame, protocol.ByteCount) ([]ackhandler.Frame, protocol.ByteCount)
  14. }
  15. type framerI struct {
  16. mutex sync.Mutex
  17. streamGetter streamGetter
  18. version protocol.VersionNumber
  19. activeStreams map[protocol.StreamID]struct{}
  20. streamQueue []protocol.StreamID
  21. controlFrameMutex sync.Mutex
  22. controlFrames []wire.Frame
  23. }
  24. var _ framer = &framerI{}
  25. func newFramer(
  26. streamGetter streamGetter,
  27. v protocol.VersionNumber,
  28. ) framer {
  29. return &framerI{
  30. streamGetter: streamGetter,
  31. activeStreams: make(map[protocol.StreamID]struct{}),
  32. version: v,
  33. }
  34. }
  35. func (f *framerI) QueueControlFrame(frame wire.Frame) {
  36. f.controlFrameMutex.Lock()
  37. f.controlFrames = append(f.controlFrames, frame)
  38. f.controlFrameMutex.Unlock()
  39. }
  40. func (f *framerI) AppendControlFrames(frames []ackhandler.Frame, maxLen protocol.ByteCount) ([]ackhandler.Frame, protocol.ByteCount) {
  41. var length protocol.ByteCount
  42. f.controlFrameMutex.Lock()
  43. for len(f.controlFrames) > 0 {
  44. frame := f.controlFrames[len(f.controlFrames)-1]
  45. frameLen := frame.Length(f.version)
  46. if length+frameLen > maxLen {
  47. break
  48. }
  49. frames = append(frames, ackhandler.Frame{Frame: frame})
  50. length += frameLen
  51. f.controlFrames = f.controlFrames[:len(f.controlFrames)-1]
  52. }
  53. f.controlFrameMutex.Unlock()
  54. return frames, length
  55. }
  56. func (f *framerI) AddActiveStream(id protocol.StreamID) {
  57. f.mutex.Lock()
  58. if _, ok := f.activeStreams[id]; !ok {
  59. f.streamQueue = append(f.streamQueue, id)
  60. f.activeStreams[id] = struct{}{}
  61. }
  62. f.mutex.Unlock()
  63. }
  64. func (f *framerI) AppendStreamFrames(frames []ackhandler.Frame, maxLen protocol.ByteCount) ([]ackhandler.Frame, protocol.ByteCount) {
  65. var length protocol.ByteCount
  66. var lastFrame *ackhandler.Frame
  67. f.mutex.Lock()
  68. // pop STREAM frames, until less than MinStreamFrameSize bytes are left in the packet
  69. numActiveStreams := len(f.streamQueue)
  70. for i := 0; i < numActiveStreams; i++ {
  71. if protocol.MinStreamFrameSize+length > maxLen {
  72. break
  73. }
  74. id := f.streamQueue[0]
  75. f.streamQueue = f.streamQueue[1:]
  76. // This should never return an error. Better check it anyway.
  77. // The stream will only be in the streamQueue, if it enqueued itself there.
  78. str, err := f.streamGetter.GetOrOpenSendStream(id)
  79. // The stream can be nil if it completed after it said it had data.
  80. if str == nil || err != nil {
  81. delete(f.activeStreams, id)
  82. continue
  83. }
  84. remainingLen := maxLen - length
  85. // For the last STREAM frame, we'll remove the DataLen field later.
  86. // Therefore, we can pretend to have more bytes available when popping
  87. // the STREAM frame (which will always have the DataLen set).
  88. remainingLen += utils.VarIntLen(uint64(remainingLen))
  89. frame, hasMoreData := str.popStreamFrame(remainingLen)
  90. if hasMoreData { // put the stream back in the queue (at the end)
  91. f.streamQueue = append(f.streamQueue, id)
  92. } else { // no more data to send. Stream is not active any more
  93. delete(f.activeStreams, id)
  94. }
  95. // The frame can be nil
  96. // * if the receiveStream was canceled after it said it had data
  97. // * the remaining size doesn't allow us to add another STREAM frame
  98. if frame == nil {
  99. continue
  100. }
  101. frames = append(frames, *frame)
  102. length += frame.Length(f.version)
  103. lastFrame = frame
  104. }
  105. f.mutex.Unlock()
  106. if lastFrame != nil {
  107. lastFrameLen := lastFrame.Length(f.version)
  108. // account for the smaller size of the last STREAM frame
  109. lastFrame.Frame.(*wire.StreamFrame).DataLenPresent = false
  110. length += lastFrame.Length(f.version) - lastFrameLen
  111. }
  112. return frames, length
  113. }