Merge pull request #7734 from filecoin-project/feat/splitstore-upgrade-protection
SplitStore: supress compaction near upgrades
This commit is contained in:
commit
9d143426ee
@ -18,6 +18,8 @@ import (
|
|||||||
|
|
||||||
"github.com/filecoin-project/go-state-types/abi"
|
"github.com/filecoin-project/go-state-types/abi"
|
||||||
bstore "github.com/filecoin-project/lotus/blockstore"
|
bstore "github.com/filecoin-project/lotus/blockstore"
|
||||||
|
"github.com/filecoin-project/lotus/build"
|
||||||
|
"github.com/filecoin-project/lotus/chain/stmgr"
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
"github.com/filecoin-project/lotus/metrics"
|
"github.com/filecoin-project/lotus/metrics"
|
||||||
|
|
||||||
@ -47,6 +49,9 @@ var (
|
|||||||
enableDebugLog = false
|
enableDebugLog = false
|
||||||
// set this to true if you want to track origin stack traces in the write log
|
// set this to true if you want to track origin stack traces in the write log
|
||||||
enableDebugLogWriteTraces = false
|
enableDebugLogWriteTraces = false
|
||||||
|
|
||||||
|
// upgradeBoundary is the boundary before and after an upgrade where we suppress compaction
|
||||||
|
upgradeBoundary = build.Finality
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
@ -98,6 +103,12 @@ type ChainAccessor interface {
|
|||||||
SubscribeHeadChanges(change func(revert []*types.TipSet, apply []*types.TipSet) error)
|
SubscribeHeadChanges(change func(revert []*types.TipSet, apply []*types.TipSet) error)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// upgradeRange is a precomputed epoch range during which we shouldn't compact so as to not
|
||||||
|
// interfere with an upgrade
|
||||||
|
type upgradeRange struct {
|
||||||
|
start, end abi.ChainEpoch
|
||||||
|
}
|
||||||
|
|
||||||
// hotstore is the interface that must be satisfied by the hot blockstore; it is an extension
|
// hotstore is the interface that must be satisfied by the hot blockstore; it is an extension
|
||||||
// of the Blockstore interface with the traits we need for compaction.
|
// of the Blockstore interface with the traits we need for compaction.
|
||||||
type hotstore interface {
|
type hotstore interface {
|
||||||
@ -125,6 +136,8 @@ type SplitStore struct {
|
|||||||
cold bstore.Blockstore
|
cold bstore.Blockstore
|
||||||
hot hotstore
|
hot hotstore
|
||||||
|
|
||||||
|
upgrades []upgradeRange
|
||||||
|
|
||||||
markSetEnv MarkSetEnv
|
markSetEnv MarkSetEnv
|
||||||
markSetSize int64
|
markSetSize int64
|
||||||
|
|
||||||
@ -463,10 +476,27 @@ func (s *SplitStore) isWarm() bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// State tracking
|
// State tracking
|
||||||
func (s *SplitStore) Start(chain ChainAccessor) error {
|
func (s *SplitStore) Start(chain ChainAccessor, us stmgr.UpgradeSchedule) error {
|
||||||
s.chain = chain
|
s.chain = chain
|
||||||
curTs := chain.GetHeaviestTipSet()
|
curTs := chain.GetHeaviestTipSet()
|
||||||
|
|
||||||
|
// precompute the upgrade boundaries
|
||||||
|
s.upgrades = make([]upgradeRange, 0, len(us))
|
||||||
|
for _, upgrade := range us {
|
||||||
|
boundary := upgrade.Height
|
||||||
|
for _, pre := range upgrade.PreMigrations {
|
||||||
|
preMigrationBoundary := upgrade.Height - pre.StartWithin
|
||||||
|
if preMigrationBoundary < boundary {
|
||||||
|
boundary = preMigrationBoundary
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
upgradeStart := boundary - upgradeBoundary
|
||||||
|
upgradeEnd := upgrade.Height + upgradeBoundary
|
||||||
|
|
||||||
|
s.upgrades = append(s.upgrades, upgradeRange{start: upgradeStart, end: upgradeEnd})
|
||||||
|
}
|
||||||
|
|
||||||
// should we warmup
|
// should we warmup
|
||||||
warmup := false
|
warmup := false
|
||||||
|
|
||||||
|
@ -99,6 +99,12 @@ func (s *SplitStore) HeadChange(_, apply []*types.TipSet) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if s.isNearUpgrade(epoch) {
|
||||||
|
// we are near an upgrade epoch, suppress compaction
|
||||||
|
atomic.StoreInt32(&s.compacting, 0)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
if epoch-s.baseEpoch > CompactionThreshold {
|
if epoch-s.baseEpoch > CompactionThreshold {
|
||||||
// it's time to compact -- prepare the transaction and go!
|
// it's time to compact -- prepare the transaction and go!
|
||||||
s.beginTxnProtect()
|
s.beginTxnProtect()
|
||||||
@ -121,6 +127,16 @@ func (s *SplitStore) HeadChange(_, apply []*types.TipSet) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *SplitStore) isNearUpgrade(epoch abi.ChainEpoch) bool {
|
||||||
|
for _, upgrade := range s.upgrades {
|
||||||
|
if epoch >= upgrade.start && epoch <= upgrade.end {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
// transactionally protect incoming tipsets
|
// transactionally protect incoming tipsets
|
||||||
func (s *SplitStore) protectTipSets(apply []*types.TipSet) {
|
func (s *SplitStore) protectTipSets(apply []*types.TipSet) {
|
||||||
s.txnLk.RLock()
|
s.txnLk.RLock()
|
||||||
|
@ -11,6 +11,7 @@ import (
|
|||||||
|
|
||||||
"github.com/filecoin-project/go-state-types/abi"
|
"github.com/filecoin-project/go-state-types/abi"
|
||||||
"github.com/filecoin-project/lotus/blockstore"
|
"github.com/filecoin-project/lotus/blockstore"
|
||||||
|
"github.com/filecoin-project/lotus/chain/stmgr"
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
"github.com/filecoin-project/lotus/chain/types/mock"
|
"github.com/filecoin-project/lotus/chain/types/mock"
|
||||||
|
|
||||||
@ -90,7 +91,7 @@ func testSplitStore(t *testing.T, cfg *Config) {
|
|||||||
return protect(protected.Cid())
|
return protect(protected.Cid())
|
||||||
})
|
})
|
||||||
|
|
||||||
err = ss.Start(chain)
|
err = ss.Start(chain, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -220,6 +221,140 @@ func TestSplitStoreCompactionWithBadger(t *testing.T) {
|
|||||||
testSplitStore(t, &Config{MarkSetType: "badger"})
|
testSplitStore(t, &Config{MarkSetType: "badger"})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSplitStoreSuppressCompactionNearUpgrade(t *testing.T) {
|
||||||
|
chain := &mockChain{t: t}
|
||||||
|
|
||||||
|
// the myriads of stores
|
||||||
|
ds := dssync.MutexWrap(datastore.NewMapDatastore())
|
||||||
|
hot := newMockStore()
|
||||||
|
cold := newMockStore()
|
||||||
|
|
||||||
|
// this is necessary to avoid the garbage mock puts in the blocks
|
||||||
|
garbage := blocks.NewBlock([]byte{1, 2, 3})
|
||||||
|
err := cold.Put(garbage)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// genesis
|
||||||
|
genBlock := mock.MkBlock(nil, 0, 0)
|
||||||
|
genBlock.Messages = garbage.Cid()
|
||||||
|
genBlock.ParentMessageReceipts = garbage.Cid()
|
||||||
|
genBlock.ParentStateRoot = garbage.Cid()
|
||||||
|
genBlock.Timestamp = uint64(time.Now().Unix())
|
||||||
|
|
||||||
|
genTs := mock.TipSet(genBlock)
|
||||||
|
chain.push(genTs)
|
||||||
|
|
||||||
|
// put the genesis block to cold store
|
||||||
|
blk, err := genBlock.ToStorageBlock()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = cold.Put(blk)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// open the splitstore
|
||||||
|
ss, err := Open("", ds, hot, cold, &Config{MarkSetType: "map"})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
defer ss.Close() //nolint
|
||||||
|
|
||||||
|
// create an upgrade schedule that will suppress compaction during the test
|
||||||
|
upgradeBoundary = 0
|
||||||
|
upgrade := stmgr.Upgrade{
|
||||||
|
Height: 10,
|
||||||
|
PreMigrations: []stmgr.PreMigration{{StartWithin: 10}},
|
||||||
|
}
|
||||||
|
|
||||||
|
err = ss.Start(chain, []stmgr.Upgrade{upgrade})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
mkBlock := func(curTs *types.TipSet, i int, stateRoot blocks.Block) *types.TipSet {
|
||||||
|
blk := mock.MkBlock(curTs, uint64(i), uint64(i))
|
||||||
|
|
||||||
|
blk.Messages = garbage.Cid()
|
||||||
|
blk.ParentMessageReceipts = garbage.Cid()
|
||||||
|
blk.ParentStateRoot = stateRoot.Cid()
|
||||||
|
blk.Timestamp = uint64(time.Now().Unix())
|
||||||
|
|
||||||
|
sblk, err := blk.ToStorageBlock()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
err = ss.Put(stateRoot)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
err = ss.Put(sblk)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
ts := mock.TipSet(blk)
|
||||||
|
chain.push(ts)
|
||||||
|
|
||||||
|
return ts
|
||||||
|
}
|
||||||
|
|
||||||
|
waitForCompaction := func() {
|
||||||
|
for atomic.LoadInt32(&ss.compacting) == 1 {
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
curTs := genTs
|
||||||
|
for i := 1; i < 10; i++ {
|
||||||
|
stateRoot := blocks.NewBlock([]byte{byte(i), 3, 3, 7})
|
||||||
|
curTs = mkBlock(curTs, i, stateRoot)
|
||||||
|
waitForCompaction()
|
||||||
|
}
|
||||||
|
|
||||||
|
countBlocks := func(bs blockstore.Blockstore) int {
|
||||||
|
count := 0
|
||||||
|
_ = bs.(blockstore.BlockstoreIterator).ForEachKey(func(_ cid.Cid) error {
|
||||||
|
count++
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
return count
|
||||||
|
}
|
||||||
|
|
||||||
|
// we should not have compacted due to suppression and everything should still be hot
|
||||||
|
hotCnt := countBlocks(hot)
|
||||||
|
coldCnt := countBlocks(cold)
|
||||||
|
|
||||||
|
if hotCnt != 20 {
|
||||||
|
t.Errorf("expected %d blocks, but got %d", 20, hotCnt)
|
||||||
|
}
|
||||||
|
|
||||||
|
if coldCnt != 2 {
|
||||||
|
t.Errorf("expected %d blocks, but got %d", 2, coldCnt)
|
||||||
|
}
|
||||||
|
|
||||||
|
// put some more blocks, now we should compact
|
||||||
|
for i := 10; i < 20; i++ {
|
||||||
|
stateRoot := blocks.NewBlock([]byte{byte(i), 3, 3, 7})
|
||||||
|
curTs = mkBlock(curTs, i, stateRoot)
|
||||||
|
waitForCompaction()
|
||||||
|
}
|
||||||
|
|
||||||
|
hotCnt = countBlocks(hot)
|
||||||
|
coldCnt = countBlocks(cold)
|
||||||
|
|
||||||
|
if hotCnt != 24 {
|
||||||
|
t.Errorf("expected %d blocks, but got %d", 24, hotCnt)
|
||||||
|
}
|
||||||
|
|
||||||
|
if coldCnt != 18 {
|
||||||
|
t.Errorf("expected %d blocks, but got %d", 18, coldCnt)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
type mockChain struct {
|
type mockChain struct {
|
||||||
t testing.TB
|
t testing.TB
|
||||||
|
|
||||||
|
@ -78,6 +78,7 @@ func ChainStore(lc fx.Lifecycle,
|
|||||||
ds dtypes.MetadataDS,
|
ds dtypes.MetadataDS,
|
||||||
basebs dtypes.BaseBlockstore,
|
basebs dtypes.BaseBlockstore,
|
||||||
weight store.WeightFunc,
|
weight store.WeightFunc,
|
||||||
|
us stmgr.UpgradeSchedule,
|
||||||
j journal.Journal) *store.ChainStore {
|
j journal.Journal) *store.ChainStore {
|
||||||
|
|
||||||
chain := store.NewChainStore(cbs, sbs, ds, weight, j)
|
chain := store.NewChainStore(cbs, sbs, ds, weight, j)
|
||||||
@ -89,7 +90,7 @@ func ChainStore(lc fx.Lifecycle,
|
|||||||
var startHook func(context.Context) error
|
var startHook func(context.Context) error
|
||||||
if ss, ok := basebs.(*splitstore.SplitStore); ok {
|
if ss, ok := basebs.(*splitstore.SplitStore); ok {
|
||||||
startHook = func(_ context.Context) error {
|
startHook = func(_ context.Context) error {
|
||||||
err := ss.Start(chain)
|
err := ss.Start(chain, us)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err = xerrors.Errorf("error starting splitstore: %w", err)
|
err = xerrors.Errorf("error starting splitstore: %w", err)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user