Quellcode durchsuchen

API: Fix Online Map (#5732)

https://github.com/XTLS/Xray-core/pull/5732#pullrequestreview-3863990264
Yury Kastov vor 3 Monaten
Ursprung
Commit
eec280262d
5 geänderte Dateien mit 81 neuen und 70 gelöschten Zeilen
  1. 4 9
      app/dispatcher/default.go
  2. 1 1
      app/stats/command/command.go
  3. 66 52
      app/stats/online_map.go
  4. 3 3
      app/stats/stats.go
  5. 7 5
      features/stats/stats.go

+ 4 - 9
app/dispatcher/default.go

@@ -183,12 +183,9 @@ func (d *DefaultDispatcher) getLink(ctx context.Context) (*transport.Link, *tran
 		if p.Stats.UserOnline {
 			name := "user>>>" + user.Email + ">>>online"
 			if om, _ := stats.GetOrRegisterOnlineMap(d.stats, name); om != nil {
-				sessionInbounds := session.InboundFromContext(ctx)
-				userIP := sessionInbounds.Source.Address.String()
+				userIP := sessionInbound.Source.Address.String()
 				om.AddIP(userIP)
-				// log Online user with ips
-				// errors.LogDebug(ctx, "user>>>" + user.Email + ">>>online", om.Count(), om.List())
-
+				context.AfterFunc(ctx, func() { om.RemoveIP(userIP) })
 			}
 		}
 	}
@@ -225,11 +222,9 @@ func WrapLink(ctx context.Context, policyManager policy.Manager, statsManager st
 		if p.Stats.UserOnline {
 			name := "user>>>" + user.Email + ">>>online"
 			if om, _ := stats.GetOrRegisterOnlineMap(statsManager, name); om != nil {
-				sessionInbounds := session.InboundFromContext(ctx)
-				userIP := sessionInbounds.Source.Address.String()
+				userIP := sessionInbound.Source.Address.String()
 				om.AddIP(userIP)
-				// log Online user with ips
-				// errors.LogDebug(ctx, "user>>>" + user.Email + ">>>online", om.Count(), om.List())
+				context.AfterFunc(ctx, func() { om.RemoveIP(userIP) })
 			}
 		}
 	}

+ 1 - 1
app/stats/command/command.go

@@ -70,7 +70,7 @@ func (s *statsServer) GetStatsOnlineIpList(ctx context.Context, request *GetStat
 	}
 
 	ips := make(map[string]int64)
-	for ip, t := range c.IpTimeMap() {
+	for ip, t := range c.IPTimeMap() {
 		ips[ip] = t.Unix()
 	}
 

+ 66 - 52
app/stats/online_map.go

@@ -2,84 +2,98 @@ package stats
 
 import (
 	"sync"
+	"sync/atomic"
 	"time"
 )
 
-// OnlineMap is an implementation of stats.OnlineMap.
+const (
+	localhostIPv4 = "127.0.0.1"
+	localhostIPv6 = "[::1]"
+)
+
+type ipEntry struct {
+	refCount int
+	lastSeen time.Time
+}
+
+// OnlineMap is a refcount-based implementation of stats.OnlineMap.
+// IPs are tracked by reference counting: AddIP increments, RemoveIP decrements.
+// An IP is removed from the map when its reference count reaches zero.
 type OnlineMap struct {
-	ipList        map[string]time.Time
-	access        sync.RWMutex
-	lastCleanup   time.Time
-	cleanupPeriod time.Duration
+	entries map[string]*ipEntry
+	access  sync.Mutex
+	count   atomic.Int64
 }
 
-// NewOnlineMap creates a new instance of OnlineMap.
+// NewOnlineMap creates a new OnlineMap instance.
 func NewOnlineMap() *OnlineMap {
 	return &OnlineMap{
-		ipList:        make(map[string]time.Time),
-		lastCleanup:   time.Now(),
-		cleanupPeriod: 10 * time.Second,
+		entries: make(map[string]*ipEntry),
 	}
 }
 
-// Count implements stats.OnlineMap.
-func (c *OnlineMap) Count() int {
-	c.access.RLock()
-	defer c.access.RUnlock()
-
-	return len(c.ipList)
-}
-
-// List implements stats.OnlineMap.
-func (c *OnlineMap) List() []string {
-	return c.GetKeys()
-}
-
 // AddIP implements stats.OnlineMap.
-func (c *OnlineMap) AddIP(ip string) {
-	if ip == "127.0.0.1" {
+func (om *OnlineMap) AddIP(ip string) {
+	if ip == localhostIPv4 || ip == localhostIPv6 {
 		return
 	}
 
-	c.access.Lock()
-	c.ipList[ip] = time.Now()
-	c.access.Unlock()
+	om.access.Lock()
+	defer om.access.Unlock()
 
-	if time.Since(c.lastCleanup) > c.cleanupPeriod {
-		c.RemoveExpiredIPs()
-		c.lastCleanup = time.Now()
+	if e, ok := om.entries[ip]; ok {
+		e.refCount++
+		e.lastSeen = time.Now()
+	} else {
+		om.entries[ip] = &ipEntry{
+			refCount: 1,
+			lastSeen: time.Now(),
+		}
+		om.count.Add(1)
 	}
 }
 
-func (c *OnlineMap) GetKeys() []string {
-	c.access.RLock()
-	defer c.access.RUnlock()
+// RemoveIP implements stats.OnlineMap.
+func (om *OnlineMap) RemoveIP(ip string) {
+	om.access.Lock()
+	defer om.access.Unlock()
 
-	keys := []string{}
-	for k := range c.ipList {
-		keys = append(keys, k)
+	e, ok := om.entries[ip]
+	if !ok {
+		return
+	}
+	e.refCount--
+	if e.refCount <= 0 {
+		delete(om.entries, ip)
+		om.count.Add(-1)
 	}
-	return keys
 }
 
-func (c *OnlineMap) RemoveExpiredIPs() {
-	c.access.Lock()
-	defer c.access.Unlock()
+// Count implements stats.OnlineMap.
+func (om *OnlineMap) Count() int {
+	return int(om.count.Load())
+}
 
-	now := time.Now()
-	for k, t := range c.ipList {
-		diff := now.Sub(t)
-		if diff.Seconds() > 20 {
-			delete(c.ipList, k)
-		}
+// List implements stats.OnlineMap.
+func (om *OnlineMap) List() []string {
+	om.access.Lock()
+	defer om.access.Unlock()
+
+	keys := make([]string, 0, len(om.entries))
+	for ip := range om.entries {
+		keys = append(keys, ip)
 	}
+	return keys
 }
 
-func (c *OnlineMap) IpTimeMap() map[string]time.Time {
-	if time.Since(c.lastCleanup) > c.cleanupPeriod {
-		c.RemoveExpiredIPs()
-		c.lastCleanup = time.Now()
-	}
+// IPTimeMap implements stats.OnlineMap.
+func (om *OnlineMap) IPTimeMap() map[string]time.Time {
+	om.access.Lock()
+	defer om.access.Unlock()
 
-	return c.ipList
+	result := make(map[string]time.Time, len(om.entries))
+	for ip, e := range om.entries {
+		result[ip] = e.lastSeen
+	}
+	return result
 }

+ 3 - 3
app/stats/stats.go

@@ -163,12 +163,12 @@ func (m *Manager) GetChannel(name string) stats.Channel {
 
 // GetAllOnlineUsers implements stats.Manager.
 func (m *Manager) GetAllOnlineUsers() []string {
-	m.access.Lock()
-	defer m.access.Unlock()
+	m.access.RLock()
+	defer m.access.RUnlock()
 
 	usersOnline := make([]string, 0, len(m.onlineMap))
 	for user, onlineMap := range m.onlineMap {
-		if len(onlineMap.IpTimeMap()) > 0 {
+		if onlineMap.Count() > 0 {
 			usersOnline = append(usersOnline, user)
 		}
 	}

+ 7 - 5
features/stats/stats.go

@@ -25,14 +25,16 @@ type Counter interface {
 //
 // xray:api:stable
 type OnlineMap interface {
-	// Count is the current value of the OnlineMap.
+	// Count returns the number of unique online IPs.
 	Count() int
-	// AddIP adds a ip to the current OnlineMap.
+	// AddIP increments the reference count for the given IP.
 	AddIP(string)
-	// List is the current OnlineMap ip list.
+	// RemoveIP decrements the reference count for the given IP. Deletes at zero.
+	RemoveIP(string)
+	// List returns all currently online IPs.
 	List() []string
-	// IpTimeMap return client ips and their last access time.
-	IpTimeMap() map[string]time.Time
+	// IPTimeMap returns a snapshot copy of IPs to their last-seen times.
+	IPTimeMap() map[string]time.Time
 }
 
 // Channel is the interface for stats channel.