dataStore.go 37 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309
  1. /*
  2. * Copyright (c) 2015, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package psiphon
  20. import (
  21. "bytes"
  22. "encoding/json"
  23. "errors"
  24. "fmt"
  25. "math/rand"
  26. "os"
  27. "path/filepath"
  28. "strings"
  29. "sync"
  30. "time"
  31. "github.com/Psiphon-Inc/bolt"
  32. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  33. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
  34. )
  35. // The BoltDB dataStore implementation is an alternative to the sqlite3-based
  36. // implementation in dataStore.go. Both implementations have the same interface.
  37. //
  38. // BoltDB is pure Go, and is intended to be used in cases where we have trouble
  39. // building sqlite3/CGO (e.g., currently go mobile due to
  40. // https://github.com/mattn/go-sqlite3/issues/201), and perhaps ultimately as
  41. // the primary dataStore implementation.
  42. //
  43. type dataStore struct {
  44. init sync.Once
  45. db *bolt.DB
  46. }
  47. const (
  48. serverEntriesBucket = "serverEntries"
  49. rankedServerEntriesBucket = "rankedServerEntries"
  50. rankedServerEntriesKey = "rankedServerEntries"
  51. splitTunnelRouteETagsBucket = "splitTunnelRouteETags"
  52. splitTunnelRouteDataBucket = "splitTunnelRouteData"
  53. urlETagsBucket = "urlETags"
  54. keyValueBucket = "keyValues"
  55. tunnelStatsBucket = "tunnelStats"
  56. remoteServerListStatsBucket = "remoteServerListStats"
  57. slokBucket = "SLOKs"
  58. rankedServerEntryCount = 100
  59. )
  60. const (
  61. DATA_STORE_LAST_CONNECTED_KEY = "lastConnected"
  62. DATA_STORE_LAST_SERVER_ENTRY_FILTER_KEY = "lastServerEntryFilter"
  63. PERSISTENT_STAT_TYPE_REMOTE_SERVER_LIST = remoteServerListStatsBucket
  64. )
  65. var singleton dataStore
  66. // InitDataStore initializes the singleton instance of dataStore. This
  67. // function uses a sync.Once and is safe for use by concurrent goroutines.
  68. // The underlying sql.DB connection pool is also safe.
  69. //
  70. // Note: the sync.Once was more useful when initDataStore was private and
  71. // called on-demand by the public functions below. Now we require an explicit
  72. // InitDataStore() call with the filename passed in. The on-demand calls
  73. // have been replaced by checkInitDataStore() to assert that Init was called.
  74. func InitDataStore(config *Config) (err error) {
  75. singleton.init.Do(func() {
  76. // Need to gather the list of migratable server entries before
  77. // initializing the boltdb store (as prepareMigrationEntries
  78. // checks for the existence of the bolt db file)
  79. migratableServerEntries := prepareMigrationEntries(config)
  80. filename := filepath.Join(config.DataStoreDirectory, DATA_STORE_FILENAME)
  81. var db *bolt.DB
  82. for retry := 0; retry < 3; retry++ {
  83. if retry > 0 {
  84. NoticeAlert("InitDataStore retry: %d", retry)
  85. }
  86. db, err = bolt.Open(filename, 0600, &bolt.Options{Timeout: 1 * time.Second})
  87. // The datastore file may be corrupt, so attempt to delete and try again
  88. if err != nil {
  89. NoticeAlert("bolt.Open error: %s", err)
  90. os.Remove(filename)
  91. continue
  92. }
  93. // Run consistency checks on datastore and emit errors for diagnostics purposes
  94. // We assume this will complete quickly for typical size Psiphon datastores.
  95. err = db.View(func(tx *bolt.Tx) error {
  96. return tx.SynchronousCheck()
  97. })
  98. // The datastore file may be corrupt, so attempt to delete and try again
  99. if err != nil {
  100. NoticeAlert("bolt.SynchronousCheck error: %s", err)
  101. db.Close()
  102. os.Remove(filename)
  103. continue
  104. }
  105. break
  106. }
  107. if err != nil {
  108. // Note: intending to set the err return value for InitDataStore
  109. err = fmt.Errorf("initDataStore failed to open database: %s", err)
  110. return
  111. }
  112. err = db.Update(func(tx *bolt.Tx) error {
  113. requiredBuckets := []string{
  114. serverEntriesBucket,
  115. rankedServerEntriesBucket,
  116. splitTunnelRouteETagsBucket,
  117. splitTunnelRouteDataBucket,
  118. urlETagsBucket,
  119. keyValueBucket,
  120. tunnelStatsBucket,
  121. remoteServerListStatsBucket,
  122. slokBucket,
  123. }
  124. for _, bucket := range requiredBuckets {
  125. _, err := tx.CreateBucketIfNotExists([]byte(bucket))
  126. if err != nil {
  127. return err
  128. }
  129. }
  130. return nil
  131. })
  132. if err != nil {
  133. err = fmt.Errorf("initDataStore failed to create buckets: %s", err)
  134. return
  135. }
  136. // Cleanup obsolete tunnel (session) stats bucket, if one still exists
  137. err = db.Update(func(tx *bolt.Tx) error {
  138. tunnelStatsBucket := []byte("tunnelStats")
  139. if tx.Bucket(tunnelStatsBucket) != nil {
  140. err := tx.DeleteBucket(tunnelStatsBucket)
  141. if err != nil {
  142. NoticeAlert("DeleteBucket %s error: %s", tunnelStatsBucket, err)
  143. // Continue, since this is not fatal
  144. }
  145. }
  146. return nil
  147. })
  148. if err != nil {
  149. err = fmt.Errorf("initDataStore failed to create buckets: %s", err)
  150. return
  151. }
  152. singleton.db = db
  153. // The migrateServerEntries function requires the data store is
  154. // initialized prior to execution so that migrated entries can be stored
  155. if len(migratableServerEntries) > 0 {
  156. migrateEntries(
  157. config, migratableServerEntries, filepath.Join(config.DataStoreDirectory, LEGACY_DATA_STORE_FILENAME))
  158. }
  159. resetAllPersistentStatsToUnreported()
  160. })
  161. return err
  162. }
  163. func checkInitDataStore() {
  164. if singleton.db == nil {
  165. panic("checkInitDataStore: datastore not initialized")
  166. }
  167. }
  168. // StoreServerEntry adds the server entry to the data store.
  169. // A newly stored (or re-stored) server entry is assigned the next-to-top
  170. // rank for iteration order (the previous top ranked entry is promoted). The
  171. // purpose of inserting at next-to-top is to keep the last selected server
  172. // as the top ranked server.
  173. // When replaceIfExists is true, an existing server entry record is
  174. // overwritten; otherwise, the existing record is unchanged.
  175. // If the server entry data is malformed, an alert notice is issued and
  176. // the entry is skipped; no error is returned.
  177. func StoreServerEntry(serverEntry *protocol.ServerEntry, replaceIfExists bool) error {
  178. checkInitDataStore()
  179. // Server entries should already be validated before this point,
  180. // so instead of skipping we fail with an error.
  181. err := protocol.ValidateServerEntry(serverEntry)
  182. if err != nil {
  183. return common.ContextError(errors.New("invalid server entry"))
  184. }
  185. // BoltDB implementation note:
  186. // For simplicity, we don't maintain indexes on server entry
  187. // region or supported protocols. Instead, we perform full-bucket
  188. // scans with a filter. With a small enough database (thousands or
  189. // even tens of thousand of server entries) and common enough
  190. // values (e.g., many servers support all protocols), performance
  191. // is expected to be acceptable.
  192. err = singleton.db.Update(func(tx *bolt.Tx) error {
  193. serverEntries := tx.Bucket([]byte(serverEntriesBucket))
  194. // Check not only that the entry exists, but is valid. This
  195. // will replace in the rare case where the data is corrupt.
  196. existingServerEntryValid := false
  197. existingData := serverEntries.Get([]byte(serverEntry.IpAddress))
  198. if existingData != nil {
  199. existingServerEntry := new(protocol.ServerEntry)
  200. if json.Unmarshal(existingData, existingServerEntry) == nil {
  201. existingServerEntryValid = true
  202. }
  203. }
  204. if existingServerEntryValid && !replaceIfExists {
  205. // Disabling this notice, for now, as it generates too much noise
  206. // in diagnostics with clients that always submit embedded servers
  207. // to the core on each run.
  208. // NoticeInfo("ignored update for server %s", serverEntry.IpAddress)
  209. return nil
  210. }
  211. data, err := json.Marshal(serverEntry)
  212. if err != nil {
  213. return common.ContextError(err)
  214. }
  215. err = serverEntries.Put([]byte(serverEntry.IpAddress), data)
  216. if err != nil {
  217. return common.ContextError(err)
  218. }
  219. err = insertRankedServerEntry(tx, serverEntry.IpAddress, 1)
  220. if err != nil {
  221. return common.ContextError(err)
  222. }
  223. NoticeInfo("updated server %s", serverEntry.IpAddress)
  224. return nil
  225. })
  226. if err != nil {
  227. return common.ContextError(err)
  228. }
  229. return nil
  230. }
  231. // StoreServerEntries stores a list of server entries.
  232. // There is an independent transaction for each entry insert/update.
  233. func StoreServerEntries(serverEntries []*protocol.ServerEntry, replaceIfExists bool) error {
  234. checkInitDataStore()
  235. for _, serverEntry := range serverEntries {
  236. err := StoreServerEntry(serverEntry, replaceIfExists)
  237. if err != nil {
  238. return common.ContextError(err)
  239. }
  240. }
  241. // Since there has possibly been a significant change in the server entries,
  242. // take this opportunity to update the available egress regions.
  243. ReportAvailableRegions()
  244. return nil
  245. }
  246. // StreamingStoreServerEntries stores a list of server entries.
  247. // There is an independent transaction for each entry insert/update.
  248. func StreamingStoreServerEntries(
  249. serverEntries *protocol.StreamingServerEntryDecoder, replaceIfExists bool) error {
  250. checkInitDataStore()
  251. // Note: both StreamingServerEntryDecoder.Next and StoreServerEntry
  252. // allocate temporary memory buffers for hex/JSON decoding/encoding,
  253. // so this isn't true constant-memory streaming (it depends on garbage
  254. // collection).
  255. for {
  256. serverEntry, err := serverEntries.Next()
  257. if err != nil {
  258. return common.ContextError(err)
  259. }
  260. if serverEntry == nil {
  261. // No more server entries
  262. break
  263. }
  264. err = StoreServerEntry(serverEntry, replaceIfExists)
  265. if err != nil {
  266. return common.ContextError(err)
  267. }
  268. }
  269. // Since there has possibly been a significant change in the server entries,
  270. // take this opportunity to update the available egress regions.
  271. ReportAvailableRegions()
  272. return nil
  273. }
  274. // PromoteServerEntry assigns the top rank (one more than current
  275. // max rank) to the specified server entry. Server candidates are
  276. // iterated in decending rank order, so this server entry will be
  277. // the first candidate in a subsequent tunnel establishment.
  278. func PromoteServerEntry(config *Config, ipAddress string) error {
  279. checkInitDataStore()
  280. err := singleton.db.Update(func(tx *bolt.Tx) error {
  281. // Ensure the corresponding entry exists before
  282. // inserting into rank.
  283. bucket := tx.Bucket([]byte(serverEntriesBucket))
  284. data := bucket.Get([]byte(ipAddress))
  285. if data == nil {
  286. NoticeAlert(
  287. "PromoteServerEntry: ignoring unknown server entry: %s",
  288. ipAddress)
  289. return nil
  290. }
  291. err := insertRankedServerEntry(tx, ipAddress, 0)
  292. if err != nil {
  293. return err
  294. }
  295. // Store the current server entry filter (e.g, region, etc.) that
  296. // was in use when the entry was promoted. This is used to detect
  297. // when the top ranked server entry was promoted under a different
  298. // filter.
  299. currentFilter, err := makeServerEntryFilterValue(config)
  300. if err != nil {
  301. return err
  302. }
  303. bucket = tx.Bucket([]byte(keyValueBucket))
  304. return bucket.Put([]byte(DATA_STORE_LAST_SERVER_ENTRY_FILTER_KEY), currentFilter)
  305. })
  306. if err != nil {
  307. return common.ContextError(err)
  308. }
  309. return nil
  310. }
  311. func makeServerEntryFilterValue(config *Config) ([]byte, error) {
  312. filter, err := json.Marshal(
  313. struct {
  314. Region string
  315. Protocol string
  316. }{config.EgressRegion, config.TunnelProtocol})
  317. if err != nil {
  318. return nil, common.ContextError(err)
  319. }
  320. return filter, nil
  321. }
  322. func hasServerEntryFilterChanged(config *Config) (bool, error) {
  323. currentFilter, err := makeServerEntryFilterValue(config)
  324. if err != nil {
  325. return false, common.ContextError(err)
  326. }
  327. changed := false
  328. err = singleton.db.View(func(tx *bolt.Tx) error {
  329. // previousFilter will be nil not found (not previously
  330. // set) which will never match any current filter.
  331. bucket := tx.Bucket([]byte(keyValueBucket))
  332. previousFilter := bucket.Get([]byte(DATA_STORE_LAST_SERVER_ENTRY_FILTER_KEY))
  333. if bytes.Compare(previousFilter, currentFilter) != 0 {
  334. changed = true
  335. }
  336. return nil
  337. })
  338. if err != nil {
  339. return false, common.ContextError(err)
  340. }
  341. return changed, nil
  342. }
  343. func getRankedServerEntries(tx *bolt.Tx) ([]string, error) {
  344. bucket := tx.Bucket([]byte(rankedServerEntriesBucket))
  345. data := bucket.Get([]byte(rankedServerEntriesKey))
  346. if data == nil {
  347. return []string{}, nil
  348. }
  349. rankedServerEntries := make([]string, 0)
  350. err := json.Unmarshal(data, &rankedServerEntries)
  351. if err != nil {
  352. return nil, common.ContextError(err)
  353. }
  354. return rankedServerEntries, nil
  355. }
  356. func setRankedServerEntries(tx *bolt.Tx, rankedServerEntries []string) error {
  357. data, err := json.Marshal(rankedServerEntries)
  358. if err != nil {
  359. return common.ContextError(err)
  360. }
  361. bucket := tx.Bucket([]byte(rankedServerEntriesBucket))
  362. err = bucket.Put([]byte(rankedServerEntriesKey), data)
  363. if err != nil {
  364. return common.ContextError(err)
  365. }
  366. return nil
  367. }
  368. func insertRankedServerEntry(tx *bolt.Tx, serverEntryId string, position int) error {
  369. rankedServerEntries, err := getRankedServerEntries(tx)
  370. if err != nil {
  371. return common.ContextError(err)
  372. }
  373. // BoltDB implementation note:
  374. // For simplicity, we store the ranked server ids in an array serialized to
  375. // a single key value. To ensure this value doesn't grow without bound,
  376. // it's capped at rankedServerEntryCount. For now, this cap should be large
  377. // enough to meet the shuffleHeadLength = config.TunnelPoolSize criteria, for
  378. // any reasonable configuration of config.TunnelPoolSize.
  379. // Using: https://github.com/golang/go/wiki/SliceTricks
  380. // When serverEntryId is already ranked, remove it first to avoid duplicates
  381. for i, rankedServerEntryId := range rankedServerEntries {
  382. if rankedServerEntryId == serverEntryId {
  383. rankedServerEntries = append(
  384. rankedServerEntries[:i], rankedServerEntries[i+1:]...)
  385. break
  386. }
  387. }
  388. // SliceTricks insert, with length cap enforced
  389. if len(rankedServerEntries) < rankedServerEntryCount {
  390. rankedServerEntries = append(rankedServerEntries, "")
  391. }
  392. if position >= len(rankedServerEntries) {
  393. position = len(rankedServerEntries) - 1
  394. }
  395. copy(rankedServerEntries[position+1:], rankedServerEntries[position:])
  396. rankedServerEntries[position] = serverEntryId
  397. err = setRankedServerEntries(tx, rankedServerEntries)
  398. if err != nil {
  399. return common.ContextError(err)
  400. }
  401. return nil
  402. }
  403. // ServerEntryIterator is used to iterate over
  404. // stored server entries in rank order.
  405. type ServerEntryIterator struct {
  406. region string
  407. protocol string
  408. shuffleHeadLength int
  409. serverEntryIds []string
  410. serverEntryIndex int
  411. isTargetServerEntryIterator bool
  412. hasNextTargetServerEntry bool
  413. targetServerEntry *protocol.ServerEntry
  414. }
  415. // NewServerEntryIterator creates a new ServerEntryIterator.
  416. //
  417. // The boolean return value indicates whether to treat the first server(s)
  418. // as affinity servers or not. When the server entry selection filter changes
  419. // such as from a specific region to any region, or when there was no previous
  420. // filter/iterator, the the first server(s) are arbitrary and should not be
  421. // given affinity treatment.
  422. //
  423. // NewServerEntryIterator and any returned ServerEntryIterator are not
  424. // designed for concurrent use as not all related datastore operations are
  425. // performed in a single transaction.
  426. //
  427. func NewServerEntryIterator(config *Config) (bool, *ServerEntryIterator, error) {
  428. // When configured, this target server entry is the only candidate
  429. if config.TargetServerEntry != "" {
  430. return newTargetServerEntryIterator(config)
  431. }
  432. checkInitDataStore()
  433. filterChanged, err := hasServerEntryFilterChanged(config)
  434. if err != nil {
  435. return false, nil, common.ContextError(err)
  436. }
  437. applyServerAffinity := !filterChanged
  438. iterator := &ServerEntryIterator{
  439. region: config.EgressRegion,
  440. protocol: config.TunnelProtocol,
  441. shuffleHeadLength: config.TunnelPoolSize,
  442. isTargetServerEntryIterator: false,
  443. }
  444. err = iterator.Reset()
  445. if err != nil {
  446. return false, nil, common.ContextError(err)
  447. }
  448. return applyServerAffinity, iterator, nil
  449. }
  450. // newTargetServerEntryIterator is a helper for initializing the TargetServerEntry case
  451. func newTargetServerEntryIterator(config *Config) (bool, *ServerEntryIterator, error) {
  452. serverEntry, err := protocol.DecodeServerEntry(
  453. config.TargetServerEntry, common.GetCurrentTimestamp(), protocol.SERVER_ENTRY_SOURCE_TARGET)
  454. if err != nil {
  455. return false, nil, common.ContextError(err)
  456. }
  457. if config.EgressRegion != "" && serverEntry.Region != config.EgressRegion {
  458. return false, nil, common.ContextError(errors.New("TargetServerEntry does not support EgressRegion"))
  459. }
  460. if config.TunnelProtocol != "" {
  461. // Note: same capability/protocol mapping as in StoreServerEntry
  462. requiredCapability := strings.TrimSuffix(config.TunnelProtocol, "-OSSH")
  463. if !common.Contains(serverEntry.Capabilities, requiredCapability) {
  464. return false, nil, common.ContextError(errors.New("TargetServerEntry does not support TunnelProtocol"))
  465. }
  466. }
  467. iterator := &ServerEntryIterator{
  468. isTargetServerEntryIterator: true,
  469. hasNextTargetServerEntry: true,
  470. targetServerEntry: serverEntry,
  471. }
  472. NoticeInfo("using TargetServerEntry: %s", serverEntry.IpAddress)
  473. return false, iterator, nil
  474. }
  475. // Reset a NewServerEntryIterator to the start of its cycle. The next
  476. // call to Next will return the first server entry.
  477. func (iterator *ServerEntryIterator) Reset() error {
  478. iterator.Close()
  479. if iterator.isTargetServerEntryIterator {
  480. iterator.hasNextTargetServerEntry = true
  481. return nil
  482. }
  483. count := CountServerEntries(iterator.region, iterator.protocol)
  484. NoticeCandidateServers(iterator.region, iterator.protocol, count)
  485. // This query implements the Psiphon server candidate selection
  486. // algorithm: the first TunnelPoolSize server candidates are in rank
  487. // (priority) order, to favor previously successful servers; then the
  488. // remaining long tail is shuffled to raise up less recent candidates.
  489. // BoltDB implementation note:
  490. // We don't keep a transaction open for the duration of the iterator
  491. // because this would expose the following semantics to consumer code:
  492. //
  493. // Read-only transactions and read-write transactions ... generally
  494. // shouldn't be opened simultaneously in the same goroutine. This can
  495. // cause a deadlock as the read-write transaction needs to periodically
  496. // re-map the data file but it cannot do so while a read-only
  497. // transaction is open.
  498. // (https://github.com/boltdb/bolt)
  499. //
  500. // So the underlying serverEntriesBucket could change after the serverEntryIds
  501. // list is built.
  502. var serverEntryIds []string
  503. err := singleton.db.View(func(tx *bolt.Tx) error {
  504. var err error
  505. serverEntryIds, err = getRankedServerEntries(tx)
  506. if err != nil {
  507. return err
  508. }
  509. skipServerEntryIds := make(map[string]bool)
  510. for _, serverEntryId := range serverEntryIds {
  511. skipServerEntryIds[serverEntryId] = true
  512. }
  513. bucket := tx.Bucket([]byte(serverEntriesBucket))
  514. cursor := bucket.Cursor()
  515. for key, _ := cursor.Last(); key != nil; key, _ = cursor.Prev() {
  516. serverEntryId := string(key)
  517. if _, ok := skipServerEntryIds[serverEntryId]; ok {
  518. continue
  519. }
  520. serverEntryIds = append(serverEntryIds, serverEntryId)
  521. }
  522. return nil
  523. })
  524. if err != nil {
  525. return common.ContextError(err)
  526. }
  527. for i := len(serverEntryIds) - 1; i > iterator.shuffleHeadLength-1; i-- {
  528. j := rand.Intn(i+1-iterator.shuffleHeadLength) + iterator.shuffleHeadLength
  529. serverEntryIds[i], serverEntryIds[j] = serverEntryIds[j], serverEntryIds[i]
  530. }
  531. iterator.serverEntryIds = serverEntryIds
  532. iterator.serverEntryIndex = 0
  533. return nil
  534. }
  535. // Close cleans up resources associated with a ServerEntryIterator.
  536. func (iterator *ServerEntryIterator) Close() {
  537. iterator.serverEntryIds = nil
  538. iterator.serverEntryIndex = 0
  539. }
  540. // Next returns the next server entry, by rank, for a ServerEntryIterator.
  541. // Returns nil with no error when there is no next item.
  542. func (iterator *ServerEntryIterator) Next() (serverEntry *protocol.ServerEntry, err error) {
  543. defer func() {
  544. if err != nil {
  545. iterator.Close()
  546. }
  547. }()
  548. if iterator.isTargetServerEntryIterator {
  549. if iterator.hasNextTargetServerEntry {
  550. iterator.hasNextTargetServerEntry = false
  551. return MakeCompatibleServerEntry(iterator.targetServerEntry), nil
  552. }
  553. return nil, nil
  554. }
  555. // There are no region/protocol indexes for the server entries bucket.
  556. // Loop until we have the next server entry that matches the iterator
  557. // filter requirements.
  558. for {
  559. if iterator.serverEntryIndex >= len(iterator.serverEntryIds) {
  560. // There is no next item
  561. return nil, nil
  562. }
  563. serverEntryId := iterator.serverEntryIds[iterator.serverEntryIndex]
  564. iterator.serverEntryIndex += 1
  565. var data []byte
  566. err = singleton.db.View(func(tx *bolt.Tx) error {
  567. bucket := tx.Bucket([]byte(serverEntriesBucket))
  568. value := bucket.Get([]byte(serverEntryId))
  569. if value != nil {
  570. // Must make a copy as slice is only valid within transaction.
  571. data = make([]byte, len(value))
  572. copy(data, value)
  573. }
  574. return nil
  575. })
  576. if err != nil {
  577. return nil, common.ContextError(err)
  578. }
  579. if data == nil {
  580. // In case of data corruption or a bug causing this condition,
  581. // do not stop iterating.
  582. NoticeAlert("ServerEntryIterator.Next: unexpected missing server entry: %s", serverEntryId)
  583. continue
  584. }
  585. serverEntry = new(protocol.ServerEntry)
  586. err = json.Unmarshal(data, serverEntry)
  587. if err != nil {
  588. // In case of data corruption or a bug causing this condition,
  589. // do not stop iterating.
  590. NoticeAlert("ServerEntryIterator.Next: %s", common.ContextError(err))
  591. continue
  592. }
  593. // Check filter requirements
  594. if (iterator.region == "" || serverEntry.Region == iterator.region) &&
  595. (iterator.protocol == "" || serverEntry.SupportsProtocol(iterator.protocol)) {
  596. break
  597. }
  598. }
  599. return MakeCompatibleServerEntry(serverEntry), nil
  600. }
  601. // MakeCompatibleServerEntry provides backwards compatibility with old server entries
  602. // which have a single meekFrontingDomain and not a meekFrontingAddresses array.
  603. // By copying this one meekFrontingDomain into meekFrontingAddresses, this client effectively
  604. // uses that single value as legacy clients do.
  605. func MakeCompatibleServerEntry(serverEntry *protocol.ServerEntry) *protocol.ServerEntry {
  606. if len(serverEntry.MeekFrontingAddresses) == 0 && serverEntry.MeekFrontingDomain != "" {
  607. serverEntry.MeekFrontingAddresses =
  608. append(serverEntry.MeekFrontingAddresses, serverEntry.MeekFrontingDomain)
  609. }
  610. return serverEntry
  611. }
  612. func scanServerEntries(scanner func(*protocol.ServerEntry)) error {
  613. err := singleton.db.View(func(tx *bolt.Tx) error {
  614. bucket := tx.Bucket([]byte(serverEntriesBucket))
  615. cursor := bucket.Cursor()
  616. for key, value := cursor.First(); key != nil; key, value = cursor.Next() {
  617. serverEntry := new(protocol.ServerEntry)
  618. err := json.Unmarshal(value, serverEntry)
  619. if err != nil {
  620. // In case of data corruption or a bug causing this condition,
  621. // do not stop iterating.
  622. NoticeAlert("scanServerEntries: %s", common.ContextError(err))
  623. continue
  624. }
  625. scanner(serverEntry)
  626. }
  627. return nil
  628. })
  629. if err != nil {
  630. return common.ContextError(err)
  631. }
  632. return nil
  633. }
  634. // CountServerEntries returns a count of stored servers for the
  635. // specified region and protocol.
  636. func CountServerEntries(region, tunnelProtocol string) int {
  637. checkInitDataStore()
  638. count := 0
  639. err := scanServerEntries(func(serverEntry *protocol.ServerEntry) {
  640. if (region == "" || serverEntry.Region == region) &&
  641. (tunnelProtocol == "" || serverEntry.SupportsProtocol(tunnelProtocol)) {
  642. count += 1
  643. }
  644. })
  645. if err != nil {
  646. NoticeAlert("CountServerEntries failed: %s", err)
  647. return 0
  648. }
  649. return count
  650. }
  651. // CountNonImpairedProtocols returns the number of distinct tunnel
  652. // protocols supported by stored server entries, excluding the
  653. // specified impaired protocols.
  654. func CountNonImpairedProtocols(
  655. region, tunnelProtocol string,
  656. impairedProtocols []string) int {
  657. checkInitDataStore()
  658. distinctProtocols := make(map[string]bool)
  659. err := scanServerEntries(func(serverEntry *protocol.ServerEntry) {
  660. if region == "" || serverEntry.Region == region {
  661. if tunnelProtocol != "" {
  662. if serverEntry.SupportsProtocol(tunnelProtocol) {
  663. distinctProtocols[tunnelProtocol] = true
  664. // Exit early, since only one protocol is enabled
  665. return
  666. }
  667. } else {
  668. for _, protocol := range protocol.SupportedTunnelProtocols {
  669. if serverEntry.SupportsProtocol(protocol) {
  670. distinctProtocols[protocol] = true
  671. }
  672. }
  673. }
  674. }
  675. })
  676. for _, protocol := range impairedProtocols {
  677. delete(distinctProtocols, protocol)
  678. }
  679. if err != nil {
  680. NoticeAlert("CountNonImpairedProtocols failed: %s", err)
  681. return 0
  682. }
  683. return len(distinctProtocols)
  684. }
  685. // ReportAvailableRegions prints a notice with the available egress regions.
  686. // Note that this report ignores config.TunnelProtocol.
  687. func ReportAvailableRegions() {
  688. checkInitDataStore()
  689. regions := make(map[string]bool)
  690. err := scanServerEntries(func(serverEntry *protocol.ServerEntry) {
  691. regions[serverEntry.Region] = true
  692. })
  693. if err != nil {
  694. NoticeAlert("ReportAvailableRegions failed: %s", err)
  695. return
  696. }
  697. regionList := make([]string, 0, len(regions))
  698. for region := range regions {
  699. // Some server entries do not have a region, but it makes no sense to return
  700. // an empty string as an "available region".
  701. if region != "" {
  702. regionList = append(regionList, region)
  703. }
  704. }
  705. NoticeAvailableEgressRegions(regionList)
  706. }
  707. // GetServerEntryIpAddresses returns an array containing
  708. // all stored server IP addresses.
  709. func GetServerEntryIpAddresses() (ipAddresses []string, err error) {
  710. checkInitDataStore()
  711. ipAddresses = make([]string, 0)
  712. err = scanServerEntries(func(serverEntry *protocol.ServerEntry) {
  713. ipAddresses = append(ipAddresses, serverEntry.IpAddress)
  714. })
  715. if err != nil {
  716. return nil, common.ContextError(err)
  717. }
  718. return ipAddresses, nil
  719. }
  720. // SetSplitTunnelRoutes updates the cached routes data for
  721. // the given region. The associated etag is also stored and
  722. // used to make efficient web requests for updates to the data.
  723. func SetSplitTunnelRoutes(region, etag string, data []byte) error {
  724. checkInitDataStore()
  725. err := singleton.db.Update(func(tx *bolt.Tx) error {
  726. bucket := tx.Bucket([]byte(splitTunnelRouteETagsBucket))
  727. err := bucket.Put([]byte(region), []byte(etag))
  728. bucket = tx.Bucket([]byte(splitTunnelRouteDataBucket))
  729. err = bucket.Put([]byte(region), data)
  730. return err
  731. })
  732. if err != nil {
  733. return common.ContextError(err)
  734. }
  735. return nil
  736. }
  737. // GetSplitTunnelRoutesETag retrieves the etag for cached routes
  738. // data for the specified region. If not found, it returns an empty string value.
  739. func GetSplitTunnelRoutesETag(region string) (etag string, err error) {
  740. checkInitDataStore()
  741. err = singleton.db.View(func(tx *bolt.Tx) error {
  742. bucket := tx.Bucket([]byte(splitTunnelRouteETagsBucket))
  743. etag = string(bucket.Get([]byte(region)))
  744. return nil
  745. })
  746. if err != nil {
  747. return "", common.ContextError(err)
  748. }
  749. return etag, nil
  750. }
  751. // GetSplitTunnelRoutesData retrieves the cached routes data
  752. // for the specified region. If not found, it returns a nil value.
  753. func GetSplitTunnelRoutesData(region string) (data []byte, err error) {
  754. checkInitDataStore()
  755. err = singleton.db.View(func(tx *bolt.Tx) error {
  756. bucket := tx.Bucket([]byte(splitTunnelRouteDataBucket))
  757. value := bucket.Get([]byte(region))
  758. if value != nil {
  759. // Must make a copy as slice is only valid within transaction.
  760. data = make([]byte, len(value))
  761. copy(data, value)
  762. }
  763. return nil
  764. })
  765. if err != nil {
  766. return nil, common.ContextError(err)
  767. }
  768. return data, nil
  769. }
  770. // SetUrlETag stores an ETag for the specfied URL.
  771. // Note: input URL is treated as a string, and is not
  772. // encoded or decoded or otherwise canonicalized.
  773. func SetUrlETag(url, etag string) error {
  774. checkInitDataStore()
  775. err := singleton.db.Update(func(tx *bolt.Tx) error {
  776. bucket := tx.Bucket([]byte(urlETagsBucket))
  777. err := bucket.Put([]byte(url), []byte(etag))
  778. return err
  779. })
  780. if err != nil {
  781. return common.ContextError(err)
  782. }
  783. return nil
  784. }
  785. // GetUrlETag retrieves a previously stored an ETag for the
  786. // specfied URL. If not found, it returns an empty string value.
  787. func GetUrlETag(url string) (etag string, err error) {
  788. checkInitDataStore()
  789. err = singleton.db.View(func(tx *bolt.Tx) error {
  790. bucket := tx.Bucket([]byte(urlETagsBucket))
  791. etag = string(bucket.Get([]byte(url)))
  792. return nil
  793. })
  794. if err != nil {
  795. return "", common.ContextError(err)
  796. }
  797. return etag, nil
  798. }
  799. // SetKeyValue stores a key/value pair.
  800. func SetKeyValue(key, value string) error {
  801. checkInitDataStore()
  802. err := singleton.db.Update(func(tx *bolt.Tx) error {
  803. bucket := tx.Bucket([]byte(keyValueBucket))
  804. err := bucket.Put([]byte(key), []byte(value))
  805. return err
  806. })
  807. if err != nil {
  808. return common.ContextError(err)
  809. }
  810. return nil
  811. }
  812. // GetKeyValue retrieves the value for a given key. If not found,
  813. // it returns an empty string value.
  814. func GetKeyValue(key string) (value string, err error) {
  815. checkInitDataStore()
  816. err = singleton.db.View(func(tx *bolt.Tx) error {
  817. bucket := tx.Bucket([]byte(keyValueBucket))
  818. value = string(bucket.Get([]byte(key)))
  819. return nil
  820. })
  821. if err != nil {
  822. return "", common.ContextError(err)
  823. }
  824. return value, nil
  825. }
  826. // Persistent stat records in the persistentStatStateUnreported
  827. // state are available for take out.
  828. //
  829. // Records in the persistentStatStateReporting have been taken
  830. // out and are pending either deletion (for a successful request)
  831. // or change to StateUnreported (for a failed request).
  832. //
  833. // All persistent stat records are reverted to StateUnreported
  834. // when the datastore is initialized at start up.
  835. var persistentStatStateUnreported = []byte("0")
  836. var persistentStatStateReporting = []byte("1")
  837. var persistentStatTypes = []string{
  838. PERSISTENT_STAT_TYPE_REMOTE_SERVER_LIST,
  839. }
  840. // StorePersistentStat adds a new persistent stat record, which
  841. // is set to StateUnreported and is an immediate candidate for
  842. // reporting.
  843. //
  844. // The stat is a JSON byte array containing fields as
  845. // required by the Psiphon server API. It's assumed that the
  846. // JSON value contains enough unique information for the value to
  847. // function as a key in the key/value datastore. This assumption
  848. // is currently satisfied by the fields sessionId + tunnelNumber
  849. // for tunnel stats, and URL + ETag for remote server list stats.
  850. func StorePersistentStat(statType string, stat []byte) error {
  851. checkInitDataStore()
  852. if !common.Contains(persistentStatTypes, statType) {
  853. return common.ContextError(fmt.Errorf("invalid persistent stat type: %s", statType))
  854. }
  855. err := singleton.db.Update(func(tx *bolt.Tx) error {
  856. bucket := tx.Bucket([]byte(statType))
  857. err := bucket.Put(stat, persistentStatStateUnreported)
  858. return err
  859. })
  860. if err != nil {
  861. return common.ContextError(err)
  862. }
  863. return nil
  864. }
  865. // CountUnreportedPersistentStats returns the number of persistent
  866. // stat records in StateUnreported.
  867. func CountUnreportedPersistentStats() int {
  868. checkInitDataStore()
  869. unreported := 0
  870. err := singleton.db.View(func(tx *bolt.Tx) error {
  871. for _, statType := range persistentStatTypes {
  872. bucket := tx.Bucket([]byte(statType))
  873. cursor := bucket.Cursor()
  874. for key, value := cursor.First(); key != nil; key, value = cursor.Next() {
  875. if 0 == bytes.Compare(value, persistentStatStateUnreported) {
  876. unreported++
  877. break
  878. }
  879. }
  880. }
  881. return nil
  882. })
  883. if err != nil {
  884. NoticeAlert("CountUnreportedPersistentStats failed: %s", err)
  885. return 0
  886. }
  887. return unreported
  888. }
  889. // TakeOutUnreportedPersistentStats returns up to maxCount persistent
  890. // stats records that are in StateUnreported. The records are set to
  891. // StateReporting. If the records are successfully reported, clear them
  892. // with ClearReportedPersistentStats. If the records are not successfully
  893. // reported, restore them with PutBackUnreportedPersistentStats.
  894. func TakeOutUnreportedPersistentStats(maxCount int) (map[string][][]byte, error) {
  895. checkInitDataStore()
  896. stats := make(map[string][][]byte)
  897. err := singleton.db.Update(func(tx *bolt.Tx) error {
  898. count := 0
  899. for _, statType := range persistentStatTypes {
  900. bucket := tx.Bucket([]byte(statType))
  901. cursor := bucket.Cursor()
  902. for key, value := cursor.First(); key != nil; key, value = cursor.Next() {
  903. if count >= maxCount {
  904. break
  905. }
  906. // Perform a test JSON unmarshaling. In case of data corruption or a bug,
  907. // skip the record.
  908. var jsonData interface{}
  909. err := json.Unmarshal(key, &jsonData)
  910. if err != nil {
  911. NoticeAlert(
  912. "Invalid key in TakeOutUnreportedPersistentStats: %s: %s",
  913. string(key), err)
  914. continue
  915. }
  916. if 0 == bytes.Compare(value, persistentStatStateUnreported) {
  917. // Must make a copy as slice is only valid within transaction.
  918. data := make([]byte, len(key))
  919. copy(data, key)
  920. if stats[statType] == nil {
  921. stats[statType] = make([][]byte, 0)
  922. }
  923. stats[statType] = append(stats[statType], data)
  924. count += 1
  925. }
  926. }
  927. for _, key := range stats[statType] {
  928. err := bucket.Put(key, persistentStatStateReporting)
  929. if err != nil {
  930. return err
  931. }
  932. }
  933. }
  934. return nil
  935. })
  936. if err != nil {
  937. return nil, common.ContextError(err)
  938. }
  939. return stats, nil
  940. }
  941. // PutBackUnreportedPersistentStats restores a list of persistent
  942. // stat records to StateUnreported.
  943. func PutBackUnreportedPersistentStats(stats map[string][][]byte) error {
  944. checkInitDataStore()
  945. err := singleton.db.Update(func(tx *bolt.Tx) error {
  946. for _, statType := range persistentStatTypes {
  947. bucket := tx.Bucket([]byte(statType))
  948. for _, key := range stats[statType] {
  949. err := bucket.Put(key, persistentStatStateUnreported)
  950. if err != nil {
  951. return err
  952. }
  953. }
  954. }
  955. return nil
  956. })
  957. if err != nil {
  958. return common.ContextError(err)
  959. }
  960. return nil
  961. }
  962. // ClearReportedPersistentStats deletes a list of persistent
  963. // stat records that were successfully reported.
  964. func ClearReportedPersistentStats(stats map[string][][]byte) error {
  965. checkInitDataStore()
  966. err := singleton.db.Update(func(tx *bolt.Tx) error {
  967. for _, statType := range persistentStatTypes {
  968. bucket := tx.Bucket([]byte(statType))
  969. for _, key := range stats[statType] {
  970. err := bucket.Delete(key)
  971. if err != nil {
  972. return err
  973. }
  974. }
  975. }
  976. return nil
  977. })
  978. if err != nil {
  979. return common.ContextError(err)
  980. }
  981. return nil
  982. }
  983. // resetAllPersistentStatsToUnreported sets all persistent stat
  984. // records to StateUnreported. This reset is called when the
  985. // datastore is initialized at start up, as we do not know if
  986. // persistent records in StateReporting were reported or not.
  987. func resetAllPersistentStatsToUnreported() error {
  988. checkInitDataStore()
  989. err := singleton.db.Update(func(tx *bolt.Tx) error {
  990. for _, statType := range persistentStatTypes {
  991. bucket := tx.Bucket([]byte(statType))
  992. resetKeys := make([][]byte, 0)
  993. cursor := bucket.Cursor()
  994. for key, _ := cursor.First(); key != nil; key, _ = cursor.Next() {
  995. resetKeys = append(resetKeys, key)
  996. }
  997. // TODO: data mutation is done outside cursor. Is this
  998. // strictly necessary in this case? As is, this means
  999. // all stats need to be loaded into memory at once.
  1000. // https://godoc.org/github.com/boltdb/bolt#Cursor
  1001. for _, key := range resetKeys {
  1002. err := bucket.Put(key, persistentStatStateUnreported)
  1003. if err != nil {
  1004. return err
  1005. }
  1006. }
  1007. }
  1008. return nil
  1009. })
  1010. if err != nil {
  1011. return common.ContextError(err)
  1012. }
  1013. return nil
  1014. }
  1015. // CountSLOKs returns the total number of SLOK records.
  1016. func CountSLOKs() int {
  1017. checkInitDataStore()
  1018. count := 0
  1019. err := singleton.db.View(func(tx *bolt.Tx) error {
  1020. bucket := tx.Bucket([]byte(slokBucket))
  1021. cursor := bucket.Cursor()
  1022. for key, _ := cursor.First(); key != nil; key, _ = cursor.Next() {
  1023. count++
  1024. }
  1025. return nil
  1026. })
  1027. if err != nil {
  1028. NoticeAlert("CountSLOKs failed: %s", err)
  1029. return 0
  1030. }
  1031. return count
  1032. }
  1033. // DeleteSLOKs deletes all SLOK records.
  1034. func DeleteSLOKs() error {
  1035. checkInitDataStore()
  1036. err := singleton.db.Update(func(tx *bolt.Tx) error {
  1037. bucket := tx.Bucket([]byte(slokBucket))
  1038. return bucket.ForEach(
  1039. func(id, _ []byte) error {
  1040. return bucket.Delete(id)
  1041. })
  1042. })
  1043. if err != nil {
  1044. return common.ContextError(err)
  1045. }
  1046. return nil
  1047. }
  1048. // SetSLOK stores a SLOK key, referenced by its ID. The bool
  1049. // return value indicates whether the SLOK was already stored.
  1050. func SetSLOK(id, key []byte) (bool, error) {
  1051. checkInitDataStore()
  1052. var duplicate bool
  1053. err := singleton.db.Update(func(tx *bolt.Tx) error {
  1054. bucket := tx.Bucket([]byte(slokBucket))
  1055. duplicate = bucket.Get(id) != nil
  1056. err := bucket.Put([]byte(id), []byte(key))
  1057. return err
  1058. })
  1059. if err != nil {
  1060. return false, common.ContextError(err)
  1061. }
  1062. return duplicate, nil
  1063. }
  1064. // GetSLOK returns a SLOK key for the specified ID. The return
  1065. // value is nil if the SLOK is not found.
  1066. func GetSLOK(id []byte) (key []byte, err error) {
  1067. checkInitDataStore()
  1068. err = singleton.db.View(func(tx *bolt.Tx) error {
  1069. bucket := tx.Bucket([]byte(slokBucket))
  1070. key = bucket.Get(id)
  1071. return nil
  1072. })
  1073. if err != nil {
  1074. return nil, common.ContextError(err)
  1075. }
  1076. return key, nil
  1077. }