webhook.go 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287
  1. package router
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/json"
  6. "io"
  7. "net"
  8. "net/http"
  9. "path/filepath"
  10. "runtime"
  11. "strings"
  12. "sync"
  13. "syscall"
  14. "time"
  15. "github.com/xtls/xray-core/common/errors"
  16. "github.com/xtls/xray-core/features/routing"
  17. routing_session "github.com/xtls/xray-core/features/routing/session"
  18. )
  // parseURL turns a configured webhook target into the URL handed to the HTTP
  // client plus, for Unix-domain targets, the socket that has to be dialled.
  //
  // Input that is empty, or that is neither an absolute filesystem path nor
  // prefixed with '@', is treated as an ordinary URL: it is returned untouched
  // and socketPath is empty. Socket targets are written as:
  //
  //	/path/to/socket.sock:/http/path
  //	@abstract:/http/path
  //	@@padded:/http/path
  //
  // The first ":/" terminates the socket path; everything from that "/" onwards
  // becomes the request path on a synthetic "http://localhost" host. When no
  // ":/" is present the whole input is the socket path and the request path
  // defaults to "/".
  func parseURL(raw string) (httpURL, socketPath string) {
  	isSocket := raw != "" && (raw[0] == '@' || filepath.IsAbs(raw))
  	if !isSocket {
  		return raw, ""
  	}

  	sock, reqPath, found := strings.Cut(raw, ":/")
  	if !found {
  		return "http://localhost/", raw
  	}
  	// Cut drops the separator, so put its "/" back in front of the path.
  	return "http://localhost/" + reqPath, sock
  }
  // resolveSocketPath rewrites a Unix socket path into the address that is
  // actually dialled. It is meant to mirror the listen side in
  // transport/internet/system_listener.go.
  //
  // Only a path starting with "@@" on Linux or Android is changed: the first
  // '@' is dropped and the remainder is copied into a zero-filled buffer whose
  // length equals syscall.RawSockaddrUnix{}.Path (the padded abstract-socket
  // form, kept for HAProxy compatibility). Input longer than that buffer is
  // truncated by the copy. Every other path, including a single-'@' abstract
  // socket and any path on other platforms, is returned unchanged.
  func resolveSocketPath(path string) string {
  	padded := len(path) > 1 && path[0] == '@' && path[1] == '@'
  	linuxLike := runtime.GOOS == "linux" || runtime.GOOS == "android"
  	if !padded || !linuxLike {
  		return path
  	}

  	buf := make([]byte, len(syscall.RawSockaddrUnix{}.Path))
  	copy(buf, path[1:])
  	return string(buf)
  }
  // ptr returns a pointer to a freshly allocated copy of v. It lets optional
  // struct fields be filled from expressions that are not addressable.
  func ptr[T any](v T) *T {
  	p := new(T)
  	*p = v
  	return p
  }
  // event is the JSON document POSTed to the webhook endpoint.
  //
  // Optional values are pointers without omitempty, so a value that was not
  // available is emitted as JSON null rather than being left out of the object.
  type event struct {
  	// Identity of the user, when the routing context reports one.
  	Email *string `json:"email"`
  	Level *uint32 `json:"level"`

  	// What is being routed.
  	Protocol *string `json:"protocol"`
  	Network  *string `json:"network"`

  	// Endpoints, rendered as "host:port" strings.
  	Source         *string `json:"source"`
  	Destination    *string `json:"destination"`
  	OriginalTarget *string `json:"originalTarget"`
  	RouteTarget    *string `json:"routeTarget"`

  	// Inbound that accepted the connection and outbound chosen for it.
  	InboundTag   *string `json:"inboundTag"`
  	InboundName  *string `json:"inboundName"`
  	InboundLocal *string `json:"inboundLocal"`
  	OutboundTag  *string `json:"outboundTag"`

  	// Timestamp is serialised under the key "ts".
  	Timestamp int64 `json:"ts"`
  }
  // WebhookNotifier delivers routing events to an HTTP endpoint as JSON POST
  // requests, optionally suppressing repeats for the same user.
  //
  // It holds synchronisation primitives and must not be copied after creation;
  // use the pointer returned by NewWebhookNotifier.
  type WebhookNotifier struct {
  	// Request target and the extra headers added to every POST.
  	url     string
  	headers map[string]string

  	// deduplication is the suppression window in seconds; 0 disables it.
  	deduplication uint32

  	client *http.Client

  	// seen maps a user email to the time.Time it last produced an event.
  	seen sync.Map

  	// done is closed by Close to stop background work; wg counts the cleanup
  	// loop and every in-flight POST; closeOnce makes closing done idempotent.
  	done      chan struct{}
  	wg        sync.WaitGroup
  	closeOnce sync.Once
  }
  86. func NewWebhookNotifier(cfg *WebhookConfig) (*WebhookNotifier, error) {
  87. if cfg == nil || cfg.Url == "" {
  88. return nil, nil
  89. }
  90. httpURL, socketPath := parseURL(cfg.Url)
  91. h := &WebhookNotifier{
  92. url: httpURL,
  93. deduplication: cfg.Deduplication,
  94. client: &http.Client{
  95. Timeout: 5 * time.Second,
  96. },
  97. done: make(chan struct{}),
  98. }
  99. if socketPath != "" {
  100. dialAddr := resolveSocketPath(socketPath)
  101. h.client.Transport = &http.Transport{
  102. DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
  103. var d net.Dialer
  104. return d.DialContext(ctx, "unix", dialAddr)
  105. },
  106. }
  107. }
  108. if len(cfg.Headers) > 0 {
  109. h.headers = make(map[string]string, len(cfg.Headers))
  110. for k, v := range cfg.Headers {
  111. h.headers[k] = v
  112. }
  113. }
  114. if h.deduplication > 0 {
  115. h.wg.Add(1)
  116. go h.cleanupLoop()
  117. }
  118. return h, nil
  119. }
  120. func (h *WebhookNotifier) Fire(ctx routing.Context, outboundTag string) {
  121. ev := buildEvent(ctx, outboundTag)
  122. email := ""
  123. if ev.Email != nil {
  124. email = *ev.Email
  125. }
  126. if h.isDuplicate(email) {
  127. return
  128. }
  129. h.wg.Add(1)
  130. select {
  131. case <-h.done:
  132. h.wg.Done()
  133. return
  134. default:
  135. }
  136. go func() {
  137. defer h.wg.Done()
  138. h.post(ev)
  139. }()
  140. }
  141. func buildEvent(ctx routing.Context, outboundTag string) *event {
  142. ev := &event{
  143. Timestamp: time.Now().Unix(),
  144. OutboundTag: ptr(outboundTag),
  145. InboundTag: ptr(ctx.GetInboundTag()),
  146. Protocol: ptr(ctx.GetProtocol()),
  147. Network: ptr(ctx.GetNetwork().SystemString()),
  148. }
  149. if user := ctx.GetUser(); user != "" {
  150. ev.Email = ptr(user)
  151. }
  152. if srcIPs := ctx.GetSourceIPs(); len(srcIPs) > 0 {
  153. srcPort := ctx.GetSourcePort()
  154. ev.Source = ptr(net.JoinHostPort(srcIPs[0].String(), srcPort.String()))
  155. }
  156. targetPort := ctx.GetTargetPort()
  157. if domain := ctx.GetTargetDomain(); domain != "" {
  158. ev.Destination = ptr(net.JoinHostPort(domain, targetPort.String()))
  159. } else if targetIPs := ctx.GetTargetIPs(); len(targetIPs) > 0 {
  160. ev.Destination = ptr(net.JoinHostPort(targetIPs[0].String(), targetPort.String()))
  161. }
  162. if localIPs := ctx.GetLocalIPs(); len(localIPs) > 0 {
  163. localPort := ctx.GetLocalPort()
  164. ev.InboundLocal = ptr(net.JoinHostPort(localIPs[0].String(), localPort.String()))
  165. }
  166. if sctx, ok := ctx.(*routing_session.Context); ok {
  167. enrichFromSession(ev, sctx)
  168. }
  169. return ev
  170. }
  171. func enrichFromSession(ev *event, sctx *routing_session.Context) {
  172. if sctx.Inbound != nil {
  173. ev.InboundName = ptr(sctx.Inbound.Name)
  174. if sctx.Inbound.User != nil {
  175. ev.Level = ptr(sctx.Inbound.User.Level)
  176. }
  177. }
  178. if sctx.Outbound != nil {
  179. if sctx.Outbound.OriginalTarget.Address != nil {
  180. ev.OriginalTarget = ptr(sctx.Outbound.OriginalTarget.String())
  181. }
  182. if sctx.Outbound.RouteTarget.Address != nil {
  183. ev.RouteTarget = ptr(sctx.Outbound.RouteTarget.String())
  184. }
  185. }
  186. }
  187. func (h *WebhookNotifier) post(ev *event) {
  188. body, err := json.Marshal(ev)
  189. if err != nil {
  190. errors.LogWarning(context.Background(), "webhook: marshal failed: ", err)
  191. return
  192. }
  193. req, err := http.NewRequestWithContext(context.Background(), http.MethodPost, h.url, bytes.NewReader(body))
  194. if err != nil {
  195. errors.LogWarning(context.Background(), "webhook: request build failed: ", err)
  196. return
  197. }
  198. req.Header.Set("Content-Type", "application/json")
  199. for k, v := range h.headers {
  200. req.Header.Set(k, v)
  201. }
  202. resp, err := h.client.Do(req)
  203. if err != nil {
  204. errors.LogInfo(context.Background(), "webhook: POST failed: ", err)
  205. return
  206. }
  207. defer func() {
  208. io.Copy(io.Discard, resp.Body)
  209. resp.Body.Close()
  210. }()
  211. if resp.StatusCode >= 400 {
  212. errors.LogWarning(context.Background(), "webhook: POST returned status ", resp.StatusCode)
  213. }
  214. }
  215. func (h *WebhookNotifier) isDuplicate(email string) bool {
  216. if h.deduplication == 0 || email == "" {
  217. return false
  218. }
  219. ttl := time.Duration(h.deduplication) * time.Second
  220. now := time.Now()
  221. if v, loaded := h.seen.LoadOrStore(email, now); loaded {
  222. if now.Sub(v.(time.Time)) < ttl {
  223. return true
  224. }
  225. h.seen.Store(email, now)
  226. }
  227. return false
  228. }
  229. func (h *WebhookNotifier) cleanupLoop() {
  230. defer h.wg.Done()
  231. ttl := time.Duration(h.deduplication) * time.Second
  232. ticker := time.NewTicker(ttl)
  233. defer ticker.Stop()
  234. for {
  235. select {
  236. case <-h.done:
  237. return
  238. case <-ticker.C:
  239. now := time.Now()
  240. h.seen.Range(func(key, value any) bool {
  241. if now.Sub(value.(time.Time)) >= ttl {
  242. h.seen.Delete(key)
  243. }
  244. return true
  245. })
  246. }
  247. }
  248. }
  249. func (h *WebhookNotifier) Close() error {
  250. h.closeOnce.Do(func() {
  251. close(h.done)
  252. })
  253. h.wg.Wait()
  254. h.client.CloseIdleConnections()
  255. return nil
  256. }