feat:splitstore:retain tipset references in hot store (#9960)

* Retain tipset reference

Co-authored-by: zenground0 <ZenGround0@users.noreply.github.com>
ZenGround0, 2023-01-06 12:39:07 -05:00 (committed by GitHub)
parent ccd08c2d55
commit 06393da4ff
2 changed files with 75 additions and 1 deletion
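
Context for the change: a tipset's key can itself be stored as a block, addressed by the CID that TipSetKey.Cid() returns (the value the new tsRef helper computes, and the value the test below reads back via ChainReadObj). The compaction walk previously only marked headers, messages and state as reachable, so these tipset-key blocks could be dropped from the hot store; this commit marks them reachable as well. A minimal sketch of how such a reference is derived, assuming the lotus chain/types package and using fake block CIDs in place of real headers:

package main

import (
	"fmt"

	"github.com/filecoin-project/lotus/chain/types"
	blocks "github.com/ipfs/go-block-format"
	"github.com/ipfs/go-cid"
)

func main() {
	// fake block CIDs standing in for the headers of a real tipset
	blkCids := []cid.Cid{
		blocks.NewBlock([]byte("header-a")).Cid(),
		blocks.NewBlock([]byte("header-b")).Cid(),
	}

	// the tipset-key reference: the CID of the serialized TipSetKey; this is
	// what tsRef computes for hdr.Parents and what the test computes via
	// check.Key().Cid()
	ref, err := types.NewTipSetKey(blkCids...).Cid()
	if err != nil {
		panic(err)
	}
	fmt.Println("tipset key CID to retain:", ref)
}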


@@ -905,6 +905,10 @@ func (s *SplitStore) walkChain(ts *types.TipSet, inclState, inclMsgs abi.ChainEp
 	walkCnt := new(int64)
 	scanCnt := new(int64)
 
+	tsRef := func(blkCids []cid.Cid) (cid.Cid, error) {
+		return types.NewTipSetKey(blkCids...).Cid()
+	}
+
 	stopWalk := func(_ cid.Cid) error { return errStopWalk }
 
 	walkBlock := func(c cid.Cid) error {
@@ -926,11 +930,19 @@ func (s *SplitStore) walkChain(ts *types.TipSet, inclState, inclMsgs abi.ChainEp
 		err = s.view(c, func(data []byte) error {
 			return hdr.UnmarshalCBOR(bytes.NewBuffer(data))
 		})
 
 		if err != nil {
 			return xerrors.Errorf("error unmarshaling block header (cid: %s): %w", c, err)
 		}
 
+		// tipset CID references are retained
+		pRef, err := tsRef(hdr.Parents)
+		if err != nil {
+			return xerrors.Errorf("error computing cid reference to parent tipset: %w", err)
+		}
+		if err := s.walkObjectIncomplete(pRef, visitor, fHot, stopWalk); err != nil {
+			return xerrors.Errorf("error walking parent tipset cid reference: %w", err)
+		}
 		// messages are retained if within the inclMsgs boundary
 		if hdr.Height >= inclMsgs && hdr.Height > 0 {
 			if inclMsgs < inclState {
@@ -981,6 +993,15 @@ func (s *SplitStore) walkChain(ts *types.TipSet, inclState, inclMsgs abi.ChainEp
 		return nil
 	}
 
+	// retain ref to chain head
+	hRef, err := tsRef(ts.Cids())
+	if err != nil {
+		return xerrors.Errorf("error computing cid reference to chain head tipset: %w", err)
+	}
+	if err := s.walkObjectIncomplete(hRef, visitor, fHot, stopWalk); err != nil {
+		return xerrors.Errorf("error walking chain head tipset cid reference: %w", err)
+	}
+
 	for len(toWalk) > 0 {
 		// walking can take a while, so check this with every opportunity
 		if err := s.checkClosing(); err != nil {
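
Taken together, the walkChain changes follow one pattern: derive the tipset-key CID for the chain head and for every visited header's parents, and mark it reachable before the walk continues, so compaction keeps those key blocks in the hot store. A stripped-down sketch of that pattern follows; it is not the lotus implementation, and walkTipSetRefs, getParents and markReachable are hypothetical stand-ins for the splitstore's header loading and visitor machinery:

package main

import (
	"fmt"

	"github.com/filecoin-project/lotus/chain/types"
	blocks "github.com/ipfs/go-block-format"
	"github.com/ipfs/go-cid"
)

// walkTipSetRefs is a toy version of the retention pattern added to walkChain:
// starting from the head, derive and mark the tipset-key CID of every tipset
// on the chain down to genesis.
func walkTipSetRefs(head []cid.Cid, height int64,
	getParents func([]cid.Cid) ([]cid.Cid, error),
	markReachable func(cid.Cid)) error {

	cur := head
	for h := height; ; h-- {
		ref, err := types.NewTipSetKey(cur...).Cid()
		if err != nil {
			return err
		}
		markReachable(ref) // keep the tipset-key block in the hot store
		if h == 0 {
			return nil
		}
		parents, err := getParents(cur)
		if err != nil {
			return err
		}
		cur = parents
	}
}

func main() {
	// two fake "tipsets": hashed byte blobs standing in for block headers
	genesis := []cid.Cid{blocks.NewBlock([]byte("genesis")).Cid()}
	head := []cid.Cid{
		blocks.NewBlock([]byte("head-a")).Cid(),
		blocks.NewBlock([]byte("head-b")).Cid(),
	}

	getParents := func([]cid.Cid) ([]cid.Cid, error) { return genesis, nil }
	markReachable := func(c cid.Cid) { fmt.Println("retain tipset key ref:", c) }

	if err := walkTipSetRefs(head, 1, getParents, markReachable); err != nil {
		panic(err)
	}
}

The real change routes the marking through s.walkObjectIncomplete, as shown in the diff above.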


@@ -249,6 +249,49 @@ func TestMessagesMode(t *testing.T) {
 	assert.True(g.t, g.Exists(ctx, garbageM), "Garbage message not found in splitstore")
 }
 
+func TestCompactRetainsTipSetRef(t *testing.T) {
+	ctx := context.Background()
+	// disable sync checking because efficient itests require that the node is out of sync :/
+	splitstore.CheckSyncGap = false
+	opts := []interface{}{kit.MockProofs(), kit.SplitstoreDiscard()}
+	full, genesisMiner, ens := kit.EnsembleMinimal(t, opts...)
+	bm := ens.InterconnectAll().BeginMining(4 * time.Millisecond)[0]
+	_ = genesisMiner
+	_ = bm
+
+	check, err := full.ChainHead(ctx)
+	require.NoError(t, err)
+	e := check.Height()
+	checkRef, err := check.Key().Cid()
+	require.NoError(t, err)
+	assert.True(t, ipldExists(ctx, t, checkRef, full)) // reference to tipset key should be persisted before compaction
+
+	// wait out any in-progress compaction, then determine the index of the compaction that will cover tipset "check"
+	for {
+		bm.Pause()
+		if splitStoreCompacting(ctx, t, full) {
+			bm.Restart()
+			time.Sleep(1 * time.Second)
+		} else {
+			break
+		}
+	}
+	lastCompactionEpoch := splitStoreBaseEpoch(ctx, t, full)
+	garbageCompactionIndex := splitStoreCompactionIndex(ctx, t, full) + 1
+	boundary := lastCompactionEpoch + splitstore.CompactionThreshold - splitstore.CompactionBoundary
+	for e > boundary {
+		boundary += splitstore.CompactionThreshold - splitstore.CompactionBoundary
+		garbageCompactionIndex++
+	}
+	bm.Restart()
+
+	// wait for compaction to occur
+	waitForCompaction(ctx, t, garbageCompactionIndex, full)
+
+	assert.True(t, ipldExists(ctx, t, checkRef, full)) // reference to tipset key should be persisted after compaction
+	bm.Stop()
+}
+
 func waitForCompaction(ctx context.Context, t *testing.T, cIdx int64, n *kit.TestFullNode) {
 	for {
 		if splitStoreCompactionIndex(ctx, t, n) >= cIdx {
@@ -307,6 +350,16 @@ func splitStorePruneIndex(ctx context.Context, t *testing.T, n *kit.TestFullNode
 	return pruneIndex
 }
 
+func ipldExists(ctx context.Context, t *testing.T, c cid.Cid, n *kit.TestFullNode) bool {
+	_, err := n.ChainReadObj(ctx, c)
+	if ipld.IsNotFound(err) {
+		return false
+	} else if err != nil {
+		t.Fatalf("ChainReadObj failure on existence check: %s", err)
+	}
+	return true
+}
+
 // Create on chain unreachable garbage for a network to exercise splitstore
 // one garbage cid created at a time
 //
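
One non-obvious part of TestCompactRetainsTipSetRef is the boundary arithmetic: as the test models it, each compaction advances the splitstore boundary by CompactionThreshold - CompactionBoundary epochs past the last base epoch, so the loop finds the index of the first compaction whose boundary will have moved beyond the checked tipset's epoch e. A worked example of that loop with made-up numbers (threshold and boundaryOffset stand in for splitstore.CompactionThreshold and splitstore.CompactionBoundary; the concrete values are illustrative only, not the lotus defaults):

package main

import "fmt"

// compactionCovering mirrors the boundary loop in TestCompactRetainsTipSetRef:
// given the epoch e of the tipset under test, the base epoch and index of the
// last completed compaction, and the splitstore constants, it returns the index
// of the first compaction expected to have moved its boundary past e.
func compactionCovering(e, lastBase, threshold, boundaryOffset, lastIndex int64) int64 {
	index := lastIndex + 1
	boundary := lastBase + threshold - boundaryOffset
	for e > boundary {
		boundary += threshold - boundaryOffset
		index++
	}
	return index
}

func main() {
	// assumed numbers: checked tipset at epoch 70, last compaction based at
	// epoch 20 with index 3, boundary advancing 20-5=15 epochs per compaction
	idx := compactionCovering(70, 20, 20, 5, 3)
	fmt.Println("wait for compaction index:", idx) // boundaries 35, 50, 65, 80 -> index 7
}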