destBytes.go 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329
  1. /*
  2. * Copyright (c) 2026, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package server
  20. import (
  21. "sync"
  22. "sync/atomic"
  23. "time"
  24. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  25. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
  26. )
const (
	// destBytesSoftMaxEntries is the aggregation map size at or above which
	// adding a new bucket signals an immediate log dump rather than waiting
	// for the next period. The limit is checked against whichever map (ASN or
	// domain) the new bucket is being added to.
	destBytesSoftMaxEntries = 100000
	// destBytesHardMaxEntries is the aggregation map size at or above which
	// data for new buckets is dropped; existing buckets continue to
	// accumulate.
	destBytesHardMaxEntries = 1000000
)
// destBytesLogger accumulates ASN and domain destination bytes metrics,
// aggregates into coarse-grained buckets, and periodically logs destination
// byte events.
type destBytesLogger struct {
	// support supplies the configuration read by the worker, including the
	// periodic logging interval.
	support *SupportServices

	// runMutex is held by Start and Stop for their full duration and guards
	// running, stopBroadcast, and waitGroup.
	runMutex sync.Mutex
	// running is true from a successful Start until Stop completes.
	running bool
	// stopBroadcast is created by Start and closed by Stop to tell the worker
	// to perform a final log and exit.
	stopBroadcast chan struct{}
	// waitGroup tracks the worker goroutine so that Stop can wait for it.
	waitGroup *sync.WaitGroup

	// asnBytesMutex guards asnBytes.
	asnBytesMutex sync.Mutex
	// asnBytes holds the ASN destination counters accumulated since the last
	// ASN log dump.
	asnBytes map[destBytesBucket]destBytesCounters

	// domainBytesMutex guards domainBytes.
	domainBytesMutex sync.Mutex
	// domainBytes holds the domain destination counters accumulated since the
	// last domain log dump.
	domainBytes map[destBytesBucket]destBytesCounters

	// signalLogASNBytes and signalLogDomainBytes carry requests from addBytes
	// for an immediate dump of the corresponding map. Each is created with a
	// buffer of 1 and is only ever sent to without blocking.
	signalLogASNBytes    chan struct{}
	signalLogDomainBytes chan struct{}

	// loggedHardMax is set when the hard max warning is first logged, so the
	// warning is emitted at most once; nothing in this file clears it.
	loggedHardMax atomic.Bool
}
// destBytesBucket is the aggregation map key. Byte counts are summed across
// all reports that share the same destination and client attributes. The
// struct contains only string fields, so it is comparable and can be used
// directly as a map key.
type destBytesBucket struct {
	// destination is the destination ASN or domain, depending on which
	// aggregation map the bucket belongs to.
	destination string
	// clientRegion and clientASN are taken from the client's GeoIP data
	// (its Country and ASN fields respectively).
	clientRegion string
	clientASN    string
	// sponsorID, clientPlatform, and deviceRegion are taken from the optional
	// "sponsor_id", "client_platform", and "device_region" API parameters;
	// clientPlatform is stored after normalizeClientPlatform is applied.
	sponsorID      string
	clientPlatform string
	deviceRegion   string
}
// destBytesCounters holds the byte totals accumulated for a single bucket.
type destBytesCounters struct {
	// TCP is the running sum of reported TCP bytes.
	TCP int64
	// UDP is the running sum of reported UDP bytes.
	UDP int64
}
  60. // newDestBytesLogger initializes a new destBytesLogger.
  61. func newDestBytesLogger(support *SupportServices) *destBytesLogger {
  62. return &destBytesLogger{
  63. support: support,
  64. asnBytes: make(map[destBytesBucket]destBytesCounters),
  65. domainBytes: make(map[destBytesBucket]destBytesCounters),
  66. signalLogASNBytes: make(chan struct{}, 1),
  67. signalLogDomainBytes: make(chan struct{}, 1),
  68. }
  69. }
  70. // Start begins the periodic logging worker.
  71. func (d *destBytesLogger) Start() error {
  72. d.runMutex.Lock()
  73. defer d.runMutex.Unlock()
  74. if d.running {
  75. return errors.TraceNew("already running")
  76. }
  77. d.running = true
  78. d.stopBroadcast = make(chan struct{})
  79. d.waitGroup = new(sync.WaitGroup)
  80. d.waitGroup.Add(1)
  81. go func() {
  82. defer d.waitGroup.Done()
  83. d.run()
  84. }()
  85. return nil
  86. }
  87. // Stop halts the periodic logging worker. Any remaining aggregated metrics
  88. // will be logged before Stop returns.
  89. func (d *destBytesLogger) Stop() {
  90. d.runMutex.Lock()
  91. defer d.runMutex.Unlock()
  92. if !d.running {
  93. return
  94. }
  95. close(d.stopBroadcast)
  96. d.waitGroup.Wait()
  97. d.running = false
  98. d.stopBroadcast = nil
  99. d.waitGroup = nil
  100. }
  101. // AddASNBytes adds ASN destination bytes to the aggregation.
  102. func (d *destBytesLogger) AddASNBytes(
  103. destination string,
  104. clientGeoIPData GeoIPData,
  105. apiParams common.APIParameters,
  106. bytesTCP int64,
  107. bytesUDP int64) {
  108. if d == nil {
  109. // !RunDestBytesLogger case.
  110. return
  111. }
  112. d.addBytes(
  113. true,
  114. destination,
  115. clientGeoIPData,
  116. apiParams,
  117. bytesTCP,
  118. bytesUDP)
  119. }
  120. // AddDomainBytes adds domain destination bytes to the aggregation.
  121. func (d *destBytesLogger) AddDomainBytes(
  122. destination string,
  123. clientGeoIPData GeoIPData,
  124. apiParams common.APIParameters,
  125. bytesTCP int64,
  126. bytesUDP int64) {
  127. if d == nil {
  128. // !RunDestBytesLogger case.
  129. return
  130. }
  131. d.addBytes(
  132. false,
  133. destination,
  134. clientGeoIPData,
  135. apiParams,
  136. bytesTCP,
  137. bytesUDP)
  138. }
  139. func (d *destBytesLogger) run() {
  140. ticker := time.NewTicker(d.support.Config.destinationBytesPeriod)
  141. defer ticker.Stop()
  142. for {
  143. select {
  144. case <-ticker.C:
  145. d.logAccumulatedASNDestBytes()
  146. d.logAccumulatedDomainDestBytes()
  147. case <-d.signalLogASNBytes:
  148. d.logAccumulatedASNDestBytes()
  149. case <-d.signalLogDomainBytes:
  150. d.logAccumulatedDomainDestBytes()
  151. case <-d.stopBroadcast:
  152. // Log on stop to record metrics accumulated since the last
  153. // periodic logging.
  154. d.logAccumulatedASNDestBytes()
  155. d.logAccumulatedDomainDestBytes()
  156. return
  157. }
  158. }
  159. }
  160. func (d *destBytesLogger) logAccumulatedASNDestBytes() {
  161. // Take a snapshot of the aggregation, and then unlock immediately to
  162. // avoid blocking addBytes calls while logging.
  163. //
  164. // Resetting the aggregation here also frees memory associated with rarer
  165. // buckets that don't reoccur often.
  166. d.asnBytesMutex.Lock()
  167. asnBytes := d.asnBytes
  168. d.asnBytes = make(map[destBytesBucket]destBytesCounters)
  169. d.asnBytesMutex.Unlock()
  170. for bucket, counters := range asnBytes {
  171. logFields := make(LogFields)
  172. logFields["event_name"] = "asn_dest_bytes"
  173. logFields["asn"] = bucket.destination
  174. d.addLogFields(logFields, bucket, counters)
  175. log.LogRawFieldsWithTimestamp(logFields)
  176. }
  177. }
  178. func (d *destBytesLogger) logAccumulatedDomainDestBytes() {
  179. // See snapshot comment in logAccumulatedDomainDestBytes.
  180. d.domainBytesMutex.Lock()
  181. domainBytes := d.domainBytes
  182. d.domainBytes = make(map[destBytesBucket]destBytesCounters)
  183. d.domainBytesMutex.Unlock()
  184. for bucket, counters := range domainBytes {
  185. logFields := make(LogFields)
  186. logFields["event_name"] = "domain_dest_bytes"
  187. logFields["domain"] = bucket.destination
  188. d.addLogFields(logFields, bucket, counters)
  189. log.LogRawFieldsWithTimestamp(logFields)
  190. }
  191. }
  192. func (d *destBytesLogger) addLogFields(
  193. logFields LogFields,
  194. bucket destBytesBucket,
  195. counters destBytesCounters) {
  196. logFields["client_region"] = bucket.clientRegion
  197. logFields["client_asn"] = bucket.clientASN
  198. logFields["sponsor_id"] = bucket.sponsorID
  199. logFields["client_platform"] = bucket.clientPlatform
  200. logFields["device_region"] = bucket.deviceRegion
  201. logFields["bytes_tcp"] = counters.TCP
  202. logFields["bytes_udp"] = counters.UDP
  203. logFields["bytes"] = counters.TCP + counters.UDP
  204. }
  205. func (d *destBytesLogger) addBytes(
  206. isASN bool,
  207. destination string,
  208. clientGeoIPData GeoIPData,
  209. apiParams common.APIParameters,
  210. bytesTCP int64,
  211. bytesUDP int64) {
  212. if bytesTCP == 0 && bytesUDP == 0 {
  213. // Some cases, such as client submitted domain bytes, may report all 0
  214. // bytes. Skip this data.
  215. return
  216. }
  217. sponsorID, _ := getOptionalStringRequestParam(apiParams, "sponsor_id")
  218. clientPlatform, _ := getOptionalStringRequestParam(apiParams, "client_platform")
  219. deviceRegion, _ := getOptionalStringRequestParam(apiParams, "device_region")
  220. bucket := destBytesBucket{
  221. destination: destination,
  222. clientRegion: clientGeoIPData.Country,
  223. clientASN: clientGeoIPData.ASN,
  224. sponsorID: sponsorID,
  225. clientPlatform: normalizeClientPlatform(clientPlatform),
  226. deviceRegion: deviceRegion,
  227. }
  228. // The map key is a comparable struct of strings. The non-pointer struct
  229. // types used for the map keys and values avoids allocations.
  230. var destBytes map[destBytesBucket]destBytesCounters
  231. var logSignal chan struct{}
  232. if isASN {
  233. d.asnBytesMutex.Lock()
  234. defer d.asnBytesMutex.Unlock()
  235. destBytes = d.asnBytes
  236. logSignal = d.signalLogASNBytes
  237. } else {
  238. d.domainBytesMutex.Lock()
  239. defer d.domainBytesMutex.Unlock()
  240. destBytes = d.domainBytes
  241. logSignal = d.signalLogDomainBytes
  242. }
  243. counters, ok := destBytes[bucket]
  244. if !ok {
  245. // A new aggregation map entry will be added. To avoid the map getting
  246. // too large, signal an immediate log dump without awaiting the next
  247. // period.
  248. //
  249. // When the soft limit is reached, logging is signaled. If the hard
  250. // limit is reached, the new data is dropped.
  251. count := len(destBytes)
  252. if count >= destBytesSoftMaxEntries {
  253. select {
  254. case logSignal <- struct{}{}:
  255. default:
  256. }
  257. }
  258. if count >= destBytesHardMaxEntries {
  259. if d.loggedHardMax.CompareAndSwap(false, true) {
  260. log.WithTrace().Warning("destBytesLogger hard max exceeded")
  261. }
  262. return
  263. }
  264. }
  265. counters.TCP += bytesTCP
  266. counters.UDP += bytesUDP
  267. destBytes[bucket] = counters
  268. }