Patch for concurrent iterator & others (onto v1.11.6) #386
@ -615,6 +615,8 @@ func (s *Syncer) Sync(root common.Hash, cancel chan struct{}) error {
|
||||
}
|
||||
}()
|
||||
defer s.report(true)
|
||||
// commit any trie- and bytecode-healing data.
|
||||
defer s.commitHealer(true)
|
||||
|
||||
// Whether sync completed or not, disregard any future packets
|
||||
defer func() {
|
||||
@ -2154,14 +2156,7 @@ func (s *Syncer) processTrienodeHealResponse(res *trienodeHealResponse) {
|
||||
log.Error("Invalid trienode processed", "hash", hash, "err", err)
|
||||
}
|
||||
}
|
||||
batch := s.db.NewBatch()
|
||||
if err := s.healer.scheduler.Commit(batch); err != nil {
|
||||
log.Error("Failed to commit healing data", "err", err)
|
||||
}
|
||||
if err := batch.Write(); err != nil {
|
||||
log.Crit("Failed to persist healing data", "err", err)
|
||||
}
|
||||
log.Debug("Persisted set of healing data", "type", "trienodes", "bytes", common.StorageSize(batch.ValueSize()))
|
||||
s.commitHealer(false)
|
||||
|
||||
// Calculate the processing rate of one filled trie node
|
||||
rate := float64(fills) / (float64(time.Since(start)) / float64(time.Second))
|
||||
@ -2208,6 +2203,20 @@ func (s *Syncer) processTrienodeHealResponse(res *trienodeHealResponse) {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Syncer) commitHealer(force bool) {
|
||||
if !force && s.healer.scheduler.MemSize() < ethdb.IdealBatchSize {
|
||||
return
|
||||
}
|
||||
batch := s.db.NewBatch()
|
||||
if err := s.healer.scheduler.Commit(batch); err != nil {
|
||||
log.Error("Failed to commit healing data", "err", err)
|
||||
}
|
||||
if err := batch.Write(); err != nil {
|
||||
log.Crit("Failed to persist healing data", "err", err)
|
||||
}
|
||||
log.Debug("Persisted set of healing data", "type", "trienodes", "bytes", common.StorageSize(batch.ValueSize()))
|
||||
}
|
||||
|
||||
// processBytecodeHealResponse integrates an already validated bytecode response
|
||||
// into the healer tasks.
|
||||
func (s *Syncer) processBytecodeHealResponse(res *bytecodeHealResponse) {
|
||||
@ -2234,14 +2243,7 @@ func (s *Syncer) processBytecodeHealResponse(res *bytecodeHealResponse) {
|
||||
log.Error("Invalid bytecode processed", "hash", hash, "err", err)
|
||||
}
|
||||
}
|
||||
batch := s.db.NewBatch()
|
||||
if err := s.healer.scheduler.Commit(batch); err != nil {
|
||||
log.Error("Failed to commit healing data", "err", err)
|
||||
}
|
||||
if err := batch.Write(); err != nil {
|
||||
log.Crit("Failed to persist healing data", "err", err)
|
||||
}
|
||||
log.Debug("Persisted set of healing data", "type", "bytecode", "bytes", common.StorageSize(batch.ValueSize()))
|
||||
s.commitHealer(false)
|
||||
}
|
||||
|
||||
// forwardAccountTask takes a filled account task and persists anything available
|
||||
|
12
trie/sync.go
12
trie/sync.go
@ -111,6 +111,7 @@ type syncMemBatch struct {
|
||||
nodes map[string][]byte // In-memory membatch of recently completed nodes
|
||||
hashes map[string]common.Hash // Hashes of recently completed nodes
|
||||
codes map[common.Hash][]byte // In-memory membatch of recently completed codes
|
||||
size uint64 // Estimated batch-size of in-memory data.
|
||||
}
|
||||
|
||||
// newSyncMemBatch allocates a new memory-buffer for not-yet persisted trie nodes.
|
||||
@ -338,6 +339,11 @@ func (s *Sync) Commit(dbw ethdb.Batch) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// MemSize returns an estimated size (in bytes) of the data held in the membatch.
|
||||
func (s *Sync) MemSize() uint64 {
|
||||
return s.membatch.size
|
||||
}
|
||||
|
||||
// Pending returns the number of state entries currently pending for download.
|
||||
func (s *Sync) Pending() int {
|
||||
return len(s.nodeReqs) + len(s.codeReqs)
|
||||
@ -479,7 +485,10 @@ func (s *Sync) commitNodeRequest(req *nodeRequest) error {
|
||||
// Write the node content to the membatch
|
||||
s.membatch.nodes[string(req.path)] = req.data
|
||||
s.membatch.hashes[string(req.path)] = req.hash
|
||||
|
||||
// The size tracking refers to the db-batch, not the in-memory data.
|
||||
// Therefore, we ignore the req.path, and account only for the hash+data
|
||||
// which eventually is written to db.
|
||||
s.membatch.size += common.HashLength + uint64(len(req.data))
|
||||
delete(s.nodeReqs, string(req.path))
|
||||
s.fetches[len(req.path)]--
|
||||
|
||||
@ -501,6 +510,7 @@ func (s *Sync) commitNodeRequest(req *nodeRequest) error {
|
||||
func (s *Sync) commitCodeRequest(req *codeRequest) error {
|
||||
// Write the node content to the membatch
|
||||
s.membatch.codes[req.hash] = req.data
|
||||
s.membatch.size += common.HashLength + uint64(len(req.data))
|
||||
delete(s.codeReqs, req.hash)
|
||||
s.fetches[len(req.path)]--
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user