consistent.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393
  1. // Copyright (c) 2018-2022 Burak Sezer
  2. // All rights reserved.
  3. //
  4. // This code is licensed under the MIT License.
  5. //
  6. // Permission is hereby granted, free of charge, to any person obtaining a copy
  7. // of this software and associated documentation files(the "Software"), to deal
  8. // in the Software without restriction, including without limitation the rights
  9. // to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
  10. // copies of the Software, and to permit persons to whom the Software is
  11. // furnished to do so, subject to the following conditions :
  12. //
  13. // The above copyright notice and this permission notice shall be included in
  14. // all copies or substantial portions of the Software.
  15. //
  16. // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  17. // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  18. // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
  19. // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  20. // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  21. // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  22. // THE SOFTWARE.
  23. // Package consistent provides a consistent hashing function with bounded loads. This implementation also adds
  24. // partitioning logic on top of the original algorithm. For more information about the underlying algorithm,
  25. // please take a look at https://research.googleblog.com/2017/04/consistent-hashing-with-bounded-loads.html
  26. //
  27. // Example Use:
  28. //
  29. // cfg := consistent.Config{
  30. // PartitionCount: 71,
  31. // ReplicationFactor: 20,
  32. // Load: 1.25,
  33. // Hasher: hasher{},
  34. // }
  35. //
  36. // Now you can create a new Consistent instance. This function can take a list of the members.
  37. //
  38. // c := consistent.New(members, cfg)
  39. //
  40. // In the following sample, you add a new Member to the consistent hash ring. myMember is just a Go struct that
  41. // implements the Member interface. You should know that modifying the consistent hash ring distributes partitions among
  42. // members using the algorithm defined on Google Research Blog.
  43. //
  44. // c.Add(myMember)
  45. //
  46. // Remove a member from the consistent hash ring:
  47. //
  48. // c.Remove(member-name)
  49. //
  50. // LocateKey hashes the key and calculates partition ID with this modulo operation: MOD(hash result, partition count)
  51. // The owner of the partition is already calculated by New/Add/Remove. LocateKey just returns the member that is responsible
  52. // for the key.
  53. //
  54. // key := []byte("my-key")
  55. // member := c.LocateKey(key)
  56. package consistent
  57. import (
  58. "encoding/binary"
  59. "errors"
  60. "fmt"
  61. "math"
  62. "sort"
  63. "sync"
  64. )
const (
	// DefaultPartitionCount is used when Config.PartitionCount is zero.
	DefaultPartitionCount int = 271
	// DefaultReplicationFactor is the number of virtual nodes created per
	// member when Config.ReplicationFactor is zero.
	DefaultReplicationFactor int = 20
	// DefaultLoad is the load factor used when Config.Load is zero.
	DefaultLoad float64 = 1.25
)

// ErrInsufficientMemberCount represents an error which means there are not enough members to complete the task.
var ErrInsufficientMemberCount = errors.New("insufficient member count")
// Hasher is responsible for generating unsigned, 64-bit hash of provided byte slice.
// Hasher should minimize collisions (generating same hash for different byte slice)
// and while performance is also important fast functions are preferable (i.e.
// you can use FarmHash family).
type Hasher interface {
	// Sum64 returns the 64-bit hash of the given byte slice.
	Sum64([]byte) uint64
}
// Member interface represents a member in consistent hash ring.
type Member interface {
	// String returns the member's identifier. It is used as the map key for
	// member lookups and as the base string for its virtual-node hashes, so
	// it must be stable and unique per member.
	String() string
}
// Config represents a structure to control consistent package.
type Config struct {
	// Hasher is responsible for generating unsigned, 64-bit hash of provided byte slice.
	Hasher Hasher

	// Keys are distributed among partitions. Prime numbers are good to
	// distribute keys uniformly. Select a big PartitionCount if you have
	// too many keys.
	PartitionCount int

	// ReplicationFactor is the number of times each member is replicated on
	// the consistent hash ring (i.e. the number of virtual nodes per member).
	ReplicationFactor int

	// Load is used to calculate average load. See the code, the paper and Google's blog post to learn about it.
	Load float64
}
// Consistent holds the information about the members of the consistent hash circle.
type Consistent struct {
	mu sync.RWMutex // guards the mutable state below

	config         Config
	hasher         Hasher
	sortedSet      []uint64 // virtual-node hashes in ascending order (see add)
	partitionCount uint64   // cached copy of config.PartitionCount
	loads          map[string]float64 // member name -> number of owned partitions
	members        map[string]*Member // member name -> member
	partitions     map[int]*Member    // partition ID -> owning member
	ring           map[uint64]*Member // virtual-node hash -> member
}
  109. // New creates and returns a new Consistent object.
  110. func New(members []Member, config Config) *Consistent {
  111. if config.Hasher == nil {
  112. panic("Hasher cannot be nil")
  113. }
  114. if config.PartitionCount == 0 {
  115. config.PartitionCount = DefaultPartitionCount
  116. }
  117. if config.ReplicationFactor == 0 {
  118. config.ReplicationFactor = DefaultReplicationFactor
  119. }
  120. if config.Load == 0 {
  121. config.Load = DefaultLoad
  122. }
  123. c := &Consistent{
  124. config: config,
  125. members: make(map[string]*Member),
  126. partitionCount: uint64(config.PartitionCount),
  127. ring: make(map[uint64]*Member),
  128. }
  129. c.hasher = config.Hasher
  130. for _, member := range members {
  131. c.add(member)
  132. }
  133. if members != nil {
  134. c.distributePartitions()
  135. }
  136. return c
  137. }
  138. // GetMembers returns a thread-safe copy of members. If there are no members, it returns an empty slice of Member.
  139. func (c *Consistent) GetMembers() []Member {
  140. c.mu.RLock()
  141. defer c.mu.RUnlock()
  142. // Create a thread-safe copy of member list.
  143. members := make([]Member, 0, len(c.members))
  144. for _, member := range c.members {
  145. members = append(members, *member)
  146. }
  147. return members
  148. }
// AverageLoad exposes the current average load.
// It is the thread-safe wrapper around averageLoad.
func (c *Consistent) AverageLoad() float64 {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.averageLoad()
}
  155. func (c *Consistent) averageLoad() float64 {
  156. if len(c.members) == 0 {
  157. return 0
  158. }
  159. avgLoad := float64(c.partitionCount/uint64(len(c.members))) * c.config.Load
  160. return math.Ceil(avgLoad)
  161. }
// distributeWithLoad assigns partition partID to the first member at or after
// ring index idx whose load would stay within the average-load bound.
// It mutates partitions and loads in place. Not thread-safe; callers must
// hold c.mu. Panics when no member with spare capacity can be found.
func (c *Consistent) distributeWithLoad(partID, idx int, partitions map[int]*Member, loads map[string]float64) {
	avgLoad := c.averageLoad()
	var count int
	for {
		count++
		if count >= len(c.sortedSet) {
			// User needs to decrease partition count, increase member count or increase load factor.
			// NOTE(review): count starts at 1, so this bails after visiting
			// len(sortedSet)-1 virtual nodes rather than all of them — confirm
			// this off-by-one is intended.
			panic("not enough room to distribute partitions")
		}
		i := c.sortedSet[idx]
		member := *c.ring[i]
		load := loads[member.String()]
		if load+1 <= avgLoad {
			// This member still has capacity: hand it the partition.
			partitions[partID] = &member
			loads[member.String()]++
			return
		}
		// Otherwise advance to the next virtual node, wrapping around the ring.
		idx++
		if idx >= len(c.sortedSet) {
			idx = 0
		}
	}
}
  185. func (c *Consistent) distributePartitions() {
  186. loads := make(map[string]float64)
  187. partitions := make(map[int]*Member)
  188. bs := make([]byte, 8)
  189. for partID := uint64(0); partID < c.partitionCount; partID++ {
  190. binary.LittleEndian.PutUint64(bs, partID)
  191. key := c.hasher.Sum64(bs)
  192. idx := sort.Search(len(c.sortedSet), func(i int) bool {
  193. return c.sortedSet[i] >= key
  194. })
  195. if idx >= len(c.sortedSet) {
  196. idx = 0
  197. }
  198. c.distributeWithLoad(int(partID), idx, partitions, loads)
  199. }
  200. c.partitions = partitions
  201. c.loads = loads
  202. }
  203. func (c *Consistent) add(member Member) {
  204. for i := 0; i < c.config.ReplicationFactor; i++ {
  205. key := []byte(fmt.Sprintf("%s%d", member.String(), i))
  206. h := c.hasher.Sum64(key)
  207. c.ring[h] = &member
  208. c.sortedSet = append(c.sortedSet, h)
  209. }
  210. // sort hashes ascendingly
  211. sort.Slice(c.sortedSet, func(i int, j int) bool {
  212. return c.sortedSet[i] < c.sortedSet[j]
  213. })
  214. // Storing member at this map is useful to find backup members of a partition.
  215. c.members[member.String()] = &member
  216. }
  217. // Add adds a new member to the consistent hash circle.
  218. func (c *Consistent) Add(member Member) {
  219. c.mu.Lock()
  220. defer c.mu.Unlock()
  221. if _, ok := c.members[member.String()]; ok {
  222. // We already have this member. Quit immediately.
  223. return
  224. }
  225. c.add(member)
  226. c.distributePartitions()
  227. }
  228. func (c *Consistent) delSlice(val uint64) {
  229. for i := 0; i < len(c.sortedSet); i++ {
  230. if c.sortedSet[i] == val {
  231. c.sortedSet = append(c.sortedSet[:i], c.sortedSet[i+1:]...)
  232. break
  233. }
  234. }
  235. }
  236. // Remove removes a member from the consistent hash circle.
  237. func (c *Consistent) Remove(name string) {
  238. c.mu.Lock()
  239. defer c.mu.Unlock()
  240. if _, ok := c.members[name]; !ok {
  241. // There is no member with that name. Quit immediately.
  242. return
  243. }
  244. for i := 0; i < c.config.ReplicationFactor; i++ {
  245. key := []byte(fmt.Sprintf("%s%d", name, i))
  246. h := c.hasher.Sum64(key)
  247. delete(c.ring, h)
  248. c.delSlice(h)
  249. }
  250. delete(c.members, name)
  251. if len(c.members) == 0 {
  252. // consistent hash ring is empty now. Reset the partition table.
  253. c.partitions = make(map[int]*Member)
  254. return
  255. }
  256. c.distributePartitions()
  257. }
  258. // LoadDistribution exposes load distribution of members.
  259. func (c *Consistent) LoadDistribution() map[string]float64 {
  260. c.mu.RLock()
  261. defer c.mu.RUnlock()
  262. // Create a thread-safe copy
  263. res := make(map[string]float64)
  264. for member, load := range c.loads {
  265. res[member] = load
  266. }
  267. return res
  268. }
  269. // FindPartitionID returns partition id for given key.
  270. func (c *Consistent) FindPartitionID(key []byte) int {
  271. hkey := c.hasher.Sum64(key)
  272. return int(hkey % c.partitionCount)
  273. }
// GetPartitionOwner returns the owner of the given partition.
// It is the thread-safe wrapper around getPartitionOwner.
func (c *Consistent) GetPartitionOwner(partID int) Member {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.getPartitionOwner(partID)
}
  280. // getPartitionOwner returns the owner of the given partition. It's not thread-safe.
  281. func (c *Consistent) getPartitionOwner(partID int) Member {
  282. member, ok := c.partitions[partID]
  283. if !ok {
  284. return nil
  285. }
  286. // Create a thread-safe copy of member and return it.
  287. return *member
  288. }
  289. // LocateKey finds a home for given key
  290. func (c *Consistent) LocateKey(key []byte) Member {
  291. partID := c.FindPartitionID(key)
  292. return c.GetPartitionOwner(partID)
  293. }
// getClosestN returns the owner of partID followed by the members that come
// after it on a secondary ring built from the hashed member names, until
// count members are collected. Returns ErrInsufficientMemberCount when fewer
// than count members exist.
func (c *Consistent) getClosestN(partID, count int) ([]Member, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	var res []Member
	if count > len(c.members) {
		return res, ErrInsufficientMemberCount
	}
	var ownerKey uint64
	owner := c.getPartitionOwner(partID)
	// NOTE(review): owner is nil when the partition table is empty, and the
	// owner.String() call below would then panic — confirm callers never
	// reach here with an empty ring.
	// Hash and sort all the names.
	var keys []uint64
	kmems := make(map[uint64]*Member)
	for name, member := range c.members {
		key := c.hasher.Sum64([]byte(name))
		if name == owner.String() {
			// Remember the owner's position so the walk can start from it.
			ownerKey = key
		}
		keys = append(keys, key)
		kmems[key] = member
	}
	sort.Slice(keys, func(i, j int) bool {
		return keys[i] < keys[j]
	})
	// Find the key owner
	idx := 0
	for idx < len(keys) {
		if keys[idx] == ownerKey {
			key := keys[idx]
			res = append(res, *kmems[key])
			break
		}
		idx++
	}
	// Find the closest(replica owners) members.
	// Walk forward from the owner, wrapping around, until count members are
	// collected.
	for len(res) < count {
		idx++
		if idx >= len(keys) {
			idx = 0
		}
		key := keys[idx]
		res = append(res, *kmems[key])
	}
	return res, nil
}
  338. // GetClosestN returns the closest N member to a key in the hash ring.
  339. // This may be useful to find members for replication.
  340. func (c *Consistent) GetClosestN(key []byte, count int) ([]Member, error) {
  341. partID := c.FindPartitionID(key)
  342. return c.getClosestN(partID, count)
  343. }
// GetClosestNForPartition returns the closest N member for given partition.
// This may be useful to find members for replication.
// It returns ErrInsufficientMemberCount when fewer than count members exist.
func (c *Consistent) GetClosestNForPartition(partID, count int) ([]Member, error) {
	return c.getClosestN(partID, count)
}