forked from cerc-io/plugeth
core, ethdb, trie: mode dirty data to clean cache on flush (#19307)
This PR is a more advanced form of the dirty-to-clean cacher (#18995), where we reuse previous database write batches as datasets to uncache, saving a dirty-trie-iteration and a dirty-trie-rlp-reencoding per block.
This commit is contained in:
parent
df717abc99
commit
59e1953246
@ -301,7 +301,7 @@ func (bc *BlockChain) SetHead(head uint64) error {
|
|||||||
defer bc.chainmu.Unlock()
|
defer bc.chainmu.Unlock()
|
||||||
|
|
||||||
// Rewind the header chain, deleting all block bodies until then
|
// Rewind the header chain, deleting all block bodies until then
|
||||||
delFn := func(db ethdb.Deleter, hash common.Hash, num uint64) {
|
delFn := func(db ethdb.Writer, hash common.Hash, num uint64) {
|
||||||
rawdb.DeleteBody(db, hash, num)
|
rawdb.DeleteBody(db, hash, num)
|
||||||
}
|
}
|
||||||
bc.hc.SetHead(head, delFn)
|
bc.hc.SetHead(head, delFn)
|
||||||
|
@ -455,7 +455,7 @@ func (hc *HeaderChain) SetCurrentHeader(head *types.Header) {
|
|||||||
|
|
||||||
// DeleteCallback is a callback function that is called by SetHead before
|
// DeleteCallback is a callback function that is called by SetHead before
|
||||||
// each header is deleted.
|
// each header is deleted.
|
||||||
type DeleteCallback func(ethdb.Deleter, common.Hash, uint64)
|
type DeleteCallback func(ethdb.Writer, common.Hash, uint64)
|
||||||
|
|
||||||
// SetHead rewinds the local chain to a new head. Everything above the new head
|
// SetHead rewinds the local chain to a new head. Everything above the new head
|
||||||
// will be deleted and the new one set.
|
// will be deleted and the new one set.
|
||||||
|
@ -45,7 +45,7 @@ func WriteCanonicalHash(db ethdb.Writer, hash common.Hash, number uint64) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// DeleteCanonicalHash removes the number to hash canonical mapping.
|
// DeleteCanonicalHash removes the number to hash canonical mapping.
|
||||||
func DeleteCanonicalHash(db ethdb.Deleter, number uint64) {
|
func DeleteCanonicalHash(db ethdb.Writer, number uint64) {
|
||||||
if err := db.Delete(headerHashKey(number)); err != nil {
|
if err := db.Delete(headerHashKey(number)); err != nil {
|
||||||
log.Crit("Failed to delete number to hash mapping", "err", err)
|
log.Crit("Failed to delete number to hash mapping", "err", err)
|
||||||
}
|
}
|
||||||
@ -180,7 +180,7 @@ func WriteHeader(db ethdb.Writer, header *types.Header) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// DeleteHeader removes all block header data associated with a hash.
|
// DeleteHeader removes all block header data associated with a hash.
|
||||||
func DeleteHeader(db ethdb.Deleter, hash common.Hash, number uint64) {
|
func DeleteHeader(db ethdb.Writer, hash common.Hash, number uint64) {
|
||||||
deleteHeaderWithoutNumber(db, hash, number)
|
deleteHeaderWithoutNumber(db, hash, number)
|
||||||
if err := db.Delete(headerNumberKey(hash)); err != nil {
|
if err := db.Delete(headerNumberKey(hash)); err != nil {
|
||||||
log.Crit("Failed to delete hash to number mapping", "err", err)
|
log.Crit("Failed to delete hash to number mapping", "err", err)
|
||||||
@ -189,7 +189,7 @@ func DeleteHeader(db ethdb.Deleter, hash common.Hash, number uint64) {
|
|||||||
|
|
||||||
// deleteHeaderWithoutNumber removes only the block header but does not remove
|
// deleteHeaderWithoutNumber removes only the block header but does not remove
|
||||||
// the hash to number mapping.
|
// the hash to number mapping.
|
||||||
func deleteHeaderWithoutNumber(db ethdb.Deleter, hash common.Hash, number uint64) {
|
func deleteHeaderWithoutNumber(db ethdb.Writer, hash common.Hash, number uint64) {
|
||||||
if err := db.Delete(headerKey(number, hash)); err != nil {
|
if err := db.Delete(headerKey(number, hash)); err != nil {
|
||||||
log.Crit("Failed to delete header", "err", err)
|
log.Crit("Failed to delete header", "err", err)
|
||||||
}
|
}
|
||||||
@ -240,7 +240,7 @@ func WriteBody(db ethdb.Writer, hash common.Hash, number uint64, body *types.Bod
|
|||||||
}
|
}
|
||||||
|
|
||||||
// DeleteBody removes all block body data associated with a hash.
|
// DeleteBody removes all block body data associated with a hash.
|
||||||
func DeleteBody(db ethdb.Deleter, hash common.Hash, number uint64) {
|
func DeleteBody(db ethdb.Writer, hash common.Hash, number uint64) {
|
||||||
if err := db.Delete(blockBodyKey(number, hash)); err != nil {
|
if err := db.Delete(blockBodyKey(number, hash)); err != nil {
|
||||||
log.Crit("Failed to delete block body", "err", err)
|
log.Crit("Failed to delete block body", "err", err)
|
||||||
}
|
}
|
||||||
@ -278,7 +278,7 @@ func WriteTd(db ethdb.Writer, hash common.Hash, number uint64, td *big.Int) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// DeleteTd removes all block total difficulty data associated with a hash.
|
// DeleteTd removes all block total difficulty data associated with a hash.
|
||||||
func DeleteTd(db ethdb.Deleter, hash common.Hash, number uint64) {
|
func DeleteTd(db ethdb.Writer, hash common.Hash, number uint64) {
|
||||||
if err := db.Delete(headerTDKey(number, hash)); err != nil {
|
if err := db.Delete(headerTDKey(number, hash)); err != nil {
|
||||||
log.Crit("Failed to delete block total difficulty", "err", err)
|
log.Crit("Failed to delete block total difficulty", "err", err)
|
||||||
}
|
}
|
||||||
@ -347,7 +347,7 @@ func WriteReceipts(db ethdb.Writer, hash common.Hash, number uint64, receipts ty
|
|||||||
}
|
}
|
||||||
|
|
||||||
// DeleteReceipts removes all receipt data associated with a block hash.
|
// DeleteReceipts removes all receipt data associated with a block hash.
|
||||||
func DeleteReceipts(db ethdb.Deleter, hash common.Hash, number uint64) {
|
func DeleteReceipts(db ethdb.Writer, hash common.Hash, number uint64) {
|
||||||
if err := db.Delete(blockReceiptsKey(number, hash)); err != nil {
|
if err := db.Delete(blockReceiptsKey(number, hash)); err != nil {
|
||||||
log.Crit("Failed to delete block receipts", "err", err)
|
log.Crit("Failed to delete block receipts", "err", err)
|
||||||
}
|
}
|
||||||
@ -378,7 +378,7 @@ func WriteBlock(db ethdb.Writer, block *types.Block) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// DeleteBlock removes all block data associated with a hash.
|
// DeleteBlock removes all block data associated with a hash.
|
||||||
func DeleteBlock(db ethdb.Deleter, hash common.Hash, number uint64) {
|
func DeleteBlock(db ethdb.Writer, hash common.Hash, number uint64) {
|
||||||
DeleteReceipts(db, hash, number)
|
DeleteReceipts(db, hash, number)
|
||||||
DeleteHeader(db, hash, number)
|
DeleteHeader(db, hash, number)
|
||||||
DeleteBody(db, hash, number)
|
DeleteBody(db, hash, number)
|
||||||
@ -387,7 +387,7 @@ func DeleteBlock(db ethdb.Deleter, hash common.Hash, number uint64) {
|
|||||||
|
|
||||||
// deleteBlockWithoutNumber removes all block data associated with a hash, except
|
// deleteBlockWithoutNumber removes all block data associated with a hash, except
|
||||||
// the hash to number mapping.
|
// the hash to number mapping.
|
||||||
func deleteBlockWithoutNumber(db ethdb.Deleter, hash common.Hash, number uint64) {
|
func deleteBlockWithoutNumber(db ethdb.Writer, hash common.Hash, number uint64) {
|
||||||
DeleteReceipts(db, hash, number)
|
DeleteReceipts(db, hash, number)
|
||||||
deleteHeaderWithoutNumber(db, hash, number)
|
deleteHeaderWithoutNumber(db, hash, number)
|
||||||
DeleteBody(db, hash, number)
|
DeleteBody(db, hash, number)
|
||||||
|
@ -54,7 +54,7 @@ func WriteTxLookupEntries(db ethdb.Writer, block *types.Block) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// DeleteTxLookupEntry removes all transaction data associated with a hash.
|
// DeleteTxLookupEntry removes all transaction data associated with a hash.
|
||||||
func DeleteTxLookupEntry(db ethdb.Deleter, hash common.Hash) {
|
func DeleteTxLookupEntry(db ethdb.Writer, hash common.Hash) {
|
||||||
db.Delete(txLookupKey(hash))
|
db.Delete(txLookupKey(hash))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -148,3 +148,8 @@ func (b *tableBatch) Write() error {
|
|||||||
func (b *tableBatch) Reset() {
|
func (b *tableBatch) Reset() {
|
||||||
b.batch.Reset()
|
b.batch.Reset()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Replay replays the batch contents.
|
||||||
|
func (b *tableBatch) Replay(w ethdb.Writer) error {
|
||||||
|
return b.batch.Replay(w)
|
||||||
|
}
|
||||||
|
@ -53,6 +53,10 @@ func (n *proofList) Put(key []byte, value []byte) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (n *proofList) Delete(key []byte) error {
|
||||||
|
panic("not supported")
|
||||||
|
}
|
||||||
|
|
||||||
// StateDBs within the ethereum protocol are used to store anything
|
// StateDBs within the ethereum protocol are used to store anything
|
||||||
// within the merkle trie. StateDBs take care of caching and storing
|
// within the merkle trie. StateDBs take care of caching and storing
|
||||||
// nested states. It's the general query interface to retrieve:
|
// nested states. It's the general query interface to retrieve:
|
||||||
|
@ -24,7 +24,6 @@ const IdealBatchSize = 100 * 1024
|
|||||||
// when Write is called. A batch cannot be used concurrently.
|
// when Write is called. A batch cannot be used concurrently.
|
||||||
type Batch interface {
|
type Batch interface {
|
||||||
Writer
|
Writer
|
||||||
Deleter
|
|
||||||
|
|
||||||
// ValueSize retrieves the amount of data queued up for writing.
|
// ValueSize retrieves the amount of data queued up for writing.
|
||||||
ValueSize() int
|
ValueSize() int
|
||||||
@ -32,8 +31,11 @@ type Batch interface {
|
|||||||
// Write flushes any accumulated data to disk.
|
// Write flushes any accumulated data to disk.
|
||||||
Write() error
|
Write() error
|
||||||
|
|
||||||
// Reset resets the batch for reuse
|
// Reset resets the batch for reuse.
|
||||||
Reset()
|
Reset()
|
||||||
|
|
||||||
|
// Replay replays the batch contents.
|
||||||
|
Replay(w Writer) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// Batcher wraps the NewBatch method of a backing data store.
|
// Batcher wraps the NewBatch method of a backing data store.
|
||||||
|
@ -14,7 +14,7 @@
|
|||||||
// You should have received a copy of the GNU Lesser General Public License
|
// You should have received a copy of the GNU Lesser General Public License
|
||||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
// Package database defines the interfaces for an Ethereum data store.
|
// Package ethdb defines the interfaces for an Ethereum data store.
|
||||||
package ethdb
|
package ethdb
|
||||||
|
|
||||||
import "io"
|
import "io"
|
||||||
@ -32,10 +32,7 @@ type Reader interface {
|
|||||||
type Writer interface {
|
type Writer interface {
|
||||||
// Put inserts the given value into the key-value data store.
|
// Put inserts the given value into the key-value data store.
|
||||||
Put(key []byte, value []byte) error
|
Put(key []byte, value []byte) error
|
||||||
}
|
|
||||||
|
|
||||||
// Deleter wraps the Delete method of a backing data store.
|
|
||||||
type Deleter interface {
|
|
||||||
// Delete removes the key from the key-value data store.
|
// Delete removes the key from the key-value data store.
|
||||||
Delete(key []byte) error
|
Delete(key []byte) error
|
||||||
}
|
}
|
||||||
@ -63,7 +60,6 @@ type Compacter interface {
|
|||||||
type KeyValueStore interface {
|
type KeyValueStore interface {
|
||||||
Reader
|
Reader
|
||||||
Writer
|
Writer
|
||||||
Deleter
|
|
||||||
Batcher
|
Batcher
|
||||||
Iteratee
|
Iteratee
|
||||||
Stater
|
Stater
|
||||||
@ -76,7 +72,6 @@ type KeyValueStore interface {
|
|||||||
type Database interface {
|
type Database interface {
|
||||||
Reader
|
Reader
|
||||||
Writer
|
Writer
|
||||||
Deleter
|
|
||||||
Batcher
|
Batcher
|
||||||
Iteratee
|
Iteratee
|
||||||
Stater
|
Stater
|
||||||
|
@ -416,3 +416,32 @@ func (b *batch) Reset() {
|
|||||||
b.b.Reset()
|
b.b.Reset()
|
||||||
b.size = 0
|
b.size = 0
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Replay replays the batch contents.
|
||||||
|
func (b *batch) Replay(w ethdb.Writer) error {
|
||||||
|
return b.b.Replay(&replayer{writer: w})
|
||||||
|
}
|
||||||
|
|
||||||
|
// replayer is a small wrapper to implement the correct replay methods.
|
||||||
|
type replayer struct {
|
||||||
|
writer ethdb.Writer
|
||||||
|
failure error
|
||||||
|
}
|
||||||
|
|
||||||
|
// Put inserts the given value into the key-value data store.
|
||||||
|
func (r *replayer) Put(key, value []byte) {
|
||||||
|
// If the replay already failed, stop executing ops
|
||||||
|
if r.failure != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
r.failure = r.writer.Put(key, value)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete removes the key from the key-value data store.
|
||||||
|
func (r *replayer) Delete(key []byte) {
|
||||||
|
// If the replay already failed, stop executing ops
|
||||||
|
if r.failure != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
r.failure = r.writer.Delete(key)
|
||||||
|
}
|
||||||
|
@ -240,6 +240,22 @@ func (b *batch) Reset() {
|
|||||||
b.size = 0
|
b.size = 0
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Replay replays the batch contents.
|
||||||
|
func (b *batch) Replay(w ethdb.Writer) error {
|
||||||
|
for _, keyvalue := range b.writes {
|
||||||
|
if keyvalue.delete {
|
||||||
|
if err := w.Delete(keyvalue.key); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if err := w.Put(keyvalue.key, keyvalue.value); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// iterator can walk over the (potentially partial) keyspace of a memory key
|
// iterator can walk over the (potentially partial) keyspace of a memory key
|
||||||
// value store. Internally it is a deep copy of the entire iterated state,
|
// value store. Internally it is a deep copy of the entire iterated state,
|
||||||
// sorted by keys.
|
// sorted by keys.
|
||||||
|
@ -60,6 +60,15 @@ func (db *NodeSet) Put(key []byte, value []byte) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Delete removes a node from the set
|
||||||
|
func (db *NodeSet) Delete(key []byte) error {
|
||||||
|
db.lock.Lock()
|
||||||
|
defer db.lock.Unlock()
|
||||||
|
|
||||||
|
delete(db.nodes, string(key))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// Get returns a stored node
|
// Get returns a stored node
|
||||||
func (db *NodeSet) Get(key []byte) ([]byte, error) {
|
func (db *NodeSet) Get(key []byte) ([]byte, error) {
|
||||||
db.lock.RLock()
|
db.lock.RLock()
|
||||||
@ -138,6 +147,11 @@ func (n *NodeList) Put(key []byte, value []byte) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Delete panics as there's no reason to remove a node from the list.
|
||||||
|
func (n *NodeList) Delete(key []byte) error {
|
||||||
|
panic("not supported")
|
||||||
|
}
|
||||||
|
|
||||||
// DataSize returns the aggregated data size of nodes in the list
|
// DataSize returns the aggregated data size of nodes in the list
|
||||||
func (n NodeList) DataSize() int {
|
func (n NodeList) DataSize() int {
|
||||||
var size int
|
var size int
|
||||||
|
120
trie/database.go
120
trie/database.go
@ -59,6 +59,11 @@ const secureKeyLength = 11 + 32
|
|||||||
// Database is an intermediate write layer between the trie data structures and
|
// Database is an intermediate write layer between the trie data structures and
|
||||||
// the disk database. The aim is to accumulate trie writes in-memory and only
|
// the disk database. The aim is to accumulate trie writes in-memory and only
|
||||||
// periodically flush a couple tries to disk, garbage collecting the remainder.
|
// periodically flush a couple tries to disk, garbage collecting the remainder.
|
||||||
|
//
|
||||||
|
// Note, the trie Database is **not** thread safe in its mutations, but it **is**
|
||||||
|
// thread safe in providing individual, independent node access. The rationale
|
||||||
|
// behind this split design is to provide read access to RPC handlers and sync
|
||||||
|
// servers even while the trie is executing expensive garbage collection.
|
||||||
type Database struct {
|
type Database struct {
|
||||||
diskdb ethdb.KeyValueStore // Persistent storage for matured trie nodes
|
diskdb ethdb.KeyValueStore // Persistent storage for matured trie nodes
|
||||||
|
|
||||||
@ -465,8 +470,8 @@ func (db *Database) Nodes() []common.Hash {
|
|||||||
|
|
||||||
// Reference adds a new reference from a parent node to a child node.
|
// Reference adds a new reference from a parent node to a child node.
|
||||||
func (db *Database) Reference(child common.Hash, parent common.Hash) {
|
func (db *Database) Reference(child common.Hash, parent common.Hash) {
|
||||||
db.lock.RLock()
|
db.lock.Lock()
|
||||||
defer db.lock.RUnlock()
|
defer db.lock.Unlock()
|
||||||
|
|
||||||
db.reference(child, parent)
|
db.reference(child, parent)
|
||||||
}
|
}
|
||||||
@ -561,13 +566,14 @@ func (db *Database) dereference(child common.Hash, parent common.Hash) {
|
|||||||
|
|
||||||
// Cap iteratively flushes old but still referenced trie nodes until the total
|
// Cap iteratively flushes old but still referenced trie nodes until the total
|
||||||
// memory usage goes below the given threshold.
|
// memory usage goes below the given threshold.
|
||||||
|
//
|
||||||
|
// Note, this method is a non-synchronized mutator. It is unsafe to call this
|
||||||
|
// concurrently with other mutators.
|
||||||
func (db *Database) Cap(limit common.StorageSize) error {
|
func (db *Database) Cap(limit common.StorageSize) error {
|
||||||
// Create a database batch to flush persistent data out. It is important that
|
// Create a database batch to flush persistent data out. It is important that
|
||||||
// outside code doesn't see an inconsistent state (referenced data removed from
|
// outside code doesn't see an inconsistent state (referenced data removed from
|
||||||
// memory cache during commit but not yet in persistent storage). This is ensured
|
// memory cache during commit but not yet in persistent storage). This is ensured
|
||||||
// by only uncaching existing data when the database write finalizes.
|
// by only uncaching existing data when the database write finalizes.
|
||||||
db.lock.RLock()
|
|
||||||
|
|
||||||
nodes, storage, start := len(db.dirties), db.dirtiesSize, time.Now()
|
nodes, storage, start := len(db.dirties), db.dirtiesSize, time.Now()
|
||||||
batch := db.diskdb.NewBatch()
|
batch := db.diskdb.NewBatch()
|
||||||
|
|
||||||
@ -583,12 +589,10 @@ func (db *Database) Cap(limit common.StorageSize) error {
|
|||||||
for hash, preimage := range db.preimages {
|
for hash, preimage := range db.preimages {
|
||||||
if err := batch.Put(db.secureKey(hash[:]), preimage); err != nil {
|
if err := batch.Put(db.secureKey(hash[:]), preimage); err != nil {
|
||||||
log.Error("Failed to commit preimage from trie database", "err", err)
|
log.Error("Failed to commit preimage from trie database", "err", err)
|
||||||
db.lock.RUnlock()
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if batch.ValueSize() > ethdb.IdealBatchSize {
|
if batch.ValueSize() > ethdb.IdealBatchSize {
|
||||||
if err := batch.Write(); err != nil {
|
if err := batch.Write(); err != nil {
|
||||||
db.lock.RUnlock()
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
batch.Reset()
|
batch.Reset()
|
||||||
@ -601,14 +605,12 @@ func (db *Database) Cap(limit common.StorageSize) error {
|
|||||||
// Fetch the oldest referenced node and push into the batch
|
// Fetch the oldest referenced node and push into the batch
|
||||||
node := db.dirties[oldest]
|
node := db.dirties[oldest]
|
||||||
if err := batch.Put(oldest[:], node.rlp()); err != nil {
|
if err := batch.Put(oldest[:], node.rlp()); err != nil {
|
||||||
db.lock.RUnlock()
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
// If we exceeded the ideal batch size, commit and reset
|
// If we exceeded the ideal batch size, commit and reset
|
||||||
if batch.ValueSize() >= ethdb.IdealBatchSize {
|
if batch.ValueSize() >= ethdb.IdealBatchSize {
|
||||||
if err := batch.Write(); err != nil {
|
if err := batch.Write(); err != nil {
|
||||||
log.Error("Failed to write flush list to disk", "err", err)
|
log.Error("Failed to write flush list to disk", "err", err)
|
||||||
db.lock.RUnlock()
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
batch.Reset()
|
batch.Reset()
|
||||||
@ -623,11 +625,8 @@ func (db *Database) Cap(limit common.StorageSize) error {
|
|||||||
// Flush out any remainder data from the last batch
|
// Flush out any remainder data from the last batch
|
||||||
if err := batch.Write(); err != nil {
|
if err := batch.Write(); err != nil {
|
||||||
log.Error("Failed to write flush list to disk", "err", err)
|
log.Error("Failed to write flush list to disk", "err", err)
|
||||||
db.lock.RUnlock()
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
db.lock.RUnlock()
|
|
||||||
|
|
||||||
// Write successful, clear out the flushed data
|
// Write successful, clear out the flushed data
|
||||||
db.lock.Lock()
|
db.lock.Lock()
|
||||||
defer db.lock.Unlock()
|
defer db.lock.Unlock()
|
||||||
@ -661,16 +660,16 @@ func (db *Database) Cap(limit common.StorageSize) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Commit iterates over all the children of a particular node, writes them out
|
// Commit iterates over all the children of a particular node, writes them out
|
||||||
// to disk, forcefully tearing down all references in both directions.
|
// to disk, forcefully tearing down all references in both directions. As a side
|
||||||
|
// effect, all pre-images accumulated up to this point are also written.
|
||||||
//
|
//
|
||||||
// As a side effect, all pre-images accumulated up to this point are also written.
|
// Note, this method is a non-synchronized mutator. It is unsafe to call this
|
||||||
|
// concurrently with other mutators.
|
||||||
func (db *Database) Commit(node common.Hash, report bool) error {
|
func (db *Database) Commit(node common.Hash, report bool) error {
|
||||||
// Create a database batch to flush persistent data out. It is important that
|
// Create a database batch to flush persistent data out. It is important that
|
||||||
// outside code doesn't see an inconsistent state (referenced data removed from
|
// outside code doesn't see an inconsistent state (referenced data removed from
|
||||||
// memory cache during commit but not yet in persistent storage). This is ensured
|
// memory cache during commit but not yet in persistent storage). This is ensured
|
||||||
// by only uncaching existing data when the database write finalizes.
|
// by only uncaching existing data when the database write finalizes.
|
||||||
db.lock.RLock()
|
|
||||||
|
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
batch := db.diskdb.NewBatch()
|
batch := db.diskdb.NewBatch()
|
||||||
|
|
||||||
@ -678,41 +677,47 @@ func (db *Database) Commit(node common.Hash, report bool) error {
|
|||||||
for hash, preimage := range db.preimages {
|
for hash, preimage := range db.preimages {
|
||||||
if err := batch.Put(db.secureKey(hash[:]), preimage); err != nil {
|
if err := batch.Put(db.secureKey(hash[:]), preimage); err != nil {
|
||||||
log.Error("Failed to commit preimage from trie database", "err", err)
|
log.Error("Failed to commit preimage from trie database", "err", err)
|
||||||
db.lock.RUnlock()
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
// If the batch is too large, flush to disk
|
||||||
if batch.ValueSize() > ethdb.IdealBatchSize {
|
if batch.ValueSize() > ethdb.IdealBatchSize {
|
||||||
if err := batch.Write(); err != nil {
|
if err := batch.Write(); err != nil {
|
||||||
db.lock.RUnlock()
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
batch.Reset()
|
batch.Reset()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// Since we're going to replay trie node writes into the clean cache, flush out
|
||||||
|
// any batched pre-images before continuing.
|
||||||
|
if err := batch.Write(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
batch.Reset()
|
||||||
|
|
||||||
// Move the trie itself into the batch, flushing if enough data is accumulated
|
// Move the trie itself into the batch, flushing if enough data is accumulated
|
||||||
nodes, storage := len(db.dirties), db.dirtiesSize
|
nodes, storage := len(db.dirties), db.dirtiesSize
|
||||||
if err := db.commit(node, batch); err != nil {
|
|
||||||
|
uncacher := &cleaner{db}
|
||||||
|
if err := db.commit(node, batch, uncacher); err != nil {
|
||||||
log.Error("Failed to commit trie from trie database", "err", err)
|
log.Error("Failed to commit trie from trie database", "err", err)
|
||||||
db.lock.RUnlock()
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
// Write batch ready, unlock for readers during persistence
|
// Trie mostly committed to disk, flush any batch leftovers
|
||||||
if err := batch.Write(); err != nil {
|
if err := batch.Write(); err != nil {
|
||||||
log.Error("Failed to write trie to disk", "err", err)
|
log.Error("Failed to write trie to disk", "err", err)
|
||||||
db.lock.RUnlock()
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
db.lock.RUnlock()
|
// Uncache any leftovers in the last batch
|
||||||
|
|
||||||
// Write successful, clear out the flushed data
|
|
||||||
db.lock.Lock()
|
db.lock.Lock()
|
||||||
defer db.lock.Unlock()
|
defer db.lock.Unlock()
|
||||||
|
|
||||||
|
batch.Replay(uncacher)
|
||||||
|
batch.Reset()
|
||||||
|
|
||||||
|
// Reset the storage counters and bumpd metrics
|
||||||
db.preimages = make(map[common.Hash][]byte)
|
db.preimages = make(map[common.Hash][]byte)
|
||||||
db.preimagesSize = 0
|
db.preimagesSize = 0
|
||||||
|
|
||||||
db.uncache(node)
|
|
||||||
|
|
||||||
memcacheCommitTimeTimer.Update(time.Since(start))
|
memcacheCommitTimeTimer.Update(time.Since(start))
|
||||||
memcacheCommitSizeMeter.Mark(int64(storage - db.dirtiesSize))
|
memcacheCommitSizeMeter.Mark(int64(storage - db.dirtiesSize))
|
||||||
memcacheCommitNodesMeter.Mark(int64(nodes - len(db.dirties)))
|
memcacheCommitNodesMeter.Mark(int64(nodes - len(db.dirties)))
|
||||||
@ -732,14 +737,14 @@ func (db *Database) Commit(node common.Hash, report bool) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// commit is the private locked version of Commit.
|
// commit is the private locked version of Commit.
|
||||||
func (db *Database) commit(hash common.Hash, batch ethdb.Batch) error {
|
func (db *Database) commit(hash common.Hash, batch ethdb.Batch, uncacher *cleaner) error {
|
||||||
// If the node does not exist, it's a previously committed node
|
// If the node does not exist, it's a previously committed node
|
||||||
node, ok := db.dirties[hash]
|
node, ok := db.dirties[hash]
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
for _, child := range node.childs() {
|
for _, child := range node.childs() {
|
||||||
if err := db.commit(child, batch); err != nil {
|
if err := db.commit(child, batch, uncacher); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -751,39 +756,58 @@ func (db *Database) commit(hash common.Hash, batch ethdb.Batch) error {
|
|||||||
if err := batch.Write(); err != nil {
|
if err := batch.Write(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
db.lock.Lock()
|
||||||
|
batch.Replay(uncacher)
|
||||||
batch.Reset()
|
batch.Reset()
|
||||||
|
db.lock.Unlock()
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// uncache is the post-processing step of a commit operation where the already
|
// cleaner is a database batch replayer that takes a batch of write operations
|
||||||
// persisted trie is removed from the cache. The reason behind the two-phase
|
// and cleans up the trie database from anything written to disk.
|
||||||
// commit is to ensure consistent data availability while moving from memory
|
type cleaner struct {
|
||||||
// to disk.
|
db *Database
|
||||||
func (db *Database) uncache(hash common.Hash) {
|
}
|
||||||
|
|
||||||
|
// Put reacts to database writes and implements dirty data uncaching. This is the
|
||||||
|
// post-processing step of a commit operation where the already persisted trie is
|
||||||
|
// removed from the dirty cache and moved into the clean cache. The reason behind
|
||||||
|
// the two-phase commit is to ensure ensure data availability while moving from
|
||||||
|
// memory to disk.
|
||||||
|
func (c *cleaner) Put(key []byte, rlp []byte) error {
|
||||||
|
hash := common.BytesToHash(key)
|
||||||
|
|
||||||
// If the node does not exist, we're done on this path
|
// If the node does not exist, we're done on this path
|
||||||
node, ok := db.dirties[hash]
|
node, ok := c.db.dirties[hash]
|
||||||
if !ok {
|
if !ok {
|
||||||
return
|
return nil
|
||||||
}
|
}
|
||||||
// Node still exists, remove it from the flush-list
|
// Node still exists, remove it from the flush-list
|
||||||
switch hash {
|
switch hash {
|
||||||
case db.oldest:
|
case c.db.oldest:
|
||||||
db.oldest = node.flushNext
|
c.db.oldest = node.flushNext
|
||||||
db.dirties[node.flushNext].flushPrev = common.Hash{}
|
c.db.dirties[node.flushNext].flushPrev = common.Hash{}
|
||||||
case db.newest:
|
case c.db.newest:
|
||||||
db.newest = node.flushPrev
|
c.db.newest = node.flushPrev
|
||||||
db.dirties[node.flushPrev].flushNext = common.Hash{}
|
c.db.dirties[node.flushPrev].flushNext = common.Hash{}
|
||||||
default:
|
default:
|
||||||
db.dirties[node.flushPrev].flushNext = node.flushNext
|
c.db.dirties[node.flushPrev].flushNext = node.flushNext
|
||||||
db.dirties[node.flushNext].flushPrev = node.flushPrev
|
c.db.dirties[node.flushNext].flushPrev = node.flushPrev
|
||||||
}
|
}
|
||||||
// Uncache the node's subtries and remove the node itself too
|
// Remove the node from the dirty cache
|
||||||
for _, child := range node.childs() {
|
delete(c.db.dirties, hash)
|
||||||
db.uncache(child)
|
c.db.dirtiesSize -= common.StorageSize(common.HashLength + int(node.size))
|
||||||
|
|
||||||
|
// Move the flushed node into the clean cache to prevent insta-reloads
|
||||||
|
if c.db.cleans != nil {
|
||||||
|
c.db.cleans.Set(string(hash[:]), rlp)
|
||||||
}
|
}
|
||||||
delete(db.dirties, hash)
|
return nil
|
||||||
db.dirtiesSize -= common.StorageSize(common.HashLength + int(node.size))
|
}
|
||||||
|
|
||||||
|
func (c *cleaner) Delete(key []byte) error {
|
||||||
|
panic("Not implemented")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Size returns the current storage size of the memory cache in front of the
|
// Size returns the current storage size of the memory cache in front of the
|
||||||
|
Loading…
Reference in New Issue
Block a user