batch delete during the cold purge
This commit is contained in:
parent
4c05ec28ba
commit
06d8ea10b1
@ -57,7 +57,12 @@ var (
|
||||
log = logging.Logger("splitstore")
|
||||
)
|
||||
|
||||
const batchSize = 16384
|
||||
const (
|
||||
batchSize = 16384
|
||||
|
||||
defaultColdPurgeSize = 7_000_000
|
||||
defaultDeadPurgeSize = 1_000_000
|
||||
)
|
||||
|
||||
func init() {
|
||||
// TODO temporary for debugging purposes; to be removed for merge.
|
||||
@ -107,6 +112,9 @@ type SplitStore struct {
|
||||
baseEpoch abi.ChainEpoch
|
||||
warmupEpoch abi.ChainEpoch
|
||||
|
||||
coldPurgeSize int
|
||||
deadPurgeSize int
|
||||
|
||||
mx sync.Mutex
|
||||
curTs *types.TipSet
|
||||
|
||||
@ -152,6 +160,12 @@ func Open(path string, ds dstore.Datastore, hot, cold bstore.Blockstore, cfg *Co
|
||||
enableGC: cfg.EnableGC,
|
||||
skipOldMsgs: !(cfg.EnableFullCompaction && cfg.Archival),
|
||||
skipMsgReceipts: !(cfg.EnableFullCompaction && cfg.Archival),
|
||||
|
||||
coldPurgeSize: defaultColdPurgeSize,
|
||||
}
|
||||
|
||||
if cfg.EnableGC {
|
||||
ss.deadPurgeSize = defaultDeadPurgeSize
|
||||
}
|
||||
|
||||
return ss, nil
|
||||
@ -559,7 +573,7 @@ func (s *SplitStore) compactSimple(curTs *types.TipSet) {
|
||||
log.Info("collecting cold objects")
|
||||
startCollect := time.Now()
|
||||
|
||||
cold := make(map[cid.Cid]struct{})
|
||||
cold := make([]cid.Cid, 0, s.coldPurgeSize)
|
||||
|
||||
// some stats for logging
|
||||
var hotCnt, coldCnt int
|
||||
@ -585,7 +599,7 @@ func (s *SplitStore) compactSimple(curTs *types.TipSet) {
|
||||
}
|
||||
|
||||
// it's cold, mark it for move
|
||||
cold[cid] = struct{}{}
|
||||
cold = append(cold, cid)
|
||||
coldCnt++
|
||||
return nil
|
||||
})
|
||||
@ -595,6 +609,8 @@ func (s *SplitStore) compactSimple(curTs *types.TipSet) {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
s.coldPurgeSize = coldCnt
|
||||
|
||||
log.Infow("collection done", "took", time.Since(startCollect))
|
||||
log.Infow("compaction stats", "hot", hotCnt, "cold", coldCnt)
|
||||
|
||||
@ -642,10 +658,10 @@ func (s *SplitStore) compactSimple(curTs *types.TipSet) {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *SplitStore) moveColdBlocks(cold map[cid.Cid]struct{}) error {
|
||||
func (s *SplitStore) moveColdBlocks(cold []cid.Cid) error {
|
||||
batch := make([]blocks.Block, 0, batchSize)
|
||||
|
||||
for cid := range cold {
|
||||
for _, cid := range cold {
|
||||
blk, err := s.hot.Get(cid)
|
||||
if err != nil {
|
||||
if err == dstore.ErrNotFound {
|
||||
@ -682,20 +698,16 @@ func (s *SplitStore) moveColdBlocks(cold map[cid.Cid]struct{}) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *SplitStore) purgeBlocks(cids map[cid.Cid]struct{}) error {
|
||||
// TODO batch deletion -- this is very slow with many objects, but we need
|
||||
// a DeleteBatch method in the blockstore interface
|
||||
for cid := range cids {
|
||||
err := s.hot.DeleteBlock(cid)
|
||||
func (s *SplitStore) purgeBlocks(cids []cid.Cid) error {
|
||||
err := s.hot.DeleteMany(cids)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error deleting block %s from hotstore: %e", cid, err)
|
||||
}
|
||||
return xerrors.Errorf("error deleting batch from hotstore: %e", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *SplitStore) purgeTracking(cids map[cid.Cid]struct{}) error {
|
||||
func (s *SplitStore) purgeTracking(cids []cid.Cid) error {
|
||||
err := s.tracker.DeleteBatch(cids)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error deleting batch from tracker: %w", err)
|
||||
@ -780,17 +792,17 @@ func (s *SplitStore) compactFull(curTs *types.TipSet) {
|
||||
startCollect := time.Now()
|
||||
|
||||
// some stats for logging
|
||||
var stHot, stCold, stDead int
|
||||
var hotCnt, coldCnt, deadCnt int
|
||||
|
||||
cold := make(map[cid.Cid]struct{})
|
||||
dead := make(map[cid.Cid]struct{})
|
||||
cold := make([]cid.Cid, 0, s.coldPurgeSize)
|
||||
dead := make([]cid.Cid, 0, s.deadPurgeSize)
|
||||
|
||||
// 2.1 iterate through the tracker and collect cold and dead objects
|
||||
err = s.tracker.ForEach(func(cid cid.Cid, wrEpoch abi.ChainEpoch) error {
|
||||
// is the object stil hot?
|
||||
if wrEpoch > coldEpoch {
|
||||
// yes, stay in the hotstore
|
||||
stHot++
|
||||
hotCnt++
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -802,7 +814,7 @@ func (s *SplitStore) compactFull(curTs *types.TipSet) {
|
||||
|
||||
if mark {
|
||||
// the object is reachable in the hot range, stay in the hotstore
|
||||
stHot++
|
||||
hotCnt++
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -815,20 +827,20 @@ func (s *SplitStore) compactFull(curTs *types.TipSet) {
|
||||
if s.enableGC {
|
||||
if mark {
|
||||
// the object is reachable in the cold range, move it to the cold store
|
||||
cold[cid] = struct{}{}
|
||||
stCold++
|
||||
cold = append(cold, cid)
|
||||
coldCnt++
|
||||
} else {
|
||||
// the object is dead and will be deleted
|
||||
dead[cid] = struct{}{}
|
||||
stDead++
|
||||
dead = append(dead, cid)
|
||||
deadCnt++
|
||||
}
|
||||
} else {
|
||||
// if GC is disabled, we move both cold and dead objects to the coldstore
|
||||
cold[cid] = struct{}{}
|
||||
cold = append(cold, cid)
|
||||
if mark {
|
||||
stCold++
|
||||
coldCnt++
|
||||
} else {
|
||||
stDead++
|
||||
deadCnt++
|
||||
}
|
||||
}
|
||||
|
||||
@ -840,8 +852,11 @@ func (s *SplitStore) compactFull(curTs *types.TipSet) {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
s.coldPurgeSize = coldCnt + coldCnt>>2 // overestimate a bit
|
||||
s.deadPurgeSize = deadCnt + deadCnt>>2 // overestimate a bit
|
||||
|
||||
log.Infow("collection done", "took", time.Since(startCollect))
|
||||
log.Infow("compaction stats", "hot", stHot, "cold", stCold, "dead", stDead)
|
||||
log.Infow("compaction stats", "hot", hotCnt, "cold", coldCnt, "dead", deadCnt)
|
||||
|
||||
// 2.2 copy the cold objects to the coldstore
|
||||
log.Info("moving cold objects to the coldstore")
|
||||
|
@ -16,7 +16,7 @@ type TrackingStore interface {
|
||||
PutBatch([]cid.Cid, abi.ChainEpoch) error
|
||||
Get(cid.Cid) (abi.ChainEpoch, error)
|
||||
Delete(cid.Cid) error
|
||||
DeleteBatch(map[cid.Cid]struct{}) error
|
||||
DeleteBatch([]cid.Cid) error
|
||||
ForEach(func(cid.Cid, abi.ChainEpoch) error) error
|
||||
Sync() error
|
||||
Close() error
|
||||
|
@ -87,10 +87,10 @@ func (s *BoltTrackingStore) Delete(cid cid.Cid) error {
|
||||
})
|
||||
}
|
||||
|
||||
func (s *BoltTrackingStore) DeleteBatch(cids map[cid.Cid]struct{}) error {
|
||||
func (s *BoltTrackingStore) DeleteBatch(cids []cid.Cid) error {
|
||||
return s.db.Batch(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket(s.bucketId)
|
||||
for cid := range cids {
|
||||
for _, cid := range cids {
|
||||
err := b.Delete(cid.Hash())
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error deleting %s", cid)
|
||||
|
Loading…
Reference in New Issue
Block a user