dataStore.go 38 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422
  1. /*
  2. * Copyright (c) 2015, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package psiphon
  20. import (
  21. "bytes"
  22. "encoding/json"
  23. "errors"
  24. "fmt"
  25. "math/rand"
  26. "os"
  27. "path/filepath"
  28. "sync"
  29. "time"
  30. "github.com/Psiphon-Labs/bolt"
  31. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  32. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/parameters"
  33. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
  34. )
  35. const (
  36. serverEntriesBucket = "serverEntries"
  37. rankedServerEntriesBucket = "rankedServerEntries"
  38. rankedServerEntriesKey = "rankedServerEntries"
  39. splitTunnelRouteETagsBucket = "splitTunnelRouteETags"
  40. splitTunnelRouteDataBucket = "splitTunnelRouteData"
  41. urlETagsBucket = "urlETags"
  42. keyValueBucket = "keyValues"
  43. tunnelStatsBucket = "tunnelStats"
  44. remoteServerListStatsBucket = "remoteServerListStats"
  45. slokBucket = "SLOKs"
  46. tacticsBucket = "tactics"
  47. speedTestSamplesBucket = "speedTestSamples"
  48. rankedServerEntryCount = 100
  49. )
  50. const (
  51. DATA_STORE_FILENAME = "psiphon.boltdb"
  52. DATA_STORE_LAST_CONNECTED_KEY = "lastConnected"
  53. DATA_STORE_LAST_SERVER_ENTRY_FILTER_KEY = "lastServerEntryFilter"
  54. PERSISTENT_STAT_TYPE_REMOTE_SERVER_LIST = remoteServerListStatsBucket
  55. )
  56. var (
  57. datastoreInitalizeMutex sync.Mutex
  58. datastoreReferenceMutex sync.Mutex
  59. datastoreDB *bolt.DB
  60. )
  61. // OpenDataStore opens and initializes the singleton data store instance.
  62. func OpenDataStore(config *Config) error {
  63. datastoreInitalizeMutex.Lock()
  64. defer datastoreInitalizeMutex.Unlock()
  65. datastoreReferenceMutex.Lock()
  66. existingDB := datastoreDB
  67. datastoreReferenceMutex.Unlock()
  68. if existingDB != nil {
  69. return common.ContextError(errors.New("db already open"))
  70. }
  71. filename := filepath.Join(config.DataStoreDirectory, DATA_STORE_FILENAME)
  72. var newDB *bolt.DB
  73. var err error
  74. for retry := 0; retry < 3; retry++ {
  75. if retry > 0 {
  76. NoticeAlert("OpenDataStore retry: %d", retry)
  77. }
  78. newDB, err = bolt.Open(filename, 0600, &bolt.Options{Timeout: 1 * time.Second})
  79. // The datastore file may be corrupt, so attempt to delete and try again
  80. if err != nil {
  81. NoticeAlert("bolt.Open error: %s", err)
  82. os.Remove(filename)
  83. continue
  84. }
  85. // Run consistency checks on datastore and emit errors for diagnostics purposes
  86. // We assume this will complete quickly for typical size Psiphon datastores.
  87. err = newDB.View(func(tx *bolt.Tx) error {
  88. return tx.SynchronousCheck()
  89. })
  90. // The datastore file may be corrupt, so attempt to delete and try again
  91. if err != nil {
  92. NoticeAlert("bolt.SynchronousCheck error: %s", err)
  93. newDB.Close()
  94. os.Remove(filename)
  95. continue
  96. }
  97. break
  98. }
  99. if err != nil {
  100. return common.ContextError(fmt.Errorf("failed to open database: %s", err))
  101. }
  102. err = newDB.Update(func(tx *bolt.Tx) error {
  103. requiredBuckets := []string{
  104. serverEntriesBucket,
  105. rankedServerEntriesBucket,
  106. splitTunnelRouteETagsBucket,
  107. splitTunnelRouteDataBucket,
  108. urlETagsBucket,
  109. keyValueBucket,
  110. tunnelStatsBucket,
  111. remoteServerListStatsBucket,
  112. slokBucket,
  113. tacticsBucket,
  114. speedTestSamplesBucket,
  115. }
  116. for _, bucket := range requiredBuckets {
  117. _, err := tx.CreateBucketIfNotExists([]byte(bucket))
  118. if err != nil {
  119. return err
  120. }
  121. }
  122. return nil
  123. })
  124. if err != nil {
  125. return common.ContextError(fmt.Errorf("failed to create buckets: %s", err))
  126. }
  127. // Cleanup obsolete tunnel (session) stats bucket, if one still exists
  128. err = newDB.Update(func(tx *bolt.Tx) error {
  129. tunnelStatsBucket := []byte("tunnelStats")
  130. if tx.Bucket(tunnelStatsBucket) != nil {
  131. err := tx.DeleteBucket(tunnelStatsBucket)
  132. if err != nil {
  133. NoticeAlert("DeleteBucket %s error: %s", tunnelStatsBucket, err)
  134. // Continue, since this is not fatal
  135. }
  136. }
  137. return nil
  138. })
  139. if err != nil {
  140. return common.ContextError(fmt.Errorf("failed to create buckets: %s", err))
  141. }
  142. datastoreReferenceMutex.Lock()
  143. datastoreDB = newDB
  144. datastoreReferenceMutex.Unlock()
  145. _ = resetAllPersistentStatsToUnreported()
  146. return nil
  147. }
  148. // CloseDataStore closes the singleton data store instance, if open.
  149. func CloseDataStore() {
  150. datastoreInitalizeMutex.Lock()
  151. defer datastoreInitalizeMutex.Unlock()
  152. datastoreReferenceMutex.Lock()
  153. defer datastoreReferenceMutex.Unlock()
  154. if datastoreDB == nil {
  155. return
  156. }
  157. err := datastoreDB.Close()
  158. if err != nil {
  159. NoticeAlert("failed to close database: %s", err)
  160. }
  161. datastoreDB = nil
  162. }
  163. func dataStoreView(fn func(tx *bolt.Tx) error) error {
  164. datastoreReferenceMutex.Lock()
  165. db := datastoreDB
  166. datastoreReferenceMutex.Unlock()
  167. if db == nil {
  168. return common.ContextError(errors.New("database not open"))
  169. }
  170. return db.View(fn)
  171. }
  172. func dataStoreUpdate(fn func(tx *bolt.Tx) error) error {
  173. datastoreReferenceMutex.Lock()
  174. db := datastoreDB
  175. datastoreReferenceMutex.Unlock()
  176. if db == nil {
  177. return common.ContextError(errors.New("database not open"))
  178. }
  179. return db.Update(fn)
  180. }
  181. // StoreServerEntry adds the server entry to the data store.
  182. // A newly stored (or re-stored) server entry is assigned the next-to-top
  183. // rank for iteration order (the previous top ranked entry is promoted). The
  184. // purpose of inserting at next-to-top is to keep the last selected server
  185. // as the top ranked server.
  186. //
  187. // When a server entry already exists for a given server, it will be
  188. // replaced only if replaceIfExists is set or if the the ConfigurationVersion
  189. // field of the new entry is strictly higher than the existing entry.
  190. //
  191. // If the server entry data is malformed, an alert notice is issued and
  192. // the entry is skipped; no error is returned.
  193. func StoreServerEntry(serverEntryFields protocol.ServerEntryFields, replaceIfExists bool) error {
  194. // Server entries should already be validated before this point,
  195. // so instead of skipping we fail with an error.
  196. err := protocol.ValidateServerEntryFields(serverEntryFields)
  197. if err != nil {
  198. return common.ContextError(
  199. fmt.Errorf("invalid server entry: %s", err))
  200. }
  201. // BoltDB implementation note:
  202. // For simplicity, we don't maintain indexes on server entry
  203. // region or supported protocols. Instead, we perform full-bucket
  204. // scans with a filter. With a small enough database (thousands or
  205. // even tens of thousand of server entries) and common enough
  206. // values (e.g., many servers support all protocols), performance
  207. // is expected to be acceptable.
  208. err = dataStoreUpdate(func(tx *bolt.Tx) error {
  209. serverEntries := tx.Bucket([]byte(serverEntriesBucket))
  210. ipAddress := serverEntryFields.GetIPAddress()
  211. // Check not only that the entry exists, but is valid. This
  212. // will replace in the rare case where the data is corrupt.
  213. existingConfigurationVersion := -1
  214. existingData := serverEntries.Get([]byte(ipAddress))
  215. if existingData != nil {
  216. var existingServerEntry *protocol.ServerEntry
  217. err := json.Unmarshal(existingData, &existingServerEntry)
  218. if err == nil {
  219. existingConfigurationVersion = existingServerEntry.ConfigurationVersion
  220. }
  221. }
  222. exists := existingConfigurationVersion > -1
  223. newer := exists && existingConfigurationVersion < serverEntryFields.GetConfigurationVersion()
  224. update := !exists || replaceIfExists || newer
  225. if !update {
  226. // Disabling this notice, for now, as it generates too much noise
  227. // in diagnostics with clients that always submit embedded servers
  228. // to the core on each run.
  229. // NoticeInfo("ignored update for server %s", serverEntry.IpAddress)
  230. return nil
  231. }
  232. data, err := json.Marshal(serverEntryFields)
  233. if err != nil {
  234. return common.ContextError(err)
  235. }
  236. err = serverEntries.Put([]byte(ipAddress), data)
  237. if err != nil {
  238. return common.ContextError(err)
  239. }
  240. err = insertRankedServerEntry(tx, ipAddress, 1)
  241. if err != nil {
  242. return common.ContextError(err)
  243. }
  244. NoticeInfo("updated server %s", ipAddress)
  245. return nil
  246. })
  247. if err != nil {
  248. return common.ContextError(err)
  249. }
  250. return nil
  251. }
  252. // StoreServerEntries stores a list of server entries.
  253. // There is an independent transaction for each entry insert/update.
  254. func StoreServerEntries(
  255. config *Config,
  256. serverEntries []protocol.ServerEntryFields,
  257. replaceIfExists bool) error {
  258. for _, serverEntryFields := range serverEntries {
  259. err := StoreServerEntry(serverEntryFields, replaceIfExists)
  260. if err != nil {
  261. return common.ContextError(err)
  262. }
  263. }
  264. return nil
  265. }
  266. // StreamingStoreServerEntries stores a list of server entries.
  267. // There is an independent transaction for each entry insert/update.
  268. func StreamingStoreServerEntries(
  269. config *Config,
  270. serverEntries *protocol.StreamingServerEntryDecoder,
  271. replaceIfExists bool) error {
  272. // Note: both StreamingServerEntryDecoder.Next and StoreServerEntry
  273. // allocate temporary memory buffers for hex/JSON decoding/encoding,
  274. // so this isn't true constant-memory streaming (it depends on garbage
  275. // collection).
  276. for {
  277. serverEntry, err := serverEntries.Next()
  278. if err != nil {
  279. return common.ContextError(err)
  280. }
  281. if serverEntry == nil {
  282. // No more server entries
  283. break
  284. }
  285. err = StoreServerEntry(serverEntry, replaceIfExists)
  286. if err != nil {
  287. return common.ContextError(err)
  288. }
  289. }
  290. return nil
  291. }
  292. // PromoteServerEntry assigns the top rank (one more than current
  293. // max rank) to the specified server entry. Server candidates are
  294. // iterated in decending rank order, so this server entry will be
  295. // the first candidate in a subsequent tunnel establishment.
  296. func PromoteServerEntry(config *Config, ipAddress string) error {
  297. err := dataStoreUpdate(func(tx *bolt.Tx) error {
  298. // Ensure the corresponding entry exists before
  299. // inserting into rank.
  300. bucket := tx.Bucket([]byte(serverEntriesBucket))
  301. data := bucket.Get([]byte(ipAddress))
  302. if data == nil {
  303. NoticeAlert(
  304. "PromoteServerEntry: ignoring unknown server entry: %s",
  305. ipAddress)
  306. return nil
  307. }
  308. err := insertRankedServerEntry(tx, ipAddress, 0)
  309. if err != nil {
  310. return err
  311. }
  312. // Store the current server entry filter (e.g, region, etc.) that
  313. // was in use when the entry was promoted. This is used to detect
  314. // when the top ranked server entry was promoted under a different
  315. // filter.
  316. currentFilter, err := makeServerEntryFilterValue(config)
  317. if err != nil {
  318. return err
  319. }
  320. bucket = tx.Bucket([]byte(keyValueBucket))
  321. return bucket.Put([]byte(DATA_STORE_LAST_SERVER_ENTRY_FILTER_KEY), currentFilter)
  322. })
  323. if err != nil {
  324. return common.ContextError(err)
  325. }
  326. return nil
  327. }
  328. func makeServerEntryFilterValue(config *Config) ([]byte, error) {
  329. // Currently, only a change of EgressRegion will "break" server affinity.
  330. // If the tunnel protocol filter changes, any existing affinity server
  331. // either passes the new filter, or it will be skipped anyway.
  332. return []byte(config.EgressRegion), nil
  333. }
  334. func hasServerEntryFilterChanged(config *Config) (bool, error) {
  335. currentFilter, err := makeServerEntryFilterValue(config)
  336. if err != nil {
  337. return false, common.ContextError(err)
  338. }
  339. changed := false
  340. err = dataStoreView(func(tx *bolt.Tx) error {
  341. // previousFilter will be nil not found (not previously
  342. // set) which will never match any current filter.
  343. bucket := tx.Bucket([]byte(keyValueBucket))
  344. previousFilter := bucket.Get([]byte(DATA_STORE_LAST_SERVER_ENTRY_FILTER_KEY))
  345. if bytes.Compare(previousFilter, currentFilter) != 0 {
  346. changed = true
  347. }
  348. return nil
  349. })
  350. if err != nil {
  351. return false, common.ContextError(err)
  352. }
  353. return changed, nil
  354. }
  355. func getRankedServerEntries(tx *bolt.Tx) ([]string, error) {
  356. bucket := tx.Bucket([]byte(rankedServerEntriesBucket))
  357. data := bucket.Get([]byte(rankedServerEntriesKey))
  358. if data == nil {
  359. return []string{}, nil
  360. }
  361. rankedServerEntries := make([]string, 0)
  362. err := json.Unmarshal(data, &rankedServerEntries)
  363. if err != nil {
  364. return nil, common.ContextError(err)
  365. }
  366. return rankedServerEntries, nil
  367. }
  368. func setRankedServerEntries(tx *bolt.Tx, rankedServerEntries []string) error {
  369. data, err := json.Marshal(rankedServerEntries)
  370. if err != nil {
  371. return common.ContextError(err)
  372. }
  373. bucket := tx.Bucket([]byte(rankedServerEntriesBucket))
  374. err = bucket.Put([]byte(rankedServerEntriesKey), data)
  375. if err != nil {
  376. return common.ContextError(err)
  377. }
  378. return nil
  379. }
  380. func insertRankedServerEntry(tx *bolt.Tx, serverEntryId string, position int) error {
  381. rankedServerEntries, err := getRankedServerEntries(tx)
  382. if err != nil {
  383. return common.ContextError(err)
  384. }
  385. // BoltDB implementation note:
  386. // For simplicity, we store the ranked server ids in an array serialized to
  387. // a single key value. To ensure this value doesn't grow without bound,
  388. // it's capped at rankedServerEntryCount. For now, this cap should be large
  389. // enough to meet the shuffleHeadLength = config.TunnelPoolSize criteria, for
  390. // any reasonable configuration of config.TunnelPoolSize.
  391. // Using: https://github.com/golang/go/wiki/SliceTricks
  392. // When serverEntryId is already ranked, remove it first to avoid duplicates
  393. for i, rankedServerEntryId := range rankedServerEntries {
  394. if rankedServerEntryId == serverEntryId {
  395. rankedServerEntries = append(
  396. rankedServerEntries[:i], rankedServerEntries[i+1:]...)
  397. break
  398. }
  399. }
  400. // SliceTricks insert, with length cap enforced
  401. if len(rankedServerEntries) < rankedServerEntryCount {
  402. rankedServerEntries = append(rankedServerEntries, "")
  403. }
  404. if position >= len(rankedServerEntries) {
  405. position = len(rankedServerEntries) - 1
  406. }
  407. copy(rankedServerEntries[position+1:], rankedServerEntries[position:])
  408. rankedServerEntries[position] = serverEntryId
  409. err = setRankedServerEntries(tx, rankedServerEntries)
  410. if err != nil {
  411. return common.ContextError(err)
  412. }
  413. return nil
  414. }
  415. // ServerEntryIterator is used to iterate over
  416. // stored server entries in rank order.
  417. type ServerEntryIterator struct {
  418. config *Config
  419. shuffleHeadLength int
  420. serverEntryIds []string
  421. serverEntryIndex int
  422. isTacticsServerEntryIterator bool
  423. isTargetServerEntryIterator bool
  424. hasNextTargetServerEntry bool
  425. targetServerEntry *protocol.ServerEntry
  426. }
  427. // NewServerEntryIterator creates a new ServerEntryIterator.
  428. //
  429. // The boolean return value indicates whether to treat the first server(s)
  430. // as affinity servers or not. When the server entry selection filter changes
  431. // such as from a specific region to any region, or when there was no previous
  432. // filter/iterator, the the first server(s) are arbitrary and should not be
  433. // given affinity treatment.
  434. //
  435. // NewServerEntryIterator and any returned ServerEntryIterator are not
  436. // designed for concurrent use as not all related datastore operations are
  437. // performed in a single transaction.
  438. //
  439. func NewServerEntryIterator(config *Config) (bool, *ServerEntryIterator, error) {
  440. // When configured, this target server entry is the only candidate
  441. if config.TargetServerEntry != "" {
  442. return newTargetServerEntryIterator(config, false)
  443. }
  444. filterChanged, err := hasServerEntryFilterChanged(config)
  445. if err != nil {
  446. return false, nil, common.ContextError(err)
  447. }
  448. applyServerAffinity := !filterChanged
  449. iterator := &ServerEntryIterator{
  450. config: config,
  451. shuffleHeadLength: config.TunnelPoolSize,
  452. }
  453. err = iterator.Reset()
  454. if err != nil {
  455. return false, nil, common.ContextError(err)
  456. }
  457. return applyServerAffinity, iterator, nil
  458. }
  459. func NewTacticsServerEntryIterator(config *Config) (*ServerEntryIterator, error) {
  460. // When configured, this target server entry is the only candidate
  461. if config.TargetServerEntry != "" {
  462. _, iterator, err := newTargetServerEntryIterator(config, true)
  463. return iterator, err
  464. }
  465. iterator := &ServerEntryIterator{
  466. shuffleHeadLength: 0,
  467. isTacticsServerEntryIterator: true,
  468. }
  469. err := iterator.Reset()
  470. if err != nil {
  471. return nil, common.ContextError(err)
  472. }
  473. return iterator, nil
  474. }
  475. // newTargetServerEntryIterator is a helper for initializing the TargetServerEntry case
  476. func newTargetServerEntryIterator(config *Config, isTactics bool) (bool, *ServerEntryIterator, error) {
  477. serverEntry, err := protocol.DecodeServerEntry(
  478. config.TargetServerEntry, common.GetCurrentTimestamp(), protocol.SERVER_ENTRY_SOURCE_TARGET)
  479. if err != nil {
  480. return false, nil, common.ContextError(err)
  481. }
  482. if isTactics {
  483. if len(serverEntry.GetSupportedTacticsProtocols()) == 0 {
  484. return false, nil, common.ContextError(errors.New("TargetServerEntry does not support tactics protocols"))
  485. }
  486. } else {
  487. if config.EgressRegion != "" && serverEntry.Region != config.EgressRegion {
  488. return false, nil, common.ContextError(errors.New("TargetServerEntry does not support EgressRegion"))
  489. }
  490. limitTunnelProtocols := config.clientParameters.Get().TunnelProtocols(parameters.LimitTunnelProtocols)
  491. if len(limitTunnelProtocols) > 0 {
  492. // At the ServerEntryIterator level, only limitTunnelProtocols is applied;
  493. // excludeIntensive is handled higher up.
  494. if len(serverEntry.GetSupportedProtocols(
  495. config.UseUpstreamProxy(), limitTunnelProtocols, false)) == 0 {
  496. return false, nil, common.ContextError(errors.New("TargetServerEntry does not support LimitTunnelProtocols"))
  497. }
  498. }
  499. }
  500. iterator := &ServerEntryIterator{
  501. isTacticsServerEntryIterator: isTactics,
  502. isTargetServerEntryIterator: true,
  503. hasNextTargetServerEntry: true,
  504. targetServerEntry: serverEntry,
  505. }
  506. NoticeInfo("using TargetServerEntry: %s", serverEntry.IpAddress)
  507. return false, iterator, nil
  508. }
  509. // Reset a NewServerEntryIterator to the start of its cycle. The next
  510. // call to Next will return the first server entry.
  511. func (iterator *ServerEntryIterator) Reset() error {
  512. iterator.Close()
  513. if iterator.isTargetServerEntryIterator {
  514. iterator.hasNextTargetServerEntry = true
  515. return nil
  516. }
  517. // This query implements the Psiphon server candidate selection
  518. // algorithm: the first TunnelPoolSize server candidates are in rank
  519. // (priority) order, to favor previously successful servers; then the
  520. // remaining long tail is shuffled to raise up less recent candidates.
  521. // BoltDB implementation note:
  522. // We don't keep a transaction open for the duration of the iterator
  523. // because this would expose the following semantics to consumer code:
  524. //
  525. // Read-only transactions and read-write transactions ... generally
  526. // shouldn't be opened simultaneously in the same goroutine. This can
  527. // cause a deadlock as the read-write transaction needs to periodically
  528. // re-map the data file but it cannot do so while a read-only
  529. // transaction is open.
  530. // (https://github.com/boltdb/bolt)
  531. //
  532. // So the underlying serverEntriesBucket could change after the serverEntryIds
  533. // list is built.
  534. var serverEntryIds []string
  535. err := dataStoreView(func(tx *bolt.Tx) error {
  536. var err error
  537. serverEntryIds, err = getRankedServerEntries(tx)
  538. if err != nil {
  539. return err
  540. }
  541. skipServerEntryIds := make(map[string]bool)
  542. for _, serverEntryId := range serverEntryIds {
  543. skipServerEntryIds[serverEntryId] = true
  544. }
  545. bucket := tx.Bucket([]byte(serverEntriesBucket))
  546. cursor := bucket.Cursor()
  547. for key, _ := cursor.Last(); key != nil; key, _ = cursor.Prev() {
  548. serverEntryId := string(key)
  549. if _, ok := skipServerEntryIds[serverEntryId]; ok {
  550. continue
  551. }
  552. serverEntryIds = append(serverEntryIds, serverEntryId)
  553. }
  554. return nil
  555. })
  556. if err != nil {
  557. return common.ContextError(err)
  558. }
  559. for i := len(serverEntryIds) - 1; i > iterator.shuffleHeadLength-1; i-- {
  560. j := rand.Intn(i+1-iterator.shuffleHeadLength) + iterator.shuffleHeadLength
  561. serverEntryIds[i], serverEntryIds[j] = serverEntryIds[j], serverEntryIds[i]
  562. }
  563. iterator.serverEntryIds = serverEntryIds
  564. iterator.serverEntryIndex = 0
  565. return nil
  566. }
  567. // Close cleans up resources associated with a ServerEntryIterator.
  568. func (iterator *ServerEntryIterator) Close() {
  569. iterator.serverEntryIds = nil
  570. iterator.serverEntryIndex = 0
  571. }
  572. // Next returns the next server entry, by rank, for a ServerEntryIterator.
  573. // Returns nil with no error when there is no next item.
  574. func (iterator *ServerEntryIterator) Next() (*protocol.ServerEntry, error) {
  575. var serverEntry *protocol.ServerEntry
  576. var err error
  577. defer func() {
  578. if err != nil {
  579. iterator.Close()
  580. }
  581. }()
  582. if iterator.isTargetServerEntryIterator {
  583. if iterator.hasNextTargetServerEntry {
  584. iterator.hasNextTargetServerEntry = false
  585. return MakeCompatibleServerEntry(iterator.targetServerEntry), nil
  586. }
  587. return nil, nil
  588. }
  589. // There are no region/protocol indexes for the server entries bucket.
  590. // Loop until we have the next server entry that matches the iterator
  591. // filter requirements.
  592. for {
  593. if iterator.serverEntryIndex >= len(iterator.serverEntryIds) {
  594. // There is no next item
  595. return nil, nil
  596. }
  597. serverEntryId := iterator.serverEntryIds[iterator.serverEntryIndex]
  598. iterator.serverEntryIndex += 1
  599. var data []byte
  600. err = dataStoreView(func(tx *bolt.Tx) error {
  601. bucket := tx.Bucket([]byte(serverEntriesBucket))
  602. value := bucket.Get([]byte(serverEntryId))
  603. if value != nil {
  604. // Must make a copy as slice is only valid within transaction.
  605. data = make([]byte, len(value))
  606. copy(data, value)
  607. }
  608. return nil
  609. })
  610. if err != nil {
  611. return nil, common.ContextError(err)
  612. }
  613. if data == nil {
  614. // In case of data corruption or a bug causing this condition,
  615. // do not stop iterating.
  616. NoticeAlert("ServerEntryIterator.Next: unexpected missing server entry: %s", serverEntryId)
  617. continue
  618. }
  619. err = json.Unmarshal(data, &serverEntry)
  620. if err != nil {
  621. // In case of data corruption or a bug causing this condition,
  622. // do not stop iterating.
  623. NoticeAlert("ServerEntryIterator.Next: %s", common.ContextError(err))
  624. continue
  625. }
  626. // Check filter requirements
  627. if iterator.isTacticsServerEntryIterator {
  628. // Tactics doesn't filter by egress region.
  629. if len(serverEntry.GetSupportedTacticsProtocols()) > 0 {
  630. break
  631. }
  632. } else {
  633. if iterator.config.EgressRegion == "" ||
  634. serverEntry.Region == iterator.config.EgressRegion {
  635. break
  636. }
  637. }
  638. }
  639. return MakeCompatibleServerEntry(serverEntry), nil
  640. }
  641. // MakeCompatibleServerEntry provides backwards compatibility with old server entries
  642. // which have a single meekFrontingDomain and not a meekFrontingAddresses array.
  643. // By copying this one meekFrontingDomain into meekFrontingAddresses, this client effectively
  644. // uses that single value as legacy clients do.
  645. func MakeCompatibleServerEntry(serverEntry *protocol.ServerEntry) *protocol.ServerEntry {
  646. if len(serverEntry.MeekFrontingAddresses) == 0 && serverEntry.MeekFrontingDomain != "" {
  647. serverEntry.MeekFrontingAddresses =
  648. append(serverEntry.MeekFrontingAddresses, serverEntry.MeekFrontingDomain)
  649. }
  650. return serverEntry
  651. }
  652. func scanServerEntries(scanner func(*protocol.ServerEntry)) error {
  653. err := dataStoreView(func(tx *bolt.Tx) error {
  654. bucket := tx.Bucket([]byte(serverEntriesBucket))
  655. cursor := bucket.Cursor()
  656. for key, value := cursor.First(); key != nil; key, value = cursor.Next() {
  657. serverEntry := new(protocol.ServerEntry)
  658. err := json.Unmarshal(value, serverEntry)
  659. if err != nil {
  660. // In case of data corruption or a bug causing this condition,
  661. // do not stop iterating.
  662. NoticeAlert("scanServerEntries: %s", common.ContextError(err))
  663. continue
  664. }
  665. scanner(serverEntry)
  666. }
  667. return nil
  668. })
  669. if err != nil {
  670. return common.ContextError(err)
  671. }
  672. return nil
  673. }
  674. // CountServerEntries returns a count of stored server entries.
  675. func CountServerEntries() int {
  676. count := 0
  677. err := scanServerEntries(func(_ *protocol.ServerEntry) {
  678. count += 1
  679. })
  680. if err != nil {
  681. NoticeAlert("CountServerEntries failed: %s", err)
  682. return 0
  683. }
  684. return count
  685. }
  686. // CountServerEntriesWithLimits returns a count of stored server entries for
  687. // the specified region and tunnel protocol limits.
  688. func CountServerEntriesWithLimits(
  689. useUpstreamProxy bool, region string, limitState *limitTunnelProtocolsState) (int, int) {
  690. // When CountServerEntriesWithLimits is called only
  691. // limitTunnelProtocolState is fixed; excludeIntensive is transitory.
  692. excludeIntensive := false
  693. initialCount := 0
  694. count := 0
  695. err := scanServerEntries(func(serverEntry *protocol.ServerEntry) {
  696. if region == "" || serverEntry.Region == region {
  697. if limitState.isInitialCandidate(excludeIntensive, serverEntry) {
  698. initialCount += 1
  699. }
  700. if limitState.isCandidate(excludeIntensive, serverEntry) {
  701. count += 1
  702. }
  703. }
  704. })
  705. if err != nil {
  706. NoticeAlert("CountServerEntriesWithLimits failed: %s", err)
  707. return 0, 0
  708. }
  709. return initialCount, count
  710. }
  711. // ReportAvailableRegions prints a notice with the available egress regions.
  712. func ReportAvailableRegions(config *Config, limitState *limitTunnelProtocolsState) {
  713. // When ReportAvailableRegions is called only
  714. // limitTunnelProtocolState is fixed; excludeIntensive is transitory.
  715. excludeIntensive := false
  716. regions := make(map[string]bool)
  717. err := scanServerEntries(func(serverEntry *protocol.ServerEntry) {
  718. if limitState.isInitialCandidate(excludeIntensive, serverEntry) ||
  719. limitState.isCandidate(excludeIntensive, serverEntry) {
  720. regions[serverEntry.Region] = true
  721. }
  722. })
  723. if err != nil {
  724. NoticeAlert("ReportAvailableRegions failed: %s", err)
  725. return
  726. }
  727. regionList := make([]string, 0, len(regions))
  728. for region := range regions {
  729. // Some server entries do not have a region, but it makes no sense to return
  730. // an empty string as an "available region".
  731. if region != "" {
  732. regionList = append(regionList, region)
  733. }
  734. }
  735. NoticeAvailableEgressRegions(regionList)
  736. }
  737. // GetServerEntryIpAddresses returns an array containing
  738. // all stored server IP addresses.
  739. func GetServerEntryIpAddresses() ([]string, error) {
  740. ipAddresses := make([]string, 0)
  741. err := scanServerEntries(func(serverEntry *protocol.ServerEntry) {
  742. ipAddresses = append(ipAddresses, serverEntry.IpAddress)
  743. })
  744. if err != nil {
  745. return nil, common.ContextError(err)
  746. }
  747. return ipAddresses, nil
  748. }
  749. // SetSplitTunnelRoutes updates the cached routes data for
  750. // the given region. The associated etag is also stored and
  751. // used to make efficient web requests for updates to the data.
  752. func SetSplitTunnelRoutes(region, etag string, data []byte) error {
  753. err := dataStoreUpdate(func(tx *bolt.Tx) error {
  754. bucket := tx.Bucket([]byte(splitTunnelRouteETagsBucket))
  755. err := bucket.Put([]byte(region), []byte(etag))
  756. bucket = tx.Bucket([]byte(splitTunnelRouteDataBucket))
  757. err = bucket.Put([]byte(region), data)
  758. return err
  759. })
  760. if err != nil {
  761. return common.ContextError(err)
  762. }
  763. return nil
  764. }
  765. // GetSplitTunnelRoutesETag retrieves the etag for cached routes
  766. // data for the specified region. If not found, it returns an empty string value.
  767. func GetSplitTunnelRoutesETag(region string) (string, error) {
  768. var etag string
  769. err := dataStoreView(func(tx *bolt.Tx) error {
  770. bucket := tx.Bucket([]byte(splitTunnelRouteETagsBucket))
  771. etag = string(bucket.Get([]byte(region)))
  772. return nil
  773. })
  774. if err != nil {
  775. return "", common.ContextError(err)
  776. }
  777. return etag, nil
  778. }
  779. // GetSplitTunnelRoutesData retrieves the cached routes data
  780. // for the specified region. If not found, it returns a nil value.
  781. func GetSplitTunnelRoutesData(region string) ([]byte, error) {
  782. var data []byte
  783. err := dataStoreView(func(tx *bolt.Tx) error {
  784. bucket := tx.Bucket([]byte(splitTunnelRouteDataBucket))
  785. value := bucket.Get([]byte(region))
  786. if value != nil {
  787. // Must make a copy as slice is only valid within transaction.
  788. data = make([]byte, len(value))
  789. copy(data, value)
  790. }
  791. return nil
  792. })
  793. if err != nil {
  794. return nil, common.ContextError(err)
  795. }
  796. return data, nil
  797. }
  798. // SetUrlETag stores an ETag for the specfied URL.
  799. // Note: input URL is treated as a string, and is not
  800. // encoded or decoded or otherwise canonicalized.
  801. func SetUrlETag(url, etag string) error {
  802. err := dataStoreUpdate(func(tx *bolt.Tx) error {
  803. bucket := tx.Bucket([]byte(urlETagsBucket))
  804. err := bucket.Put([]byte(url), []byte(etag))
  805. return err
  806. })
  807. if err != nil {
  808. return common.ContextError(err)
  809. }
  810. return nil
  811. }
  812. // GetUrlETag retrieves a previously stored an ETag for the
  813. // specfied URL. If not found, it returns an empty string value.
  814. func GetUrlETag(url string) (string, error) {
  815. var etag string
  816. err := dataStoreView(func(tx *bolt.Tx) error {
  817. bucket := tx.Bucket([]byte(urlETagsBucket))
  818. etag = string(bucket.Get([]byte(url)))
  819. return nil
  820. })
  821. if err != nil {
  822. return "", common.ContextError(err)
  823. }
  824. return etag, nil
  825. }
  826. // SetKeyValue stores a key/value pair.
  827. func SetKeyValue(key, value string) error {
  828. err := dataStoreUpdate(func(tx *bolt.Tx) error {
  829. bucket := tx.Bucket([]byte(keyValueBucket))
  830. err := bucket.Put([]byte(key), []byte(value))
  831. return err
  832. })
  833. if err != nil {
  834. return common.ContextError(err)
  835. }
  836. return nil
  837. }
  838. // GetKeyValue retrieves the value for a given key. If not found,
  839. // it returns an empty string value.
  840. func GetKeyValue(key string) (string, error) {
  841. var value string
  842. err := dataStoreView(func(tx *bolt.Tx) error {
  843. bucket := tx.Bucket([]byte(keyValueBucket))
  844. value = string(bucket.Get([]byte(key)))
  845. return nil
  846. })
  847. if err != nil {
  848. return "", common.ContextError(err)
  849. }
  850. return value, nil
  851. }
  852. // Persistent stat records in the persistentStatStateUnreported
  853. // state are available for take out.
  854. //
  855. // Records in the persistentStatStateReporting have been taken
  856. // out and are pending either deletion (for a successful request)
  857. // or change to StateUnreported (for a failed request).
  858. //
  859. // All persistent stat records are reverted to StateUnreported
  860. // when the datastore is initialized at start up.
  861. var persistentStatStateUnreported = []byte("0")
  862. var persistentStatStateReporting = []byte("1")
  863. var persistentStatTypes = []string{
  864. PERSISTENT_STAT_TYPE_REMOTE_SERVER_LIST,
  865. }
  866. // StorePersistentStat adds a new persistent stat record, which
  867. // is set to StateUnreported and is an immediate candidate for
  868. // reporting.
  869. //
  870. // The stat is a JSON byte array containing fields as
  871. // required by the Psiphon server API. It's assumed that the
  872. // JSON value contains enough unique information for the value to
  873. // function as a key in the key/value datastore. This assumption
  874. // is currently satisfied by the fields sessionId + tunnelNumber
  875. // for tunnel stats, and URL + ETag for remote server list stats.
  876. func StorePersistentStat(statType string, stat []byte) error {
  877. if !common.Contains(persistentStatTypes, statType) {
  878. return common.ContextError(fmt.Errorf("invalid persistent stat type: %s", statType))
  879. }
  880. err := dataStoreUpdate(func(tx *bolt.Tx) error {
  881. bucket := tx.Bucket([]byte(statType))
  882. err := bucket.Put(stat, persistentStatStateUnreported)
  883. return err
  884. })
  885. if err != nil {
  886. return common.ContextError(err)
  887. }
  888. return nil
  889. }
  890. // CountUnreportedPersistentStats returns the number of persistent
  891. // stat records in StateUnreported.
  892. func CountUnreportedPersistentStats() int {
  893. unreported := 0
  894. err := dataStoreView(func(tx *bolt.Tx) error {
  895. for _, statType := range persistentStatTypes {
  896. bucket := tx.Bucket([]byte(statType))
  897. cursor := bucket.Cursor()
  898. for key, value := cursor.First(); key != nil; key, value = cursor.Next() {
  899. if 0 == bytes.Compare(value, persistentStatStateUnreported) {
  900. unreported++
  901. break
  902. }
  903. }
  904. }
  905. return nil
  906. })
  907. if err != nil {
  908. NoticeAlert("CountUnreportedPersistentStats failed: %s", err)
  909. return 0
  910. }
  911. return unreported
  912. }
  913. // TakeOutUnreportedPersistentStats returns up to maxCount persistent
  914. // stats records that are in StateUnreported. The records are set to
  915. // StateReporting. If the records are successfully reported, clear them
  916. // with ClearReportedPersistentStats. If the records are not successfully
  917. // reported, restore them with PutBackUnreportedPersistentStats.
  918. func TakeOutUnreportedPersistentStats(maxCount int) (map[string][][]byte, error) {
  919. stats := make(map[string][][]byte)
  920. err := dataStoreUpdate(func(tx *bolt.Tx) error {
  921. count := 0
  922. for _, statType := range persistentStatTypes {
  923. bucket := tx.Bucket([]byte(statType))
  924. cursor := bucket.Cursor()
  925. for key, value := cursor.First(); key != nil; key, value = cursor.Next() {
  926. if count >= maxCount {
  927. break
  928. }
  929. // Perform a test JSON unmarshaling. In case of data corruption or a bug,
  930. // skip the record.
  931. var jsonData interface{}
  932. err := json.Unmarshal(key, &jsonData)
  933. if err != nil {
  934. NoticeAlert(
  935. "Invalid key in TakeOutUnreportedPersistentStats: %s: %s",
  936. string(key), err)
  937. continue
  938. }
  939. if 0 == bytes.Compare(value, persistentStatStateUnreported) {
  940. // Must make a copy as slice is only valid within transaction.
  941. data := make([]byte, len(key))
  942. copy(data, key)
  943. if stats[statType] == nil {
  944. stats[statType] = make([][]byte, 0)
  945. }
  946. stats[statType] = append(stats[statType], data)
  947. count += 1
  948. }
  949. }
  950. for _, key := range stats[statType] {
  951. err := bucket.Put(key, persistentStatStateReporting)
  952. if err != nil {
  953. return err
  954. }
  955. }
  956. }
  957. return nil
  958. })
  959. if err != nil {
  960. return nil, common.ContextError(err)
  961. }
  962. return stats, nil
  963. }
  964. // PutBackUnreportedPersistentStats restores a list of persistent
  965. // stat records to StateUnreported.
  966. func PutBackUnreportedPersistentStats(stats map[string][][]byte) error {
  967. err := dataStoreUpdate(func(tx *bolt.Tx) error {
  968. for _, statType := range persistentStatTypes {
  969. bucket := tx.Bucket([]byte(statType))
  970. for _, key := range stats[statType] {
  971. err := bucket.Put(key, persistentStatStateUnreported)
  972. if err != nil {
  973. return err
  974. }
  975. }
  976. }
  977. return nil
  978. })
  979. if err != nil {
  980. return common.ContextError(err)
  981. }
  982. return nil
  983. }
  984. // ClearReportedPersistentStats deletes a list of persistent
  985. // stat records that were successfully reported.
  986. func ClearReportedPersistentStats(stats map[string][][]byte) error {
  987. err := dataStoreUpdate(func(tx *bolt.Tx) error {
  988. for _, statType := range persistentStatTypes {
  989. bucket := tx.Bucket([]byte(statType))
  990. for _, key := range stats[statType] {
  991. err := bucket.Delete(key)
  992. if err != nil {
  993. return err
  994. }
  995. }
  996. }
  997. return nil
  998. })
  999. if err != nil {
  1000. return common.ContextError(err)
  1001. }
  1002. return nil
  1003. }
  1004. // resetAllPersistentStatsToUnreported sets all persistent stat
  1005. // records to StateUnreported. This reset is called when the
  1006. // datastore is initialized at start up, as we do not know if
  1007. // persistent records in StateReporting were reported or not.
  1008. func resetAllPersistentStatsToUnreported() error {
  1009. err := dataStoreUpdate(func(tx *bolt.Tx) error {
  1010. for _, statType := range persistentStatTypes {
  1011. bucket := tx.Bucket([]byte(statType))
  1012. resetKeys := make([][]byte, 0)
  1013. cursor := bucket.Cursor()
  1014. for key, _ := cursor.First(); key != nil; key, _ = cursor.Next() {
  1015. resetKeys = append(resetKeys, key)
  1016. }
  1017. // TODO: data mutation is done outside cursor. Is this
  1018. // strictly necessary in this case? As is, this means
  1019. // all stats need to be loaded into memory at once.
  1020. // https://godoc.org/github.com/boltdb/bolt#Cursor
  1021. for _, key := range resetKeys {
  1022. err := bucket.Put(key, persistentStatStateUnreported)
  1023. if err != nil {
  1024. return err
  1025. }
  1026. }
  1027. }
  1028. return nil
  1029. })
  1030. if err != nil {
  1031. return common.ContextError(err)
  1032. }
  1033. return nil
  1034. }
  1035. // CountSLOKs returns the total number of SLOK records.
  1036. func CountSLOKs() int {
  1037. count := 0
  1038. err := dataStoreView(func(tx *bolt.Tx) error {
  1039. bucket := tx.Bucket([]byte(slokBucket))
  1040. cursor := bucket.Cursor()
  1041. for key, _ := cursor.First(); key != nil; key, _ = cursor.Next() {
  1042. count++
  1043. }
  1044. return nil
  1045. })
  1046. if err != nil {
  1047. NoticeAlert("CountSLOKs failed: %s", err)
  1048. return 0
  1049. }
  1050. return count
  1051. }
  1052. // DeleteSLOKs deletes all SLOK records.
  1053. func DeleteSLOKs() error {
  1054. err := dataStoreUpdate(func(tx *bolt.Tx) error {
  1055. bucket := tx.Bucket([]byte(slokBucket))
  1056. return bucket.ForEach(
  1057. func(id, _ []byte) error {
  1058. return bucket.Delete(id)
  1059. })
  1060. })
  1061. if err != nil {
  1062. return common.ContextError(err)
  1063. }
  1064. return nil
  1065. }
  1066. // SetSLOK stores a SLOK key, referenced by its ID. The bool
  1067. // return value indicates whether the SLOK was already stored.
  1068. func SetSLOK(id, key []byte) (bool, error) {
  1069. var duplicate bool
  1070. err := dataStoreUpdate(func(tx *bolt.Tx) error {
  1071. bucket := tx.Bucket([]byte(slokBucket))
  1072. duplicate = bucket.Get(id) != nil
  1073. err := bucket.Put([]byte(id), []byte(key))
  1074. return err
  1075. })
  1076. if err != nil {
  1077. return false, common.ContextError(err)
  1078. }
  1079. return duplicate, nil
  1080. }
  1081. // GetSLOK returns a SLOK key for the specified ID. The return
  1082. // value is nil if the SLOK is not found.
  1083. func GetSLOK(id []byte) ([]byte, error) {
  1084. var key []byte
  1085. err := dataStoreView(func(tx *bolt.Tx) error {
  1086. bucket := tx.Bucket([]byte(slokBucket))
  1087. key = bucket.Get(id)
  1088. return nil
  1089. })
  1090. if err != nil {
  1091. return nil, common.ContextError(err)
  1092. }
  1093. return key, nil
  1094. }
  1095. // TacticsStorer implements tactics.Storer.
  1096. type TacticsStorer struct {
  1097. }
  1098. func (t *TacticsStorer) SetTacticsRecord(networkID string, record []byte) error {
  1099. return setBucketValue([]byte(tacticsBucket), []byte(networkID), record)
  1100. }
  1101. func (t *TacticsStorer) GetTacticsRecord(networkID string) ([]byte, error) {
  1102. return getBucketValue([]byte(tacticsBucket), []byte(networkID))
  1103. }
  1104. func (t *TacticsStorer) SetSpeedTestSamplesRecord(networkID string, record []byte) error {
  1105. return setBucketValue([]byte(speedTestSamplesBucket), []byte(networkID), record)
  1106. }
  1107. func (t *TacticsStorer) GetSpeedTestSamplesRecord(networkID string) ([]byte, error) {
  1108. return getBucketValue([]byte(speedTestSamplesBucket), []byte(networkID))
  1109. }
  1110. // GetTacticsStorer creates a TacticsStorer.
  1111. func GetTacticsStorer() *TacticsStorer {
  1112. return &TacticsStorer{}
  1113. }
  1114. func setBucketValue(bucket, key, value []byte) error {
  1115. err := dataStoreUpdate(func(tx *bolt.Tx) error {
  1116. bucket := tx.Bucket(bucket)
  1117. err := bucket.Put(key, value)
  1118. return err
  1119. })
  1120. if err != nil {
  1121. return common.ContextError(err)
  1122. }
  1123. return nil
  1124. }
  1125. func getBucketValue(bucket, key []byte) ([]byte, error) {
  1126. var value []byte
  1127. err := dataStoreView(func(tx *bolt.Tx) error {
  1128. bucket := tx.Bucket(bucket)
  1129. value = bucket.Get(key)
  1130. return nil
  1131. })
  1132. if err != nil {
  1133. return nil, common.ContextError(err)
  1134. }
  1135. return value, nil
  1136. }