diff --git a/eth/protocols/snap/sync.go b/eth/protocols/snap/sync.go index e23693789..f262824f9 100644 --- a/eth/protocols/snap/sync.go +++ b/eth/protocols/snap/sync.go @@ -615,6 +615,8 @@ func (s *Syncer) Sync(root common.Hash, cancel chan struct{}) error { } }() defer s.report(true) + // commit any trie- and bytecode-healing data. + defer s.commitHealer(true) // Whether sync completed or not, disregard any future packets defer func() { @@ -2154,14 +2156,7 @@ func (s *Syncer) processTrienodeHealResponse(res *trienodeHealResponse) { log.Error("Invalid trienode processed", "hash", hash, "err", err) } } - batch := s.db.NewBatch() - if err := s.healer.scheduler.Commit(batch); err != nil { - log.Error("Failed to commit healing data", "err", err) - } - if err := batch.Write(); err != nil { - log.Crit("Failed to persist healing data", "err", err) - } - log.Debug("Persisted set of healing data", "type", "trienodes", "bytes", common.StorageSize(batch.ValueSize())) + s.commitHealer(false) // Calculate the processing rate of one filled trie node rate := float64(fills) / (float64(time.Since(start)) / float64(time.Second)) @@ -2208,6 +2203,20 @@ func (s *Syncer) processTrienodeHealResponse(res *trienodeHealResponse) { } } +func (s *Syncer) commitHealer(force bool) { + if !force && s.healer.scheduler.MemSize() < ethdb.IdealBatchSize { + return + } + batch := s.db.NewBatch() + if err := s.healer.scheduler.Commit(batch); err != nil { + log.Error("Failed to commit healing data", "err", err) + } + if err := batch.Write(); err != nil { + log.Crit("Failed to persist healing data", "err", err) + } + log.Debug("Persisted set of healing data", "type", "trienodes", "bytes", common.StorageSize(batch.ValueSize())) +} + // processBytecodeHealResponse integrates an already validated bytecode response // into the healer tasks. func (s *Syncer) processBytecodeHealResponse(res *bytecodeHealResponse) { @@ -2234,14 +2243,7 @@ func (s *Syncer) processBytecodeHealResponse(res *bytecodeHealResponse) { log.Error("Invalid bytecode processed", "hash", hash, "err", err) } } - batch := s.db.NewBatch() - if err := s.healer.scheduler.Commit(batch); err != nil { - log.Error("Failed to commit healing data", "err", err) - } - if err := batch.Write(); err != nil { - log.Crit("Failed to persist healing data", "err", err) - } - log.Debug("Persisted set of healing data", "type", "bytecode", "bytes", common.StorageSize(batch.ValueSize())) + s.commitHealer(false) } // forwardAccountTask takes a filled account task and persists anything available diff --git a/trie/sync.go b/trie/sync.go index 862ce7e16..31d3cbe91 100644 --- a/trie/sync.go +++ b/trie/sync.go @@ -111,6 +111,7 @@ type syncMemBatch struct { nodes map[string][]byte // In-memory membatch of recently completed nodes hashes map[string]common.Hash // Hashes of recently completed nodes codes map[common.Hash][]byte // In-memory membatch of recently completed codes + size uint64 // Estimated batch-size of in-memory data. } // newSyncMemBatch allocates a new memory-buffer for not-yet persisted trie nodes. @@ -338,6 +339,11 @@ func (s *Sync) Commit(dbw ethdb.Batch) error { return nil } +// MemSize returns an estimated size (in bytes) of the data held in the membatch. +func (s *Sync) MemSize() uint64 { + return s.membatch.size +} + // Pending returns the number of state entries currently pending for download. func (s *Sync) Pending() int { return len(s.nodeReqs) + len(s.codeReqs) @@ -479,7 +485,10 @@ func (s *Sync) commitNodeRequest(req *nodeRequest) error { // Write the node content to the membatch s.membatch.nodes[string(req.path)] = req.data s.membatch.hashes[string(req.path)] = req.hash - + // The size tracking refers to the db-batch, not the in-memory data. + // Therefore, we ignore the req.path, and account only for the hash+data + // which eventually is written to db. + s.membatch.size += common.HashLength + uint64(len(req.data)) delete(s.nodeReqs, string(req.path)) s.fetches[len(req.path)]-- @@ -501,6 +510,7 @@ func (s *Sync) commitNodeRequest(req *nodeRequest) error { func (s *Sync) commitCodeRequest(req *codeRequest) error { // Write the node content to the membatch s.membatch.codes[req.hash] = req.data + s.membatch.size += common.HashLength + uint64(len(req.data)) delete(s.codeReqs, req.hash) s.fetches[len(req.path)]--