add DeleteMany to Blockstore interface
This commit is contained in:
parent
dd0c308427
commit
86b73d651e
@ -318,6 +318,44 @@ func (b *Blockstore) DeleteBlock(cid cid.Cid) error {
|
||||
})
|
||||
}
|
||||
|
||||
func (b *Blockstore) DeleteMany(cids []cid.Cid) error {
|
||||
if atomic.LoadInt64(&b.state) != stateOpen {
|
||||
return ErrBlockstoreClosed
|
||||
}
|
||||
|
||||
batch := b.DB.NewWriteBatch()
|
||||
defer batch.Cancel()
|
||||
|
||||
// toReturn tracks the byte slices to return to the pool, if we're using key
|
||||
// prefixing. we can't return each slice to the pool after each Set, because
|
||||
// badger holds on to the slice.
|
||||
var toReturn [][]byte
|
||||
if b.prefixing {
|
||||
toReturn = make([][]byte, 0, len(cids))
|
||||
defer func() {
|
||||
for _, b := range toReturn {
|
||||
KeyPool.Put(b)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
for _, cid := range cids {
|
||||
k, pooled := b.PooledStorageKey(cid)
|
||||
if pooled {
|
||||
toReturn = append(toReturn, k)
|
||||
}
|
||||
if err := batch.Delete(k); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
err := batch.Flush()
|
||||
if err != nil {
|
||||
err = fmt.Errorf("failed to delete blocks from badger blockstore: %w", err)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// AllKeysChan implements Blockstore.AllKeysChan.
|
||||
func (b *Blockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
|
||||
if atomic.LoadInt64(&b.state) != stateOpen {
|
||||
|
@ -18,6 +18,7 @@ var ErrNotFound = blockstore.ErrNotFound
|
||||
type Blockstore interface {
|
||||
blockstore.Blockstore
|
||||
blockstore.Viewer
|
||||
BatchDeleter
|
||||
}
|
||||
|
||||
// BasicBlockstore is an alias to the original IPFS Blockstore.
|
||||
@ -25,6 +26,10 @@ type BasicBlockstore = blockstore.Blockstore
|
||||
|
||||
type Viewer = blockstore.Viewer
|
||||
|
||||
type BatchDeleter interface {
|
||||
DeleteMany(cids []cid.Cid) error
|
||||
}
|
||||
|
||||
// WrapIDStore wraps the underlying blockstore in an "identity" blockstore.
|
||||
// The ID store filters out all puts for blocks with CIDs using the "identity"
|
||||
// hash function. It also extracts inlined blocks from CIDs using the identity
|
||||
@ -53,6 +58,17 @@ func (a *adaptedBlockstore) View(cid cid.Cid, callback func([]byte) error) error
|
||||
return callback(blk.RawData())
|
||||
}
|
||||
|
||||
func (a *adaptedBlockstore) DeleteMany(cids []cid.Cid) error {
|
||||
for _, cid := range cids {
|
||||
err := a.DeleteBlock(cid)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Adapt adapts a standard blockstore to a Lotus blockstore by
|
||||
// enriching it with the extra methods that Lotus requires (e.g. View, Sync).
|
||||
//
|
||||
|
@ -96,6 +96,14 @@ func (bs *BufferedBlockstore) DeleteBlock(c cid.Cid) error {
|
||||
return bs.write.DeleteBlock(c)
|
||||
}
|
||||
|
||||
func (bs *BufferedBlockstore) DeleteMany(cids []cid.Cid) error {
|
||||
if err := bs.read.DeleteMany(cids); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return bs.write.DeleteMany(cids)
|
||||
}
|
||||
|
||||
func (bs *BufferedBlockstore) View(c cid.Cid, callback func([]byte) error) error {
|
||||
// both stores are viewable.
|
||||
if err := bs.write.View(c, callback); err == ErrNotFound {
|
||||
|
@ -20,6 +20,13 @@ func (m MemBlockstore) DeleteBlock(k cid.Cid) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m MemBlockstore) DeleteMany(ks []cid.Cid) error {
|
||||
for _, k := range ks {
|
||||
delete(m, k)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m MemBlockstore) Has(k cid.Cid) (bool, error) {
|
||||
_, ok := m[k]
|
||||
return ok, nil
|
||||
|
@ -163,6 +163,11 @@ func (s *SplitStore) DeleteBlock(_ cid.Cid) error {
|
||||
return errors.New("DeleteBlock not implemented on SplitStore; don't do this Luke!") //nolint
|
||||
}
|
||||
|
||||
func (s *SplitStore) DeleteMany(_ []cid.Cid) error {
|
||||
// afaict we don't seem to be using this method, so it's not implemented
|
||||
return errors.New("DeleteMany not implemented on SplitStore; don't do this Luke!") //nolint
|
||||
}
|
||||
|
||||
func (s *SplitStore) Has(cid cid.Cid) (bool, error) {
|
||||
has, err := s.hot.Has(cid)
|
||||
|
||||
|
@ -26,6 +26,12 @@ func (m *SyncBlockstore) DeleteBlock(k cid.Cid) error {
|
||||
return m.bs.DeleteBlock(k)
|
||||
}
|
||||
|
||||
func (m *SyncBlockstore) DeleteMany(ks []cid.Cid) error {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
return m.bs.DeleteMany(ks)
|
||||
}
|
||||
|
||||
func (m *SyncBlockstore) Has(k cid.Cid) (bool, error) {
|
||||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
|
@ -153,6 +153,12 @@ func (t *TimedCacheBlockstore) DeleteBlock(k cid.Cid) error {
|
||||
return multierr.Combine(t.active.DeleteBlock(k), t.inactive.DeleteBlock(k))
|
||||
}
|
||||
|
||||
func (t *TimedCacheBlockstore) DeleteMany(ks []cid.Cid) error {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
return multierr.Combine(t.active.DeleteMany(ks), t.inactive.DeleteMany(ks))
|
||||
}
|
||||
|
||||
func (t *TimedCacheBlockstore) AllKeysChan(_ context.Context) (<-chan cid.Cid, error) {
|
||||
t.mu.RLock()
|
||||
defer t.mu.RUnlock()
|
||||
|
Loading…
Reference in New Issue
Block a user