|
|
@@ -143,6 +143,7 @@ import (
|
|
|
const (
|
|
|
DEFAULT_MTU = 1500
|
|
|
DEFAULT_DOWNSTREAM_PACKET_QUEUE_SIZE = 64
|
|
|
+ DEFAULT_BRIDGE_PACKET_QUEUE_SIZE = 64
|
|
|
DEFAULT_IDLE_SESSION_EXPIRY_SECONDS = 300
|
|
|
ORPHAN_METRICS_CHECKPOINTER_PERIOD = 30 * time.Minute
|
|
|
)
|
|
|
@@ -400,22 +401,10 @@ func (server *Server) ClientConnected(
|
|
|
DNSResolverIPv6Addresses: append([]net.IP(nil), server.config.GetDNSResolverIPv6Addresses()...),
|
|
|
checkAllowedTCPPortFunc: checkAllowedTCPPortFunc,
|
|
|
checkAllowedUDPPortFunc: checkAllowedUDPPortFunc,
|
|
|
- downstreamPackets: make(chan []byte, downStreamPacketQueueSize),
|
|
|
- freePackets: make(chan []byte, downStreamPacketQueueSize),
|
|
|
+ downstreamPackets: NewPacketQueue(server.runContext, MTU, downStreamPacketQueueSize),
|
|
|
workers: new(sync.WaitGroup),
|
|
|
}
|
|
|
|
|
|
- // To avoid GC churn, downstream packet buffers are allocated
|
|
|
- // once and reused. Available buffers are sent to the freePackets
|
|
|
- // channel. When a packet is enqueued, a buffer is obtained from
|
|
|
- // freePackets and sent to downstreamPackets.
|
|
|
- // TODO: allocate on first use? if the full queue size is not
|
|
|
- // often used, preallocating all buffers is unnecessary.
|
|
|
-
|
|
|
- for i := 0; i < downStreamPacketQueueSize; i++ {
|
|
|
- clientSession.freePackets <- make([]byte, MTU)
|
|
|
- }
|
|
|
-
|
|
|
// allocateIndex initializes session.index, session.assignedIPv4Address,
|
|
|
// and session.assignedIPv6Address; and updates server.indexToSession and
|
|
|
// server.sessionIDToIndex.
|
|
|
@@ -656,26 +645,11 @@ func (server *Server) runDeviceDownstream() {
|
|
|
// We allow packets to enqueue in an idle session in case a client
|
|
|
// is in the process of reconnecting.
|
|
|
|
|
|
- var packet []byte
|
|
|
- select {
|
|
|
- case packet = <-session.freePackets:
|
|
|
- case <-server.runContext.Done():
|
|
|
+ ok = session.downstreamPackets.Enqueue(readPacket)
|
|
|
+ if !ok {
|
|
|
+ // Enqueue aborted due to server.runContext.Done()
|
|
|
return
|
|
|
- default:
|
|
|
- // Queue is full, so drop packet.
|
|
|
- continue
|
|
|
}
|
|
|
-
|
|
|
- // Reuse the preallocated packet buffer. This slice indexing
|
|
|
- // assumes the size of the packet <= MTU and the preallocated
|
|
|
- // capacity == MTU.
|
|
|
- packet = packet[0:len(readPacket)]
|
|
|
- copy(packet, readPacket)
|
|
|
-
|
|
|
- // This won't block: both freePackets/downstreamPackets have
|
|
|
- // queue-size capacity, and only queue-size packet buffers
|
|
|
- // exist.
|
|
|
- session.downstreamPackets <- packet
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -745,10 +719,10 @@ func (server *Server) runClientDownstream(session *session) {
|
|
|
// Dequeue, process, and relay packets to be sent to the client channel.
|
|
|
|
|
|
for {
|
|
|
- var packet []byte
|
|
|
- select {
|
|
|
- case packet = <-session.downstreamPackets:
|
|
|
- case <-session.runContext.Done():
|
|
|
+
|
|
|
+ packet, ok := session.downstreamPackets.Dequeue()
|
|
|
+ if !ok {
|
|
|
+ // Dequeue aborted due to server.runContext.Done()
|
|
|
return
|
|
|
}
|
|
|
|
|
|
@@ -765,22 +739,27 @@ func (server *Server) runClientDownstream(session *session) {
|
|
|
packet) {
|
|
|
|
|
|
// Packet is rejected and dropped. Reason will be counted in metrics.
|
|
|
+
|
|
|
+ session.downstreamPackets.Replace(packet)
|
|
|
continue
|
|
|
}
|
|
|
|
|
|
err := session.channel.WritePacket(packet)
|
|
|
if err != nil {
|
|
|
+
|
|
|
server.config.Logger.WithContextFields(
|
|
|
common.LogFields{"error": err}).Warning("write channel packet failed")
|
|
|
+
|
|
|
// Tear down the session. Must be invoked asynchronously.
|
|
|
go server.interruptSession(session)
|
|
|
+
|
|
|
+ session.downstreamPackets.Replace(packet)
|
|
|
return
|
|
|
}
|
|
|
|
|
|
session.touch()
|
|
|
|
|
|
- // This won't block.
|
|
|
- session.freePackets <- packet
|
|
|
+ session.downstreamPackets.Replace(packet)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -941,8 +920,7 @@ type session struct {
|
|
|
originalIPv6Address net.IP
|
|
|
checkAllowedTCPPortFunc AllowedPortChecker
|
|
|
checkAllowedUDPPortFunc AllowedPortChecker
|
|
|
- downstreamPackets chan []byte
|
|
|
- freePackets chan []byte
|
|
|
+ downstreamPackets *PacketQueue
|
|
|
workers *sync.WaitGroup
|
|
|
mutex sync.Mutex
|
|
|
channel *Channel
|
|
|
@@ -1143,6 +1121,101 @@ func (metrics *packetMetrics) checkpoint(
|
|
|
logger.LogMetric(logName, logFields)
|
|
|
}
|
|
|
|
|
|
+// TODO: PacketQueue optimizations
|
|
|
+//
|
|
|
+// - Instead of a fixed number of packets, a fixed-size buffer could store
|
|
|
+// a variable number of packets. This would allow enqueueing many more
|
|
|
+// small packets in the same amount of memory.
|
|
|
+//
|
|
|
+// - Further, when dequeued packets are to be relayed to a Channel, if
|
|
|
+// the queue was already stored in the Channel framing format, the entire
|
|
|
+// queue could simply be copied to the Channel in one copy operation.
|
|
|
+
|
|
|
+// PacketQueue is a fixed-size, preallocated queue of packets.
|
|
|
+type PacketQueue struct {
|
|
|
+ runContext context.Context
|
|
|
+ packets chan []byte
|
|
|
+ freeBuffers chan []byte
|
|
|
+}
|
|
|
+
|
|
|
+// NewPacketQueue creates a new PacketQueue.
|
|
|
+func NewPacketQueue(
|
|
|
+ runContext context.Context,
|
|
|
+ MTU, queueSize int) *PacketQueue {
|
|
|
+
|
|
|
+ queue := &PacketQueue{
|
|
|
+ runContext: runContext,
|
|
|
+ packets: make(chan []byte, queueSize),
|
|
|
+ freeBuffers: make(chan []byte, queueSize),
|
|
|
+ }
|
|
|
+
|
|
|
+ // To avoid GC churn, downstream packet buffers are allocated
|
|
|
+ // once and reused. Available buffers are sent to the freeBuffers
|
|
|
+ // channel. When a packet is enqueued, a buffer is obtained from
|
|
|
+ // freeBuffers and sent to packets.
|
|
|
+ // TODO: allocate on first use? if the full queue size is not
|
|
|
+ // often used, preallocating all buffers is unnecessary.
|
|
|
+
|
|
|
+ for i := 0; i < queueSize; i++ {
|
|
|
+ queue.freeBuffers <- make([]byte, MTU)
|
|
|
+ }
|
|
|
+
|
|
|
+ return queue
|
|
|
+}
|
|
|
+
|
|
|
+// Enqueue enqueues the packet. The contents of packet are assumed
|
|
|
+// to be <= MTU and are copied into a preallocated, free packet
|
|
|
+// buffer area. If the queue is full, the packet is dropped.
|
|
|
+// Enqueue returns false if it receives runContext.Done().
|
|
|
+func (queue *PacketQueue) Enqueue(packet []byte) bool {
|
|
|
+
|
|
|
+ var packetBuffer []byte
|
|
|
+ select {
|
|
|
+ case packetBuffer = <-queue.freeBuffers:
|
|
|
+ case <-queue.runContext.Done():
|
|
|
+ return false
|
|
|
+ default:
|
|
|
+ // Queue is full, so drop packet.
|
|
|
+ return true
|
|
|
+ }
|
|
|
+
|
|
|
+ // Reuse the preallocated packet buffer. This slice indexing
|
|
|
+ // assumes the size of the packet <= MTU and the preallocated
|
|
|
+ // capacity == MTU.
|
|
|
+ packetBuffer = packetBuffer[0:len(packet)]
|
|
|
+ copy(packetBuffer, packet)
|
|
|
+
|
|
|
+ // This won't block: both freeBuffers/packets have queue-size
|
|
|
+ // capacity, and only queue-size packet buffers exist.
|
|
|
+ queue.packets <- packetBuffer
|
|
|
+
|
|
|
+ return true
|
|
|
+}
|
|
|
+
|
|
|
+// Dequeue waits until a packet is available and then dequeues and
|
|
|
+// returns it. The returned packet buffer remains part of the
|
|
|
+// PacketQueue and the caller must call Replace when done with the
|
|
|
+// packet.
|
|
|
+// Dequeue unblocks and returns false if it receives runContext.Done().
|
|
|
+func (queue *PacketQueue) Dequeue() ([]byte, bool) {
|
|
|
+ var packet []byte
|
|
|
+ select {
|
|
|
+ case packet = <-queue.packets:
|
|
|
+ return packet, true
|
|
|
+ case <-queue.runContext.Done():
|
|
|
+ }
|
|
|
+ return nil, false
|
|
|
+}
|
|
|
+
|
|
|
+// Replace returns a dequeued packet buffer to the free list. It
|
|
|
+// must be called for all, and must be called only with packets
|
|
|
+// returned by Dequeue.
|
|
|
+func (queue *PacketQueue) Replace(packet []byte) {
|
|
|
+
|
|
|
+ // This won't block (as long as it is a Dequeue return value).
|
|
|
+ queue.freeBuffers <- packet
|
|
|
+}
|
|
|
+
|
|
|
// ClientConfig specifies the configuration of a packet tunnel client.
|
|
|
type ClientConfig struct {
|
|
|
|
|
|
@@ -1178,6 +1251,11 @@ type ClientConfig struct {
|
|
|
// and create and configure a tun device.
|
|
|
TunFileDescriptor int
|
|
|
|
|
|
+ // TunDeviceBridge specifies a DeviceBridge to use to read
|
|
|
+ // and write packets. Client operation is the same as when a
|
|
|
+ // TunFileDescriptor is used.
|
|
|
+ TunDeviceBridge *DeviceBridge
|
|
|
+
|
|
|
// IPv4AddressCIDR is the IPv4 address and netmask to
|
|
|
// assign to a newly created tun device.
|
|
|
IPv4AddressCIDR string
|
|
|
@@ -1213,11 +1291,14 @@ func NewClient(config *ClientConfig) (*Client, error) {
|
|
|
var device *Device
|
|
|
var err error
|
|
|
|
|
|
- if config.TunFileDescriptor <= 0 {
|
|
|
- device, err = NewClientDevice(config)
|
|
|
- } else {
|
|
|
+ if config.TunFileDescriptor > 0 {
|
|
|
device, err = NewClientDeviceFromFD(config)
|
|
|
+ } else if config.TunDeviceBridge != nil {
|
|
|
+ device, err = NewClientDeviceFromBridge(config)
|
|
|
+ } else {
|
|
|
+ device, err = NewClientDevice(config)
|
|
|
}
|
|
|
+
|
|
|
if err != nil {
|
|
|
return nil, common.ContextError(err)
|
|
|
}
|
|
|
@@ -1934,6 +2015,7 @@ packet debugging snippet:
|
|
|
// preallocated buffers to avoid GC churn.
|
|
|
type Device struct {
|
|
|
name string
|
|
|
+ usingBridge bool
|
|
|
deviceIO io.ReadWriteCloser
|
|
|
inboundBuffer []byte
|
|
|
outboundBuffer []byte
|
|
|
@@ -1954,7 +2036,11 @@ func NewServerDevice(config *ServerConfig) (*Device, error) {
|
|
|
return nil, common.ContextError(err)
|
|
|
}
|
|
|
|
|
|
- return newDevice(deviceName, deviceIO, getMTU(config.MTU)), nil
|
|
|
+ return newDevice(
|
|
|
+ deviceName,
|
|
|
+ false,
|
|
|
+ deviceIO,
|
|
|
+ getMTU(config.MTU)), nil
|
|
|
}
|
|
|
|
|
|
// NewClientDevice creates and configures a new client tun device.
|
|
|
@@ -1972,17 +2058,25 @@ func NewClientDevice(config *ClientConfig) (*Device, error) {
|
|
|
return nil, common.ContextError(err)
|
|
|
}
|
|
|
|
|
|
- return newDevice(deviceName, deviceIO, getMTU(config.MTU)), nil
|
|
|
+ return newDevice(
|
|
|
+ deviceName,
|
|
|
+ false,
|
|
|
+ deviceIO,
|
|
|
+ getMTU(config.MTU)), nil
|
|
|
}
|
|
|
|
|
|
func newDevice(
|
|
|
- name string, deviceIO io.ReadWriteCloser, MTU int) *Device {
|
|
|
+ name string,
|
|
|
+ usingBridge bool,
|
|
|
+ deviceIO io.ReadWriteCloser,
|
|
|
+ MTU int) *Device {
|
|
|
|
|
|
return &Device{
|
|
|
name: name,
|
|
|
+ usingBridge: usingBridge,
|
|
|
deviceIO: deviceIO,
|
|
|
- inboundBuffer: makeDeviceInboundBuffer(MTU),
|
|
|
- outboundBuffer: makeDeviceOutboundBuffer(MTU),
|
|
|
+ inboundBuffer: makeDeviceInboundBuffer(usingBridge, MTU),
|
|
|
+ outboundBuffer: makeDeviceOutboundBuffer(usingBridge, MTU),
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -2000,14 +2094,26 @@ func NewClientDeviceFromFD(config *ClientConfig) (*Device, error) {
|
|
|
|
|
|
return &Device{
|
|
|
name: "",
|
|
|
+ usingBridge: false,
|
|
|
deviceIO: file,
|
|
|
- inboundBuffer: makeDeviceInboundBuffer(MTU),
|
|
|
- outboundBuffer: makeDeviceOutboundBuffer(MTU),
|
|
|
+ inboundBuffer: makeDeviceInboundBuffer(false, MTU),
|
|
|
+ outboundBuffer: makeDeviceOutboundBuffer(false, MTU),
|
|
|
}, nil
|
|
|
}
|
|
|
|
|
|
+// NewClientDeviceFromBridge wraps an existing tun device that is
|
|
|
+// accessed via a DeviceBridge.
|
|
|
+func NewClientDeviceFromBridge(config *ClientConfig) (*Device, error) {
|
|
|
+ return newDevice(
|
|
|
+ "",
|
|
|
+ true,
|
|
|
+ config.TunDeviceBridge,
|
|
|
+ getMTU(config.MTU)), nil
|
|
|
+}
|
|
|
+
|
|
|
// Name returns the interface name for a created tun device,
|
|
|
-// or returns "" for a device created by NewClientDeviceFromFD.
|
|
|
+// or returns "" for a device created by NewClientDeviceFromFD
|
|
|
+// or NewClientDeviceFromBridge.
|
|
|
// The interface name may be used for additional network and
|
|
|
// routing configuration.
|
|
|
func (device *Device) Name() string {
|
|
|
@@ -2099,6 +2205,93 @@ func (device *Device) Close() error {
|
|
|
return device.deviceIO.Close()
|
|
|
}
|
|
|
|
|
|
+// DeviceBridge is a bridge between a function-based packet I/O
|
|
|
+// API, such as Apple's NEPacketTunnelFlow, and the Device interface,
|
|
|
+// which expects an io.ReadWriteCloser.
|
|
|
+//
|
|
|
+// The API side provides a writeToDevice call back to write packets
|
|
|
+// to the tun device and calls ReadFromDevice with packets that have
|
|
|
+// been read from the tun device. The Device uses Read/Write/Close.
|
|
|
+type DeviceBridge struct {
|
|
|
+ runContext context.Context
|
|
|
+ stopRunning context.CancelFunc
|
|
|
+ readPackets *PacketQueue
|
|
|
+ writeMutex sync.Mutex
|
|
|
+ writeToDevice func(p []byte) error
|
|
|
+}
|
|
|
+
|
|
|
+// NewDeviceBridge creates a new DeviceBridge.
|
|
|
+// Calls to writeToDevice are serialized, so it need not be safe for
|
|
|
+// concurrent access. Calls to writeToDevice will block calls to
|
|
|
+// Write and will not be interrupted by Close; writeToDevice _should_
|
|
|
+// not block.
|
|
|
+func NewDeviceBridge(
|
|
|
+ MTU, readPacketQueueSize int,
|
|
|
+ writeToDevice func(p []byte) error) *DeviceBridge {
|
|
|
+
|
|
|
+ runContext, stopRunning := context.WithCancel(context.Background())
|
|
|
+
|
|
|
+ if readPacketQueueSize <= 0 {
|
|
|
+ readPacketQueueSize = DEFAULT_BRIDGE_PACKET_QUEUE_SIZE
|
|
|
+ }
|
|
|
+
|
|
|
+ return &DeviceBridge{
|
|
|
+ runContext: runContext,
|
|
|
+ stopRunning: stopRunning,
|
|
|
+ readPackets: NewPacketQueue(runContext, MTU, readPacketQueueSize),
|
|
|
+ writeToDevice: writeToDevice,
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+// ReadFromDevice accepts packets read from the tun device. Packets are
|
|
|
+// enqueued for subsequent return to callers of Read. ReadFromDevice does
|
|
|
+// not block when the queue is full or waiting for a Read call. When the
|
|
|
+// queue is full, packets are dropped.
|
|
|
+func (bridge *DeviceBridge) ReadFromDevice(p []byte) {
|
|
|
+ _ = bridge.readPackets.Enqueue(p)
|
|
|
+}
|
|
|
+
|
|
|
+// Read blocks until an enqueued packet is available or the DeviceBridge
|
|
|
+// is closed.
|
|
|
+func (bridge *DeviceBridge) Read(p []byte) (int, error) {
|
|
|
+ packet, ok := bridge.readPackets.Dequeue()
|
|
|
+ if !ok {
|
|
|
+ return 0, common.ContextError(errors.New("bridge is closed"))
|
|
|
+ }
|
|
|
+
|
|
|
+ // Assumes both p and packet are <= MTU
|
|
|
+ copy(p, packet)
|
|
|
+
|
|
|
+ bridge.readPackets.Replace(packet)
|
|
|
+
|
|
|
+ return len(p), nil
|
|
|
+}
|
|
|
+
|
|
|
+// Write calls through to writeToDevice. Close will not interrupt a
|
|
|
+// blocking call to writeToDevice.
|
|
|
+func (bridge *DeviceBridge) Write(p []byte) (int, error) {
|
|
|
+
|
|
|
+ // Use mutex since writeToDevice isn't required
|
|
|
+ // to be safe for concurrent calls.
|
|
|
+ bridge.writeMutex.Lock()
|
|
|
+ defer bridge.writeMutex.Unlock()
|
|
|
+
|
|
|
+ n := len(p)
|
|
|
+ err := bridge.writeToDevice(p)
|
|
|
+ if err != nil {
|
|
|
+ n = 0
|
|
|
+ err = common.ContextError(err)
|
|
|
+ }
|
|
|
+
|
|
|
+ return n, err
|
|
|
+}
|
|
|
+
|
|
|
+// Close interrupts blocking reads.
|
|
|
+func (bridge *DeviceBridge) Close() error {
|
|
|
+ bridge.stopRunning()
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
// Channel manages packet transport over a communications channel.
|
|
|
// Any io.ReadWriteCloser can provide transport. In psiphond, the
|
|
|
// io.ReadWriteCloser will be an SSH channel. Channel I/O frames
|