diff --git a/markets/dagstore/fixtures/sample-rw-bs-v2.car b/markets/dagstore/fixtures/sample-rw-bs-v2.car new file mode 100644 index 000000000..9f7b56df3 Binary files /dev/null and b/markets/dagstore/fixtures/sample-rw-bs-v2.car differ diff --git a/markets/dagstore/wrapper.go b/markets/dagstore/wrapper.go index c10c740e6..437d0cbd8 100644 --- a/markets/dagstore/wrapper.go +++ b/markets/dagstore/wrapper.go @@ -21,7 +21,6 @@ import ( ) var log = logging.Logger("dagstore-wrapper") -var gcInterval = 5 * time.Minute // MarketDAGStoreConfig is the config the market needs to then construct a DAG Store. type MarketDAGStoreConfig struct { @@ -30,6 +29,17 @@ type MarketDAGStoreConfig struct { Datastore ds.Datastore MaxConcurrentFetch int MaxConcurrentIndex int + GCInterval time.Duration +} + +// DAGStore provides an interface for the DAG store that can be mocked out +// by tests +type DAGStore interface { + RegisterShard(ctx context.Context, key shard.Key, mnt mount.Mount, out chan dagstore.ShardResult, opts dagstore.RegisterOpts) error + AcquireShard(ctx context.Context, key shard.Key, out chan dagstore.ShardResult, _ dagstore.AcquireOpts) error + RecoverShard(ctx context.Context, key shard.Key, out chan dagstore.ShardResult, _ dagstore.RecoverOpts) error + GC(ctx context.Context) (map[shard.Key]error, error) + Close() error } type closableBlockstore struct { @@ -42,10 +52,11 @@ type Wrapper struct { cancel context.CancelFunc backgroundWg sync.WaitGroup - dagStore *dagstore.DAGStore - mountApi LotusAccessor - failureCh chan dagstore.ShardResult - traceCh chan dagstore.Trace + dagStore DAGStore + mountApi LotusAccessor + failureCh chan dagstore.ShardResult + traceCh chan dagstore.Trace + gcInterval time.Duration } var _ shared.DagStoreWrapper = (*Wrapper)(nil) @@ -77,10 +88,11 @@ func NewDagStoreWrapper(cfg MarketDAGStoreConfig, mountApi LotusAccessor) (*Wrap } return &Wrapper{ - dagStore: dagStore, - mountApi: mountApi, - failureCh: failureCh, - traceCh: traceCh, + dagStore: dagStore, + mountApi: mountApi, + failureCh: failureCh, + traceCh: traceCh, + gcInterval: cfg.GCInterval, }, nil } @@ -96,11 +108,11 @@ func (ds *Wrapper) Start(ctx context.Context) { func (ds *Wrapper) background() { defer ds.backgroundWg.Done() - gcTicker := time.NewTicker(gcInterval) + gcTicker := time.NewTicker(ds.gcInterval) defer gcTicker.Stop() recoverShardResults := make(chan dagstore.ShardResult, 32) - for ds.ctx.Err() != nil { + for ds.ctx.Err() == nil { select { // GC the DAG store on every tick diff --git a/markets/dagstore/wrapper_test.go b/markets/dagstore/wrapper_test.go new file mode 100644 index 000000000..8ccd87203 --- /dev/null +++ b/markets/dagstore/wrapper_test.go @@ -0,0 +1,200 @@ +package dagstore + +import ( + "bytes" + "context" + "io" + "os" + "testing" + "time" + + "golang.org/x/xerrors" + + "github.com/filecoin-project/dagstore" + "github.com/filecoin-project/dagstore/mount" + "github.com/filecoin-project/dagstore/shard" + + "github.com/ipfs/go-cid" + "github.com/stretchr/testify/require" +) + +// TestWrapperAcquireRecovery verifies that if acquire shard returns a "not found" +// error, the wrapper will attempt to register the shard then reacquire +func TestWrapperAcquireRecovery(t *testing.T) { + ctx := context.Background() + pieceCid, err := cid.Parse("bafkqaaa") + require.NoError(t, err) + + // Create a DAG store wrapper + w, err := NewDagStoreWrapper(MarketDAGStoreConfig{ + TransientsDir: t.TempDir(), + IndexDir: t.TempDir(), + GCInterval: time.Millisecond, + }, mockLotusMount{}) + require.NoError(t, err) + + // Return an error from acquire shard the first time + acquireShardErr := make(chan error, 1) + acquireShardErr <- xerrors.Errorf("unknown shard: %w", dagstore.ErrShardUnknown) + + // Create a mock DAG store in place of the real DAG store + mock := &mockDagStore{ + acquireShardErr: acquireShardErr, + acquireShardRes: dagstore.ShardResult{ + Accessor: getShardAccessor(t), + }, + register: make(chan shard.Key, 1), + } + w.dagStore = mock + + mybs, err := w.LoadShard(ctx, pieceCid) + require.NoError(t, err) + + // Expect the wrapper to try to recover from the error returned from + // acquire shard by calling register shard with the same key + tctx, cancel := context.WithTimeout(ctx, time.Second) + defer cancel() + select { + case <-tctx.Done(): + require.Fail(t, "failed to call register") + case k := <-mock.register: + require.Equal(t, k.String(), pieceCid.String()) + } + + // Verify that we can get things from the acquired blockstore + var count int + ch, err := mybs.AllKeysChan(ctx) + require.NoError(t, err) + for range ch { + count++ + } + require.Greater(t, count, 0) +} + +// TestWrapperBackground verifies the behaviour of the background go routine +func TestWrapperBackground(t *testing.T) { + ctx := context.Background() + + // Create a DAG store wrapper + w, err := NewDagStoreWrapper(MarketDAGStoreConfig{ + TransientsDir: t.TempDir(), + IndexDir: t.TempDir(), + GCInterval: time.Millisecond, + }, mockLotusMount{}) + require.NoError(t, err) + + // Create a mock DAG store in place of the real DAG store + mock := &mockDagStore{ + gc: make(chan struct{}, 1), + recover: make(chan shard.Key, 1), + close: make(chan struct{}, 1), + } + w.dagStore = mock + + // Start up the wrapper + w.Start(ctx) + + // Expect GC to be called automatically + tctx, cancel := context.WithTimeout(ctx, time.Second) + defer cancel() + select { + case <-tctx.Done(): + require.Fail(t, "failed to call GC") + case <-mock.gc: + } + + // Expect that when a result is sent on the failure channel, the wrapper + // will attempt to recover the shard + shardKey := shard.KeyFromString("key") + w.failureCh <- dagstore.ShardResult{ + Key: shardKey, + } + tctx, cancel2 := context.WithTimeout(ctx, time.Second) + defer cancel2() + select { + case <-tctx.Done(): + require.Fail(t, "failed to call recover") + case k := <-mock.recover: + require.Equal(t, shardKey, k) + } + + // Expect that when the wrapper is closed it will call close on the + // DAG store + err = w.Close() + require.NoError(t, err) + + tctx, cancel3 := context.WithTimeout(ctx, time.Second) + defer cancel3() + select { + case <-tctx.Done(): + require.Fail(t, "failed to call close") + case <-mock.close: + } +} + +type mockDagStore struct { + acquireShardErr chan error + acquireShardRes dagstore.ShardResult + register chan shard.Key + + gc chan struct{} + recover chan shard.Key + close chan struct{} +} + +func (m *mockDagStore) RegisterShard(ctx context.Context, key shard.Key, mnt mount.Mount, out chan dagstore.ShardResult, opts dagstore.RegisterOpts) error { + m.register <- key + out <- dagstore.ShardResult{Key: key} + return nil +} + +func (m *mockDagStore) AcquireShard(ctx context.Context, key shard.Key, out chan dagstore.ShardResult, _ dagstore.AcquireOpts) error { + select { + case err := <-m.acquireShardErr: + return err + default: + } + + out <- m.acquireShardRes + return nil +} + +func (m *mockDagStore) RecoverShard(ctx context.Context, key shard.Key, out chan dagstore.ShardResult, _ dagstore.RecoverOpts) error { + m.recover <- key + return nil +} + +func (m *mockDagStore) GC(ctx context.Context) (map[shard.Key]error, error) { + select { + case m.gc <- struct{}{}: + default: + } + + return nil, nil +} + +func (m *mockDagStore) Close() error { + m.close <- struct{}{} + return nil +} + +type mockLotusMount struct { +} + +func (m mockLotusMount) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error) { + panic("implement me") +} + +func (m mockLotusMount) GetUnpaddedCARSize(pieceCid cid.Cid) (uint64, error) { + panic("implement me") +} + +func getShardAccessor(t *testing.T) *dagstore.ShardAccessor { + data, err := os.ReadFile("./fixtures/sample-rw-bs-v2.car") + require.NoError(t, err) + buff := bytes.NewReader(data) + reader := &mount.NopCloser{Reader: buff, ReaderAt: buff, Seeker: buff} + shardAccessor, err := dagstore.NewShardAccessor(reader, nil, nil) + require.NoError(t, err) + return shardAccessor +} diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index 659325fc6..995e98981 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -588,6 +588,7 @@ func DagStoreWrapper( TransientsDir: filepath.Join(dagStoreDir, "transients"), IndexDir: filepath.Join(dagStoreDir, "index"), Datastore: dagStoreDS, + GCInterval: 5 * time.Minute, } mountApi := dagstore.NewLotusMountAPI(pieceStore, rpn) dsw, err := dagstore.NewDagStoreWrapper(cfg, mountApi)