implement blockstore.Union, a union blockstore.

The union blockstore takes a list of blockstores. It returns the first
satisfying read, and broadcasts writes to all stores.

It can be used for operations that require reading from any two blockstores,
for example WalkSnapshot.
This commit is contained in:
Raúl Kripalani 2021-03-02 17:03:11 +00:00
parent b1c348b4a7
commit 2047a74958
3 changed files with 212 additions and 1 deletions

108
blockstore/union.go Normal file
View File

@ -0,0 +1,108 @@
package blockstore
import (
"context"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
)
type unionBlockstore []Blockstore
// Union returns an unioned blockstore.
//
// * Reads return from the first blockstore that has the value, querying in the
// supplied order.
// * Writes (puts and deltes) are broadcast to all stores.
//
func Union(stores ...Blockstore) Blockstore {
return unionBlockstore(stores)
}
func (m unionBlockstore) Has(cid cid.Cid) (has bool, err error) {
for _, bs := range m {
if has, err = bs.Has(cid); has || err != nil {
break
}
}
return has, err
}
func (m unionBlockstore) Get(cid cid.Cid) (blk blocks.Block, err error) {
for _, bs := range m {
if blk, err = bs.Get(cid); err == nil || err != ErrNotFound {
break
}
}
return blk, err
}
func (m unionBlockstore) View(cid cid.Cid, callback func([]byte) error) (err error) {
for _, bs := range m {
if err = bs.View(cid, callback); err == nil || err != ErrNotFound {
break
}
}
return err
}
func (m unionBlockstore) GetSize(cid cid.Cid) (size int, err error) {
for _, bs := range m {
if size, err = bs.GetSize(cid); err == nil || err != ErrNotFound {
break
}
}
return size, err
}
func (m unionBlockstore) Put(block blocks.Block) (err error) {
for _, bs := range m {
if err = bs.Put(block); err != nil {
break
}
}
return err
}
func (m unionBlockstore) PutMany(blks []blocks.Block) (err error) {
for _, bs := range m {
if err = bs.PutMany(blks); err != nil {
break
}
}
return err
}
func (m unionBlockstore) DeleteBlock(cid cid.Cid) (err error) {
for _, bs := range m {
if err = bs.DeleteBlock(cid); err != nil {
break
}
}
return err
}
func (m unionBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
// this does not deduplicate; this interface needs to be revisited.
outCh := make(chan cid.Cid)
go func() {
defer close(outCh)
for _, bs := range m {
ch, err := bs.AllKeysChan(ctx)
if err != nil {
return
}
for cid := range ch {
outCh <- cid
}
}
}()
return outCh, nil
}
func (m unionBlockstore) HashOnRead(enabled bool) {
panic("implement me")
}

102
blockstore/union_test.go Normal file
View File

@ -0,0 +1,102 @@
package blockstore
import (
"context"
"testing"
blocks "github.com/ipfs/go-block-format"
"github.com/stretchr/testify/require"
)
var (
b0 = blocks.NewBlock([]byte("abc"))
b1 = blocks.NewBlock([]byte("foo"))
b2 = blocks.NewBlock([]byte("bar"))
)
func TestUnionBlockstore_Get(t *testing.T) {
m1 := NewMemory()
m2 := NewMemory()
_ = m1.Put(b1)
_ = m2.Put(b2)
u := Union(m1, m2)
v1, err := u.Get(b1.Cid())
require.NoError(t, err)
require.Equal(t, b1.RawData(), v1.RawData())
v2, err := u.Get(b2.Cid())
require.NoError(t, err)
require.Equal(t, b2.RawData(), v2.RawData())
}
func TestUnionBlockstore_Put_PutMany_Delete_AllKeysChan(t *testing.T) {
m1 := NewMemory()
m2 := NewMemory()
u := Union(m1, m2)
err := u.Put(b0)
require.NoError(t, err)
var has bool
// write was broadcasted to all stores.
has, _ = m1.Has(b0.Cid())
require.True(t, has)
has, _ = m2.Has(b0.Cid())
require.True(t, has)
has, _ = u.Has(b0.Cid())
require.True(t, has)
// put many.
err = u.PutMany([]blocks.Block{b1, b2})
require.NoError(t, err)
// write was broadcasted to all stores.
has, _ = m1.Has(b1.Cid())
require.True(t, has)
has, _ = m1.Has(b2.Cid())
require.True(t, has)
has, _ = m2.Has(b1.Cid())
require.True(t, has)
has, _ = m2.Has(b2.Cid())
require.True(t, has)
// also in the union store.
has, _ = u.Has(b1.Cid())
require.True(t, has)
has, _ = u.Has(b2.Cid())
require.True(t, has)
// deleted from all stores.
err = u.DeleteBlock(b1.Cid())
require.NoError(t, err)
has, _ = u.Has(b1.Cid())
require.False(t, has)
has, _ = m1.Has(b1.Cid())
require.False(t, has)
has, _ = m2.Has(b1.Cid())
require.False(t, has)
// check that AllKeysChan returns b0 and b2, twice (once per backing store)
ch, err := u.AllKeysChan(context.Background())
require.NoError(t, err)
var i int
for range ch {
i++
}
require.Equal(t, 4, i)
}

View File

@ -1427,8 +1427,9 @@ func (cs *ChainStore) Export(ctx context.Context, ts *types.TipSet, inclRecentRo
return xerrors.Errorf("failed to write car header: %s", err)
}
unionBs := bstore.Union(cs.stateBlockstore, cs.chainBlockstore)
return cs.WalkSnapshot(ctx, ts, inclRecentRoots, skipOldMsgs, true, func(c cid.Cid) error {
blk, err := cs.chainBlockstore.Get(c)
blk, err := unionBs.Get(c)
if err != nil {
return xerrors.Errorf("writing object to car, bs.Get: %w", err)
}