walk at boundary epoch, 2 finalities from current epoch, to find live objects
objects written after that are retained anyway.
This commit is contained in:
parent
508fcb9d26
commit
fdd877534f
@ -29,10 +29,10 @@ var (
|
|||||||
//
|
//
|
||||||
// |················· CompactionThreshold ··················|
|
// |················· CompactionThreshold ··················|
|
||||||
// | |
|
// | |
|
||||||
// =======‖≡≡≡≡≡≡≡‖-------------------------------------------------»
|
// =======‖≡≡≡≡≡≡≡‖-----------------------|------------------------»
|
||||||
// | | chain --> ↑__ current epoch
|
// | | | chain --> ↑__ current epoch
|
||||||
// |·······|
|
// |·······| |
|
||||||
// ↑________ CompactionCold.
|
// ↑________ CompactionCold ↑________ CompactionBoundary
|
||||||
//
|
//
|
||||||
// === :: cold (already archived)
|
// === :: cold (already archived)
|
||||||
// ≡≡≡ :: to be archived in this compaction
|
// ≡≡≡ :: to be archived in this compaction
|
||||||
@ -43,6 +43,10 @@ var (
|
|||||||
// cold store on compaction. See diagram on CompactionThreshold for a
|
// cold store on compaction. See diagram on CompactionThreshold for a
|
||||||
// better sense.
|
// better sense.
|
||||||
CompactionCold = build.Finality
|
CompactionCold = build.Finality
|
||||||
|
|
||||||
|
// CompactionBoundary is the number of epochs from the current epoch at which
|
||||||
|
// we will walk the chain for live objects
|
||||||
|
CompactionBoundary = 2 * build.Finality
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -536,8 +540,10 @@ func (s *SplitStore) estimateMarkSetSize(curTs *types.TipSet) {
|
|||||||
|
|
||||||
func (s *SplitStore) compactSimple(curTs *types.TipSet) {
|
func (s *SplitStore) compactSimple(curTs *types.TipSet) {
|
||||||
coldEpoch := s.baseEpoch + CompactionCold
|
coldEpoch := s.baseEpoch + CompactionCold
|
||||||
|
currentEpoch := curTs.Height()
|
||||||
|
boundaryEpoch := currentEpoch - CompactionBoundary
|
||||||
|
|
||||||
log.Infow("running simple compaction", "currentEpoch", curTs.Height(), "baseEpoch", s.baseEpoch, "coldEpoch", coldEpoch)
|
log.Infow("running simple compaction", "currentEpoch", currentEpoch, "baseEpoch", s.baseEpoch, "coldEpoch", coldEpoch, "boundaryEpoch", boundaryEpoch)
|
||||||
|
|
||||||
coldSet, err := s.env.Create("cold", s.markSetSize)
|
coldSet, err := s.env.Create("cold", s.markSetSize)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -547,17 +553,17 @@ func (s *SplitStore) compactSimple(curTs *types.TipSet) {
|
|||||||
defer coldSet.Close() //nolint:errcheck
|
defer coldSet.Close() //nolint:errcheck
|
||||||
|
|
||||||
// 1. mark reachable cold objects by looking at the objects reachable only from the cold epoch
|
// 1. mark reachable cold objects by looking at the objects reachable only from the cold epoch
|
||||||
log.Infow("marking reachable cold objects", "coldEpoch", coldEpoch)
|
log.Infow("marking reachable cold objects", "boundaryEpoch", boundaryEpoch)
|
||||||
startMark := time.Now()
|
startMark := time.Now()
|
||||||
|
|
||||||
coldTs, err := s.chain.GetTipsetByHeight(context.Background(), coldEpoch, curTs, true)
|
boundaryTs, err := s.chain.GetTipsetByHeight(context.Background(), boundaryEpoch, curTs, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// TODO do something better here
|
// TODO do something better here
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
var count int64
|
var count int64
|
||||||
err = s.chain.WalkSnapshot(context.Background(), coldTs, 1, s.skipOldMsgs, s.skipMsgReceipts,
|
err = s.chain.WalkSnapshot(context.Background(), boundaryTs, 1, s.skipOldMsgs, s.skipMsgReceipts,
|
||||||
func(cid cid.Cid) error {
|
func(cid cid.Cid) error {
|
||||||
count++
|
count++
|
||||||
return coldSet.Mark(cid)
|
return coldSet.Mark(cid)
|
||||||
@ -754,10 +760,11 @@ func (s *SplitStore) purgeTracking(cids []cid.Cid) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *SplitStore) compactFull(curTs *types.TipSet) {
|
func (s *SplitStore) compactFull(curTs *types.TipSet) {
|
||||||
epoch := curTs.Height()
|
currentEpoch := curTs.Height()
|
||||||
coldEpoch := s.baseEpoch + CompactionCold
|
coldEpoch := s.baseEpoch + CompactionCold
|
||||||
|
boundaryEpoch := currentEpoch - CompactionBoundary
|
||||||
|
|
||||||
log.Infow("running full compaction", "currentEpoch", curTs.Height(), "baseEpoch", s.baseEpoch, "coldEpoch", coldEpoch)
|
log.Infow("running full compaction", "currentEpoch", currentEpoch, "baseEpoch", s.baseEpoch, "coldEpoch", coldEpoch, "boundaryEpoch", boundaryEpoch)
|
||||||
|
|
||||||
// create two mark sets, one for marking the cold finality region
|
// create two mark sets, one for marking the cold finality region
|
||||||
// and one for marking the hot region
|
// and one for marking the hot region
|
||||||
@ -780,8 +787,14 @@ func (s *SplitStore) compactFull(curTs *types.TipSet) {
|
|||||||
startMark := time.Now()
|
startMark := time.Now()
|
||||||
|
|
||||||
// Phase 1a: mark all reachable CIDs in the hot range
|
// Phase 1a: mark all reachable CIDs in the hot range
|
||||||
|
boundaryTs, err := s.chain.GetTipsetByHeight(context.Background(), boundaryEpoch, curTs, true)
|
||||||
|
if err != nil {
|
||||||
|
// TODO do something better here
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
count := int64(0)
|
count := int64(0)
|
||||||
err = s.chain.WalkSnapshot(context.Background(), curTs, epoch-coldEpoch, s.skipOldMsgs, s.skipMsgReceipts,
|
err = s.chain.WalkSnapshot(context.Background(), boundaryTs, boundaryEpoch-coldEpoch, s.skipOldMsgs, s.skipMsgReceipts,
|
||||||
func(cid cid.Cid) error {
|
func(cid cid.Cid) error {
|
||||||
count++
|
count++
|
||||||
return hotSet.Mark(cid)
|
return hotSet.Mark(cid)
|
||||||
|
Loading…
Reference in New Issue
Block a user