consistent.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397
  1. // Copyright (c) 2018-2022 Burak Sezer
  2. // All rights reserved.
  3. //
  4. // This code is licensed under the MIT License.
  5. //
  6. // Permission is hereby granted, free of charge, to any person obtaining a copy
  7. // of this software and associated documentation files(the "Software"), to deal
  8. // in the Software without restriction, including without limitation the rights
  9. // to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
  10. // copies of the Software, and to permit persons to whom the Software is
  11. // furnished to do so, subject to the following conditions :
  12. //
  13. // The above copyright notice and this permission notice shall be included in
  14. // all copies or substantial portions of the Software.
  15. //
  16. // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  17. // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  18. // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
  19. // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  20. // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  21. // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  22. // THE SOFTWARE.
  23. // Package consistent provides a consistent hashing function with bounded loads. This implementation also adds
  24. // partitioning logic on top of the original algorithm. For more information about the underlying algorithm,
  25. // please take a look at https://research.googleblog.com/2017/04/consistent-hashing-with-bounded-loads.html
  26. //
// Example use:
//
//	cfg := consistent.Config{
//		PartitionCount:    71,
//		ReplicationFactor: 20,
//		Load:              1.25,
//		Hasher:            hasher{},
//	}
  35. //
// Now you can create a new Consistent instance. New optionally takes an initial list of members.
//
//	c := consistent.New(members, cfg)
  39. //
// In the following sample, you add a new member to the consistent hash ring. myMember is any Go value that
// implements the Member interface. Every modification of the ring redistributes partitions among the members
// using the bounded-loads algorithm described on the Google Research Blog.
//
//	c.Add(myMember)
  45. //
// Remove a member from the consistent hash ring by name (the value returned by its String method):
//
//	c.Remove(myMember.String())
  49. //
// LocateKey hashes the key and computes the partition ID as MOD(hash result, partition count).
// Partition owners are already calculated by New/Add/Remove, so LocateKey just returns the member
// that is responsible for the key.
//
//	key := []byte("my-key")
//	member := c.LocateKey(key)
  56. package consistent
  57. import (
  58. "encoding/binary"
  59. "errors"
  60. "fmt"
  61. "math"
  62. "sort"
  63. "sync"
  64. )
  65. const (
  66. DefaultPartitionCount int = 271
  67. DefaultReplicationFactor int = 20
  68. DefaultLoad float64 = 1.25
  69. )
  70. // ErrInsufficientMemberCount represents an error which means there are not enough members to complete the task.
  71. var ErrInsufficientMemberCount = errors.New("insufficient member count")
  72. // Hasher is responsible for generating unsigned, 64-bit hash of provided byte slice.
  73. // Hasher should minimize collisions (generating same hash for different byte slice)
  74. // and while performance is also important fast functions are preferable (i.e.
  75. // you can use FarmHash family).
  76. type Hasher interface {
  77. Sum64([]byte) uint64
  78. }
  79. // Member interface represents a member in consistent hash ring.
  80. type Member interface {
  81. String() string
  82. }
  83. // Config represents a structure to control consistent package.
  84. type Config struct {
  85. // Hasher is responsible for generating unsigned, 64-bit hash of provided byte slice.
  86. Hasher Hasher
  87. // Keys are distributed among partitions. Prime numbers are good to
  88. // distribute keys uniformly. Select a big PartitionCount if you have
  89. // too many keys.
  90. PartitionCount int
  91. // Members are replicated on consistent hash ring. This number means that a member
  92. // how many times replicated on the ring.
  93. ReplicationFactor int
  94. // Load is used to calculate average load. See the code, the paper and Google's blog post to learn about it.
  95. Load float64
  96. }
  97. // Consistent holds the information about the members of the consistent hash circle.
  98. type Consistent struct {
  99. mu sync.RWMutex
  100. config Config
  101. hasher Hasher
  102. sortedSet []uint64
  103. partitionCount uint64
  104. loads map[string]float64
  105. members map[string]*Member
  106. partitions map[int]*Member
  107. ring map[uint64]*Member
  108. }
  109. // New creates and returns a new Consistent object.
  110. func New(members []Member, config Config) *Consistent {
  111. if config.Hasher == nil {
  112. panic("Hasher cannot be nil")
  113. }
  114. if config.PartitionCount == 0 {
  115. config.PartitionCount = DefaultPartitionCount
  116. }
  117. if config.ReplicationFactor == 0 {
  118. config.ReplicationFactor = DefaultReplicationFactor
  119. }
  120. if config.Load == 0 {
  121. config.Load = DefaultLoad
  122. }
  123. c := &Consistent{
  124. config: config,
  125. members: make(map[string]*Member),
  126. partitionCount: uint64(config.PartitionCount),
  127. ring: make(map[uint64]*Member),
  128. }
  129. c.hasher = config.Hasher
  130. for _, member := range members {
  131. c.add(member)
  132. }
  133. if members != nil {
  134. c.distributePartitions()
  135. }
  136. return c
  137. }
  138. // GetMembers returns a thread-safe copy of members. If there are no members, it returns an empty slice of Member.
  139. func (c *Consistent) GetMembers() []Member {
  140. c.mu.RLock()
  141. defer c.mu.RUnlock()
  142. // Create a thread-safe copy of member list.
  143. members := make([]Member, 0, len(c.members))
  144. for _, member := range c.members {
  145. members = append(members, *member)
  146. }
  147. return members
  148. }
  149. // AverageLoad exposes the current average load.
  150. func (c *Consistent) AverageLoad() float64 {
  151. c.mu.RLock()
  152. defer c.mu.RUnlock()
  153. return c.averageLoad()
  154. }
  155. func (c *Consistent) averageLoad() float64 {
  156. if len(c.members) == 0 {
  157. return 0
  158. }
  159. avgLoad := float64(c.partitionCount/uint64(len(c.members))) * c.config.Load
  160. return math.Ceil(avgLoad)
  161. }
  162. func (c *Consistent) distributeWithLoad(partID, idx int, partitions map[int]*Member, loads map[string]float64) {
  163. avgLoad := c.averageLoad()
  164. var count int
  165. for {
  166. count++
  167. // [Psiphon]
  168. // Fix: changed ">=" to ">"; otherwise tests showed that 1 member may
  169. // be excluded when there is more than one member and that using a
  170. // single member results in a crash.
  171. if count > len(c.sortedSet) {
  172. // User needs to decrease partition count, increase member count or increase load factor.
  173. panic("not enough room to distribute partitions")
  174. }
  175. i := c.sortedSet[idx]
  176. member := *c.ring[i]
  177. load := loads[member.String()]
  178. if load+1 <= avgLoad {
  179. partitions[partID] = &member
  180. loads[member.String()]++
  181. return
  182. }
  183. idx++
  184. if idx >= len(c.sortedSet) {
  185. idx = 0
  186. }
  187. }
  188. }
  189. func (c *Consistent) distributePartitions() {
  190. loads := make(map[string]float64)
  191. partitions := make(map[int]*Member)
  192. bs := make([]byte, 8)
  193. for partID := uint64(0); partID < c.partitionCount; partID++ {
  194. binary.LittleEndian.PutUint64(bs, partID)
  195. key := c.hasher.Sum64(bs)
  196. idx := sort.Search(len(c.sortedSet), func(i int) bool {
  197. return c.sortedSet[i] >= key
  198. })
  199. if idx >= len(c.sortedSet) {
  200. idx = 0
  201. }
  202. c.distributeWithLoad(int(partID), idx, partitions, loads)
  203. }
  204. c.partitions = partitions
  205. c.loads = loads
  206. }
  207. func (c *Consistent) add(member Member) {
  208. for i := 0; i < c.config.ReplicationFactor; i++ {
  209. key := []byte(fmt.Sprintf("%s%d", member.String(), i))
  210. h := c.hasher.Sum64(key)
  211. c.ring[h] = &member
  212. c.sortedSet = append(c.sortedSet, h)
  213. }
  214. // sort hashes ascendingly
  215. sort.Slice(c.sortedSet, func(i int, j int) bool {
  216. return c.sortedSet[i] < c.sortedSet[j]
  217. })
  218. // Storing member at this map is useful to find backup members of a partition.
  219. c.members[member.String()] = &member
  220. }
  221. // Add adds a new member to the consistent hash circle.
  222. func (c *Consistent) Add(member Member) {
  223. c.mu.Lock()
  224. defer c.mu.Unlock()
  225. if _, ok := c.members[member.String()]; ok {
  226. // We already have this member. Quit immediately.
  227. return
  228. }
  229. c.add(member)
  230. c.distributePartitions()
  231. }
  232. func (c *Consistent) delSlice(val uint64) {
  233. for i := 0; i < len(c.sortedSet); i++ {
  234. if c.sortedSet[i] == val {
  235. c.sortedSet = append(c.sortedSet[:i], c.sortedSet[i+1:]...)
  236. break
  237. }
  238. }
  239. }
  240. // Remove removes a member from the consistent hash circle.
  241. func (c *Consistent) Remove(name string) {
  242. c.mu.Lock()
  243. defer c.mu.Unlock()
  244. if _, ok := c.members[name]; !ok {
  245. // There is no member with that name. Quit immediately.
  246. return
  247. }
  248. for i := 0; i < c.config.ReplicationFactor; i++ {
  249. key := []byte(fmt.Sprintf("%s%d", name, i))
  250. h := c.hasher.Sum64(key)
  251. delete(c.ring, h)
  252. c.delSlice(h)
  253. }
  254. delete(c.members, name)
  255. if len(c.members) == 0 {
  256. // consistent hash ring is empty now. Reset the partition table.
  257. c.partitions = make(map[int]*Member)
  258. return
  259. }
  260. c.distributePartitions()
  261. }
  262. // LoadDistribution exposes load distribution of members.
  263. func (c *Consistent) LoadDistribution() map[string]float64 {
  264. c.mu.RLock()
  265. defer c.mu.RUnlock()
  266. // Create a thread-safe copy
  267. res := make(map[string]float64)
  268. for member, load := range c.loads {
  269. res[member] = load
  270. }
  271. return res
  272. }
  273. // FindPartitionID returns partition id for given key.
  274. func (c *Consistent) FindPartitionID(key []byte) int {
  275. hkey := c.hasher.Sum64(key)
  276. return int(hkey % c.partitionCount)
  277. }
  278. // GetPartitionOwner returns the owner of the given partition.
  279. func (c *Consistent) GetPartitionOwner(partID int) Member {
  280. c.mu.RLock()
  281. defer c.mu.RUnlock()
  282. return c.getPartitionOwner(partID)
  283. }
  284. // getPartitionOwner returns the owner of the given partition. It's not thread-safe.
  285. func (c *Consistent) getPartitionOwner(partID int) Member {
  286. member, ok := c.partitions[partID]
  287. if !ok {
  288. return nil
  289. }
  290. // Create a thread-safe copy of member and return it.
  291. return *member
  292. }
  293. // LocateKey finds a home for given key
  294. func (c *Consistent) LocateKey(key []byte) Member {
  295. partID := c.FindPartitionID(key)
  296. return c.GetPartitionOwner(partID)
  297. }
  298. func (c *Consistent) getClosestN(partID, count int) ([]Member, error) {
  299. c.mu.RLock()
  300. defer c.mu.RUnlock()
  301. var res []Member
  302. if count > len(c.members) {
  303. return res, ErrInsufficientMemberCount
  304. }
  305. var ownerKey uint64
  306. owner := c.getPartitionOwner(partID)
  307. // Hash and sort all the names.
  308. var keys []uint64
  309. kmems := make(map[uint64]*Member)
  310. for name, member := range c.members {
  311. key := c.hasher.Sum64([]byte(name))
  312. if name == owner.String() {
  313. ownerKey = key
  314. }
  315. keys = append(keys, key)
  316. kmems[key] = member
  317. }
  318. sort.Slice(keys, func(i, j int) bool {
  319. return keys[i] < keys[j]
  320. })
  321. // Find the key owner
  322. idx := 0
  323. for idx < len(keys) {
  324. if keys[idx] == ownerKey {
  325. key := keys[idx]
  326. res = append(res, *kmems[key])
  327. break
  328. }
  329. idx++
  330. }
  331. // Find the closest(replica owners) members.
  332. for len(res) < count {
  333. idx++
  334. if idx >= len(keys) {
  335. idx = 0
  336. }
  337. key := keys[idx]
  338. res = append(res, *kmems[key])
  339. }
  340. return res, nil
  341. }
  342. // GetClosestN returns the closest N member to a key in the hash ring.
  343. // This may be useful to find members for replication.
  344. func (c *Consistent) GetClosestN(key []byte, count int) ([]Member, error) {
  345. partID := c.FindPartitionID(key)
  346. return c.getClosestN(partID, count)
  347. }
  348. // GetClosestNForPartition returns the closest N member for given partition.
  349. // This may be useful to find members for replication.
  350. func (c *Consistent) GetClosestNForPartition(partID, count int) ([]Member, error) {
  351. return c.getClosestN(partID, count)
  352. }