| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291 |
- package quic
- import (
- "slices"
- "sync"
- "time"
- "github.com/Psiphon-Labs/quic-go/internal/ackhandler"
- "github.com/Psiphon-Labs/quic-go/internal/flowcontrol"
- "github.com/Psiphon-Labs/quic-go/internal/protocol"
- "github.com/Psiphon-Labs/quic-go/internal/utils/ringbuffer"
- "github.com/Psiphon-Labs/quic-go/internal/wire"
- "github.com/Psiphon-Labs/quic-go/quicvarint"
- )
- const (
- maxPathResponses = 256
- maxControlFrames = 16 << 10
- )
- // This is the largest possible size of a stream-related control frame
- // (which is the RESET_STREAM frame).
- const maxStreamControlFrameSize = 25
- type streamControlFrameGetter interface {
- getControlFrame(time.Time) (_ ackhandler.Frame, ok, hasMore bool)
- }
- type framer struct {
- mutex sync.Mutex
- activeStreams map[protocol.StreamID]sendStreamI
- streamQueue ringbuffer.RingBuffer[protocol.StreamID]
- streamsWithControlFrames map[protocol.StreamID]streamControlFrameGetter
- controlFrameMutex sync.Mutex
- controlFrames []wire.Frame
- pathResponses []*wire.PathResponseFrame
- connFlowController flowcontrol.ConnectionFlowController
- queuedTooManyControlFrames bool
- }
- func newFramer(connFlowController flowcontrol.ConnectionFlowController) *framer {
- return &framer{
- activeStreams: make(map[protocol.StreamID]sendStreamI),
- streamsWithControlFrames: make(map[protocol.StreamID]streamControlFrameGetter),
- connFlowController: connFlowController,
- }
- }
- func (f *framer) HasData() bool {
- f.mutex.Lock()
- hasData := !f.streamQueue.Empty()
- f.mutex.Unlock()
- if hasData {
- return true
- }
- f.controlFrameMutex.Lock()
- defer f.controlFrameMutex.Unlock()
- return len(f.streamsWithControlFrames) > 0 || len(f.controlFrames) > 0 || len(f.pathResponses) > 0
- }
- func (f *framer) QueueControlFrame(frame wire.Frame) {
- f.controlFrameMutex.Lock()
- defer f.controlFrameMutex.Unlock()
- if pr, ok := frame.(*wire.PathResponseFrame); ok {
- // Only queue up to maxPathResponses PATH_RESPONSE frames.
- // This limit should be high enough to never be hit in practice,
- // unless the peer is doing something malicious.
- if len(f.pathResponses) >= maxPathResponses {
- return
- }
- f.pathResponses = append(f.pathResponses, pr)
- return
- }
- // This is a hack.
- if len(f.controlFrames) >= maxControlFrames {
- f.queuedTooManyControlFrames = true
- return
- }
- f.controlFrames = append(f.controlFrames, frame)
- }
- func (f *framer) Append(
- frames []ackhandler.Frame,
- streamFrames []ackhandler.StreamFrame,
- maxLen protocol.ByteCount,
- now time.Time,
- v protocol.Version,
- ) ([]ackhandler.Frame, []ackhandler.StreamFrame, protocol.ByteCount) {
- f.controlFrameMutex.Lock()
- frames, controlFrameLen := f.appendControlFrames(frames, maxLen, now, v)
- maxLen -= controlFrameLen
- var lastFrame ackhandler.StreamFrame
- var streamFrameLen protocol.ByteCount
- f.mutex.Lock()
- // pop STREAM frames, until less than 128 bytes are left in the packet
- numActiveStreams := f.streamQueue.Len()
- for i := 0; i < numActiveStreams; i++ {
- if protocol.MinStreamFrameSize > maxLen {
- break
- }
- sf, blocked := f.getNextStreamFrame(maxLen, v)
- if sf.Frame != nil {
- streamFrames = append(streamFrames, sf)
- maxLen -= sf.Frame.Length(v)
- lastFrame = sf
- streamFrameLen += sf.Frame.Length(v)
- }
- // If the stream just became blocked on stream flow control, attempt to pack the
- // STREAM_DATA_BLOCKED into the same packet.
- if blocked != nil {
- l := blocked.Length(v)
- // In case it doesn't fit, queue it for the next packet.
- if maxLen < l {
- f.controlFrames = append(f.controlFrames, blocked)
- break
- }
- frames = append(frames, ackhandler.Frame{Frame: blocked})
- maxLen -= l
- controlFrameLen += l
- }
- }
- // The only way to become blocked on connection-level flow control is by sending STREAM frames.
- if isBlocked, offset := f.connFlowController.IsNewlyBlocked(); isBlocked {
- blocked := &wire.DataBlockedFrame{MaximumData: offset}
- l := blocked.Length(v)
- // In case it doesn't fit, queue it for the next packet.
- if maxLen >= l {
- frames = append(frames, ackhandler.Frame{Frame: blocked})
- controlFrameLen += l
- } else {
- f.controlFrames = append(f.controlFrames, blocked)
- }
- }
- f.mutex.Unlock()
- f.controlFrameMutex.Unlock()
- if lastFrame.Frame != nil {
- // account for the smaller size of the last STREAM frame
- streamFrameLen -= lastFrame.Frame.Length(v)
- lastFrame.Frame.DataLenPresent = false
- streamFrameLen += lastFrame.Frame.Length(v)
- }
- return frames, streamFrames, controlFrameLen + streamFrameLen
- }
- func (f *framer) appendControlFrames(
- frames []ackhandler.Frame,
- maxLen protocol.ByteCount,
- now time.Time,
- v protocol.Version,
- ) ([]ackhandler.Frame, protocol.ByteCount) {
- var length protocol.ByteCount
- // add a PATH_RESPONSE first, but only pack a single PATH_RESPONSE per packet
- if len(f.pathResponses) > 0 {
- frame := f.pathResponses[0]
- frameLen := frame.Length(v)
- if frameLen <= maxLen {
- frames = append(frames, ackhandler.Frame{Frame: frame})
- length += frameLen
- f.pathResponses = f.pathResponses[1:]
- }
- }
- // add stream-related control frames
- for id, str := range f.streamsWithControlFrames {
- start:
- remainingLen := maxLen - length
- if remainingLen <= maxStreamControlFrameSize {
- break
- }
- fr, ok, hasMore := str.getControlFrame(now)
- if !hasMore {
- delete(f.streamsWithControlFrames, id)
- }
- if !ok {
- continue
- }
- frames = append(frames, fr)
- length += fr.Frame.Length(v)
- if hasMore {
- // It is rare that a stream has more than one control frame to queue.
- // We don't want to spawn another loop for just to cover that case.
- goto start
- }
- }
- for len(f.controlFrames) > 0 {
- frame := f.controlFrames[len(f.controlFrames)-1]
- frameLen := frame.Length(v)
- if length+frameLen > maxLen {
- break
- }
- frames = append(frames, ackhandler.Frame{Frame: frame})
- length += frameLen
- f.controlFrames = f.controlFrames[:len(f.controlFrames)-1]
- }
- return frames, length
- }
- // QueuedTooManyControlFrames says if the control frame queue exceeded its maximum queue length.
- // This is a hack.
- // It is easier to implement than propagating an error return value in QueueControlFrame.
- // The correct solution would be to queue frames with their respective structs.
- // See https://github.com/quic-go/quic-go/issues/4271 for the queueing of stream-related control frames.
- func (f *framer) QueuedTooManyControlFrames() bool {
- return f.queuedTooManyControlFrames
- }
- func (f *framer) AddActiveStream(id protocol.StreamID, str sendStreamI) {
- f.mutex.Lock()
- if _, ok := f.activeStreams[id]; !ok {
- f.streamQueue.PushBack(id)
- f.activeStreams[id] = str
- }
- f.mutex.Unlock()
- }
- func (f *framer) AddStreamWithControlFrames(id protocol.StreamID, str streamControlFrameGetter) {
- f.controlFrameMutex.Lock()
- if _, ok := f.streamsWithControlFrames[id]; !ok {
- f.streamsWithControlFrames[id] = str
- }
- f.controlFrameMutex.Unlock()
- }
- // RemoveActiveStream is called when a stream completes.
- func (f *framer) RemoveActiveStream(id protocol.StreamID) {
- f.mutex.Lock()
- delete(f.activeStreams, id)
- // We don't delete the stream from the streamQueue,
- // since we'd have to iterate over the ringbuffer.
- // Instead, we check if the stream is still in activeStreams when appending STREAM frames.
- f.mutex.Unlock()
- }
- func (f *framer) getNextStreamFrame(maxLen protocol.ByteCount, v protocol.Version) (ackhandler.StreamFrame, *wire.StreamDataBlockedFrame) {
- id := f.streamQueue.PopFront()
- // This should never return an error. Better check it anyway.
- // The stream will only be in the streamQueue, if it enqueued itself there.
- str, ok := f.activeStreams[id]
- // The stream might have been removed after being enqueued.
- if !ok {
- return ackhandler.StreamFrame{}, nil
- }
- // For the last STREAM frame, we'll remove the DataLen field later.
- // Therefore, we can pretend to have more bytes available when popping
- // the STREAM frame (which will always have the DataLen set).
- maxLen += protocol.ByteCount(quicvarint.Len(uint64(maxLen)))
- frame, blocked, hasMoreData := str.popStreamFrame(maxLen, v)
- if hasMoreData { // put the stream back in the queue (at the end)
- f.streamQueue.PushBack(id)
- } else { // no more data to send. Stream is not active
- delete(f.activeStreams, id)
- }
- // Note that the frame.Frame can be nil:
- // * if the stream was canceled after it said it had data
- // * the remaining size doesn't allow us to add another STREAM frame
- return frame, blocked
- }
- func (f *framer) Handle0RTTRejection() {
- f.mutex.Lock()
- defer f.mutex.Unlock()
- f.controlFrameMutex.Lock()
- defer f.controlFrameMutex.Unlock()
- f.streamQueue.Clear()
- for id := range f.activeStreams {
- delete(f.activeStreams, id)
- }
- var j int
- for i, frame := range f.controlFrames {
- switch frame.(type) {
- case *wire.MaxDataFrame, *wire.MaxStreamDataFrame, *wire.MaxStreamsFrame,
- *wire.DataBlockedFrame, *wire.StreamDataBlockedFrame, *wire.StreamsBlockedFrame:
- continue
- default:
- f.controlFrames[j] = f.controlFrames[i]
- j++
- }
- }
- f.controlFrames = slices.Delete(f.controlFrames, j, len(f.controlFrames))
- }
|