diff --git a/chain/store/splitstore/snoop.go b/chain/store/splitstore/snoop.go
index b312cae93..02c270fc7 100644
--- a/chain/store/splitstore/snoop.go
+++ b/chain/store/splitstore/snoop.go
@@ -12,6 +12,7 @@ type TrackingStore interface {
 	PutBatch([]cid.Cid, abi.ChainEpoch) error
 	Get(cid.Cid) (abi.ChainEpoch, error)
 	Delete(cid.Cid) error
+	DeleteBatch(map[cid.Cid]struct{}) error
 	ForEach(func(cid.Cid, abi.ChainEpoch) error) error
 	Sync() error
 	Close() error
diff --git a/chain/store/splitstore/snoop_bolt.go b/chain/store/splitstore/snoop_bolt.go
index 5c305e91e..2fc5d4f6d 100644
--- a/chain/store/splitstore/snoop_bolt.go
+++ b/chain/store/splitstore/snoop_bolt.go
@@ -87,6 +87,22 @@ func (s *BoltTrackingStore) Delete(cid cid.Cid) error {
 	})
 }
 
+// DeleteBatch deletes the tracking entry of every cid in cids from the
+// store's bucket inside one s.db.Batch callback. It returns on the first
+// failed delete, wrapping the underlying error together with the cid.
+func (s *BoltTrackingStore) DeleteBatch(cids map[cid.Cid]struct{}) error {
+	return s.db.Batch(func(tx *bolt.Tx) error {
+		b := tx.Bucket(s.bucketId)
+		for cid := range cids {
+			err := b.Delete(cid.Hash())
+			if err != nil {
+				return xerrors.Errorf("error deleting %s: %w", cid, err)
+			}
+		}
+		return nil
+	})
+}
+
 func (s *BoltTrackingStore) ForEach(f func(cid.Cid, abi.ChainEpoch) error) error {
 	return s.db.View(func(tx *bolt.Tx) error {
 		b := tx.Bucket(s.bucketId)
diff --git a/chain/store/splitstore/snoop_lmdb.go b/chain/store/splitstore/snoop_lmdb.go
index 225f5f199..4222c94be 100644
--- a/chain/store/splitstore/snoop_lmdb.go
+++ b/chain/store/splitstore/snoop_lmdb.go
@@ -122,6 +122,25 @@ func (s *LMDBTrackingStore) Delete(cid cid.Cid) error {
 		})
 }
 
+// DeleteBatch deletes the tracking entry of every cid in cids within one
+// s.env.Update transaction, run through withMaxReadersRetry. Keys that are
+// already absent (reported by lmdb as not-found) are skipped, so an entry
+// removed earlier through Delete does not fail the whole batch.
+func (s *LMDBTrackingStore) DeleteBatch(cids map[cid.Cid]struct{}) error {
+	return withMaxReadersRetry(
+		func() error {
+			return s.env.Update(func(txn *lmdb.Txn) error {
+				for cid := range cids {
+					err := txn.Del(s.db, cid.Hash(), nil)
+					if err != nil && !lmdb.IsNotFound(err) {
+						return err
+					}
+				}
+				return nil
+			})
+		})
+}
+
 func (s *LMDBTrackingStore) ForEach(f func(cid.Cid, abi.ChainEpoch) error) error {
 	return withMaxReadersRetry(
 		func() error {
diff --git a/chain/store/splitstore/splitstore.go 
b/chain/store/splitstore/splitstore.go index 92d50a925..edfe09ff9 100644 --- a/chain/store/splitstore/splitstore.go +++ b/chain/store/splitstore/splitstore.go @@ -423,10 +423,10 @@ func (s *SplitStore) compactSimple() { } log.Infow("collection done", "took", time.Since(startCollect)) + log.Infow("compaction stats", "hot", stHot, "cold", stCold) log.Info("moving cold objects to the coldstore") startMove := time.Now() - for cid := range cold { blk, err := s.hot.Get(cid) if err != nil { @@ -435,13 +435,12 @@ func (s *SplitStore) compactSimple() { // but before we have deleted it from the snoop; just delete the snoop. err = s.snoop.Delete(cid) if err != nil { - log.Errorf("error deleting cid %s from tracking store: %w", cid, err) + log.Errorf("error deleting cid %s from tracking store: %s", cid, err) // TODO do something better here -- just continue? panic(err) - } } else { - log.Errorf("error retrieving tracked block %s from hotstore: %w ", cid, err) + log.Errorf("error retrieving tracked block %s from hotstore: %s", cid, err) // TODO do something better here -- just continue? panic(err) } @@ -452,7 +451,7 @@ func (s *SplitStore) compactSimple() { // put the object in the coldstore err = s.cold.Put(blk) if err != nil { - log.Errorf("error puting block %s to coldstore: %w", cid, err) + log.Errorf("error puting block %s to coldstore: %s", cid, err) // TODO do something better here -- just continue? panic(err) } @@ -460,22 +459,24 @@ func (s *SplitStore) compactSimple() { // delete the object from the hotstore err = s.hot.DeleteBlock(cid) if err != nil { - log.Errorf("error deleting block %s from hotstore: %w", cid, err) - // TODO do something better here -- just continue? - panic(err) - } - - // remove the snoop tracking - err = s.snoop.Delete(cid) - if err != nil { - log.Errorf("error deleting cid %s from tracking store: %w", cid, err) + log.Errorf("error deleting block %s from hotstore: %s", cid, err) // TODO do something better here -- just continue? 
panic(err) } } - log.Infow("moving done", "took", time.Since(startMove)) - log.Infow("compaction stats", "hot", stHot, "cold", stCold) + + // remove the snoop tracking + purgeStart := time.Now() + log.Info("purging cold objects from tracking store") + + err = s.snoop.DeleteBatch(cold) + if err != nil { + log.Errorf("error purging cold objects from tracking store: %s", err) + // TODO do something better here -- just continue? + panic(err) + } + log.Infow("purging done", "took", time.Since(purgeStart)) err = s.snoop.Sync() if err != nil { @@ -617,6 +618,8 @@ func (s *SplitStore) compactFull() { panic(err) } + log.Infow("compaction stats", "hot", stHot, "cold", stCold, "dead", stDead) + log.Info("moving cold objects to the coldstore") for cid := range cold { blk, err := s.hot.Get(cid) @@ -626,13 +629,12 @@ func (s *SplitStore) compactFull() { // but before we have deleted it from the snoop; just delete the snoop. err = s.snoop.Delete(cid) if err != nil { - log.Errorf("error deleting cid %s from tracking store: %w", cid, err) + log.Errorf("error deleting cid %s from tracking store: %s", cid, err) // TODO do something better here -- just continue? panic(err) - } } else { - log.Errorf("error retrieving tracked block %s from hotstore: %w ", cid, err) + log.Errorf("error retrieving tracked block %s from hotstore: %s", cid, err) // TODO do something better here -- just continue? panic(err) } @@ -643,7 +645,7 @@ func (s *SplitStore) compactFull() { // put the object in the coldstore err = s.cold.Put(blk) if err != nil { - log.Errorf("error puting block %s to coldstore: %w", cid, err) + log.Errorf("error puting block %s to coldstore: %s", cid, err) // TODO do something better here -- just continue? panic(err) } @@ -651,20 +653,25 @@ func (s *SplitStore) compactFull() { // delete the object from the hotstore err = s.hot.DeleteBlock(cid) if err != nil { - log.Errorf("error deleting block %s from hotstore: %w", cid, err) - // TODO do something better here -- just continue? 
- panic(err) - } - - // remove the snoop tracking - err = s.snoop.Delete(cid) - if err != nil { - log.Errorf("error deleting cid %s from tracking store: %w", cid, err) + log.Errorf("error deleting block %s from hotstore: %s", cid, err) // TODO do something better here -- just continue? panic(err) } } + // remove the snoop tracking + purgeStart := time.Now() + log.Info("purging cold objects from tracking store") + + err = s.snoop.DeleteBatch(cold) + if err != nil { + log.Errorf("error purging cold objects from tracking store: %s", err) + // TODO do something better here -- just continue? + panic(err) + } + + log.Infow("purging done", "took", time.Since(purgeStart)) + if len(dead) > 0 { log.Info("deleting dead objects") @@ -672,23 +679,27 @@ func (s *SplitStore) compactFull() { // delete the object from the hotstore err = s.hot.DeleteBlock(cid) if err != nil { - log.Errorf("error deleting block %s from hotstore: %w", cid, err) - // TODO do something better here -- just continue? - panic(err) - } - - // remove the snoop tracking - err = s.snoop.Delete(cid) - if err != nil { - log.Errorf("error deleting cid %s from tracking store: %w", cid, err) + log.Errorf("error deleting block %s from hotstore: %s", cid, err) // TODO do something better here -- just continue? panic(err) } } + + // remove the snoop tracking + purgeStart := time.Now() + log.Info("purging dead objects from tracking store") + + err = s.snoop.DeleteBatch(dead) + if err != nil { + log.Errorf("error purging dead objects from tracking store: %s", err) + // TODO do something better here -- just continue? + panic(err) + } + + log.Infow("purging done", "took", time.Since(purgeStart)) } log.Infow("sweeping done", "took", time.Since(startSweep)) - log.Infow("compaction stats", "hot", stHot, "cold", stCold, "dead", stDead) err = s.snoop.Sync() if err != nil {