trie/triedb/pathdb: improve dirty node flushing trigger (#28426)

* trie/triedb/pathdb: improve dirty node flushing trigger

* trie/triedb/pathdb: add tests

* trie/triedb/pathdb: address comment
This commit is contained in:
rjl493456442 2023-10-31 17:39:55 +08:00 committed by GitHub
parent 233db64cc1
commit ea2e66a58e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 93 additions and 42 deletions

View File

@ -96,11 +96,15 @@ type tester struct {
snapStorages map[common.Hash]map[common.Hash]map[common.Hash][]byte snapStorages map[common.Hash]map[common.Hash]map[common.Hash][]byte
} }
func newTester(t *testing.T) *tester { func newTester(t *testing.T, historyLimit uint64) *tester {
var ( var (
disk, _ = rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), t.TempDir(), "", false) disk, _ = rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), t.TempDir(), "", false)
db = New(disk, &Config{CleanCacheSize: 256 * 1024, DirtyCacheSize: 256 * 1024}) db = New(disk, &Config{
obj = &tester{ StateHistory: historyLimit,
CleanCacheSize: 256 * 1024,
DirtyCacheSize: 256 * 1024,
})
obj = &tester{
db: db, db: db,
preimages: make(map[common.Hash]common.Address), preimages: make(map[common.Hash]common.Address),
accounts: make(map[common.Hash][]byte), accounts: make(map[common.Hash][]byte),
@ -376,7 +380,7 @@ func (t *tester) bottomIndex() int {
func TestDatabaseRollback(t *testing.T) { func TestDatabaseRollback(t *testing.T) {
// Verify state histories // Verify state histories
tester := newTester(t) tester := newTester(t, 0)
defer tester.release() defer tester.release()
if err := tester.verifyHistory(); err != nil { if err := tester.verifyHistory(); err != nil {
@ -402,7 +406,7 @@ func TestDatabaseRollback(t *testing.T) {
func TestDatabaseRecoverable(t *testing.T) { func TestDatabaseRecoverable(t *testing.T) {
var ( var (
tester = newTester(t) tester = newTester(t, 0)
index = tester.bottomIndex() index = tester.bottomIndex()
) )
defer tester.release() defer tester.release()
@ -440,7 +444,7 @@ func TestDatabaseRecoverable(t *testing.T) {
} }
func TestDisable(t *testing.T) { func TestDisable(t *testing.T) {
tester := newTester(t) tester := newTester(t, 0)
defer tester.release() defer tester.release()
_, stored := rawdb.ReadAccountTrieNode(tester.db.diskdb, nil) _, stored := rawdb.ReadAccountTrieNode(tester.db.diskdb, nil)
@ -476,7 +480,7 @@ func TestDisable(t *testing.T) {
} }
func TestCommit(t *testing.T) { func TestCommit(t *testing.T) {
tester := newTester(t) tester := newTester(t, 0)
defer tester.release() defer tester.release()
if err := tester.db.Commit(tester.lastHash(), false); err != nil { if err := tester.db.Commit(tester.lastHash(), false); err != nil {
@ -500,7 +504,7 @@ func TestCommit(t *testing.T) {
} }
func TestJournal(t *testing.T) { func TestJournal(t *testing.T) {
tester := newTester(t) tester := newTester(t, 0)
defer tester.release() defer tester.release()
if err := tester.db.Journal(tester.lastHash()); err != nil { if err := tester.db.Journal(tester.lastHash()); err != nil {
@ -524,7 +528,7 @@ func TestJournal(t *testing.T) {
} }
func TestCorruptedJournal(t *testing.T) { func TestCorruptedJournal(t *testing.T) {
tester := newTester(t) tester := newTester(t, 0)
defer tester.release() defer tester.release()
if err := tester.db.Journal(tester.lastHash()); err != nil { if err := tester.db.Journal(tester.lastHash()); err != nil {
@ -553,6 +557,35 @@ func TestCorruptedJournal(t *testing.T) {
} }
} }
// TestTailTruncateHistory function is designed to test a specific edge case where,
// when history objects are removed from the end, it should trigger a state flush
// if the ID of the new tail object is even higher than the persisted state ID.
//
// For example, let's say the ID of the persistent state is 10, and the current
// history objects range from ID(5) to ID(15). As we accumulate six more objects,
// the history will expand to cover ID(11) to ID(21). ID(11) then becomes the
// oldest history object, and its ID is even higher than the stored state.
//
// In this scenario, it is mandatory to update the persistent state before
// truncating the tail histories. This ensures that the ID of the persistent state
// always falls within the range of [oldest-history-id, latest-history-id].
func TestTailTruncateHistory(t *testing.T) {
tester := newTester(t, 10)
defer tester.release()
tester.db.Close()
tester.db = New(tester.db.diskdb, &Config{StateHistory: 10})
head, err := tester.db.freezer.Ancients()
if err != nil {
t.Fatalf("Failed to obtain freezer head")
}
stored := rawdb.ReadPersistentStateID(tester.db.diskdb)
if head != stored {
t.Fatalf("Failed to truncate excess history object above, stored: %d, head: %d", stored, head)
}
}
// copyAccounts returns a deep-copied account set of the provided one. // copyAccounts returns a deep-copied account set of the provided one.
func copyAccounts(set map[common.Hash][]byte) map[common.Hash][]byte { func copyAccounts(set map[common.Hash][]byte) map[common.Hash][]byte {
copied := make(map[common.Hash][]byte, len(set)) copied := make(map[common.Hash][]byte, len(set))

View File

@ -172,37 +172,65 @@ func (dl *diskLayer) commit(bottom *diffLayer, force bool) (*diskLayer, error) {
dl.lock.Lock() dl.lock.Lock()
defer dl.lock.Unlock() defer dl.lock.Unlock()
// Construct and store the state history first. If crash happens // Construct and store the state history first. If crash happens after storing
// after storing the state history but without flushing the // the state history but without flushing the corresponding states(journal),
// corresponding states(journal), the stored state history will // the stored state history will be truncated from head in the next restart.
// be truncated in the next restart. var (
overflow bool
oldest uint64
)
if dl.db.freezer != nil { if dl.db.freezer != nil {
err := writeHistory(dl.db.diskdb, dl.db.freezer, bottom, dl.db.config.StateHistory) err := writeHistory(dl.db.freezer, bottom)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// Determine if the persisted history object has exceeded the configured
// limitation, set the overflow as true if so.
tail, err := dl.db.freezer.Tail()
if err != nil {
return nil, err
}
limit := dl.db.config.StateHistory
if limit != 0 && bottom.stateID()-tail > limit {
overflow = true
oldest = bottom.stateID() - limit + 1 // track the id of history **after truncation**
}
} }
// Mark the diskLayer as stale before applying any mutations on top. // Mark the diskLayer as stale before applying any mutations on top.
dl.stale = true dl.stale = true
// Store the root->id lookup afterwards. All stored lookups are // Store the root->id lookup afterwards. All stored lookups are identified
// identified by the **unique** state root. It's impossible that // by the **unique** state root. It's impossible that in the same chain
// in the same chain blocks are not adjacent but have the same // blocks are not adjacent but have the same root.
// root.
if dl.id == 0 { if dl.id == 0 {
rawdb.WriteStateID(dl.db.diskdb, dl.root, 0) rawdb.WriteStateID(dl.db.diskdb, dl.root, 0)
} }
rawdb.WriteStateID(dl.db.diskdb, bottom.rootHash(), bottom.stateID()) rawdb.WriteStateID(dl.db.diskdb, bottom.rootHash(), bottom.stateID())
// Construct a new disk layer by merging the nodes from the provided // Construct a new disk layer by merging the nodes from the provided diff
// diff layer, and flush the content in disk layer if there are too // layer, and flush the content in disk layer if there are too many nodes
// many nodes cached. The clean cache is inherited from the original // cached. The clean cache is inherited from the original disk layer.
// disk layer for reusing.
ndl := newDiskLayer(bottom.root, bottom.stateID(), dl.db, dl.cleans, dl.buffer.commit(bottom.nodes)) ndl := newDiskLayer(bottom.root, bottom.stateID(), dl.db, dl.cleans, dl.buffer.commit(bottom.nodes))
err := ndl.buffer.flush(ndl.db.diskdb, ndl.cleans, ndl.id, force)
if err != nil { // In a unique scenario where the ID of the oldest history object (after tail
// truncation) surpasses the persisted state ID, we take the necessary action
// of forcibly committing the cached dirty nodes to ensure that the persisted
// state ID remains higher.
if !force && rawdb.ReadPersistentStateID(dl.db.diskdb) < oldest {
force = true
}
if err := ndl.buffer.flush(ndl.db.diskdb, ndl.cleans, ndl.id, force); err != nil {
return nil, err return nil, err
} }
// To remove outdated history objects from the end, we set the 'tail' parameter
// to 'oldest-1' due to the offset between the freezer index and the history ID.
if overflow {
pruned, err := truncateFromTail(ndl.db.diskdb, ndl.db.freezer, oldest-1)
if err != nil {
return nil, err
}
log.Debug("Pruned state history", "items", pruned, "tailid", oldest)
}
return ndl, nil return ndl, nil
} }

View File

@ -512,38 +512,28 @@ func readHistory(freezer *rawdb.ResettableFreezer, id uint64) (*history, error)
return &dec, nil return &dec, nil
} }
// writeHistory writes the state history with provided state set. After // writeHistory persists the state history with the provided state set.
// storing the corresponding state history, it will also prune the stale func writeHistory(freezer *rawdb.ResettableFreezer, dl *diffLayer) error {
// histories from the disk with the given threshold.
func writeHistory(db ethdb.KeyValueStore, freezer *rawdb.ResettableFreezer, dl *diffLayer, limit uint64) error {
// Short circuit if state set is not available. // Short circuit if state set is not available.
if dl.states == nil { if dl.states == nil {
return errors.New("state change set is not available") return errors.New("state change set is not available")
} }
var ( var (
err error start = time.Now()
n int history = newHistory(dl.rootHash(), dl.parentLayer().rootHash(), dl.block, dl.states)
start = time.Now()
h = newHistory(dl.rootHash(), dl.parentLayer().rootHash(), dl.block, dl.states)
) )
accountData, storageData, accountIndex, storageIndex := h.encode() accountData, storageData, accountIndex, storageIndex := history.encode()
dataSize := common.StorageSize(len(accountData) + len(storageData)) dataSize := common.StorageSize(len(accountData) + len(storageData))
indexSize := common.StorageSize(len(accountIndex) + len(storageIndex)) indexSize := common.StorageSize(len(accountIndex) + len(storageIndex))
// Write history data into five freezer table respectively. // Write history data into five freezer table respectively.
rawdb.WriteStateHistory(freezer, dl.stateID(), h.meta.encode(), accountIndex, storageIndex, accountData, storageData) rawdb.WriteStateHistory(freezer, dl.stateID(), history.meta.encode(), accountIndex, storageIndex, accountData, storageData)
// Prune stale state histories based on the config.
if limit != 0 && dl.stateID() > limit {
n, err = truncateFromTail(db, freezer, dl.stateID()-limit)
if err != nil {
return err
}
}
historyDataBytesMeter.Mark(int64(dataSize)) historyDataBytesMeter.Mark(int64(dataSize))
historyIndexBytesMeter.Mark(int64(indexSize)) historyIndexBytesMeter.Mark(int64(indexSize))
historyBuildTimeMeter.UpdateSince(start) historyBuildTimeMeter.UpdateSince(start)
log.Debug("Stored state history", "id", dl.stateID(), "block", dl.block, "data", dataSize, "index", indexSize, "pruned", n, "elapsed", common.PrettyDuration(time.Since(start))) log.Debug("Stored state history", "id", dl.stateID(), "block", dl.block, "data", dataSize, "index", indexSize, "elapsed", common.PrettyDuration(time.Since(start)))
return nil return nil
} }