WIP: updating to new datastore/blockstore code with contexts

This commit is contained in:
whyrusleeping 2021-11-18 17:50:25 -08:00 committed by vyzo
parent 49c619d65d
commit 072297e661
23 changed files with 452 additions and 555 deletions

View File

@ -25,35 +25,35 @@ func NewAPIBlockstore(cio ChainIO) Blockstore {
return Adapt(bs) // return an adapted blockstore.
}
func (a *apiBlockstore) DeleteBlock(cid.Cid) error {
func (a *apiBlockstore) DeleteBlock(context.Context, cid.Cid) error {
return xerrors.New("not supported")
}
func (a *apiBlockstore) Has(c cid.Cid) (bool, error) {
return a.api.ChainHasObj(context.TODO(), c)
func (a *apiBlockstore) Has(ctx context.Context, c cid.Cid) (bool, error) {
return a.api.ChainHasObj(ctx, c)
}
func (a *apiBlockstore) Get(c cid.Cid) (blocks.Block, error) {
bb, err := a.api.ChainReadObj(context.TODO(), c)
func (a *apiBlockstore) Get(ctx context.Context, c cid.Cid) (blocks.Block, error) {
bb, err := a.api.ChainReadObj(ctx, c)
if err != nil {
return nil, err
}
return blocks.NewBlockWithCid(bb, c)
}
func (a *apiBlockstore) GetSize(c cid.Cid) (int, error) {
bb, err := a.api.ChainReadObj(context.TODO(), c)
func (a *apiBlockstore) GetSize(ctx context.Context, c cid.Cid) (int, error) {
bb, err := a.api.ChainReadObj(ctx, c)
if err != nil {
return 0, err
}
return len(bb), nil
}
func (a *apiBlockstore) Put(blocks.Block) error {
func (a *apiBlockstore) Put(context.Context, blocks.Block) error {
return xerrors.New("not supported")
}
func (a *apiBlockstore) PutMany([]blocks.Block) error {
func (a *apiBlockstore) PutMany(context.Context, []blocks.Block) error {
return xerrors.New("not supported")
}
@ -61,6 +61,6 @@ func (a *apiBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error)
return nil, xerrors.New("not supported")
}
func (a *apiBlockstore) HashOnRead(enabled bool) {
func (a *apiBlockstore) HashOnRead(ctx context.Context, enabled bool) {
return
}

View File

@ -525,7 +525,7 @@ func (b *Blockstore) Size() (int64, error) {
// View implements blockstore.Viewer, which leverages zero-copy read-only
// access to values.
func (b *Blockstore) View(cid cid.Cid, fn func([]byte) error) error {
func (b *Blockstore) View(ctx context.Context, cid cid.Cid, fn func([]byte) error) error {
if err := b.access(); err != nil {
return err
}
@ -552,7 +552,7 @@ func (b *Blockstore) View(cid cid.Cid, fn func([]byte) error) error {
}
// Has implements Blockstore.Has.
func (b *Blockstore) Has(cid cid.Cid) (bool, error) {
func (b *Blockstore) Has(ctx context.Context, cid cid.Cid) (bool, error) {
if err := b.access(); err != nil {
return false, err
}
@ -582,7 +582,7 @@ func (b *Blockstore) Has(cid cid.Cid) (bool, error) {
}
// Get implements Blockstore.Get.
func (b *Blockstore) Get(cid cid.Cid) (blocks.Block, error) {
func (b *Blockstore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error) {
if !cid.Defined() {
return nil, blockstore.ErrNotFound
}
@ -619,7 +619,7 @@ func (b *Blockstore) Get(cid cid.Cid) (blocks.Block, error) {
}
// GetSize implements Blockstore.GetSize.
func (b *Blockstore) GetSize(cid cid.Cid) (int, error) {
func (b *Blockstore) GetSize(ctx context.Context, cid cid.Cid) (int, error) {
if err := b.access(); err != nil {
return 0, err
}
@ -652,7 +652,7 @@ func (b *Blockstore) GetSize(cid cid.Cid) (int, error) {
}
// Put implements Blockstore.Put.
func (b *Blockstore) Put(block blocks.Block) error {
func (b *Blockstore) Put(ctx context.Context, block blocks.Block) error {
if err := b.access(); err != nil {
return err
}
@ -691,7 +691,7 @@ func (b *Blockstore) Put(block blocks.Block) error {
}
// PutMany implements Blockstore.PutMany.
func (b *Blockstore) PutMany(blocks []blocks.Block) error {
func (b *Blockstore) PutMany(ctx context.Context, blocks []blocks.Block) error {
if err := b.access(); err != nil {
return err
}
@ -755,7 +755,7 @@ func (b *Blockstore) PutMany(blocks []blocks.Block) error {
}
// DeleteBlock implements Blockstore.DeleteBlock.
func (b *Blockstore) DeleteBlock(cid cid.Cid) error {
func (b *Blockstore) DeleteBlock(ctx context.Context, cid cid.Cid) error {
if err := b.access(); err != nil {
return err
}
@ -774,7 +774,7 @@ func (b *Blockstore) DeleteBlock(cid cid.Cid) error {
})
}
func (b *Blockstore) DeleteMany(cids []cid.Cid) error {
func (b *Blockstore) DeleteMany(ctx context.Context, cids []cid.Cid) error {
if err := b.access(); err != nil {
return err
}
@ -927,7 +927,7 @@ func (b *Blockstore) ForEachKey(f func(cid.Cid) error) error {
// HashOnRead implements Blockstore.HashOnRead. It is not supported by this
// blockstore.
func (b *Blockstore) HashOnRead(_ bool) {
func (b *Blockstore) HashOnRead(ctx context.Context, _ bool) {
log.Warnf("called HashOnRead on badger blockstore; function not supported; ignoring")
}

View File

@ -2,6 +2,7 @@ package badgerbs
import (
"bytes"
"context"
"fmt"
"io/ioutil"
"os"
@ -98,6 +99,7 @@ func openBlockstore(optsSupplier func(path string) Options) func(tb testing.TB,
}
func testMove(t *testing.T, optsF func(string) Options) {
ctx := context.TODO()
basePath, err := ioutil.TempDir("", "")
if err != nil {
t.Fatal(err)
@ -122,7 +124,7 @@ func testMove(t *testing.T, optsF func(string) Options) {
// add some blocks
for i := 0; i < 10; i++ {
blk := blocks.NewBlock([]byte(fmt.Sprintf("some data %d", i)))
err := db.Put(blk)
err := db.Put(ctx, blk)
if err != nil {
t.Fatal(err)
}
@ -132,7 +134,7 @@ func testMove(t *testing.T, optsF func(string) Options) {
// delete some of them
for i := 5; i < 10; i++ {
c := have[i].Cid()
err := db.DeleteBlock(c)
err := db.DeleteBlock(ctx, c)
if err != nil {
t.Fatal(err)
}
@ -145,7 +147,7 @@ func testMove(t *testing.T, optsF func(string) Options) {
g.Go(func() error {
for i := 10; i < 1000; i++ {
blk := blocks.NewBlock([]byte(fmt.Sprintf("some data %d", i)))
err := db.Put(blk)
err := db.Put(ctx, blk)
if err != nil {
return err
}
@ -165,7 +167,7 @@ func testMove(t *testing.T, optsF func(string) Options) {
// now check that we have all the blocks in have and none in the deleted lists
checkBlocks := func() {
for _, blk := range have {
has, err := db.Has(blk.Cid())
has, err := db.Has(ctx, blk.Cid())
if err != nil {
t.Fatal(err)
}
@ -174,7 +176,7 @@ func testMove(t *testing.T, optsF func(string) Options) {
t.Fatal("missing block")
}
blk2, err := db.Get(blk.Cid())
blk2, err := db.Get(ctx, blk.Cid())
if err != nil {
t.Fatal(err)
}
@ -185,7 +187,7 @@ func testMove(t *testing.T, optsF func(string) Options) {
}
for _, c := range deleted {
has, err := db.Has(c)
has, err := db.Has(ctx, c)
if err != nil {
t.Fatal(err)
}

View File

@ -44,28 +44,31 @@ func (s *Suite) RunTests(t *testing.T, prefix string) {
}
func (s *Suite) TestGetWhenKeyNotPresent(t *testing.T) {
ctx := context.TODO()
bs, _ := s.NewBlockstore(t)
if c, ok := bs.(io.Closer); ok {
defer func() { require.NoError(t, c.Close()) }()
}
c := cid.NewCidV0(u.Hash([]byte("stuff")))
bl, err := bs.Get(c)
bl, err := bs.Get(ctx, c)
require.Nil(t, bl)
require.Equal(t, blockstore.ErrNotFound, err)
}
func (s *Suite) TestGetWhenKeyIsNil(t *testing.T) {
ctx := context.TODO()
bs, _ := s.NewBlockstore(t)
if c, ok := bs.(io.Closer); ok {
defer func() { require.NoError(t, c.Close()) }()
}
_, err := bs.Get(cid.Undef)
_, err := bs.Get(ctx, cid.Undef)
require.Equal(t, blockstore.ErrNotFound, err)
}
func (s *Suite) TestPutThenGetBlock(t *testing.T) {
ctx := context.TODO()
bs, _ := s.NewBlockstore(t)
if c, ok := bs.(io.Closer); ok {
defer func() { require.NoError(t, c.Close()) }()
@ -73,15 +76,16 @@ func (s *Suite) TestPutThenGetBlock(t *testing.T) {
orig := blocks.NewBlock([]byte("some data"))
err := bs.Put(orig)
err := bs.Put(ctx, orig)
require.NoError(t, err)
fetched, err := bs.Get(orig.Cid())
fetched, err := bs.Get(ctx, orig.Cid())
require.NoError(t, err)
require.Equal(t, orig.RawData(), fetched.RawData())
}
func (s *Suite) TestHas(t *testing.T) {
ctx := context.TODO()
bs, _ := s.NewBlockstore(t)
if c, ok := bs.(io.Closer); ok {
defer func() { require.NoError(t, c.Close()) }()
@ -89,19 +93,20 @@ func (s *Suite) TestHas(t *testing.T) {
orig := blocks.NewBlock([]byte("some data"))
err := bs.Put(orig)
err := bs.Put(ctx, orig)
require.NoError(t, err)
ok, err := bs.Has(orig.Cid())
ok, err := bs.Has(ctx, orig.Cid())
require.NoError(t, err)
require.True(t, ok)
ok, err = bs.Has(blocks.NewBlock([]byte("another thing")).Cid())
ok, err = bs.Has(ctx, blocks.NewBlock([]byte("another thing")).Cid())
require.NoError(t, err)
require.False(t, ok)
}
func (s *Suite) TestCidv0v1(t *testing.T) {
ctx := context.TODO()
bs, _ := s.NewBlockstore(t)
if c, ok := bs.(io.Closer); ok {
defer func() { require.NoError(t, c.Close()) }()
@ -109,15 +114,17 @@ func (s *Suite) TestCidv0v1(t *testing.T) {
orig := blocks.NewBlock([]byte("some data"))
err := bs.Put(orig)
err := bs.Put(ctx, orig)
require.NoError(t, err)
fetched, err := bs.Get(cid.NewCidV1(cid.DagProtobuf, orig.Cid().Hash()))
fetched, err := bs.Get(ctx, cid.NewCidV1(cid.DagProtobuf, orig.Cid().Hash()))
require.NoError(t, err)
require.Equal(t, orig.RawData(), fetched.RawData())
}
func (s *Suite) TestPutThenGetSizeBlock(t *testing.T) {
ctx := context.TODO()
bs, _ := s.NewBlockstore(t)
if c, ok := bs.(io.Closer); ok {
defer func() { require.NoError(t, c.Close()) }()
@ -127,21 +134,21 @@ func (s *Suite) TestPutThenGetSizeBlock(t *testing.T) {
missingBlock := blocks.NewBlock([]byte("missingBlock"))
emptyBlock := blocks.NewBlock([]byte{})
err := bs.Put(block)
err := bs.Put(ctx, block)
require.NoError(t, err)
blockSize, err := bs.GetSize(block.Cid())
blockSize, err := bs.GetSize(ctx, block.Cid())
require.NoError(t, err)
require.Len(t, block.RawData(), blockSize)
err = bs.Put(emptyBlock)
err = bs.Put(ctx, emptyBlock)
require.NoError(t, err)
emptySize, err := bs.GetSize(emptyBlock.Cid())
emptySize, err := bs.GetSize(ctx, emptyBlock.Cid())
require.NoError(t, err)
require.Zero(t, emptySize)
missingSize, err := bs.GetSize(missingBlock.Cid())
missingSize, err := bs.GetSize(ctx, missingBlock.Cid())
require.Equal(t, blockstore.ErrNotFound, err)
require.Equal(t, -1, missingSize)
}
@ -203,6 +210,7 @@ func (s *Suite) TestDoubleClose(t *testing.T) {
}
func (s *Suite) TestReopenPutGet(t *testing.T) {
ctx := context.TODO()
bs, path := s.NewBlockstore(t)
c, ok := bs.(io.Closer)
if !ok {
@ -210,7 +218,7 @@ func (s *Suite) TestReopenPutGet(t *testing.T) {
}
orig := blocks.NewBlock([]byte("some data"))
err := bs.Put(orig)
err := bs.Put(ctx, orig)
require.NoError(t, err)
err = c.Close()
@ -219,7 +227,7 @@ func (s *Suite) TestReopenPutGet(t *testing.T) {
bs, err = s.OpenBlockstore(t, path)
require.NoError(t, err)
fetched, err := bs.Get(orig.Cid())
fetched, err := bs.Get(ctx, orig.Cid())
require.NoError(t, err)
require.Equal(t, orig.RawData(), fetched.RawData())
@ -228,6 +236,7 @@ func (s *Suite) TestReopenPutGet(t *testing.T) {
}
func (s *Suite) TestPutMany(t *testing.T) {
ctx := context.TODO()
bs, _ := s.NewBlockstore(t)
if c, ok := bs.(io.Closer); ok {
defer func() { require.NoError(t, c.Close()) }()
@ -238,15 +247,15 @@ func (s *Suite) TestPutMany(t *testing.T) {
blocks.NewBlock([]byte("foo2")),
blocks.NewBlock([]byte("foo3")),
}
err := bs.PutMany(blks)
err := bs.PutMany(ctx, blks)
require.NoError(t, err)
for _, blk := range blks {
fetched, err := bs.Get(blk.Cid())
fetched, err := bs.Get(ctx, blk.Cid())
require.NoError(t, err)
require.Equal(t, blk.RawData(), fetched.RawData())
ok, err := bs.Has(blk.Cid())
ok, err := bs.Has(ctx, blk.Cid())
require.NoError(t, err)
require.True(t, ok)
}
@ -259,6 +268,7 @@ func (s *Suite) TestPutMany(t *testing.T) {
}
func (s *Suite) TestDelete(t *testing.T) {
ctx := context.TODO()
bs, _ := s.NewBlockstore(t)
if c, ok := bs.(io.Closer); ok {
defer func() { require.NoError(t, c.Close()) }()
@ -269,10 +279,10 @@ func (s *Suite) TestDelete(t *testing.T) {
blocks.NewBlock([]byte("foo2")),
blocks.NewBlock([]byte("foo3")),
}
err := bs.PutMany(blks)
err := bs.PutMany(ctx, blks)
require.NoError(t, err)
err = bs.DeleteBlock(blks[1].Cid())
err = bs.DeleteBlock(ctx, blks[1].Cid())
require.NoError(t, err)
ch, err := bs.AllKeysChan(context.Background())
@ -285,17 +295,17 @@ func (s *Suite) TestDelete(t *testing.T) {
cid.NewCidV1(cid.Raw, blks[2].Cid().Hash()),
})
has, err := bs.Has(blks[1].Cid())
has, err := bs.Has(ctx, blks[1].Cid())
require.NoError(t, err)
require.False(t, has)
}
func insertBlocks(t *testing.T, bs blockstore.BasicBlockstore, count int) []cid.Cid {
ctx := context.TODO()
keys := make([]cid.Cid, count)
for i := 0; i < count; i++ {
block := blocks.NewBlock([]byte(fmt.Sprintf("some data %d", i)))
err := bs.Put(block)
err := bs.Put(ctx, block)
require.NoError(t, err)
// NewBlock assigns a CIDv0; we convert it to CIDv1 because that's what
// the store returns.

View File

@ -1,6 +1,8 @@
package blockstore
import (
"context"
cid "github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
logging "github.com/ipfs/go-log/v2"
@ -27,7 +29,7 @@ type BasicBlockstore = blockstore.Blockstore
type Viewer = blockstore.Viewer
type BatchDeleter interface {
DeleteMany(cids []cid.Cid) error
DeleteMany(ctx context.Context, cids []cid.Cid) error
}
// BlockstoreIterator is a trait for efficient iteration
@ -93,17 +95,17 @@ type adaptedBlockstore struct {
var _ Blockstore = (*adaptedBlockstore)(nil)
func (a *adaptedBlockstore) View(cid cid.Cid, callback func([]byte) error) error {
blk, err := a.Get(cid)
func (a *adaptedBlockstore) View(ctx context.Context, cid cid.Cid, callback func([]byte) error) error {
blk, err := a.Get(ctx, cid)
if err != nil {
return err
}
return callback(blk.RawData())
}
func (a *adaptedBlockstore) DeleteMany(cids []cid.Cid) error {
func (a *adaptedBlockstore) DeleteMany(ctx context.Context, cids []cid.Cid) error {
for _, cid := range cids {
err := a.DeleteBlock(cid)
err := a.DeleteBlock(ctx, cid)
if err != nil {
return err
}

View File

@ -88,34 +88,34 @@ func (bs *BufferedBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid,
return out, nil
}
func (bs *BufferedBlockstore) DeleteBlock(c cid.Cid) error {
if err := bs.read.DeleteBlock(c); err != nil {
func (bs *BufferedBlockstore) DeleteBlock(ctx context.Context, c cid.Cid) error {
if err := bs.read.DeleteBlock(ctx, c); err != nil {
return err
}
return bs.write.DeleteBlock(c)
return bs.write.DeleteBlock(ctx, c)
}
func (bs *BufferedBlockstore) DeleteMany(cids []cid.Cid) error {
if err := bs.read.DeleteMany(cids); err != nil {
func (bs *BufferedBlockstore) DeleteMany(ctx context.Context, cids []cid.Cid) error {
if err := bs.read.DeleteMany(ctx, cids); err != nil {
return err
}
return bs.write.DeleteMany(cids)
return bs.write.DeleteMany(ctx, cids)
}
func (bs *BufferedBlockstore) View(c cid.Cid, callback func([]byte) error) error {
func (bs *BufferedBlockstore) View(ctx context.Context, c cid.Cid, callback func([]byte) error) error {
// both stores are viewable.
if err := bs.write.View(c, callback); err == ErrNotFound {
if err := bs.write.View(ctx, c, callback); err == ErrNotFound {
// not found in write blockstore; fall through.
} else {
return err // propagate errors, or nil, i.e. found.
}
return bs.read.View(c, callback)
return bs.read.View(ctx, c, callback)
}
func (bs *BufferedBlockstore) Get(c cid.Cid) (block.Block, error) {
if out, err := bs.write.Get(c); err != nil {
func (bs *BufferedBlockstore) Get(ctx context.Context, c cid.Cid) (block.Block, error) {
if out, err := bs.write.Get(ctx, c); err != nil {
if err != ErrNotFound {
return nil, err
}
@ -123,20 +123,20 @@ func (bs *BufferedBlockstore) Get(c cid.Cid) (block.Block, error) {
return out, nil
}
return bs.read.Get(c)
return bs.read.Get(ctx, c)
}
func (bs *BufferedBlockstore) GetSize(c cid.Cid) (int, error) {
s, err := bs.read.GetSize(c)
func (bs *BufferedBlockstore) GetSize(ctx context.Context, c cid.Cid) (int, error) {
s, err := bs.read.GetSize(ctx, c)
if err == ErrNotFound || s == 0 {
return bs.write.GetSize(c)
return bs.write.GetSize(ctx, c)
}
return s, err
}
func (bs *BufferedBlockstore) Put(blk block.Block) error {
has, err := bs.read.Has(blk.Cid()) // TODO: consider dropping this check
func (bs *BufferedBlockstore) Put(ctx context.Context, blk block.Block) error {
has, err := bs.read.Has(ctx, blk.Cid()) // TODO: consider dropping this check
if err != nil {
return err
}
@ -145,11 +145,11 @@ func (bs *BufferedBlockstore) Put(blk block.Block) error {
return nil
}
return bs.write.Put(blk)
return bs.write.Put(ctx, blk)
}
func (bs *BufferedBlockstore) Has(c cid.Cid) (bool, error) {
has, err := bs.write.Has(c)
func (bs *BufferedBlockstore) Has(ctx context.Context, c cid.Cid) (bool, error) {
has, err := bs.write.Has(ctx, c)
if err != nil {
return false, err
}
@ -157,16 +157,16 @@ func (bs *BufferedBlockstore) Has(c cid.Cid) (bool, error) {
return true, nil
}
return bs.read.Has(c)
return bs.read.Has(ctx, c)
}
func (bs *BufferedBlockstore) HashOnRead(hor bool) {
bs.read.HashOnRead(hor)
bs.write.HashOnRead(hor)
func (bs *BufferedBlockstore) HashOnRead(ctx context.Context, hor bool) {
bs.read.HashOnRead(ctx, hor)
bs.write.HashOnRead(ctx, hor)
}
func (bs *BufferedBlockstore) PutMany(blks []block.Block) error {
return bs.write.PutMany(blks)
func (bs *BufferedBlockstore) PutMany(ctx context.Context, blks []block.Block) error {
return bs.write.PutMany(ctx, blks)
}
func (bs *BufferedBlockstore) Read() Blockstore {

View File

@ -18,39 +18,39 @@ func NewDiscardStore(bs Blockstore) Blockstore {
return &discardstore{bs: bs}
}
func (b *discardstore) Has(cid cid.Cid) (bool, error) {
return b.bs.Has(cid)
func (b *discardstore) Has(ctx context.Context, cid cid.Cid) (bool, error) {
return b.bs.Has(ctx, cid)
}
func (b *discardstore) HashOnRead(hor bool) {
b.bs.HashOnRead(hor)
func (b *discardstore) HashOnRead(ctx context.Context, hor bool) {
b.bs.HashOnRead(ctx, hor)
}
func (b *discardstore) Get(cid cid.Cid) (blocks.Block, error) {
return b.bs.Get(cid)
func (b *discardstore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error) {
return b.bs.Get(ctx, cid)
}
func (b *discardstore) GetSize(cid cid.Cid) (int, error) {
return b.bs.GetSize(cid)
func (b *discardstore) GetSize(ctx context.Context, cid cid.Cid) (int, error) {
return b.bs.GetSize(ctx, cid)
}
func (b *discardstore) View(cid cid.Cid, f func([]byte) error) error {
return b.bs.View(cid, f)
func (b *discardstore) View(ctx context.Context, cid cid.Cid, f func([]byte) error) error {
return b.bs.View(ctx, cid, f)
}
func (b *discardstore) Put(blk blocks.Block) error {
func (b *discardstore) Put(ctx context.Context, blk blocks.Block) error {
return nil
}
func (b *discardstore) PutMany(blks []blocks.Block) error {
func (b *discardstore) PutMany(ctx context.Context, blks []blocks.Block) error {
return nil
}
func (b *discardstore) DeleteBlock(cid cid.Cid) error {
func (b *discardstore) DeleteBlock(ctx context.Context, cid cid.Cid) error {
return nil
}
func (b *discardstore) DeleteMany(cids []cid.Cid) error {
func (b *discardstore) DeleteMany(ctx context.Context, cids []cid.Cid) error {
return nil
}

View File

@ -71,14 +71,14 @@ func (fbs *FallbackStore) getFallback(c cid.Cid) (blocks.Block, error) {
// chain bitswap puts blocks in temp blockstore which is cleaned up
// every few min (to drop any messages we fetched but don't want)
// in this case we want to keep this block around
if err := fbs.Put(b); err != nil {
if err := fbs.Put(ctx, b); err != nil {
return nil, xerrors.Errorf("persisting fallback-fetched block: %w", err)
}
return b, nil
}
func (fbs *FallbackStore) Get(c cid.Cid) (blocks.Block, error) {
b, err := fbs.Blockstore.Get(c)
func (fbs *FallbackStore) Get(ctx context.Context, c cid.Cid) (blocks.Block, error) {
b, err := fbs.Blockstore.Get(ctx, c)
switch err {
case nil:
return b, nil
@ -89,8 +89,8 @@ func (fbs *FallbackStore) Get(c cid.Cid) (blocks.Block, error) {
}
}
func (fbs *FallbackStore) GetSize(c cid.Cid) (int, error) {
sz, err := fbs.Blockstore.GetSize(c)
func (fbs *FallbackStore) GetSize(ctx context.Context, c cid.Cid) (int, error) {
sz, err := fbs.Blockstore.GetSize(ctx, c)
switch err {
case nil:
return sz, nil

View File

@ -38,7 +38,7 @@ func decodeCid(cid cid.Cid) (inline bool, data []byte, err error) {
return false, nil, err
}
func (b *idstore) Has(cid cid.Cid) (bool, error) {
func (b *idstore) Has(ctx context.Context, cid cid.Cid) (bool, error) {
inline, _, err := decodeCid(cid)
if err != nil {
return false, xerrors.Errorf("error decoding Cid: %w", err)
@ -48,10 +48,10 @@ func (b *idstore) Has(cid cid.Cid) (bool, error) {
return true, nil
}
return b.bs.Has(cid)
return b.bs.Has(ctx, cid)
}
func (b *idstore) Get(cid cid.Cid) (blocks.Block, error) {
func (b *idstore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error) {
inline, data, err := decodeCid(cid)
if err != nil {
return nil, xerrors.Errorf("error decoding Cid: %w", err)
@ -61,10 +61,10 @@ func (b *idstore) Get(cid cid.Cid) (blocks.Block, error) {
return blocks.NewBlockWithCid(data, cid)
}
return b.bs.Get(cid)
return b.bs.Get(ctx, cid)
}
func (b *idstore) GetSize(cid cid.Cid) (int, error) {
func (b *idstore) GetSize(ctx context.Context, cid cid.Cid) (int, error) {
inline, data, err := decodeCid(cid)
if err != nil {
return 0, xerrors.Errorf("error decoding Cid: %w", err)
@ -74,10 +74,10 @@ func (b *idstore) GetSize(cid cid.Cid) (int, error) {
return len(data), err
}
return b.bs.GetSize(cid)
return b.bs.GetSize(ctx, cid)
}
func (b *idstore) View(cid cid.Cid, cb func([]byte) error) error {
func (b *idstore) View(ctx context.Context, cid cid.Cid, cb func([]byte) error) error {
inline, data, err := decodeCid(cid)
if err != nil {
return xerrors.Errorf("error decoding Cid: %w", err)
@ -87,10 +87,10 @@ func (b *idstore) View(cid cid.Cid, cb func([]byte) error) error {
return cb(data)
}
return b.bs.View(cid, cb)
return b.bs.View(ctx, cid, cb)
}
func (b *idstore) Put(blk blocks.Block) error {
func (b *idstore) Put(ctx context.Context, blk blocks.Block) error {
inline, _, err := decodeCid(blk.Cid())
if err != nil {
return xerrors.Errorf("error decoding Cid: %w", err)
@ -100,10 +100,10 @@ func (b *idstore) Put(blk blocks.Block) error {
return nil
}
return b.bs.Put(blk)
return b.bs.Put(ctx, blk)
}
func (b *idstore) PutMany(blks []blocks.Block) error {
func (b *idstore) PutMany(ctx context.Context, blks []blocks.Block) error {
toPut := make([]blocks.Block, 0, len(blks))
for _, blk := range blks {
inline, _, err := decodeCid(blk.Cid())
@ -118,13 +118,13 @@ func (b *idstore) PutMany(blks []blocks.Block) error {
}
if len(toPut) > 0 {
return b.bs.PutMany(toPut)
return b.bs.PutMany(ctx, toPut)
}
return nil
}
func (b *idstore) DeleteBlock(cid cid.Cid) error {
func (b *idstore) DeleteBlock(ctx context.Context, cid cid.Cid) error {
inline, _, err := decodeCid(cid)
if err != nil {
return xerrors.Errorf("error decoding Cid: %w", err)
@ -134,10 +134,10 @@ func (b *idstore) DeleteBlock(cid cid.Cid) error {
return nil
}
return b.bs.DeleteBlock(cid)
return b.bs.DeleteBlock(ctx, cid)
}
func (b *idstore) DeleteMany(cids []cid.Cid) error {
func (b *idstore) DeleteMany(ctx context.Context, cids []cid.Cid) error {
toDelete := make([]cid.Cid, 0, len(cids))
for _, cid := range cids {
inline, _, err := decodeCid(cid)
@ -152,7 +152,7 @@ func (b *idstore) DeleteMany(cids []cid.Cid) error {
}
if len(toDelete) > 0 {
return b.bs.DeleteMany(toDelete)
return b.bs.DeleteMany(ctx, toDelete)
}
return nil
@ -162,8 +162,8 @@ func (b *idstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
return b.bs.AllKeysChan(ctx)
}
func (b *idstore) HashOnRead(enabled bool) {
b.bs.HashOnRead(enabled)
func (b *idstore) HashOnRead(ctx context.Context, enabled bool) {
b.bs.HashOnRead(ctx, enabled)
}
func (b *idstore) Close() error {

View File

@ -79,12 +79,12 @@ func NewRemoteIPFSBlockstore(ctx context.Context, maddr multiaddr.Multiaddr, onl
return Adapt(bs), nil
}
func (i *IPFSBlockstore) DeleteBlock(cid cid.Cid) error {
func (i *IPFSBlockstore) DeleteBlock(ctx context.Context, cid cid.Cid) error {
return xerrors.Errorf("not supported")
}
func (i *IPFSBlockstore) Has(cid cid.Cid) (bool, error) {
_, err := i.offlineAPI.Block().Stat(i.ctx, path.IpldPath(cid))
func (i *IPFSBlockstore) Has(ctx context.Context, cid cid.Cid) (bool, error) {
_, err := i.offlineAPI.Block().Stat(ctx, path.IpldPath(cid))
if err != nil {
// The underlying client is running in Offline mode.
// Stat() will fail with an err if the block isn't in the
@ -99,8 +99,8 @@ func (i *IPFSBlockstore) Has(cid cid.Cid) (bool, error) {
return true, nil
}
func (i *IPFSBlockstore) Get(cid cid.Cid) (blocks.Block, error) {
rd, err := i.api.Block().Get(i.ctx, path.IpldPath(cid))
func (i *IPFSBlockstore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error) {
rd, err := i.api.Block().Get(ctx, path.IpldPath(cid))
if err != nil {
return nil, xerrors.Errorf("getting ipfs block: %w", err)
}
@ -113,8 +113,8 @@ func (i *IPFSBlockstore) Get(cid cid.Cid) (blocks.Block, error) {
return blocks.NewBlockWithCid(data, cid)
}
func (i *IPFSBlockstore) GetSize(cid cid.Cid) (int, error) {
st, err := i.api.Block().Stat(i.ctx, path.IpldPath(cid))
func (i *IPFSBlockstore) GetSize(ctx context.Context, cid cid.Cid) (int, error) {
st, err := i.api.Block().Stat(ctx, path.IpldPath(cid))
if err != nil {
return 0, xerrors.Errorf("getting ipfs block: %w", err)
}
@ -122,23 +122,23 @@ func (i *IPFSBlockstore) GetSize(cid cid.Cid) (int, error) {
return st.Size(), nil
}
func (i *IPFSBlockstore) Put(block blocks.Block) error {
func (i *IPFSBlockstore) Put(ctx context.Context, block blocks.Block) error {
mhd, err := multihash.Decode(block.Cid().Hash())
if err != nil {
return err
}
_, err = i.api.Block().Put(i.ctx, bytes.NewReader(block.RawData()),
_, err = i.api.Block().Put(ctx, bytes.NewReader(block.RawData()),
options.Block.Hash(mhd.Code, mhd.Length),
options.Block.Format(cid.CodecToStr[block.Cid().Type()]))
return err
}
func (i *IPFSBlockstore) PutMany(blocks []blocks.Block) error {
func (i *IPFSBlockstore) PutMany(ctx context.Context, blocks []blocks.Block) error {
// TODO: could be done in parallel
for _, block := range blocks {
if err := i.Put(block); err != nil {
if err := i.Put(ctx, block); err != nil {
return err
}
}
@ -150,6 +150,6 @@ func (i *IPFSBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error
return nil, xerrors.Errorf("not supported")
}
func (i *IPFSBlockstore) HashOnRead(enabled bool) {
func (i *IPFSBlockstore) HashOnRead(ctx context.Context, enabled bool) {
return // TODO: We could technically support this, but..
}

View File

@ -15,24 +15,24 @@ func NewMemory() MemBlockstore {
// MemBlockstore is a terminal blockstore that keeps blocks in memory.
type MemBlockstore map[cid.Cid]blocks.Block
func (m MemBlockstore) DeleteBlock(k cid.Cid) error {
func (m MemBlockstore) DeleteBlock(ctx context.Context, k cid.Cid) error {
delete(m, k)
return nil
}
func (m MemBlockstore) DeleteMany(ks []cid.Cid) error {
func (m MemBlockstore) DeleteMany(ctx context.Context, ks []cid.Cid) error {
for _, k := range ks {
delete(m, k)
}
return nil
}
func (m MemBlockstore) Has(k cid.Cid) (bool, error) {
func (m MemBlockstore) Has(ctx context.Context, k cid.Cid) (bool, error) {
_, ok := m[k]
return ok, nil
}
func (m MemBlockstore) View(k cid.Cid, callback func([]byte) error) error {
func (m MemBlockstore) View(ctx context.Context, k cid.Cid, callback func([]byte) error) error {
b, ok := m[k]
if !ok {
return ErrNotFound
@ -40,7 +40,7 @@ func (m MemBlockstore) View(k cid.Cid, callback func([]byte) error) error {
return callback(b.RawData())
}
func (m MemBlockstore) Get(k cid.Cid) (blocks.Block, error) {
func (m MemBlockstore) Get(ctx context.Context, k cid.Cid) (blocks.Block, error) {
b, ok := m[k]
if !ok {
return nil, ErrNotFound
@ -49,7 +49,7 @@ func (m MemBlockstore) Get(k cid.Cid) (blocks.Block, error) {
}
// GetSize returns the CIDs mapped BlockSize
func (m MemBlockstore) GetSize(k cid.Cid) (int, error) {
func (m MemBlockstore) GetSize(ctx context.Context, k cid.Cid) (int, error) {
b, ok := m[k]
if !ok {
return 0, ErrNotFound
@ -58,7 +58,7 @@ func (m MemBlockstore) GetSize(k cid.Cid) (int, error) {
}
// Put puts a given block to the underlying datastore
func (m MemBlockstore) Put(b blocks.Block) error {
func (m MemBlockstore) Put(ctx context.Context, b blocks.Block) error {
// Convert to a basic block for safety, but try to reuse the existing
// block if it's already a basic block.
k := b.Cid()
@ -76,9 +76,9 @@ func (m MemBlockstore) Put(b blocks.Block) error {
// PutMany puts a slice of blocks at the same time using batching
// capabilities of the underlying datastore whenever possible.
func (m MemBlockstore) PutMany(bs []blocks.Block) error {
func (m MemBlockstore) PutMany(ctx context.Context, bs []blocks.Block) error {
for _, b := range bs {
_ = m.Put(b) // can't fail
_ = m.Put(ctx, b) // can't fail
}
return nil
}
@ -97,6 +97,6 @@ func (m MemBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error)
// HashOnRead specifies if every read block should be
// rehashed to make sure it matches its CID.
func (m MemBlockstore) HashOnRead(enabled bool) {
func (m MemBlockstore) HashOnRead(ctx context.Context, enabled bool) {
// no-op
}

View File

@ -20,53 +20,53 @@ type SyncBlockstore struct {
bs MemBlockstore // specifically use a memStore to save indirection overhead.
}
func (m *SyncBlockstore) DeleteBlock(k cid.Cid) error {
func (m *SyncBlockstore) DeleteBlock(ctx context.Context, k cid.Cid) error {
m.mu.Lock()
defer m.mu.Unlock()
return m.bs.DeleteBlock(k)
return m.bs.DeleteBlock(ctx, k)
}
func (m *SyncBlockstore) DeleteMany(ks []cid.Cid) error {
func (m *SyncBlockstore) DeleteMany(ctx context.Context, ks []cid.Cid) error {
m.mu.Lock()
defer m.mu.Unlock()
return m.bs.DeleteMany(ks)
return m.bs.DeleteMany(ctx, ks)
}
func (m *SyncBlockstore) Has(k cid.Cid) (bool, error) {
func (m *SyncBlockstore) Has(ctx context.Context, k cid.Cid) (bool, error) {
m.mu.RLock()
defer m.mu.RUnlock()
return m.bs.Has(k)
return m.bs.Has(ctx, k)
}
func (m *SyncBlockstore) View(k cid.Cid, callback func([]byte) error) error {
func (m *SyncBlockstore) View(ctx context.Context, k cid.Cid, callback func([]byte) error) error {
m.mu.RLock()
defer m.mu.RUnlock()
return m.bs.View(k, callback)
return m.bs.View(ctx, k, callback)
}
func (m *SyncBlockstore) Get(k cid.Cid) (blocks.Block, error) {
func (m *SyncBlockstore) Get(ctx context.Context, k cid.Cid) (blocks.Block, error) {
m.mu.RLock()
defer m.mu.RUnlock()
return m.bs.Get(k)
return m.bs.Get(ctx, k)
}
func (m *SyncBlockstore) GetSize(k cid.Cid) (int, error) {
func (m *SyncBlockstore) GetSize(ctx context.Context, k cid.Cid) (int, error) {
m.mu.RLock()
defer m.mu.RUnlock()
return m.bs.GetSize(k)
return m.bs.GetSize(ctx, k)
}
func (m *SyncBlockstore) Put(b blocks.Block) error {
func (m *SyncBlockstore) Put(ctx context.Context, b blocks.Block) error {
m.mu.Lock()
defer m.mu.Unlock()
return m.bs.Put(b)
return m.bs.Put(ctx, b)
}
func (m *SyncBlockstore) PutMany(bs []blocks.Block) error {
func (m *SyncBlockstore) PutMany(ctx context.Context, bs []blocks.Block) error {
m.mu.Lock()
defer m.mu.Unlock()
return m.bs.PutMany(bs)
return m.bs.PutMany(ctx, bs)
}
func (m *SyncBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
@ -76,6 +76,6 @@ func (m *SyncBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error
return m.bs.AllKeysChan(ctx)
}
func (m *SyncBlockstore) HashOnRead(enabled bool) {
func (m *SyncBlockstore) HashOnRead(ctx context.Context, enabled bool) {
// noop
}

View File

@ -92,28 +92,28 @@ func (t *TimedCacheBlockstore) rotate() {
t.mu.Unlock()
}
func (t *TimedCacheBlockstore) Put(b blocks.Block) error {
func (t *TimedCacheBlockstore) Put(ctx context.Context, b blocks.Block) error {
// Don't check the inactive set here. We want to keep this block for at
// least one interval.
t.mu.Lock()
defer t.mu.Unlock()
return t.active.Put(b)
return t.active.Put(ctx, b)
}
func (t *TimedCacheBlockstore) PutMany(bs []blocks.Block) error {
func (t *TimedCacheBlockstore) PutMany(ctx context.Context, bs []blocks.Block) error {
t.mu.Lock()
defer t.mu.Unlock()
return t.active.PutMany(bs)
return t.active.PutMany(ctx, bs)
}
func (t *TimedCacheBlockstore) View(k cid.Cid, callback func([]byte) error) error {
func (t *TimedCacheBlockstore) View(ctx context.Context, k cid.Cid, callback func([]byte) error) error {
// The underlying blockstore is always a "mem" blockstore so there's no difference,
// from a performance perspective, between view & get. So we call Get to avoid
// calling an arbitrary callback while holding a lock.
t.mu.RLock()
block, err := t.active.Get(k)
block, err := t.active.Get(ctx, k)
if err == ErrNotFound {
block, err = t.inactive.Get(k)
block, err = t.inactive.Get(ctx, k)
}
t.mu.RUnlock()
@ -123,51 +123,51 @@ func (t *TimedCacheBlockstore) View(k cid.Cid, callback func([]byte) error) erro
return callback(block.RawData())
}
func (t *TimedCacheBlockstore) Get(k cid.Cid) (blocks.Block, error) {
func (t *TimedCacheBlockstore) Get(ctx context.Context, k cid.Cid) (blocks.Block, error) {
t.mu.RLock()
defer t.mu.RUnlock()
b, err := t.active.Get(k)
b, err := t.active.Get(ctx, k)
if err == ErrNotFound {
b, err = t.inactive.Get(k)
b, err = t.inactive.Get(ctx, k)
}
return b, err
}
func (t *TimedCacheBlockstore) GetSize(k cid.Cid) (int, error) {
func (t *TimedCacheBlockstore) GetSize(ctx context.Context, k cid.Cid) (int, error) {
t.mu.RLock()
defer t.mu.RUnlock()
size, err := t.active.GetSize(k)
size, err := t.active.GetSize(ctx, k)
if err == ErrNotFound {
size, err = t.inactive.GetSize(k)
size, err = t.inactive.GetSize(ctx, k)
}
return size, err
}
func (t *TimedCacheBlockstore) Has(k cid.Cid) (bool, error) {
func (t *TimedCacheBlockstore) Has(ctx context.Context, k cid.Cid) (bool, error) {
t.mu.RLock()
defer t.mu.RUnlock()
if has, err := t.active.Has(k); err != nil {
if has, err := t.active.Has(ctx, k); err != nil {
return false, err
} else if has {
return true, nil
}
return t.inactive.Has(k)
return t.inactive.Has(ctx, k)
}
func (t *TimedCacheBlockstore) HashOnRead(_ bool) {
func (t *TimedCacheBlockstore) HashOnRead(ctx context.Context, _ bool) {
// no-op
}
func (t *TimedCacheBlockstore) DeleteBlock(k cid.Cid) error {
func (t *TimedCacheBlockstore) DeleteBlock(ctx context.Context, k cid.Cid) error {
t.mu.Lock()
defer t.mu.Unlock()
return multierr.Combine(t.active.DeleteBlock(k), t.inactive.DeleteBlock(k))
return multierr.Combine(t.active.DeleteBlock(ctx, k), t.inactive.DeleteBlock(ctx, k))
}
func (t *TimedCacheBlockstore) DeleteMany(ks []cid.Cid) error {
func (t *TimedCacheBlockstore) DeleteMany(ctx context.Context, ks []cid.Cid) error {
t.mu.Lock()
defer t.mu.Unlock()
return multierr.Combine(t.active.DeleteMany(ks), t.inactive.DeleteMany(ks))
return multierr.Combine(t.active.DeleteMany(ctx, ks), t.inactive.DeleteMany(ctx, ks))
}
func (t *TimedCacheBlockstore) AllKeysChan(_ context.Context) (<-chan cid.Cid, error) {

View File

@ -19,6 +19,8 @@ func TestTimedCacheBlockstoreSimple(t *testing.T) {
tc.clock = mClock
tc.doneRotatingCh = make(chan struct{})
ctx := context.Background()
_ = tc.Start(context.Background())
mClock.Add(1) // IDK why it is needed but it makes it work
@ -27,18 +29,18 @@ func TestTimedCacheBlockstoreSimple(t *testing.T) {
}()
b1 := blocks.NewBlock([]byte("foo"))
require.NoError(t, tc.Put(b1))
require.NoError(t, tc.Put(ctx, b1))
b2 := blocks.NewBlock([]byte("bar"))
require.NoError(t, tc.Put(b2))
require.NoError(t, tc.Put(ctx, b2))
b3 := blocks.NewBlock([]byte("baz"))
b1out, err := tc.Get(b1.Cid())
b1out, err := tc.Get(ctx, b1.Cid())
require.NoError(t, err)
require.Equal(t, b1.RawData(), b1out.RawData())
has, err := tc.Has(b1.Cid())
has, err := tc.Has(ctx, b1.Cid())
require.NoError(t, err)
require.True(t, has)
@ -46,17 +48,17 @@ func TestTimedCacheBlockstoreSimple(t *testing.T) {
<-tc.doneRotatingCh
// We should still have everything.
has, err = tc.Has(b1.Cid())
has, err = tc.Has(ctx, b1.Cid())
require.NoError(t, err)
require.True(t, has)
has, err = tc.Has(b2.Cid())
has, err = tc.Has(ctx, b2.Cid())
require.NoError(t, err)
require.True(t, has)
// extend b2, add b3.
require.NoError(t, tc.Put(b2))
require.NoError(t, tc.Put(b3))
require.NoError(t, tc.Put(ctx, b2))
require.NoError(t, tc.Put(ctx, b3))
// all keys once.
allKeys, err := tc.AllKeysChan(context.Background())
@ -71,15 +73,15 @@ func TestTimedCacheBlockstoreSimple(t *testing.T) {
<-tc.doneRotatingCh
// should still have b2, and b3, but not b1
has, err = tc.Has(b1.Cid())
has, err = tc.Has(ctx, b1.Cid())
require.NoError(t, err)
require.False(t, has)
has, err = tc.Has(b2.Cid())
has, err = tc.Has(ctx, b2.Cid())
require.NoError(t, err)
require.True(t, has)
has, err = tc.Has(b3.Cid())
has, err = tc.Has(ctx, b3.Cid())
require.NoError(t, err)
require.True(t, has)
}

View File

@ -19,72 +19,72 @@ func Union(stores ...Blockstore) Blockstore {
return unionBlockstore(stores)
}
func (m unionBlockstore) Has(cid cid.Cid) (has bool, err error) {
func (m unionBlockstore) Has(ctx context.Context, cid cid.Cid) (has bool, err error) {
for _, bs := range m {
if has, err = bs.Has(cid); has || err != nil {
if has, err = bs.Has(ctx, cid); has || err != nil {
break
}
}
return has, err
}
func (m unionBlockstore) Get(cid cid.Cid) (blk blocks.Block, err error) {
func (m unionBlockstore) Get(ctx context.Context, cid cid.Cid) (blk blocks.Block, err error) {
for _, bs := range m {
if blk, err = bs.Get(cid); err == nil || err != ErrNotFound {
if blk, err = bs.Get(ctx, cid); err == nil || err != ErrNotFound {
break
}
}
return blk, err
}
func (m unionBlockstore) View(cid cid.Cid, callback func([]byte) error) (err error) {
func (m unionBlockstore) View(ctx context.Context, cid cid.Cid, callback func([]byte) error) (err error) {
for _, bs := range m {
if err = bs.View(cid, callback); err == nil || err != ErrNotFound {
if err = bs.View(ctx, cid, callback); err == nil || err != ErrNotFound {
break
}
}
return err
}
func (m unionBlockstore) GetSize(cid cid.Cid) (size int, err error) {
func (m unionBlockstore) GetSize(ctx context.Context, cid cid.Cid) (size int, err error) {
for _, bs := range m {
if size, err = bs.GetSize(cid); err == nil || err != ErrNotFound {
if size, err = bs.GetSize(ctx, cid); err == nil || err != ErrNotFound {
break
}
}
return size, err
}
func (m unionBlockstore) Put(block blocks.Block) (err error) {
func (m unionBlockstore) Put(ctx context.Context, block blocks.Block) (err error) {
for _, bs := range m {
if err = bs.Put(block); err != nil {
if err = bs.Put(ctx, block); err != nil {
break
}
}
return err
}
func (m unionBlockstore) PutMany(blks []blocks.Block) (err error) {
func (m unionBlockstore) PutMany(ctx context.Context, blks []blocks.Block) (err error) {
for _, bs := range m {
if err = bs.PutMany(blks); err != nil {
if err = bs.PutMany(ctx, blks); err != nil {
break
}
}
return err
}
func (m unionBlockstore) DeleteBlock(cid cid.Cid) (err error) {
func (m unionBlockstore) DeleteBlock(ctx context.Context, cid cid.Cid) (err error) {
for _, bs := range m {
if err = bs.DeleteBlock(cid); err != nil {
if err = bs.DeleteBlock(ctx, cid); err != nil {
break
}
}
return err
}
func (m unionBlockstore) DeleteMany(cids []cid.Cid) (err error) {
func (m unionBlockstore) DeleteMany(ctx context.Context, cids []cid.Cid) (err error) {
for _, bs := range m {
if err = bs.DeleteMany(cids); err != nil {
if err = bs.DeleteMany(ctx, cids); err != nil {
break
}
}
@ -112,8 +112,8 @@ func (m unionBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error
return outCh, nil
}
func (m unionBlockstore) HashOnRead(enabled bool) {
func (m unionBlockstore) HashOnRead(ctx context.Context, enabled bool) {
for _, bs := range m {
bs.HashOnRead(enabled)
bs.HashOnRead(ctx, enabled)
}
}

View File

@ -15,79 +15,81 @@ var (
)
func TestUnionBlockstore_Get(t *testing.T) {
ctx := context.Background()
m1 := NewMemory()
m2 := NewMemory()
_ = m1.Put(b1)
_ = m2.Put(b2)
_ = m1.Put(ctx, b1)
_ = m2.Put(ctx, b2)
u := Union(m1, m2)
v1, err := u.Get(b1.Cid())
v1, err := u.Get(ctx, b1.Cid())
require.NoError(t, err)
require.Equal(t, b1.RawData(), v1.RawData())
v2, err := u.Get(b2.Cid())
v2, err := u.Get(ctx, b2.Cid())
require.NoError(t, err)
require.Equal(t, b2.RawData(), v2.RawData())
}
func TestUnionBlockstore_Put_PutMany_Delete_AllKeysChan(t *testing.T) {
ctx := context.Background()
m1 := NewMemory()
m2 := NewMemory()
u := Union(m1, m2)
err := u.Put(b0)
err := u.Put(ctx, b0)
require.NoError(t, err)
var has bool
// write was broadcasted to all stores.
has, _ = m1.Has(b0.Cid())
has, _ = m1.Has(ctx, b0.Cid())
require.True(t, has)
has, _ = m2.Has(b0.Cid())
has, _ = m2.Has(ctx, b0.Cid())
require.True(t, has)
has, _ = u.Has(b0.Cid())
has, _ = u.Has(ctx, b0.Cid())
require.True(t, has)
// put many.
err = u.PutMany([]blocks.Block{b1, b2})
err = u.PutMany(ctx, []blocks.Block{b1, b2})
require.NoError(t, err)
// write was broadcasted to all stores.
has, _ = m1.Has(b1.Cid())
has, _ = m1.Has(ctx, b1.Cid())
require.True(t, has)
has, _ = m1.Has(b2.Cid())
has, _ = m1.Has(ctx, b2.Cid())
require.True(t, has)
has, _ = m2.Has(b1.Cid())
has, _ = m2.Has(ctx, b1.Cid())
require.True(t, has)
has, _ = m2.Has(b2.Cid())
has, _ = m2.Has(ctx, b2.Cid())
require.True(t, has)
// also in the union store.
has, _ = u.Has(b1.Cid())
has, _ = u.Has(ctx, b1.Cid())
require.True(t, has)
has, _ = u.Has(b2.Cid())
has, _ = u.Has(ctx, b2.Cid())
require.True(t, has)
// deleted from all stores.
err = u.DeleteBlock(b1.Cid())
err = u.DeleteBlock(ctx, b1.Cid())
require.NoError(t, err)
has, _ = u.Has(b1.Cid())
has, _ = u.Has(ctx, b1.Cid())
require.False(t, has)
has, _ = m1.Has(b1.Cid())
has, _ = m1.Has(ctx, b1.Cid())
require.False(t, has)
has, _ = m2.Has(b1.Cid())
has, _ = m2.Has(ctx, b1.Cid())
require.False(t, has)
// check that AllKeysChan returns b0 and b2, twice (once per backing store)

2
extern/filecoin-ffi vendored

@ -1 +1 @@
Subproject commit 7912389334e347bbb2eac0520c836830875c39de
Subproject commit a7b3c2e695393fd716e9265ff8cba932a3e38dd4

38
go.mod
View File

@ -11,13 +11,14 @@ require (
github.com/StackExchange/wmi v1.2.1 // indirect
github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d
github.com/alecthomas/jsonschema v0.0.0-20200530073317-71f438968921
github.com/bep/debounce v1.2.0 // indirect
github.com/buger/goterm v1.0.3
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e
github.com/cockroachdb/pebble v0.0.0-20201001221639-879f3bfeef07
github.com/containerd/cgroups v0.0.0-20201119153540-4cbc285b3327
github.com/coreos/go-systemd/v22 v22.3.2
github.com/detailyang/go-fallocate v0.0.0-20180908115635-432fa640bd2e
github.com/dgraph-io/badger/v2 v2.2007.2
github.com/dgraph-io/badger/v2 v2.2007.3
github.com/docker/go-units v0.4.0
github.com/drand/drand v1.2.1
github.com/drand/kyber v1.1.4
@ -26,14 +27,13 @@ require (
github.com/elastic/gosigar v0.14.1
github.com/etclabscore/go-openrpc-reflect v0.0.36
github.com/fatih/color v1.13.0
github.com/filecoin-project/dagstore v0.4.3
github.com/filecoin-project/filecoin-ffi v0.30.4-0.20200910194244-f640612a1a1f
github.com/filecoin-project/go-address v0.0.6
github.com/filecoin-project/go-bitfield v0.2.4
github.com/filecoin-project/go-cbor-util v0.0.1
github.com/filecoin-project/go-commp-utils v0.1.2
github.com/filecoin-project/go-crypto v0.0.1
github.com/filecoin-project/go-data-transfer v1.11.4
github.com/filecoin-project/go-data-transfer v1.11.7-0.20211119001436-c0dbfa5fae4d
github.com/filecoin-project/go-ds-versioning v0.1.0 // indirect
github.com/filecoin-project/go-fil-commcid v0.1.0
github.com/filecoin-project/go-fil-commp-hashhash v0.1.0
github.com/filecoin-project/go-fil-markets v1.13.4
@ -42,7 +42,7 @@ require (
github.com/filecoin-project/go-paramfetch v0.0.2
github.com/filecoin-project/go-state-types v0.1.1
github.com/filecoin-project/go-statemachine v1.0.1
github.com/filecoin-project/go-statestore v0.1.1
github.com/filecoin-project/go-statestore v0.1.2-0.20211118230537-43557b6c5ce5
github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b
github.com/filecoin-project/specs-actors v0.9.14
github.com/filecoin-project/specs-actors/v2 v2.3.5
@ -66,32 +66,33 @@ require (
github.com/icza/backscanner v0.0.0-20210726202459-ac2ffc679f94
github.com/influxdata/influxdb1-client v0.0.0-20200827194710-b269163b24ab
github.com/ipfs/bbloom v0.0.4
github.com/ipfs/go-bitswap v0.3.4
github.com/ipfs/go-bitswap v0.5.1
github.com/ipfs/go-block-format v0.0.3
github.com/ipfs/go-blockservice v0.1.7
github.com/ipfs/go-blockservice v0.2.1
github.com/ipfs/go-cid v0.1.0
github.com/ipfs/go-cidutil v0.0.2
github.com/ipfs/go-datastore v0.4.6
github.com/ipfs/go-ds-badger2 v0.1.1-0.20200708190120-187fc06f714e
github.com/ipfs/go-ds-leveldb v0.4.2
github.com/ipfs/go-datastore v0.5.0
github.com/ipfs/go-ds-badger2 v0.1.2-0.20211119002906-7318f1b76158
github.com/ipfs/go-ds-leveldb v0.5.0
github.com/ipfs/go-ds-measure v0.1.0
github.com/ipfs/go-ds-pebble v0.0.2-0.20200921225637-ce220f8ac459
github.com/ipfs/go-filestore v0.0.3 // indirect
github.com/ipfs/go-fs-lock v0.0.6
github.com/ipfs/go-graphsync v0.10.6
github.com/ipfs/go-ipfs-blockstore v1.0.4
github.com/ipfs/go-ipfs-blockstore v1.1.0
github.com/ipfs/go-ipfs-blocksutil v0.0.1
github.com/ipfs/go-ipfs-chunker v0.0.5
github.com/ipfs/go-ipfs-ds-help v1.0.0
github.com/ipfs/go-ipfs-exchange-interface v0.0.1
github.com/ipfs/go-ipfs-exchange-offline v0.0.1
github.com/ipfs/go-ipfs-ds-help v1.1.0
github.com/ipfs/go-ipfs-exchange-interface v0.1.0
github.com/ipfs/go-ipfs-exchange-offline v0.1.1
github.com/ipfs/go-ipfs-files v0.0.9
github.com/ipfs/go-ipfs-http-client v0.0.6
github.com/ipfs/go-ipfs-routing v0.1.0
github.com/ipfs/go-ipfs-routing v0.2.1
github.com/ipfs/go-ipfs-util v0.0.2
github.com/ipfs/go-ipld-cbor v0.0.5
github.com/ipfs/go-ipld-format v0.2.0
github.com/ipfs/go-log/v2 v2.3.0
github.com/ipfs/go-merkledag v0.4.1
github.com/ipfs/go-merkledag v0.5.1
github.com/ipfs/go-metrics-interface v0.0.1
github.com/ipfs/go-metrics-prometheus v0.0.2
github.com/ipfs/go-path v0.0.7
@ -111,7 +112,7 @@ require (
github.com/libp2p/go-libp2p-discovery v0.5.1
github.com/libp2p/go-libp2p-kad-dht v0.13.0
github.com/libp2p/go-libp2p-noise v0.2.2
github.com/libp2p/go-libp2p-peerstore v0.3.0
github.com/libp2p/go-libp2p-peerstore v0.4.0
github.com/libp2p/go-libp2p-pubsub v0.5.6
github.com/libp2p/go-libp2p-quic-transport v0.11.2
github.com/libp2p/go-libp2p-record v0.1.3
@ -131,6 +132,7 @@ require (
github.com/multiformats/go-varint v0.0.6
github.com/open-rpc/meta-schema v0.0.0-20201029221707-1b72ef2ea333
github.com/opentracing/opentracing-go v1.2.0
github.com/petar/GoLLRB v0.0.0-20210522233825-ae3b015fd3e9 // indirect
github.com/polydawn/refmt v0.0.0-20201211092308-30ac6d18308e
github.com/prometheus/client_golang v1.11.0
github.com/raulk/clock v1.1.0
@ -141,6 +143,7 @@ require (
github.com/uber/jaeger-client-go v2.25.0+incompatible // indirect
github.com/urfave/cli/v2 v2.2.0
github.com/whyrusleeping/bencher v0.0.0-20190829221104-bb6607aa8bba
github.com/whyrusleeping/cbor v0.0.0-20171005072247-63513f603b11 // indirect
github.com/whyrusleeping/cbor-gen v0.0.0-20210713220151-be142a5ae1a8
github.com/whyrusleeping/ledger-filecoin-go v0.9.1-0.20201010031517-c3dcc1bddce4
github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7
@ -155,6 +158,7 @@ require (
go.uber.org/fx v1.9.0
go.uber.org/multierr v1.7.0
go.uber.org/zap v1.19.1
golang.org/x/exp v0.0.0-20210715201039-d37aa40e8013 // indirect
golang.org/x/net v0.0.0-20210917221730-978cfadd31cf
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/sys v0.0.0-20210917161153-d61c044b1678

426
go.sum

File diff suppressed because it is too large Load Diff

View File

@ -2,6 +2,7 @@ package backupds
import (
"bytes"
"context"
"fmt"
"io/ioutil"
"os"
@ -17,14 +18,14 @@ const valSize = 512 << 10
func putVals(t *testing.T, ds datastore.Datastore, start, end int) {
for i := start; i < end; i++ {
err := ds.Put(datastore.NewKey(fmt.Sprintf("%d", i)), []byte(fmt.Sprintf("%d-%s", i, strings.Repeat("~", valSize))))
err := ds.Put(context.TODO(), datastore.NewKey(fmt.Sprintf("%d", i)), []byte(fmt.Sprintf("%d-%s", i, strings.Repeat("~", valSize))))
require.NoError(t, err)
}
}
func checkVals(t *testing.T, ds datastore.Datastore, start, end int, exist bool) {
for i := start; i < end; i++ {
v, err := ds.Get(datastore.NewKey(fmt.Sprintf("%d", i)))
v, err := ds.Get(context.TODO(), datastore.NewKey(fmt.Sprintf("%d", i)))
if exist {
require.NoError(t, err)
expect := []byte(fmt.Sprintf("%d-%s", i, strings.Repeat("~", valSize)))
@ -44,7 +45,7 @@ func TestNoLogRestore(t *testing.T) {
require.NoError(t, err)
var bup bytes.Buffer
require.NoError(t, bds.Backup(&bup))
require.NoError(t, bds.Backup(context.TODO(), &bup))
putVals(t, ds1, 10, 20)

View File

@ -1,6 +1,7 @@
package backupds
import (
"context"
"crypto/sha256"
"io"
"sync"
@ -52,7 +53,7 @@ func Wrap(child datastore.Batching, logdir string) (*Datastore, error) {
// Writes a datastore dump into the provided writer as
// [array(*) of [key, value] tuples, checksum]
func (d *Datastore) Backup(out io.Writer) error {
func (d *Datastore) Backup(ctx context.Context, out io.Writer) error {
scratch := make([]byte, 9)
if err := cbg.WriteMajorTypeHeaderBuf(scratch, out, cbg.MajArray, 2); err != nil {
@ -75,7 +76,7 @@ func (d *Datastore) Backup(out io.Writer) error {
log.Info("Starting datastore backup")
defer log.Info("Datastore backup done")
qr, err := d.child.Query(query.Query{})
qr, err := d.child.Query(ctx, query.Query{})
if err != nil {
return xerrors.Errorf("query: %w", err)
}
@ -132,23 +133,23 @@ func (d *Datastore) Backup(out io.Writer) error {
// proxy
func (d *Datastore) Get(key datastore.Key) (value []byte, err error) {
return d.child.Get(key)
func (d *Datastore) Get(ctx context.Context, key datastore.Key) (value []byte, err error) {
return d.child.Get(ctx, key)
}
func (d *Datastore) Has(key datastore.Key) (exists bool, err error) {
return d.child.Has(key)
func (d *Datastore) Has(ctx context.Context, key datastore.Key) (exists bool, err error) {
return d.child.Has(ctx, key)
}
func (d *Datastore) GetSize(key datastore.Key) (size int, err error) {
return d.child.GetSize(key)
func (d *Datastore) GetSize(ctx context.Context, key datastore.Key) (size int, err error) {
return d.child.GetSize(ctx, key)
}
func (d *Datastore) Query(q query.Query) (query.Results, error) {
return d.child.Query(q)
func (d *Datastore) Query(ctx context.Context, q query.Query) (query.Results, error) {
return d.child.Query(ctx, q)
}
func (d *Datastore) Put(key datastore.Key, value []byte) error {
func (d *Datastore) Put(ctx context.Context, key datastore.Key, value []byte) error {
d.backupLk.RLock()
defer d.backupLk.RUnlock()
@ -160,21 +161,21 @@ func (d *Datastore) Put(key datastore.Key, value []byte) error {
}
}
return d.child.Put(key, value)
return d.child.Put(ctx, key, value)
}
func (d *Datastore) Delete(key datastore.Key) error {
func (d *Datastore) Delete(ctx context.Context, key datastore.Key) error {
d.backupLk.RLock()
defer d.backupLk.RUnlock()
return d.child.Delete(key)
return d.child.Delete(ctx, key)
}
func (d *Datastore) Sync(prefix datastore.Key) error {
func (d *Datastore) Sync(ctx context.Context, prefix datastore.Key) error {
d.backupLk.RLock()
defer d.backupLk.RUnlock()
return d.child.Sync(prefix)
return d.child.Sync(ctx, prefix)
}
func (d *Datastore) CloseLog() error {
@ -196,8 +197,8 @@ func (d *Datastore) Close() error {
)
}
func (d *Datastore) Batch() (datastore.Batch, error) {
b, err := d.child.Batch()
func (d *Datastore) Batch(ctx context.Context) (datastore.Batch, error) {
b, err := d.child.Batch(ctx)
if err != nil {
return nil, err
}
@ -215,7 +216,7 @@ type bbatch struct {
rlk sync.Locker
}
func (b *bbatch) Put(key datastore.Key, value []byte) error {
func (b *bbatch) Put(ctx context.Context, key datastore.Key, value []byte) error {
if b.d.log != nil {
b.d.log <- Entry{
Key: []byte(key.String()),
@ -224,18 +225,18 @@ func (b *bbatch) Put(key datastore.Key, value []byte) error {
}
}
return b.b.Put(key, value)
return b.b.Put(ctx, key, value)
}
func (b *bbatch) Delete(key datastore.Key) error {
return b.b.Delete(key)
func (b *bbatch) Delete(ctx context.Context, key datastore.Key) error {
return b.b.Delete(ctx, key)
}
func (b *bbatch) Commit() error {
func (b *bbatch) Commit(ctx context.Context) error {
b.rlk.Lock()
defer b.rlk.Unlock()
return b.b.Commit()
return b.b.Commit(ctx)
}
var _ datastore.Batch = &bbatch{}

View File

@ -1,6 +1,7 @@
package backupds
import (
"context"
"fmt"
"io"
"io/ioutil"
@ -100,6 +101,7 @@ type logfile struct {
var compactThresh = 2
func (d *Datastore) createLog(logdir string) (*logfile, string, error) {
ctx := context.TODO()
p := filepath.Join(logdir, strconv.FormatInt(time.Now().Unix(), 10)+".log.cbor")
log.Infow("creating log", "file", p)
@ -108,7 +110,7 @@ func (d *Datastore) createLog(logdir string) (*logfile, string, error) {
return nil, "", err
}
if err := d.Backup(f); err != nil {
if err := d.Backup(ctx, f); err != nil {
return nil, "", xerrors.Errorf("writing log base: %w", err)
}
if err := f.Sync(); err != nil {
@ -122,8 +124,9 @@ func (d *Datastore) createLog(logdir string) (*logfile, string, error) {
}
func (d *Datastore) openLog(p string) (*logfile, string, error) {
ctx := context.TODO()
log.Infow("opening log", "file", p)
lh, err := d.child.Get(loghead)
lh, err := d.child.Get(ctx, loghead)
if err != nil {
return nil, "", xerrors.Errorf("checking log head (logfile '%s'): %w", p, err)
}
@ -212,6 +215,7 @@ func (d *Datastore) openLog(p string) (*logfile, string, error) {
}
func (l *logfile) writeLogHead(logname string, ds datastore.Batching) error {
ctx := context.TODO()
lval := []byte(fmt.Sprintf("%s;%s;%d", logname, uuid.New(), time.Now().Unix()))
err := l.writeEntry(&Entry{
@ -223,7 +227,7 @@ func (l *logfile) writeLogHead(logname string, ds datastore.Batching) error {
return xerrors.Errorf("writing loghead to the log: %w", err)
}
if err := ds.Put(loghead, lval); err != nil {
if err := ds.Put(ctx, loghead, lval); err != nil {
return xerrors.Errorf("writing loghead to the datastore: %w", err)
}

View File

@ -2,6 +2,7 @@ package backupds
import (
"bytes"
"context"
"crypto/sha256"
"io"
"os"
@ -117,13 +118,13 @@ func ReadBackup(r io.Reader, cb func(key datastore.Key, value []byte, log bool)
}
func RestoreInto(r io.Reader, dest datastore.Batching) error {
batch, err := dest.Batch()
batch, err := dest.Batch(context.TODO())
if err != nil {
return xerrors.Errorf("creating batch: %w", err)
}
_, err = ReadBackup(r, func(key datastore.Key, value []byte, _ bool) error {
if err := batch.Put(key, value); err != nil {
if err := batch.Put(context.TODO(), key, value); err != nil {
return xerrors.Errorf("put key: %w", err)
}
@ -133,7 +134,7 @@ func RestoreInto(r io.Reader, dest datastore.Batching) error {
return xerrors.Errorf("reading backup: %w", err)
}
if err := batch.Commit(); err != nil {
if err := batch.Commit(context.TODO()); err != nil {
return xerrors.Errorf("committing batch: %w", err)
}