Merge pull request #9010 from LexLuthr/feat/dagstore-destroyshard

feat: dagstore: Add DestroyShard() in dagstore wrapper
This commit is contained in:
Łukasz Magiera 2022-07-12 14:41:52 +02:00 committed by GitHub
commit c90826e670
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 44 additions and 2 deletions

View File

@ -272,6 +272,22 @@ func (w *Wrapper) RegisterShard(ctx context.Context, pieceCid cid.Cid, carPath s
return nil return nil
} }
func (w *Wrapper) DestroyShard(ctx context.Context, pieceCid cid.Cid, resch chan dagstore.ShardResult) error {
key := shard.KeyFromCID(pieceCid)
opts := dagstore.DestroyOpts{}
err := w.dagst.DestroyShard(ctx, key, resch, opts)
if err != nil {
return xerrors.Errorf("failed to schedule destroy shard for piece CID %s: %w", pieceCid, err)
}
log.Debugf("successfully submitted destroy Shard request for piece CID %s", pieceCid)
return nil
}
func (w *Wrapper) MigrateDeals(ctx context.Context, deals []storagemarket.MinerDeal) (bool, error) { func (w *Wrapper) MigrateDeals(ctx context.Context, deals []storagemarket.MinerDeal) (bool, error) {
log := log.Named("migrator") log := log.Named("migrator")

View File

@ -24,7 +24,7 @@ import (
// TestWrapperAcquireRecovery verifies that if acquire shard returns a "not found" // TestWrapperAcquireRecovery verifies that if acquire shard returns a "not found"
// error, the wrapper will attempt to register the shard then reacquire // error, the wrapper will attempt to register the shard then reacquire
func TestWrapperAcquireRecovery(t *testing.T) { func TestWrapperAcquireRecoveryDestroy(t *testing.T) {
ctx := context.Background() ctx := context.Background()
pieceCid, err := cid.Parse("bafkqaaa") pieceCid, err := cid.Parse("bafkqaaa")
require.NoError(t, err) require.NoError(t, err)
@ -51,6 +51,7 @@ func TestWrapperAcquireRecovery(t *testing.T) {
Accessor: getShardAccessor(t), Accessor: getShardAccessor(t),
}, },
register: make(chan shard.Key, 1), register: make(chan shard.Key, 1),
destroy: make(chan shard.Key, 1),
} }
w.dagst = mock w.dagst = mock
@ -77,6 +78,28 @@ func TestWrapperAcquireRecovery(t *testing.T) {
count++ count++
} }
require.Greater(t, count, 0) require.Greater(t, count, 0)
// Destroy the shard
dr := make(chan dagstore.ShardResult, 1)
err = w.DestroyShard(ctx, pieceCid, dr)
require.NoError(t, err)
dctx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
select {
case <-dctx.Done():
require.Fail(t, "failed to call destroy")
case k := <-mock.destroy:
require.Equal(t, k.String(), pieceCid.String())
}
var dcount int
dch, err := mybs.AllKeysChan(ctx)
require.NoError(t, err)
for range dch {
count++
}
require.Equal(t, dcount, 0)
} }
// TestWrapperBackground verifies the behaviour of the background go routine // TestWrapperBackground verifies the behaviour of the background go routine
@ -139,6 +162,7 @@ type mockDagStore struct {
gc chan struct{} gc chan struct{}
recover chan shard.Key recover chan shard.Key
destroy chan shard.Key
close chan struct{} close chan struct{}
} }
@ -155,7 +179,9 @@ func (m *mockDagStore) GetShardKeysForCid(c cid.Cid) ([]shard.Key, error) {
} }
func (m *mockDagStore) DestroyShard(ctx context.Context, key shard.Key, out chan dagstore.ShardResult, _ dagstore.DestroyOpts) error { func (m *mockDagStore) DestroyShard(ctx context.Context, key shard.Key, out chan dagstore.ShardResult, _ dagstore.DestroyOpts) error {
panic("implement me") m.destroy <- key
out <- dagstore.ShardResult{Key: key}
return nil
} }
func (m *mockDagStore) GetShardInfo(k shard.Key) (dagstore.ShardInfo, error) { func (m *mockDagStore) GetShardInfo(k shard.Key) (dagstore.ShardInfo, error) {