dataStore.go 48 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734
  1. /*
  2. * Copyright (c) 2015, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package psiphon
  20. import (
  21. "bytes"
  22. "encoding/json"
  23. "errors"
  24. "fmt"
  25. "sync"
  26. "time"
  27. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  28. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/parameters"
  29. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/prng"
  30. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
  31. )
  32. var (
  33. datastoreServerEntriesBucket = []byte("serverEntries")
  34. datastoreServerEntryTagsBucket = []byte("serverEntryTags")
  35. datastoreServerEntryTombstoneTagsBucket = []byte("serverEntryTombstoneTags")
  36. datastoreSplitTunnelRouteETagsBucket = []byte("splitTunnelRouteETags")
  37. datastoreSplitTunnelRouteDataBucket = []byte("splitTunnelRouteData")
  38. datastoreUrlETagsBucket = []byte("urlETags")
  39. datastoreKeyValueBucket = []byte("keyValues")
  40. datastoreRemoteServerListStatsBucket = []byte("remoteServerListStats")
  41. datastoreFailedTunnelStatsBucket = []byte("failedTunnelStats")
  42. datastoreSLOKsBucket = []byte("SLOKs")
  43. datastoreTacticsBucket = []byte("tactics")
  44. datastoreSpeedTestSamplesBucket = []byte("speedTestSamples")
  45. datastoreDialParametersBucket = []byte("dialParameters")
  46. datastoreLastConnectedKey = "lastConnected"
  47. datastoreLastServerEntryFilterKey = []byte("lastServerEntryFilter")
  48. datastoreAffinityServerEntryIDKey = []byte("affinityServerEntryID")
  49. datastorePersistentStatTypeRemoteServerList = string(datastoreRemoteServerListStatsBucket)
  50. datastorePersistentStatTypeFailedTunnel = string(datastoreFailedTunnelStatsBucket)
  51. datastoreServerEntryFetchGCThreshold = 20
  52. datastoreMutex sync.RWMutex
  53. activeDatastoreDB *datastoreDB
  54. )
  55. // OpenDataStore opens and initializes the singleton data store instance.
  56. func OpenDataStore(config *Config) error {
  57. datastoreMutex.Lock()
  58. existingDB := activeDatastoreDB
  59. if existingDB != nil {
  60. datastoreMutex.Unlock()
  61. return common.ContextError(errors.New("db already open"))
  62. }
  63. newDB, err := datastoreOpenDB(config.DataStoreDirectory)
  64. if err != nil {
  65. datastoreMutex.Unlock()
  66. return common.ContextError(err)
  67. }
  68. activeDatastoreDB = newDB
  69. datastoreMutex.Unlock()
  70. _ = resetAllPersistentStatsToUnreported()
  71. return nil
  72. }
  73. // CloseDataStore closes the singleton data store instance, if open.
  74. func CloseDataStore() {
  75. datastoreMutex.Lock()
  76. defer datastoreMutex.Unlock()
  77. if activeDatastoreDB == nil {
  78. return
  79. }
  80. err := activeDatastoreDB.close()
  81. if err != nil {
  82. NoticeAlert("failed to close database: %s", common.ContextError(err))
  83. }
  84. activeDatastoreDB = nil
  85. }
  86. func datastoreView(fn func(tx *datastoreTx) error) error {
  87. datastoreMutex.RLock()
  88. defer datastoreMutex.RUnlock()
  89. if activeDatastoreDB == nil {
  90. return common.ContextError(errors.New("database not open"))
  91. }
  92. err := activeDatastoreDB.view(fn)
  93. if err != nil {
  94. err = common.ContextError(err)
  95. }
  96. return err
  97. }
  98. func datastoreUpdate(fn func(tx *datastoreTx) error) error {
  99. datastoreMutex.RLock()
  100. defer datastoreMutex.RUnlock()
  101. if activeDatastoreDB == nil {
  102. return common.ContextError(errors.New("database not open"))
  103. }
  104. err := activeDatastoreDB.update(fn)
  105. if err != nil {
  106. err = common.ContextError(err)
  107. }
  108. return err
  109. }
  110. // StoreServerEntry adds the server entry to the data store.
  111. //
  112. // When a server entry already exists for a given server, it will be
  113. // replaced only if replaceIfExists is set or if the the ConfigurationVersion
  114. // field of the new entry is strictly higher than the existing entry.
  115. //
  116. // If the server entry data is malformed, an alert notice is issued and
  117. // the entry is skipped; no error is returned.
  118. func StoreServerEntry(serverEntryFields protocol.ServerEntryFields, replaceIfExists bool) error {
  119. // TODO: call serverEntryFields.VerifySignature. At this time, we do not do
  120. // this as not all server entries have an individual signature field. All
  121. // StoreServerEntry callers either call VerifySignature or obtain server
  122. // entries from a trusted source (embedded in a signed client, or in a signed
  123. // authenticated package).
  124. // Server entries should already be validated before this point,
  125. // so instead of skipping we fail with an error.
  126. err := protocol.ValidateServerEntryFields(serverEntryFields)
  127. if err != nil {
  128. return common.ContextError(
  129. fmt.Errorf("invalid server entry: %s", err))
  130. }
  131. // BoltDB implementation note:
  132. // For simplicity, we don't maintain indexes on server entry
  133. // region or supported protocols. Instead, we perform full-bucket
  134. // scans with a filter. With a small enough database (thousands or
  135. // even tens of thousand of server entries) and common enough
  136. // values (e.g., many servers support all protocols), performance
  137. // is expected to be acceptable.
  138. err = datastoreUpdate(func(tx *datastoreTx) error {
  139. serverEntries := tx.bucket(datastoreServerEntriesBucket)
  140. serverEntryTags := tx.bucket(datastoreServerEntryTagsBucket)
  141. serverEntryTombstoneTags := tx.bucket(datastoreServerEntryTombstoneTagsBucket)
  142. serverEntryID := []byte(serverEntryFields.GetIPAddress())
  143. // Check not only that the entry exists, but is valid. This
  144. // will replace in the rare case where the data is corrupt.
  145. existingConfigurationVersion := -1
  146. existingData := serverEntries.get(serverEntryID)
  147. if existingData != nil {
  148. var existingServerEntry *protocol.ServerEntry
  149. err := json.Unmarshal(existingData, &existingServerEntry)
  150. if err == nil {
  151. existingConfigurationVersion = existingServerEntry.ConfigurationVersion
  152. }
  153. }
  154. exists := existingConfigurationVersion > -1
  155. newer := exists && existingConfigurationVersion < serverEntryFields.GetConfigurationVersion()
  156. update := !exists || replaceIfExists || newer
  157. if !update {
  158. return nil
  159. }
  160. serverEntryTag := serverEntryFields.GetTag()
  161. // Generate a derived tag when the server entry has no tag.
  162. if serverEntryTag == "" {
  163. serverEntryTag = protocol.GenerateServerEntryTag(
  164. serverEntryFields.GetIPAddress(),
  165. serverEntryFields.GetWebServerSecret())
  166. serverEntryFields.SetTag(serverEntryTag)
  167. }
  168. serverEntryTagBytes := []byte(serverEntryTag)
  169. // Ignore the server entry if it was previously pruned and a tombstone is
  170. // set.
  171. //
  172. // This logic is enforced only for embedded server entries, as all other
  173. // sources are considered to be definitive and non-stale. These exceptions
  174. // intentionally allow the scenario where a server is temporarily deleted
  175. // and then restored; in this case, it's desired for pruned server entries
  176. // to be restored.
  177. if serverEntryFields.GetLocalSource() == protocol.SERVER_ENTRY_SOURCE_EMBEDDED {
  178. if serverEntryTombstoneTags.get(serverEntryTagBytes) != nil {
  179. return nil
  180. }
  181. }
  182. data, err := json.Marshal(serverEntryFields)
  183. if err != nil {
  184. return common.ContextError(err)
  185. }
  186. err = serverEntries.put(serverEntryID, data)
  187. if err != nil {
  188. return common.ContextError(err)
  189. }
  190. err = serverEntryTags.put(serverEntryTagBytes, serverEntryID)
  191. if err != nil {
  192. return common.ContextError(err)
  193. }
  194. NoticeInfo("updated server %s", serverEntryFields.GetDiagnosticID())
  195. return nil
  196. })
  197. if err != nil {
  198. return common.ContextError(err)
  199. }
  200. return nil
  201. }
  202. // StoreServerEntries stores a list of server entries.
  203. // There is an independent transaction for each entry insert/update.
  204. func StoreServerEntries(
  205. config *Config,
  206. serverEntries []protocol.ServerEntryFields,
  207. replaceIfExists bool) error {
  208. for _, serverEntryFields := range serverEntries {
  209. err := StoreServerEntry(serverEntryFields, replaceIfExists)
  210. if err != nil {
  211. return common.ContextError(err)
  212. }
  213. }
  214. return nil
  215. }
  216. // StreamingStoreServerEntries stores a list of server entries.
  217. // There is an independent transaction for each entry insert/update.
  218. func StreamingStoreServerEntries(
  219. config *Config,
  220. serverEntries *protocol.StreamingServerEntryDecoder,
  221. replaceIfExists bool) error {
  222. // Note: both StreamingServerEntryDecoder.Next and StoreServerEntry
  223. // allocate temporary memory buffers for hex/JSON decoding/encoding,
  224. // so this isn't true constant-memory streaming (it depends on garbage
  225. // collection).
  226. n := 0
  227. for {
  228. serverEntry, err := serverEntries.Next()
  229. if err != nil {
  230. return common.ContextError(err)
  231. }
  232. if serverEntry == nil {
  233. // No more server entries
  234. break
  235. }
  236. err = StoreServerEntry(serverEntry, replaceIfExists)
  237. if err != nil {
  238. return common.ContextError(err)
  239. }
  240. n += 1
  241. if n == datastoreServerEntryFetchGCThreshold {
  242. DoGarbageCollection()
  243. n = 0
  244. }
  245. }
  246. return nil
  247. }
  248. // PromoteServerEntry sets the server affinity server entry ID to the
  249. // specified server entry IP address.
  250. func PromoteServerEntry(config *Config, ipAddress string) error {
  251. err := datastoreUpdate(func(tx *datastoreTx) error {
  252. serverEntryID := []byte(ipAddress)
  253. // Ensure the corresponding server entry exists before
  254. // setting server affinity.
  255. bucket := tx.bucket(datastoreServerEntriesBucket)
  256. data := bucket.get(serverEntryID)
  257. if data == nil {
  258. NoticeAlert(
  259. "PromoteServerEntry: ignoring unknown server entry: %s",
  260. ipAddress)
  261. return nil
  262. }
  263. bucket = tx.bucket(datastoreKeyValueBucket)
  264. err := bucket.put(datastoreAffinityServerEntryIDKey, serverEntryID)
  265. if err != nil {
  266. return common.ContextError(err)
  267. }
  268. // Store the current server entry filter (e.g, region, etc.) that
  269. // was in use when the entry was promoted. This is used to detect
  270. // when the top ranked server entry was promoted under a different
  271. // filter.
  272. currentFilter, err := makeServerEntryFilterValue(config)
  273. if err != nil {
  274. return common.ContextError(err)
  275. }
  276. return bucket.put(datastoreLastServerEntryFilterKey, currentFilter)
  277. })
  278. if err != nil {
  279. return common.ContextError(err)
  280. }
  281. return nil
  282. }
  283. func makeServerEntryFilterValue(config *Config) ([]byte, error) {
  284. // Currently, only a change of EgressRegion will "break" server affinity.
  285. // If the tunnel protocol filter changes, any existing affinity server
  286. // either passes the new filter, or it will be skipped anyway.
  287. return []byte(config.EgressRegion), nil
  288. }
  289. func hasServerEntryFilterChanged(config *Config) (bool, error) {
  290. currentFilter, err := makeServerEntryFilterValue(config)
  291. if err != nil {
  292. return false, common.ContextError(err)
  293. }
  294. changed := false
  295. err = datastoreView(func(tx *datastoreTx) error {
  296. bucket := tx.bucket(datastoreKeyValueBucket)
  297. previousFilter := bucket.get(datastoreLastServerEntryFilterKey)
  298. // When not found, previousFilter will be nil; ensures this
  299. // results in "changed", even if currentFilter is len(0).
  300. if previousFilter == nil ||
  301. bytes.Compare(previousFilter, currentFilter) != 0 {
  302. changed = true
  303. }
  304. return nil
  305. })
  306. if err != nil {
  307. return false, common.ContextError(err)
  308. }
  309. return changed, nil
  310. }
  311. // ServerEntryIterator is used to iterate over
  312. // stored server entries in rank order.
  313. type ServerEntryIterator struct {
  314. config *Config
  315. applyServerAffinity bool
  316. serverEntryIDs [][]byte
  317. serverEntryIndex int
  318. isTacticsServerEntryIterator bool
  319. isTargetServerEntryIterator bool
  320. hasNextTargetServerEntry bool
  321. targetServerEntry *protocol.ServerEntry
  322. }
  323. // NewServerEntryIterator creates a new ServerEntryIterator.
  324. //
  325. // The boolean return value indicates whether to treat the first server(s)
  326. // as affinity servers or not. When the server entry selection filter changes
  327. // such as from a specific region to any region, or when there was no previous
  328. // filter/iterator, the the first server(s) are arbitrary and should not be
  329. // given affinity treatment.
  330. //
  331. // NewServerEntryIterator and any returned ServerEntryIterator are not
  332. // designed for concurrent use as not all related datastore operations are
  333. // performed in a single transaction.
  334. //
  335. func NewServerEntryIterator(config *Config) (bool, *ServerEntryIterator, error) {
  336. // When configured, this target server entry is the only candidate
  337. if config.TargetServerEntry != "" {
  338. return newTargetServerEntryIterator(config, false)
  339. }
  340. filterChanged, err := hasServerEntryFilterChanged(config)
  341. if err != nil {
  342. return false, nil, common.ContextError(err)
  343. }
  344. applyServerAffinity := !filterChanged
  345. iterator := &ServerEntryIterator{
  346. config: config,
  347. applyServerAffinity: applyServerAffinity,
  348. }
  349. err = iterator.reset(true)
  350. if err != nil {
  351. return false, nil, common.ContextError(err)
  352. }
  353. return applyServerAffinity, iterator, nil
  354. }
  355. func NewTacticsServerEntryIterator(config *Config) (*ServerEntryIterator, error) {
  356. // When configured, this target server entry is the only candidate
  357. if config.TargetServerEntry != "" {
  358. _, iterator, err := newTargetServerEntryIterator(config, true)
  359. return iterator, err
  360. }
  361. iterator := &ServerEntryIterator{
  362. config: config,
  363. isTacticsServerEntryIterator: true,
  364. }
  365. err := iterator.reset(true)
  366. if err != nil {
  367. return nil, common.ContextError(err)
  368. }
  369. return iterator, nil
  370. }
  371. // newTargetServerEntryIterator is a helper for initializing the TargetServerEntry case
  372. func newTargetServerEntryIterator(config *Config, isTactics bool) (bool, *ServerEntryIterator, error) {
  373. serverEntry, err := protocol.DecodeServerEntry(
  374. config.TargetServerEntry, config.loadTimestamp, protocol.SERVER_ENTRY_SOURCE_TARGET)
  375. if err != nil {
  376. return false, nil, common.ContextError(err)
  377. }
  378. if serverEntry.Tag == "" {
  379. serverEntry.Tag = protocol.GenerateServerEntryTag(
  380. serverEntry.IpAddress, serverEntry.WebServerSecret)
  381. }
  382. if isTactics {
  383. if len(serverEntry.GetSupportedTacticsProtocols()) == 0 {
  384. return false, nil, common.ContextError(errors.New("TargetServerEntry does not support tactics protocols"))
  385. }
  386. } else {
  387. if config.EgressRegion != "" && serverEntry.Region != config.EgressRegion {
  388. return false, nil, common.ContextError(errors.New("TargetServerEntry does not support EgressRegion"))
  389. }
  390. limitTunnelProtocols := config.GetClientParameters().Get().TunnelProtocols(parameters.LimitTunnelProtocols)
  391. if len(limitTunnelProtocols) > 0 {
  392. // At the ServerEntryIterator level, only limitTunnelProtocols is applied;
  393. // excludeIntensive is handled higher up.
  394. if len(serverEntry.GetSupportedProtocols(
  395. config.UseUpstreamProxy(), limitTunnelProtocols, false)) == 0 {
  396. return false, nil, common.ContextError(errors.New("TargetServerEntry does not support LimitTunnelProtocols"))
  397. }
  398. }
  399. }
  400. iterator := &ServerEntryIterator{
  401. isTacticsServerEntryIterator: isTactics,
  402. isTargetServerEntryIterator: true,
  403. hasNextTargetServerEntry: true,
  404. targetServerEntry: serverEntry,
  405. }
  406. NoticeInfo("using TargetServerEntry: %s", serverEntry.GetDiagnosticID())
  407. return false, iterator, nil
  408. }
  409. // Reset a NewServerEntryIterator to the start of its cycle. The next
  410. // call to Next will return the first server entry.
  411. func (iterator *ServerEntryIterator) Reset() error {
  412. return iterator.reset(false)
  413. }
  414. func (iterator *ServerEntryIterator) reset(isInitialRound bool) error {
  415. iterator.Close()
  416. if iterator.isTargetServerEntryIterator {
  417. iterator.hasNextTargetServerEntry = true
  418. return nil
  419. }
  420. // BoltDB implementation note:
  421. // We don't keep a transaction open for the duration of the iterator
  422. // because this would expose the following semantics to consumer code:
  423. //
  424. // Read-only transactions and read-write transactions ... generally
  425. // shouldn't be opened simultaneously in the same goroutine. This can
  426. // cause a deadlock as the read-write transaction needs to periodically
  427. // re-map the data file but it cannot do so while a read-only
  428. // transaction is open.
  429. // (https://github.com/boltdb/bolt)
  430. //
  431. // So the underlying serverEntriesBucket could change after the serverEntryIDs
  432. // list is built.
  433. var serverEntryIDs [][]byte
  434. err := datastoreView(func(tx *datastoreTx) error {
  435. bucket := tx.bucket(datastoreKeyValueBucket)
  436. serverEntryIDs = make([][]byte, 0)
  437. shuffleHead := 0
  438. var affinityServerEntryID []byte
  439. // In the first round only, move any server affinity candiate to the
  440. // very first position.
  441. if isInitialRound &&
  442. iterator.applyServerAffinity {
  443. affinityServerEntryID = bucket.get(datastoreAffinityServerEntryIDKey)
  444. if affinityServerEntryID != nil {
  445. serverEntryIDs = append(serverEntryIDs, append([]byte(nil), affinityServerEntryID...))
  446. shuffleHead = 1
  447. }
  448. }
  449. bucket = tx.bucket(datastoreServerEntriesBucket)
  450. cursor := bucket.cursor()
  451. for key := cursor.firstKey(); key != nil; key = cursor.nextKey() {
  452. if affinityServerEntryID != nil {
  453. if bytes.Equal(affinityServerEntryID, key) {
  454. continue
  455. }
  456. }
  457. serverEntryIDs = append(serverEntryIDs, append([]byte(nil), key...))
  458. }
  459. cursor.close()
  460. // Randomly shuffle the entire list of server IDs, excluding the
  461. // server affinity candidate.
  462. for i := len(serverEntryIDs) - 1; i > shuffleHead-1; i-- {
  463. j := prng.Intn(i+1-shuffleHead) + shuffleHead
  464. serverEntryIDs[i], serverEntryIDs[j] = serverEntryIDs[j], serverEntryIDs[i]
  465. }
  466. // In the first round, or with some probability, move _potential_ replay
  467. // candidates to the front of the list (excepting the server affinity slot,
  468. // if any). This move is post-shuffle so the order is still randomized. To
  469. // save the memory overhead of unmarshalling all dial parameters, this
  470. // operation just moves any server with a dial parameter record to the
  471. // front. Whether the dial parameter remains valid for replay -- TTL,
  472. // tactics/config unchanged, etc. --- is checked later.
  473. //
  474. // TODO: move only up to parameters.ReplayCandidateCount to front?
  475. p := iterator.config.GetClientParameters().Get()
  476. if (isInitialRound || p.WeightedCoinFlip(parameters.ReplayLaterRoundMoveToFrontProbability)) &&
  477. p.Int(parameters.ReplayCandidateCount) != 0 {
  478. networkID := []byte(iterator.config.GetNetworkID())
  479. dialParamsBucket := tx.bucket(datastoreDialParametersBucket)
  480. i := shuffleHead
  481. j := len(serverEntryIDs) - 1
  482. for {
  483. for ; i < j; i++ {
  484. key := makeDialParametersKey(serverEntryIDs[i], networkID)
  485. if dialParamsBucket.get(key) == nil {
  486. break
  487. }
  488. }
  489. for ; i < j; j-- {
  490. key := makeDialParametersKey(serverEntryIDs[j], networkID)
  491. if dialParamsBucket.get(key) != nil {
  492. break
  493. }
  494. }
  495. if i < j {
  496. serverEntryIDs[i], serverEntryIDs[j] = serverEntryIDs[j], serverEntryIDs[i]
  497. i++
  498. j--
  499. } else {
  500. break
  501. }
  502. }
  503. }
  504. return nil
  505. })
  506. if err != nil {
  507. return common.ContextError(err)
  508. }
  509. iterator.serverEntryIDs = serverEntryIDs
  510. iterator.serverEntryIndex = 0
  511. return nil
  512. }
  513. // Close cleans up resources associated with a ServerEntryIterator.
  514. func (iterator *ServerEntryIterator) Close() {
  515. iterator.serverEntryIDs = nil
  516. iterator.serverEntryIndex = 0
  517. }
  518. // Next returns the next server entry, by rank, for a ServerEntryIterator.
  519. // Returns nil with no error when there is no next item.
  520. func (iterator *ServerEntryIterator) Next() (*protocol.ServerEntry, error) {
  521. var serverEntry *protocol.ServerEntry
  522. var err error
  523. defer func() {
  524. if err != nil {
  525. iterator.Close()
  526. }
  527. }()
  528. if iterator.isTargetServerEntryIterator {
  529. if iterator.hasNextTargetServerEntry {
  530. iterator.hasNextTargetServerEntry = false
  531. return MakeCompatibleServerEntry(iterator.targetServerEntry), nil
  532. }
  533. return nil, nil
  534. }
  535. // There are no region/protocol indexes for the server entries bucket.
  536. // Loop until we have the next server entry that matches the iterator
  537. // filter requirements.
  538. for {
  539. if iterator.serverEntryIndex >= len(iterator.serverEntryIDs) {
  540. // There is no next item
  541. return nil, nil
  542. }
  543. serverEntryID := iterator.serverEntryIDs[iterator.serverEntryIndex]
  544. iterator.serverEntryIndex += 1
  545. serverEntry = nil
  546. err = datastoreView(func(tx *datastoreTx) error {
  547. serverEntries := tx.bucket(datastoreServerEntriesBucket)
  548. value := serverEntries.get(serverEntryID)
  549. if value == nil {
  550. return nil
  551. }
  552. // Must unmarshal here as slice is only valid within transaction.
  553. err = json.Unmarshal(value, &serverEntry)
  554. if err != nil {
  555. // In case of data corruption or a bug causing this condition,
  556. // do not stop iterating.
  557. serverEntry = nil
  558. NoticeAlert(
  559. "ServerEntryIterator.Next: json.Unmarshal failed: %s",
  560. common.ContextError(err))
  561. }
  562. return nil
  563. })
  564. if err != nil {
  565. return nil, common.ContextError(err)
  566. }
  567. if serverEntry == nil {
  568. // In case of data corruption or a bug causing this condition,
  569. // do not stop iterating.
  570. NoticeAlert("ServerEntryIterator.Next: unexpected missing server entry")
  571. continue
  572. }
  573. // Generate a derived server entry tag for server entries with no tag. Store
  574. // back the updated server entry so that (a) the tag doesn't need to be
  575. // regenerated; (b) the server entry can be looked up by tag (currently used
  576. // in the status request prune case).
  577. //
  578. // This is a distinct transaction so as to avoid the overhead of regular
  579. // write transactions in the iterator; once tags have been stored back, most
  580. // iterator transactions will remain read-only.
  581. if serverEntry.Tag == "" {
  582. serverEntry.Tag = protocol.GenerateServerEntryTag(
  583. serverEntry.IpAddress, serverEntry.WebServerSecret)
  584. err = datastoreUpdate(func(tx *datastoreTx) error {
  585. serverEntries := tx.bucket(datastoreServerEntriesBucket)
  586. serverEntryTags := tx.bucket(datastoreServerEntryTagsBucket)
  587. // We must reload and store back the server entry _fields_ to preserve any
  588. // currently unrecognized fields, for future compatibility.
  589. value := serverEntries.get(serverEntryID)
  590. if value == nil {
  591. return nil
  592. }
  593. var serverEntryFields protocol.ServerEntryFields
  594. err := json.Unmarshal(value, &serverEntryFields)
  595. if err != nil {
  596. return common.ContextError(err)
  597. }
  598. // As there is minor race condition between loading/checking serverEntry
  599. // and reloading/modifying serverEntryFields, this transaction references
  600. // only the freshly loaded fields when checking and setting the tag.
  601. serverEntryTag := serverEntryFields.GetTag()
  602. if serverEntryTag != "" {
  603. return nil
  604. }
  605. serverEntryTag = protocol.GenerateServerEntryTag(
  606. serverEntryFields.GetIPAddress(),
  607. serverEntryFields.GetWebServerSecret())
  608. serverEntryFields.SetTag(serverEntryTag)
  609. jsonServerEntryFields, err := json.Marshal(serverEntryFields)
  610. if err != nil {
  611. return common.ContextError(err)
  612. }
  613. serverEntries.put(serverEntryID, jsonServerEntryFields)
  614. if err != nil {
  615. return common.ContextError(err)
  616. }
  617. serverEntryTags.put([]byte(serverEntryTag), serverEntryID)
  618. if err != nil {
  619. return common.ContextError(err)
  620. }
  621. return nil
  622. })
  623. if err != nil {
  624. // Do not stop.
  625. NoticeAlert(
  626. "ServerEntryIterator.Next: update server entry failed: %s",
  627. common.ContextError(err))
  628. }
  629. }
  630. if iterator.serverEntryIndex%datastoreServerEntryFetchGCThreshold == 0 {
  631. DoGarbageCollection()
  632. }
  633. // Check filter requirements
  634. if iterator.isTacticsServerEntryIterator {
  635. // Tactics doesn't filter by egress region.
  636. if len(serverEntry.GetSupportedTacticsProtocols()) > 0 {
  637. break
  638. }
  639. } else {
  640. if iterator.config.EgressRegion == "" ||
  641. serverEntry.Region == iterator.config.EgressRegion {
  642. break
  643. }
  644. }
  645. }
  646. return MakeCompatibleServerEntry(serverEntry), nil
  647. }
  648. // MakeCompatibleServerEntry provides backwards compatibility with old server entries
  649. // which have a single meekFrontingDomain and not a meekFrontingAddresses array.
  650. // By copying this one meekFrontingDomain into meekFrontingAddresses, this client effectively
  651. // uses that single value as legacy clients do.
  652. func MakeCompatibleServerEntry(serverEntry *protocol.ServerEntry) *protocol.ServerEntry {
  653. if len(serverEntry.MeekFrontingAddresses) == 0 && serverEntry.MeekFrontingDomain != "" {
  654. serverEntry.MeekFrontingAddresses =
  655. append(serverEntry.MeekFrontingAddresses, serverEntry.MeekFrontingDomain)
  656. }
  657. return serverEntry
  658. }
  659. // PruneServerEntry deletes the server entry, along with associated data,
  660. // corresponding to the specified server entry tag. Pruning is subject to an
  661. // age check. In the case of an error, a notice is emitted.
  662. func PruneServerEntry(config *Config, serverEntryTag string) {
  663. err := pruneServerEntry(config, serverEntryTag)
  664. if err != nil {
  665. NoticeAlert(
  666. "PruneServerEntry failed: %s: %s",
  667. serverEntryTag, common.ContextError(err))
  668. return
  669. }
  670. NoticePruneServerEntry(serverEntryTag)
  671. }
  672. func pruneServerEntry(config *Config, serverEntryTag string) error {
  673. minimumAgeForPruning := config.GetClientParameters().Get().Duration(
  674. parameters.ServerEntryMinimumAgeForPruning)
  675. return datastoreUpdate(func(tx *datastoreTx) error {
  676. serverEntries := tx.bucket(datastoreServerEntriesBucket)
  677. serverEntryTags := tx.bucket(datastoreServerEntryTagsBucket)
  678. serverEntryTombstoneTags := tx.bucket(datastoreServerEntryTombstoneTagsBucket)
  679. keyValues := tx.bucket(datastoreKeyValueBucket)
  680. dialParameters := tx.bucket(datastoreDialParametersBucket)
  681. serverEntryTagBytes := []byte(serverEntryTag)
  682. serverEntryID := serverEntryTags.get(serverEntryTagBytes)
  683. if serverEntryID == nil {
  684. return common.ContextError(errors.New("server entry tag not found"))
  685. }
  686. serverEntryJson := serverEntries.get(serverEntryID)
  687. if serverEntryJson == nil {
  688. return common.ContextError(errors.New("server entry not found"))
  689. }
  690. var serverEntry *protocol.ServerEntry
  691. err := json.Unmarshal(serverEntryJson, &serverEntry)
  692. if err != nil {
  693. common.ContextError(err)
  694. }
  695. // Only prune sufficiently old server entries. This mitigates the case where
  696. // stale data in psiphond will incorrectly identify brand new servers as
  697. // being invalid/deleted.
  698. serverEntryLocalTimestamp, err := time.Parse(time.RFC3339, serverEntry.LocalTimestamp)
  699. if err != nil {
  700. common.ContextError(err)
  701. }
  702. if serverEntryLocalTimestamp.Add(minimumAgeForPruning).After(time.Now()) {
  703. return nil
  704. }
  705. // Handle the server IP recycle case where multiple serverEntryTags records
  706. // refer to the same server IP. Only delete the server entry record when its
  707. // tag matches the pruned tag. Otherwise, the server entry record is
  708. // associated with another tag. The pruned tag is still deleted.
  709. deleteServerEntry := (serverEntry.Tag == serverEntryTag)
  710. err = serverEntryTags.delete(serverEntryTagBytes)
  711. if err != nil {
  712. common.ContextError(err)
  713. }
  714. if deleteServerEntry {
  715. err = serverEntries.delete(serverEntryID)
  716. if err != nil {
  717. common.ContextError(err)
  718. }
  719. affinityServerEntryID := keyValues.get(datastoreAffinityServerEntryIDKey)
  720. if 0 == bytes.Compare(affinityServerEntryID, serverEntryID) {
  721. err = keyValues.delete(datastoreAffinityServerEntryIDKey)
  722. if err != nil {
  723. return common.ContextError(err)
  724. }
  725. }
  726. // TODO: expose boltdb Seek functionality to skip to first matching record.
  727. cursor := dialParameters.cursor()
  728. defer cursor.close()
  729. foundFirstMatch := false
  730. for key, _ := cursor.first(); key != nil; key, _ = cursor.next() {
  731. // Dial parameters key has serverID as a prefix; see makeDialParametersKey.
  732. if bytes.HasPrefix(key, serverEntryID) {
  733. foundFirstMatch = true
  734. err := dialParameters.delete(key)
  735. if err != nil {
  736. return common.ContextError(err)
  737. }
  738. } else if foundFirstMatch {
  739. break
  740. }
  741. }
  742. }
  743. // Tombstones prevent reimporting pruned server entries. Tombstone
  744. // identifiers are tags, which are derived from the web server secret in
  745. // addition to the server IP, so tombstones will not clobber recycled server
  746. // IPs as long as new web server secrets are generated in the recycle case.
  747. //
  748. // Tombstones are set only for embedded server entries, as all other sources
  749. // are expected to provide valid server entries; this also provides a fail-
  750. // safe mechanism to restore pruned server entries through all non-embedded
  751. // sources.
  752. if serverEntry.LocalSource == protocol.SERVER_ENTRY_SOURCE_EMBEDDED {
  753. err = serverEntryTombstoneTags.put(serverEntryTagBytes, []byte{1})
  754. if err != nil {
  755. return common.ContextError(err)
  756. }
  757. }
  758. return nil
  759. })
  760. }
  761. func scanServerEntries(scanner func(*protocol.ServerEntry)) error {
  762. err := datastoreView(func(tx *datastoreTx) error {
  763. bucket := tx.bucket(datastoreServerEntriesBucket)
  764. cursor := bucket.cursor()
  765. n := 0
  766. for key, value := cursor.first(); key != nil; key, value = cursor.next() {
  767. var serverEntry *protocol.ServerEntry
  768. err := json.Unmarshal(value, &serverEntry)
  769. if err != nil {
  770. // In case of data corruption or a bug causing this condition,
  771. // do not stop iterating.
  772. NoticeAlert("scanServerEntries: %s", common.ContextError(err))
  773. continue
  774. }
  775. scanner(serverEntry)
  776. n += 1
  777. if n == datastoreServerEntryFetchGCThreshold {
  778. DoGarbageCollection()
  779. n = 0
  780. }
  781. }
  782. cursor.close()
  783. return nil
  784. })
  785. if err != nil {
  786. return common.ContextError(err)
  787. }
  788. return nil
  789. }
  790. // CountServerEntries returns a count of stored server entries.
  791. func CountServerEntries() int {
  792. count := 0
  793. err := scanServerEntries(func(_ *protocol.ServerEntry) {
  794. count += 1
  795. })
  796. if err != nil {
  797. NoticeAlert("CountServerEntries failed: %s", err)
  798. return 0
  799. }
  800. return count
  801. }
  802. // CountServerEntriesWithConstraints returns a count of stored server entries for
  803. // the specified region and tunnel protocol limits.
  804. func CountServerEntriesWithConstraints(
  805. useUpstreamProxy bool,
  806. region string,
  807. constraints *protocolSelectionConstraints) (int, int) {
  808. // When CountServerEntriesWithConstraints is called only
  809. // limitTunnelProtocolState is fixed; excludeIntensive is transitory.
  810. excludeIntensive := false
  811. initialCount := 0
  812. count := 0
  813. err := scanServerEntries(func(serverEntry *protocol.ServerEntry) {
  814. if region == "" || serverEntry.Region == region {
  815. if constraints.isInitialCandidate(excludeIntensive, serverEntry) {
  816. initialCount += 1
  817. }
  818. if constraints.isCandidate(excludeIntensive, serverEntry) {
  819. count += 1
  820. }
  821. }
  822. })
  823. if err != nil {
  824. NoticeAlert("CountServerEntriesWithConstraints failed: %s", err)
  825. return 0, 0
  826. }
  827. return initialCount, count
  828. }
  829. // ReportAvailableRegions prints a notice with the available egress regions.
  830. // When limitState has initial protocols, the available regions are limited
  831. // to those available for the initial protocols; or if limitState has general
  832. // limited protocols, the available regions are similarly limited.
  833. func ReportAvailableRegions(config *Config, constraints *protocolSelectionConstraints) {
  834. // When ReportAvailableRegions is called only limitTunnelProtocolState is
  835. // fixed; excludeIntensive is transitory.
  836. excludeIntensive := false
  837. regions := make(map[string]bool)
  838. err := scanServerEntries(func(serverEntry *protocol.ServerEntry) {
  839. isCandidate := false
  840. if constraints.hasInitialProtocols() {
  841. isCandidate = constraints.isInitialCandidate(excludeIntensive, serverEntry)
  842. } else {
  843. isCandidate = constraints.isCandidate(excludeIntensive, serverEntry)
  844. }
  845. if isCandidate {
  846. regions[serverEntry.Region] = true
  847. }
  848. })
  849. if err != nil {
  850. NoticeAlert("ReportAvailableRegions failed: %s", err)
  851. return
  852. }
  853. regionList := make([]string, 0, len(regions))
  854. for region := range regions {
  855. // Some server entries do not have a region, but it makes no sense to return
  856. // an empty string as an "available region".
  857. if region != "" {
  858. regionList = append(regionList, region)
  859. }
  860. }
  861. NoticeAvailableEgressRegions(regionList)
  862. }
  863. // SetSplitTunnelRoutes updates the cached routes data for
  864. // the given region. The associated etag is also stored and
  865. // used to make efficient web requests for updates to the data.
  866. func SetSplitTunnelRoutes(region, etag string, data []byte) error {
  867. err := datastoreUpdate(func(tx *datastoreTx) error {
  868. bucket := tx.bucket(datastoreSplitTunnelRouteETagsBucket)
  869. err := bucket.put([]byte(region), []byte(etag))
  870. if err != nil {
  871. return common.ContextError(err)
  872. }
  873. bucket = tx.bucket(datastoreSplitTunnelRouteDataBucket)
  874. err = bucket.put([]byte(region), data)
  875. if err != nil {
  876. return common.ContextError(err)
  877. }
  878. return nil
  879. })
  880. if err != nil {
  881. return common.ContextError(err)
  882. }
  883. return nil
  884. }
  885. // GetSplitTunnelRoutesETag retrieves the etag for cached routes
  886. // data for the specified region. If not found, it returns an empty string value.
  887. func GetSplitTunnelRoutesETag(region string) (string, error) {
  888. var etag string
  889. err := datastoreView(func(tx *datastoreTx) error {
  890. bucket := tx.bucket(datastoreSplitTunnelRouteETagsBucket)
  891. etag = string(bucket.get([]byte(region)))
  892. return nil
  893. })
  894. if err != nil {
  895. return "", common.ContextError(err)
  896. }
  897. return etag, nil
  898. }
  899. // GetSplitTunnelRoutesData retrieves the cached routes data
  900. // for the specified region. If not found, it returns a nil value.
  901. func GetSplitTunnelRoutesData(region string) ([]byte, error) {
  902. var data []byte
  903. err := datastoreView(func(tx *datastoreTx) error {
  904. bucket := tx.bucket(datastoreSplitTunnelRouteDataBucket)
  905. value := bucket.get([]byte(region))
  906. if value != nil {
  907. // Must make a copy as slice is only valid within transaction.
  908. data = make([]byte, len(value))
  909. copy(data, value)
  910. }
  911. return nil
  912. })
  913. if err != nil {
  914. return nil, common.ContextError(err)
  915. }
  916. return data, nil
  917. }
  918. // SetUrlETag stores an ETag for the specfied URL.
  919. // Note: input URL is treated as a string, and is not
  920. // encoded or decoded or otherwise canonicalized.
  921. func SetUrlETag(url, etag string) error {
  922. err := datastoreUpdate(func(tx *datastoreTx) error {
  923. bucket := tx.bucket(datastoreUrlETagsBucket)
  924. err := bucket.put([]byte(url), []byte(etag))
  925. if err != nil {
  926. return common.ContextError(err)
  927. }
  928. return nil
  929. })
  930. if err != nil {
  931. return common.ContextError(err)
  932. }
  933. return nil
  934. }
  935. // GetUrlETag retrieves a previously stored an ETag for the
  936. // specfied URL. If not found, it returns an empty string value.
  937. func GetUrlETag(url string) (string, error) {
  938. var etag string
  939. err := datastoreView(func(tx *datastoreTx) error {
  940. bucket := tx.bucket(datastoreUrlETagsBucket)
  941. etag = string(bucket.get([]byte(url)))
  942. return nil
  943. })
  944. if err != nil {
  945. return "", common.ContextError(err)
  946. }
  947. return etag, nil
  948. }
  949. // SetKeyValue stores a key/value pair.
  950. func SetKeyValue(key, value string) error {
  951. err := datastoreUpdate(func(tx *datastoreTx) error {
  952. bucket := tx.bucket(datastoreKeyValueBucket)
  953. err := bucket.put([]byte(key), []byte(value))
  954. if err != nil {
  955. return common.ContextError(err)
  956. }
  957. return nil
  958. })
  959. if err != nil {
  960. return common.ContextError(err)
  961. }
  962. return nil
  963. }
  964. // GetKeyValue retrieves the value for a given key. If not found,
  965. // it returns an empty string value.
  966. func GetKeyValue(key string) (string, error) {
  967. var value string
  968. err := datastoreView(func(tx *datastoreTx) error {
  969. bucket := tx.bucket(datastoreKeyValueBucket)
  970. value = string(bucket.get([]byte(key)))
  971. return nil
  972. })
  973. if err != nil {
  974. return "", common.ContextError(err)
  975. }
  976. return value, nil
  977. }
  978. // Persistent stat records in the persistentStatStateUnreported
  979. // state are available for take out.
  980. //
  981. // Records in the persistentStatStateReporting have been taken
  982. // out and are pending either deletion (for a successful request)
  983. // or change to StateUnreported (for a failed request).
  984. //
  985. // All persistent stat records are reverted to StateUnreported
  986. // when the datastore is initialized at start up.
  987. var persistentStatStateUnreported = []byte("0")
  988. var persistentStatStateReporting = []byte("1")
  989. var persistentStatTypes = []string{
  990. datastorePersistentStatTypeRemoteServerList,
  991. datastorePersistentStatTypeFailedTunnel,
  992. }
  993. // StorePersistentStat adds a new persistent stat record, which
  994. // is set to StateUnreported and is an immediate candidate for
  995. // reporting.
  996. //
  997. // The stat is a JSON byte array containing fields as
  998. // required by the Psiphon server API. It's assumed that the
  999. // JSON value contains enough unique information for the value to
  1000. // function as a key in the key/value datastore.
  1001. //
  1002. // Only up to PersistentStatsMaxStoreRecords are stored. Once this
  1003. // limit is reached, new records are discarded.
  1004. func StorePersistentStat(config *Config, statType string, stat []byte) error {
  1005. if !common.Contains(persistentStatTypes, statType) {
  1006. return common.ContextError(fmt.Errorf("invalid persistent stat type: %s", statType))
  1007. }
  1008. maxStoreRecords := config.GetClientParameters().Get().Int(
  1009. parameters.PersistentStatsMaxStoreRecords)
  1010. err := datastoreUpdate(func(tx *datastoreTx) error {
  1011. bucket := tx.bucket([]byte(statType))
  1012. count := 0
  1013. cursor := bucket.cursor()
  1014. for key, _ := cursor.first(); key != nil; key, _ = cursor.next() {
  1015. count++
  1016. }
  1017. cursor.close()
  1018. // TODO: assuming newer metrics are more useful, replace oldest record
  1019. // instead of discarding?
  1020. if count >= maxStoreRecords {
  1021. // Silently discard.
  1022. return nil
  1023. }
  1024. err := bucket.put(stat, persistentStatStateUnreported)
  1025. if err != nil {
  1026. return common.ContextError(err)
  1027. }
  1028. return nil
  1029. })
  1030. if err != nil {
  1031. return common.ContextError(err)
  1032. }
  1033. return nil
  1034. }
  1035. // CountUnreportedPersistentStats returns the number of persistent
  1036. // stat records in StateUnreported.
  1037. func CountUnreportedPersistentStats() int {
  1038. unreported := 0
  1039. err := datastoreView(func(tx *datastoreTx) error {
  1040. for _, statType := range persistentStatTypes {
  1041. bucket := tx.bucket([]byte(statType))
  1042. cursor := bucket.cursor()
  1043. for key, value := cursor.first(); key != nil; key, value = cursor.next() {
  1044. if 0 == bytes.Compare(value, persistentStatStateUnreported) {
  1045. unreported++
  1046. }
  1047. }
  1048. cursor.close()
  1049. }
  1050. return nil
  1051. })
  1052. if err != nil {
  1053. NoticeAlert("CountUnreportedPersistentStats failed: %s", err)
  1054. return 0
  1055. }
  1056. return unreported
  1057. }
  1058. // TakeOutUnreportedPersistentStats returns persistent stats records that are
  1059. // in StateUnreported. At least one record, if present, will be returned and
  1060. // then additional records up to PersistentStatsMaxSendBytes. The records are
  1061. // set to StateReporting. If the records are successfully reported, clear them
  1062. // with ClearReportedPersistentStats. If the records are not successfully
  1063. // reported, restore them with PutBackUnreportedPersistentStats.
  1064. func TakeOutUnreportedPersistentStats(config *Config) (map[string][][]byte, error) {
  1065. stats := make(map[string][][]byte)
  1066. maxSendBytes := config.GetClientParameters().Get().Int(
  1067. parameters.PersistentStatsMaxSendBytes)
  1068. err := datastoreUpdate(func(tx *datastoreTx) error {
  1069. sendBytes := 0
  1070. for _, statType := range persistentStatTypes {
  1071. bucket := tx.bucket([]byte(statType))
  1072. cursor := bucket.cursor()
  1073. for key, value := cursor.first(); key != nil; key, value = cursor.next() {
  1074. // Perform a test JSON unmarshaling. In case of data corruption or a bug,
  1075. // delete and skip the record.
  1076. var jsonData interface{}
  1077. err := json.Unmarshal(key, &jsonData)
  1078. if err != nil {
  1079. NoticeAlert(
  1080. "Invalid key in TakeOutUnreportedPersistentStats: %s: %s",
  1081. string(key), err)
  1082. bucket.delete(key)
  1083. continue
  1084. }
  1085. if 0 == bytes.Compare(value, persistentStatStateUnreported) {
  1086. // Must make a copy as slice is only valid within transaction.
  1087. data := make([]byte, len(key))
  1088. copy(data, key)
  1089. if stats[statType] == nil {
  1090. stats[statType] = make([][]byte, 0)
  1091. }
  1092. stats[statType] = append(stats[statType], data)
  1093. sendBytes += len(data)
  1094. if sendBytes >= maxSendBytes {
  1095. break
  1096. }
  1097. }
  1098. }
  1099. cursor.close()
  1100. for _, key := range stats[statType] {
  1101. err := bucket.put(key, persistentStatStateReporting)
  1102. if err != nil {
  1103. return common.ContextError(err)
  1104. }
  1105. }
  1106. }
  1107. return nil
  1108. })
  1109. if err != nil {
  1110. return nil, common.ContextError(err)
  1111. }
  1112. return stats, nil
  1113. }
  1114. // PutBackUnreportedPersistentStats restores a list of persistent
  1115. // stat records to StateUnreported.
  1116. func PutBackUnreportedPersistentStats(stats map[string][][]byte) error {
  1117. err := datastoreUpdate(func(tx *datastoreTx) error {
  1118. for _, statType := range persistentStatTypes {
  1119. bucket := tx.bucket([]byte(statType))
  1120. for _, key := range stats[statType] {
  1121. err := bucket.put(key, persistentStatStateUnreported)
  1122. if err != nil {
  1123. return common.ContextError(err)
  1124. }
  1125. }
  1126. }
  1127. return nil
  1128. })
  1129. if err != nil {
  1130. return common.ContextError(err)
  1131. }
  1132. return nil
  1133. }
  1134. // ClearReportedPersistentStats deletes a list of persistent
  1135. // stat records that were successfully reported.
  1136. func ClearReportedPersistentStats(stats map[string][][]byte) error {
  1137. err := datastoreUpdate(func(tx *datastoreTx) error {
  1138. for _, statType := range persistentStatTypes {
  1139. bucket := tx.bucket([]byte(statType))
  1140. for _, key := range stats[statType] {
  1141. err := bucket.delete(key)
  1142. if err != nil {
  1143. return err
  1144. }
  1145. }
  1146. }
  1147. return nil
  1148. })
  1149. if err != nil {
  1150. return common.ContextError(err)
  1151. }
  1152. return nil
  1153. }
  1154. // resetAllPersistentStatsToUnreported sets all persistent stat
  1155. // records to StateUnreported. This reset is called when the
  1156. // datastore is initialized at start up, as we do not know if
  1157. // persistent records in StateReporting were reported or not.
  1158. func resetAllPersistentStatsToUnreported() error {
  1159. err := datastoreUpdate(func(tx *datastoreTx) error {
  1160. for _, statType := range persistentStatTypes {
  1161. bucket := tx.bucket([]byte(statType))
  1162. resetKeys := make([][]byte, 0)
  1163. cursor := bucket.cursor()
  1164. for key := cursor.firstKey(); key != nil; key = cursor.nextKey() {
  1165. resetKeys = append(resetKeys, key)
  1166. }
  1167. cursor.close()
  1168. // TODO: data mutation is done outside cursor. Is this
  1169. // strictly necessary in this case? As is, this means
  1170. // all stats need to be loaded into memory at once.
  1171. // https://godoc.org/github.com/boltdb/bolt#Cursor
  1172. for _, key := range resetKeys {
  1173. err := bucket.put(key, persistentStatStateUnreported)
  1174. if err != nil {
  1175. return common.ContextError(err)
  1176. }
  1177. }
  1178. }
  1179. return nil
  1180. })
  1181. if err != nil {
  1182. return common.ContextError(err)
  1183. }
  1184. return nil
  1185. }
  1186. // CountSLOKs returns the total number of SLOK records.
  1187. func CountSLOKs() int {
  1188. count := 0
  1189. err := datastoreView(func(tx *datastoreTx) error {
  1190. bucket := tx.bucket(datastoreSLOKsBucket)
  1191. cursor := bucket.cursor()
  1192. for key := cursor.firstKey(); key != nil; key = cursor.nextKey() {
  1193. count++
  1194. }
  1195. cursor.close()
  1196. return nil
  1197. })
  1198. if err != nil {
  1199. NoticeAlert("CountSLOKs failed: %s", err)
  1200. return 0
  1201. }
  1202. return count
  1203. }
  1204. // DeleteSLOKs deletes all SLOK records.
  1205. func DeleteSLOKs() error {
  1206. err := datastoreUpdate(func(tx *datastoreTx) error {
  1207. return tx.clearBucket(datastoreSLOKsBucket)
  1208. })
  1209. if err != nil {
  1210. return common.ContextError(err)
  1211. }
  1212. return nil
  1213. }
  1214. // SetSLOK stores a SLOK key, referenced by its ID. The bool
  1215. // return value indicates whether the SLOK was already stored.
  1216. func SetSLOK(id, key []byte) (bool, error) {
  1217. var duplicate bool
  1218. err := datastoreUpdate(func(tx *datastoreTx) error {
  1219. bucket := tx.bucket(datastoreSLOKsBucket)
  1220. duplicate = bucket.get(id) != nil
  1221. err := bucket.put([]byte(id), []byte(key))
  1222. if err != nil {
  1223. return common.ContextError(err)
  1224. }
  1225. return nil
  1226. })
  1227. if err != nil {
  1228. return false, common.ContextError(err)
  1229. }
  1230. return duplicate, nil
  1231. }
  1232. // GetSLOK returns a SLOK key for the specified ID. The return
  1233. // value is nil if the SLOK is not found.
  1234. func GetSLOK(id []byte) ([]byte, error) {
  1235. var key []byte
  1236. err := datastoreView(func(tx *datastoreTx) error {
  1237. bucket := tx.bucket(datastoreSLOKsBucket)
  1238. key = bucket.get(id)
  1239. return nil
  1240. })
  1241. if err != nil {
  1242. return nil, common.ContextError(err)
  1243. }
  1244. return key, nil
  1245. }
  1246. func makeDialParametersKey(serverIPAddress, networkID []byte) []byte {
  1247. // TODO: structured key?
  1248. return append(append([]byte(nil), serverIPAddress...), networkID...)
  1249. }
  1250. // SetDialParameters stores dial parameters associated with the specified
  1251. // server/network ID.
  1252. func SetDialParameters(serverIPAddress, networkID string, dialParams *DialParameters) error {
  1253. key := makeDialParametersKey([]byte(serverIPAddress), []byte(networkID))
  1254. data, err := json.Marshal(dialParams)
  1255. if err != nil {
  1256. return common.ContextError(err)
  1257. }
  1258. return setBucketValue(datastoreDialParametersBucket, key, data)
  1259. }
  1260. // GetDialParameters fetches any dial parameters associated with the specified
  1261. // server/network ID. Returns nil, nil when no record is found.
  1262. func GetDialParameters(serverIPAddress, networkID string) (*DialParameters, error) {
  1263. key := makeDialParametersKey([]byte(serverIPAddress), []byte(networkID))
  1264. data, err := getBucketValue(datastoreDialParametersBucket, key)
  1265. if err != nil {
  1266. return nil, common.ContextError(err)
  1267. }
  1268. if data == nil {
  1269. return nil, nil
  1270. }
  1271. var dialParams *DialParameters
  1272. err = json.Unmarshal(data, &dialParams)
  1273. if err != nil {
  1274. return nil, common.ContextError(err)
  1275. }
  1276. return dialParams, nil
  1277. }
  1278. // DeleteDialParameters clears any dial parameters associated with the
  1279. // specified server/network ID.
  1280. func DeleteDialParameters(serverIPAddress, networkID string) error {
  1281. key := makeDialParametersKey([]byte(serverIPAddress), []byte(networkID))
  1282. return deleteBucketValue(datastoreDialParametersBucket, key)
  1283. }
  1284. // TacticsStorer implements tactics.Storer.
  1285. type TacticsStorer struct {
  1286. }
  1287. func (t *TacticsStorer) SetTacticsRecord(networkID string, record []byte) error {
  1288. return setBucketValue(datastoreTacticsBucket, []byte(networkID), record)
  1289. }
  1290. func (t *TacticsStorer) GetTacticsRecord(networkID string) ([]byte, error) {
  1291. return getBucketValue(datastoreTacticsBucket, []byte(networkID))
  1292. }
  1293. func (t *TacticsStorer) SetSpeedTestSamplesRecord(networkID string, record []byte) error {
  1294. return setBucketValue(datastoreSpeedTestSamplesBucket, []byte(networkID), record)
  1295. }
  1296. func (t *TacticsStorer) GetSpeedTestSamplesRecord(networkID string) ([]byte, error) {
  1297. return getBucketValue(datastoreSpeedTestSamplesBucket, []byte(networkID))
  1298. }
  1299. // GetTacticsStorer creates a TacticsStorer.
  1300. func GetTacticsStorer() *TacticsStorer {
  1301. return &TacticsStorer{}
  1302. }
  1303. // GetAffinityServerEntryAndDialParameters fetches the current affinity server
  1304. // entry value and any corresponding dial parameters for the specified network
  1305. // ID. An error is returned when no affinity server is available. The
  1306. // DialParameter output may be nil when a server entry is found but has no
  1307. // dial parameters.
  1308. func GetAffinityServerEntryAndDialParameters(
  1309. networkID string) (protocol.ServerEntryFields, *DialParameters, error) {
  1310. var serverEntryFields protocol.ServerEntryFields
  1311. var dialParams *DialParameters
  1312. err := datastoreView(func(tx *datastoreTx) error {
  1313. keyValues := tx.bucket(datastoreKeyValueBucket)
  1314. serverEntries := tx.bucket(datastoreServerEntriesBucket)
  1315. dialParameters := tx.bucket(datastoreDialParametersBucket)
  1316. affinityServerEntryID := keyValues.get(datastoreAffinityServerEntryIDKey)
  1317. if affinityServerEntryID == nil {
  1318. return common.ContextError(errors.New("no affinity server available"))
  1319. }
  1320. serverEntryRecord := serverEntries.get(affinityServerEntryID)
  1321. if serverEntryRecord == nil {
  1322. return common.ContextError(errors.New("affinity server entry not found"))
  1323. }
  1324. err := json.Unmarshal(
  1325. serverEntryRecord,
  1326. &serverEntryFields)
  1327. if err != nil {
  1328. return common.ContextError(err)
  1329. }
  1330. dialParamsKey := makeDialParametersKey(
  1331. []byte(serverEntryFields.GetIPAddress()),
  1332. []byte(networkID))
  1333. dialParamsRecord := dialParameters.get(dialParamsKey)
  1334. if dialParamsRecord != nil {
  1335. err := json.Unmarshal(dialParamsRecord, &dialParams)
  1336. if err != nil {
  1337. return common.ContextError(err)
  1338. }
  1339. }
  1340. return nil
  1341. })
  1342. if err != nil {
  1343. return nil, nil, common.ContextError(err)
  1344. }
  1345. return serverEntryFields, dialParams, nil
  1346. }
  1347. func setBucketValue(bucket, key, value []byte) error {
  1348. err := datastoreUpdate(func(tx *datastoreTx) error {
  1349. bucket := tx.bucket(bucket)
  1350. err := bucket.put(key, value)
  1351. if err != nil {
  1352. return common.ContextError(err)
  1353. }
  1354. return nil
  1355. })
  1356. if err != nil {
  1357. return common.ContextError(err)
  1358. }
  1359. return nil
  1360. }
  1361. func getBucketValue(bucket, key []byte) ([]byte, error) {
  1362. var value []byte
  1363. err := datastoreView(func(tx *datastoreTx) error {
  1364. bucket := tx.bucket(bucket)
  1365. value = bucket.get(key)
  1366. return nil
  1367. })
  1368. if err != nil {
  1369. return nil, common.ContextError(err)
  1370. }
  1371. return value, nil
  1372. }
  1373. func deleteBucketValue(bucket, key []byte) error {
  1374. err := datastoreUpdate(func(tx *datastoreTx) error {
  1375. bucket := tx.bucket(bucket)
  1376. return bucket.delete(key)
  1377. })
  1378. if err != nil {
  1379. return common.ContextError(err)
  1380. }
  1381. return nil
  1382. }