diff --git a/blockstore/splitstore/splitstore_compact.go b/blockstore/splitstore/splitstore_compact.go index 1c70ce973..1fc46b9fb 100644 --- a/blockstore/splitstore/splitstore_compact.go +++ b/blockstore/splitstore/splitstore_compact.go @@ -376,7 +376,13 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { currentEpoch := curTs.Height() boundaryEpoch := currentEpoch - CompactionBoundary - log.Infow("running compaction", "currentEpoch", currentEpoch, "baseEpoch", s.baseEpoch, "boundaryEpoch", boundaryEpoch, "compactionIndex", s.compactionIndex) + var inclMsgsEpoch abi.ChainEpoch + inclMsgsRange := abi.ChainEpoch(s.cfg.HotStoreMessageRetention) * build.Finality + if inclMsgsRange < boundaryEpoch { + inclMsgsEpoch = boundaryEpoch - inclMsgsRange + } + + log.Infow("running compaction", "currentEpoch", currentEpoch, "baseEpoch", s.baseEpoch, "boundaryEpoch", boundaryEpoch, "inclMsgsEpoch", inclMsgsEpoch, "compactionIndex", s.compactionIndex) markSet, err := s.markSetEnv.Create("live", s.markSetSize) if err != nil { @@ -398,7 +404,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { startMark := time.Now() var count int64 - err = s.walkChain(curTs, boundaryEpoch, true, + err = s.walkChain(curTs, boundaryEpoch, inclMsgsEpoch, func(c cid.Cid) error { if isUnitaryObject(c) { return errStopWalk @@ -593,7 +599,7 @@ func (s *SplitStore) endTxnProtect() { s.txnMissing = nil } -func (s *SplitStore) walkChain(ts *types.TipSet, boundary abi.ChainEpoch, inclMsgs bool, +func (s *SplitStore) walkChain(ts *types.TipSet, inclState abi.ChainEpoch, inclMsgs abi.ChainEpoch, f func(cid.Cid) error) error { visited := cid.NewSet() walked := cid.NewSet() @@ -621,14 +627,25 @@ func (s *SplitStore) walkChain(ts *types.TipSet, boundary abi.ChainEpoch, inclMs return xerrors.Errorf("error unmarshaling block header (cid: %s): %w", c, err) } - // we only scan the block if it is at or above the boundary - if hdr.Height >= boundary || hdr.Height == 0 { - scanCnt++ - if inclMsgs && hdr.Height > 0 { + // message are retained if within the inclMsgs boundary + if hdr.Height >= inclMsgs && hdr.Height > 0 { + if inclMsgs < inclState { + // we need to use walkObjectIncomplete here, as messages may be missing early on if we + // synced from snapshot and have a long HotStoreMessageRetentionPolicy. + stopWalk := func(_ cid.Cid) error { return errStopWalk } + if err := s.walkObjectIncomplete(hdr.Messages, walked, f, stopWalk); err != nil { + return xerrors.Errorf("error walking messages (cid: %s): %w", hdr.Messages, err) + } + } else { if err := s.walkObject(hdr.Messages, walked, f); err != nil { return xerrors.Errorf("error walking messages (cid: %s): %w", hdr.Messages, err) } + } + } + // state and message receipts is only retained if within the inclState boundary + if hdr.Height >= inclState || hdr.Height == 0 { + if hdr.Height > 0 { if err := s.walkObject(hdr.ParentMessageReceipts, walked, f); err != nil { return xerrors.Errorf("error walking message receipts (cid: %s): %w", hdr.ParentMessageReceipts, err) } @@ -637,6 +654,7 @@ func (s *SplitStore) walkChain(ts *types.TipSet, boundary abi.ChainEpoch, inclMs if err := s.walkObject(hdr.ParentStateRoot, walked, f); err != nil { return xerrors.Errorf("error walking state root (cid: %s): %w", hdr.ParentStateRoot, err) } + scanCnt++ } if hdr.Height > 0 { diff --git a/blockstore/splitstore/splitstore_warmup.go b/blockstore/splitstore/splitstore_warmup.go index fa5192587..55fa94c6f 100644 --- a/blockstore/splitstore/splitstore_warmup.go +++ b/blockstore/splitstore/splitstore_warmup.go @@ -48,7 +48,7 @@ func (s *SplitStore) doWarmup(curTs *types.TipSet) error { count := int64(0) xcount := int64(0) missing := int64(0) - err := s.walkChain(curTs, epoch, false, + err := s.walkChain(curTs, epoch, epoch+1, // we don't load messages in warmup func(c cid.Cid) error { if isUnitaryObject(c) { return errStopWalk