handler.go 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301
  1. package tun
  2. import (
  3. "context"
  4. "sync"
  5. "sync/atomic"
  6. "time"
  7. "github.com/xtls/xray-core/common"
  8. "github.com/xtls/xray-core/common/buf"
  9. c "github.com/xtls/xray-core/common/ctx"
  10. "github.com/xtls/xray-core/common/errors"
  11. "github.com/xtls/xray-core/common/net"
  12. "github.com/xtls/xray-core/common/protocol"
  13. "github.com/xtls/xray-core/common/session"
  14. "github.com/xtls/xray-core/common/signal/done"
  15. "github.com/xtls/xray-core/common/task"
  16. "github.com/xtls/xray-core/core"
  17. "github.com/xtls/xray-core/features/policy"
  18. "github.com/xtls/xray-core/features/routing"
  19. "github.com/xtls/xray-core/transport"
  20. "github.com/xtls/xray-core/transport/internet/stat"
  21. "github.com/xtls/xray-core/transport/pipe"
  22. "gvisor.dev/gvisor/pkg/buffer"
  23. "gvisor.dev/gvisor/pkg/tcpip"
  24. "gvisor.dev/gvisor/pkg/tcpip/checksum"
  25. "gvisor.dev/gvisor/pkg/tcpip/header"
  26. "gvisor.dev/gvisor/pkg/tcpip/stack"
  27. )
  28. type udpConn struct {
  29. lastActive int64
  30. reader buf.Reader
  31. writer buf.Writer
  32. done *done.Instance
  33. cancel context.CancelFunc
  34. }
  35. // Handler is managing object that tie together tun interface, ip stack and dispatch connections to the routing
  36. type Handler struct {
  37. sync.Mutex
  38. ctx context.Context
  39. config *Config
  40. stack Stack
  41. policyManager policy.Manager
  42. dispatcher routing.Dispatcher
  43. udpConns map[net.Destination]*udpConn
  44. udpChecker *task.Periodic
  45. }
  46. // ConnectionHandler interface with the only method that stack is going to push new connections to
  47. type ConnectionHandler interface {
  48. HandleConnection(conn net.Conn, destination net.Destination)
  49. }
  50. // Handler implements ConnectionHandler
  51. var _ ConnectionHandler = (*Handler)(nil)
  52. func (t *Handler) policy() policy.Session {
  53. return t.policyManager.ForLevel(t.config.UserLevel)
  54. }
  55. func (t *Handler) cleanupUDP() error {
  56. t.Lock()
  57. defer t.Unlock()
  58. if len(t.udpConns) == 0 {
  59. return errors.New("no connections")
  60. }
  61. now := time.Now().Unix()
  62. for src, conn := range t.udpConns {
  63. if now-atomic.LoadInt64(&conn.lastActive) > 300 {
  64. conn.cancel()
  65. common.Must(conn.done.Close())
  66. common.Must(common.Close(conn.writer))
  67. delete(t.udpConns, src)
  68. }
  69. }
  70. return nil
  71. }
  72. func (t *Handler) HandleUDPPacket(id stack.TransportEndpointID, pkt *stack.PacketBuffer, ipStack *stack.Stack) {
  73. src := net.UDPDestination(net.IPAddress(id.RemoteAddress.AsSlice()), net.Port(id.RemotePort))
  74. dest := net.UDPDestination(net.IPAddress(id.LocalAddress.AsSlice()), net.Port(id.LocalPort))
  75. data := pkt.Data().AsRange().ToSlice()
  76. if len(data) == 0 {
  77. return
  78. }
  79. t.Lock()
  80. conn, found := t.udpConns[src]
  81. if !found {
  82. reader, writer := pipe.New(pipe.DiscardOverflow(), pipe.WithSizeLimit(16*1024))
  83. conn = &udpConn{reader: reader, writer: writer, done: done.New()}
  84. t.udpConns[src] = conn
  85. if t.udpChecker != nil && len(t.udpConns) == 1 {
  86. common.Must(t.udpChecker.Start())
  87. }
  88. t.Unlock()
  89. go func() {
  90. ctx, cancel := context.WithCancel(t.ctx)
  91. conn.cancel = cancel
  92. defer func() {
  93. cancel()
  94. t.Lock()
  95. delete(t.udpConns, src)
  96. t.Unlock()
  97. common.Must(conn.done.Close())
  98. common.Must(common.Close(conn.writer))
  99. }()
  100. inbound := &session.Inbound{
  101. Name: "tun",
  102. Source: src,
  103. CanSpliceCopy: 1,
  104. User: &protocol.MemoryUser{Level: t.config.UserLevel},
  105. }
  106. ctx = session.ContextWithInbound(c.ContextWithID(ctx, session.NewID()), inbound)
  107. ctx = session.SubContextFromMuxInbound(ctx)
  108. link := &transport.Link{
  109. Reader: &buf.TimeoutWrapperReader{Reader: conn.reader},
  110. Writer: &udpWriter{stack: ipStack, src: dest, dest: src},
  111. }
  112. t.dispatcher.DispatchLink(ctx, dest, link)
  113. }()
  114. } else {
  115. atomic.StoreInt64(&conn.lastActive, time.Now().Unix())
  116. t.Unlock()
  117. }
  118. b := buf.New()
  119. b.Write(data)
  120. b.UDP = &dest
  121. conn.writer.WriteMultiBuffer(buf.MultiBuffer{b})
  122. }
  123. type udpWriter struct {
  124. stack *stack.Stack
  125. src net.Destination
  126. dest net.Destination
  127. }
  128. func (w *udpWriter) WriteMultiBuffer(mb buf.MultiBuffer) error {
  129. for _, b := range mb {
  130. // Validate return packet address matches expected destination
  131. if b.UDP != nil {
  132. if b.UDP.Address != w.dest.Address || b.UDP.Port != w.dest.Port {
  133. errors.LogWarning(
  134. context.Background(),
  135. "UDP return packet address mismatch: expected ",
  136. w.dest,
  137. ", got ",
  138. b.UDP,
  139. )
  140. b.Release()
  141. continue
  142. }
  143. }
  144. netProto := header.IPv4ProtocolNumber
  145. if !w.src.Address.Family().IsIPv4() {
  146. netProto = header.IPv6ProtocolNumber
  147. }
  148. route, err := w.stack.FindRoute(
  149. defaultNIC,
  150. tcpip.AddrFromSlice(w.src.Address.IP()),
  151. tcpip.AddrFromSlice(w.dest.Address.IP()),
  152. netProto,
  153. false,
  154. )
  155. if err != nil {
  156. b.Release()
  157. continue
  158. }
  159. pkt := stack.NewPacketBuffer(stack.PacketBufferOptions{
  160. ReserveHeaderBytes: header.UDPMinimumSize,
  161. Payload: buffer.MakeWithData(b.Bytes()),
  162. })
  163. udp := header.UDP(pkt.TransportHeader().Push(header.UDPMinimumSize))
  164. udp.Encode(&header.UDPFields{
  165. SrcPort: uint16(w.src.Port),
  166. DstPort: uint16(w.dest.Port),
  167. Length: uint16(pkt.Size()),
  168. })
  169. xsum := route.PseudoHeaderChecksum(header.UDPProtocolNumber, uint16(pkt.Size()))
  170. udp.SetChecksum(^udp.CalculateChecksum(checksum.Checksum(b.Bytes(), xsum)))
  171. route.WritePacket(stack.NetworkHeaderParams{
  172. Protocol: header.UDPProtocolNumber,
  173. TTL: 64,
  174. }, pkt)
  175. pkt.DecRef()
  176. route.Release()
  177. b.Release()
  178. }
  179. return nil
  180. }
  181. // Init the Handler instance with necessary parameters
  182. func (t *Handler) Init(ctx context.Context, pm policy.Manager, dispatcher routing.Dispatcher) error {
  183. var err error
  184. t.ctx = core.ToBackgroundDetachedContext(ctx)
  185. t.policyManager = pm
  186. t.dispatcher = dispatcher
  187. t.udpConns = make(map[net.Destination]*udpConn)
  188. t.udpChecker = &task.Periodic{Interval: time.Minute, Execute: t.cleanupUDP}
  189. tunName := t.config.Name
  190. tunOptions := TunOptions{
  191. Name: tunName,
  192. MTU: t.config.MTU,
  193. }
  194. tunInterface, err := NewTun(tunOptions)
  195. if err != nil {
  196. return err
  197. }
  198. errors.LogInfo(t.ctx, tunName, " created")
  199. tunStackOptions := StackOptions{
  200. Tun: tunInterface,
  201. IdleTimeout: pm.ForLevel(t.config.UserLevel).Timeouts.ConnectionIdle,
  202. }
  203. tunStack, err := NewStack(t.ctx, tunStackOptions, t)
  204. if err != nil {
  205. _ = tunInterface.Close()
  206. return err
  207. }
  208. err = tunStack.Start()
  209. if err != nil {
  210. _ = tunStack.Close()
  211. _ = tunInterface.Close()
  212. return err
  213. }
  214. err = tunInterface.Start()
  215. if err != nil {
  216. _ = tunStack.Close()
  217. _ = tunInterface.Close()
  218. return err
  219. }
  220. t.stack = tunStack
  221. errors.LogInfo(t.ctx, tunName, " up")
  222. return nil
  223. }
  224. // HandleConnection pass the connection coming from the ip stack to the routing dispatcher
  225. func (t *Handler) HandleConnection(conn net.Conn, destination net.Destination) {
  226. sid := session.NewID()
  227. ctx := c.ContextWithID(t.ctx, sid)
  228. errors.LogInfo(ctx, "processing connection from: ", conn.RemoteAddr())
  229. inbound := session.Inbound{}
  230. inbound.Name = "tun"
  231. inbound.CanSpliceCopy = 1
  232. inbound.Source = net.DestinationFromAddr(conn.RemoteAddr())
  233. inbound.User = &protocol.MemoryUser{
  234. Level: t.config.UserLevel,
  235. }
  236. ctx = session.ContextWithInbound(ctx, &inbound)
  237. ctx = session.SubContextFromMuxInbound(ctx)
  238. link := &transport.Link{
  239. Reader: &buf.TimeoutWrapperReader{Reader: buf.NewReader(conn)},
  240. Writer: buf.NewWriter(conn),
  241. }
  242. if err := t.dispatcher.DispatchLink(ctx, destination, link); err != nil {
  243. errors.LogError(ctx, errors.New("connection closed").Base(err))
  244. return
  245. }
  246. errors.LogInfo(ctx, "connection completed")
  247. }
  248. // Network implements proxy.Inbound
  249. // and exists only to comply to proxy interface, declaring it doesn't listen on any network,
  250. // making the process not open any port for this inbound (input will be network interface)
  251. func (t *Handler) Network() []net.Network {
  252. return []net.Network{}
  253. }
  254. // Process implements proxy.Inbound
  255. // and exists only to comply to proxy interface, which should never get any inputs due to no listening ports
  256. func (t *Handler) Process(ctx context.Context, network net.Network, conn stat.Connection, dispatcher routing.Dispatcher) error {
  257. return nil
  258. }
  259. func init() {
  260. common.Must(common.RegisterConfig((*Config)(nil), func(ctx context.Context, config interface{}) (interface{}, error) {
  261. t := &Handler{config: config.(*Config)}
  262. err := core.RequireFeatures(ctx, func(pm policy.Manager, dispatcher routing.Dispatcher) error {
  263. return t.Init(ctx, pm, dispatcher)
  264. })
  265. return t, err
  266. }))
  267. }