| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788 |
- package quic
- type sender interface {
- Send(p *packetBuffer)
- Run() error
- WouldBlock() bool
- Available() <-chan struct{}
- Close()
- }
- type sendQueue struct {
- queue chan *packetBuffer
- closeCalled chan struct{} // runStopped when Close() is called
- runStopped chan struct{} // runStopped when the run loop returns
- available chan struct{}
- conn sendConn
- }
- var _ sender = &sendQueue{}
- const sendQueueCapacity = 8
- func newSendQueue(conn sendConn) sender {
- return &sendQueue{
- conn: conn,
- runStopped: make(chan struct{}),
- closeCalled: make(chan struct{}),
- available: make(chan struct{}, 1),
- queue: make(chan *packetBuffer, sendQueueCapacity),
- }
- }
- // Send sends out a packet. It's guaranteed to not block.
- // Callers need to make sure that there's actually space in the send queue by calling WouldBlock.
- // Otherwise Send will panic.
- func (h *sendQueue) Send(p *packetBuffer) {
- select {
- case h.queue <- p:
- case <-h.runStopped:
- default:
- panic("sendQueue.Send would have blocked")
- }
- }
- func (h *sendQueue) WouldBlock() bool {
- return len(h.queue) == sendQueueCapacity
- }
- func (h *sendQueue) Available() <-chan struct{} {
- return h.available
- }
- func (h *sendQueue) Run() error {
- defer close(h.runStopped)
- var shouldClose bool
- for {
- if shouldClose && len(h.queue) == 0 {
- return nil
- }
- select {
- case <-h.closeCalled:
- h.closeCalled = nil // prevent this case from being selected again
- // make sure that all queued packets are actually sent out
- shouldClose = true
- case p := <-h.queue:
- if err := h.conn.Write(p.Data); err != nil {
- // This additional check enables:
- // 1. Checking for "datagram too large" message from the kernel, as such,
- // 2. Path MTU discovery,and
- // 3. Eventual detection of loss PingFrame.
- if !isMsgSizeErr(err) {
- return err
- }
- }
- p.Release()
- select {
- case h.available <- struct{}{}:
- default:
- }
- }
- }
- }
- func (h *sendQueue) Close() {
- close(h.closeCalled)
- // wait until the run loop returned
- <-h.runStopped
- }
|