dataStore.go 34 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211
  1. /*
  2. * Copyright (c) 2015, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package psiphon
  20. import (
  21. "bytes"
  22. "encoding/json"
  23. "errors"
  24. "fmt"
  25. "math/rand"
  26. "os"
  27. "path/filepath"
  28. "strings"
  29. "sync"
  30. "time"
  31. "github.com/Psiphon-Inc/bolt"
  32. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  33. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
  34. )
  35. // The BoltDB dataStore implementation is an alternative to the sqlite3-based
  36. // implementation in dataStore.go. Both implementations have the same interface.
  37. //
  38. // BoltDB is pure Go, and is intended to be used in cases where we have trouble
  39. // building sqlite3/CGO (e.g., currently go mobile due to
  40. // https://github.com/mattn/go-sqlite3/issues/201), and perhaps ultimately as
  41. // the primary dataStore implementation.
  42. //
  43. type dataStore struct {
  44. init sync.Once
  45. db *bolt.DB
  46. }
  47. const (
  48. serverEntriesBucket = "serverEntries"
  49. rankedServerEntriesBucket = "rankedServerEntries"
  50. rankedServerEntriesKey = "rankedServerEntries"
  51. splitTunnelRouteETagsBucket = "splitTunnelRouteETags"
  52. splitTunnelRouteDataBucket = "splitTunnelRouteData"
  53. urlETagsBucket = "urlETags"
  54. keyValueBucket = "keyValues"
  55. tunnelStatsBucket = "tunnelStats"
  56. remoteServerListStatsBucket = "remoteServerListStats"
  57. slokBucket = "SLOKs"
  58. rankedServerEntryCount = 100
  59. )
  60. const (
  61. DATA_STORE_LAST_CONNECTED_KEY = "lastConnected"
  62. PERSISTENT_STAT_TYPE_TUNNEL = tunnelStatsBucket
  63. PERSISTENT_STAT_TYPE_REMOTE_SERVER_LIST = remoteServerListStatsBucket
  64. )
  65. var singleton dataStore
  66. // InitDataStore initializes the singleton instance of dataStore. This
  67. // function uses a sync.Once and is safe for use by concurrent goroutines.
  68. // The underlying sql.DB connection pool is also safe.
  69. //
  70. // Note: the sync.Once was more useful when initDataStore was private and
  71. // called on-demand by the public functions below. Now we require an explicit
  72. // InitDataStore() call with the filename passed in. The on-demand calls
  73. // have been replaced by checkInitDataStore() to assert that Init was called.
  74. func InitDataStore(config *Config) (err error) {
  75. singleton.init.Do(func() {
  76. // Need to gather the list of migratable server entries before
  77. // initializing the boltdb store (as prepareMigrationEntries
  78. // checks for the existence of the bolt db file)
  79. migratableServerEntries := prepareMigrationEntries(config)
  80. filename := filepath.Join(config.DataStoreDirectory, DATA_STORE_FILENAME)
  81. var db *bolt.DB
  82. for retry := 0; retry < 3; retry++ {
  83. if retry > 0 {
  84. NoticeAlert("InitDataStore retry: %d", retry)
  85. }
  86. db, err = bolt.Open(filename, 0600, &bolt.Options{Timeout: 1 * time.Second})
  87. // The datastore file may be corrupt, so attempt to delete and try again
  88. if err != nil {
  89. NoticeAlert("bolt.Open error: %s", err)
  90. os.Remove(filename)
  91. continue
  92. }
  93. // Run consistency checks on datastore and emit errors for diagnostics purposes
  94. // We assume this will complete quickly for typical size Psiphon datastores.
  95. err = db.View(func(tx *bolt.Tx) error {
  96. return tx.SynchronousCheck()
  97. })
  98. // The datastore file may be corrupt, so attempt to delete and try again
  99. if err != nil {
  100. NoticeAlert("bolt.SynchronousCheck error: %s", err)
  101. db.Close()
  102. os.Remove(filename)
  103. continue
  104. }
  105. break
  106. }
  107. if err != nil {
  108. // Note: intending to set the err return value for InitDataStore
  109. err = fmt.Errorf("initDataStore failed to open database: %s", err)
  110. return
  111. }
  112. err = db.Update(func(tx *bolt.Tx) error {
  113. requiredBuckets := []string{
  114. serverEntriesBucket,
  115. rankedServerEntriesBucket,
  116. splitTunnelRouteETagsBucket,
  117. splitTunnelRouteDataBucket,
  118. urlETagsBucket,
  119. keyValueBucket,
  120. tunnelStatsBucket,
  121. remoteServerListStatsBucket,
  122. slokBucket,
  123. }
  124. for _, bucket := range requiredBuckets {
  125. _, err := tx.CreateBucketIfNotExists([]byte(bucket))
  126. if err != nil {
  127. return err
  128. }
  129. }
  130. return nil
  131. })
  132. if err != nil {
  133. err = fmt.Errorf("initDataStore failed to create buckets: %s", err)
  134. return
  135. }
  136. singleton.db = db
  137. // The migrateServerEntries function requires the data store is
  138. // initialized prior to execution so that migrated entries can be stored
  139. if len(migratableServerEntries) > 0 {
  140. migrateEntries(migratableServerEntries, filepath.Join(config.DataStoreDirectory, LEGACY_DATA_STORE_FILENAME))
  141. }
  142. resetAllPersistentStatsToUnreported()
  143. })
  144. return err
  145. }
  146. func checkInitDataStore() {
  147. if singleton.db == nil {
  148. panic("checkInitDataStore: datastore not initialized")
  149. }
  150. }
  151. // StoreServerEntry adds the server entry to the data store.
  152. // A newly stored (or re-stored) server entry is assigned the next-to-top
  153. // rank for iteration order (the previous top ranked entry is promoted). The
  154. // purpose of inserting at next-to-top is to keep the last selected server
  155. // as the top ranked server.
  156. // When replaceIfExists is true, an existing server entry record is
  157. // overwritten; otherwise, the existing record is unchanged.
  158. // If the server entry data is malformed, an alert notice is issued and
  159. // the entry is skipped; no error is returned.
  160. func StoreServerEntry(serverEntry *protocol.ServerEntry, replaceIfExists bool) error {
  161. checkInitDataStore()
  162. // Server entries should already be validated before this point,
  163. // so instead of skipping we fail with an error.
  164. err := protocol.ValidateServerEntry(serverEntry)
  165. if err != nil {
  166. return common.ContextError(errors.New("invalid server entry"))
  167. }
  168. // BoltDB implementation note:
  169. // For simplicity, we don't maintain indexes on server entry
  170. // region or supported protocols. Instead, we perform full-bucket
  171. // scans with a filter. With a small enough database (thousands or
  172. // even tens of thousand of server entries) and common enough
  173. // values (e.g., many servers support all protocols), performance
  174. // is expected to be acceptable.
  175. err = singleton.db.Update(func(tx *bolt.Tx) error {
  176. serverEntries := tx.Bucket([]byte(serverEntriesBucket))
  177. // Check not only that the entry exists, but is valid. This
  178. // will replace in the rare case where the data is corrupt.
  179. existingServerEntryValid := false
  180. existingData := serverEntries.Get([]byte(serverEntry.IpAddress))
  181. if existingData != nil {
  182. existingServerEntry := new(protocol.ServerEntry)
  183. if json.Unmarshal(existingData, existingServerEntry) == nil {
  184. existingServerEntryValid = true
  185. }
  186. }
  187. if existingServerEntryValid && !replaceIfExists {
  188. // Disabling this notice, for now, as it generates too much noise
  189. // in diagnostics with clients that always submit embedded servers
  190. // to the core on each run.
  191. // NoticeInfo("ignored update for server %s", serverEntry.IpAddress)
  192. return nil
  193. }
  194. data, err := json.Marshal(serverEntry)
  195. if err != nil {
  196. return common.ContextError(err)
  197. }
  198. err = serverEntries.Put([]byte(serverEntry.IpAddress), data)
  199. if err != nil {
  200. return common.ContextError(err)
  201. }
  202. err = insertRankedServerEntry(tx, serverEntry.IpAddress, 1)
  203. if err != nil {
  204. return common.ContextError(err)
  205. }
  206. NoticeInfo("updated server %s", serverEntry.IpAddress)
  207. return nil
  208. })
  209. if err != nil {
  210. return common.ContextError(err)
  211. }
  212. return nil
  213. }
  214. // StoreServerEntries stores a list of server entries.
  215. // There is an independent transaction for each entry insert/update.
  216. func StoreServerEntries(serverEntries []*protocol.ServerEntry, replaceIfExists bool) error {
  217. checkInitDataStore()
  218. for _, serverEntry := range serverEntries {
  219. err := StoreServerEntry(serverEntry, replaceIfExists)
  220. if err != nil {
  221. return common.ContextError(err)
  222. }
  223. }
  224. // Since there has possibly been a significant change in the server entries,
  225. // take this opportunity to update the available egress regions.
  226. ReportAvailableRegions()
  227. return nil
  228. }
  229. // StreamingStoreServerEntries stores a list of server entries.
  230. // There is an independent transaction for each entry insert/update.
  231. func StreamingStoreServerEntries(
  232. serverEntries *protocol.StreamingServerEntryDecoder, replaceIfExists bool) error {
  233. checkInitDataStore()
  234. // Note: both StreamingServerEntryDecoder.Next and StoreServerEntry
  235. // allocate temporary memory buffers for hex/JSON decoding/encoding,
  236. // so this isn't true constant-memory streaming (it depends on garbage
  237. // collection).
  238. for {
  239. serverEntry, err := serverEntries.Next()
  240. if err != nil {
  241. return common.ContextError(err)
  242. }
  243. if serverEntry == nil {
  244. // No more server entries
  245. break
  246. }
  247. err = StoreServerEntry(serverEntry, replaceIfExists)
  248. if err != nil {
  249. return common.ContextError(err)
  250. }
  251. }
  252. // Since there has possibly been a significant change in the server entries,
  253. // take this opportunity to update the available egress regions.
  254. ReportAvailableRegions()
  255. return nil
  256. }
  257. // PromoteServerEntry assigns the top rank (one more than current
  258. // max rank) to the specified server entry. Server candidates are
  259. // iterated in decending rank order, so this server entry will be
  260. // the first candidate in a subsequent tunnel establishment.
  261. func PromoteServerEntry(ipAddress string) error {
  262. checkInitDataStore()
  263. err := singleton.db.Update(func(tx *bolt.Tx) error {
  264. // Ensure the corresponding entry exists before
  265. // inserting into rank.
  266. bucket := tx.Bucket([]byte(serverEntriesBucket))
  267. data := bucket.Get([]byte(ipAddress))
  268. if data == nil {
  269. NoticeAlert(
  270. "PromoteServerEntry: ignoring unknown server entry: %s",
  271. ipAddress)
  272. return nil
  273. }
  274. return insertRankedServerEntry(tx, ipAddress, 0)
  275. })
  276. if err != nil {
  277. return common.ContextError(err)
  278. }
  279. return nil
  280. }
  281. func getRankedServerEntries(tx *bolt.Tx) ([]string, error) {
  282. bucket := tx.Bucket([]byte(rankedServerEntriesBucket))
  283. data := bucket.Get([]byte(rankedServerEntriesKey))
  284. if data == nil {
  285. return []string{}, nil
  286. }
  287. rankedServerEntries := make([]string, 0)
  288. err := json.Unmarshal(data, &rankedServerEntries)
  289. if err != nil {
  290. return nil, common.ContextError(err)
  291. }
  292. return rankedServerEntries, nil
  293. }
  294. func setRankedServerEntries(tx *bolt.Tx, rankedServerEntries []string) error {
  295. data, err := json.Marshal(rankedServerEntries)
  296. if err != nil {
  297. return common.ContextError(err)
  298. }
  299. bucket := tx.Bucket([]byte(rankedServerEntriesBucket))
  300. err = bucket.Put([]byte(rankedServerEntriesKey), data)
  301. if err != nil {
  302. return common.ContextError(err)
  303. }
  304. return nil
  305. }
  306. func insertRankedServerEntry(tx *bolt.Tx, serverEntryId string, position int) error {
  307. rankedServerEntries, err := getRankedServerEntries(tx)
  308. if err != nil {
  309. return common.ContextError(err)
  310. }
  311. // BoltDB implementation note:
  312. // For simplicity, we store the ranked server ids in an array serialized to
  313. // a single key value. To ensure this value doesn't grow without bound,
  314. // it's capped at rankedServerEntryCount. For now, this cap should be large
  315. // enough to meet the shuffleHeadLength = config.TunnelPoolSize criteria, for
  316. // any reasonable configuration of config.TunnelPoolSize.
  317. // Using: https://github.com/golang/go/wiki/SliceTricks
  318. // When serverEntryId is already ranked, remove it first to avoid duplicates
  319. for i, rankedServerEntryId := range rankedServerEntries {
  320. if rankedServerEntryId == serverEntryId {
  321. rankedServerEntries = append(
  322. rankedServerEntries[:i], rankedServerEntries[i+1:]...)
  323. break
  324. }
  325. }
  326. // SliceTricks insert, with length cap enforced
  327. if len(rankedServerEntries) < rankedServerEntryCount {
  328. rankedServerEntries = append(rankedServerEntries, "")
  329. }
  330. if position >= len(rankedServerEntries) {
  331. position = len(rankedServerEntries) - 1
  332. }
  333. copy(rankedServerEntries[position+1:], rankedServerEntries[position:])
  334. rankedServerEntries[position] = serverEntryId
  335. err = setRankedServerEntries(tx, rankedServerEntries)
  336. if err != nil {
  337. return common.ContextError(err)
  338. }
  339. return nil
  340. }
  341. // ServerEntryIterator is used to iterate over
  342. // stored server entries in rank order.
  343. type ServerEntryIterator struct {
  344. region string
  345. protocol string
  346. shuffleHeadLength int
  347. serverEntryIds []string
  348. serverEntryIndex int
  349. isTargetServerEntryIterator bool
  350. hasNextTargetServerEntry bool
  351. targetServerEntry *protocol.ServerEntry
  352. }
  353. // NewServerEntryIterator creates a new ServerEntryIterator
  354. func NewServerEntryIterator(config *Config) (iterator *ServerEntryIterator, err error) {
  355. // When configured, this target server entry is the only candidate
  356. if config.TargetServerEntry != "" {
  357. return newTargetServerEntryIterator(config)
  358. }
  359. checkInitDataStore()
  360. iterator = &ServerEntryIterator{
  361. region: config.EgressRegion,
  362. protocol: config.TunnelProtocol,
  363. shuffleHeadLength: config.TunnelPoolSize,
  364. isTargetServerEntryIterator: false,
  365. }
  366. err = iterator.Reset()
  367. if err != nil {
  368. return nil, err
  369. }
  370. return iterator, nil
  371. }
  372. // newTargetServerEntryIterator is a helper for initializing the TargetServerEntry case
  373. func newTargetServerEntryIterator(config *Config) (iterator *ServerEntryIterator, err error) {
  374. serverEntry, err := protocol.DecodeServerEntry(
  375. config.TargetServerEntry, common.GetCurrentTimestamp(), protocol.SERVER_ENTRY_SOURCE_TARGET)
  376. if err != nil {
  377. return nil, err
  378. }
  379. if config.EgressRegion != "" && serverEntry.Region != config.EgressRegion {
  380. return nil, errors.New("TargetServerEntry does not support EgressRegion")
  381. }
  382. if config.TunnelProtocol != "" {
  383. // Note: same capability/protocol mapping as in StoreServerEntry
  384. requiredCapability := strings.TrimSuffix(config.TunnelProtocol, "-OSSH")
  385. if !common.Contains(serverEntry.Capabilities, requiredCapability) {
  386. return nil, errors.New("TargetServerEntry does not support TunnelProtocol")
  387. }
  388. }
  389. iterator = &ServerEntryIterator{
  390. isTargetServerEntryIterator: true,
  391. hasNextTargetServerEntry: true,
  392. targetServerEntry: serverEntry,
  393. }
  394. NoticeInfo("using TargetServerEntry: %s", serverEntry.IpAddress)
  395. return iterator, nil
  396. }
  397. // Reset a NewServerEntryIterator to the start of its cycle. The next
  398. // call to Next will return the first server entry.
  399. func (iterator *ServerEntryIterator) Reset() error {
  400. iterator.Close()
  401. if iterator.isTargetServerEntryIterator {
  402. iterator.hasNextTargetServerEntry = true
  403. return nil
  404. }
  405. count := CountServerEntries(iterator.region, iterator.protocol)
  406. NoticeCandidateServers(iterator.region, iterator.protocol, count)
  407. // This query implements the Psiphon server candidate selection
  408. // algorithm: the first TunnelPoolSize server candidates are in rank
  409. // (priority) order, to favor previously successful servers; then the
  410. // remaining long tail is shuffled to raise up less recent candidates.
  411. // BoltDB implementation note:
  412. // We don't keep a transaction open for the duration of the iterator
  413. // because this would expose the following semantics to consumer code:
  414. //
  415. // Read-only transactions and read-write transactions ... generally
  416. // shouldn't be opened simultaneously in the same goroutine. This can
  417. // cause a deadlock as the read-write transaction needs to periodically
  418. // re-map the data file but it cannot do so while a read-only
  419. // transaction is open.
  420. // (https://github.com/boltdb/bolt)
  421. //
  422. // So the underlying serverEntriesBucket could change after the serverEntryIds
  423. // list is built.
  424. var serverEntryIds []string
  425. err := singleton.db.View(func(tx *bolt.Tx) error {
  426. var err error
  427. serverEntryIds, err = getRankedServerEntries(tx)
  428. if err != nil {
  429. return err
  430. }
  431. skipServerEntryIds := make(map[string]bool)
  432. for _, serverEntryId := range serverEntryIds {
  433. skipServerEntryIds[serverEntryId] = true
  434. }
  435. bucket := tx.Bucket([]byte(serverEntriesBucket))
  436. cursor := bucket.Cursor()
  437. for key, _ := cursor.Last(); key != nil; key, _ = cursor.Prev() {
  438. serverEntryId := string(key)
  439. if _, ok := skipServerEntryIds[serverEntryId]; ok {
  440. continue
  441. }
  442. serverEntryIds = append(serverEntryIds, serverEntryId)
  443. }
  444. return nil
  445. })
  446. if err != nil {
  447. return common.ContextError(err)
  448. }
  449. for i := len(serverEntryIds) - 1; i > iterator.shuffleHeadLength-1; i-- {
  450. j := rand.Intn(i+1-iterator.shuffleHeadLength) + iterator.shuffleHeadLength
  451. serverEntryIds[i], serverEntryIds[j] = serverEntryIds[j], serverEntryIds[i]
  452. }
  453. iterator.serverEntryIds = serverEntryIds
  454. iterator.serverEntryIndex = 0
  455. return nil
  456. }
  457. // Close cleans up resources associated with a ServerEntryIterator.
  458. func (iterator *ServerEntryIterator) Close() {
  459. iterator.serverEntryIds = nil
  460. iterator.serverEntryIndex = 0
  461. }
  462. // Next returns the next server entry, by rank, for a ServerEntryIterator.
  463. // Returns nil with no error when there is no next item.
  464. func (iterator *ServerEntryIterator) Next() (serverEntry *protocol.ServerEntry, err error) {
  465. defer func() {
  466. if err != nil {
  467. iterator.Close()
  468. }
  469. }()
  470. if iterator.isTargetServerEntryIterator {
  471. if iterator.hasNextTargetServerEntry {
  472. iterator.hasNextTargetServerEntry = false
  473. return MakeCompatibleServerEntry(iterator.targetServerEntry), nil
  474. }
  475. return nil, nil
  476. }
  477. // There are no region/protocol indexes for the server entries bucket.
  478. // Loop until we have the next server entry that matches the iterator
  479. // filter requirements.
  480. for {
  481. if iterator.serverEntryIndex >= len(iterator.serverEntryIds) {
  482. // There is no next item
  483. return nil, nil
  484. }
  485. serverEntryId := iterator.serverEntryIds[iterator.serverEntryIndex]
  486. iterator.serverEntryIndex += 1
  487. var data []byte
  488. err = singleton.db.View(func(tx *bolt.Tx) error {
  489. bucket := tx.Bucket([]byte(serverEntriesBucket))
  490. value := bucket.Get([]byte(serverEntryId))
  491. if value != nil {
  492. // Must make a copy as slice is only valid within transaction.
  493. data = make([]byte, len(value))
  494. copy(data, value)
  495. }
  496. return nil
  497. })
  498. if err != nil {
  499. return nil, common.ContextError(err)
  500. }
  501. if data == nil {
  502. // In case of data corruption or a bug causing this condition,
  503. // do not stop iterating.
  504. NoticeAlert("ServerEntryIterator.Next: unexpected missing server entry: %s", serverEntryId)
  505. continue
  506. }
  507. serverEntry = new(protocol.ServerEntry)
  508. err = json.Unmarshal(data, serverEntry)
  509. if err != nil {
  510. // In case of data corruption or a bug causing this condition,
  511. // do not stop iterating.
  512. NoticeAlert("ServerEntryIterator.Next: %s", common.ContextError(err))
  513. continue
  514. }
  515. // Check filter requirements
  516. if (iterator.region == "" || serverEntry.Region == iterator.region) &&
  517. (iterator.protocol == "" || serverEntry.SupportsProtocol(iterator.protocol)) {
  518. break
  519. }
  520. }
  521. return MakeCompatibleServerEntry(serverEntry), nil
  522. }
  523. // MakeCompatibleServerEntry provides backwards compatibility with old server entries
  524. // which have a single meekFrontingDomain and not a meekFrontingAddresses array.
  525. // By copying this one meekFrontingDomain into meekFrontingAddresses, this client effectively
  526. // uses that single value as legacy clients do.
  527. func MakeCompatibleServerEntry(serverEntry *protocol.ServerEntry) *protocol.ServerEntry {
  528. if len(serverEntry.MeekFrontingAddresses) == 0 && serverEntry.MeekFrontingDomain != "" {
  529. serverEntry.MeekFrontingAddresses =
  530. append(serverEntry.MeekFrontingAddresses, serverEntry.MeekFrontingDomain)
  531. }
  532. return serverEntry
  533. }
  534. func scanServerEntries(scanner func(*protocol.ServerEntry)) error {
  535. err := singleton.db.View(func(tx *bolt.Tx) error {
  536. bucket := tx.Bucket([]byte(serverEntriesBucket))
  537. cursor := bucket.Cursor()
  538. for key, value := cursor.First(); key != nil; key, value = cursor.Next() {
  539. serverEntry := new(protocol.ServerEntry)
  540. err := json.Unmarshal(value, serverEntry)
  541. if err != nil {
  542. // In case of data corruption or a bug causing this condition,
  543. // do not stop iterating.
  544. NoticeAlert("scanServerEntries: %s", common.ContextError(err))
  545. continue
  546. }
  547. scanner(serverEntry)
  548. }
  549. return nil
  550. })
  551. if err != nil {
  552. return common.ContextError(err)
  553. }
  554. return nil
  555. }
  556. // CountServerEntries returns a count of stored servers for the
  557. // specified region and protocol.
  558. func CountServerEntries(region, tunnelProtocol string) int {
  559. checkInitDataStore()
  560. count := 0
  561. err := scanServerEntries(func(serverEntry *protocol.ServerEntry) {
  562. if (region == "" || serverEntry.Region == region) &&
  563. (tunnelProtocol == "" || serverEntry.SupportsProtocol(tunnelProtocol)) {
  564. count += 1
  565. }
  566. })
  567. if err != nil {
  568. NoticeAlert("CountServerEntries failed: %s", err)
  569. return 0
  570. }
  571. return count
  572. }
  573. // CountNonImpairedProtocols returns the number of distinct tunnel
  574. // protocols supported by stored server entries, excluding the
  575. // specified impaired protocols.
  576. func CountNonImpairedProtocols(
  577. region, tunnelProtocol string,
  578. impairedProtocols []string) int {
  579. checkInitDataStore()
  580. distinctProtocols := make(map[string]bool)
  581. err := scanServerEntries(func(serverEntry *protocol.ServerEntry) {
  582. if region == "" || serverEntry.Region == region {
  583. if tunnelProtocol != "" {
  584. if serverEntry.SupportsProtocol(tunnelProtocol) {
  585. distinctProtocols[tunnelProtocol] = true
  586. // Exit early, since only one protocol is enabled
  587. return
  588. }
  589. } else {
  590. for _, protocol := range protocol.SupportedTunnelProtocols {
  591. if serverEntry.SupportsProtocol(protocol) {
  592. distinctProtocols[protocol] = true
  593. }
  594. }
  595. }
  596. }
  597. })
  598. for _, protocol := range impairedProtocols {
  599. delete(distinctProtocols, protocol)
  600. }
  601. if err != nil {
  602. NoticeAlert("CountNonImpairedProtocols failed: %s", err)
  603. return 0
  604. }
  605. return len(distinctProtocols)
  606. }
  607. // ReportAvailableRegions prints a notice with the available egress regions.
  608. // Note that this report ignores config.TunnelProtocol.
  609. func ReportAvailableRegions() {
  610. checkInitDataStore()
  611. regions := make(map[string]bool)
  612. err := scanServerEntries(func(serverEntry *protocol.ServerEntry) {
  613. regions[serverEntry.Region] = true
  614. })
  615. if err != nil {
  616. NoticeAlert("ReportAvailableRegions failed: %s", err)
  617. return
  618. }
  619. regionList := make([]string, 0, len(regions))
  620. for region := range regions {
  621. // Some server entries do not have a region, but it makes no sense to return
  622. // an empty string as an "available region".
  623. if region != "" {
  624. regionList = append(regionList, region)
  625. }
  626. }
  627. NoticeAvailableEgressRegions(regionList)
  628. }
  629. // GetServerEntryIpAddresses returns an array containing
  630. // all stored server IP addresses.
  631. func GetServerEntryIpAddresses() (ipAddresses []string, err error) {
  632. checkInitDataStore()
  633. ipAddresses = make([]string, 0)
  634. err = scanServerEntries(func(serverEntry *protocol.ServerEntry) {
  635. ipAddresses = append(ipAddresses, serverEntry.IpAddress)
  636. })
  637. if err != nil {
  638. return nil, common.ContextError(err)
  639. }
  640. return ipAddresses, nil
  641. }
  642. // SetSplitTunnelRoutes updates the cached routes data for
  643. // the given region. The associated etag is also stored and
  644. // used to make efficient web requests for updates to the data.
  645. func SetSplitTunnelRoutes(region, etag string, data []byte) error {
  646. checkInitDataStore()
  647. err := singleton.db.Update(func(tx *bolt.Tx) error {
  648. bucket := tx.Bucket([]byte(splitTunnelRouteETagsBucket))
  649. err := bucket.Put([]byte(region), []byte(etag))
  650. bucket = tx.Bucket([]byte(splitTunnelRouteDataBucket))
  651. err = bucket.Put([]byte(region), data)
  652. return err
  653. })
  654. if err != nil {
  655. return common.ContextError(err)
  656. }
  657. return nil
  658. }
  659. // GetSplitTunnelRoutesETag retrieves the etag for cached routes
  660. // data for the specified region. If not found, it returns an empty string value.
  661. func GetSplitTunnelRoutesETag(region string) (etag string, err error) {
  662. checkInitDataStore()
  663. err = singleton.db.View(func(tx *bolt.Tx) error {
  664. bucket := tx.Bucket([]byte(splitTunnelRouteETagsBucket))
  665. etag = string(bucket.Get([]byte(region)))
  666. return nil
  667. })
  668. if err != nil {
  669. return "", common.ContextError(err)
  670. }
  671. return etag, nil
  672. }
  673. // GetSplitTunnelRoutesData retrieves the cached routes data
  674. // for the specified region. If not found, it returns a nil value.
  675. func GetSplitTunnelRoutesData(region string) (data []byte, err error) {
  676. checkInitDataStore()
  677. err = singleton.db.View(func(tx *bolt.Tx) error {
  678. bucket := tx.Bucket([]byte(splitTunnelRouteDataBucket))
  679. value := bucket.Get([]byte(region))
  680. if value != nil {
  681. // Must make a copy as slice is only valid within transaction.
  682. data = make([]byte, len(value))
  683. copy(data, value)
  684. }
  685. return nil
  686. })
  687. if err != nil {
  688. return nil, common.ContextError(err)
  689. }
  690. return data, nil
  691. }
  692. // SetUrlETag stores an ETag for the specfied URL.
  693. // Note: input URL is treated as a string, and is not
  694. // encoded or decoded or otherwise canonicalized.
  695. func SetUrlETag(url, etag string) error {
  696. checkInitDataStore()
  697. err := singleton.db.Update(func(tx *bolt.Tx) error {
  698. bucket := tx.Bucket([]byte(urlETagsBucket))
  699. err := bucket.Put([]byte(url), []byte(etag))
  700. return err
  701. })
  702. if err != nil {
  703. return common.ContextError(err)
  704. }
  705. return nil
  706. }
  707. // GetUrlETag retrieves a previously stored an ETag for the
  708. // specfied URL. If not found, it returns an empty string value.
  709. func GetUrlETag(url string) (etag string, err error) {
  710. checkInitDataStore()
  711. err = singleton.db.View(func(tx *bolt.Tx) error {
  712. bucket := tx.Bucket([]byte(urlETagsBucket))
  713. etag = string(bucket.Get([]byte(url)))
  714. return nil
  715. })
  716. if err != nil {
  717. return "", common.ContextError(err)
  718. }
  719. return etag, nil
  720. }
  721. // SetKeyValue stores a key/value pair.
  722. func SetKeyValue(key, value string) error {
  723. checkInitDataStore()
  724. err := singleton.db.Update(func(tx *bolt.Tx) error {
  725. bucket := tx.Bucket([]byte(keyValueBucket))
  726. err := bucket.Put([]byte(key), []byte(value))
  727. return err
  728. })
  729. if err != nil {
  730. return common.ContextError(err)
  731. }
  732. return nil
  733. }
  734. // GetKeyValue retrieves the value for a given key. If not found,
  735. // it returns an empty string value.
  736. func GetKeyValue(key string) (value string, err error) {
  737. checkInitDataStore()
  738. err = singleton.db.View(func(tx *bolt.Tx) error {
  739. bucket := tx.Bucket([]byte(keyValueBucket))
  740. value = string(bucket.Get([]byte(key)))
  741. return nil
  742. })
  743. if err != nil {
  744. return "", common.ContextError(err)
  745. }
  746. return value, nil
  747. }
  748. // Persistent stat records in the persistentStatStateUnreported
  749. // state are available for take out.
  750. //
  751. // Records in the persistentStatStateReporting have been taken
  752. // out and are pending either deletion (for a successful request)
  753. // or change to StateUnreported (for a failed request).
  754. //
  755. // All persistent stat records are reverted to StateUnreported
  756. // when the datastore is initialized at start up.
  757. var persistentStatStateUnreported = []byte("0")
  758. var persistentStatStateReporting = []byte("1")
  759. var persistentStatTypes = []string{
  760. PERSISTENT_STAT_TYPE_REMOTE_SERVER_LIST,
  761. PERSISTENT_STAT_TYPE_TUNNEL,
  762. }
  763. // StorePersistentStat adds a new persistent stat record, which
  764. // is set to StateUnreported and is an immediate candidate for
  765. // reporting.
  766. //
  767. // The stat is a JSON byte array containing fields as
  768. // required by the Psiphon server API. It's assumed that the
  769. // JSON value contains enough unique information for the value to
  770. // function as a key in the key/value datastore. This assumption
  771. // is currently satisfied by the fields sessionId + tunnelNumber
  772. // for tunnel stats, and URL + ETag for remote server list stats.
  773. func StorePersistentStat(statType string, stat []byte) error {
  774. checkInitDataStore()
  775. if !common.Contains(persistentStatTypes, statType) {
  776. return common.ContextError(fmt.Errorf("invalid persistent stat type: %s", statType))
  777. }
  778. err := singleton.db.Update(func(tx *bolt.Tx) error {
  779. bucket := tx.Bucket([]byte(statType))
  780. err := bucket.Put(stat, persistentStatStateUnreported)
  781. return err
  782. })
  783. if err != nil {
  784. return common.ContextError(err)
  785. }
  786. return nil
  787. }
  788. // CountUnreportedPersistentStats returns the number of persistent
  789. // stat records in StateUnreported.
  790. func CountUnreportedPersistentStats() int {
  791. checkInitDataStore()
  792. unreported := 0
  793. err := singleton.db.Update(func(tx *bolt.Tx) error {
  794. for _, statType := range persistentStatTypes {
  795. bucket := tx.Bucket([]byte(statType))
  796. cursor := bucket.Cursor()
  797. for key, value := cursor.First(); key != nil; key, value = cursor.Next() {
  798. if 0 == bytes.Compare(value, persistentStatStateUnreported) {
  799. unreported++
  800. break
  801. }
  802. }
  803. }
  804. return nil
  805. })
  806. if err != nil {
  807. NoticeAlert("CountUnreportedPersistentStats failed: %s", err)
  808. return 0
  809. }
  810. return unreported
  811. }
  812. // TakeOutUnreportedPersistentStats returns up to maxCount persistent
  813. // stats records that are in StateUnreported. The records are set to
  814. // StateReporting. If the records are successfully reported, clear them
  815. // with ClearReportedPersistentStats. If the records are not successfully
  816. // reported, restore them with PutBackUnreportedPersistentStats.
  817. func TakeOutUnreportedPersistentStats(maxCount int) (map[string][][]byte, error) {
  818. checkInitDataStore()
  819. stats := make(map[string][][]byte)
  820. err := singleton.db.Update(func(tx *bolt.Tx) error {
  821. count := 0
  822. for _, statType := range persistentStatTypes {
  823. stats[statType] = make([][]byte, 0)
  824. bucket := tx.Bucket([]byte(statType))
  825. cursor := bucket.Cursor()
  826. for key, value := cursor.First(); key != nil; key, value = cursor.Next() {
  827. if count >= maxCount {
  828. break
  829. }
  830. // Perform a test JSON unmarshaling. In case of data corruption or a bug,
  831. // skip the record.
  832. var jsonData interface{}
  833. err := json.Unmarshal(key, &jsonData)
  834. if err != nil {
  835. NoticeAlert(
  836. "Invalid key in TakeOutUnreportedPersistentStats: %s: %s",
  837. string(key), err)
  838. continue
  839. }
  840. if 0 == bytes.Compare(value, persistentStatStateUnreported) {
  841. // Must make a copy as slice is only valid within transaction.
  842. data := make([]byte, len(key))
  843. copy(data, key)
  844. stats[statType] = append(stats[statType], data)
  845. count += 1
  846. }
  847. }
  848. for _, key := range stats[statType] {
  849. err := bucket.Put(key, persistentStatStateReporting)
  850. if err != nil {
  851. return err
  852. }
  853. }
  854. }
  855. return nil
  856. })
  857. if err != nil {
  858. return nil, common.ContextError(err)
  859. }
  860. return stats, nil
  861. }
  862. // PutBackUnreportedPersistentStats restores a list of persistent
  863. // stat records to StateUnreported.
  864. func PutBackUnreportedPersistentStats(stats map[string][][]byte) error {
  865. checkInitDataStore()
  866. err := singleton.db.Update(func(tx *bolt.Tx) error {
  867. for _, statType := range persistentStatTypes {
  868. bucket := tx.Bucket([]byte(statType))
  869. for _, key := range stats[statType] {
  870. err := bucket.Put(key, persistentStatStateUnreported)
  871. if err != nil {
  872. return err
  873. }
  874. }
  875. }
  876. return nil
  877. })
  878. if err != nil {
  879. return common.ContextError(err)
  880. }
  881. return nil
  882. }
  883. // ClearReportedPersistentStats deletes a list of persistent
  884. // stat records that were successfully reported.
  885. func ClearReportedPersistentStats(stats map[string][][]byte) error {
  886. checkInitDataStore()
  887. err := singleton.db.Update(func(tx *bolt.Tx) error {
  888. for _, statType := range persistentStatTypes {
  889. bucket := tx.Bucket([]byte(statType))
  890. for _, key := range stats[statType] {
  891. err := bucket.Delete(key)
  892. if err != nil {
  893. return err
  894. }
  895. }
  896. }
  897. return nil
  898. })
  899. if err != nil {
  900. return common.ContextError(err)
  901. }
  902. return nil
  903. }
  904. // resetAllPersistentStatsToUnreported sets all persistent stat
  905. // records to StateUnreported. This reset is called when the
  906. // datastore is initialized at start up, as we do not know if
  907. // persistent records in StateReporting were reported or not.
  908. func resetAllPersistentStatsToUnreported() error {
  909. checkInitDataStore()
  910. err := singleton.db.Update(func(tx *bolt.Tx) error {
  911. for _, statType := range persistentStatTypes {
  912. bucket := tx.Bucket([]byte(statType))
  913. resetKeys := make([][]byte, 0)
  914. cursor := bucket.Cursor()
  915. for key, _ := cursor.First(); key != nil; key, _ = cursor.Next() {
  916. resetKeys = append(resetKeys, key)
  917. }
  918. // TODO: data mutation is done outside cursor. Is this
  919. // strictly necessary in this case? As is, this means
  920. // all stats need to be loaded into memory at once.
  921. // https://godoc.org/github.com/boltdb/bolt#Cursor
  922. for _, key := range resetKeys {
  923. err := bucket.Put(key, persistentStatStateUnreported)
  924. if err != nil {
  925. return err
  926. }
  927. }
  928. }
  929. return nil
  930. })
  931. if err != nil {
  932. return common.ContextError(err)
  933. }
  934. return nil
  935. }
  936. // CountSLOKs returns the total number of SLOK records.
  937. func CountSLOKs() int {
  938. checkInitDataStore()
  939. count := 0
  940. err := singleton.db.View(func(tx *bolt.Tx) error {
  941. bucket := tx.Bucket([]byte(slokBucket))
  942. cursor := bucket.Cursor()
  943. for key, _ := cursor.First(); key != nil; key, _ = cursor.Next() {
  944. count++
  945. }
  946. return nil
  947. })
  948. if err != nil {
  949. NoticeAlert("CountSLOKs failed: %s", err)
  950. return 0
  951. }
  952. return count
  953. }
  954. // DeleteSLOKs deletes all SLOK records.
  955. func DeleteSLOKs() error {
  956. checkInitDataStore()
  957. err := singleton.db.Update(func(tx *bolt.Tx) error {
  958. bucket := tx.Bucket([]byte(slokBucket))
  959. return bucket.ForEach(
  960. func(id, _ []byte) error {
  961. return bucket.Delete(id)
  962. })
  963. })
  964. if err != nil {
  965. return common.ContextError(err)
  966. }
  967. return nil
  968. }
  969. // SetSLOK stores a SLOK key, referenced by its ID. The bool
  970. // return value indicates whether the SLOK was already stored.
  971. func SetSLOK(id, key []byte) (bool, error) {
  972. checkInitDataStore()
  973. var duplicate bool
  974. err := singleton.db.Update(func(tx *bolt.Tx) error {
  975. bucket := tx.Bucket([]byte(slokBucket))
  976. duplicate = bucket.Get(id) != nil
  977. err := bucket.Put([]byte(id), []byte(key))
  978. return err
  979. })
  980. if err != nil {
  981. return false, common.ContextError(err)
  982. }
  983. return duplicate, nil
  984. }
  985. // GetSLOK returns a SLOK key for the specified ID. The return
  986. // value is nil if the SLOK is not found.
  987. func GetSLOK(id []byte) (key []byte, err error) {
  988. checkInitDataStore()
  989. err = singleton.db.View(func(tx *bolt.Tx) error {
  990. bucket := tx.Bucket([]byte(slokBucket))
  991. key = bucket.Get(id)
  992. return nil
  993. })
  994. if err != nil {
  995. return nil, common.ContextError(err)
  996. }
  997. return key, nil
  998. }