Quellcode durchsuchen

Add a one-packet "stash" to QueuePacketConn.

I want a way to "unread" a packet from an send queue, in the case where
I'm packing packets into a fixed space and don't know when I'm done
until I've read one too many packets. There's no way to insert the extra
packet at the head of the cannel representing the send queue, so that it
will be the next thing received from OutgoingQueue. Instead, add an
separate one-element queue called the stash. The caller can stash an
excess packet, then prioritize checking it in the next round by calling
Unstash before OutgoingQueue.
David Fifield vor 6 Jahren
Ursprung
Commit
938463ce07
2 geänderte Dateien mit 62 neuen und 9 gelöschten Zeilen
  1. 25 0
      turbotunnel/queuepacketconn.go
  2. 37 9
      turbotunnel/remotemap.go

+ 25 - 0
turbotunnel/queuepacketconn.go

@@ -21,6 +21,17 @@ type taggedPacket struct {
 // method inserts a packet into the incoming queue, to eventually be returned by
 // ReadFrom. WriteTo inserts a packet into an address-specific outgoing queue,
 // which can later by accessed through the OutgoingQueue method.
+//
+// Besides the outgoing queues, there is also a one-element "stash" for each
+// remote peer address. You can stash a packet using the Stash method, and get
+// it back later by receiving from the channel returned by Unstash. The stash is
+// meant as a convenient place to temporarily store a single packet, such as
+// when you've read one too many packets from the send queue and need to store
+// the extra packet to be processed first in the next pass. It's the caller's
+// responsibility to Unstash what they have Stashed. Calling Stash does not put
+// the packet at the head of the send queue; if there is the possibility that a
+// packet has been stashed, it must be checked for by calling Unstash in
+// addition to OutgoingQueue.
 type QueuePacketConn struct {
 	remotes   *RemoteMap
 	localAddr net.Addr
@@ -68,6 +79,20 @@ func (c *QueuePacketConn) OutgoingQueue(addr net.Addr) <-chan []byte {
 	return c.remotes.SendQueue(addr)
 }
 
+// Stash places p in the stash for addr, if the stash is not already occupied.
+// Returns true if the packet was placed in the stash, or false if the stash was
+// already occupied. This method is similar to WriteTo, except that it puts the
+// packet in the stash queue (accessible via Unstash), rather than the outgoing
+// queue (accessible via OutgoingQueue).
+func (c *QueuePacketConn) Stash(p []byte, addr net.Addr) bool {
+	return c.remotes.Stash(addr, p)
+}
+
+// Unstash returns the channel that represents the stash for addr.
+func (c *QueuePacketConn) Unstash(addr net.Addr) <-chan []byte {
+	return c.remotes.Unstash(addr)
+}
+
 // ReadFrom returns a packet and address previously stored by QueueIncoming.
 func (c *QueuePacketConn) ReadFrom(p []byte) (int, net.Addr, error) {
 	select {

+ 37 - 9
turbotunnel/remotemap.go

@@ -8,16 +8,22 @@ import (
 )
 
 // remoteRecord is a record of a recently seen remote peer, with the time it was
-// last seen and a send queue.
+// last seen and queues of outgoing packets.
 type remoteRecord struct {
 	Addr      net.Addr
 	LastSeen  time.Time
 	SendQueue chan []byte
+	Stash     chan []byte
 }
 
 // RemoteMap manages a mapping of live remote peers, keyed by address, to their
-// respective send queues. RemoteMap's functions are safe to call from multiple
-// goroutines.
+// respective send queues. Each peer has two queues: a primary send queue, and a
+// "stash". The primary send queue is returned by the SendQueue method. The
+// stash is an auxiliary one-element queue accessed using the Stash and Unstash
+// methods. The stash is meant for use by callers that need to "unread" a packet
+// that's already been removed from the primary send queue.
+//
+// RemoteMap's functions are safe to call from multiple goroutines.
 type RemoteMap struct {
 	// We use an inner structure to avoid exposing public heap.Interface
 	// functions to users of remoteMap.
@@ -62,7 +68,28 @@ func NewRemoteMap(timeout time.Duration) *RemoteMap {
 func (m *RemoteMap) SendQueue(addr net.Addr) chan []byte {
 	m.lock.Lock()
 	defer m.lock.Unlock()
-	return m.inner.SendQueue(addr, time.Now())
+	return m.inner.Lookup(addr, time.Now()).SendQueue
+}
+
+// Stash places p in the stash corresponding to addr, if the stash is not
+// already occupied. Returns true if the p was placed in the stash, false
+// otherwise.
+func (m *RemoteMap) Stash(addr net.Addr, p []byte) bool {
+	m.lock.Lock()
+	defer m.lock.Unlock()
+	select {
+	case m.inner.Lookup(addr, time.Now()).Stash <- p:
+		return true
+	default:
+		return false
+	}
+}
+
+// Unstash returns the channel that reads from the stash for addr.
+func (m *RemoteMap) Unstash(addr net.Addr) <-chan []byte {
+	m.lock.Lock()
+	defer m.lock.Unlock()
+	return m.inner.Lookup(addr, time.Now()).Stash
 }
 
 // remoteMapInner is the inner type of RemoteMap, implementing heap.Interface.
@@ -84,10 +111,10 @@ func (inner *remoteMapInner) removeExpired(now time.Time, timeout time.Duration)
 	}
 }
 
-// SendQueue finds the existing record corresponding to addr, or creates a new
-// one if none exists yet. It updates the record's LastSeen time and returns its
-// SendQueue.
-func (inner *remoteMapInner) SendQueue(addr net.Addr, now time.Time) chan []byte {
+// Lookup finds the existing record corresponding to addr, or creates a new
+// one if none exists yet. It updates the record's LastSeen time and returns the
+// record.
+func (inner *remoteMapInner) Lookup(addr net.Addr, now time.Time) *remoteRecord {
 	var record *remoteRecord
 	i, ok := inner.byAddr[addr]
 	if ok {
@@ -101,10 +128,11 @@ func (inner *remoteMapInner) SendQueue(addr net.Addr, now time.Time) chan []byte
 			Addr:      addr,
 			LastSeen:  now,
 			SendQueue: make(chan []byte, queueSize),
+			Stash:     make(chan []byte, 1),
 		}
 		heap.Push(inner, record)
 	}
-	return record.SendQueue
+	return record
 }
 
 // heap.Interface for remoteMapInner.