dataStore.go 36 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287
  1. /*
  2. * Copyright (c) 2015, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package psiphon
  20. import (
  21. "bytes"
  22. "encoding/json"
  23. "errors"
  24. "fmt"
  25. "math/rand"
  26. "os"
  27. "path/filepath"
  28. "strings"
  29. "sync"
  30. "time"
  31. "github.com/Psiphon-Inc/bolt"
  32. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  33. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
  34. )
  35. // The BoltDB dataStore implementation is an alternative to the sqlite3-based
  36. // implementation in dataStore.go. Both implementations have the same interface.
  37. //
  38. // BoltDB is pure Go, and is intended to be used in cases where we have trouble
  39. // building sqlite3/CGO (e.g., currently go mobile due to
  40. // https://github.com/mattn/go-sqlite3/issues/201), and perhaps ultimately as
  41. // the primary dataStore implementation.
  42. //
  43. type dataStore struct {
  44. init sync.Once
  45. db *bolt.DB
  46. }
  47. const (
  48. serverEntriesBucket = "serverEntries"
  49. rankedServerEntriesBucket = "rankedServerEntries"
  50. rankedServerEntriesKey = "rankedServerEntries"
  51. splitTunnelRouteETagsBucket = "splitTunnelRouteETags"
  52. splitTunnelRouteDataBucket = "splitTunnelRouteData"
  53. urlETagsBucket = "urlETags"
  54. keyValueBucket = "keyValues"
  55. tunnelStatsBucket = "tunnelStats"
  56. remoteServerListStatsBucket = "remoteServerListStats"
  57. slokBucket = "SLOKs"
  58. rankedServerEntryCount = 100
  59. )
  60. const (
  61. DATA_STORE_LAST_CONNECTED_KEY = "lastConnected"
  62. DATA_STORE_LAST_SERVER_ENTRY_FILTER_KEY = "lastServerEntryFilter"
  63. PERSISTENT_STAT_TYPE_TUNNEL = tunnelStatsBucket
  64. PERSISTENT_STAT_TYPE_REMOTE_SERVER_LIST = remoteServerListStatsBucket
  65. )
  66. var singleton dataStore
  67. // InitDataStore initializes the singleton instance of dataStore. This
  68. // function uses a sync.Once and is safe for use by concurrent goroutines.
  69. // The underlying sql.DB connection pool is also safe.
  70. //
  71. // Note: the sync.Once was more useful when initDataStore was private and
  72. // called on-demand by the public functions below. Now we require an explicit
  73. // InitDataStore() call with the filename passed in. The on-demand calls
  74. // have been replaced by checkInitDataStore() to assert that Init was called.
  75. func InitDataStore(config *Config) (err error) {
  76. singleton.init.Do(func() {
  77. // Need to gather the list of migratable server entries before
  78. // initializing the boltdb store (as prepareMigrationEntries
  79. // checks for the existence of the bolt db file)
  80. migratableServerEntries := prepareMigrationEntries(config)
  81. filename := filepath.Join(config.DataStoreDirectory, DATA_STORE_FILENAME)
  82. var db *bolt.DB
  83. for retry := 0; retry < 3; retry++ {
  84. if retry > 0 {
  85. NoticeAlert("InitDataStore retry: %d", retry)
  86. }
  87. db, err = bolt.Open(filename, 0600, &bolt.Options{Timeout: 1 * time.Second})
  88. // The datastore file may be corrupt, so attempt to delete and try again
  89. if err != nil {
  90. NoticeAlert("bolt.Open error: %s", err)
  91. os.Remove(filename)
  92. continue
  93. }
  94. // Run consistency checks on datastore and emit errors for diagnostics purposes
  95. // We assume this will complete quickly for typical size Psiphon datastores.
  96. err = db.View(func(tx *bolt.Tx) error {
  97. return tx.SynchronousCheck()
  98. })
  99. // The datastore file may be corrupt, so attempt to delete and try again
  100. if err != nil {
  101. NoticeAlert("bolt.SynchronousCheck error: %s", err)
  102. db.Close()
  103. os.Remove(filename)
  104. continue
  105. }
  106. break
  107. }
  108. if err != nil {
  109. // Note: intending to set the err return value for InitDataStore
  110. err = fmt.Errorf("initDataStore failed to open database: %s", err)
  111. return
  112. }
  113. err = db.Update(func(tx *bolt.Tx) error {
  114. requiredBuckets := []string{
  115. serverEntriesBucket,
  116. rankedServerEntriesBucket,
  117. splitTunnelRouteETagsBucket,
  118. splitTunnelRouteDataBucket,
  119. urlETagsBucket,
  120. keyValueBucket,
  121. tunnelStatsBucket,
  122. remoteServerListStatsBucket,
  123. slokBucket,
  124. }
  125. for _, bucket := range requiredBuckets {
  126. _, err := tx.CreateBucketIfNotExists([]byte(bucket))
  127. if err != nil {
  128. return err
  129. }
  130. }
  131. return nil
  132. })
  133. if err != nil {
  134. err = fmt.Errorf("initDataStore failed to create buckets: %s", err)
  135. return
  136. }
  137. singleton.db = db
  138. // The migrateServerEntries function requires the data store is
  139. // initialized prior to execution so that migrated entries can be stored
  140. if len(migratableServerEntries) > 0 {
  141. migrateEntries(migratableServerEntries, filepath.Join(config.DataStoreDirectory, LEGACY_DATA_STORE_FILENAME))
  142. }
  143. resetAllPersistentStatsToUnreported()
  144. })
  145. return err
  146. }
  147. func checkInitDataStore() {
  148. if singleton.db == nil {
  149. panic("checkInitDataStore: datastore not initialized")
  150. }
  151. }
  152. // StoreServerEntry adds the server entry to the data store.
  153. // A newly stored (or re-stored) server entry is assigned the next-to-top
  154. // rank for iteration order (the previous top ranked entry is promoted). The
  155. // purpose of inserting at next-to-top is to keep the last selected server
  156. // as the top ranked server.
  157. // When replaceIfExists is true, an existing server entry record is
  158. // overwritten; otherwise, the existing record is unchanged.
  159. // If the server entry data is malformed, an alert notice is issued and
  160. // the entry is skipped; no error is returned.
  161. func StoreServerEntry(serverEntry *protocol.ServerEntry, replaceIfExists bool) error {
  162. checkInitDataStore()
  163. // Server entries should already be validated before this point,
  164. // so instead of skipping we fail with an error.
  165. err := protocol.ValidateServerEntry(serverEntry)
  166. if err != nil {
  167. return common.ContextError(errors.New("invalid server entry"))
  168. }
  169. // BoltDB implementation note:
  170. // For simplicity, we don't maintain indexes on server entry
  171. // region or supported protocols. Instead, we perform full-bucket
  172. // scans with a filter. With a small enough database (thousands or
  173. // even tens of thousand of server entries) and common enough
  174. // values (e.g., many servers support all protocols), performance
  175. // is expected to be acceptable.
  176. err = singleton.db.Update(func(tx *bolt.Tx) error {
  177. serverEntries := tx.Bucket([]byte(serverEntriesBucket))
  178. // Check not only that the entry exists, but is valid. This
  179. // will replace in the rare case where the data is corrupt.
  180. existingServerEntryValid := false
  181. existingData := serverEntries.Get([]byte(serverEntry.IpAddress))
  182. if existingData != nil {
  183. existingServerEntry := new(protocol.ServerEntry)
  184. if json.Unmarshal(existingData, existingServerEntry) == nil {
  185. existingServerEntryValid = true
  186. }
  187. }
  188. if existingServerEntryValid && !replaceIfExists {
  189. // Disabling this notice, for now, as it generates too much noise
  190. // in diagnostics with clients that always submit embedded servers
  191. // to the core on each run.
  192. // NoticeInfo("ignored update for server %s", serverEntry.IpAddress)
  193. return nil
  194. }
  195. data, err := json.Marshal(serverEntry)
  196. if err != nil {
  197. return common.ContextError(err)
  198. }
  199. err = serverEntries.Put([]byte(serverEntry.IpAddress), data)
  200. if err != nil {
  201. return common.ContextError(err)
  202. }
  203. err = insertRankedServerEntry(tx, serverEntry.IpAddress, 1)
  204. if err != nil {
  205. return common.ContextError(err)
  206. }
  207. NoticeInfo("updated server %s", serverEntry.IpAddress)
  208. return nil
  209. })
  210. if err != nil {
  211. return common.ContextError(err)
  212. }
  213. return nil
  214. }
  215. // StoreServerEntries stores a list of server entries.
  216. // There is an independent transaction for each entry insert/update.
  217. func StoreServerEntries(serverEntries []*protocol.ServerEntry, replaceIfExists bool) error {
  218. checkInitDataStore()
  219. for _, serverEntry := range serverEntries {
  220. err := StoreServerEntry(serverEntry, replaceIfExists)
  221. if err != nil {
  222. return common.ContextError(err)
  223. }
  224. }
  225. // Since there has possibly been a significant change in the server entries,
  226. // take this opportunity to update the available egress regions.
  227. ReportAvailableRegions()
  228. return nil
  229. }
  230. // StreamingStoreServerEntries stores a list of server entries.
  231. // There is an independent transaction for each entry insert/update.
  232. func StreamingStoreServerEntries(
  233. serverEntries *protocol.StreamingServerEntryDecoder, replaceIfExists bool) error {
  234. checkInitDataStore()
  235. // Note: both StreamingServerEntryDecoder.Next and StoreServerEntry
  236. // allocate temporary memory buffers for hex/JSON decoding/encoding,
  237. // so this isn't true constant-memory streaming (it depends on garbage
  238. // collection).
  239. for {
  240. serverEntry, err := serverEntries.Next()
  241. if err != nil {
  242. return common.ContextError(err)
  243. }
  244. if serverEntry == nil {
  245. // No more server entries
  246. break
  247. }
  248. err = StoreServerEntry(serverEntry, replaceIfExists)
  249. if err != nil {
  250. return common.ContextError(err)
  251. }
  252. }
  253. // Since there has possibly been a significant change in the server entries,
  254. // take this opportunity to update the available egress regions.
  255. ReportAvailableRegions()
  256. return nil
  257. }
  258. // PromoteServerEntry assigns the top rank (one more than current
  259. // max rank) to the specified server entry. Server candidates are
  260. // iterated in decending rank order, so this server entry will be
  261. // the first candidate in a subsequent tunnel establishment.
  262. func PromoteServerEntry(config *Config, ipAddress string) error {
  263. checkInitDataStore()
  264. err := singleton.db.Update(func(tx *bolt.Tx) error {
  265. // Ensure the corresponding entry exists before
  266. // inserting into rank.
  267. bucket := tx.Bucket([]byte(serverEntriesBucket))
  268. data := bucket.Get([]byte(ipAddress))
  269. if data == nil {
  270. NoticeAlert(
  271. "PromoteServerEntry: ignoring unknown server entry: %s",
  272. ipAddress)
  273. return nil
  274. }
  275. err := insertRankedServerEntry(tx, ipAddress, 0)
  276. if err != nil {
  277. return err
  278. }
  279. // Store the current server entry filter (e.g, region, etc.) that
  280. // was in use when the entry was promoted. This is used to detect
  281. // when the top ranked server entry was promoted under a different
  282. // filter.
  283. currentFilter, err := makeServerEntryFilterValue(config)
  284. if err != nil {
  285. return err
  286. }
  287. bucket = tx.Bucket([]byte(keyValueBucket))
  288. return bucket.Put([]byte(DATA_STORE_LAST_SERVER_ENTRY_FILTER_KEY), currentFilter)
  289. })
  290. if err != nil {
  291. return common.ContextError(err)
  292. }
  293. return nil
  294. }
  295. func makeServerEntryFilterValue(config *Config) ([]byte, error) {
  296. filter, err := json.Marshal(
  297. struct {
  298. Region string
  299. Protocol string
  300. }{config.EgressRegion, config.TunnelProtocol})
  301. if err != nil {
  302. return nil, common.ContextError(err)
  303. }
  304. return filter, nil
  305. }
  306. func hasServerEntryFilterChanged(config *Config) (bool, error) {
  307. currentFilter, err := makeServerEntryFilterValue(config)
  308. if err != nil {
  309. return false, common.ContextError(err)
  310. }
  311. changed := false
  312. err = singleton.db.View(func(tx *bolt.Tx) error {
  313. // previousFilter will be nil not found (not previously
  314. // set) which will never match any current filter.
  315. bucket := tx.Bucket([]byte(keyValueBucket))
  316. previousFilter := bucket.Get([]byte(DATA_STORE_LAST_SERVER_ENTRY_FILTER_KEY))
  317. if bytes.Compare(previousFilter, currentFilter) == 0 {
  318. changed = true
  319. }
  320. return nil
  321. })
  322. if err != nil {
  323. return false, common.ContextError(err)
  324. }
  325. return changed, nil
  326. }
  327. func getRankedServerEntries(tx *bolt.Tx) ([]string, error) {
  328. bucket := tx.Bucket([]byte(rankedServerEntriesBucket))
  329. data := bucket.Get([]byte(rankedServerEntriesKey))
  330. if data == nil {
  331. return []string{}, nil
  332. }
  333. rankedServerEntries := make([]string, 0)
  334. err := json.Unmarshal(data, &rankedServerEntries)
  335. if err != nil {
  336. return nil, common.ContextError(err)
  337. }
  338. return rankedServerEntries, nil
  339. }
  340. func setRankedServerEntries(tx *bolt.Tx, rankedServerEntries []string) error {
  341. data, err := json.Marshal(rankedServerEntries)
  342. if err != nil {
  343. return common.ContextError(err)
  344. }
  345. bucket := tx.Bucket([]byte(rankedServerEntriesBucket))
  346. err = bucket.Put([]byte(rankedServerEntriesKey), data)
  347. if err != nil {
  348. return common.ContextError(err)
  349. }
  350. return nil
  351. }
  352. func insertRankedServerEntry(tx *bolt.Tx, serverEntryId string, position int) error {
  353. rankedServerEntries, err := getRankedServerEntries(tx)
  354. if err != nil {
  355. return common.ContextError(err)
  356. }
  357. // BoltDB implementation note:
  358. // For simplicity, we store the ranked server ids in an array serialized to
  359. // a single key value. To ensure this value doesn't grow without bound,
  360. // it's capped at rankedServerEntryCount. For now, this cap should be large
  361. // enough to meet the shuffleHeadLength = config.TunnelPoolSize criteria, for
  362. // any reasonable configuration of config.TunnelPoolSize.
  363. // Using: https://github.com/golang/go/wiki/SliceTricks
  364. // When serverEntryId is already ranked, remove it first to avoid duplicates
  365. for i, rankedServerEntryId := range rankedServerEntries {
  366. if rankedServerEntryId == serverEntryId {
  367. rankedServerEntries = append(
  368. rankedServerEntries[:i], rankedServerEntries[i+1:]...)
  369. break
  370. }
  371. }
  372. // SliceTricks insert, with length cap enforced
  373. if len(rankedServerEntries) < rankedServerEntryCount {
  374. rankedServerEntries = append(rankedServerEntries, "")
  375. }
  376. if position >= len(rankedServerEntries) {
  377. position = len(rankedServerEntries) - 1
  378. }
  379. copy(rankedServerEntries[position+1:], rankedServerEntries[position:])
  380. rankedServerEntries[position] = serverEntryId
  381. err = setRankedServerEntries(tx, rankedServerEntries)
  382. if err != nil {
  383. return common.ContextError(err)
  384. }
  385. return nil
  386. }
  387. // ServerEntryIterator is used to iterate over
  388. // stored server entries in rank order.
  389. type ServerEntryIterator struct {
  390. region string
  391. protocol string
  392. shuffleHeadLength int
  393. serverEntryIds []string
  394. serverEntryIndex int
  395. isTargetServerEntryIterator bool
  396. hasNextTargetServerEntry bool
  397. targetServerEntry *protocol.ServerEntry
  398. }
  399. // NewServerEntryIterator creates a new ServerEntryIterator.
  400. //
  401. // The boolean return value indicates whether to treat the first server(s)
  402. // as affinity servers or not. When the server entry selection filter changes
  403. // such as from a specific region to any region, or when there was no previous
  404. // filter/iterator, the the first server(s) are arbitrary and should not be
  405. // given affinity treatment.
  406. //
  407. // NewServerEntryIterator and any returned ServerEntryIterator are not
  408. // designed for concurrent use as not all related datastore operations are
  409. // performed in a single transaction.
  410. //
  411. func NewServerEntryIterator(config *Config) (bool, *ServerEntryIterator, error) {
  412. // When configured, this target server entry is the only candidate
  413. if config.TargetServerEntry != "" {
  414. return newTargetServerEntryIterator(config)
  415. }
  416. checkInitDataStore()
  417. applyServerAffinity, err := hasServerEntryFilterChanged(config)
  418. if err != nil {
  419. return false, nil, common.ContextError(err)
  420. }
  421. iterator := &ServerEntryIterator{
  422. region: config.EgressRegion,
  423. protocol: config.TunnelProtocol,
  424. shuffleHeadLength: config.TunnelPoolSize,
  425. isTargetServerEntryIterator: false,
  426. }
  427. err = iterator.Reset()
  428. if err != nil {
  429. return false, nil, common.ContextError(err)
  430. }
  431. return applyServerAffinity, iterator, nil
  432. }
  433. // newTargetServerEntryIterator is a helper for initializing the TargetServerEntry case
  434. func newTargetServerEntryIterator(config *Config) (bool, *ServerEntryIterator, error) {
  435. serverEntry, err := protocol.DecodeServerEntry(
  436. config.TargetServerEntry, common.GetCurrentTimestamp(), protocol.SERVER_ENTRY_SOURCE_TARGET)
  437. if err != nil {
  438. return false, nil, common.ContextError(err)
  439. }
  440. if config.EgressRegion != "" && serverEntry.Region != config.EgressRegion {
  441. return false, nil, common.ContextError(errors.New("TargetServerEntry does not support EgressRegion"))
  442. }
  443. if config.TunnelProtocol != "" {
  444. // Note: same capability/protocol mapping as in StoreServerEntry
  445. requiredCapability := strings.TrimSuffix(config.TunnelProtocol, "-OSSH")
  446. if !common.Contains(serverEntry.Capabilities, requiredCapability) {
  447. return false, nil, common.ContextError(errors.New("TargetServerEntry does not support TunnelProtocol"))
  448. }
  449. }
  450. iterator := &ServerEntryIterator{
  451. isTargetServerEntryIterator: true,
  452. hasNextTargetServerEntry: true,
  453. targetServerEntry: serverEntry,
  454. }
  455. NoticeInfo("using TargetServerEntry: %s", serverEntry.IpAddress)
  456. return false, iterator, nil
  457. }
  458. // Reset a NewServerEntryIterator to the start of its cycle. The next
  459. // call to Next will return the first server entry.
  460. func (iterator *ServerEntryIterator) Reset() error {
  461. iterator.Close()
  462. if iterator.isTargetServerEntryIterator {
  463. iterator.hasNextTargetServerEntry = true
  464. return nil
  465. }
  466. count := CountServerEntries(iterator.region, iterator.protocol)
  467. NoticeCandidateServers(iterator.region, iterator.protocol, count)
  468. // This query implements the Psiphon server candidate selection
  469. // algorithm: the first TunnelPoolSize server candidates are in rank
  470. // (priority) order, to favor previously successful servers; then the
  471. // remaining long tail is shuffled to raise up less recent candidates.
  472. // BoltDB implementation note:
  473. // We don't keep a transaction open for the duration of the iterator
  474. // because this would expose the following semantics to consumer code:
  475. //
  476. // Read-only transactions and read-write transactions ... generally
  477. // shouldn't be opened simultaneously in the same goroutine. This can
  478. // cause a deadlock as the read-write transaction needs to periodically
  479. // re-map the data file but it cannot do so while a read-only
  480. // transaction is open.
  481. // (https://github.com/boltdb/bolt)
  482. //
  483. // So the underlying serverEntriesBucket could change after the serverEntryIds
  484. // list is built.
  485. var serverEntryIds []string
  486. err := singleton.db.View(func(tx *bolt.Tx) error {
  487. var err error
  488. serverEntryIds, err = getRankedServerEntries(tx)
  489. if err != nil {
  490. return err
  491. }
  492. skipServerEntryIds := make(map[string]bool)
  493. for _, serverEntryId := range serverEntryIds {
  494. skipServerEntryIds[serverEntryId] = true
  495. }
  496. bucket := tx.Bucket([]byte(serverEntriesBucket))
  497. cursor := bucket.Cursor()
  498. for key, _ := cursor.Last(); key != nil; key, _ = cursor.Prev() {
  499. serverEntryId := string(key)
  500. if _, ok := skipServerEntryIds[serverEntryId]; ok {
  501. continue
  502. }
  503. serverEntryIds = append(serverEntryIds, serverEntryId)
  504. }
  505. return nil
  506. })
  507. if err != nil {
  508. return common.ContextError(err)
  509. }
  510. for i := len(serverEntryIds) - 1; i > iterator.shuffleHeadLength-1; i-- {
  511. j := rand.Intn(i+1-iterator.shuffleHeadLength) + iterator.shuffleHeadLength
  512. serverEntryIds[i], serverEntryIds[j] = serverEntryIds[j], serverEntryIds[i]
  513. }
  514. iterator.serverEntryIds = serverEntryIds
  515. iterator.serverEntryIndex = 0
  516. return nil
  517. }
  518. // Close cleans up resources associated with a ServerEntryIterator.
  519. func (iterator *ServerEntryIterator) Close() {
  520. iterator.serverEntryIds = nil
  521. iterator.serverEntryIndex = 0
  522. }
  523. // Next returns the next server entry, by rank, for a ServerEntryIterator.
  524. // Returns nil with no error when there is no next item.
  525. func (iterator *ServerEntryIterator) Next() (serverEntry *protocol.ServerEntry, err error) {
  526. defer func() {
  527. if err != nil {
  528. iterator.Close()
  529. }
  530. }()
  531. if iterator.isTargetServerEntryIterator {
  532. if iterator.hasNextTargetServerEntry {
  533. iterator.hasNextTargetServerEntry = false
  534. return MakeCompatibleServerEntry(iterator.targetServerEntry), nil
  535. }
  536. return nil, nil
  537. }
  538. // There are no region/protocol indexes for the server entries bucket.
  539. // Loop until we have the next server entry that matches the iterator
  540. // filter requirements.
  541. for {
  542. if iterator.serverEntryIndex >= len(iterator.serverEntryIds) {
  543. // There is no next item
  544. return nil, nil
  545. }
  546. serverEntryId := iterator.serverEntryIds[iterator.serverEntryIndex]
  547. iterator.serverEntryIndex += 1
  548. var data []byte
  549. err = singleton.db.View(func(tx *bolt.Tx) error {
  550. bucket := tx.Bucket([]byte(serverEntriesBucket))
  551. value := bucket.Get([]byte(serverEntryId))
  552. if value != nil {
  553. // Must make a copy as slice is only valid within transaction.
  554. data = make([]byte, len(value))
  555. copy(data, value)
  556. }
  557. return nil
  558. })
  559. if err != nil {
  560. return nil, common.ContextError(err)
  561. }
  562. if data == nil {
  563. // In case of data corruption or a bug causing this condition,
  564. // do not stop iterating.
  565. NoticeAlert("ServerEntryIterator.Next: unexpected missing server entry: %s", serverEntryId)
  566. continue
  567. }
  568. serverEntry = new(protocol.ServerEntry)
  569. err = json.Unmarshal(data, serverEntry)
  570. if err != nil {
  571. // In case of data corruption or a bug causing this condition,
  572. // do not stop iterating.
  573. NoticeAlert("ServerEntryIterator.Next: %s", common.ContextError(err))
  574. continue
  575. }
  576. // Check filter requirements
  577. if (iterator.region == "" || serverEntry.Region == iterator.region) &&
  578. (iterator.protocol == "" || serverEntry.SupportsProtocol(iterator.protocol)) {
  579. break
  580. }
  581. }
  582. return MakeCompatibleServerEntry(serverEntry), nil
  583. }
  584. // MakeCompatibleServerEntry provides backwards compatibility with old server entries
  585. // which have a single meekFrontingDomain and not a meekFrontingAddresses array.
  586. // By copying this one meekFrontingDomain into meekFrontingAddresses, this client effectively
  587. // uses that single value as legacy clients do.
  588. func MakeCompatibleServerEntry(serverEntry *protocol.ServerEntry) *protocol.ServerEntry {
  589. if len(serverEntry.MeekFrontingAddresses) == 0 && serverEntry.MeekFrontingDomain != "" {
  590. serverEntry.MeekFrontingAddresses =
  591. append(serverEntry.MeekFrontingAddresses, serverEntry.MeekFrontingDomain)
  592. }
  593. return serverEntry
  594. }
  595. func scanServerEntries(scanner func(*protocol.ServerEntry)) error {
  596. err := singleton.db.View(func(tx *bolt.Tx) error {
  597. bucket := tx.Bucket([]byte(serverEntriesBucket))
  598. cursor := bucket.Cursor()
  599. for key, value := cursor.First(); key != nil; key, value = cursor.Next() {
  600. serverEntry := new(protocol.ServerEntry)
  601. err := json.Unmarshal(value, serverEntry)
  602. if err != nil {
  603. // In case of data corruption or a bug causing this condition,
  604. // do not stop iterating.
  605. NoticeAlert("scanServerEntries: %s", common.ContextError(err))
  606. continue
  607. }
  608. scanner(serverEntry)
  609. }
  610. return nil
  611. })
  612. if err != nil {
  613. return common.ContextError(err)
  614. }
  615. return nil
  616. }
  617. // CountServerEntries returns a count of stored servers for the
  618. // specified region and protocol.
  619. func CountServerEntries(region, tunnelProtocol string) int {
  620. checkInitDataStore()
  621. count := 0
  622. err := scanServerEntries(func(serverEntry *protocol.ServerEntry) {
  623. if (region == "" || serverEntry.Region == region) &&
  624. (tunnelProtocol == "" || serverEntry.SupportsProtocol(tunnelProtocol)) {
  625. count += 1
  626. }
  627. })
  628. if err != nil {
  629. NoticeAlert("CountServerEntries failed: %s", err)
  630. return 0
  631. }
  632. return count
  633. }
  634. // CountNonImpairedProtocols returns the number of distinct tunnel
  635. // protocols supported by stored server entries, excluding the
  636. // specified impaired protocols.
  637. func CountNonImpairedProtocols(
  638. region, tunnelProtocol string,
  639. impairedProtocols []string) int {
  640. checkInitDataStore()
  641. distinctProtocols := make(map[string]bool)
  642. err := scanServerEntries(func(serverEntry *protocol.ServerEntry) {
  643. if region == "" || serverEntry.Region == region {
  644. if tunnelProtocol != "" {
  645. if serverEntry.SupportsProtocol(tunnelProtocol) {
  646. distinctProtocols[tunnelProtocol] = true
  647. // Exit early, since only one protocol is enabled
  648. return
  649. }
  650. } else {
  651. for _, protocol := range protocol.SupportedTunnelProtocols {
  652. if serverEntry.SupportsProtocol(protocol) {
  653. distinctProtocols[protocol] = true
  654. }
  655. }
  656. }
  657. }
  658. })
  659. for _, protocol := range impairedProtocols {
  660. delete(distinctProtocols, protocol)
  661. }
  662. if err != nil {
  663. NoticeAlert("CountNonImpairedProtocols failed: %s", err)
  664. return 0
  665. }
  666. return len(distinctProtocols)
  667. }
  668. // ReportAvailableRegions prints a notice with the available egress regions.
  669. // Note that this report ignores config.TunnelProtocol.
  670. func ReportAvailableRegions() {
  671. checkInitDataStore()
  672. regions := make(map[string]bool)
  673. err := scanServerEntries(func(serverEntry *protocol.ServerEntry) {
  674. regions[serverEntry.Region] = true
  675. })
  676. if err != nil {
  677. NoticeAlert("ReportAvailableRegions failed: %s", err)
  678. return
  679. }
  680. regionList := make([]string, 0, len(regions))
  681. for region := range regions {
  682. // Some server entries do not have a region, but it makes no sense to return
  683. // an empty string as an "available region".
  684. if region != "" {
  685. regionList = append(regionList, region)
  686. }
  687. }
  688. NoticeAvailableEgressRegions(regionList)
  689. }
  690. // GetServerEntryIpAddresses returns an array containing
  691. // all stored server IP addresses.
  692. func GetServerEntryIpAddresses() (ipAddresses []string, err error) {
  693. checkInitDataStore()
  694. ipAddresses = make([]string, 0)
  695. err = scanServerEntries(func(serverEntry *protocol.ServerEntry) {
  696. ipAddresses = append(ipAddresses, serverEntry.IpAddress)
  697. })
  698. if err != nil {
  699. return nil, common.ContextError(err)
  700. }
  701. return ipAddresses, nil
  702. }
  703. // SetSplitTunnelRoutes updates the cached routes data for
  704. // the given region. The associated etag is also stored and
  705. // used to make efficient web requests for updates to the data.
  706. func SetSplitTunnelRoutes(region, etag string, data []byte) error {
  707. checkInitDataStore()
  708. err := singleton.db.Update(func(tx *bolt.Tx) error {
  709. bucket := tx.Bucket([]byte(splitTunnelRouteETagsBucket))
  710. err := bucket.Put([]byte(region), []byte(etag))
  711. bucket = tx.Bucket([]byte(splitTunnelRouteDataBucket))
  712. err = bucket.Put([]byte(region), data)
  713. return err
  714. })
  715. if err != nil {
  716. return common.ContextError(err)
  717. }
  718. return nil
  719. }
  720. // GetSplitTunnelRoutesETag retrieves the etag for cached routes
  721. // data for the specified region. If not found, it returns an empty string value.
  722. func GetSplitTunnelRoutesETag(region string) (etag string, err error) {
  723. checkInitDataStore()
  724. err = singleton.db.View(func(tx *bolt.Tx) error {
  725. bucket := tx.Bucket([]byte(splitTunnelRouteETagsBucket))
  726. etag = string(bucket.Get([]byte(region)))
  727. return nil
  728. })
  729. if err != nil {
  730. return "", common.ContextError(err)
  731. }
  732. return etag, nil
  733. }
  734. // GetSplitTunnelRoutesData retrieves the cached routes data
  735. // for the specified region. If not found, it returns a nil value.
  736. func GetSplitTunnelRoutesData(region string) (data []byte, err error) {
  737. checkInitDataStore()
  738. err = singleton.db.View(func(tx *bolt.Tx) error {
  739. bucket := tx.Bucket([]byte(splitTunnelRouteDataBucket))
  740. value := bucket.Get([]byte(region))
  741. if value != nil {
  742. // Must make a copy as slice is only valid within transaction.
  743. data = make([]byte, len(value))
  744. copy(data, value)
  745. }
  746. return nil
  747. })
  748. if err != nil {
  749. return nil, common.ContextError(err)
  750. }
  751. return data, nil
  752. }
  753. // SetUrlETag stores an ETag for the specfied URL.
  754. // Note: input URL is treated as a string, and is not
  755. // encoded or decoded or otherwise canonicalized.
  756. func SetUrlETag(url, etag string) error {
  757. checkInitDataStore()
  758. err := singleton.db.Update(func(tx *bolt.Tx) error {
  759. bucket := tx.Bucket([]byte(urlETagsBucket))
  760. err := bucket.Put([]byte(url), []byte(etag))
  761. return err
  762. })
  763. if err != nil {
  764. return common.ContextError(err)
  765. }
  766. return nil
  767. }
  768. // GetUrlETag retrieves a previously stored an ETag for the
  769. // specfied URL. If not found, it returns an empty string value.
  770. func GetUrlETag(url string) (etag string, err error) {
  771. checkInitDataStore()
  772. err = singleton.db.View(func(tx *bolt.Tx) error {
  773. bucket := tx.Bucket([]byte(urlETagsBucket))
  774. etag = string(bucket.Get([]byte(url)))
  775. return nil
  776. })
  777. if err != nil {
  778. return "", common.ContextError(err)
  779. }
  780. return etag, nil
  781. }
  782. // SetKeyValue stores a key/value pair.
  783. func SetKeyValue(key, value string) error {
  784. checkInitDataStore()
  785. err := singleton.db.Update(func(tx *bolt.Tx) error {
  786. bucket := tx.Bucket([]byte(keyValueBucket))
  787. err := bucket.Put([]byte(key), []byte(value))
  788. return err
  789. })
  790. if err != nil {
  791. return common.ContextError(err)
  792. }
  793. return nil
  794. }
  795. // GetKeyValue retrieves the value for a given key. If not found,
  796. // it returns an empty string value.
  797. func GetKeyValue(key string) (value string, err error) {
  798. checkInitDataStore()
  799. err = singleton.db.View(func(tx *bolt.Tx) error {
  800. bucket := tx.Bucket([]byte(keyValueBucket))
  801. value = string(bucket.Get([]byte(key)))
  802. return nil
  803. })
  804. if err != nil {
  805. return "", common.ContextError(err)
  806. }
  807. return value, nil
  808. }
  809. // Persistent stat records in the persistentStatStateUnreported
  810. // state are available for take out.
  811. //
  812. // Records in the persistentStatStateReporting have been taken
  813. // out and are pending either deletion (for a successful request)
  814. // or change to StateUnreported (for a failed request).
  815. //
  816. // All persistent stat records are reverted to StateUnreported
  817. // when the datastore is initialized at start up.
  818. var persistentStatStateUnreported = []byte("0")
  819. var persistentStatStateReporting = []byte("1")
  820. var persistentStatTypes = []string{
  821. PERSISTENT_STAT_TYPE_REMOTE_SERVER_LIST,
  822. PERSISTENT_STAT_TYPE_TUNNEL,
  823. }
  824. // StorePersistentStat adds a new persistent stat record, which
  825. // is set to StateUnreported and is an immediate candidate for
  826. // reporting.
  827. //
  828. // The stat is a JSON byte array containing fields as
  829. // required by the Psiphon server API. It's assumed that the
  830. // JSON value contains enough unique information for the value to
  831. // function as a key in the key/value datastore. This assumption
  832. // is currently satisfied by the fields sessionId + tunnelNumber
  833. // for tunnel stats, and URL + ETag for remote server list stats.
  834. func StorePersistentStat(statType string, stat []byte) error {
  835. checkInitDataStore()
  836. if !common.Contains(persistentStatTypes, statType) {
  837. return common.ContextError(fmt.Errorf("invalid persistent stat type: %s", statType))
  838. }
  839. err := singleton.db.Update(func(tx *bolt.Tx) error {
  840. bucket := tx.Bucket([]byte(statType))
  841. err := bucket.Put(stat, persistentStatStateUnreported)
  842. return err
  843. })
  844. if err != nil {
  845. return common.ContextError(err)
  846. }
  847. return nil
  848. }
  849. // CountUnreportedPersistentStats returns the number of persistent
  850. // stat records in StateUnreported.
  851. func CountUnreportedPersistentStats() int {
  852. checkInitDataStore()
  853. unreported := 0
  854. err := singleton.db.View(func(tx *bolt.Tx) error {
  855. for _, statType := range persistentStatTypes {
  856. bucket := tx.Bucket([]byte(statType))
  857. cursor := bucket.Cursor()
  858. for key, value := cursor.First(); key != nil; key, value = cursor.Next() {
  859. if 0 == bytes.Compare(value, persistentStatStateUnreported) {
  860. unreported++
  861. break
  862. }
  863. }
  864. }
  865. return nil
  866. })
  867. if err != nil {
  868. NoticeAlert("CountUnreportedPersistentStats failed: %s", err)
  869. return 0
  870. }
  871. return unreported
  872. }
  873. // TakeOutUnreportedPersistentStats returns up to maxCount persistent
  874. // stats records that are in StateUnreported. The records are set to
  875. // StateReporting. If the records are successfully reported, clear them
  876. // with ClearReportedPersistentStats. If the records are not successfully
  877. // reported, restore them with PutBackUnreportedPersistentStats.
  878. func TakeOutUnreportedPersistentStats(maxCount int) (map[string][][]byte, error) {
  879. checkInitDataStore()
  880. stats := make(map[string][][]byte)
  881. err := singleton.db.Update(func(tx *bolt.Tx) error {
  882. count := 0
  883. for _, statType := range persistentStatTypes {
  884. stats[statType] = make([][]byte, 0)
  885. bucket := tx.Bucket([]byte(statType))
  886. cursor := bucket.Cursor()
  887. for key, value := cursor.First(); key != nil; key, value = cursor.Next() {
  888. if count >= maxCount {
  889. break
  890. }
  891. // Perform a test JSON unmarshaling. In case of data corruption or a bug,
  892. // skip the record.
  893. var jsonData interface{}
  894. err := json.Unmarshal(key, &jsonData)
  895. if err != nil {
  896. NoticeAlert(
  897. "Invalid key in TakeOutUnreportedPersistentStats: %s: %s",
  898. string(key), err)
  899. continue
  900. }
  901. if 0 == bytes.Compare(value, persistentStatStateUnreported) {
  902. // Must make a copy as slice is only valid within transaction.
  903. data := make([]byte, len(key))
  904. copy(data, key)
  905. stats[statType] = append(stats[statType], data)
  906. count += 1
  907. }
  908. }
  909. for _, key := range stats[statType] {
  910. err := bucket.Put(key, persistentStatStateReporting)
  911. if err != nil {
  912. return err
  913. }
  914. }
  915. }
  916. return nil
  917. })
  918. if err != nil {
  919. return nil, common.ContextError(err)
  920. }
  921. return stats, nil
  922. }
  923. // PutBackUnreportedPersistentStats restores a list of persistent
  924. // stat records to StateUnreported.
  925. func PutBackUnreportedPersistentStats(stats map[string][][]byte) error {
  926. checkInitDataStore()
  927. err := singleton.db.Update(func(tx *bolt.Tx) error {
  928. for _, statType := range persistentStatTypes {
  929. bucket := tx.Bucket([]byte(statType))
  930. for _, key := range stats[statType] {
  931. err := bucket.Put(key, persistentStatStateUnreported)
  932. if err != nil {
  933. return err
  934. }
  935. }
  936. }
  937. return nil
  938. })
  939. if err != nil {
  940. return common.ContextError(err)
  941. }
  942. return nil
  943. }
  944. // ClearReportedPersistentStats deletes a list of persistent
  945. // stat records that were successfully reported.
  946. func ClearReportedPersistentStats(stats map[string][][]byte) error {
  947. checkInitDataStore()
  948. err := singleton.db.Update(func(tx *bolt.Tx) error {
  949. for _, statType := range persistentStatTypes {
  950. bucket := tx.Bucket([]byte(statType))
  951. for _, key := range stats[statType] {
  952. err := bucket.Delete(key)
  953. if err != nil {
  954. return err
  955. }
  956. }
  957. }
  958. return nil
  959. })
  960. if err != nil {
  961. return common.ContextError(err)
  962. }
  963. return nil
  964. }
  965. // resetAllPersistentStatsToUnreported sets all persistent stat
  966. // records to StateUnreported. This reset is called when the
  967. // datastore is initialized at start up, as we do not know if
  968. // persistent records in StateReporting were reported or not.
  969. func resetAllPersistentStatsToUnreported() error {
  970. checkInitDataStore()
  971. err := singleton.db.Update(func(tx *bolt.Tx) error {
  972. for _, statType := range persistentStatTypes {
  973. bucket := tx.Bucket([]byte(statType))
  974. resetKeys := make([][]byte, 0)
  975. cursor := bucket.Cursor()
  976. for key, _ := cursor.First(); key != nil; key, _ = cursor.Next() {
  977. resetKeys = append(resetKeys, key)
  978. }
  979. // TODO: data mutation is done outside cursor. Is this
  980. // strictly necessary in this case? As is, this means
  981. // all stats need to be loaded into memory at once.
  982. // https://godoc.org/github.com/boltdb/bolt#Cursor
  983. for _, key := range resetKeys {
  984. err := bucket.Put(key, persistentStatStateUnreported)
  985. if err != nil {
  986. return err
  987. }
  988. }
  989. }
  990. return nil
  991. })
  992. if err != nil {
  993. return common.ContextError(err)
  994. }
  995. return nil
  996. }
  997. // CountSLOKs returns the total number of SLOK records.
  998. func CountSLOKs() int {
  999. checkInitDataStore()
  1000. count := 0
  1001. err := singleton.db.View(func(tx *bolt.Tx) error {
  1002. bucket := tx.Bucket([]byte(slokBucket))
  1003. cursor := bucket.Cursor()
  1004. for key, _ := cursor.First(); key != nil; key, _ = cursor.Next() {
  1005. count++
  1006. }
  1007. return nil
  1008. })
  1009. if err != nil {
  1010. NoticeAlert("CountSLOKs failed: %s", err)
  1011. return 0
  1012. }
  1013. return count
  1014. }
  1015. // DeleteSLOKs deletes all SLOK records.
  1016. func DeleteSLOKs() error {
  1017. checkInitDataStore()
  1018. err := singleton.db.Update(func(tx *bolt.Tx) error {
  1019. bucket := tx.Bucket([]byte(slokBucket))
  1020. return bucket.ForEach(
  1021. func(id, _ []byte) error {
  1022. return bucket.Delete(id)
  1023. })
  1024. })
  1025. if err != nil {
  1026. return common.ContextError(err)
  1027. }
  1028. return nil
  1029. }
  1030. // SetSLOK stores a SLOK key, referenced by its ID. The bool
  1031. // return value indicates whether the SLOK was already stored.
  1032. func SetSLOK(id, key []byte) (bool, error) {
  1033. checkInitDataStore()
  1034. var duplicate bool
  1035. err := singleton.db.Update(func(tx *bolt.Tx) error {
  1036. bucket := tx.Bucket([]byte(slokBucket))
  1037. duplicate = bucket.Get(id) != nil
  1038. err := bucket.Put([]byte(id), []byte(key))
  1039. return err
  1040. })
  1041. if err != nil {
  1042. return false, common.ContextError(err)
  1043. }
  1044. return duplicate, nil
  1045. }
  1046. // GetSLOK returns a SLOK key for the specified ID. The return
  1047. // value is nil if the SLOK is not found.
  1048. func GetSLOK(id []byte) (key []byte, err error) {
  1049. checkInitDataStore()
  1050. err = singleton.db.View(func(tx *bolt.Tx) error {
  1051. bucket := tx.Bucket([]byte(slokBucket))
  1052. key = bucket.Get(id)
  1053. return nil
  1054. })
  1055. if err != nil {
  1056. return nil, common.ContextError(err)
  1057. }
  1058. return key, nil
  1059. }