dataStore.go 49 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742
  1. /*
  2. * Copyright (c) 2015, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package psiphon
  20. import (
  21. "bytes"
  22. "encoding/json"
  23. "errors"
  24. "fmt"
  25. "sync"
  26. "time"
  27. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  28. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/parameters"
  29. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/prng"
  30. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
  31. )
  32. var (
  33. datastoreServerEntriesBucket = []byte("serverEntries")
  34. datastoreServerEntryTagsBucket = []byte("serverEntryTags")
  35. datastoreServerEntryTombstoneTagsBucket = []byte("serverEntryTombstoneTags")
  36. datastoreSplitTunnelRouteETagsBucket = []byte("splitTunnelRouteETags")
  37. datastoreSplitTunnelRouteDataBucket = []byte("splitTunnelRouteData")
  38. datastoreUrlETagsBucket = []byte("urlETags")
  39. datastoreKeyValueBucket = []byte("keyValues")
  40. datastoreRemoteServerListStatsBucket = []byte("remoteServerListStats")
  41. datastoreFailedTunnelStatsBucket = []byte("failedTunnelStats")
  42. datastoreSLOKsBucket = []byte("SLOKs")
  43. datastoreTacticsBucket = []byte("tactics")
  44. datastoreSpeedTestSamplesBucket = []byte("speedTestSamples")
  45. datastoreDialParametersBucket = []byte("dialParameters")
  46. datastoreLastConnectedKey = "lastConnected"
  47. datastoreLastServerEntryFilterKey = []byte("lastServerEntryFilter")
  48. datastoreAffinityServerEntryIDKey = []byte("affinityServerEntryID")
  49. datastorePersistentStatTypeRemoteServerList = string(datastoreRemoteServerListStatsBucket)
  50. datastorePersistentStatTypeFailedTunnel = string(datastoreFailedTunnelStatsBucket)
  51. datastoreServerEntryFetchGCThreshold = 20
  52. datastoreMutex sync.RWMutex
  53. activeDatastoreDB *datastoreDB
  54. )
  55. // OpenDataStore opens and initializes the singleton data store instance.
  56. func OpenDataStore(config *Config) error {
  57. datastoreMutex.Lock()
  58. existingDB := activeDatastoreDB
  59. if existingDB != nil {
  60. datastoreMutex.Unlock()
  61. return common.ContextError(errors.New("db already open"))
  62. }
  63. newDB, err := datastoreOpenDB(config.DataStoreDirectory)
  64. if err != nil {
  65. datastoreMutex.Unlock()
  66. return common.ContextError(err)
  67. }
  68. activeDatastoreDB = newDB
  69. datastoreMutex.Unlock()
  70. _ = resetAllPersistentStatsToUnreported()
  71. return nil
  72. }
  73. // CloseDataStore closes the singleton data store instance, if open.
  74. func CloseDataStore() {
  75. datastoreMutex.Lock()
  76. defer datastoreMutex.Unlock()
  77. if activeDatastoreDB == nil {
  78. return
  79. }
  80. err := activeDatastoreDB.close()
  81. if err != nil {
  82. NoticeAlert("failed to close database: %s", common.ContextError(err))
  83. }
  84. activeDatastoreDB = nil
  85. }
  86. func datastoreView(fn func(tx *datastoreTx) error) error {
  87. datastoreMutex.RLock()
  88. defer datastoreMutex.RUnlock()
  89. if activeDatastoreDB == nil {
  90. return common.ContextError(errors.New("database not open"))
  91. }
  92. err := activeDatastoreDB.view(fn)
  93. if err != nil {
  94. err = common.ContextError(err)
  95. }
  96. return err
  97. }
  98. func datastoreUpdate(fn func(tx *datastoreTx) error) error {
  99. datastoreMutex.RLock()
  100. defer datastoreMutex.RUnlock()
  101. if activeDatastoreDB == nil {
  102. return common.ContextError(errors.New("database not open"))
  103. }
  104. err := activeDatastoreDB.update(fn)
  105. if err != nil {
  106. err = common.ContextError(err)
  107. }
  108. return err
  109. }
  110. // StoreServerEntry adds the server entry to the data store.
  111. //
  112. // When a server entry already exists for a given server, it will be
  113. // replaced only if replaceIfExists is set or if the the ConfigurationVersion
  114. // field of the new entry is strictly higher than the existing entry.
  115. //
  116. // If the server entry data is malformed, an alert notice is issued and
  117. // the entry is skipped; no error is returned.
  118. func StoreServerEntry(serverEntryFields protocol.ServerEntryFields, replaceIfExists bool) error {
  119. // TODO: call serverEntryFields.VerifySignature. At this time, we do not do
  120. // this as not all server entries have an individual signature field. All
  121. // StoreServerEntry callers either call VerifySignature or obtain server
  122. // entries from a trusted source (embedded in a signed client, or in a signed
  123. // authenticated package).
  124. // Server entries should already be validated before this point,
  125. // so instead of skipping we fail with an error.
  126. err := protocol.ValidateServerEntryFields(serverEntryFields)
  127. if err != nil {
  128. return common.ContextError(
  129. fmt.Errorf("invalid server entry: %s", err))
  130. }
  131. // BoltDB implementation note:
  132. // For simplicity, we don't maintain indexes on server entry
  133. // region or supported protocols. Instead, we perform full-bucket
  134. // scans with a filter. With a small enough database (thousands or
  135. // even tens of thousand of server entries) and common enough
  136. // values (e.g., many servers support all protocols), performance
  137. // is expected to be acceptable.
  138. err = datastoreUpdate(func(tx *datastoreTx) error {
  139. serverEntries := tx.bucket(datastoreServerEntriesBucket)
  140. serverEntryTags := tx.bucket(datastoreServerEntryTagsBucket)
  141. serverEntryTombstoneTags := tx.bucket(datastoreServerEntryTombstoneTagsBucket)
  142. serverEntryID := []byte(serverEntryFields.GetIPAddress())
  143. // Check not only that the entry exists, but is valid. This
  144. // will replace in the rare case where the data is corrupt.
  145. existingConfigurationVersion := -1
  146. existingData := serverEntries.get(serverEntryID)
  147. if existingData != nil {
  148. var existingServerEntry *protocol.ServerEntry
  149. err := json.Unmarshal(existingData, &existingServerEntry)
  150. if err == nil {
  151. existingConfigurationVersion = existingServerEntry.ConfigurationVersion
  152. }
  153. }
  154. exists := existingConfigurationVersion > -1
  155. newer := exists && existingConfigurationVersion < serverEntryFields.GetConfigurationVersion()
  156. update := !exists || replaceIfExists || newer
  157. if !update {
  158. // Disabling this notice, for now, as it generates too much noise
  159. // in diagnostics with clients that always submit embedded servers
  160. // to the core on each run.
  161. // NoticeInfo("ignored update for server %s", serverEntry.IpAddress)
  162. return nil
  163. }
  164. serverEntryTag := serverEntryFields.GetTag()
  165. // Generate a derived tag when the server entry has no tag.
  166. if serverEntryTag == "" {
  167. serverEntryTag = protocol.GenerateServerEntryTag(
  168. serverEntryFields.GetIPAddress(),
  169. serverEntryFields.GetWebServerSecret())
  170. serverEntryFields.SetTag(serverEntryTag)
  171. }
  172. serverEntryTagBytes := []byte(serverEntryTag)
  173. // Ignore the server entry if it was previously pruned and a tombstone is
  174. // set.
  175. //
  176. // This logic is enforced only for embedded server entries, as all other
  177. // sources are considered to be definitive and non-stale. These exceptions
  178. // intentionally allow the scenario where a server is temporarily deleted
  179. // and then restored; in this case, it's desired for pruned server entries
  180. // to be restored.
  181. if serverEntryFields.GetLocalSource() == protocol.SERVER_ENTRY_SOURCE_EMBEDDED {
  182. if serverEntryTombstoneTags.get(serverEntryTagBytes) != nil {
  183. return nil
  184. }
  185. }
  186. data, err := json.Marshal(serverEntryFields)
  187. if err != nil {
  188. return common.ContextError(err)
  189. }
  190. err = serverEntries.put(serverEntryID, data)
  191. if err != nil {
  192. return common.ContextError(err)
  193. }
  194. err = serverEntryTags.put(serverEntryTagBytes, serverEntryID)
  195. if err != nil {
  196. return common.ContextError(err)
  197. }
  198. NoticeInfo("updated server %s", serverEntryFields.GetIPAddress())
  199. return nil
  200. })
  201. if err != nil {
  202. return common.ContextError(err)
  203. }
  204. return nil
  205. }
  206. // StoreServerEntries stores a list of server entries.
  207. // There is an independent transaction for each entry insert/update.
  208. func StoreServerEntries(
  209. config *Config,
  210. serverEntries []protocol.ServerEntryFields,
  211. replaceIfExists bool) error {
  212. for _, serverEntryFields := range serverEntries {
  213. err := StoreServerEntry(serverEntryFields, replaceIfExists)
  214. if err != nil {
  215. return common.ContextError(err)
  216. }
  217. }
  218. return nil
  219. }
  220. // StreamingStoreServerEntries stores a list of server entries.
  221. // There is an independent transaction for each entry insert/update.
  222. func StreamingStoreServerEntries(
  223. config *Config,
  224. serverEntries *protocol.StreamingServerEntryDecoder,
  225. replaceIfExists bool) error {
  226. // Note: both StreamingServerEntryDecoder.Next and StoreServerEntry
  227. // allocate temporary memory buffers for hex/JSON decoding/encoding,
  228. // so this isn't true constant-memory streaming (it depends on garbage
  229. // collection).
  230. n := 0
  231. for {
  232. serverEntry, err := serverEntries.Next()
  233. if err != nil {
  234. return common.ContextError(err)
  235. }
  236. if serverEntry == nil {
  237. // No more server entries
  238. break
  239. }
  240. err = StoreServerEntry(serverEntry, replaceIfExists)
  241. if err != nil {
  242. return common.ContextError(err)
  243. }
  244. n += 1
  245. if n == datastoreServerEntryFetchGCThreshold {
  246. DoGarbageCollection()
  247. n = 0
  248. }
  249. }
  250. return nil
  251. }
  252. // PromoteServerEntry sets the server affinity server entry ID to the
  253. // specified server entry IP address.
  254. func PromoteServerEntry(config *Config, ipAddress string) error {
  255. err := datastoreUpdate(func(tx *datastoreTx) error {
  256. serverEntryID := []byte(ipAddress)
  257. // Ensure the corresponding server entry exists before
  258. // setting server affinity.
  259. bucket := tx.bucket(datastoreServerEntriesBucket)
  260. data := bucket.get(serverEntryID)
  261. if data == nil {
  262. NoticeAlert(
  263. "PromoteServerEntry: ignoring unknown server entry: %s",
  264. ipAddress)
  265. return nil
  266. }
  267. bucket = tx.bucket(datastoreKeyValueBucket)
  268. err := bucket.put(datastoreAffinityServerEntryIDKey, serverEntryID)
  269. if err != nil {
  270. return common.ContextError(err)
  271. }
  272. // Store the current server entry filter (e.g, region, etc.) that
  273. // was in use when the entry was promoted. This is used to detect
  274. // when the top ranked server entry was promoted under a different
  275. // filter.
  276. currentFilter, err := makeServerEntryFilterValue(config)
  277. if err != nil {
  278. return common.ContextError(err)
  279. }
  280. return bucket.put(datastoreLastServerEntryFilterKey, currentFilter)
  281. })
  282. if err != nil {
  283. return common.ContextError(err)
  284. }
  285. return nil
  286. }
  287. func makeServerEntryFilterValue(config *Config) ([]byte, error) {
  288. // Currently, only a change of EgressRegion will "break" server affinity.
  289. // If the tunnel protocol filter changes, any existing affinity server
  290. // either passes the new filter, or it will be skipped anyway.
  291. return []byte(config.EgressRegion), nil
  292. }
  293. func hasServerEntryFilterChanged(config *Config) (bool, error) {
  294. currentFilter, err := makeServerEntryFilterValue(config)
  295. if err != nil {
  296. return false, common.ContextError(err)
  297. }
  298. changed := false
  299. err = datastoreView(func(tx *datastoreTx) error {
  300. bucket := tx.bucket(datastoreKeyValueBucket)
  301. previousFilter := bucket.get(datastoreLastServerEntryFilterKey)
  302. // When not found, previousFilter will be nil; ensures this
  303. // results in "changed", even if currentFilter is len(0).
  304. if previousFilter == nil ||
  305. bytes.Compare(previousFilter, currentFilter) != 0 {
  306. changed = true
  307. }
  308. return nil
  309. })
  310. if err != nil {
  311. return false, common.ContextError(err)
  312. }
  313. return changed, nil
  314. }
  315. // ServerEntryIterator is used to iterate over
  316. // stored server entries in rank order.
  317. type ServerEntryIterator struct {
  318. config *Config
  319. applyServerAffinity bool
  320. serverEntryIDs [][]byte
  321. serverEntryIndex int
  322. isTacticsServerEntryIterator bool
  323. isTargetServerEntryIterator bool
  324. hasNextTargetServerEntry bool
  325. targetServerEntry *protocol.ServerEntry
  326. }
  327. // NewServerEntryIterator creates a new ServerEntryIterator.
  328. //
  329. // The boolean return value indicates whether to treat the first server(s)
  330. // as affinity servers or not. When the server entry selection filter changes
  331. // such as from a specific region to any region, or when there was no previous
  332. // filter/iterator, the the first server(s) are arbitrary and should not be
  333. // given affinity treatment.
  334. //
  335. // NewServerEntryIterator and any returned ServerEntryIterator are not
  336. // designed for concurrent use as not all related datastore operations are
  337. // performed in a single transaction.
  338. //
  339. func NewServerEntryIterator(config *Config) (bool, *ServerEntryIterator, error) {
  340. // When configured, this target server entry is the only candidate
  341. if config.TargetServerEntry != "" {
  342. return newTargetServerEntryIterator(config, false)
  343. }
  344. filterChanged, err := hasServerEntryFilterChanged(config)
  345. if err != nil {
  346. return false, nil, common.ContextError(err)
  347. }
  348. applyServerAffinity := !filterChanged
  349. iterator := &ServerEntryIterator{
  350. config: config,
  351. applyServerAffinity: applyServerAffinity,
  352. }
  353. err = iterator.reset(true)
  354. if err != nil {
  355. return false, nil, common.ContextError(err)
  356. }
  357. return applyServerAffinity, iterator, nil
  358. }
  359. func NewTacticsServerEntryIterator(config *Config) (*ServerEntryIterator, error) {
  360. // When configured, this target server entry is the only candidate
  361. if config.TargetServerEntry != "" {
  362. _, iterator, err := newTargetServerEntryIterator(config, true)
  363. return iterator, err
  364. }
  365. iterator := &ServerEntryIterator{
  366. config: config,
  367. isTacticsServerEntryIterator: true,
  368. }
  369. err := iterator.reset(true)
  370. if err != nil {
  371. return nil, common.ContextError(err)
  372. }
  373. return iterator, nil
  374. }
  375. // newTargetServerEntryIterator is a helper for initializing the TargetServerEntry case
  376. func newTargetServerEntryIterator(config *Config, isTactics bool) (bool, *ServerEntryIterator, error) {
  377. serverEntry, err := protocol.DecodeServerEntry(
  378. config.TargetServerEntry, config.loadTimestamp, protocol.SERVER_ENTRY_SOURCE_TARGET)
  379. if err != nil {
  380. return false, nil, common.ContextError(err)
  381. }
  382. if serverEntry.Tag == "" {
  383. serverEntry.Tag = protocol.GenerateServerEntryTag(
  384. serverEntry.IpAddress, serverEntry.WebServerSecret)
  385. }
  386. if isTactics {
  387. if len(serverEntry.GetSupportedTacticsProtocols()) == 0 {
  388. return false, nil, common.ContextError(errors.New("TargetServerEntry does not support tactics protocols"))
  389. }
  390. } else {
  391. if config.EgressRegion != "" && serverEntry.Region != config.EgressRegion {
  392. return false, nil, common.ContextError(errors.New("TargetServerEntry does not support EgressRegion"))
  393. }
  394. limitTunnelProtocols := config.GetClientParametersSnapshot().TunnelProtocols(parameters.LimitTunnelProtocols)
  395. if len(limitTunnelProtocols) > 0 {
  396. // At the ServerEntryIterator level, only limitTunnelProtocols is applied;
  397. // excludeIntensive is handled higher up.
  398. if len(serverEntry.GetSupportedProtocols(
  399. config.UseUpstreamProxy(), limitTunnelProtocols, false)) == 0 {
  400. return false, nil, common.ContextError(errors.New("TargetServerEntry does not support LimitTunnelProtocols"))
  401. }
  402. }
  403. }
  404. iterator := &ServerEntryIterator{
  405. isTacticsServerEntryIterator: isTactics,
  406. isTargetServerEntryIterator: true,
  407. hasNextTargetServerEntry: true,
  408. targetServerEntry: serverEntry,
  409. }
  410. NoticeInfo("using TargetServerEntry: %s", serverEntry.IpAddress)
  411. return false, iterator, nil
  412. }
  413. // Reset a NewServerEntryIterator to the start of its cycle. The next
  414. // call to Next will return the first server entry.
  415. func (iterator *ServerEntryIterator) Reset() error {
  416. return iterator.reset(false)
  417. }
  418. func (iterator *ServerEntryIterator) reset(isInitialRound bool) error {
  419. iterator.Close()
  420. if iterator.isTargetServerEntryIterator {
  421. iterator.hasNextTargetServerEntry = true
  422. return nil
  423. }
  424. // BoltDB implementation note:
  425. // We don't keep a transaction open for the duration of the iterator
  426. // because this would expose the following semantics to consumer code:
  427. //
  428. // Read-only transactions and read-write transactions ... generally
  429. // shouldn't be opened simultaneously in the same goroutine. This can
  430. // cause a deadlock as the read-write transaction needs to periodically
  431. // re-map the data file but it cannot do so while a read-only
  432. // transaction is open.
  433. // (https://github.com/boltdb/bolt)
  434. //
  435. // So the underlying serverEntriesBucket could change after the serverEntryIDs
  436. // list is built.
  437. var serverEntryIDs [][]byte
  438. err := datastoreView(func(tx *datastoreTx) error {
  439. bucket := tx.bucket(datastoreKeyValueBucket)
  440. serverEntryIDs = make([][]byte, 0)
  441. shuffleHead := 0
  442. var affinityServerEntryID []byte
  443. // In the first round only, move any server affinity candiate to the
  444. // very first position.
  445. if isInitialRound &&
  446. iterator.applyServerAffinity {
  447. affinityServerEntryID = bucket.get(datastoreAffinityServerEntryIDKey)
  448. if affinityServerEntryID != nil {
  449. serverEntryIDs = append(serverEntryIDs, append([]byte(nil), affinityServerEntryID...))
  450. shuffleHead = 1
  451. }
  452. }
  453. bucket = tx.bucket(datastoreServerEntriesBucket)
  454. cursor := bucket.cursor()
  455. for key := cursor.firstKey(); key != nil; key = cursor.nextKey() {
  456. if affinityServerEntryID != nil {
  457. if bytes.Equal(affinityServerEntryID, key) {
  458. continue
  459. }
  460. }
  461. serverEntryIDs = append(serverEntryIDs, append([]byte(nil), key...))
  462. }
  463. cursor.close()
  464. // Randomly shuffle the entire list of server IDs, excluding the
  465. // server affinity candidate.
  466. for i := len(serverEntryIDs) - 1; i > shuffleHead-1; i-- {
  467. j := prng.Intn(i+1-shuffleHead) + shuffleHead
  468. serverEntryIDs[i], serverEntryIDs[j] = serverEntryIDs[j], serverEntryIDs[i]
  469. }
  470. // In the first round, or with some probability, move _potential_ replay
  471. // candidates to the front of the list (excepting the server affinity slot,
  472. // if any). This move is post-shuffle so the order is still randomized. To
  473. // save the memory overhead of unmarshalling all dial parameters, this
  474. // operation just moves any server with a dial parameter record to the
  475. // front. Whether the dial parameter remains valid for replay -- TTL,
  476. // tactics/config unchanged, etc. --- is checked later.
  477. //
  478. // TODO: move only up to parameters.ReplayCandidateCount to front?
  479. if (isInitialRound ||
  480. iterator.config.GetClientParametersSnapshot().WeightedCoinFlip(
  481. parameters.ReplayLaterRoundMoveToFrontProbability)) &&
  482. iterator.config.GetClientParametersSnapshot().Int(
  483. parameters.ReplayCandidateCount) != 0 {
  484. networkID := []byte(iterator.config.GetNetworkID())
  485. dialParamsBucket := tx.bucket(datastoreDialParametersBucket)
  486. i := shuffleHead
  487. j := len(serverEntryIDs) - 1
  488. for {
  489. for ; i < j; i++ {
  490. key := makeDialParametersKey(serverEntryIDs[i], networkID)
  491. if dialParamsBucket.get(key) == nil {
  492. break
  493. }
  494. }
  495. for ; i < j; j-- {
  496. key := makeDialParametersKey(serverEntryIDs[j], networkID)
  497. if dialParamsBucket.get(key) != nil {
  498. break
  499. }
  500. }
  501. if i < j {
  502. serverEntryIDs[i], serverEntryIDs[j] = serverEntryIDs[j], serverEntryIDs[i]
  503. i++
  504. j--
  505. } else {
  506. break
  507. }
  508. }
  509. }
  510. return nil
  511. })
  512. if err != nil {
  513. return common.ContextError(err)
  514. }
  515. iterator.serverEntryIDs = serverEntryIDs
  516. iterator.serverEntryIndex = 0
  517. return nil
  518. }
  519. // Close cleans up resources associated with a ServerEntryIterator.
  520. func (iterator *ServerEntryIterator) Close() {
  521. iterator.serverEntryIDs = nil
  522. iterator.serverEntryIndex = 0
  523. }
  524. // Next returns the next server entry, by rank, for a ServerEntryIterator.
  525. // Returns nil with no error when there is no next item.
  526. func (iterator *ServerEntryIterator) Next() (*protocol.ServerEntry, error) {
  527. var serverEntry *protocol.ServerEntry
  528. var err error
  529. defer func() {
  530. if err != nil {
  531. iterator.Close()
  532. }
  533. }()
  534. if iterator.isTargetServerEntryIterator {
  535. if iterator.hasNextTargetServerEntry {
  536. iterator.hasNextTargetServerEntry = false
  537. return MakeCompatibleServerEntry(iterator.targetServerEntry), nil
  538. }
  539. return nil, nil
  540. }
  541. // There are no region/protocol indexes for the server entries bucket.
  542. // Loop until we have the next server entry that matches the iterator
  543. // filter requirements.
  544. for {
  545. if iterator.serverEntryIndex >= len(iterator.serverEntryIDs) {
  546. // There is no next item
  547. return nil, nil
  548. }
  549. serverEntryID := iterator.serverEntryIDs[iterator.serverEntryIndex]
  550. iterator.serverEntryIndex += 1
  551. serverEntry = nil
  552. err = datastoreView(func(tx *datastoreTx) error {
  553. serverEntries := tx.bucket(datastoreServerEntriesBucket)
  554. value := serverEntries.get(serverEntryID)
  555. if value == nil {
  556. return nil
  557. }
  558. // Must unmarshal here as slice is only valid within transaction.
  559. err = json.Unmarshal(value, &serverEntry)
  560. if err != nil {
  561. // In case of data corruption or a bug causing this condition,
  562. // do not stop iterating.
  563. serverEntry = nil
  564. NoticeAlert(
  565. "ServerEntryIterator.Next: json.Unmarshal failed: %s",
  566. common.ContextError(err))
  567. }
  568. return nil
  569. })
  570. if err != nil {
  571. return nil, common.ContextError(err)
  572. }
  573. if serverEntry == nil {
  574. // In case of data corruption or a bug causing this condition,
  575. // do not stop iterating.
  576. NoticeAlert(
  577. "ServerEntryIterator.Next: unexpected missing server entry: %s",
  578. string(serverEntryID))
  579. continue
  580. }
  581. // Generate a derived server entry tag for server entries with no tag. Store
  582. // back the updated server entry so that (a) the tag doesn't need to be
  583. // regenerated; (b) the server entry can be looked up by tag (currently used
  584. // in the status request prune case).
  585. //
  586. // This is a distinct transaction so as to avoid the overhead of regular
  587. // write transactions in the iterator; once tags have been stored back, most
  588. // iterator transactions will remain read-only.
  589. if serverEntry.Tag == "" {
  590. serverEntry.Tag = protocol.GenerateServerEntryTag(
  591. serverEntry.IpAddress, serverEntry.WebServerSecret)
  592. err = datastoreUpdate(func(tx *datastoreTx) error {
  593. serverEntries := tx.bucket(datastoreServerEntriesBucket)
  594. serverEntryTags := tx.bucket(datastoreServerEntryTagsBucket)
  595. // We must reload and store back the server entry _fields_ to preserve any
  596. // currently unrecognized fields, for future compatibility.
  597. value := serverEntries.get(serverEntryID)
  598. if value == nil {
  599. return nil
  600. }
  601. var serverEntryFields protocol.ServerEntryFields
  602. err := json.Unmarshal(value, &serverEntryFields)
  603. if err != nil {
  604. return common.ContextError(err)
  605. }
  606. // As there is minor race condition between loading/checking serverEntry
  607. // and reloading/modifying serverEntryFields, this transaction references
  608. // only the freshly loaded fields when checking and setting the tag.
  609. serverEntryTag := serverEntryFields.GetTag()
  610. if serverEntryTag != "" {
  611. return nil
  612. }
  613. serverEntryTag = protocol.GenerateServerEntryTag(
  614. serverEntryFields.GetIPAddress(),
  615. serverEntryFields.GetWebServerSecret())
  616. serverEntryFields.SetTag(serverEntryTag)
  617. jsonServerEntryFields, err := json.Marshal(serverEntryFields)
  618. if err != nil {
  619. return common.ContextError(err)
  620. }
  621. serverEntries.put(serverEntryID, jsonServerEntryFields)
  622. if err != nil {
  623. return common.ContextError(err)
  624. }
  625. serverEntryTags.put([]byte(serverEntryTag), serverEntryID)
  626. if err != nil {
  627. return common.ContextError(err)
  628. }
  629. return nil
  630. })
  631. if err != nil {
  632. // Do not stop.
  633. NoticeAlert(
  634. "ServerEntryIterator.Next: update server entry failed: %s",
  635. common.ContextError(err))
  636. }
  637. }
  638. if iterator.serverEntryIndex%datastoreServerEntryFetchGCThreshold == 0 {
  639. DoGarbageCollection()
  640. }
  641. // Check filter requirements
  642. if iterator.isTacticsServerEntryIterator {
  643. // Tactics doesn't filter by egress region.
  644. if len(serverEntry.GetSupportedTacticsProtocols()) > 0 {
  645. break
  646. }
  647. } else {
  648. if iterator.config.EgressRegion == "" ||
  649. serverEntry.Region == iterator.config.EgressRegion {
  650. break
  651. }
  652. }
  653. }
  654. return MakeCompatibleServerEntry(serverEntry), nil
  655. }
  656. // MakeCompatibleServerEntry provides backwards compatibility with old server entries
  657. // which have a single meekFrontingDomain and not a meekFrontingAddresses array.
  658. // By copying this one meekFrontingDomain into meekFrontingAddresses, this client effectively
  659. // uses that single value as legacy clients do.
  660. func MakeCompatibleServerEntry(serverEntry *protocol.ServerEntry) *protocol.ServerEntry {
  661. if len(serverEntry.MeekFrontingAddresses) == 0 && serverEntry.MeekFrontingDomain != "" {
  662. serverEntry.MeekFrontingAddresses =
  663. append(serverEntry.MeekFrontingAddresses, serverEntry.MeekFrontingDomain)
  664. }
  665. return serverEntry
  666. }
  667. // PruneServerEntry deletes the server entry, along with associated data,
  668. // corresponding to the specified server entry tag. Pruning is subject to an
  669. // age check. In the case of an error, a notice is emitted.
  670. func PruneServerEntry(config *Config, serverEntryTag string) {
  671. err := pruneServerEntry(config, serverEntryTag)
  672. if err != nil {
  673. NoticeAlert(
  674. "PruneServerEntry failed: %s: %s",
  675. serverEntryTag, common.ContextError(err))
  676. return
  677. }
  678. NoticePruneServerEntry(serverEntryTag)
  679. }
  680. func pruneServerEntry(config *Config, serverEntryTag string) error {
  681. minimumAgeForPruning := config.GetClientParametersSnapshot().Duration(
  682. parameters.ServerEntryMinimumAgeForPruning)
  683. return datastoreUpdate(func(tx *datastoreTx) error {
  684. serverEntries := tx.bucket(datastoreServerEntriesBucket)
  685. serverEntryTags := tx.bucket(datastoreServerEntryTagsBucket)
  686. serverEntryTombstoneTags := tx.bucket(datastoreServerEntryTombstoneTagsBucket)
  687. keyValues := tx.bucket(datastoreKeyValueBucket)
  688. dialParameters := tx.bucket(datastoreDialParametersBucket)
  689. serverEntryTagBytes := []byte(serverEntryTag)
  690. serverEntryID := serverEntryTags.get(serverEntryTagBytes)
  691. if serverEntryID == nil {
  692. return common.ContextError(errors.New("server entry tag not found"))
  693. }
  694. serverEntryJson := serverEntries.get(serverEntryID)
  695. if serverEntryJson == nil {
  696. return common.ContextError(errors.New("server entry not found"))
  697. }
  698. var serverEntry *protocol.ServerEntry
  699. err := json.Unmarshal(serverEntryJson, &serverEntry)
  700. if err != nil {
  701. common.ContextError(err)
  702. }
  703. // Only prune sufficiently old server entries. This mitigates the case where
  704. // stale data in psiphond will incorrectly identify brand new servers as
  705. // being invalid/deleted.
  706. serverEntryLocalTimestamp, err := time.Parse(time.RFC3339, serverEntry.LocalTimestamp)
  707. if err != nil {
  708. common.ContextError(err)
  709. }
  710. if serverEntryLocalTimestamp.Add(minimumAgeForPruning).After(time.Now()) {
  711. return nil
  712. }
  713. // Handle the server IP recycle case where multiple serverEntryTags records
  714. // refer to the same server IP. Only delete the server entry record when its
  715. // tag matches the pruned tag. Otherwise, the server entry record is
  716. // associated with another tag. The pruned tag is still deleted.
  717. deleteServerEntry := (serverEntry.Tag == serverEntryTag)
  718. err = serverEntryTags.delete(serverEntryTagBytes)
  719. if err != nil {
  720. common.ContextError(err)
  721. }
  722. if deleteServerEntry {
  723. err = serverEntries.delete(serverEntryID)
  724. if err != nil {
  725. common.ContextError(err)
  726. }
  727. affinityServerEntryID := keyValues.get(datastoreAffinityServerEntryIDKey)
  728. if 0 == bytes.Compare(affinityServerEntryID, serverEntryID) {
  729. err = keyValues.delete(datastoreAffinityServerEntryIDKey)
  730. if err != nil {
  731. return common.ContextError(err)
  732. }
  733. }
  734. // TODO: expose boltdb Seek functionality to skip to first matching record.
  735. cursor := dialParameters.cursor()
  736. defer cursor.close()
  737. foundFirstMatch := false
  738. for key, _ := cursor.first(); key != nil; key, _ = cursor.next() {
  739. // Dial parameters key has serverID as a prefix; see makeDialParametersKey.
  740. if bytes.HasPrefix(key, serverEntryID) {
  741. foundFirstMatch = true
  742. err := dialParameters.delete(key)
  743. if err != nil {
  744. return common.ContextError(err)
  745. }
  746. } else if foundFirstMatch {
  747. break
  748. }
  749. }
  750. }
  751. // Tombstones prevent reimporting pruned server entries. Tombstone
  752. // identifiers are tags, which are derived from the web server secret in
  753. // addition to the server IP, so tombstones will not clobber recycled server
  754. // IPs as long as new web server secrets are generated in the recycle case.
  755. //
  756. // Tombstones are set only for embedded server entries, as all other sources
  757. // are expected to provide valid server entries; this also provides a fail-
  758. // safe mechanism to restore pruned server entries through all non-embedded
  759. // sources.
  760. if serverEntry.LocalSource == protocol.SERVER_ENTRY_SOURCE_EMBEDDED {
  761. err = serverEntryTombstoneTags.put(serverEntryTagBytes, []byte{1})
  762. if err != nil {
  763. return common.ContextError(err)
  764. }
  765. }
  766. return nil
  767. })
  768. }
  769. func scanServerEntries(scanner func(*protocol.ServerEntry)) error {
  770. err := datastoreView(func(tx *datastoreTx) error {
  771. bucket := tx.bucket(datastoreServerEntriesBucket)
  772. cursor := bucket.cursor()
  773. n := 0
  774. for key, value := cursor.first(); key != nil; key, value = cursor.next() {
  775. var serverEntry *protocol.ServerEntry
  776. err := json.Unmarshal(value, &serverEntry)
  777. if err != nil {
  778. // In case of data corruption or a bug causing this condition,
  779. // do not stop iterating.
  780. NoticeAlert("scanServerEntries: %s", common.ContextError(err))
  781. continue
  782. }
  783. scanner(serverEntry)
  784. n += 1
  785. if n == datastoreServerEntryFetchGCThreshold {
  786. DoGarbageCollection()
  787. n = 0
  788. }
  789. }
  790. cursor.close()
  791. return nil
  792. })
  793. if err != nil {
  794. return common.ContextError(err)
  795. }
  796. return nil
  797. }
  798. // CountServerEntries returns a count of stored server entries.
  799. func CountServerEntries() int {
  800. count := 0
  801. err := scanServerEntries(func(_ *protocol.ServerEntry) {
  802. count += 1
  803. })
  804. if err != nil {
  805. NoticeAlert("CountServerEntries failed: %s", err)
  806. return 0
  807. }
  808. return count
  809. }
  810. // CountServerEntriesWithConstraints returns a count of stored server entries for
  811. // the specified region and tunnel protocol limits.
  812. func CountServerEntriesWithConstraints(
  813. useUpstreamProxy bool,
  814. region string,
  815. constraints *protocolSelectionConstraints) (int, int) {
  816. // When CountServerEntriesWithConstraints is called only
  817. // limitTunnelProtocolState is fixed; excludeIntensive is transitory.
  818. excludeIntensive := false
  819. initialCount := 0
  820. count := 0
  821. err := scanServerEntries(func(serverEntry *protocol.ServerEntry) {
  822. if region == "" || serverEntry.Region == region {
  823. if constraints.isInitialCandidate(excludeIntensive, serverEntry) {
  824. initialCount += 1
  825. }
  826. if constraints.isCandidate(excludeIntensive, serverEntry) {
  827. count += 1
  828. }
  829. }
  830. })
  831. if err != nil {
  832. NoticeAlert("CountServerEntriesWithConstraints failed: %s", err)
  833. return 0, 0
  834. }
  835. return initialCount, count
  836. }
  837. // ReportAvailableRegions prints a notice with the available egress regions.
  838. // When limitState has initial protocols, the available regions are limited
  839. // to those available for the initial protocols; or if limitState has general
  840. // limited protocols, the available regions are similarly limited.
  841. func ReportAvailableRegions(config *Config, constraints *protocolSelectionConstraints) {
  842. // When ReportAvailableRegions is called only limitTunnelProtocolState is
  843. // fixed; excludeIntensive is transitory.
  844. excludeIntensive := false
  845. regions := make(map[string]bool)
  846. err := scanServerEntries(func(serverEntry *protocol.ServerEntry) {
  847. isCandidate := false
  848. if constraints.hasInitialProtocols() {
  849. isCandidate = constraints.isInitialCandidate(excludeIntensive, serverEntry)
  850. } else {
  851. isCandidate = constraints.isCandidate(excludeIntensive, serverEntry)
  852. }
  853. if isCandidate {
  854. regions[serverEntry.Region] = true
  855. }
  856. })
  857. if err != nil {
  858. NoticeAlert("ReportAvailableRegions failed: %s", err)
  859. return
  860. }
  861. regionList := make([]string, 0, len(regions))
  862. for region := range regions {
  863. // Some server entries do not have a region, but it makes no sense to return
  864. // an empty string as an "available region".
  865. if region != "" {
  866. regionList = append(regionList, region)
  867. }
  868. }
  869. NoticeAvailableEgressRegions(regionList)
  870. }
  871. // SetSplitTunnelRoutes updates the cached routes data for
  872. // the given region. The associated etag is also stored and
  873. // used to make efficient web requests for updates to the data.
  874. func SetSplitTunnelRoutes(region, etag string, data []byte) error {
  875. err := datastoreUpdate(func(tx *datastoreTx) error {
  876. bucket := tx.bucket(datastoreSplitTunnelRouteETagsBucket)
  877. err := bucket.put([]byte(region), []byte(etag))
  878. if err != nil {
  879. return common.ContextError(err)
  880. }
  881. bucket = tx.bucket(datastoreSplitTunnelRouteDataBucket)
  882. err = bucket.put([]byte(region), data)
  883. if err != nil {
  884. return common.ContextError(err)
  885. }
  886. return nil
  887. })
  888. if err != nil {
  889. return common.ContextError(err)
  890. }
  891. return nil
  892. }
  893. // GetSplitTunnelRoutesETag retrieves the etag for cached routes
  894. // data for the specified region. If not found, it returns an empty string value.
  895. func GetSplitTunnelRoutesETag(region string) (string, error) {
  896. var etag string
  897. err := datastoreView(func(tx *datastoreTx) error {
  898. bucket := tx.bucket(datastoreSplitTunnelRouteETagsBucket)
  899. etag = string(bucket.get([]byte(region)))
  900. return nil
  901. })
  902. if err != nil {
  903. return "", common.ContextError(err)
  904. }
  905. return etag, nil
  906. }
  907. // GetSplitTunnelRoutesData retrieves the cached routes data
  908. // for the specified region. If not found, it returns a nil value.
  909. func GetSplitTunnelRoutesData(region string) ([]byte, error) {
  910. var data []byte
  911. err := datastoreView(func(tx *datastoreTx) error {
  912. bucket := tx.bucket(datastoreSplitTunnelRouteDataBucket)
  913. value := bucket.get([]byte(region))
  914. if value != nil {
  915. // Must make a copy as slice is only valid within transaction.
  916. data = make([]byte, len(value))
  917. copy(data, value)
  918. }
  919. return nil
  920. })
  921. if err != nil {
  922. return nil, common.ContextError(err)
  923. }
  924. return data, nil
  925. }
  926. // SetUrlETag stores an ETag for the specfied URL.
  927. // Note: input URL is treated as a string, and is not
  928. // encoded or decoded or otherwise canonicalized.
  929. func SetUrlETag(url, etag string) error {
  930. err := datastoreUpdate(func(tx *datastoreTx) error {
  931. bucket := tx.bucket(datastoreUrlETagsBucket)
  932. err := bucket.put([]byte(url), []byte(etag))
  933. if err != nil {
  934. return common.ContextError(err)
  935. }
  936. return nil
  937. })
  938. if err != nil {
  939. return common.ContextError(err)
  940. }
  941. return nil
  942. }
  943. // GetUrlETag retrieves a previously stored an ETag for the
  944. // specfied URL. If not found, it returns an empty string value.
  945. func GetUrlETag(url string) (string, error) {
  946. var etag string
  947. err := datastoreView(func(tx *datastoreTx) error {
  948. bucket := tx.bucket(datastoreUrlETagsBucket)
  949. etag = string(bucket.get([]byte(url)))
  950. return nil
  951. })
  952. if err != nil {
  953. return "", common.ContextError(err)
  954. }
  955. return etag, nil
  956. }
  957. // SetKeyValue stores a key/value pair.
  958. func SetKeyValue(key, value string) error {
  959. err := datastoreUpdate(func(tx *datastoreTx) error {
  960. bucket := tx.bucket(datastoreKeyValueBucket)
  961. err := bucket.put([]byte(key), []byte(value))
  962. if err != nil {
  963. return common.ContextError(err)
  964. }
  965. return nil
  966. })
  967. if err != nil {
  968. return common.ContextError(err)
  969. }
  970. return nil
  971. }
  972. // GetKeyValue retrieves the value for a given key. If not found,
  973. // it returns an empty string value.
  974. func GetKeyValue(key string) (string, error) {
  975. var value string
  976. err := datastoreView(func(tx *datastoreTx) error {
  977. bucket := tx.bucket(datastoreKeyValueBucket)
  978. value = string(bucket.get([]byte(key)))
  979. return nil
  980. })
  981. if err != nil {
  982. return "", common.ContextError(err)
  983. }
  984. return value, nil
  985. }
  986. // Persistent stat records in the persistentStatStateUnreported
  987. // state are available for take out.
  988. //
  989. // Records in the persistentStatStateReporting have been taken
  990. // out and are pending either deletion (for a successful request)
  991. // or change to StateUnreported (for a failed request).
  992. //
  993. // All persistent stat records are reverted to StateUnreported
  994. // when the datastore is initialized at start up.
  995. var persistentStatStateUnreported = []byte("0")
  996. var persistentStatStateReporting = []byte("1")
  997. var persistentStatTypes = []string{
  998. datastorePersistentStatTypeRemoteServerList,
  999. datastorePersistentStatTypeFailedTunnel,
  1000. }
  1001. // StorePersistentStat adds a new persistent stat record, which
  1002. // is set to StateUnreported and is an immediate candidate for
  1003. // reporting.
  1004. //
  1005. // The stat is a JSON byte array containing fields as
  1006. // required by the Psiphon server API. It's assumed that the
  1007. // JSON value contains enough unique information for the value to
  1008. // function as a key in the key/value datastore.
  1009. //
  1010. // Only up to PersistentStatsMaxStoreRecords are stored. Once this
  1011. // limit is reached, new records are discarded.
  1012. func StorePersistentStat(config *Config, statType string, stat []byte) error {
  1013. if !common.Contains(persistentStatTypes, statType) {
  1014. return common.ContextError(fmt.Errorf("invalid persistent stat type: %s", statType))
  1015. }
  1016. maxStoreRecords := config.GetClientParametersSnapshot().Int(
  1017. parameters.PersistentStatsMaxStoreRecords)
  1018. err := datastoreUpdate(func(tx *datastoreTx) error {
  1019. bucket := tx.bucket([]byte(statType))
  1020. count := 0
  1021. cursor := bucket.cursor()
  1022. for key, _ := cursor.first(); key != nil; key, _ = cursor.next() {
  1023. count++
  1024. }
  1025. cursor.close()
  1026. // TODO: assuming newer metrics are more useful, replace oldest record
  1027. // instead of discarding?
  1028. if count >= maxStoreRecords {
  1029. // Silently discard.
  1030. return nil
  1031. }
  1032. err := bucket.put(stat, persistentStatStateUnreported)
  1033. if err != nil {
  1034. return common.ContextError(err)
  1035. }
  1036. return nil
  1037. })
  1038. if err != nil {
  1039. return common.ContextError(err)
  1040. }
  1041. return nil
  1042. }
  1043. // CountUnreportedPersistentStats returns the number of persistent
  1044. // stat records in StateUnreported.
  1045. func CountUnreportedPersistentStats() int {
  1046. unreported := 0
  1047. err := datastoreView(func(tx *datastoreTx) error {
  1048. for _, statType := range persistentStatTypes {
  1049. bucket := tx.bucket([]byte(statType))
  1050. cursor := bucket.cursor()
  1051. for key, value := cursor.first(); key != nil; key, value = cursor.next() {
  1052. if 0 == bytes.Compare(value, persistentStatStateUnreported) {
  1053. unreported++
  1054. }
  1055. }
  1056. cursor.close()
  1057. }
  1058. return nil
  1059. })
  1060. if err != nil {
  1061. NoticeAlert("CountUnreportedPersistentStats failed: %s", err)
  1062. return 0
  1063. }
  1064. return unreported
  1065. }
  1066. // TakeOutUnreportedPersistentStats returns persistent stats records that are
  1067. // in StateUnreported. At least one record, if present, will be returned and
  1068. // then additional records up to PersistentStatsMaxSendBytes. The records are
  1069. // set to StateReporting. If the records are successfully reported, clear them
  1070. // with ClearReportedPersistentStats. If the records are not successfully
  1071. // reported, restore them with PutBackUnreportedPersistentStats.
  1072. func TakeOutUnreportedPersistentStats(config *Config) (map[string][][]byte, error) {
  1073. stats := make(map[string][][]byte)
  1074. maxSendBytes := config.GetClientParametersSnapshot().Int(
  1075. parameters.PersistentStatsMaxSendBytes)
  1076. err := datastoreUpdate(func(tx *datastoreTx) error {
  1077. sendBytes := 0
  1078. for _, statType := range persistentStatTypes {
  1079. bucket := tx.bucket([]byte(statType))
  1080. cursor := bucket.cursor()
  1081. for key, value := cursor.first(); key != nil; key, value = cursor.next() {
  1082. // Perform a test JSON unmarshaling. In case of data corruption or a bug,
  1083. // delete and skip the record.
  1084. var jsonData interface{}
  1085. err := json.Unmarshal(key, &jsonData)
  1086. if err != nil {
  1087. NoticeAlert(
  1088. "Invalid key in TakeOutUnreportedPersistentStats: %s: %s",
  1089. string(key), err)
  1090. bucket.delete(key)
  1091. continue
  1092. }
  1093. if 0 == bytes.Compare(value, persistentStatStateUnreported) {
  1094. // Must make a copy as slice is only valid within transaction.
  1095. data := make([]byte, len(key))
  1096. copy(data, key)
  1097. if stats[statType] == nil {
  1098. stats[statType] = make([][]byte, 0)
  1099. }
  1100. stats[statType] = append(stats[statType], data)
  1101. sendBytes += len(data)
  1102. if sendBytes >= maxSendBytes {
  1103. break
  1104. }
  1105. }
  1106. }
  1107. cursor.close()
  1108. for _, key := range stats[statType] {
  1109. err := bucket.put(key, persistentStatStateReporting)
  1110. if err != nil {
  1111. return common.ContextError(err)
  1112. }
  1113. }
  1114. }
  1115. return nil
  1116. })
  1117. if err != nil {
  1118. return nil, common.ContextError(err)
  1119. }
  1120. return stats, nil
  1121. }
  1122. // PutBackUnreportedPersistentStats restores a list of persistent
  1123. // stat records to StateUnreported.
  1124. func PutBackUnreportedPersistentStats(stats map[string][][]byte) error {
  1125. err := datastoreUpdate(func(tx *datastoreTx) error {
  1126. for _, statType := range persistentStatTypes {
  1127. bucket := tx.bucket([]byte(statType))
  1128. for _, key := range stats[statType] {
  1129. err := bucket.put(key, persistentStatStateUnreported)
  1130. if err != nil {
  1131. return common.ContextError(err)
  1132. }
  1133. }
  1134. }
  1135. return nil
  1136. })
  1137. if err != nil {
  1138. return common.ContextError(err)
  1139. }
  1140. return nil
  1141. }
  1142. // ClearReportedPersistentStats deletes a list of persistent
  1143. // stat records that were successfully reported.
  1144. func ClearReportedPersistentStats(stats map[string][][]byte) error {
  1145. err := datastoreUpdate(func(tx *datastoreTx) error {
  1146. for _, statType := range persistentStatTypes {
  1147. bucket := tx.bucket([]byte(statType))
  1148. for _, key := range stats[statType] {
  1149. err := bucket.delete(key)
  1150. if err != nil {
  1151. return err
  1152. }
  1153. }
  1154. }
  1155. return nil
  1156. })
  1157. if err != nil {
  1158. return common.ContextError(err)
  1159. }
  1160. return nil
  1161. }
  1162. // resetAllPersistentStatsToUnreported sets all persistent stat
  1163. // records to StateUnreported. This reset is called when the
  1164. // datastore is initialized at start up, as we do not know if
  1165. // persistent records in StateReporting were reported or not.
  1166. func resetAllPersistentStatsToUnreported() error {
  1167. err := datastoreUpdate(func(tx *datastoreTx) error {
  1168. for _, statType := range persistentStatTypes {
  1169. bucket := tx.bucket([]byte(statType))
  1170. resetKeys := make([][]byte, 0)
  1171. cursor := bucket.cursor()
  1172. for key := cursor.firstKey(); key != nil; key = cursor.nextKey() {
  1173. resetKeys = append(resetKeys, key)
  1174. }
  1175. cursor.close()
  1176. // TODO: data mutation is done outside cursor. Is this
  1177. // strictly necessary in this case? As is, this means
  1178. // all stats need to be loaded into memory at once.
  1179. // https://godoc.org/github.com/boltdb/bolt#Cursor
  1180. for _, key := range resetKeys {
  1181. err := bucket.put(key, persistentStatStateUnreported)
  1182. if err != nil {
  1183. return common.ContextError(err)
  1184. }
  1185. }
  1186. }
  1187. return nil
  1188. })
  1189. if err != nil {
  1190. return common.ContextError(err)
  1191. }
  1192. return nil
  1193. }
  1194. // CountSLOKs returns the total number of SLOK records.
  1195. func CountSLOKs() int {
  1196. count := 0
  1197. err := datastoreView(func(tx *datastoreTx) error {
  1198. bucket := tx.bucket(datastoreSLOKsBucket)
  1199. cursor := bucket.cursor()
  1200. for key := cursor.firstKey(); key != nil; key = cursor.nextKey() {
  1201. count++
  1202. }
  1203. cursor.close()
  1204. return nil
  1205. })
  1206. if err != nil {
  1207. NoticeAlert("CountSLOKs failed: %s", err)
  1208. return 0
  1209. }
  1210. return count
  1211. }
  1212. // DeleteSLOKs deletes all SLOK records.
  1213. func DeleteSLOKs() error {
  1214. err := datastoreUpdate(func(tx *datastoreTx) error {
  1215. return tx.clearBucket(datastoreSLOKsBucket)
  1216. })
  1217. if err != nil {
  1218. return common.ContextError(err)
  1219. }
  1220. return nil
  1221. }
  1222. // SetSLOK stores a SLOK key, referenced by its ID. The bool
  1223. // return value indicates whether the SLOK was already stored.
  1224. func SetSLOK(id, key []byte) (bool, error) {
  1225. var duplicate bool
  1226. err := datastoreUpdate(func(tx *datastoreTx) error {
  1227. bucket := tx.bucket(datastoreSLOKsBucket)
  1228. duplicate = bucket.get(id) != nil
  1229. err := bucket.put([]byte(id), []byte(key))
  1230. if err != nil {
  1231. return common.ContextError(err)
  1232. }
  1233. return nil
  1234. })
  1235. if err != nil {
  1236. return false, common.ContextError(err)
  1237. }
  1238. return duplicate, nil
  1239. }
  1240. // GetSLOK returns a SLOK key for the specified ID. The return
  1241. // value is nil if the SLOK is not found.
  1242. func GetSLOK(id []byte) ([]byte, error) {
  1243. var key []byte
  1244. err := datastoreView(func(tx *datastoreTx) error {
  1245. bucket := tx.bucket(datastoreSLOKsBucket)
  1246. key = bucket.get(id)
  1247. return nil
  1248. })
  1249. if err != nil {
  1250. return nil, common.ContextError(err)
  1251. }
  1252. return key, nil
  1253. }
  1254. func makeDialParametersKey(serverIPAddress, networkID []byte) []byte {
  1255. // TODO: structured key?
  1256. return append(append([]byte(nil), serverIPAddress...), networkID...)
  1257. }
  1258. // SetDialParameters stores dial parameters associated with the specified
  1259. // server/network ID.
  1260. func SetDialParameters(serverIPAddress, networkID string, dialParams *DialParameters) error {
  1261. key := makeDialParametersKey([]byte(serverIPAddress), []byte(networkID))
  1262. data, err := json.Marshal(dialParams)
  1263. if err != nil {
  1264. return common.ContextError(err)
  1265. }
  1266. return setBucketValue(datastoreDialParametersBucket, key, data)
  1267. }
  1268. // GetDialParameters fetches any dial parameters associated with the specified
  1269. // server/network ID. Returns nil, nil when no record is found.
  1270. func GetDialParameters(serverIPAddress, networkID string) (*DialParameters, error) {
  1271. key := makeDialParametersKey([]byte(serverIPAddress), []byte(networkID))
  1272. data, err := getBucketValue(datastoreDialParametersBucket, key)
  1273. if err != nil {
  1274. return nil, common.ContextError(err)
  1275. }
  1276. if data == nil {
  1277. return nil, nil
  1278. }
  1279. var dialParams *DialParameters
  1280. err = json.Unmarshal(data, &dialParams)
  1281. if err != nil {
  1282. return nil, common.ContextError(err)
  1283. }
  1284. return dialParams, nil
  1285. }
  1286. // DeleteDialParameters clears any dial parameters associated with the
  1287. // specified server/network ID.
  1288. func DeleteDialParameters(serverIPAddress, networkID string) error {
  1289. key := makeDialParametersKey([]byte(serverIPAddress), []byte(networkID))
  1290. return deleteBucketValue(datastoreDialParametersBucket, key)
  1291. }
  1292. // TacticsStorer implements tactics.Storer.
  1293. type TacticsStorer struct {
  1294. }
  1295. func (t *TacticsStorer) SetTacticsRecord(networkID string, record []byte) error {
  1296. return setBucketValue(datastoreTacticsBucket, []byte(networkID), record)
  1297. }
  1298. func (t *TacticsStorer) GetTacticsRecord(networkID string) ([]byte, error) {
  1299. return getBucketValue(datastoreTacticsBucket, []byte(networkID))
  1300. }
  1301. func (t *TacticsStorer) SetSpeedTestSamplesRecord(networkID string, record []byte) error {
  1302. return setBucketValue(datastoreSpeedTestSamplesBucket, []byte(networkID), record)
  1303. }
  1304. func (t *TacticsStorer) GetSpeedTestSamplesRecord(networkID string) ([]byte, error) {
  1305. return getBucketValue(datastoreSpeedTestSamplesBucket, []byte(networkID))
  1306. }
  1307. // GetTacticsStorer creates a TacticsStorer.
  1308. func GetTacticsStorer() *TacticsStorer {
  1309. return &TacticsStorer{}
  1310. }
  1311. // GetAffinityServerEntryAndDialParameters fetches the current affinity server
  1312. // entry value and any corresponding dial parameters for the specified network
  1313. // ID. An error is returned when no affinity server is available. The
  1314. // DialParameter output may be nil when a server entry is found but has no
  1315. // dial parameters.
  1316. func GetAffinityServerEntryAndDialParameters(
  1317. networkID string) (protocol.ServerEntryFields, *DialParameters, error) {
  1318. var serverEntryFields protocol.ServerEntryFields
  1319. var dialParams *DialParameters
  1320. err := datastoreView(func(tx *datastoreTx) error {
  1321. keyValues := tx.bucket(datastoreKeyValueBucket)
  1322. serverEntries := tx.bucket(datastoreServerEntriesBucket)
  1323. dialParameters := tx.bucket(datastoreDialParametersBucket)
  1324. affinityServerEntryID := keyValues.get(datastoreAffinityServerEntryIDKey)
  1325. if affinityServerEntryID == nil {
  1326. return common.ContextError(errors.New("no affinity server available"))
  1327. }
  1328. serverEntryRecord := serverEntries.get(affinityServerEntryID)
  1329. if serverEntryRecord == nil {
  1330. return common.ContextError(errors.New("affinity server entry not found"))
  1331. }
  1332. err := json.Unmarshal(
  1333. serverEntryRecord,
  1334. &serverEntryFields)
  1335. if err != nil {
  1336. return common.ContextError(err)
  1337. }
  1338. dialParamsKey := makeDialParametersKey(
  1339. []byte(serverEntryFields.GetIPAddress()),
  1340. []byte(networkID))
  1341. dialParamsRecord := dialParameters.get(dialParamsKey)
  1342. if dialParamsRecord != nil {
  1343. err := json.Unmarshal(dialParamsRecord, &dialParams)
  1344. if err != nil {
  1345. return common.ContextError(err)
  1346. }
  1347. }
  1348. return nil
  1349. })
  1350. if err != nil {
  1351. return nil, nil, common.ContextError(err)
  1352. }
  1353. return serverEntryFields, dialParams, nil
  1354. }
  1355. func setBucketValue(bucket, key, value []byte) error {
  1356. err := datastoreUpdate(func(tx *datastoreTx) error {
  1357. bucket := tx.bucket(bucket)
  1358. err := bucket.put(key, value)
  1359. if err != nil {
  1360. return common.ContextError(err)
  1361. }
  1362. return nil
  1363. })
  1364. if err != nil {
  1365. return common.ContextError(err)
  1366. }
  1367. return nil
  1368. }
  1369. func getBucketValue(bucket, key []byte) ([]byte, error) {
  1370. var value []byte
  1371. err := datastoreView(func(tx *datastoreTx) error {
  1372. bucket := tx.bucket(bucket)
  1373. value = bucket.get(key)
  1374. return nil
  1375. })
  1376. if err != nil {
  1377. return nil, common.ContextError(err)
  1378. }
  1379. return value, nil
  1380. }
  1381. func deleteBucketValue(bucket, key []byte) error {
  1382. err := datastoreUpdate(func(tx *datastoreTx) error {
  1383. bucket := tx.bucket(bucket)
  1384. return bucket.delete(key)
  1385. })
  1386. if err != nil {
  1387. return common.ContextError(err)
  1388. }
  1389. return nil
  1390. }