implement hotstore message retention policy

This commit is contained in:
vyzo 2021-07-17 08:59:43 +03:00
parent 1b77361301
commit 006050ed27
2 changed files with 26 additions and 8 deletions

View File

@ -376,7 +376,13 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
currentEpoch := curTs.Height() currentEpoch := curTs.Height()
boundaryEpoch := currentEpoch - CompactionBoundary boundaryEpoch := currentEpoch - CompactionBoundary
log.Infow("running compaction", "currentEpoch", currentEpoch, "baseEpoch", s.baseEpoch, "boundaryEpoch", boundaryEpoch, "compactionIndex", s.compactionIndex) var inclMsgsEpoch abi.ChainEpoch
inclMsgsRange := abi.ChainEpoch(s.cfg.HotStoreMessageRetention) * build.Finality
if inclMsgsRange < boundaryEpoch {
inclMsgsEpoch = boundaryEpoch - inclMsgsRange
}
log.Infow("running compaction", "currentEpoch", currentEpoch, "baseEpoch", s.baseEpoch, "boundaryEpoch", boundaryEpoch, "inclMsgsEpoch", inclMsgsEpoch, "compactionIndex", s.compactionIndex)
markSet, err := s.markSetEnv.Create("live", s.markSetSize) markSet, err := s.markSetEnv.Create("live", s.markSetSize)
if err != nil { if err != nil {
@ -398,7 +404,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
startMark := time.Now() startMark := time.Now()
var count int64 var count int64
err = s.walkChain(curTs, boundaryEpoch, true, err = s.walkChain(curTs, boundaryEpoch, inclMsgsEpoch,
func(c cid.Cid) error { func(c cid.Cid) error {
if isUnitaryObject(c) { if isUnitaryObject(c) {
return errStopWalk return errStopWalk
@ -593,7 +599,7 @@ func (s *SplitStore) endTxnProtect() {
s.txnMissing = nil s.txnMissing = nil
} }
func (s *SplitStore) walkChain(ts *types.TipSet, boundary abi.ChainEpoch, inclMsgs bool, func (s *SplitStore) walkChain(ts *types.TipSet, inclState abi.ChainEpoch, inclMsgs abi.ChainEpoch,
f func(cid.Cid) error) error { f func(cid.Cid) error) error {
visited := cid.NewSet() visited := cid.NewSet()
walked := cid.NewSet() walked := cid.NewSet()
@ -621,14 +627,25 @@ func (s *SplitStore) walkChain(ts *types.TipSet, boundary abi.ChainEpoch, inclMs
return xerrors.Errorf("error unmarshaling block header (cid: %s): %w", c, err) return xerrors.Errorf("error unmarshaling block header (cid: %s): %w", c, err)
} }
// we only scan the block if it is at or above the boundary // message are retained if within the inclMsgs boundary
if hdr.Height >= boundary || hdr.Height == 0 { if hdr.Height >= inclMsgs && hdr.Height > 0 {
scanCnt++ if inclMsgs < inclState {
if inclMsgs && hdr.Height > 0 { // we need to use walkObjectIncomplete here, as messages may be missing early on if we
// synced from snapshot and have a long HotStoreMessageRetentionPolicy.
stopWalk := func(_ cid.Cid) error { return errStopWalk }
if err := s.walkObjectIncomplete(hdr.Messages, walked, f, stopWalk); err != nil {
return xerrors.Errorf("error walking messages (cid: %s): %w", hdr.Messages, err)
}
} else {
if err := s.walkObject(hdr.Messages, walked, f); err != nil { if err := s.walkObject(hdr.Messages, walked, f); err != nil {
return xerrors.Errorf("error walking messages (cid: %s): %w", hdr.Messages, err) return xerrors.Errorf("error walking messages (cid: %s): %w", hdr.Messages, err)
} }
}
}
// state and message receipts is only retained if within the inclState boundary
if hdr.Height >= inclState || hdr.Height == 0 {
if hdr.Height > 0 {
if err := s.walkObject(hdr.ParentMessageReceipts, walked, f); err != nil { if err := s.walkObject(hdr.ParentMessageReceipts, walked, f); err != nil {
return xerrors.Errorf("error walking message receipts (cid: %s): %w", hdr.ParentMessageReceipts, err) return xerrors.Errorf("error walking message receipts (cid: %s): %w", hdr.ParentMessageReceipts, err)
} }
@ -637,6 +654,7 @@ func (s *SplitStore) walkChain(ts *types.TipSet, boundary abi.ChainEpoch, inclMs
if err := s.walkObject(hdr.ParentStateRoot, walked, f); err != nil { if err := s.walkObject(hdr.ParentStateRoot, walked, f); err != nil {
return xerrors.Errorf("error walking state root (cid: %s): %w", hdr.ParentStateRoot, err) return xerrors.Errorf("error walking state root (cid: %s): %w", hdr.ParentStateRoot, err)
} }
scanCnt++
} }
if hdr.Height > 0 { if hdr.Height > 0 {

View File

@ -48,7 +48,7 @@ func (s *SplitStore) doWarmup(curTs *types.TipSet) error {
count := int64(0) count := int64(0)
xcount := int64(0) xcount := int64(0)
missing := int64(0) missing := int64(0)
err := s.walkChain(curTs, epoch, false, err := s.walkChain(curTs, epoch, epoch+1, // we don't load messages in warmup
func(c cid.Cid) error { func(c cid.Cid) error {
if isUnitaryObject(c) { if isUnitaryObject(c) {
return errStopWalk return errStopWalk