eth/protocols/snap: generate storage trie from full dirty snap data (#22668)

* eth/protocols/snap: generate storage trie from full dirty snap data

* eth/protocols/snap: get rid of some more dead code

* eth/protocols/snap: less frequent logs, also log during trie generation

* eth/protocols/snap: implement dirty account range stack-hashing

* eth/protocols/snap: don't loop on account trie generation

* eth/protocols/snap: fix account format in trie

* core, eth, ethdb: glue snap packets together, but not chunks

* eth/protocols/snap: print completion log for snap phase

* eth/protocols/snap: extended tests

* eth/protocols/snap: make testcase pass

* eth/protocols/snap: fix account stacktrie commit without defer

* ethdb: fix key counts on reset

* eth/protocols: fix typos

* eth/protocols/snap: make better use of delivered data (#44)

* eth/protocols/snap: make better use of delivered data

* squashme

* eth/protocols/snap: reduce chunking

* squashme

* eth/protocols/snap: reduce chunking further

* eth/protocols/snap: break out hash range calculations

* eth/protocols/snap: use sort.Search instead of looping

* eth/protocols/snap: prevent crash on storage response with no keys

* eth/protocols/snap: nitpicks all around

* eth/protocols/snap: clear heal need on 1-chunk storage completion

* eth/protocols/snap: fix range chunker, add tests

Co-authored-by: Péter Szilágyi <peterke@gmail.com>

* trie: fix test API error

* eth/protocols/snap: fix some further linter issues

* eth/protocols/snap: fix accidental batch reuse

Co-authored-by: Martin Holst Swende <martin@swende.se>
Péter Szilágyi 2021-04-27 17:19:59 +03:00 committed by GitHub
parent 65a1c2d829
commit caea6c4661
12 changed files with 604 additions and 189 deletions
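Below is a hedged, minimal sketch (not part of the diff) of the technique the commit title refers to: instead of keeping the proof-derived trie nodes from each response, the flat (hash, value) pairs delivered by snap are fed into a stack trie whose finished nodes are written straight into a database batch. The function name and arguments are made up for illustration; the real syncer splits this work across the account and storage tasks shown in the sync.go hunks below.

import (
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/ethdb"
	"github.com/ethereum/go-ethereum/trie"
)

// generateFromFlatData regenerates trie nodes from already-downloaded flat data.
// Keys must be fed in ascending order for the stack trie to emit correct nodes.
func generateFromFlatData(db ethdb.Database, hashes []common.Hash, values [][]byte) (common.Hash, error) {
	batch := db.NewBatch()              // finished trie nodes accumulate here
	genTrie := trie.NewStackTrie(batch) // flushes completed subtries into the batch
	for i, hash := range hashes {
		genTrie.Update(hash[:], values[i])
	}
	root, err := genTrie.Commit() // hashes and writes the remaining nodes
	if err != nil {
		return common.Hash{}, err
	}
	return root, batch.Write()
}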

View File

@ -0,0 +1,17 @@
// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package rawdb

View File

@ -176,6 +176,11 @@ func (b *tableBatch) Delete(key []byte) error {
return b.batch.Delete(append([]byte(b.prefix), key...)) return b.batch.Delete(append([]byte(b.prefix), key...))
} }
// KeyCount retrieves the number of keys queued up for writing.
func (b *tableBatch) KeyCount() int {
return b.batch.KeyCount()
}
// ValueSize retrieves the amount of data queued up for writing. // ValueSize retrieves the amount of data queued up for writing.
func (b *tableBatch) ValueSize() int { func (b *tableBatch) ValueSize() int {
return b.batch.ValueSize() return b.batch.ValueSize()

View File

@ -354,7 +354,7 @@ func handleMessage(backend Backend, peer *Peer) error {
if err := msg.Decode(res); err != nil { if err := msg.Decode(res); err != nil {
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
} }
// Ensure the ranges ae monotonically increasing // Ensure the ranges are monotonically increasing
for i, slots := range res.Slots { for i, slots := range res.Slots {
for j := 1; j < len(slots); j++ { for j := 1; j < len(slots); j++ {
if bytes.Compare(slots[j-1].Hash[:], slots[j].Hash[:]) >= 0 { if bytes.Compare(slots[j-1].Hash[:], slots[j].Hash[:]) >= 0 {

View File

@ -0,0 +1,80 @@
// Copyright 2021 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package snap
import (
"math/big"
"github.com/ethereum/go-ethereum/common"
"github.com/holiman/uint256"
)
// hashRange is a utility to handle ranges of hashes: split up the
// hash-space into sections and 'walk' over them
type hashRange struct {
current *uint256.Int
step *uint256.Int
}
// newHashRange creates a new hashRange, initiated at the start position,
// and with the step set to fill the desired 'num' chunks
func newHashRange(start common.Hash, num uint64) *hashRange {
left := new(big.Int).Sub(hashSpace, start.Big())
step := new(big.Int).Div(
new(big.Int).Add(left, new(big.Int).SetUint64(num-1)),
new(big.Int).SetUint64(num),
)
step256 := new(uint256.Int)
step256.SetFromBig(step)
return &hashRange{
current: uint256.NewInt().SetBytes32(start[:]),
step: step256,
}
}
// Next pushes the hash range to the next interval.
func (r *hashRange) Next() bool {
next := new(uint256.Int)
if overflow := next.AddOverflow(r.current, r.step); overflow {
return false
}
r.current = next
return true
}
// Start returns the first hash in the current interval.
func (r *hashRange) Start() common.Hash {
return r.current.Bytes32()
}
// End returns the last hash in the current interval.
func (r *hashRange) End() common.Hash {
// If the end overflows (non-divisible range), return a shorter interval
next := new(uint256.Int)
if overflow := next.AddOverflow(r.current, r.step); overflow {
return common.HexToHash("0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff")
}
return new(uint256.Int).Sub(next, uint256.NewInt().SetOne()).Bytes32()
}
// incHash returns the next hash, in lexicographical order (a.k.a. plus one)
func incHash(h common.Hash) common.Hash {
a := uint256.NewInt().SetBytes32(h[:])
a.Add(a, uint256.NewInt().SetOne())
return common.Hash(a.Bytes32())
}
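A hedged usage sketch of the helper above (same package, mirroring the test file that follows): the first interval conventionally starts at the zero hash, which is also how the first storage sub-task is created in sync.go.

// chunkRanges lists the [start, end] pairs produced for the space above head.
func chunkRanges(head common.Hash, chunks uint64) (starts, ends []common.Hash) {
	r := newHashRange(head, chunks)
	starts, ends = []common.Hash{{}}, []common.Hash{r.End()}
	for r.Next() {
		starts = append(starts, r.Start())
		ends = append(ends, r.End())
	}
	return starts, ends
}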

View File

@ -0,0 +1,143 @@
// Copyright 2021 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package snap
import (
"testing"
"github.com/ethereum/go-ethereum/common"
)
// Tests that given a starting hash and a density, the hash ranger can correctly
// split up the remaining hash space into a fixed number of chunks.
func TestHashRanges(t *testing.T) {
tests := []struct {
head common.Hash
chunks uint64
starts []common.Hash
ends []common.Hash
}{
// Simple test case to split the entire hash range into 4 chunks
{
head: common.Hash{},
chunks: 4,
starts: []common.Hash{
{},
common.HexToHash("0x4000000000000000000000000000000000000000000000000000000000000000"),
common.HexToHash("0x8000000000000000000000000000000000000000000000000000000000000000"),
common.HexToHash("0xc000000000000000000000000000000000000000000000000000000000000000"),
},
ends: []common.Hash{
common.HexToHash("0x3fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"),
common.HexToHash("0x7fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"),
common.HexToHash("0xbfffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"),
common.HexToHash("0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"),
},
},
// Split a divisible part of the hash range up into 2 chunks
{
head: common.HexToHash("0x2000000000000000000000000000000000000000000000000000000000000000"),
chunks: 2,
starts: []common.Hash{
common.Hash{},
common.HexToHash("0x9000000000000000000000000000000000000000000000000000000000000000"),
},
ends: []common.Hash{
common.HexToHash("0x8fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"),
common.HexToHash("0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"),
},
},
// Split the entire hash range into 3 chunks (range not evenly divisible)
{
head: common.Hash{},
chunks: 3,
starts: []common.Hash{
{},
common.HexToHash("0x5555555555555555555555555555555555555555555555555555555555555556"),
common.HexToHash("0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaac"),
},
ends: []common.Hash{
common.HexToHash("0x5555555555555555555555555555555555555555555555555555555555555555"),
common.HexToHash("0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaab"),
common.HexToHash("0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"),
},
},
// Split a part of the hash range into 3 chunks (range not evenly divisible)
{
head: common.HexToHash("0x2000000000000000000000000000000000000000000000000000000000000000"),
chunks: 3,
starts: []common.Hash{
{},
common.HexToHash("0x6aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaab"),
common.HexToHash("0xb555555555555555555555555555555555555555555555555555555555555556"),
},
ends: []common.Hash{
common.HexToHash("0x6aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"),
common.HexToHash("0xb555555555555555555555555555555555555555555555555555555555555555"),
common.HexToHash("0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"),
},
},
// Split a part of the hash range into 3 chunks (range not evenly divisible),
// but with a meaningful space size for manual verification.
// - The head being 0xff...f0, there are 16 hashes in the remaining space
// - Chunking 16 into 3 pieces is 5.(3), so the step is rounded up to 6 to avoid a micro last chunk
// - Since the range does not divide evenly, the last interval will be shorter, capped at 0xff...f
// - The chunk ranges thus need to be [..0, ..5], [..6, ..b], [..c, ..f]
{
head: common.HexToHash("0xfffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff0"),
chunks: 3,
starts: []common.Hash{
{},
common.HexToHash("0xfffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff6"),
common.HexToHash("0xfffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffc"),
},
ends: []common.Hash{
common.HexToHash("0xfffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff5"),
common.HexToHash("0xfffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffb"),
common.HexToHash("0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"),
},
},
}
for i, tt := range tests {
r := newHashRange(tt.head, tt.chunks)
var (
starts = []common.Hash{{}}
ends = []common.Hash{r.End()}
)
for r.Next() {
starts = append(starts, r.Start())
ends = append(ends, r.End())
}
if len(starts) != len(tt.starts) {
t.Errorf("test %d: starts count mismatch: have %d, want %d", i, len(starts), len(tt.starts))
}
for j := 0; j < len(starts) && j < len(tt.starts); j++ {
if starts[j] != tt.starts[j] {
t.Errorf("test %d, start %d: hash mismatch: have %x, want %x", i, j, starts[j], tt.starts[j])
}
}
if len(ends) != len(tt.ends) {
t.Errorf("test %d: ends count mismatch: have %d, want %d", i, len(ends), len(tt.ends))
}
for j := 0; j < len(ends) && j < len(tt.ends); j++ {
if ends[j] != tt.ends[j] {
t.Errorf("test %d, end %d: hash mismatch: have %x, want %x", i, j, ends[j], tt.ends[j])
}
}
}
}

View File

@ -23,10 +23,12 @@ import (
"fmt" "fmt"
"math/big" "math/big"
"math/rand" "math/rand"
"sort"
"sync" "sync"
"time" "time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/math"
"github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/state/snapshot" "github.com/ethereum/go-ethereum/core/state/snapshot"
@ -73,7 +75,9 @@ const (
// and waste round trip times. If it's too high, we're capping responses and // and waste round trip times. If it's too high, we're capping responses and
// waste bandwidth. // waste bandwidth.
maxTrieRequestCount = 512 maxTrieRequestCount = 512
)
var (
// accountConcurrency is the number of chunks to split the account trie into // accountConcurrency is the number of chunks to split the account trie into
// to allow concurrent retrievals. // to allow concurrent retrievals.
accountConcurrency = 16 accountConcurrency = 16
@ -81,9 +85,7 @@ const (
// storageConcurrency is the number of chunks to split the a large contract // storageConcurrency is the number of chunks to split the a large contract
// storage trie into to allow concurrent retrievals. // storage trie into to allow concurrent retrievals.
storageConcurrency = 16 storageConcurrency = 16
)
var (
// requestTimeout is the maximum time a peer is allowed to spend on serving // requestTimeout is the maximum time a peer is allowed to spend on serving
// a single network request. // a single network request.
requestTimeout = 15 * time.Second // TODO(karalabe): Make it dynamic ala fast-sync? requestTimeout = 15 * time.Second // TODO(karalabe): Make it dynamic ala fast-sync?
@ -127,12 +129,6 @@ type accountResponse struct {
hashes []common.Hash // Account hashes in the returned range hashes []common.Hash // Account hashes in the returned range
accounts []*state.Account // Expanded accounts in the returned range accounts []*state.Account // Expanded accounts in the returned range
nodes ethdb.KeyValueStore // Database containing the reconstructed trie nodes
trie *trie.Trie // Reconstructed trie to reject incomplete account paths
bounds map[common.Hash]struct{} // Boundary nodes to avoid persisting incomplete accounts
overflow *light.NodeSet // Overflow nodes to avoid persisting across chunk boundaries
cont bool // Whether the account range has a continuation cont bool // Whether the account range has a continuation
} }
@ -209,12 +205,8 @@ type storageResponse struct {
hashes [][]common.Hash // Storage slot hashes in the returned range hashes [][]common.Hash // Storage slot hashes in the returned range
slots [][][]byte // Storage slot values in the returned range slots [][][]byte // Storage slot values in the returned range
nodes []ethdb.KeyValueStore // Database containing the reconstructed trie nodes nodes []ethdb.KeyValueStore // Database containing the reconstructed trie nodes
tries []*trie.Trie // Reconstructed tries to reject overflown slots
// Fields relevant for the last account only cont bool // Whether the last storage range has a continuation
bounds map[common.Hash]struct{} // Boundary nodes to avoid persisting (incomplete)
overflow *light.NodeSet // Overflow nodes to avoid persisting across chunk boundaries
cont bool // Whether the last storage range has a continuation
} }
// trienodeHealRequest tracks a pending state trie request to ensure responses // trienodeHealRequest tracks a pending state trie request to ensure responses
@ -301,6 +293,9 @@ type accountTask struct {
codeTasks map[common.Hash]struct{} // Code hashes that need retrieval codeTasks map[common.Hash]struct{} // Code hashes that need retrieval
stateTasks map[common.Hash]common.Hash // Account hashes->roots that need full state retrieval stateTasks map[common.Hash]common.Hash // Account hashes->roots that need full state retrieval
genBatch ethdb.Batch // Batch used by the node generator
genTrie *trie.StackTrie // Node generator from storage slots
done bool // Flag whether the task can be removed done bool // Flag whether the task can be removed
} }
@ -312,7 +307,11 @@ type storageTask struct {
// These fields are internals used during runtime // These fields are internals used during runtime
root common.Hash // Storage root hash for this instance root common.Hash // Storage root hash for this instance
req *storageRequest // Pending request to fill this task req *storageRequest // Pending request to fill this task
done bool // Flag whether the task can be removed
genBatch ethdb.Batch // Batch used by the node generator
genTrie *trie.StackTrie // Node generator from storage slots
done bool // Flag whether the task can be removed
} }
// healTask represents the sync task for healing the snap-synced chunk boundaries. // healTask represents the sync task for healing the snap-synced chunk boundaries.
@ -359,7 +358,7 @@ type SyncPeer interface {
// trie, starting with the origin. // trie, starting with the origin.
RequestAccountRange(id uint64, root, origin, limit common.Hash, bytes uint64) error RequestAccountRange(id uint64, root, origin, limit common.Hash, bytes uint64) error
// RequestStorageRange fetches a batch of storage slots belonging to one or // RequestStorageRanges fetches a batch of storage slots belonging to one or
// more accounts. If slots from only one accout is requested, an origin marker // more accounts. If slots from only one accout is requested, an origin marker
// may also be used to retrieve from there. // may also be used to retrieve from there.
RequestStorageRanges(id uint64, root common.Hash, accounts []common.Hash, origin, limit []byte, bytes uint64) error RequestStorageRanges(id uint64, root common.Hash, accounts []common.Hash, origin, limit []byte, bytes uint64) error
@ -680,6 +679,17 @@ func (s *Syncer) loadSyncStatus() {
log.Debug("Scheduled account sync task", "from", task.Next, "last", task.Last) log.Debug("Scheduled account sync task", "from", task.Next, "last", task.Last)
} }
s.tasks = progress.Tasks s.tasks = progress.Tasks
for _, task := range s.tasks {
task.genBatch = s.db.NewBatch()
task.genTrie = trie.NewStackTrie(task.genBatch)
for _, subtasks := range task.SubTasks {
for _, subtask := range subtasks {
subtask.genBatch = s.db.NewBatch()
subtask.genTrie = trie.NewStackTrie(task.genBatch)
}
}
}
s.snapped = len(s.tasks) == 0 s.snapped = len(s.tasks) == 0
s.accountSynced = progress.AccountSynced s.accountSynced = progress.AccountSynced
@ -710,7 +720,7 @@ func (s *Syncer) loadSyncStatus() {
step := new(big.Int).Sub( step := new(big.Int).Sub(
new(big.Int).Div( new(big.Int).Div(
new(big.Int).Exp(common.Big2, common.Big256, nil), new(big.Int).Exp(common.Big2, common.Big256, nil),
big.NewInt(accountConcurrency), big.NewInt(int64(accountConcurrency)),
), common.Big1, ), common.Big1,
) )
for i := 0; i < accountConcurrency; i++ { for i := 0; i < accountConcurrency; i++ {
@ -719,10 +729,13 @@ func (s *Syncer) loadSyncStatus() {
// Make sure we don't overflow if the step is not a proper divisor // Make sure we don't overflow if the step is not a proper divisor
last = common.HexToHash("0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff") last = common.HexToHash("0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff")
} }
batch := s.db.NewBatch()
s.tasks = append(s.tasks, &accountTask{ s.tasks = append(s.tasks, &accountTask{
Next: next, Next: next,
Last: last, Last: last,
SubTasks: make(map[common.Hash][]*storageTask), SubTasks: make(map[common.Hash][]*storageTask),
genBatch: batch,
genTrie: trie.NewStackTrie(batch),
}) })
log.Debug("Created account sync task", "from", next, "last", last) log.Debug("Created account sync task", "from", next, "last", last)
next = common.BigToHash(new(big.Int).Add(last.Big(), common.Big1)) next = common.BigToHash(new(big.Int).Add(last.Big(), common.Big1))
@ -731,6 +744,25 @@ func (s *Syncer) loadSyncStatus() {
// saveSyncStatus marshals the remaining sync tasks into leveldb. // saveSyncStatus marshals the remaining sync tasks into leveldb.
func (s *Syncer) saveSyncStatus() { func (s *Syncer) saveSyncStatus() {
// Serialize any partial progress to disk before spinning down
for _, task := range s.tasks {
keys, bytes := task.genBatch.KeyCount(), task.genBatch.ValueSize()
if err := task.genBatch.Write(); err != nil {
log.Error("Failed to persist account slots", "err", err)
}
s.accountBytes += common.StorageSize(keys*common.HashLength + bytes)
for _, subtasks := range task.SubTasks {
for _, subtask := range subtasks {
keys, bytes := subtask.genBatch.KeyCount(), subtask.genBatch.ValueSize()
if err := subtask.genBatch.Write(); err != nil {
log.Error("Failed to persist storage slots", "err", err)
}
s.accountBytes += common.StorageSize(keys*common.HashLength + bytes)
}
}
}
// Store the actual progress markers
progress := &syncProgress{ progress := &syncProgress{
Tasks: s.tasks, Tasks: s.tasks,
AccountSynced: s.accountSynced, AccountSynced: s.accountSynced,
@ -754,16 +786,25 @@ func (s *Syncer) saveSyncStatus() {
// cleanAccountTasks removes account range retrieval tasks that have already been // cleanAccountTasks removes account range retrieval tasks that have already been
// completed. // completed.
func (s *Syncer) cleanAccountTasks() { func (s *Syncer) cleanAccountTasks() {
// If the sync was already done before, don't even bother
if len(s.tasks) == 0 {
return
}
// Sync wasn't finished previously, check for any task that can be finalized
for i := 0; i < len(s.tasks); i++ { for i := 0; i < len(s.tasks); i++ {
if s.tasks[i].done { if s.tasks[i].done {
s.tasks = append(s.tasks[:i], s.tasks[i+1:]...) s.tasks = append(s.tasks[:i], s.tasks[i+1:]...)
i-- i--
} }
} }
// If everything was just finalized, generate the account trie and start heal
if len(s.tasks) == 0 { if len(s.tasks) == 0 {
s.lock.Lock() s.lock.Lock()
s.snapped = true s.snapped = true
s.lock.Unlock() s.lock.Unlock()
// Push the final sync report
s.reportSyncProgress(true)
} }
} }
@ -1600,12 +1641,7 @@ func (s *Syncer) processAccountResponse(res *accountResponse) {
continue continue
} }
if cmp > 0 { if cmp > 0 {
// Chunk overflown, cut off excess, but also update the boundary nodes // Chunk overflown, cut off excess
for j := i; j < len(res.hashes); j++ {
if err := res.trie.Prove(res.hashes[j][:], 0, res.overflow); err != nil {
panic(err) // Account range was already proven, what happened
}
}
res.hashes = res.hashes[:i] res.hashes = res.hashes[:i]
res.accounts = res.accounts[:i] res.accounts = res.accounts[:i]
res.cont = false // Mark range completed res.cont = false // Mark range completed
@ -1681,7 +1717,6 @@ func (s *Syncer) processBytecodeResponse(res *bytecodeResponse) {
var ( var (
codes uint64 codes uint64
bytes common.StorageSize
) )
for i, hash := range res.hashes { for i, hash := range res.hashes {
code := res.codes[i] code := res.codes[i]
@ -1699,17 +1734,16 @@ func (s *Syncer) processBytecodeResponse(res *bytecodeResponse) {
} }
} }
// Push the bytecode into a database batch // Push the bytecode into a database batch
s.bytecodeSynced++
s.bytecodeBytes += common.StorageSize(len(code))
codes++ codes++
bytes += common.StorageSize(len(code))
rawdb.WriteCode(batch, hash, code) rawdb.WriteCode(batch, hash, code)
} }
bytes := common.StorageSize(batch.ValueSize())
if err := batch.Write(); err != nil { if err := batch.Write(); err != nil {
log.Crit("Failed to persist bytecodes", "err", err) log.Crit("Failed to persist bytecodes", "err", err)
} }
s.bytecodeSynced += codes
s.bytecodeBytes += bytes
log.Debug("Persisted set of bytecodes", "count", codes, "bytes", bytes) log.Debug("Persisted set of bytecodes", "count", codes, "bytes", bytes)
// If this delivery completed the last pending task, forward the account task // If this delivery completed the last pending task, forward the account task
@ -1732,10 +1766,9 @@ func (s *Syncer) processStorageResponse(res *storageResponse) {
batch := s.db.NewBatch() batch := s.db.NewBatch()
var ( var (
slots int slots int
nodes int nodes int
skipped int bytes common.StorageSize
bytes common.StorageSize
) )
// Iterate over all the accounts and reconstruct their storage tries from the // Iterate over all the accounts and reconstruct their storage tries from the
// delivered slots // delivered slots
@ -1772,27 +1805,50 @@ func (s *Syncer) processStorageResponse(res *storageResponse) {
// the subtasks for it within the main account task // the subtasks for it within the main account task
if tasks, ok := res.mainTask.SubTasks[account]; !ok { if tasks, ok := res.mainTask.SubTasks[account]; !ok {
var ( var (
next common.Hash keys = res.hashes[i]
chunks = uint64(storageConcurrency)
lastKey common.Hash
) )
step := new(big.Int).Sub( if len(keys) > 0 {
new(big.Int).Div( lastKey = keys[len(keys)-1]
new(big.Int).Exp(common.Big2, common.Big256, nil), }
big.NewInt(storageConcurrency), // If the number of slots remaining is low, decrease the
), common.Big1, // number of chunks. Somewhere on the order of 10-15K slots
) // fit into a packet of 500KB. A key/slot pair is maximum 64
for k := 0; k < storageConcurrency; k++ { // bytes, so pessimistically maxRequestSize/64 = 8K.
last := common.BigToHash(new(big.Int).Add(next.Big(), step)) //
if k == storageConcurrency-1 { // Chunk so that at least 2 packets are needed to fill a task.
// Make sure we don't overflow if the step is not a proper divisor if estimate, err := estimateRemainingSlots(len(keys), lastKey); err == nil {
last = common.HexToHash("0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff") if n := estimate / (2 * (maxRequestSize / 64)); n+1 < chunks {
chunks = n + 1
} }
log.Debug("Chunked large contract", "initiators", len(keys), "tail", lastKey, "remaining", estimate, "chunks", chunks)
} else {
log.Debug("Chunked large contract", "initiators", len(keys), "tail", lastKey, "chunks", chunks)
}
r := newHashRange(lastKey, chunks)
// Our first task is the one that was just filled by this response.
batch := s.db.NewBatch()
tasks = append(tasks, &storageTask{
Next: common.Hash{},
Last: r.End(),
root: acc.Root,
genBatch: batch,
genTrie: trie.NewStackTrie(batch),
})
for r.Next() {
batch := s.db.NewBatch()
tasks = append(tasks, &storageTask{ tasks = append(tasks, &storageTask{
Next: next, Next: r.Start(),
Last: last, Last: r.End(),
root: acc.Root, root: acc.Root,
genBatch: batch,
genTrie: trie.NewStackTrie(batch),
}) })
log.Debug("Created storage sync task", "account", account, "root", acc.Root, "from", next, "last", last) }
next = common.BigToHash(new(big.Int).Add(last.Big(), common.Big1)) for _, task := range tasks {
log.Debug("Created storage sync task", "account", account, "root", acc.Root, "from", task.Next, "last", task.Last)
} }
res.mainTask.SubTasks[account] = tasks res.mainTask.SubTasks[account] = tasks
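A hedged worked example of the chunk-count heuristic above. The in-line comment implies maxRequestSize is 512KB, so maxRequestSize/64 is about 8K key/slot pairs per packet; the estimate value below is made up purely for illustration.

// With ~100,000 slots estimated to remain: 100000 / (2 * (512*1024/64)) = 6,
// so 7 chunks are used instead of the default storageConcurrency of 16.
estimate := uint64(100000)
chunks := uint64(storageConcurrency)
if n := estimate / (2 * (maxRequestSize / 64)); n+1 < chunks {
	chunks = n + 1 // chunks == 7 for this estimate
}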
@ -1805,74 +1861,90 @@ func (s *Syncer) processStorageResponse(res *storageResponse) {
if res.subTask != nil { if res.subTask != nil {
// Ensure the response doesn't overflow into the subsequent task // Ensure the response doesn't overflow into the subsequent task
last := res.subTask.Last.Big() last := res.subTask.Last.Big()
for k, hash := range res.hashes[i] { // Find the first overflowing key. While at it, mark res as complete
// Mark the range complete if the last is already included. // if we find the range to include or pass the 'last'
// Keep iteration to delete the extra states if exists. index := sort.Search(len(res.hashes[i]), func(k int) bool {
cmp := hash.Big().Cmp(last) cmp := res.hashes[i][k].Big().Cmp(last)
if cmp == 0 { if cmp >= 0 {
res.cont = false res.cont = false
continue
}
if cmp > 0 {
// Chunk overflown, cut off excess, but also update the boundary
for l := k; l < len(res.hashes[i]); l++ {
if err := res.tries[i].Prove(res.hashes[i][l][:], 0, res.overflow); err != nil {
panic(err) // Account range was already proven, what happened
}
}
res.hashes[i] = res.hashes[i][:k]
res.slots[i] = res.slots[i][:k]
res.cont = false // Mark range completed
break
} }
return cmp > 0
})
if index >= 0 {
// cut off excess
res.hashes[i] = res.hashes[i][:index]
res.slots[i] = res.slots[i][:index]
} }
// Forward the relevant storage chunk (even if created just now) // Forward the relevant storage chunk (even if created just now)
if res.cont { if res.cont {
res.subTask.Next = common.BigToHash(new(big.Int).Add(res.hashes[i][len(res.hashes[i])-1].Big(), big.NewInt(1))) res.subTask.Next = incHash(res.hashes[i][len(res.hashes[i])-1])
} else { } else {
res.subTask.done = true res.subTask.done = true
} }
} }
} }
// Iterate over all the reconstructed trie nodes and push them to disk // Iterate over all the reconstructed trie nodes and push them to disk
// if the contract is fully delivered. If it's chunked, the trie nodes
// will be reconstructed later.
slots += len(res.hashes[i]) slots += len(res.hashes[i])
it := res.nodes[i].NewIterator(nil, nil) if i < len(res.hashes)-1 || res.subTask == nil {
for it.Next() { it := res.nodes[i].NewIterator(nil, nil)
// Boundary nodes are not written for the last result, since they are incomplete for it.Next() {
if i == len(res.hashes)-1 && res.subTask != nil { batch.Put(it.Key(), it.Value())
if _, ok := res.bounds[common.BytesToHash(it.Key())]; ok {
skipped++ bytes += common.StorageSize(common.HashLength + len(it.Value()))
continue nodes++
}
if _, err := res.overflow.Get(it.Key()); err == nil {
skipped++
continue
}
} }
// Node is not a boundary, persist to disk it.Release()
batch.Put(it.Key(), it.Value())
bytes += common.StorageSize(common.HashLength + len(it.Value()))
nodes++
} }
it.Release()
// Persist the received storage segements. These flat state maybe // Persist the received storage segements. These flat state maybe
// outdated during the sync, but it can be fixed later during the // outdated during the sync, but it can be fixed later during the
// snapshot generation. // snapshot generation.
for j := 0; j < len(res.hashes[i]); j++ { for j := 0; j < len(res.hashes[i]); j++ {
rawdb.WriteStorageSnapshot(batch, account, res.hashes[i][j], res.slots[i][j]) rawdb.WriteStorageSnapshot(batch, account, res.hashes[i][j], res.slots[i][j])
bytes += common.StorageSize(1 + 2*common.HashLength + len(res.slots[i][j])) bytes += common.StorageSize(1 + 2*common.HashLength + len(res.slots[i][j]))
// If we're storing large contracts, generate the trie nodes
// on the fly to not trash the gluing points
if i == len(res.hashes)-1 && res.subTask != nil {
res.subTask.genTrie.Update(res.hashes[i][j][:], res.slots[i][j])
}
} }
} }
// Large contracts could have generated new trie nodes, flush them to disk
if res.subTask != nil {
if res.subTask.done {
if root, err := res.subTask.genTrie.Commit(); err != nil {
log.Error("Failed to commit stack slots", "err", err)
} else if root == res.subTask.root {
// If the chunk's root is an overflown but full delivery, clear the heal request
for i, account := range res.mainTask.res.hashes {
if account == res.accounts[len(res.accounts)-1] {
res.mainTask.needHeal[i] = false
}
}
}
}
if data := res.subTask.genBatch.ValueSize(); data > ethdb.IdealBatchSize || res.subTask.done {
keys := res.subTask.genBatch.KeyCount()
if err := res.subTask.genBatch.Write(); err != nil {
log.Error("Failed to persist stack slots", "err", err)
}
res.subTask.genBatch.Reset()
bytes += common.StorageSize(keys*common.HashLength + data)
nodes += keys
}
}
// Flush anything written just now and update the stats
if err := batch.Write(); err != nil { if err := batch.Write(); err != nil {
log.Crit("Failed to persist storage slots", "err", err) log.Crit("Failed to persist storage slots", "err", err)
} }
s.storageSynced += uint64(slots) s.storageSynced += uint64(slots)
s.storageBytes += bytes s.storageBytes += bytes
log.Debug("Persisted set of storage slots", "accounts", len(res.hashes), "slots", slots, "nodes", nodes, "skipped", skipped, "bytes", bytes) log.Debug("Persisted set of storage slots", "accounts", len(res.hashes), "slots", slots, "nodes", nodes, "bytes", bytes)
// If this delivery completed the last pending task, forward the account task // If this delivery completed the last pending task, forward the account task
// to the next chunk // to the next chunk
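A hedged sketch of the cut-off search used earlier in processStorageResponse: sort.Search returns the first index whose hash exceeds the sub-task's last hash, so everything at or before that hash is kept and the rest is trimmed (the real closure additionally clears the continuation flag). The values below are made up.

last := common.HexToHash("0x05").Big()
hashes := []common.Hash{
	common.HexToHash("0x01"),
	common.HexToHash("0x05"),
	common.HexToHash("0x09"),
}
index := sort.Search(len(hashes), func(k int) bool {
	return hashes[k].Big().Cmp(last) > 0
})
// index == 2: only the 0x09 entry overflows the task and gets cut off.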
@ -1967,87 +2039,69 @@ func (s *Syncer) forwardAccountTask(task *accountTask) {
} }
task.res = nil task.res = nil
// Iterate over all the accounts and gather all the incomplete trie nodes. A
// node is incomplete if we haven't yet filled it (sync was interrupted), or
// if we filled it in multiple chunks (storage trie), in which case the few
// nodes on the chunk boundaries are missing.
incompletes := light.NewNodeSet()
for i := range res.accounts {
// If the filling was interrupted, mark everything after as incomplete
if task.needCode[i] || task.needState[i] {
for j := i; j < len(res.accounts); j++ {
if err := res.trie.Prove(res.hashes[j][:], 0, incompletes); err != nil {
panic(err) // Account range was already proven, what happened
}
}
break
}
// Filling not interrupted until this point, mark incomplete if needs healing
if task.needHeal[i] {
if err := res.trie.Prove(res.hashes[i][:], 0, incompletes); err != nil {
panic(err) // Account range was already proven, what happened
}
}
}
// Persist every finalized trie node that's not on the boundary
batch := s.db.NewBatch()
var (
nodes int
skipped int
bytes common.StorageSize
)
it := res.nodes.NewIterator(nil, nil)
for it.Next() {
// Boundary nodes are not written, since they are incomplete
if _, ok := res.bounds[common.BytesToHash(it.Key())]; ok {
skipped++
continue
}
// Overflow nodes are not written, since they mess with another task
if _, err := res.overflow.Get(it.Key()); err == nil {
skipped++
continue
}
// Accounts with split storage requests are incomplete
if _, err := incompletes.Get(it.Key()); err == nil {
skipped++
continue
}
// Node is neither a boundary, not an incomplete account, persist to disk
batch.Put(it.Key(), it.Value())
bytes += common.StorageSize(common.HashLength + len(it.Value()))
nodes++
}
it.Release()
// Persist the received account segements. These flat state maybe // Persist the received account segements. These flat state maybe
// outdated during the sync, but it can be fixed later during the // outdated during the sync, but it can be fixed later during the
// snapshot generation. // snapshot generation.
var (
nodes int
bytes common.StorageSize
)
batch := s.db.NewBatch()
for i, hash := range res.hashes { for i, hash := range res.hashes {
blob := snapshot.SlimAccountRLP(res.accounts[i].Nonce, res.accounts[i].Balance, res.accounts[i].Root, res.accounts[i].CodeHash) if task.needCode[i] || task.needState[i] {
rawdb.WriteAccountSnapshot(batch, hash, blob) break
bytes += common.StorageSize(1 + common.HashLength + len(blob)) }
slim := snapshot.SlimAccountRLP(res.accounts[i].Nonce, res.accounts[i].Balance, res.accounts[i].Root, res.accounts[i].CodeHash)
rawdb.WriteAccountSnapshot(batch, hash, slim)
bytes += common.StorageSize(1 + common.HashLength + len(slim))
// If the task is complete, drop it into the stack trie to generate
// account trie nodes for it
if !task.needHeal[i] {
full, err := snapshot.FullAccountRLP(slim) // TODO(karalabe): Slim parsing can be omitted
if err != nil {
panic(err) // Really shouldn't ever happen
}
task.genTrie.Update(hash[:], full)
}
} }
// Flush anything written just now and update the stats
if err := batch.Write(); err != nil { if err := batch.Write(); err != nil {
log.Crit("Failed to persist accounts", "err", err) log.Crit("Failed to persist accounts", "err", err)
} }
s.accountBytes += bytes s.accountBytes += bytes
s.accountSynced += uint64(len(res.accounts)) s.accountSynced += uint64(len(res.accounts))
log.Debug("Persisted range of accounts", "accounts", len(res.accounts), "nodes", nodes, "skipped", skipped, "bytes", bytes)
// Task filling persisted, push it the chunk marker forward to the first // Task filling persisted, push it the chunk marker forward to the first
// account still missing data. // account still missing data.
for i, hash := range res.hashes { for i, hash := range res.hashes {
if task.needCode[i] || task.needState[i] { if task.needCode[i] || task.needState[i] {
return return
} }
task.Next = common.BigToHash(new(big.Int).Add(hash.Big(), big.NewInt(1))) task.Next = incHash(hash)
} }
// All accounts marked as complete, track if the entire task is done // All accounts marked as complete, track if the entire task is done
task.done = !res.cont task.done = !res.cont
// Stack trie could have generated trie nodes, push them to disk (we need to
// flush after finalizing task.done. It's fine even if we crash and lose this
// write as it will only cause more data to be downloaded during heal).
if task.done {
if _, err := task.genTrie.Commit(); err != nil {
log.Error("Failed to commit stack account", "err", err)
}
}
if data := task.genBatch.ValueSize(); data > ethdb.IdealBatchSize || task.done {
keys := task.genBatch.KeyCount()
if err := task.genBatch.Write(); err != nil {
log.Error("Failed to persist stack account", "err", err)
}
task.genBatch.Reset()
nodes += keys
bytes += common.StorageSize(keys*common.HashLength + data)
}
log.Debug("Persisted range of accounts", "accounts", len(res.accounts), "nodes", nodes, "bytes", bytes)
} }
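A hedged sketch of the slim/full account conversion used in forwardAccountTask above: the snapshot stores a space-saving "slim" encoding (empty storage root and code hash omitted), while the account trie needs the full consensus RLP, hence the round trip that the TODO suggests could be short-circuited. The acc and hash variables are assumed to come from the surrounding response loop.

slim := snapshot.SlimAccountRLP(acc.Nonce, acc.Balance, acc.Root, acc.CodeHash)
full, err := snapshot.FullAccountRLP(slim) // re-expand into the canonical account RLP
if err == nil {
	task.genTrie.Update(hash[:], full) // feed the account trie generator, as above
}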
// OnAccounts is a callback method to invoke when a range of accounts are // OnAccounts is a callback method to invoke when a range of accounts are
@ -2091,7 +2145,6 @@ func (s *Syncer) OnAccounts(peer SyncPeer, id uint64, hashes []common.Hash, acco
s.lock.Unlock() s.lock.Unlock()
return nil return nil
} }
// Response is valid, but check if peer is signalling that it does not have // Response is valid, but check if peer is signalling that it does not have
// the requested data. For account range queries that means the state being // the requested data. For account range queries that means the state being
// retrieved was either already pruned remotely, or the peer is not yet // retrieved was either already pruned remotely, or the peer is not yet
@ -2123,22 +2176,13 @@ func (s *Syncer) OnAccounts(peer SyncPeer, id uint64, hashes []common.Hash, acco
if len(keys) > 0 { if len(keys) > 0 {
end = keys[len(keys)-1] end = keys[len(keys)-1]
} }
db, tr, notary, cont, err := trie.VerifyRangeProof(root, req.origin[:], end, keys, accounts, proofdb) _, _, _, cont, err := trie.VerifyRangeProof(root, req.origin[:], end, keys, accounts, proofdb)
if err != nil { if err != nil {
logger.Warn("Account range failed proof", "err", err) logger.Warn("Account range failed proof", "err", err)
// Signal this request as failed, and ready for rescheduling // Signal this request as failed, and ready for rescheduling
s.scheduleRevertAccountRequest(req) s.scheduleRevertAccountRequest(req)
return err return err
} }
// Partial trie reconstructed, send it to the scheduler for storage filling
bounds := make(map[common.Hash]struct{})
it := notary.Accessed().NewIterator(nil, nil)
for it.Next() {
bounds[common.BytesToHash(it.Key())] = struct{}{}
}
it.Release()
accs := make([]*state.Account, len(accounts)) accs := make([]*state.Account, len(accounts))
for i, account := range accounts { for i, account := range accounts {
acc := new(state.Account) acc := new(state.Account)
@ -2151,10 +2195,6 @@ func (s *Syncer) OnAccounts(peer SyncPeer, id uint64, hashes []common.Hash, acco
task: req.task, task: req.task,
hashes: hashes, hashes: hashes,
accounts: accs, accounts: accs,
nodes: db,
trie: tr,
bounds: bounds,
overflow: light.NewNodeSet(),
cont: cont, cont: cont,
} }
select { select {
@ -2354,10 +2394,8 @@ func (s *Syncer) OnStorage(peer SyncPeer, id uint64, hashes [][]common.Hash, slo
// Reconstruct the partial tries from the response and verify them // Reconstruct the partial tries from the response and verify them
var ( var (
dbs = make([]ethdb.KeyValueStore, len(hashes)) dbs = make([]ethdb.KeyValueStore, len(hashes))
tries = make([]*trie.Trie, len(hashes)) cont bool
notary *trie.KeyValueNotary
cont bool
) )
for i := 0; i < len(hashes); i++ { for i := 0; i < len(hashes); i++ {
// Convert the keys and proofs into an internal format // Convert the keys and proofs into an internal format
@ -2375,7 +2413,7 @@ func (s *Syncer) OnStorage(peer SyncPeer, id uint64, hashes [][]common.Hash, slo
if len(nodes) == 0 { if len(nodes) == 0 {
// No proof has been attached, the response must cover the entire key // No proof has been attached, the response must cover the entire key
// space and hash to the origin root. // space and hash to the origin root.
dbs[i], tries[i], _, _, err = trie.VerifyRangeProof(req.roots[i], nil, nil, keys, slots[i], nil) dbs[i], _, _, _, err = trie.VerifyRangeProof(req.roots[i], nil, nil, keys, slots[i], nil)
if err != nil { if err != nil {
s.scheduleRevertStorageRequest(req) // reschedule request s.scheduleRevertStorageRequest(req) // reschedule request
logger.Warn("Storage slots failed proof", "err", err) logger.Warn("Storage slots failed proof", "err", err)
@ -2390,7 +2428,7 @@ func (s *Syncer) OnStorage(peer SyncPeer, id uint64, hashes [][]common.Hash, slo
if len(keys) > 0 { if len(keys) > 0 {
end = keys[len(keys)-1] end = keys[len(keys)-1]
} }
dbs[i], tries[i], notary, cont, err = trie.VerifyRangeProof(req.roots[i], req.origin[:], end, keys, slots[i], proofdb) dbs[i], _, _, cont, err = trie.VerifyRangeProof(req.roots[i], req.origin[:], end, keys, slots[i], proofdb)
if err != nil { if err != nil {
s.scheduleRevertStorageRequest(req) // reschedule request s.scheduleRevertStorageRequest(req) // reschedule request
logger.Warn("Storage range failed proof", "err", err) logger.Warn("Storage range failed proof", "err", err)
@ -2399,15 +2437,6 @@ func (s *Syncer) OnStorage(peer SyncPeer, id uint64, hashes [][]common.Hash, slo
} }
} }
// Partial tries reconstructed, send them to the scheduler for storage filling // Partial tries reconstructed, send them to the scheduler for storage filling
bounds := make(map[common.Hash]struct{})
if notary != nil { // if all contract storages are delivered in full, no notary will be created
it := notary.Accessed().NewIterator(nil, nil)
for it.Next() {
bounds[common.BytesToHash(it.Key())] = struct{}{}
}
it.Release()
}
response := &storageResponse{ response := &storageResponse{
mainTask: req.mainTask, mainTask: req.mainTask,
subTask: req.subTask, subTask: req.subTask,
@ -2416,9 +2445,6 @@ func (s *Syncer) OnStorage(peer SyncPeer, id uint64, hashes [][]common.Hash, slo
hashes: hashes, hashes: hashes,
slots: slots, slots: slots,
nodes: dbs, nodes: dbs,
tries: tries,
bounds: bounds,
overflow: light.NewNodeSet(),
cont: cont, cont: cont,
} }
select { select {
@ -2658,7 +2684,7 @@ func (s *Syncer) report(force bool) {
// reportSyncProgress calculates various status reports and provides it to the user. // reportSyncProgress calculates various status reports and provides it to the user.
func (s *Syncer) reportSyncProgress(force bool) { func (s *Syncer) reportSyncProgress(force bool) {
// Don't report all the events, just occasionally // Don't report all the events, just occasionally
if !force && time.Since(s.logTime) < 3*time.Second { if !force && time.Since(s.logTime) < 8*time.Second {
return return
} }
// Don't report anything until we have a meaningful progress // Don't report anything until we have a meaningful progress
@ -2697,7 +2723,7 @@ func (s *Syncer) reportSyncProgress(force bool) {
// reportHealProgress calculates various status reports and provides it to the user. // reportHealProgress calculates various status reports and provides it to the user.
func (s *Syncer) reportHealProgress(force bool) { func (s *Syncer) reportHealProgress(force bool) {
// Don't report all the events, just occasionally // Don't report all the events, just occasionally
if !force && time.Since(s.logTime) < 3*time.Second { if !force && time.Since(s.logTime) < 8*time.Second {
return return
} }
s.logTime = time.Now() s.logTime = time.Now()
@ -2712,3 +2738,19 @@ func (s *Syncer) reportHealProgress(force bool) {
log.Info("State heal in progress", "accounts", accounts, "slots", storage, log.Info("State heal in progress", "accounts", accounts, "slots", storage,
"codes", bytecode, "nodes", trienode, "pending", s.healer.scheduler.Pending()) "codes", bytecode, "nodes", trienode, "pending", s.healer.scheduler.Pending())
} }
// estimateRemainingSlots tries to determine roughly how many slots are left in
// a contract storage, based on the number of keys and the last hash. This method
// assumes that the hashes are lexicographically ordered and evenly distributed.
func estimateRemainingSlots(hashes int, last common.Hash) (uint64, error) {
if last == (common.Hash{}) {
return 0, errors.New("last hash empty")
}
space := new(big.Int).Mul(math.MaxBig256, big.NewInt(int64(hashes)))
space.Div(space, last.Big())
if !space.IsUint64() {
// Gigantic address space probably due to too few or malicious slots
return 0, errors.New("too few slots for estimation")
}
return space.Uint64() - uint64(hashes), nil
}
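A hedged worked example of the estimator above, matching one of the TestSlotEstimation cases in the next file: the density of the observed keys is extrapolated over the whole hash space.

// 100 slot hashes seen, the last one at 1/16th of the hash space:
// total ≈ (2^256-1) * 100 / last ≈ 1600, so ≈ 1500 slots remain.
last := common.HexToHash("0x0fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff")
remaining, err := estimateRemainingSlots(100, last)
// remaining == 1500, err == nil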

View File

@ -135,6 +135,12 @@ type testPeer struct {
trieRequestHandler trieHandlerFunc trieRequestHandler trieHandlerFunc
codeRequestHandler codeHandlerFunc codeRequestHandler codeHandlerFunc
term func() term func()
// counters
nAccountRequests int
nStorageRequests int
nBytecodeRequests int
nTrienodeRequests int
} }
func newTestPeer(id string, t *testing.T, term func()) *testPeer { func newTestPeer(id string, t *testing.T, term func()) *testPeer {
@ -156,19 +162,30 @@ func newTestPeer(id string, t *testing.T, term func()) *testPeer {
func (t *testPeer) ID() string { return t.id } func (t *testPeer) ID() string { return t.id }
func (t *testPeer) Log() log.Logger { return t.logger } func (t *testPeer) Log() log.Logger { return t.logger }
func (t *testPeer) Stats() string {
return fmt.Sprintf(`Account requests: %d
Storage requests: %d
Bytecode requests: %d
Trienode requests: %d
`, t.nAccountRequests, t.nStorageRequests, t.nBytecodeRequests, t.nTrienodeRequests)
}
func (t *testPeer) RequestAccountRange(id uint64, root, origin, limit common.Hash, bytes uint64) error { func (t *testPeer) RequestAccountRange(id uint64, root, origin, limit common.Hash, bytes uint64) error {
t.logger.Trace("Fetching range of accounts", "reqid", id, "root", root, "origin", origin, "limit", limit, "bytes", common.StorageSize(bytes)) t.logger.Trace("Fetching range of accounts", "reqid", id, "root", root, "origin", origin, "limit", limit, "bytes", common.StorageSize(bytes))
t.nAccountRequests++
go t.accountRequestHandler(t, id, root, origin, limit, bytes) go t.accountRequestHandler(t, id, root, origin, limit, bytes)
return nil return nil
} }
func (t *testPeer) RequestTrieNodes(id uint64, root common.Hash, paths []TrieNodePathSet, bytes uint64) error { func (t *testPeer) RequestTrieNodes(id uint64, root common.Hash, paths []TrieNodePathSet, bytes uint64) error {
t.logger.Trace("Fetching set of trie nodes", "reqid", id, "root", root, "pathsets", len(paths), "bytes", common.StorageSize(bytes)) t.logger.Trace("Fetching set of trie nodes", "reqid", id, "root", root, "pathsets", len(paths), "bytes", common.StorageSize(bytes))
t.nTrienodeRequests++
go t.trieRequestHandler(t, id, root, paths, bytes) go t.trieRequestHandler(t, id, root, paths, bytes)
return nil return nil
} }
func (t *testPeer) RequestStorageRanges(id uint64, root common.Hash, accounts []common.Hash, origin, limit []byte, bytes uint64) error { func (t *testPeer) RequestStorageRanges(id uint64, root common.Hash, accounts []common.Hash, origin, limit []byte, bytes uint64) error {
t.nStorageRequests++
if len(accounts) == 1 && origin != nil { if len(accounts) == 1 && origin != nil {
t.logger.Trace("Fetching range of large storage slots", "reqid", id, "root", root, "account", accounts[0], "origin", common.BytesToHash(origin), "limit", common.BytesToHash(limit), "bytes", common.StorageSize(bytes)) t.logger.Trace("Fetching range of large storage slots", "reqid", id, "root", root, "account", accounts[0], "origin", common.BytesToHash(origin), "limit", common.BytesToHash(limit), "bytes", common.StorageSize(bytes))
} else { } else {
@ -179,6 +196,7 @@ func (t *testPeer) RequestStorageRanges(id uint64, root common.Hash, accounts []
} }
func (t *testPeer) RequestByteCodes(id uint64, hashes []common.Hash, bytes uint64) error { func (t *testPeer) RequestByteCodes(id uint64, hashes []common.Hash, bytes uint64) error {
t.nBytecodeRequests++
t.logger.Trace("Fetching set of byte codes", "reqid", id, "hashes", len(hashes), "bytes", common.StorageSize(bytes)) t.logger.Trace("Fetching set of byte codes", "reqid", id, "hashes", len(hashes), "bytes", common.StorageSize(bytes))
go t.codeRequestHandler(t, id, hashes, bytes) go t.codeRequestHandler(t, id, hashes, bytes)
return nil return nil
@ -1365,7 +1383,7 @@ func makeBoundaryAccountTrie(n int) (*trie.Trie, entrySlice) {
step := new(big.Int).Sub( step := new(big.Int).Sub(
new(big.Int).Div( new(big.Int).Div(
new(big.Int).Exp(common.Big2, common.Big256, nil), new(big.Int).Exp(common.Big2, common.Big256, nil),
big.NewInt(accountConcurrency), big.NewInt(int64(accountConcurrency)),
), common.Big1, ), common.Big1,
) )
for i := 0; i < accountConcurrency; i++ { for i := 0; i < accountConcurrency; i++ {
@ -1529,7 +1547,7 @@ func makeBoundaryStorageTrie(n int, db *trie.Database) (*trie.Trie, entrySlice)
step := new(big.Int).Sub( step := new(big.Int).Sub(
new(big.Int).Div( new(big.Int).Div(
new(big.Int).Exp(common.Big2, common.Big256, nil), new(big.Int).Exp(common.Big2, common.Big256, nil),
big.NewInt(accountConcurrency), big.NewInt(int64(accountConcurrency)),
), common.Big1, ), common.Big1,
) )
for i := 0; i < accountConcurrency; i++ { for i := 0; i < accountConcurrency; i++ {
@ -1605,3 +1623,94 @@ func verifyTrie(db ethdb.KeyValueStore, root common.Hash, t *testing.T) {
} }
t.Logf("accounts: %d, slots: %d", accounts, slots) t.Logf("accounts: %d, slots: %d", accounts, slots)
} }
// TestSyncAccountPerformance tests how efficient the snap algo is at minimizing
// state healing
func TestSyncAccountPerformance(t *testing.T) {
// Set the account concurrency to 1. This _should_ result in the
// range root becoming correct, and there should be no healing needed
defer func(old int) { accountConcurrency = old }(accountConcurrency)
accountConcurrency = 1
var (
once sync.Once
cancel = make(chan struct{})
term = func() {
once.Do(func() {
close(cancel)
})
}
)
sourceAccountTrie, elems := makeAccountTrieNoStorage(100)
mkSource := func(name string) *testPeer {
source := newTestPeer(name, t, term)
source.accountTrie = sourceAccountTrie
source.accountValues = elems
return source
}
src := mkSource("source")
syncer := setupSyncer(src)
if err := syncer.Sync(sourceAccountTrie.Hash(), cancel); err != nil {
t.Fatalf("sync failed: %v", err)
}
verifyTrie(syncer.db, sourceAccountTrie.Hash(), t)
// The trie root will always be requested, since it is added when the snap
// sync cycle starts. When popping the queue, we do not look it up again.
// Doing so would bring this number down to zero in this artificial testcase,
// but only add extra IO for no reason in practice.
if have, want := src.nTrienodeRequests, 1; have != want {
fmt.Printf(src.Stats())
t.Errorf("trie node heal requests wrong, want %d, have %d", want, have)
}
}
func TestSlotEstimation(t *testing.T) {
for i, tc := range []struct {
last common.Hash
count int
want uint64
}{
{
// Half the space
common.HexToHash("0x7fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"),
100,
100,
},
{
// 1 / 16th
common.HexToHash("0x0fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"),
100,
1500,
},
{
// Bit more than 1 / 16th
common.HexToHash("0x1000000000000000000000000000000000000000000000000000000000000000"),
100,
1499,
},
{
// Almost everything
common.HexToHash("0xF000000000000000000000000000000000000000000000000000000000000000"),
100,
6,
},
{
// Almost nothing -- should lead to error
common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000001"),
1,
0,
},
{
// Nothing -- should lead to error
common.Hash{},
100,
0,
},
} {
have, _ := estimateRemainingSlots(tc.count, tc.last)
if want := tc.want; have != want {
t.Errorf("test %d: have %d want %d", i, have, want)
}
}
}

View File

@ -25,6 +25,9 @@ const IdealBatchSize = 100 * 1024
type Batch interface { type Batch interface {
KeyValueWriter KeyValueWriter
// KeyCount retrieves the number of keys queued up for writing.
KeyCount() int
// ValueSize retrieves the amount of data queued up for writing. // ValueSize retrieves the amount of data queued up for writing.
ValueSize() int ValueSize() int
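A hedged sketch of how the syncer turns the new KeyCount into a size estimate when flushing a generator batch (mirroring saveSyncStatus and processStorageResponse above): each queued key is a trie node hash, so the written data is approximated as keys*HashLength plus the queued value bytes.

keys, data := batch.KeyCount(), batch.ValueSize()
if err := batch.Write(); err != nil {
	log.Error("Failed to persist trie nodes", "err", err)
}
batch.Reset()
written := common.StorageSize(keys*common.HashLength + data) // rough bytes-on-disk estimate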

View File

@ -448,6 +448,7 @@ func (db *Database) meter(refresh time.Duration) {
type batch struct { type batch struct {
db *leveldb.DB db *leveldb.DB
b *leveldb.Batch b *leveldb.Batch
keys int
size int size int
} }
@ -461,10 +462,16 @@ func (b *batch) Put(key, value []byte) error {
// Delete inserts the a key removal into the batch for later committing. // Delete inserts the a key removal into the batch for later committing.
func (b *batch) Delete(key []byte) error { func (b *batch) Delete(key []byte) error {
b.b.Delete(key) b.b.Delete(key)
b.keys++
b.size += len(key) b.size += len(key)
return nil return nil
} }
// KeyCount retrieves the number of keys queued up for writing.
func (b *batch) KeyCount() int {
return b.keys
}
// ValueSize retrieves the amount of data queued up for writing. // ValueSize retrieves the amount of data queued up for writing.
func (b *batch) ValueSize() int { func (b *batch) ValueSize() int {
return b.size return b.size
@ -478,7 +485,7 @@ func (b *batch) Write() error {
// Reset resets the batch for reuse. // Reset resets the batch for reuse.
func (b *batch) Reset() { func (b *batch) Reset() {
b.b.Reset() b.b.Reset()
b.size = 0 b.keys, b.size = 0, 0
} }
// Replay replays the batch contents. // Replay replays the batch contents.

View File

@ -198,6 +198,7 @@ type keyvalue struct {
type batch struct { type batch struct {
db *Database db *Database
writes []keyvalue writes []keyvalue
keys int
size int size int
} }
@ -211,10 +212,16 @@ func (b *batch) Put(key, value []byte) error {
// Delete inserts the a key removal into the batch for later committing. // Delete inserts the a key removal into the batch for later committing.
func (b *batch) Delete(key []byte) error { func (b *batch) Delete(key []byte) error {
b.writes = append(b.writes, keyvalue{common.CopyBytes(key), nil, true}) b.writes = append(b.writes, keyvalue{common.CopyBytes(key), nil, true})
b.keys++
b.size += len(key) b.size += len(key)
return nil return nil
} }
// KeyCount retrieves the number of keys queued up for writing.
func (b *batch) KeyCount() int {
return b.keys
}
// ValueSize retrieves the amount of data queued up for writing. // ValueSize retrieves the amount of data queued up for writing.
func (b *batch) ValueSize() int { func (b *batch) ValueSize() int {
return b.size return b.size
@ -238,7 +245,7 @@ func (b *batch) Write() error {
// Reset resets the batch for reuse. // Reset resets the batch for reuse.
func (b *batch) Reset() { func (b *batch) Reset() {
b.writes = b.writes[:0] b.writes = b.writes[:0]
b.size = 0 b.keys, b.size = 0, 0
} }
// Replay replays the batch contents. // Replay replays the batch contents.

View File

@ -90,6 +90,7 @@ func (b *spongeBatch) Put(key, value []byte) error {
return nil return nil
} }
func (b *spongeBatch) Delete(key []byte) error { panic("implement me") } func (b *spongeBatch) Delete(key []byte) error { panic("implement me") }
func (b *spongeBatch) KeyCount() int { panic("not implemented") }
func (b *spongeBatch) ValueSize() int { return 100 } func (b *spongeBatch) ValueSize() int { return 100 }
func (b *spongeBatch) Write() error { return nil } func (b *spongeBatch) Write() error { return nil }
func (b *spongeBatch) Reset() {} func (b *spongeBatch) Reset() {}

View File

@ -706,6 +706,7 @@ func (b *spongeBatch) Put(key, value []byte) error {
return nil return nil
} }
func (b *spongeBatch) Delete(key []byte) error { panic("implement me") } func (b *spongeBatch) Delete(key []byte) error { panic("implement me") }
func (b *spongeBatch) KeyCount() int { return 100 }
func (b *spongeBatch) ValueSize() int { return 100 } func (b *spongeBatch) ValueSize() int { return 100 }
func (b *spongeBatch) Write() error { return nil } func (b *spongeBatch) Write() error { return nil }
func (b *spongeBatch) Reset() {} func (b *spongeBatch) Reset() {}