From 2047a74958b39d292d6d3c5bcb8093344f9c33a4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Tue, 2 Mar 2021 17:03:11 +0000 Subject: [PATCH] implement blockstore.Union, a union blockstore. The union blockstore takes a list of blockstores. It returns the first satisfying read, and broadcasts writes to all stores. It can be used for operations that require reading from any two blockstores, for example WalkSnapshot. --- blockstore/union.go | 108 +++++++++++++++++++++++++++++++++++++++ blockstore/union_test.go | 102 ++++++++++++++++++++++++++++++++++++ chain/store/store.go | 3 +- 3 files changed, 212 insertions(+), 1 deletion(-) create mode 100644 blockstore/union.go create mode 100644 blockstore/union_test.go diff --git a/blockstore/union.go b/blockstore/union.go new file mode 100644 index 000000000..9573842a4 --- /dev/null +++ b/blockstore/union.go @@ -0,0 +1,108 @@ +package blockstore + +import ( + "context" + + blocks "github.com/ipfs/go-block-format" + "github.com/ipfs/go-cid" +) + +type unionBlockstore []Blockstore + +// Union returns an unioned blockstore. +// +// * Reads return from the first blockstore that has the value, querying in the +// supplied order. +// * Writes (puts and deltes) are broadcast to all stores. +// +func Union(stores ...Blockstore) Blockstore { + return unionBlockstore(stores) +} + +func (m unionBlockstore) Has(cid cid.Cid) (has bool, err error) { + for _, bs := range m { + if has, err = bs.Has(cid); has || err != nil { + break + } + } + return has, err +} + +func (m unionBlockstore) Get(cid cid.Cid) (blk blocks.Block, err error) { + for _, bs := range m { + if blk, err = bs.Get(cid); err == nil || err != ErrNotFound { + break + } + } + return blk, err +} + +func (m unionBlockstore) View(cid cid.Cid, callback func([]byte) error) (err error) { + for _, bs := range m { + if err = bs.View(cid, callback); err == nil || err != ErrNotFound { + break + } + } + return err +} + +func (m unionBlockstore) GetSize(cid cid.Cid) (size int, err error) { + for _, bs := range m { + if size, err = bs.GetSize(cid); err == nil || err != ErrNotFound { + break + } + } + return size, err +} + +func (m unionBlockstore) Put(block blocks.Block) (err error) { + for _, bs := range m { + if err = bs.Put(block); err != nil { + break + } + } + return err +} + +func (m unionBlockstore) PutMany(blks []blocks.Block) (err error) { + for _, bs := range m { + if err = bs.PutMany(blks); err != nil { + break + } + } + return err +} + +func (m unionBlockstore) DeleteBlock(cid cid.Cid) (err error) { + for _, bs := range m { + if err = bs.DeleteBlock(cid); err != nil { + break + } + } + return err +} + +func (m unionBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { + // this does not deduplicate; this interface needs to be revisited. + outCh := make(chan cid.Cid) + + go func() { + defer close(outCh) + + for _, bs := range m { + ch, err := bs.AllKeysChan(ctx) + if err != nil { + return + } + for cid := range ch { + outCh <- cid + } + } + }() + + return outCh, nil +} + +func (m unionBlockstore) HashOnRead(enabled bool) { + panic("implement me") +} diff --git a/blockstore/union_test.go b/blockstore/union_test.go new file mode 100644 index 000000000..b62026892 --- /dev/null +++ b/blockstore/union_test.go @@ -0,0 +1,102 @@ +package blockstore + +import ( + "context" + "testing" + + blocks "github.com/ipfs/go-block-format" + "github.com/stretchr/testify/require" +) + +var ( + b0 = blocks.NewBlock([]byte("abc")) + b1 = blocks.NewBlock([]byte("foo")) + b2 = blocks.NewBlock([]byte("bar")) +) + +func TestUnionBlockstore_Get(t *testing.T) { + m1 := NewMemory() + m2 := NewMemory() + + _ = m1.Put(b1) + _ = m2.Put(b2) + + u := Union(m1, m2) + + v1, err := u.Get(b1.Cid()) + require.NoError(t, err) + require.Equal(t, b1.RawData(), v1.RawData()) + + v2, err := u.Get(b2.Cid()) + require.NoError(t, err) + require.Equal(t, b2.RawData(), v2.RawData()) +} + +func TestUnionBlockstore_Put_PutMany_Delete_AllKeysChan(t *testing.T) { + m1 := NewMemory() + m2 := NewMemory() + + u := Union(m1, m2) + + err := u.Put(b0) + require.NoError(t, err) + + var has bool + + // write was broadcasted to all stores. + has, _ = m1.Has(b0.Cid()) + require.True(t, has) + + has, _ = m2.Has(b0.Cid()) + require.True(t, has) + + has, _ = u.Has(b0.Cid()) + require.True(t, has) + + // put many. + err = u.PutMany([]blocks.Block{b1, b2}) + require.NoError(t, err) + + // write was broadcasted to all stores. + has, _ = m1.Has(b1.Cid()) + require.True(t, has) + + has, _ = m1.Has(b2.Cid()) + require.True(t, has) + + has, _ = m2.Has(b1.Cid()) + require.True(t, has) + + has, _ = m2.Has(b2.Cid()) + require.True(t, has) + + // also in the union store. + has, _ = u.Has(b1.Cid()) + require.True(t, has) + + has, _ = u.Has(b2.Cid()) + require.True(t, has) + + // deleted from all stores. + err = u.DeleteBlock(b1.Cid()) + require.NoError(t, err) + + has, _ = u.Has(b1.Cid()) + require.False(t, has) + + has, _ = m1.Has(b1.Cid()) + require.False(t, has) + + has, _ = m2.Has(b1.Cid()) + require.False(t, has) + + // check that AllKeysChan returns b0 and b2, twice (once per backing store) + ch, err := u.AllKeysChan(context.Background()) + require.NoError(t, err) + + var i int + for range ch { + i++ + } + require.Equal(t, 4, i) +} diff --git a/chain/store/store.go b/chain/store/store.go index 823063978..6a3febcc8 100644 --- a/chain/store/store.go +++ b/chain/store/store.go @@ -1427,8 +1427,9 @@ func (cs *ChainStore) Export(ctx context.Context, ts *types.TipSet, inclRecentRo return xerrors.Errorf("failed to write car header: %s", err) } + unionBs := bstore.Union(cs.stateBlockstore, cs.chainBlockstore) return cs.WalkSnapshot(ctx, ts, inclRecentRoots, skipOldMsgs, true, func(c cid.Cid) error { - blk, err := cs.chainBlockstore.Get(c) + blk, err := unionBs.Get(c) if err != nil { return xerrors.Errorf("writing object to car, bs.Get: %w", err) }