Merge pull request #6775 from filecoin-project/feat/splitstore-hot-messages
Splitstore: add retention policy option for keeping messages in the hotstore
This commit is contained in:
commit
0c02207fc9
@ -71,6 +71,13 @@ type Config struct {
|
|||||||
// which skips moving (as it is a noop, but still takes time to read all the cold objects)
|
// which skips moving (as it is a noop, but still takes time to read all the cold objects)
|
||||||
// and directly purges cold blocks.
|
// and directly purges cold blocks.
|
||||||
DiscardColdBlocks bool
|
DiscardColdBlocks bool
|
||||||
|
|
||||||
|
// HotstoreMessageRetention indicates the hotstore retention policy for messages.
|
||||||
|
// It has the following semantics:
|
||||||
|
// - a value of 0 will only retain messages within the compaction boundary (4 finalities)
|
||||||
|
// - a positive integer indicates the number of finalities, outside the compaction boundary,
|
||||||
|
// for which messages will be retained in the hotstore.
|
||||||
|
HotStoreMessageRetention uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
// ChainAccessor allows the Splitstore to access the chain. It will most likely
|
// ChainAccessor allows the Splitstore to access the chain. It will most likely
|
||||||
|
@ -400,7 +400,13 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
|
|||||||
currentEpoch := curTs.Height()
|
currentEpoch := curTs.Height()
|
||||||
boundaryEpoch := currentEpoch - CompactionBoundary
|
boundaryEpoch := currentEpoch - CompactionBoundary
|
||||||
|
|
||||||
log.Infow("running compaction", "currentEpoch", currentEpoch, "baseEpoch", s.baseEpoch, "boundaryEpoch", boundaryEpoch, "compactionIndex", s.compactionIndex)
|
var inclMsgsEpoch abi.ChainEpoch
|
||||||
|
inclMsgsRange := abi.ChainEpoch(s.cfg.HotStoreMessageRetention) * build.Finality
|
||||||
|
if inclMsgsRange < boundaryEpoch {
|
||||||
|
inclMsgsEpoch = boundaryEpoch - inclMsgsRange
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Infow("running compaction", "currentEpoch", currentEpoch, "baseEpoch", s.baseEpoch, "boundaryEpoch", boundaryEpoch, "inclMsgsEpoch", inclMsgsEpoch, "compactionIndex", s.compactionIndex)
|
||||||
|
|
||||||
markSet, err := s.markSetEnv.Create("live", s.markSetSize)
|
markSet, err := s.markSetEnv.Create("live", s.markSetSize)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -430,7 +436,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
|
|||||||
startMark := time.Now()
|
startMark := time.Now()
|
||||||
|
|
||||||
var count int64
|
var count int64
|
||||||
err = s.walkChain(curTs, boundaryEpoch, true,
|
err = s.walkChain(curTs, boundaryEpoch, inclMsgsEpoch,
|
||||||
func(c cid.Cid) error {
|
func(c cid.Cid) error {
|
||||||
if isUnitaryObject(c) {
|
if isUnitaryObject(c) {
|
||||||
return errStopWalk
|
return errStopWalk
|
||||||
@ -625,7 +631,7 @@ func (s *SplitStore) endTxnProtect() {
|
|||||||
s.txnMissing = nil
|
s.txnMissing = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SplitStore) walkChain(ts *types.TipSet, boundary abi.ChainEpoch, inclMsgs bool,
|
func (s *SplitStore) walkChain(ts *types.TipSet, inclState abi.ChainEpoch, inclMsgs abi.ChainEpoch,
|
||||||
f func(cid.Cid) error) error {
|
f func(cid.Cid) error) error {
|
||||||
visited := cid.NewSet()
|
visited := cid.NewSet()
|
||||||
walked := cid.NewSet()
|
walked := cid.NewSet()
|
||||||
@ -653,14 +659,25 @@ func (s *SplitStore) walkChain(ts *types.TipSet, boundary abi.ChainEpoch, inclMs
|
|||||||
return xerrors.Errorf("error unmarshaling block header (cid: %s): %w", c, err)
|
return xerrors.Errorf("error unmarshaling block header (cid: %s): %w", c, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// we only scan the block if it is at or above the boundary
|
// message are retained if within the inclMsgs boundary
|
||||||
if hdr.Height >= boundary || hdr.Height == 0 {
|
if hdr.Height >= inclMsgs && hdr.Height > 0 {
|
||||||
scanCnt++
|
if inclMsgs < inclState {
|
||||||
if inclMsgs && hdr.Height > 0 {
|
// we need to use walkObjectIncomplete here, as messages may be missing early on if we
|
||||||
|
// synced from snapshot and have a long HotStoreMessageRetentionPolicy.
|
||||||
|
stopWalk := func(_ cid.Cid) error { return errStopWalk }
|
||||||
|
if err := s.walkObjectIncomplete(hdr.Messages, walked, f, stopWalk); err != nil {
|
||||||
|
return xerrors.Errorf("error walking messages (cid: %s): %w", hdr.Messages, err)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
if err := s.walkObject(hdr.Messages, walked, f); err != nil {
|
if err := s.walkObject(hdr.Messages, walked, f); err != nil {
|
||||||
return xerrors.Errorf("error walking messages (cid: %s): %w", hdr.Messages, err)
|
return xerrors.Errorf("error walking messages (cid: %s): %w", hdr.Messages, err)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// state and message receipts is only retained if within the inclState boundary
|
||||||
|
if hdr.Height >= inclState || hdr.Height == 0 {
|
||||||
|
if hdr.Height > 0 {
|
||||||
if err := s.walkObject(hdr.ParentMessageReceipts, walked, f); err != nil {
|
if err := s.walkObject(hdr.ParentMessageReceipts, walked, f); err != nil {
|
||||||
return xerrors.Errorf("error walking message receipts (cid: %s): %w", hdr.ParentMessageReceipts, err)
|
return xerrors.Errorf("error walking message receipts (cid: %s): %w", hdr.ParentMessageReceipts, err)
|
||||||
}
|
}
|
||||||
@ -669,6 +686,7 @@ func (s *SplitStore) walkChain(ts *types.TipSet, boundary abi.ChainEpoch, inclMs
|
|||||||
if err := s.walkObject(hdr.ParentStateRoot, walked, f); err != nil {
|
if err := s.walkObject(hdr.ParentStateRoot, walked, f); err != nil {
|
||||||
return xerrors.Errorf("error walking state root (cid: %s): %w", hdr.ParentStateRoot, err)
|
return xerrors.Errorf("error walking state root (cid: %s): %w", hdr.ParentStateRoot, err)
|
||||||
}
|
}
|
||||||
|
scanCnt++
|
||||||
}
|
}
|
||||||
|
|
||||||
if hdr.Height > 0 {
|
if hdr.Height > 0 {
|
||||||
|
@ -48,7 +48,7 @@ func (s *SplitStore) doWarmup(curTs *types.TipSet) error {
|
|||||||
count := int64(0)
|
count := int64(0)
|
||||||
xcount := int64(0)
|
xcount := int64(0)
|
||||||
missing := int64(0)
|
missing := int64(0)
|
||||||
err := s.walkChain(curTs, epoch, false,
|
err := s.walkChain(curTs, epoch, epoch+1, // we don't load messages in warmup
|
||||||
func(c cid.Cid) error {
|
func(c cid.Cid) error {
|
||||||
if isUnitaryObject(c) {
|
if isUnitaryObject(c) {
|
||||||
return errStopWalk
|
return errStopWalk
|
||||||
|
@ -251,6 +251,8 @@ type Splitstore struct {
|
|||||||
ColdStoreType string
|
ColdStoreType string
|
||||||
HotStoreType string
|
HotStoreType string
|
||||||
MarkSetType string
|
MarkSetType string
|
||||||
|
|
||||||
|
HotStoreMessageRetention uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
// // Full Node
|
// // Full Node
|
||||||
|
@ -78,8 +78,9 @@ func SplitBlockstore(cfg *config.Chainstore) func(lc fx.Lifecycle, r repo.Locked
|
|||||||
}
|
}
|
||||||
|
|
||||||
cfg := &splitstore.Config{
|
cfg := &splitstore.Config{
|
||||||
MarkSetType: cfg.Splitstore.MarkSetType,
|
MarkSetType: cfg.Splitstore.MarkSetType,
|
||||||
DiscardColdBlocks: cfg.Splitstore.ColdStoreType == "discard",
|
DiscardColdBlocks: cfg.Splitstore.ColdStoreType == "discard",
|
||||||
|
HotStoreMessageRetention: cfg.Splitstore.HotStoreMessageRetention,
|
||||||
}
|
}
|
||||||
ss, err := splitstore.Open(path, ds, hot, cold, cfg)
|
ss, err := splitstore.Open(path, ds, hot, cold, cfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
Loading…
Reference in New Issue
Block a user