clientmetric.go 10.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385
  1. // Copyright (c) Tailscale Inc & AUTHORS
  2. // SPDX-License-Identifier: BSD-3-Clause
  3. // Package clientmetric provides client-side metrics whose values
  4. // get occasionally logged.
  5. package clientmetric
  6. import (
  7. "bytes"
  8. "encoding/binary"
  9. "encoding/hex"
  10. "fmt"
  11. "io"
  12. "sort"
  13. "strings"
  14. "sync"
  15. "sync/atomic"
  16. "time"
  17. )
var (
	mu          sync.Mutex             // guards vars in this block
	metrics     = map[string]*Metric{} // published metrics, keyed by name
	numWireID   int                    // how many wireIDs have been allocated
	lastDelta   time.Time              // time of the last EncodeLogTailMetricsDelta call that was not rate-limited
	sortedDirty bool                   // whether sorted needs to be rebuilt
	sorted      []*Metric              // by name; rebuilt lazily by Metrics
	lastLogVal  []scanEntry            // by Metric.regIdx
	unsorted    []*Metric              // by Metric.regIdx

	// valFreeList is a set of free contiguous int64s whose
	// element addresses get assigned to Metric.v.
	// Every element still reachable through valFreeList is free for use;
	// Publish hands out element 0 and then reslices past it.
	// They're contiguous to reduce cache churn during diff scans.
	// When it runs out of elements, a new backing array is made.
	valFreeList []int64
)
// scanEntry contains the minimal data needed for quickly scanning
// memory for changed values. It's small to reduce memory pressure.
//
// Exactly one of v or f is set by Publish, depending on whether the
// metric has a value function.
type scanEntry struct {
	v          *int64       // Metric.v
	f          func() int64 // Metric.f
	lastLogged int64        // last logged value
}
// Type is a metric type: counter or gauge.
type Type uint8

const (
	// TypeGauge is the type of a metric that can both increment and
	// decrement. It is the zero value of Type.
	TypeGauge Type = iota
	// TypeCounter is the type of a metric that can only increment.
	TypeCounter
)
// Metric is an integer metric value that's tracked over time.
//
// It's safe for concurrent use.
type Metric struct {
	v              *int64       // atomic; the metric value (assigned by Publish when f is nil)
	f              func() int64 // value function (v is ignored if f is non-nil)
	regIdx         int          // index into lastLogVal and unsorted
	name           string       // metric name, as validated by NewUnpublished
	typ            Type         // gauge or counter
	deltasDisabled bool         // set by DisableDeltas; forces absolute values in the delta encoding

	// The following fields are owned by the package-level 'mu':

	// wireID is the lazily-allocated "wire ID". Until a metric is encoded
	// in the logs (by EncodeLogTailMetricsDelta), it has no wireID. This
	// ensures that unused metrics don't waste valuable low numbers, which
	// encode with varints with fewer bytes.
	wireID int

	// lastNamed is the last time the name of this metric was
	// written on the wire.
	lastNamed time.Time
}
  67. func (m *Metric) Name() string { return m.name }
  68. func (m *Metric) Value() int64 {
  69. if m.f != nil {
  70. return m.f()
  71. }
  72. return atomic.LoadInt64(m.v)
  73. }
  74. func (m *Metric) Type() Type { return m.typ }
  75. // DisableDeltas disables uploading of deltas for this metric (absolute values
  76. // are always uploaded).
  77. func (m *Metric) DisableDeltas() {
  78. m.deltasDisabled = true
  79. }
  80. // Add increments m's value by n.
  81. //
  82. // If m is of type counter, n should not be negative.
  83. func (m *Metric) Add(n int64) {
  84. if m.f != nil {
  85. panic("Add() called on metric with value function")
  86. }
  87. atomic.AddInt64(m.v, n)
  88. }
  89. // Set sets m's value to v.
  90. //
  91. // If m is of type counter, Set should not be used.
  92. func (m *Metric) Set(v int64) {
  93. if m.f != nil {
  94. panic("Set() called on metric with value function")
  95. }
  96. atomic.StoreInt64(m.v, v)
  97. }
  98. // Publish registers a metric in the global map.
  99. // It panics if the name is a duplicate anywhere in the process.
  100. func (m *Metric) Publish() {
  101. mu.Lock()
  102. defer mu.Unlock()
  103. if m.name == "" {
  104. panic("unnamed Metric")
  105. }
  106. if _, dup := metrics[m.name]; dup {
  107. panic("duplicate metric " + m.name)
  108. }
  109. metrics[m.name] = m
  110. sortedDirty = true
  111. if m.f != nil {
  112. lastLogVal = append(lastLogVal, scanEntry{f: m.f})
  113. } else {
  114. if len(valFreeList) == 0 {
  115. valFreeList = make([]int64, 256)
  116. }
  117. m.v = &valFreeList[0]
  118. valFreeList = valFreeList[1:]
  119. lastLogVal = append(lastLogVal, scanEntry{v: m.v})
  120. }
  121. m.regIdx = len(unsorted)
  122. unsorted = append(unsorted, m)
  123. }
  124. // Metrics returns the sorted list of metrics.
  125. //
  126. // The returned slice should not be mutated.
  127. func Metrics() []*Metric {
  128. mu.Lock()
  129. defer mu.Unlock()
  130. if sortedDirty {
  131. sortedDirty = false
  132. sorted = make([]*Metric, 0, len(metrics))
  133. for _, m := range metrics {
  134. sorted = append(sorted, m)
  135. }
  136. sort.Slice(sorted, func(i, j int) bool {
  137. return sorted[i].name < sorted[j].name
  138. })
  139. }
  140. return sorted
  141. }
  142. // HasPublished reports whether a metric with the given name has already been
  143. // published.
  144. func HasPublished(name string) bool {
  145. mu.Lock()
  146. defer mu.Unlock()
  147. _, ok := metrics[name]
  148. return ok
  149. }
  150. // NewUnpublished initializes a new Metric without calling Publish on
  151. // it.
  152. func NewUnpublished(name string, typ Type) *Metric {
  153. if i := strings.IndexFunc(name, isIllegalMetricRune); name == "" || i != -1 {
  154. panic(fmt.Sprintf("illegal metric name %q (index %v)", name, i))
  155. }
  156. return &Metric{
  157. name: name,
  158. typ: typ,
  159. }
  160. }
  161. func isIllegalMetricRune(r rune) bool {
  162. return !(r >= 'a' && r <= 'z' ||
  163. r >= 'A' && r <= 'Z' ||
  164. r >= '0' && r <= '9' ||
  165. r == '_')
  166. }
  167. // NewCounter returns a new metric that can only increment.
  168. func NewCounter(name string) *Metric {
  169. m := NewUnpublished(name, TypeCounter)
  170. m.Publish()
  171. return m
  172. }
  173. // NewGauge returns a new metric that can both increment and decrement.
  174. func NewGauge(name string) *Metric {
  175. m := NewUnpublished(name, TypeGauge)
  176. m.Publish()
  177. return m
  178. }
  179. // NewCounterFunc returns a counter metric that has its value determined by
  180. // calling the provided function (calling Add() and Set() will panic). No
  181. // locking guarantees are made for the invocation.
  182. func NewCounterFunc(name string, f func() int64) *Metric {
  183. m := NewUnpublished(name, TypeCounter)
  184. m.f = f
  185. m.Publish()
  186. return m
  187. }
  188. // NewGaugeFunc returns a gauge metric that has its value determined by
  189. // calling the provided function (calling Add() and Set() will panic). No
  190. // locking guarantees are made for the invocation.
  191. func NewGaugeFunc(name string, f func() int64) *Metric {
  192. m := NewUnpublished(name, TypeGauge)
  193. m.f = f
  194. m.Publish()
  195. return m
  196. }
  197. // WritePrometheusExpositionFormat writes all client metrics to w in
  198. // the Prometheus text-based exposition format.
  199. //
  200. // See https://github.com/prometheus/docs/blob/main/content/docs/instrumenting/exposition_formats.md
  201. func WritePrometheusExpositionFormat(w io.Writer) {
  202. for _, m := range Metrics() {
  203. switch m.Type() {
  204. case TypeGauge:
  205. fmt.Fprintf(w, "# TYPE %s gauge\n", m.Name())
  206. case TypeCounter:
  207. fmt.Fprintf(w, "# TYPE %s counter\n", m.Name())
  208. }
  209. fmt.Fprintf(w, "%s %v\n", m.Name(), m.Value())
  210. }
  211. }
const (
	// metricLogNameFrequency is how often a metric's name=>id
	// mapping is redundantly put in the logs. In other words,
	// this is how far back in the logs you need to fetch from a
	// given point in time to recompute the metrics at that point
	// in time.
	metricLogNameFrequency = 4 * time.Hour

	// minMetricEncodeInterval is the minimum time that must pass
	// between two scans of the metrics for changes to encode for
	// logtail; EncodeLogTailMetricsDelta returns "" when called
	// sooner than that.
	minMetricEncodeInterval = 15 * time.Second
)
  224. // EncodeLogTailMetricsDelta return an encoded string representing the metrics
  225. // differences since the previous call.
  226. //
  227. // It implements the requirements of a logtail.Config.MetricsDelta
  228. // func. Notably, its output is safe to embed in a JSON string literal
  229. // without further escaping.
  230. //
  231. // The current encoding is:
  232. // - name immediately following metric:
  233. // 'N' + hex(varint(len(name))) + name
  234. // - set value of a metric:
  235. // 'S' + hex(varint(wireid)) + hex(varint(value))
  236. // - increment a metric: (decrements if negative)
  237. // 'I' + hex(varint(wireid)) + hex(varint(value))
  238. func EncodeLogTailMetricsDelta() string {
  239. mu.Lock()
  240. defer mu.Unlock()
  241. now := time.Now()
  242. if !lastDelta.IsZero() && now.Sub(lastDelta) < minMetricEncodeInterval {
  243. return ""
  244. }
  245. lastDelta = now
  246. var enc *deltaEncBuf // lazy
  247. for i, ent := range lastLogVal {
  248. var val int64
  249. if ent.f != nil {
  250. val = ent.f()
  251. } else {
  252. val = atomic.LoadInt64(ent.v)
  253. }
  254. delta := val - ent.lastLogged
  255. if delta == 0 {
  256. continue
  257. }
  258. lastLogVal[i].lastLogged = val
  259. m := unsorted[i]
  260. if enc == nil {
  261. enc = deltaPool.Get().(*deltaEncBuf)
  262. enc.buf.Reset()
  263. }
  264. if m.wireID == 0 {
  265. numWireID++
  266. m.wireID = numWireID
  267. }
  268. writeValue := m.deltasDisabled
  269. if m.lastNamed.IsZero() || now.Sub(m.lastNamed) > metricLogNameFrequency {
  270. enc.writeName(m.Name(), m.Type())
  271. m.lastNamed = now
  272. writeValue = true
  273. }
  274. if writeValue {
  275. enc.writeValue(m.wireID, val)
  276. } else {
  277. enc.writeDelta(m.wireID, delta)
  278. }
  279. }
  280. if enc == nil {
  281. return ""
  282. }
  283. defer deltaPool.Put(enc)
  284. return enc.buf.String()
  285. }
  286. var deltaPool = &sync.Pool{
  287. New: func() any {
  288. return new(deltaEncBuf)
  289. },
  290. }
// deltaEncBuf encodes metrics per the format described
// on EncodeLogTailMetricsDelta above.
type deltaEncBuf struct {
	buf     bytes.Buffer                 // accumulated encoded records
	scratch [binary.MaxVarintLen64]byte // holds the binary varint before it is hex-encoded
}
  297. // writeName writes a "name" (N) record to the buffer, which notes
  298. // that the immediately following record's wireID has the provided
  299. // name.
  300. func (b *deltaEncBuf) writeName(name string, typ Type) {
  301. var namePrefix string
  302. if typ == TypeGauge {
  303. // Add the gauge_ prefix so that tsweb knows that this is a gauge metric
  304. // when generating the Prometheus version.
  305. namePrefix = "gauge_"
  306. }
  307. b.buf.WriteByte('N')
  308. b.writeHexVarint(int64(len(namePrefix) + len(name)))
  309. b.buf.WriteString(namePrefix)
  310. b.buf.WriteString(name)
  311. }
  312. // writeDelta writes a "set" (S) record to the buffer, noting that the
  313. // metric with the given wireID now has value v.
  314. func (b *deltaEncBuf) writeValue(wireID int, v int64) {
  315. b.buf.WriteByte('S')
  316. b.writeHexVarint(int64(wireID))
  317. b.writeHexVarint(v)
  318. }
  319. // writeDelta writes an "increment" (I) delta value record to the
  320. // buffer, noting that the metric with the given wireID now has a
  321. // value that's v larger (or smaller if v is negative).
  322. func (b *deltaEncBuf) writeDelta(wireID int, v int64) {
  323. b.buf.WriteByte('I')
  324. b.writeHexVarint(int64(wireID))
  325. b.writeHexVarint(v)
  326. }
  327. // writeHexVarint writes v to the buffer as a hex-encoded varint.
  328. func (b *deltaEncBuf) writeHexVarint(v int64) {
  329. n := binary.PutVarint(b.scratch[:], v)
  330. hexLen := n * 2
  331. oldLen := b.buf.Len()
  332. b.buf.Grow(hexLen)
  333. hexBuf := b.buf.Bytes()[oldLen : oldLen+hexLen]
  334. hex.Encode(hexBuf, b.scratch[:n])
  335. b.buf.Write(hexBuf)
  336. }
  337. var TestHooks testHooks
  338. type testHooks struct{}
  339. func (testHooks) ResetLastDelta() {
  340. lastDelta = time.Time{}
  341. }