diff --git a/lib/blockstore/badger/badger.go b/lib/blockstore/badger/badger.go
new file mode 100644
index 000000000..6a67cb2c3
--- /dev/null
+++ b/lib/blockstore/badger/badger.go
@@ -0,0 +1,346 @@
+package badgerbs
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"sync/atomic"
+
+	"github.com/dgraph-io/badger/v2"
+	blocks "github.com/ipfs/go-block-format"
+	"github.com/ipfs/go-cid"
+	logger "github.com/ipfs/go-log/v2"
+	pool "github.com/libp2p/go-buffer-pool"
+
+	"github.com/filecoin-project/lotus/lib/blockstore"
+)
+
+var (
+	ErrBlockstoreClosed = fmt.Errorf("badger blockstore closed")
+
+	log = logger.Logger("badgerbs")
+)
+
+type Options struct {
+	badger.Options
+
+	// Prefix is an optional prefix to prepend to keys. Default: "".
+	Prefix string
+}
+
+func DefaultOptions(path string) Options {
+	return Options{
+		Options: badger.DefaultOptions(path),
+		Prefix:  "",
+	}
+}
+
+// badgerLog is a local wrapper for go-log to make the interface
+// compatible with badger.Logger (namely, aliasing Warnf to Warningf)
+type badgerLog struct {
+	logger.ZapEventLogger
+}
+
+func (b *badgerLog) Warningf(format string, args ...interface{}) {
+	b.Warnf(format, args...)
+}
+
+const (
+	stateOpen int64 = iota
+	stateClosing
+	stateClosed
+)
+
+// Blockstore is a badger-backed IPLD blockstore.
+//
+// NOTE: once Close() is called, methods will try their best to return
+// ErrBlockstoreClosed. This is guaranteed to happen for all subsequent
+// operation calls after Close() has returned, but it may not happen for
+// operations in progress. Those are likely to fail with a different error.
+type Blockstore struct {
+	DB *badger.DB
+
+	// state is guarded by atomic.
+	state int64
+
+	prefixing bool
+	prefix    []byte
+	prefixLen int
+}
+
+var _ blockstore.Blockstore = (*Blockstore)(nil)
+var _ blockstore.Viewer = (*Blockstore)(nil)
+var _ io.Closer = (*Blockstore)(nil)
+
+func Open(opts Options) (*Blockstore, error) {
+	opts.Logger = &badgerLog{*log}
+
+	db, err := badger.Open(opts.Options)
+	if err != nil {
+		return nil, fmt.Errorf("failed to open badger blockstore: %w", err)
+	}
+
+	bs := &Blockstore{
+		DB: db,
+	}
+
+	if p := opts.Prefix; p != "" {
+		bs.prefixing = true
+		bs.prefix = []byte(p)
+		bs.prefixLen = len(bs.prefix)
+	}
+
+	return bs, nil
+}
+
+func (b *Blockstore) Close() error {
+	if !atomic.CompareAndSwapInt64(&b.state, stateOpen, stateClosing) {
+		return nil
+	}
+
+	defer atomic.StoreInt64(&b.state, stateClosed)
+	return b.DB.Close()
+}
+
+func (b *Blockstore) View(cid cid.Cid, fn func([]byte) error) error {
+	if atomic.LoadInt64(&b.state) != stateOpen {
+		return ErrBlockstoreClosed
+	}
+
+	k, pooled := b.PooledPrefixedKey(cid)
+	if pooled {
+		defer pool.Put(k)
+	}
+
+	return b.DB.View(func(txn *badger.Txn) error {
+		switch item, err := txn.Get(k); err {
+		case nil:
+			return item.Value(fn)
+		case badger.ErrKeyNotFound:
+			return blockstore.ErrNotFound
+		default:
+			return fmt.Errorf("failed to view block from badger blockstore: %w", err)
+		}
+	})
+}
+
+func (b *Blockstore) Has(cid cid.Cid) (bool, error) {
+	if atomic.LoadInt64(&b.state) != stateOpen {
+		return false, ErrBlockstoreClosed
+	}
+
+	k, pooled := b.PooledPrefixedKey(cid)
+	if pooled {
+		defer pool.Put(k)
+	}
+
+	err := b.DB.View(func(txn *badger.Txn) error {
+		_, err := txn.Get(k)
+		return err
+	})
+
+	switch err {
+	case badger.ErrKeyNotFound:
+		return false, nil
+	case nil:
+		return true, nil
+	default:
+		return false, fmt.Errorf("failed to check if block exists in badger blockstore: %w", err)
+	}
+}
+
+func (b *Blockstore) Get(cid 
cid.Cid) (blocks.Block, error) { + if !cid.Defined() { + return nil, blockstore.ErrNotFound + } + + if atomic.LoadInt64(&b.state) != stateOpen { + return nil, ErrBlockstoreClosed + } + + k, pooled := b.PooledPrefixedKey(cid) + if pooled { + defer pool.Put(k) + } + + var val []byte + err := b.DB.View(func(txn *badger.Txn) error { + switch item, err := txn.Get(k); err { + case nil: + val, err = item.ValueCopy(nil) + return err + case badger.ErrKeyNotFound: + return blockstore.ErrNotFound + default: + return fmt.Errorf("failed to get block from badger blockstore: %w", err) + } + }) + if err != nil { + return nil, err + } + return blocks.NewBlockWithCid(val, cid) +} + +func (b *Blockstore) GetSize(cid cid.Cid) (int, error) { + if atomic.LoadInt64(&b.state) != stateOpen { + return -1, ErrBlockstoreClosed + } + + k, pooled := b.PooledPrefixedKey(cid) + if pooled { + defer pool.Put(k) + } + + var size int + err := b.DB.View(func(txn *badger.Txn) error { + switch item, err := txn.Get(k); err { + case nil: + size = int(item.ValueSize()) + case badger.ErrKeyNotFound: + return blockstore.ErrNotFound + default: + return fmt.Errorf("failed to get block size from badger blockstore: %w", err) + } + return nil + }) + if err != nil { + size = -1 + } + return size, err +} + +func (b *Blockstore) Put(block blocks.Block) error { + if atomic.LoadInt64(&b.state) != stateOpen { + return ErrBlockstoreClosed + } + + k, pooled := b.PooledPrefixedKey(block.Cid()) + if pooled { + defer pool.Put(k) + } + + err := b.DB.Update(func(txn *badger.Txn) error { + return txn.Set(k, block.RawData()) + }) + if err != nil { + err = fmt.Errorf("failed to put block in badger blockstore: %w", err) + } + return err +} + +func (b *Blockstore) PutMany(blocks []blocks.Block) error { + if atomic.LoadInt64(&b.state) != stateOpen { + return ErrBlockstoreClosed + } + + batch := b.DB.NewWriteBatch() + defer batch.Cancel() + + // toReturn tracks the byte slices to return to the pool, if we're using key + // prefixing. we can't return each slice to the pool after each Set, because + // badger holds on to the slice. + var toReturn [][]byte + if b.prefixing { + toReturn = make([][]byte, 0, len(blocks)) + defer func() { + for _, b := range toReturn { + pool.Put(b) + } + }() + } + + for _, block := range blocks { + k, pooled := b.PooledPrefixedKey(block.Cid()) + if pooled { + toReturn = append(toReturn, k) + } + if err := batch.Set(k, block.RawData()); err != nil { + return err + } + } + + err := batch.Flush() + if err != nil { + err = fmt.Errorf("failed to put blocks in badger blockstore: %w", err) + } + return err +} + +func (b *Blockstore) DeleteBlock(cid cid.Cid) error { + if atomic.LoadInt64(&b.state) != stateOpen { + return ErrBlockstoreClosed + } + + k, pooled := b.PooledPrefixedKey(cid) + if pooled { + defer pool.Put(k) + } + + return b.DB.Update(func(txn *badger.Txn) error { + return txn.Delete(k) + }) +} + +func (b *Blockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { + if atomic.LoadInt64(&b.state) != stateOpen { + return nil, ErrBlockstoreClosed + } + + txn := b.DB.NewTransaction(false) + opts := badger.IteratorOptions{PrefetchSize: 100} + if b.prefixing { + opts.Prefix = b.prefix + } + iter := txn.NewIterator(opts) + + ch := make(chan cid.Cid) + go func() { + defer close(ch) + defer iter.Close() + + for iter.Rewind(); iter.Valid(); iter.Next() { + if ctx.Err() != nil { + return // context has fired. + } + if atomic.LoadInt64(&b.state) != stateOpen { + // open iterators will run even after the database is closed... 
+ return // closing, yield. + } + k := iter.Item().Key() + if b.prefixing { + k = k[b.prefixLen:] + } + ch <- cid.NewCidV1(cid.Raw, k) + } + }() + + return ch, nil +} + +func (b *Blockstore) HashOnRead(enabled bool) { + log.Warnf("called HashOnRead on badger blockstore; function not supported; ignoring") +} + +func (b *Blockstore) PrefixedKey(cid cid.Cid) []byte { + h := cid.Hash() + if !b.prefixing { + return h + } + k := make([]byte, b.prefixLen+len(h)) + copy(k, b.prefix) + copy(k[b.prefixLen:], h) + return k +} + +func (b *Blockstore) PooledPrefixedKey(cid cid.Cid) (key []byte, pooled bool) { + h := cid.Hash() + if !b.prefixing { + return h, false + } + + size := b.prefixLen + len(h) + k := pool.Get(size) + copy(k, b.prefix) + copy(k[b.prefixLen:], h) + return k, true +} diff --git a/lib/blockstore/badger/badger_bench_test.go b/lib/blockstore/badger/badger_bench_test.go new file mode 100644 index 000000000..0039a1b45 --- /dev/null +++ b/lib/blockstore/badger/badger_bench_test.go @@ -0,0 +1,9 @@ +package badgerbs + +import "testing" + +func BenchmarkName(b *testing.B) { + for i := 0; i < b.N; i++ { + + } +} diff --git a/lib/blockstore/badger/badger_test.go b/lib/blockstore/badger/badger_test.go new file mode 100644 index 000000000..bfa979349 --- /dev/null +++ b/lib/blockstore/badger/badger_test.go @@ -0,0 +1,56 @@ +package badgerbs + +import ( + "io/ioutil" + "os" + "testing" + + blockstore "github.com/ipfs/go-ipfs-blockstore" +) + +func TestBadgerBlockstore(t *testing.T) { + (&Suite{ + NewBlockstore: newBlockstore(DefaultOptions), + OpenBlockstore: openBlockstore(DefaultOptions), + }).RunTests(t, "non_prefixed") + + prefixed := func(path string) Options { + opts := DefaultOptions(path) + opts.Prefix = "/prefixed/" + return opts + } + + (&Suite{ + NewBlockstore: newBlockstore(prefixed), + OpenBlockstore: openBlockstore(prefixed), + }).RunTests(t, "prefixed") +} + +func newBlockstore(optsSupplier func(path string) Options) func(tb testing.TB) (bs blockstore.Blockstore, path string) { + return func(tb testing.TB) (bs blockstore.Blockstore, path string) { + tb.Helper() + + path, err := ioutil.TempDir("", "") + if err != nil { + tb.Fatal(err) + } + + db, err := Open(optsSupplier(path)) + if err != nil { + tb.Fatal(err) + } + + tb.Cleanup(func() { + _ = os.RemoveAll(path) + }) + + return db, path + } +} + +func openBlockstore(optsSupplier func(path string) Options) func(tb testing.TB, path string) (bs blockstore.Blockstore, err error) { + return func(tb testing.TB, path string) (bs blockstore.Blockstore, err error) { + tb.Helper() + return Open(optsSupplier(path)) + } +} diff --git a/lib/blockstore/badger/badger_test_suite.go b/lib/blockstore/badger/badger_test_suite.go new file mode 100644 index 000000000..6d73be421 --- /dev/null +++ b/lib/blockstore/badger/badger_test_suite.go @@ -0,0 +1,307 @@ +package badgerbs + +import ( + "context" + "fmt" + "io" + "reflect" + "strings" + "testing" + + blocks "github.com/ipfs/go-block-format" + "github.com/ipfs/go-cid" + "github.com/ipfs/go-ipfs-blockstore" + u "github.com/ipfs/go-ipfs-util" + + "github.com/stretchr/testify/require" +) + +// TODO: move this to go-ipfs-blockstore. 
+type Suite struct { + NewBlockstore func(tb testing.TB) (bs blockstore.Blockstore, path string) + OpenBlockstore func(tb testing.TB, path string) (bs blockstore.Blockstore, err error) +} + +func (s *Suite) RunTests(t *testing.T, prefix string) { + v := reflect.TypeOf(s) + f := func(t *testing.T) { + for i := 0; i < v.NumMethod(); i++ { + if m := v.Method(i); strings.HasPrefix(m.Name, "Test") { + f := m.Func.Interface().(func(*Suite, *testing.T)) + t.Run(m.Name, func(t *testing.T) { + f(s, t) + }) + } + } + } + + if prefix == "" { + f(t) + } else { + t.Run(prefix, f) + } +} + +func (s *Suite) TestGetWhenKeyNotPresent(t *testing.T) { + bs, _ := s.NewBlockstore(t) + if c, ok := bs.(io.Closer); ok { + defer func() { require.NoError(t, c.Close()) }() + } + + c := cid.NewCidV0(u.Hash([]byte("stuff"))) + bl, err := bs.Get(c) + require.Nil(t, bl) + require.Equal(t, blockstore.ErrNotFound, err) +} + +func (s *Suite) TestGetWhenKeyIsNil(t *testing.T) { + bs, _ := s.NewBlockstore(t) + if c, ok := bs.(io.Closer); ok { + defer func() { require.NoError(t, c.Close()) }() + } + + _, err := bs.Get(cid.Undef) + require.Equal(t, blockstore.ErrNotFound, err) +} + +func (s *Suite) TestPutThenGetBlock(t *testing.T) { + bs, _ := s.NewBlockstore(t) + if c, ok := bs.(io.Closer); ok { + defer func() { require.NoError(t, c.Close()) }() + } + + orig := blocks.NewBlock([]byte("some data")) + + err := bs.Put(orig) + require.NoError(t, err) + + fetched, err := bs.Get(orig.Cid()) + require.NoError(t, err) + require.Equal(t, orig.RawData(), fetched.RawData()) +} + +func (s *Suite) TestHas(t *testing.T) { + bs, _ := s.NewBlockstore(t) + if c, ok := bs.(io.Closer); ok { + defer func() { require.NoError(t, c.Close()) }() + } + + orig := blocks.NewBlock([]byte("some data")) + + err := bs.Put(orig) + require.NoError(t, err) + + ok, err := bs.Has(orig.Cid()) + require.NoError(t, err) + require.True(t, ok) + + ok, err = bs.Has(blocks.NewBlock([]byte("another thing")).Cid()) + require.NoError(t, err) + require.False(t, ok) +} + +func (s *Suite) TestCidv0v1(t *testing.T) { + bs, _ := s.NewBlockstore(t) + if c, ok := bs.(io.Closer); ok { + defer func() { require.NoError(t, c.Close()) }() + } + + orig := blocks.NewBlock([]byte("some data")) + + err := bs.Put(orig) + require.NoError(t, err) + + fetched, err := bs.Get(cid.NewCidV1(cid.DagProtobuf, orig.Cid().Hash())) + require.NoError(t, err) + require.Equal(t, orig.RawData(), fetched.RawData()) +} + +func (s *Suite) TestPutThenGetSizeBlock(t *testing.T) { + bs, _ := s.NewBlockstore(t) + if c, ok := bs.(io.Closer); ok { + defer func() { require.NoError(t, c.Close()) }() + } + + block := blocks.NewBlock([]byte("some data")) + missingBlock := blocks.NewBlock([]byte("missingBlock")) + emptyBlock := blocks.NewBlock([]byte{}) + + err := bs.Put(block) + require.NoError(t, err) + + blockSize, err := bs.GetSize(block.Cid()) + require.NoError(t, err) + require.Len(t, block.RawData(), blockSize) + + err = bs.Put(emptyBlock) + require.NoError(t, err) + + emptySize, err := bs.GetSize(emptyBlock.Cid()) + require.NoError(t, err) + require.Zero(t, emptySize) + + missingSize, err := bs.GetSize(missingBlock.Cid()) + require.Equal(t, blockstore.ErrNotFound, err) + require.Equal(t, -1, missingSize) +} + +func (s *Suite) TestAllKeysSimple(t *testing.T) { + bs, _ := s.NewBlockstore(t) + if c, ok := bs.(io.Closer); ok { + defer func() { require.NoError(t, c.Close()) }() + } + + keys := insertBlocks(t, bs, 100) + + ctx := context.Background() + ch, err := bs.AllKeysChan(ctx) + require.NoError(t, err) + 
actual := collect(ch) + + require.ElementsMatch(t, keys, actual) +} + +func (s *Suite) TestAllKeysRespectsContext(t *testing.T) { + bs, _ := s.NewBlockstore(t) + if c, ok := bs.(io.Closer); ok { + defer func() { require.NoError(t, c.Close()) }() + } + + _ = insertBlocks(t, bs, 100) + + ctx, cancel := context.WithCancel(context.Background()) + ch, err := bs.AllKeysChan(ctx) + require.NoError(t, err) + + // consume 2, then cancel context. + v, ok := <-ch + require.NotEqual(t, cid.Undef, v) + require.True(t, ok) + + v, ok = <-ch + require.NotEqual(t, cid.Undef, v) + require.True(t, ok) + + cancel() + + v, ok = <-ch + require.Equal(t, cid.Undef, v) + require.False(t, ok) +} + +func (s *Suite) TestDoubleClose(t *testing.T) { + bs, _ := s.NewBlockstore(t) + c, ok := bs.(io.Closer) + if !ok { + t.SkipNow() + } + require.NoError(t, c.Close()) + require.NoError(t, c.Close()) +} + +func (s *Suite) TestReopenPutGet(t *testing.T) { + bs, path := s.NewBlockstore(t) + c, ok := bs.(io.Closer) + if !ok { + t.SkipNow() + } + + orig := blocks.NewBlock([]byte("some data")) + err := bs.Put(orig) + require.NoError(t, err) + + err = c.Close() + require.NoError(t, err) + + bs, err = s.OpenBlockstore(t, path) + require.NoError(t, err) + + fetched, err := bs.Get(orig.Cid()) + require.NoError(t, err) + require.Equal(t, orig.RawData(), fetched.RawData()) +} + +func (s *Suite) TestPutMany(t *testing.T) { + bs, _ := s.NewBlockstore(t) + if c, ok := bs.(io.Closer); ok { + defer func() { require.NoError(t, c.Close()) }() + } + + blks := []blocks.Block{ + blocks.NewBlock([]byte("foo1")), + blocks.NewBlock([]byte("foo2")), + blocks.NewBlock([]byte("foo3")), + } + err := bs.PutMany(blks) + require.NoError(t, err) + + for _, blk := range blks { + fetched, err := bs.Get(blk.Cid()) + require.NoError(t, err) + require.Equal(t, blk.RawData(), fetched.RawData()) + + ok, err := bs.Has(blk.Cid()) + require.NoError(t, err) + require.True(t, ok) + } + + ch, err := bs.AllKeysChan(context.Background()) + require.NoError(t, err) + + cids := collect(ch) + require.Len(t, cids, 3) +} + +func (s *Suite) TestDelete(t *testing.T) { + bs, _ := s.NewBlockstore(t) + if c, ok := bs.(io.Closer); ok { + defer func() { require.NoError(t, c.Close()) }() + } + + blks := []blocks.Block{ + blocks.NewBlock([]byte("foo1")), + blocks.NewBlock([]byte("foo2")), + blocks.NewBlock([]byte("foo3")), + } + err := bs.PutMany(blks) + require.NoError(t, err) + + err = bs.DeleteBlock(blks[1].Cid()) + require.NoError(t, err) + + ch, err := bs.AllKeysChan(context.Background()) + require.NoError(t, err) + + cids := collect(ch) + require.Len(t, cids, 2) + require.ElementsMatch(t, cids, []cid.Cid{ + cid.NewCidV1(cid.Raw, blks[0].Cid().Hash()), + cid.NewCidV1(cid.Raw, blks[2].Cid().Hash()), + }) + + has, err := bs.Has(blks[1].Cid()) + require.NoError(t, err) + require.False(t, has) + +} + +func insertBlocks(t *testing.T, bs blockstore.Blockstore, count int) []cid.Cid { + keys := make([]cid.Cid, count) + for i := 0; i < count; i++ { + block := blocks.NewBlock([]byte(fmt.Sprintf("some data %d", i))) + err := bs.Put(block) + require.NoError(t, err) + // NewBlock assigns a CIDv0; we convert it to CIDv1 because that's what + // the store returns. 
+ keys[i] = cid.NewCidV1(cid.Raw, block.Multihash()) + } + return keys +} + +func collect(ch <-chan cid.Cid) []cid.Cid { + var keys []cid.Cid + for k := range ch { + keys = append(keys, k) + } + return keys +} diff --git a/lib/blockstore/blockstore.go b/lib/blockstore/blockstore.go index 99d849188..b6369e491 100644 --- a/lib/blockstore/blockstore.go +++ b/lib/blockstore/blockstore.go @@ -17,11 +17,18 @@ package blockstore import ( "context" + "github.com/ipfs/go-cid" ds "github.com/ipfs/go-datastore" blockstore "github.com/ipfs/go-ipfs-blockstore" ) +// Viewer is a blockstore trait that can be implemented by blockstores +// that offer zero-copy access to blocks. +type Viewer interface { + View(cid cid.Cid, callback func([]byte) error) error +} + // NewTemporary returns a temporary blockstore. func NewTemporary() MemStore { return make(MemStore)
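
For reviewers who want to exercise the new store by hand, here is a minimal usage sketch (not part of the diff). It assumes only the APIs introduced above — DefaultOptions, Open, Put, View and Close — plus the import path added by this change; the ./lotus-badger-example path and the block contents are illustrative.

package main

import (
	"fmt"
	"log"

	blocks "github.com/ipfs/go-block-format"

	badgerbs "github.com/filecoin-project/lotus/lib/blockstore/badger"
)

func main() {
	// Open a prefixed badger blockstore rooted at ./lotus-badger-example.
	opts := badgerbs.DefaultOptions("./lotus-badger-example")
	opts.Prefix = "/blocks/"

	bs, err := badgerbs.Open(opts)
	if err != nil {
		log.Fatal(err)
	}
	defer bs.Close() // later calls will return ErrBlockstoreClosed

	// Put stores the block under its (prefixed) multihash key.
	blk := blocks.NewBlock([]byte("hello badger"))
	if err := bs.Put(blk); err != nil {
		log.Fatal(err)
	}

	// View gives zero-copy access: the callback sees badger's value buffer,
	// so it must not retain the slice after returning.
	err = bs.View(blk.Cid(), func(data []byte) error {
		fmt.Printf("read %d bytes back\n", len(data))
		return nil
	})
	if err != nil {
		log.Fatal(err)
	}
}

Note that View is only available because *Blockstore implements the new blockstore.Viewer interface added to lib/blockstore/blockstore.go in this change; callers that only have a blockstore.Blockstore should type-assert for Viewer before relying on it.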