consistent.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. /*
  2. * Copyright (c) 2024, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package discovery
  20. import (
  21. "net"
  22. "sync"
  23. "github.com/Psiphon-Labs/consistent"
  24. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/server/psinet"
  25. "github.com/cespare/xxhash"
  26. )
  27. type hasher struct{}
  28. // consistent.Hasher implementation.
  29. func (h hasher) Sum64(data []byte) uint64 {
  30. return xxhash.Sum64(data)
  31. }
  32. type consistentHashingDiscovery struct {
  33. clk clock
  34. config *consistent.Config
  35. ring *consistent.Consistent
  36. sync.RWMutex
  37. }
  38. func NewConsistentHashingDiscovery() (*consistentHashingDiscovery, error) {
  39. return newConsistentHashingDiscovery(realClock{})
  40. }
  41. func newConsistentHashingDiscovery(clk clock) (*consistentHashingDiscovery, error) {
  42. return &consistentHashingDiscovery{
  43. clk: clk,
  44. config: &consistent.Config{
  45. PartitionCount: 0, // set in serversChanged
  46. ReplicationFactor: 1, // ensure all servers are discoverable
  47. Load: 1, // ensure all servers are discoverable
  48. Hasher: hasher{},
  49. },
  50. }, nil
  51. }
  52. func (c *consistentHashingDiscovery) serversChanged(newServers []*psinet.DiscoveryServer) {
  53. if len(newServers) == 0 {
  54. c.RWMutex.Lock()
  55. c.ring = nil
  56. c.RWMutex.Unlock()
  57. } else {
  58. members := make([]consistent.Member, len(newServers))
  59. for i, server := range newServers {
  60. members[i] = server
  61. }
  62. // Note: requires full reinitialization because we cannot change
  63. // PartitionCount on the fly. Add/Remove do not update PartitionCount
  64. // and updating ParitionCount is required to ensure that there is not
  65. // a panic in the Psiphon-Labs/consistent package and that all servers
  66. // are discoverable.
  67. c.config.PartitionCount = len(newServers)
  68. c.RWMutex.Lock()
  69. c.ring = consistent.New(members, *c.config)
  70. c.RWMutex.Unlock()
  71. }
  72. }
  73. func (c *consistentHashingDiscovery) selectServers(clientIP net.IP) []*psinet.DiscoveryServer {
  74. c.RWMutex.RLock()
  75. defer c.RWMutex.RUnlock()
  76. if c.ring == nil {
  77. // No discoverable servers.
  78. return nil
  79. }
  80. member := c.ring.LocateKey(clientIP)
  81. if member == nil {
  82. // Should never happen.
  83. return nil
  84. }
  85. server := member.(*psinet.DiscoveryServer)
  86. discoveryDate := c.clk.Now()
  87. // Double check that server is discoverable at this time.
  88. if discoveryDate.Before(server.DiscoveryDateRange[0]) ||
  89. !discoveryDate.Before(server.DiscoveryDateRange[1]) {
  90. return nil
  91. }
  92. return []*psinet.DiscoveryServer{server}
  93. }