replay.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373
  1. /*
  2. * Copyright (c) 2020, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package server
  20. import (
  21. "fmt"
  22. "sync"
  23. "time"
  24. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
  25. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/parameters"
  26. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/prng"
  27. lrucache "github.com/cognusion/go-cache-lru"
  28. )
  // Sizing and housekeeping settings handed to the LRU cache that backs
  // ReplayCache.
  const (
  	// REPLAY_CACHE_MAX_ENTRIES is the maximum number of entries given to the
  	// LRU cache constructor.
  	REPLAY_CACHE_MAX_ENTRIES = 100000

  	// REPLAY_CACHE_CLEANUP_INTERVAL is the cleanup interval given to the LRU
  	// cache constructor.
  	REPLAY_CACHE_CLEANUP_INTERVAL = time.Minute
  )
  33. // ReplayCache is a cache of recently used and successful network obfuscation
  34. // parameters that may be replayed -- reused -- for subsequent tunnel
  35. // connections.
  36. //
  37. // Server-side replay is analogous to client-side replay, with one key
  38. // difference: server-side replay can be applied across multiple clients in
  39. // the same GeoIP scope.
  40. //
  41. // Replay is enabled with tactics, and tactics determine the tunnel quality
  42. // targets for establishing and clearing replay parameters.
  43. //
  44. // ReplayCache has a maximum capacity with an LRU strategy to cap memory
  45. // overhead.
  46. type ReplayCache struct {
  47. support *SupportServices
  48. cacheMutex sync.Mutex
  49. cache *lrucache.Cache
  50. metrics *replayCacheMetrics
  51. }
  // replayCacheMetrics is the set of ReplayCache event counters that
  // GetMetrics reports and then resets.
  type replayCacheMetrics struct {
  	// MaxCacheEntries is the largest cache size observed by
  	// SetReplayParameters since the last reset.
  	MaxCacheEntries int64

  	// SetReplayCount counts SetReplayParameters calls that reached the cache.
  	SetReplayCount int64

  	// GetReplayHitCount and GetReplayMissCount count cache lookups that did
  	// and did not find an entry.
  	GetReplayHitCount  int64
  	GetReplayMissCount int64

  	// FailedReplayCount counts failures recorded against cached parameters.
  	FailedReplayCount int64

  	// DeleteReplayCount counts entries deleted after reaching the failure
  	// threshold.
  	DeleteReplayCount int64
  }
  60. type replayParameters struct {
  61. replayPacketManipulation bool
  62. packetManipulationSpecName string
  63. replayFragmentor bool
  64. fragmentorSeed *prng.Seed
  65. failedCount int
  66. }
  67. // NewReplayCache creates a new ReplayCache.
  68. func NewReplayCache(support *SupportServices) *ReplayCache {
  69. // Cache TTL may vary based on tactics filtering, so each cache.Add must set
  70. // the entry TTL.
  71. return &ReplayCache{
  72. support: support,
  73. cache: lrucache.NewWithLRU(
  74. lrucache.NoExpiration,
  75. REPLAY_CACHE_CLEANUP_INTERVAL,
  76. REPLAY_CACHE_MAX_ENTRIES),
  77. metrics: &replayCacheMetrics{},
  78. }
  79. }
  80. // Flush clears all entries in the ReplayCache. Flush should be called when
  81. // tactics hot reload and change to clear any cached replay parameters that
  82. // may be based on stale tactics.
  83. func (r *ReplayCache) Flush() {
  84. r.cacheMutex.Lock()
  85. defer r.cacheMutex.Unlock()
  86. r.cache.Flush()
  87. }
  88. // GetMetrics returns a snapshop of current ReplayCache event counters and
  89. // resets all counters to zero.
  90. func (r *ReplayCache) GetMetrics() LogFields {
  91. r.cacheMutex.Lock()
  92. defer r.cacheMutex.Unlock()
  93. logFields := LogFields{
  94. "replay_max_cache_entries": r.metrics.MaxCacheEntries,
  95. "replay_set_replay_count": r.metrics.SetReplayCount,
  96. "replay_get_replay_hit_count": r.metrics.GetReplayHitCount,
  97. "replay_get_replay_miss_count": r.metrics.GetReplayMissCount,
  98. "replay_failed_replay_count": r.metrics.FailedReplayCount,
  99. "replay_delete_replay_count": r.metrics.DeleteReplayCount,
  100. }
  101. r.metrics = &replayCacheMetrics{}
  102. return logFields
  103. }
  104. // GetReplayTargetDuration returns the tactics replay target tunnel duration
  105. // for the specified GeoIP data. Tunnels which are active for the specified
  106. // duration are candidates for setting or extending replay parameters. Wait
  107. // for the returned wait duration before evaluating the tunnel duration. Once
  108. // this target is met, call SetReplayParameters, which will check additional
  109. // targets and conditionally set replay parameters.
  110. func (r *ReplayCache) GetReplayTargetDuration(
  111. geoIPData GeoIPData) (bool, time.Duration, time.Duration) {
  112. p, err := r.support.ServerTacticsParametersCache.Get(geoIPData)
  113. if err != nil {
  114. log.WithTraceFields(LogFields{"error": errors.Trace(err)}).Warning(
  115. "ServerTacticsParametersCache.Get failed")
  116. return false, 0, 0
  117. }
  118. if p.IsNil() {
  119. // No tactics are configured; replay is disabled.
  120. return false, 0, 0
  121. }
  122. if !p.Bool(parameters.ServerReplayUnknownGeoIP) &&
  123. geoIPData.Country == GEOIP_UNKNOWN_VALUE &&
  124. geoIPData.ASN == GEOIP_UNKNOWN_VALUE {
  125. // Unless configured otherwise, skip replay for unknown GeoIP, since clients
  126. // may not have equivilent network conditions.
  127. return false, 0, 0
  128. }
  129. TTL := p.Duration(parameters.ServerReplayTTL)
  130. if TTL == 0 {
  131. // Server replay is disabled when TTL is 0.
  132. return false, 0, 0
  133. }
  134. return true,
  135. p.Duration(parameters.ServerReplayTargetWaitDuration),
  136. p.Duration(parameters.ServerReplayTargetTunnelDuration)
  137. }
  138. // SetReplayParameters sets replay parameters, packetManipulationSpecName and
  139. // fragmentorSeed, for the specified tunnel protocol and GeoIP scope.
  140. // Once set, replay parameters are active for a tactics-configurable TTL.
  141. //
  142. // The specified tunneledBytesUp/Down must meet tactics replay bytes
  143. // transferred targets. SetReplayParameters should be called only after first
  144. // calling ReplayTargetDuration and ensuring the tunnel meets the active
  145. // tunnel duration target. When cached replay parameters exist, their TTL is
  146. // extended and any failure counts are reset to zero.
  147. //
  148. // SetReplayParameters must be called only once per tunnel. Extending replay
  149. // parameters TTL should only be done only immediately after a successful
  150. // tunnel dial and target achievement, as this is the part of a tunnel
  151. // lifecycle at highest risk of blocking.
  152. //
  153. // The value pointed to by fragmentorSeed must not be mutated after calling
  154. // SetReplayParameters.
  155. func (r *ReplayCache) SetReplayParameters(
  156. tunnelProtocol string,
  157. geoIPData GeoIPData,
  158. packetManipulationSpecName string,
  159. fragmentorSeed *prng.Seed,
  160. tunneledBytesUp int64,
  161. tunneledBytesDown int64) {
  162. p, err := r.support.ServerTacticsParametersCache.Get(geoIPData)
  163. if err != nil {
  164. log.WithTraceFields(LogFields{"error": errors.Trace(err)}).Warning(
  165. "ServerTacticsParametersCache.Get failed")
  166. return
  167. }
  168. if p.IsNil() {
  169. // No tactics are configured; replay is disabled.
  170. return
  171. }
  172. TTL := p.Duration(parameters.ServerReplayTTL)
  173. if TTL == 0 {
  174. return
  175. }
  176. targetUpstreamBytes := p.Int(parameters.ServerReplayTargetUpstreamBytes)
  177. targetDownstreamBytes := p.Int(parameters.ServerReplayTargetDownstreamBytes)
  178. if tunneledBytesUp < int64(targetUpstreamBytes) {
  179. return
  180. }
  181. if tunneledBytesDown < int64(targetDownstreamBytes) {
  182. return
  183. }
  184. key := r.makeKey(tunnelProtocol, geoIPData)
  185. value := &replayParameters{}
  186. if p.Bool(parameters.ServerReplayPacketManipulation) {
  187. value.replayPacketManipulation = true
  188. value.packetManipulationSpecName = packetManipulationSpecName
  189. }
  190. if p.Bool(parameters.ServerReplayFragmentor) {
  191. value.replayFragmentor = (fragmentorSeed != nil)
  192. value.fragmentorSeed = fragmentorSeed
  193. }
  194. r.cacheMutex.Lock()
  195. defer r.cacheMutex.Unlock()
  196. r.cache.Add(key, value, TTL)
  197. // go-cache-lru is typically safe for concurrent access but explicit
  198. // synchronization is required when accessing Items. Items may include
  199. // entries that are expired but not yet purged.
  200. cacheSize := int64(len(r.cache.Items()))
  201. if cacheSize > r.metrics.MaxCacheEntries {
  202. r.metrics.MaxCacheEntries = cacheSize
  203. }
  204. r.metrics.SetReplayCount += 1
  205. }
  206. // GetReplayPacketManipulation returns an active replay packet manipulation
  207. // spec for the specified tunnel protocol and GeoIP scope.
  208. //
  209. // While Flush should be called to clear parameters based on stale tactics,
  210. // it's still possible for GetReplayPacketManipulation to return a spec name
  211. // that's no longer in the current list of known specs.
  212. func (r *ReplayCache) GetReplayPacketManipulation(
  213. tunnelProtocol string,
  214. geoIPData GeoIPData) (string, bool) {
  215. r.cacheMutex.Lock()
  216. defer r.cacheMutex.Unlock()
  217. parameters, ok := r.getReplayParameters(
  218. tunnelProtocol, geoIPData)
  219. if !ok {
  220. return "", false
  221. }
  222. if !parameters.replayPacketManipulation {
  223. return "", false
  224. }
  225. return parameters.packetManipulationSpecName, true
  226. }
  227. // GetReplayFragmentor returns an active replay fragmentor seed for the
  228. // specified tunnel protocol and GeoIP scope.
  229. func (r *ReplayCache) GetReplayFragmentor(
  230. tunnelProtocol string,
  231. geoIPData GeoIPData) (*prng.Seed, bool) {
  232. r.cacheMutex.Lock()
  233. defer r.cacheMutex.Unlock()
  234. parameters, ok := r.getReplayParameters(
  235. tunnelProtocol, geoIPData)
  236. if !ok {
  237. return nil, false
  238. }
  239. if !parameters.replayFragmentor {
  240. return nil, false
  241. }
  242. return parameters.fragmentorSeed, true
  243. }
  244. func (r *ReplayCache) getReplayParameters(
  245. tunnelProtocol string,
  246. geoIPData GeoIPData) (*replayParameters, bool) {
  247. key := r.makeKey(tunnelProtocol, geoIPData)
  248. value, ok := r.cache.Get(key)
  249. if !ok {
  250. r.metrics.GetReplayMissCount += 1
  251. return nil, false
  252. }
  253. r.metrics.GetReplayHitCount += 1
  254. parameters, ok := value.(*replayParameters)
  255. return parameters, ok
  256. }
  257. // FailedReplayParameters increments the count of tunnels which failed to
  258. // complete any liveness test and API handshake after using replay parameters.
  259. // Once a failure threshold is reached, cached replay parameters are cleared.
  260. // Call this function for tunnels which meet the failure criteria.
  261. func (r *ReplayCache) FailedReplayParameters(
  262. tunnelProtocol string,
  263. geoIPData GeoIPData,
  264. packetManipulationSpecName string,
  265. fragmentorSeed *prng.Seed) {
  266. p, err := r.support.ServerTacticsParametersCache.Get(geoIPData)
  267. if err != nil {
  268. log.WithTraceFields(LogFields{"error": errors.Trace(err)}).Warning(
  269. "ServerTacticsParametersCache.Get failed")
  270. return
  271. }
  272. thresholdFailedCount := p.Int(parameters.ServerReplayFailedCountThreshold)
  273. key := r.makeKey(tunnelProtocol, geoIPData)
  274. r.cacheMutex.Lock()
  275. defer r.cacheMutex.Unlock()
  276. parameters, ok := r.getReplayParameters(tunnelProtocol, geoIPData)
  277. if !ok {
  278. return
  279. }
  280. // Do not count the failure if the replay values for the tunnel protocol and
  281. // GeoIP scope are now different; these parameters now reflect a newer,
  282. // successful tunnel.
  283. if (parameters.replayPacketManipulation &&
  284. parameters.packetManipulationSpecName != packetManipulationSpecName) ||
  285. (parameters.replayFragmentor &&
  286. (fragmentorSeed == nil ||
  287. *parameters.fragmentorSeed != *fragmentorSeed)) {
  288. return
  289. }
  290. parameters.failedCount += 1
  291. r.metrics.FailedReplayCount += 1
  292. if thresholdFailedCount == 0 {
  293. // No failure limit; the entry will not be deleted.
  294. return
  295. }
  296. if parameters.failedCount >= thresholdFailedCount {
  297. r.cache.Delete(key)
  298. r.metrics.DeleteReplayCount += 1
  299. }
  300. }
  301. func (r *ReplayCache) makeKey(
  302. tunnelProtocol string, geoIPData GeoIPData) string {
  303. return fmt.Sprintf(
  304. "%s-%s-%s",
  305. tunnelProtocol, geoIPData.Country, geoIPData.ASN)
  306. }