handler.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. package tun
  2. import (
  3. "context"
  4. "sync"
  5. "sync/atomic"
  6. "time"
  7. "github.com/xtls/xray-core/common"
  8. "github.com/xtls/xray-core/common/buf"
  9. c "github.com/xtls/xray-core/common/ctx"
  10. "github.com/xtls/xray-core/common/errors"
  11. "github.com/xtls/xray-core/common/net"
  12. "github.com/xtls/xray-core/common/protocol"
  13. "github.com/xtls/xray-core/common/session"
  14. "github.com/xtls/xray-core/common/signal/done"
  15. "github.com/xtls/xray-core/common/task"
  16. "github.com/xtls/xray-core/core"
  17. "github.com/xtls/xray-core/features/policy"
  18. "github.com/xtls/xray-core/features/routing"
  19. "github.com/xtls/xray-core/transport"
  20. "github.com/xtls/xray-core/transport/internet/stat"
  21. )
  22. type udpConn struct {
  23. lastActive atomic.Int64
  24. reader buf.Reader
  25. writer buf.Writer
  26. done *done.Instance
  27. cancel context.CancelFunc
  28. }
  29. // Handler is managing object that tie together tun interface, ip stack and dispatch connections to the routing
  30. type Handler struct {
  31. sync.Mutex
  32. ctx context.Context
  33. config *Config
  34. stack Stack
  35. policyManager policy.Manager
  36. dispatcher routing.Dispatcher
  37. udpConns map[net.Destination]*udpConn
  38. udpChecker *task.Periodic
  39. }
  40. // ConnectionHandler interface with the only method that stack is going to push new connections to
  41. type ConnectionHandler interface {
  42. HandleConnection(conn net.Conn, destination net.Destination)
  43. }
  44. // Handler implements ConnectionHandler
  45. var _ ConnectionHandler = (*Handler)(nil)
  46. func (t *Handler) policy() policy.Session {
  47. return t.policyManager.ForLevel(t.config.UserLevel)
  48. }
  49. func (t *Handler) cleanupUDP() error {
  50. t.Lock()
  51. defer t.Unlock()
  52. if len(t.udpConns) == 0 {
  53. return errors.New("no connections")
  54. }
  55. now := time.Now().Unix()
  56. for src, conn := range t.udpConns {
  57. if now-conn.lastActive.Load() > 300 {
  58. conn.cancel()
  59. common.Must(conn.done.Close())
  60. common.Must(common.Close(conn.writer))
  61. delete(t.udpConns, src)
  62. }
  63. }
  64. return nil
  65. }
  66. // Init the Handler instance with necessary parameters
  67. func (t *Handler) Init(ctx context.Context, pm policy.Manager, dispatcher routing.Dispatcher) error {
  68. var err error
  69. t.ctx = core.ToBackgroundDetachedContext(ctx)
  70. t.policyManager = pm
  71. t.dispatcher = dispatcher
  72. t.udpConns = make(map[net.Destination]*udpConn)
  73. t.udpChecker = &task.Periodic{Interval: time.Minute, Execute: t.cleanupUDP}
  74. tunName := t.config.Name
  75. tunOptions := TunOptions{
  76. Name: tunName,
  77. MTU: t.config.MTU,
  78. }
  79. tunInterface, err := NewTun(tunOptions)
  80. if err != nil {
  81. return err
  82. }
  83. errors.LogInfo(t.ctx, tunName, " created")
  84. tunStackOptions := StackOptions{
  85. Tun: tunInterface,
  86. IdleTimeout: pm.ForLevel(t.config.UserLevel).Timeouts.ConnectionIdle,
  87. }
  88. tunStack, err := NewStack(t.ctx, tunStackOptions, t)
  89. if err != nil {
  90. _ = tunInterface.Close()
  91. return err
  92. }
  93. err = tunStack.Start()
  94. if err != nil {
  95. _ = tunStack.Close()
  96. _ = tunInterface.Close()
  97. return err
  98. }
  99. err = tunInterface.Start()
  100. if err != nil {
  101. _ = tunStack.Close()
  102. _ = tunInterface.Close()
  103. return err
  104. }
  105. t.stack = tunStack
  106. errors.LogInfo(t.ctx, tunName, " up")
  107. return nil
  108. }
  109. // HandleConnection pass the connection coming from the ip stack to the routing dispatcher
  110. func (t *Handler) HandleConnection(conn net.Conn, destination net.Destination) {
  111. sid := session.NewID()
  112. ctx := c.ContextWithID(t.ctx, sid)
  113. errors.LogInfo(ctx, "processing connection from: ", conn.RemoteAddr())
  114. inbound := session.Inbound{}
  115. inbound.Name = "tun"
  116. inbound.CanSpliceCopy = 1
  117. inbound.Source = net.DestinationFromAddr(conn.RemoteAddr())
  118. inbound.User = &protocol.MemoryUser{
  119. Level: t.config.UserLevel,
  120. }
  121. ctx = session.ContextWithInbound(ctx, &inbound)
  122. ctx = session.SubContextFromMuxInbound(ctx)
  123. link := &transport.Link{
  124. Reader: &buf.TimeoutWrapperReader{Reader: buf.NewReader(conn)},
  125. Writer: buf.NewWriter(conn),
  126. }
  127. if err := t.dispatcher.DispatchLink(ctx, destination, link); err != nil {
  128. errors.LogError(ctx, errors.New("connection closed").Base(err))
  129. return
  130. }
  131. errors.LogInfo(ctx, "connection completed")
  132. }
  133. // Network implements proxy.Inbound
  134. // and exists only to comply to proxy interface, declaring it doesn't listen on any network,
  135. // making the process not open any port for this inbound (input will be network interface)
  136. func (t *Handler) Network() []net.Network {
  137. return []net.Network{}
  138. }
  139. // Process implements proxy.Inbound
  140. // and exists only to comply to proxy interface, which should never get any inputs due to no listening ports
  141. func (t *Handler) Process(ctx context.Context, network net.Network, conn stat.Connection, dispatcher routing.Dispatcher) error {
  142. return nil
  143. }
  144. func init() {
  145. common.Must(common.RegisterConfig((*Config)(nil), func(ctx context.Context, config interface{}) (interface{}, error) {
  146. t := &Handler{config: config.(*Config)}
  147. err := core.RequireFeatures(ctx, func(pm policy.Manager, dispatcher routing.Dispatcher) error {
  148. return t.Init(ctx, pm, dispatcher)
  149. })
  150. return t, err
  151. }))
  152. }