implement BlockstoreMover in badger

vyzo 2021-07-11 14:33:15 +03:00
parent 5048c3f717
commit b741d61b20


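The BlockstoreMover interface being implemented is not itself part of this diff; judging from the MoveTo signature added below, it presumably looks roughly like the sketch that follows (an assumption, not the actual definition):

// Rough sketch of the BlockstoreMover interface implied by this commit; the
// real definition lives outside this diff, so treat the exact shape as assumed.
type BlockstoreMover interface {
    // MoveTo moves the blockstore to a new location, copying only the blocks
    // accepted by filter (a nil filter copies everything); an empty path lets
    // the implementation choose a default next to the current directory.
    MoveTo(path string, filter func(cid.Cid) bool) error
}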
@@ -8,6 +8,7 @@ import (
"path/filepath"
"runtime"
"sync"
"time"
"github.com/dgraph-io/badger/v2"
"github.com/dgraph-io/badger/v2/options"
@@ -35,6 +36,8 @@ var (
log = logger.Logger("badgerbs")
)
const moveBatchSize = 16384
// aliases to mask badger dependencies.
const (
// FileIO is equivalent to badger/options.FileIO.
@@ -80,13 +83,26 @@ const (
stateClosed
)
const (
// moveStateNone: no move in progress.
moveStateNone = iota
// moveStateMoving: a move is in progress; writes are mirrored to db2 while the copy runs.
moveStateMoving
// moveStateCleanup: the databases have been swapped (or the move aborted); the superseded db is being closed and deleted.
moveStateCleanup
// moveStateLock: the move state is transitioning; new lockDB holds wait until it settles.
moveStateLock
)
// Blockstore is a badger-backed IPLD blockstore.
type Blockstore struct {
stateLk sync.RWMutex
state int
viewers sync.WaitGroup
DB *badger.DB
moveMx sync.RWMutex // guards moveCond, moveState and rlock
moveCond *sync.Cond
moveState int
rlock int // number of active lockDB holds; a move waits for this to drain
db *badger.DB
db2 *badger.DB // when moving
opts Options
prefixing bool
@@ -113,13 +129,15 @@ func Open(opts Options) (*Blockstore, error) {
return nil, fmt.Errorf("failed to open badger blockstore: %w", err)
}
bs := &Blockstore{DB: db, opts: opts}
bs := &Blockstore{db: db, opts: opts}
if p := opts.Prefix; p != "" {
bs.prefixing = true
bs.prefix = []byte(p)
bs.prefixLen = len(bs.prefix)
}
bs.moveCond = sync.NewCond(&bs.moveMx)
return bs, nil
}
@@ -143,7 +161,7 @@ func (b *Blockstore) Close() error {
// wait for all accesses to complete
b.viewers.Wait()
return b.DB.Close()
return b.db.Close()
}
func (b *Blockstore) access() error {
@@ -165,26 +183,252 @@ func (b *Blockstore) isOpen() bool {
return b.state == stateOpen
}
// CollectGarbage runs garbage collection on the value log
// lockDB/unlockDB implement a recursive lock contingent on move state
func (b *Blockstore) lockDB() {
b.moveMx.Lock()
defer b.moveMx.Unlock()
if b.rlock == 0 {
for b.moveState == moveStateLock {
b.moveCond.Wait()
}
}
b.rlock++
}
func (b *Blockstore) unlockDB() {
b.moveMx.Lock()
defer b.moveMx.Unlock()
b.rlock--
if b.rlock == 0 && b.moveState == moveStateLock {
b.moveCond.Broadcast()
}
}
// lockMove/unlockMove implement an exclusive lock of move state
func (b *Blockstore) lockMove() {
b.moveMx.Lock()
b.moveState = moveStateLock
for b.rlock > 0 {
b.moveCond.Wait()
}
}
func (b *Blockstore) unlockMove(state int) {
b.moveState = state
b.moveCond.Broadcast()
b.moveMx.Unlock()
}
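The two lock pairs above divide the work: lockDB/unlockDB is the shared, recursive side that every accessor holds for the duration of one operation, while lockMove/unlockMove is the exclusive side MoveTo takes whenever it changes the move state. Every accessor changed further down follows the same pattern; schematically:

// Schematic accessor, mirroring the pattern used below by Put, Get, Has, View, etc.
func (b *Blockstore) exampleAccessor() error {
    if err := b.access(); err != nil { // fail fast if the store is closed
        return err
    }
    defer b.viewers.Done()

    b.lockDB()         // shared lock; blocks only while the move state is settling
    defer b.unlockDB() // releasing the last hold wakes a waiting lockMove

    // ... operate on b.db; writes are additionally mirrored to b.db2 while a move is in flight ...
    return nil
}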
// MoveTo implements the BlockstoreMover trait
func (b *Blockstore) MoveTo(path string, filter func(cid.Cid) bool) error {
if err := b.access(); err != nil {
return err
}
defer b.viewers.Done()
// this inlines lockMove/unlockMove for the initial state check to prevent a second move
// while one is in progress without clobbering state
b.moveMx.Lock()
if b.moveState != moveStateNone {
b.moveMx.Unlock()
return fmt.Errorf("move in progress")
}
b.moveState = moveStateLock
for b.rlock > 0 {
b.moveCond.Wait()
}
b.moveState = moveStateMoving
b.moveCond.Broadcast()
b.moveMx.Unlock()
if path == "" {
path = fmt.Sprintf("%s.%d", b.opts.Dir, time.Now().Unix())
}
defer func() {
b.lockMove()
db2 := b.db2
b.db2 = nil
var state int
if db2 != nil {
state = moveStateCleanup
} else {
state = moveStateNone
}
b.unlockMove(state)
if db2 != nil {
err := db2.Close()
if err != nil {
log.Warnf("error closing badger db: %s", err)
}
b.deleteDB(path)
b.lockMove()
b.unlockMove(moveStateNone)
}
}()
opts := b.opts
opts.Dir = path
opts.ValueDir = path
db2, err := badger.Open(opts.Options)
if err != nil {
return fmt.Errorf("failed to open badger blockstore in %s: %w", path, err)
}
b.lockMove()
b.db2 = db2
b.unlockMove(moveStateMoving)
err = b.doCopy(b.db, b.db2, filter)
if err != nil {
return fmt.Errorf("error moving badger blockstore to %s: %w", path, err)
}
b.lockMove()
db1 := b.db
b.db = b.db2
b.db2 = nil
b.unlockMove(moveStateCleanup)
err = db1.Close()
if err != nil {
log.Warnf("error closing badger db: %s", err)
}
dbpath := b.opts.Dir
oldpath := fmt.Sprintf("%s.old.%d", dbpath, time.Now().Unix())
ok := true
err = os.Rename(dbpath, oldpath)
if err != nil {
// this is bad, but not catastrophic; new data will be written in db2 and user can fix
log.Errorf("error renaming badger db dir from %s to %s; USER ACTION REQUIRED", dbpath, oldpath)
ok = false
}
if ok {
err = os.Symlink(path, dbpath)
if err != nil {
// ditto, this is bad, but not catastrophic; user can fix
log.Errorf("error symlinking badger db dir from %s to %s; USER ACTION REQUIRED", path, dbpath)
}
b.deleteDB(oldpath)
}
return nil
}
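A hypothetical caller of the new method might look like the following; the destination path and the keepBlock helper are illustrative only and not part of this change:

// Illustrative use of MoveTo: copy only the blocks the caller still wants into
// a directory on another disk; everything else is left behind and deleted with
// the old store. keepBlock is an assumed helper.
keep := func(c cid.Cid) bool {
    return keepBlock(c)
}
if err := bs.MoveTo("/mnt/nvme/chain.badger", keep); err != nil {
    log.Errorf("moving blockstore failed: %s", err)
}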
func (b *Blockstore) doCopy(from, to *badger.DB, filter func(cid.Cid) bool) error {
count := 0
batch := to.NewWriteBatch()
defer batch.Cancel()
txn := from.NewTransaction(false)
defer txn.Discard()
opts := badger.IteratorOptions{PrefetchSize: moveBatchSize}
iter := txn.NewIterator(opts)
defer iter.Close()
var buf []byte
for iter.Rewind(); iter.Valid(); iter.Next() {
if !b.isOpen() {
return ErrBlockstoreClosed
}
item := iter.Item()
if filter != nil {
k := item.Key()
if b.prefixing {
k = k[b.prefixLen:]
}
klen := base32.RawStdEncoding.DecodedLen(len(k))
if klen > len(buf) {
buf = make([]byte, klen)
}
n, err := base32.RawStdEncoding.Decode(buf, k)
if err != nil {
return err
}
c := cid.NewCidV1(cid.Raw, buf[:n])
if !filter(c) {
continue
}
}
k := item.KeyCopy(nil)
v, err := item.ValueCopy(nil)
if err != nil {
return err
}
if err := batch.Set(k, v); err != nil {
return err
}
count++
if count == moveBatchSize {
if err := batch.Flush(); err != nil {
return err
}
count = 0
}
}
if count > 0 {
return batch.Flush()
}
return nil
}
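The filter branch in doCopy rebuilds a CID from the raw badger key: after stripping the optional prefix, the key is the unpadded base32 (RawStdEncoding) form of the block's multihash, which is wrapped in a raw CIDv1. A standalone sketch of that decode step, under the same assumptions about the key encoding:

// keyToCid mirrors the decode logic inside doCopy; illustrative only.
func keyToCid(key, prefix []byte) (cid.Cid, error) {
    k := key[len(prefix):] // drop the configured prefix, if any
    buf := make([]byte, base32.RawStdEncoding.DecodedLen(len(k)))
    n, err := base32.RawStdEncoding.Decode(buf, k)
    if err != nil {
        return cid.Undef, err
    }
    return cid.NewCidV1(cid.Raw, buf[:n]), nil
}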
func (b *Blockstore) deleteDB(path string) {
err := os.RemoveAll(path)
if err != nil {
log.Warnf("error deleting db at %s: %s", path, err)
}
}
// CollectGarbage compacts and runs garbage collection on the value log;
// implements the BlockstoreGC trait
func (b *Blockstore) CollectGarbage() error {
if err := b.access(); err != nil {
return err
}
defer b.viewers.Done()
b.lockDB()
defer b.unlockDB()
// compact first to gather the necessary statistics for GC
nworkers := runtime.NumCPU() / 2
if nworkers < 2 {
nworkers = 2
}
err := b.DB.Flatten(nworkers)
err := b.db.Flatten(nworkers)
if err != nil {
return err
}
for err == nil {
err = b.DB.RunValueLogGC(0.125)
err = b.db.RunValueLogGC(0.125)
}
if err == badger.ErrNoRewrite {
@@ -202,7 +446,10 @@ func (b *Blockstore) Size() (int64, error) {
}
defer b.viewers.Done()
lsm, vlog := b.DB.Size()
b.lockDB()
defer b.unlockDB()
lsm, vlog := b.db.Size()
size := lsm + vlog
if size == 0 {
@@ -234,12 +481,15 @@ func (b *Blockstore) View(cid cid.Cid, fn func([]byte) error) error {
}
defer b.viewers.Done()
b.lockDB()
defer b.unlockDB()
k, pooled := b.PooledStorageKey(cid)
if pooled {
defer KeyPool.Put(k)
}
return b.DB.View(func(txn *badger.Txn) error {
return b.db.View(func(txn *badger.Txn) error {
switch item, err := txn.Get(k); err {
case nil:
return item.Value(fn)
@@ -258,12 +508,15 @@ func (b *Blockstore) Has(cid cid.Cid) (bool, error) {
}
defer b.viewers.Done()
b.lockDB()
defer b.unlockDB()
k, pooled := b.PooledStorageKey(cid)
if pooled {
defer KeyPool.Put(k)
}
err := b.DB.View(func(txn *badger.Txn) error {
err := b.db.View(func(txn *badger.Txn) error {
_, err := txn.Get(k)
return err
})
@@ -289,13 +542,16 @@ func (b *Blockstore) Get(cid cid.Cid) (blocks.Block, error) {
}
defer b.viewers.Done()
b.lockDB()
defer b.unlockDB()
k, pooled := b.PooledStorageKey(cid)
if pooled {
defer KeyPool.Put(k)
}
var val []byte
err := b.DB.View(func(txn *badger.Txn) error {
err := b.db.View(func(txn *badger.Txn) error {
switch item, err := txn.Get(k); err {
case nil:
val, err = item.ValueCopy(nil)
@@ -319,13 +575,16 @@ func (b *Blockstore) GetSize(cid cid.Cid) (int, error) {
}
defer b.viewers.Done()
b.lockDB()
defer b.unlockDB()
k, pooled := b.PooledStorageKey(cid)
if pooled {
defer KeyPool.Put(k)
}
var size int
err := b.DB.View(func(txn *badger.Txn) error {
err := b.db.View(func(txn *badger.Txn) error {
switch item, err := txn.Get(k); err {
case nil:
size = int(item.ValueSize())
@@ -349,18 +608,36 @@ func (b *Blockstore) Put(block blocks.Block) error {
}
defer b.viewers.Done()
b.lockDB()
defer b.unlockDB()
k, pooled := b.PooledStorageKey(block.Cid())
if pooled {
defer KeyPool.Put(k)
}
err := b.DB.Update(func(txn *badger.Txn) error {
return txn.Set(k, block.RawData())
})
if err != nil {
err = fmt.Errorf("failed to put block in badger blockstore: %w", err)
put := func(db *badger.DB) error {
err := db.Update(func(txn *badger.Txn) error {
return txn.Set(k, block.RawData())
})
if err != nil {
return fmt.Errorf("failed to put block in badger blockstore: %w", err)
}
return nil
}
return err
if err := put(b.db); err != nil {
return err
}
if b.db2 != nil {
if err := put(b.db2); err != nil {
return err
}
}
return nil
}
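The extra write to db2 above is what keeps a concurrent move consistent: once MoveTo has installed db2, every block written while doCopy is still running lands in both databases, so nothing is lost when b.db is swapped. A hypothetical test sketch of that invariant (newTestBlockstore and randomBlock are assumed helpers, not part of this diff):

// Hypothetical test: a block put while a move is in flight must still be
// visible once the move has completed.
func TestPutDuringMove(t *testing.T) {
    bs := newTestBlockstore(t)

    done := make(chan error, 1)
    go func() { done <- bs.MoveTo("", nil) }()

    blk := randomBlock()
    if err := bs.Put(blk); err != nil {
        t.Fatal(err)
    }
    if err := <-done; err != nil {
        t.Fatal(err)
    }
    if has, err := bs.Has(blk.Cid()); err != nil || !has {
        t.Fatalf("block written during move is missing afterwards (has=%v, err=%v)", has, err)
    }
}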
// PutMany implements Blockstore.PutMany.
@@ -370,6 +647,9 @@ func (b *Blockstore) PutMany(blocks []blocks.Block) error {
}
defer b.viewers.Done()
b.lockDB()
defer b.unlockDB()
// toReturn tracks the byte slices to return to the pool, if we're using key
// prefixing. we can't return each slice to the pool after each Set, because
// badger holds on to the slice.
@@ -383,24 +663,45 @@ func (b *Blockstore) PutMany(blocks []blocks.Block) error {
}()
}
batch := b.DB.NewWriteBatch()
defer batch.Cancel()
keys := make([][]byte, 0, len(blocks))
for _, block := range blocks {
k, pooled := b.PooledStorageKey(block.Cid())
if pooled {
toReturn = append(toReturn, k)
}
if err := batch.Set(k, block.RawData()); err != nil {
keys = append(keys, k)
}
put := func(db *badger.DB) error {
batch := db.NewWriteBatch()
defer batch.Cancel()
for i, block := range blocks {
k := keys[i]
if err := batch.Set(k, block.RawData()); err != nil {
return err
}
}
err := batch.Flush()
if err != nil {
return fmt.Errorf("failed to put blocks in badger blockstore: %w", err)
}
return nil
}
if err := put(b.db); err != nil {
return err
}
if b.db2 != nil {
if err := put(b.db2); err != nil {
return err
}
}
err := batch.Flush()
if err != nil {
err = fmt.Errorf("failed to put blocks in badger blockstore: %w", err)
}
return err
return nil
}
// DeleteBlock implements Blockstore.DeleteBlock.
@@ -410,12 +711,15 @@ func (b *Blockstore) DeleteBlock(cid cid.Cid) error {
}
defer b.viewers.Done()
b.lockDB()
defer b.unlockDB()
k, pooled := b.PooledStorageKey(cid)
if pooled {
defer KeyPool.Put(k)
}
return b.DB.Update(func(txn *badger.Txn) error {
return b.db.Update(func(txn *badger.Txn) error {
return txn.Delete(k)
})
}
@@ -426,6 +730,9 @@ func (b *Blockstore) DeleteMany(cids []cid.Cid) error {
}
defer b.viewers.Done()
b.lockDB()
defer b.unlockDB()
// toReturn tracks the byte slices to return to the pool, if we're using key
// prefixing. we can't return each slice to the pool after each Set, because
// badger holds on to the slice.
@@ -439,7 +746,7 @@ func (b *Blockstore) DeleteMany(cids []cid.Cid) error {
}()
}
batch := b.DB.NewWriteBatch()
batch := b.db.NewWriteBatch()
defer batch.Cancel()
for _, cid := range cids {
@@ -465,7 +772,10 @@ func (b *Blockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
return nil, err
}
txn := b.DB.NewTransaction(false)
b.lockDB()
defer b.unlockDB()
txn := b.db.NewTransaction(false)
opts := badger.IteratorOptions{PrefetchSize: 100}
if b.prefixing {
opts.Prefix = b.prefix
@@ -519,7 +829,10 @@ func (b *Blockstore) ForEachKey(f func(cid.Cid) error) error {
}
defer b.viewers.Done()
txn := b.DB.NewTransaction(false)
b.lockDB()
defer b.unlockDB()
txn := b.db.NewTransaction(false)
defer txn.Discard()
opts := badger.IteratorOptions{PrefetchSize: 100}