dataStore.go 34 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257
  1. /*
  2. * Copyright (c) 2015, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package psiphon
  20. import (
  21. "bytes"
  22. "encoding/json"
  23. "errors"
  24. "fmt"
  25. "math/rand"
  26. "sync"
  27. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  28. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/parameters"
  29. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
  30. )
  31. var (
  32. datastoreServerEntriesBucket = []byte("serverEntries")
  33. datastoreSplitTunnelRouteETagsBucket = []byte("splitTunnelRouteETags")
  34. datastoreSplitTunnelRouteDataBucket = []byte("splitTunnelRouteData")
  35. datastoreUrlETagsBucket = []byte("urlETags")
  36. datastoreKeyValueBucket = []byte("keyValues")
  37. datastoreRemoteServerListStatsBucket = []byte("remoteServerListStats")
  38. datastoreSLOKsBucket = []byte("SLOKs")
  39. datastoreTacticsBucket = []byte("tactics")
  40. datastoreSpeedTestSamplesBucket = []byte("speedTestSamples")
  41. datastoreLastConnectedKey = "lastConnected"
  42. datastoreLastServerEntryFilterKey = []byte("lastServerEntryFilter")
  43. datastoreAffinityServerEntryIDKey = []byte("affinityServerEntryID")
  44. datastorePersistentStatTypeRemoteServerList = string(datastoreRemoteServerListStatsBucket)
  45. datastoreServerEntryFetchGCThreshold = 20
  46. datastoreInitalizeMutex sync.Mutex
  47. datastoreReferenceMutex sync.Mutex
  48. activeDatastoreDB *datastoreDB
  49. )
  50. // OpenDataStore opens and initializes the singleton data store instance.
  51. func OpenDataStore(config *Config) error {
  52. datastoreInitalizeMutex.Lock()
  53. defer datastoreInitalizeMutex.Unlock()
  54. datastoreReferenceMutex.Lock()
  55. existingDB := activeDatastoreDB
  56. datastoreReferenceMutex.Unlock()
  57. if existingDB != nil {
  58. return common.ContextError(errors.New("db already open"))
  59. }
  60. newDB, err := datastoreOpenDB(config.DataStoreDirectory)
  61. if err != nil {
  62. return common.ContextError(err)
  63. }
  64. datastoreReferenceMutex.Lock()
  65. activeDatastoreDB = newDB
  66. datastoreReferenceMutex.Unlock()
  67. _ = resetAllPersistentStatsToUnreported()
  68. return nil
  69. }
  70. // CloseDataStore closes the singleton data store instance, if open.
  71. func CloseDataStore() {
  72. datastoreInitalizeMutex.Lock()
  73. defer datastoreInitalizeMutex.Unlock()
  74. datastoreReferenceMutex.Lock()
  75. defer datastoreReferenceMutex.Unlock()
  76. if activeDatastoreDB == nil {
  77. return
  78. }
  79. err := activeDatastoreDB.close()
  80. if err != nil {
  81. NoticeAlert("failed to close database: %s", common.ContextError(err))
  82. }
  83. activeDatastoreDB = nil
  84. }
  85. func datastoreView(fn func(tx *datastoreTx) error) error {
  86. datastoreReferenceMutex.Lock()
  87. db := activeDatastoreDB
  88. datastoreReferenceMutex.Unlock()
  89. if db == nil {
  90. return common.ContextError(errors.New("database not open"))
  91. }
  92. err := db.view(fn)
  93. if err != nil {
  94. err = common.ContextError(err)
  95. }
  96. return err
  97. }
  98. func datastoreUpdate(fn func(tx *datastoreTx) error) error {
  99. datastoreReferenceMutex.Lock()
  100. db := activeDatastoreDB
  101. datastoreReferenceMutex.Unlock()
  102. if db == nil {
  103. return common.ContextError(errors.New("database not open"))
  104. }
  105. err := db.update(fn)
  106. if err != nil {
  107. err = common.ContextError(err)
  108. }
  109. return err
  110. }
  111. // StoreServerEntry adds the server entry to the data store.
  112. //
  113. // When a server entry already exists for a given server, it will be
  114. // replaced only if replaceIfExists is set or if the the ConfigurationVersion
  115. // field of the new entry is strictly higher than the existing entry.
  116. //
  117. // If the server entry data is malformed, an alert notice is issued and
  118. // the entry is skipped; no error is returned.
  119. func StoreServerEntry(serverEntryFields protocol.ServerEntryFields, replaceIfExists bool) error {
  120. // Server entries should already be validated before this point,
  121. // so instead of skipping we fail with an error.
  122. err := protocol.ValidateServerEntryFields(serverEntryFields)
  123. if err != nil {
  124. return common.ContextError(
  125. fmt.Errorf("invalid server entry: %s", err))
  126. }
  127. // BoltDB implementation note:
  128. // For simplicity, we don't maintain indexes on server entry
  129. // region or supported protocols. Instead, we perform full-bucket
  130. // scans with a filter. With a small enough database (thousands or
  131. // even tens of thousand of server entries) and common enough
  132. // values (e.g., many servers support all protocols), performance
  133. // is expected to be acceptable.
  134. err = datastoreUpdate(func(tx *datastoreTx) error {
  135. serverEntries := tx.bucket(datastoreServerEntriesBucket)
  136. ipAddress := serverEntryFields.GetIPAddress()
  137. // Check not only that the entry exists, but is valid. This
  138. // will replace in the rare case where the data is corrupt.
  139. existingConfigurationVersion := -1
  140. existingData := serverEntries.get([]byte(ipAddress))
  141. if existingData != nil {
  142. var existingServerEntry *protocol.ServerEntry
  143. err := json.Unmarshal(existingData, &existingServerEntry)
  144. if err == nil {
  145. existingConfigurationVersion = existingServerEntry.ConfigurationVersion
  146. }
  147. }
  148. exists := existingConfigurationVersion > -1
  149. newer := exists && existingConfigurationVersion < serverEntryFields.GetConfigurationVersion()
  150. update := !exists || replaceIfExists || newer
  151. if !update {
  152. // Disabling this notice, for now, as it generates too much noise
  153. // in diagnostics with clients that always submit embedded servers
  154. // to the core on each run.
  155. // NoticeInfo("ignored update for server %s", serverEntry.IpAddress)
  156. return nil
  157. }
  158. data, err := json.Marshal(serverEntryFields)
  159. if err != nil {
  160. return common.ContextError(err)
  161. }
  162. err = serverEntries.put([]byte(ipAddress), data)
  163. if err != nil {
  164. return common.ContextError(err)
  165. }
  166. NoticeInfo("updated server %s", ipAddress)
  167. return nil
  168. })
  169. if err != nil {
  170. return common.ContextError(err)
  171. }
  172. return nil
  173. }
  174. // StoreServerEntries stores a list of server entries.
  175. // There is an independent transaction for each entry insert/update.
  176. func StoreServerEntries(
  177. config *Config,
  178. serverEntries []protocol.ServerEntryFields,
  179. replaceIfExists bool) error {
  180. for _, serverEntryFields := range serverEntries {
  181. err := StoreServerEntry(serverEntryFields, replaceIfExists)
  182. if err != nil {
  183. return common.ContextError(err)
  184. }
  185. }
  186. return nil
  187. }
  188. // StreamingStoreServerEntries stores a list of server entries.
  189. // There is an independent transaction for each entry insert/update.
  190. func StreamingStoreServerEntries(
  191. config *Config,
  192. serverEntries *protocol.StreamingServerEntryDecoder,
  193. replaceIfExists bool) error {
  194. // Note: both StreamingServerEntryDecoder.Next and StoreServerEntry
  195. // allocate temporary memory buffers for hex/JSON decoding/encoding,
  196. // so this isn't true constant-memory streaming (it depends on garbage
  197. // collection).
  198. n := 0
  199. for {
  200. serverEntry, err := serverEntries.Next()
  201. if err != nil {
  202. return common.ContextError(err)
  203. }
  204. if serverEntry == nil {
  205. // No more server entries
  206. break
  207. }
  208. err = StoreServerEntry(serverEntry, replaceIfExists)
  209. if err != nil {
  210. return common.ContextError(err)
  211. }
  212. n += 1
  213. if n == datastoreServerEntryFetchGCThreshold {
  214. defaultGarbageCollection()
  215. n = 0
  216. }
  217. }
  218. return nil
  219. }
  220. // PromoteServerEntry sets the server affinity server entry ID to the
  221. // specified server entry IP address.
  222. func PromoteServerEntry(config *Config, ipAddress string) error {
  223. err := datastoreUpdate(func(tx *datastoreTx) error {
  224. serverEntryID := []byte(ipAddress)
  225. // Ensure the corresponding server entry exists before
  226. // setting server affinity.
  227. bucket := tx.bucket(datastoreServerEntriesBucket)
  228. data := bucket.get(serverEntryID)
  229. if data == nil {
  230. NoticeAlert(
  231. "PromoteServerEntry: ignoring unknown server entry: %s",
  232. ipAddress)
  233. return nil
  234. }
  235. bucket = tx.bucket(datastoreKeyValueBucket)
  236. err := bucket.put(datastoreAffinityServerEntryIDKey, serverEntryID)
  237. if err != nil {
  238. return err
  239. }
  240. // Store the current server entry filter (e.g, region, etc.) that
  241. // was in use when the entry was promoted. This is used to detect
  242. // when the top ranked server entry was promoted under a different
  243. // filter.
  244. currentFilter, err := makeServerEntryFilterValue(config)
  245. if err != nil {
  246. return err
  247. }
  248. return bucket.put(datastoreLastServerEntryFilterKey, currentFilter)
  249. })
  250. if err != nil {
  251. return common.ContextError(err)
  252. }
  253. return nil
  254. }
  255. func makeServerEntryFilterValue(config *Config) ([]byte, error) {
  256. // Currently, only a change of EgressRegion will "break" server affinity.
  257. // If the tunnel protocol filter changes, any existing affinity server
  258. // either passes the new filter, or it will be skipped anyway.
  259. return []byte(config.EgressRegion), nil
  260. }
  261. func hasServerEntryFilterChanged(config *Config) (bool, error) {
  262. currentFilter, err := makeServerEntryFilterValue(config)
  263. if err != nil {
  264. return false, common.ContextError(err)
  265. }
  266. changed := false
  267. err = datastoreView(func(tx *datastoreTx) error {
  268. // previousFilter will be nil not found (not previously
  269. // set) which will never match any current filter.
  270. bucket := tx.bucket(datastoreKeyValueBucket)
  271. previousFilter := bucket.get(datastoreLastServerEntryFilterKey)
  272. if bytes.Compare(previousFilter, currentFilter) != 0 {
  273. changed = true
  274. }
  275. return nil
  276. })
  277. if err != nil {
  278. return false, common.ContextError(err)
  279. }
  280. return changed, nil
  281. }
  282. // ServerEntryIterator is used to iterate over
  283. // stored server entries in rank order.
  284. type ServerEntryIterator struct {
  285. config *Config
  286. applyServerAffinity bool
  287. serverEntryIDs [][]byte
  288. serverEntryIndex int
  289. isTacticsServerEntryIterator bool
  290. isTargetServerEntryIterator bool
  291. hasNextTargetServerEntry bool
  292. targetServerEntry *protocol.ServerEntry
  293. }
  294. // NewServerEntryIterator creates a new ServerEntryIterator.
  295. //
  296. // The boolean return value indicates whether to treat the first server(s)
  297. // as affinity servers or not. When the server entry selection filter changes
  298. // such as from a specific region to any region, or when there was no previous
  299. // filter/iterator, the the first server(s) are arbitrary and should not be
  300. // given affinity treatment.
  301. //
  302. // NewServerEntryIterator and any returned ServerEntryIterator are not
  303. // designed for concurrent use as not all related datastore operations are
  304. // performed in a single transaction.
  305. //
  306. func NewServerEntryIterator(config *Config) (bool, *ServerEntryIterator, error) {
  307. // When configured, this target server entry is the only candidate
  308. if config.TargetServerEntry != "" {
  309. return newTargetServerEntryIterator(config, false)
  310. }
  311. filterChanged, err := hasServerEntryFilterChanged(config)
  312. if err != nil {
  313. return false, nil, common.ContextError(err)
  314. }
  315. applyServerAffinity := !filterChanged
  316. iterator := &ServerEntryIterator{
  317. config: config,
  318. applyServerAffinity: applyServerAffinity,
  319. }
  320. err = iterator.Reset()
  321. if err != nil {
  322. return false, nil, common.ContextError(err)
  323. }
  324. return applyServerAffinity, iterator, nil
  325. }
  326. func NewTacticsServerEntryIterator(config *Config) (*ServerEntryIterator, error) {
  327. // When configured, this target server entry is the only candidate
  328. if config.TargetServerEntry != "" {
  329. _, iterator, err := newTargetServerEntryIterator(config, true)
  330. return iterator, err
  331. }
  332. iterator := &ServerEntryIterator{
  333. isTacticsServerEntryIterator: true,
  334. }
  335. err := iterator.Reset()
  336. if err != nil {
  337. return nil, common.ContextError(err)
  338. }
  339. return iterator, nil
  340. }
  341. // newTargetServerEntryIterator is a helper for initializing the TargetServerEntry case
  342. func newTargetServerEntryIterator(config *Config, isTactics bool) (bool, *ServerEntryIterator, error) {
  343. serverEntry, err := protocol.DecodeServerEntry(
  344. config.TargetServerEntry, common.GetCurrentTimestamp(), protocol.SERVER_ENTRY_SOURCE_TARGET)
  345. if err != nil {
  346. return false, nil, common.ContextError(err)
  347. }
  348. if isTactics {
  349. if len(serverEntry.GetSupportedTacticsProtocols()) == 0 {
  350. return false, nil, common.ContextError(errors.New("TargetServerEntry does not support tactics protocols"))
  351. }
  352. } else {
  353. if config.EgressRegion != "" && serverEntry.Region != config.EgressRegion {
  354. return false, nil, common.ContextError(errors.New("TargetServerEntry does not support EgressRegion"))
  355. }
  356. limitTunnelProtocols := config.clientParameters.Get().TunnelProtocols(parameters.LimitTunnelProtocols)
  357. if len(limitTunnelProtocols) > 0 {
  358. // At the ServerEntryIterator level, only limitTunnelProtocols is applied;
  359. // excludeIntensive is handled higher up.
  360. if len(serverEntry.GetSupportedProtocols(
  361. config.UseUpstreamProxy(), limitTunnelProtocols, false)) == 0 {
  362. return false, nil, common.ContextError(errors.New("TargetServerEntry does not support LimitTunnelProtocols"))
  363. }
  364. }
  365. }
  366. iterator := &ServerEntryIterator{
  367. isTacticsServerEntryIterator: isTactics,
  368. isTargetServerEntryIterator: true,
  369. hasNextTargetServerEntry: true,
  370. targetServerEntry: serverEntry,
  371. }
  372. NoticeInfo("using TargetServerEntry: %s", serverEntry.IpAddress)
  373. return false, iterator, nil
  374. }
  375. // Reset a NewServerEntryIterator to the start of its cycle. The next
  376. // call to Next will return the first server entry.
  377. func (iterator *ServerEntryIterator) Reset() error {
  378. iterator.Close()
  379. if iterator.isTargetServerEntryIterator {
  380. iterator.hasNextTargetServerEntry = true
  381. return nil
  382. }
  383. // BoltDB implementation note:
  384. // We don't keep a transaction open for the duration of the iterator
  385. // because this would expose the following semantics to consumer code:
  386. //
  387. // Read-only transactions and read-write transactions ... generally
  388. // shouldn't be opened simultaneously in the same goroutine. This can
  389. // cause a deadlock as the read-write transaction needs to periodically
  390. // re-map the data file but it cannot do so while a read-only
  391. // transaction is open.
  392. // (https://github.com/boltdb/bolt)
  393. //
  394. // So the underlying serverEntriesBucket could change after the serverEntryIDs
  395. // list is built.
  396. var serverEntryIDs [][]byte
  397. err := datastoreView(func(tx *datastoreTx) error {
  398. bucket := tx.bucket(datastoreKeyValueBucket)
  399. serverEntryIDs = make([][]byte, 0)
  400. shuffleHead := 0
  401. var affinityServerEntryID []byte
  402. if iterator.applyServerAffinity {
  403. affinityServerEntryID = bucket.get(datastoreAffinityServerEntryIDKey)
  404. if affinityServerEntryID != nil {
  405. serverEntryIDs = append(serverEntryIDs, append([]byte(nil), affinityServerEntryID...))
  406. shuffleHead = 1
  407. }
  408. }
  409. bucket = tx.bucket(datastoreServerEntriesBucket)
  410. cursor := bucket.cursor()
  411. for key := cursor.firstKey(); key != nil; key = cursor.nextKey() {
  412. if affinityServerEntryID != nil {
  413. if bytes.Equal(affinityServerEntryID, key) {
  414. continue
  415. }
  416. }
  417. serverEntryIDs = append(serverEntryIDs, append([]byte(nil), key...))
  418. }
  419. cursor.close()
  420. for i := len(serverEntryIDs) - 1; i > shuffleHead-1; i-- {
  421. j := rand.Intn(i+1-shuffleHead) + shuffleHead
  422. serverEntryIDs[i], serverEntryIDs[j] = serverEntryIDs[j], serverEntryIDs[i]
  423. }
  424. return nil
  425. })
  426. if err != nil {
  427. return common.ContextError(err)
  428. }
  429. iterator.serverEntryIDs = serverEntryIDs
  430. iterator.serverEntryIndex = 0
  431. return nil
  432. }
  433. // Close cleans up resources associated with a ServerEntryIterator.
  434. func (iterator *ServerEntryIterator) Close() {
  435. iterator.serverEntryIDs = nil
  436. iterator.serverEntryIndex = 0
  437. }
  438. // Next returns the next server entry, by rank, for a ServerEntryIterator.
  439. // Returns nil with no error when there is no next item.
  440. func (iterator *ServerEntryIterator) Next() (*protocol.ServerEntry, error) {
  441. var serverEntry *protocol.ServerEntry
  442. var err error
  443. defer func() {
  444. if err != nil {
  445. iterator.Close()
  446. }
  447. }()
  448. if iterator.isTargetServerEntryIterator {
  449. if iterator.hasNextTargetServerEntry {
  450. iterator.hasNextTargetServerEntry = false
  451. return MakeCompatibleServerEntry(iterator.targetServerEntry), nil
  452. }
  453. return nil, nil
  454. }
  455. // There are no region/protocol indexes for the server entries bucket.
  456. // Loop until we have the next server entry that matches the iterator
  457. // filter requirements.
  458. for {
  459. if iterator.serverEntryIndex >= len(iterator.serverEntryIDs) {
  460. // There is no next item
  461. return nil, nil
  462. }
  463. serverEntryID := iterator.serverEntryIDs[iterator.serverEntryIndex]
  464. iterator.serverEntryIndex += 1
  465. var data []byte
  466. err = datastoreView(func(tx *datastoreTx) error {
  467. bucket := tx.bucket(datastoreServerEntriesBucket)
  468. value := bucket.get(serverEntryID)
  469. if value != nil {
  470. // Must make a copy as slice is only valid within transaction.
  471. data = make([]byte, len(value))
  472. copy(data, value)
  473. }
  474. return nil
  475. })
  476. if err != nil {
  477. return nil, common.ContextError(err)
  478. }
  479. if data == nil {
  480. // In case of data corruption or a bug causing this condition,
  481. // do not stop iterating.
  482. NoticeAlert("ServerEntryIterator.Next: unexpected missing server entry: %s", string(serverEntryID))
  483. continue
  484. }
  485. err = json.Unmarshal(data, &serverEntry)
  486. if err != nil {
  487. // In case of data corruption or a bug causing this condition,
  488. // do not stop iterating.
  489. NoticeAlert("ServerEntryIterator.Next: %s", common.ContextError(err))
  490. continue
  491. }
  492. if iterator.serverEntryIndex%datastoreServerEntryFetchGCThreshold == 0 {
  493. defaultGarbageCollection()
  494. }
  495. // Check filter requirements
  496. if iterator.isTacticsServerEntryIterator {
  497. // Tactics doesn't filter by egress region.
  498. if len(serverEntry.GetSupportedTacticsProtocols()) > 0 {
  499. break
  500. }
  501. } else {
  502. if iterator.config.EgressRegion == "" ||
  503. serverEntry.Region == iterator.config.EgressRegion {
  504. break
  505. }
  506. }
  507. }
  508. return MakeCompatibleServerEntry(serverEntry), nil
  509. }
  510. // MakeCompatibleServerEntry provides backwards compatibility with old server entries
  511. // which have a single meekFrontingDomain and not a meekFrontingAddresses array.
  512. // By copying this one meekFrontingDomain into meekFrontingAddresses, this client effectively
  513. // uses that single value as legacy clients do.
  514. func MakeCompatibleServerEntry(serverEntry *protocol.ServerEntry) *protocol.ServerEntry {
  515. if len(serverEntry.MeekFrontingAddresses) == 0 && serverEntry.MeekFrontingDomain != "" {
  516. serverEntry.MeekFrontingAddresses =
  517. append(serverEntry.MeekFrontingAddresses, serverEntry.MeekFrontingDomain)
  518. }
  519. return serverEntry
  520. }
  521. func scanServerEntries(scanner func(*protocol.ServerEntry)) error {
  522. err := datastoreView(func(tx *datastoreTx) error {
  523. bucket := tx.bucket(datastoreServerEntriesBucket)
  524. cursor := bucket.cursor()
  525. n := 0
  526. for key, value := cursor.first(); key != nil; key, value = cursor.next() {
  527. var serverEntry *protocol.ServerEntry
  528. err := json.Unmarshal(value, &serverEntry)
  529. if err != nil {
  530. // In case of data corruption or a bug causing this condition,
  531. // do not stop iterating.
  532. NoticeAlert("scanServerEntries: %s", common.ContextError(err))
  533. continue
  534. }
  535. scanner(serverEntry)
  536. n += 1
  537. if n == datastoreServerEntryFetchGCThreshold {
  538. defaultGarbageCollection()
  539. n = 0
  540. }
  541. }
  542. cursor.close()
  543. return nil
  544. })
  545. if err != nil {
  546. return common.ContextError(err)
  547. }
  548. return nil
  549. }
  550. // CountServerEntries returns a count of stored server entries.
  551. func CountServerEntries() int {
  552. count := 0
  553. err := scanServerEntries(func(_ *protocol.ServerEntry) {
  554. count += 1
  555. })
  556. if err != nil {
  557. NoticeAlert("CountServerEntries failed: %s", err)
  558. return 0
  559. }
  560. return count
  561. }
  562. // CountServerEntriesWithLimits returns a count of stored server entries for
  563. // the specified region and tunnel protocol limits.
  564. func CountServerEntriesWithLimits(
  565. useUpstreamProxy bool, region string, limitState *limitTunnelProtocolsState) (int, int) {
  566. // When CountServerEntriesWithLimits is called only
  567. // limitTunnelProtocolState is fixed; excludeIntensive is transitory.
  568. excludeIntensive := false
  569. initialCount := 0
  570. count := 0
  571. err := scanServerEntries(func(serverEntry *protocol.ServerEntry) {
  572. if region == "" || serverEntry.Region == region {
  573. if limitState.isInitialCandidate(excludeIntensive, serverEntry) {
  574. initialCount += 1
  575. }
  576. if limitState.isCandidate(excludeIntensive, serverEntry) {
  577. count += 1
  578. }
  579. }
  580. })
  581. if err != nil {
  582. NoticeAlert("CountServerEntriesWithLimits failed: %s", err)
  583. return 0, 0
  584. }
  585. return initialCount, count
  586. }
  587. // ReportAvailableRegions prints a notice with the available egress regions.
  588. func ReportAvailableRegions(config *Config, limitState *limitTunnelProtocolsState) {
  589. // When ReportAvailableRegions is called only
  590. // limitTunnelProtocolState is fixed; excludeIntensive is transitory.
  591. excludeIntensive := false
  592. regions := make(map[string]bool)
  593. err := scanServerEntries(func(serverEntry *protocol.ServerEntry) {
  594. if limitState.isInitialCandidate(excludeIntensive, serverEntry) ||
  595. limitState.isCandidate(excludeIntensive, serverEntry) {
  596. regions[serverEntry.Region] = true
  597. }
  598. })
  599. if err != nil {
  600. NoticeAlert("ReportAvailableRegions failed: %s", err)
  601. return
  602. }
  603. regionList := make([]string, 0, len(regions))
  604. for region := range regions {
  605. // Some server entries do not have a region, but it makes no sense to return
  606. // an empty string as an "available region".
  607. if region != "" {
  608. regionList = append(regionList, region)
  609. }
  610. }
  611. NoticeAvailableEgressRegions(regionList)
  612. }
  613. // SetSplitTunnelRoutes updates the cached routes data for
  614. // the given region. The associated etag is also stored and
  615. // used to make efficient web requests for updates to the data.
  616. func SetSplitTunnelRoutes(region, etag string, data []byte) error {
  617. err := datastoreUpdate(func(tx *datastoreTx) error {
  618. bucket := tx.bucket(datastoreSplitTunnelRouteETagsBucket)
  619. err := bucket.put([]byte(region), []byte(etag))
  620. bucket = tx.bucket(datastoreSplitTunnelRouteDataBucket)
  621. err = bucket.put([]byte(region), data)
  622. return err
  623. })
  624. if err != nil {
  625. return common.ContextError(err)
  626. }
  627. return nil
  628. }
  629. // GetSplitTunnelRoutesETag retrieves the etag for cached routes
  630. // data for the specified region. If not found, it returns an empty string value.
  631. func GetSplitTunnelRoutesETag(region string) (string, error) {
  632. var etag string
  633. err := datastoreView(func(tx *datastoreTx) error {
  634. bucket := tx.bucket(datastoreSplitTunnelRouteETagsBucket)
  635. etag = string(bucket.get([]byte(region)))
  636. return nil
  637. })
  638. if err != nil {
  639. return "", common.ContextError(err)
  640. }
  641. return etag, nil
  642. }
  643. // GetSplitTunnelRoutesData retrieves the cached routes data
  644. // for the specified region. If not found, it returns a nil value.
  645. func GetSplitTunnelRoutesData(region string) ([]byte, error) {
  646. var data []byte
  647. err := datastoreView(func(tx *datastoreTx) error {
  648. bucket := tx.bucket(datastoreSplitTunnelRouteDataBucket)
  649. value := bucket.get([]byte(region))
  650. if value != nil {
  651. // Must make a copy as slice is only valid within transaction.
  652. data = make([]byte, len(value))
  653. copy(data, value)
  654. }
  655. return nil
  656. })
  657. if err != nil {
  658. return nil, common.ContextError(err)
  659. }
  660. return data, nil
  661. }
  662. // SetUrlETag stores an ETag for the specfied URL.
  663. // Note: input URL is treated as a string, and is not
  664. // encoded or decoded or otherwise canonicalized.
  665. func SetUrlETag(url, etag string) error {
  666. err := datastoreUpdate(func(tx *datastoreTx) error {
  667. bucket := tx.bucket(datastoreUrlETagsBucket)
  668. err := bucket.put([]byte(url), []byte(etag))
  669. return err
  670. })
  671. if err != nil {
  672. return common.ContextError(err)
  673. }
  674. return nil
  675. }
  676. // GetUrlETag retrieves a previously stored an ETag for the
  677. // specfied URL. If not found, it returns an empty string value.
  678. func GetUrlETag(url string) (string, error) {
  679. var etag string
  680. err := datastoreView(func(tx *datastoreTx) error {
  681. bucket := tx.bucket(datastoreUrlETagsBucket)
  682. etag = string(bucket.get([]byte(url)))
  683. return nil
  684. })
  685. if err != nil {
  686. return "", common.ContextError(err)
  687. }
  688. return etag, nil
  689. }
  690. // SetKeyValue stores a key/value pair.
  691. func SetKeyValue(key, value string) error {
  692. err := datastoreUpdate(func(tx *datastoreTx) error {
  693. bucket := tx.bucket(datastoreKeyValueBucket)
  694. err := bucket.put([]byte(key), []byte(value))
  695. return err
  696. })
  697. if err != nil {
  698. return common.ContextError(err)
  699. }
  700. return nil
  701. }
  702. // GetKeyValue retrieves the value for a given key. If not found,
  703. // it returns an empty string value.
  704. func GetKeyValue(key string) (string, error) {
  705. var value string
  706. err := datastoreView(func(tx *datastoreTx) error {
  707. bucket := tx.bucket(datastoreKeyValueBucket)
  708. value = string(bucket.get([]byte(key)))
  709. return nil
  710. })
  711. if err != nil {
  712. return "", common.ContextError(err)
  713. }
  714. return value, nil
  715. }
  716. // Persistent stat records in the persistentStatStateUnreported
  717. // state are available for take out.
  718. //
  719. // Records in the persistentStatStateReporting have been taken
  720. // out and are pending either deletion (for a successful request)
  721. // or change to StateUnreported (for a failed request).
  722. //
  723. // All persistent stat records are reverted to StateUnreported
  724. // when the datastore is initialized at start up.
  725. var persistentStatStateUnreported = []byte("0")
  726. var persistentStatStateReporting = []byte("1")
  727. var persistentStatTypes = []string{
  728. datastorePersistentStatTypeRemoteServerList,
  729. }
  730. // StorePersistentStat adds a new persistent stat record, which
  731. // is set to StateUnreported and is an immediate candidate for
  732. // reporting.
  733. //
  734. // The stat is a JSON byte array containing fields as
  735. // required by the Psiphon server API. It's assumed that the
  736. // JSON value contains enough unique information for the value to
  737. // function as a key in the key/value datastore. This assumption
  738. // is currently satisfied by the fields sessionId + tunnelNumber
  739. // for tunnel stats, and URL + ETag for remote server list stats.
  740. func StorePersistentStat(statType string, stat []byte) error {
  741. if !common.Contains(persistentStatTypes, statType) {
  742. return common.ContextError(fmt.Errorf("invalid persistent stat type: %s", statType))
  743. }
  744. err := datastoreUpdate(func(tx *datastoreTx) error {
  745. bucket := tx.bucket([]byte(statType))
  746. err := bucket.put(stat, persistentStatStateUnreported)
  747. return err
  748. })
  749. if err != nil {
  750. return common.ContextError(err)
  751. }
  752. return nil
  753. }
  754. // CountUnreportedPersistentStats returns the number of persistent
  755. // stat records in StateUnreported.
  756. func CountUnreportedPersistentStats() int {
  757. unreported := 0
  758. err := datastoreView(func(tx *datastoreTx) error {
  759. for _, statType := range persistentStatTypes {
  760. bucket := tx.bucket([]byte(statType))
  761. cursor := bucket.cursor()
  762. for key, value := cursor.first(); key != nil; key, value = cursor.next() {
  763. if 0 == bytes.Compare(value, persistentStatStateUnreported) {
  764. unreported++
  765. break
  766. }
  767. }
  768. cursor.close()
  769. }
  770. return nil
  771. })
  772. if err != nil {
  773. NoticeAlert("CountUnreportedPersistentStats failed: %s", err)
  774. return 0
  775. }
  776. return unreported
  777. }
  778. // TakeOutUnreportedPersistentStats returns up to maxCount persistent
  779. // stats records that are in StateUnreported. The records are set to
  780. // StateReporting. If the records are successfully reported, clear them
  781. // with ClearReportedPersistentStats. If the records are not successfully
  782. // reported, restore them with PutBackUnreportedPersistentStats.
  783. func TakeOutUnreportedPersistentStats(maxCount int) (map[string][][]byte, error) {
  784. stats := make(map[string][][]byte)
  785. err := datastoreUpdate(func(tx *datastoreTx) error {
  786. count := 0
  787. for _, statType := range persistentStatTypes {
  788. bucket := tx.bucket([]byte(statType))
  789. cursor := bucket.cursor()
  790. for key, value := cursor.first(); key != nil; key, value = cursor.next() {
  791. if count >= maxCount {
  792. break
  793. }
  794. // Perform a test JSON unmarshaling. In case of data corruption or a bug,
  795. // skip the record.
  796. var jsonData interface{}
  797. err := json.Unmarshal(key, &jsonData)
  798. if err != nil {
  799. NoticeAlert(
  800. "Invalid key in TakeOutUnreportedPersistentStats: %s: %s",
  801. string(key), err)
  802. continue
  803. }
  804. if 0 == bytes.Compare(value, persistentStatStateUnreported) {
  805. // Must make a copy as slice is only valid within transaction.
  806. data := make([]byte, len(key))
  807. copy(data, key)
  808. if stats[statType] == nil {
  809. stats[statType] = make([][]byte, 0)
  810. }
  811. stats[statType] = append(stats[statType], data)
  812. count += 1
  813. }
  814. }
  815. cursor.close()
  816. for _, key := range stats[statType] {
  817. err := bucket.put(key, persistentStatStateReporting)
  818. if err != nil {
  819. return err
  820. }
  821. }
  822. }
  823. return nil
  824. })
  825. if err != nil {
  826. return nil, common.ContextError(err)
  827. }
  828. return stats, nil
  829. }
  830. // PutBackUnreportedPersistentStats restores a list of persistent
  831. // stat records to StateUnreported.
  832. func PutBackUnreportedPersistentStats(stats map[string][][]byte) error {
  833. err := datastoreUpdate(func(tx *datastoreTx) error {
  834. for _, statType := range persistentStatTypes {
  835. bucket := tx.bucket([]byte(statType))
  836. for _, key := range stats[statType] {
  837. err := bucket.put(key, persistentStatStateUnreported)
  838. if err != nil {
  839. return err
  840. }
  841. }
  842. }
  843. return nil
  844. })
  845. if err != nil {
  846. return common.ContextError(err)
  847. }
  848. return nil
  849. }
  850. // ClearReportedPersistentStats deletes a list of persistent
  851. // stat records that were successfully reported.
  852. func ClearReportedPersistentStats(stats map[string][][]byte) error {
  853. err := datastoreUpdate(func(tx *datastoreTx) error {
  854. for _, statType := range persistentStatTypes {
  855. bucket := tx.bucket([]byte(statType))
  856. for _, key := range stats[statType] {
  857. err := bucket.delete(key)
  858. if err != nil {
  859. return err
  860. }
  861. }
  862. }
  863. return nil
  864. })
  865. if err != nil {
  866. return common.ContextError(err)
  867. }
  868. return nil
  869. }
  870. // resetAllPersistentStatsToUnreported sets all persistent stat
  871. // records to StateUnreported. This reset is called when the
  872. // datastore is initialized at start up, as we do not know if
  873. // persistent records in StateReporting were reported or not.
  874. func resetAllPersistentStatsToUnreported() error {
  875. err := datastoreUpdate(func(tx *datastoreTx) error {
  876. for _, statType := range persistentStatTypes {
  877. bucket := tx.bucket([]byte(statType))
  878. resetKeys := make([][]byte, 0)
  879. cursor := bucket.cursor()
  880. for key := cursor.firstKey(); key != nil; key = cursor.nextKey() {
  881. resetKeys = append(resetKeys, key)
  882. }
  883. cursor.close()
  884. // TODO: data mutation is done outside cursor. Is this
  885. // strictly necessary in this case? As is, this means
  886. // all stats need to be loaded into memory at once.
  887. // https://godoc.org/github.com/boltdb/bolt#Cursor
  888. for _, key := range resetKeys {
  889. err := bucket.put(key, persistentStatStateUnreported)
  890. if err != nil {
  891. return err
  892. }
  893. }
  894. }
  895. return nil
  896. })
  897. if err != nil {
  898. return common.ContextError(err)
  899. }
  900. return nil
  901. }
  902. // CountSLOKs returns the total number of SLOK records.
  903. func CountSLOKs() int {
  904. count := 0
  905. err := datastoreView(func(tx *datastoreTx) error {
  906. bucket := tx.bucket(datastoreSLOKsBucket)
  907. cursor := bucket.cursor()
  908. for key := cursor.firstKey(); key != nil; key = cursor.nextKey() {
  909. count++
  910. }
  911. cursor.close()
  912. return nil
  913. })
  914. if err != nil {
  915. NoticeAlert("CountSLOKs failed: %s", err)
  916. return 0
  917. }
  918. return count
  919. }
  920. // DeleteSLOKs deletes all SLOK records.
  921. func DeleteSLOKs() error {
  922. err := datastoreUpdate(func(tx *datastoreTx) error {
  923. return tx.clearBucket(datastoreSLOKsBucket)
  924. })
  925. if err != nil {
  926. return common.ContextError(err)
  927. }
  928. return nil
  929. }
  930. // SetSLOK stores a SLOK key, referenced by its ID. The bool
  931. // return value indicates whether the SLOK was already stored.
  932. func SetSLOK(id, key []byte) (bool, error) {
  933. var duplicate bool
  934. err := datastoreUpdate(func(tx *datastoreTx) error {
  935. bucket := tx.bucket(datastoreSLOKsBucket)
  936. duplicate = bucket.get(id) != nil
  937. err := bucket.put([]byte(id), []byte(key))
  938. return err
  939. })
  940. if err != nil {
  941. return false, common.ContextError(err)
  942. }
  943. return duplicate, nil
  944. }
  945. // GetSLOK returns a SLOK key for the specified ID. The return
  946. // value is nil if the SLOK is not found.
  947. func GetSLOK(id []byte) ([]byte, error) {
  948. var key []byte
  949. err := datastoreView(func(tx *datastoreTx) error {
  950. bucket := tx.bucket(datastoreSLOKsBucket)
  951. key = bucket.get(id)
  952. return nil
  953. })
  954. if err != nil {
  955. return nil, common.ContextError(err)
  956. }
  957. return key, nil
  958. }
  959. // TacticsStorer implements tactics.Storer.
  960. type TacticsStorer struct {
  961. }
  962. func (t *TacticsStorer) SetTacticsRecord(networkID string, record []byte) error {
  963. return setBucketValue(datastoreTacticsBucket, []byte(networkID), record)
  964. }
  965. func (t *TacticsStorer) GetTacticsRecord(networkID string) ([]byte, error) {
  966. return getBucketValue(datastoreTacticsBucket, []byte(networkID))
  967. }
  968. func (t *TacticsStorer) SetSpeedTestSamplesRecord(networkID string, record []byte) error {
  969. return setBucketValue(datastoreSpeedTestSamplesBucket, []byte(networkID), record)
  970. }
  971. func (t *TacticsStorer) GetSpeedTestSamplesRecord(networkID string) ([]byte, error) {
  972. return getBucketValue(datastoreSpeedTestSamplesBucket, []byte(networkID))
  973. }
  974. // GetTacticsStorer creates a TacticsStorer.
  975. func GetTacticsStorer() *TacticsStorer {
  976. return &TacticsStorer{}
  977. }
  978. func setBucketValue(bucket, key, value []byte) error {
  979. err := datastoreUpdate(func(tx *datastoreTx) error {
  980. bucket := tx.bucket(bucket)
  981. err := bucket.put(key, value)
  982. return err
  983. })
  984. if err != nil {
  985. return common.ContextError(err)
  986. }
  987. return nil
  988. }
  989. func getBucketValue(bucket, key []byte) ([]byte, error) {
  990. var value []byte
  991. err := datastoreView(func(tx *datastoreTx) error {
  992. bucket := tx.bucket(bucket)
  993. value = bucket.get(key)
  994. return nil
  995. })
  996. if err != nil {
  997. return nil, common.ContextError(err)
  998. }
  999. return value, nil
  1000. }