361 lines
7.7 KiB
Go
361 lines
7.7 KiB
Go
package badgerbs
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"sync/atomic"
|
|
|
|
"github.com/dgraph-io/badger/v2"
|
|
"github.com/dgraph-io/badger/v2/options"
|
|
|
|
blocks "github.com/ipfs/go-block-format"
|
|
"github.com/ipfs/go-cid"
|
|
logger "github.com/ipfs/go-log/v2"
|
|
pool "github.com/libp2p/go-buffer-pool"
|
|
|
|
"github.com/filecoin-project/lotus/lib/blockstore"
|
|
)
|
|
|
|
var (
|
|
// ErrBlockstoreClosed is returned from blockstore operations after
|
|
// the blockstore has been closed.
|
|
ErrBlockstoreClosed = fmt.Errorf("badger blockstore closed")
|
|
|
|
log = logger.Logger("badgerbs")
|
|
)
|
|
|
|
// aliases to mask badger dependencies.
|
|
const (
|
|
// FileIO is equivalent to badger.options.FileIO.
|
|
FileIO = options.FileIO
|
|
// MemoryMap is equivalent to badger.options.MemoryMap.
|
|
MemoryMap = options.MemoryMap
|
|
// LoadToRAM is equivalent to badger.options.LoadToRAM.
|
|
LoadToRAM = options.LoadToRAM
|
|
)
|
|
|
|
type Options struct {
|
|
badger.Options
|
|
|
|
// Prefix is an optional prefix to prepend to keys. Default: "".
|
|
Prefix string
|
|
}
|
|
|
|
func DefaultOptions(path string) Options {
|
|
return Options{
|
|
Options: badger.DefaultOptions(path),
|
|
Prefix: "",
|
|
}
|
|
}
|
|
|
|
// badgerLog is a local wrapper for go-log to make the interface
|
|
// compatible with badger.Logger (namely, aliasing Warnf to Warningf)
|
|
type badgerLog struct {
|
|
logger.ZapEventLogger
|
|
}
|
|
|
|
func (b *badgerLog) Warningf(format string, args ...interface{}) {
|
|
b.Warnf(format, args...)
|
|
}
|
|
|
|
// Lifecycle states of a Blockstore, stored in Blockstore.state. The zero
// value is stateOpen, so a freshly constructed Blockstore starts out open.
const (
	stateOpen    int64 = 0
	stateClosing int64 = 1
	stateClosed  int64 = 2
)
|
|
|
|
// Blockstore is a badger-backed IPLD blockstore.
|
|
//
|
|
// NOTE: once Close() is called, methods will try their best to return
|
|
// ErrBlockstoreClosed. This will guaranteed to happen for all subsequent
|
|
// operation calls after Close() has returned, but it may not happen for
|
|
// operations in progress. Those are likely to fail with a different error.
|
|
type Blockstore struct {
|
|
DB *badger.DB
|
|
|
|
// state is guarded by atomic.
|
|
state int64
|
|
|
|
prefixing bool
|
|
prefix []byte
|
|
prefixLen int
|
|
}
|
|
|
|
var _ blockstore.Blockstore = (*Blockstore)(nil)
|
|
var _ blockstore.Viewer = (*Blockstore)(nil)
|
|
var _ io.Closer = (*Blockstore)(nil)
|
|
|
|
func Open(opts Options) (*Blockstore, error) {
|
|
opts.Logger = &badgerLog{*log}
|
|
|
|
db, err := badger.Open(opts.Options)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to open badger blockstore: %w", err)
|
|
}
|
|
|
|
bs := &Blockstore{
|
|
DB: db,
|
|
}
|
|
|
|
if p := opts.Prefix; p != "" {
|
|
bs.prefixing = true
|
|
bs.prefix = []byte(p)
|
|
bs.prefixLen = len(bs.prefix)
|
|
}
|
|
|
|
return bs, nil
|
|
}
|
|
|
|
func (b *Blockstore) Close() error {
|
|
if !atomic.CompareAndSwapInt64(&b.state, stateOpen, stateClosing) {
|
|
return nil
|
|
}
|
|
|
|
defer atomic.StoreInt64(&b.state, stateClosed)
|
|
return b.DB.Close()
|
|
}
|
|
|
|
func (b *Blockstore) View(cid cid.Cid, fn func([]byte) error) error {
|
|
if atomic.LoadInt64(&b.state) != stateOpen {
|
|
return ErrBlockstoreClosed
|
|
}
|
|
|
|
k, pooled := b.PooledPrefixedKey(cid)
|
|
if pooled {
|
|
defer pool.Put(k)
|
|
}
|
|
|
|
return b.DB.View(func(txn *badger.Txn) error {
|
|
switch item, err := txn.Get(k); err {
|
|
case nil:
|
|
return item.Value(fn)
|
|
case badger.ErrKeyNotFound:
|
|
return blockstore.ErrNotFound
|
|
default:
|
|
return fmt.Errorf("failed to view block from badger blockstore: %w", err)
|
|
}
|
|
})
|
|
}
|
|
|
|
func (b *Blockstore) Has(cid cid.Cid) (bool, error) {
|
|
if atomic.LoadInt64(&b.state) != stateOpen {
|
|
return false, ErrBlockstoreClosed
|
|
}
|
|
|
|
k, pooled := b.PooledPrefixedKey(cid)
|
|
if pooled {
|
|
defer pool.Put(k)
|
|
}
|
|
|
|
err := b.DB.View(func(txn *badger.Txn) error {
|
|
_, err := txn.Get(k)
|
|
return err
|
|
})
|
|
|
|
switch err {
|
|
case badger.ErrKeyNotFound:
|
|
return false, nil
|
|
case nil:
|
|
return true, nil
|
|
default:
|
|
return false, fmt.Errorf("failed to check if block exists in badger blockstore: %w", err)
|
|
}
|
|
}
|
|
|
|
func (b *Blockstore) Get(cid cid.Cid) (blocks.Block, error) {
|
|
if !cid.Defined() {
|
|
return nil, blockstore.ErrNotFound
|
|
}
|
|
|
|
if atomic.LoadInt64(&b.state) != stateOpen {
|
|
return nil, ErrBlockstoreClosed
|
|
}
|
|
|
|
k, pooled := b.PooledPrefixedKey(cid)
|
|
if pooled {
|
|
defer pool.Put(k)
|
|
}
|
|
|
|
var val []byte
|
|
err := b.DB.View(func(txn *badger.Txn) error {
|
|
switch item, err := txn.Get(k); err {
|
|
case nil:
|
|
val, err = item.ValueCopy(nil)
|
|
return err
|
|
case badger.ErrKeyNotFound:
|
|
return blockstore.ErrNotFound
|
|
default:
|
|
return fmt.Errorf("failed to get block from badger blockstore: %w", err)
|
|
}
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return blocks.NewBlockWithCid(val, cid)
|
|
}
|
|
|
|
func (b *Blockstore) GetSize(cid cid.Cid) (int, error) {
|
|
if atomic.LoadInt64(&b.state) != stateOpen {
|
|
return -1, ErrBlockstoreClosed
|
|
}
|
|
|
|
k, pooled := b.PooledPrefixedKey(cid)
|
|
if pooled {
|
|
defer pool.Put(k)
|
|
}
|
|
|
|
var size int
|
|
err := b.DB.View(func(txn *badger.Txn) error {
|
|
switch item, err := txn.Get(k); err {
|
|
case nil:
|
|
size = int(item.ValueSize())
|
|
case badger.ErrKeyNotFound:
|
|
return blockstore.ErrNotFound
|
|
default:
|
|
return fmt.Errorf("failed to get block size from badger blockstore: %w", err)
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
size = -1
|
|
}
|
|
return size, err
|
|
}
|
|
|
|
func (b *Blockstore) Put(block blocks.Block) error {
|
|
if atomic.LoadInt64(&b.state) != stateOpen {
|
|
return ErrBlockstoreClosed
|
|
}
|
|
|
|
k, pooled := b.PooledPrefixedKey(block.Cid())
|
|
if pooled {
|
|
defer pool.Put(k)
|
|
}
|
|
|
|
err := b.DB.Update(func(txn *badger.Txn) error {
|
|
return txn.Set(k, block.RawData())
|
|
})
|
|
if err != nil {
|
|
err = fmt.Errorf("failed to put block in badger blockstore: %w", err)
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (b *Blockstore) PutMany(blocks []blocks.Block) error {
|
|
if atomic.LoadInt64(&b.state) != stateOpen {
|
|
return ErrBlockstoreClosed
|
|
}
|
|
|
|
batch := b.DB.NewWriteBatch()
|
|
defer batch.Cancel()
|
|
|
|
// toReturn tracks the byte slices to return to the pool, if we're using key
|
|
// prefixing. we can't return each slice to the pool after each Set, because
|
|
// badger holds on to the slice.
|
|
var toReturn [][]byte
|
|
if b.prefixing {
|
|
toReturn = make([][]byte, 0, len(blocks))
|
|
defer func() {
|
|
for _, b := range toReturn {
|
|
pool.Put(b)
|
|
}
|
|
}()
|
|
}
|
|
|
|
for _, block := range blocks {
|
|
k, pooled := b.PooledPrefixedKey(block.Cid())
|
|
if pooled {
|
|
toReturn = append(toReturn, k)
|
|
}
|
|
if err := batch.Set(k, block.RawData()); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
err := batch.Flush()
|
|
if err != nil {
|
|
err = fmt.Errorf("failed to put blocks in badger blockstore: %w", err)
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (b *Blockstore) DeleteBlock(cid cid.Cid) error {
|
|
if atomic.LoadInt64(&b.state) != stateOpen {
|
|
return ErrBlockstoreClosed
|
|
}
|
|
|
|
k, pooled := b.PooledPrefixedKey(cid)
|
|
if pooled {
|
|
defer pool.Put(k)
|
|
}
|
|
|
|
return b.DB.Update(func(txn *badger.Txn) error {
|
|
return txn.Delete(k)
|
|
})
|
|
}
|
|
|
|
func (b *Blockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
|
|
if atomic.LoadInt64(&b.state) != stateOpen {
|
|
return nil, ErrBlockstoreClosed
|
|
}
|
|
|
|
txn := b.DB.NewTransaction(false)
|
|
opts := badger.IteratorOptions{PrefetchSize: 100}
|
|
if b.prefixing {
|
|
opts.Prefix = b.prefix
|
|
}
|
|
iter := txn.NewIterator(opts)
|
|
|
|
ch := make(chan cid.Cid)
|
|
go func() {
|
|
defer close(ch)
|
|
defer iter.Close()
|
|
|
|
for iter.Rewind(); iter.Valid(); iter.Next() {
|
|
if ctx.Err() != nil {
|
|
return // context has fired.
|
|
}
|
|
if atomic.LoadInt64(&b.state) != stateOpen {
|
|
// open iterators will run even after the database is closed...
|
|
return // closing, yield.
|
|
}
|
|
k := iter.Item().Key()
|
|
if b.prefixing {
|
|
k = k[b.prefixLen:]
|
|
}
|
|
ch <- cid.NewCidV1(cid.Raw, k)
|
|
}
|
|
}()
|
|
|
|
return ch, nil
|
|
}
|
|
|
|
func (b *Blockstore) HashOnRead(enabled bool) {
|
|
log.Warnf("called HashOnRead on badger blockstore; function not supported; ignoring")
|
|
}
|
|
|
|
func (b *Blockstore) PrefixedKey(cid cid.Cid) []byte {
|
|
h := cid.Hash()
|
|
if !b.prefixing {
|
|
return h
|
|
}
|
|
k := make([]byte, b.prefixLen+len(h))
|
|
copy(k, b.prefix)
|
|
copy(k[b.prefixLen:], h)
|
|
return k
|
|
}
|
|
|
|
func (b *Blockstore) PooledPrefixedKey(cid cid.Cid) (key []byte, pooled bool) {
|
|
h := cid.Hash()
|
|
if !b.prefixing {
|
|
return h, false
|
|
}
|
|
|
|
size := b.prefixLen + len(h)
|
|
k := pool.Get(size)
|
|
copy(k, b.prefix)
|
|
copy(k[b.prefixLen:], h)
|
|
return k, true
|
|
}
|