db.go 29 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078
  1. package bolt
  2. import (
  3. "errors"
  4. "fmt"
  5. "hash/fnv"
  6. "log"
  7. "os"
  8. "runtime"
  9. "runtime/debug"
  10. "strings"
  11. "sync"
  12. "time"
  13. "unsafe"
  14. )
  // File-format and memory-mapping constants.
  const (
  	// maxMmapStep is the largest step that can be taken when remapping
  	// the mmap (1GB).
  	maxMmapStep = 1 << 30

  	// version is the data file format version.
  	version = 2

  	// magic is the marker value that identifies a file as a Bolt DB.
  	magic uint32 = 0xED0CDAED

  	// IgnoreNoSync specifies whether the NoSync field of a DB is ignored
  	// when syncing changes to a file. This is required as some operating
  	// systems, such as OpenBSD, do not have a unified buffer cache (UBC)
  	// and writes must be synchronized using the msync(2) syscall.
  	IgnoreNoSync = runtime.GOOS == "openbsd"
  )

  // Defaults that Open copies into a DB instance.
  const (
  	// DefaultMaxBatchSize is the initial value of DB.MaxBatchSize.
  	DefaultMaxBatchSize int = 1000

  	// DefaultMaxBatchDelay is the initial value of DB.MaxBatchDelay.
  	DefaultMaxBatchDelay = 10 * time.Millisecond

  	// DefaultAllocSize is the initial value of DB.AllocSize (16MB).
  	DefaultAllocSize = 16 << 20
  )

  // defaultPageSize is the OS page size, captured once at package
  // initialization.
  var defaultPageSize = os.Getpagesize()
  34. // DB represents a collection of buckets persisted to a file on disk.
  35. // All data access is performed through transactions which can be obtained through the DB.
  36. // All the functions on DB will return a ErrDatabaseNotOpen if accessed before Open() is called.
  37. type DB struct {
  38. // When enabled, the database will perform a Check() after every commit.
  39. // A panic is issued if the database is in an inconsistent state. This
  40. // flag has a large performance impact so it should only be used for
  41. // debugging purposes.
  42. StrictMode bool
  43. // Setting the NoSync flag will cause the database to skip fsync()
  44. // calls after each commit. This can be useful when bulk loading data
  45. // into a database and you can restart the bulk load in the event of
  46. // a system failure or database corruption. Do not set this flag for
  47. // normal use.
  48. //
  49. // If the package global IgnoreNoSync constant is true, this value is
  50. // ignored. See the comment on that constant for more details.
  51. //
  52. // THIS IS UNSAFE. PLEASE USE WITH CAUTION.
  53. NoSync bool
  54. // When true, skips the truncate call when growing the database.
  55. // Setting this to true is only safe on non-ext3/ext4 systems.
  56. // Skipping truncation avoids preallocation of hard drive space and
  57. // bypasses a truncate() and fsync() syscall on remapping.
  58. //
  59. // https://github.com/boltdb/bolt/issues/284
  60. NoGrowSync bool
  61. // If you want to read the entire database fast, you can set MmapFlag to
  62. // syscall.MAP_POPULATE on Linux 2.6.23+ for sequential read-ahead.
  63. MmapFlags int
  64. // MaxBatchSize is the maximum size of a batch. Default value is
  65. // copied from DefaultMaxBatchSize in Open.
  66. //
  67. // If <=0, disables batching.
  68. //
  69. // Do not change concurrently with calls to Batch.
  70. MaxBatchSize int
  71. // MaxBatchDelay is the maximum delay before a batch starts.
  72. // Default value is copied from DefaultMaxBatchDelay in Open.
  73. //
  74. // If <=0, effectively disables batching.
  75. //
  76. // Do not change concurrently with calls to Batch.
  77. MaxBatchDelay time.Duration
  78. // AllocSize is the amount of space allocated when the database
  79. // needs to create new pages. This is done to amortize the cost
  80. // of truncate() and fsync() when growing the data file.
  81. AllocSize int
  82. path string
  83. file *os.File
  84. lockfile *os.File // windows only
  85. dataref []byte // mmap'ed readonly, write throws SEGV
  86. data *[maxMapSize]byte
  87. datasz int
  88. filesz int // current on disk file size
  89. meta0 *meta
  90. meta1 *meta
  91. pageSize int
  92. opened bool
  93. rwtx *Tx
  94. txs []*Tx
  95. freelist *freelist
  96. stats Stats
  97. // [Psiphon]
  98. // https://github.com/etcd-io/bbolt/commit/b3e98dcb3752e0a8d5db6503b80fe19e462fdb73
  99. mmapErr error // set on mmap failure; subsequently returned by all methods
  100. pagePool sync.Pool
  101. batchMu sync.Mutex
  102. batch *batch
  103. rwlock sync.Mutex // Allows only one writer at a time.
  104. metalock sync.Mutex // Protects meta page access.
  105. mmaplock sync.RWMutex // Protects mmap access during remapping.
  106. statlock sync.RWMutex // Protects stats access.
  107. ops struct {
  108. writeAt func(b []byte, off int64) (n int, err error)
  109. }
  110. // Read only mode.
  111. // When true, Update() and Begin(true) return ErrDatabaseReadOnly immediately.
  112. readOnly bool
  113. }
  114. // Path returns the path to currently open database file.
  115. func (db *DB) Path() string {
  116. return db.path
  117. }
  118. // GoString returns the Go string representation of the database.
  119. func (db *DB) GoString() string {
  120. return fmt.Sprintf("bolt.DB{path:%q}", db.path)
  121. }
  122. // String returns the string representation of the database.
  123. func (db *DB) String() string {
  124. return fmt.Sprintf("DB<%q>", db.path)
  125. }
  126. // Open creates and opens a database at the given path.
  127. // If the file does not exist then it will be created automatically.
  128. // Passing in nil options will cause Bolt to open the database with the default options.
  129. func Open(path string, mode os.FileMode, options *Options) (*DB, error) {
  130. var db = &DB{opened: true}
  131. // [Psiphon]
  132. // Ensure cleanup on panic so recovery can reset a locked file.
  133. defer func() {
  134. if r := recover(); r != nil {
  135. _ = db.close()
  136. panic(r)
  137. }
  138. }()
  139. // Set default options if no options are provided.
  140. if options == nil {
  141. options = DefaultOptions
  142. }
  143. db.NoGrowSync = options.NoGrowSync
  144. db.MmapFlags = options.MmapFlags
  145. // Set default values for later DB operations.
  146. db.MaxBatchSize = DefaultMaxBatchSize
  147. db.MaxBatchDelay = DefaultMaxBatchDelay
  148. db.AllocSize = DefaultAllocSize
  149. flag := os.O_RDWR
  150. if options.ReadOnly {
  151. flag = os.O_RDONLY
  152. db.readOnly = true
  153. }
  154. // Open data file and separate sync handler for metadata writes.
  155. db.path = path
  156. var err error
  157. if db.file, err = os.OpenFile(db.path, flag|os.O_CREATE, mode); err != nil {
  158. _ = db.close()
  159. return nil, err
  160. }
  161. // Lock file so that other processes using Bolt in read-write mode cannot
  162. // use the database at the same time. This would cause corruption since
  163. // the two processes would write meta pages and free pages separately.
  164. // The database file is locked exclusively (only one process can grab the lock)
  165. // if !options.ReadOnly.
  166. // The database file is locked using the shared lock (more than one process may
  167. // hold a lock at the same time) otherwise (options.ReadOnly is set).
  168. if err := flock(db, mode, !db.readOnly, options.Timeout); err != nil {
  169. _ = db.close()
  170. return nil, err
  171. }
  172. // Default values for test hooks
  173. db.ops.writeAt = db.file.WriteAt
  174. // Initialize the database if it doesn't exist.
  175. if info, err := db.file.Stat(); err != nil {
  176. return nil, err
  177. } else if info.Size() == 0 {
  178. // Initialize new files with meta pages.
  179. if err := db.init(); err != nil {
  180. return nil, err
  181. }
  182. } else {
  183. // Read the first meta page to determine the page size.
  184. var buf [0x1000]byte
  185. if _, err := db.file.ReadAt(buf[:], 0); err == nil {
  186. m := db.pageInBuffer(buf[:], 0).meta()
  187. if err := m.validate(); err != nil {
  188. // If we can't read the page size, we can assume it's the same
  189. // as the OS -- since that's how the page size was chosen in the
  190. // first place.
  191. //
  192. // If the first page is invalid and this OS uses a different
  193. // page size than what the database was created with then we
  194. // are out of luck and cannot access the database.
  195. db.pageSize = os.Getpagesize()
  196. } else {
  197. db.pageSize = int(m.pageSize)
  198. }
  199. }
  200. }
  201. // Initialize page pool.
  202. db.pagePool = sync.Pool{
  203. New: func() interface{} {
  204. return make([]byte, db.pageSize)
  205. },
  206. }
  207. // Memory map the data file.
  208. if err := db.mmap(options.InitialMmapSize); err != nil {
  209. _ = db.close()
  210. return nil, err
  211. }
  212. // Read in the freelist.
  213. db.freelist = newFreelist()
  214. db.freelist.read(db.page(db.meta().freelist))
  215. // Mark the database as opened and return.
  216. return db, nil
  217. }
  218. // mmap opens the underlying memory-mapped file and initializes the meta references.
  219. // minsz is the minimum size that the new mmap can be.
  220. func (db *DB) mmap(minsz int) error {
  221. db.mmaplock.Lock()
  222. defer db.mmaplock.Unlock()
  223. info, err := db.file.Stat()
  224. if err != nil {
  225. return fmt.Errorf("mmap stat error: %s", err)
  226. } else if int(info.Size()) < db.pageSize*2 {
  227. return fmt.Errorf("file size too small")
  228. }
  229. // Ensure the size is at least the minimum size.
  230. var size = int(info.Size())
  231. if size < minsz {
  232. size = minsz
  233. }
  234. size, err = db.mmapSize(size)
  235. if err != nil {
  236. return err
  237. }
  238. // Dereference all mmap references before unmapping.
  239. if db.rwtx != nil {
  240. db.rwtx.root.dereference()
  241. }
  242. // Unmap existing data before continuing.
  243. if err := db.munmap(); err != nil {
  244. return err
  245. }
  246. // Memory-map the data file as a byte slice.
  247. if err := mmap(db, size); err != nil {
  248. // [Psiphon]
  249. // https://github.com/etcd-io/bbolt/commit/b3e98dcb3752e0a8d5db6503b80fe19e462fdb73
  250. // If mmap fails, we cannot safely continue. Mark the db as unusable,
  251. // causing all future calls to return the mmap error.
  252. db.mmapErr = MmapError(err.Error())
  253. return db.mmapErr
  254. }
  255. // Save references to the meta pages.
  256. db.meta0 = db.page(0).meta()
  257. db.meta1 = db.page(1).meta()
  258. // Validate the meta pages. We only return an error if both meta pages fail
  259. // validation, since meta0 failing validation means that it wasn't saved
  260. // properly -- but we can recover using meta1. And vice-versa.
  261. err0 := db.meta0.validate()
  262. err1 := db.meta1.validate()
  263. if err0 != nil && err1 != nil {
  264. return err0
  265. }
  266. return nil
  267. }
  268. // munmap unmaps the data file from memory.
  269. func (db *DB) munmap() error {
  270. if err := munmap(db); err != nil {
  271. return fmt.Errorf("unmap error: " + err.Error())
  272. }
  273. return nil
  274. }
  275. // mmapSize determines the appropriate size for the mmap given the current size
  276. // of the database. The minimum size is 32KB and doubles until it reaches 1GB.
  277. // Returns an error if the new mmap size is greater than the max allowed.
  278. func (db *DB) mmapSize(size int) (int, error) {
  279. // Double the size from 32KB until 1GB.
  280. for i := uint(15); i <= 30; i++ {
  281. if size <= 1<<i {
  282. return 1 << i, nil
  283. }
  284. }
  285. // Verify the requested size is not above the maximum allowed.
  286. if size > maxMapSize {
  287. return 0, fmt.Errorf("mmap too large")
  288. }
  289. // If larger than 1GB then grow by 1GB at a time.
  290. sz := int64(size)
  291. if remainder := sz % int64(maxMmapStep); remainder > 0 {
  292. sz += int64(maxMmapStep) - remainder
  293. }
  294. // Ensure that the mmap size is a multiple of the page size.
  295. // This should always be true since we're incrementing in MBs.
  296. pageSize := int64(db.pageSize)
  297. if (sz % pageSize) != 0 {
  298. sz = ((sz / pageSize) + 1) * pageSize
  299. }
  300. // If we've exceeded the max size then only grow up to the max size.
  301. if sz > maxMapSize {
  302. sz = maxMapSize
  303. }
  304. return int(sz), nil
  305. }
  306. // init creates a new database file and initializes its meta pages.
  307. func (db *DB) init() error {
  308. // Set the page size to the OS page size.
  309. db.pageSize = os.Getpagesize()
  310. // Create two meta pages on a buffer.
  311. buf := make([]byte, db.pageSize*4)
  312. for i := 0; i < 2; i++ {
  313. p := db.pageInBuffer(buf[:], pgid(i))
  314. p.id = pgid(i)
  315. p.flags = metaPageFlag
  316. // Initialize the meta page.
  317. m := p.meta()
  318. m.magic = magic
  319. m.version = version
  320. m.pageSize = uint32(db.pageSize)
  321. m.freelist = 2
  322. m.root = bucket{root: 3}
  323. m.pgid = 4
  324. m.txid = txid(i)
  325. m.checksum = m.sum64()
  326. }
  327. // Write an empty freelist at page 3.
  328. p := db.pageInBuffer(buf[:], pgid(2))
  329. p.id = pgid(2)
  330. p.flags = freelistPageFlag
  331. p.count = 0
  332. // Write an empty leaf page at page 4.
  333. p = db.pageInBuffer(buf[:], pgid(3))
  334. p.id = pgid(3)
  335. p.flags = leafPageFlag
  336. p.count = 0
  337. // Write the buffer to our data file.
  338. if _, err := db.ops.writeAt(buf, 0); err != nil {
  339. return err
  340. }
  341. if err := fdatasync(db); err != nil {
  342. return err
  343. }
  344. return nil
  345. }
  346. // Close releases all database resources.
  347. // All transactions must be closed before closing the database.
  348. func (db *DB) Close() error {
  349. db.rwlock.Lock()
  350. defer db.rwlock.Unlock()
  351. db.metalock.Lock()
  352. defer db.metalock.Unlock()
  353. // [Psiphon]
  354. // https://github.com/etcd-io/bbolt/commit/e06ec0a754bc30c2e17ad871962e71635bf94d45
  355. // "Fix Close() to wait for view transactions by getting a full lock on mmaplock"
  356. db.mmaplock.Lock()
  357. defer db.mmaplock.Unlock()
  358. return db.close()
  359. }
  360. func (db *DB) close() error {
  361. if !db.opened {
  362. return nil
  363. }
  364. db.opened = false
  365. db.freelist = nil
  366. // Clear ops.
  367. db.ops.writeAt = nil
  368. // Close the mmap.
  369. if err := db.munmap(); err != nil {
  370. return err
  371. }
  372. // Close file handles.
  373. if db.file != nil {
  374. // No need to unlock read-only file.
  375. if !db.readOnly {
  376. // Unlock the file.
  377. if err := funlock(db); err != nil {
  378. log.Printf("bolt.Close(): funlock error: %s", err)
  379. }
  380. }
  381. // Close the file descriptor.
  382. if err := db.file.Close(); err != nil {
  383. return fmt.Errorf("db file close: %s", err)
  384. }
  385. db.file = nil
  386. }
  387. db.path = ""
  388. return nil
  389. }
  390. // Begin starts a new transaction.
  391. // Multiple read-only transactions can be used concurrently but only one
  392. // write transaction can be used at a time. Starting multiple write transactions
  393. // will cause the calls to block and be serialized until the current write
  394. // transaction finishes.
  395. //
  396. // Transactions should not be dependent on one another. Opening a read
  397. // transaction and a write transaction in the same goroutine can cause the
  398. // writer to deadlock because the database periodically needs to re-mmap itself
  399. // as it grows and it cannot do that while a read transaction is open.
  400. //
  401. // If a long running read transaction (for example, a snapshot transaction) is
  402. // needed, you might want to set DB.InitialMmapSize to a large enough value
  403. // to avoid potential blocking of write transaction.
  404. //
  405. // IMPORTANT: You must close read-only transactions after you are finished or
  406. // else the database will not reclaim old pages.
  407. func (db *DB) Begin(writable bool) (*Tx, error) {
  408. if writable {
  409. return db.beginRWTx()
  410. }
  411. return db.beginTx()
  412. }
  413. func (db *DB) beginTx() (*Tx, error) {
  414. // Lock the meta pages while we initialize the transaction. We obtain
  415. // the meta lock before the mmap lock because that's the order that the
  416. // write transaction will obtain them.
  417. db.metalock.Lock()
  418. // Obtain a read-only lock on the mmap. When the mmap is remapped it will
  419. // obtain a write lock so all transactions must finish before it can be
  420. // remapped.
  421. db.mmaplock.RLock()
  422. // Exit if the database is not open yet.
  423. if !db.opened {
  424. db.mmaplock.RUnlock()
  425. db.metalock.Unlock()
  426. return nil, ErrDatabaseNotOpen
  427. }
  428. // [Psiphon]
  429. // https://github.com/etcd-io/bbolt/commit/b3e98dcb3752e0a8d5db6503b80fe19e462fdb73
  430. // Return mmap error if a previous mmap failed.
  431. if db.mmapErr != nil {
  432. db.mmaplock.RUnlock()
  433. db.metalock.Unlock()
  434. return nil, db.mmapErr
  435. }
  436. // Create a transaction associated with the database.
  437. t := &Tx{}
  438. t.init(db)
  439. // Keep track of transaction until it closes.
  440. db.txs = append(db.txs, t)
  441. n := len(db.txs)
  442. // Unlock the meta pages.
  443. db.metalock.Unlock()
  444. // Update the transaction stats.
  445. db.statlock.Lock()
  446. db.stats.TxN++
  447. db.stats.OpenTxN = n
  448. db.statlock.Unlock()
  449. return t, nil
  450. }
  451. func (db *DB) beginRWTx() (*Tx, error) {
  452. // If the database was opened with Options.ReadOnly, return an error.
  453. if db.readOnly {
  454. return nil, ErrDatabaseReadOnly
  455. }
  456. // Obtain writer lock. This is released by the transaction when it closes.
  457. // This enforces only one writer transaction at a time.
  458. db.rwlock.Lock()
  459. // Once we have the writer lock then we can lock the meta pages so that
  460. // we can set up the transaction.
  461. db.metalock.Lock()
  462. defer db.metalock.Unlock()
  463. // Exit if the database is not open yet.
  464. if !db.opened {
  465. db.rwlock.Unlock()
  466. return nil, ErrDatabaseNotOpen
  467. }
  468. // [Psiphon]
  469. // https://github.com/etcd-io/bbolt/commit/b3e98dcb3752e0a8d5db6503b80fe19e462fdb73
  470. // Return mmap error if a previous mmap failed.
  471. if db.mmapErr != nil {
  472. db.rwlock.Unlock()
  473. return nil, db.mmapErr
  474. }
  475. // Create a transaction associated with the database.
  476. t := &Tx{writable: true}
  477. t.init(db)
  478. db.rwtx = t
  479. // Free any pages associated with closed read-only transactions.
  480. var minid txid = 0xFFFFFFFFFFFFFFFF
  481. for _, t := range db.txs {
  482. if t.meta.txid < minid {
  483. minid = t.meta.txid
  484. }
  485. }
  486. if minid > 0 {
  487. db.freelist.release(minid - 1)
  488. }
  489. return t, nil
  490. }
  491. // removeTx removes a transaction from the database.
  492. func (db *DB) removeTx(tx *Tx) {
  493. // Release the read lock on the mmap.
  494. db.mmaplock.RUnlock()
  495. // Use the meta lock to restrict access to the DB object.
  496. db.metalock.Lock()
  497. // Remove the transaction.
  498. for i, t := range db.txs {
  499. if t == tx {
  500. last := len(db.txs) - 1
  501. db.txs[i] = db.txs[last]
  502. db.txs[last] = nil
  503. db.txs = db.txs[:last]
  504. break
  505. }
  506. }
  507. n := len(db.txs)
  508. // Unlock the meta pages.
  509. db.metalock.Unlock()
  510. // Merge statistics.
  511. db.statlock.Lock()
  512. db.stats.OpenTxN = n
  513. db.stats.TxStats.add(&tx.stats)
  514. db.statlock.Unlock()
  515. }
  516. // Update executes a function within the context of a read-write managed transaction.
  517. // If no error is returned from the function then the transaction is committed.
  518. // If an error is returned then the entire transaction is rolled back.
  519. // Any error that is returned from the function or returned from the commit is
  520. // returned from the Update() method.
  521. //
  522. // Attempting to manually commit or rollback within the function will cause a panic.
  523. func (db *DB) Update(fn func(*Tx) error) error {
  524. t, err := db.Begin(true)
  525. if err != nil {
  526. return err
  527. }
  528. // Make sure the transaction rolls back in the event of a panic.
  529. defer func() {
  530. if t.db != nil {
  531. t.rollback()
  532. }
  533. }()
  534. // Mark as a managed tx so that the inner function cannot manually commit.
  535. t.managed = true
  536. // If an error is returned from the function then rollback and return error.
  537. err = fn(t)
  538. t.managed = false
  539. if err != nil {
  540. _ = t.Rollback()
  541. return err
  542. }
  543. return t.Commit()
  544. }
  545. // View executes a function within the context of a managed read-only transaction.
  546. // Any error that is returned from the function is returned from the View() method.
  547. //
  548. // Attempting to manually rollback within the function will cause a panic.
  549. func (db *DB) View(fn func(*Tx) error) error {
  550. t, err := db.Begin(false)
  551. if err != nil {
  552. return err
  553. }
  554. // Make sure the transaction rolls back in the event of a panic.
  555. defer func() {
  556. if t.db != nil {
  557. t.rollback()
  558. }
  559. }()
  560. // Mark as a managed tx so that the inner function cannot manually rollback.
  561. t.managed = true
  562. // If an error is returned from the function then pass it through.
  563. err = fn(t)
  564. t.managed = false
  565. if err != nil {
  566. _ = t.Rollback()
  567. return err
  568. }
  569. if err := t.Rollback(); err != nil {
  570. return err
  571. }
  572. return nil
  573. }
  574. // Batch calls fn as part of a batch. It behaves similar to Update,
  575. // except:
  576. //
  577. // 1. concurrent Batch calls can be combined into a single Bolt
  578. // transaction.
  579. //
  580. // 2. the function passed to Batch may be called multiple times,
  581. // regardless of whether it returns error or not.
  582. //
  583. // This means that Batch function side effects must be idempotent and
  584. // take permanent effect only after a successful return is seen in
  585. // caller.
  586. //
  587. // The maximum batch size and delay can be adjusted with DB.MaxBatchSize
  588. // and DB.MaxBatchDelay, respectively.
  589. //
  590. // Batch is only useful when there are multiple goroutines calling it.
  591. func (db *DB) Batch(fn func(*Tx) error) error {
  592. errCh := make(chan error, 1)
  593. db.batchMu.Lock()
  594. if (db.batch == nil) || (db.batch != nil && len(db.batch.calls) >= db.MaxBatchSize) {
  595. // There is no existing batch, or the existing batch is full; start a new one.
  596. db.batch = &batch{
  597. db: db,
  598. }
  599. db.batch.timer = time.AfterFunc(db.MaxBatchDelay, db.batch.trigger)
  600. }
  601. db.batch.calls = append(db.batch.calls, call{fn: fn, err: errCh})
  602. if len(db.batch.calls) >= db.MaxBatchSize {
  603. // wake up batch, it's ready to run
  604. go db.batch.trigger()
  605. }
  606. db.batchMu.Unlock()
  607. err := <-errCh
  608. if err == trySolo {
  609. err = db.Update(fn)
  610. }
  611. return err
  612. }
  613. type call struct {
  614. fn func(*Tx) error
  615. err chan<- error
  616. }
  617. type batch struct {
  618. db *DB
  619. timer *time.Timer
  620. start sync.Once
  621. calls []call
  622. }
  623. // trigger runs the batch if it hasn't already been run.
  624. func (b *batch) trigger() {
  625. b.start.Do(b.run)
  626. }
  627. // run performs the transactions in the batch and communicates results
  628. // back to DB.Batch.
  629. func (b *batch) run() {
  630. b.db.batchMu.Lock()
  631. b.timer.Stop()
  632. // Make sure no new work is added to this batch, but don't break
  633. // other batches.
  634. if b.db.batch == b {
  635. b.db.batch = nil
  636. }
  637. b.db.batchMu.Unlock()
  638. retry:
  639. for len(b.calls) > 0 {
  640. var failIdx = -1
  641. err := b.db.Update(func(tx *Tx) error {
  642. for i, c := range b.calls {
  643. if err := safelyCall(c.fn, tx); err != nil {
  644. failIdx = i
  645. return err
  646. }
  647. }
  648. return nil
  649. })
  650. if failIdx >= 0 {
  651. // take the failing transaction out of the batch. it's
  652. // safe to shorten b.calls here because db.batch no longer
  653. // points to us, and we hold the mutex anyway.
  654. c := b.calls[failIdx]
  655. b.calls[failIdx], b.calls = b.calls[len(b.calls)-1], b.calls[:len(b.calls)-1]
  656. // tell the submitter re-run it solo, continue with the rest of the batch
  657. c.err <- trySolo
  658. continue retry
  659. }
  660. // pass success, or bolt internal errors, to all callers
  661. for _, c := range b.calls {
  662. if c.err != nil {
  663. c.err <- err
  664. }
  665. }
  666. break retry
  667. }
  668. }
  // trySolo is a sentinel error sent by a batch to the submitter of a
  // failing function to signal that the function should be re-run on its
  // own. DB.Batch handles it internally, so callers should never see it.
  var trySolo = errors.New("batch function returned an error and should be re-run solo")
  // panicked is the error that safelyCall returns when the called function
  // panics; reason holds the recovered panic value.
  type panicked struct {
  	reason interface{}
  }

  // Error returns the message of the recovered value when that value is
  // itself an error, and "panic: <value>" otherwise.
  func (p panicked) Error() string {
  	switch v := p.reason.(type) {
  	case error:
  		return v.Error()
  	default:
  		return fmt.Sprintf("panic: %v", p.reason)
  	}
  }
  682. func safelyCall(fn func(*Tx) error, tx *Tx) (err error) {
  683. defer func() {
  684. if p := recover(); p != nil {
  685. err = panicked{p}
  686. }
  687. }()
  688. return fn(tx)
  689. }
  690. // Sync executes fdatasync() against the database file handle.
  691. //
  692. // This is not necessary under normal operation, however, if you use NoSync
  693. // then it allows you to force the database file to sync against the disk.
  694. func (db *DB) Sync() error { return fdatasync(db) }
  695. // Stats retrieves ongoing performance stats for the database.
  696. // This is only updated when a transaction closes.
  697. func (db *DB) Stats() Stats {
  698. db.statlock.RLock()
  699. defer db.statlock.RUnlock()
  700. return db.stats
  701. }
  702. // This is for internal access to the raw data bytes from the C cursor, use
  703. // carefully, or not at all.
  704. func (db *DB) Info() *Info {
  705. return &Info{uintptr(unsafe.Pointer(&db.data[0])), db.pageSize}
  706. }
  707. // page retrieves a page reference from the mmap based on the current page size.
  708. func (db *DB) page(id pgid) *page {
  709. pos := id * pgid(db.pageSize)
  710. return (*page)(unsafe.Pointer(&db.data[pos]))
  711. }
  712. // pageInBuffer retrieves a page reference from a given byte array based on the current page size.
  713. func (db *DB) pageInBuffer(b []byte, id pgid) *page {
  714. return (*page)(unsafe.Pointer(&b[id*pgid(db.pageSize)]))
  715. }
  716. // meta retrieves the current meta page reference.
  717. func (db *DB) meta() *meta {
  718. // We have to return the meta with the highest txid which doesn't fail
  719. // validation. Otherwise, we can cause errors when in fact the database is
  720. // in a consistent state. metaA is the one with the higher txid.
  721. metaA := db.meta0
  722. metaB := db.meta1
  723. if db.meta1.txid > db.meta0.txid {
  724. metaA = db.meta1
  725. metaB = db.meta0
  726. }
  727. // Use higher meta page if valid. Otherwise fallback to previous, if valid.
  728. if err := metaA.validate(); err == nil {
  729. return metaA
  730. } else if err := metaB.validate(); err == nil {
  731. return metaB
  732. }
  733. // This should never be reached, because both meta1 and meta0 were validated
  734. // on mmap() and we do fsync() on every write.
  735. panic("bolt.DB.meta(): invalid meta pages")
  736. }
  737. // allocate returns a contiguous block of memory starting at a given page.
  738. func (db *DB) allocate(count int) (*page, error) {
  739. // Allocate a temporary buffer for the page.
  740. var buf []byte
  741. if count == 1 {
  742. buf = db.pagePool.Get().([]byte)
  743. } else {
  744. buf = make([]byte, count*db.pageSize)
  745. }
  746. p := (*page)(unsafe.Pointer(&buf[0]))
  747. p.overflow = uint32(count - 1)
  748. // Use pages from the freelist if they are available.
  749. if p.id = db.freelist.allocate(count); p.id != 0 {
  750. return p, nil
  751. }
  752. // Resize mmap() if we're at the end.
  753. p.id = db.rwtx.meta.pgid
  754. var minsz = int((p.id+pgid(count))+1) * db.pageSize
  755. if minsz >= db.datasz {
  756. if err := db.mmap(minsz); err != nil {
  757. return nil, fmt.Errorf("mmap allocate error: %s", err)
  758. }
  759. }
  760. // Move the page id high water mark.
  761. db.rwtx.meta.pgid += pgid(count)
  762. return p, nil
  763. }
  764. // grow grows the size of the database to the given sz.
  765. func (db *DB) grow(sz int) error {
  766. // Ignore if the new size is less than available file size.
  767. if sz <= db.filesz {
  768. return nil
  769. }
  770. // If the data is smaller than the alloc size then only allocate what's needed.
  771. // Once it goes over the allocation size then allocate in chunks.
  772. if db.datasz < db.AllocSize {
  773. sz = db.datasz
  774. } else {
  775. sz += db.AllocSize
  776. }
  777. // Truncate and fsync to ensure file size metadata is flushed.
  778. // https://github.com/boltdb/bolt/issues/284
  779. if !db.NoGrowSync && !db.readOnly {
  780. if runtime.GOOS != "windows" {
  781. if err := db.file.Truncate(int64(sz)); err != nil {
  782. return fmt.Errorf("file resize error: %s", err)
  783. }
  784. }
  785. if err := db.file.Sync(); err != nil {
  786. return fmt.Errorf("file sync error: %s", err)
  787. }
  788. }
  789. db.filesz = sz
  790. return nil
  791. }
  792. func (db *DB) IsReadOnly() bool {
  793. return db.readOnly
  794. }
  795. // Options represents the options that can be set when opening a database.
  796. type Options struct {
  797. // Timeout is the amount of time to wait to obtain a file lock.
  798. // When set to zero it will wait indefinitely. This option is only
  799. // available on Darwin and Linux.
  800. Timeout time.Duration
  801. // Sets the DB.NoGrowSync flag before memory mapping the file.
  802. NoGrowSync bool
  803. // Open database in read-only mode. Uses flock(..., LOCK_SH |LOCK_NB) to
  804. // grab a shared lock (UNIX).
  805. ReadOnly bool
  806. // Sets the DB.MmapFlags flag before memory mapping the file.
  807. MmapFlags int
  808. // InitialMmapSize is the initial mmap size of the database
  809. // in bytes. Read transactions won't block write transaction
  810. // if the InitialMmapSize is large enough to hold database mmap
  811. // size. (See DB.Begin for more information)
  812. //
  813. // If <=0, the initial map size is 0.
  814. // If initialMmapSize is smaller than the previous database size,
  815. // it takes no effect.
  816. InitialMmapSize int
  817. }
  818. // DefaultOptions represent the options used if nil options are passed into Open().
  819. // No timeout is used which will cause Bolt to wait indefinitely for a lock.
  820. var DefaultOptions = &Options{
  821. Timeout: 0,
  822. NoGrowSync: false,
  823. }
  824. // Stats represents statistics about the database.
  825. type Stats struct {
  826. // Freelist stats
  827. FreePageN int // total number of free pages on the freelist
  828. PendingPageN int // total number of pending pages on the freelist
  829. FreeAlloc int // total bytes allocated in free pages
  830. FreelistInuse int // total bytes used by the freelist
  831. // Transaction stats
  832. TxN int // total number of started read transactions
  833. OpenTxN int // number of currently open read transactions
  834. TxStats TxStats // global, ongoing stats.
  835. }
  836. // Sub calculates and returns the difference between two sets of database stats.
  837. // This is useful when obtaining stats at two different points and time and
  838. // you need the performance counters that occurred within that time span.
  839. func (s *Stats) Sub(other *Stats) Stats {
  840. if other == nil {
  841. return *s
  842. }
  843. var diff Stats
  844. diff.FreePageN = s.FreePageN
  845. diff.PendingPageN = s.PendingPageN
  846. diff.FreeAlloc = s.FreeAlloc
  847. diff.FreelistInuse = s.FreelistInuse
  848. diff.TxN = s.TxN - other.TxN
  849. diff.TxStats = s.TxStats.Sub(&other.TxStats)
  850. return diff
  851. }
  852. func (s *Stats) add(other *Stats) {
  853. s.TxStats.add(&other.TxStats)
  854. }
  855. type Info struct {
  856. Data uintptr
  857. PageSize int
  858. }
  859. type meta struct {
  860. magic uint32
  861. version uint32
  862. pageSize uint32
  863. flags uint32
  864. root bucket
  865. freelist pgid
  866. pgid pgid
  867. txid txid
  868. checksum uint64
  869. }
  870. // validate checks the marker bytes and version of the meta page to ensure it matches this binary.
  871. func (m *meta) validate() error {
  872. if m.magic != magic {
  873. return ErrInvalid
  874. } else if m.version != version {
  875. return ErrVersionMismatch
  876. } else if m.checksum != 0 && m.checksum != m.sum64() {
  877. return ErrChecksum
  878. }
  879. return nil
  880. }
  881. // copy copies one meta object to another.
  882. func (m *meta) copy(dest *meta) {
  883. *dest = *m
  884. }
  885. // write writes the meta onto a page.
  886. func (m *meta) write(p *page) {
  887. if m.root.root >= m.pgid {
  888. panic(fmt.Sprintf("root bucket pgid (%d) above high water mark (%d)", m.root.root, m.pgid))
  889. } else if m.freelist >= m.pgid {
  890. panic(fmt.Sprintf("freelist pgid (%d) above high water mark (%d)", m.freelist, m.pgid))
  891. }
  892. // Page id is either going to be 0 or 1 which we can determine by the transaction ID.
  893. p.id = pgid(m.txid % 2)
  894. p.flags |= metaPageFlag
  895. // Calculate the checksum.
  896. m.checksum = m.sum64()
  897. m.copy(p.meta())
  898. }
  899. // generates the checksum for the meta.
  900. func (m *meta) sum64() uint64 {
  901. var h = fnv.New64a()
  902. _, _ = h.Write((*[unsafe.Offsetof(meta{}.checksum)]byte)(unsafe.Pointer(m))[:])
  903. return h.Sum64()
  904. }
  905. // _assert will panic with a given formatted message if the given condition is false.
  906. func _assert(condition bool, msg string, v ...interface{}) {
  907. if !condition {
  908. panic(fmt.Sprintf("assertion failed: "+msg, v...))
  909. }
  910. }
  911. func warn(v ...interface{}) { fmt.Fprintln(os.Stderr, v...) }
  912. func warnf(msg string, v ...interface{}) { fmt.Fprintf(os.Stderr, msg+"\n", v...) }
  913. func printstack() {
  914. stack := strings.Join(strings.Split(string(debug.Stack()), "\n")[2:], "\n")
  915. fmt.Fprintln(os.Stderr, stack)
  916. }