dataStore.go 33 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183
  1. /*
  2. * Copyright (c) 2015, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package psiphon
  20. import (
  21. "bytes"
  22. "encoding/json"
  23. "errors"
  24. "fmt"
  25. "math/rand"
  26. "os"
  27. "path/filepath"
  28. "strings"
  29. "sync"
  30. "time"
  31. "github.com/Psiphon-Inc/bolt"
  32. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  33. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
  34. )
  35. // The BoltDB dataStore implementation is an alternative to the sqlite3-based
  36. // implementation in dataStore.go. Both implementations have the same interface.
  37. //
  38. // BoltDB is pure Go, and is intended to be used in cases where we have trouble
  39. // building sqlite3/CGO (e.g., currently go mobile due to
  40. // https://github.com/mattn/go-sqlite3/issues/201), and perhaps ultimately as
  41. // the primary dataStore implementation.
  42. //
  43. type dataStore struct {
  44. init sync.Once
  45. db *bolt.DB
  46. }
  47. const (
  48. serverEntriesBucket = "serverEntries"
  49. rankedServerEntriesBucket = "rankedServerEntries"
  50. rankedServerEntriesKey = "rankedServerEntries"
  51. splitTunnelRouteETagsBucket = "splitTunnelRouteETags"
  52. splitTunnelRouteDataBucket = "splitTunnelRouteData"
  53. urlETagsBucket = "urlETags"
  54. keyValueBucket = "keyValues"
  55. tunnelStatsBucket = "tunnelStats"
  56. remoteServerListStatsBucket = "remoteServerListStats"
  57. slokBucket = "SLOKs"
  58. rankedServerEntryCount = 100
  59. )
  60. const (
  61. DATA_STORE_LAST_CONNECTED_KEY = "lastConnected"
  62. DATA_STORE_OSL_REGISTRY_KEY = "OSLRegistry"
  63. PERSISTENT_STAT_TYPE_TUNNEL = tunnelStatsBucket
  64. PERSISTENT_STAT_TYPE_REMOTE_SERVER_LIST = remoteServerListStatsBucket
  65. )
  66. var singleton dataStore
  67. // InitDataStore initializes the singleton instance of dataStore. This
  68. // function uses a sync.Once and is safe for use by concurrent goroutines.
  69. // The underlying sql.DB connection pool is also safe.
  70. //
  71. // Note: the sync.Once was more useful when initDataStore was private and
  72. // called on-demand by the public functions below. Now we require an explicit
  73. // InitDataStore() call with the filename passed in. The on-demand calls
  74. // have been replaced by checkInitDataStore() to assert that Init was called.
  75. func InitDataStore(config *Config) (err error) {
  76. singleton.init.Do(func() {
  77. // Need to gather the list of migratable server entries before
  78. // initializing the boltdb store (as prepareMigrationEntries
  79. // checks for the existence of the bolt db file)
  80. migratableServerEntries := prepareMigrationEntries(config)
  81. filename := filepath.Join(config.DataStoreDirectory, DATA_STORE_FILENAME)
  82. var db *bolt.DB
  83. for retry := 0; retry < 3; retry++ {
  84. if retry > 0 {
  85. NoticeAlert("InitDataStore retry: %d", retry)
  86. }
  87. db, err = bolt.Open(filename, 0600, &bolt.Options{Timeout: 1 * time.Second})
  88. // The datastore file may be corrupt, so attempt to delete and try again
  89. if err != nil {
  90. NoticeAlert("bolt.Open error: %s", err)
  91. os.Remove(filename)
  92. continue
  93. }
  94. // Run consistency checks on datastore and emit errors for diagnostics purposes
  95. // We assume this will complete quickly for typical size Psiphon datastores.
  96. err = db.View(func(tx *bolt.Tx) error {
  97. return tx.SynchronousCheck()
  98. })
  99. // The datastore file may be corrupt, so attempt to delete and try again
  100. if err != nil {
  101. NoticeAlert("bolt.SynchronousCheck error: %s", err)
  102. db.Close()
  103. os.Remove(filename)
  104. continue
  105. }
  106. break
  107. }
  108. if err != nil {
  109. // Note: intending to set the err return value for InitDataStore
  110. err = fmt.Errorf("initDataStore failed to open database: %s", err)
  111. return
  112. }
  113. err = db.Update(func(tx *bolt.Tx) error {
  114. requiredBuckets := []string{
  115. serverEntriesBucket,
  116. rankedServerEntriesBucket,
  117. splitTunnelRouteETagsBucket,
  118. splitTunnelRouteDataBucket,
  119. urlETagsBucket,
  120. keyValueBucket,
  121. tunnelStatsBucket,
  122. remoteServerListStatsBucket,
  123. slokBucket,
  124. }
  125. for _, bucket := range requiredBuckets {
  126. _, err := tx.CreateBucketIfNotExists([]byte(bucket))
  127. if err != nil {
  128. return err
  129. }
  130. }
  131. return nil
  132. })
  133. if err != nil {
  134. err = fmt.Errorf("initDataStore failed to create buckets: %s", err)
  135. return
  136. }
  137. singleton.db = db
  138. // The migrateServerEntries function requires the data store is
  139. // initialized prior to execution so that migrated entries can be stored
  140. if len(migratableServerEntries) > 0 {
  141. migrateEntries(migratableServerEntries, filepath.Join(config.DataStoreDirectory, LEGACY_DATA_STORE_FILENAME))
  142. }
  143. resetAllPersistentStatsToUnreported()
  144. })
  145. return err
  146. }
  147. func checkInitDataStore() {
  148. if singleton.db == nil {
  149. panic("checkInitDataStore: datastore not initialized")
  150. }
  151. }
  152. // StoreServerEntry adds the server entry to the data store.
  153. // A newly stored (or re-stored) server entry is assigned the next-to-top
  154. // rank for iteration order (the previous top ranked entry is promoted). The
  155. // purpose of inserting at next-to-top is to keep the last selected server
  156. // as the top ranked server.
  157. // When replaceIfExists is true, an existing server entry record is
  158. // overwritten; otherwise, the existing record is unchanged.
  159. // If the server entry data is malformed, an alert notice is issued and
  160. // the entry is skipped; no error is returned.
  161. func StoreServerEntry(serverEntry *protocol.ServerEntry, replaceIfExists bool) error {
  162. checkInitDataStore()
  163. // Server entries should already be validated before this point,
  164. // so instead of skipping we fail with an error.
  165. err := protocol.ValidateServerEntry(serverEntry)
  166. if err != nil {
  167. return common.ContextError(errors.New("invalid server entry"))
  168. }
  169. // BoltDB implementation note:
  170. // For simplicity, we don't maintain indexes on server entry
  171. // region or supported protocols. Instead, we perform full-bucket
  172. // scans with a filter. With a small enough database (thousands or
  173. // even tens of thousand of server entries) and common enough
  174. // values (e.g., many servers support all protocols), performance
  175. // is expected to be acceptable.
  176. err = singleton.db.Update(func(tx *bolt.Tx) error {
  177. serverEntries := tx.Bucket([]byte(serverEntriesBucket))
  178. // Check not only that the entry exists, but is valid. This
  179. // will replace in the rare case where the data is corrupt.
  180. existingServerEntryValid := false
  181. existingData := serverEntries.Get([]byte(serverEntry.IpAddress))
  182. if existingData != nil {
  183. existingServerEntry := new(protocol.ServerEntry)
  184. if json.Unmarshal(existingData, existingServerEntry) == nil {
  185. existingServerEntryValid = true
  186. }
  187. }
  188. if existingServerEntryValid && !replaceIfExists {
  189. // Disabling this notice, for now, as it generates too much noise
  190. // in diagnostics with clients that always submit embedded servers
  191. // to the core on each run.
  192. // NoticeInfo("ignored update for server %s", serverEntry.IpAddress)
  193. return nil
  194. }
  195. data, err := json.Marshal(serverEntry)
  196. if err != nil {
  197. return common.ContextError(err)
  198. }
  199. err = serverEntries.Put([]byte(serverEntry.IpAddress), data)
  200. if err != nil {
  201. return common.ContextError(err)
  202. }
  203. err = insertRankedServerEntry(tx, serverEntry.IpAddress, 1)
  204. if err != nil {
  205. return common.ContextError(err)
  206. }
  207. NoticeInfo("updated server %s", serverEntry.IpAddress)
  208. return nil
  209. })
  210. if err != nil {
  211. return common.ContextError(err)
  212. }
  213. return nil
  214. }
  215. // StoreServerEntries shuffles and stores a list of server entries.
  216. // Shuffling is performed on imported server entrues as part of client-side
  217. // load balancing.
  218. // There is an independent transaction for each entry insert/update.
  219. func StoreServerEntries(serverEntries []*protocol.ServerEntry, replaceIfExists bool) error {
  220. checkInitDataStore()
  221. for index := len(serverEntries) - 1; index > 0; index-- {
  222. swapIndex := rand.Intn(index + 1)
  223. serverEntries[index], serverEntries[swapIndex] = serverEntries[swapIndex], serverEntries[index]
  224. }
  225. for _, serverEntry := range serverEntries {
  226. err := StoreServerEntry(serverEntry, replaceIfExists)
  227. if err != nil {
  228. return common.ContextError(err)
  229. }
  230. }
  231. // Since there has possibly been a significant change in the server entries,
  232. // take this opportunity to update the available egress regions.
  233. ReportAvailableRegions()
  234. return nil
  235. }
  236. // PromoteServerEntry assigns the top rank (one more than current
  237. // max rank) to the specified server entry. Server candidates are
  238. // iterated in decending rank order, so this server entry will be
  239. // the first candidate in a subsequent tunnel establishment.
  240. func PromoteServerEntry(ipAddress string) error {
  241. checkInitDataStore()
  242. err := singleton.db.Update(func(tx *bolt.Tx) error {
  243. // Ensure the corresponding entry exists before
  244. // inserting into rank.
  245. bucket := tx.Bucket([]byte(serverEntriesBucket))
  246. data := bucket.Get([]byte(ipAddress))
  247. if data == nil {
  248. NoticeAlert(
  249. "PromoteServerEntry: ignoring unknown server entry: %s",
  250. ipAddress)
  251. return nil
  252. }
  253. return insertRankedServerEntry(tx, ipAddress, 0)
  254. })
  255. if err != nil {
  256. return common.ContextError(err)
  257. }
  258. return nil
  259. }
  260. func getRankedServerEntries(tx *bolt.Tx) ([]string, error) {
  261. bucket := tx.Bucket([]byte(rankedServerEntriesBucket))
  262. data := bucket.Get([]byte(rankedServerEntriesKey))
  263. if data == nil {
  264. return []string{}, nil
  265. }
  266. rankedServerEntries := make([]string, 0)
  267. err := json.Unmarshal(data, &rankedServerEntries)
  268. if err != nil {
  269. return nil, common.ContextError(err)
  270. }
  271. return rankedServerEntries, nil
  272. }
  273. func setRankedServerEntries(tx *bolt.Tx, rankedServerEntries []string) error {
  274. data, err := json.Marshal(rankedServerEntries)
  275. if err != nil {
  276. return common.ContextError(err)
  277. }
  278. bucket := tx.Bucket([]byte(rankedServerEntriesBucket))
  279. err = bucket.Put([]byte(rankedServerEntriesKey), data)
  280. if err != nil {
  281. return common.ContextError(err)
  282. }
  283. return nil
  284. }
  285. func insertRankedServerEntry(tx *bolt.Tx, serverEntryId string, position int) error {
  286. rankedServerEntries, err := getRankedServerEntries(tx)
  287. if err != nil {
  288. return common.ContextError(err)
  289. }
  290. // BoltDB implementation note:
  291. // For simplicity, we store the ranked server ids in an array serialized to
  292. // a single key value. To ensure this value doesn't grow without bound,
  293. // it's capped at rankedServerEntryCount. For now, this cap should be large
  294. // enough to meet the shuffleHeadLength = config.TunnelPoolSize criteria, for
  295. // any reasonable configuration of config.TunnelPoolSize.
  296. // Using: https://github.com/golang/go/wiki/SliceTricks
  297. // When serverEntryId is already ranked, remove it first to avoid duplicates
  298. for i, rankedServerEntryId := range rankedServerEntries {
  299. if rankedServerEntryId == serverEntryId {
  300. rankedServerEntries = append(
  301. rankedServerEntries[:i], rankedServerEntries[i+1:]...)
  302. break
  303. }
  304. }
  305. // SliceTricks insert, with length cap enforced
  306. if len(rankedServerEntries) < rankedServerEntryCount {
  307. rankedServerEntries = append(rankedServerEntries, "")
  308. }
  309. if position >= len(rankedServerEntries) {
  310. position = len(rankedServerEntries) - 1
  311. }
  312. copy(rankedServerEntries[position+1:], rankedServerEntries[position:])
  313. rankedServerEntries[position] = serverEntryId
  314. err = setRankedServerEntries(tx, rankedServerEntries)
  315. if err != nil {
  316. return common.ContextError(err)
  317. }
  318. return nil
  319. }
  320. // ServerEntryIterator is used to iterate over
  321. // stored server entries in rank order.
  322. type ServerEntryIterator struct {
  323. region string
  324. protocol string
  325. shuffleHeadLength int
  326. serverEntryIds []string
  327. serverEntryIndex int
  328. isTargetServerEntryIterator bool
  329. hasNextTargetServerEntry bool
  330. targetServerEntry *protocol.ServerEntry
  331. }
  332. // NewServerEntryIterator creates a new ServerEntryIterator
  333. func NewServerEntryIterator(config *Config) (iterator *ServerEntryIterator, err error) {
  334. // When configured, this target server entry is the only candidate
  335. if config.TargetServerEntry != "" {
  336. return newTargetServerEntryIterator(config)
  337. }
  338. checkInitDataStore()
  339. iterator = &ServerEntryIterator{
  340. region: config.EgressRegion,
  341. protocol: config.TunnelProtocol,
  342. shuffleHeadLength: config.TunnelPoolSize,
  343. isTargetServerEntryIterator: false,
  344. }
  345. err = iterator.Reset()
  346. if err != nil {
  347. return nil, err
  348. }
  349. return iterator, nil
  350. }
  351. // newTargetServerEntryIterator is a helper for initializing the TargetServerEntry case
  352. func newTargetServerEntryIterator(config *Config) (iterator *ServerEntryIterator, err error) {
  353. serverEntry, err := protocol.DecodeServerEntry(
  354. config.TargetServerEntry, common.GetCurrentTimestamp(), protocol.SERVER_ENTRY_SOURCE_TARGET)
  355. if err != nil {
  356. return nil, err
  357. }
  358. if config.EgressRegion != "" && serverEntry.Region != config.EgressRegion {
  359. return nil, errors.New("TargetServerEntry does not support EgressRegion")
  360. }
  361. if config.TunnelProtocol != "" {
  362. // Note: same capability/protocol mapping as in StoreServerEntry
  363. requiredCapability := strings.TrimSuffix(config.TunnelProtocol, "-OSSH")
  364. if !common.Contains(serverEntry.Capabilities, requiredCapability) {
  365. return nil, errors.New("TargetServerEntry does not support TunnelProtocol")
  366. }
  367. }
  368. iterator = &ServerEntryIterator{
  369. isTargetServerEntryIterator: true,
  370. hasNextTargetServerEntry: true,
  371. targetServerEntry: serverEntry,
  372. }
  373. NoticeInfo("using TargetServerEntry: %s", serverEntry.IpAddress)
  374. return iterator, nil
  375. }
  376. // Reset a NewServerEntryIterator to the start of its cycle. The next
  377. // call to Next will return the first server entry.
  378. func (iterator *ServerEntryIterator) Reset() error {
  379. iterator.Close()
  380. if iterator.isTargetServerEntryIterator {
  381. iterator.hasNextTargetServerEntry = true
  382. return nil
  383. }
  384. count := CountServerEntries(iterator.region, iterator.protocol)
  385. NoticeCandidateServers(iterator.region, iterator.protocol, count)
  386. // This query implements the Psiphon server candidate selection
  387. // algorithm: the first TunnelPoolSize server candidates are in rank
  388. // (priority) order, to favor previously successful servers; then the
  389. // remaining long tail is shuffled to raise up less recent candidates.
  390. // BoltDB implementation note:
  391. // We don't keep a transaction open for the duration of the iterator
  392. // because this would expose the following semantics to consumer code:
  393. //
  394. // Read-only transactions and read-write transactions ... generally
  395. // shouldn't be opened simultaneously in the same goroutine. This can
  396. // cause a deadlock as the read-write transaction needs to periodically
  397. // re-map the data file but it cannot do so while a read-only
  398. // transaction is open.
  399. // (https://github.com/boltdb/bolt)
  400. //
  401. // So the underlying serverEntriesBucket could change after the serverEntryIds
  402. // list is built.
  403. var serverEntryIds []string
  404. err := singleton.db.View(func(tx *bolt.Tx) error {
  405. var err error
  406. serverEntryIds, err = getRankedServerEntries(tx)
  407. if err != nil {
  408. return err
  409. }
  410. skipServerEntryIds := make(map[string]bool)
  411. for _, serverEntryId := range serverEntryIds {
  412. skipServerEntryIds[serverEntryId] = true
  413. }
  414. bucket := tx.Bucket([]byte(serverEntriesBucket))
  415. cursor := bucket.Cursor()
  416. for key, _ := cursor.Last(); key != nil; key, _ = cursor.Prev() {
  417. serverEntryId := string(key)
  418. if _, ok := skipServerEntryIds[serverEntryId]; ok {
  419. continue
  420. }
  421. serverEntryIds = append(serverEntryIds, serverEntryId)
  422. }
  423. return nil
  424. })
  425. if err != nil {
  426. return common.ContextError(err)
  427. }
  428. for i := len(serverEntryIds) - 1; i > iterator.shuffleHeadLength-1; i-- {
  429. j := rand.Intn(i+1-iterator.shuffleHeadLength) + iterator.shuffleHeadLength
  430. serverEntryIds[i], serverEntryIds[j] = serverEntryIds[j], serverEntryIds[i]
  431. }
  432. iterator.serverEntryIds = serverEntryIds
  433. iterator.serverEntryIndex = 0
  434. return nil
  435. }
  436. // Close cleans up resources associated with a ServerEntryIterator.
  437. func (iterator *ServerEntryIterator) Close() {
  438. iterator.serverEntryIds = nil
  439. iterator.serverEntryIndex = 0
  440. }
  441. // Next returns the next server entry, by rank, for a ServerEntryIterator.
  442. // Returns nil with no error when there is no next item.
  443. func (iterator *ServerEntryIterator) Next() (serverEntry *protocol.ServerEntry, err error) {
  444. defer func() {
  445. if err != nil {
  446. iterator.Close()
  447. }
  448. }()
  449. if iterator.isTargetServerEntryIterator {
  450. if iterator.hasNextTargetServerEntry {
  451. iterator.hasNextTargetServerEntry = false
  452. return MakeCompatibleServerEntry(iterator.targetServerEntry), nil
  453. }
  454. return nil, nil
  455. }
  456. // There are no region/protocol indexes for the server entries bucket.
  457. // Loop until we have the next server entry that matches the iterator
  458. // filter requirements.
  459. for {
  460. if iterator.serverEntryIndex >= len(iterator.serverEntryIds) {
  461. // There is no next item
  462. return nil, nil
  463. }
  464. serverEntryId := iterator.serverEntryIds[iterator.serverEntryIndex]
  465. iterator.serverEntryIndex += 1
  466. var data []byte
  467. err = singleton.db.View(func(tx *bolt.Tx) error {
  468. bucket := tx.Bucket([]byte(serverEntriesBucket))
  469. value := bucket.Get([]byte(serverEntryId))
  470. if value != nil {
  471. // Must make a copy as slice is only valid within transaction.
  472. data = make([]byte, len(value))
  473. copy(data, value)
  474. }
  475. return nil
  476. })
  477. if err != nil {
  478. return nil, common.ContextError(err)
  479. }
  480. if data == nil {
  481. // In case of data corruption or a bug causing this condition,
  482. // do not stop iterating.
  483. NoticeAlert("ServerEntryIterator.Next: unexpected missing server entry: %s", serverEntryId)
  484. continue
  485. }
  486. serverEntry = new(protocol.ServerEntry)
  487. err = json.Unmarshal(data, serverEntry)
  488. if err != nil {
  489. // In case of data corruption or a bug causing this condition,
  490. // do not stop iterating.
  491. NoticeAlert("ServerEntryIterator.Next: %s", common.ContextError(err))
  492. continue
  493. }
  494. // Check filter requirements
  495. if (iterator.region == "" || serverEntry.Region == iterator.region) &&
  496. (iterator.protocol == "" || serverEntry.SupportsProtocol(iterator.protocol)) {
  497. break
  498. }
  499. }
  500. return MakeCompatibleServerEntry(serverEntry), nil
  501. }
  502. // MakeCompatibleServerEntry provides backwards compatibility with old server entries
  503. // which have a single meekFrontingDomain and not a meekFrontingAddresses array.
  504. // By copying this one meekFrontingDomain into meekFrontingAddresses, this client effectively
  505. // uses that single value as legacy clients do.
  506. func MakeCompatibleServerEntry(serverEntry *protocol.ServerEntry) *protocol.ServerEntry {
  507. if len(serverEntry.MeekFrontingAddresses) == 0 && serverEntry.MeekFrontingDomain != "" {
  508. serverEntry.MeekFrontingAddresses =
  509. append(serverEntry.MeekFrontingAddresses, serverEntry.MeekFrontingDomain)
  510. }
  511. return serverEntry
  512. }
  513. func scanServerEntries(scanner func(*protocol.ServerEntry)) error {
  514. err := singleton.db.View(func(tx *bolt.Tx) error {
  515. bucket := tx.Bucket([]byte(serverEntriesBucket))
  516. cursor := bucket.Cursor()
  517. for key, value := cursor.First(); key != nil; key, value = cursor.Next() {
  518. serverEntry := new(protocol.ServerEntry)
  519. err := json.Unmarshal(value, serverEntry)
  520. if err != nil {
  521. // In case of data corruption or a bug causing this condition,
  522. // do not stop iterating.
  523. NoticeAlert("scanServerEntries: %s", common.ContextError(err))
  524. continue
  525. }
  526. scanner(serverEntry)
  527. }
  528. return nil
  529. })
  530. if err != nil {
  531. return common.ContextError(err)
  532. }
  533. return nil
  534. }
  535. // CountServerEntries returns a count of stored servers for the
  536. // specified region and protocol.
  537. func CountServerEntries(region, tunnelProtocol string) int {
  538. checkInitDataStore()
  539. count := 0
  540. err := scanServerEntries(func(serverEntry *protocol.ServerEntry) {
  541. if (region == "" || serverEntry.Region == region) &&
  542. (tunnelProtocol == "" || serverEntry.SupportsProtocol(tunnelProtocol)) {
  543. count += 1
  544. }
  545. })
  546. if err != nil {
  547. NoticeAlert("CountServerEntries failed: %s", err)
  548. return 0
  549. }
  550. return count
  551. }
  552. // CountNonImpairedProtocols returns the number of distinct tunnel
  553. // protocols supported by stored server entries, excluding the
  554. // specified impaired protocols.
  555. func CountNonImpairedProtocols(
  556. region, tunnelProtocol string,
  557. impairedProtocols []string) int {
  558. checkInitDataStore()
  559. distinctProtocols := make(map[string]bool)
  560. err := scanServerEntries(func(serverEntry *protocol.ServerEntry) {
  561. if region == "" || serverEntry.Region == region {
  562. if tunnelProtocol != "" {
  563. if serverEntry.SupportsProtocol(tunnelProtocol) {
  564. distinctProtocols[tunnelProtocol] = true
  565. // Exit early, since only one protocol is enabled
  566. return
  567. }
  568. } else {
  569. for _, protocol := range protocol.SupportedTunnelProtocols {
  570. if serverEntry.SupportsProtocol(protocol) {
  571. distinctProtocols[protocol] = true
  572. }
  573. }
  574. }
  575. }
  576. })
  577. for _, protocol := range impairedProtocols {
  578. delete(distinctProtocols, protocol)
  579. }
  580. if err != nil {
  581. NoticeAlert("CountNonImpairedProtocols failed: %s", err)
  582. return 0
  583. }
  584. return len(distinctProtocols)
  585. }
  586. // ReportAvailableRegions prints a notice with the available egress regions.
  587. // Note that this report ignores config.TunnelProtocol.
  588. func ReportAvailableRegions() {
  589. checkInitDataStore()
  590. regions := make(map[string]bool)
  591. err := scanServerEntries(func(serverEntry *protocol.ServerEntry) {
  592. regions[serverEntry.Region] = true
  593. })
  594. if err != nil {
  595. NoticeAlert("ReportAvailableRegions failed: %s", err)
  596. return
  597. }
  598. regionList := make([]string, 0, len(regions))
  599. for region, _ := range regions {
  600. // Some server entries do not have a region, but it makes no sense to return
  601. // an empty string as an "available region".
  602. if region != "" {
  603. regionList = append(regionList, region)
  604. }
  605. }
  606. NoticeAvailableEgressRegions(regionList)
  607. }
  608. // GetServerEntryIpAddresses returns an array containing
  609. // all stored server IP addresses.
  610. func GetServerEntryIpAddresses() (ipAddresses []string, err error) {
  611. checkInitDataStore()
  612. ipAddresses = make([]string, 0)
  613. err = scanServerEntries(func(serverEntry *protocol.ServerEntry) {
  614. ipAddresses = append(ipAddresses, serverEntry.IpAddress)
  615. })
  616. if err != nil {
  617. return nil, common.ContextError(err)
  618. }
  619. return ipAddresses, nil
  620. }
  621. // SetSplitTunnelRoutes updates the cached routes data for
  622. // the given region. The associated etag is also stored and
  623. // used to make efficient web requests for updates to the data.
  624. func SetSplitTunnelRoutes(region, etag string, data []byte) error {
  625. checkInitDataStore()
  626. err := singleton.db.Update(func(tx *bolt.Tx) error {
  627. bucket := tx.Bucket([]byte(splitTunnelRouteETagsBucket))
  628. err := bucket.Put([]byte(region), []byte(etag))
  629. bucket = tx.Bucket([]byte(splitTunnelRouteDataBucket))
  630. err = bucket.Put([]byte(region), data)
  631. return err
  632. })
  633. if err != nil {
  634. return common.ContextError(err)
  635. }
  636. return nil
  637. }
  638. // GetSplitTunnelRoutesETag retrieves the etag for cached routes
  639. // data for the specified region. If not found, it returns an empty string value.
  640. func GetSplitTunnelRoutesETag(region string) (etag string, err error) {
  641. checkInitDataStore()
  642. err = singleton.db.View(func(tx *bolt.Tx) error {
  643. bucket := tx.Bucket([]byte(splitTunnelRouteETagsBucket))
  644. etag = string(bucket.Get([]byte(region)))
  645. return nil
  646. })
  647. if err != nil {
  648. return "", common.ContextError(err)
  649. }
  650. return etag, nil
  651. }
  652. // GetSplitTunnelRoutesData retrieves the cached routes data
  653. // for the specified region. If not found, it returns a nil value.
  654. func GetSplitTunnelRoutesData(region string) (data []byte, err error) {
  655. checkInitDataStore()
  656. err = singleton.db.View(func(tx *bolt.Tx) error {
  657. bucket := tx.Bucket([]byte(splitTunnelRouteDataBucket))
  658. value := bucket.Get([]byte(region))
  659. if value != nil {
  660. // Must make a copy as slice is only valid within transaction.
  661. data = make([]byte, len(value))
  662. copy(data, value)
  663. }
  664. return nil
  665. })
  666. if err != nil {
  667. return nil, common.ContextError(err)
  668. }
  669. return data, nil
  670. }
  671. // SetUrlETag stores an ETag for the specfied URL.
  672. // Note: input URL is treated as a string, and is not
  673. // encoded or decoded or otherwise canonicalized.
  674. func SetUrlETag(url, etag string) error {
  675. checkInitDataStore()
  676. err := singleton.db.Update(func(tx *bolt.Tx) error {
  677. bucket := tx.Bucket([]byte(urlETagsBucket))
  678. err := bucket.Put([]byte(url), []byte(etag))
  679. return err
  680. })
  681. if err != nil {
  682. return common.ContextError(err)
  683. }
  684. return nil
  685. }
  686. // GetUrlETag retrieves a previously stored an ETag for the
  687. // specfied URL. If not found, it returns an empty string value.
  688. func GetUrlETag(url string) (etag string, err error) {
  689. checkInitDataStore()
  690. err = singleton.db.View(func(tx *bolt.Tx) error {
  691. bucket := tx.Bucket([]byte(urlETagsBucket))
  692. etag = string(bucket.Get([]byte(url)))
  693. return nil
  694. })
  695. if err != nil {
  696. return "", common.ContextError(err)
  697. }
  698. return etag, nil
  699. }
  700. // SetKeyValue stores a key/value pair.
  701. func SetKeyValue(key, value string) error {
  702. checkInitDataStore()
  703. err := singleton.db.Update(func(tx *bolt.Tx) error {
  704. bucket := tx.Bucket([]byte(keyValueBucket))
  705. err := bucket.Put([]byte(key), []byte(value))
  706. return err
  707. })
  708. if err != nil {
  709. return common.ContextError(err)
  710. }
  711. return nil
  712. }
  713. // GetKeyValue retrieves the value for a given key. If not found,
  714. // it returns an empty string value.
  715. func GetKeyValue(key string) (value string, err error) {
  716. checkInitDataStore()
  717. err = singleton.db.View(func(tx *bolt.Tx) error {
  718. bucket := tx.Bucket([]byte(keyValueBucket))
  719. value = string(bucket.Get([]byte(key)))
  720. return nil
  721. })
  722. if err != nil {
  723. return "", common.ContextError(err)
  724. }
  725. return value, nil
  726. }
  727. // Persistent stat records in the persistentStatStateUnreported
  728. // state are available for take out.
  729. //
  730. // Records in the persistentStatStateReporting have been taken
  731. // out and are pending either deletion (for a successful request)
  732. // or change to StateUnreported (for a failed request).
  733. //
  734. // All persistent stat records are reverted to StateUnreported
  735. // when the datastore is initialized at start up.
  736. var persistentStatStateUnreported = []byte("0")
  737. var persistentStatStateReporting = []byte("1")
  738. var persistentStatTypes = []string{
  739. PERSISTENT_STAT_TYPE_REMOTE_SERVER_LIST,
  740. PERSISTENT_STAT_TYPE_TUNNEL,
  741. }
  742. // StorePersistentStats adds a new persistent stat record, which
  743. // is set to StateUnreported and is an immediate candidate for
  744. // reporting.
  745. //
  746. // The stat is a JSON byte array containing fields as
  747. // required by the Psiphon server API. It's assumed that the
  748. // JSON value contains enough unique information for the value to
  749. // function as a key in the key/value datastore. This assumption
  750. // is currently satisfied by the fields sessionId + tunnelNumber
  751. // for tunnel stats, and URL + ETag for remote server list stats.
  752. func StorePersistentStat(statType string, stat []byte) error {
  753. checkInitDataStore()
  754. if !common.Contains(persistentStatTypes, statType) {
  755. return common.ContextError(fmt.Errorf("invalid persistent stat type: %s", statType))
  756. }
  757. err := singleton.db.Update(func(tx *bolt.Tx) error {
  758. bucket := tx.Bucket([]byte(statType))
  759. err := bucket.Put(stat, persistentStatStateUnreported)
  760. return err
  761. })
  762. if err != nil {
  763. return common.ContextError(err)
  764. }
  765. return nil
  766. }
  767. // CountUnreportedPersistentStats returns the number of persistent
  768. // stat records in StateUnreported.
  769. func CountUnreportedPersistentStats() int {
  770. checkInitDataStore()
  771. unreported := 0
  772. err := singleton.db.Update(func(tx *bolt.Tx) error {
  773. for _, statType := range persistentStatTypes {
  774. bucket := tx.Bucket([]byte(statType))
  775. cursor := bucket.Cursor()
  776. for key, value := cursor.First(); key != nil; key, value = cursor.Next() {
  777. if 0 == bytes.Compare(value, persistentStatStateUnreported) {
  778. unreported++
  779. break
  780. }
  781. }
  782. }
  783. return nil
  784. })
  785. if err != nil {
  786. NoticeAlert("CountUnreportedPersistentStats failed: %s", err)
  787. return 0
  788. }
  789. return unreported
  790. }
  791. // TakeOutUnreportedPersistentStats returns up to maxCount persistent
  792. // stats records that are in StateUnreported. The records are set to
  793. // StateReporting. If the records are successfully reported, clear them
  794. // with ClearReportedPersistentStats. If the records are not successfully
  795. // reported, restore them with PutBackUnreportedPersistentStats.
  796. func TakeOutUnreportedPersistentStats(maxCount int) (map[string][][]byte, error) {
  797. checkInitDataStore()
  798. stats := make(map[string][][]byte)
  799. err := singleton.db.Update(func(tx *bolt.Tx) error {
  800. count := 0
  801. for _, statType := range persistentStatTypes {
  802. stats[statType] = make([][]byte, 0)
  803. bucket := tx.Bucket([]byte(statType))
  804. cursor := bucket.Cursor()
  805. for key, value := cursor.First(); key != nil; key, value = cursor.Next() {
  806. if count >= maxCount {
  807. break
  808. }
  809. // Perform a test JSON unmarshaling. In case of data corruption or a bug,
  810. // skip the record.
  811. var jsonData interface{}
  812. err := json.Unmarshal(key, &jsonData)
  813. if err != nil {
  814. NoticeAlert(
  815. "Invalid key in TakeOutUnreportedPersistentStats: %s: %s",
  816. string(key), err)
  817. continue
  818. }
  819. if 0 == bytes.Compare(value, persistentStatStateUnreported) {
  820. // Must make a copy as slice is only valid within transaction.
  821. data := make([]byte, len(key))
  822. copy(data, key)
  823. stats[statType] = append(stats[statType], data)
  824. count += 1
  825. }
  826. }
  827. for _, key := range stats[statType] {
  828. err := bucket.Put(key, persistentStatStateReporting)
  829. if err != nil {
  830. return err
  831. }
  832. }
  833. }
  834. return nil
  835. })
  836. if err != nil {
  837. return nil, common.ContextError(err)
  838. }
  839. return stats, nil
  840. }
  841. // PutBackUnreportedPersistentStats restores a list of persistent
  842. // stat records to StateUnreported.
  843. func PutBackUnreportedPersistentStats(stats map[string][][]byte) error {
  844. checkInitDataStore()
  845. err := singleton.db.Update(func(tx *bolt.Tx) error {
  846. for _, statType := range persistentStatTypes {
  847. bucket := tx.Bucket([]byte(statType))
  848. for _, key := range stats[statType] {
  849. err := bucket.Put(key, persistentStatStateUnreported)
  850. if err != nil {
  851. return err
  852. }
  853. }
  854. }
  855. return nil
  856. })
  857. if err != nil {
  858. return common.ContextError(err)
  859. }
  860. return nil
  861. }
  862. // ClearReportedPersistentStats deletes a list of persistent
  863. // stat records that were successfully reported.
  864. func ClearReportedPersistentStats(stats map[string][][]byte) error {
  865. checkInitDataStore()
  866. err := singleton.db.Update(func(tx *bolt.Tx) error {
  867. for _, statType := range persistentStatTypes {
  868. bucket := tx.Bucket([]byte(statType))
  869. for _, key := range stats[statType] {
  870. err := bucket.Delete(key)
  871. if err != nil {
  872. return err
  873. }
  874. }
  875. }
  876. return nil
  877. })
  878. if err != nil {
  879. return common.ContextError(err)
  880. }
  881. return nil
  882. }
  883. // resetAllPersistentStatsToUnreported sets all persistent stat
  884. // records to StateUnreported. This reset is called when the
  885. // datastore is initialized at start up, as we do not know if
  886. // persistent records in StateReporting were reported or not.
  887. func resetAllPersistentStatsToUnreported() error {
  888. checkInitDataStore()
  889. err := singleton.db.Update(func(tx *bolt.Tx) error {
  890. for _, statType := range persistentStatTypes {
  891. bucket := tx.Bucket([]byte(statType))
  892. resetKeys := make([][]byte, 0)
  893. cursor := bucket.Cursor()
  894. for key, _ := cursor.First(); key != nil; key, _ = cursor.Next() {
  895. resetKeys = append(resetKeys, key)
  896. }
  897. // TODO: data mutation is done outside cursor. Is this
  898. // strictly necessary in this case? As is, this means
  899. // all stats need to be loaded into memory at once.
  900. // https://godoc.org/github.com/boltdb/bolt#Cursor
  901. for _, key := range resetKeys {
  902. err := bucket.Put(key, persistentStatStateUnreported)
  903. if err != nil {
  904. return err
  905. }
  906. }
  907. }
  908. return nil
  909. })
  910. if err != nil {
  911. return common.ContextError(err)
  912. }
  913. return nil
  914. }
  915. // CountSLOKs returns the total number of SLOK records.
  916. func CountSLOKs() int {
  917. checkInitDataStore()
  918. count := 0
  919. err := singleton.db.View(func(tx *bolt.Tx) error {
  920. bucket := tx.Bucket([]byte(slokBucket))
  921. cursor := bucket.Cursor()
  922. for key, _ := cursor.First(); key != nil; key, _ = cursor.Next() {
  923. count++
  924. }
  925. return nil
  926. })
  927. if err != nil {
  928. NoticeAlert("CountSLOKs failed: %s", err)
  929. return 0
  930. }
  931. return count
  932. }
  933. // DeleteSLOKs deletes all SLOK records.
  934. func DeleteSLOKs() error {
  935. checkInitDataStore()
  936. err := singleton.db.Update(func(tx *bolt.Tx) error {
  937. bucket := tx.Bucket([]byte(slokBucket))
  938. return bucket.ForEach(
  939. func(id, _ []byte) error {
  940. return bucket.Delete(id)
  941. })
  942. })
  943. if err != nil {
  944. return common.ContextError(err)
  945. }
  946. return nil
  947. }
  948. // SetSLOK stores a SLOK key, referenced by its ID. The bool
  949. // return value indicates whether the SLOK was already stored.
  950. func SetSLOK(id, key []byte) (bool, error) {
  951. checkInitDataStore()
  952. var duplicate bool
  953. err := singleton.db.Update(func(tx *bolt.Tx) error {
  954. bucket := tx.Bucket([]byte(slokBucket))
  955. duplicate = bucket.Get(id) != nil
  956. err := bucket.Put([]byte(id), []byte(key))
  957. return err
  958. })
  959. if err != nil {
  960. return false, common.ContextError(err)
  961. }
  962. return duplicate, nil
  963. }
  964. // GetSLOK returns a SLOK key for the specified ID. The return
  965. // value is nil if the SLOK is not found.
  966. func GetSLOK(id []byte) (key []byte, err error) {
  967. checkInitDataStore()
  968. err = singleton.db.View(func(tx *bolt.Tx) error {
  969. bucket := tx.Bucket([]byte(slokBucket))
  970. key = bucket.Get(id)
  971. return nil
  972. })
  973. if err != nil {
  974. return nil, common.ContextError(err)
  975. }
  976. return key, nil
  977. }