value.go 31 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239
  1. /*
  2. * Copyright 2017 Dgraph Labs, Inc. and Contributors
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. package badger
  17. import (
  18. "bufio"
  19. "bytes"
  20. "encoding/binary"
  21. "fmt"
  22. "hash/crc32"
  23. "io"
  24. "io/ioutil"
  25. "log"
  26. "math"
  27. "math/rand"
  28. "os"
  29. "sort"
  30. "strconv"
  31. "strings"
  32. "sync"
  33. "sync/atomic"
  34. "time"
  35. "github.com/dgraph-io/badger/options"
  36. "github.com/dgraph-io/badger/y"
  37. "github.com/pkg/errors"
  38. "golang.org/x/net/trace"
  39. )
// Values have their first byte being byteData or byteDelete. This helps us distinguish between
// a key that has never been seen and a key that has been explicitly deleted.
const (
	bitDelete                 byte = 1 << 0 // Set if the key has been deleted.
	bitValuePointer           byte = 1 << 1 // Set if the value is NOT stored directly next to key.
	bitDiscardEarlierVersions byte = 1 << 2 // Set if earlier versions can be discarded.
	// The MSB 2 bits are for transactions.
	bitTxn    byte = 1 << 6 // Set if the entry is part of a txn.
	bitFinTxn byte = 1 << 7 // Set if the entry is to indicate end of txn in value log.

	mi int64 = 1 << 20 // One mebibyte; used for size bookkeeping in rewrite().
)
// logFile represents a single on-disk value log file, optionally backed by
// a memory map depending on loadingMode.
type logFile struct {
	path string
	// This is a lock on the log file. It guards the fd’s value, the file’s
	// existence and the file’s memory map.
	//
	// Use shared ownership when reading/writing the file or memory map, use
	// exclusive ownership to open/close the descriptor, unmap or remove the file.
	lock sync.RWMutex
	fd   *os.File
	fid  uint32 // Numeric file id, also encoded in the file name.
	fmap []byte // Memory map of the file; empty unless loadingMode is MemoryMap.
	size uint32
	loadingMode options.FileLoadingMode
}
  65. // openReadOnly assumes that we have a write lock on logFile.
  66. func (lf *logFile) openReadOnly() error {
  67. var err error
  68. lf.fd, err = os.OpenFile(lf.path, os.O_RDONLY, 0666)
  69. if err != nil {
  70. return errors.Wrapf(err, "Unable to open %q as RDONLY.", lf.path)
  71. }
  72. fi, err := lf.fd.Stat()
  73. if err != nil {
  74. return errors.Wrapf(err, "Unable to check stat for %q", lf.path)
  75. }
  76. lf.size = uint32(fi.Size())
  77. if err = lf.mmap(fi.Size()); err != nil {
  78. _ = lf.fd.Close()
  79. return y.Wrapf(err, "Unable to map file")
  80. }
  81. return nil
  82. }
  83. func (lf *logFile) mmap(size int64) (err error) {
  84. if lf.loadingMode != options.MemoryMap {
  85. // Nothing to do
  86. return nil
  87. }
  88. lf.fmap, err = y.Mmap(lf.fd, false, size)
  89. if err == nil {
  90. err = y.Madvise(lf.fmap, false) // Disable readahead
  91. }
  92. return err
  93. }
  94. func (lf *logFile) munmap() (err error) {
  95. if lf.loadingMode != options.MemoryMap {
  96. // Nothing to do
  97. return nil
  98. }
  99. if err := y.Munmap(lf.fmap); err != nil {
  100. return errors.Wrapf(err, "Unable to munmap value log: %q", lf.path)
  101. }
  102. return nil
  103. }
  104. // Acquire lock on mmap/file if you are calling this
  105. func (lf *logFile) read(p valuePointer, s *y.Slice) (buf []byte, err error) {
  106. var nbr int64
  107. offset := p.Offset
  108. if lf.loadingMode == options.FileIO {
  109. buf = s.Resize(int(p.Len))
  110. var n int
  111. n, err = lf.fd.ReadAt(buf, int64(offset))
  112. nbr = int64(n)
  113. } else {
  114. size := uint32(len(lf.fmap))
  115. valsz := p.Len
  116. if offset >= size || offset+valsz > size {
  117. err = y.ErrEOF
  118. } else {
  119. buf = lf.fmap[offset : offset+valsz]
  120. nbr = int64(valsz)
  121. }
  122. }
  123. y.NumReads.Add(1)
  124. y.NumBytesRead.Add(nbr)
  125. return buf, err
  126. }
// doneWriting finalizes a log file once writes to it are complete: it syncs
// the data, truncates the file to the final offset, and reopens it read-only.
func (lf *logFile) doneWriting(offset uint32) error {
	// Sync before acquiring lock. (We call this from write() and thus know we have shared access
	// to the fd.)
	if err := lf.fd.Sync(); err != nil {
		return errors.Wrapf(err, "Unable to sync value log: %q", lf.path)
	}
	// Close and reopen the file read-only. Acquire lock because fd will become invalid for a bit.
	// Acquiring the lock is bad because, while we don't hold the lock for a long time, it forces
	// one batch of readers wait for the preceding batch of readers to finish.
	//
	// If there's a benefit to reopening the file read-only, it might be on Windows. I don't know
	// what the benefit is. Consider keeping the file read-write, or use fcntl to change
	// permissions.
	lf.lock.Lock()
	defer lf.lock.Unlock()
	if err := lf.munmap(); err != nil {
		return err
	}
	// TODO: Confirm if we need to run a file sync after truncation.
	// Truncation must run after unmapping, otherwise Windows would crap itself.
	if err := lf.fd.Truncate(int64(offset)); err != nil {
		return errors.Wrapf(err, "Unable to truncate file: %q", lf.path)
	}
	if err := lf.fd.Close(); err != nil {
		return errors.Wrapf(err, "Unable to close value log: %q", lf.path)
	}
	return lf.openReadOnly()
}
// sync flushes the log file's contents to stable storage.
// You must hold lf.lock to sync().
func (lf *logFile) sync() error {
	return lf.fd.Sync()
}
// errStop is returned by an iterate() callback to stop iteration early
// without reporting an error.
var errStop = errors.New("Stop iteration")

// errTruncate signals that the log has a corrupt or partial tail which
// should be truncated at the last known-good offset.
var errTruncate = errors.New("Do truncate")

// logEntry is the callback invoked for each entry during log iteration.
type logEntry func(e Entry, vp valuePointer) error

// safeRead holds reusable key/value buffers for decoding entries, plus the
// offset of the record currently being read.
type safeRead struct {
	k []byte
	v []byte

	recordOffset uint32
}
// Entry decodes one entry (header + key + value + CRC) from reader. The
// returned Entry aliases r's reusable buffers, so it is only valid until
// the next call. It returns errTruncate on a short read or CRC mismatch so
// the caller can truncate the log at the last good offset.
func (r *safeRead) Entry(reader *bufio.Reader) (*Entry, error) {
	var hbuf [headerBufSize]byte
	var err error
	// Tee the header/key/value bytes through a Castagnoli CRC32 so we can
	// verify them against the checksum stored after the value.
	hash := crc32.New(y.CastagnoliCrcTable)
	tee := io.TeeReader(reader, hash)
	if _, err = io.ReadFull(tee, hbuf[:]); err != nil {
		return nil, err
	}

	var h header
	h.Decode(hbuf[:])
	if h.klen > maxKeySize {
		// A key this large can only be corruption; treat as truncated log.
		return nil, errTruncate
	}
	// Grow the reusable buffers geometrically (2x) when too small.
	kl := int(h.klen)
	if cap(r.k) < kl {
		r.k = make([]byte, 2*kl)
	}
	vl := int(h.vlen)
	if cap(r.v) < vl {
		r.v = make([]byte, 2*vl)
	}

	e := &Entry{}
	e.offset = r.recordOffset
	e.Key = r.k[:kl]
	e.Value = r.v[:vl]

	if _, err = io.ReadFull(tee, e.Key); err != nil {
		// EOF mid-entry means the log was cut short: signal truncation.
		if err == io.EOF {
			err = errTruncate
		}
		return nil, err
	}
	if _, err = io.ReadFull(tee, e.Value); err != nil {
		if err == io.EOF {
			err = errTruncate
		}
		return nil, err
	}
	// The stored CRC is read from the plain reader (not the tee) so it does
	// not contribute to the checksum we computed.
	var crcBuf [4]byte
	if _, err = io.ReadFull(reader, crcBuf[:]); err != nil {
		if err == io.EOF {
			err = errTruncate
		}
		return nil, err
	}
	crc := binary.BigEndian.Uint32(crcBuf[:])
	if crc != hash.Sum32() {
		return nil, errTruncate
	}
	e.meta = h.meta
	e.UserMeta = h.userMeta
	e.ExpiresAt = h.expiresAt
	return e, nil
}
// iterate iterates over log file. It doesn't allocate new memory for every kv pair.
// Therefore, the kv pair is only valid for the duration of fn call.
//
// It tracks transaction boundaries (bitTxn / bitFinTxn) and remembers the
// offset after the last fully-committed data; on finding a corrupt or
// partial tail it truncates there (if opt.Truncate and the file is not
// mmapped) or returns ErrTruncateNeeded.
func (vlog *valueLog) iterate(lf *logFile, offset uint32, fn logEntry) error {
	_, err := lf.fd.Seek(int64(offset), io.SeekStart)
	if err != nil {
		return y.Wrap(err)
	}

	reader := bufio.NewReader(lf.fd)
	read := &safeRead{
		k:            make([]byte, 10),
		v:            make([]byte, 10),
		recordOffset: offset,
	}

	truncate := false         // Set when a corrupt/partial tail is detected.
	var lastCommit uint64     // Commit ts of the txn being read; 0 = not in a txn.
	var validEndOffset uint32 // Offset just past the last consistent data.
	var count int
	for {
		count++
		if count%2000 == 0 {
			log.Printf("Replaying log file: %d. Running count: %d\n", lf.fid, count)
		}
		e, err := read.Entry(reader)
		if err == io.EOF {
			break
		} else if err == io.ErrUnexpectedEOF || err == errTruncate {
			truncate = true
			break
		} else if err != nil {
			return err
		} else if e == nil {
			continue
		}

		var vp valuePointer
		vp.Len = uint32(headerBufSize + len(e.Key) + len(e.Value) + 4) // len(crcBuf)
		read.recordOffset += vp.Len
		vp.Offset = e.offset
		vp.Fid = lf.fid

		if e.meta&bitTxn > 0 {
			// Entry inside a txn: every entry must carry the same commit ts.
			txnTs := y.ParseTs(e.Key)
			if lastCommit == 0 {
				lastCommit = txnTs
			}
			if lastCommit != txnTs {
				truncate = true
				break
			}
		} else if e.meta&bitFinTxn > 0 {
			// Txn terminator: its value holds the commit ts, which must match.
			txnTs, err := strconv.ParseUint(string(e.Value), 10, 64)
			if err != nil || lastCommit != txnTs {
				truncate = true
				break
			}
			// Got the end of txn. Now we can store them.
			lastCommit = 0
			validEndOffset = read.recordOffset
		} else {
			if lastCommit != 0 {
				// This is most likely an entry which was moved as part of GC.
				// We shouldn't get this entry in the middle of a transaction.
				truncate = true
				break
			}
			validEndOffset = read.recordOffset
		}

		if vlog.opt.ReadOnly {
			return ErrReplayNeeded
		}
		if err := fn(*e, vp); err != nil {
			if err == errStop {
				break
			}
			return y.Wrap(err)
		}
	}

	if vlog.opt.Truncate && truncate && len(lf.fmap) == 0 {
		// Only truncate if the file isn't mmaped. Otherwise, Windows would puke.
		if err := lf.fd.Truncate(int64(validEndOffset)); err != nil {
			return err
		}
	} else if truncate {
		return ErrTruncateNeeded
	}
	return nil
}
// rewrite garbage-collects log file f: it re-reads every entry, drops those
// no longer referenced by the LSM tree, re-writes still-live ones under the
// badgerMove keyspace, and finally deletes f (or schedules its deletion if
// iterators are still active).
func (vlog *valueLog) rewrite(f *logFile, tr trace.Trace) error {
	maxFid := atomic.LoadUint32(&vlog.maxFid)
	// Never rewrite the file currently being written to.
	y.AssertTruef(uint32(f.fid) < maxFid, "fid to move: %d. Current max fid: %d", f.fid, maxFid)
	tr.LazyPrintf("Rewriting fid: %d", f.fid)

	wb := make([]*Entry, 0, 1000)
	var size int64

	y.AssertTrue(vlog.kv != nil)
	var count, moved int
	// fe decides, for a single log entry, whether it is still live; if so it
	// queues a move-keyspace copy into wb, flushing wb at ~64 MB.
	fe := func(e Entry) error {
		count++
		if count%100000 == 0 {
			tr.LazyPrintf("Processing entry %d", count)
		}

		vs, err := vlog.kv.get(e.Key)
		if err != nil {
			return err
		}
		if discardEntry(e, vs) {
			return nil
		}

		// Value is still present in value log.
		if len(vs.Value) == 0 {
			return errors.Errorf("Empty value: %+v", vs)
		}
		var vp valuePointer
		vp.Decode(vs.Value)

		if vp.Fid > f.fid {
			// LSM already points at a newer file; this copy is stale.
			return nil
		}
		if vp.Offset > e.offset {
			// LSM points past this entry within the same file; stale.
			return nil
		}
		if vp.Fid == f.fid && vp.Offset == e.offset {
			moved++
			// This new entry only contains the key, and a pointer to the value.
			ne := new(Entry)
			ne.meta = 0 // Remove all bits. Different keyspace doesn't need these bits.
			ne.UserMeta = e.UserMeta

			// Create a new key in a separate keyspace, prefixed by moveKey. We are not
			// allowed to rewrite an older version of key in the LSM tree, because then this older
			// version would be at the top of the LSM tree. To work correctly, reads expect the
			// latest versions to be at the top, and the older versions at the bottom.
			if bytes.HasPrefix(e.Key, badgerMove) {
				ne.Key = append([]byte{}, e.Key...)
			} else {
				ne.Key = append([]byte{}, badgerMove...)
				ne.Key = append(ne.Key, e.Key...)
			}

			ne.Value = append([]byte{}, e.Value...)
			wb = append(wb, ne)
			size += int64(e.estimateSize(vlog.opt.ValueThreshold))
			if size >= 64*mi {
				tr.LazyPrintf("request has %d entries, size %d", len(wb), size)
				if err := vlog.kv.batchSet(wb); err != nil {
					return err
				}
				size = 0
				wb = wb[:0]
			}
		} else {
			log.Printf("WARNING: This entry should have been caught. %+v\n", e)
		}
		return nil
	}

	err := vlog.iterate(f, 0, func(e Entry, vp valuePointer) error {
		return fe(e)
	})
	if err != nil {
		return err
	}

	tr.LazyPrintf("request has %d entries, size %d", len(wb), size)
	batchSize := 1024
	var loops int
	// Flush the remaining queued entries, halving the batch on ErrTxnTooBig.
	for i := 0; i < len(wb); {
		loops++
		if batchSize == 0 {
			log.Printf("WARNING: We shouldn't reach batch size of zero.")
			return ErrNoRewrite
		}
		end := i + batchSize
		if end > len(wb) {
			end = len(wb)
		}
		if err := vlog.kv.batchSet(wb[i:end]); err != nil {
			if err == ErrTxnTooBig {
				// Decrease the batch size to half.
				batchSize = batchSize / 2
				tr.LazyPrintf("Dropped batch size to %d", batchSize)
				continue
			}
			return err
		}
		i += batchSize
	}
	tr.LazyPrintf("Processed %d entries in %d loops", len(wb), loops)
	tr.LazyPrintf("Total entries: %d. Moved: %d", count, moved)
	tr.LazyPrintf("Removing fid: %d", f.fid)

	var deleteFileNow bool
	// Entries written to LSM. Remove the older file now.
	{
		vlog.filesLock.Lock()
		// Just a sanity-check.
		if _, ok := vlog.filesMap[f.fid]; !ok {
			vlog.filesLock.Unlock()
			return errors.Errorf("Unable to find fid: %d", f.fid)
		}
		if vlog.numActiveIterators == 0 {
			delete(vlog.filesMap, f.fid)
			deleteFileNow = true
		} else {
			// Active iterators may still be reading the file; defer deletion.
			vlog.filesToBeDeleted = append(vlog.filesToBeDeleted, f.fid)
		}
		vlog.filesLock.Unlock()
	}

	if deleteFileNow {
		// NOTE(review): the deleteLogFile error is silently dropped here —
		// confirm whether that is intentional.
		vlog.deleteLogFile(f)
	}
	return nil
}
  424. func (vlog *valueLog) deleteMoveKeysFor(fid uint32, tr trace.Trace) {
  425. db := vlog.kv
  426. var result []*Entry
  427. var count, pointers uint64
  428. tr.LazyPrintf("Iterating over move keys to find invalids for fid: %d", fid)
  429. err := db.View(func(txn *Txn) error {
  430. opt := DefaultIteratorOptions
  431. opt.internalAccess = true
  432. opt.PrefetchValues = false
  433. itr := txn.NewIterator(opt)
  434. defer itr.Close()
  435. for itr.Seek(badgerMove); itr.ValidForPrefix(badgerMove); itr.Next() {
  436. count++
  437. item := itr.Item()
  438. if item.meta&bitValuePointer == 0 {
  439. continue
  440. }
  441. pointers++
  442. var vp valuePointer
  443. vp.Decode(item.vptr)
  444. if vp.Fid == fid {
  445. e := &Entry{Key: item.KeyCopy(nil), meta: bitDelete}
  446. result = append(result, e)
  447. }
  448. }
  449. return nil
  450. })
  451. if err != nil {
  452. tr.LazyPrintf("Got error while iterating move keys: %v", err)
  453. tr.SetError()
  454. return
  455. }
  456. tr.LazyPrintf("Num total move keys: %d. Num pointers: %d", count, pointers)
  457. tr.LazyPrintf("Number of invalid move keys found: %d", len(result))
  458. batchSize := 10240
  459. for i := 0; i < len(result); {
  460. end := i + batchSize
  461. if end > len(result) {
  462. end = len(result)
  463. }
  464. if err := db.batchSet(result[i:end]); err != nil {
  465. if err == ErrTxnTooBig {
  466. batchSize /= 2
  467. tr.LazyPrintf("Dropped batch size to %d", batchSize)
  468. continue
  469. }
  470. tr.LazyPrintf("Error while doing batchSet: %v", err)
  471. tr.SetError()
  472. return
  473. }
  474. i += batchSize
  475. }
  476. tr.LazyPrintf("Move keys deletion done.")
  477. return
  478. }
// incrIteratorCount bumps the count of active iterators over the value log.
// While the count is non-zero, rewritten log files are only queued for
// deletion, not removed.
func (vlog *valueLog) incrIteratorCount() {
	atomic.AddInt32(&vlog.numActiveIterators, 1)
}
  482. func (vlog *valueLog) decrIteratorCount() error {
  483. num := atomic.AddInt32(&vlog.numActiveIterators, -1)
  484. if num != 0 {
  485. return nil
  486. }
  487. vlog.filesLock.Lock()
  488. lfs := make([]*logFile, 0, len(vlog.filesToBeDeleted))
  489. for _, id := range vlog.filesToBeDeleted {
  490. lfs = append(lfs, vlog.filesMap[id])
  491. delete(vlog.filesMap, id)
  492. }
  493. vlog.filesToBeDeleted = nil
  494. vlog.filesLock.Unlock()
  495. for _, lf := range lfs {
  496. if err := vlog.deleteLogFile(lf); err != nil {
  497. return err
  498. }
  499. }
  500. return nil
  501. }
  502. func (vlog *valueLog) deleteLogFile(lf *logFile) error {
  503. path := vlog.fpath(lf.fid)
  504. if err := lf.munmap(); err != nil {
  505. _ = lf.fd.Close()
  506. return err
  507. }
  508. if err := lf.fd.Close(); err != nil {
  509. return err
  510. }
  511. return os.Remove(path)
  512. }
// lfDiscardStats keeps track of the amount of data that could be discarded for
// a given logfile.
type lfDiscardStats struct {
	sync.Mutex
	// m maps file id to an estimate of discardable bytes in that file.
	m map[uint32]int64
}
// valueLog manages the set of on-disk value log files: appending entries,
// reading values back through pointers, replay on startup, and garbage
// collection of dead data.
type valueLog struct {
	buf     bytes.Buffer // Staging buffer for encoded entries before a flush.
	dirPath string
	elog    trace.EventLog

	// guards our view of which files exist, which to be deleted, how many active iterators
	filesLock        sync.RWMutex
	filesMap         map[uint32]*logFile
	filesToBeDeleted []uint32
	// A refcount of iterators -- when this hits zero, we can delete the filesToBeDeleted.
	numActiveIterators int32

	kv                *DB
	maxFid            uint32 // Id of the file currently being written to.
	writableLogOffset uint32 // Offset in the current file where the next write lands.
	numEntriesWritten uint32
	opt               Options

	garbageCh      chan struct{} // Capacity-1 token channel: only one GC at a time.
	lfDiscardStats *lfDiscardStats
}
  537. func vlogFilePath(dirPath string, fid uint32) string {
  538. return fmt.Sprintf("%s%s%06d.vlog", dirPath, string(os.PathSeparator), fid)
  539. }
// fpath returns the on-disk path of the log file with the given fid.
func (vlog *valueLog) fpath(fid uint32) string {
	return vlogFilePath(vlog.dirPath, fid)
}
// openOrCreateFiles scans vlog.dirPath for *.vlog files and opens them:
// all but the newest read-only, the newest read-write (unless readOnly is
// set). If no files exist, it creates file 0.
func (vlog *valueLog) openOrCreateFiles(readOnly bool) error {
	files, err := ioutil.ReadDir(vlog.dirPath)
	if err != nil {
		return errors.Wrapf(err, "Error while opening value log")
	}

	found := make(map[uint64]struct{}) // Detects duplicate fids in the dir.
	var maxFid uint32                  // Beware len(files) == 0 case, this starts at 0.
	for _, file := range files {
		if !strings.HasSuffix(file.Name(), ".vlog") {
			continue
		}
		fsz := len(file.Name())
		// File names are "<fid>.vlog"; strip the 5-char suffix and parse.
		fid, err := strconv.ParseUint(file.Name()[:fsz-5], 10, 32)
		if err != nil {
			return errors.Wrapf(err, "Error while parsing value log id for file: %q", file.Name())
		}
		if _, ok := found[fid]; ok {
			return errors.Errorf("Found the same value log file twice: %d", fid)
		}
		found[fid] = struct{}{}

		lf := &logFile{
			fid:         uint32(fid),
			path:        vlog.fpath(uint32(fid)),
			loadingMode: vlog.opt.ValueLogLoadingMode,
		}
		vlog.filesMap[uint32(fid)] = lf
		if uint32(fid) > maxFid {
			maxFid = uint32(fid)
		}
	}
	vlog.maxFid = uint32(maxFid)

	// Open all previous log files as read only. Open the last log file
	// as read write (unless the DB is read only).
	for fid, lf := range vlog.filesMap {
		if fid == maxFid {
			var flags uint32
			if vlog.opt.SyncWrites {
				flags |= y.Sync
			}
			if readOnly {
				flags |= y.ReadOnly
			}
			if lf.fd, err = y.OpenExistingFile(vlog.fpath(fid), flags); err != nil {
				return errors.Wrapf(err, "Unable to open value log file")
			}
		} else {
			if err := lf.openReadOnly(); err != nil {
				return err
			}
		}
	}

	// If no files are found, then create a new file.
	if len(vlog.filesMap) == 0 {
		// We already set vlog.maxFid above
		_, err := vlog.createVlogFile(0)
		if err != nil {
			return err
		}
	}
	return nil
}
  604. func (vlog *valueLog) createVlogFile(fid uint32) (*logFile, error) {
  605. path := vlog.fpath(fid)
  606. lf := &logFile{fid: fid, path: path, loadingMode: vlog.opt.ValueLogLoadingMode}
  607. vlog.writableLogOffset = 0
  608. vlog.numEntriesWritten = 0
  609. var err error
  610. if lf.fd, err = y.CreateSyncedFile(path, vlog.opt.SyncWrites); err != nil {
  611. return nil, errors.Wrapf(err, "Unable to create value log file")
  612. }
  613. if err = syncDir(vlog.dirPath); err != nil {
  614. return nil, errors.Wrapf(err, "Unable to sync value log file dir")
  615. }
  616. vlog.filesLock.Lock()
  617. vlog.filesMap[fid] = lf
  618. vlog.filesLock.Unlock()
  619. return lf, nil
  620. }
  621. func (vlog *valueLog) Open(kv *DB, opt Options) error {
  622. vlog.dirPath = opt.ValueDir
  623. vlog.opt = opt
  624. vlog.kv = kv
  625. vlog.filesMap = make(map[uint32]*logFile)
  626. if err := vlog.openOrCreateFiles(kv.opt.ReadOnly); err != nil {
  627. return errors.Wrapf(err, "Unable to open value log")
  628. }
  629. vlog.elog = trace.NewEventLog("Badger", "Valuelog")
  630. vlog.garbageCh = make(chan struct{}, 1) // Only allow one GC at a time.
  631. vlog.lfDiscardStats = &lfDiscardStats{m: make(map[uint32]int64)}
  632. return nil
  633. }
// Close shuts down the value log: it unmaps and closes every log file,
// first truncating the current writable file to its valid offset (unless
// the DB is read-only). The first error encountered is returned, but every
// file is still processed.
func (vlog *valueLog) Close() error {
	vlog.elog.Printf("Stopping garbage collection of values.")
	defer vlog.elog.Finish()

	var err error
	for id, f := range vlog.filesMap {
		f.lock.Lock() // We won’t release the lock.
		if munmapErr := f.munmap(); munmapErr != nil && err == nil {
			err = munmapErr
		}
		if !vlog.opt.ReadOnly && id == vlog.maxFid {
			// truncate writable log file to correct offset.
			if truncErr := f.fd.Truncate(
				int64(vlog.writableLogOffset)); truncErr != nil && err == nil {
				err = truncErr
			}
		}
		if closeErr := f.fd.Close(); closeErr != nil && err == nil {
			err = closeErr
		}
	}
	return err
}
  656. // sortedFids returns the file id's not pending deletion, sorted. Assumes we have shared access to
  657. // filesMap.
  658. func (vlog *valueLog) sortedFids() []uint32 {
  659. toBeDeleted := make(map[uint32]struct{})
  660. for _, fid := range vlog.filesToBeDeleted {
  661. toBeDeleted[fid] = struct{}{}
  662. }
  663. ret := make([]uint32, 0, len(vlog.filesMap))
  664. for fid := range vlog.filesMap {
  665. if _, ok := toBeDeleted[fid]; !ok {
  666. ret = append(ret, fid)
  667. }
  668. }
  669. sort.Slice(ret, func(i, j int) bool {
  670. return ret[i] < ret[j]
  671. })
  672. return ret
  673. }
// Replay replays the value log. The kv provided is only valid for the lifetime of function call.
// It iterates every file with id >= ptr.Fid — starting just past ptr in that
// first file and at offset 0 in later ones — then positions the write offset
// at the end of the last file.
func (vlog *valueLog) Replay(ptr valuePointer, fn logEntry) error {
	fid := ptr.Fid
	offset := ptr.Offset + ptr.Len // First byte after the entry ptr points at.
	vlog.elog.Printf("Seeking at value pointer: %+v\n", ptr)
	log.Printf("Replaying from value pointer: %+v\n", ptr)

	fids := vlog.sortedFids()
	for _, id := range fids {
		if id < fid {
			continue
		}
		of := offset
		if id > fid {
			// Only the file containing ptr starts mid-way; later files replay fully.
			of = 0
		}
		f := vlog.filesMap[id]
		log.Printf("Iterating file id: %d", id)
		now := time.Now()
		err := vlog.iterate(f, of, fn)
		log.Printf("Iteration took: %s\n", time.Since(now))
		if err != nil {
			return errors.Wrapf(err, "Unable to replay value log: %q", f.path)
		}
	}

	// Seek to the end to start writing.
	var err error
	last := vlog.filesMap[vlog.maxFid]
	lastOffset, err := last.fd.Seek(0, io.SeekEnd)
	atomic.AddUint32(&vlog.writableLogOffset, uint32(lastOffset))
	return errors.Wrapf(err, "Unable to seek to end of value log: %q", last.path)
}
// request is a batch of entries to be appended to the value log, together
// with the resulting value pointers and completion signalling.
type request struct {
	// Input values
	Entries []*Entry
	// Output values and wait group stuff below
	Ptrs []valuePointer // One pointer per entry, filled in by write().
	Wg   sync.WaitGroup
	Err  error
}
// Wait blocks until the request has been processed, returns its error, and
// recycles the request into the pool. The request must not be used after
// Wait returns.
func (req *request) Wait() error {
	req.Wg.Wait()
	req.Entries = nil
	err := req.Err
	requestPool.Put(req)
	return err
}
  720. // sync is thread-unsafe and should not be called concurrently with write.
  721. func (vlog *valueLog) sync() error {
  722. if vlog.opt.SyncWrites {
  723. return nil
  724. }
  725. vlog.filesLock.RLock()
  726. if len(vlog.filesMap) == 0 {
  727. vlog.filesLock.RUnlock()
  728. return nil
  729. }
  730. curlf := vlog.filesMap[vlog.maxFid]
  731. curlf.lock.RLock()
  732. vlog.filesLock.RUnlock()
  733. dirSyncCh := make(chan error)
  734. go func() { dirSyncCh <- syncDir(vlog.opt.ValueDir) }()
  735. err := curlf.sync()
  736. curlf.lock.RUnlock()
  737. dirSyncErr := <-dirSyncCh
  738. if err != nil {
  739. err = dirSyncErr
  740. }
  741. return err
  742. }
// writableOffset atomically loads the offset in the current log file at
// which the next write will land.
func (vlog *valueLog) writableOffset() uint32 {
	return atomic.LoadUint32(&vlog.writableLogOffset)
}
// write is thread-unsafe by design and should not be called concurrently.
//
// It encodes every entry of every request into vlog.buf, recording one
// valuePointer per entry in req.Ptrs, and flushes the buffer to the current
// log file, rolling over to a new file when size/entry limits are hit.
// Flushes happen only at request boundaries so that all entries of a single
// transaction land in the same vlog file.
func (vlog *valueLog) write(reqs []*request) error {
	vlog.filesLock.RLock()
	curlf := vlog.filesMap[vlog.maxFid]
	vlog.filesLock.RUnlock()

	// toDisk flushes the staged buffer to curlf and, if the file is now over
	// its size or entry limit, finalizes it and switches to a fresh file.
	toDisk := func() error {
		if vlog.buf.Len() == 0 {
			return nil
		}
		vlog.elog.Printf("Flushing %d blocks of total size: %d", len(reqs), vlog.buf.Len())
		n, err := curlf.fd.Write(vlog.buf.Bytes())
		if err != nil {
			return errors.Wrapf(err, "Unable to write to value log file: %q", curlf.path)
		}
		y.NumWrites.Add(1)
		y.NumBytesWritten.Add(int64(n))
		vlog.elog.Printf("Done")
		atomic.AddUint32(&vlog.writableLogOffset, uint32(n))
		vlog.buf.Reset()

		if vlog.writableOffset() > uint32(vlog.opt.ValueLogFileSize) ||
			vlog.numEntriesWritten > vlog.opt.ValueLogMaxEntries {
			var err error
			if err = curlf.doneWriting(vlog.writableLogOffset); err != nil {
				return err
			}
			newid := atomic.AddUint32(&vlog.maxFid, 1)
			y.AssertTruef(newid > 0, "newid has overflown uint32: %v", newid)
			newlf, err := vlog.createVlogFile(newid)
			if err != nil {
				return err
			}
			// Map twice the target size so appends do not outgrow the mapping.
			if err = newlf.mmap(2 * vlog.opt.ValueLogFileSize); err != nil {
				return err
			}
			curlf = newlf
		}
		return nil
	}

	for i := range reqs {
		b := reqs[i]
		b.Ptrs = b.Ptrs[:0]
		for j := range b.Entries {
			e := b.Entries[j]
			var p valuePointer
			p.Fid = curlf.fid
			// Use the offset including buffer length so far.
			p.Offset = vlog.writableOffset() + uint32(vlog.buf.Len())
			plen, err := encodeEntry(e, &vlog.buf) // Now encode the entry into buffer.
			if err != nil {
				return err
			}
			p.Len = uint32(plen)
			b.Ptrs = append(b.Ptrs, p)
		}
		vlog.numEntriesWritten += uint32(len(b.Entries))
		// We write to disk here so that all entries that are part of the same transaction are
		// written to the same vlog file.
		writeNow :=
			vlog.writableOffset()+uint32(vlog.buf.Len()) > uint32(vlog.opt.ValueLogFileSize) ||
				vlog.numEntriesWritten > uint32(vlog.opt.ValueLogMaxEntries)
		if writeNow {
			if err := toDisk(); err != nil {
				return err
			}
		}
	}
	return toDisk()
}
  816. // Gets the logFile and acquires and RLock() for the mmap. You must call RUnlock on the file
  817. // (if non-nil)
  818. func (vlog *valueLog) getFileRLocked(fid uint32) (*logFile, error) {
  819. vlog.filesLock.RLock()
  820. defer vlog.filesLock.RUnlock()
  821. ret, ok := vlog.filesMap[fid]
  822. if !ok {
  823. // log file has gone away, will need to retry the operation.
  824. return nil, ErrRetry
  825. }
  826. ret.lock.RLock()
  827. return ret, nil
  828. }
// Read reads the value log at a given location.
// TODO: Make this read private.
// The returned callback (possibly nil) must be invoked once the caller is
// done with the buffer; it releases the read lock held on an mmapped file.
func (vlog *valueLog) Read(vp valuePointer, s *y.Slice) ([]byte, func(), error) {
	// Check for valid offset if we are reading to writable log.
	if vp.Fid == vlog.maxFid && vp.Offset >= vlog.writableOffset() {
		return nil, nil, errors.Errorf(
			"Invalid value pointer offset: %d greater than current offset: %d",
			vp.Offset, vlog.writableOffset())
	}

	buf, cb, err := vlog.readValueBytes(vp, s)
	if err != nil {
		return nil, cb, err
	}
	// Strip the header and key from the raw entry; return just the value.
	var h header
	h.Decode(buf)
	n := uint32(headerBufSize) + h.klen
	return buf[n : n+h.vlen], cb, nil
}
  847. func (vlog *valueLog) readValueBytes(vp valuePointer, s *y.Slice) ([]byte, func(), error) {
  848. lf, err := vlog.getFileRLocked(vp.Fid)
  849. if err != nil {
  850. return nil, nil, err
  851. }
  852. buf, err := lf.read(vp, s)
  853. if vlog.opt.ValueLogLoadingMode == options.MemoryMap {
  854. return buf, lf.lock.RUnlock, err
  855. }
  856. // If we are using File I/O we unlock the file immediately
  857. // and return an empty function as callback.
  858. lf.lock.RUnlock()
  859. return buf, nil, err
  860. }
  861. // Test helper
  862. func valueBytesToEntry(buf []byte) (e Entry) {
  863. var h header
  864. h.Decode(buf)
  865. n := uint32(headerBufSize)
  866. e.Key = buf[n : n+h.klen]
  867. n += h.klen
  868. e.meta = h.meta
  869. e.UserMeta = h.userMeta
  870. e.Value = buf[n : n+h.vlen]
  871. return
  872. }
// pickLog selects candidate log file(s) for GC: first a file chosen via
// discard statistics (if any), then a second file chosen at random, biased
// towards smaller (older) fids. Only files strictly before head.Fid are
// considered, since later files may still need to be replayed on restart.
func (vlog *valueLog) pickLog(head valuePointer, tr trace.Trace) (files []*logFile) {
	vlog.filesLock.RLock()
	defer vlog.filesLock.RUnlock()
	fids := vlog.sortedFids()
	if len(fids) <= 1 {
		// The single file is the active one, which is never GC'ed.
		tr.LazyPrintf("Only one or less value log file.")
		return nil
	} else if head.Fid == 0 {
		// No head pointer recorded yet; we cannot tell what is safe to GC.
		tr.LazyPrintf("Head pointer is at zero.")
		return nil
	}

	// Pick a candidate that contains the largest amount of discardable data
	candidate := struct {
		fid     uint32
		discard int64
	}{math.MaxUint32, 0}
	vlog.lfDiscardStats.Lock()
	for _, fid := range fids {
		if fid >= head.Fid {
			// fids is sorted, so everything from here on is at/after head.
			break
		}
		if vlog.lfDiscardStats.m[fid] > candidate.discard {
			candidate.fid = fid
			candidate.discard = vlog.lfDiscardStats.m[fid]
		}
	}
	vlog.lfDiscardStats.Unlock()

	if candidate.fid != math.MaxUint32 { // Found a candidate
		tr.LazyPrintf("Found candidate via discard stats: %v", candidate)
		files = append(files, vlog.filesMap[candidate.fid])
	} else {
		tr.LazyPrintf("Could not find candidate via discard stats. Randomly picking one.")
	}

	// Fallback to randomly picking a log file. Note this runs even when a
	// stats candidate was found above, appending a second file.
	var idxHead int
	for i, fid := range fids {
		if fid == head.Fid {
			idxHead = i
			break
		}
	}
	if idxHead == 0 { // Not found or first file
		// NOTE(review): this returns nil even when a stats candidate was
		// already appended to files above — confirm that is intended.
		tr.LazyPrintf("Could not find any file.")
		return nil
	}
	idx := rand.Intn(idxHead) // Don’t include head.Fid. We pick a random file before it.
	if idx > 0 {
		idx = rand.Intn(idx + 1) // Another level of rand to favor smaller fids.
	}
	tr.LazyPrintf("Randomly chose fid: %d", fids[idx])
	files = append(files, vlog.filesMap[fids[idx]])
	return files
}
  926. func discardEntry(e Entry, vs y.ValueStruct) bool {
  927. if vs.Version != y.ParseTs(e.Key) {
  928. // Version not found. Discard.
  929. return true
  930. }
  931. if isDeletedOrExpired(vs.Meta, vs.ExpiresAt) {
  932. return true
  933. }
  934. if (vs.Meta & bitValuePointer) == 0 {
  935. // Key also stores the value in LSM. Discard.
  936. return true
  937. }
  938. if (vs.Meta & bitFinTxn) > 0 {
  939. // Just a txn finish entry. Discard.
  940. return true
  941. }
  942. return false
  943. }
// doRunGC samples a random window (~10% of the file's size, ~1% of max
// entries) of lf, estimates the fraction of discardable data, and if that
// fraction meets discardRatio, rewrites the file's live entries so lf can
// be deleted. Returns ErrNoRewrite when the sample doesn't justify a rewrite.
func (vlog *valueLog) doRunGC(lf *logFile, discardRatio float64, tr trace.Trace) (err error) {
	// Update stats before exiting
	defer func() {
		if err == nil {
			// The file was rewritten, so its discard stats are now stale.
			vlog.lfDiscardStats.Lock()
			delete(vlog.lfDiscardStats.m, lf.fid)
			vlog.lfDiscardStats.Unlock()
		}
	}()

	// reason accumulates sampling results: MBs seen, MBs discardable, and
	// the number of entries inspected.
	type reason struct {
		total   float64
		discard float64
		count   int
	}

	fi, err := lf.fd.Stat()
	if err != nil {
		tr.LazyPrintf("Error while finding file size: %v", err)
		tr.SetError()
		return err
	}

	// Set up the sampling window sizes.
	sizeWindow := float64(fi.Size()) * 0.1                          // 10% of the file as window.
	countWindow := int(float64(vlog.opt.ValueLogMaxEntries) * 0.01) // 1% of num entries.
	tr.LazyPrintf("Size window: %5.2f. Count window: %d.", sizeWindow, countWindow)

	// Pick a random start point for the log.
	skipFirstM := float64(rand.Int63n(fi.Size())) // Pick a random starting location.
	skipFirstM -= sizeWindow                      // Avoid hitting EOF by moving back by window.
	skipFirstM /= float64(mi)                     // Convert to MBs.
	tr.LazyPrintf("Skip first %5.2f MB of file of size: %d MB", skipFirstM, fi.Size()/mi)
	var skipped float64

	var r reason
	start := time.Now()
	y.AssertTrue(vlog.kv != nil)
	s := new(y.Slice)
	var numIterations int
	err = vlog.iterate(lf, 0, func(e Entry, vp valuePointer) error {
		numIterations++
		esz := float64(vp.Len) / (1 << 20) // in MBs.
		// Skip entries until we reach the randomly chosen start point.
		if skipped < skipFirstM {
			skipped += esz
			return nil
		}

		// Sample until we reach the window sizes or exceed 10 seconds.
		if r.count > countWindow {
			tr.LazyPrintf("Stopping sampling after %d entries.", countWindow)
			return errStop
		}
		if r.total > sizeWindow {
			tr.LazyPrintf("Stopping sampling after reaching window size.")
			return errStop
		}
		if time.Since(start) > 10*time.Second {
			tr.LazyPrintf("Stopping sampling after 10 seconds.")
			return errStop
		}
		r.total += esz
		r.count++

		// Compare this log entry against the latest value in the LSM tree.
		vs, err := vlog.kv.get(e.Key)
		if err != nil {
			return err
		}
		if discardEntry(e, vs) {
			r.discard += esz
			return nil
		}

		// Value is still present in value log.
		y.AssertTrue(len(vs.Value) > 0)
		vp.Decode(vs.Value)

		if vp.Fid > lf.fid {
			// Value is present in a later log. Discard.
			r.discard += esz
			return nil
		}
		if vp.Offset > e.offset {
			// Value is present in a later offset, but in the same log.
			r.discard += esz
			return nil
		}
		if vp.Fid == lf.fid && vp.Offset == e.offset {
			// This is still the active entry. This would need to be rewritten.
		} else {
			// The LSM points to an earlier offset of this same file, which
			// should never happen — dump diagnostics and abort the sample.
			vlog.elog.Printf("Reason=%+v\n", r)

			buf, cb, err := vlog.readValueBytes(vp, s)
			if err != nil {
				return errStop
			}
			ne := valueBytesToEntry(buf)
			ne.offset = vp.Offset
			ne.print("Latest Entry Header in LSM")
			e.print("Latest Entry in Log")
			runCallback(cb)
			return errors.Errorf("This shouldn't happen. Latest Pointer:%+v. Meta:%v.",
				vp, vs.Meta)
		}
		return nil
	})

	if err != nil {
		tr.LazyPrintf("Error while iterating for RunGC: %v", err)
		tr.SetError()
		return err
	}
	tr.LazyPrintf("Fid: %d. Skipped: %5.2fMB Num iterations: %d. Data status=%+v\n",
		lf.fid, skipped, numIterations, r)

	// If we sampled fewer entries than the count window AND less than 75% of
	// the size window, or what we can discard is below the threshold, we
	// should skip the rewrite.
	if (r.count < countWindow && r.total < sizeWindow*0.75) || r.discard < discardRatio*r.total {
		tr.LazyPrintf("Skipping GC on fid: %d", lf.fid)
		return ErrNoRewrite
	}
	if err = vlog.rewrite(lf, tr); err != nil {
		return err
	}
	tr.LazyPrintf("Done rewriting.")
	return nil
}
// waitOnGC waits for lc to be closed, then blocks until any in-progress GC
// finishes and permanently prevents further GC runs by occupying the single
// slot of garbageCh. Used during shutdown.
func (vlog *valueLog) waitOnGC(lc *y.Closer) {
	defer lc.Done()

	<-lc.HasBeenClosed() // Wait for lc to be closed.

	// Block any GC in progress to finish, and don't allow any more writes to runGC by filling up
	// the channel of size 1.
	vlog.garbageCh <- struct{}{}
}
  1066. func (vlog *valueLog) runGC(discardRatio float64, head valuePointer) error {
  1067. select {
  1068. case vlog.garbageCh <- struct{}{}:
  1069. // Pick a log file for GC.
  1070. tr := trace.New("Badger.ValueLog", "GC")
  1071. tr.SetMaxEvents(100)
  1072. defer func() {
  1073. tr.Finish()
  1074. <-vlog.garbageCh
  1075. }()
  1076. var err error
  1077. files := vlog.pickLog(head, tr)
  1078. if len(files) == 0 {
  1079. tr.LazyPrintf("PickLog returned zero results.")
  1080. return ErrNoRewrite
  1081. }
  1082. tried := make(map[uint32]bool)
  1083. for _, lf := range files {
  1084. if _, done := tried[lf.fid]; done {
  1085. continue
  1086. }
  1087. tried[lf.fid] = true
  1088. err = vlog.doRunGC(lf, discardRatio, tr)
  1089. if err == nil {
  1090. vlog.deleteMoveKeysFor(lf.fid, tr)
  1091. return nil
  1092. }
  1093. }
  1094. return err
  1095. default:
  1096. return ErrRejected
  1097. }
  1098. }
  1099. func (vlog *valueLog) updateGCStats(stats map[uint32]int64) {
  1100. vlog.lfDiscardStats.Lock()
  1101. for fid, sz := range stats {
  1102. vlog.lfDiscardStats.m[fid] += sz
  1103. }
  1104. vlog.lfDiscardStats.Unlock()
  1105. }