dataStore.go 40 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439
  1. /*
  2. * Copyright (c) 2015, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package psiphon
  20. import (
  21. "bytes"
  22. "encoding/json"
  23. "errors"
  24. "fmt"
  25. "math/rand"
  26. "os"
  27. "path/filepath"
  28. "sync"
  29. "time"
  30. "github.com/Psiphon-Inc/bolt"
  31. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  32. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/parameters"
  33. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
  34. )
  35. // The BoltDB dataStore implementation is an alternative to the sqlite3-based
  36. // implementation in dataStore.go. Both implementations have the same interface.
  37. //
  38. // BoltDB is pure Go, and is intended to be used in cases where we have trouble
  39. // building sqlite3/CGO (e.g., currently go mobile due to
  40. // https://github.com/mattn/go-sqlite3/issues/201), and perhaps ultimately as
  41. // the primary dataStore implementation.
  42. //
  43. type dataStore struct {
  44. init sync.Once
  45. db *bolt.DB
  46. }
  47. const (
  48. serverEntriesBucket = "serverEntries"
  49. rankedServerEntriesBucket = "rankedServerEntries"
  50. rankedServerEntriesKey = "rankedServerEntries"
  51. splitTunnelRouteETagsBucket = "splitTunnelRouteETags"
  52. splitTunnelRouteDataBucket = "splitTunnelRouteData"
  53. urlETagsBucket = "urlETags"
  54. keyValueBucket = "keyValues"
  55. tunnelStatsBucket = "tunnelStats"
  56. remoteServerListStatsBucket = "remoteServerListStats"
  57. slokBucket = "SLOKs"
  58. tacticsBucket = "tactics"
  59. speedTestSamplesBucket = "speedTestSamples"
  60. rankedServerEntryCount = 100
  61. )
  62. const (
  63. DATA_STORE_FILENAME = "psiphon.boltdb"
  64. LEGACY_DATA_STORE_FILENAME = "psiphon.db"
  65. DATA_STORE_LAST_CONNECTED_KEY = "lastConnected"
  66. DATA_STORE_LAST_SERVER_ENTRY_FILTER_KEY = "lastServerEntryFilter"
  67. PERSISTENT_STAT_TYPE_REMOTE_SERVER_LIST = remoteServerListStatsBucket
  68. )
  69. var singleton dataStore
  70. // InitDataStore initializes the singleton instance of dataStore. This
  71. // function uses a sync.Once and is safe for use by concurrent goroutines.
  72. // The underlying sql.DB connection pool is also safe.
  73. //
  74. // Note: the sync.Once was more useful when initDataStore was private and
  75. // called on-demand by the public functions below. Now we require an explicit
  76. // InitDataStore() call with the filename passed in. The on-demand calls
  77. // have been replaced by checkInitDataStore() to assert that Init was called.
  78. func InitDataStore(config *Config) (err error) {
  79. singleton.init.Do(func() {
  80. // Need to gather the list of migratable server entries before
  81. // initializing the boltdb store (as prepareMigrationEntries
  82. // checks for the existence of the bolt db file)
  83. migratableServerEntries := prepareMigrationEntries(config)
  84. filename := filepath.Join(config.DataStoreDirectory, DATA_STORE_FILENAME)
  85. var db *bolt.DB
  86. for retry := 0; retry < 3; retry++ {
  87. if retry > 0 {
  88. NoticeAlert("InitDataStore retry: %d", retry)
  89. }
  90. db, err = bolt.Open(filename, 0600, &bolt.Options{Timeout: 1 * time.Second})
  91. // The datastore file may be corrupt, so attempt to delete and try again
  92. if err != nil {
  93. NoticeAlert("bolt.Open error: %s", err)
  94. os.Remove(filename)
  95. continue
  96. }
  97. // Run consistency checks on datastore and emit errors for diagnostics purposes
  98. // We assume this will complete quickly for typical size Psiphon datastores.
  99. err = db.View(func(tx *bolt.Tx) error {
  100. return tx.SynchronousCheck()
  101. })
  102. // The datastore file may be corrupt, so attempt to delete and try again
  103. if err != nil {
  104. NoticeAlert("bolt.SynchronousCheck error: %s", err)
  105. db.Close()
  106. os.Remove(filename)
  107. continue
  108. }
  109. break
  110. }
  111. if err != nil {
  112. // Note: intending to set the err return value for InitDataStore
  113. err = fmt.Errorf("initDataStore failed to open database: %s", err)
  114. return
  115. }
  116. err = db.Update(func(tx *bolt.Tx) error {
  117. requiredBuckets := []string{
  118. serverEntriesBucket,
  119. rankedServerEntriesBucket,
  120. splitTunnelRouteETagsBucket,
  121. splitTunnelRouteDataBucket,
  122. urlETagsBucket,
  123. keyValueBucket,
  124. tunnelStatsBucket,
  125. remoteServerListStatsBucket,
  126. slokBucket,
  127. tacticsBucket,
  128. speedTestSamplesBucket,
  129. }
  130. for _, bucket := range requiredBuckets {
  131. _, err := tx.CreateBucketIfNotExists([]byte(bucket))
  132. if err != nil {
  133. return err
  134. }
  135. }
  136. return nil
  137. })
  138. if err != nil {
  139. err = fmt.Errorf("initDataStore failed to create buckets: %s", err)
  140. return
  141. }
  142. // Cleanup obsolete tunnel (session) stats bucket, if one still exists
  143. err = db.Update(func(tx *bolt.Tx) error {
  144. tunnelStatsBucket := []byte("tunnelStats")
  145. if tx.Bucket(tunnelStatsBucket) != nil {
  146. err := tx.DeleteBucket(tunnelStatsBucket)
  147. if err != nil {
  148. NoticeAlert("DeleteBucket %s error: %s", tunnelStatsBucket, err)
  149. // Continue, since this is not fatal
  150. }
  151. }
  152. return nil
  153. })
  154. if err != nil {
  155. err = fmt.Errorf("initDataStore failed to create buckets: %s", err)
  156. return
  157. }
  158. singleton.db = db
  159. // The migrateServerEntries function requires the data store is
  160. // initialized prior to execution so that migrated entries can be stored
  161. if len(migratableServerEntries) > 0 {
  162. migrateEntries(
  163. config, migratableServerEntries, filepath.Join(config.DataStoreDirectory, LEGACY_DATA_STORE_FILENAME))
  164. }
  165. resetAllPersistentStatsToUnreported()
  166. })
  167. return err
  168. }
  169. func checkInitDataStore() {
  170. if singleton.db == nil {
  171. panic("checkInitDataStore: datastore not initialized")
  172. }
  173. }
  174. // StoreServerEntry adds the server entry to the data store.
  175. // A newly stored (or re-stored) server entry is assigned the next-to-top
  176. // rank for iteration order (the previous top ranked entry is promoted). The
  177. // purpose of inserting at next-to-top is to keep the last selected server
  178. // as the top ranked server.
  179. //
  180. // When a server entry already exists for a given server, it will be
  181. // replaced only if replaceIfExists is set or if the the ConfigurationVersion
  182. // field of the new entry is strictly higher than the existing entry.
  183. //
  184. // If the server entry data is malformed, an alert notice is issued and
  185. // the entry is skipped; no error is returned.
  186. func StoreServerEntry(serverEntry *protocol.ServerEntry, replaceIfExists bool) error {
  187. checkInitDataStore()
  188. // Server entries should already be validated before this point,
  189. // so instead of skipping we fail with an error.
  190. err := protocol.ValidateServerEntry(serverEntry)
  191. if err != nil {
  192. return common.ContextError(
  193. fmt.Errorf("invalid server entry: %s", err))
  194. }
  195. // BoltDB implementation note:
  196. // For simplicity, we don't maintain indexes on server entry
  197. // region or supported protocols. Instead, we perform full-bucket
  198. // scans with a filter. With a small enough database (thousands or
  199. // even tens of thousand of server entries) and common enough
  200. // values (e.g., many servers support all protocols), performance
  201. // is expected to be acceptable.
  202. err = singleton.db.Update(func(tx *bolt.Tx) error {
  203. serverEntries := tx.Bucket([]byte(serverEntriesBucket))
  204. // Check not only that the entry exists, but is valid. This
  205. // will replace in the rare case where the data is corrupt.
  206. existingConfigurationVersion := -1
  207. existingData := serverEntries.Get([]byte(serverEntry.IpAddress))
  208. if existingData != nil {
  209. var existingServerEntry *protocol.ServerEntry
  210. err := json.Unmarshal(existingData, &existingServerEntry)
  211. if err == nil {
  212. existingConfigurationVersion = existingServerEntry.ConfigurationVersion
  213. }
  214. }
  215. exists := existingConfigurationVersion > -1
  216. newer := exists && existingConfigurationVersion < serverEntry.ConfigurationVersion
  217. update := !exists || replaceIfExists || newer
  218. if !update {
  219. // Disabling this notice, for now, as it generates too much noise
  220. // in diagnostics with clients that always submit embedded servers
  221. // to the core on each run.
  222. // NoticeInfo("ignored update for server %s", serverEntry.IpAddress)
  223. return nil
  224. }
  225. data, err := json.Marshal(serverEntry)
  226. if err != nil {
  227. return common.ContextError(err)
  228. }
  229. err = serverEntries.Put([]byte(serverEntry.IpAddress), data)
  230. if err != nil {
  231. return common.ContextError(err)
  232. }
  233. err = insertRankedServerEntry(tx, serverEntry.IpAddress, 1)
  234. if err != nil {
  235. return common.ContextError(err)
  236. }
  237. NoticeInfo("updated server %s", serverEntry.IpAddress)
  238. return nil
  239. })
  240. if err != nil {
  241. return common.ContextError(err)
  242. }
  243. return nil
  244. }
  245. // StoreServerEntries stores a list of server entries.
  246. // There is an independent transaction for each entry insert/update.
  247. func StoreServerEntries(serverEntries []*protocol.ServerEntry, replaceIfExists bool) error {
  248. checkInitDataStore()
  249. for _, serverEntry := range serverEntries {
  250. err := StoreServerEntry(serverEntry, replaceIfExists)
  251. if err != nil {
  252. return common.ContextError(err)
  253. }
  254. }
  255. // Since there has possibly been a significant change in the server entries,
  256. // take this opportunity to update the available egress regions.
  257. ReportAvailableRegions()
  258. return nil
  259. }
  260. // StreamingStoreServerEntries stores a list of server entries.
  261. // There is an independent transaction for each entry insert/update.
  262. func StreamingStoreServerEntries(
  263. serverEntries *protocol.StreamingServerEntryDecoder, replaceIfExists bool) error {
  264. checkInitDataStore()
  265. // Note: both StreamingServerEntryDecoder.Next and StoreServerEntry
  266. // allocate temporary memory buffers for hex/JSON decoding/encoding,
  267. // so this isn't true constant-memory streaming (it depends on garbage
  268. // collection).
  269. for {
  270. serverEntry, err := serverEntries.Next()
  271. if err != nil {
  272. return common.ContextError(err)
  273. }
  274. if serverEntry == nil {
  275. // No more server entries
  276. break
  277. }
  278. err = StoreServerEntry(serverEntry, replaceIfExists)
  279. if err != nil {
  280. return common.ContextError(err)
  281. }
  282. }
  283. // Since there has possibly been a significant change in the server entries,
  284. // take this opportunity to update the available egress regions.
  285. ReportAvailableRegions()
  286. return nil
  287. }
  288. // PromoteServerEntry assigns the top rank (one more than current
  289. // max rank) to the specified server entry. Server candidates are
  290. // iterated in decending rank order, so this server entry will be
  291. // the first candidate in a subsequent tunnel establishment.
  292. func PromoteServerEntry(config *Config, ipAddress string) error {
  293. checkInitDataStore()
  294. err := singleton.db.Update(func(tx *bolt.Tx) error {
  295. // Ensure the corresponding entry exists before
  296. // inserting into rank.
  297. bucket := tx.Bucket([]byte(serverEntriesBucket))
  298. data := bucket.Get([]byte(ipAddress))
  299. if data == nil {
  300. NoticeAlert(
  301. "PromoteServerEntry: ignoring unknown server entry: %s",
  302. ipAddress)
  303. return nil
  304. }
  305. err := insertRankedServerEntry(tx, ipAddress, 0)
  306. if err != nil {
  307. return err
  308. }
  309. // Store the current server entry filter (e.g, region, etc.) that
  310. // was in use when the entry was promoted. This is used to detect
  311. // when the top ranked server entry was promoted under a different
  312. // filter.
  313. currentFilter, err := makeServerEntryFilterValue(config)
  314. if err != nil {
  315. return err
  316. }
  317. bucket = tx.Bucket([]byte(keyValueBucket))
  318. return bucket.Put([]byte(DATA_STORE_LAST_SERVER_ENTRY_FILTER_KEY), currentFilter)
  319. })
  320. if err != nil {
  321. return common.ContextError(err)
  322. }
  323. return nil
  324. }
  325. func makeServerEntryFilterValue(config *Config) ([]byte, error) {
  326. // Currently, only a change of EgressRegion will "break" server affinity.
  327. // If the tunnel protocol filter changes, any existing affinity server
  328. // either passes the new filter, or it will be skipped anyway.
  329. return []byte(config.EgressRegion), nil
  330. }
  331. func hasServerEntryFilterChanged(config *Config) (bool, error) {
  332. currentFilter, err := makeServerEntryFilterValue(config)
  333. if err != nil {
  334. return false, common.ContextError(err)
  335. }
  336. changed := false
  337. err = singleton.db.View(func(tx *bolt.Tx) error {
  338. // previousFilter will be nil not found (not previously
  339. // set) which will never match any current filter.
  340. bucket := tx.Bucket([]byte(keyValueBucket))
  341. previousFilter := bucket.Get([]byte(DATA_STORE_LAST_SERVER_ENTRY_FILTER_KEY))
  342. if bytes.Compare(previousFilter, currentFilter) != 0 {
  343. changed = true
  344. }
  345. return nil
  346. })
  347. if err != nil {
  348. return false, common.ContextError(err)
  349. }
  350. return changed, nil
  351. }
  352. func getRankedServerEntries(tx *bolt.Tx) ([]string, error) {
  353. bucket := tx.Bucket([]byte(rankedServerEntriesBucket))
  354. data := bucket.Get([]byte(rankedServerEntriesKey))
  355. if data == nil {
  356. return []string{}, nil
  357. }
  358. rankedServerEntries := make([]string, 0)
  359. err := json.Unmarshal(data, &rankedServerEntries)
  360. if err != nil {
  361. return nil, common.ContextError(err)
  362. }
  363. return rankedServerEntries, nil
  364. }
  365. func setRankedServerEntries(tx *bolt.Tx, rankedServerEntries []string) error {
  366. data, err := json.Marshal(rankedServerEntries)
  367. if err != nil {
  368. return common.ContextError(err)
  369. }
  370. bucket := tx.Bucket([]byte(rankedServerEntriesBucket))
  371. err = bucket.Put([]byte(rankedServerEntriesKey), data)
  372. if err != nil {
  373. return common.ContextError(err)
  374. }
  375. return nil
  376. }
  377. func insertRankedServerEntry(tx *bolt.Tx, serverEntryId string, position int) error {
  378. rankedServerEntries, err := getRankedServerEntries(tx)
  379. if err != nil {
  380. return common.ContextError(err)
  381. }
  382. // BoltDB implementation note:
  383. // For simplicity, we store the ranked server ids in an array serialized to
  384. // a single key value. To ensure this value doesn't grow without bound,
  385. // it's capped at rankedServerEntryCount. For now, this cap should be large
  386. // enough to meet the shuffleHeadLength = config.TunnelPoolSize criteria, for
  387. // any reasonable configuration of config.TunnelPoolSize.
  388. // Using: https://github.com/golang/go/wiki/SliceTricks
  389. // When serverEntryId is already ranked, remove it first to avoid duplicates
  390. for i, rankedServerEntryId := range rankedServerEntries {
  391. if rankedServerEntryId == serverEntryId {
  392. rankedServerEntries = append(
  393. rankedServerEntries[:i], rankedServerEntries[i+1:]...)
  394. break
  395. }
  396. }
  397. // SliceTricks insert, with length cap enforced
  398. if len(rankedServerEntries) < rankedServerEntryCount {
  399. rankedServerEntries = append(rankedServerEntries, "")
  400. }
  401. if position >= len(rankedServerEntries) {
  402. position = len(rankedServerEntries) - 1
  403. }
  404. copy(rankedServerEntries[position+1:], rankedServerEntries[position:])
  405. rankedServerEntries[position] = serverEntryId
  406. err = setRankedServerEntries(tx, rankedServerEntries)
  407. if err != nil {
  408. return common.ContextError(err)
  409. }
  410. return nil
  411. }
  412. // ServerEntryIterator is used to iterate over
  413. // stored server entries in rank order.
  414. type ServerEntryIterator struct {
  415. config *Config
  416. shuffleHeadLength int
  417. serverEntryIds []string
  418. serverEntryIndex int
  419. isTacticsServerEntryIterator bool
  420. isTargetServerEntryIterator bool
  421. hasNextTargetServerEntry bool
  422. targetServerEntry *protocol.ServerEntry
  423. }
  424. // NewServerEntryIterator creates a new ServerEntryIterator.
  425. //
  426. // The boolean return value indicates whether to treat the first server(s)
  427. // as affinity servers or not. When the server entry selection filter changes
  428. // such as from a specific region to any region, or when there was no previous
  429. // filter/iterator, the the first server(s) are arbitrary and should not be
  430. // given affinity treatment.
  431. //
  432. // NewServerEntryIterator and any returned ServerEntryIterator are not
  433. // designed for concurrent use as not all related datastore operations are
  434. // performed in a single transaction.
  435. //
  436. func NewServerEntryIterator(config *Config) (bool, *ServerEntryIterator, error) {
  437. // When configured, this target server entry is the only candidate
  438. if config.TargetServerEntry != "" {
  439. return newTargetServerEntryIterator(config, false)
  440. }
  441. checkInitDataStore()
  442. filterChanged, err := hasServerEntryFilterChanged(config)
  443. if err != nil {
  444. return false, nil, common.ContextError(err)
  445. }
  446. applyServerAffinity := !filterChanged
  447. iterator := &ServerEntryIterator{
  448. config: config,
  449. shuffleHeadLength: config.TunnelPoolSize,
  450. }
  451. err = iterator.Reset()
  452. if err != nil {
  453. return false, nil, common.ContextError(err)
  454. }
  455. return applyServerAffinity, iterator, nil
  456. }
  457. func NewTacticsServerEntryIterator(config *Config) (*ServerEntryIterator, error) {
  458. // When configured, this target server entry is the only candidate
  459. if config.TargetServerEntry != "" {
  460. _, iterator, err := newTargetServerEntryIterator(config, true)
  461. return iterator, err
  462. }
  463. checkInitDataStore()
  464. iterator := &ServerEntryIterator{
  465. shuffleHeadLength: 0,
  466. isTacticsServerEntryIterator: true,
  467. }
  468. err := iterator.Reset()
  469. if err != nil {
  470. return nil, common.ContextError(err)
  471. }
  472. return iterator, nil
  473. }
  474. // newTargetServerEntryIterator is a helper for initializing the TargetServerEntry case
  475. func newTargetServerEntryIterator(config *Config, isTactics bool) (bool, *ServerEntryIterator, error) {
  476. serverEntry, err := protocol.DecodeServerEntry(
  477. config.TargetServerEntry, common.GetCurrentTimestamp(), protocol.SERVER_ENTRY_SOURCE_TARGET)
  478. if err != nil {
  479. return false, nil, common.ContextError(err)
  480. }
  481. if isTactics {
  482. if len(serverEntry.GetSupportedTacticsProtocols()) == 0 {
  483. return false, nil, common.ContextError(errors.New("TargetServerEntry does not support tactics protocols"))
  484. }
  485. } else {
  486. if config.EgressRegion != "" && serverEntry.Region != config.EgressRegion {
  487. return false, nil, common.ContextError(errors.New("TargetServerEntry does not support EgressRegion"))
  488. }
  489. limitTunnelProtocols := config.clientParameters.Get().TunnelProtocols(parameters.LimitTunnelProtocols)
  490. if len(limitTunnelProtocols) > 0 {
  491. // At the ServerEntryIterator level, only limitTunnelProtocols is applied;
  492. // impairedTunnelProtocols and excludeMeek are handled higher up.
  493. if len(serverEntry.GetSupportedProtocols(limitTunnelProtocols, nil, false)) == 0 {
  494. return false, nil, common.ContextError(errors.New("TargetServerEntry does not support LimitTunnelProtocols"))
  495. }
  496. }
  497. }
  498. iterator := &ServerEntryIterator{
  499. isTacticsServerEntryIterator: isTactics,
  500. isTargetServerEntryIterator: true,
  501. hasNextTargetServerEntry: true,
  502. targetServerEntry: serverEntry,
  503. }
  504. NoticeInfo("using TargetServerEntry: %s", serverEntry.IpAddress)
  505. return false, iterator, nil
  506. }
  507. // Reset a NewServerEntryIterator to the start of its cycle. The next
  508. // call to Next will return the first server entry.
  509. func (iterator *ServerEntryIterator) Reset() error {
  510. iterator.Close()
  511. if iterator.isTargetServerEntryIterator {
  512. iterator.hasNextTargetServerEntry = true
  513. return nil
  514. }
  515. // For diagnostics, it's useful to count the number of known server
  516. // entries that satisfy both the egress region and tunnel protocol
  517. // requirements. The tunnel protocol filter is not applied by the iterator
  518. // as protocol filtering, including impaire protocol and exclude-meek
  519. // logic, is all handled higher up.
  520. // TODO: for isTacticsServerEntryIterator, emit tactics candidate count.
  521. if !iterator.isTacticsServerEntryIterator {
  522. limitTunnelProtocols := iterator.config.clientParameters.Get().TunnelProtocols(
  523. parameters.LimitTunnelProtocols)
  524. count := CountServerEntries(iterator.config.EgressRegion, limitTunnelProtocols)
  525. NoticeCandidateServers(iterator.config.EgressRegion, limitTunnelProtocols, count)
  526. }
  527. // This query implements the Psiphon server candidate selection
  528. // algorithm: the first TunnelPoolSize server candidates are in rank
  529. // (priority) order, to favor previously successful servers; then the
  530. // remaining long tail is shuffled to raise up less recent candidates.
  531. // BoltDB implementation note:
  532. // We don't keep a transaction open for the duration of the iterator
  533. // because this would expose the following semantics to consumer code:
  534. //
  535. // Read-only transactions and read-write transactions ... generally
  536. // shouldn't be opened simultaneously in the same goroutine. This can
  537. // cause a deadlock as the read-write transaction needs to periodically
  538. // re-map the data file but it cannot do so while a read-only
  539. // transaction is open.
  540. // (https://github.com/boltdb/bolt)
  541. //
  542. // So the underlying serverEntriesBucket could change after the serverEntryIds
  543. // list is built.
  544. var serverEntryIds []string
  545. err := singleton.db.View(func(tx *bolt.Tx) error {
  546. var err error
  547. serverEntryIds, err = getRankedServerEntries(tx)
  548. if err != nil {
  549. return err
  550. }
  551. skipServerEntryIds := make(map[string]bool)
  552. for _, serverEntryId := range serverEntryIds {
  553. skipServerEntryIds[serverEntryId] = true
  554. }
  555. bucket := tx.Bucket([]byte(serverEntriesBucket))
  556. cursor := bucket.Cursor()
  557. for key, _ := cursor.Last(); key != nil; key, _ = cursor.Prev() {
  558. serverEntryId := string(key)
  559. if _, ok := skipServerEntryIds[serverEntryId]; ok {
  560. continue
  561. }
  562. serverEntryIds = append(serverEntryIds, serverEntryId)
  563. }
  564. return nil
  565. })
  566. if err != nil {
  567. return common.ContextError(err)
  568. }
  569. for i := len(serverEntryIds) - 1; i > iterator.shuffleHeadLength-1; i-- {
  570. j := rand.Intn(i+1-iterator.shuffleHeadLength) + iterator.shuffleHeadLength
  571. serverEntryIds[i], serverEntryIds[j] = serverEntryIds[j], serverEntryIds[i]
  572. }
  573. iterator.serverEntryIds = serverEntryIds
  574. iterator.serverEntryIndex = 0
  575. return nil
  576. }
  577. // Close cleans up resources associated with a ServerEntryIterator.
  578. func (iterator *ServerEntryIterator) Close() {
  579. iterator.serverEntryIds = nil
  580. iterator.serverEntryIndex = 0
  581. }
  582. // Next returns the next server entry, by rank, for a ServerEntryIterator.
  583. // Returns nil with no error when there is no next item.
  584. func (iterator *ServerEntryIterator) Next() (*protocol.ServerEntry, error) {
  585. var err error
  586. var serverEntry *protocol.ServerEntry
  587. defer func() {
  588. if err != nil {
  589. iterator.Close()
  590. }
  591. }()
  592. if iterator.isTargetServerEntryIterator {
  593. if iterator.hasNextTargetServerEntry {
  594. iterator.hasNextTargetServerEntry = false
  595. return MakeCompatibleServerEntry(iterator.targetServerEntry), nil
  596. }
  597. return nil, nil
  598. }
  599. // There are no region/protocol indexes for the server entries bucket.
  600. // Loop until we have the next server entry that matches the iterator
  601. // filter requirements.
  602. for {
  603. if iterator.serverEntryIndex >= len(iterator.serverEntryIds) {
  604. // There is no next item
  605. return nil, nil
  606. }
  607. serverEntryId := iterator.serverEntryIds[iterator.serverEntryIndex]
  608. iterator.serverEntryIndex += 1
  609. var data []byte
  610. err = singleton.db.View(func(tx *bolt.Tx) error {
  611. bucket := tx.Bucket([]byte(serverEntriesBucket))
  612. value := bucket.Get([]byte(serverEntryId))
  613. if value != nil {
  614. // Must make a copy as slice is only valid within transaction.
  615. data = make([]byte, len(value))
  616. copy(data, value)
  617. }
  618. return nil
  619. })
  620. if err != nil {
  621. return nil, common.ContextError(err)
  622. }
  623. if data == nil {
  624. // In case of data corruption or a bug causing this condition,
  625. // do not stop iterating.
  626. NoticeAlert("ServerEntryIterator.Next: unexpected missing server entry: %s", serverEntryId)
  627. continue
  628. }
  629. err = json.Unmarshal(data, &serverEntry)
  630. if err != nil {
  631. // In case of data corruption or a bug causing this condition,
  632. // do not stop iterating.
  633. NoticeAlert("ServerEntryIterator.Next: %s", common.ContextError(err))
  634. continue
  635. }
  636. // Check filter requirements
  637. if iterator.isTacticsServerEntryIterator {
  638. // Tactics doesn't filter by egress region.
  639. if len(serverEntry.GetSupportedTacticsProtocols()) > 0 {
  640. break
  641. }
  642. } else {
  643. if iterator.config.EgressRegion == "" ||
  644. serverEntry.Region == iterator.config.EgressRegion {
  645. break
  646. }
  647. }
  648. }
  649. return MakeCompatibleServerEntry(serverEntry), nil
  650. }
  651. // MakeCompatibleServerEntry provides backwards compatibility with old server entries
  652. // which have a single meekFrontingDomain and not a meekFrontingAddresses array.
  653. // By copying this one meekFrontingDomain into meekFrontingAddresses, this client effectively
  654. // uses that single value as legacy clients do.
  655. func MakeCompatibleServerEntry(serverEntry *protocol.ServerEntry) *protocol.ServerEntry {
  656. if len(serverEntry.MeekFrontingAddresses) == 0 && serverEntry.MeekFrontingDomain != "" {
  657. serverEntry.MeekFrontingAddresses =
  658. append(serverEntry.MeekFrontingAddresses, serverEntry.MeekFrontingDomain)
  659. }
  660. return serverEntry
  661. }
  662. func scanServerEntries(scanner func(*protocol.ServerEntry)) error {
  663. err := singleton.db.View(func(tx *bolt.Tx) error {
  664. bucket := tx.Bucket([]byte(serverEntriesBucket))
  665. cursor := bucket.Cursor()
  666. for key, value := cursor.First(); key != nil; key, value = cursor.Next() {
  667. serverEntry := new(protocol.ServerEntry)
  668. err := json.Unmarshal(value, serverEntry)
  669. if err != nil {
  670. // In case of data corruption or a bug causing this condition,
  671. // do not stop iterating.
  672. NoticeAlert("scanServerEntries: %s", common.ContextError(err))
  673. continue
  674. }
  675. scanner(serverEntry)
  676. }
  677. return nil
  678. })
  679. if err != nil {
  680. return common.ContextError(err)
  681. }
  682. return nil
  683. }
  684. // CountServerEntries returns a count of stored servers for the
  685. // specified region and tunnel protocols.
  686. func CountServerEntries(region string, tunnelProtocols []string) int {
  687. checkInitDataStore()
  688. count := 0
  689. err := scanServerEntries(func(serverEntry *protocol.ServerEntry) {
  690. if (region == "" || serverEntry.Region == region) &&
  691. (len(tunnelProtocols) == 0 ||
  692. // When CountServerEntries is called only limitTunnelProtocols is known;
  693. // impairedTunnelProtocols and excludeMeek may not apply.
  694. len(serverEntry.GetSupportedProtocols(tunnelProtocols, nil, false)) > 0) {
  695. count += 1
  696. }
  697. })
  698. if err != nil {
  699. NoticeAlert("CountServerEntries failed: %s", err)
  700. return 0
  701. }
  702. return count
  703. }
  704. // CountNonImpairedProtocols returns the number of distinct tunnel
  705. // protocols supported by stored server entries, excluding the
  706. // specified impaired protocols.
  707. func CountNonImpairedProtocols(
  708. region string,
  709. limitTunnelProtocols, impairedProtocols []string) int {
  710. checkInitDataStore()
  711. distinctProtocols := make(map[string]bool)
  712. err := scanServerEntries(func(serverEntry *protocol.ServerEntry) {
  713. if region == "" || serverEntry.Region == region {
  714. for _, protocol := range protocol.SupportedTunnelProtocols {
  715. if serverEntry.SupportsProtocol(protocol) {
  716. if len(limitTunnelProtocols) == 0 ||
  717. common.Contains(limitTunnelProtocols, protocol) {
  718. distinctProtocols[protocol] = true
  719. }
  720. }
  721. }
  722. }
  723. })
  724. for _, protocol := range impairedProtocols {
  725. delete(distinctProtocols, protocol)
  726. }
  727. if err != nil {
  728. NoticeAlert("CountNonImpairedProtocols failed: %s", err)
  729. return 0
  730. }
  731. return len(distinctProtocols)
  732. }
  733. // ReportAvailableRegions prints a notice with the available egress regions.
  734. // Note that this report ignores LimitTunnelProtocols.
  735. func ReportAvailableRegions() {
  736. checkInitDataStore()
  737. regions := make(map[string]bool)
  738. err := scanServerEntries(func(serverEntry *protocol.ServerEntry) {
  739. regions[serverEntry.Region] = true
  740. })
  741. if err != nil {
  742. NoticeAlert("ReportAvailableRegions failed: %s", err)
  743. return
  744. }
  745. regionList := make([]string, 0, len(regions))
  746. for region := range regions {
  747. // Some server entries do not have a region, but it makes no sense to return
  748. // an empty string as an "available region".
  749. if region != "" {
  750. regionList = append(regionList, region)
  751. }
  752. }
  753. NoticeAvailableEgressRegions(regionList)
  754. }
  755. // GetServerEntryIpAddresses returns an array containing
  756. // all stored server IP addresses.
  757. func GetServerEntryIpAddresses() (ipAddresses []string, err error) {
  758. checkInitDataStore()
  759. ipAddresses = make([]string, 0)
  760. err = scanServerEntries(func(serverEntry *protocol.ServerEntry) {
  761. ipAddresses = append(ipAddresses, serverEntry.IpAddress)
  762. })
  763. if err != nil {
  764. return nil, common.ContextError(err)
  765. }
  766. return ipAddresses, nil
  767. }
  768. // SetSplitTunnelRoutes updates the cached routes data for
  769. // the given region. The associated etag is also stored and
  770. // used to make efficient web requests for updates to the data.
  771. func SetSplitTunnelRoutes(region, etag string, data []byte) error {
  772. checkInitDataStore()
  773. err := singleton.db.Update(func(tx *bolt.Tx) error {
  774. bucket := tx.Bucket([]byte(splitTunnelRouteETagsBucket))
  775. err := bucket.Put([]byte(region), []byte(etag))
  776. bucket = tx.Bucket([]byte(splitTunnelRouteDataBucket))
  777. err = bucket.Put([]byte(region), data)
  778. return err
  779. })
  780. if err != nil {
  781. return common.ContextError(err)
  782. }
  783. return nil
  784. }
  785. // GetSplitTunnelRoutesETag retrieves the etag for cached routes
  786. // data for the specified region. If not found, it returns an empty string value.
  787. func GetSplitTunnelRoutesETag(region string) (etag string, err error) {
  788. checkInitDataStore()
  789. err = singleton.db.View(func(tx *bolt.Tx) error {
  790. bucket := tx.Bucket([]byte(splitTunnelRouteETagsBucket))
  791. etag = string(bucket.Get([]byte(region)))
  792. return nil
  793. })
  794. if err != nil {
  795. return "", common.ContextError(err)
  796. }
  797. return etag, nil
  798. }
  799. // GetSplitTunnelRoutesData retrieves the cached routes data
  800. // for the specified region. If not found, it returns a nil value.
  801. func GetSplitTunnelRoutesData(region string) (data []byte, err error) {
  802. checkInitDataStore()
  803. err = singleton.db.View(func(tx *bolt.Tx) error {
  804. bucket := tx.Bucket([]byte(splitTunnelRouteDataBucket))
  805. value := bucket.Get([]byte(region))
  806. if value != nil {
  807. // Must make a copy as slice is only valid within transaction.
  808. data = make([]byte, len(value))
  809. copy(data, value)
  810. }
  811. return nil
  812. })
  813. if err != nil {
  814. return nil, common.ContextError(err)
  815. }
  816. return data, nil
  817. }
  818. // SetUrlETag stores an ETag for the specfied URL.
  819. // Note: input URL is treated as a string, and is not
  820. // encoded or decoded or otherwise canonicalized.
  821. func SetUrlETag(url, etag string) error {
  822. checkInitDataStore()
  823. err := singleton.db.Update(func(tx *bolt.Tx) error {
  824. bucket := tx.Bucket([]byte(urlETagsBucket))
  825. err := bucket.Put([]byte(url), []byte(etag))
  826. return err
  827. })
  828. if err != nil {
  829. return common.ContextError(err)
  830. }
  831. return nil
  832. }
  833. // GetUrlETag retrieves a previously stored an ETag for the
  834. // specfied URL. If not found, it returns an empty string value.
  835. func GetUrlETag(url string) (etag string, err error) {
  836. checkInitDataStore()
  837. err = singleton.db.View(func(tx *bolt.Tx) error {
  838. bucket := tx.Bucket([]byte(urlETagsBucket))
  839. etag = string(bucket.Get([]byte(url)))
  840. return nil
  841. })
  842. if err != nil {
  843. return "", common.ContextError(err)
  844. }
  845. return etag, nil
  846. }
  847. // SetKeyValue stores a key/value pair.
  848. func SetKeyValue(key, value string) error {
  849. checkInitDataStore()
  850. err := singleton.db.Update(func(tx *bolt.Tx) error {
  851. bucket := tx.Bucket([]byte(keyValueBucket))
  852. err := bucket.Put([]byte(key), []byte(value))
  853. return err
  854. })
  855. if err != nil {
  856. return common.ContextError(err)
  857. }
  858. return nil
  859. }
  860. // GetKeyValue retrieves the value for a given key. If not found,
  861. // it returns an empty string value.
  862. func GetKeyValue(key string) (value string, err error) {
  863. checkInitDataStore()
  864. err = singleton.db.View(func(tx *bolt.Tx) error {
  865. bucket := tx.Bucket([]byte(keyValueBucket))
  866. value = string(bucket.Get([]byte(key)))
  867. return nil
  868. })
  869. if err != nil {
  870. return "", common.ContextError(err)
  871. }
  872. return value, nil
  873. }
  874. // Persistent stat records in the persistentStatStateUnreported
  875. // state are available for take out.
  876. //
  877. // Records in the persistentStatStateReporting have been taken
  878. // out and are pending either deletion (for a successful request)
  879. // or change to StateUnreported (for a failed request).
  880. //
  881. // All persistent stat records are reverted to StateUnreported
  882. // when the datastore is initialized at start up.
  883. var persistentStatStateUnreported = []byte("0")
  884. var persistentStatStateReporting = []byte("1")
  885. var persistentStatTypes = []string{
  886. PERSISTENT_STAT_TYPE_REMOTE_SERVER_LIST,
  887. }
  888. // StorePersistentStat adds a new persistent stat record, which
  889. // is set to StateUnreported and is an immediate candidate for
  890. // reporting.
  891. //
  892. // The stat is a JSON byte array containing fields as
  893. // required by the Psiphon server API. It's assumed that the
  894. // JSON value contains enough unique information for the value to
  895. // function as a key in the key/value datastore. This assumption
  896. // is currently satisfied by the fields sessionId + tunnelNumber
  897. // for tunnel stats, and URL + ETag for remote server list stats.
  898. func StorePersistentStat(statType string, stat []byte) error {
  899. checkInitDataStore()
  900. if !common.Contains(persistentStatTypes, statType) {
  901. return common.ContextError(fmt.Errorf("invalid persistent stat type: %s", statType))
  902. }
  903. err := singleton.db.Update(func(tx *bolt.Tx) error {
  904. bucket := tx.Bucket([]byte(statType))
  905. err := bucket.Put(stat, persistentStatStateUnreported)
  906. return err
  907. })
  908. if err != nil {
  909. return common.ContextError(err)
  910. }
  911. return nil
  912. }
  913. // CountUnreportedPersistentStats returns the number of persistent
  914. // stat records in StateUnreported.
  915. func CountUnreportedPersistentStats() int {
  916. checkInitDataStore()
  917. unreported := 0
  918. err := singleton.db.View(func(tx *bolt.Tx) error {
  919. for _, statType := range persistentStatTypes {
  920. bucket := tx.Bucket([]byte(statType))
  921. cursor := bucket.Cursor()
  922. for key, value := cursor.First(); key != nil; key, value = cursor.Next() {
  923. if 0 == bytes.Compare(value, persistentStatStateUnreported) {
  924. unreported++
  925. break
  926. }
  927. }
  928. }
  929. return nil
  930. })
  931. if err != nil {
  932. NoticeAlert("CountUnreportedPersistentStats failed: %s", err)
  933. return 0
  934. }
  935. return unreported
  936. }
  937. // TakeOutUnreportedPersistentStats returns up to maxCount persistent
  938. // stats records that are in StateUnreported. The records are set to
  939. // StateReporting. If the records are successfully reported, clear them
  940. // with ClearReportedPersistentStats. If the records are not successfully
  941. // reported, restore them with PutBackUnreportedPersistentStats.
  942. func TakeOutUnreportedPersistentStats(maxCount int) (map[string][][]byte, error) {
  943. checkInitDataStore()
  944. stats := make(map[string][][]byte)
  945. err := singleton.db.Update(func(tx *bolt.Tx) error {
  946. count := 0
  947. for _, statType := range persistentStatTypes {
  948. bucket := tx.Bucket([]byte(statType))
  949. cursor := bucket.Cursor()
  950. for key, value := cursor.First(); key != nil; key, value = cursor.Next() {
  951. if count >= maxCount {
  952. break
  953. }
  954. // Perform a test JSON unmarshaling. In case of data corruption or a bug,
  955. // skip the record.
  956. var jsonData interface{}
  957. err := json.Unmarshal(key, &jsonData)
  958. if err != nil {
  959. NoticeAlert(
  960. "Invalid key in TakeOutUnreportedPersistentStats: %s: %s",
  961. string(key), err)
  962. continue
  963. }
  964. if 0 == bytes.Compare(value, persistentStatStateUnreported) {
  965. // Must make a copy as slice is only valid within transaction.
  966. data := make([]byte, len(key))
  967. copy(data, key)
  968. if stats[statType] == nil {
  969. stats[statType] = make([][]byte, 0)
  970. }
  971. stats[statType] = append(stats[statType], data)
  972. count += 1
  973. }
  974. }
  975. for _, key := range stats[statType] {
  976. err := bucket.Put(key, persistentStatStateReporting)
  977. if err != nil {
  978. return err
  979. }
  980. }
  981. }
  982. return nil
  983. })
  984. if err != nil {
  985. return nil, common.ContextError(err)
  986. }
  987. return stats, nil
  988. }
  989. // PutBackUnreportedPersistentStats restores a list of persistent
  990. // stat records to StateUnreported.
  991. func PutBackUnreportedPersistentStats(stats map[string][][]byte) error {
  992. checkInitDataStore()
  993. err := singleton.db.Update(func(tx *bolt.Tx) error {
  994. for _, statType := range persistentStatTypes {
  995. bucket := tx.Bucket([]byte(statType))
  996. for _, key := range stats[statType] {
  997. err := bucket.Put(key, persistentStatStateUnreported)
  998. if err != nil {
  999. return err
  1000. }
  1001. }
  1002. }
  1003. return nil
  1004. })
  1005. if err != nil {
  1006. return common.ContextError(err)
  1007. }
  1008. return nil
  1009. }
  1010. // ClearReportedPersistentStats deletes a list of persistent
  1011. // stat records that were successfully reported.
  1012. func ClearReportedPersistentStats(stats map[string][][]byte) error {
  1013. checkInitDataStore()
  1014. err := singleton.db.Update(func(tx *bolt.Tx) error {
  1015. for _, statType := range persistentStatTypes {
  1016. bucket := tx.Bucket([]byte(statType))
  1017. for _, key := range stats[statType] {
  1018. err := bucket.Delete(key)
  1019. if err != nil {
  1020. return err
  1021. }
  1022. }
  1023. }
  1024. return nil
  1025. })
  1026. if err != nil {
  1027. return common.ContextError(err)
  1028. }
  1029. return nil
  1030. }
  1031. // resetAllPersistentStatsToUnreported sets all persistent stat
  1032. // records to StateUnreported. This reset is called when the
  1033. // datastore is initialized at start up, as we do not know if
  1034. // persistent records in StateReporting were reported or not.
  1035. func resetAllPersistentStatsToUnreported() error {
  1036. checkInitDataStore()
  1037. err := singleton.db.Update(func(tx *bolt.Tx) error {
  1038. for _, statType := range persistentStatTypes {
  1039. bucket := tx.Bucket([]byte(statType))
  1040. resetKeys := make([][]byte, 0)
  1041. cursor := bucket.Cursor()
  1042. for key, _ := cursor.First(); key != nil; key, _ = cursor.Next() {
  1043. resetKeys = append(resetKeys, key)
  1044. }
  1045. // TODO: data mutation is done outside cursor. Is this
  1046. // strictly necessary in this case? As is, this means
  1047. // all stats need to be loaded into memory at once.
  1048. // https://godoc.org/github.com/boltdb/bolt#Cursor
  1049. for _, key := range resetKeys {
  1050. err := bucket.Put(key, persistentStatStateUnreported)
  1051. if err != nil {
  1052. return err
  1053. }
  1054. }
  1055. }
  1056. return nil
  1057. })
  1058. if err != nil {
  1059. return common.ContextError(err)
  1060. }
  1061. return nil
  1062. }
  1063. // CountSLOKs returns the total number of SLOK records.
  1064. func CountSLOKs() int {
  1065. checkInitDataStore()
  1066. count := 0
  1067. err := singleton.db.View(func(tx *bolt.Tx) error {
  1068. bucket := tx.Bucket([]byte(slokBucket))
  1069. cursor := bucket.Cursor()
  1070. for key, _ := cursor.First(); key != nil; key, _ = cursor.Next() {
  1071. count++
  1072. }
  1073. return nil
  1074. })
  1075. if err != nil {
  1076. NoticeAlert("CountSLOKs failed: %s", err)
  1077. return 0
  1078. }
  1079. return count
  1080. }
  1081. // DeleteSLOKs deletes all SLOK records.
  1082. func DeleteSLOKs() error {
  1083. checkInitDataStore()
  1084. err := singleton.db.Update(func(tx *bolt.Tx) error {
  1085. bucket := tx.Bucket([]byte(slokBucket))
  1086. return bucket.ForEach(
  1087. func(id, _ []byte) error {
  1088. return bucket.Delete(id)
  1089. })
  1090. })
  1091. if err != nil {
  1092. return common.ContextError(err)
  1093. }
  1094. return nil
  1095. }
  1096. // SetSLOK stores a SLOK key, referenced by its ID. The bool
  1097. // return value indicates whether the SLOK was already stored.
  1098. func SetSLOK(id, key []byte) (bool, error) {
  1099. checkInitDataStore()
  1100. var duplicate bool
  1101. err := singleton.db.Update(func(tx *bolt.Tx) error {
  1102. bucket := tx.Bucket([]byte(slokBucket))
  1103. duplicate = bucket.Get(id) != nil
  1104. err := bucket.Put([]byte(id), []byte(key))
  1105. return err
  1106. })
  1107. if err != nil {
  1108. return false, common.ContextError(err)
  1109. }
  1110. return duplicate, nil
  1111. }
  1112. // GetSLOK returns a SLOK key for the specified ID. The return
  1113. // value is nil if the SLOK is not found.
  1114. func GetSLOK(id []byte) (key []byte, err error) {
  1115. checkInitDataStore()
  1116. err = singleton.db.View(func(tx *bolt.Tx) error {
  1117. bucket := tx.Bucket([]byte(slokBucket))
  1118. key = bucket.Get(id)
  1119. return nil
  1120. })
  1121. if err != nil {
  1122. return nil, common.ContextError(err)
  1123. }
  1124. return key, nil
  1125. }
  1126. // TacticsStorer implements tactics.Storer.
  1127. type TacticsStorer struct {
  1128. }
  1129. func (t *TacticsStorer) SetTacticsRecord(networkID string, record []byte) error {
  1130. return setBucketValue([]byte(tacticsBucket), []byte(networkID), record)
  1131. }
  1132. func (t *TacticsStorer) GetTacticsRecord(networkID string) ([]byte, error) {
  1133. return getBucketValue([]byte(tacticsBucket), []byte(networkID))
  1134. }
  1135. func (t *TacticsStorer) SetSpeedTestSamplesRecord(networkID string, record []byte) error {
  1136. return setBucketValue([]byte(speedTestSamplesBucket), []byte(networkID), record)
  1137. }
  1138. func (t *TacticsStorer) GetSpeedTestSamplesRecord(networkID string) ([]byte, error) {
  1139. return getBucketValue([]byte(speedTestSamplesBucket), []byte(networkID))
  1140. }
  1141. // GetTacticsStorer creates a TacticsStorer.
  1142. func GetTacticsStorer() *TacticsStorer {
  1143. return &TacticsStorer{}
  1144. }
  1145. func setBucketValue(bucket, key, value []byte) error {
  1146. checkInitDataStore()
  1147. err := singleton.db.Update(func(tx *bolt.Tx) error {
  1148. bucket := tx.Bucket(bucket)
  1149. err := bucket.Put(key, value)
  1150. return err
  1151. })
  1152. if err != nil {
  1153. return common.ContextError(err)
  1154. }
  1155. return nil
  1156. }
  1157. func getBucketValue(bucket, key []byte) (value []byte, err error) {
  1158. checkInitDataStore()
  1159. err = singleton.db.View(func(tx *bolt.Tx) error {
  1160. bucket := tx.Bucket(bucket)
  1161. value = bucket.Get(key)
  1162. return nil
  1163. })
  1164. if err != nil {
  1165. return nil, common.ContextError(err)
  1166. }
  1167. return value, nil
  1168. }