dataStore.go 41 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455
  1. /*
  2. * Copyright (c) 2015, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package psiphon
  20. import (
  21. "bytes"
  22. "encoding/json"
  23. "errors"
  24. "fmt"
  25. "math/rand"
  26. "os"
  27. "path/filepath"
  28. "sync"
  29. "time"
  30. "github.com/Psiphon-Labs/bolt"
  31. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  32. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/parameters"
  33. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
  34. )
  35. // The BoltDB dataStore implementation is an alternative to the sqlite3-based
  36. // implementation in dataStore.go. Both implementations have the same interface.
  37. //
  38. // BoltDB is pure Go, and is intended to be used in cases where we have trouble
  39. // building sqlite3/CGO (e.g., currently go mobile due to
  40. // https://github.com/mattn/go-sqlite3/issues/201), and perhaps ultimately as
  41. // the primary dataStore implementation.
  42. //
  43. type dataStore struct {
  44. init sync.Once
  45. db *bolt.DB
  46. }
  47. const (
  48. serverEntriesBucket = "serverEntries"
  49. rankedServerEntriesBucket = "rankedServerEntries"
  50. rankedServerEntriesKey = "rankedServerEntries"
  51. splitTunnelRouteETagsBucket = "splitTunnelRouteETags"
  52. splitTunnelRouteDataBucket = "splitTunnelRouteData"
  53. urlETagsBucket = "urlETags"
  54. keyValueBucket = "keyValues"
  55. tunnelStatsBucket = "tunnelStats"
  56. remoteServerListStatsBucket = "remoteServerListStats"
  57. slokBucket = "SLOKs"
  58. tacticsBucket = "tactics"
  59. speedTestSamplesBucket = "speedTestSamples"
  60. rankedServerEntryCount = 100
  61. )
  62. const (
  63. DATA_STORE_FILENAME = "psiphon.boltdb"
  64. LEGACY_DATA_STORE_FILENAME = "psiphon.db"
  65. DATA_STORE_LAST_CONNECTED_KEY = "lastConnected"
  66. DATA_STORE_LAST_SERVER_ENTRY_FILTER_KEY = "lastServerEntryFilter"
  67. PERSISTENT_STAT_TYPE_REMOTE_SERVER_LIST = remoteServerListStatsBucket
  68. )
  69. var singleton dataStore
  70. // InitDataStore initializes the singleton instance of dataStore. This
  71. // function uses a sync.Once and is safe for use by concurrent goroutines.
  72. // The underlying sql.DB connection pool is also safe.
  73. //
  74. // Note: the sync.Once was more useful when initDataStore was private and
  75. // called on-demand by the public functions below. Now we require an explicit
  76. // InitDataStore() call with the filename passed in. The on-demand calls
  77. // have been replaced by checkInitDataStore() to assert that Init was called.
  78. func InitDataStore(config *Config) (err error) {
  79. singleton.init.Do(func() {
  80. // Need to gather the list of migratable server entries before
  81. // initializing the boltdb store (as prepareMigrationEntries
  82. // checks for the existence of the bolt db file)
  83. migratableServerEntries := prepareMigrationEntries(config)
  84. filename := filepath.Join(config.DataStoreDirectory, DATA_STORE_FILENAME)
  85. var db *bolt.DB
  86. for retry := 0; retry < 3; retry++ {
  87. if retry > 0 {
  88. NoticeAlert("InitDataStore retry: %d", retry)
  89. }
  90. db, err = bolt.Open(filename, 0600, &bolt.Options{Timeout: 1 * time.Second})
  91. // The datastore file may be corrupt, so attempt to delete and try again
  92. if err != nil {
  93. NoticeAlert("bolt.Open error: %s", err)
  94. os.Remove(filename)
  95. continue
  96. }
  97. // Run consistency checks on datastore and emit errors for diagnostics purposes
  98. // We assume this will complete quickly for typical size Psiphon datastores.
  99. err = db.View(func(tx *bolt.Tx) error {
  100. return tx.SynchronousCheck()
  101. })
  102. // The datastore file may be corrupt, so attempt to delete and try again
  103. if err != nil {
  104. NoticeAlert("bolt.SynchronousCheck error: %s", err)
  105. db.Close()
  106. os.Remove(filename)
  107. continue
  108. }
  109. break
  110. }
  111. if err != nil {
  112. // Note: intending to set the err return value for InitDataStore
  113. err = fmt.Errorf("initDataStore failed to open database: %s", err)
  114. return
  115. }
  116. err = db.Update(func(tx *bolt.Tx) error {
  117. requiredBuckets := []string{
  118. serverEntriesBucket,
  119. rankedServerEntriesBucket,
  120. splitTunnelRouteETagsBucket,
  121. splitTunnelRouteDataBucket,
  122. urlETagsBucket,
  123. keyValueBucket,
  124. tunnelStatsBucket,
  125. remoteServerListStatsBucket,
  126. slokBucket,
  127. tacticsBucket,
  128. speedTestSamplesBucket,
  129. }
  130. for _, bucket := range requiredBuckets {
  131. _, err := tx.CreateBucketIfNotExists([]byte(bucket))
  132. if err != nil {
  133. return err
  134. }
  135. }
  136. return nil
  137. })
  138. if err != nil {
  139. err = fmt.Errorf("initDataStore failed to create buckets: %s", err)
  140. return
  141. }
  142. // Cleanup obsolete tunnel (session) stats bucket, if one still exists
  143. err = db.Update(func(tx *bolt.Tx) error {
  144. tunnelStatsBucket := []byte("tunnelStats")
  145. if tx.Bucket(tunnelStatsBucket) != nil {
  146. err := tx.DeleteBucket(tunnelStatsBucket)
  147. if err != nil {
  148. NoticeAlert("DeleteBucket %s error: %s", tunnelStatsBucket, err)
  149. // Continue, since this is not fatal
  150. }
  151. }
  152. return nil
  153. })
  154. if err != nil {
  155. err = fmt.Errorf("initDataStore failed to create buckets: %s", err)
  156. return
  157. }
  158. singleton.db = db
  159. // The migrateServerEntries function requires the data store is
  160. // initialized prior to execution so that migrated entries can be stored
  161. if len(migratableServerEntries) > 0 {
  162. migrateEntries(
  163. config, migratableServerEntries, filepath.Join(config.DataStoreDirectory, LEGACY_DATA_STORE_FILENAME))
  164. }
  165. resetAllPersistentStatsToUnreported()
  166. })
  167. return err
  168. }
  169. func checkInitDataStore() {
  170. if singleton.db == nil {
  171. panic("checkInitDataStore: datastore not initialized")
  172. }
  173. }
  174. // StoreServerEntry adds the server entry to the data store.
  175. // A newly stored (or re-stored) server entry is assigned the next-to-top
  176. // rank for iteration order (the previous top ranked entry is promoted). The
  177. // purpose of inserting at next-to-top is to keep the last selected server
  178. // as the top ranked server.
  179. //
  180. // When a server entry already exists for a given server, it will be
  181. // replaced only if replaceIfExists is set or if the the ConfigurationVersion
  182. // field of the new entry is strictly higher than the existing entry.
  183. //
  184. // If the server entry data is malformed, an alert notice is issued and
  185. // the entry is skipped; no error is returned.
  186. func StoreServerEntry(serverEntry *protocol.ServerEntry, replaceIfExists bool) error {
  187. checkInitDataStore()
  188. // Server entries should already be validated before this point,
  189. // so instead of skipping we fail with an error.
  190. err := protocol.ValidateServerEntry(serverEntry)
  191. if err != nil {
  192. return common.ContextError(
  193. fmt.Errorf("invalid server entry: %s", err))
  194. }
  195. // BoltDB implementation note:
  196. // For simplicity, we don't maintain indexes on server entry
  197. // region or supported protocols. Instead, we perform full-bucket
  198. // scans with a filter. With a small enough database (thousands or
  199. // even tens of thousand of server entries) and common enough
  200. // values (e.g., many servers support all protocols), performance
  201. // is expected to be acceptable.
  202. err = singleton.db.Update(func(tx *bolt.Tx) error {
  203. serverEntries := tx.Bucket([]byte(serverEntriesBucket))
  204. // Check not only that the entry exists, but is valid. This
  205. // will replace in the rare case where the data is corrupt.
  206. existingConfigurationVersion := -1
  207. existingData := serverEntries.Get([]byte(serverEntry.IpAddress))
  208. if existingData != nil {
  209. var existingServerEntry *protocol.ServerEntry
  210. err := json.Unmarshal(existingData, &existingServerEntry)
  211. if err == nil {
  212. existingConfigurationVersion = existingServerEntry.ConfigurationVersion
  213. }
  214. }
  215. exists := existingConfigurationVersion > -1
  216. newer := exists && existingConfigurationVersion < serverEntry.ConfigurationVersion
  217. update := !exists || replaceIfExists || newer
  218. if !update {
  219. // Disabling this notice, for now, as it generates too much noise
  220. // in diagnostics with clients that always submit embedded servers
  221. // to the core on each run.
  222. // NoticeInfo("ignored update for server %s", serverEntry.IpAddress)
  223. return nil
  224. }
  225. data, err := json.Marshal(serverEntry)
  226. if err != nil {
  227. return common.ContextError(err)
  228. }
  229. err = serverEntries.Put([]byte(serverEntry.IpAddress), data)
  230. if err != nil {
  231. return common.ContextError(err)
  232. }
  233. err = insertRankedServerEntry(tx, serverEntry.IpAddress, 1)
  234. if err != nil {
  235. return common.ContextError(err)
  236. }
  237. NoticeInfo("updated server %s", serverEntry.IpAddress)
  238. return nil
  239. })
  240. if err != nil {
  241. return common.ContextError(err)
  242. }
  243. return nil
  244. }
  245. // StoreServerEntries stores a list of server entries.
  246. // There is an independent transaction for each entry insert/update.
  247. func StoreServerEntries(
  248. config *Config,
  249. serverEntries []*protocol.ServerEntry,
  250. replaceIfExists bool) error {
  251. checkInitDataStore()
  252. for _, serverEntry := range serverEntries {
  253. err := StoreServerEntry(serverEntry, replaceIfExists)
  254. if err != nil {
  255. return common.ContextError(err)
  256. }
  257. }
  258. return nil
  259. }
  260. // StreamingStoreServerEntries stores a list of server entries.
  261. // There is an independent transaction for each entry insert/update.
  262. func StreamingStoreServerEntries(
  263. config *Config,
  264. serverEntries *protocol.StreamingServerEntryDecoder,
  265. replaceIfExists bool) error {
  266. checkInitDataStore()
  267. // Note: both StreamingServerEntryDecoder.Next and StoreServerEntry
  268. // allocate temporary memory buffers for hex/JSON decoding/encoding,
  269. // so this isn't true constant-memory streaming (it depends on garbage
  270. // collection).
  271. for {
  272. serverEntry, err := serverEntries.Next()
  273. if err != nil {
  274. return common.ContextError(err)
  275. }
  276. if serverEntry == nil {
  277. // No more server entries
  278. break
  279. }
  280. err = StoreServerEntry(serverEntry, replaceIfExists)
  281. if err != nil {
  282. return common.ContextError(err)
  283. }
  284. }
  285. return nil
  286. }
  287. // PromoteServerEntry assigns the top rank (one more than current
  288. // max rank) to the specified server entry. Server candidates are
  289. // iterated in decending rank order, so this server entry will be
  290. // the first candidate in a subsequent tunnel establishment.
  291. func PromoteServerEntry(config *Config, ipAddress string) error {
  292. checkInitDataStore()
  293. err := singleton.db.Update(func(tx *bolt.Tx) error {
  294. // Ensure the corresponding entry exists before
  295. // inserting into rank.
  296. bucket := tx.Bucket([]byte(serverEntriesBucket))
  297. data := bucket.Get([]byte(ipAddress))
  298. if data == nil {
  299. NoticeAlert(
  300. "PromoteServerEntry: ignoring unknown server entry: %s",
  301. ipAddress)
  302. return nil
  303. }
  304. err := insertRankedServerEntry(tx, ipAddress, 0)
  305. if err != nil {
  306. return err
  307. }
  308. // Store the current server entry filter (e.g, region, etc.) that
  309. // was in use when the entry was promoted. This is used to detect
  310. // when the top ranked server entry was promoted under a different
  311. // filter.
  312. currentFilter, err := makeServerEntryFilterValue(config)
  313. if err != nil {
  314. return err
  315. }
  316. bucket = tx.Bucket([]byte(keyValueBucket))
  317. return bucket.Put([]byte(DATA_STORE_LAST_SERVER_ENTRY_FILTER_KEY), currentFilter)
  318. })
  319. if err != nil {
  320. return common.ContextError(err)
  321. }
  322. return nil
  323. }
  324. func makeServerEntryFilterValue(config *Config) ([]byte, error) {
  325. // Currently, only a change of EgressRegion will "break" server affinity.
  326. // If the tunnel protocol filter changes, any existing affinity server
  327. // either passes the new filter, or it will be skipped anyway.
  328. return []byte(config.EgressRegion), nil
  329. }
  330. func hasServerEntryFilterChanged(config *Config) (bool, error) {
  331. currentFilter, err := makeServerEntryFilterValue(config)
  332. if err != nil {
  333. return false, common.ContextError(err)
  334. }
  335. changed := false
  336. err = singleton.db.View(func(tx *bolt.Tx) error {
  337. // previousFilter will be nil not found (not previously
  338. // set) which will never match any current filter.
  339. bucket := tx.Bucket([]byte(keyValueBucket))
  340. previousFilter := bucket.Get([]byte(DATA_STORE_LAST_SERVER_ENTRY_FILTER_KEY))
  341. if bytes.Compare(previousFilter, currentFilter) != 0 {
  342. changed = true
  343. }
  344. return nil
  345. })
  346. if err != nil {
  347. return false, common.ContextError(err)
  348. }
  349. return changed, nil
  350. }
  351. func getRankedServerEntries(tx *bolt.Tx) ([]string, error) {
  352. bucket := tx.Bucket([]byte(rankedServerEntriesBucket))
  353. data := bucket.Get([]byte(rankedServerEntriesKey))
  354. if data == nil {
  355. return []string{}, nil
  356. }
  357. rankedServerEntries := make([]string, 0)
  358. err := json.Unmarshal(data, &rankedServerEntries)
  359. if err != nil {
  360. return nil, common.ContextError(err)
  361. }
  362. return rankedServerEntries, nil
  363. }
  364. func setRankedServerEntries(tx *bolt.Tx, rankedServerEntries []string) error {
  365. data, err := json.Marshal(rankedServerEntries)
  366. if err != nil {
  367. return common.ContextError(err)
  368. }
  369. bucket := tx.Bucket([]byte(rankedServerEntriesBucket))
  370. err = bucket.Put([]byte(rankedServerEntriesKey), data)
  371. if err != nil {
  372. return common.ContextError(err)
  373. }
  374. return nil
  375. }
  376. func insertRankedServerEntry(tx *bolt.Tx, serverEntryId string, position int) error {
  377. rankedServerEntries, err := getRankedServerEntries(tx)
  378. if err != nil {
  379. return common.ContextError(err)
  380. }
  381. // BoltDB implementation note:
  382. // For simplicity, we store the ranked server ids in an array serialized to
  383. // a single key value. To ensure this value doesn't grow without bound,
  384. // it's capped at rankedServerEntryCount. For now, this cap should be large
  385. // enough to meet the shuffleHeadLength = config.TunnelPoolSize criteria, for
  386. // any reasonable configuration of config.TunnelPoolSize.
  387. // Using: https://github.com/golang/go/wiki/SliceTricks
  388. // When serverEntryId is already ranked, remove it first to avoid duplicates
  389. for i, rankedServerEntryId := range rankedServerEntries {
  390. if rankedServerEntryId == serverEntryId {
  391. rankedServerEntries = append(
  392. rankedServerEntries[:i], rankedServerEntries[i+1:]...)
  393. break
  394. }
  395. }
  396. // SliceTricks insert, with length cap enforced
  397. if len(rankedServerEntries) < rankedServerEntryCount {
  398. rankedServerEntries = append(rankedServerEntries, "")
  399. }
  400. if position >= len(rankedServerEntries) {
  401. position = len(rankedServerEntries) - 1
  402. }
  403. copy(rankedServerEntries[position+1:], rankedServerEntries[position:])
  404. rankedServerEntries[position] = serverEntryId
  405. err = setRankedServerEntries(tx, rankedServerEntries)
  406. if err != nil {
  407. return common.ContextError(err)
  408. }
  409. return nil
  410. }
  411. // ServerEntryIterator is used to iterate over
  412. // stored server entries in rank order.
  413. type ServerEntryIterator struct {
  414. config *Config
  415. shuffleHeadLength int
  416. serverEntryIds []string
  417. serverEntryIndex int
  418. isTacticsServerEntryIterator bool
  419. isTargetServerEntryIterator bool
  420. hasNextTargetServerEntry bool
  421. targetServerEntry *protocol.ServerEntry
  422. }
  423. // NewServerEntryIterator creates a new ServerEntryIterator.
  424. //
  425. // The boolean return value indicates whether to treat the first server(s)
  426. // as affinity servers or not. When the server entry selection filter changes
  427. // such as from a specific region to any region, or when there was no previous
  428. // filter/iterator, the the first server(s) are arbitrary and should not be
  429. // given affinity treatment.
  430. //
  431. // NewServerEntryIterator and any returned ServerEntryIterator are not
  432. // designed for concurrent use as not all related datastore operations are
  433. // performed in a single transaction.
  434. //
  435. func NewServerEntryIterator(config *Config) (bool, *ServerEntryIterator, error) {
  436. // When configured, this target server entry is the only candidate
  437. if config.TargetServerEntry != "" {
  438. return newTargetServerEntryIterator(config, false)
  439. }
  440. checkInitDataStore()
  441. filterChanged, err := hasServerEntryFilterChanged(config)
  442. if err != nil {
  443. return false, nil, common.ContextError(err)
  444. }
  445. applyServerAffinity := !filterChanged
  446. iterator := &ServerEntryIterator{
  447. config: config,
  448. shuffleHeadLength: config.TunnelPoolSize,
  449. }
  450. err = iterator.Reset()
  451. if err != nil {
  452. return false, nil, common.ContextError(err)
  453. }
  454. return applyServerAffinity, iterator, nil
  455. }
  456. func NewTacticsServerEntryIterator(config *Config) (*ServerEntryIterator, error) {
  457. // When configured, this target server entry is the only candidate
  458. if config.TargetServerEntry != "" {
  459. _, iterator, err := newTargetServerEntryIterator(config, true)
  460. return iterator, err
  461. }
  462. checkInitDataStore()
  463. iterator := &ServerEntryIterator{
  464. shuffleHeadLength: 0,
  465. isTacticsServerEntryIterator: true,
  466. }
  467. err := iterator.Reset()
  468. if err != nil {
  469. return nil, common.ContextError(err)
  470. }
  471. return iterator, nil
  472. }
  473. // newTargetServerEntryIterator is a helper for initializing the TargetServerEntry case
  474. func newTargetServerEntryIterator(config *Config, isTactics bool) (bool, *ServerEntryIterator, error) {
  475. serverEntry, err := protocol.DecodeServerEntry(
  476. config.TargetServerEntry, common.GetCurrentTimestamp(), protocol.SERVER_ENTRY_SOURCE_TARGET)
  477. if err != nil {
  478. return false, nil, common.ContextError(err)
  479. }
  480. if isTactics {
  481. if len(serverEntry.GetSupportedTacticsProtocols()) == 0 {
  482. return false, nil, common.ContextError(errors.New("TargetServerEntry does not support tactics protocols"))
  483. }
  484. } else {
  485. if config.EgressRegion != "" && serverEntry.Region != config.EgressRegion {
  486. return false, nil, common.ContextError(errors.New("TargetServerEntry does not support EgressRegion"))
  487. }
  488. limitTunnelProtocols := config.clientParameters.Get().TunnelProtocols(parameters.LimitTunnelProtocols)
  489. if len(limitTunnelProtocols) > 0 {
  490. // At the ServerEntryIterator level, only limitTunnelProtocols is applied;
  491. // impairedTunnelProtocols and excludeMeek are handled higher up.
  492. if len(serverEntry.GetSupportedProtocols(
  493. config.UseUpstreamProxy(), limitTunnelProtocols, nil, false)) == 0 {
  494. return false, nil, common.ContextError(errors.New("TargetServerEntry does not support LimitTunnelProtocols"))
  495. }
  496. }
  497. }
  498. iterator := &ServerEntryIterator{
  499. isTacticsServerEntryIterator: isTactics,
  500. isTargetServerEntryIterator: true,
  501. hasNextTargetServerEntry: true,
  502. targetServerEntry: serverEntry,
  503. }
  504. NoticeInfo("using TargetServerEntry: %s", serverEntry.IpAddress)
  505. return false, iterator, nil
  506. }
  507. // Reset a NewServerEntryIterator to the start of its cycle. The next
  508. // call to Next will return the first server entry.
  509. func (iterator *ServerEntryIterator) Reset() error {
  510. iterator.Close()
  511. if iterator.isTargetServerEntryIterator {
  512. iterator.hasNextTargetServerEntry = true
  513. return nil
  514. }
  515. // For diagnostics, it's useful to count the number of known server
  516. // entries that satisfy both the egress region and tunnel protocol
  517. // requirements. The tunnel protocol filter is not applied by the iterator
  518. // as protocol filtering, including impaire protocol and exclude-meek
  519. // logic, is all handled higher up.
  520. // TODO: for isTacticsServerEntryIterator, emit tactics candidate count.
  521. if !iterator.isTacticsServerEntryIterator {
  522. limitTunnelProtocols := iterator.config.clientParameters.Get().TunnelProtocols(
  523. parameters.LimitTunnelProtocols)
  524. count := CountServerEntries(
  525. iterator.config.UseUpstreamProxy(), iterator.config.EgressRegion, limitTunnelProtocols)
  526. NoticeCandidateServers(iterator.config.EgressRegion, limitTunnelProtocols, count)
  527. // LimitTunnelProtocols may have changed since the last ReportAvailableRegions,
  528. // and now there may be no servers with the required capabilities in the
  529. // selected region. ReportAvailableRegions will signal this to the client.
  530. if count == 0 {
  531. ReportAvailableRegions(iterator.config)
  532. }
  533. }
  534. // This query implements the Psiphon server candidate selection
  535. // algorithm: the first TunnelPoolSize server candidates are in rank
  536. // (priority) order, to favor previously successful servers; then the
  537. // remaining long tail is shuffled to raise up less recent candidates.
  538. // BoltDB implementation note:
  539. // We don't keep a transaction open for the duration of the iterator
  540. // because this would expose the following semantics to consumer code:
  541. //
  542. // Read-only transactions and read-write transactions ... generally
  543. // shouldn't be opened simultaneously in the same goroutine. This can
  544. // cause a deadlock as the read-write transaction needs to periodically
  545. // re-map the data file but it cannot do so while a read-only
  546. // transaction is open.
  547. // (https://github.com/boltdb/bolt)
  548. //
  549. // So the underlying serverEntriesBucket could change after the serverEntryIds
  550. // list is built.
  551. var serverEntryIds []string
  552. err := singleton.db.View(func(tx *bolt.Tx) error {
  553. var err error
  554. serverEntryIds, err = getRankedServerEntries(tx)
  555. if err != nil {
  556. return err
  557. }
  558. skipServerEntryIds := make(map[string]bool)
  559. for _, serverEntryId := range serverEntryIds {
  560. skipServerEntryIds[serverEntryId] = true
  561. }
  562. bucket := tx.Bucket([]byte(serverEntriesBucket))
  563. cursor := bucket.Cursor()
  564. for key, _ := cursor.Last(); key != nil; key, _ = cursor.Prev() {
  565. serverEntryId := string(key)
  566. if _, ok := skipServerEntryIds[serverEntryId]; ok {
  567. continue
  568. }
  569. serverEntryIds = append(serverEntryIds, serverEntryId)
  570. }
  571. return nil
  572. })
  573. if err != nil {
  574. return common.ContextError(err)
  575. }
  576. for i := len(serverEntryIds) - 1; i > iterator.shuffleHeadLength-1; i-- {
  577. j := rand.Intn(i+1-iterator.shuffleHeadLength) + iterator.shuffleHeadLength
  578. serverEntryIds[i], serverEntryIds[j] = serverEntryIds[j], serverEntryIds[i]
  579. }
  580. iterator.serverEntryIds = serverEntryIds
  581. iterator.serverEntryIndex = 0
  582. return nil
  583. }
  584. // Close cleans up resources associated with a ServerEntryIterator.
  585. func (iterator *ServerEntryIterator) Close() {
  586. iterator.serverEntryIds = nil
  587. iterator.serverEntryIndex = 0
  588. }
  589. // Next returns the next server entry, by rank, for a ServerEntryIterator.
  590. // Returns nil with no error when there is no next item.
  591. func (iterator *ServerEntryIterator) Next() (*protocol.ServerEntry, error) {
  592. var err error
  593. var serverEntry *protocol.ServerEntry
  594. defer func() {
  595. if err != nil {
  596. iterator.Close()
  597. }
  598. }()
  599. if iterator.isTargetServerEntryIterator {
  600. if iterator.hasNextTargetServerEntry {
  601. iterator.hasNextTargetServerEntry = false
  602. return MakeCompatibleServerEntry(iterator.targetServerEntry), nil
  603. }
  604. return nil, nil
  605. }
  606. // There are no region/protocol indexes for the server entries bucket.
  607. // Loop until we have the next server entry that matches the iterator
  608. // filter requirements.
  609. for {
  610. if iterator.serverEntryIndex >= len(iterator.serverEntryIds) {
  611. // There is no next item
  612. return nil, nil
  613. }
  614. serverEntryId := iterator.serverEntryIds[iterator.serverEntryIndex]
  615. iterator.serverEntryIndex += 1
  616. var data []byte
  617. err = singleton.db.View(func(tx *bolt.Tx) error {
  618. bucket := tx.Bucket([]byte(serverEntriesBucket))
  619. value := bucket.Get([]byte(serverEntryId))
  620. if value != nil {
  621. // Must make a copy as slice is only valid within transaction.
  622. data = make([]byte, len(value))
  623. copy(data, value)
  624. }
  625. return nil
  626. })
  627. if err != nil {
  628. return nil, common.ContextError(err)
  629. }
  630. if data == nil {
  631. // In case of data corruption or a bug causing this condition,
  632. // do not stop iterating.
  633. NoticeAlert("ServerEntryIterator.Next: unexpected missing server entry: %s", serverEntryId)
  634. continue
  635. }
  636. err = json.Unmarshal(data, &serverEntry)
  637. if err != nil {
  638. // In case of data corruption or a bug causing this condition,
  639. // do not stop iterating.
  640. NoticeAlert("ServerEntryIterator.Next: %s", common.ContextError(err))
  641. continue
  642. }
  643. // Check filter requirements
  644. if iterator.isTacticsServerEntryIterator {
  645. // Tactics doesn't filter by egress region.
  646. if len(serverEntry.GetSupportedTacticsProtocols()) > 0 {
  647. break
  648. }
  649. } else {
  650. if iterator.config.EgressRegion == "" ||
  651. serverEntry.Region == iterator.config.EgressRegion {
  652. break
  653. }
  654. }
  655. }
  656. return MakeCompatibleServerEntry(serverEntry), nil
  657. }
  658. // MakeCompatibleServerEntry provides backwards compatibility with old server entries
  659. // which have a single meekFrontingDomain and not a meekFrontingAddresses array.
  660. // By copying this one meekFrontingDomain into meekFrontingAddresses, this client effectively
  661. // uses that single value as legacy clients do.
  662. func MakeCompatibleServerEntry(serverEntry *protocol.ServerEntry) *protocol.ServerEntry {
  663. if len(serverEntry.MeekFrontingAddresses) == 0 && serverEntry.MeekFrontingDomain != "" {
  664. serverEntry.MeekFrontingAddresses =
  665. append(serverEntry.MeekFrontingAddresses, serverEntry.MeekFrontingDomain)
  666. }
  667. return serverEntry
  668. }
  669. func scanServerEntries(scanner func(*protocol.ServerEntry)) error {
  670. err := singleton.db.View(func(tx *bolt.Tx) error {
  671. bucket := tx.Bucket([]byte(serverEntriesBucket))
  672. cursor := bucket.Cursor()
  673. for key, value := cursor.First(); key != nil; key, value = cursor.Next() {
  674. serverEntry := new(protocol.ServerEntry)
  675. err := json.Unmarshal(value, serverEntry)
  676. if err != nil {
  677. // In case of data corruption or a bug causing this condition,
  678. // do not stop iterating.
  679. NoticeAlert("scanServerEntries: %s", common.ContextError(err))
  680. continue
  681. }
  682. scanner(serverEntry)
  683. }
  684. return nil
  685. })
  686. if err != nil {
  687. return common.ContextError(err)
  688. }
  689. return nil
  690. }
  691. // CountServerEntries returns a count of stored servers for the
  692. // specified region and tunnel protocols.
  693. func CountServerEntries(useUpstreamProxy bool, region string, tunnelProtocols []string) int {
  694. checkInitDataStore()
  695. count := 0
  696. err := scanServerEntries(func(serverEntry *protocol.ServerEntry) {
  697. if (region == "" || serverEntry.Region == region) &&
  698. (len(tunnelProtocols) == 0 ||
  699. // When CountServerEntries is called only limitTunnelProtocols is known;
  700. // impairedTunnelProtocols and excludeMeek may not apply.
  701. len(serverEntry.GetSupportedProtocols(useUpstreamProxy, tunnelProtocols, nil, false)) > 0) {
  702. count += 1
  703. }
  704. })
  705. if err != nil {
  706. NoticeAlert("CountServerEntries failed: %s", err)
  707. return 0
  708. }
  709. return count
  710. }
  711. // CountNonImpairedProtocols returns the number of distinct tunnel
  712. // protocols supported by stored server entries, excluding the
  713. // specified impaired protocols.
  714. func CountNonImpairedProtocols(
  715. region string,
  716. limitTunnelProtocols, impairedProtocols []string) int {
  717. checkInitDataStore()
  718. distinctProtocols := make(map[string]bool)
  719. err := scanServerEntries(func(serverEntry *protocol.ServerEntry) {
  720. if region == "" || serverEntry.Region == region {
  721. for _, protocol := range protocol.SupportedTunnelProtocols {
  722. if serverEntry.SupportsProtocol(protocol) {
  723. if len(limitTunnelProtocols) == 0 ||
  724. common.Contains(limitTunnelProtocols, protocol) {
  725. distinctProtocols[protocol] = true
  726. }
  727. }
  728. }
  729. }
  730. })
  731. for _, protocol := range impairedProtocols {
  732. delete(distinctProtocols, protocol)
  733. }
  734. if err != nil {
  735. NoticeAlert("CountNonImpairedProtocols failed: %s", err)
  736. return 0
  737. }
  738. return len(distinctProtocols)
  739. }
  740. // ReportAvailableRegions prints a notice with the available egress regions.
  741. func ReportAvailableRegions(config *Config) {
  742. checkInitDataStore()
  743. limitTunnelProtocols := config.clientParameters.Get().TunnelProtocols(
  744. parameters.LimitTunnelProtocols)
  745. regions := make(map[string]bool)
  746. err := scanServerEntries(func(serverEntry *protocol.ServerEntry) {
  747. if len(limitTunnelProtocols) == 0 ||
  748. // When ReportAvailableRegions is called only limitTunnelProtocols is known;
  749. // impairedTunnelProtocols and excludeMeek may not apply.
  750. len(serverEntry.GetSupportedProtocols(
  751. config.UseUpstreamProxy(), limitTunnelProtocols, nil, false)) > 0 {
  752. regions[serverEntry.Region] = true
  753. }
  754. })
  755. if err != nil {
  756. NoticeAlert("ReportAvailableRegions failed: %s", err)
  757. return
  758. }
  759. regionList := make([]string, 0, len(regions))
  760. for region := range regions {
  761. // Some server entries do not have a region, but it makes no sense to return
  762. // an empty string as an "available region".
  763. if region != "" {
  764. regionList = append(regionList, region)
  765. }
  766. }
  767. NoticeAvailableEgressRegions(regionList)
  768. }
  769. // GetServerEntryIpAddresses returns an array containing
  770. // all stored server IP addresses.
  771. func GetServerEntryIpAddresses() (ipAddresses []string, err error) {
  772. checkInitDataStore()
  773. ipAddresses = make([]string, 0)
  774. err = scanServerEntries(func(serverEntry *protocol.ServerEntry) {
  775. ipAddresses = append(ipAddresses, serverEntry.IpAddress)
  776. })
  777. if err != nil {
  778. return nil, common.ContextError(err)
  779. }
  780. return ipAddresses, nil
  781. }
  782. // SetSplitTunnelRoutes updates the cached routes data for
  783. // the given region. The associated etag is also stored and
  784. // used to make efficient web requests for updates to the data.
  785. func SetSplitTunnelRoutes(region, etag string, data []byte) error {
  786. checkInitDataStore()
  787. err := singleton.db.Update(func(tx *bolt.Tx) error {
  788. bucket := tx.Bucket([]byte(splitTunnelRouteETagsBucket))
  789. err := bucket.Put([]byte(region), []byte(etag))
  790. bucket = tx.Bucket([]byte(splitTunnelRouteDataBucket))
  791. err = bucket.Put([]byte(region), data)
  792. return err
  793. })
  794. if err != nil {
  795. return common.ContextError(err)
  796. }
  797. return nil
  798. }
  799. // GetSplitTunnelRoutesETag retrieves the etag for cached routes
  800. // data for the specified region. If not found, it returns an empty string value.
  801. func GetSplitTunnelRoutesETag(region string) (etag string, err error) {
  802. checkInitDataStore()
  803. err = singleton.db.View(func(tx *bolt.Tx) error {
  804. bucket := tx.Bucket([]byte(splitTunnelRouteETagsBucket))
  805. etag = string(bucket.Get([]byte(region)))
  806. return nil
  807. })
  808. if err != nil {
  809. return "", common.ContextError(err)
  810. }
  811. return etag, nil
  812. }
  813. // GetSplitTunnelRoutesData retrieves the cached routes data
  814. // for the specified region. If not found, it returns a nil value.
  815. func GetSplitTunnelRoutesData(region string) (data []byte, err error) {
  816. checkInitDataStore()
  817. err = singleton.db.View(func(tx *bolt.Tx) error {
  818. bucket := tx.Bucket([]byte(splitTunnelRouteDataBucket))
  819. value := bucket.Get([]byte(region))
  820. if value != nil {
  821. // Must make a copy as slice is only valid within transaction.
  822. data = make([]byte, len(value))
  823. copy(data, value)
  824. }
  825. return nil
  826. })
  827. if err != nil {
  828. return nil, common.ContextError(err)
  829. }
  830. return data, nil
  831. }
  832. // SetUrlETag stores an ETag for the specfied URL.
  833. // Note: input URL is treated as a string, and is not
  834. // encoded or decoded or otherwise canonicalized.
  835. func SetUrlETag(url, etag string) error {
  836. checkInitDataStore()
  837. err := singleton.db.Update(func(tx *bolt.Tx) error {
  838. bucket := tx.Bucket([]byte(urlETagsBucket))
  839. err := bucket.Put([]byte(url), []byte(etag))
  840. return err
  841. })
  842. if err != nil {
  843. return common.ContextError(err)
  844. }
  845. return nil
  846. }
  847. // GetUrlETag retrieves a previously stored an ETag for the
  848. // specfied URL. If not found, it returns an empty string value.
  849. func GetUrlETag(url string) (etag string, err error) {
  850. checkInitDataStore()
  851. err = singleton.db.View(func(tx *bolt.Tx) error {
  852. bucket := tx.Bucket([]byte(urlETagsBucket))
  853. etag = string(bucket.Get([]byte(url)))
  854. return nil
  855. })
  856. if err != nil {
  857. return "", common.ContextError(err)
  858. }
  859. return etag, nil
  860. }
  861. // SetKeyValue stores a key/value pair.
  862. func SetKeyValue(key, value string) error {
  863. checkInitDataStore()
  864. err := singleton.db.Update(func(tx *bolt.Tx) error {
  865. bucket := tx.Bucket([]byte(keyValueBucket))
  866. err := bucket.Put([]byte(key), []byte(value))
  867. return err
  868. })
  869. if err != nil {
  870. return common.ContextError(err)
  871. }
  872. return nil
  873. }
  874. // GetKeyValue retrieves the value for a given key. If not found,
  875. // it returns an empty string value.
  876. func GetKeyValue(key string) (value string, err error) {
  877. checkInitDataStore()
  878. err = singleton.db.View(func(tx *bolt.Tx) error {
  879. bucket := tx.Bucket([]byte(keyValueBucket))
  880. value = string(bucket.Get([]byte(key)))
  881. return nil
  882. })
  883. if err != nil {
  884. return "", common.ContextError(err)
  885. }
  886. return value, nil
  887. }
  888. // Persistent stat records in the persistentStatStateUnreported
  889. // state are available for take out.
  890. //
  891. // Records in the persistentStatStateReporting have been taken
  892. // out and are pending either deletion (for a successful request)
  893. // or change to StateUnreported (for a failed request).
  894. //
  895. // All persistent stat records are reverted to StateUnreported
  896. // when the datastore is initialized at start up.
  897. var persistentStatStateUnreported = []byte("0")
  898. var persistentStatStateReporting = []byte("1")
  899. var persistentStatTypes = []string{
  900. PERSISTENT_STAT_TYPE_REMOTE_SERVER_LIST,
  901. }
  902. // StorePersistentStat adds a new persistent stat record, which
  903. // is set to StateUnreported and is an immediate candidate for
  904. // reporting.
  905. //
  906. // The stat is a JSON byte array containing fields as
  907. // required by the Psiphon server API. It's assumed that the
  908. // JSON value contains enough unique information for the value to
  909. // function as a key in the key/value datastore. This assumption
  910. // is currently satisfied by the fields sessionId + tunnelNumber
  911. // for tunnel stats, and URL + ETag for remote server list stats.
  912. func StorePersistentStat(statType string, stat []byte) error {
  913. checkInitDataStore()
  914. if !common.Contains(persistentStatTypes, statType) {
  915. return common.ContextError(fmt.Errorf("invalid persistent stat type: %s", statType))
  916. }
  917. err := singleton.db.Update(func(tx *bolt.Tx) error {
  918. bucket := tx.Bucket([]byte(statType))
  919. err := bucket.Put(stat, persistentStatStateUnreported)
  920. return err
  921. })
  922. if err != nil {
  923. return common.ContextError(err)
  924. }
  925. return nil
  926. }
  927. // CountUnreportedPersistentStats returns the number of persistent
  928. // stat records in StateUnreported.
  929. func CountUnreportedPersistentStats() int {
  930. checkInitDataStore()
  931. unreported := 0
  932. err := singleton.db.View(func(tx *bolt.Tx) error {
  933. for _, statType := range persistentStatTypes {
  934. bucket := tx.Bucket([]byte(statType))
  935. cursor := bucket.Cursor()
  936. for key, value := cursor.First(); key != nil; key, value = cursor.Next() {
  937. if 0 == bytes.Compare(value, persistentStatStateUnreported) {
  938. unreported++
  939. break
  940. }
  941. }
  942. }
  943. return nil
  944. })
  945. if err != nil {
  946. NoticeAlert("CountUnreportedPersistentStats failed: %s", err)
  947. return 0
  948. }
  949. return unreported
  950. }
  951. // TakeOutUnreportedPersistentStats returns up to maxCount persistent
  952. // stats records that are in StateUnreported. The records are set to
  953. // StateReporting. If the records are successfully reported, clear them
  954. // with ClearReportedPersistentStats. If the records are not successfully
  955. // reported, restore them with PutBackUnreportedPersistentStats.
  956. func TakeOutUnreportedPersistentStats(maxCount int) (map[string][][]byte, error) {
  957. checkInitDataStore()
  958. stats := make(map[string][][]byte)
  959. err := singleton.db.Update(func(tx *bolt.Tx) error {
  960. count := 0
  961. for _, statType := range persistentStatTypes {
  962. bucket := tx.Bucket([]byte(statType))
  963. cursor := bucket.Cursor()
  964. for key, value := cursor.First(); key != nil; key, value = cursor.Next() {
  965. if count >= maxCount {
  966. break
  967. }
  968. // Perform a test JSON unmarshaling. In case of data corruption or a bug,
  969. // skip the record.
  970. var jsonData interface{}
  971. err := json.Unmarshal(key, &jsonData)
  972. if err != nil {
  973. NoticeAlert(
  974. "Invalid key in TakeOutUnreportedPersistentStats: %s: %s",
  975. string(key), err)
  976. continue
  977. }
  978. if 0 == bytes.Compare(value, persistentStatStateUnreported) {
  979. // Must make a copy as slice is only valid within transaction.
  980. data := make([]byte, len(key))
  981. copy(data, key)
  982. if stats[statType] == nil {
  983. stats[statType] = make([][]byte, 0)
  984. }
  985. stats[statType] = append(stats[statType], data)
  986. count += 1
  987. }
  988. }
  989. for _, key := range stats[statType] {
  990. err := bucket.Put(key, persistentStatStateReporting)
  991. if err != nil {
  992. return err
  993. }
  994. }
  995. }
  996. return nil
  997. })
  998. if err != nil {
  999. return nil, common.ContextError(err)
  1000. }
  1001. return stats, nil
  1002. }
  1003. // PutBackUnreportedPersistentStats restores a list of persistent
  1004. // stat records to StateUnreported.
  1005. func PutBackUnreportedPersistentStats(stats map[string][][]byte) error {
  1006. checkInitDataStore()
  1007. err := singleton.db.Update(func(tx *bolt.Tx) error {
  1008. for _, statType := range persistentStatTypes {
  1009. bucket := tx.Bucket([]byte(statType))
  1010. for _, key := range stats[statType] {
  1011. err := bucket.Put(key, persistentStatStateUnreported)
  1012. if err != nil {
  1013. return err
  1014. }
  1015. }
  1016. }
  1017. return nil
  1018. })
  1019. if err != nil {
  1020. return common.ContextError(err)
  1021. }
  1022. return nil
  1023. }
  1024. // ClearReportedPersistentStats deletes a list of persistent
  1025. // stat records that were successfully reported.
  1026. func ClearReportedPersistentStats(stats map[string][][]byte) error {
  1027. checkInitDataStore()
  1028. err := singleton.db.Update(func(tx *bolt.Tx) error {
  1029. for _, statType := range persistentStatTypes {
  1030. bucket := tx.Bucket([]byte(statType))
  1031. for _, key := range stats[statType] {
  1032. err := bucket.Delete(key)
  1033. if err != nil {
  1034. return err
  1035. }
  1036. }
  1037. }
  1038. return nil
  1039. })
  1040. if err != nil {
  1041. return common.ContextError(err)
  1042. }
  1043. return nil
  1044. }
  1045. // resetAllPersistentStatsToUnreported sets all persistent stat
  1046. // records to StateUnreported. This reset is called when the
  1047. // datastore is initialized at start up, as we do not know if
  1048. // persistent records in StateReporting were reported or not.
  1049. func resetAllPersistentStatsToUnreported() error {
  1050. checkInitDataStore()
  1051. err := singleton.db.Update(func(tx *bolt.Tx) error {
  1052. for _, statType := range persistentStatTypes {
  1053. bucket := tx.Bucket([]byte(statType))
  1054. resetKeys := make([][]byte, 0)
  1055. cursor := bucket.Cursor()
  1056. for key, _ := cursor.First(); key != nil; key, _ = cursor.Next() {
  1057. resetKeys = append(resetKeys, key)
  1058. }
  1059. // TODO: data mutation is done outside cursor. Is this
  1060. // strictly necessary in this case? As is, this means
  1061. // all stats need to be loaded into memory at once.
  1062. // https://godoc.org/github.com/boltdb/bolt#Cursor
  1063. for _, key := range resetKeys {
  1064. err := bucket.Put(key, persistentStatStateUnreported)
  1065. if err != nil {
  1066. return err
  1067. }
  1068. }
  1069. }
  1070. return nil
  1071. })
  1072. if err != nil {
  1073. return common.ContextError(err)
  1074. }
  1075. return nil
  1076. }
  1077. // CountSLOKs returns the total number of SLOK records.
  1078. func CountSLOKs() int {
  1079. checkInitDataStore()
  1080. count := 0
  1081. err := singleton.db.View(func(tx *bolt.Tx) error {
  1082. bucket := tx.Bucket([]byte(slokBucket))
  1083. cursor := bucket.Cursor()
  1084. for key, _ := cursor.First(); key != nil; key, _ = cursor.Next() {
  1085. count++
  1086. }
  1087. return nil
  1088. })
  1089. if err != nil {
  1090. NoticeAlert("CountSLOKs failed: %s", err)
  1091. return 0
  1092. }
  1093. return count
  1094. }
  1095. // DeleteSLOKs deletes all SLOK records.
  1096. func DeleteSLOKs() error {
  1097. checkInitDataStore()
  1098. err := singleton.db.Update(func(tx *bolt.Tx) error {
  1099. bucket := tx.Bucket([]byte(slokBucket))
  1100. return bucket.ForEach(
  1101. func(id, _ []byte) error {
  1102. return bucket.Delete(id)
  1103. })
  1104. })
  1105. if err != nil {
  1106. return common.ContextError(err)
  1107. }
  1108. return nil
  1109. }
  1110. // SetSLOK stores a SLOK key, referenced by its ID. The bool
  1111. // return value indicates whether the SLOK was already stored.
  1112. func SetSLOK(id, key []byte) (bool, error) {
  1113. checkInitDataStore()
  1114. var duplicate bool
  1115. err := singleton.db.Update(func(tx *bolt.Tx) error {
  1116. bucket := tx.Bucket([]byte(slokBucket))
  1117. duplicate = bucket.Get(id) != nil
  1118. err := bucket.Put([]byte(id), []byte(key))
  1119. return err
  1120. })
  1121. if err != nil {
  1122. return false, common.ContextError(err)
  1123. }
  1124. return duplicate, nil
  1125. }
  1126. // GetSLOK returns a SLOK key for the specified ID. The return
  1127. // value is nil if the SLOK is not found.
  1128. func GetSLOK(id []byte) (key []byte, err error) {
  1129. checkInitDataStore()
  1130. err = singleton.db.View(func(tx *bolt.Tx) error {
  1131. bucket := tx.Bucket([]byte(slokBucket))
  1132. key = bucket.Get(id)
  1133. return nil
  1134. })
  1135. if err != nil {
  1136. return nil, common.ContextError(err)
  1137. }
  1138. return key, nil
  1139. }
  1140. // TacticsStorer implements tactics.Storer.
  1141. type TacticsStorer struct {
  1142. }
  1143. func (t *TacticsStorer) SetTacticsRecord(networkID string, record []byte) error {
  1144. return setBucketValue([]byte(tacticsBucket), []byte(networkID), record)
  1145. }
  1146. func (t *TacticsStorer) GetTacticsRecord(networkID string) ([]byte, error) {
  1147. return getBucketValue([]byte(tacticsBucket), []byte(networkID))
  1148. }
  1149. func (t *TacticsStorer) SetSpeedTestSamplesRecord(networkID string, record []byte) error {
  1150. return setBucketValue([]byte(speedTestSamplesBucket), []byte(networkID), record)
  1151. }
  1152. func (t *TacticsStorer) GetSpeedTestSamplesRecord(networkID string) ([]byte, error) {
  1153. return getBucketValue([]byte(speedTestSamplesBucket), []byte(networkID))
  1154. }
  1155. // GetTacticsStorer creates a TacticsStorer.
  1156. func GetTacticsStorer() *TacticsStorer {
  1157. return &TacticsStorer{}
  1158. }
  1159. func setBucketValue(bucket, key, value []byte) error {
  1160. checkInitDataStore()
  1161. err := singleton.db.Update(func(tx *bolt.Tx) error {
  1162. bucket := tx.Bucket(bucket)
  1163. err := bucket.Put(key, value)
  1164. return err
  1165. })
  1166. if err != nil {
  1167. return common.ContextError(err)
  1168. }
  1169. return nil
  1170. }
  1171. func getBucketValue(bucket, key []byte) (value []byte, err error) {
  1172. checkInitDataStore()
  1173. err = singleton.db.View(func(tx *bolt.Tx) error {
  1174. bucket := tx.Bucket(bucket)
  1175. value = bucket.Get(key)
  1176. return nil
  1177. })
  1178. if err != nil {
  1179. return nil, common.ContextError(err)
  1180. }
  1181. return value, nil
  1182. }