framer.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
  1. package gquic
  2. import (
  3. "sync"
  4. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/quic/gquic-go/internal/protocol"
  5. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/quic/gquic-go/internal/wire"
  6. )
  7. type framer struct {
  8. streamGetter streamGetter
  9. cryptoStream cryptoStream
  10. version protocol.VersionNumber
  11. streamQueueMutex sync.Mutex
  12. activeStreams map[protocol.StreamID]struct{}
  13. streamQueue []protocol.StreamID
  14. controlFrameMutex sync.Mutex
  15. controlFrames []wire.Frame
  16. }
  17. func newFramer(
  18. cryptoStream cryptoStream,
  19. streamGetter streamGetter,
  20. v protocol.VersionNumber,
  21. ) *framer {
  22. return &framer{
  23. streamGetter: streamGetter,
  24. cryptoStream: cryptoStream,
  25. activeStreams: make(map[protocol.StreamID]struct{}),
  26. version: v,
  27. }
  28. }
  29. func (f *framer) QueueControlFrame(frame wire.Frame) {
  30. f.controlFrameMutex.Lock()
  31. f.controlFrames = append(f.controlFrames, frame)
  32. f.controlFrameMutex.Unlock()
  33. }
  34. func (f *framer) AppendControlFrames(frames []wire.Frame, maxLen protocol.ByteCount) ([]wire.Frame, protocol.ByteCount) {
  35. var length protocol.ByteCount
  36. f.controlFrameMutex.Lock()
  37. for len(f.controlFrames) > 0 {
  38. frame := f.controlFrames[len(f.controlFrames)-1]
  39. frameLen := frame.Length(f.version)
  40. if length+frameLen > maxLen {
  41. break
  42. }
  43. frames = append(frames, frame)
  44. length += frameLen
  45. f.controlFrames = f.controlFrames[:len(f.controlFrames)-1]
  46. }
  47. f.controlFrameMutex.Unlock()
  48. return frames, length
  49. }
  50. // AddActiveStream adds a stream that has data to write.
  51. // It should not be used for the crypto stream.
  52. func (f *framer) AddActiveStream(id protocol.StreamID) {
  53. f.streamQueueMutex.Lock()
  54. if _, ok := f.activeStreams[id]; !ok {
  55. f.streamQueue = append(f.streamQueue, id)
  56. f.activeStreams[id] = struct{}{}
  57. }
  58. f.streamQueueMutex.Unlock()
  59. }
  60. func (f *framer) AppendStreamFrames(frames []wire.Frame, maxLen protocol.ByteCount) []wire.Frame {
  61. var length protocol.ByteCount
  62. f.streamQueueMutex.Lock()
  63. // pop STREAM frames, until less than MinStreamFrameSize bytes are left in the packet
  64. numActiveStreams := len(f.streamQueue)
  65. for i := 0; i < numActiveStreams; i++ {
  66. if maxLen-length < protocol.MinStreamFrameSize {
  67. break
  68. }
  69. id := f.streamQueue[0]
  70. f.streamQueue = f.streamQueue[1:]
  71. // This should never return an error. Better check it anyway.
  72. // The stream will only be in the streamQueue, if it enqueued itself there.
  73. str, err := f.streamGetter.GetOrOpenSendStream(id)
  74. // The stream can be nil if it completed after it said it had data.
  75. if str == nil || err != nil {
  76. delete(f.activeStreams, id)
  77. continue
  78. }
  79. frame, hasMoreData := str.popStreamFrame(maxLen - length)
  80. if hasMoreData { // put the stream back in the queue (at the end)
  81. f.streamQueue = append(f.streamQueue, id)
  82. } else { // no more data to send. Stream is not active any more
  83. delete(f.activeStreams, id)
  84. }
  85. if frame == nil { // can happen if the receiveStream was canceled after it said it had data
  86. continue
  87. }
  88. frames = append(frames, frame)
  89. length += frame.Length(f.version)
  90. }
  91. f.streamQueueMutex.Unlock()
  92. return frames
  93. }