add a native badger blockstore with View() method.

This commit is contained in:
Raúl Kripalani 2020-11-01 12:55:43 +00:00
parent c3d00b0ac6
commit ce27b13076
5 changed files with 725 additions and 0 deletions

View File

@ -0,0 +1,346 @@
package badgerbs
import (
"context"
"fmt"
"io"
"sync/atomic"
"github.com/dgraph-io/badger/v2"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
logger "github.com/ipfs/go-log/v2"
pool "github.com/libp2p/go-buffer-pool"
"github.com/filecoin-project/lotus/lib/blockstore"
)
var (
ErrBlockstoreClosed = fmt.Errorf("badger blockstore closed")
log = logger.Logger("badgerbs")
)
type Options struct {
badger.Options
// Prefix is an optional prefix to prepend to keys. Default: "".
Prefix string
}
func DefaultOptions(path string) Options {
return Options{
Options: badger.DefaultOptions(path),
Prefix: "",
}
}
// badgerLog is a local wrapper for go-log to make the interface
// compatible with badger.Logger (namely, aliasing Warnf to Warningf)
type badgerLog struct {
logger.ZapEventLogger
}
func (b *badgerLog) Warningf(format string, args ...interface{}) {
b.Warnf(format, args...)
}
const (
stateOpen int64 = iota
stateClosing
stateClosed
)
// Blockstore is a badger-backed IPLD blockstore.
//
// NOTE: once Close() is called, methods will try their best to return
// ErrBlockstoreClosed. This will guaranteed to happen for all subsequent
// operation calls after Close() has returned, but it may not happen for
// operations in progress. Those are likely to fail with a different error.
type Blockstore struct {
DB *badger.DB
// state is guarded by atomic.
state int64
prefixing bool
prefix []byte
prefixLen int
}
var _ blockstore.Blockstore = (*Blockstore)(nil)
var _ blockstore.Viewer = (*Blockstore)(nil)
var _ io.Closer = (*Blockstore)(nil)
func Open(opts Options) (*Blockstore, error) {
opts.Logger = &badgerLog{*log}
db, err := badger.Open(opts.Options)
if err != nil {
return nil, fmt.Errorf("failed to open badger blockstore: %w", err)
}
bs := &Blockstore{
DB: db,
}
if p := opts.Prefix; p != "" {
bs.prefixing = true
bs.prefix = []byte(p)
bs.prefixLen = len(bs.prefix)
}
return bs, nil
}
func (b *Blockstore) Close() error {
if !atomic.CompareAndSwapInt64(&b.state, stateOpen, stateClosing) {
return nil
}
defer atomic.StoreInt64(&b.state, stateClosed)
return b.DB.Close()
}
func (b *Blockstore) View(cid cid.Cid, fn func([]byte) error) error {
if atomic.LoadInt64(&b.state) != stateOpen {
return ErrBlockstoreClosed
}
k, pooled := b.PooledPrefixedKey(cid)
if pooled {
defer pool.Put(k)
}
return b.DB.View(func(txn *badger.Txn) error {
switch item, err := txn.Get(k); err {
case nil:
return item.Value(fn)
case badger.ErrKeyNotFound:
return blockstore.ErrNotFound
default:
return fmt.Errorf("failed to view block from badger blockstore: %w", err)
}
})
}
func (b *Blockstore) Has(cid cid.Cid) (bool, error) {
if atomic.LoadInt64(&b.state) != stateOpen {
return false, ErrBlockstoreClosed
}
k, pooled := b.PooledPrefixedKey(cid)
if pooled {
defer pool.Put(k)
}
err := b.DB.View(func(txn *badger.Txn) error {
_, err := txn.Get(k)
return err
})
switch err {
case badger.ErrKeyNotFound:
return false, nil
case nil:
return true, nil
default:
return false, fmt.Errorf("failed to check if block exists in badger blockstore: %w", err)
}
}
func (b *Blockstore) Get(cid cid.Cid) (blocks.Block, error) {
if !cid.Defined() {
return nil, blockstore.ErrNotFound
}
if atomic.LoadInt64(&b.state) != stateOpen {
return nil, ErrBlockstoreClosed
}
k, pooled := b.PooledPrefixedKey(cid)
if pooled {
defer pool.Put(k)
}
var val []byte
err := b.DB.View(func(txn *badger.Txn) error {
switch item, err := txn.Get(k); err {
case nil:
val, err = item.ValueCopy(nil)
return err
case badger.ErrKeyNotFound:
return blockstore.ErrNotFound
default:
return fmt.Errorf("failed to get block from badger blockstore: %w", err)
}
})
if err != nil {
return nil, err
}
return blocks.NewBlockWithCid(val, cid)
}
func (b *Blockstore) GetSize(cid cid.Cid) (int, error) {
if atomic.LoadInt64(&b.state) != stateOpen {
return -1, ErrBlockstoreClosed
}
k, pooled := b.PooledPrefixedKey(cid)
if pooled {
defer pool.Put(k)
}
var size int
err := b.DB.View(func(txn *badger.Txn) error {
switch item, err := txn.Get(k); err {
case nil:
size = int(item.ValueSize())
case badger.ErrKeyNotFound:
return blockstore.ErrNotFound
default:
return fmt.Errorf("failed to get block size from badger blockstore: %w", err)
}
return nil
})
if err != nil {
size = -1
}
return size, err
}
func (b *Blockstore) Put(block blocks.Block) error {
if atomic.LoadInt64(&b.state) != stateOpen {
return ErrBlockstoreClosed
}
k, pooled := b.PooledPrefixedKey(block.Cid())
if pooled {
defer pool.Put(k)
}
err := b.DB.Update(func(txn *badger.Txn) error {
return txn.Set(k, block.RawData())
})
if err != nil {
err = fmt.Errorf("failed to put block in badger blockstore: %w", err)
}
return err
}
func (b *Blockstore) PutMany(blocks []blocks.Block) error {
if atomic.LoadInt64(&b.state) != stateOpen {
return ErrBlockstoreClosed
}
batch := b.DB.NewWriteBatch()
defer batch.Cancel()
// toReturn tracks the byte slices to return to the pool, if we're using key
// prefixing. we can't return each slice to the pool after each Set, because
// badger holds on to the slice.
var toReturn [][]byte
if b.prefixing {
toReturn = make([][]byte, 0, len(blocks))
defer func() {
for _, b := range toReturn {
pool.Put(b)
}
}()
}
for _, block := range blocks {
k, pooled := b.PooledPrefixedKey(block.Cid())
if pooled {
toReturn = append(toReturn, k)
}
if err := batch.Set(k, block.RawData()); err != nil {
return err
}
}
err := batch.Flush()
if err != nil {
err = fmt.Errorf("failed to put blocks in badger blockstore: %w", err)
}
return err
}
func (b *Blockstore) DeleteBlock(cid cid.Cid) error {
if atomic.LoadInt64(&b.state) != stateOpen {
return ErrBlockstoreClosed
}
k, pooled := b.PooledPrefixedKey(cid)
if pooled {
defer pool.Put(k)
}
return b.DB.Update(func(txn *badger.Txn) error {
return txn.Delete(k)
})
}
func (b *Blockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
if atomic.LoadInt64(&b.state) != stateOpen {
return nil, ErrBlockstoreClosed
}
txn := b.DB.NewTransaction(false)
opts := badger.IteratorOptions{PrefetchSize: 100}
if b.prefixing {
opts.Prefix = b.prefix
}
iter := txn.NewIterator(opts)
ch := make(chan cid.Cid)
go func() {
defer close(ch)
defer iter.Close()
for iter.Rewind(); iter.Valid(); iter.Next() {
if ctx.Err() != nil {
return // context has fired.
}
if atomic.LoadInt64(&b.state) != stateOpen {
// open iterators will run even after the database is closed...
return // closing, yield.
}
k := iter.Item().Key()
if b.prefixing {
k = k[b.prefixLen:]
}
ch <- cid.NewCidV1(cid.Raw, k)
}
}()
return ch, nil
}
func (b *Blockstore) HashOnRead(enabled bool) {
log.Warnf("called HashOnRead on badger blockstore; function not supported; ignoring")
}
func (b *Blockstore) PrefixedKey(cid cid.Cid) []byte {
h := cid.Hash()
if !b.prefixing {
return h
}
k := make([]byte, b.prefixLen+len(h))
copy(k, b.prefix)
copy(k[b.prefixLen:], h)
return k
}
func (b *Blockstore) PooledPrefixedKey(cid cid.Cid) (key []byte, pooled bool) {
h := cid.Hash()
if !b.prefixing {
return h, false
}
size := b.prefixLen + len(h)
k := pool.Get(size)
copy(k, b.prefix)
copy(k[b.prefixLen:], h)
return k, true
}

View File

@ -0,0 +1,9 @@
package badgerbs
import "testing"
func BenchmarkName(b *testing.B) {
for i := 0; i < b.N; i++ {
}
}

View File

@ -0,0 +1,56 @@
package badgerbs
import (
"io/ioutil"
"os"
"testing"
blockstore "github.com/ipfs/go-ipfs-blockstore"
)
func TestBadgerBlockstore(t *testing.T) {
(&Suite{
NewBlockstore: newBlockstore(DefaultOptions),
OpenBlockstore: openBlockstore(DefaultOptions),
}).RunTests(t, "non_prefixed")
prefixed := func(path string) Options {
opts := DefaultOptions(path)
opts.Prefix = "/prefixed/"
return opts
}
(&Suite{
NewBlockstore: newBlockstore(prefixed),
OpenBlockstore: openBlockstore(prefixed),
}).RunTests(t, "prefixed")
}
func newBlockstore(optsSupplier func(path string) Options) func(tb testing.TB) (bs blockstore.Blockstore, path string) {
return func(tb testing.TB) (bs blockstore.Blockstore, path string) {
tb.Helper()
path, err := ioutil.TempDir("", "")
if err != nil {
tb.Fatal(err)
}
db, err := Open(optsSupplier(path))
if err != nil {
tb.Fatal(err)
}
tb.Cleanup(func() {
_ = os.RemoveAll(path)
})
return db, path
}
}
func openBlockstore(optsSupplier func(path string) Options) func(tb testing.TB, path string) (bs blockstore.Blockstore, err error) {
return func(tb testing.TB, path string) (bs blockstore.Blockstore, err error) {
tb.Helper()
return Open(optsSupplier(path))
}
}

View File

@ -0,0 +1,307 @@
package badgerbs
import (
"context"
"fmt"
"io"
"reflect"
"strings"
"testing"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-ipfs-blockstore"
u "github.com/ipfs/go-ipfs-util"
"github.com/stretchr/testify/require"
)
// TODO: move this to go-ipfs-blockstore.
type Suite struct {
NewBlockstore func(tb testing.TB) (bs blockstore.Blockstore, path string)
OpenBlockstore func(tb testing.TB, path string) (bs blockstore.Blockstore, err error)
}
func (s *Suite) RunTests(t *testing.T, prefix string) {
v := reflect.TypeOf(s)
f := func(t *testing.T) {
for i := 0; i < v.NumMethod(); i++ {
if m := v.Method(i); strings.HasPrefix(m.Name, "Test") {
f := m.Func.Interface().(func(*Suite, *testing.T))
t.Run(m.Name, func(t *testing.T) {
f(s, t)
})
}
}
}
if prefix == "" {
f(t)
} else {
t.Run(prefix, f)
}
}
func (s *Suite) TestGetWhenKeyNotPresent(t *testing.T) {
bs, _ := s.NewBlockstore(t)
if c, ok := bs.(io.Closer); ok {
defer func() { require.NoError(t, c.Close()) }()
}
c := cid.NewCidV0(u.Hash([]byte("stuff")))
bl, err := bs.Get(c)
require.Nil(t, bl)
require.Equal(t, blockstore.ErrNotFound, err)
}
func (s *Suite) TestGetWhenKeyIsNil(t *testing.T) {
bs, _ := s.NewBlockstore(t)
if c, ok := bs.(io.Closer); ok {
defer func() { require.NoError(t, c.Close()) }()
}
_, err := bs.Get(cid.Undef)
require.Equal(t, blockstore.ErrNotFound, err)
}
func (s *Suite) TestPutThenGetBlock(t *testing.T) {
bs, _ := s.NewBlockstore(t)
if c, ok := bs.(io.Closer); ok {
defer func() { require.NoError(t, c.Close()) }()
}
orig := blocks.NewBlock([]byte("some data"))
err := bs.Put(orig)
require.NoError(t, err)
fetched, err := bs.Get(orig.Cid())
require.NoError(t, err)
require.Equal(t, orig.RawData(), fetched.RawData())
}
func (s *Suite) TestHas(t *testing.T) {
bs, _ := s.NewBlockstore(t)
if c, ok := bs.(io.Closer); ok {
defer func() { require.NoError(t, c.Close()) }()
}
orig := blocks.NewBlock([]byte("some data"))
err := bs.Put(orig)
require.NoError(t, err)
ok, err := bs.Has(orig.Cid())
require.NoError(t, err)
require.True(t, ok)
ok, err = bs.Has(blocks.NewBlock([]byte("another thing")).Cid())
require.NoError(t, err)
require.False(t, ok)
}
func (s *Suite) TestCidv0v1(t *testing.T) {
bs, _ := s.NewBlockstore(t)
if c, ok := bs.(io.Closer); ok {
defer func() { require.NoError(t, c.Close()) }()
}
orig := blocks.NewBlock([]byte("some data"))
err := bs.Put(orig)
require.NoError(t, err)
fetched, err := bs.Get(cid.NewCidV1(cid.DagProtobuf, orig.Cid().Hash()))
require.NoError(t, err)
require.Equal(t, orig.RawData(), fetched.RawData())
}
func (s *Suite) TestPutThenGetSizeBlock(t *testing.T) {
bs, _ := s.NewBlockstore(t)
if c, ok := bs.(io.Closer); ok {
defer func() { require.NoError(t, c.Close()) }()
}
block := blocks.NewBlock([]byte("some data"))
missingBlock := blocks.NewBlock([]byte("missingBlock"))
emptyBlock := blocks.NewBlock([]byte{})
err := bs.Put(block)
require.NoError(t, err)
blockSize, err := bs.GetSize(block.Cid())
require.NoError(t, err)
require.Len(t, block.RawData(), blockSize)
err = bs.Put(emptyBlock)
require.NoError(t, err)
emptySize, err := bs.GetSize(emptyBlock.Cid())
require.NoError(t, err)
require.Zero(t, emptySize)
missingSize, err := bs.GetSize(missingBlock.Cid())
require.Equal(t, blockstore.ErrNotFound, err)
require.Equal(t, -1, missingSize)
}
func (s *Suite) TestAllKeysSimple(t *testing.T) {
bs, _ := s.NewBlockstore(t)
if c, ok := bs.(io.Closer); ok {
defer func() { require.NoError(t, c.Close()) }()
}
keys := insertBlocks(t, bs, 100)
ctx := context.Background()
ch, err := bs.AllKeysChan(ctx)
require.NoError(t, err)
actual := collect(ch)
require.ElementsMatch(t, keys, actual)
}
func (s *Suite) TestAllKeysRespectsContext(t *testing.T) {
bs, _ := s.NewBlockstore(t)
if c, ok := bs.(io.Closer); ok {
defer func() { require.NoError(t, c.Close()) }()
}
_ = insertBlocks(t, bs, 100)
ctx, cancel := context.WithCancel(context.Background())
ch, err := bs.AllKeysChan(ctx)
require.NoError(t, err)
// consume 2, then cancel context.
v, ok := <-ch
require.NotEqual(t, cid.Undef, v)
require.True(t, ok)
v, ok = <-ch
require.NotEqual(t, cid.Undef, v)
require.True(t, ok)
cancel()
v, ok = <-ch
require.Equal(t, cid.Undef, v)
require.False(t, ok)
}
func (s *Suite) TestDoubleClose(t *testing.T) {
bs, _ := s.NewBlockstore(t)
c, ok := bs.(io.Closer)
if !ok {
t.SkipNow()
}
require.NoError(t, c.Close())
require.NoError(t, c.Close())
}
func (s *Suite) TestReopenPutGet(t *testing.T) {
bs, path := s.NewBlockstore(t)
c, ok := bs.(io.Closer)
if !ok {
t.SkipNow()
}
orig := blocks.NewBlock([]byte("some data"))
err := bs.Put(orig)
require.NoError(t, err)
err = c.Close()
require.NoError(t, err)
bs, err = s.OpenBlockstore(t, path)
require.NoError(t, err)
fetched, err := bs.Get(orig.Cid())
require.NoError(t, err)
require.Equal(t, orig.RawData(), fetched.RawData())
}
func (s *Suite) TestPutMany(t *testing.T) {
bs, _ := s.NewBlockstore(t)
if c, ok := bs.(io.Closer); ok {
defer func() { require.NoError(t, c.Close()) }()
}
blks := []blocks.Block{
blocks.NewBlock([]byte("foo1")),
blocks.NewBlock([]byte("foo2")),
blocks.NewBlock([]byte("foo3")),
}
err := bs.PutMany(blks)
require.NoError(t, err)
for _, blk := range blks {
fetched, err := bs.Get(blk.Cid())
require.NoError(t, err)
require.Equal(t, blk.RawData(), fetched.RawData())
ok, err := bs.Has(blk.Cid())
require.NoError(t, err)
require.True(t, ok)
}
ch, err := bs.AllKeysChan(context.Background())
require.NoError(t, err)
cids := collect(ch)
require.Len(t, cids, 3)
}
func (s *Suite) TestDelete(t *testing.T) {
bs, _ := s.NewBlockstore(t)
if c, ok := bs.(io.Closer); ok {
defer func() { require.NoError(t, c.Close()) }()
}
blks := []blocks.Block{
blocks.NewBlock([]byte("foo1")),
blocks.NewBlock([]byte("foo2")),
blocks.NewBlock([]byte("foo3")),
}
err := bs.PutMany(blks)
require.NoError(t, err)
err = bs.DeleteBlock(blks[1].Cid())
require.NoError(t, err)
ch, err := bs.AllKeysChan(context.Background())
require.NoError(t, err)
cids := collect(ch)
require.Len(t, cids, 2)
require.ElementsMatch(t, cids, []cid.Cid{
cid.NewCidV1(cid.Raw, blks[0].Cid().Hash()),
cid.NewCidV1(cid.Raw, blks[2].Cid().Hash()),
})
has, err := bs.Has(blks[1].Cid())
require.NoError(t, err)
require.False(t, has)
}
func insertBlocks(t *testing.T, bs blockstore.Blockstore, count int) []cid.Cid {
keys := make([]cid.Cid, count)
for i := 0; i < count; i++ {
block := blocks.NewBlock([]byte(fmt.Sprintf("some data %d", i)))
err := bs.Put(block)
require.NoError(t, err)
// NewBlock assigns a CIDv0; we convert it to CIDv1 because that's what
// the store returns.
keys[i] = cid.NewCidV1(cid.Raw, block.Multihash())
}
return keys
}
func collect(ch <-chan cid.Cid) []cid.Cid {
var keys []cid.Cid
for k := range ch {
keys = append(keys, k)
}
return keys
}

View File

@ -17,11 +17,18 @@ package blockstore
import (
"context"
"github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
blockstore "github.com/ipfs/go-ipfs-blockstore"
)
// Viewer is a blockstore trait that can be implemented by blockstores
// that offer zero-copy access to blocks.
type Viewer interface {
View(cid cid.Cid, callback func([]byte) error) error
}
// NewTemporary returns a temporary blockstore.
func NewTemporary() MemStore {
return make(MemStore)