diff --git a/blockstore/splitstore/splitstore.go b/blockstore/splitstore/splitstore.go
index 1431b0496..375ec22d4 100644
--- a/blockstore/splitstore/splitstore.go
+++ b/blockstore/splitstore/splitstore.go
@@ -57,7 +57,12 @@ var (
 	log = logging.Logger("splitstore")
 )
 
-const batchSize = 16384
+const (
+	batchSize = 16384
+
+	defaultColdPurgeSize = 7_000_000
+	defaultDeadPurgeSize = 1_000_000
+)
 
 func init() {
 	// TODO temporary for debugging purposes; to be removed for merge.
@@ -107,6 +112,9 @@ type SplitStore struct {
 	baseEpoch   abi.ChainEpoch
 	warmupEpoch abi.ChainEpoch
 
+	coldPurgeSize int
+	deadPurgeSize int
+
 	mx    sync.Mutex
 	curTs *types.TipSet
 
@@ -152,6 +160,12 @@ func Open(path string, ds dstore.Datastore, hot, cold bstore.Blockstore, cfg *Co
 		enableGC:        cfg.EnableGC,
 		skipOldMsgs:     !(cfg.EnableFullCompaction && cfg.Archival),
 		skipMsgReceipts: !(cfg.EnableFullCompaction && cfg.Archival),
+
+		coldPurgeSize: defaultColdPurgeSize,
+	}
+
+	if cfg.EnableGC {
+		ss.deadPurgeSize = defaultDeadPurgeSize
 	}
 
 	return ss, nil
@@ -559,7 +573,7 @@ func (s *SplitStore) compactSimple(curTs *types.TipSet) {
 	log.Info("collecting cold objects")
 	startCollect := time.Now()
 
-	cold := make(map[cid.Cid]struct{})
+	cold := make([]cid.Cid, 0, s.coldPurgeSize)
 
 	// some stats for logging
 	var hotCnt, coldCnt int
@@ -585,7 +599,7 @@ func (s *SplitStore) compactSimple(curTs *types.TipSet) {
 		}
 
 		// it's cold, mark it for move
-		cold[cid] = struct{}{}
+		cold = append(cold, cid)
 		coldCnt++
 		return nil
 	})
@@ -595,6 +609,8 @@ func (s *SplitStore) compactSimple(curTs *types.TipSet) {
 		panic(err)
 	}
 
+	s.coldPurgeSize = coldCnt
+
 	log.Infow("collection done", "took", time.Since(startCollect))
 	log.Infow("compaction stats", "hot", hotCnt, "cold", coldCnt)
 
@@ -642,10 +658,10 @@ func (s *SplitStore) compactSimple(curTs *types.TipSet) {
 	}
 }
 
-func (s *SplitStore) moveColdBlocks(cold map[cid.Cid]struct{}) error {
+func (s *SplitStore) moveColdBlocks(cold []cid.Cid) error {
 	batch := make([]blocks.Block, 0, batchSize)
 
-	for cid := range cold {
+	for _, cid := range cold {
 		blk, err := s.hot.Get(cid)
 		if err != nil {
 			if err == dstore.ErrNotFound {
@@ -682,20 +698,16 @@ func (s *SplitStore) moveColdBlocks(cold map[cid.Cid]struct{}) error {
 	return nil
 }
 
-func (s *SplitStore) purgeBlocks(cids map[cid.Cid]struct{}) error {
-	// TODO batch deletion -- this is very slow with many objects, but we need
-	// a DeleteBatch method in the blockstore interface
-	for cid := range cids {
-		err := s.hot.DeleteBlock(cid)
-		if err != nil {
-			return xerrors.Errorf("error deleting block %s from hotstore: %e", cid, err)
-		}
+func (s *SplitStore) purgeBlocks(cids []cid.Cid) error {
+	err := s.hot.DeleteMany(cids)
+	if err != nil {
+		return xerrors.Errorf("error deleting batch from hotstore: %e", err)
 	}
 
 	return nil
 }
 
-func (s *SplitStore) purgeTracking(cids map[cid.Cid]struct{}) error {
+func (s *SplitStore) purgeTracking(cids []cid.Cid) error {
 	err := s.tracker.DeleteBatch(cids)
 	if err != nil {
 		return xerrors.Errorf("error deleting batch from tracker: %w", err)
@@ -780,17 +792,17 @@ func (s *SplitStore) compactFull(curTs *types.TipSet) {
 	startCollect := time.Now()
 
 	// some stats for logging
-	var stHot, stCold, stDead int
+	var hotCnt, coldCnt, deadCnt int
 
-	cold := make(map[cid.Cid]struct{})
-	dead := make(map[cid.Cid]struct{})
+	cold := make([]cid.Cid, 0, s.coldPurgeSize)
+	dead := make([]cid.Cid, 0, s.deadPurgeSize)
 
 	// 2.1 iterate through the tracker and collect cold and dead objects
 	err = s.tracker.ForEach(func(cid cid.Cid, wrEpoch abi.ChainEpoch) error {
 		// is the object stil hot?
 		if wrEpoch > coldEpoch {
 			// yes, stay in the hotstore
-			stHot++
+			hotCnt++
 			return nil
 		}
 
@@ -802,7 +814,7 @@ func (s *SplitStore) compactFull(curTs *types.TipSet) {
 
 		if mark {
 			// the object is reachable in the hot range, stay in the hotstore
-			stHot++
+			hotCnt++
 			return nil
 		}
 
@@ -815,20 +827,20 @@ func (s *SplitStore) compactFull(curTs *types.TipSet) {
 		if s.enableGC {
 			if mark {
 				// the object is reachable in the cold range, move it to the cold store
-				cold[cid] = struct{}{}
-				stCold++
+				cold = append(cold, cid)
+				coldCnt++
 			} else {
 				// the object is dead and will be deleted
-				dead[cid] = struct{}{}
-				stDead++
+				dead = append(dead, cid)
+				deadCnt++
 			}
 		} else {
 			// if GC is disabled, we move both cold and dead objects to the coldstore
-			cold[cid] = struct{}{}
+			cold = append(cold, cid)
 			if mark {
-				stCold++
+				coldCnt++
 			} else {
-				stDead++
+				deadCnt++
 			}
 		}
 
@@ -840,8 +852,11 @@ func (s *SplitStore) compactFull(curTs *types.TipSet) {
 		panic(err)
 	}
 
+	s.coldPurgeSize = coldCnt + coldCnt>>2 // overestimate a bit
+	s.deadPurgeSize = deadCnt + deadCnt>>2 // overestimate a bit
+
 	log.Infow("collection done", "took", time.Since(startCollect))
-	log.Infow("compaction stats", "hot", stHot, "cold", stCold, "dead", stDead)
+	log.Infow("compaction stats", "hot", hotCnt, "cold", coldCnt, "dead", deadCnt)
 
 	// 2.2 copy the cold objects to the coldstore
 	log.Info("moving cold objects to the coldstore")
diff --git a/blockstore/splitstore/tracking.go b/blockstore/splitstore/tracking.go
index fc1895e49..1772a4305 100644
--- a/blockstore/splitstore/tracking.go
+++ b/blockstore/splitstore/tracking.go
@@ -16,7 +16,7 @@ type TrackingStore interface {
 	PutBatch([]cid.Cid, abi.ChainEpoch) error
 	Get(cid.Cid) (abi.ChainEpoch, error)
 	Delete(cid.Cid) error
-	DeleteBatch(map[cid.Cid]struct{}) error
+	DeleteBatch([]cid.Cid) error
 	ForEach(func(cid.Cid, abi.ChainEpoch) error) error
 	Sync() error
 	Close() error
diff --git a/blockstore/splitstore/tracking_bolt.go b/blockstore/splitstore/tracking_bolt.go
index 8c491043e..c5c451e15 100644
--- a/blockstore/splitstore/tracking_bolt.go
+++ b/blockstore/splitstore/tracking_bolt.go
@@ -87,10 +87,10 @@ func (s *BoltTrackingStore) Delete(cid cid.Cid) error {
 	})
 }
 
-func (s *BoltTrackingStore) DeleteBatch(cids map[cid.Cid]struct{}) error {
+func (s *BoltTrackingStore) DeleteBatch(cids []cid.Cid) error {
 	return s.db.Batch(func(tx *bolt.Tx) error {
 		b := tx.Bucket(s.bucketId)
-		for cid := range cids {
+		for _, cid := range cids {
 			err := b.Delete(cid.Hash())
 			if err != nil {
 				return xerrors.Errorf("error deleting %s", cid)
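
For context, the standalone sketch below illustrates the pattern this patch adopts: collect candidate CIDs into a slice pre-sized from the previous compaction's count instead of a map, delete them with one batched call instead of per-object deletes, and carry the observed count forward with roughly 25% headroom (cnt + cnt>>2) as the next capacity hint. It is not the Lotus code: the cidT type, Batcher interface, compactor struct, collectAndPurge helper, and fakeStore are hypothetical stand-ins for the real go-cid and blockstore types.

// Standalone sketch, hypothetical types; not the splitstore implementation.
package main

import "fmt"

// cidT stands in for cid.Cid; Batcher stands in for the blockstore's
// batched-delete capability (DeleteMany in the patch above).
type cidT string

type Batcher interface {
	DeleteMany(cids []cidT) error
}

type compactor struct {
	hot           Batcher
	coldPurgeSize int // capacity hint carried between compactions, seeded with a default
}

// collectAndPurge appends cold objects to a slice pre-sized by the previous
// run's estimate, deletes them in one batch, and updates the estimate with
// ~25% headroom, mirroring the overestimation in the patch.
func (c *compactor) collectAndPurge(tracked map[cidT]bool) error {
	cold := make([]cidT, 0, c.coldPurgeSize)
	for id, isCold := range tracked {
		if isCold {
			cold = append(cold, id)
		}
	}

	// one batched delete instead of one DeleteBlock call per object
	if err := c.hot.DeleteMany(cold); err != nil {
		return fmt.Errorf("error deleting batch from hotstore: %w", err)
	}

	// overestimate the next run's capacity a bit to avoid slice regrowth
	c.coldPurgeSize = len(cold) + len(cold)>>2
	return nil
}

// fakeStore is a trivial Batcher used only to exercise the sketch.
type fakeStore struct{ deleted int }

func (f *fakeStore) DeleteMany(cids []cidT) error {
	f.deleted += len(cids)
	return nil
}

func main() {
	c := &compactor{hot: &fakeStore{}, coldPurgeSize: 4}
	tracked := map[cidT]bool{"a": true, "b": false, "c": true}
	if err := c.collectAndPurge(tracked); err != nil {
		panic(err)
	}
	fmt.Println("next cold purge size estimate:", c.coldPurgeSize)
}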