From 05c132588519be2ac190a3f74ea6584c04945dca Mon Sep 17 00:00:00 2001 From: vyzo Date: Fri, 3 Dec 2021 11:50:35 +0200 Subject: [PATCH 1/4] add logic for supressing compaction near upgrade boundaries --- blockstore/splitstore/splitstore.go | 32 ++++++++++++++++++++- blockstore/splitstore/splitstore_compact.go | 16 +++++++++++ 2 files changed, 47 insertions(+), 1 deletion(-) diff --git a/blockstore/splitstore/splitstore.go b/blockstore/splitstore/splitstore.go index 0e34fe952..e0366512f 100644 --- a/blockstore/splitstore/splitstore.go +++ b/blockstore/splitstore/splitstore.go @@ -18,6 +18,8 @@ import ( "github.com/filecoin-project/go-state-types/abi" bstore "github.com/filecoin-project/lotus/blockstore" + "github.com/filecoin-project/lotus/build" + "github.com/filecoin-project/lotus/chain/stmgr" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/metrics" @@ -47,6 +49,9 @@ var ( enableDebugLog = false // set this to true if you want to track origin stack traces in the write log enableDebugLogWriteTraces = false + + // upgradeBoundary is the boundary before and after an upgrade where we supress compaction + upgradeBoundary = build.Finality ) func init() { @@ -98,6 +103,12 @@ type ChainAccessor interface { SubscribeHeadChanges(change func(revert []*types.TipSet, apply []*types.TipSet) error) } +// upgradeRange is a precomputed epoch range during which we shouldn't compact so as to not +// interfere with an upgrade +type upgradeRange struct { + start, end abi.ChainEpoch +} + // hotstore is the interface that must be satisfied by the hot blockstore; it is an extension // of the Blockstore interface with the traits we need for compaction. type hotstore interface { @@ -125,6 +136,8 @@ type SplitStore struct { cold bstore.Blockstore hot hotstore + upgrades []upgradeRange + markSetEnv MarkSetEnv markSetSize int64 @@ -463,10 +476,27 @@ func (s *SplitStore) isWarm() bool { } // State tracking -func (s *SplitStore) Start(chain ChainAccessor) error { +func (s *SplitStore) Start(chain ChainAccessor, us stmgr.UpgradeSchedule) error { s.chain = chain curTs := chain.GetHeaviestTipSet() + // precompute the upgrade boundaries + s.upgrades = make([]upgradeRange, 0, len(us)) + for _, upgrade := range us { + boundary := upgrade.Height + for _, pre := range upgrade.PreMigrations { + preMigrationBoundary := upgrade.Height - pre.StartWithin + if preMigrationBoundary < boundary { + boundary = preMigrationBoundary + } + } + + upgradeStart := boundary - upgradeBoundary + upgradeEnd := upgrade.Height + upgradeBoundary + + s.upgrades = append(s.upgrades, upgradeRange{start: upgradeStart, end: upgradeEnd}) + } + // should we warmup warmup := false diff --git a/blockstore/splitstore/splitstore_compact.go b/blockstore/splitstore/splitstore_compact.go index 4ff38a5fb..3a1fda202 100644 --- a/blockstore/splitstore/splitstore_compact.go +++ b/blockstore/splitstore/splitstore_compact.go @@ -99,6 +99,12 @@ func (s *SplitStore) HeadChange(_, apply []*types.TipSet) error { return nil } + if s.isNearUpgrade(epoch) { + // we are near an upgrade epoch, supress compaction + atomic.StoreInt32(&s.compacting, 0) + return nil + } + if epoch-s.baseEpoch > CompactionThreshold { // it's time to compact -- prepare the transaction and go! s.beginTxnProtect() @@ -121,6 +127,16 @@ func (s *SplitStore) HeadChange(_, apply []*types.TipSet) error { return nil } +func (s *SplitStore) isNearUpgrade(epoch abi.ChainEpoch) bool { + for _, upgrade := range s.upgrades { + if epoch >= upgrade.start && epoch <= upgrade.end { + return true + } + } + + return false +} + // transactionally protect incoming tipsets func (s *SplitStore) protectTipSets(apply []*types.TipSet) { s.txnLk.RLock() From 6ce5879071a5f206623c4e822acab4ad69a8561f Mon Sep 17 00:00:00 2001 From: vyzo Date: Fri, 3 Dec 2021 12:05:15 +0200 Subject: [PATCH 2/4] add unit test for compaction supression --- blockstore/splitstore/splitstore_test.go | 137 ++++++++++++++++++++++- 1 file changed, 136 insertions(+), 1 deletion(-) diff --git a/blockstore/splitstore/splitstore_test.go b/blockstore/splitstore/splitstore_test.go index df9984d41..6d8ff465f 100644 --- a/blockstore/splitstore/splitstore_test.go +++ b/blockstore/splitstore/splitstore_test.go @@ -11,6 +11,7 @@ import ( "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/lotus/blockstore" + "github.com/filecoin-project/lotus/chain/stmgr" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types/mock" @@ -90,7 +91,7 @@ func testSplitStore(t *testing.T, cfg *Config) { return protect(protected.Cid()) }) - err = ss.Start(chain) + err = ss.Start(chain, nil) if err != nil { t.Fatal(err) } @@ -220,6 +221,140 @@ func TestSplitStoreCompactionWithBadger(t *testing.T) { testSplitStore(t, &Config{MarkSetType: "badger"}) } +func TestSplitStoreSupressCompactionNearUpgrade(t *testing.T) { + chain := &mockChain{t: t} + + // the myriads of stores + ds := dssync.MutexWrap(datastore.NewMapDatastore()) + hot := newMockStore() + cold := newMockStore() + + // this is necessary to avoid the garbage mock puts in the blocks + garbage := blocks.NewBlock([]byte{1, 2, 3}) + err := cold.Put(garbage) + if err != nil { + t.Fatal(err) + } + + // genesis + genBlock := mock.MkBlock(nil, 0, 0) + genBlock.Messages = garbage.Cid() + genBlock.ParentMessageReceipts = garbage.Cid() + genBlock.ParentStateRoot = garbage.Cid() + genBlock.Timestamp = uint64(time.Now().Unix()) + + genTs := mock.TipSet(genBlock) + chain.push(genTs) + + // put the genesis block to cold store + blk, err := genBlock.ToStorageBlock() + if err != nil { + t.Fatal(err) + } + + err = cold.Put(blk) + if err != nil { + t.Fatal(err) + } + + // open the splitstore + ss, err := Open("", ds, hot, cold, &Config{MarkSetType: "map"}) + if err != nil { + t.Fatal(err) + } + defer ss.Close() //nolint + + // create an upgrade schedule that will supress compaction during the test + upgradeBoundary = 0 + upgrade := stmgr.Upgrade{ + Height: 10, + PreMigrations: []stmgr.PreMigration{{StartWithin: 10}}, + } + + err = ss.Start(chain, []stmgr.Upgrade{upgrade}) + if err != nil { + t.Fatal(err) + } + + mkBlock := func(curTs *types.TipSet, i int, stateRoot blocks.Block) *types.TipSet { + blk := mock.MkBlock(curTs, uint64(i), uint64(i)) + + blk.Messages = garbage.Cid() + blk.ParentMessageReceipts = garbage.Cid() + blk.ParentStateRoot = stateRoot.Cid() + blk.Timestamp = uint64(time.Now().Unix()) + + sblk, err := blk.ToStorageBlock() + if err != nil { + t.Fatal(err) + } + err = ss.Put(stateRoot) + if err != nil { + t.Fatal(err) + } + err = ss.Put(sblk) + if err != nil { + t.Fatal(err) + } + ts := mock.TipSet(blk) + chain.push(ts) + + return ts + } + + waitForCompaction := func() { + for atomic.LoadInt32(&ss.compacting) == 1 { + time.Sleep(100 * time.Millisecond) + } + } + + curTs := genTs + for i := 1; i < 10; i++ { + stateRoot := blocks.NewBlock([]byte{byte(i), 3, 3, 7}) + curTs = mkBlock(curTs, i, stateRoot) + waitForCompaction() + } + + countBlocks := func(bs blockstore.Blockstore) int { + count := 0 + _ = bs.(blockstore.BlockstoreIterator).ForEachKey(func(_ cid.Cid) error { + count++ + return nil + }) + return count + } + + // we should not have compacted due to suppression and everything should still be hot + hotCnt := countBlocks(hot) + coldCnt := countBlocks(cold) + + if hotCnt != 20 { + t.Errorf("expected %d blocks, but got %d", 20, hotCnt) + } + + if coldCnt != 2 { + t.Errorf("expected %d blocks, but got %d", 2, coldCnt) + } + + // put some more blocks, now we should compact + for i := 10; i < 20; i++ { + stateRoot := blocks.NewBlock([]byte{byte(i), 3, 3, 7}) + curTs = mkBlock(curTs, i, stateRoot) + waitForCompaction() + } + + hotCnt = countBlocks(hot) + coldCnt = countBlocks(cold) + + if hotCnt != 24 { + t.Errorf("expected %d blocks, but got %d", 24, hotCnt) + } + + if coldCnt != 18 { + t.Errorf("expected %d blocks, but got %d", 18, coldCnt) + } +} + type mockChain struct { t testing.TB From 5d6398f20e901657b3244c90e7465173f04484ec Mon Sep 17 00:00:00 2001 From: vyzo Date: Fri, 3 Dec 2021 12:11:54 +0200 Subject: [PATCH 3/4] hook the upgrade schedule to splitstore start --- node/modules/chain.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/node/modules/chain.go b/node/modules/chain.go index 3518c3b29..b5be24d5d 100644 --- a/node/modules/chain.go +++ b/node/modules/chain.go @@ -78,6 +78,7 @@ func ChainStore(lc fx.Lifecycle, ds dtypes.MetadataDS, basebs dtypes.BaseBlockstore, weight store.WeightFunc, + us stmgr.UpgradeSchedule, j journal.Journal) *store.ChainStore { chain := store.NewChainStore(cbs, sbs, ds, weight, j) @@ -89,7 +90,7 @@ func ChainStore(lc fx.Lifecycle, var startHook func(context.Context) error if ss, ok := basebs.(*splitstore.SplitStore); ok { startHook = func(_ context.Context) error { - err := ss.Start(chain) + err := ss.Start(chain, us) if err != nil { err = xerrors.Errorf("error starting splitstore: %w", err) } From 489782e21bfac677377329dce1bbf4b42a396b0b Mon Sep 17 00:00:00 2001 From: vyzo Date: Fri, 3 Dec 2021 12:15:28 +0200 Subject: [PATCH 4/4] satisfy the spellchecker that masquarades as a linter --- blockstore/splitstore/splitstore.go | 2 +- blockstore/splitstore/splitstore_compact.go | 2 +- blockstore/splitstore/splitstore_test.go | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/blockstore/splitstore/splitstore.go b/blockstore/splitstore/splitstore.go index e0366512f..145e31d7a 100644 --- a/blockstore/splitstore/splitstore.go +++ b/blockstore/splitstore/splitstore.go @@ -50,7 +50,7 @@ var ( // set this to true if you want to track origin stack traces in the write log enableDebugLogWriteTraces = false - // upgradeBoundary is the boundary before and after an upgrade where we supress compaction + // upgradeBoundary is the boundary before and after an upgrade where we suppress compaction upgradeBoundary = build.Finality ) diff --git a/blockstore/splitstore/splitstore_compact.go b/blockstore/splitstore/splitstore_compact.go index 3a1fda202..04c2562fa 100644 --- a/blockstore/splitstore/splitstore_compact.go +++ b/blockstore/splitstore/splitstore_compact.go @@ -100,7 +100,7 @@ func (s *SplitStore) HeadChange(_, apply []*types.TipSet) error { } if s.isNearUpgrade(epoch) { - // we are near an upgrade epoch, supress compaction + // we are near an upgrade epoch, suppress compaction atomic.StoreInt32(&s.compacting, 0) return nil } diff --git a/blockstore/splitstore/splitstore_test.go b/blockstore/splitstore/splitstore_test.go index 6d8ff465f..f9111a979 100644 --- a/blockstore/splitstore/splitstore_test.go +++ b/blockstore/splitstore/splitstore_test.go @@ -221,7 +221,7 @@ func TestSplitStoreCompactionWithBadger(t *testing.T) { testSplitStore(t, &Config{MarkSetType: "badger"}) } -func TestSplitStoreSupressCompactionNearUpgrade(t *testing.T) { +func TestSplitStoreSuppressCompactionNearUpgrade(t *testing.T) { chain := &mockChain{t: t} // the myriads of stores @@ -264,7 +264,7 @@ func TestSplitStoreSupressCompactionNearUpgrade(t *testing.T) { } defer ss.Close() //nolint - // create an upgrade schedule that will supress compaction during the test + // create an upgrade schedule that will suppress compaction during the test upgradeBoundary = 0 upgrade := stmgr.Upgrade{ Height: 10,