dataStore.go 31 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057
  1. /*
  2. * Copyright (c) 2015, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package psiphon
  20. import (
  21. "bytes"
  22. "encoding/json"
  23. "errors"
  24. "fmt"
  25. "math/rand"
  26. "os"
  27. "path/filepath"
  28. "strings"
  29. "sync"
  30. "time"
  31. "github.com/Psiphon-Inc/bolt"
  32. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  33. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
  34. )
// The BoltDB dataStore implementation is an alternative to the sqlite3-based
// implementation in dataStore.go. Both implementations have the same interface.
//
// BoltDB is pure Go, and is intended to be used in cases where we have trouble
// building sqlite3/CGO (e.g., currently go mobile due to
// https://github.com/mattn/go-sqlite3/issues/201), and perhaps ultimately as
// the primary dataStore implementation.
//
// NOTE(review): the reference to a separate sqlite3 dataStore.go looks stale;
// confirm whether that implementation still exists.
//
// dataStore bundles the BoltDB handle with the sync.Once that guards its
// one-time initialization in InitDataStore.
type dataStore struct {
	// init ensures the InitDataStore body runs at most once.
	init sync.Once
	// db is nil until InitDataStore has opened the database;
	// checkInitDataStore panics while it is nil.
	db *bolt.DB
}
  47. const (
  48. serverEntriesBucket = "serverEntries"
  49. rankedServerEntriesBucket = "rankedServerEntries"
  50. rankedServerEntriesKey = "rankedServerEntries"
  51. splitTunnelRouteETagsBucket = "splitTunnelRouteETags"
  52. splitTunnelRouteDataBucket = "splitTunnelRouteData"
  53. urlETagsBucket = "urlETags"
  54. keyValueBucket = "keyValues"
  55. tunnelStatsBucket = "tunnelStats"
  56. slokBucket = "SLOKs"
  57. rankedServerEntryCount = 100
  58. )
// singleton is the package-level dataStore instance. Its db field is set by
// InitDataStore and read by the data store functions below.
var singleton dataStore
  60. // InitDataStore initializes the singleton instance of dataStore. This
  61. // function uses a sync.Once and is safe for use by concurrent goroutines.
  62. // The underlying sql.DB connection pool is also safe.
  63. //
  64. // Note: the sync.Once was more useful when initDataStore was private and
  65. // called on-demand by the public functions below. Now we require an explicit
  66. // InitDataStore() call with the filename passed in. The on-demand calls
  67. // have been replaced by checkInitDataStore() to assert that Init was called.
  68. func InitDataStore(config *Config) (err error) {
  69. singleton.init.Do(func() {
  70. // Need to gather the list of migratable server entries before
  71. // initializing the boltdb store (as prepareMigrationEntries
  72. // checks for the existence of the bolt db file)
  73. migratableServerEntries := prepareMigrationEntries(config)
  74. filename := filepath.Join(config.DataStoreDirectory, DATA_STORE_FILENAME)
  75. var db *bolt.DB
  76. db, err = bolt.Open(filename, 0600, &bolt.Options{Timeout: 1 * time.Second})
  77. // The datastore file may be corrupt, so attempt to delete and try again
  78. if err != nil {
  79. NoticeAlert("retry on initDataStore error: %s", err)
  80. os.Remove(filename)
  81. db, err = bolt.Open(filename, 0600, &bolt.Options{Timeout: 1 * time.Second})
  82. }
  83. if err != nil {
  84. // Note: intending to set the err return value for InitDataStore
  85. err = fmt.Errorf("initDataStore failed to open database: %s", err)
  86. return
  87. }
  88. err = db.Update(func(tx *bolt.Tx) error {
  89. requiredBuckets := []string{
  90. serverEntriesBucket,
  91. rankedServerEntriesBucket,
  92. splitTunnelRouteETagsBucket,
  93. splitTunnelRouteDataBucket,
  94. urlETagsBucket,
  95. keyValueBucket,
  96. tunnelStatsBucket,
  97. slokBucket,
  98. }
  99. for _, bucket := range requiredBuckets {
  100. _, err := tx.CreateBucketIfNotExists([]byte(bucket))
  101. if err != nil {
  102. return err
  103. }
  104. }
  105. return nil
  106. })
  107. if err != nil {
  108. err = fmt.Errorf("initDataStore failed to create buckets: %s", err)
  109. return
  110. }
  111. // Run consistency checks on datastore and emit errors for diagnostics purposes
  112. // We assume this will complete quickly for typical size Psiphon datastores.
  113. db.View(func(tx *bolt.Tx) error {
  114. err := <-tx.Check()
  115. if err != nil {
  116. NoticeAlert("boltdb Check(): %s", err)
  117. }
  118. return nil
  119. })
  120. singleton.db = db
  121. // The migrateServerEntries function requires the data store is
  122. // initialized prior to execution so that migrated entries can be stored
  123. if len(migratableServerEntries) > 0 {
  124. migrateEntries(migratableServerEntries, filepath.Join(config.DataStoreDirectory, LEGACY_DATA_STORE_FILENAME))
  125. }
  126. resetAllTunnelStatsToUnreported()
  127. })
  128. return err
  129. }
  130. func checkInitDataStore() {
  131. if singleton.db == nil {
  132. panic("checkInitDataStore: datastore not initialized")
  133. }
  134. }
  135. // StoreServerEntry adds the server entry to the data store.
  136. // A newly stored (or re-stored) server entry is assigned the next-to-top
  137. // rank for iteration order (the previous top ranked entry is promoted). The
  138. // purpose of inserting at next-to-top is to keep the last selected server
  139. // as the top ranked server.
  140. // When replaceIfExists is true, an existing server entry record is
  141. // overwritten; otherwise, the existing record is unchanged.
  142. // If the server entry data is malformed, an alert notice is issued and
  143. // the entry is skipped; no error is returned.
  144. func StoreServerEntry(serverEntry *ServerEntry, replaceIfExists bool) error {
  145. checkInitDataStore()
  146. // Server entries should already be validated before this point,
  147. // so instead of skipping we fail with an error.
  148. err := ValidateServerEntry(serverEntry)
  149. if err != nil {
  150. return common.ContextError(errors.New("invalid server entry"))
  151. }
  152. // BoltDB implementation note:
  153. // For simplicity, we don't maintain indexes on server entry
  154. // region or supported protocols. Instead, we perform full-bucket
  155. // scans with a filter. With a small enough database (thousands or
  156. // even tens of thousand of server entries) and common enough
  157. // values (e.g., many servers support all protocols), performance
  158. // is expected to be acceptable.
  159. err = singleton.db.Update(func(tx *bolt.Tx) error {
  160. serverEntries := tx.Bucket([]byte(serverEntriesBucket))
  161. // Check not only that the entry exists, but is valid. This
  162. // will replace in the rare case where the data is corrupt.
  163. existingServerEntryValid := false
  164. existingData := serverEntries.Get([]byte(serverEntry.IpAddress))
  165. if existingData != nil {
  166. existingServerEntry := new(ServerEntry)
  167. if json.Unmarshal(existingData, existingServerEntry) == nil {
  168. existingServerEntryValid = true
  169. }
  170. }
  171. if existingServerEntryValid && !replaceIfExists {
  172. // Disabling this notice, for now, as it generates too much noise
  173. // in diagnostics with clients that always submit embedded servers
  174. // to the core on each run.
  175. // NoticeInfo("ignored update for server %s", serverEntry.IpAddress)
  176. return nil
  177. }
  178. data, err := json.Marshal(serverEntry)
  179. if err != nil {
  180. return common.ContextError(err)
  181. }
  182. err = serverEntries.Put([]byte(serverEntry.IpAddress), data)
  183. if err != nil {
  184. return common.ContextError(err)
  185. }
  186. err = insertRankedServerEntry(tx, serverEntry.IpAddress, 1)
  187. if err != nil {
  188. return common.ContextError(err)
  189. }
  190. NoticeInfo("updated server %s", serverEntry.IpAddress)
  191. return nil
  192. })
  193. if err != nil {
  194. return common.ContextError(err)
  195. }
  196. return nil
  197. }
  198. // StoreServerEntries shuffles and stores a list of server entries.
  199. // Shuffling is performed on imported server entrues as part of client-side
  200. // load balancing.
  201. // There is an independent transaction for each entry insert/update.
  202. func StoreServerEntries(serverEntries []*ServerEntry, replaceIfExists bool) error {
  203. checkInitDataStore()
  204. for index := len(serverEntries) - 1; index > 0; index-- {
  205. swapIndex := rand.Intn(index + 1)
  206. serverEntries[index], serverEntries[swapIndex] = serverEntries[swapIndex], serverEntries[index]
  207. }
  208. for _, serverEntry := range serverEntries {
  209. err := StoreServerEntry(serverEntry, replaceIfExists)
  210. if err != nil {
  211. return common.ContextError(err)
  212. }
  213. }
  214. // Since there has possibly been a significant change in the server entries,
  215. // take this opportunity to update the available egress regions.
  216. ReportAvailableRegions()
  217. return nil
  218. }
  219. // PromoteServerEntry assigns the top rank (one more than current
  220. // max rank) to the specified server entry. Server candidates are
  221. // iterated in decending rank order, so this server entry will be
  222. // the first candidate in a subsequent tunnel establishment.
  223. func PromoteServerEntry(ipAddress string) error {
  224. checkInitDataStore()
  225. err := singleton.db.Update(func(tx *bolt.Tx) error {
  226. // Ensure the corresponding entry exists before
  227. // inserting into rank.
  228. bucket := tx.Bucket([]byte(serverEntriesBucket))
  229. data := bucket.Get([]byte(ipAddress))
  230. if data == nil {
  231. NoticeAlert(
  232. "PromoteServerEntry: ignoring unknown server entry: %s",
  233. ipAddress)
  234. return nil
  235. }
  236. return insertRankedServerEntry(tx, ipAddress, 0)
  237. })
  238. if err != nil {
  239. return common.ContextError(err)
  240. }
  241. return nil
  242. }
  243. func getRankedServerEntries(tx *bolt.Tx) ([]string, error) {
  244. bucket := tx.Bucket([]byte(rankedServerEntriesBucket))
  245. data := bucket.Get([]byte(rankedServerEntriesKey))
  246. if data == nil {
  247. return []string{}, nil
  248. }
  249. rankedServerEntries := make([]string, 0)
  250. err := json.Unmarshal(data, &rankedServerEntries)
  251. if err != nil {
  252. return nil, common.ContextError(err)
  253. }
  254. return rankedServerEntries, nil
  255. }
  256. func setRankedServerEntries(tx *bolt.Tx, rankedServerEntries []string) error {
  257. data, err := json.Marshal(rankedServerEntries)
  258. if err != nil {
  259. return common.ContextError(err)
  260. }
  261. bucket := tx.Bucket([]byte(rankedServerEntriesBucket))
  262. err = bucket.Put([]byte(rankedServerEntriesKey), data)
  263. if err != nil {
  264. return common.ContextError(err)
  265. }
  266. return nil
  267. }
  268. func insertRankedServerEntry(tx *bolt.Tx, serverEntryId string, position int) error {
  269. rankedServerEntries, err := getRankedServerEntries(tx)
  270. if err != nil {
  271. return common.ContextError(err)
  272. }
  273. // BoltDB implementation note:
  274. // For simplicity, we store the ranked server ids in an array serialized to
  275. // a single key value. To ensure this value doesn't grow without bound,
  276. // it's capped at rankedServerEntryCount. For now, this cap should be large
  277. // enough to meet the shuffleHeadLength = config.TunnelPoolSize criteria, for
  278. // any reasonable configuration of config.TunnelPoolSize.
  279. // Using: https://github.com/golang/go/wiki/SliceTricks
  280. // When serverEntryId is already ranked, remove it first to avoid duplicates
  281. for i, rankedServerEntryId := range rankedServerEntries {
  282. if rankedServerEntryId == serverEntryId {
  283. rankedServerEntries = append(
  284. rankedServerEntries[:i], rankedServerEntries[i+1:]...)
  285. break
  286. }
  287. }
  288. // SliceTricks insert, with length cap enforced
  289. if len(rankedServerEntries) < rankedServerEntryCount {
  290. rankedServerEntries = append(rankedServerEntries, "")
  291. }
  292. if position >= len(rankedServerEntries) {
  293. position = len(rankedServerEntries) - 1
  294. }
  295. copy(rankedServerEntries[position+1:], rankedServerEntries[position:])
  296. rankedServerEntries[position] = serverEntryId
  297. err = setRankedServerEntries(tx, rankedServerEntries)
  298. if err != nil {
  299. return common.ContextError(err)
  300. }
  301. return nil
  302. }
  303. func serverEntrySupportsProtocol(serverEntry *ServerEntry, protocol string) bool {
  304. // Note: for meek, the capabilities are FRONTED-MEEK and UNFRONTED-MEEK
  305. // and the additonal OSSH service is assumed to be available internally.
  306. requiredCapability := strings.TrimSuffix(protocol, "-OSSH")
  307. return common.Contains(serverEntry.Capabilities, requiredCapability)
  308. }
// ServerEntryIterator is used to iterate over
// stored server entries in rank order.
type ServerEntryIterator struct {
	// region, when non-empty, restricts Next to entries with this Region.
	region string
	// protocol, when non-empty, restricts Next to entries supporting it.
	protocol string
	// shuffleHeadLength is the number of leading ids that Reset leaves in
	// order; ids after that point are shuffled.
	shuffleHeadLength int
	// serverEntryIds is the list of ids to visit, built by Reset.
	serverEntryIds []string
	// serverEntryIndex is the index in serverEntryIds that Next examines next.
	serverEntryIndex int
	// isTargetServerEntryIterator is true when the iterator yields only
	// targetServerEntry and does not read the data store.
	isTargetServerEntryIterator bool
	// hasNextTargetServerEntry is true until Next has returned the target
	// entry; Reset sets it back to true.
	hasNextTargetServerEntry bool
	// targetServerEntry is the single entry yielded in target mode.
	targetServerEntry *ServerEntry
}
  321. // NewServerEntryIterator creates a new ServerEntryIterator
  322. func NewServerEntryIterator(config *Config) (iterator *ServerEntryIterator, err error) {
  323. // When configured, this target server entry is the only candidate
  324. if config.TargetServerEntry != "" {
  325. return newTargetServerEntryIterator(config)
  326. }
  327. checkInitDataStore()
  328. iterator = &ServerEntryIterator{
  329. region: config.EgressRegion,
  330. protocol: config.TunnelProtocol,
  331. shuffleHeadLength: config.TunnelPoolSize,
  332. isTargetServerEntryIterator: false,
  333. }
  334. err = iterator.Reset()
  335. if err != nil {
  336. return nil, err
  337. }
  338. return iterator, nil
  339. }
  340. // newTargetServerEntryIterator is a helper for initializing the TargetServerEntry case
  341. func newTargetServerEntryIterator(config *Config) (iterator *ServerEntryIterator, err error) {
  342. serverEntry, err := DecodeServerEntry(
  343. config.TargetServerEntry, common.GetCurrentTimestamp(), protocol.SERVER_ENTRY_SOURCE_TARGET)
  344. if err != nil {
  345. return nil, err
  346. }
  347. if config.EgressRegion != "" && serverEntry.Region != config.EgressRegion {
  348. return nil, errors.New("TargetServerEntry does not support EgressRegion")
  349. }
  350. if config.TunnelProtocol != "" {
  351. // Note: same capability/protocol mapping as in StoreServerEntry
  352. requiredCapability := strings.TrimSuffix(config.TunnelProtocol, "-OSSH")
  353. if !common.Contains(serverEntry.Capabilities, requiredCapability) {
  354. return nil, errors.New("TargetServerEntry does not support TunnelProtocol")
  355. }
  356. }
  357. iterator = &ServerEntryIterator{
  358. isTargetServerEntryIterator: true,
  359. hasNextTargetServerEntry: true,
  360. targetServerEntry: serverEntry,
  361. }
  362. NoticeInfo("using TargetServerEntry: %s", serverEntry.IpAddress)
  363. return iterator, nil
  364. }
  365. // Reset a NewServerEntryIterator to the start of its cycle. The next
  366. // call to Next will return the first server entry.
  367. func (iterator *ServerEntryIterator) Reset() error {
  368. iterator.Close()
  369. if iterator.isTargetServerEntryIterator {
  370. iterator.hasNextTargetServerEntry = true
  371. return nil
  372. }
  373. count := CountServerEntries(iterator.region, iterator.protocol)
  374. NoticeCandidateServers(iterator.region, iterator.protocol, count)
  375. // This query implements the Psiphon server candidate selection
  376. // algorithm: the first TunnelPoolSize server candidates are in rank
  377. // (priority) order, to favor previously successful servers; then the
  378. // remaining long tail is shuffled to raise up less recent candidates.
  379. // BoltDB implementation note:
  380. // We don't keep a transaction open for the duration of the iterator
  381. // because this would expose the following semantics to consumer code:
  382. //
  383. // Read-only transactions and read-write transactions ... generally
  384. // shouldn't be opened simultaneously in the same goroutine. This can
  385. // cause a deadlock as the read-write transaction needs to periodically
  386. // re-map the data file but it cannot do so while a read-only
  387. // transaction is open.
  388. // (https://github.com/boltdb/bolt)
  389. //
  390. // So the underlying serverEntriesBucket could change after the serverEntryIds
  391. // list is built.
  392. var serverEntryIds []string
  393. err := singleton.db.View(func(tx *bolt.Tx) error {
  394. var err error
  395. serverEntryIds, err = getRankedServerEntries(tx)
  396. if err != nil {
  397. return err
  398. }
  399. skipServerEntryIds := make(map[string]bool)
  400. for _, serverEntryId := range serverEntryIds {
  401. skipServerEntryIds[serverEntryId] = true
  402. }
  403. bucket := tx.Bucket([]byte(serverEntriesBucket))
  404. cursor := bucket.Cursor()
  405. for key, _ := cursor.Last(); key != nil; key, _ = cursor.Prev() {
  406. serverEntryId := string(key)
  407. if _, ok := skipServerEntryIds[serverEntryId]; ok {
  408. continue
  409. }
  410. serverEntryIds = append(serverEntryIds, serverEntryId)
  411. }
  412. return nil
  413. })
  414. if err != nil {
  415. return common.ContextError(err)
  416. }
  417. for i := len(serverEntryIds) - 1; i > iterator.shuffleHeadLength-1; i-- {
  418. j := rand.Intn(i+1-iterator.shuffleHeadLength) + iterator.shuffleHeadLength
  419. serverEntryIds[i], serverEntryIds[j] = serverEntryIds[j], serverEntryIds[i]
  420. }
  421. iterator.serverEntryIds = serverEntryIds
  422. iterator.serverEntryIndex = 0
  423. return nil
  424. }
  425. // Close cleans up resources associated with a ServerEntryIterator.
  426. func (iterator *ServerEntryIterator) Close() {
  427. iterator.serverEntryIds = nil
  428. iterator.serverEntryIndex = 0
  429. }
  430. // Next returns the next server entry, by rank, for a ServerEntryIterator.
  431. // Returns nil with no error when there is no next item.
  432. func (iterator *ServerEntryIterator) Next() (serverEntry *ServerEntry, err error) {
  433. defer func() {
  434. if err != nil {
  435. iterator.Close()
  436. }
  437. }()
  438. if iterator.isTargetServerEntryIterator {
  439. if iterator.hasNextTargetServerEntry {
  440. iterator.hasNextTargetServerEntry = false
  441. return MakeCompatibleServerEntry(iterator.targetServerEntry), nil
  442. }
  443. return nil, nil
  444. }
  445. // There are no region/protocol indexes for the server entries bucket.
  446. // Loop until we have the next server entry that matches the iterator
  447. // filter requirements.
  448. for {
  449. if iterator.serverEntryIndex >= len(iterator.serverEntryIds) {
  450. // There is no next item
  451. return nil, nil
  452. }
  453. serverEntryId := iterator.serverEntryIds[iterator.serverEntryIndex]
  454. iterator.serverEntryIndex += 1
  455. var data []byte
  456. err = singleton.db.View(func(tx *bolt.Tx) error {
  457. bucket := tx.Bucket([]byte(serverEntriesBucket))
  458. value := bucket.Get([]byte(serverEntryId))
  459. if value != nil {
  460. // Must make a copy as slice is only valid within transaction.
  461. data = make([]byte, len(value))
  462. copy(data, value)
  463. }
  464. return nil
  465. })
  466. if err != nil {
  467. return nil, common.ContextError(err)
  468. }
  469. if data == nil {
  470. // In case of data corruption or a bug causing this condition,
  471. // do not stop iterating.
  472. NoticeAlert("ServerEntryIterator.Next: unexpected missing server entry: %s", serverEntryId)
  473. continue
  474. }
  475. serverEntry = new(ServerEntry)
  476. err = json.Unmarshal(data, serverEntry)
  477. if err != nil {
  478. // In case of data corruption or a bug causing this condition,
  479. // do not stop iterating.
  480. NoticeAlert("ServerEntryIterator.Next: %s", common.ContextError(err))
  481. continue
  482. }
  483. // Check filter requirements
  484. if (iterator.region == "" || serverEntry.Region == iterator.region) &&
  485. (iterator.protocol == "" || serverEntrySupportsProtocol(serverEntry, iterator.protocol)) {
  486. break
  487. }
  488. }
  489. return MakeCompatibleServerEntry(serverEntry), nil
  490. }
  491. // MakeCompatibleServerEntry provides backwards compatibility with old server entries
  492. // which have a single meekFrontingDomain and not a meekFrontingAddresses array.
  493. // By copying this one meekFrontingDomain into meekFrontingAddresses, this client effectively
  494. // uses that single value as legacy clients do.
  495. func MakeCompatibleServerEntry(serverEntry *ServerEntry) *ServerEntry {
  496. if len(serverEntry.MeekFrontingAddresses) == 0 && serverEntry.MeekFrontingDomain != "" {
  497. serverEntry.MeekFrontingAddresses =
  498. append(serverEntry.MeekFrontingAddresses, serverEntry.MeekFrontingDomain)
  499. }
  500. return serverEntry
  501. }
  502. func scanServerEntries(scanner func(*ServerEntry)) error {
  503. err := singleton.db.View(func(tx *bolt.Tx) error {
  504. bucket := tx.Bucket([]byte(serverEntriesBucket))
  505. cursor := bucket.Cursor()
  506. for key, value := cursor.First(); key != nil; key, value = cursor.Next() {
  507. serverEntry := new(ServerEntry)
  508. err := json.Unmarshal(value, serverEntry)
  509. if err != nil {
  510. // In case of data corruption or a bug causing this condition,
  511. // do not stop iterating.
  512. NoticeAlert("scanServerEntries: %s", common.ContextError(err))
  513. continue
  514. }
  515. scanner(serverEntry)
  516. }
  517. return nil
  518. })
  519. if err != nil {
  520. return common.ContextError(err)
  521. }
  522. return nil
  523. }
  524. // CountServerEntries returns a count of stored servers for the
  525. // specified region and protocol.
  526. func CountServerEntries(region, protocol string) int {
  527. checkInitDataStore()
  528. count := 0
  529. err := scanServerEntries(func(serverEntry *ServerEntry) {
  530. if (region == "" || serverEntry.Region == region) &&
  531. (protocol == "" || serverEntrySupportsProtocol(serverEntry, protocol)) {
  532. count += 1
  533. }
  534. })
  535. if err != nil {
  536. NoticeAlert("CountServerEntries failed: %s", err)
  537. return 0
  538. }
  539. return count
  540. }
  541. // ReportAvailableRegions prints a notice with the available egress regions.
  542. // Note that this report ignores config.TunnelProtocol.
  543. func ReportAvailableRegions() {
  544. checkInitDataStore()
  545. regions := make(map[string]bool)
  546. err := scanServerEntries(func(serverEntry *ServerEntry) {
  547. regions[serverEntry.Region] = true
  548. })
  549. if err != nil {
  550. NoticeAlert("ReportAvailableRegions failed: %s", err)
  551. return
  552. }
  553. regionList := make([]string, 0, len(regions))
  554. for region, _ := range regions {
  555. // Some server entries do not have a region, but it makes no sense to return
  556. // an empty string as an "available region".
  557. if region != "" {
  558. regionList = append(regionList, region)
  559. }
  560. }
  561. NoticeAvailableEgressRegions(regionList)
  562. }
  563. // GetServerEntryIpAddresses returns an array containing
  564. // all stored server IP addresses.
  565. func GetServerEntryIpAddresses() (ipAddresses []string, err error) {
  566. checkInitDataStore()
  567. ipAddresses = make([]string, 0)
  568. err = scanServerEntries(func(serverEntry *ServerEntry) {
  569. ipAddresses = append(ipAddresses, serverEntry.IpAddress)
  570. })
  571. if err != nil {
  572. return nil, common.ContextError(err)
  573. }
  574. return ipAddresses, nil
  575. }
  576. // SetSplitTunnelRoutes updates the cached routes data for
  577. // the given region. The associated etag is also stored and
  578. // used to make efficient web requests for updates to the data.
  579. func SetSplitTunnelRoutes(region, etag string, data []byte) error {
  580. checkInitDataStore()
  581. err := singleton.db.Update(func(tx *bolt.Tx) error {
  582. bucket := tx.Bucket([]byte(splitTunnelRouteETagsBucket))
  583. err := bucket.Put([]byte(region), []byte(etag))
  584. bucket = tx.Bucket([]byte(splitTunnelRouteDataBucket))
  585. err = bucket.Put([]byte(region), data)
  586. return err
  587. })
  588. if err != nil {
  589. return common.ContextError(err)
  590. }
  591. return nil
  592. }
  593. // GetSplitTunnelRoutesETag retrieves the etag for cached routes
  594. // data for the specified region. If not found, it returns an empty string value.
  595. func GetSplitTunnelRoutesETag(region string) (etag string, err error) {
  596. checkInitDataStore()
  597. err = singleton.db.View(func(tx *bolt.Tx) error {
  598. bucket := tx.Bucket([]byte(splitTunnelRouteETagsBucket))
  599. etag = string(bucket.Get([]byte(region)))
  600. return nil
  601. })
  602. if err != nil {
  603. return "", common.ContextError(err)
  604. }
  605. return etag, nil
  606. }
  607. // GetSplitTunnelRoutesData retrieves the cached routes data
  608. // for the specified region. If not found, it returns a nil value.
  609. func GetSplitTunnelRoutesData(region string) (data []byte, err error) {
  610. checkInitDataStore()
  611. err = singleton.db.View(func(tx *bolt.Tx) error {
  612. bucket := tx.Bucket([]byte(splitTunnelRouteDataBucket))
  613. value := bucket.Get([]byte(region))
  614. if value != nil {
  615. // Must make a copy as slice is only valid within transaction.
  616. data = make([]byte, len(value))
  617. copy(data, value)
  618. }
  619. return nil
  620. })
  621. if err != nil {
  622. return nil, common.ContextError(err)
  623. }
  624. return data, nil
  625. }
  626. // SetUrlETag stores an ETag for the specfied URL.
  627. // Note: input URL is treated as a string, and is not
  628. // encoded or decoded or otherwise canonicalized.
  629. func SetUrlETag(url, etag string) error {
  630. checkInitDataStore()
  631. err := singleton.db.Update(func(tx *bolt.Tx) error {
  632. bucket := tx.Bucket([]byte(urlETagsBucket))
  633. err := bucket.Put([]byte(url), []byte(etag))
  634. return err
  635. })
  636. if err != nil {
  637. return common.ContextError(err)
  638. }
  639. return nil
  640. }
  641. // GetUrlETag retrieves a previously stored an ETag for the
  642. // specfied URL. If not found, it returns an empty string value.
  643. func GetUrlETag(url string) (etag string, err error) {
  644. checkInitDataStore()
  645. err = singleton.db.View(func(tx *bolt.Tx) error {
  646. bucket := tx.Bucket([]byte(urlETagsBucket))
  647. etag = string(bucket.Get([]byte(url)))
  648. return nil
  649. })
  650. if err != nil {
  651. return "", common.ContextError(err)
  652. }
  653. return etag, nil
  654. }
  655. // SetKeyValue stores a key/value pair.
  656. func SetKeyValue(key, value string) error {
  657. checkInitDataStore()
  658. err := singleton.db.Update(func(tx *bolt.Tx) error {
  659. bucket := tx.Bucket([]byte(keyValueBucket))
  660. err := bucket.Put([]byte(key), []byte(value))
  661. return err
  662. })
  663. if err != nil {
  664. return common.ContextError(err)
  665. }
  666. return nil
  667. }
  668. // GetKeyValue retrieves the value for a given key. If not found,
  669. // it returns an empty string value.
  670. func GetKeyValue(key string) (value string, err error) {
  671. checkInitDataStore()
  672. err = singleton.db.View(func(tx *bolt.Tx) error {
  673. bucket := tx.Bucket([]byte(keyValueBucket))
  674. value = string(bucket.Get([]byte(key)))
  675. return nil
  676. })
  677. if err != nil {
  678. return "", common.ContextError(err)
  679. }
  680. return value, nil
  681. }
  682. // Tunnel stats records in the tunnelStatsStateUnreported
  683. // state are available for take out.
  684. // Records in the tunnelStatsStateReporting have been
  685. // taken out and are pending either deleting (for a
  686. // successful request) or change to StateUnreported (for
  687. // a failed request).
  688. // All tunnel stats records are reverted to StateUnreported
  689. // when the datastore is initialized at start up.
  690. var tunnelStatsStateUnreported = []byte("0")
  691. var tunnelStatsStateReporting = []byte("1")
  692. // StoreTunnelStats adds a new tunnel stats record, which is
  693. // set to StateUnreported and is an immediate candidate for
  694. // reporting.
  695. // tunnelStats is a JSON byte array containing fields as
  696. // required by the Psiphon server API (see RecordTunnelStats).
  697. // It's assumed that the JSON value contains enough unique
  698. // information for the value to function as a key in the
  699. // key/value datastore. This assumption is currently satisfied
  700. // by the fields sessionId + tunnelNumber.
  701. func StoreTunnelStats(tunnelStats []byte) error {
  702. checkInitDataStore()
  703. err := singleton.db.Update(func(tx *bolt.Tx) error {
  704. bucket := tx.Bucket([]byte(tunnelStatsBucket))
  705. err := bucket.Put(tunnelStats, tunnelStatsStateUnreported)
  706. return err
  707. })
  708. if err != nil {
  709. return common.ContextError(err)
  710. }
  711. return nil
  712. }
  713. // CountUnreportedTunnelStats returns the number of tunnel
  714. // stats records in StateUnreported.
  715. func CountUnreportedTunnelStats() int {
  716. checkInitDataStore()
  717. unreported := 0
  718. err := singleton.db.Update(func(tx *bolt.Tx) error {
  719. bucket := tx.Bucket([]byte(tunnelStatsBucket))
  720. cursor := bucket.Cursor()
  721. for key, value := cursor.First(); key != nil; key, value = cursor.Next() {
  722. if 0 == bytes.Compare(value, tunnelStatsStateUnreported) {
  723. unreported++
  724. break
  725. }
  726. }
  727. return nil
  728. })
  729. if err != nil {
  730. NoticeAlert("CountUnreportedTunnelStats failed: %s", err)
  731. return 0
  732. }
  733. return unreported
  734. }
  735. // TakeOutUnreportedTunnelStats returns up to maxCount tunnel
  736. // stats records that are in StateUnreported. The records are set
  737. // to StateReporting. If the records are successfully reported,
  738. // clear them with ClearReportedTunnelStats. If the records are
  739. // not successfully reported, restore them with
  740. // PutBackUnreportedTunnelStats.
  741. func TakeOutUnreportedTunnelStats(maxCount int) ([][]byte, error) {
  742. checkInitDataStore()
  743. tunnelStats := make([][]byte, 0)
  744. err := singleton.db.Update(func(tx *bolt.Tx) error {
  745. bucket := tx.Bucket([]byte(tunnelStatsBucket))
  746. cursor := bucket.Cursor()
  747. for key, value := cursor.First(); key != nil; key, value = cursor.Next() {
  748. // Perform a test JSON unmarshaling. In case of data corruption or a bug,
  749. // skip the record.
  750. var jsonData interface{}
  751. err := json.Unmarshal(key, &jsonData)
  752. if err != nil {
  753. NoticeAlert(
  754. "Invalid key in TakeOutUnreportedTunnelStats: %s: %s",
  755. string(key), err)
  756. continue
  757. }
  758. if 0 == bytes.Compare(value, tunnelStatsStateUnreported) {
  759. // Must make a copy as slice is only valid within transaction.
  760. data := make([]byte, len(key))
  761. copy(data, key)
  762. tunnelStats = append(tunnelStats, data)
  763. if len(tunnelStats) >= maxCount {
  764. break
  765. }
  766. }
  767. }
  768. for _, key := range tunnelStats {
  769. err := bucket.Put(key, tunnelStatsStateReporting)
  770. if err != nil {
  771. return err
  772. }
  773. }
  774. return nil
  775. })
  776. if err != nil {
  777. return nil, common.ContextError(err)
  778. }
  779. return tunnelStats, nil
  780. }
  781. // PutBackUnreportedTunnelStats restores a list of tunnel
  782. // stats records to StateUnreported.
  783. func PutBackUnreportedTunnelStats(tunnelStats [][]byte) error {
  784. checkInitDataStore()
  785. err := singleton.db.Update(func(tx *bolt.Tx) error {
  786. bucket := tx.Bucket([]byte(tunnelStatsBucket))
  787. for _, key := range tunnelStats {
  788. err := bucket.Put(key, tunnelStatsStateUnreported)
  789. if err != nil {
  790. return err
  791. }
  792. }
  793. return nil
  794. })
  795. if err != nil {
  796. return common.ContextError(err)
  797. }
  798. return nil
  799. }
  800. // ClearReportedTunnelStats deletes a list of tunnel
  801. // stats records that were succesdfully reported.
  802. func ClearReportedTunnelStats(tunnelStats [][]byte) error {
  803. checkInitDataStore()
  804. err := singleton.db.Update(func(tx *bolt.Tx) error {
  805. bucket := tx.Bucket([]byte(tunnelStatsBucket))
  806. for _, key := range tunnelStats {
  807. err := bucket.Delete(key)
  808. if err != nil {
  809. return err
  810. }
  811. }
  812. return nil
  813. })
  814. if err != nil {
  815. return common.ContextError(err)
  816. }
  817. return nil
  818. }
  819. // resetAllTunnelStatsToUnreported sets all tunnel
  820. // stats records to StateUnreported. This reset is called
  821. // when the datastore is initialized at start up, as we do
  822. // not know if tunnel records in StateReporting were reported
  823. // or not.
  824. func resetAllTunnelStatsToUnreported() error {
  825. checkInitDataStore()
  826. err := singleton.db.Update(func(tx *bolt.Tx) error {
  827. bucket := tx.Bucket([]byte(tunnelStatsBucket))
  828. resetKeys := make([][]byte, 0)
  829. cursor := bucket.Cursor()
  830. for key, _ := cursor.First(); key != nil; key, _ = cursor.Next() {
  831. resetKeys = append(resetKeys, key)
  832. }
  833. // TODO: data mutation is done outside cursor. Is this
  834. // strictly necessary in this case?
  835. // https://godoc.org/github.com/boltdb/bolt#Cursor
  836. for _, key := range resetKeys {
  837. err := bucket.Put(key, tunnelStatsStateUnreported)
  838. if err != nil {
  839. return err
  840. }
  841. }
  842. return nil
  843. })
  844. if err != nil {
  845. return common.ContextError(err)
  846. }
  847. return nil
  848. }
  849. // DeleteSLOKs deletes all SLOK records.
  850. func DeleteSLOKs() error {
  851. checkInitDataStore()
  852. err := singleton.db.Update(func(tx *bolt.Tx) error {
  853. bucket := tx.Bucket([]byte(slokBucket))
  854. return bucket.ForEach(
  855. func(id, _ []byte) error {
  856. return bucket.Delete(id)
  857. })
  858. })
  859. if err != nil {
  860. return common.ContextError(err)
  861. }
  862. return nil
  863. }
  864. // SetSLOK stores a SLOK key, referenced by its ID. The bool
  865. // return value indicates whether the SLOK was already stored.
  866. func SetSLOK(id, key []byte) (bool, error) {
  867. checkInitDataStore()
  868. var duplicate bool
  869. err := singleton.db.Update(func(tx *bolt.Tx) error {
  870. bucket := tx.Bucket([]byte(slokBucket))
  871. duplicate = bucket.Get(id) != nil
  872. err := bucket.Put([]byte(id), []byte(key))
  873. return err
  874. })
  875. if err != nil {
  876. return false, common.ContextError(err)
  877. }
  878. return duplicate, nil
  879. }
  880. // GetSLOK returns a SLOK key for the specified ID. The return
  881. // value is nil if the SLOK is not found.
  882. func GetSLOK(id []byte) (key []byte, err error) {
  883. checkInitDataStore()
  884. err = singleton.db.View(func(tx *bolt.Tx) error {
  885. bucket := tx.Bucket([]byte(slokBucket))
  886. key = bucket.Get(id)
  887. return nil
  888. })
  889. if err != nil {
  890. return nil, common.ContextError(err)
  891. }
  892. return key, nil
  893. }