lotus/markets/dagstore/wrapper_test.go

219 lines
5.5 KiB
Go
Raw Normal View History

2021-07-20 13:02:50 +00:00
package dagstore
import (
"bytes"
"context"
"io"
"os"
"testing"
"time"
"golang.org/x/xerrors"
2021-08-03 22:22:46 +00:00
"github.com/filecoin-project/lotus/node/config"
2021-07-20 13:02:50 +00:00
"github.com/filecoin-project/dagstore"
"github.com/filecoin-project/dagstore/mount"
"github.com/filecoin-project/dagstore/shard"
"github.com/ipfs/go-cid"
"github.com/stretchr/testify/require"
)
// TestWrapperAcquireRecovery verifies that if acquire shard returns a "not found"
// error, the wrapper will attempt to register the shard then reacquire
func TestWrapperAcquireRecovery(t *testing.T) {
ctx := context.Background()
pieceCid, err := cid.Parse("bafkqaaa")
require.NoError(t, err)
// Create a DAG store wrapper
dagst, w, err := NewDAGStore(config.DAGStoreConfig{
TransientsDir: t.TempDir(),
IndexDir: t.TempDir(),
DatastoreDir: t.TempDir(),
GCInterval: config.Duration(1 * time.Millisecond),
2021-07-20 13:02:50 +00:00
}, mockLotusMount{})
require.NoError(t, err)
2021-08-04 11:48:53 +00:00
defer dagst.Close() //nolint:errcheck
2021-07-20 13:02:50 +00:00
// Return an error from acquire shard the first time
acquireShardErr := make(chan error, 1)
acquireShardErr <- xerrors.Errorf("unknown shard: %w", dagstore.ErrShardUnknown)
// Create a mock DAG store in place of the real DAG store
mock := &mockDagStore{
acquireShardErr: acquireShardErr,
acquireShardRes: dagstore.ShardResult{
Accessor: getShardAccessor(t),
},
register: make(chan shard.Key, 1),
}
2021-08-03 11:22:40 +00:00
w.dagst = mock
2021-07-20 13:02:50 +00:00
mybs, err := w.LoadShard(ctx, pieceCid)
require.NoError(t, err)
// Expect the wrapper to try to recover from the error returned from
// acquire shard by calling register shard with the same key
tctx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
select {
case <-tctx.Done():
require.Fail(t, "failed to call register")
case k := <-mock.register:
require.Equal(t, k.String(), pieceCid.String())
}
// Verify that we can get things from the acquired blockstore
var count int
ch, err := mybs.AllKeysChan(ctx)
require.NoError(t, err)
for range ch {
count++
}
require.Greater(t, count, 0)
}
// TestWrapperBackground verifies the behaviour of the background go routine
func TestWrapperBackground(t *testing.T) {
ctx := context.Background()
// Create a DAG store wrapper
dagst, w, err := NewDAGStore(config.DAGStoreConfig{
TransientsDir: t.TempDir(),
IndexDir: t.TempDir(),
DatastoreDir: t.TempDir(),
GCInterval: config.Duration(1 * time.Millisecond),
2021-07-20 13:02:50 +00:00
}, mockLotusMount{})
require.NoError(t, err)
2021-08-04 11:48:53 +00:00
defer dagst.Close() //nolint:errcheck
2021-07-20 13:02:50 +00:00
// Create a mock DAG store in place of the real DAG store
mock := &mockDagStore{
gc: make(chan struct{}, 1),
recover: make(chan shard.Key, 1),
close: make(chan struct{}, 1),
}
2021-08-03 11:22:40 +00:00
w.dagst = mock
2021-07-20 13:02:50 +00:00
// Start up the wrapper
2021-07-30 14:25:18 +00:00
err = w.Start(ctx)
require.NoError(t, err)
2021-07-20 13:02:50 +00:00
// Expect GC to be called automatically
tctx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
select {
case <-tctx.Done():
require.Fail(t, "failed to call GC")
case <-mock.gc:
}
// Expect that when the wrapper is closed it will call close on the
// DAG store
err = w.Close()
require.NoError(t, err)
tctx, cancel3 := context.WithTimeout(ctx, time.Second)
defer cancel3()
select {
case <-tctx.Done():
require.Fail(t, "failed to call close")
case <-mock.close:
}
}
type mockDagStore struct {
acquireShardErr chan error
acquireShardRes dagstore.ShardResult
register chan shard.Key
gc chan struct{}
recover chan shard.Key
close chan struct{}
}
func (m *mockDagStore) DestroyShard(ctx context.Context, key shard.Key, out chan dagstore.ShardResult, _ dagstore.DestroyOpts) error {
panic("implement me")
}
func (m *mockDagStore) GetShardInfo(k shard.Key) (dagstore.ShardInfo, error) {
panic("implement me")
}
func (m *mockDagStore) AllShardsInfo() dagstore.AllShardsInfo {
panic("implement me")
}
2021-07-30 07:32:42 +00:00
func (m *mockDagStore) Start(_ context.Context) error {
return nil
}
2021-07-20 13:02:50 +00:00
func (m *mockDagStore) RegisterShard(ctx context.Context, key shard.Key, mnt mount.Mount, out chan dagstore.ShardResult, opts dagstore.RegisterOpts) error {
m.register <- key
out <- dagstore.ShardResult{Key: key}
return nil
}
func (m *mockDagStore) AcquireShard(ctx context.Context, key shard.Key, out chan dagstore.ShardResult, _ dagstore.AcquireOpts) error {
select {
case err := <-m.acquireShardErr:
return err
default:
}
out <- m.acquireShardRes
return nil
}
func (m *mockDagStore) RecoverShard(ctx context.Context, key shard.Key, out chan dagstore.ShardResult, _ dagstore.RecoverOpts) error {
m.recover <- key
return nil
}
func (m *mockDagStore) GC(ctx context.Context) (*dagstore.GCResult, error) {
2021-07-20 13:02:50 +00:00
select {
case m.gc <- struct{}{}:
default:
}
return nil, nil
}
func (m *mockDagStore) Close() error {
m.close <- struct{}{}
return nil
}
type mockLotusMount struct {
}
2021-07-21 10:23:55 +00:00
func (m mockLotusMount) Start(ctx context.Context) error {
return nil
}
2021-07-20 13:02:50 +00:00
func (m mockLotusMount) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error) {
panic("implement me")
}
func (m mockLotusMount) GetUnpaddedCARSize(ctx context.Context, pieceCid cid.Cid) (uint64, error) {
2021-07-20 13:02:50 +00:00
panic("implement me")
}
2021-07-30 07:32:42 +00:00
func (m mockLotusMount) IsUnsealed(ctx context.Context, pieceCid cid.Cid) (bool, error) {
panic("implement me")
}
2021-07-20 13:02:50 +00:00
func getShardAccessor(t *testing.T) *dagstore.ShardAccessor {
data, err := os.ReadFile("./fixtures/sample-rw-bs-v2.car")
require.NoError(t, err)
buff := bytes.NewReader(data)
reader := &mount.NopCloser{Reader: buff, ReaderAt: buff, Seeker: buff}
shardAccessor, err := dagstore.NewShardAccessor(reader, nil, nil)
require.NoError(t, err)
return shardAccessor
}