lotus/blockstore/badger/blockstore.go

1051 lines
24 KiB
Go
Raw Normal View History

package badgerbs
import (
"context"
"fmt"
"io"
"os"
"path/filepath"
2021-03-11 09:45:05 +00:00
"runtime"
2021-07-09 06:54:12 +00:00
"sync"
2021-07-11 11:33:15 +00:00
"time"
"github.com/dgraph-io/badger/v2"
2020-11-01 13:01:38 +00:00
"github.com/dgraph-io/badger/v2/options"
"github.com/dgraph-io/badger/v2/pb"
"github.com/ipfs/go-cid"
ipld "github.com/ipfs/go-ipld-format"
blocks "github.com/ipfs/go-libipfs/blocks"
logger "github.com/ipfs/go-log/v2"
pool "github.com/libp2p/go-buffer-pool"
2022-06-14 15:00:51 +00:00
"github.com/multiformats/go-base32"
"go.uber.org/zap"
2023-03-03 16:14:52 +00:00
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/blockstore"
)
// KeyPool is the buffer pool from which storage-key byte slices are drawn and
// to which they are returned. It is initialised to pool.GlobalPool.
var KeyPool *pool.BufferPool = pool.GlobalPool
var (
2020-11-01 13:01:38 +00:00
// ErrBlockstoreClosed is returned from blockstore operations after
// the blockstore has been closed.
ErrBlockstoreClosed = fmt.Errorf("badger blockstore closed")
log = logger.Logger("badgerbs")
)
2020-11-01 13:01:38 +00:00
// aliases to mask badger dependencies.
const (
2020-11-06 18:55:13 +00:00
// FileIO is equivalent to badger/options.FileIO.
2020-11-01 13:01:38 +00:00
FileIO = options.FileIO
2020-11-06 18:55:13 +00:00
// MemoryMap is equivalent to badger/options.MemoryMap.
2020-11-01 13:01:38 +00:00
MemoryMap = options.MemoryMap
2020-11-06 18:55:13 +00:00
// LoadToRAM is equivalent to badger/options.LoadToRAM.
2023-03-03 16:14:52 +00:00
LoadToRAM = options.LoadToRAM
defaultGCThreshold = 0.125
2020-11-01 13:01:38 +00:00
)
2020-11-06 18:55:13 +00:00
// Options is the configuration accepted by Open. It carries the complete set
// of badger options (embedded) plus the settings specific to this blockstore.
type Options struct {
	badger.Options

	// Prefix, when non-empty, is prepended to every storage key.
	// Default: "".
	Prefix string
}
// DefaultOptions returns blockstore Options built from badger's default
// options for the given path, with an empty key prefix.
func DefaultOptions(path string) Options {
	var opts Options
	opts.Options = badger.DefaultOptions(path)
	opts.Prefix = ""
	return opts
}
// badgerLogger adapts a go-log (zap) sugared logger to the badger.Logger
// interface; the only gap is that badger calls Warningf where zap offers
// Warnf.
type badgerLogger struct {
	// Embedded logger; skips 1 caller to get useful line info, skipping over
	// badger.Options.
	*zap.SugaredLogger

	// skip2 skips 2 callers: the one above plus this wrapper's own method.
	skip2 *zap.SugaredLogger
}
// Warningf satisfies the badger logger API by forwarding to Warnf on the
// logger that skips two call frames.
func (b *badgerLogger) Warningf(format string, args ...interface{}) {
	target := b.skip2
	target.Warnf(format, args...)
}
2021-07-27 09:03:26 +00:00
// bsState is the current blockstore state
type bsState int

const (
	// stateOpen signifies an open blockstore
	stateOpen bsState = iota
	// stateClosing signifies a blockstore that is currently closing
	stateClosing
	// stateClosed signifies a blockstore that has been closed
	stateClosed
)
2021-07-27 09:03:26 +00:00
// bsMoveState tracks the progress of a moving GC (see movingGC) and whether
// the exclusive move lock is currently held.
type bsMoveState int

const (
	// moveStateNone signifies that there is no move in progress
	moveStateNone bsMoveState = iota
	// moveStateMoving signifies that there is a move in progress
	moveStateMoving
	// moveStateCleanup signifies that a move has completed or aborted and we are cleaning up
	moveStateCleanup
	// moveStateLock signifies that an exclusive lock has been acquired
	moveStateLock
)
// Blockstore is a badger-backed IPLD blockstore.
type Blockstore struct {
2021-07-09 06:54:12 +00:00
stateLk sync.RWMutex
2021-07-27 09:03:26 +00:00
state bsState
2021-07-09 06:54:12 +00:00
viewers sync.WaitGroup
moveMx sync.Mutex
moveCond sync.Cond
2021-07-27 09:03:26 +00:00
moveState bsMoveState
2021-07-11 11:33:15 +00:00
rlock int
2021-07-27 09:06:40 +00:00
db *badger.DB
dbNext *badger.DB // when moving
opts Options
prefixing bool
prefix []byte
prefixLen int
}
var _ blockstore.Blockstore = (*Blockstore)(nil)
var _ blockstore.Viewer = (*Blockstore)(nil)
var _ blockstore.BlockstoreIterator = (*Blockstore)(nil)
var _ blockstore.BlockstoreGC = (*Blockstore)(nil)
2021-07-26 05:43:09 +00:00
var _ blockstore.BlockstoreSize = (*Blockstore)(nil)
var _ io.Closer = (*Blockstore)(nil)
2020-11-06 18:55:13 +00:00
// Open creates a new badger-backed blockstore, with the supplied options.
func Open(opts Options) (*Blockstore, error) {
opts.Logger = &badgerLogger{
SugaredLogger: log.Desugar().WithOptions(zap.AddCallerSkip(1)).Sugar(),
skip2: log.Desugar().WithOptions(zap.AddCallerSkip(2)).Sugar(),
}
db, err := badger.Open(opts.Options)
if err != nil {
return nil, fmt.Errorf("failed to open badger blockstore: %w", err)
}
2021-07-11 11:33:15 +00:00
bs := &Blockstore{db: db, opts: opts}
if p := opts.Prefix; p != "" {
bs.prefixing = true
bs.prefix = []byte(p)
bs.prefixLen = len(bs.prefix)
}
bs.moveCond.L = &bs.moveMx
2021-07-11 11:33:15 +00:00
return bs, nil
}
2020-11-06 18:55:13 +00:00
// Close closes the store. If the store has already been closed, this noops and
// returns an error, even if the first closure resulted in error.
func (b *Blockstore) Close() error {
2021-07-09 06:54:12 +00:00
b.stateLk.Lock()
if b.state != stateOpen {
b.stateLk.Unlock()
return nil
}
2021-07-09 06:54:12 +00:00
b.state = stateClosing
b.stateLk.Unlock()
defer func() {
b.stateLk.Lock()
b.state = stateClosed
b.stateLk.Unlock()
}()
// wait for all accesses to complete
b.viewers.Wait()
2021-07-11 11:33:15 +00:00
return b.db.Close()
}
2021-07-09 06:54:12 +00:00
// access admits one operation onto the blockstore. If the store is open it
// registers the caller in the viewers wait group and returns nil; the caller
// must then call b.viewers.Done() when finished. Otherwise it returns
// ErrBlockstoreClosed and registers nothing.
func (b *Blockstore) access() error {
	b.stateLk.RLock()
	defer b.stateLk.RUnlock()

	if b.state == stateOpen {
		b.viewers.Add(1)
		return nil
	}
	return ErrBlockstoreClosed
}
// isOpen reports whether the blockstore state is stateOpen, reading it under
// the state read lock.
func (b *Blockstore) isOpen() bool {
	b.stateLk.RLock()
	open := b.state == stateOpen
	b.stateLk.RUnlock()
	return open
}
// lockDB/unlockDB implement a recursive lock contingent on move state
2021-07-11 11:33:15 +00:00
func (b *Blockstore) lockDB() {
b.moveMx.Lock()
defer b.moveMx.Unlock()
if b.rlock == 0 {
for b.moveState == moveStateLock {
b.moveCond.Wait()
}
}
b.rlock++
}
// unlockDB drops one holder registered by lockDB. When the last holder leaves
// while an exclusive move lock is pending, the waiters on the move condition
// are woken.
func (b *Blockstore) unlockDB() {
	b.moveMx.Lock()
	defer b.moveMx.Unlock()

	b.rlock--
	if b.rlock != 0 || b.moveState != moveStateLock {
		return
	}
	b.moveCond.Broadcast()
}
// lockMove/unlockMove implement an exclusive lock of move state.
//
// lockMove takes the move mutex, sets the move state to moveStateLock and
// waits until no lockDB holders remain. It returns with the move mutex still
// held; unlockMove releases it.
func (b *Blockstore) lockMove() {
	b.moveMx.Lock()
	b.moveState = moveStateLock
	for {
		if b.rlock <= 0 {
			return
		}
		b.moveCond.Wait()
	}
}
2021-07-27 09:03:26 +00:00
func (b *Blockstore) unlockMove(state bsMoveState) {
2021-07-11 11:33:15 +00:00
b.moveState = state
b.moveCond.Broadcast()
b.moveMx.Unlock()
}
// movingGC moves the blockstore to a new path, adjacent to the current path, and creates
// a symlink from the current path to the new path; the old blockstore is deleted.
//
// The blockstore MUST accept new writes during the move and ensure that these
2021-07-23 19:25:32 +00:00
// are persisted to the new blockstore; if a failure occurs aboring the move,
// then they must be peristed to the old blockstore.
// In short, the blockstore must not lose data from new writes during the move.
func (b *Blockstore) movingGC() error {
2021-07-11 11:33:15 +00:00
// this inlines moveLock/moveUnlock for the initial state check to prevent a second move
// while one is in progress without clobbering state
b.moveMx.Lock()
if b.moveState != moveStateNone {
b.moveMx.Unlock()
return fmt.Errorf("move in progress")
}
b.moveState = moveStateLock
for b.rlock > 0 {
b.moveCond.Wait()
}
b.moveState = moveStateMoving
b.moveCond.Broadcast()
b.moveMx.Unlock()
var newPath string
2021-07-11 11:33:15 +00:00
defer func() {
b.lockMove()
dbNext := b.dbNext
2021-07-27 09:06:40 +00:00
b.dbNext = nil
2021-07-11 11:33:15 +00:00
2021-07-27 09:03:26 +00:00
var state bsMoveState
if dbNext != nil {
2021-07-11 11:33:15 +00:00
state = moveStateCleanup
} else {
state = moveStateNone
}
b.unlockMove(state)
if dbNext != nil {
// the move failed and we have a left-over db; delete it.
err := dbNext.Close()
2021-07-11 11:33:15 +00:00
if err != nil {
log.Warnf("error closing badger db: %s", err)
}
b.deleteDB(newPath)
2021-07-11 11:33:15 +00:00
b.lockMove()
b.unlockMove(moveStateNone)
}
}()
// we resolve symlinks to create the new path in the adjacent to the old path.
// this allows the user to symlink the db directory into a separate filesystem.
basePath := b.opts.Dir
linkPath, err := filepath.EvalSymlinks(basePath)
if err != nil {
return fmt.Errorf("error resolving symlink %s: %w", basePath, err)
}
if basePath == linkPath {
newPath = basePath
} else {
// we do this dance to create a name adjacent to the current one, while avoiding clown
// shoes with multiple moves (i.e. we can't just take the basename of the linkPath, as it
// could have been created in a previous move and have the timestamp suffix, which would then
// perpetuate itself.
name := filepath.Base(basePath)
dir := filepath.Dir(linkPath)
newPath = filepath.Join(dir, name)
}
newPath = fmt.Sprintf("%s.%d", newPath, time.Now().UnixNano())
log.Infof("moving blockstore from %s to %s", b.opts.Dir, newPath)
2021-07-12 09:06:44 +00:00
2021-07-11 11:33:15 +00:00
opts := b.opts
opts.Dir = newPath
opts.ValueDir = newPath
2021-07-11 11:33:15 +00:00
dbNew, err := badger.Open(opts.Options)
2021-07-11 11:33:15 +00:00
if err != nil {
return fmt.Errorf("failed to open badger blockstore in %s: %w", newPath, err)
2021-07-11 11:33:15 +00:00
}
b.lockMove()
b.dbNext = dbNew
2021-07-11 11:33:15 +00:00
b.unlockMove(moveStateMoving)
2021-07-12 09:06:44 +00:00
log.Info("copying blockstore")
2021-07-27 09:06:40 +00:00
err = b.doCopy(b.db, b.dbNext)
2021-07-11 11:33:15 +00:00
if err != nil {
return fmt.Errorf("error moving badger blockstore to %s: %w", newPath, err)
2021-07-11 11:33:15 +00:00
}
b.lockMove()
dbOld := b.db
2021-07-27 09:06:40 +00:00
b.db = b.dbNext
b.dbNext = nil
2021-07-11 11:33:15 +00:00
b.unlockMove(moveStateCleanup)
err = dbOld.Close()
2021-07-11 11:33:15 +00:00
if err != nil {
log.Warnf("error closing old badger db: %s", err)
2021-07-11 11:33:15 +00:00
}
// this is the canonical db path; this is where our db lives.
dbPath := b.opts.Dir
2021-07-11 11:33:15 +00:00
// we first move the existing db out of the way, and only delete it after we have symlinked the
// new db to the canonical path
backupPath := fmt.Sprintf("%s.old.%d", dbPath, time.Now().Unix())
if err = os.Rename(dbPath, backupPath); err != nil {
// this is not catastrophic in the sense that we have not lost any data.
// but it is pretty bad, as the db path points to the old db, while we are now using to the new
// db; we can't continue and leave a ticking bomb for the next restart.
// so a panic is appropriate and user can fix.
panic(fmt.Errorf("error renaming old badger db dir from %s to %s: %w; USER ACTION REQUIRED", dbPath, backupPath, err)) //nolint
2021-07-11 11:33:15 +00:00
}
2021-07-29 05:35:53 +00:00
if err = symlink(newPath, dbPath); err != nil {
// same here; the db path is pointing to the void. panic and let the user fix.
panic(fmt.Errorf("error symlinking new badger db dir from %s to %s: %w; USER ACTION REQUIRED", newPath, dbPath, err)) //nolint
2021-07-11 11:33:15 +00:00
}
// the delete follows symlinks
b.deleteDB(backupPath)
2021-07-12 09:06:44 +00:00
log.Info("moving blockstore done")
2021-07-11 11:33:15 +00:00
return nil
}
// symlink creates a symbolic link at linkTo that points to path. If path and
// linkTo reside in the same directory (after resolving symlinks in their
// parent directories), the link target is made relative, i.e. only the base
// name of path is stored in the link.
//
// An error is returned if either parent directory cannot be resolved or if
// the link cannot be created.
func symlink(path, linkTo string) error {
	resolvedPathDir, err := filepath.EvalSymlinks(filepath.Dir(path))
	if err != nil {
		return fmt.Errorf("error resolving links in %s: %w", path, err)
	}

	resolvedLinkDir, err := filepath.EvalSymlinks(filepath.Dir(linkTo))
	if err != nil {
		return fmt.Errorf("error resolving links in %s: %w", linkTo, err)
	}

	if resolvedPathDir == resolvedLinkDir {
		path = filepath.Base(path)
	}

	return os.Symlink(path, linkTo)
}
2021-07-23 19:25:32 +00:00
// doCopy copies a badger blockstore to another, with an optional filter; if the filter
// is not nil, then only cids that satisfy the filter will be copied.
func (b *Blockstore) doCopy(from, to *badger.DB) error {
workers := runtime.NumCPU() / 2
if workers < 2 {
workers = 2
2021-07-11 11:43:52 +00:00
}
if workers > 8 {
workers = 8
}
2021-07-11 11:33:15 +00:00
stream := from.NewStream()
stream.NumGo = workers
stream.LogPrefix = "doCopy"
stream.Send = func(list *pb.KVList) error {
batch := to.NewWriteBatch()
defer batch.Cancel()
2021-07-11 11:43:52 +00:00
for _, kv := range list.Kv {
if kv.Key == nil || kv.Value == nil {
continue
}
if err := batch.Set(kv.Key, kv.Value); err != nil {
2021-07-11 11:33:15 +00:00
return err
}
}
return batch.Flush()
}
return stream.Orchestrate(context.Background())
2021-07-11 11:33:15 +00:00
}
func (b *Blockstore) deleteDB(path string) {
// follow symbolic links, otherwise the data wil be left behind
linkPath, err := filepath.EvalSymlinks(path)
2021-07-27 07:08:07 +00:00
if err != nil {
log.Warnf("error resolving symlinks in %s", path)
return
}
log.Infof("removing data directory %s", linkPath)
if err := os.RemoveAll(linkPath); err != nil {
log.Warnf("error deleting db at %s: %s", linkPath, err)
return
}
if path != linkPath {
2021-07-12 09:06:44 +00:00
log.Infof("removing link %s", path)
if err := os.Remove(path); err != nil {
log.Warnf("error removing symbolic link %s", err)
}
2021-07-11 11:33:15 +00:00
}
}
2023-03-03 16:14:52 +00:00
func (b *Blockstore) onlineGC(ctx context.Context, threshold float64) error {
2021-07-11 11:33:15 +00:00
b.lockDB()
defer b.unlockDB()
// compact first to gather the necessary statistics for GC
nworkers := runtime.NumCPU() / 2
if nworkers < 2 {
nworkers = 2
}
2023-03-03 16:14:52 +00:00
if nworkers > 7 { // max out at 1 goroutine per badger level
nworkers = 7
}
2021-07-11 11:33:15 +00:00
err := b.db.Flatten(nworkers)
if err != nil {
return err
}
for err == nil {
2023-03-03 16:14:52 +00:00
select {
case <-ctx.Done():
err = ctx.Err()
default:
err = b.db.RunValueLogGC(threshold)
}
2021-03-11 09:45:05 +00:00
}
if err == badger.ErrNoRewrite {
// not really an error in this case, it signals the end of GC
return nil
2021-03-11 09:45:05 +00:00
}
return err
2021-03-11 09:45:05 +00:00
}
// CollectGarbage compacts and runs garbage collection on the value log;
// implements the BlockstoreGC trait
2023-03-03 16:14:52 +00:00
func (b *Blockstore) CollectGarbage(ctx context.Context, opts ...blockstore.BlockstoreGCOption) error {
if err := b.access(); err != nil {
return err
}
defer b.viewers.Done()
var options blockstore.BlockstoreGCOptions
for _, opt := range opts {
err := opt(&options)
if err != nil {
return err
}
}
if options.FullGC {
return b.movingGC()
}
2023-03-03 16:14:52 +00:00
threshold := options.Threshold
if threshold == 0 {
threshold = defaultGCThreshold
}
return b.onlineGC(ctx, threshold)
}
// GCOnce runs garbage collection on the value log;
// implements BlockstoreGCOnce trait
func (b *Blockstore) GCOnce(ctx context.Context, opts ...blockstore.BlockstoreGCOption) error {
if err := b.access(); err != nil {
return err
}
defer b.viewers.Done()
var options blockstore.BlockstoreGCOptions
for _, opt := range opts {
err := opt(&options)
if err != nil {
return err
}
}
if options.FullGC {
return xerrors.Errorf("FullGC option specified for GCOnce but full GC is non incremental")
}
2023-03-03 16:14:52 +00:00
threshold := options.Threshold
if threshold == 0 {
threshold = defaultGCThreshold
}
b.lockDB()
defer b.unlockDB()
// Note no compaction needed before single GC as we will hit at most one vlog anyway
err := b.db.RunValueLogGC(threshold)
if err == badger.ErrNoRewrite {
// not really an error in this case, it signals the end of GC
return nil
}
return err
}
2021-07-26 05:43:09 +00:00
// Size returns the aggregate size of the blockstore
func (b *Blockstore) Size() (int64, error) {
if err := b.access(); err != nil {
return 0, err
}
defer b.viewers.Done()
2021-07-11 11:33:15 +00:00
b.lockDB()
defer b.unlockDB()
lsm, vlog := b.db.Size()
size := lsm + vlog
if size == 0 {
// badger reports a 0 size on symlinked directories... sigh
dir := b.opts.Dir
entries, err := os.ReadDir(dir)
if err != nil {
return 0, err
}
for _, e := range entries {
path := filepath.Join(dir, e.Name())
finfo, err := os.Stat(path)
if err != nil {
return 0, err
}
size += finfo.Size()
}
}
return size, nil
2021-07-26 05:43:09 +00:00
}
2020-11-06 18:55:13 +00:00
// View implements blockstore.Viewer, which leverages zero-copy read-only
// access to values.
func (b *Blockstore) View(ctx context.Context, cid cid.Cid, fn func([]byte) error) error {
2021-07-09 06:54:12 +00:00
if err := b.access(); err != nil {
return err
}
2021-07-09 06:54:12 +00:00
defer b.viewers.Done()
2021-07-11 11:33:15 +00:00
b.lockDB()
defer b.unlockDB()
k, pooled := b.PooledStorageKey(cid)
if pooled {
defer KeyPool.Put(k)
}
2021-07-11 11:33:15 +00:00
return b.db.View(func(txn *badger.Txn) error {
switch item, err := txn.Get(k); err {
case nil:
return item.Value(fn)
case badger.ErrKeyNotFound:
return ipld.ErrNotFound{Cid: cid}
default:
return fmt.Errorf("failed to view block from badger blockstore: %w", err)
}
})
}
// Flush syncs the underlying badger database and returns the result of that
// sync. It returns ErrBlockstoreClosed if the store is not open.
func (b *Blockstore) Flush(context.Context) error {
	err := b.access()
	if err != nil {
		return err
	}
	defer b.viewers.Done()

	b.lockDB()
	defer b.unlockDB()

	err = b.db.Sync()
	return err
}
2020-11-06 18:55:13 +00:00
// Has implements Blockstore.Has.
func (b *Blockstore) Has(ctx context.Context, cid cid.Cid) (bool, error) {
2021-07-09 06:54:12 +00:00
if err := b.access(); err != nil {
return false, err
}
2021-07-09 06:54:12 +00:00
defer b.viewers.Done()
2021-07-11 11:33:15 +00:00
b.lockDB()
defer b.unlockDB()
k, pooled := b.PooledStorageKey(cid)
if pooled {
defer KeyPool.Put(k)
}
2021-07-11 11:33:15 +00:00
err := b.db.View(func(txn *badger.Txn) error {
_, err := txn.Get(k)
return err
})
switch err {
case badger.ErrKeyNotFound:
return false, nil
case nil:
return true, nil
default:
return false, fmt.Errorf("failed to check if block exists in badger blockstore: %w", err)
}
}
2020-11-06 18:55:13 +00:00
// Get implements Blockstore.Get.
func (b *Blockstore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error) {
if !cid.Defined() {
return nil, ipld.ErrNotFound{Cid: cid}
}
2021-07-09 06:54:12 +00:00
if err := b.access(); err != nil {
return nil, err
}
2021-07-09 06:54:12 +00:00
defer b.viewers.Done()
2021-07-11 11:33:15 +00:00
b.lockDB()
defer b.unlockDB()
k, pooled := b.PooledStorageKey(cid)
if pooled {
defer KeyPool.Put(k)
}
var val []byte
2021-07-11 11:33:15 +00:00
err := b.db.View(func(txn *badger.Txn) error {
switch item, err := txn.Get(k); err {
case nil:
val, err = item.ValueCopy(nil)
return err
case badger.ErrKeyNotFound:
return ipld.ErrNotFound{Cid: cid}
default:
return fmt.Errorf("failed to get block from badger blockstore: %w", err)
}
})
if err != nil {
return nil, err
}
return blocks.NewBlockWithCid(val, cid)
}
2020-11-06 18:55:13 +00:00
// GetSize implements Blockstore.GetSize.
func (b *Blockstore) GetSize(ctx context.Context, cid cid.Cid) (int, error) {
2021-07-09 06:54:12 +00:00
if err := b.access(); err != nil {
return 0, err
}
2021-07-09 06:54:12 +00:00
defer b.viewers.Done()
2021-07-11 11:33:15 +00:00
b.lockDB()
defer b.unlockDB()
k, pooled := b.PooledStorageKey(cid)
if pooled {
defer KeyPool.Put(k)
}
var size int
2021-07-11 11:33:15 +00:00
err := b.db.View(func(txn *badger.Txn) error {
switch item, err := txn.Get(k); err {
case nil:
size = int(item.ValueSize())
case badger.ErrKeyNotFound:
return ipld.ErrNotFound{Cid: cid}
default:
return fmt.Errorf("failed to get block size from badger blockstore: %w", err)
}
return nil
})
if err != nil {
size = -1
}
return size, err
}
2020-11-06 18:55:13 +00:00
// Put implements Blockstore.Put.
func (b *Blockstore) Put(ctx context.Context, block blocks.Block) error {
2021-07-09 06:54:12 +00:00
if err := b.access(); err != nil {
return err
}
2021-07-09 06:54:12 +00:00
defer b.viewers.Done()
2021-07-11 11:33:15 +00:00
b.lockDB()
defer b.unlockDB()
k, pooled := b.PooledStorageKey(block.Cid())
if pooled {
defer KeyPool.Put(k)
}
2021-07-11 11:33:15 +00:00
put := func(db *badger.DB) error {
err := db.Update(func(txn *badger.Txn) error {
return txn.Set(k, block.RawData())
})
if err != nil {
return fmt.Errorf("failed to put block in badger blockstore: %w", err)
}
return nil
}
2021-07-11 11:33:15 +00:00
if err := put(b.db); err != nil {
return err
}
2021-07-27 09:06:40 +00:00
if b.dbNext != nil {
if err := put(b.dbNext); err != nil {
2021-07-11 11:33:15 +00:00
return err
}
}
return nil
}
2020-11-06 18:55:13 +00:00
// PutMany implements Blockstore.PutMany.
func (b *Blockstore) PutMany(ctx context.Context, blocks []blocks.Block) error {
2021-07-09 06:54:12 +00:00
if err := b.access(); err != nil {
return err
}
2021-07-09 06:54:12 +00:00
defer b.viewers.Done()
2021-07-11 11:33:15 +00:00
b.lockDB()
defer b.unlockDB()
// toReturn tracks the byte slices to return to the pool, if we're using key
// prefixing. we can't return each slice to the pool after each Set, because
// badger holds on to the slice.
var toReturn [][]byte
if b.prefixing {
toReturn = make([][]byte, 0, len(blocks))
defer func() {
for _, b := range toReturn {
KeyPool.Put(b)
}
}()
}
2021-07-11 11:33:15 +00:00
keys := make([][]byte, 0, len(blocks))
for _, block := range blocks {
k, pooled := b.PooledStorageKey(block.Cid())
if pooled {
toReturn = append(toReturn, k)
}
2021-07-11 11:33:15 +00:00
keys = append(keys, k)
}
put := func(db *badger.DB) error {
batch := db.NewWriteBatch()
defer batch.Cancel()
for i, block := range blocks {
k := keys[i]
if err := batch.Set(k, block.RawData()); err != nil {
return err
}
}
2021-07-11 11:33:15 +00:00
err := batch.Flush()
if err != nil {
return fmt.Errorf("failed to put blocks in badger blockstore: %w", err)
}
return nil
}
2021-07-11 11:33:15 +00:00
if err := put(b.db); err != nil {
return err
}
2021-07-11 11:33:15 +00:00
2021-07-27 09:06:40 +00:00
if b.dbNext != nil {
if err := put(b.dbNext); err != nil {
2021-07-11 11:33:15 +00:00
return err
}
}
return nil
}
2020-11-06 18:55:13 +00:00
// DeleteBlock implements Blockstore.DeleteBlock.
func (b *Blockstore) DeleteBlock(ctx context.Context, cid cid.Cid) error {
2021-07-09 06:54:12 +00:00
if err := b.access(); err != nil {
return err
}
2021-07-09 06:54:12 +00:00
defer b.viewers.Done()
2021-07-11 11:33:15 +00:00
b.lockDB()
defer b.unlockDB()
k, pooled := b.PooledStorageKey(cid)
if pooled {
defer KeyPool.Put(k)
}
2021-07-11 11:33:15 +00:00
return b.db.Update(func(txn *badger.Txn) error {
return txn.Delete(k)
})
}
func (b *Blockstore) DeleteMany(ctx context.Context, cids []cid.Cid) error {
2021-07-09 06:54:12 +00:00
if err := b.access(); err != nil {
return err
2021-03-02 14:45:45 +00:00
}
2021-07-09 06:54:12 +00:00
defer b.viewers.Done()
2021-03-02 14:45:45 +00:00
2021-07-11 11:33:15 +00:00
b.lockDB()
defer b.unlockDB()
2021-03-02 14:45:45 +00:00
// toReturn tracks the byte slices to return to the pool, if we're using key
// prefixing. we can't return each slice to the pool after each Set, because
// badger holds on to the slice.
var toReturn [][]byte
if b.prefixing {
toReturn = make([][]byte, 0, len(cids))
defer func() {
for _, b := range toReturn {
KeyPool.Put(b)
}
}()
}
2021-07-11 11:33:15 +00:00
batch := b.db.NewWriteBatch()
defer batch.Cancel()
2021-03-02 14:45:45 +00:00
for _, cid := range cids {
k, pooled := b.PooledStorageKey(cid)
if pooled {
toReturn = append(toReturn, k)
}
if err := batch.Delete(k); err != nil {
return err
}
}
err := batch.Flush()
if err != nil {
err = fmt.Errorf("failed to delete blocks from badger blockstore: %w", err)
}
return err
}
2020-11-06 18:55:13 +00:00
// AllKeysChan implements Blockstore.AllKeysChan.
func (b *Blockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
2021-07-09 06:54:12 +00:00
if err := b.access(); err != nil {
return nil, err
}
2021-07-11 11:33:15 +00:00
b.lockDB()
defer b.unlockDB()
txn := b.db.NewTransaction(false)
opts := badger.IteratorOptions{PrefetchSize: 100}
if b.prefixing {
opts.Prefix = b.prefix
}
iter := txn.NewIterator(opts)
ch := make(chan cid.Cid)
go func() {
2021-07-09 06:54:12 +00:00
defer b.viewers.Done()
defer close(ch)
defer iter.Close()
// NewCidV1 makes a copy of the multihash buffer, so we can reuse it to
// contain allocs.
var buf []byte
for iter.Rewind(); iter.Valid(); iter.Next() {
if ctx.Err() != nil {
return // context has fired.
}
2021-07-09 06:54:12 +00:00
if !b.isOpen() {
// open iterators will run even after the database is closed...
return // closing, yield.
}
k := iter.Item().Key()
if b.prefixing {
k = k[b.prefixLen:]
}
if reqlen := base32.RawStdEncoding.DecodedLen(len(k)); len(buf) < reqlen {
buf = make([]byte, reqlen)
}
if n, err := base32.RawStdEncoding.Decode(buf, k); err == nil {
2020-11-11 23:12:16 +00:00
select {
case ch <- cid.NewCidV1(cid.Raw, buf[:n]):
case <-ctx.Done():
return
}
} else {
log.Warnf("failed to decode key %s in badger AllKeysChan; err: %s", k, err)
}
}
}()
return ch, nil
}
// Implementation of BlockstoreIterator interface
func (b *Blockstore) ForEachKey(f func(cid.Cid) error) error {
2021-07-09 06:54:12 +00:00
if err := b.access(); err != nil {
return err
}
2021-07-09 06:54:12 +00:00
defer b.viewers.Done()
2021-07-11 11:33:15 +00:00
b.lockDB()
defer b.unlockDB()
txn := b.db.NewTransaction(false)
defer txn.Discard()
opts := badger.IteratorOptions{PrefetchSize: 100}
if b.prefixing {
opts.Prefix = b.prefix
}
iter := txn.NewIterator(opts)
defer iter.Close()
var buf []byte
for iter.Rewind(); iter.Valid(); iter.Next() {
2021-07-09 06:54:12 +00:00
if !b.isOpen() {
return ErrBlockstoreClosed
}
k := iter.Item().Key()
if b.prefixing {
k = k[b.prefixLen:]
}
klen := base32.RawStdEncoding.DecodedLen(len(k))
if klen > len(buf) {
buf = make([]byte, klen)
}
n, err := base32.RawStdEncoding.Decode(buf, k)
if err != nil {
return err
}
c := cid.NewCidV1(cid.Raw, buf[:n])
err = f(c)
if err != nil {
return err
}
}
return nil
}
2020-11-06 18:55:13 +00:00
// HashOnRead implements Blockstore.HashOnRead. It is not supported by this
// blockstore.
2021-12-11 21:03:00 +00:00
func (b *Blockstore) HashOnRead(_ bool) {
log.Warnf("called HashOnRead on badger blockstore; function not supported; ignoring")
}
// PooledStorageKey returns the storage key under which this CID is stored.
//
// The key is: prefix + base32_no_padding(cid.Hash)
//
// This method may return pooled byte slice, which MUST be returned to the
// KeyPool if pooled=true, or a leak will occur. The slice is obtained from
// KeyPool, the same pool callers hand it back to.
func (b *Blockstore) PooledStorageKey(cid cid.Cid) (key []byte, pooled bool) {
	h := cid.Hash()
	size := base32.RawStdEncoding.EncodedLen(len(h))
	if !b.prefixing { // optimize for branch prediction.
		k := KeyPool.Get(size)
		base32.RawStdEncoding.Encode(k, h)
		return k, true // slicing upto length unnecessary; the pool has already done this.
	}

	size += b.prefixLen
	k := KeyPool.Get(size)
	copy(k, b.prefix)
	base32.RawStdEncoding.Encode(k[b.prefixLen:], h)
	return k, true // slicing upto length unnecessary; the pool has already done this.
}
// StorageKey computes the same key as PooledStorageKey, but writes it into
// the caller-supplied slice instead of a pooled one. When dst lacks the
// capacity for the key, a fresh slice of exactly the required size is
// allocated. The returned slice is trimmed to the key length.
func (b *Blockstore) StorageKey(dst []byte, cid cid.Cid) []byte {
	hash := cid.Hash()
	need := base32.RawStdEncoding.EncodedLen(len(hash)) + b.prefixLen

	switch {
	case cap(dst) < need:
		// not enough room in the supplied slice; start over with a new one.
		dst = make([]byte, need)
	case len(dst) < need:
		// enough capacity but too short; stretch to full capacity.
		dst = dst[:cap(dst)]
	}

	// out is where the encoded hash goes: right after the prefix, if any.
	out := dst
	if b.prefixing {
		copy(dst, b.prefix)
		out = dst[b.prefixLen:]
	}
	base32.RawStdEncoding.Encode(out, hash)

	return dst[:need]
}
2021-07-11 15:00:54 +00:00
// DB exposes the underlying badger database handle; it exists for the needs
// of lotus-shed.
// WARNING: THIS IS COMPLETELY UNSAFE; DONT USE THIS IN PRODUCTION CODE
func (b *Blockstore) DB() *badger.DB {
	handle := b.db
	return handle
}