batch deletion for purging the tracking store

This commit is contained in:
vyzo 2021-02-27 23:08:23 +02:00
parent 09cd1175a1
commit aba6530411
4 changed files with 79 additions and 39 deletions

View File

@ -12,6 +12,7 @@ type TrackingStore interface {
PutBatch([]cid.Cid, abi.ChainEpoch) error
Get(cid.Cid) (abi.ChainEpoch, error)
Delete(cid.Cid) error
DeleteBatch(map[cid.Cid]struct{}) error
ForEach(func(cid.Cid, abi.ChainEpoch) error) error
Sync() error
Close() error

View File

@ -87,6 +87,19 @@ func (s *BoltTrackingStore) Delete(cid cid.Cid) error {
})
}
func (s *BoltTrackingStore) DeleteBatch(cids map[cid.Cid]struct{}) error {
return s.db.Batch(func(tx *bolt.Tx) error {
b := tx.Bucket(s.bucketId)
for cid := range cids {
err := b.Delete(cid.Hash())
if err != nil {
return xerrors.Errorf("error deleting %s", cid)
}
}
return nil
})
}
func (s *BoltTrackingStore) ForEach(f func(cid.Cid, abi.ChainEpoch) error) error {
return s.db.View(func(tx *bolt.Tx) error {
b := tx.Bucket(s.bucketId)

View File

@ -122,6 +122,21 @@ func (s *LMDBTrackingStore) Delete(cid cid.Cid) error {
})
}
func (s *LMDBTrackingStore) DeleteBatch(cids map[cid.Cid]struct{}) error {
return withMaxReadersRetry(
func() error {
return s.env.Update(func(txn *lmdb.Txn) error {
for cid := range cids {
err := txn.Del(s.db, cid.Hash(), nil)
if err != nil {
return err
}
}
return nil
})
})
}
func (s *LMDBTrackingStore) ForEach(f func(cid.Cid, abi.ChainEpoch) error) error {
return withMaxReadersRetry(
func() error {

View File

@ -423,10 +423,10 @@ func (s *SplitStore) compactSimple() {
}
log.Infow("collection done", "took", time.Since(startCollect))
log.Infow("compaction stats", "hot", stHot, "cold", stCold)
log.Info("moving cold objects to the coldstore")
startMove := time.Now()
for cid := range cold {
blk, err := s.hot.Get(cid)
if err != nil {
@ -435,13 +435,12 @@ func (s *SplitStore) compactSimple() {
// but before we have deleted it from the snoop; just delete the snoop.
err = s.snoop.Delete(cid)
if err != nil {
log.Errorf("error deleting cid %s from tracking store: %w", cid, err)
log.Errorf("error deleting cid %s from tracking store: %s", cid, err)
// TODO do something better here -- just continue?
panic(err)
}
} else {
log.Errorf("error retrieving tracked block %s from hotstore: %w ", cid, err)
log.Errorf("error retrieving tracked block %s from hotstore: %s", cid, err)
// TODO do something better here -- just continue?
panic(err)
}
@ -452,7 +451,7 @@ func (s *SplitStore) compactSimple() {
// put the object in the coldstore
err = s.cold.Put(blk)
if err != nil {
log.Errorf("error puting block %s to coldstore: %w", cid, err)
log.Errorf("error puting block %s to coldstore: %s", cid, err)
// TODO do something better here -- just continue?
panic(err)
}
@ -460,22 +459,24 @@ func (s *SplitStore) compactSimple() {
// delete the object from the hotstore
err = s.hot.DeleteBlock(cid)
if err != nil {
log.Errorf("error deleting block %s from hotstore: %w", cid, err)
log.Errorf("error deleting block %s from hotstore: %s", cid, err)
// TODO do something better here -- just continue?
panic(err)
}
}
log.Infow("moving done", "took", time.Since(startMove))
// remove the snoop tracking
err = s.snoop.Delete(cid)
purgeStart := time.Now()
log.Info("purging cold objects from tracking store")
err = s.snoop.DeleteBatch(cold)
if err != nil {
log.Errorf("error deleting cid %s from tracking store: %w", cid, err)
log.Errorf("error purging cold objects from tracking store: %s", err)
// TODO do something better here -- just continue?
panic(err)
}
}
log.Infow("moving done", "took", time.Since(startMove))
log.Infow("compaction stats", "hot", stHot, "cold", stCold)
log.Infow("purging done", "took", time.Since(purgeStart))
err = s.snoop.Sync()
if err != nil {
@ -617,6 +618,8 @@ func (s *SplitStore) compactFull() {
panic(err)
}
log.Infow("compaction stats", "hot", stHot, "cold", stCold, "dead", stDead)
log.Info("moving cold objects to the coldstore")
for cid := range cold {
blk, err := s.hot.Get(cid)
@ -626,13 +629,12 @@ func (s *SplitStore) compactFull() {
// but before we have deleted it from the snoop; just delete the snoop.
err = s.snoop.Delete(cid)
if err != nil {
log.Errorf("error deleting cid %s from tracking store: %w", cid, err)
log.Errorf("error deleting cid %s from tracking store: %s", cid, err)
// TODO do something better here -- just continue?
panic(err)
}
} else {
log.Errorf("error retrieving tracked block %s from hotstore: %w ", cid, err)
log.Errorf("error retrieving tracked block %s from hotstore: %s", cid, err)
// TODO do something better here -- just continue?
panic(err)
}
@ -643,7 +645,7 @@ func (s *SplitStore) compactFull() {
// put the object in the coldstore
err = s.cold.Put(blk)
if err != nil {
log.Errorf("error puting block %s to coldstore: %w", cid, err)
log.Errorf("error puting block %s to coldstore: %s", cid, err)
// TODO do something better here -- just continue?
panic(err)
}
@ -651,19 +653,24 @@ func (s *SplitStore) compactFull() {
// delete the object from the hotstore
err = s.hot.DeleteBlock(cid)
if err != nil {
log.Errorf("error deleting block %s from hotstore: %w", cid, err)
log.Errorf("error deleting block %s from hotstore: %s", cid, err)
// TODO do something better here -- just continue?
panic(err)
}
}
// remove the snoop tracking
purgeStart := time.Now()
log.Info("purging cold objects from tracking store")
err = s.snoop.DeleteBatch(cold)
if err != nil {
log.Errorf("error purging cold objects from tracking store: %s", err)
// TODO do something better here -- just continue?
panic(err)
}
// remove the snoop tracking
err = s.snoop.Delete(cid)
if err != nil {
log.Errorf("error deleting cid %s from tracking store: %w", cid, err)
// TODO do something better here -- just continue?
panic(err)
}
}
log.Infow("purging done", "took", time.Since(purgeStart))
if len(dead) > 0 {
log.Info("deleting dead objects")
@ -672,23 +679,27 @@ func (s *SplitStore) compactFull() {
// delete the object from the hotstore
err = s.hot.DeleteBlock(cid)
if err != nil {
log.Errorf("error deleting block %s from hotstore: %w", cid, err)
log.Errorf("error deleting block %s from hotstore: %s", cid, err)
// TODO do something better here -- just continue?
panic(err)
}
}
// remove the snoop tracking
err = s.snoop.Delete(cid)
purgeStart := time.Now()
log.Info("purging dead objects from tracking store")
err = s.snoop.DeleteBatch(dead)
if err != nil {
log.Errorf("error deleting cid %s from tracking store: %w", cid, err)
log.Errorf("error purging dead objects from tracking store: %s", err)
// TODO do something better here -- just continue?
panic(err)
}
}
log.Infow("purging done", "took", time.Since(purgeStart))
}
log.Infow("sweeping done", "took", time.Since(startSweep))
log.Infow("compaction stats", "hot", stHot, "cold", stCold, "dead", stDead)
err = s.snoop.Sync()
if err != nil {