test: for dag store wrapper

This commit is contained in:
Dirk McCormick 2021-07-20 15:02:50 +02:00
parent 16be5b5478
commit 8becd922cf
4 changed files with 224 additions and 11 deletions

Binary file not shown.

View File

@ -21,7 +21,6 @@ import (
)
var log = logging.Logger("dagstore-wrapper")
var gcInterval = 5 * time.Minute
// MarketDAGStoreConfig is the config the market needs to then construct a DAG Store.
type MarketDAGStoreConfig struct {
@ -30,6 +29,17 @@ type MarketDAGStoreConfig struct {
Datastore ds.Datastore
MaxConcurrentFetch int
MaxConcurrentIndex int
GCInterval time.Duration
}
// DAGStore provides an interface for the DAG store that can be mocked out
// by tests
type DAGStore interface {
RegisterShard(ctx context.Context, key shard.Key, mnt mount.Mount, out chan dagstore.ShardResult, opts dagstore.RegisterOpts) error
AcquireShard(ctx context.Context, key shard.Key, out chan dagstore.ShardResult, _ dagstore.AcquireOpts) error
RecoverShard(ctx context.Context, key shard.Key, out chan dagstore.ShardResult, _ dagstore.RecoverOpts) error
GC(ctx context.Context) (map[shard.Key]error, error)
Close() error
}
type closableBlockstore struct {
@ -42,10 +52,11 @@ type Wrapper struct {
cancel context.CancelFunc
backgroundWg sync.WaitGroup
dagStore *dagstore.DAGStore
dagStore DAGStore
mountApi LotusAccessor
failureCh chan dagstore.ShardResult
traceCh chan dagstore.Trace
gcInterval time.Duration
}
var _ shared.DagStoreWrapper = (*Wrapper)(nil)
@ -81,6 +92,7 @@ func NewDagStoreWrapper(cfg MarketDAGStoreConfig, mountApi LotusAccessor) (*Wrap
mountApi: mountApi,
failureCh: failureCh,
traceCh: traceCh,
gcInterval: cfg.GCInterval,
}, nil
}
@ -96,11 +108,11 @@ func (ds *Wrapper) Start(ctx context.Context) {
func (ds *Wrapper) background() {
defer ds.backgroundWg.Done()
gcTicker := time.NewTicker(gcInterval)
gcTicker := time.NewTicker(ds.gcInterval)
defer gcTicker.Stop()
recoverShardResults := make(chan dagstore.ShardResult, 32)
for ds.ctx.Err() != nil {
for ds.ctx.Err() == nil {
select {
// GC the DAG store on every tick

View File

@ -0,0 +1,200 @@
package dagstore
import (
"bytes"
"context"
"io"
"os"
"testing"
"time"
"golang.org/x/xerrors"
"github.com/filecoin-project/dagstore"
"github.com/filecoin-project/dagstore/mount"
"github.com/filecoin-project/dagstore/shard"
"github.com/ipfs/go-cid"
"github.com/stretchr/testify/require"
)
// TestWrapperAcquireRecovery verifies that if acquire shard returns a "not found"
// error, the wrapper will attempt to register the shard then reacquire
func TestWrapperAcquireRecovery(t *testing.T) {
ctx := context.Background()
pieceCid, err := cid.Parse("bafkqaaa")
require.NoError(t, err)
// Create a DAG store wrapper
w, err := NewDagStoreWrapper(MarketDAGStoreConfig{
TransientsDir: t.TempDir(),
IndexDir: t.TempDir(),
GCInterval: time.Millisecond,
}, mockLotusMount{})
require.NoError(t, err)
// Return an error from acquire shard the first time
acquireShardErr := make(chan error, 1)
acquireShardErr <- xerrors.Errorf("unknown shard: %w", dagstore.ErrShardUnknown)
// Create a mock DAG store in place of the real DAG store
mock := &mockDagStore{
acquireShardErr: acquireShardErr,
acquireShardRes: dagstore.ShardResult{
Accessor: getShardAccessor(t),
},
register: make(chan shard.Key, 1),
}
w.dagStore = mock
mybs, err := w.LoadShard(ctx, pieceCid)
require.NoError(t, err)
// Expect the wrapper to try to recover from the error returned from
// acquire shard by calling register shard with the same key
tctx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
select {
case <-tctx.Done():
require.Fail(t, "failed to call register")
case k := <-mock.register:
require.Equal(t, k.String(), pieceCid.String())
}
// Verify that we can get things from the acquired blockstore
var count int
ch, err := mybs.AllKeysChan(ctx)
require.NoError(t, err)
for range ch {
count++
}
require.Greater(t, count, 0)
}
// TestWrapperBackground verifies the behaviour of the background go routine
func TestWrapperBackground(t *testing.T) {
ctx := context.Background()
// Create a DAG store wrapper
w, err := NewDagStoreWrapper(MarketDAGStoreConfig{
TransientsDir: t.TempDir(),
IndexDir: t.TempDir(),
GCInterval: time.Millisecond,
}, mockLotusMount{})
require.NoError(t, err)
// Create a mock DAG store in place of the real DAG store
mock := &mockDagStore{
gc: make(chan struct{}, 1),
recover: make(chan shard.Key, 1),
close: make(chan struct{}, 1),
}
w.dagStore = mock
// Start up the wrapper
w.Start(ctx)
// Expect GC to be called automatically
tctx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
select {
case <-tctx.Done():
require.Fail(t, "failed to call GC")
case <-mock.gc:
}
// Expect that when a result is sent on the failure channel, the wrapper
// will attempt to recover the shard
shardKey := shard.KeyFromString("key")
w.failureCh <- dagstore.ShardResult{
Key: shardKey,
}
tctx, cancel2 := context.WithTimeout(ctx, time.Second)
defer cancel2()
select {
case <-tctx.Done():
require.Fail(t, "failed to call recover")
case k := <-mock.recover:
require.Equal(t, shardKey, k)
}
// Expect that when the wrapper is closed it will call close on the
// DAG store
err = w.Close()
require.NoError(t, err)
tctx, cancel3 := context.WithTimeout(ctx, time.Second)
defer cancel3()
select {
case <-tctx.Done():
require.Fail(t, "failed to call close")
case <-mock.close:
}
}
type mockDagStore struct {
acquireShardErr chan error
acquireShardRes dagstore.ShardResult
register chan shard.Key
gc chan struct{}
recover chan shard.Key
close chan struct{}
}
func (m *mockDagStore) RegisterShard(ctx context.Context, key shard.Key, mnt mount.Mount, out chan dagstore.ShardResult, opts dagstore.RegisterOpts) error {
m.register <- key
out <- dagstore.ShardResult{Key: key}
return nil
}
func (m *mockDagStore) AcquireShard(ctx context.Context, key shard.Key, out chan dagstore.ShardResult, _ dagstore.AcquireOpts) error {
select {
case err := <-m.acquireShardErr:
return err
default:
}
out <- m.acquireShardRes
return nil
}
func (m *mockDagStore) RecoverShard(ctx context.Context, key shard.Key, out chan dagstore.ShardResult, _ dagstore.RecoverOpts) error {
m.recover <- key
return nil
}
func (m *mockDagStore) GC(ctx context.Context) (map[shard.Key]error, error) {
select {
case m.gc <- struct{}{}:
default:
}
return nil, nil
}
func (m *mockDagStore) Close() error {
m.close <- struct{}{}
return nil
}
type mockLotusMount struct {
}
func (m mockLotusMount) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error) {
panic("implement me")
}
func (m mockLotusMount) GetUnpaddedCARSize(pieceCid cid.Cid) (uint64, error) {
panic("implement me")
}
func getShardAccessor(t *testing.T) *dagstore.ShardAccessor {
data, err := os.ReadFile("./fixtures/sample-rw-bs-v2.car")
require.NoError(t, err)
buff := bytes.NewReader(data)
reader := &mount.NopCloser{Reader: buff, ReaderAt: buff, Seeker: buff}
shardAccessor, err := dagstore.NewShardAccessor(reader, nil, nil)
require.NoError(t, err)
return shardAccessor
}

View File

@ -588,6 +588,7 @@ func DagStoreWrapper(
TransientsDir: filepath.Join(dagStoreDir, "transients"),
IndexDir: filepath.Join(dagStoreDir, "index"),
Datastore: dagStoreDS,
GCInterval: 5 * time.Minute,
}
mountApi := dagstore.NewLotusMountAPI(pieceStore, rpn)
dsw, err := dagstore.NewDagStoreWrapper(cfg, mountApi)