core, accounts, eth, trie: handle genesis state missing (#28171)

* core, accounts, eth, trie: handle genesis state missing

* core, eth, trie: polish

* core: manage txpool subscription in mainpool

* eth/backend: fix test

* cmd, eth: fix test

* core/rawdb, trie/triedb/pathdb: address comments

* eth, trie: address comments

* eth: inline the function

* eth: use synced flag

* core/txpool: revert changes in txpool

* core, eth, trie: rename functions
This commit is contained in:
rjl493456442 2023-09-28 15:00:53 +08:00 committed by GitHub
parent a081130081
commit 73f5bcb75b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 244 additions and 148 deletions

View File

@ -199,7 +199,6 @@ func (b *SimulatedBackend) CodeAt(ctx context.Context, contract common.Address,
if err != nil { if err != nil {
return nil, err return nil, err
} }
return stateDB.GetCode(contract), nil return stateDB.GetCode(contract), nil
} }
@ -212,7 +211,6 @@ func (b *SimulatedBackend) BalanceAt(ctx context.Context, contract common.Addres
if err != nil { if err != nil {
return nil, err return nil, err
} }
return stateDB.GetBalance(contract), nil return stateDB.GetBalance(contract), nil
} }
@ -225,7 +223,6 @@ func (b *SimulatedBackend) NonceAt(ctx context.Context, contract common.Address,
if err != nil { if err != nil {
return 0, err return 0, err
} }
return stateDB.GetNonce(contract), nil return stateDB.GetNonce(contract), nil
} }
@ -238,7 +235,6 @@ func (b *SimulatedBackend) StorageAt(ctx context.Context, contract common.Addres
if err != nil { if err != nil {
return nil, err return nil, err
} }
val := stateDB.GetState(contract, key) val := stateDB.GetState(contract, key)
return val[:], nil return val[:], nil
} }
@ -700,8 +696,10 @@ func (b *SimulatedBackend) SendTransaction(ctx context.Context, tx *types.Transa
} }
block.AddTxWithChain(b.blockchain, tx) block.AddTxWithChain(b.blockchain, tx)
}) })
stateDB, _ := b.blockchain.State() stateDB, err := b.blockchain.State()
if err != nil {
return err
}
b.pendingBlock = blocks[0] b.pendingBlock = blocks[0]
b.pendingState, _ = state.New(b.pendingBlock.Root(), stateDB.Database(), nil) b.pendingState, _ = state.New(b.pendingBlock.Root(), stateDB.Database(), nil)
b.pendingReceipts = receipts[0] b.pendingReceipts = receipts[0]
@ -821,11 +819,12 @@ func (b *SimulatedBackend) AdjustTime(adjustment time.Duration) error {
blocks, _ := core.GenerateChain(b.config, block, ethash.NewFaker(), b.database, 1, func(number int, block *core.BlockGen) { blocks, _ := core.GenerateChain(b.config, block, ethash.NewFaker(), b.database, 1, func(number int, block *core.BlockGen) {
block.OffsetTime(int64(adjustment.Seconds())) block.OffsetTime(int64(adjustment.Seconds()))
}) })
stateDB, _ := b.blockchain.State() stateDB, err := b.blockchain.State()
if err != nil {
return err
}
b.pendingBlock = blocks[0] b.pendingBlock = blocks[0]
b.pendingState, _ = state.New(b.pendingBlock.Root(), stateDB.Database(), nil) b.pendingState, _ = state.New(b.pendingBlock.Root(), stateDB.Database(), nil)
return nil return nil
} }

View File

@ -120,6 +120,7 @@ func setupGeth(stack *node.Node) error {
if err != nil { if err != nil {
return err return err
} }
backend.SetSynced()
_, err = backend.BlockChain().InsertChain(chain.blocks[1:]) _, err = backend.BlockChain().InsertChain(chain.blocks[1:])
return err return err

View File

@ -337,17 +337,17 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
if err := bc.loadLastState(); err != nil { if err := bc.loadLastState(); err != nil {
return nil, err return nil, err
} }
// Make sure the state associated with the block is available // Make sure the state associated with the block is available, or log out
// if there is no available state, waiting for state sync.
head := bc.CurrentBlock() head := bc.CurrentBlock()
if !bc.HasState(head.Root) { if !bc.HasState(head.Root) {
if head.Number.Uint64() == 0 { if head.Number.Uint64() == 0 {
// The genesis state is missing, which is only possible in the path-based // The genesis state is missing, which is only possible in the path-based
// scheme. This situation occurs when the state syncer overwrites it. // scheme. This situation occurs when the initial state sync is not finished
// // yet, or the chain head is rewound below the pivot point. In both scenario,
// The solution is to reset the state to the genesis state. Although it may not // there is no possible recovery approach except for rerunning a snap sync.
// match the sync target, the state healer will later address and correct any // Do nothing here until the state syncer picks it up.
// inconsistencies. log.Info("Genesis state is missing, wait state sync")
bc.resetState()
} else { } else {
// Head state is missing, before the state recovery, find out the // Head state is missing, before the state recovery, find out the
// disk layer point of snapshot(if it's enabled). Make sure the // disk layer point of snapshot(if it's enabled). Make sure the
@ -630,28 +630,6 @@ func (bc *BlockChain) SetSafe(header *types.Header) {
} }
} }
// resetState resets the persistent state to genesis state if it's not present.
func (bc *BlockChain) resetState() {
// Short circuit if the genesis state is already present.
root := bc.genesisBlock.Root()
if bc.HasState(root) {
return
}
// Reset the state database to empty for committing genesis state.
// Note, it should only happen in path-based scheme and Reset function
// is also only call-able in this mode.
if bc.triedb.Scheme() == rawdb.PathScheme {
if err := bc.triedb.Reset(types.EmptyRootHash); err != nil {
log.Crit("Failed to clean state", "err", err) // Shouldn't happen
}
}
// Write genesis state into database.
if err := CommitGenesisState(bc.db, bc.triedb, bc.genesisBlock.Hash()); err != nil {
log.Crit("Failed to commit genesis state", "err", err)
}
log.Info("Reset state to genesis", "root", root)
}
// setHeadBeyondRoot rewinds the local chain to a new head with the extra condition // setHeadBeyondRoot rewinds the local chain to a new head with the extra condition
// that the rewind must pass the specified state root. This method is meant to be // that the rewind must pass the specified state root. This method is meant to be
// used when rewinding with snapshots enabled to ensure that we go back further than // used when rewinding with snapshots enabled to ensure that we go back further than
@ -687,7 +665,6 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, time uint64, root common.Ha
if newHeadBlock == nil { if newHeadBlock == nil {
log.Error("Gap in the chain, rewinding to genesis", "number", header.Number, "hash", header.Hash()) log.Error("Gap in the chain, rewinding to genesis", "number", header.Number, "hash", header.Hash())
newHeadBlock = bc.genesisBlock newHeadBlock = bc.genesisBlock
bc.resetState()
} else { } else {
// Block exists, keep rewinding until we find one with state, // Block exists, keep rewinding until we find one with state,
// keeping rewinding until we exceed the optional threshold // keeping rewinding until we exceed the optional threshold
@ -715,16 +692,14 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, time uint64, root common.Ha
} }
} }
if beyondRoot || newHeadBlock.NumberU64() == 0 { if beyondRoot || newHeadBlock.NumberU64() == 0 {
if newHeadBlock.NumberU64() == 0 { if !bc.HasState(newHeadBlock.Root()) && bc.stateRecoverable(newHeadBlock.Root()) {
bc.resetState()
} else if !bc.HasState(newHeadBlock.Root()) {
// Rewind to a block with recoverable state. If the state is // Rewind to a block with recoverable state. If the state is
// missing, run the state recovery here. // missing, run the state recovery here.
if err := bc.triedb.Recover(newHeadBlock.Root()); err != nil { if err := bc.triedb.Recover(newHeadBlock.Root()); err != nil {
log.Crit("Failed to rollback state", "err", err) // Shouldn't happen log.Crit("Failed to rollback state", "err", err) // Shouldn't happen
} }
}
log.Debug("Rewound to block with state", "number", newHeadBlock.NumberU64(), "hash", newHeadBlock.Hash()) log.Debug("Rewound to block with state", "number", newHeadBlock.NumberU64(), "hash", newHeadBlock.Hash())
}
break break
} }
log.Debug("Skipping block with threshold state", "number", newHeadBlock.NumberU64(), "hash", newHeadBlock.Hash(), "root", newHeadBlock.Root()) log.Debug("Skipping block with threshold state", "number", newHeadBlock.NumberU64(), "hash", newHeadBlock.Hash(), "root", newHeadBlock.Root())
@ -739,6 +714,15 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, time uint64, root common.Ha
// to low, so it's safe to update in-memory markers directly. // to low, so it's safe to update in-memory markers directly.
bc.currentBlock.Store(newHeadBlock.Header()) bc.currentBlock.Store(newHeadBlock.Header())
headBlockGauge.Update(int64(newHeadBlock.NumberU64())) headBlockGauge.Update(int64(newHeadBlock.NumberU64()))
// The head state is missing, which is only possible in the path-based
// scheme. This situation occurs when the chain head is rewound below
// the pivot point. In this scenario, there is no possible recovery
// approach except for rerunning a snap sync. Do nothing here until the
// state syncer picks it up.
if !bc.HasState(newHeadBlock.Root()) {
log.Info("Chain is stateless, wait state sync", "number", newHeadBlock.Number(), "hash", newHeadBlock.Hash())
}
} }
// Rewind the snap block in a simpleton way to the target head // Rewind the snap block in a simpleton way to the target head
if currentSnapBlock := bc.CurrentSnapBlock(); currentSnapBlock != nil && header.Number.Uint64() < currentSnapBlock.Number.Uint64() { if currentSnapBlock := bc.CurrentSnapBlock(); currentSnapBlock != nil && header.Number.Uint64() < currentSnapBlock.Number.Uint64() {
@ -838,7 +822,7 @@ func (bc *BlockChain) SnapSyncCommitHead(hash common.Hash) error {
// Reset the trie database with the fresh snap synced state. // Reset the trie database with the fresh snap synced state.
root := block.Root() root := block.Root()
if bc.triedb.Scheme() == rawdb.PathScheme { if bc.triedb.Scheme() == rawdb.PathScheme {
if err := bc.triedb.Reset(root); err != nil { if err := bc.triedb.Enable(root); err != nil {
return err return err
} }
} }

View File

@ -76,3 +76,25 @@ func DeleteSkeletonHeader(db ethdb.KeyValueWriter, number uint64) {
log.Crit("Failed to delete skeleton header", "err", err) log.Crit("Failed to delete skeleton header", "err", err)
} }
} }
const (
StateSyncUnknown = uint8(0) // flags the state snap sync is unknown
StateSyncRunning = uint8(1) // flags the state snap sync is not completed yet
StateSyncFinished = uint8(2) // flags the state snap sync is completed
)
// ReadSnapSyncStatusFlag retrieves the state snap sync status flag.
func ReadSnapSyncStatusFlag(db ethdb.KeyValueReader) uint8 {
blob, err := db.Get(snapSyncStatusFlagKey)
if err != nil || len(blob) != 1 {
return StateSyncUnknown
}
return blob[0]
}
// WriteSnapSyncStatusFlag stores the state snap sync status flag into database.
func WriteSnapSyncStatusFlag(db ethdb.KeyValueWriter, flag uint8) {
if err := db.Put(snapSyncStatusFlagKey, []byte{flag}); err != nil {
log.Crit("Failed to store sync status flag", "err", err)
}
}

View File

@ -555,7 +555,7 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error {
lastPivotKey, fastTrieProgressKey, snapshotDisabledKey, SnapshotRootKey, snapshotJournalKey, lastPivotKey, fastTrieProgressKey, snapshotDisabledKey, SnapshotRootKey, snapshotJournalKey,
snapshotGeneratorKey, snapshotRecoveryKey, txIndexTailKey, fastTxLookupLimitKey, snapshotGeneratorKey, snapshotRecoveryKey, txIndexTailKey, fastTxLookupLimitKey,
uncleanShutdownKey, badBlockKey, transitionStatusKey, skeletonSyncStatusKey, uncleanShutdownKey, badBlockKey, transitionStatusKey, skeletonSyncStatusKey,
persistentStateIDKey, trieJournalKey, snapshotSyncStatusKey, persistentStateIDKey, trieJournalKey, snapshotSyncStatusKey, snapSyncStatusFlagKey,
} { } {
if bytes.Equal(key, meta) { if bytes.Equal(key, meta) {
metadata.Add(size) metadata.Add(size)

View File

@ -91,6 +91,9 @@ var (
// transitionStatusKey tracks the eth2 transition status. // transitionStatusKey tracks the eth2 transition status.
transitionStatusKey = []byte("eth2-transition") transitionStatusKey = []byte("eth2-transition")
// snapSyncStatusFlagKey flags that status of snap sync.
snapSyncStatusFlagKey = []byte("SnapSyncStatus")
// Data item prefixes (use single byte to avoid mixing data types, avoid `i`, used for indexes). // Data item prefixes (use single byte to avoid mixing data types, avoid `i`, used for indexes).
headerPrefix = []byte("h") // headerPrefix + num (uint64 big endian) + hash -> header headerPrefix = []byte("h") // headerPrefix + num (uint64 big endian) + hash -> header
headerTDSuffix = []byte("t") // headerPrefix + num (uint64 big endian) + hash + headerTDSuffix -> td headerTDSuffix = []byte("t") // headerPrefix + num (uint64 big endian) + hash + headerTDSuffix -> td

View File

@ -355,7 +355,13 @@ func (p *BlobPool) Init(gasTip *big.Int, head *types.Header, reserve txpool.Addr
return err return err
} }
} }
// Initialize the state with head block, or fallback to empty one in
// case the head state is not available(might occur when node is not
// fully synced).
state, err := p.chain.StateAt(head.Root) state, err := p.chain.StateAt(head.Root)
if err != nil {
state, err = p.chain.StateAt(types.EmptyRootHash)
}
if err != nil { if err != nil {
return err return err
} }

View File

@ -298,7 +298,20 @@ func (pool *LegacyPool) Init(gasTip *big.Int, head *types.Header, reserve txpool
// Set the basic pool parameters // Set the basic pool parameters
pool.gasTip.Store(gasTip) pool.gasTip.Store(gasTip)
pool.reset(nil, head)
// Initialize the state with head block, or fallback to empty one in
// case the head state is not available(might occur when node is not
// fully synced).
statedb, err := pool.chain.StateAt(head.Root)
if err != nil {
statedb, err = pool.chain.StateAt(types.EmptyRootHash)
}
if err != nil {
return err
}
pool.currentHead.Store(head)
pool.currentState = statedb
pool.pendingNonces = newNoncer(statedb)
// Start the reorg loop early, so it can handle requests generated during // Start the reorg loop early, so it can handle requests generated during
// journal loading. // journal loading.

View File

@ -204,7 +204,10 @@ func (b *EthAPIBackend) StateAndHeaderByNumber(ctx context.Context, number rpc.B
return nil, nil, errors.New("header not found") return nil, nil, errors.New("header not found")
} }
stateDb, err := b.eth.BlockChain().StateAt(header.Root) stateDb, err := b.eth.BlockChain().StateAt(header.Root)
return stateDb, header, err if err != nil {
return nil, nil, err
}
return stateDb, header, nil
} }
func (b *EthAPIBackend) StateAndHeaderByNumberOrHash(ctx context.Context, blockNrOrHash rpc.BlockNumberOrHash) (*state.StateDB, *types.Header, error) { func (b *EthAPIBackend) StateAndHeaderByNumberOrHash(ctx context.Context, blockNrOrHash rpc.BlockNumberOrHash) (*state.StateDB, *types.Header, error) {
@ -223,7 +226,10 @@ func (b *EthAPIBackend) StateAndHeaderByNumberOrHash(ctx context.Context, blockN
return nil, nil, errors.New("hash is not currently canonical") return nil, nil, errors.New("hash is not currently canonical")
} }
stateDb, err := b.eth.BlockChain().StateAt(header.Root) stateDb, err := b.eth.BlockChain().StateAt(header.Root)
return stateDb, header, err if err != nil {
return nil, nil, err
}
return stateDb, header, nil
} }
return nil, nil, errors.New("invalid arguments; neither block nor hash specified") return nil, nil, errors.New("invalid arguments; neither block nor hash specified")
} }

View File

@ -474,7 +474,7 @@ func (s *Ethereum) Engine() consensus.Engine { return s.engine }
func (s *Ethereum) ChainDb() ethdb.Database { return s.chainDb } func (s *Ethereum) ChainDb() ethdb.Database { return s.chainDb }
func (s *Ethereum) IsListening() bool { return true } // Always listening func (s *Ethereum) IsListening() bool { return true } // Always listening
func (s *Ethereum) Downloader() *downloader.Downloader { return s.handler.downloader } func (s *Ethereum) Downloader() *downloader.Downloader { return s.handler.downloader }
func (s *Ethereum) Synced() bool { return s.handler.acceptTxs.Load() } func (s *Ethereum) Synced() bool { return s.handler.synced.Load() }
func (s *Ethereum) SetSynced() { s.handler.enableSyncedFeatures() } func (s *Ethereum) SetSynced() { s.handler.enableSyncedFeatures() }
func (s *Ethereum) ArchiveMode() bool { return s.config.NoPruning } func (s *Ethereum) ArchiveMode() bool { return s.config.NoPruning }
func (s *Ethereum) BloomIndexer() *core.ChainIndexer { return s.bloomIndexer } func (s *Ethereum) BloomIndexer() *core.ChainIndexer { return s.bloomIndexer }

View File

@ -403,7 +403,9 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td, ttd *big.Int,
// subsequent state reads, explicitly disable the trie database and state // subsequent state reads, explicitly disable the trie database and state
// syncer is responsible to address and correct any state missing. // syncer is responsible to address and correct any state missing.
if d.blockchain.TrieDB().Scheme() == rawdb.PathScheme { if d.blockchain.TrieDB().Scheme() == rawdb.PathScheme {
d.blockchain.TrieDB().Reset(types.EmptyRootHash) if err := d.blockchain.TrieDB().Disable(); err != nil {
return err
}
} }
// Snap sync uses the snapshot namespace to store potentially flaky data until // Snap sync uses the snapshot namespace to store potentially flaky data until
// sync completely heals and finishes. Pause snapshot maintenance in the mean- // sync completely heals and finishes. Pause snapshot maintenance in the mean-

View File

@ -101,7 +101,7 @@ type handler struct {
forkFilter forkid.Filter // Fork ID filter, constant across the lifetime of the node forkFilter forkid.Filter // Fork ID filter, constant across the lifetime of the node
snapSync atomic.Bool // Flag whether snap sync is enabled (gets disabled if we already have blocks) snapSync atomic.Bool // Flag whether snap sync is enabled (gets disabled if we already have blocks)
acceptTxs atomic.Bool // Flag whether we're considered synchronised (enables transaction processing) synced atomic.Bool // Flag whether we're considered synchronised (enables transaction processing)
database ethdb.Database database ethdb.Database
txpool txPool txpool txPool
@ -163,32 +163,24 @@ func newHandler(config *handlerConfig) (*handler, error) {
fullBlock, snapBlock := h.chain.CurrentBlock(), h.chain.CurrentSnapBlock() fullBlock, snapBlock := h.chain.CurrentBlock(), h.chain.CurrentSnapBlock()
if fullBlock.Number.Uint64() == 0 && snapBlock.Number.Uint64() > 0 { if fullBlock.Number.Uint64() == 0 && snapBlock.Number.Uint64() > 0 {
h.snapSync.Store(true) h.snapSync.Store(true)
log.Warn("Switch sync mode from full sync to snap sync") log.Warn("Switch sync mode from full sync to snap sync", "reason", "snap sync incomplete")
} else if !h.chain.HasState(fullBlock.Root) {
h.snapSync.Store(true)
log.Warn("Switch sync mode from full sync to snap sync", "reason", "head state missing")
} }
} else { } else {
if h.chain.CurrentBlock().Number.Uint64() > 0 { head := h.chain.CurrentBlock()
if head.Number.Uint64() > 0 && h.chain.HasState(head.Root) {
// Print warning log if database is not empty to run snap sync. // Print warning log if database is not empty to run snap sync.
log.Warn("Switch sync mode from snap sync to full sync") log.Warn("Switch sync mode from snap sync to full sync", "reason", "snap sync complete")
} else { } else {
// If snap sync was requested and our database is empty, grant it // If snap sync was requested and our database is empty, grant it
h.snapSync.Store(true) h.snapSync.Store(true)
log.Info("Enabled snap sync", "head", head.Number, "hash", head.Hash())
} }
} }
// If sync succeeds, pass a callback to potentially disable snap sync mode
// and enable transaction propagation.
success := func() {
// If we were running snap sync and it finished, disable doing another
// round on next sync cycle
if h.snapSync.Load() {
log.Info("Snap sync complete, auto disabling")
h.snapSync.Store(false)
}
// If we've successfully finished a sync cycle, accept transactions from
// the network
h.enableSyncedFeatures()
}
// Construct the downloader (long sync) // Construct the downloader (long sync)
h.downloader = downloader.New(config.Database, h.eventMux, h.chain, nil, h.removePeer, success) h.downloader = downloader.New(config.Database, h.eventMux, h.chain, nil, h.removePeer, h.enableSyncedFeatures)
if ttd := h.chain.Config().TerminalTotalDifficulty; ttd != nil { if ttd := h.chain.Config().TerminalTotalDifficulty; ttd != nil {
if h.chain.Config().TerminalTotalDifficultyPassed { if h.chain.Config().TerminalTotalDifficultyPassed {
log.Info("Chain post-merge, sync via beacon client") log.Info("Chain post-merge, sync via beacon client")
@ -245,8 +237,8 @@ func newHandler(config *handlerConfig) (*handler, error) {
// accept each others' blocks until a restart. Unfortunately we haven't figured // accept each others' blocks until a restart. Unfortunately we haven't figured
// out a way yet where nodes can decide unilaterally whether the network is new // out a way yet where nodes can decide unilaterally whether the network is new
// or not. This should be fixed if we figure out a solution. // or not. This should be fixed if we figure out a solution.
if h.snapSync.Load() { if !h.synced.Load() {
log.Warn("Snap syncing, discarded propagated block", "number", blocks[0].Number(), "hash", blocks[0].Hash()) log.Warn("Syncing, discarded propagated block", "number", blocks[0].Number(), "hash", blocks[0].Hash())
return 0, nil return 0, nil
} }
if h.merger.TDDReached() { if h.merger.TDDReached() {
@ -272,11 +264,7 @@ func newHandler(config *handlerConfig) (*handler, error) {
} }
return 0, nil return 0, nil
} }
n, err := h.chain.InsertChain(blocks) return h.chain.InsertChain(blocks)
if err == nil {
h.enableSyncedFeatures() // Mark initial sync done on any fetcher import
}
return n, err
} }
h.blockFetcher = fetcher.NewBlockFetcher(false, nil, h.chain.GetBlockByHash, validator, h.BroadcastBlock, heighter, nil, inserter, h.removePeer) h.blockFetcher = fetcher.NewBlockFetcher(false, nil, h.chain.GetBlockByHash, validator, h.BroadcastBlock, heighter, nil, inserter, h.removePeer)
@ -680,7 +668,15 @@ func (h *handler) txBroadcastLoop() {
// enableSyncedFeatures enables the post-sync functionalities when the initial // enableSyncedFeatures enables the post-sync functionalities when the initial
// sync is finished. // sync is finished.
func (h *handler) enableSyncedFeatures() { func (h *handler) enableSyncedFeatures() {
h.acceptTxs.Store(true) // Mark the local node as synced.
h.synced.Store(true)
// If we were running snap sync and it finished, disable doing another
// round on next sync cycle
if h.snapSync.Load() {
log.Info("Snap sync complete, auto disabling")
h.snapSync.Store(false)
}
if h.chain.TrieDB().Scheme() == rawdb.PathScheme { if h.chain.TrieDB().Scheme() == rawdb.PathScheme {
h.chain.TrieDB().SetBufferSize(pathdb.DefaultBufferSize) h.chain.TrieDB().SetBufferSize(pathdb.DefaultBufferSize)
} }

View File

@ -51,7 +51,7 @@ func (h *ethHandler) PeerInfo(id enode.ID) interface{} {
// AcceptTxs retrieves whether transaction processing is enabled on the node // AcceptTxs retrieves whether transaction processing is enabled on the node
// or if inbound transactions should simply be dropped. // or if inbound transactions should simply be dropped.
func (h *ethHandler) AcceptTxs() bool { func (h *ethHandler) AcceptTxs() bool {
return h.acceptTxs.Load() return h.synced.Load()
} }
// Handle is invoked from a peer's message handler when it receives a new remote // Handle is invoked from a peer's message handler when it receives a new remote

View File

@ -248,7 +248,7 @@ func testRecvTransactions(t *testing.T, protocol uint) {
handler := newTestHandler() handler := newTestHandler()
defer handler.close() defer handler.close()
handler.handler.acceptTxs.Store(true) // mark synced to accept transactions handler.handler.synced.Store(true) // mark synced to accept transactions
txs := make(chan core.NewTxsEvent) txs := make(chan core.NewTxsEvent)
sub := handler.txpool.SubscribeNewTxsEvent(txs) sub := handler.txpool.SubscribeNewTxsEvent(txs)
@ -401,7 +401,7 @@ func testTransactionPropagation(t *testing.T, protocol uint) {
sinks[i] = newTestHandler() sinks[i] = newTestHandler()
defer sinks[i].close() defer sinks[i].close()
sinks[i].handler.acceptTxs.Store(true) // mark synced to accept transactions sinks[i].handler.synced.Store(true) // mark synced to accept transactions
} }
// Interconnect all the sink handlers with the source handler // Interconnect all the sink handlers with the source handler
for i, sink := range sinks { for i, sink := range sinks {

View File

@ -197,16 +197,25 @@ func (cs *chainSyncer) modeAndLocalHead() (downloader.SyncMode, *big.Int) {
return downloader.SnapSync, td return downloader.SnapSync, td
} }
// We are probably in full sync, but we might have rewound to before the // We are probably in full sync, but we might have rewound to before the
// snap sync pivot, check if we should reenable // snap sync pivot, check if we should re-enable snap sync.
head := cs.handler.chain.CurrentBlock()
if pivot := rawdb.ReadLastPivotNumber(cs.handler.database); pivot != nil { if pivot := rawdb.ReadLastPivotNumber(cs.handler.database); pivot != nil {
if head := cs.handler.chain.CurrentBlock(); head.Number.Uint64() < *pivot { if head.Number.Uint64() < *pivot {
block := cs.handler.chain.CurrentSnapBlock() block := cs.handler.chain.CurrentSnapBlock()
td := cs.handler.chain.GetTd(block.Hash(), block.Number.Uint64()) td := cs.handler.chain.GetTd(block.Hash(), block.Number.Uint64())
return downloader.SnapSync, td return downloader.SnapSync, td
} }
} }
// We are in a full sync, but the associated head state is missing. To complete
// the head state, forcefully rerun the snap sync. Note it doesn't mean the
// persistent state is corrupted, just mismatch with the head block.
if !cs.handler.chain.HasState(head.Root) {
block := cs.handler.chain.CurrentSnapBlock()
td := cs.handler.chain.GetTd(block.Hash(), block.Number.Uint64())
log.Info("Reenabled snap sync as chain is stateless")
return downloader.SnapSync, td
}
// Nope, we're really full syncing // Nope, we're really full syncing
head := cs.handler.chain.CurrentBlock()
td := cs.handler.chain.GetTd(head.Hash(), head.Number.Uint64()) td := cs.handler.chain.GetTd(head.Hash(), head.Number.Uint64())
return downloader.FullSync, td return downloader.FullSync, td
} }
@ -242,13 +251,7 @@ func (h *handler) doSync(op *chainSyncOp) error {
if err != nil { if err != nil {
return err return err
} }
if h.snapSync.Load() { h.enableSyncedFeatures()
log.Info("Snap sync complete, auto disabling")
h.snapSync.Store(false)
}
// If we've successfully finished a sync cycle, enable accepting transactions
// from the network.
h.acceptTxs.Store(true)
head := h.chain.CurrentBlock() head := h.chain.CurrentBlock()
if head.Number.Uint64() > 0 { if head.Number.Uint64() > 0 {

View File

@ -64,6 +64,7 @@ func (m *mockBackend) StateAtBlock(block *types.Block, reexec uint64, base *stat
} }
type testBlockChain struct { type testBlockChain struct {
root common.Hash
config *params.ChainConfig config *params.ChainConfig
statedb *state.StateDB statedb *state.StateDB
gasLimit uint64 gasLimit uint64
@ -89,6 +90,10 @@ func (bc *testBlockChain) StateAt(common.Hash) (*state.StateDB, error) {
return bc.statedb, nil return bc.statedb, nil
} }
func (bc *testBlockChain) HasState(root common.Hash) bool {
return bc.root == root
}
func (bc *testBlockChain) SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription { func (bc *testBlockChain) SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription {
return bc.chainHeadFeed.Subscribe(ch) return bc.chainHeadFeed.Subscribe(ch)
} }
@ -302,7 +307,7 @@ func createMiner(t *testing.T) (*Miner, *event.TypeMux, func(skipMiner bool)) {
t.Fatalf("can't create new chain %v", err) t.Fatalf("can't create new chain %v", err)
} }
statedb, _ := state.New(bc.Genesis().Root(), bc.StateCache(), nil) statedb, _ := state.New(bc.Genesis().Root(), bc.StateCache(), nil)
blockchain := &testBlockChain{chainConfig, statedb, 10000000, new(event.Feed)} blockchain := &testBlockChain{bc.Genesis().Root(), chainConfig, statedb, 10000000, new(event.Feed)}
pool := legacypool.New(testTxPoolConfig, blockchain) pool := legacypool.New(testTxPoolConfig, blockchain)
txpool, _ := txpool.New(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain, []txpool.SubPool{pool}) txpool, _ := txpool.New(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain, []txpool.SubPool{pool})

View File

@ -273,15 +273,27 @@ func (db *Database) Recoverable(root common.Hash) (bool, error) {
return pdb.Recoverable(root), nil return pdb.Recoverable(root), nil
} }
// Reset wipes all available journal from the persistent database and discard // Disable deactivates the database and invalidates all available state layers
// all caches and diff layers. Using the given root to create a new disk layer. // as stale to prevent access to the persistent state, which is in the syncing
// stage.
//
// It's only supported by path-based database and will return an error for others. // It's only supported by path-based database and will return an error for others.
func (db *Database) Reset(root common.Hash) error { func (db *Database) Disable() error {
pdb, ok := db.backend.(*pathdb.Database) pdb, ok := db.backend.(*pathdb.Database)
if !ok { if !ok {
return errors.New("not supported") return errors.New("not supported")
} }
return pdb.Reset(root) return pdb.Disable()
}
// Enable activates database and resets the state tree with the provided persistent
// state root once the state sync is finished.
func (db *Database) Enable(root common.Hash) error {
pdb, ok := db.backend.(*pathdb.Database)
if !ok {
return errors.New("not supported")
}
return pdb.Enable(root)
} }
// Journal commits an entire diff hierarchy to disk into a single journal entry. // Journal commits an entire diff hierarchy to disk into a single journal entry.

View File

@ -128,7 +128,8 @@ type Database struct {
// readOnly is the flag whether the mutation is allowed to be applied. // readOnly is the flag whether the mutation is allowed to be applied.
// It will be set automatically when the database is journaled during // It will be set automatically when the database is journaled during
// the shutdown to reject all following unexpected mutations. // the shutdown to reject all following unexpected mutations.
readOnly bool // Indicator if database is opened in read only mode readOnly bool // Flag if database is opened in read only mode
waitSync bool // Flag if database is deactivated due to initial state sync
bufferSize int // Memory allowance (in bytes) for caching dirty nodes bufferSize int // Memory allowance (in bytes) for caching dirty nodes
config *Config // Configuration for database config *Config // Configuration for database
diskdb ethdb.Database // Persistent storage for matured trie nodes diskdb ethdb.Database // Persistent storage for matured trie nodes
@ -179,6 +180,12 @@ func New(diskdb ethdb.Database, config *Config) *Database {
log.Warn("Truncated extra state histories", "number", pruned) log.Warn("Truncated extra state histories", "number", pruned)
} }
} }
// Disable database in case node is still in the initial state sync stage.
if rawdb.ReadSnapSyncStatusFlag(diskdb) == rawdb.StateSyncRunning && !db.readOnly {
if err := db.Disable(); err != nil {
log.Crit("Failed to disable database", "err", err) // impossible to happen
}
}
log.Warn("Path-based state scheme is an experimental feature") log.Warn("Path-based state scheme is an experimental feature")
return db return db
} }
@ -204,9 +211,9 @@ func (db *Database) Update(root common.Hash, parentRoot common.Hash, block uint6
db.lock.Lock() db.lock.Lock()
defer db.lock.Unlock() defer db.lock.Unlock()
// Short circuit if the database is in read only mode. // Short circuit if the mutation is not allowed.
if db.readOnly { if err := db.modifyAllowed(); err != nil {
return errSnapshotReadOnly return err
} }
if err := db.tree.add(root, parentRoot, block, nodes, states); err != nil { if err := db.tree.add(root, parentRoot, block, nodes, states); err != nil {
return err return err
@ -227,45 +234,59 @@ func (db *Database) Commit(root common.Hash, report bool) error {
db.lock.Lock() db.lock.Lock()
defer db.lock.Unlock() defer db.lock.Unlock()
// Short circuit if the database is in read only mode. // Short circuit if the mutation is not allowed.
if db.readOnly { if err := db.modifyAllowed(); err != nil {
return errSnapshotReadOnly return err
} }
return db.tree.cap(root, 0) return db.tree.cap(root, 0)
} }
// Reset rebuilds the database with the specified state as the base. // Disable deactivates the database and invalidates all available state layers
// // as stale to prevent access to the persistent state, which is in the syncing
// - if target state is empty, clear the stored state and all layers on top // stage.
// - if target state is non-empty, ensure the stored state matches with it func (db *Database) Disable() error {
// and clear all other layers on top.
func (db *Database) Reset(root common.Hash) error {
db.lock.Lock() db.lock.Lock()
defer db.lock.Unlock() defer db.lock.Unlock()
// Short circuit if the database is in read only mode. // Short circuit if the database is in read only mode.
if db.readOnly { if db.readOnly {
return errSnapshotReadOnly return errDatabaseReadOnly
} }
batch := db.diskdb.NewBatch() // Prevent duplicated disable operation.
root = types.TrieRootHash(root) if db.waitSync {
if root == types.EmptyRootHash { log.Error("Reject duplicated disable operation")
// Empty state is requested as the target, nuke out return nil
// the root node and leave all others as dangling.
rawdb.DeleteAccountTrieNode(batch, nil)
} else {
// Ensure the requested state is existent before any
// action is applied.
_, hash := rawdb.ReadAccountTrieNode(db.diskdb, nil)
if hash != root {
return fmt.Errorf("state is mismatched, local: %x, target: %x", hash, root)
} }
} db.waitSync = true
// Mark the disk layer as stale before applying any mutation.
// Mark the disk layer as stale to prevent access to persistent state.
db.tree.bottom().markStale() db.tree.bottom().markStale()
// Write the initial sync flag to persist it across restarts.
rawdb.WriteSnapSyncStatusFlag(db.diskdb, rawdb.StateSyncRunning)
log.Info("Disabled trie database due to state sync")
return nil
}
// Enable activates database and resets the state tree with the provided persistent
// state root once the state sync is finished.
func (db *Database) Enable(root common.Hash) error {
db.lock.Lock()
defer db.lock.Unlock()
// Short circuit if the database is in read only mode.
if db.readOnly {
return errDatabaseReadOnly
}
// Ensure the provided state root matches the stored one.
root = types.TrieRootHash(root)
_, stored := rawdb.ReadAccountTrieNode(db.diskdb, nil)
if stored != root {
return fmt.Errorf("state root mismatch: stored %x, synced %x", stored, root)
}
// Drop the stale state journal in persistent database and // Drop the stale state journal in persistent database and
// reset the persistent state id back to zero. // reset the persistent state id back to zero.
batch := db.diskdb.NewBatch()
rawdb.DeleteTrieJournal(batch) rawdb.DeleteTrieJournal(batch)
rawdb.WritePersistentStateID(batch, 0) rawdb.WritePersistentStateID(batch, 0)
if err := batch.Write(); err != nil { if err := batch.Write(); err != nil {
@ -282,8 +303,11 @@ func (db *Database) Reset(root common.Hash) error {
} }
// Re-construct a new disk layer backed by persistent state // Re-construct a new disk layer backed by persistent state
// with **empty clean cache and node buffer**. // with **empty clean cache and node buffer**.
dl := newDiskLayer(root, 0, db, nil, newNodeBuffer(db.bufferSize, nil, 0)) db.tree.reset(newDiskLayer(root, 0, db, nil, newNodeBuffer(db.bufferSize, nil, 0)))
db.tree.reset(dl)
// Re-enable the database as the final step.
db.waitSync = false
rawdb.WriteSnapSyncStatusFlag(db.diskdb, rawdb.StateSyncFinished)
log.Info("Rebuilt trie database", "root", root) log.Info("Rebuilt trie database", "root", root)
return nil return nil
} }
@ -296,7 +320,10 @@ func (db *Database) Recover(root common.Hash, loader triestate.TrieLoader) error
defer db.lock.Unlock() defer db.lock.Unlock()
// Short circuit if rollback operation is not supported. // Short circuit if rollback operation is not supported.
if db.readOnly || db.freezer == nil { if err := db.modifyAllowed(); err != nil {
return err
}
if db.freezer == nil {
return errors.New("state rollback is non-supported") return errors.New("state rollback is non-supported")
} }
// Short circuit if the target state is not recoverable. // Short circuit if the target state is not recoverable.
@ -424,3 +451,15 @@ func (db *Database) SetBufferSize(size int) error {
func (db *Database) Scheme() string { func (db *Database) Scheme() string {
return rawdb.PathScheme return rawdb.PathScheme
} }
// modifyAllowed returns the indicator if mutation is allowed. This function
// assumes the db.lock is already held.
func (db *Database) modifyAllowed() error {
if db.readOnly {
return errDatabaseReadOnly
}
if db.waitSync {
return errDatabaseWaitSync
}
return nil
}

View File

@ -439,38 +439,39 @@ func TestDatabaseRecoverable(t *testing.T) {
} }
} }
func TestReset(t *testing.T) { func TestDisable(t *testing.T) {
var ( tester := newTester(t)
tester = newTester(t)
index = tester.bottomIndex()
)
defer tester.release() defer tester.release()
// Reset database to unknown target, should reject it _, stored := rawdb.ReadAccountTrieNode(tester.db.diskdb, nil)
if err := tester.db.Reset(testutil.RandomHash()); err == nil { if err := tester.db.Disable(); err != nil {
t.Fatal("Failed to reject invalid reset") t.Fatal("Failed to deactivate database")
} }
// Reset database to state persisted in the disk if err := tester.db.Enable(types.EmptyRootHash); err == nil {
if err := tester.db.Reset(types.EmptyRootHash); err != nil { t.Fatalf("Invalid activation should be rejected")
t.Fatalf("Failed to reset database %v", err)
} }
if err := tester.db.Enable(stored); err != nil {
t.Fatal("Failed to activate database")
}
// Ensure journal is deleted from disk // Ensure journal is deleted from disk
if blob := rawdb.ReadTrieJournal(tester.db.diskdb); len(blob) != 0 { if blob := rawdb.ReadTrieJournal(tester.db.diskdb); len(blob) != 0 {
t.Fatal("Failed to clean journal") t.Fatal("Failed to clean journal")
} }
// Ensure all trie histories are removed // Ensure all trie histories are removed
for i := 0; i <= index; i++ { n, err := tester.db.freezer.Ancients()
_, err := readHistory(tester.db.freezer, uint64(i+1)) if err != nil {
if err == nil { t.Fatal("Failed to clean state history")
t.Fatalf("Failed to clean state history, index %d", i+1)
} }
if n != 0 {
t.Fatal("Failed to clean state history")
} }
// Verify layer tree structure, single disk layer is expected // Verify layer tree structure, single disk layer is expected
if tester.db.tree.len() != 1 { if tester.db.tree.len() != 1 {
t.Fatalf("Extra layer kept %d", tester.db.tree.len()) t.Fatalf("Extra layer kept %d", tester.db.tree.len())
} }
if tester.db.tree.bottom().rootHash() != types.EmptyRootHash { if tester.db.tree.bottom().rootHash() != stored {
t.Fatalf("Root hash is not matched exp %x got %x", types.EmptyRootHash, tester.db.tree.bottom().rootHash()) t.Fatalf("Root hash is not matched exp %x got %x", stored, tester.db.tree.bottom().rootHash())
} }
} }

View File

@ -25,9 +25,13 @@ import (
) )
var ( var (
// errSnapshotReadOnly is returned if the database is opened in read only mode // errDatabaseReadOnly is returned if the database is opened in read only mode
// and mutation is requested. // to prevent any mutation.
errSnapshotReadOnly = errors.New("read only") errDatabaseReadOnly = errors.New("read only")
// errDatabaseWaitSync is returned if the initial state sync is not completed
// yet and database is disabled to prevent accessing state.
errDatabaseWaitSync = errors.New("waiting for sync")
// errSnapshotStale is returned from data accessors if the underlying layer // errSnapshotStale is returned from data accessors if the underlying layer
// layer had been invalidated due to the chain progressing forward far enough // layer had been invalidated due to the chain progressing forward far enough

View File

@ -356,7 +356,7 @@ func (db *Database) Journal(root common.Hash) error {
// Short circuit if the database is in read only mode. // Short circuit if the database is in read only mode.
if db.readOnly { if db.readOnly {
return errSnapshotReadOnly return errDatabaseReadOnly
} }
// Firstly write out the metadata of journal // Firstly write out the metadata of journal
journal := new(bytes.Buffer) journal := new(bytes.Buffer)