From 86b73d651e1c28442b9861ad8ba6578e9c350bf2 Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 2 Mar 2021 16:45:45 +0200 Subject: [PATCH] add DeleteMany to Blockstore interface --- blockstore/badger/blockstore.go | 38 +++++++++++++++++++++++++++++ blockstore/blockstore.go | 16 ++++++++++++ blockstore/buffered.go | 8 ++++++ blockstore/mem.go | 7 ++++++ blockstore/splitstore/splitstore.go | 5 ++++ blockstore/sync.go | 6 +++++ blockstore/timed.go | 6 +++++ 7 files changed, 86 insertions(+) diff --git a/blockstore/badger/blockstore.go b/blockstore/badger/blockstore.go index 22f9036e3..cd740e650 100644 --- a/blockstore/badger/blockstore.go +++ b/blockstore/badger/blockstore.go @@ -318,6 +318,44 @@ func (b *Blockstore) DeleteBlock(cid cid.Cid) error { }) } +func (b *Blockstore) DeleteMany(cids []cid.Cid) error { + if atomic.LoadInt64(&b.state) != stateOpen { + return ErrBlockstoreClosed + } + + batch := b.DB.NewWriteBatch() + defer batch.Cancel() + + // toReturn tracks the byte slices to return to the pool, if we're using key + // prefixing. we can't return each slice to the pool after each Set, because + // badger holds on to the slice. + var toReturn [][]byte + if b.prefixing { + toReturn = make([][]byte, 0, len(cids)) + defer func() { + for _, b := range toReturn { + KeyPool.Put(b) + } + }() + } + + for _, cid := range cids { + k, pooled := b.PooledStorageKey(cid) + if pooled { + toReturn = append(toReturn, k) + } + if err := batch.Delete(k); err != nil { + return err + } + } + + err := batch.Flush() + if err != nil { + err = fmt.Errorf("failed to delete blocks from badger blockstore: %w", err) + } + return err +} + // AllKeysChan implements Blockstore.AllKeysChan. func (b *Blockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { if atomic.LoadInt64(&b.state) != stateOpen { diff --git a/blockstore/blockstore.go b/blockstore/blockstore.go index 5d4578777..2414dbad0 100644 --- a/blockstore/blockstore.go +++ b/blockstore/blockstore.go @@ -18,6 +18,7 @@ var ErrNotFound = blockstore.ErrNotFound type Blockstore interface { blockstore.Blockstore blockstore.Viewer + BatchDeleter } // BasicBlockstore is an alias to the original IPFS Blockstore. @@ -25,6 +26,10 @@ type BasicBlockstore = blockstore.Blockstore type Viewer = blockstore.Viewer +type BatchDeleter interface { + DeleteMany(cids []cid.Cid) error +} + // WrapIDStore wraps the underlying blockstore in an "identity" blockstore. // The ID store filters out all puts for blocks with CIDs using the "identity" // hash function. It also extracts inlined blocks from CIDs using the identity @@ -53,6 +58,17 @@ func (a *adaptedBlockstore) View(cid cid.Cid, callback func([]byte) error) error return callback(blk.RawData()) } +func (a *adaptedBlockstore) DeleteMany(cids []cid.Cid) error { + for _, cid := range cids { + err := a.DeleteBlock(cid) + if err != nil { + return err + } + } + + return nil +} + // Adapt adapts a standard blockstore to a Lotus blockstore by // enriching it with the extra methods that Lotus requires (e.g. View, Sync). // diff --git a/blockstore/buffered.go b/blockstore/buffered.go index 200e9b995..5d3d38f78 100644 --- a/blockstore/buffered.go +++ b/blockstore/buffered.go @@ -96,6 +96,14 @@ func (bs *BufferedBlockstore) DeleteBlock(c cid.Cid) error { return bs.write.DeleteBlock(c) } +func (bs *BufferedBlockstore) DeleteMany(cids []cid.Cid) error { + if err := bs.read.DeleteMany(cids); err != nil { + return err + } + + return bs.write.DeleteMany(cids) +} + func (bs *BufferedBlockstore) View(c cid.Cid, callback func([]byte) error) error { // both stores are viewable. if err := bs.write.View(c, callback); err == ErrNotFound { diff --git a/blockstore/mem.go b/blockstore/mem.go index c8de3e3e8..8ea69d46a 100644 --- a/blockstore/mem.go +++ b/blockstore/mem.go @@ -20,6 +20,13 @@ func (m MemBlockstore) DeleteBlock(k cid.Cid) error { return nil } +func (m MemBlockstore) DeleteMany(ks []cid.Cid) error { + for _, k := range ks { + delete(m, k) + } + return nil +} + func (m MemBlockstore) Has(k cid.Cid) (bool, error) { _, ok := m[k] return ok, nil diff --git a/blockstore/splitstore/splitstore.go b/blockstore/splitstore/splitstore.go index 9764f508d..1431b0496 100644 --- a/blockstore/splitstore/splitstore.go +++ b/blockstore/splitstore/splitstore.go @@ -163,6 +163,11 @@ func (s *SplitStore) DeleteBlock(_ cid.Cid) error { return errors.New("DeleteBlock not implemented on SplitStore; don't do this Luke!") //nolint } +func (s *SplitStore) DeleteMany(_ []cid.Cid) error { + // afaict we don't seem to be using this method, so it's not implemented + return errors.New("DeleteMany not implemented on SplitStore; don't do this Luke!") //nolint +} + func (s *SplitStore) Has(cid cid.Cid) (bool, error) { has, err := s.hot.Has(cid) diff --git a/blockstore/sync.go b/blockstore/sync.go index 2da71a898..848ccd19d 100644 --- a/blockstore/sync.go +++ b/blockstore/sync.go @@ -26,6 +26,12 @@ func (m *SyncBlockstore) DeleteBlock(k cid.Cid) error { return m.bs.DeleteBlock(k) } +func (m *SyncBlockstore) DeleteMany(ks []cid.Cid) error { + m.mu.Lock() + defer m.mu.Unlock() + return m.bs.DeleteMany(ks) +} + func (m *SyncBlockstore) Has(k cid.Cid) (bool, error) { m.mu.RLock() defer m.mu.RUnlock() diff --git a/blockstore/timed.go b/blockstore/timed.go index 138375028..ce25bb5bc 100644 --- a/blockstore/timed.go +++ b/blockstore/timed.go @@ -153,6 +153,12 @@ func (t *TimedCacheBlockstore) DeleteBlock(k cid.Cid) error { return multierr.Combine(t.active.DeleteBlock(k), t.inactive.DeleteBlock(k)) } +func (t *TimedCacheBlockstore) DeleteMany(ks []cid.Cid) error { + t.mu.Lock() + defer t.mu.Unlock() + return multierr.Combine(t.active.DeleteMany(ks), t.inactive.DeleteMany(ks)) +} + func (t *TimedCacheBlockstore) AllKeysChan(_ context.Context) (<-chan cid.Cid, error) { t.mu.RLock() defer t.mu.RUnlock()