Przeglądaj źródła

tun: performance enhancements and fixes

- Fix: deadlock when client disconnected due
  to using wrong runContext in PacketQueue.

- Workaround: hack workaround for server
  shutdown hand due to deviceIO.Close() not
  interrupting deviceIO.Read().

- Rewrite PacketQueue to pack framed packets
  into a contiguous buffer; benchmarks show
  significant performance increase.

- Server downstream packet queue tuned for
  upcoming max SSH channel window change.

- Add client upstream packet queue; benchmarks
  indicate performance increase.

- Remove DeviceBridge, which is not required
  at this time.
Rod Hynes 8 lat temu
rodzic
commit
4adf37da01

+ 0 - 15
MobileLibrary/iOS/PsiphonTunnel/PsiphonTunnel/PsiphonTunnel.h

@@ -288,14 +288,6 @@ typedef NS_ENUM(NSInteger, PsiphonConnectionState)
  */
 - (void)onClientUpgradeDownloaded:(NSString * _Nonnull)filename;
 
-/*!
- Implementing this method is *required* if whole device mode is enabled (and otherwise should not be implemented).
- The implementation of this must send the given packet to the device.
- @param packet  The data packet to send to the device.
- Swift: @code func send(toDevice packet: Data) @endcode
- */
-- (void)sendToDevice:(NSData * _Nonnull)packet;
-
 @end
 
 /*!
@@ -367,13 +359,6 @@ typedef NS_ENUM(NSInteger, PsiphonConnectionState)
  */
 - (NSString * _Nonnull)getPacketTunnelDNSResolverIPv6Address;
 
-/*!
- Only valid in whole device mode. This function should be called when a packet has been read from the device and should be sent through the tunnel.
- @param packet  The data packet.
- Swift: @code func received(fromDevice packet: Data) @endcode
- */
-- (void)receivedFromDevice:(NSMutableData *_Nonnull)packet;
-
 /*!
  Upload a feedback package to Psiphon Inc. The app collects feedback and diagnostics information in a particular format, then calls this function to upload it for later investigation.
  @note The key, server, path, and headers must be provided by Psiphon Inc.

+ 1 - 29
MobileLibrary/iOS/PsiphonTunnel/PsiphonTunnel/PsiphonTunnel.m

@@ -29,7 +29,7 @@
 #import "JailbreakCheck/JailbreakCheck.h"
 
 
-@interface PsiphonTunnel () <GoPsiPsiphonProvider, GoPsiPacketTunnelDeviceSender>
+@interface PsiphonTunnel () <GoPsiPsiphonProvider>
 
 @property (weak) id <TunneledAppDelegate> tunneledAppDelegate;
 
@@ -45,7 +45,6 @@
     NetworkStatus previousNetworkStatus;
 
     BOOL tunnelWholeDevice;
-    GoPsiPacketTunnelDeviceBridge* packetTunnelDeviceBridge; // only used in whole device mode
 }
 
 - (id)init {
@@ -54,7 +53,6 @@
     atomic_init(&localHttpProxyPort, 0);
     reachability = [Reachability reachabilityForInternetConnection];
     tunnelWholeDevice = FALSE;
-    packetTunnelDeviceBridge = NULL;
 
     return self;
 }
@@ -178,7 +176,6 @@
 
         atomic_store(&localSocksProxyPort, 0);
         atomic_store(&localHttpProxyPort, 0);
-        packetTunnelDeviceBridge = NULL;
 
         [self changeConnectionStateTo:PsiphonConnectionStateDisconnected evenIfSameState:NO];
     }
@@ -214,11 +211,6 @@
     return GoPsiGetPacketTunnelDNSResolverIPv6Address();
 }
 
-// See comment in header.
-- (void)receivedFromDevice:(NSMutableData * _Nonnull)packet {
-    [packetTunnelDeviceBridge receivedFromDevice:packet];
-}
-
 // See comment in header.
 - (void)sendFeedback:(NSString * _Nonnull)feedbackJson
            publicKey:(NSString * _Nonnull)b64EncodedPublicKey
@@ -403,10 +395,6 @@
 
     // We'll record our state about what mode we're in.
     tunnelWholeDevice = ([config[@"TunnelWholeDevice"] integerValue] == 1);
-    if (tunnelWholeDevice && ![self.tunneledAppDelegate respondsToSelector:@selector(sendToDevice:)]) {
-        [self logMessage:@"If TunnelWholeDevice is desired, then sendToDevice must be implemented"];
-        return nil;
-    }
 
     // Other optional fields not being altered. If not set, their defaults will be used:
     // * EstablishTunnelTimeoutSeconds
@@ -757,22 +745,6 @@
     [self handlePsiphonNotice:noticeJSON];
 }
 
-- (GoPsiPacketTunnelDeviceBridge*)getPacketTunnelDeviceBridge {
-    if (!packetTunnelDeviceBridge) {
-        packetTunnelDeviceBridge = GoPsiNewPacketTunnelDeviceBridge(self);
-    }
-    return packetTunnelDeviceBridge;
-}
-
-
-#pragma mark - GoPsiPacketTunnelDeviceSender protocol implementation (private)
-
-- (void)sendToDevice:(NSData *)packet {
-    // The check to see if the delegate responds to this optional selector was
-    // done when processing the config, so we're not going to do it again for every packet.
-    [self.tunneledAppDelegate sendToDevice:packet];
-}
-
 
 #pragma mark - Helpers (private)
 

+ 0 - 40
MobileLibrary/psi/psi.go

@@ -41,7 +41,6 @@ type PsiphonProvider interface {
 	IPv6Synthesize(IPv4Addr string) string
 	GetPrimaryDnsServer() string
 	GetSecondaryDnsServer() string
-	GetPacketTunnelDeviceBridge() *PacketTunnelDeviceBridge
 }
 
 var controllerMutex sync.Mutex
@@ -76,11 +75,6 @@ func Start(
 		config.IPv6Synthesizer = provider
 	}
 
-	deviceBridge := provider.GetPacketTunnelDeviceBridge()
-	if deviceBridge != nil {
-		config.PacketTunnelDeviceBridge = deviceBridge.bridge
-	}
-
 	psiphon.SetNoticeOutput(psiphon.NewNoticeReceiver(
 		func(notice []byte) {
 			provider.Notice(string(notice))
@@ -171,37 +165,3 @@ func GetPacketTunnelDNSResolverIPv4Address() string {
 func GetPacketTunnelDNSResolverIPv6Address() string {
 	return tun.GetTransparentDNSResolverIPv6Address().String()
 }
-
-// PacketTunnelDeviceBridge is a shim for tun.DeviceBeidge,
-// exposing just the necessary "psi" caller integration points.
-//
-// For performant packet tunneling, it's important to avoid memory
-// allocation and garbage collection per packet I/O.
-//
-// In Obj-C, gobind generates code that will not allocate/copy byte
-// slices _as long as a NSMutableData is passed to ReceivedFromDevice_;
-// and generates code that will not allocate/copy when calling
-// SendToDevice. E.g., generated code calls go_seq_to_objc_bytearray
-// and go_seq_from_objc_bytearray with copy set to 0.
-type PacketTunnelDeviceBridge struct {
-	bridge *tun.DeviceBridge
-}
-
-type PacketTunnelDeviceSender interface {
-	SendToDevice(packet []byte)
-}
-
-func NewPacketTunnelDeviceBridge(
-	sender PacketTunnelDeviceSender) *PacketTunnelDeviceBridge {
-
-	return &PacketTunnelDeviceBridge{
-		bridge: tun.NewDeviceBridge(
-			GetPacketTunnelMTU(),
-			0,
-			sender.SendToDevice),
-	}
-}
-
-func (r *PacketTunnelDeviceBridge) ReceivedFromDevice(packet []byte) {
-	r.bridge.ReceivedFromDevice(packet)
-}

+ 186 - 236
psiphon/common/tun/tun.go

@@ -142,8 +142,8 @@ import (
 
 const (
 	DEFAULT_MTU                          = 1500
-	DEFAULT_DOWNSTREAM_PACKET_QUEUE_SIZE = 64
-	DEFAULT_BRIDGE_PACKET_QUEUE_SIZE     = 64
+	DEFAULT_DOWNSTREAM_PACKET_QUEUE_SIZE = 32768 * 16
+	DEFAULT_UPSTREAM_PACKET_QUEUE_SIZE   = 32768
 	DEFAULT_IDLE_SESSION_EXPIRY_SECONDS  = 300
 	ORPHAN_METRICS_CHECKPOINTER_PERIOD   = 30 * time.Minute
 )
@@ -207,15 +207,16 @@ type ServerConfig struct {
 	// IPv6 DNS traffic. It functions like GetDNSResolverIPv4Addresses.
 	GetDNSResolverIPv6Addresses func() []net.IP
 
-	// DownStreamPacketQueueSize specifies the size of the downstream
+	// DownstreamPacketQueueSize specifies the size of the downstream
 	// packet queue. The packet tunnel server multiplexes all client
 	// packets through a single tun device, so when a packet is read,
 	// it must be queued or dropped if it cannot be immediately routed
 	// to the appropriate client. Note that the TCP and SSH windows
 	// for the underlying channel transport will impact transfer rate
 	// and queuing.
-	// When DownStreamPacketQueueSize is 0, a default value is used.
-	DownStreamPacketQueueSize int
+	// When DownstreamPacketQueueSize is 0, a default value tuned for
+	// Psiphon is used.
+	DownstreamPacketQueueSize int
 
 	// MTU specifies the maximum transmission unit for the packet
 	// tunnel. Clients must be configured with the same MTU. The
@@ -286,7 +287,12 @@ func (server *Server) Start() {
 	server.workers.Add(1)
 	go server.runOrphanMetricsCheckpointer()
 
-	server.workers.Add(1)
+	// TODO: this is a hack workaround for deviceIO.Read()
+	// not getting interrupted by deviceIO.Close(), and, as a
+	// result, runDeviceDownstream no terminating. This
+	// workaround breaks synchronized shutdown.
+	//
+	//server.workers.Add(1)
 	go server.runDeviceDownstream()
 }
 
@@ -389,9 +395,9 @@ func (server *Server) ClientConnected(
 
 	} else {
 
-		downStreamPacketQueueSize := DEFAULT_DOWNSTREAM_PACKET_QUEUE_SIZE
-		if server.config.DownStreamPacketQueueSize > 0 {
-			downStreamPacketQueueSize = server.config.DownStreamPacketQueueSize
+		downstreamPacketQueueSize := DEFAULT_DOWNSTREAM_PACKET_QUEUE_SIZE
+		if server.config.DownstreamPacketQueueSize > 0 {
+			downstreamPacketQueueSize = server.config.DownstreamPacketQueueSize
 		}
 
 		// Store IPv4 resolver addresses in 4-byte representation
@@ -411,7 +417,7 @@ func (server *Server) ClientConnected(
 			DNSResolverIPv6Addresses: append([]net.IP(nil), server.config.GetDNSResolverIPv6Addresses()...),
 			checkAllowedTCPPortFunc:  checkAllowedTCPPortFunc,
 			checkAllowedUDPPortFunc:  checkAllowedUDPPortFunc,
-			downstreamPackets:        NewPacketQueue(server.runContext, MTU, downStreamPacketQueueSize),
+			downstreamPackets:        NewPacketQueue(downstreamPacketQueueSize),
 			workers:                  new(sync.WaitGroup),
 		}
 
@@ -654,12 +660,31 @@ func (server *Server) runDeviceDownstream() {
 		//
 		// We allow packets to enqueue in an idle session in case a client
 		// is in the process of reconnecting.
+		//
+		// TODO: processPacket is performed here, instead of runClientDownstream,
+		// since packets are packed contiguously into the packet queue and if
+		// the packet it to be omitted, that should be done before enqueuing.
+		// The potential downside is that all packet processing is done in this
+		// single thread of execution, blocking the next packet for the next
+		// client. Try handing off the packet to another worker which will
+		// call processPacket and Enqueue?
 
-		ok = session.downstreamPackets.Enqueue(readPacket)
-		if !ok {
-			// Enqueue aborted due to server.runContext.Done()
-			return
+		// In downstream mode, processPacket rewrites the destination address
+		// to the original client source IP address, and also rewrites DNS
+		// packets. As documented in runClientUpstream, the original address
+		// should already be populated via an upstream packet; if not, the
+		// packet will be rejected.
+
+		if !processPacket(
+			session.metrics,
+			session,
+			packetDirectionServerDownstream,
+			readPacket) {
+			// Packet is rejected and dropped. Reason will be counted in metrics.
+			continue
 		}
+
+		session.downstreamPackets.Enqueue(readPacket)
 	}
 }
 
@@ -730,46 +755,29 @@ func (server *Server) runClientDownstream(session *session) {
 
 	for {
 
-		packet, ok := session.downstreamPackets.Dequeue()
+		packetBuffer, ok := session.downstreamPackets.DequeueFramedPackets(session.runContext)
 		if !ok {
-			// Dequeue aborted due to server.runContext.Done()
+			// Dequeue aborted due to session.runContext.Done()
 			return
 		}
 
-		// In downstream mode, processPacket rewrites the destination address
-		// to the original client source IP address, and also rewrites DNS
-		// packets. As documented in runClientUpstream, the original address
-		// should already be populated via an upstream packet; if not, the
-		// packet will be rejected.
-
-		if !processPacket(
-			session.metrics,
-			session,
-			packetDirectionServerDownstream,
-			packet) {
-
-			// Packet is rejected and dropped. Reason will be counted in metrics.
-
-			session.downstreamPackets.Replace(packet)
-			continue
-		}
-
-		err := session.channel.WritePacket(packet)
+		err := session.channel.WriteFramedPackets(packetBuffer)
 		if err != nil {
 
 			server.config.Logger.WithContextFields(
-				common.LogFields{"error": err}).Warning("write channel packet failed")
+				common.LogFields{"error": err}).Warning("write channel packets failed")
+
+			session.downstreamPackets.Replace(packetBuffer)
 
 			// Tear down the session. Must be invoked asynchronously.
 			go server.interruptSession(session)
 
-			session.downstreamPackets.Replace(packet)
 			return
 		}
 
 		session.touch()
 
-		session.downstreamPackets.Replace(packet)
+		session.downstreamPackets.Replace(packetBuffer)
 	}
 }
 
@@ -1131,99 +1139,103 @@ func (metrics *packetMetrics) checkpoint(
 	logger.LogMetric(logName, logFields)
 }
 
-// TODO: PacketQueue optimizations
-//
-// - Instead of a fixed number of packets, a fixed-size buffer could store
-//   a variable number of packets. This would allow enqueueing many more
-//   small packets in the same amount of memory.
-//
-// - Further, when dequeued packets are to be relayed to a Channel, if
-//   the queue was already stored in the Channel framing format, the entire
-//   queue could simply be copied to the Channel in one copy operation.
-
 // PacketQueue is a fixed-size, preallocated queue of packets.
+// Enqueued packets are packed into a contiguous buffer with channel
+// framing, allowing the entire queue to be written to a channel
+// in a single call.
+// Preallocating and reuse of the queue buffers avoids GC churn.
 type PacketQueue struct {
-	runContext  context.Context
-	packets     chan []byte
-	freeBuffers chan []byte
+	emptyBuffers chan []byte
+	activeBuffer chan []byte
 }
 
 // NewPacketQueue creates a new PacketQueue.
-func NewPacketQueue(
-	runContext context.Context,
-	MTU, queueSize int) *PacketQueue {
+// The caller must ensire that queueSize exceeds the
+// packet MTU, or packets will will never enqueue.
+func NewPacketQueue(queueSize int) *PacketQueue {
+
+	// Two buffers of size queueSize are allocated, to
+	// allow packets to continue to enqueue while one buffer
+	// is borrowed by the DequeueFramedPackets caller.
+	//
+	// TODO: is there a way to implement this without
+	// allocating 2x queueSize bytes? A circular queue
+	// won't work because we want DequeueFramedPackets
+	// to return a contiguous buffer. Perhaps a Bip
+	// Buffer would work here:
+	// https://www.codeproject.com/Articles/3479/The-Bip-Buffer-The-Circular-Buffer-with-a-Twist
 
 	queue := &PacketQueue{
-		runContext:  runContext,
-		packets:     make(chan []byte, queueSize),
-		freeBuffers: make(chan []byte, queueSize),
+		emptyBuffers: make(chan []byte, 2),
+		activeBuffer: make(chan []byte, 1),
 	}
 
-	// To avoid GC churn, downstream packet buffers are allocated
-	// once and reused. Available buffers are sent to the freeBuffers
-	// channel. When a packet is enqueued, a buffer is obtained from
-	// freeBuffers and sent to packets.
-	// TODO: allocate on first use? if the full queue size is not
-	// often used, preallocating all buffers is unnecessary.
-
-	for i := 0; i < queueSize; i++ {
-		queue.freeBuffers <- make([]byte, MTU)
-	}
+	queue.emptyBuffers <- make([]byte, 0, queueSize)
+	queue.emptyBuffers <- make([]byte, 0, queueSize)
 
 	return queue
 }
 
-// Enqueue enqueues the packet. The contents of packet are assumed
-// to be <= MTU and are copied into a preallocated, free packet
-// buffer area. If the queue is full, the packet is dropped.
-// Enqueue returns false if it receives runContext.Done().
-func (queue *PacketQueue) Enqueue(packet []byte) bool {
+// Enqueue adds a packet to the queue.
+// If the queue is full, the packet is dropped.
+// Enqueue is _not_ safe for concurrent calls.
+func (queue *PacketQueue) Enqueue(packet []byte) {
+
+	var buffer []byte
 
-	var packetBuffer []byte
 	select {
-	case packetBuffer = <-queue.freeBuffers:
-	case <-queue.runContext.Done():
-		return false
+	case buffer = <-queue.activeBuffer:
 	default:
-		// Queue is full, so drop packet.
-		return true
+		buffer = <-queue.emptyBuffers
 	}
 
-	// Reuse the preallocated packet buffer. This slice indexing
-	// assumes the size of the packet <= MTU and the preallocated
-	// capacity == MTU.
-	packetBuffer = packetBuffer[0:len(packet)]
-	copy(packetBuffer, packet)
+	packetSize := len(packet)
 
-	// This won't block: both freeBuffers/packets have queue-size
-	// capacity, and only queue-size packet buffers exist.
-	queue.packets <- packetBuffer
+	if cap(buffer)-len(buffer) >= channelHeaderSize+packetSize {
 
-	return true
+		// Assumes len(packet)/MTU <= 64K
+
+		offset := len(buffer)
+		buffer = buffer[0 : len(buffer)+channelHeaderSize+packetSize]
+		binary.BigEndian.PutUint16(buffer[offset:offset+channelHeaderSize], uint16(packetSize))
+		copy(buffer[offset+channelHeaderSize:], packet)
+	}
+	// Else, queue is full, so drop packet.
+
+	queue.activeBuffer <- buffer
 }
 
-// Dequeue waits until a packet is available and then dequeues and
-// returns it. The returned packet buffer remains part of the
-// PacketQueue and the caller must call Replace when done with the
-// packet.
-// Dequeue unblocks and returns false if it receives runContext.Done().
-func (queue *PacketQueue) Dequeue() ([]byte, bool) {
-	var packet []byte
+// DequeueFramedPackets waits until at least one packet is
+// enqueued, and then returns a packet buffer containing one
+// or more framed packets. The returned buffer remains part
+// of the PacketQueue structure and the caller _must_ replace
+// the buffer by calling Replace.
+// DequeueFramedPackets unblocks and returns false if it receives
+// runContext.Done().
+// DequeueFramedPackets is _not_ safe for concurrent calls.
+func (queue *PacketQueue) DequeueFramedPackets(
+	runContext context.Context) ([]byte, bool) {
+
+	var buffer []byte
+
 	select {
-	case packet = <-queue.packets:
-		return packet, true
-	case <-queue.runContext.Done():
+	case buffer = <-queue.activeBuffer:
+	case <-runContext.Done():
+		return nil, false
 	}
-	return nil, false
+
+	return buffer, true
 }
 
-// Replace returns a dequeued packet buffer to the free list. It
-// must be called for all, and must be called only with packets
-// returned by Dequeue.
-func (queue *PacketQueue) Replace(packet []byte) {
+// Replace returns the buffer to the PacketQueue to be
+// reused.
+// The input must be a return value from DequeueFramedPackets.
+func (queue *PacketQueue) Replace(buffer []byte) {
+
+	buffer = buffer[0:0]
 
-	// This won't block (as long as it is a Dequeue return value).
-	queue.freeBuffers <- packet
+	// This won't block (as long as it is a DequeueFramedPackets return value).
+	queue.emptyBuffers <- buffer
 }
 
 // ClientConfig specifies the configuration of a packet tunnel client.
@@ -1248,6 +1260,12 @@ type ClientConfig struct {
 	// When MTU is 0, a default value is used.
 	MTU int
 
+	// UpstreamPacketQueueSize specifies the size of the upstream
+	// packet queue.
+	// When UpstreamPacketQueueSize is 0, a default value tuned for
+	// Psiphon is used.
+	UpstreamPacketQueueSize int
+
 	// Transport is an established transport channel that
 	// will be used to relay packets to and from a packet
 	// tunnel server.
@@ -1267,11 +1285,6 @@ type ClientConfig struct {
 	// and create and configure a tun device.
 	TunFileDescriptor int
 
-	// TunDeviceBridge specifies a DeviceBridge to use to read
-	// and write packets. Client operation is the same as when a
-	// TunFileDescriptor is used.
-	TunDeviceBridge *DeviceBridge
-
 	// IPv4AddressCIDR is the IPv4 address and netmask to
 	// assign to a newly created tun device.
 	IPv4AddressCIDR string
@@ -1290,13 +1303,14 @@ type ClientConfig struct {
 // relays packets between a local tun device and a packet
 // tunnel server via a transport channel.
 type Client struct {
-	config      *ClientConfig
-	device      *Device
-	channel     *Channel
-	metrics     *packetMetrics
-	runContext  context.Context
-	stopRunning context.CancelFunc
-	workers     *sync.WaitGroup
+	config          *ClientConfig
+	device          *Device
+	channel         *Channel
+	upstreamPackets *PacketQueue
+	metrics         *packetMetrics
+	runContext      context.Context
+	stopRunning     context.CancelFunc
+	workers         *sync.WaitGroup
 }
 
 // NewClient initializes a new Client. Unless using the
@@ -1309,8 +1323,6 @@ func NewClient(config *ClientConfig) (*Client, error) {
 
 	if config.TunFileDescriptor > 0 {
 		device, err = NewClientDeviceFromFD(config)
-	} else if config.TunDeviceBridge != nil {
-		device, err = NewClientDeviceFromBridge(config)
 	} else {
 		device, err = NewClientDevice(config)
 	}
@@ -1319,16 +1331,22 @@ func NewClient(config *ClientConfig) (*Client, error) {
 		return nil, common.ContextError(err)
 	}
 
+	upstreamPacketQueueSize := DEFAULT_UPSTREAM_PACKET_QUEUE_SIZE
+	if config.UpstreamPacketQueueSize > 0 {
+		upstreamPacketQueueSize = config.UpstreamPacketQueueSize
+	}
+
 	runContext, stopRunning := context.WithCancel(context.Background())
 
 	return &Client{
-		config:      config,
-		device:      device,
-		channel:     NewChannel(config.Transport, getMTU(config.MTU)),
-		metrics:     new(packetMetrics),
-		runContext:  runContext,
-		stopRunning: stopRunning,
-		workers:     new(sync.WaitGroup),
+		config:          config,
+		device:          device,
+		channel:         NewChannel(config.Transport, getMTU(config.MTU)),
+		upstreamPackets: NewPacketQueue(upstreamPacketQueueSize),
+		metrics:         new(packetMetrics),
+		runContext:      runContext,
+		stopRunning:     stopRunning,
+		workers:         new(sync.WaitGroup),
 	}, nil
 }
 
@@ -1371,11 +1389,34 @@ func (client *Client) Start() {
 				continue
 			}
 
-			err = client.channel.WritePacket(readPacket)
+			// Instead of immediately writing to the channel, the
+			// packet is enqueued, which has the effect of batching
+			// up IP packets into a single channel packet (for Psiphon,
+			// and SSH packet) to minimize overhead and, as benchmarked,
+			// improve throughput.
+			// Packet will be dropped if queue is full.
+
+			client.upstreamPackets.Enqueue(readPacket)
+		}
+	}()
+
+	client.workers.Add(1)
+	go func() {
+		defer client.workers.Done()
+		for {
+			packetBuffer, ok := client.upstreamPackets.DequeueFramedPackets(client.runContext)
+			if !ok {
+				// Dequeue aborted due to session.runContext.Done()
+				return
+			}
+
+			err := client.channel.WriteFramedPackets(packetBuffer)
+
+			client.upstreamPackets.Replace(packetBuffer)
 
 			if err != nil {
 				client.config.Logger.WithContextFields(
-					common.LogFields{"error": err}).Info("write channel packet failed")
+					common.LogFields{"error": err}).Info("write channel packets failed")
 				// Only this goroutine exits and no alarm is raised. It's assumed
 				// that if the channel fails, the outer client will know about it.
 				return
@@ -2031,7 +2072,6 @@ packet debugging snippet:
 // preallocated buffers to avoid GC churn.
 type Device struct {
 	name           string
-	usingBridge    bool
 	deviceIO       io.ReadWriteCloser
 	inboundBuffer  []byte
 	outboundBuffer []byte
@@ -2054,7 +2094,6 @@ func NewServerDevice(config *ServerConfig) (*Device, error) {
 
 	return newDevice(
 		deviceName,
-		false,
 		deviceIO,
 		getMTU(config.MTU)), nil
 }
@@ -2076,23 +2115,20 @@ func NewClientDevice(config *ClientConfig) (*Device, error) {
 
 	return newDevice(
 		deviceName,
-		false,
 		deviceIO,
 		getMTU(config.MTU)), nil
 }
 
 func newDevice(
 	name string,
-	usingBridge bool,
 	deviceIO io.ReadWriteCloser,
 	MTU int) *Device {
 
 	return &Device{
 		name:           name,
-		usingBridge:    usingBridge,
 		deviceIO:       deviceIO,
-		inboundBuffer:  makeDeviceInboundBuffer(usingBridge, MTU),
-		outboundBuffer: makeDeviceOutboundBuffer(usingBridge, MTU),
+		inboundBuffer:  makeDeviceInboundBuffer(MTU),
+		outboundBuffer: makeDeviceOutboundBuffer(MTU),
 	}
 }
 
@@ -2110,26 +2146,14 @@ func NewClientDeviceFromFD(config *ClientConfig) (*Device, error) {
 
 	return &Device{
 		name:           "",
-		usingBridge:    false,
 		deviceIO:       file,
-		inboundBuffer:  makeDeviceInboundBuffer(false, MTU),
-		outboundBuffer: makeDeviceOutboundBuffer(false, MTU),
+		inboundBuffer:  makeDeviceInboundBuffer(MTU),
+		outboundBuffer: makeDeviceOutboundBuffer(MTU),
 	}, nil
 }
 
-// NewClientDeviceFromBridge wraps an existing tun device that is
-// accessed via a DeviceBridge.
-func NewClientDeviceFromBridge(config *ClientConfig) (*Device, error) {
-	return newDevice(
-		"",
-		true,
-		config.TunDeviceBridge,
-		getMTU(config.MTU)), nil
-}
-
 // Name returns the interface name for a created tun device,
-// or returns "" for a device created by NewClientDeviceFromFD
-// or NewClientDeviceFromBridge.
+// or returns "" for a device created by NewClientDeviceFromFD.
 // The interface name may be used for additional network and
 // routing configuration.
 func (device *Device) Name() string {
@@ -2221,88 +2245,6 @@ func (device *Device) Close() error {
 	return device.deviceIO.Close()
 }
 
-// DeviceBridge is a bridge between a function-based packet I/O
-// API, such as Apple's NEPacketTunnelFlow, and the Device interface,
-// which expects an io.ReadWriteCloser.
-//
-// The API side provides a sendToDevice call back to write packets
-// to the tun device and calls ReceivedFromDevice with packets that
-// have been read from the tun device. The Device uses Read/Write/Close.
-type DeviceBridge struct {
-	runContext   context.Context
-	stopRunning  context.CancelFunc
-	readPackets  *PacketQueue
-	writeMutex   sync.Mutex
-	sendToDevice func(p []byte)
-}
-
-// NewDeviceBridge creates a new DeviceBridge.
-// Calls to sendToDevice are serialized, so it need not be safe for
-// concurrent access. Calls to sendToDevice will block calls to
-// Write and will not be interrupted by Close; sendToDevice _should_
-// not block.
-func NewDeviceBridge(
-	MTU, readPacketQueueSize int,
-	sendToDevice func(p []byte)) *DeviceBridge {
-
-	runContext, stopRunning := context.WithCancel(context.Background())
-
-	if readPacketQueueSize <= 0 {
-		readPacketQueueSize = DEFAULT_BRIDGE_PACKET_QUEUE_SIZE
-	}
-
-	return &DeviceBridge{
-		runContext:   runContext,
-		stopRunning:  stopRunning,
-		readPackets:  NewPacketQueue(runContext, MTU, readPacketQueueSize),
-		sendToDevice: sendToDevice,
-	}
-}
-
-// ReceivedFromDevice accepts packets read from the tun device. Packets
-// are enqueued for subsequent return to callers of Read. ReceivedFromDevice
-// does not block when the queue is full or waiting for a Read call. When
-// the queue is full, packets are dropped.
-func (bridge *DeviceBridge) ReceivedFromDevice(p []byte) {
-	_ = bridge.readPackets.Enqueue(p)
-}
-
-// Read blocks until an enqueued packet is available or the DeviceBridge
-// is closed.
-func (bridge *DeviceBridge) Read(p []byte) (int, error) {
-	packet, ok := bridge.readPackets.Dequeue()
-	if !ok {
-		return 0, common.ContextError(errors.New("bridge is closed"))
-	}
-
-	// Assumes both p and packet are <= MTU
-	copy(p, packet)
-
-	bridge.readPackets.Replace(packet)
-
-	return len(p), nil
-}
-
-// Write calls through to sendToDevice. Close will not interrupt a
-// blocking call to writeToDevice.
-func (bridge *DeviceBridge) Write(p []byte) (int, error) {
-
-	// Use mutex since writeToDevice isn't required
-	// to be safe for concurrent calls.
-	bridge.writeMutex.Lock()
-	defer bridge.writeMutex.Unlock()
-
-	bridge.sendToDevice(p)
-
-	return len(p), nil
-}
-
-// Close interrupts blocking reads.
-func (bridge *DeviceBridge) Close() error {
-	bridge.stopRunning()
-	return nil
-}
-
 // Channel manages packet transport over a communications channel.
 // Any io.ReadWriteCloser can provide transport. In psiphond, the
 // io.ReadWriteCloser will be an SSH channel. Channel I/O frames
@@ -2368,15 +2310,12 @@ func (channel *Channel) WritePacket(packet []byte) error {
 	// way, the channel window size will influence the TCP window size for
 	// tunneled traffic.
 
-	// Writes are not batched up but dispatched immediately. When the
-	// transport is an SSH channel, the overhead per tunneled packet includes:
+	// When the transport is an SSH channel, the overhead per packet message
+	// includes:
 	//
 	// - SSH_MSG_CHANNEL_DATA: 5 bytes (https://tools.ietf.org/html/rfc4254#section-5.2)
 	// - SSH packet: ~28 bytes (https://tools.ietf.org/html/rfc4253#section-5.3), with MAC
 	// - TCP/IP transport for SSH: 40 bytes for IPv4
-	//
-	// Also, when the transport in an SSH channel, batching of packets will
-	// naturally occur when the SSH channel window is full.
 
 	// Assumes MTU <= 64K and len(packet) <= MTU
 
@@ -2391,6 +2330,17 @@ func (channel *Channel) WritePacket(packet []byte) error {
 	return nil
 }
 
+// WriteFramedPackets writes a buffer of pre-framed packets to
+// the channel.
+// Concurrent calls to WriteFramedPackets are not supported.
+func (channel *Channel) WriteFramedPackets(packetBuffer []byte) error {
+	_, err := channel.transport.Write(packetBuffer)
+	if err != nil {
+		return common.ContextError(err)
+	}
+	return nil
+}
+
 // Close interrupts any blocking Read/Write calls and
 // closes the channel transport.
 func (channel *Channel) Close() error {

+ 2 - 24
psiphon/common/tun/tun_darwin.go

@@ -68,20 +68,12 @@ const (
 	DEFAULT_PUBLIC_INTERFACE_NAME = "en0"
 )
 
-func makeDeviceInboundBuffer(usingBridge bool, MTU int) []byte {
-	if usingBridge {
-		// No utun packet header when using a bridge
-		return make([]byte, MTU)
-	}
+func makeDeviceInboundBuffer(MTU int) []byte {
 	// 4 extra bytes to read a utun packet header
 	return make([]byte, 4+MTU)
 }
 
-func makeDeviceOutboundBuffer(usingBridge bool, MTU int) []byte {
-	if usingBridge {
-		// No outbound buffer is used
-		return nil
-	}
+func makeDeviceOutboundBuffer(MTU int) []byte {
 	// 4 extra bytes to write a utun packet header
 	return make([]byte, 4+MTU)
 }
@@ -198,11 +190,6 @@ func (device *Device) readTunPacket() (int, int, error) {
 		return 0, 0, common.ContextError(err)
 	}
 
-	if device.usingBridge {
-		// No utun packet header when using a bridge
-		return 0, n, nil
-	}
-
 	if n < 4 {
 		return 0, 0, common.ContextError(errors.New("missing packet prefix"))
 	}
@@ -212,15 +199,6 @@ func (device *Device) readTunPacket() (int, int, error) {
 
 func (device *Device) writeTunPacket(packet []byte) error {
 
-	if device.usingBridge {
-		// No utun packet header when using a bridge
-		_, err := device.deviceIO.Write(packet)
-		if err != nil {
-			return common.ContextError(err)
-		}
-		return nil
-	}
-
 	// Note: can't use writev via net.Buffers. os.File isn't
 	// a net.Conn and can't wrap with net.FileConn due to
 	// fd type. So writes use an intermediate buffer to add

+ 2 - 2
psiphon/common/tun/tun_linux.go

@@ -37,11 +37,11 @@ const (
 	DEFAULT_PUBLIC_INTERFACE_NAME = "eth0"
 )
 
-func makeDeviceInboundBuffer(usingBridge bool, MTU int) []byte {
+func makeDeviceInboundBuffer(MTU int) []byte {
 	return make([]byte, MTU)
 }
 
-func makeDeviceOutboundBuffer(usingBridge bool, MTU int) []byte {
+func makeDeviceOutboundBuffer(MTU int) []byte {
 	// On Linux, no outbound buffer is used
 	return nil
 }

+ 1 - 18
psiphon/config.go

@@ -31,7 +31,6 @@ import (
 
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
-	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/tun"
 )
 
 // TODO: allow all params to be configured
@@ -475,12 +474,6 @@ type Config struct {
 	// file descriptor. The file descriptor is duped in NewController.
 	// When PacketTunnelTunDeviceFileDescriptor is set, TunnelPoolSize must be 1.
 	PacketTunnelTunFileDescriptor int
-
-	// PacketTunnelDeviceBridge specifies a tun device bridge to use for running a
-	// packet tunnel. This is an alternate interface to a tun device when a file
-	// descriptor is not directly available.
-	// When PacketTunnelDeviceBridge is set, TunnelPoolSize must be 1.
-	PacketTunnelDeviceBridge *tun.DeviceBridge
 }
 
 // DownloadURL specifies a URL for downloading resources along with parameters
@@ -585,11 +578,6 @@ func LoadConfig(configJson []byte) (*Config, error) {
 			errors.New("DnsServerGetter interface must be set at runtime"))
 	}
 
-	if config.PacketTunnelDeviceBridge != nil {
-		return nil, common.ContextError(
-			errors.New("PacketTunnelDeviceBridge value must be set at runtime"))
-	}
-
 	if !common.Contains(
 		[]string{"", TRANSFORM_HOST_NAMES_ALWAYS, TRANSFORM_HOST_NAMES_NEVER},
 		config.TransformHostNames) {
@@ -668,13 +656,8 @@ func LoadConfig(configJson []byte) (*Config, error) {
 		}
 	}
 
-	if config.PacketTunnelTunFileDescriptor > 0 && config.PacketTunnelDeviceBridge != nil {
-		return nil, common.ContextError(errors.New("only one of PacketTunnelTunFileDescriptor and PacketTunnelDeviceBridge may be set"))
-	}
-
 	// This constraint is expected by logic in Controller.runTunnels()
-	if (config.PacketTunnelTunFileDescriptor > 0 || config.PacketTunnelDeviceBridge != nil) &&
-		config.TunnelPoolSize != 1 {
+	if config.PacketTunnelTunFileDescriptor > 0 && config.TunnelPoolSize != 1 {
 		return nil, common.ContextError(errors.New("packet tunnel mode requires TunnelPoolSize to be 1"))
 	}
 

+ 1 - 3
psiphon/controller.go

@@ -145,8 +145,7 @@ func NewController(config *Config) (controller *Controller, err error) {
 
 	controller.splitTunnelClassifier = NewSplitTunnelClassifier(config, controller)
 
-	if config.PacketTunnelTunFileDescriptor > 0 ||
-		config.PacketTunnelDeviceBridge != nil {
+	if config.PacketTunnelTunFileDescriptor > 0 {
 
 		// Run a packet tunnel client. The lifetime of the tun.Client is the
 		// lifetime of the Controller, so it exists across tunnel establishments
@@ -159,7 +158,6 @@ func NewController(config *Config) (controller *Controller, err error) {
 		packetTunnelClient, err := tun.NewClient(&tun.ClientConfig{
 			Logger:            NoticeCommonLogger(),
 			TunFileDescriptor: config.PacketTunnelTunFileDescriptor,
-			TunDeviceBridge:   config.PacketTunnelDeviceBridge,
 			Transport:         packetTunnelTransport,
 		})
 		if err != nil {