| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354 |
- package hyperloglog
- import (
- "encoding/binary"
- "errors"
- "fmt"
- "math"
- "slices"
- )
const (
	// pp is the precision handed to encodeHash/decodeHash for the sparse
	// representation.
	pp = uint8(25)

	// mp is 2^pp as a uint32; sparse estimates are computed over this range.
	mp = uint32(1 << pp)

	// version is the format version byte written first by AppendBinary.
	version = 2
)
// Sketch is a HyperLogLog cardinality sketch. It uses either a sparse
// representation (sparseList != nil, see sparse) or a dense one (regs holds m
// registers). NewSketch picks the starting mode; a sparse sketch switches to
// dense via toNormal, for example when the thresholds in maybeToNormal are
// exceeded.
type Sketch struct {
	p          uint8           // precision; the sketch has 2^p registers
	m          uint32          // number of registers, 1 << p
	alpha      float64         // alpha(m), the scaling constant used by Estimate
	tmpSet     set             // sparse mode: encoded hashes not yet merged into sparseList (nilSet once dense)
	sparseList *compressedList // sparse mode: sorted encoded hashes; nil in dense mode
	regs       []uint8         // dense mode: one register per index, len m; nil while sparse
}
- // New returns a HyperLogLog Sketch with 2^14 registers (precision 14)
- func New() *Sketch { return New14() }
- // New14 returns a HyperLogLog Sketch with 2^14 registers (precision 14)
- func New14() *Sketch { return newSketchNoError(14, true) }
- // New16 returns a HyperLogLog Sketch with 2^16 registers (precision 16)
- func New16() *Sketch { return newSketchNoError(16, true) }
- // NewNoSparse returns a HyperLogLog Sketch with 2^14 registers (precision 14) that will not use a sparse representation
- func NewNoSparse() *Sketch { return newSketchNoError(14, false) }
- // New16NoSparse returns a HyperLogLog Sketch with 2^16 registers (precision 16) that will not use a sparse representation
- func New16NoSparse() *Sketch { return newSketchNoError(16, false) }
- func newSketchNoError(precision uint8, sparse bool) *Sketch {
- sk, _ := NewSketch(precision, sparse)
- return sk
- }
- func NewSketch(precision uint8, sparse bool) (*Sketch, error) {
- if precision < 4 || precision > 18 {
- return nil, fmt.Errorf("p has to be >= 4 and <= 18")
- }
- m := uint32(1) << precision
- s := &Sketch{
- m: m,
- p: precision,
- alpha: alpha(float64(m)),
- }
- if sparse {
- s.tmpSet = makeSet(0)
- s.sparseList = newCompressedList(0)
- } else {
- s.regs = make([]uint8, m)
- }
- return s, nil
- }
- func (sk *Sketch) sparse() bool { return sk.sparseList != nil }
- // Clone returns a deep copy of sk.
- func (sk *Sketch) Clone() *Sketch {
- clone := *sk
- clone.regs = append([]uint8(nil), sk.regs...)
- clone.tmpSet = sk.tmpSet.Clone()
- clone.sparseList = sk.sparseList.Clone()
- return &clone
- }
- func (sk *Sketch) maybeToNormal() {
- if uint32(sk.tmpSet.Len())*100 > sk.m {
- sk.mergeSparse()
- if uint32(sk.sparseList.Len()) > sk.m {
- sk.toNormal()
- }
- }
- }
- func (sk *Sketch) Merge(other *Sketch) error {
- if other == nil {
- return nil
- }
- if sk.p != other.p {
- return errors.New("precisions must be equal")
- }
- if sk.sparse() && other.sparse() {
- sk.mergeSparseSketch(other)
- } else {
- sk.mergeDenseSketch(other)
- }
- return nil
- }
- func (sk *Sketch) mergeSparseSketch(other *Sketch) {
- sk.tmpSet.Merge(other.tmpSet)
- for iter := other.sparseList.Iter(); iter.HasNext(); {
- sk.tmpSet.add(iter.Next())
- }
- sk.maybeToNormal()
- }
- func (sk *Sketch) mergeDenseSketch(other *Sketch) {
- if sk.sparse() {
- sk.toNormal()
- }
- if other.sparse() {
- other.tmpSet.ForEach(func(k uint32) {
- i, r := decodeHash(k, other.p, pp)
- sk.insert(i, r)
- })
- for iter := other.sparseList.Iter(); iter.HasNext(); {
- i, r := decodeHash(iter.Next(), other.p, pp)
- sk.insert(i, r)
- }
- } else {
- for i, v := range other.regs {
- if v > sk.regs[i] {
- sk.regs[i] = v
- }
- }
- }
- }
- func (sk *Sketch) toNormal() {
- if sk.tmpSet.Len() > 0 {
- sk.mergeSparse()
- }
- sk.regs = make([]uint8, sk.m)
- for iter := sk.sparseList.Iter(); iter.HasNext(); {
- i, r := decodeHash(iter.Next(), sk.p, pp)
- sk.insert(i, r)
- }
- sk.tmpSet = nilSet
- sk.sparseList = nil
- }
- func (sk *Sketch) insert(i uint32, r uint8) { sk.regs[i] = max(r, sk.regs[i]) }
- func (sk *Sketch) Insert(e []byte) { sk.InsertHash(hash(e)) }
- func (sk *Sketch) InsertHash(x uint64) {
- if sk.sparse() {
- if sk.tmpSet.add(encodeHash(x, sk.p, pp)) {
- sk.maybeToNormal()
- }
- return
- }
- i, r := getPosVal(x, sk.p)
- sk.insert(uint32(i), r)
- }
- func (sk *Sketch) Estimate() uint64 {
- if sk.sparse() {
- sk.mergeSparse()
- return uint64(linearCount(mp, mp-sk.sparseList.count))
- }
- sum, ez := sumAndZeros(sk.regs)
- m := float64(sk.m)
- est := sk.alpha * m * (m - ez) / (sum + beta(sk.p, ez))
- return uint64(est + 0.5)
- }
- func (sk *Sketch) mergeSparse() {
- if sk.tmpSet.Len() == 0 {
- return
- }
- keys := make([]uint32, 0, sk.tmpSet.Len())
- sk.tmpSet.ForEach(func(k uint32) {
- keys = append(keys, k)
- })
- slices.Sort(keys)
- newList := newCompressedList(4*sk.tmpSet.Len() + sk.sparseList.Len())
- for iter, i := sk.sparseList.Iter(), 0; iter.HasNext() || i < len(keys); {
- if !iter.HasNext() {
- newList.Append(keys[i])
- i++
- continue
- }
- if i >= len(keys) {
- newList.Append(iter.Next())
- continue
- }
- x1, adv := iter.Peek()
- x2 := keys[i]
- if x1 == x2 {
- newList.Append(x1)
- iter.Advance(x1, adv)
- i++
- } else if x1 > x2 {
- newList.Append(x2)
- i++
- } else {
- newList.Append(x1)
- iter.Advance(x1, adv)
- }
- }
- sk.sparseList = newList
- sk.tmpSet = makeSet(0)
- }
- // MarshalBinary implements the encoding.BinaryMarshaler interface.
- //
- // When the result will be appended to another buffer, consider using
- // AppendBinary to avoid additional allocations and copying.
- func (sk *Sketch) MarshalBinary() (data []byte, err error) {
- return sk.AppendBinary(nil)
- }
- // AppendBinary implements the encoding.BinaryAppender interface.
- func (sk *Sketch) AppendBinary(data []byte) ([]byte, error) {
- data = slices.Grow(data, 8+len(sk.regs))
- // Marshal a version marker.
- data = append(data, version)
- // Marshal p.
- data = append(data, sk.p)
- // Marshal b
- data = append(data, 0)
- if sk.sparse() {
- // It's using the sparse Sketch.
- data = append(data, byte(1))
- // Add the tmp_set
- data, err := sk.tmpSet.AppendBinary(data)
- if err != nil {
- return nil, err
- }
- // Add the sparse Sketch
- return sk.sparseList.AppendBinary(data)
- }
- // It's using the dense Sketch.
- data = append(data, byte(0))
- // Add the dense sketch Sketch.
- sz := len(sk.regs)
- data = append(data,
- byte(sz>>24),
- byte(sz>>16),
- byte(sz>>8),
- byte(sz),
- )
- // Marshal each element in the list.
- for _, v := range sk.regs {
- data = append(data, byte(v))
- }
- return data, nil
- }
var (
	// ErrorTooShort is returned by UnmarshalBinary when its input is too
	// short to be decoded.
	ErrorTooShort = errors.New("too short binary")
)
- // UnmarshalBinary implements the encoding.BinaryUnmarshaler interface.
- func (sk *Sketch) UnmarshalBinary(data []byte) error {
- if len(data) < 8 {
- return ErrorTooShort
- }
- // Unmarshal version. We may need this in the future if we make
- // non-compatible changes.
- v := data[0]
- // Unmarshal p.
- p := data[1]
- // Unmarshal b.
- b := data[2]
- // Determine if we need a sparse Sketch
- sparse := data[3] == byte(1)
- // Make a newSketch Sketch if the precision doesn't match or if the Sketch was used
- if sk.p != p || sk.regs != nil || sk.tmpSet.Len() > 0 || (sk.sparseList != nil && sk.sparseList.Len() > 0) {
- newh, err := NewSketch(p, sparse)
- if err != nil {
- return err
- }
- *sk = *newh
- }
- // h is now initialised with the correct p. We just need to fill the
- // rest of the details out.
- if sparse {
- // Using the sparse Sketch.
- // Unmarshal the tmp_set.
- tssz := binary.BigEndian.Uint32(data[4:8])
- sk.tmpSet = makeSet(int(tssz))
- // We need to unmarshal tssz values in total, and each value requires us
- // to read 4 bytes.
- tsLastByte := int((tssz * 4) + 8)
- for i := 8; i < tsLastByte; i += 4 {
- k := binary.BigEndian.Uint32(data[i : i+4])
- sk.tmpSet.add(k)
- }
- // Unmarshal the sparse Sketch.
- return sk.sparseList.UnmarshalBinary(data[tsLastByte:])
- }
- // Using the dense Sketch.
- sk.sparseList = nil
- sk.tmpSet = nilSet
- if v == 1 {
- return sk.unmarshalBinaryV1(data[8:], b)
- }
- return sk.unmarshalBinaryV2(data)
- }
// sumAndZeros walks the registers once. It returns the sum of 2^-v over
// every register value v (res) and the number of registers equal to zero
// (ez). Both are float64 so Estimate can use them directly.
func sumAndZeros(regs []uint8) (res, ez float64) {
	for i := range regs {
		v := float64(regs[i])
		res += 1.0 / math.Pow(2.0, v)
		if v == 0 {
			ez++
		}
	}
	return res, ez
}
- func (sk *Sketch) unmarshalBinaryV1(data []byte, b uint8) error {
- sk.regs = make([]uint8, len(data)*2)
- for i, v := range data {
- sk.regs[i*2] = uint8((v >> 4)) + b
- sk.regs[i*2+1] = uint8((v<<4)>>4) + b
- }
- return nil
- }
- func (sk *Sketch) unmarshalBinaryV2(data []byte) error {
- sk.regs = data[8:]
- return nil
- }
|