فهرست منبع

Starting on server.

David Fifield 6 سال پیش
کامیت
cdeecf2ec6
8فایلهای تغییر یافته به همراه569 افزوده شده و 0 حذف شده
  1. 1 0
      .gitignore
  2. 11 0
      dns/dns.go
  3. 234 0
      dnstt-server/main.go
  4. 6 0
      go.mod
  5. 28 0
      go.sum
  6. 144 0
      turbotunnel/clientmap.go
  7. 8 0
      turbotunnel/consts.go
  8. 137 0
      turbotunnel/queuepacketconn.go

+ 1 - 0
.gitignore

@@ -0,0 +1 @@
+dnstt-server/dnstt-server

+ 11 - 0
dns/dns.go

@@ -0,0 +1,11 @@
+package dns
+
+import (
+	"bytes"
+)
+
+type Name [][]byte
+
+func ParseName(s []byte) (Name, error) {
+	return bytes.Split(bytes.TrimSuffix(s, []byte(".")), []byte(".")), nil
+}

+ 234 - 0
dnstt-server/main.go

@@ -0,0 +1,234 @@
+package main
+
+import (
+	"flag"
+	"fmt"
+	"io"
+	"net"
+	"os"
+	"sync"
+	"time"
+
+	"github.com/xtaci/kcp-go/v5"
+	"github.com/xtaci/smux"
+	"www.bamsoftware.com/git/dnstt.git/dns"
+	"www.bamsoftware.com/git/dnstt.git/turbotunnel"
+)
+
+const (
+	idleTimeout = 10 * time.Minute
+)
+
+// handleStream bidirectionally connects a client stream with the ORPort.
+func handleStream(stream *smux.Stream, upstream *net.TCPAddr) error {
+	conn, err := net.DialTCP("tcp", nil, upstream)
+	if err != nil {
+		return err
+	}
+	defer conn.Close()
+
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		_, err := io.Copy(stream, conn)
+		if err != nil {
+			fmt.Fprintf(os.Stderr, "copy stream←upstream: %v\n", err)
+		}
+		stream.Close()
+	}()
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		_, err := io.Copy(conn, stream)
+		if err != nil {
+			fmt.Fprintf(os.Stderr, "copy upstream←stream: %v\n", err)
+		}
+		conn.Close()
+	}()
+	wg.Wait()
+
+	return nil
+}
+
+// acceptStreams layers an smux.Session on a KCP connection and awaits streams
+// on it. It passes each stream to handleStream.
+func acceptStreams(conn *kcp.UDPSession, upstream *net.TCPAddr) error {
+	smuxConfig := smux.DefaultConfig()
+	smuxConfig.Version = 2
+	smuxConfig.KeepAliveTimeout = idleTimeout
+	sess, err := smux.Server(conn, smuxConfig)
+	if err != nil {
+		return err
+	}
+
+	for {
+		stream, err := sess.AcceptStream()
+		if err != nil {
+			if err, ok := err.(net.Error); ok && err.Temporary() {
+				continue
+			}
+			return err
+		}
+		go func() {
+			defer stream.Close()
+			err := handleStream(stream, upstream)
+			if err != nil {
+				fmt.Fprintf(os.Stderr, "handleStream: %v\n", err)
+			}
+		}()
+	}
+}
+
+// acceptSessions listens for incoming KCP connections and passes them to
+// acceptStreams.
+func acceptSessions(ln *kcp.Listener, mtu int, upstream *net.TCPAddr) error {
+	for {
+		conn, err := ln.AcceptKCP()
+		if err != nil {
+			if err, ok := err.(net.Error); ok && err.Temporary() {
+				continue
+			}
+			return err
+		}
+		// Permit coalescing the payloads of consecutive sends.
+		conn.SetStreamMode(true)
+		// Disable the dynamic congestion window (limit only by the
+		// maximum of local and remote static windows).
+		conn.SetNoDelay(
+			0, // default nodelay
+			0, // default interval
+			0, // default resend
+			1, // nc=1 => congestion window off
+		)
+		// Set the maximum transmission unit.
+		conn.SetMtu(mtu)
+		go func() {
+			defer conn.Close()
+			err := acceptStreams(conn, upstream)
+			if err != nil {
+				fmt.Fprintf(os.Stderr, "acceptStreams: %v\n", err)
+			}
+		}()
+	}
+}
+
+func handle(c net.PacketConn, p []byte, addr net.Addr) error {
+	fmt.Printf("handle %v %x\n", addr, p)
+	_, err := c.WriteTo([]byte("hello"), addr)
+	return err
+}
+
+func loop(c net.PacketConn, domain dns.Name) error {
+	type taggedPacket struct {
+		P    []byte
+		Addr net.Addr
+	}
+
+	handleChan := make(chan taggedPacket, 64)
+	defer close(handleChan)
+	go func() {
+		for tp := range handleChan {
+			p := tp.P
+			addr := tp.Addr
+			err := handle(c, p, addr)
+			if err != nil {
+				fmt.Fprintf(os.Stderr, "handle from %v: %v\n", addr, err)
+			}
+		}
+	}()
+
+	for {
+		// One byte longer than we want, to check for truncation.
+		var buf [513]byte
+		n, addr, err := c.ReadFrom(buf[:])
+		if err != nil {
+			if err, ok := err.(net.Error); ok && err.Temporary() {
+				continue
+			}
+			return err
+		}
+		if n == len(buf) {
+			// Truncated packet.
+			continue
+		}
+		// Copy the packet data into its own buffer.
+		p := make([]byte, n)
+		copy(p, buf[:n])
+		select {
+		case handleChan <- taggedPacket{p, addr}:
+		default:
+			// Drop incoming packets if channel is full.
+		}
+	}
+}
+
+type dummyAddr struct{}
+
+func (addr dummyAddr) Network() string { return "dummy" }
+func (addr dummyAddr) String() string  { return "dummy" }
+
+func run(domain dns.Name, upstream net.Addr, udpAddr string) error {
+	// Start up the virtual PacketConn for turbotunnel.
+	pconn := turbotunnel.NewQueuePacketConn(dummyAddr{}, idleTimeout*2)
+	ln, err := kcp.ServeConn(nil, 0, 0, pconn)
+	if err != nil {
+		return fmt.Errorf("opening KCP listener: %v", err)
+	}
+	defer ln.Close()
+	go func() {
+		err := acceptSessions(ln, 120, upstream.(*net.TCPAddr)) // TODO: MTU appropriate for length of domain
+		if err != nil {
+			fmt.Fprintf(os.Stderr, "acceptSessions: %v\n", err)
+		}
+	}()
+
+	var wg sync.WaitGroup
+
+	if udpAddr != "" {
+		c, err := net.ListenPacket("udp", udpAddr)
+		if err != nil {
+			return fmt.Errorf("opening UDP listener: %v", err)
+		}
+		wg.Add(1)
+		go func() {
+			defer c.Close()
+			defer wg.Done()
+			err := loop(c, domain)
+			if err != nil {
+				fmt.Fprintf(os.Stderr, "error in UDP loop: %v\n", err)
+			}
+		}()
+	}
+
+	wg.Wait()
+	return nil
+}
+
+func main() {
+	var udpAddr string
+
+	flag.StringVar(&udpAddr, "udp", "", "UDP port to listen on")
+	flag.Parse()
+
+	if flag.NArg() != 2 {
+		fmt.Fprintf(os.Stderr, "usage: %s -udp ADDR DOMAIN UPSTREAMADDR\n", os.Args[0])
+		os.Exit(1)
+	}
+	domain, err := dns.ParseName([]byte(flag.Arg(0)))
+	if err != nil {
+		fmt.Fprintf(os.Stderr, "invalid domain %+q: %v\n", flag.Arg(0), err)
+		os.Exit(1)
+	}
+	upstream, err := net.ResolveTCPAddr("tcp", flag.Arg(1))
+	if err != nil {
+		fmt.Fprintf(os.Stderr, "cannot resolve %+q: %v\n", flag.Arg(1), err)
+		os.Exit(1)
+	}
+
+	err = run(domain, upstream, udpAddr)
+	if err != nil {
+		fmt.Fprintln(os.Stderr, err)
+		os.Exit(1)
+	}
+}

+ 6 - 0
go.mod

@@ -0,0 +1,6 @@
+module www.bamsoftware.com/git/dnstt.git
+
+require (
+	github.com/xtaci/kcp-go/v5 v5.5.12
+	github.com/xtaci/smux v1.5.12
+)

+ 28 - 0
go.sum

@@ -0,0 +1,28 @@
+github.com/klauspost/cpuid v1.2.2 h1:1xAgYebNnsb9LKCdLOvFWtAxGU/33mjJtyOVbmUa0Us=
+github.com/klauspost/cpuid v1.2.2/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
+github.com/klauspost/reedsolomon v1.9.3 h1:N/VzgeMfHmLc+KHMD1UL/tNkfXAt8FnUqlgXGIduwAY=
+github.com/klauspost/reedsolomon v1.9.3/go.mod h1:CwCi+NUr9pqSVktrkN+Ondf06rkhYZ/pcNv7fu+8Un4=
+github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
+github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
+github.com/templexxx/cpu v0.0.1 h1:hY4WdLOgKdc8y13EYklu9OUTXik80BkxHoWvTO6MQQY=
+github.com/templexxx/cpu v0.0.1/go.mod h1:w7Tb+7qgcAlIyX4NhLuDKt78AHA5SzPmq0Wj6HiEnnk=
+github.com/templexxx/xorsimd v0.4.1 h1:iUZcywbOYDRAZUasAs2eSCUW8eobuZDy0I9FJiORkVg=
+github.com/templexxx/xorsimd v0.4.1/go.mod h1:W+ffZz8jJMH2SXwuKu9WhygqBMbFnp14G2fqEr8qaNo=
+github.com/tjfoc/gmsm v1.0.1 h1:R11HlqhXkDospckjZEihx9SW/2VW0RgdwrykyWMFOQU=
+github.com/tjfoc/gmsm v1.0.1/go.mod h1:XxO4hdhhrzAd+G4CjDqaOkd0hUzmtPR/d3EiBBMn/wc=
+github.com/xtaci/kcp-go/v5 v5.5.12 h1:iALGyvti/oBbl1TbVoUpHEUHCorDEb3tEKl1CPY3KXM=
+github.com/xtaci/kcp-go/v5 v5.5.12/go.mod h1:H0T/EJ+lPNytnFYsKLH0JHUtiwZjG3KXlTM6c+Q4YUo=
+github.com/xtaci/lossyconn v0.0.0-20190602105132-8df528c0c9ae/go.mod h1:gXtu8J62kEgmN++bm9BVICuT/e8yiLI2KFobd/TRFsE=
+github.com/xtaci/smux v1.5.12 h1:n9OGjdqQuVZXLh46+L4IR5tR2wvuUFwRABnN/V55bIY=
+github.com/xtaci/smux v1.5.12/go.mod h1:OMlQbT5vcgl2gb49mFkYo6SMf+zP3rcjcwQz7ZU7IGY=
+golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/crypto v0.0.0-20191206172530-e9b2fee46413 h1:ULYEB3JvPRE/IfO+9uO7vKV/xzVTO7XPAwm8xbf4w2g=
+golang.org/x/crypto v0.0.0-20191206172530-e9b2fee46413/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
+golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553 h1:efeOvDhwQ29Dj3SdAV/MJf8oukgn+8D8WgaCaRMchF8=
+golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20191228213918-04cbcbbfeed8 h1:JA8d3MPx/IToSyXZG/RhwYEtfrKO1Fxrqe8KrkiLXKM=
+golang.org/x/sys v0.0.0-20191228213918-04cbcbbfeed8/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=

+ 144 - 0
turbotunnel/clientmap.go

@@ -0,0 +1,144 @@
+package turbotunnel
+
+import (
+	"container/heap"
+	"net"
+	"sync"
+	"time"
+)
+
+// clientRecord is a record of a recently seen client, with the time it was last
+// seen and a send queue.
+type clientRecord struct {
+	Addr      net.Addr
+	LastSeen  time.Time
+	SendQueue chan []byte
+}
+
+// ClientMap manages a mapping of live clients (keyed by address, which will be
+// a ClientID) to their respective send queues. ClientMap's functions are safe
+// to call from multiple goroutines.
+type ClientMap struct {
+	// We use an inner structure to avoid exposing public heap.Interface
+	// functions to users of clientMap.
+	inner clientMapInner
+	// Synchronizes access to inner.
+	lock sync.Mutex
+}
+
+// NewClientMap creates a ClientMap that expires clients after a timeout.
+//
+// The timeout does not have to be kept in sync with smux's idle timeout. If a
+// client is removed from the client map while the smux session is still live,
+// the worst that can happen is a loss of whatever packets were in the send
+// queue at the time. If smux later decides to send more packets to the same
+// client, we'll instantiate a new send queue, and if the client ever connects
+// again with the proper client ID, we'll deliver them.
+func NewClientMap(timeout time.Duration) *ClientMap {
+	m := &ClientMap{
+		inner: clientMapInner{
+			byAge:  make([]*clientRecord, 0),
+			byAddr: make(map[net.Addr]int),
+		},
+	}
+	go func() {
+		for {
+			time.Sleep(timeout / 2)
+			now := time.Now()
+			m.lock.Lock()
+			m.inner.removeExpired(now, timeout)
+			m.lock.Unlock()
+		}
+	}()
+	return m
+}
+
+// SendQueue returns the send queue corresponding to addr, creating it if
+// necessary.
+func (m *ClientMap) SendQueue(addr net.Addr) chan []byte {
+	m.lock.Lock()
+	defer m.lock.Unlock()
+	return m.inner.SendQueue(addr, time.Now())
+}
+
+// clientMapInner is the inner type of ClientMap, implementing heap.Interface.
+// byAge is the backing store, a heap ordered by LastSeen time, to facilitate
+// expiring old client records. byAddr is a map from addresses (i.e., ClientIDs)
+// to heap indices, to allow looking up by address. Unlike ClientMap,
+// clientMapInner requires external synchonization.
+type clientMapInner struct {
+	byAge  []*clientRecord
+	byAddr map[net.Addr]int
+}
+
+// removeExpired removes all client records whose LastSeen timestamp is more
+// than timeout in the past.
+func (inner *clientMapInner) removeExpired(now time.Time, timeout time.Duration) {
+	for len(inner.byAge) > 0 && now.Sub(inner.byAge[0].LastSeen) >= timeout {
+		heap.Pop(inner)
+	}
+}
+
+// SendQueue finds the existing client record corresponding to addr, or creates
+// a new one if none exists yet. It updates the client record's LastSeen time
+// and returns its SendQueue.
+func (inner *clientMapInner) SendQueue(addr net.Addr, now time.Time) chan []byte {
+	var record *clientRecord
+	i, ok := inner.byAddr[addr]
+	if ok {
+		// Found one, update its LastSeen.
+		record = inner.byAge[i]
+		record.LastSeen = now
+		heap.Fix(inner, i)
+	} else {
+		// Not found, create a new one.
+		record = &clientRecord{
+			Addr:      addr,
+			LastSeen:  now,
+			SendQueue: make(chan []byte, queueSize),
+		}
+		heap.Push(inner, record)
+	}
+	return record.SendQueue
+}
+
+// heap.Interface for clientMapInner.
+
+func (inner *clientMapInner) Len() int {
+	if len(inner.byAge) != len(inner.byAddr) {
+		panic("inconsistent clientMap")
+	}
+	return len(inner.byAge)
+}
+
+func (inner *clientMapInner) Less(i, j int) bool {
+	return inner.byAge[i].LastSeen.Before(inner.byAge[j].LastSeen)
+}
+
+func (inner *clientMapInner) Swap(i, j int) {
+	inner.byAge[i], inner.byAge[j] = inner.byAge[j], inner.byAge[i]
+	inner.byAddr[inner.byAge[i].Addr] = i
+	inner.byAddr[inner.byAge[j].Addr] = j
+}
+
+func (inner *clientMapInner) Push(x interface{}) {
+	record := x.(*clientRecord)
+	if _, ok := inner.byAddr[record.Addr]; ok {
+		panic("duplicate address in clientMap")
+	}
+	// Insert into byAddr map.
+	inner.byAddr[record.Addr] = len(inner.byAge)
+	// Insert into byAge slice.
+	inner.byAge = append(inner.byAge, record)
+}
+
+func (inner *clientMapInner) Pop() interface{} {
+	n := len(inner.byAddr)
+	// Remove from byAge slice.
+	record := inner.byAge[n-1]
+	inner.byAge[n-1] = nil
+	inner.byAge = inner.byAge[:n-1]
+	// Remove from byAddr map.
+	delete(inner.byAddr, record.Addr)
+	return record
+}

+ 8 - 0
turbotunnel/consts.go

@@ -0,0 +1,8 @@
+package turbotunnel
+
+import "errors"
+
+const queueSize = 64
+
+var errClosedPacketConn = errors.New("operation on closed connection")
+var errNotImplemented = errors.New("not implemented")

+ 137 - 0
turbotunnel/queuepacketconn.go

@@ -0,0 +1,137 @@
+package turbotunnel
+
+import (
+	"net"
+	"sync"
+	"sync/atomic"
+	"time"
+)
+
+// taggedPacket is a combination of a []byte and a net.Addr, encapsulating the
+// return type of PacketConn.ReadFrom.
+type taggedPacket struct {
+	P    []byte
+	Addr net.Addr
+}
+
+// QueuePacketConn implements net.PacketConn by storing queues of packets. There
+// is one incoming queue (where packets are additionally tagged by the source
+// address of the client that sent them). There are many outgoing queues, one
+// for each client address that has been recently seen. The QueueIncoming method
+// inserts a packet into the incoming queue, to eventually be returned by
+// ReadFrom. WriteTo inserts a packet into an address-specific outgoing queue,
+// which can later by accessed through the OutgoingQueue method.
+type QueuePacketConn struct {
+	clients   *ClientMap
+	localAddr net.Addr
+	recvQueue chan taggedPacket
+	closeOnce sync.Once
+	closed    chan struct{}
+	// What error to return when the QueuePacketConn is closed.
+	err atomic.Value
+}
+
+// NewQueuePacketConn makes a new QueuePacketConn, set to track recent clients
+// for at least a duration of timeout.
+func NewQueuePacketConn(localAddr net.Addr, timeout time.Duration) *QueuePacketConn {
+	return &QueuePacketConn{
+		clients:   NewClientMap(timeout),
+		localAddr: localAddr,
+		recvQueue: make(chan taggedPacket, queueSize),
+		closed:    make(chan struct{}),
+	}
+}
+
+// QueueIncoming queues and incoming packet and its source address, to be
+// returned in a future call to ReadFrom.
+func (c *QueuePacketConn) QueueIncoming(p []byte, addr net.Addr) {
+	select {
+	case <-c.closed:
+		// If we're closed, silently drop it.
+		return
+	default:
+	}
+	// Copy the slice so that the caller may reuse it.
+	buf := make([]byte, len(p))
+	copy(buf, p)
+	select {
+	case c.recvQueue <- taggedPacket{buf, addr}:
+	default:
+		// Drop the incoming packet if the receive queue is full.
+	}
+}
+
+// OutgoingQueue returns the queue of outgoing packets corresponding to addr,
+// creating it if necessary. The contents of the queue will be packets that are
+// written to the address in question using WriteTo.
+func (c *QueuePacketConn) OutgoingQueue(addr net.Addr) <-chan []byte {
+	return c.clients.SendQueue(addr)
+}
+
+// ReadFrom returns a packet and address previously stored by QueueIncoming.
+func (c *QueuePacketConn) ReadFrom(p []byte) (int, net.Addr, error) {
+	select {
+	case <-c.closed:
+		return 0, nil, &net.OpError{Op: "read", Net: c.LocalAddr().Network(), Addr: c.LocalAddr(), Err: c.err.Load().(error)}
+	default:
+	}
+	select {
+	case <-c.closed:
+		return 0, nil, &net.OpError{Op: "read", Net: c.LocalAddr().Network(), Addr: c.LocalAddr(), Err: c.err.Load().(error)}
+	case packet := <-c.recvQueue:
+		return copy(p, packet.P), packet.Addr, nil
+	}
+}
+
+// WriteTo queues an outgoing packet for the given address. The queue can later
+// be retrieved using the OutgoingQueue method.
+func (c *QueuePacketConn) WriteTo(p []byte, addr net.Addr) (int, error) {
+	select {
+	case <-c.closed:
+		return 0, &net.OpError{Op: "write", Net: c.LocalAddr().Network(), Addr: c.LocalAddr(), Err: c.err.Load().(error)}
+	default:
+	}
+	// Copy the slice so that the caller may reuse it.
+	buf := make([]byte, len(p))
+	copy(buf, p)
+	select {
+	case c.clients.SendQueue(addr) <- buf:
+		return len(buf), nil
+	default:
+		// Drop the outgoing packet if the send queue is full.
+		return len(buf), nil
+	}
+}
+
+// closeWithError unblocks pending operations and makes future operations fail
+// with the given error. If err is nil, it becomes errClosedPacketConn.
+func (c *QueuePacketConn) closeWithError(err error) error {
+	var newlyClosed bool
+	c.closeOnce.Do(func() {
+		newlyClosed = true
+		// Store the error to be returned by future PacketConn
+		// operations.
+		if err == nil {
+			err = errClosedPacketConn
+		}
+		c.err.Store(err)
+		close(c.closed)
+	})
+	if !newlyClosed {
+		return &net.OpError{Op: "close", Net: c.LocalAddr().Network(), Addr: c.LocalAddr(), Err: c.err.Load().(error)}
+	}
+	return nil
+}
+
+// Close unblocks pending operations and makes future operations fail with a
+// "closed connection" error.
+func (c *QueuePacketConn) Close() error {
+	return c.closeWithError(nil)
+}
+
+// LocalAddr returns the localAddr value that was passed to NewQueuePacketConn.
+func (c *QueuePacketConn) LocalAddr() net.Addr { return c.localAddr }
+
+func (c *QueuePacketConn) SetDeadline(t time.Time) error      { return errNotImplemented }
+func (c *QueuePacketConn) SetReadDeadline(t time.Time) error  { return errNotImplemented }
+func (c *QueuePacketConn) SetWriteDeadline(t time.Time) error { return errNotImplemented }