dataStore.go 37 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382
  1. /*
  2. * Copyright (c) 2015, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package psiphon
  20. import (
  21. "bytes"
  22. "encoding/json"
  23. "errors"
  24. "fmt"
  25. "sync"
  26. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  27. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/parameters"
  28. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/prng"
  29. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
  30. )
var (
	// Bucket names. Each value is the name of a bucket obtained via
	// tx.bucket within a datastore transaction.

	// Server entry records, keyed by server IP address.
	datastoreServerEntriesBucket = []byte("serverEntries")
	// Split tunnel route ETags, keyed by region.
	datastoreSplitTunnelRouteETagsBucket = []byte("splitTunnelRouteETags")
	// Split tunnel route data, keyed by region.
	datastoreSplitTunnelRouteDataBucket = []byte("splitTunnelRouteData")
	// ETags keyed by URL string.
	datastoreUrlETagsBucket = []byte("urlETags")
	// Miscellaneous key/value pairs, including the server affinity ID and
	// the last server entry filter.
	datastoreKeyValueBucket = []byte("keyValues")
	datastoreRemoteServerListStatsBucket = []byte("remoteServerListStats")
	datastoreSLOKsBucket = []byte("SLOKs")
	datastoreTacticsBucket = []byte("tactics")
	datastoreSpeedTestSamplesBucket = []byte("speedTestSamples")
	// Dial parameter records; looked up with keys produced by
	// makeDialParametersKey.
	datastoreDialParametersBucket = []byte("dialParameters")

	// Keys.

	datastoreLastConnectedKey = "lastConnected"
	// Key in datastoreKeyValueBucket holding the filter value that was in
	// effect when a server entry was last promoted.
	datastoreLastServerEntryFilterKey = []byte("lastServerEntryFilter")
	// Key in datastoreKeyValueBucket holding the ID (IP address) of the
	// server affinity server entry.
	datastoreAffinityServerEntryIDKey = []byte("affinityServerEntryID")

	datastorePersistentStatTypeRemoteServerList = string(datastoreRemoteServerListStatsBucket)

	// Number of server entries processed between calls to
	// DoGarbageCollection when storing, scanning or iterating entries.
	datastoreServerEntryFetchGCThreshold = 20

	// datastoreMutex guards activeDatastoreDB. Open and close take the write
	// lock; datastoreView and datastoreUpdate hold the read lock for the
	// duration of the transaction.
	datastoreMutex sync.RWMutex
	// activeDatastoreDB is the singleton datastore instance; nil when closed.
	activeDatastoreDB *datastoreDB
)
  50. // OpenDataStore opens and initializes the singleton data store instance.
  51. func OpenDataStore(config *Config) error {
  52. datastoreMutex.Lock()
  53. existingDB := activeDatastoreDB
  54. if existingDB != nil {
  55. datastoreMutex.Unlock()
  56. return common.ContextError(errors.New("db already open"))
  57. }
  58. newDB, err := datastoreOpenDB(config.DataStoreDirectory)
  59. if err != nil {
  60. datastoreMutex.Unlock()
  61. return common.ContextError(err)
  62. }
  63. activeDatastoreDB = newDB
  64. datastoreMutex.Unlock()
  65. _ = resetAllPersistentStatsToUnreported()
  66. return nil
  67. }
  68. // CloseDataStore closes the singleton data store instance, if open.
  69. func CloseDataStore() {
  70. datastoreMutex.Lock()
  71. defer datastoreMutex.Unlock()
  72. if activeDatastoreDB == nil {
  73. return
  74. }
  75. err := activeDatastoreDB.close()
  76. if err != nil {
  77. NoticeAlert("failed to close database: %s", common.ContextError(err))
  78. }
  79. activeDatastoreDB = nil
  80. }
  81. func datastoreView(fn func(tx *datastoreTx) error) error {
  82. datastoreMutex.RLock()
  83. defer datastoreMutex.RUnlock()
  84. if activeDatastoreDB == nil {
  85. return common.ContextError(errors.New("database not open"))
  86. }
  87. err := activeDatastoreDB.view(fn)
  88. if err != nil {
  89. err = common.ContextError(err)
  90. }
  91. return err
  92. }
  93. func datastoreUpdate(fn func(tx *datastoreTx) error) error {
  94. datastoreMutex.RLock()
  95. defer datastoreMutex.RUnlock()
  96. if activeDatastoreDB == nil {
  97. return common.ContextError(errors.New("database not open"))
  98. }
  99. err := activeDatastoreDB.update(fn)
  100. if err != nil {
  101. err = common.ContextError(err)
  102. }
  103. return err
  104. }
  105. // StoreServerEntry adds the server entry to the data store.
  106. //
  107. // When a server entry already exists for a given server, it will be
  108. // replaced only if replaceIfExists is set or if the the ConfigurationVersion
  109. // field of the new entry is strictly higher than the existing entry.
  110. //
  111. // If the server entry data is malformed, an alert notice is issued and
  112. // the entry is skipped; no error is returned.
  113. func StoreServerEntry(serverEntryFields protocol.ServerEntryFields, replaceIfExists bool) error {
  114. // Server entries should already be validated before this point,
  115. // so instead of skipping we fail with an error.
  116. err := protocol.ValidateServerEntryFields(serverEntryFields)
  117. if err != nil {
  118. return common.ContextError(
  119. fmt.Errorf("invalid server entry: %s", err))
  120. }
  121. // BoltDB implementation note:
  122. // For simplicity, we don't maintain indexes on server entry
  123. // region or supported protocols. Instead, we perform full-bucket
  124. // scans with a filter. With a small enough database (thousands or
  125. // even tens of thousand of server entries) and common enough
  126. // values (e.g., many servers support all protocols), performance
  127. // is expected to be acceptable.
  128. err = datastoreUpdate(func(tx *datastoreTx) error {
  129. serverEntries := tx.bucket(datastoreServerEntriesBucket)
  130. ipAddress := serverEntryFields.GetIPAddress()
  131. // Check not only that the entry exists, but is valid. This
  132. // will replace in the rare case where the data is corrupt.
  133. existingConfigurationVersion := -1
  134. existingData := serverEntries.get([]byte(ipAddress))
  135. if existingData != nil {
  136. var existingServerEntry *protocol.ServerEntry
  137. err := json.Unmarshal(existingData, &existingServerEntry)
  138. if err == nil {
  139. existingConfigurationVersion = existingServerEntry.ConfigurationVersion
  140. }
  141. }
  142. exists := existingConfigurationVersion > -1
  143. newer := exists && existingConfigurationVersion < serverEntryFields.GetConfigurationVersion()
  144. update := !exists || replaceIfExists || newer
  145. if !update {
  146. // Disabling this notice, for now, as it generates too much noise
  147. // in diagnostics with clients that always submit embedded servers
  148. // to the core on each run.
  149. // NoticeInfo("ignored update for server %s", serverEntry.IpAddress)
  150. return nil
  151. }
  152. data, err := json.Marshal(serverEntryFields)
  153. if err != nil {
  154. return common.ContextError(err)
  155. }
  156. err = serverEntries.put([]byte(ipAddress), data)
  157. if err != nil {
  158. return common.ContextError(err)
  159. }
  160. NoticeInfo("updated server %s", ipAddress)
  161. return nil
  162. })
  163. if err != nil {
  164. return common.ContextError(err)
  165. }
  166. return nil
  167. }
  168. // StoreServerEntries stores a list of server entries.
  169. // There is an independent transaction for each entry insert/update.
  170. func StoreServerEntries(
  171. config *Config,
  172. serverEntries []protocol.ServerEntryFields,
  173. replaceIfExists bool) error {
  174. for _, serverEntryFields := range serverEntries {
  175. err := StoreServerEntry(serverEntryFields, replaceIfExists)
  176. if err != nil {
  177. return common.ContextError(err)
  178. }
  179. }
  180. return nil
  181. }
  182. // StreamingStoreServerEntries stores a list of server entries.
  183. // There is an independent transaction for each entry insert/update.
  184. func StreamingStoreServerEntries(
  185. config *Config,
  186. serverEntries *protocol.StreamingServerEntryDecoder,
  187. replaceIfExists bool) error {
  188. // Note: both StreamingServerEntryDecoder.Next and StoreServerEntry
  189. // allocate temporary memory buffers for hex/JSON decoding/encoding,
  190. // so this isn't true constant-memory streaming (it depends on garbage
  191. // collection).
  192. n := 0
  193. for {
  194. serverEntry, err := serverEntries.Next()
  195. if err != nil {
  196. return common.ContextError(err)
  197. }
  198. if serverEntry == nil {
  199. // No more server entries
  200. break
  201. }
  202. err = StoreServerEntry(serverEntry, replaceIfExists)
  203. if err != nil {
  204. return common.ContextError(err)
  205. }
  206. n += 1
  207. if n == datastoreServerEntryFetchGCThreshold {
  208. DoGarbageCollection()
  209. n = 0
  210. }
  211. }
  212. return nil
  213. }
  214. // PromoteServerEntry sets the server affinity server entry ID to the
  215. // specified server entry IP address.
  216. func PromoteServerEntry(config *Config, ipAddress string) error {
  217. err := datastoreUpdate(func(tx *datastoreTx) error {
  218. serverEntryID := []byte(ipAddress)
  219. // Ensure the corresponding server entry exists before
  220. // setting server affinity.
  221. bucket := tx.bucket(datastoreServerEntriesBucket)
  222. data := bucket.get(serverEntryID)
  223. if data == nil {
  224. NoticeAlert(
  225. "PromoteServerEntry: ignoring unknown server entry: %s",
  226. ipAddress)
  227. return nil
  228. }
  229. bucket = tx.bucket(datastoreKeyValueBucket)
  230. err := bucket.put(datastoreAffinityServerEntryIDKey, serverEntryID)
  231. if err != nil {
  232. return err
  233. }
  234. // Store the current server entry filter (e.g, region, etc.) that
  235. // was in use when the entry was promoted. This is used to detect
  236. // when the top ranked server entry was promoted under a different
  237. // filter.
  238. currentFilter, err := makeServerEntryFilterValue(config)
  239. if err != nil {
  240. return err
  241. }
  242. return bucket.put(datastoreLastServerEntryFilterKey, currentFilter)
  243. })
  244. if err != nil {
  245. return common.ContextError(err)
  246. }
  247. return nil
  248. }
  249. func makeServerEntryFilterValue(config *Config) ([]byte, error) {
  250. // Currently, only a change of EgressRegion will "break" server affinity.
  251. // If the tunnel protocol filter changes, any existing affinity server
  252. // either passes the new filter, or it will be skipped anyway.
  253. return []byte(config.EgressRegion), nil
  254. }
  255. func hasServerEntryFilterChanged(config *Config) (bool, error) {
  256. currentFilter, err := makeServerEntryFilterValue(config)
  257. if err != nil {
  258. return false, common.ContextError(err)
  259. }
  260. changed := false
  261. err = datastoreView(func(tx *datastoreTx) error {
  262. bucket := tx.bucket(datastoreKeyValueBucket)
  263. previousFilter := bucket.get(datastoreLastServerEntryFilterKey)
  264. // When not found, previousFilter will be nil; ensure this
  265. // results in "changed", even if currentFilter is len(0).
  266. if previousFilter == nil ||
  267. bytes.Compare(previousFilter, currentFilter) != 0 {
  268. changed = true
  269. }
  270. return nil
  271. })
  272. if err != nil {
  273. return false, common.ContextError(err)
  274. }
  275. return changed, nil
  276. }
// ServerEntryIterator is used to iterate over
// stored server entries in rank order.
type ServerEntryIterator struct {
	// config supplies the egress region filter, client parameters and
	// network ID consulted by reset and Next.
	config *Config
	// applyServerAffinity, when set, causes the initial reset to place the
	// stored affinity server entry ID first.
	applyServerAffinity bool
	// serverEntryIDs is the ordered list of server entry keys built by reset.
	serverEntryIDs [][]byte
	// serverEntryIndex is the index in serverEntryIDs of the next ID that
	// Next will fetch.
	serverEntryIndex int
	// isTacticsServerEntryIterator selects filtering by supported tactics
	// protocols instead of by egress region.
	isTacticsServerEntryIterator bool
	// isTargetServerEntryIterator indicates that targetServerEntry is the
	// only candidate; the datastore is not consulted.
	isTargetServerEntryIterator bool
	// hasNextTargetServerEntry is true until Next has returned the target
	// server entry; reset sets it back to true.
	hasNextTargetServerEntry bool
	// targetServerEntry is the single entry returned in target mode.
	targetServerEntry *protocol.ServerEntry
}
  289. // NewServerEntryIterator creates a new ServerEntryIterator.
  290. //
  291. // The boolean return value indicates whether to treat the first server(s)
  292. // as affinity servers or not. When the server entry selection filter changes
  293. // such as from a specific region to any region, or when there was no previous
  294. // filter/iterator, the the first server(s) are arbitrary and should not be
  295. // given affinity treatment.
  296. //
  297. // NewServerEntryIterator and any returned ServerEntryIterator are not
  298. // designed for concurrent use as not all related datastore operations are
  299. // performed in a single transaction.
  300. //
  301. func NewServerEntryIterator(config *Config) (bool, *ServerEntryIterator, error) {
  302. // When configured, this target server entry is the only candidate
  303. if config.TargetServerEntry != "" {
  304. return newTargetServerEntryIterator(config, false)
  305. }
  306. filterChanged, err := hasServerEntryFilterChanged(config)
  307. if err != nil {
  308. return false, nil, common.ContextError(err)
  309. }
  310. applyServerAffinity := !filterChanged
  311. iterator := &ServerEntryIterator{
  312. config: config,
  313. applyServerAffinity: applyServerAffinity,
  314. }
  315. err = iterator.reset(true)
  316. if err != nil {
  317. return false, nil, common.ContextError(err)
  318. }
  319. return applyServerAffinity, iterator, nil
  320. }
  321. func NewTacticsServerEntryIterator(config *Config) (*ServerEntryIterator, error) {
  322. // When configured, this target server entry is the only candidate
  323. if config.TargetServerEntry != "" {
  324. _, iterator, err := newTargetServerEntryIterator(config, true)
  325. return iterator, err
  326. }
  327. iterator := &ServerEntryIterator{
  328. config: config,
  329. isTacticsServerEntryIterator: true,
  330. }
  331. err := iterator.reset(true)
  332. if err != nil {
  333. return nil, common.ContextError(err)
  334. }
  335. return iterator, nil
  336. }
  337. // newTargetServerEntryIterator is a helper for initializing the TargetServerEntry case
  338. func newTargetServerEntryIterator(config *Config, isTactics bool) (bool, *ServerEntryIterator, error) {
  339. serverEntry, err := protocol.DecodeServerEntry(
  340. config.TargetServerEntry, common.GetCurrentTimestamp(), protocol.SERVER_ENTRY_SOURCE_TARGET)
  341. if err != nil {
  342. return false, nil, common.ContextError(err)
  343. }
  344. if isTactics {
  345. if len(serverEntry.GetSupportedTacticsProtocols()) == 0 {
  346. return false, nil, common.ContextError(errors.New("TargetServerEntry does not support tactics protocols"))
  347. }
  348. } else {
  349. if config.EgressRegion != "" && serverEntry.Region != config.EgressRegion {
  350. return false, nil, common.ContextError(errors.New("TargetServerEntry does not support EgressRegion"))
  351. }
  352. limitTunnelProtocols := config.GetClientParameters().TunnelProtocols(parameters.LimitTunnelProtocols)
  353. if len(limitTunnelProtocols) > 0 {
  354. // At the ServerEntryIterator level, only limitTunnelProtocols is applied;
  355. // excludeIntensive is handled higher up.
  356. if len(serverEntry.GetSupportedProtocols(
  357. config.UseUpstreamProxy(), limitTunnelProtocols, false)) == 0 {
  358. return false, nil, common.ContextError(errors.New("TargetServerEntry does not support LimitTunnelProtocols"))
  359. }
  360. }
  361. }
  362. iterator := &ServerEntryIterator{
  363. isTacticsServerEntryIterator: isTactics,
  364. isTargetServerEntryIterator: true,
  365. hasNextTargetServerEntry: true,
  366. targetServerEntry: serverEntry,
  367. }
  368. NoticeInfo("using TargetServerEntry: %s", serverEntry.IpAddress)
  369. return false, iterator, nil
  370. }
  371. // Reset a NewServerEntryIterator to the start of its cycle. The next
  372. // call to Next will return the first server entry.
  373. func (iterator *ServerEntryIterator) Reset() error {
  374. return iterator.reset(false)
  375. }
// reset rebuilds the iterator's list of server entry IDs and rewinds the
// iterator to the first position.
//
// For a target server entry iterator, reset only re-arms the single target
// entry. Otherwise, all keys in the server entries bucket are read within
// one view transaction and randomly shuffled. When isInitialRound is set,
// two additional ordering rules apply: the affinity server entry ID (if
// applyServerAffinity is set and an ID is stored) is placed first and
// excluded from the shuffle; and server entries that have a dial parameters
// record for the current network ID are moved ahead of those that do not.
//
// On error the iterator is left in its closed (empty) state.
func (iterator *ServerEntryIterator) reset(isInitialRound bool) error {
	iterator.Close()

	if iterator.isTargetServerEntryIterator {
		iterator.hasNextTargetServerEntry = true
		return nil
	}

	// BoltDB implementation note:
	// We don't keep a transaction open for the duration of the iterator
	// because this would expose the following semantics to consumer code:
	//
	// Read-only transactions and read-write transactions ... generally
	// shouldn't be opened simultaneously in the same goroutine. This can
	// cause a deadlock as the read-write transaction needs to periodically
	// re-map the data file but it cannot do so while a read-only
	// transaction is open.
	// (https://github.com/boltdb/bolt)
	//
	// So the underlying serverEntriesBucket could change after the serverEntryIDs
	// list is built.

	var serverEntryIDs [][]byte

	err := datastoreView(func(tx *datastoreTx) error {

		bucket := tx.bucket(datastoreKeyValueBucket)

		serverEntryIDs = make([][]byte, 0)
		// shuffleHead is the index of the first element that takes part in
		// the shuffle and the replay partition; it is 1 when slot 0 is
		// reserved for the affinity server entry.
		shuffleHead := 0

		var affinityServerEntryID []byte

		// In the first round only, move any server affinity candidate to the
		// very first position.

		if isInitialRound &&
			iterator.applyServerAffinity {

			affinityServerEntryID = bucket.get(datastoreAffinityServerEntryIDKey)
			if affinityServerEntryID != nil {
				// The ID is copied before being retained beyond this
				// transaction.
				serverEntryIDs = append(serverEntryIDs, append([]byte(nil), affinityServerEntryID...))
				shuffleHead = 1
			}
		}

		bucket = tx.bucket(datastoreServerEntriesBucket)
		cursor := bucket.cursor()
		for key := cursor.firstKey(); key != nil; key = cursor.nextKey() {

			// The affinity ID already occupies slot 0; don't add it twice.
			if affinityServerEntryID != nil {
				if bytes.Equal(affinityServerEntryID, key) {
					continue
				}
			}

			// Keys are copied before being retained beyond this transaction.
			serverEntryIDs = append(serverEntryIDs, append([]byte(nil), key...))
		}
		cursor.close()

		// Randomly shuffle the entire list of server IDs, excluding the
		// server affinity candidate.

		for i := len(serverEntryIDs) - 1; i > shuffleHead-1; i-- {
			j := prng.Intn(i+1-shuffleHead) + shuffleHead
			serverEntryIDs[i], serverEntryIDs[j] = serverEntryIDs[j], serverEntryIDs[i]
		}

		// In the first round only, move _potential_ replay candidates to the
		// front of the list (excepting the server affinity slot, if any).
		// This move is post-shuffle so the order is still randomized. To save
		// the memory overhead of unmarshalling all dial parameters, this
		// operation just moves any server with a dial parameter record to the
		// front. Whether the dial parameter remains valid for replay -- TTL,
		// tactics/config unchanged, etc. --- is checked later.
		//
		// TODO: move only up to parameters.ReplayCandidateCount to front?

		if isInitialRound &&
			iterator.config.GetClientParameters().Int(parameters.ReplayCandidateCount) > 0 {

			networkID := []byte(iterator.config.GetNetworkID())

			dialParamsBucket := tx.bucket(datastoreDialParametersBucket)
			// Two-index partition: i advances from the front past entries
			// that have a dial parameters record, j retreats from the back
			// past entries that have none, and the out-of-place pair found
			// at i and j is swapped.
			i := shuffleHead
			j := len(serverEntryIDs) - 1
			for {
				for ; i < j; i++ {
					key := makeDialParametersKey(serverEntryIDs[i], networkID)
					if dialParamsBucket.get(key) == nil {
						break
					}
				}
				for ; i < j; j-- {
					key := makeDialParametersKey(serverEntryIDs[j], networkID)
					if dialParamsBucket.get(key) != nil {
						break
					}
				}
				if i < j {
					serverEntryIDs[i], serverEntryIDs[j] = serverEntryIDs[j], serverEntryIDs[i]
					i++
					j--
				} else {
					break
				}
			}
		}

		return nil
	})
	if err != nil {
		return common.ContextError(err)
	}

	iterator.serverEntryIDs = serverEntryIDs
	iterator.serverEntryIndex = 0

	return nil
}
  474. // Close cleans up resources associated with a ServerEntryIterator.
  475. func (iterator *ServerEntryIterator) Close() {
  476. iterator.serverEntryIDs = nil
  477. iterator.serverEntryIndex = 0
  478. }
  479. // Next returns the next server entry, by rank, for a ServerEntryIterator.
  480. // Returns nil with no error when there is no next item.
  481. func (iterator *ServerEntryIterator) Next() (*protocol.ServerEntry, error) {
  482. var serverEntry *protocol.ServerEntry
  483. var err error
  484. defer func() {
  485. if err != nil {
  486. iterator.Close()
  487. }
  488. }()
  489. if iterator.isTargetServerEntryIterator {
  490. if iterator.hasNextTargetServerEntry {
  491. iterator.hasNextTargetServerEntry = false
  492. return MakeCompatibleServerEntry(iterator.targetServerEntry), nil
  493. }
  494. return nil, nil
  495. }
  496. // There are no region/protocol indexes for the server entries bucket.
  497. // Loop until we have the next server entry that matches the iterator
  498. // filter requirements.
  499. for {
  500. if iterator.serverEntryIndex >= len(iterator.serverEntryIDs) {
  501. // There is no next item
  502. return nil, nil
  503. }
  504. serverEntryID := iterator.serverEntryIDs[iterator.serverEntryIndex]
  505. iterator.serverEntryIndex += 1
  506. var data []byte
  507. err = datastoreView(func(tx *datastoreTx) error {
  508. bucket := tx.bucket(datastoreServerEntriesBucket)
  509. value := bucket.get(serverEntryID)
  510. if value != nil {
  511. // Must make a copy as slice is only valid within transaction.
  512. data = make([]byte, len(value))
  513. copy(data, value)
  514. }
  515. return nil
  516. })
  517. if err != nil {
  518. return nil, common.ContextError(err)
  519. }
  520. if data == nil {
  521. // In case of data corruption or a bug causing this condition,
  522. // do not stop iterating.
  523. NoticeAlert("ServerEntryIterator.Next: unexpected missing server entry: %s", string(serverEntryID))
  524. continue
  525. }
  526. err = json.Unmarshal(data, &serverEntry)
  527. if err != nil {
  528. // In case of data corruption or a bug causing this condition,
  529. // do not stop iterating.
  530. NoticeAlert("ServerEntryIterator.Next: %s", common.ContextError(err))
  531. continue
  532. }
  533. if iterator.serverEntryIndex%datastoreServerEntryFetchGCThreshold == 0 {
  534. DoGarbageCollection()
  535. }
  536. // Check filter requirements
  537. if iterator.isTacticsServerEntryIterator {
  538. // Tactics doesn't filter by egress region.
  539. if len(serverEntry.GetSupportedTacticsProtocols()) > 0 {
  540. break
  541. }
  542. } else {
  543. if iterator.config.EgressRegion == "" ||
  544. serverEntry.Region == iterator.config.EgressRegion {
  545. break
  546. }
  547. }
  548. }
  549. return MakeCompatibleServerEntry(serverEntry), nil
  550. }
  551. // MakeCompatibleServerEntry provides backwards compatibility with old server entries
  552. // which have a single meekFrontingDomain and not a meekFrontingAddresses array.
  553. // By copying this one meekFrontingDomain into meekFrontingAddresses, this client effectively
  554. // uses that single value as legacy clients do.
  555. func MakeCompatibleServerEntry(serverEntry *protocol.ServerEntry) *protocol.ServerEntry {
  556. if len(serverEntry.MeekFrontingAddresses) == 0 && serverEntry.MeekFrontingDomain != "" {
  557. serverEntry.MeekFrontingAddresses =
  558. append(serverEntry.MeekFrontingAddresses, serverEntry.MeekFrontingDomain)
  559. }
  560. return serverEntry
  561. }
  562. func scanServerEntries(scanner func(*protocol.ServerEntry)) error {
  563. err := datastoreView(func(tx *datastoreTx) error {
  564. bucket := tx.bucket(datastoreServerEntriesBucket)
  565. cursor := bucket.cursor()
  566. n := 0
  567. for key, value := cursor.first(); key != nil; key, value = cursor.next() {
  568. var serverEntry *protocol.ServerEntry
  569. err := json.Unmarshal(value, &serverEntry)
  570. if err != nil {
  571. // In case of data corruption or a bug causing this condition,
  572. // do not stop iterating.
  573. NoticeAlert("scanServerEntries: %s", common.ContextError(err))
  574. continue
  575. }
  576. scanner(serverEntry)
  577. n += 1
  578. if n == datastoreServerEntryFetchGCThreshold {
  579. DoGarbageCollection()
  580. n = 0
  581. }
  582. }
  583. cursor.close()
  584. return nil
  585. })
  586. if err != nil {
  587. return common.ContextError(err)
  588. }
  589. return nil
  590. }
  591. // CountServerEntries returns a count of stored server entries.
  592. func CountServerEntries() int {
  593. count := 0
  594. err := scanServerEntries(func(_ *protocol.ServerEntry) {
  595. count += 1
  596. })
  597. if err != nil {
  598. NoticeAlert("CountServerEntries failed: %s", err)
  599. return 0
  600. }
  601. return count
  602. }
  603. // CountServerEntriesWithConstraints returns a count of stored server entries for
  604. // the specified region and tunnel protocol limits.
  605. func CountServerEntriesWithConstraints(
  606. useUpstreamProxy bool,
  607. region string,
  608. constraints *protocolSelectionConstraints) (int, int) {
  609. // When CountServerEntriesWithConstraints is called only
  610. // limitTunnelProtocolState is fixed; excludeIntensive is transitory.
  611. excludeIntensive := false
  612. initialCount := 0
  613. count := 0
  614. err := scanServerEntries(func(serverEntry *protocol.ServerEntry) {
  615. if region == "" || serverEntry.Region == region {
  616. if constraints.isInitialCandidate(excludeIntensive, serverEntry) {
  617. initialCount += 1
  618. }
  619. if constraints.isCandidate(excludeIntensive, serverEntry) {
  620. count += 1
  621. }
  622. }
  623. })
  624. if err != nil {
  625. NoticeAlert("CountServerEntriesWithConstraints failed: %s", err)
  626. return 0, 0
  627. }
  628. return initialCount, count
  629. }
  630. // ReportAvailableRegions prints a notice with the available egress regions.
  631. // When limitState has initial protocols, the available regions are limited
  632. // to those available for the initial protocols; or if limitState has general
  633. // limited protocols, the available regions are similarly limited.
  634. func ReportAvailableRegions(config *Config, constraints *protocolSelectionConstraints) {
  635. // When ReportAvailableRegions is called only limitTunnelProtocolState is
  636. // fixed; excludeIntensive is transitory.
  637. excludeIntensive := false
  638. regions := make(map[string]bool)
  639. err := scanServerEntries(func(serverEntry *protocol.ServerEntry) {
  640. isCandidate := false
  641. if constraints.hasInitialProtocols() {
  642. isCandidate = constraints.isInitialCandidate(excludeIntensive, serverEntry)
  643. } else {
  644. isCandidate = constraints.isCandidate(excludeIntensive, serverEntry)
  645. }
  646. if isCandidate {
  647. regions[serverEntry.Region] = true
  648. }
  649. })
  650. if err != nil {
  651. NoticeAlert("ReportAvailableRegions failed: %s", err)
  652. return
  653. }
  654. regionList := make([]string, 0, len(regions))
  655. for region := range regions {
  656. // Some server entries do not have a region, but it makes no sense to return
  657. // an empty string as an "available region".
  658. if region != "" {
  659. regionList = append(regionList, region)
  660. }
  661. }
  662. NoticeAvailableEgressRegions(regionList)
  663. }
  664. // SetSplitTunnelRoutes updates the cached routes data for
  665. // the given region. The associated etag is also stored and
  666. // used to make efficient web requests for updates to the data.
  667. func SetSplitTunnelRoutes(region, etag string, data []byte) error {
  668. err := datastoreUpdate(func(tx *datastoreTx) error {
  669. bucket := tx.bucket(datastoreSplitTunnelRouteETagsBucket)
  670. err := bucket.put([]byte(region), []byte(etag))
  671. bucket = tx.bucket(datastoreSplitTunnelRouteDataBucket)
  672. err = bucket.put([]byte(region), data)
  673. return err
  674. })
  675. if err != nil {
  676. return common.ContextError(err)
  677. }
  678. return nil
  679. }
  680. // GetSplitTunnelRoutesETag retrieves the etag for cached routes
  681. // data for the specified region. If not found, it returns an empty string value.
  682. func GetSplitTunnelRoutesETag(region string) (string, error) {
  683. var etag string
  684. err := datastoreView(func(tx *datastoreTx) error {
  685. bucket := tx.bucket(datastoreSplitTunnelRouteETagsBucket)
  686. etag = string(bucket.get([]byte(region)))
  687. return nil
  688. })
  689. if err != nil {
  690. return "", common.ContextError(err)
  691. }
  692. return etag, nil
  693. }
  694. // GetSplitTunnelRoutesData retrieves the cached routes data
  695. // for the specified region. If not found, it returns a nil value.
  696. func GetSplitTunnelRoutesData(region string) ([]byte, error) {
  697. var data []byte
  698. err := datastoreView(func(tx *datastoreTx) error {
  699. bucket := tx.bucket(datastoreSplitTunnelRouteDataBucket)
  700. value := bucket.get([]byte(region))
  701. if value != nil {
  702. // Must make a copy as slice is only valid within transaction.
  703. data = make([]byte, len(value))
  704. copy(data, value)
  705. }
  706. return nil
  707. })
  708. if err != nil {
  709. return nil, common.ContextError(err)
  710. }
  711. return data, nil
  712. }
  713. // SetUrlETag stores an ETag for the specfied URL.
  714. // Note: input URL is treated as a string, and is not
  715. // encoded or decoded or otherwise canonicalized.
  716. func SetUrlETag(url, etag string) error {
  717. err := datastoreUpdate(func(tx *datastoreTx) error {
  718. bucket := tx.bucket(datastoreUrlETagsBucket)
  719. err := bucket.put([]byte(url), []byte(etag))
  720. return err
  721. })
  722. if err != nil {
  723. return common.ContextError(err)
  724. }
  725. return nil
  726. }
  727. // GetUrlETag retrieves a previously stored an ETag for the
  728. // specfied URL. If not found, it returns an empty string value.
  729. func GetUrlETag(url string) (string, error) {
  730. var etag string
  731. err := datastoreView(func(tx *datastoreTx) error {
  732. bucket := tx.bucket(datastoreUrlETagsBucket)
  733. etag = string(bucket.get([]byte(url)))
  734. return nil
  735. })
  736. if err != nil {
  737. return "", common.ContextError(err)
  738. }
  739. return etag, nil
  740. }
  741. // SetKeyValue stores a key/value pair.
  742. func SetKeyValue(key, value string) error {
  743. err := datastoreUpdate(func(tx *datastoreTx) error {
  744. bucket := tx.bucket(datastoreKeyValueBucket)
  745. err := bucket.put([]byte(key), []byte(value))
  746. return err
  747. })
  748. if err != nil {
  749. return common.ContextError(err)
  750. }
  751. return nil
  752. }
  753. // GetKeyValue retrieves the value for a given key. If not found,
  754. // it returns an empty string value.
  755. func GetKeyValue(key string) (string, error) {
  756. var value string
  757. err := datastoreView(func(tx *datastoreTx) error {
  758. bucket := tx.bucket(datastoreKeyValueBucket)
  759. value = string(bucket.get([]byte(key)))
  760. return nil
  761. })
  762. if err != nil {
  763. return "", common.ContextError(err)
  764. }
  765. return value, nil
  766. }
  767. // Persistent stat records in the persistentStatStateUnreported
  768. // state are available for take out.
  769. //
  770. // Records in the persistentStatStateReporting have been taken
  771. // out and are pending either deletion (for a successful request)
  772. // or change to StateUnreported (for a failed request).
  773. //
  774. // All persistent stat records are reverted to StateUnreported
  775. // when the datastore is initialized at start up.
  776. var persistentStatStateUnreported = []byte("0")
  777. var persistentStatStateReporting = []byte("1")
  778. var persistentStatTypes = []string{
  779. datastorePersistentStatTypeRemoteServerList,
  780. }
  781. // StorePersistentStat adds a new persistent stat record, which
  782. // is set to StateUnreported and is an immediate candidate for
  783. // reporting.
  784. //
  785. // The stat is a JSON byte array containing fields as
  786. // required by the Psiphon server API. It's assumed that the
  787. // JSON value contains enough unique information for the value to
  788. // function as a key in the key/value datastore. This assumption
  789. // is currently satisfied by the fields sessionId + tunnelNumber
  790. // for tunnel stats, and URL + ETag for remote server list stats.
  791. func StorePersistentStat(statType string, stat []byte) error {
  792. if !common.Contains(persistentStatTypes, statType) {
  793. return common.ContextError(fmt.Errorf("invalid persistent stat type: %s", statType))
  794. }
  795. err := datastoreUpdate(func(tx *datastoreTx) error {
  796. bucket := tx.bucket([]byte(statType))
  797. err := bucket.put(stat, persistentStatStateUnreported)
  798. return err
  799. })
  800. if err != nil {
  801. return common.ContextError(err)
  802. }
  803. return nil
  804. }
  805. // CountUnreportedPersistentStats returns the number of persistent
  806. // stat records in StateUnreported.
  807. func CountUnreportedPersistentStats() int {
  808. unreported := 0
  809. err := datastoreView(func(tx *datastoreTx) error {
  810. for _, statType := range persistentStatTypes {
  811. bucket := tx.bucket([]byte(statType))
  812. cursor := bucket.cursor()
  813. for key, value := cursor.first(); key != nil; key, value = cursor.next() {
  814. if 0 == bytes.Compare(value, persistentStatStateUnreported) {
  815. unreported++
  816. break
  817. }
  818. }
  819. cursor.close()
  820. }
  821. return nil
  822. })
  823. if err != nil {
  824. NoticeAlert("CountUnreportedPersistentStats failed: %s", err)
  825. return 0
  826. }
  827. return unreported
  828. }
  829. // TakeOutUnreportedPersistentStats returns up to maxCount persistent
  830. // stats records that are in StateUnreported. The records are set to
  831. // StateReporting. If the records are successfully reported, clear them
  832. // with ClearReportedPersistentStats. If the records are not successfully
  833. // reported, restore them with PutBackUnreportedPersistentStats.
  834. func TakeOutUnreportedPersistentStats(maxCount int) (map[string][][]byte, error) {
  835. stats := make(map[string][][]byte)
  836. err := datastoreUpdate(func(tx *datastoreTx) error {
  837. count := 0
  838. for _, statType := range persistentStatTypes {
  839. bucket := tx.bucket([]byte(statType))
  840. cursor := bucket.cursor()
  841. for key, value := cursor.first(); key != nil; key, value = cursor.next() {
  842. if count >= maxCount {
  843. break
  844. }
  845. // Perform a test JSON unmarshaling. In case of data corruption or a bug,
  846. // skip the record.
  847. var jsonData interface{}
  848. err := json.Unmarshal(key, &jsonData)
  849. if err != nil {
  850. NoticeAlert(
  851. "Invalid key in TakeOutUnreportedPersistentStats: %s: %s",
  852. string(key), err)
  853. continue
  854. }
  855. if 0 == bytes.Compare(value, persistentStatStateUnreported) {
  856. // Must make a copy as slice is only valid within transaction.
  857. data := make([]byte, len(key))
  858. copy(data, key)
  859. if stats[statType] == nil {
  860. stats[statType] = make([][]byte, 0)
  861. }
  862. stats[statType] = append(stats[statType], data)
  863. count += 1
  864. }
  865. }
  866. cursor.close()
  867. for _, key := range stats[statType] {
  868. err := bucket.put(key, persistentStatStateReporting)
  869. if err != nil {
  870. return err
  871. }
  872. }
  873. }
  874. return nil
  875. })
  876. if err != nil {
  877. return nil, common.ContextError(err)
  878. }
  879. return stats, nil
  880. }
  881. // PutBackUnreportedPersistentStats restores a list of persistent
  882. // stat records to StateUnreported.
  883. func PutBackUnreportedPersistentStats(stats map[string][][]byte) error {
  884. err := datastoreUpdate(func(tx *datastoreTx) error {
  885. for _, statType := range persistentStatTypes {
  886. bucket := tx.bucket([]byte(statType))
  887. for _, key := range stats[statType] {
  888. err := bucket.put(key, persistentStatStateUnreported)
  889. if err != nil {
  890. return err
  891. }
  892. }
  893. }
  894. return nil
  895. })
  896. if err != nil {
  897. return common.ContextError(err)
  898. }
  899. return nil
  900. }
  901. // ClearReportedPersistentStats deletes a list of persistent
  902. // stat records that were successfully reported.
  903. func ClearReportedPersistentStats(stats map[string][][]byte) error {
  904. err := datastoreUpdate(func(tx *datastoreTx) error {
  905. for _, statType := range persistentStatTypes {
  906. bucket := tx.bucket([]byte(statType))
  907. for _, key := range stats[statType] {
  908. err := bucket.delete(key)
  909. if err != nil {
  910. return err
  911. }
  912. }
  913. }
  914. return nil
  915. })
  916. if err != nil {
  917. return common.ContextError(err)
  918. }
  919. return nil
  920. }
  921. // resetAllPersistentStatsToUnreported sets all persistent stat
  922. // records to StateUnreported. This reset is called when the
  923. // datastore is initialized at start up, as we do not know if
  924. // persistent records in StateReporting were reported or not.
  925. func resetAllPersistentStatsToUnreported() error {
  926. err := datastoreUpdate(func(tx *datastoreTx) error {
  927. for _, statType := range persistentStatTypes {
  928. bucket := tx.bucket([]byte(statType))
  929. resetKeys := make([][]byte, 0)
  930. cursor := bucket.cursor()
  931. for key := cursor.firstKey(); key != nil; key = cursor.nextKey() {
  932. resetKeys = append(resetKeys, key)
  933. }
  934. cursor.close()
  935. // TODO: data mutation is done outside cursor. Is this
  936. // strictly necessary in this case? As is, this means
  937. // all stats need to be loaded into memory at once.
  938. // https://godoc.org/github.com/boltdb/bolt#Cursor
  939. for _, key := range resetKeys {
  940. err := bucket.put(key, persistentStatStateUnreported)
  941. if err != nil {
  942. return err
  943. }
  944. }
  945. }
  946. return nil
  947. })
  948. if err != nil {
  949. return common.ContextError(err)
  950. }
  951. return nil
  952. }
  953. // CountSLOKs returns the total number of SLOK records.
  954. func CountSLOKs() int {
  955. count := 0
  956. err := datastoreView(func(tx *datastoreTx) error {
  957. bucket := tx.bucket(datastoreSLOKsBucket)
  958. cursor := bucket.cursor()
  959. for key := cursor.firstKey(); key != nil; key = cursor.nextKey() {
  960. count++
  961. }
  962. cursor.close()
  963. return nil
  964. })
  965. if err != nil {
  966. NoticeAlert("CountSLOKs failed: %s", err)
  967. return 0
  968. }
  969. return count
  970. }
  971. // DeleteSLOKs deletes all SLOK records.
  972. func DeleteSLOKs() error {
  973. err := datastoreUpdate(func(tx *datastoreTx) error {
  974. return tx.clearBucket(datastoreSLOKsBucket)
  975. })
  976. if err != nil {
  977. return common.ContextError(err)
  978. }
  979. return nil
  980. }
  981. // SetSLOK stores a SLOK key, referenced by its ID. The bool
  982. // return value indicates whether the SLOK was already stored.
  983. func SetSLOK(id, key []byte) (bool, error) {
  984. var duplicate bool
  985. err := datastoreUpdate(func(tx *datastoreTx) error {
  986. bucket := tx.bucket(datastoreSLOKsBucket)
  987. duplicate = bucket.get(id) != nil
  988. err := bucket.put([]byte(id), []byte(key))
  989. return err
  990. })
  991. if err != nil {
  992. return false, common.ContextError(err)
  993. }
  994. return duplicate, nil
  995. }
  996. // GetSLOK returns a SLOK key for the specified ID. The return
  997. // value is nil if the SLOK is not found.
  998. func GetSLOK(id []byte) ([]byte, error) {
  999. var key []byte
  1000. err := datastoreView(func(tx *datastoreTx) error {
  1001. bucket := tx.bucket(datastoreSLOKsBucket)
  1002. key = bucket.get(id)
  1003. return nil
  1004. })
  1005. if err != nil {
  1006. return nil, common.ContextError(err)
  1007. }
  1008. return key, nil
  1009. }
  // makeDialParametersKey builds the datastore key for a dial parameters
  // record: the server IP address bytes immediately followed by the network
  // ID bytes, with no separator. The result is a freshly allocated slice
  // that does not alias either input.
  func makeDialParametersKey(serverIPAddress, networkID []byte) []byte {
  	// TODO: structured key?
  	var key []byte
  	key = append(key, serverIPAddress...)
  	key = append(key, networkID...)
  	return key
  }
  1014. // SetDialParameters stores dial parameters associated with the specified
  1015. // server/network ID.
  1016. func SetDialParameters(serverIPAddress, networkID string, dialParams *DialParameters) error {
  1017. key := makeDialParametersKey([]byte(serverIPAddress), []byte(networkID))
  1018. data, err := json.Marshal(dialParams)
  1019. if err != nil {
  1020. return common.ContextError(err)
  1021. }
  1022. return setBucketValue(datastoreDialParametersBucket, key, data)
  1023. }
  1024. // GetDialParameters fetches any dial parameters associated with the specified
  1025. // server/network ID. Returns nil, nil when no record is found.
  1026. func GetDialParameters(serverIPAddress, networkID string) (*DialParameters, error) {
  1027. key := makeDialParametersKey([]byte(serverIPAddress), []byte(networkID))
  1028. data, err := getBucketValue(datastoreDialParametersBucket, key)
  1029. if err != nil {
  1030. return nil, common.ContextError(err)
  1031. }
  1032. if data == nil {
  1033. return nil, nil
  1034. }
  1035. var dialParams *DialParameters
  1036. err = json.Unmarshal(data, &dialParams)
  1037. if err != nil {
  1038. return nil, common.ContextError(err)
  1039. }
  1040. return dialParams, nil
  1041. }
  1042. // DeleteDialParameters clears any dial parameters associated with the
  1043. // specified server/network ID.
  1044. func DeleteDialParameters(serverIPAddress, networkID string) error {
  1045. key := makeDialParametersKey([]byte(serverIPAddress), []byte(networkID))
  1046. return deleteBucketValue(datastoreDialParametersBucket, key)
  1047. }
  1048. // TacticsStorer implements tactics.Storer.
  1049. type TacticsStorer struct {
  1050. }
  1051. func (t *TacticsStorer) SetTacticsRecord(networkID string, record []byte) error {
  1052. return setBucketValue(datastoreTacticsBucket, []byte(networkID), record)
  1053. }
  1054. func (t *TacticsStorer) GetTacticsRecord(networkID string) ([]byte, error) {
  1055. return getBucketValue(datastoreTacticsBucket, []byte(networkID))
  1056. }
  1057. func (t *TacticsStorer) SetSpeedTestSamplesRecord(networkID string, record []byte) error {
  1058. return setBucketValue(datastoreSpeedTestSamplesBucket, []byte(networkID), record)
  1059. }
  1060. func (t *TacticsStorer) GetSpeedTestSamplesRecord(networkID string) ([]byte, error) {
  1061. return getBucketValue(datastoreSpeedTestSamplesBucket, []byte(networkID))
  1062. }
  1063. // GetTacticsStorer creates a TacticsStorer.
  1064. func GetTacticsStorer() *TacticsStorer {
  1065. return &TacticsStorer{}
  1066. }
  1067. func setBucketValue(bucket, key, value []byte) error {
  1068. err := datastoreUpdate(func(tx *datastoreTx) error {
  1069. bucket := tx.bucket(bucket)
  1070. return bucket.put(key, value)
  1071. })
  1072. if err != nil {
  1073. return common.ContextError(err)
  1074. }
  1075. return nil
  1076. }
  1077. func getBucketValue(bucket, key []byte) ([]byte, error) {
  1078. var value []byte
  1079. err := datastoreView(func(tx *datastoreTx) error {
  1080. bucket := tx.bucket(bucket)
  1081. value = bucket.get(key)
  1082. return nil
  1083. })
  1084. if err != nil {
  1085. return nil, common.ContextError(err)
  1086. }
  1087. return value, nil
  1088. }
  1089. func deleteBucketValue(bucket, key []byte) error {
  1090. err := datastoreUpdate(func(tx *datastoreTx) error {
  1091. bucket := tx.bucket(bucket)
  1092. return bucket.delete(key)
  1093. })
  1094. if err != nil {
  1095. return common.ContextError(err)
  1096. }
  1097. return nil
  1098. }