|
|
@@ -1354,35 +1354,40 @@ func (metrics *packetMetrics) checkpoint(
|
|
|
// Enqueued packets are packed into a contiguous buffer with channel
|
|
|
// framing, allowing the entire queue to be written to a channel
|
|
|
// in a single call.
|
|
|
-// Preallocating and reuse of the queue buffers avoids GC churn.
|
|
|
+// Reuse of the queue buffers avoids GC churn. To avoid memory use
|
|
|
+// spikes when many clients connect and may disconnect before relaying
|
|
|
+// packets, the packet queue buffers start small and grow when required,
|
|
|
+// up to the maximum size, and then remain static.
|
|
|
type PacketQueue struct {
|
|
|
+ maxSize int
|
|
|
emptyBuffers chan []byte
|
|
|
activeBuffer chan []byte
|
|
|
}
|
|
|
|
|
|
// NewPacketQueue creates a new PacketQueue.
|
|
|
-// The caller must ensire that queueSize exceeds the
|
|
|
+// The caller must ensure that maxSize exceeds the
|
|
|
// packet MTU, or packets will will never enqueue.
|
|
|
-func NewPacketQueue(queueSize int) *PacketQueue {
|
|
|
+func NewPacketQueue(maxSize int) *PacketQueue {
|
|
|
|
|
|
- // Two buffers of size queueSize are allocated, to
|
|
|
+ // Two buffers of size up to maxSize are allocated, to
|
|
|
// allow packets to continue to enqueue while one buffer
|
|
|
// is borrowed by the DequeueFramedPackets caller.
|
|
|
//
|
|
|
// TODO: is there a way to implement this without
|
|
|
- // allocating 2x queueSize bytes? A circular queue
|
|
|
+ // allocating up to 2x maxSize bytes? A circular queue
|
|
|
// won't work because we want DequeueFramedPackets
|
|
|
// to return a contiguous buffer. Perhaps a Bip
|
|
|
// Buffer would work here:
|
|
|
// https://www.codeproject.com/Articles/3479/The-Bip-Buffer-The-Circular-Buffer-with-a-Twist
|
|
|
|
|
|
queue := &PacketQueue{
|
|
|
+ maxSize: maxSize,
|
|
|
emptyBuffers: make(chan []byte, 2),
|
|
|
activeBuffer: make(chan []byte, 1),
|
|
|
}
|
|
|
|
|
|
- queue.emptyBuffers <- make([]byte, 0, queueSize)
|
|
|
- queue.emptyBuffers <- make([]byte, 0, queueSize)
|
|
|
+ queue.emptyBuffers <- make([]byte, 0)
|
|
|
+ queue.emptyBuffers <- make([]byte, 0)
|
|
|
|
|
|
return queue
|
|
|
}
|
|
|
@@ -1402,14 +1407,18 @@ func (queue *PacketQueue) Enqueue(packet []byte) {
|
|
|
|
|
|
packetSize := len(packet)
|
|
|
|
|
|
- if cap(buffer)-len(buffer) >= channelHeaderSize+packetSize {
|
|
|
-
|
|
|
+ if queue.maxSize-len(buffer) >= channelHeaderSize+packetSize {
|
|
|
// Assumes len(packet)/MTU <= 64K
|
|
|
+ var channelHeader [channelHeaderSize]byte
|
|
|
+ binary.BigEndian.PutUint16(channelHeader[:], uint16(packetSize))
|
|
|
+
|
|
|
+ // Once the buffer has reached maxSize capacity
|
|
|
+ // and been replaced (buffer = buffer[0:0]), these
|
|
|
+ // appends should no longer allocate new memory and
|
|
|
+ // should just copy to preallocated memory.
|
|
|
|
|
|
- offset := len(buffer)
|
|
|
- buffer = buffer[0 : len(buffer)+channelHeaderSize+packetSize]
|
|
|
- binary.BigEndian.PutUint16(buffer[offset:offset+channelHeaderSize], uint16(packetSize))
|
|
|
- copy(buffer[offset+channelHeaderSize:], packet)
|
|
|
+ buffer = append(buffer, channelHeader[:]...)
|
|
|
+ buffer = append(buffer, packet...)
|
|
|
}
|
|
|
// Else, queue is full, so drop packet.
|
|
|
|