coalesce message and message receipt retention
This commit is contained in:
parent
2a68ae8dad
commit
5266b240b8
@ -436,7 +436,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
|
|||||||
startMark := time.Now()
|
startMark := time.Now()
|
||||||
|
|
||||||
var count int64
|
var count int64
|
||||||
err = s.walkChain(curTs, boundaryEpoch, inclMsgsEpoch, boundaryEpoch,
|
err = s.walkChain(curTs, boundaryEpoch, inclMsgsEpoch,
|
||||||
func(c cid.Cid) error {
|
func(c cid.Cid) error {
|
||||||
if isUnitaryObject(c) {
|
if isUnitaryObject(c) {
|
||||||
return errStopWalk
|
return errStopWalk
|
||||||
@ -631,7 +631,7 @@ func (s *SplitStore) endTxnProtect() {
|
|||||||
s.txnMissing = nil
|
s.txnMissing = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SplitStore) walkChain(ts *types.TipSet, inclState, inclMsgs, inclMsgReceipts abi.ChainEpoch,
|
func (s *SplitStore) walkChain(ts *types.TipSet, inclState, inclMsgs abi.ChainEpoch,
|
||||||
f func(cid.Cid) error) error {
|
f func(cid.Cid) error) error {
|
||||||
visited := cid.NewSet()
|
visited := cid.NewSet()
|
||||||
walked := cid.NewSet()
|
walked := cid.NewSet()
|
||||||
@ -664,26 +664,20 @@ func (s *SplitStore) walkChain(ts *types.TipSet, inclState, inclMsgs, inclMsgRec
|
|||||||
// message are retained if within the inclMsgs boundary
|
// message are retained if within the inclMsgs boundary
|
||||||
if hdr.Height >= inclMsgs && hdr.Height > 0 {
|
if hdr.Height >= inclMsgs && hdr.Height > 0 {
|
||||||
if inclMsgs < inclState {
|
if inclMsgs < inclState {
|
||||||
// we need to use walkObjectIncomplete here, as messages may be missing early on if we
|
// we need to use walkObjectIncomplete here, as messages/receipts may be missing early on if we
|
||||||
// synced from snapshot and have a long HotStoreMessageRetentionPolicy.
|
// synced from snapshot and have a long HotStoreMessageRetentionPolicy.
|
||||||
if err := s.walkObjectIncomplete(hdr.Messages, walked, f, stopWalk); err != nil {
|
if err := s.walkObjectIncomplete(hdr.Messages, walked, f, stopWalk); err != nil {
|
||||||
return xerrors.Errorf("error walking messages (cid: %s): %w", hdr.Messages, err)
|
return xerrors.Errorf("error walking messages (cid: %s): %w", hdr.Messages, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := s.walkObjectIncomplete(hdr.ParentMessageReceipts, walked, f, stopWalk); err != nil {
|
||||||
|
return xerrors.Errorf("error walking messages receipts (cid: %s): %w", hdr.ParentMessageReceipts, err)
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
if err := s.walkObject(hdr.Messages, walked, f); err != nil {
|
if err := s.walkObject(hdr.Messages, walked, f); err != nil {
|
||||||
return xerrors.Errorf("error walking messages (cid: %s): %w", hdr.Messages, err)
|
return xerrors.Errorf("error walking messages (cid: %s): %w", hdr.Messages, err)
|
||||||
}
|
}
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if hdr.Height >= inclMsgReceipts && hdr.Height > 0 {
|
|
||||||
if inclMsgReceipts < inclState {
|
|
||||||
// we need to use walkObjectIncomplete here, as message receipts may be missing early on if we
|
|
||||||
// synced from snapshot
|
|
||||||
if err := s.walkObjectIncomplete(hdr.ParentMessageReceipts, walked, f, stopWalk); err != nil {
|
|
||||||
return xerrors.Errorf("error walking messages receipts (cid: %s): %w", hdr.ParentMessageReceipts, err)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if err := s.walkObject(hdr.ParentMessageReceipts, walked, f); err != nil {
|
if err := s.walkObject(hdr.ParentMessageReceipts, walked, f); err != nil {
|
||||||
return xerrors.Errorf("error walking message receipts (cid: %s): %w", hdr.ParentMessageReceipts, err)
|
return xerrors.Errorf("error walking message receipts (cid: %s): %w", hdr.ParentMessageReceipts, err)
|
||||||
}
|
}
|
||||||
|
@ -48,7 +48,7 @@ func (s *SplitStore) doWarmup(curTs *types.TipSet) error {
|
|||||||
count := int64(0)
|
count := int64(0)
|
||||||
xcount := int64(0)
|
xcount := int64(0)
|
||||||
missing := int64(0)
|
missing := int64(0)
|
||||||
err := s.walkChain(curTs, epoch, epoch+1, epoch+1, // we don't load messages/receipts in warmup
|
err := s.walkChain(curTs, epoch, epoch+1, // we don't load messages/receipts in warmup
|
||||||
func(c cid.Cid) error {
|
func(c cid.Cid) error {
|
||||||
if isUnitaryObject(c) {
|
if isUnitaryObject(c) {
|
||||||
return errStopWalk
|
return errStopWalk
|
||||||
|
Loading…
Reference in New Issue
Block a user