dataStore.go 32 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112
  1. /*
  2. * Copyright (c) 2015, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package psiphon
  20. import (
  21. "bytes"
  22. "encoding/json"
  23. "errors"
  24. "fmt"
  25. "math/rand"
  26. "os"
  27. "path/filepath"
  28. "strings"
  29. "sync"
  30. "time"
  31. "github.com/Psiphon-Inc/bolt"
  32. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  33. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
  34. )
  35. // The BoltDB dataStore implementation is an alternative to the sqlite3-based
  36. // implementation in dataStore.go. Both implementations have the same interface.
  37. //
  38. // BoltDB is pure Go, and is intended to be used in cases where we have trouble
  39. // building sqlite3/CGO (e.g., currently go mobile due to
  40. // https://github.com/mattn/go-sqlite3/issues/201), and perhaps ultimately as
  41. // the primary dataStore implementation.
  42. //
  43. type dataStore struct {
  44. init sync.Once
  45. db *bolt.DB
  46. }
  47. const (
  48. serverEntriesBucket = "serverEntries"
  49. rankedServerEntriesBucket = "rankedServerEntries"
  50. rankedServerEntriesKey = "rankedServerEntries"
  51. splitTunnelRouteETagsBucket = "splitTunnelRouteETags"
  52. splitTunnelRouteDataBucket = "splitTunnelRouteData"
  53. urlETagsBucket = "urlETags"
  54. keyValueBucket = "keyValues"
  55. tunnelStatsBucket = "tunnelStats"
  56. remoteServerListStatsBucket = "remoteServerListStats"
  57. slokBucket = "SLOKs"
  58. rankedServerEntryCount = 100
  59. )
  60. const (
  61. DATA_STORE_LAST_CONNECTED_KEY = "lastConnected"
  62. DATA_STORE_OSL_REGISTRY_KEY = "OSLRegistry"
  63. PERSISTENT_STAT_TYPE_TUNNEL = tunnelStatsBucket
  64. PERSISTENT_STAT_TYPE_REMOTE_SERVER_LIST = remoteServerListStatsBucket
  65. )
  66. var singleton dataStore
  67. // InitDataStore initializes the singleton instance of dataStore. This
  68. // function uses a sync.Once and is safe for use by concurrent goroutines.
  69. // The underlying sql.DB connection pool is also safe.
  70. //
  71. // Note: the sync.Once was more useful when initDataStore was private and
  72. // called on-demand by the public functions below. Now we require an explicit
  73. // InitDataStore() call with the filename passed in. The on-demand calls
  74. // have been replaced by checkInitDataStore() to assert that Init was called.
  75. func InitDataStore(config *Config) (err error) {
  76. singleton.init.Do(func() {
  77. // Need to gather the list of migratable server entries before
  78. // initializing the boltdb store (as prepareMigrationEntries
  79. // checks for the existence of the bolt db file)
  80. migratableServerEntries := prepareMigrationEntries(config)
  81. filename := filepath.Join(config.DataStoreDirectory, DATA_STORE_FILENAME)
  82. var db *bolt.DB
  83. db, err = bolt.Open(filename, 0600, &bolt.Options{Timeout: 1 * time.Second})
  84. // The datastore file may be corrupt, so attempt to delete and try again
  85. if err != nil {
  86. NoticeAlert("retry on initDataStore error: %s", err)
  87. os.Remove(filename)
  88. db, err = bolt.Open(filename, 0600, &bolt.Options{Timeout: 1 * time.Second})
  89. }
  90. if err != nil {
  91. // Note: intending to set the err return value for InitDataStore
  92. err = fmt.Errorf("initDataStore failed to open database: %s", err)
  93. return
  94. }
  95. err = db.Update(func(tx *bolt.Tx) error {
  96. requiredBuckets := []string{
  97. serverEntriesBucket,
  98. rankedServerEntriesBucket,
  99. splitTunnelRouteETagsBucket,
  100. splitTunnelRouteDataBucket,
  101. urlETagsBucket,
  102. keyValueBucket,
  103. tunnelStatsBucket,
  104. remoteServerListStatsBucket,
  105. slokBucket,
  106. }
  107. for _, bucket := range requiredBuckets {
  108. _, err := tx.CreateBucketIfNotExists([]byte(bucket))
  109. if err != nil {
  110. return err
  111. }
  112. }
  113. return nil
  114. })
  115. if err != nil {
  116. err = fmt.Errorf("initDataStore failed to create buckets: %s", err)
  117. return
  118. }
  119. // Run consistency checks on datastore and emit errors for diagnostics purposes
  120. // We assume this will complete quickly for typical size Psiphon datastores.
  121. db.View(func(tx *bolt.Tx) error {
  122. err := <-tx.Check()
  123. if err != nil {
  124. NoticeAlert("boltdb Check(): %s", err)
  125. }
  126. return nil
  127. })
  128. singleton.db = db
  129. // The migrateServerEntries function requires the data store is
  130. // initialized prior to execution so that migrated entries can be stored
  131. if len(migratableServerEntries) > 0 {
  132. migrateEntries(migratableServerEntries, filepath.Join(config.DataStoreDirectory, LEGACY_DATA_STORE_FILENAME))
  133. }
  134. resetAllPersistentStatsToUnreported()
  135. })
  136. return err
  137. }
  138. func checkInitDataStore() {
  139. if singleton.db == nil {
  140. panic("checkInitDataStore: datastore not initialized")
  141. }
  142. }
  143. // StoreServerEntry adds the server entry to the data store.
  144. // A newly stored (or re-stored) server entry is assigned the next-to-top
  145. // rank for iteration order (the previous top ranked entry is promoted). The
  146. // purpose of inserting at next-to-top is to keep the last selected server
  147. // as the top ranked server.
  148. // When replaceIfExists is true, an existing server entry record is
  149. // overwritten; otherwise, the existing record is unchanged.
  150. // If the server entry data is malformed, an alert notice is issued and
  151. // the entry is skipped; no error is returned.
  152. func StoreServerEntry(serverEntry *protocol.ServerEntry, replaceIfExists bool) error {
  153. checkInitDataStore()
  154. // Server entries should already be validated before this point,
  155. // so instead of skipping we fail with an error.
  156. err := protocol.ValidateServerEntry(serverEntry)
  157. if err != nil {
  158. return common.ContextError(errors.New("invalid server entry"))
  159. }
  160. // BoltDB implementation note:
  161. // For simplicity, we don't maintain indexes on server entry
  162. // region or supported protocols. Instead, we perform full-bucket
  163. // scans with a filter. With a small enough database (thousands or
  164. // even tens of thousand of server entries) and common enough
  165. // values (e.g., many servers support all protocols), performance
  166. // is expected to be acceptable.
  167. err = singleton.db.Update(func(tx *bolt.Tx) error {
  168. serverEntries := tx.Bucket([]byte(serverEntriesBucket))
  169. // Check not only that the entry exists, but is valid. This
  170. // will replace in the rare case where the data is corrupt.
  171. existingServerEntryValid := false
  172. existingData := serverEntries.Get([]byte(serverEntry.IpAddress))
  173. if existingData != nil {
  174. existingServerEntry := new(protocol.ServerEntry)
  175. if json.Unmarshal(existingData, existingServerEntry) == nil {
  176. existingServerEntryValid = true
  177. }
  178. }
  179. if existingServerEntryValid && !replaceIfExists {
  180. // Disabling this notice, for now, as it generates too much noise
  181. // in diagnostics with clients that always submit embedded servers
  182. // to the core on each run.
  183. // NoticeInfo("ignored update for server %s", serverEntry.IpAddress)
  184. return nil
  185. }
  186. data, err := json.Marshal(serverEntry)
  187. if err != nil {
  188. return common.ContextError(err)
  189. }
  190. err = serverEntries.Put([]byte(serverEntry.IpAddress), data)
  191. if err != nil {
  192. return common.ContextError(err)
  193. }
  194. err = insertRankedServerEntry(tx, serverEntry.IpAddress, 1)
  195. if err != nil {
  196. return common.ContextError(err)
  197. }
  198. NoticeInfo("updated server %s", serverEntry.IpAddress)
  199. return nil
  200. })
  201. if err != nil {
  202. return common.ContextError(err)
  203. }
  204. return nil
  205. }
  206. // StoreServerEntries shuffles and stores a list of server entries.
  207. // Shuffling is performed on imported server entrues as part of client-side
  208. // load balancing.
  209. // There is an independent transaction for each entry insert/update.
  210. func StoreServerEntries(serverEntries []*protocol.ServerEntry, replaceIfExists bool) error {
  211. checkInitDataStore()
  212. for index := len(serverEntries) - 1; index > 0; index-- {
  213. swapIndex := rand.Intn(index + 1)
  214. serverEntries[index], serverEntries[swapIndex] = serverEntries[swapIndex], serverEntries[index]
  215. }
  216. for _, serverEntry := range serverEntries {
  217. err := StoreServerEntry(serverEntry, replaceIfExists)
  218. if err != nil {
  219. return common.ContextError(err)
  220. }
  221. }
  222. // Since there has possibly been a significant change in the server entries,
  223. // take this opportunity to update the available egress regions.
  224. ReportAvailableRegions()
  225. return nil
  226. }
  227. // PromoteServerEntry assigns the top rank (one more than current
  228. // max rank) to the specified server entry. Server candidates are
  229. // iterated in decending rank order, so this server entry will be
  230. // the first candidate in a subsequent tunnel establishment.
  231. func PromoteServerEntry(ipAddress string) error {
  232. checkInitDataStore()
  233. err := singleton.db.Update(func(tx *bolt.Tx) error {
  234. // Ensure the corresponding entry exists before
  235. // inserting into rank.
  236. bucket := tx.Bucket([]byte(serverEntriesBucket))
  237. data := bucket.Get([]byte(ipAddress))
  238. if data == nil {
  239. NoticeAlert(
  240. "PromoteServerEntry: ignoring unknown server entry: %s",
  241. ipAddress)
  242. return nil
  243. }
  244. return insertRankedServerEntry(tx, ipAddress, 0)
  245. })
  246. if err != nil {
  247. return common.ContextError(err)
  248. }
  249. return nil
  250. }
  251. func getRankedServerEntries(tx *bolt.Tx) ([]string, error) {
  252. bucket := tx.Bucket([]byte(rankedServerEntriesBucket))
  253. data := bucket.Get([]byte(rankedServerEntriesKey))
  254. if data == nil {
  255. return []string{}, nil
  256. }
  257. rankedServerEntries := make([]string, 0)
  258. err := json.Unmarshal(data, &rankedServerEntries)
  259. if err != nil {
  260. return nil, common.ContextError(err)
  261. }
  262. return rankedServerEntries, nil
  263. }
  264. func setRankedServerEntries(tx *bolt.Tx, rankedServerEntries []string) error {
  265. data, err := json.Marshal(rankedServerEntries)
  266. if err != nil {
  267. return common.ContextError(err)
  268. }
  269. bucket := tx.Bucket([]byte(rankedServerEntriesBucket))
  270. err = bucket.Put([]byte(rankedServerEntriesKey), data)
  271. if err != nil {
  272. return common.ContextError(err)
  273. }
  274. return nil
  275. }
  276. func insertRankedServerEntry(tx *bolt.Tx, serverEntryId string, position int) error {
  277. rankedServerEntries, err := getRankedServerEntries(tx)
  278. if err != nil {
  279. return common.ContextError(err)
  280. }
  281. // BoltDB implementation note:
  282. // For simplicity, we store the ranked server ids in an array serialized to
  283. // a single key value. To ensure this value doesn't grow without bound,
  284. // it's capped at rankedServerEntryCount. For now, this cap should be large
  285. // enough to meet the shuffleHeadLength = config.TunnelPoolSize criteria, for
  286. // any reasonable configuration of config.TunnelPoolSize.
  287. // Using: https://github.com/golang/go/wiki/SliceTricks
  288. // When serverEntryId is already ranked, remove it first to avoid duplicates
  289. for i, rankedServerEntryId := range rankedServerEntries {
  290. if rankedServerEntryId == serverEntryId {
  291. rankedServerEntries = append(
  292. rankedServerEntries[:i], rankedServerEntries[i+1:]...)
  293. break
  294. }
  295. }
  296. // SliceTricks insert, with length cap enforced
  297. if len(rankedServerEntries) < rankedServerEntryCount {
  298. rankedServerEntries = append(rankedServerEntries, "")
  299. }
  300. if position >= len(rankedServerEntries) {
  301. position = len(rankedServerEntries) - 1
  302. }
  303. copy(rankedServerEntries[position+1:], rankedServerEntries[position:])
  304. rankedServerEntries[position] = serverEntryId
  305. err = setRankedServerEntries(tx, rankedServerEntries)
  306. if err != nil {
  307. return common.ContextError(err)
  308. }
  309. return nil
  310. }
  311. func serverEntrySupportsProtocol(serverEntry *protocol.ServerEntry, protocol string) bool {
  312. // Note: for meek, the capabilities are FRONTED-MEEK and UNFRONTED-MEEK
  313. // and the additonal OSSH service is assumed to be available internally.
  314. requiredCapability := strings.TrimSuffix(protocol, "-OSSH")
  315. return common.Contains(serverEntry.Capabilities, requiredCapability)
  316. }
  317. // ServerEntryIterator is used to iterate over
  318. // stored server entries in rank order.
  319. type ServerEntryIterator struct {
  320. region string
  321. protocol string
  322. shuffleHeadLength int
  323. serverEntryIds []string
  324. serverEntryIndex int
  325. isTargetServerEntryIterator bool
  326. hasNextTargetServerEntry bool
  327. targetServerEntry *protocol.ServerEntry
  328. }
  329. // NewServerEntryIterator creates a new ServerEntryIterator
  330. func NewServerEntryIterator(config *Config) (iterator *ServerEntryIterator, err error) {
  331. // When configured, this target server entry is the only candidate
  332. if config.TargetServerEntry != "" {
  333. return newTargetServerEntryIterator(config)
  334. }
  335. checkInitDataStore()
  336. iterator = &ServerEntryIterator{
  337. region: config.EgressRegion,
  338. protocol: config.TunnelProtocol,
  339. shuffleHeadLength: config.TunnelPoolSize,
  340. isTargetServerEntryIterator: false,
  341. }
  342. err = iterator.Reset()
  343. if err != nil {
  344. return nil, err
  345. }
  346. return iterator, nil
  347. }
  348. // newTargetServerEntryIterator is a helper for initializing the TargetServerEntry case
  349. func newTargetServerEntryIterator(config *Config) (iterator *ServerEntryIterator, err error) {
  350. serverEntry, err := protocol.DecodeServerEntry(
  351. config.TargetServerEntry, common.GetCurrentTimestamp(), protocol.SERVER_ENTRY_SOURCE_TARGET)
  352. if err != nil {
  353. return nil, err
  354. }
  355. if config.EgressRegion != "" && serverEntry.Region != config.EgressRegion {
  356. return nil, errors.New("TargetServerEntry does not support EgressRegion")
  357. }
  358. if config.TunnelProtocol != "" {
  359. // Note: same capability/protocol mapping as in StoreServerEntry
  360. requiredCapability := strings.TrimSuffix(config.TunnelProtocol, "-OSSH")
  361. if !common.Contains(serverEntry.Capabilities, requiredCapability) {
  362. return nil, errors.New("TargetServerEntry does not support TunnelProtocol")
  363. }
  364. }
  365. iterator = &ServerEntryIterator{
  366. isTargetServerEntryIterator: true,
  367. hasNextTargetServerEntry: true,
  368. targetServerEntry: serverEntry,
  369. }
  370. NoticeInfo("using TargetServerEntry: %s", serverEntry.IpAddress)
  371. return iterator, nil
  372. }
  373. // Reset a NewServerEntryIterator to the start of its cycle. The next
  374. // call to Next will return the first server entry.
  375. func (iterator *ServerEntryIterator) Reset() error {
  376. iterator.Close()
  377. if iterator.isTargetServerEntryIterator {
  378. iterator.hasNextTargetServerEntry = true
  379. return nil
  380. }
  381. count := CountServerEntries(iterator.region, iterator.protocol)
  382. NoticeCandidateServers(iterator.region, iterator.protocol, count)
  383. // This query implements the Psiphon server candidate selection
  384. // algorithm: the first TunnelPoolSize server candidates are in rank
  385. // (priority) order, to favor previously successful servers; then the
  386. // remaining long tail is shuffled to raise up less recent candidates.
  387. // BoltDB implementation note:
  388. // We don't keep a transaction open for the duration of the iterator
  389. // because this would expose the following semantics to consumer code:
  390. //
  391. // Read-only transactions and read-write transactions ... generally
  392. // shouldn't be opened simultaneously in the same goroutine. This can
  393. // cause a deadlock as the read-write transaction needs to periodically
  394. // re-map the data file but it cannot do so while a read-only
  395. // transaction is open.
  396. // (https://github.com/boltdb/bolt)
  397. //
  398. // So the underlying serverEntriesBucket could change after the serverEntryIds
  399. // list is built.
  400. var serverEntryIds []string
  401. err := singleton.db.View(func(tx *bolt.Tx) error {
  402. var err error
  403. serverEntryIds, err = getRankedServerEntries(tx)
  404. if err != nil {
  405. return err
  406. }
  407. skipServerEntryIds := make(map[string]bool)
  408. for _, serverEntryId := range serverEntryIds {
  409. skipServerEntryIds[serverEntryId] = true
  410. }
  411. bucket := tx.Bucket([]byte(serverEntriesBucket))
  412. cursor := bucket.Cursor()
  413. for key, _ := cursor.Last(); key != nil; key, _ = cursor.Prev() {
  414. serverEntryId := string(key)
  415. if _, ok := skipServerEntryIds[serverEntryId]; ok {
  416. continue
  417. }
  418. serverEntryIds = append(serverEntryIds, serverEntryId)
  419. }
  420. return nil
  421. })
  422. if err != nil {
  423. return common.ContextError(err)
  424. }
  425. for i := len(serverEntryIds) - 1; i > iterator.shuffleHeadLength-1; i-- {
  426. j := rand.Intn(i+1-iterator.shuffleHeadLength) + iterator.shuffleHeadLength
  427. serverEntryIds[i], serverEntryIds[j] = serverEntryIds[j], serverEntryIds[i]
  428. }
  429. iterator.serverEntryIds = serverEntryIds
  430. iterator.serverEntryIndex = 0
  431. return nil
  432. }
  433. // Close cleans up resources associated with a ServerEntryIterator.
  434. func (iterator *ServerEntryIterator) Close() {
  435. iterator.serverEntryIds = nil
  436. iterator.serverEntryIndex = 0
  437. }
  438. // Next returns the next server entry, by rank, for a ServerEntryIterator.
  439. // Returns nil with no error when there is no next item.
  440. func (iterator *ServerEntryIterator) Next() (serverEntry *protocol.ServerEntry, err error) {
  441. defer func() {
  442. if err != nil {
  443. iterator.Close()
  444. }
  445. }()
  446. if iterator.isTargetServerEntryIterator {
  447. if iterator.hasNextTargetServerEntry {
  448. iterator.hasNextTargetServerEntry = false
  449. return MakeCompatibleServerEntry(iterator.targetServerEntry), nil
  450. }
  451. return nil, nil
  452. }
  453. // There are no region/protocol indexes for the server entries bucket.
  454. // Loop until we have the next server entry that matches the iterator
  455. // filter requirements.
  456. for {
  457. if iterator.serverEntryIndex >= len(iterator.serverEntryIds) {
  458. // There is no next item
  459. return nil, nil
  460. }
  461. serverEntryId := iterator.serverEntryIds[iterator.serverEntryIndex]
  462. iterator.serverEntryIndex += 1
  463. var data []byte
  464. err = singleton.db.View(func(tx *bolt.Tx) error {
  465. bucket := tx.Bucket([]byte(serverEntriesBucket))
  466. value := bucket.Get([]byte(serverEntryId))
  467. if value != nil {
  468. // Must make a copy as slice is only valid within transaction.
  469. data = make([]byte, len(value))
  470. copy(data, value)
  471. }
  472. return nil
  473. })
  474. if err != nil {
  475. return nil, common.ContextError(err)
  476. }
  477. if data == nil {
  478. // In case of data corruption or a bug causing this condition,
  479. // do not stop iterating.
  480. NoticeAlert("ServerEntryIterator.Next: unexpected missing server entry: %s", serverEntryId)
  481. continue
  482. }
  483. serverEntry = new(protocol.ServerEntry)
  484. err = json.Unmarshal(data, serverEntry)
  485. if err != nil {
  486. // In case of data corruption or a bug causing this condition,
  487. // do not stop iterating.
  488. NoticeAlert("ServerEntryIterator.Next: %s", common.ContextError(err))
  489. continue
  490. }
  491. // Check filter requirements
  492. if (iterator.region == "" || serverEntry.Region == iterator.region) &&
  493. (iterator.protocol == "" || serverEntrySupportsProtocol(serverEntry, iterator.protocol)) {
  494. break
  495. }
  496. }
  497. return MakeCompatibleServerEntry(serverEntry), nil
  498. }
  499. // MakeCompatibleServerEntry provides backwards compatibility with old server entries
  500. // which have a single meekFrontingDomain and not a meekFrontingAddresses array.
  501. // By copying this one meekFrontingDomain into meekFrontingAddresses, this client effectively
  502. // uses that single value as legacy clients do.
  503. func MakeCompatibleServerEntry(serverEntry *protocol.ServerEntry) *protocol.ServerEntry {
  504. if len(serverEntry.MeekFrontingAddresses) == 0 && serverEntry.MeekFrontingDomain != "" {
  505. serverEntry.MeekFrontingAddresses =
  506. append(serverEntry.MeekFrontingAddresses, serverEntry.MeekFrontingDomain)
  507. }
  508. return serverEntry
  509. }
  510. func scanServerEntries(scanner func(*protocol.ServerEntry)) error {
  511. err := singleton.db.View(func(tx *bolt.Tx) error {
  512. bucket := tx.Bucket([]byte(serverEntriesBucket))
  513. cursor := bucket.Cursor()
  514. for key, value := cursor.First(); key != nil; key, value = cursor.Next() {
  515. serverEntry := new(protocol.ServerEntry)
  516. err := json.Unmarshal(value, serverEntry)
  517. if err != nil {
  518. // In case of data corruption or a bug causing this condition,
  519. // do not stop iterating.
  520. NoticeAlert("scanServerEntries: %s", common.ContextError(err))
  521. continue
  522. }
  523. scanner(serverEntry)
  524. }
  525. return nil
  526. })
  527. if err != nil {
  528. return common.ContextError(err)
  529. }
  530. return nil
  531. }
  532. // CountServerEntries returns a count of stored servers for the
  533. // specified region and protocol.
  534. func CountServerEntries(region, tunnelProtocol string) int {
  535. checkInitDataStore()
  536. count := 0
  537. err := scanServerEntries(func(serverEntry *protocol.ServerEntry) {
  538. if (region == "" || serverEntry.Region == region) &&
  539. (tunnelProtocol == "" || serverEntrySupportsProtocol(serverEntry, tunnelProtocol)) {
  540. count += 1
  541. }
  542. })
  543. if err != nil {
  544. NoticeAlert("CountServerEntries failed: %s", err)
  545. return 0
  546. }
  547. return count
  548. }
  549. // ReportAvailableRegions prints a notice with the available egress regions.
  550. // Note that this report ignores config.TunnelProtocol.
  551. func ReportAvailableRegions() {
  552. checkInitDataStore()
  553. regions := make(map[string]bool)
  554. err := scanServerEntries(func(serverEntry *protocol.ServerEntry) {
  555. regions[serverEntry.Region] = true
  556. })
  557. if err != nil {
  558. NoticeAlert("ReportAvailableRegions failed: %s", err)
  559. return
  560. }
  561. regionList := make([]string, 0, len(regions))
  562. for region, _ := range regions {
  563. // Some server entries do not have a region, but it makes no sense to return
  564. // an empty string as an "available region".
  565. if region != "" {
  566. regionList = append(regionList, region)
  567. }
  568. }
  569. NoticeAvailableEgressRegions(regionList)
  570. }
  571. // GetServerEntryIpAddresses returns an array containing
  572. // all stored server IP addresses.
  573. func GetServerEntryIpAddresses() (ipAddresses []string, err error) {
  574. checkInitDataStore()
  575. ipAddresses = make([]string, 0)
  576. err = scanServerEntries(func(serverEntry *protocol.ServerEntry) {
  577. ipAddresses = append(ipAddresses, serverEntry.IpAddress)
  578. })
  579. if err != nil {
  580. return nil, common.ContextError(err)
  581. }
  582. return ipAddresses, nil
  583. }
  584. // SetSplitTunnelRoutes updates the cached routes data for
  585. // the given region. The associated etag is also stored and
  586. // used to make efficient web requests for updates to the data.
  587. func SetSplitTunnelRoutes(region, etag string, data []byte) error {
  588. checkInitDataStore()
  589. err := singleton.db.Update(func(tx *bolt.Tx) error {
  590. bucket := tx.Bucket([]byte(splitTunnelRouteETagsBucket))
  591. err := bucket.Put([]byte(region), []byte(etag))
  592. bucket = tx.Bucket([]byte(splitTunnelRouteDataBucket))
  593. err = bucket.Put([]byte(region), data)
  594. return err
  595. })
  596. if err != nil {
  597. return common.ContextError(err)
  598. }
  599. return nil
  600. }
  601. // GetSplitTunnelRoutesETag retrieves the etag for cached routes
  602. // data for the specified region. If not found, it returns an empty string value.
  603. func GetSplitTunnelRoutesETag(region string) (etag string, err error) {
  604. checkInitDataStore()
  605. err = singleton.db.View(func(tx *bolt.Tx) error {
  606. bucket := tx.Bucket([]byte(splitTunnelRouteETagsBucket))
  607. etag = string(bucket.Get([]byte(region)))
  608. return nil
  609. })
  610. if err != nil {
  611. return "", common.ContextError(err)
  612. }
  613. return etag, nil
  614. }
  615. // GetSplitTunnelRoutesData retrieves the cached routes data
  616. // for the specified region. If not found, it returns a nil value.
  617. func GetSplitTunnelRoutesData(region string) (data []byte, err error) {
  618. checkInitDataStore()
  619. err = singleton.db.View(func(tx *bolt.Tx) error {
  620. bucket := tx.Bucket([]byte(splitTunnelRouteDataBucket))
  621. value := bucket.Get([]byte(region))
  622. if value != nil {
  623. // Must make a copy as slice is only valid within transaction.
  624. data = make([]byte, len(value))
  625. copy(data, value)
  626. }
  627. return nil
  628. })
  629. if err != nil {
  630. return nil, common.ContextError(err)
  631. }
  632. return data, nil
  633. }
  634. // SetUrlETag stores an ETag for the specfied URL.
  635. // Note: input URL is treated as a string, and is not
  636. // encoded or decoded or otherwise canonicalized.
  637. func SetUrlETag(url, etag string) error {
  638. checkInitDataStore()
  639. err := singleton.db.Update(func(tx *bolt.Tx) error {
  640. bucket := tx.Bucket([]byte(urlETagsBucket))
  641. err := bucket.Put([]byte(url), []byte(etag))
  642. return err
  643. })
  644. if err != nil {
  645. return common.ContextError(err)
  646. }
  647. return nil
  648. }
  649. // GetUrlETag retrieves a previously stored an ETag for the
  650. // specfied URL. If not found, it returns an empty string value.
  651. func GetUrlETag(url string) (etag string, err error) {
  652. checkInitDataStore()
  653. err = singleton.db.View(func(tx *bolt.Tx) error {
  654. bucket := tx.Bucket([]byte(urlETagsBucket))
  655. etag = string(bucket.Get([]byte(url)))
  656. return nil
  657. })
  658. if err != nil {
  659. return "", common.ContextError(err)
  660. }
  661. return etag, nil
  662. }
  663. // SetKeyValue stores a key/value pair.
  664. func SetKeyValue(key, value string) error {
  665. checkInitDataStore()
  666. err := singleton.db.Update(func(tx *bolt.Tx) error {
  667. bucket := tx.Bucket([]byte(keyValueBucket))
  668. err := bucket.Put([]byte(key), []byte(value))
  669. return err
  670. })
  671. if err != nil {
  672. return common.ContextError(err)
  673. }
  674. return nil
  675. }
  676. // GetKeyValue retrieves the value for a given key. If not found,
  677. // it returns an empty string value.
  678. func GetKeyValue(key string) (value string, err error) {
  679. checkInitDataStore()
  680. err = singleton.db.View(func(tx *bolt.Tx) error {
  681. bucket := tx.Bucket([]byte(keyValueBucket))
  682. value = string(bucket.Get([]byte(key)))
  683. return nil
  684. })
  685. if err != nil {
  686. return "", common.ContextError(err)
  687. }
  688. return value, nil
  689. }
  690. // Persistent stat records in the persistentStatStateUnreported
  691. // state are available for take out.
  692. //
  693. // Records in the persistentStatStateReporting have been taken
  694. // out and are pending either deletion (for a successful request)
  695. // or change to StateUnreported (for a failed request).
  696. //
  697. // All persistent stat records are reverted to StateUnreported
  698. // when the datastore is initialized at start up.
  699. var persistentStatStateUnreported = []byte("0")
  700. var persistentStatStateReporting = []byte("1")
  701. var persistentStatTypes = []string{
  702. PERSISTENT_STAT_TYPE_REMOTE_SERVER_LIST,
  703. PERSISTENT_STAT_TYPE_TUNNEL,
  704. }
  705. // StorePersistentStats adds a new persistent stat record, which
  706. // is set to StateUnreported and is an immediate candidate for
  707. // reporting.
  708. //
  709. // The stat is a JSON byte array containing fields as
  710. // required by the Psiphon server API. It's assumed that the
  711. // JSON value contains enough unique information for the value to
  712. // function as a key in the key/value datastore. This assumption
  713. // is currently satisfied by the fields sessionId + tunnelNumber
  714. // for tunnel stats, and URL + ETag for remote server list stats.
  715. func StorePersistentStat(statType string, stat []byte) error {
  716. checkInitDataStore()
  717. if !common.Contains(persistentStatTypes, statType) {
  718. return common.ContextError(fmt.Errorf("invalid persistent stat type: %s", statType))
  719. }
  720. err := singleton.db.Update(func(tx *bolt.Tx) error {
  721. bucket := tx.Bucket([]byte(statType))
  722. err := bucket.Put(stat, persistentStatStateUnreported)
  723. return err
  724. })
  725. if err != nil {
  726. return common.ContextError(err)
  727. }
  728. return nil
  729. }
  730. // CountUnreportedPersistentStats returns the number of persistent
  731. // stat records in StateUnreported.
  732. func CountUnreportedPersistentStats() int {
  733. checkInitDataStore()
  734. unreported := 0
  735. err := singleton.db.Update(func(tx *bolt.Tx) error {
  736. for _, statType := range persistentStatTypes {
  737. bucket := tx.Bucket([]byte(statType))
  738. cursor := bucket.Cursor()
  739. for key, value := cursor.First(); key != nil; key, value = cursor.Next() {
  740. if 0 == bytes.Compare(value, persistentStatStateUnreported) {
  741. unreported++
  742. break
  743. }
  744. }
  745. }
  746. return nil
  747. })
  748. if err != nil {
  749. NoticeAlert("CountUnreportedPersistentStats failed: %s", err)
  750. return 0
  751. }
  752. return unreported
  753. }
  754. // TakeOutUnreportedPersistentStats returns up to maxCount persistent
  755. // stats records that are in StateUnreported. The records are set to
  756. // StateReporting. If the records are successfully reported, clear them
  757. // with ClearReportedPersistentStats. If the records are not successfully
  758. // reported, restore them with PutBackUnreportedPersistentStats.
  759. func TakeOutUnreportedPersistentStats(maxCount int) (map[string][][]byte, error) {
  760. checkInitDataStore()
  761. stats := make(map[string][][]byte)
  762. err := singleton.db.Update(func(tx *bolt.Tx) error {
  763. count := 0
  764. for _, statType := range persistentStatTypes {
  765. stats[statType] = make([][]byte, 0)
  766. bucket := tx.Bucket([]byte(statType))
  767. cursor := bucket.Cursor()
  768. for key, value := cursor.First(); key != nil; key, value = cursor.Next() {
  769. if count >= maxCount {
  770. break
  771. }
  772. // Perform a test JSON unmarshaling. In case of data corruption or a bug,
  773. // skip the record.
  774. var jsonData interface{}
  775. err := json.Unmarshal(key, &jsonData)
  776. if err != nil {
  777. NoticeAlert(
  778. "Invalid key in TakeOutUnreportedPersistentStats: %s: %s",
  779. string(key), err)
  780. continue
  781. }
  782. if 0 == bytes.Compare(value, persistentStatStateUnreported) {
  783. // Must make a copy as slice is only valid within transaction.
  784. data := make([]byte, len(key))
  785. copy(data, key)
  786. stats[statType] = append(stats[statType], data)
  787. count += 1
  788. }
  789. }
  790. for _, key := range stats[statType] {
  791. err := bucket.Put(key, persistentStatStateReporting)
  792. if err != nil {
  793. return err
  794. }
  795. }
  796. }
  797. return nil
  798. })
  799. if err != nil {
  800. return nil, common.ContextError(err)
  801. }
  802. return stats, nil
  803. }
  804. // PutBackUnreportedPersistentStats restores a list of persistent
  805. // stat records to StateUnreported.
  806. func PutBackUnreportedPersistentStats(stats map[string][][]byte) error {
  807. checkInitDataStore()
  808. err := singleton.db.Update(func(tx *bolt.Tx) error {
  809. for _, statType := range persistentStatTypes {
  810. bucket := tx.Bucket([]byte(statType))
  811. for _, key := range stats[statType] {
  812. err := bucket.Put(key, persistentStatStateUnreported)
  813. if err != nil {
  814. return err
  815. }
  816. }
  817. }
  818. return nil
  819. })
  820. if err != nil {
  821. return common.ContextError(err)
  822. }
  823. return nil
  824. }
  825. // ClearReportedPersistentStats deletes a list of persistent
  826. // stat records that were successfully reported.
  827. func ClearReportedPersistentStats(stats map[string][][]byte) error {
  828. checkInitDataStore()
  829. err := singleton.db.Update(func(tx *bolt.Tx) error {
  830. for _, statType := range persistentStatTypes {
  831. bucket := tx.Bucket([]byte(statType))
  832. for _, key := range stats[statType] {
  833. err := bucket.Delete(key)
  834. if err != nil {
  835. return err
  836. }
  837. }
  838. }
  839. return nil
  840. })
  841. if err != nil {
  842. return common.ContextError(err)
  843. }
  844. return nil
  845. }
  846. // resetAllPersistentStatsToUnreported sets all persistent stat
  847. // records to StateUnreported. This reset is called when the
  848. // datastore is initialized at start up, as we do not know if
  849. // persistent records in StateReporting were reported or not.
  850. func resetAllPersistentStatsToUnreported() error {
  851. checkInitDataStore()
  852. err := singleton.db.Update(func(tx *bolt.Tx) error {
  853. for _, statType := range persistentStatTypes {
  854. bucket := tx.Bucket([]byte(statType))
  855. resetKeys := make([][]byte, 0)
  856. cursor := bucket.Cursor()
  857. for key, _ := cursor.First(); key != nil; key, _ = cursor.Next() {
  858. resetKeys = append(resetKeys, key)
  859. }
  860. // TODO: data mutation is done outside cursor. Is this
  861. // strictly necessary in this case? As is, this means
  862. // all stats need to be loaded into memory at once.
  863. // https://godoc.org/github.com/boltdb/bolt#Cursor
  864. for _, key := range resetKeys {
  865. err := bucket.Put(key, persistentStatStateUnreported)
  866. if err != nil {
  867. return err
  868. }
  869. }
  870. }
  871. return nil
  872. })
  873. if err != nil {
  874. return common.ContextError(err)
  875. }
  876. return nil
  877. }
  878. // DeleteSLOKs deletes all SLOK records.
  879. func DeleteSLOKs() error {
  880. checkInitDataStore()
  881. err := singleton.db.Update(func(tx *bolt.Tx) error {
  882. bucket := tx.Bucket([]byte(slokBucket))
  883. return bucket.ForEach(
  884. func(id, _ []byte) error {
  885. return bucket.Delete(id)
  886. })
  887. })
  888. if err != nil {
  889. return common.ContextError(err)
  890. }
  891. return nil
  892. }
  893. // SetSLOK stores a SLOK key, referenced by its ID. The bool
  894. // return value indicates whether the SLOK was already stored.
  895. func SetSLOK(id, key []byte) (bool, error) {
  896. checkInitDataStore()
  897. var duplicate bool
  898. err := singleton.db.Update(func(tx *bolt.Tx) error {
  899. bucket := tx.Bucket([]byte(slokBucket))
  900. duplicate = bucket.Get(id) != nil
  901. err := bucket.Put([]byte(id), []byte(key))
  902. return err
  903. })
  904. if err != nil {
  905. return false, common.ContextError(err)
  906. }
  907. return duplicate, nil
  908. }
  909. // GetSLOK returns a SLOK key for the specified ID. The return
  910. // value is nil if the SLOK is not found.
  911. func GetSLOK(id []byte) (key []byte, err error) {
  912. checkInitDataStore()
  913. err = singleton.db.View(func(tx *bolt.Tx) error {
  914. bucket := tx.Bucket([]byte(slokBucket))
  915. key = bucket.Get(id)
  916. return nil
  917. })
  918. if err != nil {
  919. return nil, common.ContextError(err)
  920. }
  921. return key, nil
  922. }