send_queue.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
  1. package quic
  2. import "github.com/Psiphon-Labs/quic-go/internal/protocol"
  3. type sender interface {
  4. Send(p *packetBuffer, gsoSize uint16, ecn protocol.ECN)
  5. Run() error
  6. WouldBlock() bool
  7. Available() <-chan struct{}
  8. Close()
  9. }
  10. type queueEntry struct {
  11. buf *packetBuffer
  12. gsoSize uint16
  13. ecn protocol.ECN
  14. }
  15. type sendQueue struct {
  16. queue chan queueEntry
  17. closeCalled chan struct{} // runStopped when Close() is called
  18. runStopped chan struct{} // runStopped when the run loop returns
  19. available chan struct{}
  20. conn sendConn
  21. }
  22. var _ sender = &sendQueue{}
  23. const sendQueueCapacity = 8
  24. func newSendQueue(conn sendConn) sender {
  25. return &sendQueue{
  26. conn: conn,
  27. runStopped: make(chan struct{}),
  28. closeCalled: make(chan struct{}),
  29. available: make(chan struct{}, 1),
  30. queue: make(chan queueEntry, sendQueueCapacity),
  31. }
  32. }
  33. // Send sends out a packet. It's guaranteed to not block.
  34. // Callers need to make sure that there's actually space in the send queue by calling WouldBlock.
  35. // Otherwise Send will panic.
  36. func (h *sendQueue) Send(p *packetBuffer, gsoSize uint16, ecn protocol.ECN) {
  37. select {
  38. case h.queue <- queueEntry{buf: p, gsoSize: gsoSize, ecn: ecn}:
  39. // clear available channel if we've reached capacity
  40. if len(h.queue) == sendQueueCapacity {
  41. select {
  42. case <-h.available:
  43. default:
  44. }
  45. }
  46. case <-h.runStopped:
  47. default:
  48. panic("sendQueue.Send would have blocked")
  49. }
  50. }
  51. func (h *sendQueue) WouldBlock() bool {
  52. return len(h.queue) == sendQueueCapacity
  53. }
  54. func (h *sendQueue) Available() <-chan struct{} {
  55. return h.available
  56. }
  57. func (h *sendQueue) Run() error {
  58. defer close(h.runStopped)
  59. var shouldClose bool
  60. for {
  61. if shouldClose && len(h.queue) == 0 {
  62. return nil
  63. }
  64. select {
  65. case <-h.closeCalled:
  66. h.closeCalled = nil // prevent this case from being selected again
  67. // make sure that all queued packets are actually sent out
  68. shouldClose = true
  69. case e := <-h.queue:
  70. if err := h.conn.Write(e.buf.Data, e.gsoSize, e.ecn); err != nil {
  71. // This additional check enables:
  72. // 1. Checking for "datagram too large" message from the kernel, as such,
  73. // 2. Path MTU discovery,and
  74. // 3. Eventual detection of loss PingFrame.
  75. if !isSendMsgSizeErr(err) {
  76. return err
  77. }
  78. }
  79. e.buf.Release()
  80. select {
  81. case h.available <- struct{}{}:
  82. default:
  83. }
  84. }
  85. }
  86. }
  87. func (h *sendQueue) Close() {
  88. close(h.closeCalled)
  89. // wait until the run loop returned
  90. <-h.runStopped
  91. }