diff --git a/markets/dagstore/wrapper.go b/markets/dagstore/wrapper.go index cee4ec886..cb20dccde 100644 --- a/markets/dagstore/wrapper.go +++ b/markets/dagstore/wrapper.go @@ -272,6 +272,22 @@ func (w *Wrapper) RegisterShard(ctx context.Context, pieceCid cid.Cid, carPath s return nil } +func (w *Wrapper) DestroyShard(ctx context.Context, pieceCid cid.Cid, resch chan dagstore.ShardResult) error { + key := shard.KeyFromCID(pieceCid) + + opts := dagstore.DestroyOpts{} + + err := w.dagst.DestroyShard(ctx, key, resch, opts) + + if err != nil { + return xerrors.Errorf("failed to schedule destroy shard for piece CID %s: %w", pieceCid, err) + } + log.Debugf("successfully submitted destroy Shard request for piece CID %s", pieceCid) + + return nil + +} + func (w *Wrapper) MigrateDeals(ctx context.Context, deals []storagemarket.MinerDeal) (bool, error) { log := log.Named("migrator") diff --git a/markets/dagstore/wrapper_test.go b/markets/dagstore/wrapper_test.go index ebc60372b..1240ce0a3 100644 --- a/markets/dagstore/wrapper_test.go +++ b/markets/dagstore/wrapper_test.go @@ -24,7 +24,7 @@ import ( // TestWrapperAcquireRecovery verifies that if acquire shard returns a "not found" // error, the wrapper will attempt to register the shard then reacquire -func TestWrapperAcquireRecovery(t *testing.T) { +func TestWrapperAcquireRecoveryDestroy(t *testing.T) { ctx := context.Background() pieceCid, err := cid.Parse("bafkqaaa") require.NoError(t, err) @@ -51,6 +51,7 @@ func TestWrapperAcquireRecovery(t *testing.T) { Accessor: getShardAccessor(t), }, register: make(chan shard.Key, 1), + destroy: make(chan shard.Key, 1), } w.dagst = mock @@ -77,6 +78,28 @@ func TestWrapperAcquireRecovery(t *testing.T) { count++ } require.Greater(t, count, 0) + + // Destroy the shard + dr := make(chan dagstore.ShardResult, 1) + err = w.DestroyShard(ctx, pieceCid, dr) + require.NoError(t, err) + + dctx, cancel := context.WithTimeout(ctx, time.Second) + defer cancel() + select { + case <-dctx.Done(): + require.Fail(t, "failed to call destroy") + case k := <-mock.destroy: + require.Equal(t, k.String(), pieceCid.String()) + } + + var dcount int + dch, err := mybs.AllKeysChan(ctx) + require.NoError(t, err) + for range dch { + count++ + } + require.Equal(t, dcount, 0) } // TestWrapperBackground verifies the behaviour of the background go routine @@ -139,6 +162,7 @@ type mockDagStore struct { gc chan struct{} recover chan shard.Key + destroy chan shard.Key close chan struct{} } @@ -155,7 +179,9 @@ func (m *mockDagStore) GetShardKeysForCid(c cid.Cid) ([]shard.Key, error) { } func (m *mockDagStore) DestroyShard(ctx context.Context, key shard.Key, out chan dagstore.ShardResult, _ dagstore.DestroyOpts) error { - panic("implement me") + m.destroy <- key + out <- dagstore.ShardResult{Key: key} + return nil } func (m *mockDagStore) GetShardInfo(k shard.Key) (dagstore.ShardInfo, error) {