dataStore.go 61 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164216521662167216821692170
  1. /*
  2. * Copyright (c) 2015, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package psiphon
  20. import (
  21. "bytes"
  22. "context"
  23. "encoding/json"
  24. "io"
  25. "math"
  26. "os"
  27. "strings"
  28. "sync"
  29. "time"
  30. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  31. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
  32. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/parameters"
  33. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/prng"
  34. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
  35. )
  36. var (
  37. datastoreServerEntriesBucket = []byte("serverEntries")
  38. datastoreServerEntryTagsBucket = []byte("serverEntryTags")
  39. datastoreServerEntryTombstoneTagsBucket = []byte("serverEntryTombstoneTags")
  40. datastoreSplitTunnelRouteETagsBucket = []byte("splitTunnelRouteETags")
  41. datastoreSplitTunnelRouteDataBucket = []byte("splitTunnelRouteData")
  42. datastoreUrlETagsBucket = []byte("urlETags")
  43. datastoreKeyValueBucket = []byte("keyValues")
  44. datastoreRemoteServerListStatsBucket = []byte("remoteServerListStats")
  45. datastoreFailedTunnelStatsBucket = []byte("failedTunnelStats")
  46. datastoreSLOKsBucket = []byte("SLOKs")
  47. datastoreTacticsBucket = []byte("tactics")
  48. datastoreSpeedTestSamplesBucket = []byte("speedTestSamples")
  49. datastoreDialParametersBucket = []byte("dialParameters")
  50. datastoreLastConnectedKey = "lastConnected"
  51. datastoreLastServerEntryFilterKey = []byte("lastServerEntryFilter")
  52. datastoreAffinityServerEntryIDKey = []byte("affinityServerEntryID")
  53. datastorePersistentStatTypeRemoteServerList = string(datastoreRemoteServerListStatsBucket)
  54. datastorePersistentStatTypeFailedTunnel = string(datastoreFailedTunnelStatsBucket)
  55. datastoreServerEntryFetchGCThreshold = 10
  56. datastoreReferenceCountMutex sync.RWMutex
  57. datastoreReferenceCount int64
  58. datastoreMutex sync.RWMutex
  59. activeDatastoreDB *datastoreDB
  60. )
  61. // OpenDataStore opens and initializes the singleton datastore instance.
  62. //
  63. // Nested Open/CloseDataStore calls are supported: OpenDataStore will succeed
  64. // when called when the datastore is initialized. Every call to OpenDataStore
  65. // must be paired with a corresponding call to CloseDataStore to ensure the
  66. // datastore is closed.
  67. func OpenDataStore(config *Config) error {
  68. return openDataStore(config, true)
  69. }
  70. // OpenDataStoreWithoutReset performs an OpenDataStore but does not retry or
  71. // reset the datastore file in case of failures. Use OpenDataStoreWithoutReset
  72. // when the datastore may be locked by another process.
  73. func OpenDataStoreWithoutReset(config *Config) error {
  74. return openDataStore(config, false)
  75. }
  76. func openDataStore(config *Config, retryAndReset bool) error {
  77. // The datastoreReferenceCountMutex/datastoreMutex mutex pair allow for:
  78. //
  79. // _Nested_ OpenDataStore/CloseDataStore calls to not block when a
  80. // datastoreView is in progress (for example, a GetDialParameters call while
  81. // a slow ScanServerEntries is running). In this case the nested
  82. // OpenDataStore/CloseDataStore calls will lock only
  83. // datastoreReferenceCountMutex and not datastoreMutex.
  84. //
  85. // Synchronized access, for OpenDataStore/CloseDataStore, to
  86. // activeDatastoreDB based on a consistent view of datastoreReferenceCount
  87. // via locking first datastoreReferenceCount and then datastoreMutex while
  88. // holding datastoreReferenceCount.
  89. //
  90. // Concurrent access, for datastoreView/datastoreUpdate, to activeDatastoreDB
  91. // via datastoreMutex read locks.
  92. //
  93. // Exclusive access, for OpenDataStore/CloseDataStore, to activeDatastoreDB,
  94. // with no running datastoreView/datastoreUpdate, by aquiring a
  95. // datastoreMutex write lock.
  96. datastoreReferenceCountMutex.Lock()
  97. if datastoreReferenceCount < 0 || datastoreReferenceCount == math.MaxInt64 {
  98. datastoreReferenceCountMutex.Unlock()
  99. return errors.Tracef(
  100. "invalid datastore reference count: %d", datastoreReferenceCount)
  101. }
  102. if datastoreReferenceCount > 0 {
  103. // For this sanity check, we need only the read-only lock; and must use the
  104. // read-only lock to allow concurrent datastoreView calls.
  105. datastoreMutex.RLock()
  106. isNil := activeDatastoreDB == nil
  107. datastoreMutex.RUnlock()
  108. if isNil {
  109. return errors.TraceNew("datastore unexpectedly closed")
  110. }
  111. // Add a reference to the open datastore.
  112. datastoreReferenceCount += 1
  113. datastoreReferenceCountMutex.Unlock()
  114. return nil
  115. }
  116. // Only lock datastoreMutex now that it's necessary.
  117. // datastoreReferenceCountMutex remains locked.
  118. datastoreMutex.Lock()
  119. if activeDatastoreDB != nil {
  120. datastoreMutex.Unlock()
  121. datastoreReferenceCountMutex.Unlock()
  122. return errors.TraceNew("datastore unexpectedly open")
  123. }
  124. // datastoreReferenceCount is 0, so open the datastore.
  125. newDB, err := datastoreOpenDB(
  126. config.GetDataStoreDirectory(), retryAndReset)
  127. if err != nil {
  128. datastoreMutex.Unlock()
  129. datastoreReferenceCountMutex.Unlock()
  130. return errors.Trace(err)
  131. }
  132. datastoreReferenceCount = 1
  133. activeDatastoreDB = newDB
  134. datastoreMutex.Unlock()
  135. datastoreReferenceCountMutex.Unlock()
  136. _ = resetAllPersistentStatsToUnreported()
  137. return nil
  138. }
  139. // CloseDataStore closes the singleton datastore instance, if open.
  140. func CloseDataStore() {
  141. datastoreReferenceCountMutex.Lock()
  142. defer datastoreReferenceCountMutex.Unlock()
  143. if datastoreReferenceCount <= 0 {
  144. NoticeWarning(
  145. "invalid datastore reference count: %d", datastoreReferenceCount)
  146. return
  147. }
  148. datastoreReferenceCount -= 1
  149. if datastoreReferenceCount > 0 {
  150. return
  151. }
  152. // Only lock datastoreMutex now that it's necessary.
  153. // datastoreReferenceCountMutex remains locked.
  154. datastoreMutex.Lock()
  155. defer datastoreMutex.Unlock()
  156. if activeDatastoreDB == nil {
  157. return
  158. }
  159. err := activeDatastoreDB.close()
  160. if err != nil {
  161. NoticeWarning("failed to close datastore: %s", errors.Trace(err))
  162. }
  163. activeDatastoreDB = nil
  164. }
  165. // datastoreView runs a read-only transaction, making datastore buckets and
  166. // values available to the supplied function.
  167. //
  168. // Bucket value slices are only valid for the duration of the transaction and
  169. // _must_ not be referenced directly outside the transaction.
  170. func datastoreView(fn func(tx *datastoreTx) error) error {
  171. datastoreMutex.RLock()
  172. defer datastoreMutex.RUnlock()
  173. if activeDatastoreDB == nil {
  174. return errors.TraceNew("datastore not open")
  175. }
  176. err := activeDatastoreDB.view(fn)
  177. if err != nil {
  178. err = errors.Trace(err)
  179. }
  180. return err
  181. }
  182. // datastoreUpdate runs a read-write transaction, making datastore buckets and
  183. // values available to the supplied function.
  184. //
  185. // Bucket value slices are only valid for the duration of the transaction and
  186. // _must_ not be referenced directly outside the transaction.
  187. func datastoreUpdate(fn func(tx *datastoreTx) error) error {
  188. datastoreMutex.RLock()
  189. defer datastoreMutex.RUnlock()
  190. if activeDatastoreDB == nil {
  191. return errors.TraceNew("database not open")
  192. }
  193. err := activeDatastoreDB.update(fn)
  194. if err != nil {
  195. err = errors.Trace(err)
  196. }
  197. return err
  198. }
  199. // StoreServerEntry adds the server entry to the datastore.
  200. //
  201. // When a server entry already exists for a given server, it will be
  202. // replaced only if replaceIfExists is set or if the the ConfigurationVersion
  203. // field of the new entry is strictly higher than the existing entry.
  204. //
  205. // If the server entry data is malformed, an alert notice is issued and
  206. // the entry is skipped; no error is returned.
  207. func StoreServerEntry(serverEntryFields protocol.ServerEntryFields, replaceIfExists bool) error {
  208. // TODO: call serverEntryFields.VerifySignature. At this time, we do not do
  209. // this as not all server entries have an individual signature field. All
  210. // StoreServerEntry callers either call VerifySignature or obtain server
  211. // entries from a trusted source (embedded in a signed client, or in a signed
  212. // authenticated package).
  213. // Server entries should already be validated before this point,
  214. // so instead of skipping we fail with an error.
  215. err := protocol.ValidateServerEntryFields(serverEntryFields)
  216. if err != nil {
  217. return errors.Tracef("invalid server entry: %s", err)
  218. }
  219. // BoltDB implementation note:
  220. // For simplicity, we don't maintain indexes on server entry
  221. // region or supported protocols. Instead, we perform full-bucket
  222. // scans with a filter. With a small enough database (thousands or
  223. // even tens of thousand of server entries) and common enough
  224. // values (e.g., many servers support all protocols), performance
  225. // is expected to be acceptable.
  226. err = datastoreUpdate(func(tx *datastoreTx) error {
  227. serverEntries := tx.bucket(datastoreServerEntriesBucket)
  228. serverEntryTags := tx.bucket(datastoreServerEntryTagsBucket)
  229. serverEntryTombstoneTags := tx.bucket(datastoreServerEntryTombstoneTagsBucket)
  230. serverEntryID := []byte(serverEntryFields.GetIPAddress())
  231. // Check not only that the entry exists, but is valid. This
  232. // will replace in the rare case where the data is corrupt.
  233. existingConfigurationVersion := -1
  234. existingData := serverEntries.get(serverEntryID)
  235. if existingData != nil {
  236. var existingServerEntry *protocol.ServerEntry
  237. err := json.Unmarshal(existingData, &existingServerEntry)
  238. if err == nil {
  239. existingConfigurationVersion = existingServerEntry.ConfigurationVersion
  240. }
  241. }
  242. exists := existingConfigurationVersion > -1
  243. newer := exists && existingConfigurationVersion < serverEntryFields.GetConfigurationVersion()
  244. update := !exists || replaceIfExists || newer
  245. if !update {
  246. return nil
  247. }
  248. serverEntryTag := serverEntryFields.GetTag()
  249. // Generate a derived tag when the server entry has no tag.
  250. if serverEntryTag == "" {
  251. serverEntryTag = protocol.GenerateServerEntryTag(
  252. serverEntryFields.GetIPAddress(),
  253. serverEntryFields.GetWebServerSecret())
  254. serverEntryFields.SetTag(serverEntryTag)
  255. }
  256. serverEntryTagBytes := []byte(serverEntryTag)
  257. // Ignore the server entry if it was previously pruned and a tombstone is
  258. // set.
  259. //
  260. // This logic is enforced only for embedded server entries, as all other
  261. // sources are considered to be definitive and non-stale. These exceptions
  262. // intentionally allow the scenario where a server is temporarily deleted
  263. // and then restored; in this case, it's desired for pruned server entries
  264. // to be restored.
  265. if serverEntryFields.GetLocalSource() == protocol.SERVER_ENTRY_SOURCE_EMBEDDED {
  266. if serverEntryTombstoneTags.get(serverEntryTagBytes) != nil {
  267. return nil
  268. }
  269. }
  270. data, err := json.Marshal(serverEntryFields)
  271. if err != nil {
  272. return errors.Trace(err)
  273. }
  274. err = serverEntries.put(serverEntryID, data)
  275. if err != nil {
  276. return errors.Trace(err)
  277. }
  278. err = serverEntryTags.put(serverEntryTagBytes, serverEntryID)
  279. if err != nil {
  280. return errors.Trace(err)
  281. }
  282. NoticeInfo("updated server %s", serverEntryFields.GetDiagnosticID())
  283. return nil
  284. })
  285. if err != nil {
  286. return errors.Trace(err)
  287. }
  288. return nil
  289. }
  290. // StoreServerEntries stores a list of server entries.
  291. // There is an independent transaction for each entry insert/update.
  292. func StoreServerEntries(
  293. config *Config,
  294. serverEntries []protocol.ServerEntryFields,
  295. replaceIfExists bool) error {
  296. for _, serverEntryFields := range serverEntries {
  297. err := StoreServerEntry(serverEntryFields, replaceIfExists)
  298. if err != nil {
  299. return errors.Trace(err)
  300. }
  301. }
  302. return nil
  303. }
  304. // StreamingStoreServerEntries stores a list of server entries. There is an
  305. // independent transaction for each entry insert/update.
  306. // StreamingStoreServerEntries stops early and returns an error if ctx becomes
  307. // done; any server entries stored up to that point are retained.
  308. func StreamingStoreServerEntries(
  309. ctx context.Context,
  310. config *Config,
  311. serverEntries *protocol.StreamingServerEntryDecoder,
  312. replaceIfExists bool) error {
  313. // Note: both StreamingServerEntryDecoder.Next and StoreServerEntry
  314. // allocate temporary memory buffers for hex/JSON decoding/encoding,
  315. // so this isn't true constant-memory streaming (it depends on garbage
  316. // collection).
  317. n := 0
  318. for {
  319. select {
  320. case <-ctx.Done():
  321. return errors.Trace(ctx.Err())
  322. default:
  323. }
  324. serverEntry, err := serverEntries.Next()
  325. if err != nil {
  326. return errors.Trace(err)
  327. }
  328. if serverEntry == nil {
  329. // No more server entries
  330. return nil
  331. }
  332. err = StoreServerEntry(serverEntry, replaceIfExists)
  333. if err != nil {
  334. return errors.Trace(err)
  335. }
  336. n += 1
  337. if n == datastoreServerEntryFetchGCThreshold {
  338. DoGarbageCollection()
  339. n = 0
  340. }
  341. }
  342. return nil
  343. }
  344. // ImportEmbeddedServerEntries loads, decodes, and stores a list of server
  345. // entries. If embeddedServerEntryListFilename is not empty,
  346. // embeddedServerEntryList will be ignored and the encoded server entry list
  347. // will be loaded from the specified file. The import process stops early if
  348. // ctx becomes done; any server entries imported up to that point are
  349. // retained.
  350. func ImportEmbeddedServerEntries(
  351. ctx context.Context,
  352. config *Config,
  353. embeddedServerEntryListFilename string,
  354. embeddedServerEntryList string) error {
  355. var reader io.Reader
  356. if embeddedServerEntryListFilename != "" {
  357. file, err := os.Open(embeddedServerEntryListFilename)
  358. if err != nil {
  359. return errors.Trace(err)
  360. }
  361. defer file.Close()
  362. reader = file
  363. } else {
  364. reader = strings.NewReader(embeddedServerEntryList)
  365. }
  366. err := StreamingStoreServerEntries(
  367. ctx,
  368. config,
  369. protocol.NewStreamingServerEntryDecoder(
  370. reader,
  371. common.TruncateTimestampToHour(common.GetCurrentTimestamp()),
  372. protocol.SERVER_ENTRY_SOURCE_EMBEDDED),
  373. false)
  374. if err != nil {
  375. return errors.Trace(err)
  376. }
  377. return nil
  378. }
  379. // PromoteServerEntry sets the server affinity server entry ID to the
  380. // specified server entry IP address.
  381. func PromoteServerEntry(config *Config, ipAddress string) error {
  382. err := datastoreUpdate(func(tx *datastoreTx) error {
  383. serverEntryID := []byte(ipAddress)
  384. // Ensure the corresponding server entry exists before
  385. // setting server affinity.
  386. bucket := tx.bucket(datastoreServerEntriesBucket)
  387. data := bucket.get(serverEntryID)
  388. if data == nil {
  389. NoticeWarning(
  390. "PromoteServerEntry: ignoring unknown server entry: %s",
  391. ipAddress)
  392. return nil
  393. }
  394. bucket = tx.bucket(datastoreKeyValueBucket)
  395. err := bucket.put(datastoreAffinityServerEntryIDKey, serverEntryID)
  396. if err != nil {
  397. return errors.Trace(err)
  398. }
  399. // Store the current server entry filter (e.g, region, etc.) that
  400. // was in use when the entry was promoted. This is used to detect
  401. // when the top ranked server entry was promoted under a different
  402. // filter.
  403. currentFilter, err := makeServerEntryFilterValue(config)
  404. if err != nil {
  405. return errors.Trace(err)
  406. }
  407. err = bucket.put(datastoreLastServerEntryFilterKey, currentFilter)
  408. if err != nil {
  409. return errors.Trace(err)
  410. }
  411. return nil
  412. })
  413. if err != nil {
  414. return errors.Trace(err)
  415. }
  416. return nil
  417. }
  418. // DeleteServerEntryAffinity clears server affinity if set to the specified
  419. // server.
  420. func DeleteServerEntryAffinity(ipAddress string) error {
  421. err := datastoreUpdate(func(tx *datastoreTx) error {
  422. serverEntryID := []byte(ipAddress)
  423. bucket := tx.bucket(datastoreKeyValueBucket)
  424. affinityServerEntryID := bucket.get(datastoreAffinityServerEntryIDKey)
  425. if bytes.Equal(affinityServerEntryID, serverEntryID) {
  426. err := bucket.delete(datastoreAffinityServerEntryIDKey)
  427. if err != nil {
  428. return errors.Trace(err)
  429. }
  430. err = bucket.delete(datastoreLastServerEntryFilterKey)
  431. if err != nil {
  432. return errors.Trace(err)
  433. }
  434. }
  435. return nil
  436. })
  437. if err != nil {
  438. return errors.Trace(err)
  439. }
  440. return nil
  441. }
  442. func makeServerEntryFilterValue(config *Config) ([]byte, error) {
  443. // Currently, only a change of EgressRegion will "break" server affinity.
  444. // If the tunnel protocol filter changes, any existing affinity server
  445. // either passes the new filter, or it will be skipped anyway.
  446. return []byte(config.EgressRegion), nil
  447. }
  448. func hasServerEntryFilterChanged(config *Config) (bool, error) {
  449. currentFilter, err := makeServerEntryFilterValue(config)
  450. if err != nil {
  451. return false, errors.Trace(err)
  452. }
  453. changed := false
  454. err = datastoreView(func(tx *datastoreTx) error {
  455. bucket := tx.bucket(datastoreKeyValueBucket)
  456. previousFilter := bucket.get(datastoreLastServerEntryFilterKey)
  457. // When not found, previousFilter will be nil; ensures this
  458. // results in "changed", even if currentFilter is len(0).
  459. if previousFilter == nil ||
  460. !bytes.Equal(previousFilter, currentFilter) {
  461. changed = true
  462. }
  463. return nil
  464. })
  465. if err != nil {
  466. return false, errors.Trace(err)
  467. }
  468. return changed, nil
  469. }
  470. // ServerEntryIterator is used to iterate over
  471. // stored server entries in rank order.
  472. type ServerEntryIterator struct {
  473. config *Config
  474. applyServerAffinity bool
  475. serverEntryIDs [][]byte
  476. serverEntryIndex int
  477. isTacticsServerEntryIterator bool
  478. isTargetServerEntryIterator bool
  479. hasNextTargetServerEntry bool
  480. targetServerEntry *protocol.ServerEntry
  481. }
  482. // NewServerEntryIterator creates a new ServerEntryIterator.
  483. //
  484. // The boolean return value indicates whether to treat the first server(s)
  485. // as affinity servers or not. When the server entry selection filter changes
  486. // such as from a specific region to any region, or when there was no previous
  487. // filter/iterator, the the first server(s) are arbitrary and should not be
  488. // given affinity treatment.
  489. //
  490. // NewServerEntryIterator and any returned ServerEntryIterator are not
  491. // designed for concurrent use as not all related datastore operations are
  492. // performed in a single transaction.
  493. //
  494. func NewServerEntryIterator(config *Config) (bool, *ServerEntryIterator, error) {
  495. // When configured, this target server entry is the only candidate
  496. if config.TargetServerEntry != "" {
  497. return newTargetServerEntryIterator(config, false)
  498. }
  499. filterChanged, err := hasServerEntryFilterChanged(config)
  500. if err != nil {
  501. return false, nil, errors.Trace(err)
  502. }
  503. applyServerAffinity := !filterChanged
  504. iterator := &ServerEntryIterator{
  505. config: config,
  506. applyServerAffinity: applyServerAffinity,
  507. }
  508. err = iterator.reset(true)
  509. if err != nil {
  510. return false, nil, errors.Trace(err)
  511. }
  512. return applyServerAffinity, iterator, nil
  513. }
  514. func NewTacticsServerEntryIterator(config *Config) (*ServerEntryIterator, error) {
  515. // When configured, this target server entry is the only candidate
  516. if config.TargetServerEntry != "" {
  517. _, iterator, err := newTargetServerEntryIterator(config, true)
  518. return iterator, err
  519. }
  520. iterator := &ServerEntryIterator{
  521. config: config,
  522. isTacticsServerEntryIterator: true,
  523. }
  524. err := iterator.reset(true)
  525. if err != nil {
  526. return nil, errors.Trace(err)
  527. }
  528. return iterator, nil
  529. }
  530. // newTargetServerEntryIterator is a helper for initializing the TargetServerEntry case
  531. func newTargetServerEntryIterator(config *Config, isTactics bool) (bool, *ServerEntryIterator, error) {
  532. serverEntry, err := protocol.DecodeServerEntry(
  533. config.TargetServerEntry, config.loadTimestamp, protocol.SERVER_ENTRY_SOURCE_TARGET)
  534. if err != nil {
  535. return false, nil, errors.Trace(err)
  536. }
  537. if serverEntry.Tag == "" {
  538. serverEntry.Tag = protocol.GenerateServerEntryTag(
  539. serverEntry.IpAddress, serverEntry.WebServerSecret)
  540. }
  541. if isTactics {
  542. if len(serverEntry.GetSupportedTacticsProtocols()) == 0 {
  543. return false, nil, errors.TraceNew("TargetServerEntry does not support tactics protocols")
  544. }
  545. } else {
  546. if config.EgressRegion != "" && serverEntry.Region != config.EgressRegion {
  547. return false, nil, errors.TraceNew("TargetServerEntry does not support EgressRegion")
  548. }
  549. limitTunnelProtocols := config.GetParameters().Get().TunnelProtocols(parameters.LimitTunnelProtocols)
  550. if len(limitTunnelProtocols) > 0 {
  551. // At the ServerEntryIterator level, only limitTunnelProtocols is applied;
  552. // excludeIntensive is handled higher up.
  553. if len(serverEntry.GetSupportedProtocols(
  554. conditionallyEnabledComponents{},
  555. config.UseUpstreamProxy(),
  556. limitTunnelProtocols,
  557. false)) == 0 {
  558. return false, nil, errors.Tracef(
  559. "TargetServerEntry does not support LimitTunnelProtocols: %v", limitTunnelProtocols)
  560. }
  561. }
  562. }
  563. iterator := &ServerEntryIterator{
  564. isTacticsServerEntryIterator: isTactics,
  565. isTargetServerEntryIterator: true,
  566. hasNextTargetServerEntry: true,
  567. targetServerEntry: serverEntry,
  568. }
  569. NoticeInfo("using TargetServerEntry: %s", serverEntry.GetDiagnosticID())
  570. return false, iterator, nil
  571. }
  572. // Reset a NewServerEntryIterator to the start of its cycle. The next
  573. // call to Next will return the first server entry.
  574. func (iterator *ServerEntryIterator) Reset() error {
  575. return iterator.reset(false)
  576. }
  577. func (iterator *ServerEntryIterator) reset(isInitialRound bool) error {
  578. iterator.Close()
  579. if iterator.isTargetServerEntryIterator {
  580. iterator.hasNextTargetServerEntry = true
  581. return nil
  582. }
  583. // Support stand-alone GetTactics operation. See TacticsStorer for more
  584. // details.
  585. if iterator.isTacticsServerEntryIterator {
  586. err := OpenDataStoreWithoutReset(iterator.config)
  587. if err != nil {
  588. return errors.Trace(err)
  589. }
  590. defer CloseDataStore()
  591. }
  592. // BoltDB implementation note:
  593. // We don't keep a transaction open for the duration of the iterator
  594. // because this would expose the following semantics to consumer code:
  595. //
  596. // Read-only transactions and read-write transactions ... generally
  597. // shouldn't be opened simultaneously in the same goroutine. This can
  598. // cause a deadlock as the read-write transaction needs to periodically
  599. // re-map the data file but it cannot do so while a read-only
  600. // transaction is open.
  601. // (https://github.com/boltdb/bolt)
  602. //
  603. // So the underlying serverEntriesBucket could change after the serverEntryIDs
  604. // list is built.
  605. var serverEntryIDs [][]byte
  606. err := datastoreView(func(tx *datastoreTx) error {
  607. bucket := tx.bucket(datastoreKeyValueBucket)
  608. serverEntryIDs = make([][]byte, 0)
  609. shuffleHead := 0
  610. var affinityServerEntryID []byte
  611. // In the first round only, move any server affinity candiate to the
  612. // very first position.
  613. if isInitialRound &&
  614. iterator.applyServerAffinity {
  615. affinityServerEntryID = bucket.get(datastoreAffinityServerEntryIDKey)
  616. if affinityServerEntryID != nil {
  617. serverEntryIDs = append(serverEntryIDs, append([]byte(nil), affinityServerEntryID...))
  618. shuffleHead = 1
  619. }
  620. }
  621. bucket = tx.bucket(datastoreServerEntriesBucket)
  622. cursor := bucket.cursor()
  623. for key := cursor.firstKey(); key != nil; key = cursor.nextKey() {
  624. if affinityServerEntryID != nil {
  625. if bytes.Equal(affinityServerEntryID, key) {
  626. continue
  627. }
  628. }
  629. serverEntryIDs = append(serverEntryIDs, append([]byte(nil), key...))
  630. }
  631. cursor.close()
  632. // Randomly shuffle the entire list of server IDs, excluding the
  633. // server affinity candidate.
  634. for i := len(serverEntryIDs) - 1; i > shuffleHead-1; i-- {
  635. j := prng.Intn(i+1-shuffleHead) + shuffleHead
  636. serverEntryIDs[i], serverEntryIDs[j] = serverEntryIDs[j], serverEntryIDs[i]
  637. }
  638. // In the first round, or with some probability, move _potential_ replay
  639. // candidates to the front of the list (excepting the server affinity slot,
  640. // if any). This move is post-shuffle so the order is still randomized. To
  641. // save the memory overhead of unmarshalling all dial parameters, this
  642. // operation just moves any server with a dial parameter record to the
  643. // front. Whether the dial parameter remains valid for replay -- TTL,
  644. // tactics/config unchanged, etc. --- is checked later.
  645. //
  646. // TODO: move only up to parameters.ReplayCandidateCount to front?
  647. p := iterator.config.GetParameters().Get()
  648. if (isInitialRound || p.WeightedCoinFlip(parameters.ReplayLaterRoundMoveToFrontProbability)) &&
  649. p.Int(parameters.ReplayCandidateCount) != 0 {
  650. networkID := []byte(iterator.config.GetNetworkID())
  651. dialParamsBucket := tx.bucket(datastoreDialParametersBucket)
  652. i := shuffleHead
  653. j := len(serverEntryIDs) - 1
  654. for {
  655. for ; i < j; i++ {
  656. key := makeDialParametersKey(serverEntryIDs[i], networkID)
  657. if dialParamsBucket.get(key) == nil {
  658. break
  659. }
  660. }
  661. for ; i < j; j-- {
  662. key := makeDialParametersKey(serverEntryIDs[j], networkID)
  663. if dialParamsBucket.get(key) != nil {
  664. break
  665. }
  666. }
  667. if i < j {
  668. serverEntryIDs[i], serverEntryIDs[j] = serverEntryIDs[j], serverEntryIDs[i]
  669. i++
  670. j--
  671. } else {
  672. break
  673. }
  674. }
  675. }
  676. return nil
  677. })
  678. if err != nil {
  679. return errors.Trace(err)
  680. }
  681. iterator.serverEntryIDs = serverEntryIDs
  682. iterator.serverEntryIndex = 0
  683. return nil
  684. }
  685. // Close cleans up resources associated with a ServerEntryIterator.
  686. func (iterator *ServerEntryIterator) Close() {
  687. iterator.serverEntryIDs = nil
  688. iterator.serverEntryIndex = 0
  689. }
  690. // Next returns the next server entry, by rank, for a ServerEntryIterator.
  691. // Returns nil with no error when there is no next item.
  692. func (iterator *ServerEntryIterator) Next() (*protocol.ServerEntry, error) {
  693. var serverEntry *protocol.ServerEntry
  694. var err error
  695. defer func() {
  696. if err != nil {
  697. iterator.Close()
  698. }
  699. }()
  700. if iterator.isTargetServerEntryIterator {
  701. if iterator.hasNextTargetServerEntry {
  702. iterator.hasNextTargetServerEntry = false
  703. return MakeCompatibleServerEntry(iterator.targetServerEntry), nil
  704. }
  705. return nil, nil
  706. }
  707. // Support stand-alone GetTactics operation. See TacticsStorer for more
  708. // details.
  709. if iterator.isTacticsServerEntryIterator {
  710. err := OpenDataStoreWithoutReset(iterator.config)
  711. if err != nil {
  712. return nil, errors.Trace(err)
  713. }
  714. defer CloseDataStore()
  715. }
  716. // There are no region/protocol indexes for the server entries bucket.
  717. // Loop until we have the next server entry that matches the iterator
  718. // filter requirements.
  719. for {
  720. if iterator.serverEntryIndex >= len(iterator.serverEntryIDs) {
  721. // There is no next item
  722. return nil, nil
  723. }
  724. serverEntryID := iterator.serverEntryIDs[iterator.serverEntryIndex]
  725. iterator.serverEntryIndex += 1
  726. serverEntry = nil
  727. doDeleteServerEntry := false
  728. err = datastoreView(func(tx *datastoreTx) error {
  729. serverEntries := tx.bucket(datastoreServerEntriesBucket)
  730. value := serverEntries.get(serverEntryID)
  731. if value == nil {
  732. return nil
  733. }
  734. // When the server entry has a signature and the signature verification
  735. // public key is configured, perform a signature verification, which will
  736. // detect data corruption of most server entry fields. When the check
  737. // fails, the server entry is deleted and skipped and iteration continues.
  738. //
  739. // This prevents wasteful, time-consuming dials in cases where the server
  740. // entry is intact except for a bit flip in the obfuscation key, for
  741. // example. A delete is triggered also in the case where the server entry
  742. // record fails to unmarshal.
  743. if iterator.config.ServerEntrySignaturePublicKey != "" {
  744. var serverEntryFields protocol.ServerEntryFields
  745. err = json.Unmarshal(value, &serverEntryFields)
  746. if err != nil {
  747. doDeleteServerEntry = true
  748. NoticeWarning(
  749. "ServerEntryIterator.Next: unmarshal failed: %s",
  750. errors.Trace(err))
  751. // Do not stop iterating.
  752. return nil
  753. }
  754. if serverEntryFields.HasSignature() {
  755. err = serverEntryFields.VerifySignature(
  756. iterator.config.ServerEntrySignaturePublicKey)
  757. if err != nil {
  758. doDeleteServerEntry = true
  759. NoticeWarning(
  760. "ServerEntryIterator.Next: verify signature failed: %s",
  761. errors.Trace(err))
  762. // Do not stop iterating.
  763. return nil
  764. }
  765. }
  766. }
  767. // Must unmarshal here as slice is only valid within transaction.
  768. err = json.Unmarshal(value, &serverEntry)
  769. if err != nil {
  770. serverEntry = nil
  771. doDeleteServerEntry = true
  772. NoticeWarning(
  773. "ServerEntryIterator.Next: unmarshal failed: %s",
  774. errors.Trace(err))
  775. // Do not stop iterating.
  776. return nil
  777. }
  778. return nil
  779. })
  780. if err != nil {
  781. return nil, errors.Trace(err)
  782. }
  783. if doDeleteServerEntry {
  784. deleteServerEntry(iterator.config, serverEntryID)
  785. continue
  786. }
  787. if serverEntry == nil {
  788. // In case of data corruption or a bug causing this condition,
  789. // do not stop iterating.
  790. NoticeWarning("ServerEntryIterator.Next: unexpected missing server entry")
  791. continue
  792. }
  793. // Generate a derived server entry tag for server entries with no tag. Store
  794. // back the updated server entry so that (a) the tag doesn't need to be
  795. // regenerated; (b) the server entry can be looked up by tag (currently used
  796. // in the status request prune case).
  797. //
  798. // This is a distinct transaction so as to avoid the overhead of regular
  799. // write transactions in the iterator; once tags have been stored back, most
  800. // iterator transactions will remain read-only.
  801. if serverEntry.Tag == "" {
  802. serverEntry.Tag = protocol.GenerateServerEntryTag(
  803. serverEntry.IpAddress, serverEntry.WebServerSecret)
  804. err = datastoreUpdate(func(tx *datastoreTx) error {
  805. serverEntries := tx.bucket(datastoreServerEntriesBucket)
  806. serverEntryTags := tx.bucket(datastoreServerEntryTagsBucket)
  807. // We must reload and store back the server entry _fields_ to preserve any
  808. // currently unrecognized fields, for future compatibility.
  809. value := serverEntries.get(serverEntryID)
  810. if value == nil {
  811. return nil
  812. }
  813. var serverEntryFields protocol.ServerEntryFields
  814. err := json.Unmarshal(value, &serverEntryFields)
  815. if err != nil {
  816. return errors.Trace(err)
  817. }
  818. // As there is minor race condition between loading/checking serverEntry
  819. // and reloading/modifying serverEntryFields, this transaction references
  820. // only the freshly loaded fields when checking and setting the tag.
  821. serverEntryTag := serverEntryFields.GetTag()
  822. if serverEntryTag != "" {
  823. return nil
  824. }
  825. serverEntryTag = protocol.GenerateServerEntryTag(
  826. serverEntryFields.GetIPAddress(),
  827. serverEntryFields.GetWebServerSecret())
  828. serverEntryFields.SetTag(serverEntryTag)
  829. jsonServerEntryFields, err := json.Marshal(serverEntryFields)
  830. if err != nil {
  831. return errors.Trace(err)
  832. }
  833. serverEntries.put(serverEntryID, jsonServerEntryFields)
  834. if err != nil {
  835. return errors.Trace(err)
  836. }
  837. serverEntryTags.put([]byte(serverEntryTag), serverEntryID)
  838. if err != nil {
  839. return errors.Trace(err)
  840. }
  841. return nil
  842. })
  843. if err != nil {
  844. // Do not stop.
  845. NoticeWarning(
  846. "ServerEntryIterator.Next: update server entry failed: %s",
  847. errors.Trace(err))
  848. }
  849. }
  850. if iterator.serverEntryIndex%datastoreServerEntryFetchGCThreshold == 0 {
  851. DoGarbageCollection()
  852. }
  853. // Check filter requirements
  854. if iterator.isTacticsServerEntryIterator {
  855. // Tactics doesn't filter by egress region.
  856. if len(serverEntry.GetSupportedTacticsProtocols()) > 0 {
  857. break
  858. }
  859. } else {
  860. if iterator.config.EgressRegion == "" ||
  861. serverEntry.Region == iterator.config.EgressRegion {
  862. break
  863. }
  864. }
  865. }
  866. return MakeCompatibleServerEntry(serverEntry), nil
  867. }
  868. // MakeCompatibleServerEntry provides backwards compatibility with old server entries
  869. // which have a single meekFrontingDomain and not a meekFrontingAddresses array.
  870. // By copying this one meekFrontingDomain into meekFrontingAddresses, this client effectively
  871. // uses that single value as legacy clients do.
  872. func MakeCompatibleServerEntry(serverEntry *protocol.ServerEntry) *protocol.ServerEntry {
  873. if len(serverEntry.MeekFrontingAddresses) == 0 && serverEntry.MeekFrontingDomain != "" {
  874. serverEntry.MeekFrontingAddresses =
  875. append(serverEntry.MeekFrontingAddresses, serverEntry.MeekFrontingDomain)
  876. }
  877. return serverEntry
  878. }
  879. // PruneServerEntry deletes the server entry, along with associated data,
  880. // corresponding to the specified server entry tag. Pruning is subject to an
  881. // age check. In the case of an error, a notice is emitted.
  882. func PruneServerEntry(config *Config, serverEntryTag string) {
  883. err := pruneServerEntry(config, serverEntryTag)
  884. if err != nil {
  885. NoticeWarning(
  886. "PruneServerEntry failed: %s: %s",
  887. serverEntryTag, errors.Trace(err))
  888. return
  889. }
  890. NoticePruneServerEntry(serverEntryTag)
  891. }
  892. func pruneServerEntry(config *Config, serverEntryTag string) error {
  893. minimumAgeForPruning := config.GetParameters().Get().Duration(
  894. parameters.ServerEntryMinimumAgeForPruning)
  895. return datastoreUpdate(func(tx *datastoreTx) error {
  896. serverEntries := tx.bucket(datastoreServerEntriesBucket)
  897. serverEntryTags := tx.bucket(datastoreServerEntryTagsBucket)
  898. serverEntryTombstoneTags := tx.bucket(datastoreServerEntryTombstoneTagsBucket)
  899. keyValues := tx.bucket(datastoreKeyValueBucket)
  900. dialParameters := tx.bucket(datastoreDialParametersBucket)
  901. serverEntryTagBytes := []byte(serverEntryTag)
  902. serverEntryID := serverEntryTags.get(serverEntryTagBytes)
  903. if serverEntryID == nil {
  904. return errors.TraceNew("server entry tag not found")
  905. }
  906. serverEntryJson := serverEntries.get(serverEntryID)
  907. if serverEntryJson == nil {
  908. return errors.TraceNew("server entry not found")
  909. }
  910. var serverEntry *protocol.ServerEntry
  911. err := json.Unmarshal(serverEntryJson, &serverEntry)
  912. if err != nil {
  913. errors.Trace(err)
  914. }
  915. // Only prune sufficiently old server entries. This mitigates the case where
  916. // stale data in psiphond will incorrectly identify brand new servers as
  917. // being invalid/deleted.
  918. serverEntryLocalTimestamp, err := time.Parse(time.RFC3339, serverEntry.LocalTimestamp)
  919. if err != nil {
  920. errors.Trace(err)
  921. }
  922. if serverEntryLocalTimestamp.Add(minimumAgeForPruning).After(time.Now()) {
  923. return nil
  924. }
  925. // Handle the server IP recycle case where multiple serverEntryTags records
  926. // refer to the same server IP. Only delete the server entry record when its
  927. // tag matches the pruned tag. Otherwise, the server entry record is
  928. // associated with another tag. The pruned tag is still deleted.
  929. doDeleteServerEntry := (serverEntry.Tag == serverEntryTag)
  930. err = serverEntryTags.delete(serverEntryTagBytes)
  931. if err != nil {
  932. errors.Trace(err)
  933. }
  934. if doDeleteServerEntry {
  935. err = deleteServerEntryHelper(
  936. config,
  937. serverEntryID,
  938. serverEntries,
  939. keyValues,
  940. dialParameters)
  941. if err != nil {
  942. errors.Trace(err)
  943. }
  944. }
  945. // Tombstones prevent reimporting pruned server entries. Tombstone
  946. // identifiers are tags, which are derived from the web server secret in
  947. // addition to the server IP, so tombstones will not clobber recycled server
  948. // IPs as long as new web server secrets are generated in the recycle case.
  949. //
  950. // Tombstones are set only for embedded server entries, as all other sources
  951. // are expected to provide valid server entries; this also provides a fail-
  952. // safe mechanism to restore pruned server entries through all non-embedded
  953. // sources.
  954. if serverEntry.LocalSource == protocol.SERVER_ENTRY_SOURCE_EMBEDDED {
  955. err = serverEntryTombstoneTags.put(serverEntryTagBytes, []byte{1})
  956. if err != nil {
  957. return errors.Trace(err)
  958. }
  959. }
  960. return nil
  961. })
  962. }
  963. // DeleteServerEntry deletes the specified server entry and associated data.
  964. func DeleteServerEntry(config *Config, ipAddress string) {
  965. serverEntryID := []byte(ipAddress)
  966. // For notices, we cannot assume we have a valid server entry tag value to
  967. // log, as DeleteServerEntry is called when a server entry fails to unmarshal
  968. // or fails signature verification.
  969. err := deleteServerEntry(config, serverEntryID)
  970. if err != nil {
  971. NoticeWarning("DeleteServerEntry failed: %s", errors.Trace(err))
  972. return
  973. }
  974. NoticeInfo("Server entry deleted")
  975. }
  976. func deleteServerEntry(config *Config, serverEntryID []byte) error {
  977. return datastoreUpdate(func(tx *datastoreTx) error {
  978. serverEntries := tx.bucket(datastoreServerEntriesBucket)
  979. serverEntryTags := tx.bucket(datastoreServerEntryTagsBucket)
  980. keyValues := tx.bucket(datastoreKeyValueBucket)
  981. dialParameters := tx.bucket(datastoreDialParametersBucket)
  982. err := deleteServerEntryHelper(
  983. config,
  984. serverEntryID,
  985. serverEntries,
  986. keyValues,
  987. dialParameters)
  988. if err != nil {
  989. errors.Trace(err)
  990. }
  991. // Remove any tags pointing to the deleted server entry.
  992. cursor := serverEntryTags.cursor()
  993. defer cursor.close()
  994. for key, value := cursor.first(); key != nil; key, value = cursor.next() {
  995. if bytes.Equal(value, serverEntryID) {
  996. err := serverEntryTags.delete(key)
  997. if err != nil {
  998. return errors.Trace(err)
  999. }
  1000. }
  1001. }
  1002. return nil
  1003. })
  1004. }
  1005. func deleteServerEntryHelper(
  1006. config *Config,
  1007. serverEntryID []byte,
  1008. serverEntries *datastoreBucket,
  1009. keyValues *datastoreBucket,
  1010. dialParameters *datastoreBucket) error {
  1011. err := serverEntries.delete(serverEntryID)
  1012. if err != nil {
  1013. errors.Trace(err)
  1014. }
  1015. affinityServerEntryID := keyValues.get(datastoreAffinityServerEntryIDKey)
  1016. if bytes.Equal(affinityServerEntryID, serverEntryID) {
  1017. err = keyValues.delete(datastoreAffinityServerEntryIDKey)
  1018. if err != nil {
  1019. return errors.Trace(err)
  1020. }
  1021. err = keyValues.delete(datastoreLastServerEntryFilterKey)
  1022. if err != nil {
  1023. return errors.Trace(err)
  1024. }
  1025. }
  1026. // TODO: expose boltdb Seek functionality to skip to first matching record.
  1027. cursor := dialParameters.cursor()
  1028. defer cursor.close()
  1029. foundFirstMatch := false
  1030. for key, _ := cursor.first(); key != nil; key, _ = cursor.next() {
  1031. // Dial parameters key has serverID as a prefix; see makeDialParametersKey.
  1032. if bytes.HasPrefix(key, serverEntryID) {
  1033. foundFirstMatch = true
  1034. err := dialParameters.delete(key)
  1035. if err != nil {
  1036. return errors.Trace(err)
  1037. }
  1038. } else if foundFirstMatch {
  1039. break
  1040. }
  1041. }
  1042. return nil
  1043. }
  1044. // ScanServerEntries iterates over all stored server entries, unmarshals each,
  1045. // and passes it to callback for processing. If callback returns false, the
  1046. // iteration is cancelled and an error is returned.
  1047. //
  1048. // ScanServerEntries may be slow to execute, particularly for older devices
  1049. // and/or very large server lists. Callers should avoid blocking on
  1050. // ScanServerEntries where possible; and use the canel option to interrupt
  1051. // scans that are no longer required.
  1052. func ScanServerEntries(callback func(*protocol.ServerEntry) bool) error {
  1053. // TODO: this operation can be sped up (by a factor of ~2x, in one test
  1054. // scenario) by using a faster JSON implementation
  1055. // (https://github.com/json-iterator/go) and increasing
  1056. // datastoreServerEntryFetchGCThreshold.
  1057. //
  1058. // json-iterator increases the binary code size significantly, which affects
  1059. // memory limit accounting on some platforms, so it's not clear we can use it
  1060. // universally. Similarly, tuning datastoreServerEntryFetchGCThreshold has a
  1061. // memory limit tradeoff.
  1062. //
  1063. // Since ScanServerEntries is now called asynchronously and doesn't block
  1064. // establishment at all, we can tolerate its slower performance. Other
  1065. // bulk-JSON operations such as [Streaming]StoreServerEntries also benefit
  1066. // from using a faster JSON implementation, but the relative performance
  1067. // increase is far smaller as import times are dominated by data store write
  1068. // transaction overhead. Other operations such as ServerEntryIterator
  1069. // amortize the cost of JSON unmarshalling over many other operations.
  1070. err := datastoreView(func(tx *datastoreTx) error {
  1071. bucket := tx.bucket(datastoreServerEntriesBucket)
  1072. cursor := bucket.cursor()
  1073. n := 0
  1074. for key, value := cursor.first(); key != nil; key, value = cursor.next() {
  1075. var serverEntry *protocol.ServerEntry
  1076. err := json.Unmarshal(value, &serverEntry)
  1077. if err != nil {
  1078. // In case of data corruption or a bug causing this condition,
  1079. // do not stop iterating.
  1080. NoticeWarning("ScanServerEntries: %s", errors.Trace(err))
  1081. continue
  1082. }
  1083. if !callback(serverEntry) {
  1084. cursor.close()
  1085. return errors.TraceNew("scan cancelled")
  1086. }
  1087. n += 1
  1088. if n == datastoreServerEntryFetchGCThreshold {
  1089. DoGarbageCollection()
  1090. n = 0
  1091. }
  1092. }
  1093. cursor.close()
  1094. return nil
  1095. })
  1096. if err != nil {
  1097. return errors.Trace(err)
  1098. }
  1099. return nil
  1100. }
  1101. // HasServerEntries returns a bool indicating if the data store contains at
  1102. // least one server entry. This is a faster operation than CountServerEntries.
  1103. // On failure, HasServerEntries returns false.
  1104. func HasServerEntries() bool {
  1105. hasServerEntries := false
  1106. err := datastoreView(func(tx *datastoreTx) error {
  1107. bucket := tx.bucket(datastoreServerEntriesBucket)
  1108. cursor := bucket.cursor()
  1109. key, _ := cursor.first()
  1110. hasServerEntries = (key != nil)
  1111. cursor.close()
  1112. return nil
  1113. })
  1114. if err != nil {
  1115. NoticeWarning("HasServerEntries failed: %s", errors.Trace(err))
  1116. return false
  1117. }
  1118. return hasServerEntries
  1119. }
  1120. // CountServerEntries returns a count of stored server entries. On failure,
  1121. // CountServerEntries returns 0.
  1122. func CountServerEntries() int {
  1123. count := 0
  1124. err := datastoreView(func(tx *datastoreTx) error {
  1125. bucket := tx.bucket(datastoreServerEntriesBucket)
  1126. cursor := bucket.cursor()
  1127. for key, _ := cursor.first(); key != nil; key, _ = cursor.next() {
  1128. count += 1
  1129. }
  1130. cursor.close()
  1131. return nil
  1132. })
  1133. if err != nil {
  1134. NoticeWarning("CountServerEntries failed: %s", err)
  1135. return 0
  1136. }
  1137. return count
  1138. }
  1139. // SetSplitTunnelRoutes updates the cached routes data for
  1140. // the given region. The associated etag is also stored and
  1141. // used to make efficient web requests for updates to the data.
  1142. func SetSplitTunnelRoutes(region, etag string, data []byte) error {
  1143. err := datastoreUpdate(func(tx *datastoreTx) error {
  1144. bucket := tx.bucket(datastoreSplitTunnelRouteETagsBucket)
  1145. err := bucket.put([]byte(region), []byte(etag))
  1146. if err != nil {
  1147. return errors.Trace(err)
  1148. }
  1149. bucket = tx.bucket(datastoreSplitTunnelRouteDataBucket)
  1150. err = bucket.put([]byte(region), data)
  1151. if err != nil {
  1152. return errors.Trace(err)
  1153. }
  1154. return nil
  1155. })
  1156. if err != nil {
  1157. return errors.Trace(err)
  1158. }
  1159. return nil
  1160. }
  1161. // GetSplitTunnelRoutesETag retrieves the etag for cached routes
  1162. // data for the specified region. If not found, it returns an empty string value.
  1163. func GetSplitTunnelRoutesETag(region string) (string, error) {
  1164. var etag string
  1165. err := datastoreView(func(tx *datastoreTx) error {
  1166. bucket := tx.bucket(datastoreSplitTunnelRouteETagsBucket)
  1167. etag = string(bucket.get([]byte(region)))
  1168. return nil
  1169. })
  1170. if err != nil {
  1171. return "", errors.Trace(err)
  1172. }
  1173. return etag, nil
  1174. }
  1175. // GetSplitTunnelRoutesData retrieves the cached routes data
  1176. // for the specified region. If not found, it returns a nil value.
  1177. func GetSplitTunnelRoutesData(region string) ([]byte, error) {
  1178. var data []byte
  1179. err := datastoreView(func(tx *datastoreTx) error {
  1180. bucket := tx.bucket(datastoreSplitTunnelRouteDataBucket)
  1181. value := bucket.get([]byte(region))
  1182. if value != nil {
  1183. // Must make a copy as slice is only valid within transaction.
  1184. data = make([]byte, len(value))
  1185. copy(data, value)
  1186. }
  1187. return nil
  1188. })
  1189. if err != nil {
  1190. return nil, errors.Trace(err)
  1191. }
  1192. return data, nil
  1193. }
  1194. // SetUrlETag stores an ETag for the specfied URL.
  1195. // Note: input URL is treated as a string, and is not
  1196. // encoded or decoded or otherwise canonicalized.
  1197. func SetUrlETag(url, etag string) error {
  1198. err := datastoreUpdate(func(tx *datastoreTx) error {
  1199. bucket := tx.bucket(datastoreUrlETagsBucket)
  1200. err := bucket.put([]byte(url), []byte(etag))
  1201. if err != nil {
  1202. return errors.Trace(err)
  1203. }
  1204. return nil
  1205. })
  1206. if err != nil {
  1207. return errors.Trace(err)
  1208. }
  1209. return nil
  1210. }
  1211. // GetUrlETag retrieves a previously stored an ETag for the
  1212. // specfied URL. If not found, it returns an empty string value.
  1213. func GetUrlETag(url string) (string, error) {
  1214. var etag string
  1215. err := datastoreView(func(tx *datastoreTx) error {
  1216. bucket := tx.bucket(datastoreUrlETagsBucket)
  1217. etag = string(bucket.get([]byte(url)))
  1218. return nil
  1219. })
  1220. if err != nil {
  1221. return "", errors.Trace(err)
  1222. }
  1223. return etag, nil
  1224. }
  1225. // SetKeyValue stores a key/value pair.
  1226. func SetKeyValue(key, value string) error {
  1227. err := datastoreUpdate(func(tx *datastoreTx) error {
  1228. bucket := tx.bucket(datastoreKeyValueBucket)
  1229. err := bucket.put([]byte(key), []byte(value))
  1230. if err != nil {
  1231. return errors.Trace(err)
  1232. }
  1233. return nil
  1234. })
  1235. if err != nil {
  1236. return errors.Trace(err)
  1237. }
  1238. return nil
  1239. }
  1240. // GetKeyValue retrieves the value for a given key. If not found,
  1241. // it returns an empty string value.
  1242. func GetKeyValue(key string) (string, error) {
  1243. var value string
  1244. err := datastoreView(func(tx *datastoreTx) error {
  1245. bucket := tx.bucket(datastoreKeyValueBucket)
  1246. value = string(bucket.get([]byte(key)))
  1247. return nil
  1248. })
  1249. if err != nil {
  1250. return "", errors.Trace(err)
  1251. }
  1252. return value, nil
  1253. }
  1254. // Persistent stat records in the persistentStatStateUnreported
  1255. // state are available for take out.
  1256. //
  1257. // Records in the persistentStatStateReporting have been taken
  1258. // out and are pending either deletion (for a successful request)
  1259. // or change to StateUnreported (for a failed request).
  1260. //
  1261. // All persistent stat records are reverted to StateUnreported
  1262. // when the datastore is initialized at start up.
  1263. var persistentStatStateUnreported = []byte("0")
  1264. var persistentStatStateReporting = []byte("1")
  1265. var persistentStatTypes = []string{
  1266. datastorePersistentStatTypeRemoteServerList,
  1267. datastorePersistentStatTypeFailedTunnel,
  1268. }
  1269. // StorePersistentStat adds a new persistent stat record, which
  1270. // is set to StateUnreported and is an immediate candidate for
  1271. // reporting.
  1272. //
  1273. // The stat is a JSON byte array containing fields as
  1274. // required by the Psiphon server API. It's assumed that the
  1275. // JSON value contains enough unique information for the value to
  1276. // function as a key in the key/value datastore.
  1277. //
  1278. // Only up to PersistentStatsMaxStoreRecords are stored. Once this
  1279. // limit is reached, new records are discarded.
  1280. func StorePersistentStat(config *Config, statType string, stat []byte) error {
  1281. if !common.Contains(persistentStatTypes, statType) {
  1282. return errors.Tracef("invalid persistent stat type: %s", statType)
  1283. }
  1284. maxStoreRecords := config.GetParameters().Get().Int(
  1285. parameters.PersistentStatsMaxStoreRecords)
  1286. err := datastoreUpdate(func(tx *datastoreTx) error {
  1287. bucket := tx.bucket([]byte(statType))
  1288. count := 0
  1289. cursor := bucket.cursor()
  1290. for key, _ := cursor.first(); key != nil; key, _ = cursor.next() {
  1291. count++
  1292. }
  1293. cursor.close()
  1294. // TODO: assuming newer metrics are more useful, replace oldest record
  1295. // instead of discarding?
  1296. if count >= maxStoreRecords {
  1297. // Silently discard.
  1298. return nil
  1299. }
  1300. err := bucket.put(stat, persistentStatStateUnreported)
  1301. if err != nil {
  1302. return errors.Trace(err)
  1303. }
  1304. return nil
  1305. })
  1306. if err != nil {
  1307. return errors.Trace(err)
  1308. }
  1309. return nil
  1310. }
  1311. // CountUnreportedPersistentStats returns the number of persistent
  1312. // stat records in StateUnreported.
  1313. func CountUnreportedPersistentStats() int {
  1314. unreported := 0
  1315. err := datastoreView(func(tx *datastoreTx) error {
  1316. for _, statType := range persistentStatTypes {
  1317. bucket := tx.bucket([]byte(statType))
  1318. cursor := bucket.cursor()
  1319. for key, value := cursor.first(); key != nil; key, value = cursor.next() {
  1320. if bytes.Equal(value, persistentStatStateUnreported) {
  1321. unreported++
  1322. }
  1323. }
  1324. cursor.close()
  1325. }
  1326. return nil
  1327. })
  1328. if err != nil {
  1329. NoticeWarning("CountUnreportedPersistentStats failed: %s", err)
  1330. return 0
  1331. }
  1332. return unreported
  1333. }
  1334. // TakeOutUnreportedPersistentStats returns persistent stats records that are
  1335. // in StateUnreported. At least one record, if present, will be returned and
  1336. // then additional records up to PersistentStatsMaxSendBytes. The records are
  1337. // set to StateReporting. If the records are successfully reported, clear them
  1338. // with ClearReportedPersistentStats. If the records are not successfully
  1339. // reported, restore them with PutBackUnreportedPersistentStats.
  1340. func TakeOutUnreportedPersistentStats(config *Config) (map[string][][]byte, error) {
  1341. stats := make(map[string][][]byte)
  1342. maxSendBytes := config.GetParameters().Get().Int(
  1343. parameters.PersistentStatsMaxSendBytes)
  1344. err := datastoreUpdate(func(tx *datastoreTx) error {
  1345. sendBytes := 0
  1346. for _, statType := range persistentStatTypes {
  1347. bucket := tx.bucket([]byte(statType))
  1348. cursor := bucket.cursor()
  1349. for key, value := cursor.first(); key != nil; key, value = cursor.next() {
  1350. // Perform a test JSON unmarshaling. In case of data corruption or a bug,
  1351. // delete and skip the record.
  1352. var jsonData interface{}
  1353. err := json.Unmarshal(key, &jsonData)
  1354. if err != nil {
  1355. NoticeWarning(
  1356. "Invalid key in TakeOutUnreportedPersistentStats: %s: %s",
  1357. string(key), err)
  1358. bucket.delete(key)
  1359. continue
  1360. }
  1361. if bytes.Equal(value, persistentStatStateUnreported) {
  1362. // Must make a copy as slice is only valid within transaction.
  1363. data := make([]byte, len(key))
  1364. copy(data, key)
  1365. if stats[statType] == nil {
  1366. stats[statType] = make([][]byte, 0)
  1367. }
  1368. stats[statType] = append(stats[statType], data)
  1369. sendBytes += len(data)
  1370. if sendBytes >= maxSendBytes {
  1371. break
  1372. }
  1373. }
  1374. }
  1375. cursor.close()
  1376. for _, key := range stats[statType] {
  1377. err := bucket.put(key, persistentStatStateReporting)
  1378. if err != nil {
  1379. return errors.Trace(err)
  1380. }
  1381. }
  1382. }
  1383. return nil
  1384. })
  1385. if err != nil {
  1386. return nil, errors.Trace(err)
  1387. }
  1388. return stats, nil
  1389. }
  1390. // PutBackUnreportedPersistentStats restores a list of persistent
  1391. // stat records to StateUnreported.
  1392. func PutBackUnreportedPersistentStats(stats map[string][][]byte) error {
  1393. err := datastoreUpdate(func(tx *datastoreTx) error {
  1394. for _, statType := range persistentStatTypes {
  1395. bucket := tx.bucket([]byte(statType))
  1396. for _, key := range stats[statType] {
  1397. err := bucket.put(key, persistentStatStateUnreported)
  1398. if err != nil {
  1399. return errors.Trace(err)
  1400. }
  1401. }
  1402. }
  1403. return nil
  1404. })
  1405. if err != nil {
  1406. return errors.Trace(err)
  1407. }
  1408. return nil
  1409. }
  1410. // ClearReportedPersistentStats deletes a list of persistent
  1411. // stat records that were successfully reported.
  1412. func ClearReportedPersistentStats(stats map[string][][]byte) error {
  1413. err := datastoreUpdate(func(tx *datastoreTx) error {
  1414. for _, statType := range persistentStatTypes {
  1415. bucket := tx.bucket([]byte(statType))
  1416. for _, key := range stats[statType] {
  1417. err := bucket.delete(key)
  1418. if err != nil {
  1419. return err
  1420. }
  1421. }
  1422. }
  1423. return nil
  1424. })
  1425. if err != nil {
  1426. return errors.Trace(err)
  1427. }
  1428. return nil
  1429. }
  1430. // resetAllPersistentStatsToUnreported sets all persistent stat
  1431. // records to StateUnreported. This reset is called when the
  1432. // datastore is initialized at start up, as we do not know if
  1433. // persistent records in StateReporting were reported or not.
  1434. func resetAllPersistentStatsToUnreported() error {
  1435. err := datastoreUpdate(func(tx *datastoreTx) error {
  1436. for _, statType := range persistentStatTypes {
  1437. bucket := tx.bucket([]byte(statType))
  1438. resetKeys := make([][]byte, 0)
  1439. cursor := bucket.cursor()
  1440. for key := cursor.firstKey(); key != nil; key = cursor.nextKey() {
  1441. resetKeys = append(resetKeys, key)
  1442. }
  1443. cursor.close()
  1444. // TODO: data mutation is done outside cursor. Is this
  1445. // strictly necessary in this case? As is, this means
  1446. // all stats need to be loaded into memory at once.
  1447. // https://godoc.org/github.com/boltdb/bolt#Cursor
  1448. for _, key := range resetKeys {
  1449. err := bucket.put(key, persistentStatStateUnreported)
  1450. if err != nil {
  1451. return errors.Trace(err)
  1452. }
  1453. }
  1454. }
  1455. return nil
  1456. })
  1457. if err != nil {
  1458. return errors.Trace(err)
  1459. }
  1460. return nil
  1461. }
  1462. // CountSLOKs returns the total number of SLOK records.
  1463. func CountSLOKs() int {
  1464. count := 0
  1465. err := datastoreView(func(tx *datastoreTx) error {
  1466. bucket := tx.bucket(datastoreSLOKsBucket)
  1467. cursor := bucket.cursor()
  1468. for key := cursor.firstKey(); key != nil; key = cursor.nextKey() {
  1469. count++
  1470. }
  1471. cursor.close()
  1472. return nil
  1473. })
  1474. if err != nil {
  1475. NoticeWarning("CountSLOKs failed: %s", err)
  1476. return 0
  1477. }
  1478. return count
  1479. }
  1480. // DeleteSLOKs deletes all SLOK records.
  1481. func DeleteSLOKs() error {
  1482. err := datastoreUpdate(func(tx *datastoreTx) error {
  1483. return tx.clearBucket(datastoreSLOKsBucket)
  1484. })
  1485. if err != nil {
  1486. return errors.Trace(err)
  1487. }
  1488. return nil
  1489. }
  1490. // SetSLOK stores a SLOK key, referenced by its ID. The bool
  1491. // return value indicates whether the SLOK was already stored.
  1492. func SetSLOK(id, slok []byte) (bool, error) {
  1493. var duplicate bool
  1494. err := datastoreUpdate(func(tx *datastoreTx) error {
  1495. bucket := tx.bucket(datastoreSLOKsBucket)
  1496. duplicate = bucket.get(id) != nil
  1497. err := bucket.put(id, slok)
  1498. if err != nil {
  1499. return errors.Trace(err)
  1500. }
  1501. return nil
  1502. })
  1503. if err != nil {
  1504. return false, errors.Trace(err)
  1505. }
  1506. return duplicate, nil
  1507. }
  1508. // GetSLOK returns a SLOK key for the specified ID. The return
  1509. // value is nil if the SLOK is not found.
  1510. func GetSLOK(id []byte) ([]byte, error) {
  1511. var slok []byte
  1512. err := datastoreView(func(tx *datastoreTx) error {
  1513. bucket := tx.bucket(datastoreSLOKsBucket)
  1514. value := bucket.get(id)
  1515. if value != nil {
  1516. // Must make a copy as slice is only valid within transaction.
  1517. slok = make([]byte, len(value))
  1518. copy(slok, value)
  1519. }
  1520. return nil
  1521. })
  1522. if err != nil {
  1523. return nil, errors.Trace(err)
  1524. }
  1525. return slok, nil
  1526. }
  1527. func makeDialParametersKey(serverIPAddress, networkID []byte) []byte {
  1528. // TODO: structured key?
  1529. return append(append([]byte(nil), serverIPAddress...), networkID...)
  1530. }
  1531. // SetDialParameters stores dial parameters associated with the specified
  1532. // server/network ID.
  1533. func SetDialParameters(serverIPAddress, networkID string, dialParams *DialParameters) error {
  1534. key := makeDialParametersKey([]byte(serverIPAddress), []byte(networkID))
  1535. data, err := json.Marshal(dialParams)
  1536. if err != nil {
  1537. return errors.Trace(err)
  1538. }
  1539. return setBucketValue(datastoreDialParametersBucket, key, data)
  1540. }
  1541. // GetDialParameters fetches any dial parameters associated with the specified
  1542. // server/network ID. Returns nil, nil when no record is found.
  1543. func GetDialParameters(
  1544. config *Config, serverIPAddress, networkID string) (*DialParameters, error) {
  1545. // Support stand-alone GetTactics operation. See TacticsStorer for more
  1546. // details.
  1547. err := OpenDataStoreWithoutReset(config)
  1548. if err != nil {
  1549. return nil, errors.Trace(err)
  1550. }
  1551. defer CloseDataStore()
  1552. key := makeDialParametersKey([]byte(serverIPAddress), []byte(networkID))
  1553. var dialParams *DialParameters
  1554. err = getBucketValue(
  1555. datastoreDialParametersBucket,
  1556. key,
  1557. func(value []byte) error {
  1558. if value == nil {
  1559. return nil
  1560. }
  1561. // Note: unlike with server entries, this record is not deleted when the
  1562. // unmarshal fails, as the caller should proceed with the dial without dial
  1563. // parameters; and when when the dial succeeds, new dial parameters will be
  1564. // written over this record.
  1565. err := json.Unmarshal(value, &dialParams)
  1566. if err != nil {
  1567. return errors.Trace(err)
  1568. }
  1569. return nil
  1570. })
  1571. if err != nil {
  1572. return nil, errors.Trace(err)
  1573. }
  1574. return dialParams, nil
  1575. }
  1576. // DeleteDialParameters clears any dial parameters associated with the
  1577. // specified server/network ID.
  1578. func DeleteDialParameters(serverIPAddress, networkID string) error {
  1579. key := makeDialParametersKey([]byte(serverIPAddress), []byte(networkID))
  1580. return deleteBucketValue(datastoreDialParametersBucket, key)
  1581. }
  1582. // TacticsStorer implements tactics.Storer.
  1583. //
  1584. // Each TacticsStorer datastore operation is wrapped with
  1585. // OpenDataStoreWithoutReset/CloseDataStore, which enables a limited degree of
  1586. // multiprocess datastore synchronization:
  1587. //
  1588. // One process runs a Controller. Another process runs a stand-alone operation
  1589. // which accesses tactics via GetTactics. For example, SendFeedback.
  1590. //
  1591. // When the Controller is running, it holds an exclusive lock on the datastore
  1592. // and TacticsStorer operations in GetTactics in another process will fail.
  1593. // The stand-alone operation should proceed without tactics. In many cases,
  1594. // this is acceptable since any stand-alone operation network traffic will be
  1595. // tunneled.
  1596. //
  1597. // When the Controller is not running, the TacticsStorer operations in
  1598. // GetTactics in another process will succeed, with no operation holding a
  1599. // datastore lock for longer than the handful of milliseconds required to
  1600. // perform a single datastore operation.
  1601. //
  1602. // If the Controller is started while the stand-alone operation is in
  1603. // progress, the Controller start will not be blocked by the brief
  1604. // TacticsStorer datastore locks; the bolt Open call, in particular, has a 1
  1605. // second lock aquisition timeout.
  1606. //
  1607. // In this scheme, no attempt is made to detect interleaving datastore writes;
  1608. // that is, if a different process writes tactics in between GetTactics calls
  1609. // to GetTacticsRecord and then SetTacticsRecord. This is because all tactics
  1610. // writes are considered fresh and valid.
  1611. //
  1612. //
  1613. // Using OpenDataStoreWithoutReset ensures that the GetTactics attempt in the
  1614. // non-Controller operation will immediately fail if the datastore is locked
  1615. // and not reset (delete) the datastore file when open fails. The Controller
  1616. // process will use OpenDataStore, which performs the reset on failure, to
  1617. // recover from datastore corruption; when OpenDataStore is called while the
  1618. // non-Controller operation holds a datastore lock, the OpenDataStore timeout,
  1619. // 1s, should be sufficient to avoid an unnecessary reset.
  1620. type TacticsStorer struct {
  1621. config *Config
  1622. }
  1623. func (t *TacticsStorer) SetTacticsRecord(networkID string, record []byte) error {
  1624. err := OpenDataStoreWithoutReset(t.config)
  1625. if err != nil {
  1626. return errors.Trace(err)
  1627. }
  1628. defer CloseDataStore()
  1629. err = setBucketValue(datastoreTacticsBucket, []byte(networkID), record)
  1630. if err != nil {
  1631. return errors.Trace(err)
  1632. }
  1633. return nil
  1634. }
  1635. func (t *TacticsStorer) GetTacticsRecord(networkID string) ([]byte, error) {
  1636. err := OpenDataStoreWithoutReset(t.config)
  1637. if err != nil {
  1638. return nil, errors.Trace(err)
  1639. }
  1640. defer CloseDataStore()
  1641. value, err := copyBucketValue(datastoreTacticsBucket, []byte(networkID))
  1642. if err != nil {
  1643. return nil, errors.Trace(err)
  1644. }
  1645. return value, nil
  1646. }
  1647. func (t *TacticsStorer) SetSpeedTestSamplesRecord(networkID string, record []byte) error {
  1648. err := OpenDataStoreWithoutReset(t.config)
  1649. if err != nil {
  1650. return errors.Trace(err)
  1651. }
  1652. defer CloseDataStore()
  1653. err = setBucketValue(datastoreSpeedTestSamplesBucket, []byte(networkID), record)
  1654. if err != nil {
  1655. return errors.Trace(err)
  1656. }
  1657. return nil
  1658. }
  1659. func (t *TacticsStorer) GetSpeedTestSamplesRecord(networkID string) ([]byte, error) {
  1660. err := OpenDataStoreWithoutReset(t.config)
  1661. if err != nil {
  1662. return nil, errors.Trace(err)
  1663. }
  1664. defer CloseDataStore()
  1665. value, err := copyBucketValue(datastoreSpeedTestSamplesBucket, []byte(networkID))
  1666. if err != nil {
  1667. return nil, errors.Trace(err)
  1668. }
  1669. return value, nil
  1670. }
  1671. // GetTacticsStorer creates a TacticsStorer.
  1672. func GetTacticsStorer(config *Config) *TacticsStorer {
  1673. return &TacticsStorer{config: config}
  1674. }
  1675. // GetAffinityServerEntryAndDialParameters fetches the current affinity server
  1676. // entry value and any corresponding dial parameters for the specified network
  1677. // ID. An error is returned when no affinity server is available. The
  1678. // DialParameter output may be nil when a server entry is found but has no
  1679. // dial parameters.
  1680. func GetAffinityServerEntryAndDialParameters(
  1681. networkID string) (protocol.ServerEntryFields, *DialParameters, error) {
  1682. var serverEntryFields protocol.ServerEntryFields
  1683. var dialParams *DialParameters
  1684. err := datastoreView(func(tx *datastoreTx) error {
  1685. keyValues := tx.bucket(datastoreKeyValueBucket)
  1686. serverEntries := tx.bucket(datastoreServerEntriesBucket)
  1687. dialParameters := tx.bucket(datastoreDialParametersBucket)
  1688. affinityServerEntryID := keyValues.get(datastoreAffinityServerEntryIDKey)
  1689. if affinityServerEntryID == nil {
  1690. return errors.TraceNew("no affinity server available")
  1691. }
  1692. serverEntryRecord := serverEntries.get(affinityServerEntryID)
  1693. if serverEntryRecord == nil {
  1694. return errors.TraceNew("affinity server entry not found")
  1695. }
  1696. err := json.Unmarshal(
  1697. serverEntryRecord,
  1698. &serverEntryFields)
  1699. if err != nil {
  1700. return errors.Trace(err)
  1701. }
  1702. dialParamsKey := makeDialParametersKey(
  1703. []byte(serverEntryFields.GetIPAddress()),
  1704. []byte(networkID))
  1705. dialParamsRecord := dialParameters.get(dialParamsKey)
  1706. if dialParamsRecord != nil {
  1707. err := json.Unmarshal(dialParamsRecord, &dialParams)
  1708. if err != nil {
  1709. return errors.Trace(err)
  1710. }
  1711. }
  1712. return nil
  1713. })
  1714. if err != nil {
  1715. return nil, nil, errors.Trace(err)
  1716. }
  1717. return serverEntryFields, dialParams, nil
  1718. }
  1719. func setBucketValue(bucket, key, value []byte) error {
  1720. err := datastoreUpdate(func(tx *datastoreTx) error {
  1721. bucket := tx.bucket(bucket)
  1722. err := bucket.put(key, value)
  1723. if err != nil {
  1724. return errors.Trace(err)
  1725. }
  1726. return nil
  1727. })
  1728. if err != nil {
  1729. return errors.Trace(err)
  1730. }
  1731. return nil
  1732. }
  1733. func getBucketValue(bucket, key []byte, valueCallback func([]byte) error) error {
  1734. err := datastoreView(func(tx *datastoreTx) error {
  1735. bucket := tx.bucket(bucket)
  1736. value := bucket.get(key)
  1737. return valueCallback(value)
  1738. })
  1739. if err != nil {
  1740. return errors.Trace(err)
  1741. }
  1742. return nil
  1743. }
  1744. func deleteBucketValue(bucket, key []byte) error {
  1745. err := datastoreUpdate(func(tx *datastoreTx) error {
  1746. bucket := tx.bucket(bucket)
  1747. return bucket.delete(key)
  1748. })
  1749. if err != nil {
  1750. return errors.Trace(err)
  1751. }
  1752. return nil
  1753. }
  1754. func copyBucketValue(bucket, key []byte) ([]byte, error) {
  1755. var valueCopy []byte
  1756. err := getBucketValue(bucket, key, func(value []byte) error {
  1757. if value != nil {
  1758. // Must make a copy as slice is only valid within transaction.
  1759. valueCopy = make([]byte, len(value))
  1760. copy(valueCopy, value)
  1761. }
  1762. return nil
  1763. })
  1764. return valueCopy, err
  1765. }