From ab27bee25a845be90bd60e774ff68d2ea1501772 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Mon, 5 Oct 2015 19:37:56 +0300 Subject: [PATCH] core, eth, trie: direct state trie synchronization --- core/blockchain.go | 25 +- core/state/sync.go | 98 ++++++ core/state/sync_test.go | 238 +++++++++++++ eth/downloader/downloader.go | 568 ++++++++++++------------------ eth/downloader/downloader_test.go | 123 +++++-- eth/downloader/metrics.go | 5 + eth/downloader/peer.go | 109 ++++-- eth/downloader/queue.go | 271 +++++++++++--- eth/downloader/types.go | 137 +++++++ eth/handler.go | 25 +- eth/peer.go | 2 +- trie/sync.go | 233 ++++++++++++ trie/sync_test.go | 257 ++++++++++++++ 13 files changed, 1627 insertions(+), 464 deletions(-) create mode 100644 core/state/sync.go create mode 100644 core/state/sync_test.go create mode 100644 eth/downloader/types.go create mode 100644 trie/sync.go create mode 100644 trie/sync_test.go diff --git a/core/blockchain.go b/core/blockchain.go index b68e7d3ae..6c8a24751 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -37,6 +37,7 @@ import ( "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/pow" "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/trie" "github.com/hashicorp/golang-lru" ) @@ -246,6 +247,26 @@ func (bc *BlockChain) SetHead(head uint64) { bc.loadLastState() } +// FastSyncCommitHead sets the current head block to the one defined by the hash +// irrelevant what the chain contents were prior. +func (self *BlockChain) FastSyncCommitHead(hash common.Hash) error { + // Make sure that both the block as well at it's state trie exists + block := self.GetBlock(hash) + if block == nil { + return fmt.Errorf("non existent block [%x…]", hash[:4]) + } + if _, err := trie.NewSecure(block.Root(), self.chainDb); err != nil { + return err + } + // If all checks out, manually set the head block + self.mu.Lock() + self.currentBlock = block + self.mu.Unlock() + + glog.V(logger.Info).Infof("committed block #%d [%x…] as new head", block.Number(), hash[:4]) + return nil +} + func (self *BlockChain) GasLimit() *big.Int { self.mu.RLock() defer self.mu.RUnlock() @@ -721,10 +742,6 @@ func (self *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain self.wg.Add(1) defer self.wg.Done() - // Make sure only one thread manipulates the chain at once - self.chainmu.Lock() - defer self.chainmu.Unlock() - // Collect some import statistics to report on stats := struct{ processed, ignored int }{} start := time.Now() diff --git a/core/state/sync.go b/core/state/sync.go new file mode 100644 index 000000000..e9bebe8ee --- /dev/null +++ b/core/state/sync.go @@ -0,0 +1,98 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package state + +import ( + "bytes" + "math/big" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto/sha3" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/trie" +) + +type StateSync struct { + db ethdb.Database + sync *trie.TrieSync + codeReqs map[common.Hash]struct{} // requested but not yet written to database + codeReqList []common.Hash // requested since last Missing +} + +var sha3_nil = common.BytesToHash(sha3.NewKeccak256().Sum(nil)) + +func NewStateSync(root common.Hash, db ethdb.Database) *StateSync { + ss := &StateSync{ + db: db, + codeReqs: make(map[common.Hash]struct{}), + } + ss.codeReqs[sha3_nil] = struct{}{} // never request the nil hash + ss.sync = trie.NewTrieSync(root, db, ss.leafFound) + return ss +} + +func (self *StateSync) leafFound(leaf []byte, parent common.Hash) error { + var obj struct { + Nonce uint64 + Balance *big.Int + Root common.Hash + CodeHash []byte + } + if err := rlp.Decode(bytes.NewReader(leaf), &obj); err != nil { + return err + } + self.sync.AddSubTrie(obj.Root, 64, parent, nil) + + codehash := common.BytesToHash(obj.CodeHash) + if _, ok := self.codeReqs[codehash]; !ok { + code, _ := self.db.Get(obj.CodeHash) + if code == nil { + self.codeReqs[codehash] = struct{}{} + self.codeReqList = append(self.codeReqList, codehash) + } + } + return nil +} + +func (self *StateSync) Missing(max int) []common.Hash { + cr := len(self.codeReqList) + gh := 0 + if max != 0 { + if cr > max { + cr = max + } + gh = max - cr + } + list := append(self.sync.Missing(gh), self.codeReqList[:cr]...) + self.codeReqList = self.codeReqList[cr:] + return list +} + +func (self *StateSync) Process(list []trie.SyncResult) error { + for i := 0; i < len(list); i++ { + if _, ok := self.codeReqs[list[i].Hash]; ok { // code data, not a node + self.db.Put(list[i].Hash[:], list[i].Data) + delete(self.codeReqs, list[i].Hash) + list[i] = list[len(list)-1] + list = list[:len(list)-1] + i-- + } + } + _, err := self.sync.Process(list) + return err +} diff --git a/core/state/sync_test.go b/core/state/sync_test.go new file mode 100644 index 000000000..f6afe8bd8 --- /dev/null +++ b/core/state/sync_test.go @@ -0,0 +1,238 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package state + +import ( + "bytes" + "math/big" + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/trie" +) + +// testAccount is the data associated with an account used by the state tests. +type testAccount struct { + address common.Address + balance *big.Int + nonce uint64 + code []byte +} + +// makeTestState create a sample test state to test node-wise reconstruction. +func makeTestState() (ethdb.Database, common.Hash, []*testAccount) { + // Create an empty state + db, _ := ethdb.NewMemDatabase() + state := New(common.Hash{}, db) + + // Fill it with some arbitrary data + accounts := []*testAccount{} + for i := byte(0); i < 255; i++ { + obj := state.GetOrNewStateObject(common.BytesToAddress([]byte{i})) + acc := &testAccount{address: common.BytesToAddress([]byte{i})} + + obj.AddBalance(big.NewInt(int64(11 * i))) + acc.balance = big.NewInt(int64(11 * i)) + + obj.SetNonce(uint64(42 * i)) + acc.nonce = uint64(42 * i) + + if i%3 == 0 { + obj.SetCode([]byte{i, i, i, i, i}) + acc.code = []byte{i, i, i, i, i} + } + state.UpdateStateObject(obj) + accounts = append(accounts, acc) + } + root, _ := state.Commit() + + // Return the generated state + return db, root, accounts +} + +// checkStateAccounts cross references a reconstructed state with an expected +// account array. +func checkStateAccounts(t *testing.T, db ethdb.Database, root common.Hash, accounts []*testAccount) { + state := New(root, db) + for i, acc := range accounts { + + if balance := state.GetBalance(acc.address); balance.Cmp(acc.balance) != 0 { + t.Errorf("account %d: balance mismatch: have %v, want %v", i, balance, acc.balance) + } + if nonce := state.GetNonce(acc.address); nonce != acc.nonce { + t.Errorf("account %d: nonce mismatch: have %v, want %v", i, nonce, acc.nonce) + } + if code := state.GetCode(acc.address); bytes.Compare(code, acc.code) != 0 { + t.Errorf("account %d: code mismatch: have %x, want %x", i, code, acc.code) + } + } +} + +// Tests that an empty state is not scheduled for syncing. +func TestEmptyStateSync(t *testing.T) { + empty := common.HexToHash("56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421") + db, _ := ethdb.NewMemDatabase() + if req := NewStateSync(empty, db).Missing(1); len(req) != 0 { + t.Errorf("content requested for empty state: %v", req) + } +} + +// Tests that given a root hash, a state can sync iteratively on a single thread, +// requesting retrieval tasks and returning all of them in one go. +func TestIterativeStateSyncIndividual(t *testing.T) { testIterativeStateSync(t, 1) } +func TestIterativeStateSyncBatched(t *testing.T) { testIterativeStateSync(t, 100) } + +func testIterativeStateSync(t *testing.T, batch int) { + // Create a random state to copy + srcDb, srcRoot, srcAccounts := makeTestState() + + // Create a destination state and sync with the scheduler + dstDb, _ := ethdb.NewMemDatabase() + sched := NewStateSync(srcRoot, dstDb) + + queue := append([]common.Hash{}, sched.Missing(batch)...) + for len(queue) > 0 { + results := make([]trie.SyncResult, len(queue)) + for i, hash := range queue { + data, err := srcDb.Get(hash.Bytes()) + if err != nil { + t.Fatalf("failed to retrieve node data for %x: %v", hash, err) + } + results[i] = trie.SyncResult{hash, data} + } + if err := sched.Process(results); err != nil { + t.Fatalf("failed to process results: %v", err) + } + queue = append(queue[:0], sched.Missing(batch)...) + } + // Cross check that the two states are in sync + checkStateAccounts(t, dstDb, srcRoot, srcAccounts) +} + +// Tests that the trie scheduler can correctly reconstruct the state even if only +// partial results are returned, and the others sent only later. +func TestIterativeDelayedStateSync(t *testing.T) { + // Create a random state to copy + srcDb, srcRoot, srcAccounts := makeTestState() + + // Create a destination state and sync with the scheduler + dstDb, _ := ethdb.NewMemDatabase() + sched := NewStateSync(srcRoot, dstDb) + + queue := append([]common.Hash{}, sched.Missing(0)...) + for len(queue) > 0 { + // Sync only half of the scheduled nodes + results := make([]trie.SyncResult, len(queue)/2+1) + for i, hash := range queue[:len(results)] { + data, err := srcDb.Get(hash.Bytes()) + if err != nil { + t.Fatalf("failed to retrieve node data for %x: %v", hash, err) + } + results[i] = trie.SyncResult{hash, data} + } + if err := sched.Process(results); err != nil { + t.Fatalf("failed to process results: %v", err) + } + queue = append(queue[len(results):], sched.Missing(0)...) + } + // Cross check that the two states are in sync + checkStateAccounts(t, dstDb, srcRoot, srcAccounts) +} + +// Tests that given a root hash, a trie can sync iteratively on a single thread, +// requesting retrieval tasks and returning all of them in one go, however in a +// random order. +func TestIterativeRandomStateSyncIndividual(t *testing.T) { testIterativeRandomStateSync(t, 1) } +func TestIterativeRandomStateSyncBatched(t *testing.T) { testIterativeRandomStateSync(t, 100) } + +func testIterativeRandomStateSync(t *testing.T, batch int) { + // Create a random state to copy + srcDb, srcRoot, srcAccounts := makeTestState() + + // Create a destination state and sync with the scheduler + dstDb, _ := ethdb.NewMemDatabase() + sched := NewStateSync(srcRoot, dstDb) + + queue := make(map[common.Hash]struct{}) + for _, hash := range sched.Missing(batch) { + queue[hash] = struct{}{} + } + for len(queue) > 0 { + // Fetch all the queued nodes in a random order + results := make([]trie.SyncResult, 0, len(queue)) + for hash, _ := range queue { + data, err := srcDb.Get(hash.Bytes()) + if err != nil { + t.Fatalf("failed to retrieve node data for %x: %v", hash, err) + } + results = append(results, trie.SyncResult{hash, data}) + } + // Feed the retrieved results back and queue new tasks + if err := sched.Process(results); err != nil { + t.Fatalf("failed to process results: %v", err) + } + queue = make(map[common.Hash]struct{}) + for _, hash := range sched.Missing(batch) { + queue[hash] = struct{}{} + } + } + // Cross check that the two states are in sync + checkStateAccounts(t, dstDb, srcRoot, srcAccounts) +} + +// Tests that the trie scheduler can correctly reconstruct the state even if only +// partial results are returned (Even those randomly), others sent only later. +func TestIterativeRandomDelayedStateSync(t *testing.T) { + // Create a random state to copy + srcDb, srcRoot, srcAccounts := makeTestState() + + // Create a destination state and sync with the scheduler + dstDb, _ := ethdb.NewMemDatabase() + sched := NewStateSync(srcRoot, dstDb) + + queue := make(map[common.Hash]struct{}) + for _, hash := range sched.Missing(0) { + queue[hash] = struct{}{} + } + for len(queue) > 0 { + // Sync only half of the scheduled nodes, even those in random order + results := make([]trie.SyncResult, 0, len(queue)/2+1) + for hash, _ := range queue { + delete(queue, hash) + + data, err := srcDb.Get(hash.Bytes()) + if err != nil { + t.Fatalf("failed to retrieve node data for %x: %v", hash, err) + } + results = append(results, trie.SyncResult{hash, data}) + + if len(results) >= cap(results) { + break + } + } + // Feed the retrieved results back and queue new tasks + if err := sched.Process(results); err != nil { + t.Fatalf("failed to process results: %v", err) + } + for _, hash := range sched.Missing(0) { + queue[hash] = struct{}{} + } + } + // Cross check that the two states are in sync + checkStateAccounts(t, dstDb, srcRoot, srcAccounts) +} diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 24ba3da17..96177ae8a 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -19,7 +19,6 @@ package downloader import ( "errors" - "fmt" "math" "math/big" "strings" @@ -29,9 +28,11 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" + "github.com/rcrowley/go-metrics" ) var ( @@ -39,8 +40,8 @@ var ( MaxBlockFetch = 128 // Amount of blocks to be fetched per retrieval request MaxHeaderFetch = 192 // Amount of block headers to be fetched per retrieval request MaxBodyFetch = 128 // Amount of block bodies to be fetched per retrieval request - MaxStateFetch = 384 // Amount of node state values to allow fetching per request MaxReceiptFetch = 256 // Amount of transaction receipts to allow fetching per request + MaxStateFetch = 384 // Amount of node state values to allow fetching per request hashTTL = 5 * time.Second // [eth/61] Time it takes for a hash request to time out blockSoftTTL = 3 * time.Second // [eth/61] Request completion threshold for increasing or decreasing a peer's bandwidth @@ -49,10 +50,13 @@ var ( bodySoftTTL = 3 * time.Second // [eth/62] Request completion threshold for increasing or decreasing a peer's bandwidth bodyHardTTL = 3 * bodySoftTTL // [eth/62] Maximum time allowance before a block body request is considered expired receiptSoftTTL = 3 * time.Second // [eth/63] Request completion threshold for increasing or decreasing a peer's bandwidth - receiptHardTTL = 3 * receiptSoftTTL // [eth/63] Maximum time allowance before a block body request is considered expired + receiptHardTTL = 3 * receiptSoftTTL // [eth/63] Maximum time allowance before a receipt request is considered expired + stateSoftTTL = 2 * time.Second // [eth/63] Request completion threshold for increasing or decreasing a peer's bandwidth + stateHardTTL = 3 * stateSoftTTL // [eth/63] Maximum time allowance before a node data request is considered expired maxQueuedHashes = 256 * 1024 // [eth/61] Maximum number of hashes to queue for import (DOS protection) maxQueuedHeaders = 256 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection) + maxQueuedStates = 256 * 1024 // [eth/63] Maximum number of state requests to queue (DOS protection) maxResultsProcess = 256 // Number of download results to import at once into the chain headerCheckFrequency = 64 // Verification frequency of the downloaded headers during fast sync @@ -84,98 +88,6 @@ var ( errNoSyncActive = errors.New("no sync active") ) -// headerCheckFn is a callback type for verifying a header's presence in the local chain. -type headerCheckFn func(common.Hash) bool - -// blockCheckFn is a callback type for verifying a block's presence in the local chain. -type blockCheckFn func(common.Hash) bool - -// headerRetrievalFn is a callback type for retrieving a header from the local chain. -type headerRetrievalFn func(common.Hash) *types.Header - -// blockRetrievalFn is a callback type for retrieving a block from the local chain. -type blockRetrievalFn func(common.Hash) *types.Block - -// headHeaderRetrievalFn is a callback type for retrieving the head header from the local chain. -type headHeaderRetrievalFn func() *types.Header - -// headBlockRetrievalFn is a callback type for retrieving the head block from the local chain. -type headBlockRetrievalFn func() *types.Block - -// headFastBlockRetrievalFn is a callback type for retrieving the head fast block from the local chain. -type headFastBlockRetrievalFn func() *types.Block - -// tdRetrievalFn is a callback type for retrieving the total difficulty of a local block. -type tdRetrievalFn func(common.Hash) *big.Int - -// headerChainInsertFn is a callback type to insert a batch of headers into the local chain. -type headerChainInsertFn func([]*types.Header, bool) (int, error) - -// blockChainInsertFn is a callback type to insert a batch of blocks into the local chain. -type blockChainInsertFn func(types.Blocks) (int, error) - -// receiptChainInsertFn is a callback type to insert a batch of receipts into the local chain. -type receiptChainInsertFn func(types.Blocks, []types.Receipts) (int, error) - -// peerDropFn is a callback type for dropping a peer detected as malicious. -type peerDropFn func(id string) - -// dataPack is a data message returned by a peer for some query. -type dataPack interface { - PeerId() string - Empty() bool - Stats() string -} - -// hashPack is a batch of block hashes returned by a peer (eth/61). -type hashPack struct { - peerId string - hashes []common.Hash -} - -// blockPack is a batch of blocks returned by a peer (eth/61). -type blockPack struct { - peerId string - blocks []*types.Block -} - -// headerPack is a batch of block headers returned by a peer. -type headerPack struct { - peerId string - headers []*types.Header -} - -// bodyPack is a batch of block bodies returned by a peer. -type bodyPack struct { - peerId string - transactions [][]*types.Transaction - uncles [][]*types.Header -} - -// PeerId retrieves the origin peer who sent this block body packet. -func (p *bodyPack) PeerId() string { return p.peerId } - -// Empty returns whether the no block bodies were delivered. -func (p *bodyPack) Empty() bool { return len(p.transactions) == 0 || len(p.uncles) == 0 } - -// Stats creates a textual stats report for logging purposes. -func (p *bodyPack) Stats() string { return fmt.Sprintf("%d:%d", len(p.transactions), len(p.uncles)) } - -// receiptPack is a batch of receipts returned by a peer. -type receiptPack struct { - peerId string - receipts [][]*types.Receipt -} - -// PeerId retrieves the origin peer who sent this receipt packet. -func (p *receiptPack) PeerId() string { return p.peerId } - -// Empty returns whether the no receipts were delivered. -func (p *receiptPack) Empty() bool { return len(p.receipts) == 0 } - -// Stats creates a textual stats report for logging purposes. -func (p *receiptPack) Stats() string { return fmt.Sprintf("%d", len(p.receipts)) } - type Downloader struct { mode SyncMode // Synchronisation mode defining the strategies used mux *event.TypeMux // Event multiplexer to announce sync operation events @@ -186,23 +98,26 @@ type Downloader struct { interrupt int32 // Atomic boolean to signal termination // Statistics - syncStatsOrigin uint64 // Origin block number where syncing started at - syncStatsHeight uint64 // Highest block number known when syncing started - syncStatsLock sync.RWMutex // Lock protecting the sync stats fields + syncStatsChainOrigin uint64 // Origin block number where syncing started at + syncStatsChainHeight uint64 // Highest block number known when syncing started + syncStatsStateTotal uint64 // Total number of node state entries known so far + syncStatsStateDone uint64 // Number of state trie entries already pulled + syncStatsLock sync.RWMutex // Lock protecting the sync stats fields // Callbacks - hasHeader headerCheckFn // Checks if a header is present in the chain - hasBlock blockCheckFn // Checks if a block is present in the chain - getHeader headerRetrievalFn // Retrieves a header from the chain - getBlock blockRetrievalFn // Retrieves a block from the chain - headHeader headHeaderRetrievalFn // Retrieves the head header from the chain - headBlock headBlockRetrievalFn // Retrieves the head block from the chain - headFastBlock headFastBlockRetrievalFn // Retrieves the head fast-sync block from the chain - getTd tdRetrievalFn // Retrieves the TD of a block from the chain - insertHeaders headerChainInsertFn // Injects a batch of headers into the chain - insertBlocks blockChainInsertFn // Injects a batch of blocks into the chain - insertReceipts receiptChainInsertFn // Injects a batch of blocks and their receipts into the chain - dropPeer peerDropFn // Drops a peer for misbehaving + hasHeader headerCheckFn // Checks if a header is present in the chain + hasBlock blockCheckFn // Checks if a block is present in the chain + getHeader headerRetrievalFn // Retrieves a header from the chain + getBlock blockRetrievalFn // Retrieves a block from the chain + headHeader headHeaderRetrievalFn // Retrieves the head header from the chain + headBlock headBlockRetrievalFn // Retrieves the head block from the chain + headFastBlock headFastBlockRetrievalFn // Retrieves the head fast-sync block from the chain + commitHeadBlock headBlockCommitterFn // Commits a manually assembled block as the chain head + getTd tdRetrievalFn // Retrieves the TD of a block from the chain + insertHeaders headerChainInsertFn // Injects a batch of headers into the chain + insertBlocks blockChainInsertFn // Injects a batch of blocks into the chain + insertReceipts receiptChainInsertFn // Injects a batch of blocks and their receipts into the chain + dropPeer peerDropFn // Drops a peer for misbehaving // Status synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing @@ -212,14 +127,16 @@ type Downloader struct { // Channels newPeerCh chan *peer - hashCh chan hashPack // [eth/61] Channel receiving inbound hashes - blockCh chan blockPack // [eth/61] Channel receiving inbound blocks - headerCh chan headerPack // [eth/62] Channel receiving inbound block headers - bodyCh chan dataPack // [eth/62] Channel receiving inbound block bodies - receiptCh chan dataPack // [eth/63] Channel receiving inbound receipts - blockWakeCh chan bool // [eth/61] Channel to signal the block fetcher of new tasks - bodyWakeCh chan bool // [eth/62] Channel to signal the block body fetcher of new tasks - receiptWakeCh chan bool // [eth/63] Channel to signal the receipt fetcher of new tasks + hashCh chan dataPack // [eth/61] Channel receiving inbound hashes + blockCh chan dataPack // [eth/61] Channel receiving inbound blocks + headerCh chan dataPack // [eth/62] Channel receiving inbound block headers + bodyCh chan dataPack // [eth/62] Channel receiving inbound block bodies + receiptCh chan dataPack // [eth/63] Channel receiving inbound receipts + stateCh chan dataPack // [eth/63] Channel receiving inbound node state data + blockWakeCh chan bool // [eth/61] Channel to signal the block fetcher of new tasks + bodyWakeCh chan bool // [eth/62] Channel to signal the block body fetcher of new tasks + receiptWakeCh chan bool // [eth/63] Channel to signal the receipt fetcher of new tasks + stateWakeCh chan bool // [eth/63] Channel to signal the state fetcher of new tasks cancelCh chan struct{} // Channel to cancel mid-flight syncs cancelLock sync.RWMutex // Lock to protect the cancel channel in delivers @@ -232,36 +149,40 @@ type Downloader struct { } // New creates a new downloader to fetch hashes and blocks from remote peers. -func New(mode SyncMode, mux *event.TypeMux, hasHeader headerCheckFn, hasBlock blockCheckFn, getHeader headerRetrievalFn, getBlock blockRetrievalFn, - headHeader headHeaderRetrievalFn, headBlock headBlockRetrievalFn, headFastBlock headFastBlockRetrievalFn, getTd tdRetrievalFn, - insertHeaders headerChainInsertFn, insertBlocks blockChainInsertFn, insertReceipts receiptChainInsertFn, dropPeer peerDropFn) *Downloader { +func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, hasHeader headerCheckFn, hasBlock blockCheckFn, getHeader headerRetrievalFn, + getBlock blockRetrievalFn, headHeader headHeaderRetrievalFn, headBlock headBlockRetrievalFn, headFastBlock headFastBlockRetrievalFn, + commitHeadBlock headBlockCommitterFn, getTd tdRetrievalFn, insertHeaders headerChainInsertFn, insertBlocks blockChainInsertFn, + insertReceipts receiptChainInsertFn, dropPeer peerDropFn) *Downloader { return &Downloader{ - mode: mode, - mux: mux, - queue: newQueue(), - peers: newPeerSet(), - hasHeader: hasHeader, - hasBlock: hasBlock, - getHeader: getHeader, - getBlock: getBlock, - headHeader: headHeader, - headBlock: headBlock, - headFastBlock: headFastBlock, - getTd: getTd, - insertHeaders: insertHeaders, - insertBlocks: insertBlocks, - insertReceipts: insertReceipts, - dropPeer: dropPeer, - newPeerCh: make(chan *peer, 1), - hashCh: make(chan hashPack, 1), - blockCh: make(chan blockPack, 1), - headerCh: make(chan headerPack, 1), - bodyCh: make(chan dataPack, 1), - receiptCh: make(chan dataPack, 1), - blockWakeCh: make(chan bool, 1), - bodyWakeCh: make(chan bool, 1), - receiptWakeCh: make(chan bool, 1), + mode: mode, + mux: mux, + queue: newQueue(stateDb), + peers: newPeerSet(), + hasHeader: hasHeader, + hasBlock: hasBlock, + getHeader: getHeader, + getBlock: getBlock, + headHeader: headHeader, + headBlock: headBlock, + headFastBlock: headFastBlock, + commitHeadBlock: commitHeadBlock, + getTd: getTd, + insertHeaders: insertHeaders, + insertBlocks: insertBlocks, + insertReceipts: insertReceipts, + dropPeer: dropPeer, + newPeerCh: make(chan *peer, 1), + hashCh: make(chan dataPack, 1), + blockCh: make(chan dataPack, 1), + headerCh: make(chan dataPack, 1), + bodyCh: make(chan dataPack, 1), + receiptCh: make(chan dataPack, 1), + stateCh: make(chan dataPack, 1), + blockWakeCh: make(chan bool, 1), + bodyWakeCh: make(chan bool, 1), + receiptWakeCh: make(chan bool, 1), + stateWakeCh: make(chan bool, 1), } } @@ -272,7 +193,7 @@ func (d *Downloader) Boundaries() (uint64, uint64) { d.syncStatsLock.RLock() defer d.syncStatsLock.RUnlock() - return d.syncStatsOrigin, d.syncStatsHeight + return d.syncStatsChainOrigin, d.syncStatsChainHeight } // Synchronising returns whether the downloader is currently retrieving blocks. @@ -284,10 +205,11 @@ func (d *Downloader) Synchronising() bool { // used for fetching hashes and blocks from. func (d *Downloader) RegisterPeer(id string, version int, head common.Hash, getRelHashes relativeHashFetcherFn, getAbsHashes absoluteHashFetcherFn, getBlocks blockFetcherFn, // eth/61 callbacks, remove when upgrading - getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn, getReceipts receiptFetcherFn) error { + getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn, + getReceipts receiptFetcherFn, getNodeData stateFetcherFn) error { glog.V(logger.Detail).Infoln("Registering peer", id) - if err := d.peers.Register(newPeer(id, version, head, getRelHashes, getAbsHashes, getBlocks, getRelHeaders, getAbsHeaders, getBlockBodies, getReceipts)); err != nil { + if err := d.peers.Register(newPeer(id, version, head, getRelHashes, getAbsHashes, getBlocks, getRelHeaders, getAbsHeaders, getBlockBodies, getReceipts, getNodeData)); err != nil { glog.V(logger.Error).Infoln("Register failed:", err) return err } @@ -357,12 +279,18 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int) error d.queue.Reset() d.peers.Reset() - for _, ch := range []chan bool{d.blockWakeCh, d.bodyWakeCh, d.receiptWakeCh} { + for _, ch := range []chan bool{d.blockWakeCh, d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} { select { case <-ch: default: } } + // Reset and ephemeral sync statistics + d.syncStatsLock.Lock() + d.syncStatsStateTotal = 0 + d.syncStatsStateDone = 0 + d.syncStatsLock.Unlock() + // Create cancel channel for aborting mid-flight d.cancelLock.Lock() d.cancelCh = make(chan struct{}) @@ -414,17 +342,17 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e return err } d.syncStatsLock.Lock() - if d.syncStatsHeight <= origin || d.syncStatsOrigin > origin { - d.syncStatsOrigin = origin + if d.syncStatsChainHeight <= origin || d.syncStatsChainOrigin > origin { + d.syncStatsChainOrigin = origin } - d.syncStatsHeight = latest + d.syncStatsChainHeight = latest d.syncStatsLock.Unlock() // Initiate the sync using a concurrent hash and block retrieval algorithm if d.syncInitHook != nil { d.syncInitHook(origin, latest) } - d.queue.Prepare(origin+1, 1) + d.queue.Prepare(origin+1, d.mode, 0) errc := make(chan error, 2) go func() { errc <- d.fetchHashes61(p, td, origin+1) }() @@ -449,26 +377,27 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e return err } d.syncStatsLock.Lock() - if d.syncStatsHeight <= origin || d.syncStatsOrigin > origin { - d.syncStatsOrigin = origin + if d.syncStatsChainHeight <= origin || d.syncStatsChainOrigin > origin { + d.syncStatsChainOrigin = origin } - d.syncStatsHeight = latest + d.syncStatsChainHeight = latest d.syncStatsLock.Unlock() // Initiate the sync using a concurrent header and content retrieval algorithm - parts := 1 - if d.mode == FastSync { - parts = 2 // receipts are fetched too + pivot := uint64(0) + if latest > uint64(minFullBlocks) { + pivot = latest - uint64(minFullBlocks) } - d.queue.Prepare(origin+1, parts) + d.queue.Prepare(origin+1, d.mode, pivot) if d.syncInitHook != nil { d.syncInitHook(origin, latest) } - errc := make(chan error, 3) + errc := make(chan error, 4) go func() { errc <- d.fetchHeaders(p, td, origin+1) }() // Headers are always retrieved + go func() { errc <- d.fetchBodies(origin + 1) }() // Bodies are retrieved during normal and fast sync go func() { errc <- d.fetchReceipts(origin + 1) }() // Receipts are retrieved during fast sync - go func() { errc <- d.fetchBodies(origin + 1) }() // Bodies are retrieved during normal sync + go func() { errc <- d.fetchNodeData() }() // Node state data is retrieved during fast sync // If any fetcher fails, cancel the others var fail error @@ -538,14 +467,14 @@ func (d *Downloader) fetchHeight61(p *peer) (uint64, error) { case <-d.hashCh: // Out of bounds hashes received, ignore them - case blockPack := <-d.blockCh: + case packet := <-d.blockCh: // Discard anything not from the origin peer - if blockPack.peerId != p.id { - glog.V(logger.Debug).Infof("Received blocks from incorrect peer(%s)", blockPack.peerId) + if packet.PeerId() != p.id { + glog.V(logger.Debug).Infof("Received blocks from incorrect peer(%s)", packet.PeerId()) break } // Make sure the peer actually gave something valid - blocks := blockPack.blocks + blocks := packet.(*blockPack).blocks if len(blocks) != 1 { glog.V(logger.Debug).Infof("%v: invalid number of head blocks: %d != 1", p, len(blocks)) return 0, errBadPeer @@ -584,14 +513,14 @@ func (d *Downloader) findAncestor61(p *peer) (uint64, error) { case <-d.cancelCh: return 0, errCancelHashFetch - case hashPack := <-d.hashCh: + case packet := <-d.hashCh: // Discard anything not from the origin peer - if hashPack.peerId != p.id { - glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", hashPack.peerId) + if packet.PeerId() != p.id { + glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", packet.PeerId()) break } // Make sure the peer actually gave something valid - hashes := hashPack.hashes + hashes := packet.(*hashPack).hashes if len(hashes) == 0 { glog.V(logger.Debug).Infof("%v: empty head hash set", p) return 0, errEmptyHashSet @@ -639,14 +568,14 @@ func (d *Downloader) findAncestor61(p *peer) (uint64, error) { case <-d.cancelCh: return 0, errCancelHashFetch - case hashPack := <-d.hashCh: + case packet := <-d.hashCh: // Discard anything not from the origin peer - if hashPack.peerId != p.id { - glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", hashPack.peerId) + if packet.PeerId() != p.id { + glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", packet.PeerId()) break } // Make sure the peer actually gave something valid - hashes := hashPack.hashes + hashes := packet.(*hashPack).hashes if len(hashes) != 1 { glog.V(logger.Debug).Infof("%v: invalid search hash set (%d)", p, len(hashes)) return 0, errBadPeer @@ -716,17 +645,17 @@ func (d *Downloader) fetchHashes61(p *peer, td *big.Int, from uint64) error { case <-d.bodyCh: // Out of bounds eth/62 block bodies received, ignore them - case hashPack := <-d.hashCh: + case packet := <-d.hashCh: // Make sure the active peer is giving us the hashes - if hashPack.peerId != p.id { - glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", hashPack.peerId) + if packet.PeerId() != p.id { + glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", packet.PeerId()) break } hashReqTimer.UpdateSince(request) timeout.Stop() // If no more hashes are inbound, notify the block fetcher and return - if len(hashPack.hashes) == 0 { + if packet.Items() == 0 { glog.V(logger.Debug).Infof("%v: no available hashes", p) select { @@ -751,12 +680,13 @@ func (d *Downloader) fetchHashes61(p *peer, td *big.Int, from uint64) error { return nil } gotHashes = true + hashes := packet.(*hashPack).hashes // Otherwise insert all the new hashes, aborting in case of junk - glog.V(logger.Detail).Infof("%v: scheduling %d hashes from #%d", p, len(hashPack.hashes), from) + glog.V(logger.Detail).Infof("%v: scheduling %d hashes from #%d", p, len(hashes), from) - inserts := d.queue.Schedule61(hashPack.hashes, true) - if len(inserts) != len(hashPack.hashes) { + inserts := d.queue.Schedule61(hashes, true) + if len(inserts) != len(hashes) { glog.V(logger.Debug).Infof("%v: stale hashes", p) return errBadPeer } @@ -776,7 +706,7 @@ func (d *Downloader) fetchHashes61(p *peer, td *big.Int, from uint64) error { return nil } // Queue not yet full, fetch the next batch - from += uint64(len(hashPack.hashes)) + from += uint64(len(hashes)) getHashes(from) case <-timeout.C: @@ -813,16 +743,17 @@ func (d *Downloader) fetchBlocks61(from uint64) error { case <-d.bodyCh: // Out of bounds eth/62 block bodies received, ignore them - case blockPack := <-d.blockCh: + case packet := <-d.blockCh: // If the peer was previously banned and failed to deliver it's pack // in a reasonable time frame, ignore it's message. - if peer := d.peers.Peer(blockPack.peerId); peer != nil { + if peer := d.peers.Peer(packet.PeerId()); peer != nil { // Deliver the received chunk of blocks, and demote in case of errors - err := d.queue.Deliver61(blockPack.peerId, blockPack.blocks) + blocks := packet.(*blockPack).blocks + err := d.queue.DeliverBlocks(peer.id, blocks) switch err { case nil: // If no blocks were delivered, demote the peer (need the delivery above) - if len(blockPack.blocks) == 0 { + if len(blocks) == 0 { peer.Demote() peer.SetBlocksIdle() glog.V(logger.Detail).Infof("%s: no blocks delivered", peer) @@ -831,7 +762,7 @@ func (d *Downloader) fetchBlocks61(from uint64) error { // All was successful, promote the peer and potentially start processing peer.Promote() peer.SetBlocksIdle() - glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blockPack.blocks)) + glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blocks)) go d.process() case errInvalidChain: @@ -891,7 +822,7 @@ func (d *Downloader) fetchBlocks61(from uint64) error { return errNoPeers } // Check for block request timeouts and demote the responsible peers - for _, pid := range d.queue.Expire61(blockHardTTL) { + for _, pid := range d.queue.ExpireBlocks(blockHardTTL) { if peer := d.peers.Peer(pid); peer != nil { peer.Demote() glog.V(logger.Detail).Infof("%s: block delivery timeout", peer) @@ -907,7 +838,7 @@ func (d *Downloader) fetchBlocks61(from uint64) error { } // Send a download request to all idle peers, until throttled throttled := false - idles, total := d.peers.BlockIdlePeers(61) + idles, total := d.peers.BlockIdlePeers() for _, peer := range idles { // Short circuit if throttling activated @@ -918,7 +849,7 @@ func (d *Downloader) fetchBlocks61(from uint64) error { // Reserve a chunk of hashes for a peer. A nil can mean either that // no more hashes are available, or that the peer is known not to // have them. - request := d.queue.Reserve61(peer, peer.BlockCapacity()) + request := d.queue.ReserveBlocks(peer, peer.BlockCapacity()) if request == nil { continue } @@ -928,7 +859,7 @@ func (d *Downloader) fetchBlocks61(from uint64) error { // Fetch the chunk and make sure any errors return the hashes to the queue if err := peer.Fetch61(request); err != nil { glog.V(logger.Error).Infof("%v: fetch failed, rescheduling", peer) - d.queue.Cancel61(request) + d.queue.CancelBlocks(request) } } // Make sure that we have peers available for fetching. If all peers have been tried @@ -954,14 +885,14 @@ func (d *Downloader) fetchHeight(p *peer) (uint64, error) { case <-d.cancelCh: return 0, errCancelBlockFetch - case headerPack := <-d.headerCh: + case packet := <-d.headerCh: // Discard anything not from the origin peer - if headerPack.peerId != p.id { - glog.V(logger.Debug).Infof("Received headers from incorrect peer(%s)", headerPack.peerId) + if packet.PeerId() != p.id { + glog.V(logger.Debug).Infof("Received headers from incorrect peer(%s)", packet.PeerId()) break } // Make sure the peer actually gave something valid - headers := headerPack.headers + headers := packet.(*headerPack).headers if len(headers) != 1 { glog.V(logger.Debug).Infof("%v: invalid number of head headers: %d != 1", p, len(headers)) return 0, errBadPeer @@ -1014,14 +945,14 @@ func (d *Downloader) findAncestor(p *peer) (uint64, error) { case <-d.cancelCh: return 0, errCancelHashFetch - case headerPack := <-d.headerCh: + case packet := <-d.headerCh: // Discard anything not from the origin peer - if headerPack.peerId != p.id { - glog.V(logger.Debug).Infof("Received headers from incorrect peer(%s)", headerPack.peerId) + if packet.PeerId() != p.id { + glog.V(logger.Debug).Infof("Received headers from incorrect peer(%s)", packet.PeerId()) break } // Make sure the peer actually gave something valid - headers := headerPack.headers + headers := packet.(*headerPack).headers if len(headers) == 0 { glog.V(logger.Debug).Infof("%v: empty head header set", p) return 0, errEmptyHeaderSet @@ -1069,14 +1000,14 @@ func (d *Downloader) findAncestor(p *peer) (uint64, error) { case <-d.cancelCh: return 0, errCancelHashFetch - case headerPack := <-d.headerCh: + case packer := <-d.headerCh: // Discard anything not from the origin peer - if headerPack.peerId != p.id { - glog.V(logger.Debug).Infof("Received headers from incorrect peer(%s)", headerPack.peerId) + if packer.PeerId() != p.id { + glog.V(logger.Debug).Infof("Received headers from incorrect peer(%s)", packer.PeerId()) break } // Make sure the peer actually gave something valid - headers := headerPack.headers + headers := packer.(*headerPack).headers if len(headers) != 1 { glog.V(logger.Debug).Infof("%v: invalid search header set (%d)", p, len(headers)) return 0, errBadPeer @@ -1150,20 +1081,20 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error { case <-d.blockCh: // Out of bounds eth/61 blocks received, ignore them - case headerPack := <-d.headerCh: + case packet := <-d.headerCh: // Make sure the active peer is giving us the headers - if headerPack.peerId != p.id { - glog.V(logger.Debug).Infof("Received headers from incorrect peer (%s)", headerPack.peerId) + if packet.PeerId() != p.id { + glog.V(logger.Debug).Infof("Received headers from incorrect peer (%s)", packet.PeerId()) break } headerReqTimer.UpdateSince(request) timeout.Stop() // If no more headers are inbound, notify the content fetchers and return - if len(headerPack.headers) == 0 { + if packet.Items() == 0 { glog.V(logger.Debug).Infof("%v: no available headers", p) - for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} { + for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} { select { case ch <- false: case <-d.cancelCh: @@ -1187,26 +1118,27 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error { return nil } gotHeaders = true + headers := packet.(*headerPack).headers // Otherwise insert all the new headers, aborting in case of junk - glog.V(logger.Detail).Infof("%v: schedule %d headers from #%d", p, len(headerPack.headers), from) + glog.V(logger.Detail).Infof("%v: schedule %d headers from #%d", p, len(headers), from) if d.mode == FastSync || d.mode == LightSync { - if n, err := d.insertHeaders(headerPack.headers, false); err != nil { - glog.V(logger.Debug).Infof("%v: invalid header #%d [%x…]: %v", p, headerPack.headers[n].Number, headerPack.headers[n].Hash().Bytes()[:4], err) + if n, err := d.insertHeaders(headers, false); err != nil { + glog.V(logger.Debug).Infof("%v: invalid header #%d [%x…]: %v", p, headers[n].Number, headers[n].Hash().Bytes()[:4], err) return errInvalidChain } } if d.mode == FullSync || d.mode == FastSync { - inserts := d.queue.Schedule(headerPack.headers, from, d.mode == FastSync) - if len(inserts) != len(headerPack.headers) { + inserts := d.queue.Schedule(headers, from) + if len(inserts) != len(headers) { glog.V(logger.Debug).Infof("%v: stale headers", p) return errBadPeer } } // Notify the content fetchers of new headers, but stop if queue is full cont := d.queue.PendingBlocks() < maxQueuedHeaders || d.queue.PendingReceipts() < maxQueuedHeaders - for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} { + for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} { if cont { // We still have headers to fetch, send continuation wake signal (potential) select { @@ -1223,7 +1155,7 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error { } } // Queue not yet full, fetch the next batch - from += uint64(len(headerPack.headers)) + from += uint64(len(headers)) getHeaders(from) case <-timeout.C: @@ -1233,7 +1165,7 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error { d.dropPeer(p.id) // Finish the sync gracefully instead of dumping the gathered data though - for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} { + for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} { select { case ch <- false: case <-d.cancelCh: @@ -1251,19 +1183,19 @@ func (d *Downloader) fetchBodies(from uint64) error { glog.V(logger.Debug).Infof("Downloading block bodies from #%d", from) var ( - deliver = func(packet interface{}) error { + deliver = func(packet dataPack) error { pack := packet.(*bodyPack) - return d.queue.DeliverBlocks(pack.peerId, pack.transactions, pack.uncles) + return d.queue.DeliverBodies(pack.peerId, pack.transactions, pack.uncles) } - expire = func() []string { return d.queue.ExpireBlocks(bodyHardTTL) } + expire = func() []string { return d.queue.ExpireBodies(bodyHardTTL) } fetch = func(p *peer, req *fetchRequest) error { return p.FetchBodies(req) } capacity = func(p *peer) int { return p.BlockCapacity() } - getIdles = func() ([]*peer, int) { return d.peers.BlockIdlePeers(62) } + getIdles = func() ([]*peer, int) { return d.peers.BodyIdlePeers() } setIdle = func(p *peer) { p.SetBlocksIdle() } ) - err := d.fetchParts(from, errCancelBodyFetch, d.bodyCh, deliver, d.bodyWakeCh, expire, - d.queue.PendingBlocks, d.queue.ThrottleBlocks, d.queue.ReserveBlocks, d.bodyFetchHook, - fetch, d.queue.CancelBlocks, capacity, getIdles, setIdle, "Body") + err := d.fetchParts(errCancelBodyFetch, d.bodyCh, deliver, d.bodyWakeCh, expire, + d.queue.PendingBlocks, d.queue.ThrottleBlocks, d.queue.ReserveBodies, d.bodyFetchHook, + fetch, d.queue.CancelBodies, capacity, getIdles, setIdle, "Body") glog.V(logger.Debug).Infof("Block body download terminated: %v", err) return err @@ -1276,7 +1208,7 @@ func (d *Downloader) fetchReceipts(from uint64) error { glog.V(logger.Debug).Infof("Downloading receipts from #%d", from) var ( - deliver = func(packet interface{}) error { + deliver = func(packet dataPack) error { pack := packet.(*receiptPack) return d.queue.DeliverReceipts(pack.peerId, pack.receipts) } @@ -1285,7 +1217,7 @@ func (d *Downloader) fetchReceipts(from uint64) error { capacity = func(p *peer) int { return p.ReceiptCapacity() } setIdle = func(p *peer) { p.SetReceiptsIdle() } ) - err := d.fetchParts(from, errCancelReceiptFetch, d.receiptCh, deliver, d.receiptWakeCh, expire, + err := d.fetchParts(errCancelReceiptFetch, d.receiptCh, deliver, d.receiptWakeCh, expire, d.queue.PendingReceipts, d.queue.ThrottleReceipts, d.queue.ReserveReceipts, d.receiptFetchHook, fetch, d.queue.CancelReceipts, capacity, d.peers.ReceiptIdlePeers, setIdle, "Receipt") @@ -1293,10 +1225,46 @@ func (d *Downloader) fetchReceipts(from uint64) error { return err } +// fetchNodeData iteratively downloads the scheduled state trie nodes, taking any +// available peers, reserving a chunk of nodes for each, waiting for delivery and +// also periodically checking for timeouts. +func (d *Downloader) fetchNodeData() error { + glog.V(logger.Debug).Infof("Downloading node state data") + + var ( + deliver = func(packet dataPack) error { + start := time.Now() + done, found, err := d.queue.DeliverNodeData(packet.PeerId(), packet.(*statePack).states) + + d.syncStatsLock.Lock() + totalDone, totalKnown := d.syncStatsStateDone+uint64(done), d.syncStatsStateTotal+uint64(found) + d.syncStatsStateDone, d.syncStatsStateTotal = totalDone, totalKnown + d.syncStatsLock.Unlock() + + glog.V(logger.Info).Infof("imported %d [%d / %d] state entries in %v.", done, totalDone, totalKnown, time.Since(start)) + return err + } + expire = func() []string { return d.queue.ExpireNodeData(stateHardTTL) } + throttle = func() bool { return false } + reserve = func(p *peer, count int) (*fetchRequest, bool, error) { + return d.queue.ReserveNodeData(p, count), false, nil + } + fetch = func(p *peer, req *fetchRequest) error { return p.FetchNodeData(req) } + capacity = func(p *peer) int { return p.NodeDataCapacity() } + setIdle = func(p *peer) { p.SetNodeDataIdle() } + ) + err := d.fetchParts(errCancelReceiptFetch, d.stateCh, deliver, d.stateWakeCh, expire, + d.queue.PendingNodeData, throttle, reserve, nil, fetch, d.queue.CancelNodeData, + capacity, d.peers.ReceiptIdlePeers, setIdle, "State") + + glog.V(logger.Debug).Infof("Node state data download terminated: %v", err) + return err +} + // fetchParts iteratively downloads scheduled block parts, taking any available // peers, reserving a chunk of fetch requests for each, waiting for delivery and // also periodically checking for timeouts. -func (d *Downloader) fetchParts(from uint64, errCancel error, deliveryCh chan dataPack, deliver func(packet interface{}) error, wakeCh chan bool, +func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliver func(packet dataPack) error, wakeCh chan bool, expire func() []string, pending func() int, throttle func() bool, reserve func(*peer, int) (*fetchRequest, bool, error), fetchHook func([]*types.Header), fetch func(*peer, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peer) int, idle func() ([]*peer, int), setIdle func(*peer), kind string) error { @@ -1327,7 +1295,7 @@ func (d *Downloader) fetchParts(from uint64, errCancel error, deliveryCh chan da switch err := deliver(packet); err { case nil: // If no blocks were delivered, demote the peer (need the delivery above to clean internal queue!) - if packet.Empty() { + if packet.Items() == 0 { peer.Demote() setIdle(peer) glog.V(logger.Detail).Infof("%s: no %s delivered", peer, strings.ToLower(kind)) @@ -1441,7 +1409,11 @@ func (d *Downloader) fetchParts(from uint64, errCancel error, deliveryCh chan da continue } if glog.V(logger.Detail) { - glog.Infof("%s: requesting %d %s(s), first at #%d", peer, len(request.Headers), strings.ToLower(kind), request.Headers[0].Number) + if len(request.Headers) > 0 { + glog.Infof("%s: requesting %d %s(s), first at #%d", peer, len(request.Headers), strings.ToLower(kind), request.Headers[0].Number) + } else { + glog.Infof("%s: requesting %d %s(s)", peer, len(request.Hashes), strings.ToLower(kind)) + } } // Fetch the chunk and make sure any errors return the hashes to the queue if fetchHook != nil { @@ -1528,7 +1500,9 @@ func (d *Downloader) process() { blocks = append(blocks, types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)) case d.mode == FastSync: blocks = append(blocks, types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)) - receipts = append(receipts, result.Receipts) + if result.Header.Number.Uint64() <= d.queue.fastSyncPivot { + receipts = append(receipts, result.Receipts) + } case d.mode == LightSync: headers = append(headers, result.Header) } @@ -1539,12 +1513,16 @@ func (d *Downloader) process() { index int ) switch { - case d.mode == FullSync: - index, err = d.insertBlocks(blocks) - case d.mode == FastSync: - index, err = d.insertReceipts(blocks, receipts) - case d.mode == LightSync: + case len(headers) > 0: index, err = d.insertHeaders(headers, true) + + case len(receipts) > 0: + index, err = d.insertReceipts(blocks, receipts) + if err == nil && blocks[len(blocks)-1].NumberU64() == d.queue.fastSyncPivot { + err = d.commitHeadBlock(blocks[len(blocks)-1].Hash()) + } + default: + index, err = d.insertBlocks(blocks) } if err != nil { glog.V(logger.Debug).Infof("Result #%d [%x…] processing failed: %v", results[index].Header.Number, results[index].Header.Hash().Bytes()[:4], err) @@ -1557,125 +1535,47 @@ func (d *Downloader) process() { } } -// DeliverHashes61 injects a new batch of hashes received from a remote node into +// DeliverHashes injects a new batch of hashes received from a remote node into // the download schedule. This is usually invoked through the BlockHashesMsg by // the protocol handler. -func (d *Downloader) DeliverHashes61(id string, hashes []common.Hash) (err error) { - // Update the delivery metrics for both good and failed deliveries - hashInMeter.Mark(int64(len(hashes))) - defer func() { - if err != nil { - hashDropMeter.Mark(int64(len(hashes))) - } - }() - // Make sure the downloader is active - if atomic.LoadInt32(&d.synchronising) == 0 { - return errNoSyncActive - } - // Deliver or abort if the sync is canceled while queuing - d.cancelLock.RLock() - cancel := d.cancelCh - d.cancelLock.RUnlock() - - select { - case d.hashCh <- hashPack{id, hashes}: - return nil - - case <-cancel: - return errNoSyncActive - } +func (d *Downloader) DeliverHashes(id string, hashes []common.Hash) (err error) { + return d.deliver(id, d.hashCh, &hashPack{id, hashes}, hashInMeter, hashDropMeter) } -// DeliverBlocks61 injects a new batch of blocks received from a remote node. +// DeliverBlocks injects a new batch of blocks received from a remote node. // This is usually invoked through the BlocksMsg by the protocol handler. -func (d *Downloader) DeliverBlocks61(id string, blocks []*types.Block) (err error) { - // Update the delivery metrics for both good and failed deliveries - blockInMeter.Mark(int64(len(blocks))) - defer func() { - if err != nil { - blockDropMeter.Mark(int64(len(blocks))) - } - }() - // Make sure the downloader is active - if atomic.LoadInt32(&d.synchronising) == 0 { - return errNoSyncActive - } - // Deliver or abort if the sync is canceled while queuing - d.cancelLock.RLock() - cancel := d.cancelCh - d.cancelLock.RUnlock() - - select { - case d.blockCh <- blockPack{id, blocks}: - return nil - - case <-cancel: - return errNoSyncActive - } +func (d *Downloader) DeliverBlocks(id string, blocks []*types.Block) (err error) { + return d.deliver(id, d.blockCh, &blockPack{id, blocks}, blockInMeter, blockDropMeter) } // DeliverHeaders injects a new batch of blck headers received from a remote // node into the download schedule. func (d *Downloader) DeliverHeaders(id string, headers []*types.Header) (err error) { - // Update the delivery metrics for both good and failed deliveries - headerInMeter.Mark(int64(len(headers))) - defer func() { - if err != nil { - headerDropMeter.Mark(int64(len(headers))) - } - }() - // Make sure the downloader is active - if atomic.LoadInt32(&d.synchronising) == 0 { - return errNoSyncActive - } - // Deliver or abort if the sync is canceled while queuing - d.cancelLock.RLock() - cancel := d.cancelCh - d.cancelLock.RUnlock() - - select { - case d.headerCh <- headerPack{id, headers}: - return nil - - case <-cancel: - return errNoSyncActive - } + return d.deliver(id, d.headerCh, &headerPack{id, headers}, headerInMeter, headerDropMeter) } // DeliverBodies injects a new batch of block bodies received from a remote node. func (d *Downloader) DeliverBodies(id string, transactions [][]*types.Transaction, uncles [][]*types.Header) (err error) { - // Update the delivery metrics for both good and failed deliveries - bodyInMeter.Mark(int64(len(transactions))) - defer func() { - if err != nil { - bodyDropMeter.Mark(int64(len(transactions))) - } - }() - // Make sure the downloader is active - if atomic.LoadInt32(&d.synchronising) == 0 { - return errNoSyncActive - } - // Deliver or abort if the sync is canceled while queuing - d.cancelLock.RLock() - cancel := d.cancelCh - d.cancelLock.RUnlock() - - select { - case d.bodyCh <- &bodyPack{id, transactions, uncles}: - return nil - - case <-cancel: - return errNoSyncActive - } + return d.deliver(id, d.bodyCh, &bodyPack{id, transactions, uncles}, bodyInMeter, bodyDropMeter) } // DeliverReceipts injects a new batch of receipts received from a remote node. func (d *Downloader) DeliverReceipts(id string, receipts [][]*types.Receipt) (err error) { + return d.deliver(id, d.receiptCh, &receiptPack{id, receipts}, receiptInMeter, receiptDropMeter) +} + +// DeliverNodeData injects a new batch of node state data received from a remote node. +func (d *Downloader) DeliverNodeData(id string, data [][]byte) (err error) { + return d.deliver(id, d.stateCh, &statePack{id, data}, stateInMeter, stateDropMeter) +} + +// deliver injects a new batch of data received from a remote node. +func (d *Downloader) deliver(id string, destCh chan dataPack, packet dataPack, inMeter, dropMeter metrics.Meter) (err error) { // Update the delivery metrics for both good and failed deliveries - receiptInMeter.Mark(int64(len(receipts))) + inMeter.Mark(int64(packet.Items())) defer func() { if err != nil { - receiptDropMeter.Mark(int64(len(receipts))) + dropMeter.Mark(int64(packet.Items())) } }() // Make sure the downloader is active @@ -1688,7 +1588,7 @@ func (d *Downloader) DeliverReceipts(id string, receipts [][]*types.Receipt) (er d.cancelLock.RUnlock() select { - case d.receiptCh <- &receiptPack{id, receipts}: + case destCh <- packet: return nil case <-cancel: diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 68c4ca26e..8944ae4b0 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -27,11 +27,13 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/params" + "github.com/ethereum/go-ethereum/trie" ) var ( @@ -115,6 +117,7 @@ func makeChainFork(n, f int, parent *types.Block, parentReceipts types.Receipts) // downloadTester is a test simulator for mocking out local block chain. type downloadTester struct { + stateDb ethdb.Database downloader *Downloader ownHashes []common.Hash // Hash chain belonging to the tester @@ -146,8 +149,10 @@ func newTester(mode SyncMode) *downloadTester { peerReceipts: make(map[string]map[common.Hash]types.Receipts), peerChainTds: make(map[string]map[common.Hash]*big.Int), } - tester.downloader = New(mode, new(event.TypeMux), tester.hasHeader, tester.hasBlock, tester.getHeader, tester.getBlock, - tester.headHeader, tester.headBlock, tester.headFastBlock, tester.getTd, tester.insertHeaders, tester.insertBlocks, tester.insertReceipts, tester.dropPeer) + tester.stateDb, _ = ethdb.NewMemDatabase() + tester.downloader = New(mode, tester.stateDb, new(event.TypeMux), tester.hasHeader, tester.hasBlock, tester.getHeader, + tester.getBlock, tester.headHeader, tester.headBlock, tester.headFastBlock, tester.commitHeadBlock, tester.getTd, + tester.insertHeaders, tester.insertBlocks, tester.insertReceipts, tester.dropPeer) return tester } @@ -213,7 +218,7 @@ func (dl *downloadTester) headHeader() *types.Header { return header } } - return nil + return genesis.Header() } // headBlock retrieves the current head block from the canonical chain. @@ -223,10 +228,12 @@ func (dl *downloadTester) headBlock() *types.Block { for i := len(dl.ownHashes) - 1; i >= 0; i-- { if block := dl.getBlock(dl.ownHashes[i]); block != nil { - return block + if _, err := dl.stateDb.Get(block.Root().Bytes()); err == nil { + return block + } } } - return nil + return genesis } // headFastBlock retrieves the current head fast-sync block from the canonical chain. @@ -236,12 +243,20 @@ func (dl *downloadTester) headFastBlock() *types.Block { for i := len(dl.ownHashes) - 1; i >= 0; i-- { if block := dl.getBlock(dl.ownHashes[i]); block != nil { - if _, ok := dl.ownReceipts[block.Hash()]; ok { - return block - } + return block } } - return nil + return genesis +} + +// commitHeadBlock manually sets the head block to a given hash. +func (dl *downloadTester) commitHeadBlock(hash common.Hash) error { + // For now only check that the state trie is correct + if block := dl.getBlock(hash); block != nil { + _, err := trie.NewSecure(block.Root(), dl.stateDb) + return err + } + return fmt.Errorf("non existent block: %x", hash[:4]) } // getTd retrieves the block's total difficulty from the canonical chain. @@ -283,6 +298,7 @@ func (dl *downloadTester) insertBlocks(blocks types.Blocks) (int, error) { dl.ownHashes = append(dl.ownHashes, block.Hash()) dl.ownHeaders[block.Hash()] = block.Header() dl.ownBlocks[block.Hash()] = block + dl.stateDb.Put(block.Root().Bytes(), []byte{}) dl.ownChainTd[block.Hash()] = dl.ownChainTd[block.ParentHash()] } return len(blocks), nil @@ -321,13 +337,13 @@ func (dl *downloadTester) newSlowPeer(id string, version int, hashes []common.Ha var err error switch version { case 61: - err = dl.downloader.RegisterPeer(id, version, hashes[0], dl.peerGetRelHashesFn(id, delay), dl.peerGetAbsHashesFn(id, delay), dl.peerGetBlocksFn(id, delay), nil, nil, nil, nil) + err = dl.downloader.RegisterPeer(id, version, hashes[0], dl.peerGetRelHashesFn(id, delay), dl.peerGetAbsHashesFn(id, delay), dl.peerGetBlocksFn(id, delay), nil, nil, nil, nil, nil) case 62: - err = dl.downloader.RegisterPeer(id, version, hashes[0], nil, nil, nil, dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay), nil) + err = dl.downloader.RegisterPeer(id, version, hashes[0], nil, nil, nil, dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay), nil, nil) case 63: - err = dl.downloader.RegisterPeer(id, version, hashes[0], nil, nil, nil, dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay), dl.peerGetReceiptsFn(id, delay)) + err = dl.downloader.RegisterPeer(id, version, hashes[0], nil, nil, nil, dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay), dl.peerGetReceiptsFn(id, delay), dl.peerGetNodeDataFn(id, delay)) case 64: - err = dl.downloader.RegisterPeer(id, version, hashes[0], nil, nil, nil, dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay), dl.peerGetReceiptsFn(id, delay)) + err = dl.downloader.RegisterPeer(id, version, hashes[0], nil, nil, nil, dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay), dl.peerGetReceiptsFn(id, delay), dl.peerGetNodeDataFn(id, delay)) } if err == nil { // Assign the owned hashes, headers and blocks to the peer (deep copy) @@ -399,7 +415,7 @@ func (dl *downloadTester) peerGetRelHashesFn(id string, delay time.Duration) fun // Delay delivery a bit to allow attacks to unfold go func() { time.Sleep(time.Millisecond) - dl.downloader.DeliverHashes61(id, result) + dl.downloader.DeliverHashes(id, result) }() return nil } @@ -424,7 +440,7 @@ func (dl *downloadTester) peerGetAbsHashesFn(id string, delay time.Duration) fun // Delay delivery a bit to allow attacks to unfold go func() { time.Sleep(time.Millisecond) - dl.downloader.DeliverHashes61(id, result) + dl.downloader.DeliverHashes(id, result) }() return nil } @@ -447,7 +463,7 @@ func (dl *downloadTester) peerGetBlocksFn(id string, delay time.Duration) func([ result = append(result, block) } } - go dl.downloader.DeliverBlocks61(id, result) + go dl.downloader.DeliverBlocks(id, result) return nil } @@ -553,17 +569,54 @@ func (dl *downloadTester) peerGetReceiptsFn(id string, delay time.Duration) func } } +// peerGetNodeDataFn constructs a getNodeData method associated with a particular +// peer in the download tester. The returned function can be used to retrieve +// batches of node state data from the particularly requested peer. +func (dl *downloadTester) peerGetNodeDataFn(id string, delay time.Duration) func([]common.Hash) error { + return func(hashes []common.Hash) error { + time.Sleep(delay) + + dl.lock.RLock() + defer dl.lock.RUnlock() + + results := make([][]byte, 0, len(hashes)) + for _, hash := range hashes { + if data, err := testdb.Get(hash.Bytes()); err == nil { + results = append(results, data) + } + } + go dl.downloader.DeliverNodeData(id, results) + + return nil + } +} + // assertOwnChain checks if the local chain contains the correct number of items // of the various chain components. func assertOwnChain(t *testing.T, tester *downloadTester, length int) { - headers, blocks, receipts := length, length, length + assertOwnForkedChain(t, tester, 1, []int{length}) +} + +// assertOwnForkedChain checks if the local forked chain contains the correct +// number of items of the various chain components. +func assertOwnForkedChain(t *testing.T, tester *downloadTester, common int, lengths []int) { + // Initialize the counters for the first fork + headers, blocks, receipts := lengths[0], lengths[0], lengths[0]-minFullBlocks + if receipts < 0 { + receipts = 1 + } + // Update the counters for each subsequent fork + for _, length := range lengths[1:] { + headers += length - common + blocks += length - common + receipts += length - common - minFullBlocks + } switch tester.downloader.mode { case FullSync: receipts = 1 case LightSync: blocks, receipts = 1, 1 } - if hs := len(tester.ownHeaders); hs != headers { t.Fatalf("synchronised headers mismatch: have %v, want %v", hs, headers) } @@ -573,6 +626,14 @@ func assertOwnChain(t *testing.T, tester *downloadTester, length int) { if rs := len(tester.ownReceipts); rs != receipts { t.Fatalf("synchronised receipts mismatch: have %v, want %v", rs, receipts) } + // Verify the state trie too for fast syncs + if tester.downloader.mode == FastSync { + if index := lengths[len(lengths)-1] - minFullBlocks - 1; index > 0 { + if statedb := state.New(tester.ownHeaders[tester.ownHashes[index]].Root, tester.stateDb); statedb == nil { + t.Fatalf("state reconstruction failed") + } + } + } } // Tests that simple synchronization against a canonical chain works correctly. @@ -647,7 +708,9 @@ func testThrottling(t *testing.T, protocol int, mode SyncMode) { cached = len(tester.downloader.queue.blockDonePool) if mode == FastSync { if receipts := len(tester.downloader.queue.receiptDonePool); receipts < cached { - cached = receipts + if tester.downloader.queue.resultCache[receipts].Header.Number.Uint64() < tester.downloader.queue.fastSyncPivot { + cached = receipts + } } } tester.downloader.queue.lock.RUnlock() @@ -704,7 +767,7 @@ func testForkedSynchronisation(t *testing.T, protocol int, mode SyncMode) { if err := tester.sync("fork B", nil); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } - assertOwnChain(t, tester, common+2*fork+1) + assertOwnForkedChain(t, tester, common+1, []int{common + fork + 1, common + fork + 1}) } // Tests that an inactive downloader will not accept incoming hashes and blocks. @@ -712,10 +775,10 @@ func TestInactiveDownloader61(t *testing.T) { tester := newTester(FullSync) // Check that neither hashes nor blocks are accepted - if err := tester.downloader.DeliverHashes61("bad peer", []common.Hash{}); err != errNoSyncActive { + if err := tester.downloader.DeliverHashes("bad peer", []common.Hash{}); err != errNoSyncActive { t.Errorf("error mismatch: have %v, want %v", err, errNoSyncActive) } - if err := tester.downloader.DeliverBlocks61("bad peer", []*types.Block{}); err != errNoSyncActive { + if err := tester.downloader.DeliverBlocks("bad peer", []*types.Block{}); err != errNoSyncActive { t.Errorf("error mismatch: have %v, want %v", err, errNoSyncActive) } } @@ -809,14 +872,6 @@ func testMultiSynchronisation(t *testing.T, protocol int, mode SyncMode) { id := fmt.Sprintf("peer #%d", i) tester.newPeer(id, protocol, hashes[i*blockCacheLimit:], headers, blocks, receipts) } - // Synchronise with the middle peer and make sure half of the blocks were retrieved - id := fmt.Sprintf("peer #%d", targetPeers/2) - if err := tester.sync(id, nil); err != nil { - t.Fatalf("failed to synchronise blocks: %v", err) - } - assertOwnChain(t, tester, len(tester.peerHashes[id])) - - // Synchronise with the best peer and make sure everything is retrieved if err := tester.sync("peer #0", nil); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } @@ -870,8 +925,8 @@ func TestEmptyShortCircuit64Fast(t *testing.T) { testEmptyShortCircuit(t, 64, F func TestEmptyShortCircuit64Light(t *testing.T) { testEmptyShortCircuit(t, 64, LightSync) } func testEmptyShortCircuit(t *testing.T, protocol int, mode SyncMode) { - // Create a small enough block chain to download - targetBlocks := blockCacheLimit - 15 + // Create a block chain to download + targetBlocks := 2*blockCacheLimit - 15 hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil) tester := newTester(mode) @@ -898,8 +953,8 @@ func testEmptyShortCircuit(t *testing.T, protocol int, mode SyncMode) { bodiesNeeded++ } } - for _, receipt := range receipts { - if mode == FastSync && len(receipt) > 0 { + for hash, receipt := range receipts { + if mode == FastSync && len(receipt) > 0 && headers[hash].Number.Uint64() <= uint64(targetBlocks-minFullBlocks) { receiptsNeeded++ } } diff --git a/eth/downloader/metrics.go b/eth/downloader/metrics.go index 92acb6ba8..d6fcfa25c 100644 --- a/eth/downloader/metrics.go +++ b/eth/downloader/metrics.go @@ -47,4 +47,9 @@ var ( receiptReqTimer = metrics.NewTimer("eth/downloader/receipts/req") receiptDropMeter = metrics.NewMeter("eth/downloader/receipts/drop") receiptTimeoutMeter = metrics.NewMeter("eth/downloader/receipts/timeout") + + stateInMeter = metrics.NewMeter("eth/downloader/states/in") + stateReqTimer = metrics.NewTimer("eth/downloader/states/req") + stateDropMeter = metrics.NewMeter("eth/downloader/states/drop") + stateTimeoutMeter = metrics.NewMeter("eth/downloader/states/timeout") ) diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go index 5fc0db587..5011d5d46 100644 --- a/eth/downloader/peer.go +++ b/eth/downloader/peer.go @@ -41,6 +41,7 @@ type relativeHeaderFetcherFn func(common.Hash, int, int, bool) error type absoluteHeaderFetcherFn func(uint64, int, int, bool) error type blockBodyFetcherFn func([]common.Hash) error type receiptFetcherFn func([]common.Hash) error +type stateFetcherFn func([]common.Hash) error var ( errAlreadyFetching = errors.New("already fetching blocks from peer") @@ -55,12 +56,16 @@ type peer struct { blockIdle int32 // Current block activity state of the peer (idle = 0, active = 1) receiptIdle int32 // Current receipt activity state of the peer (idle = 0, active = 1) + stateIdle int32 // Current node data activity state of the peer (idle = 0, active = 1) rep int32 // Simple peer reputation - blockCapacity int32 // Number of blocks (bodies) allowed to fetch per request - receiptCapacity int32 // Number of receipts allowed to fetch per request - blockStarted time.Time // Time instance when the last block (body)fetch was started - receiptStarted time.Time // Time instance when the last receipt fetch was started + blockCapacity int32 // Number of blocks (bodies) allowed to fetch per request + receiptCapacity int32 // Number of receipts allowed to fetch per request + stateCapacity int32 // Number of node data pieces allowed to fetch per request + + blockStarted time.Time // Time instance when the last block (body)fetch was started + receiptStarted time.Time // Time instance when the last receipt fetch was started + stateStarted time.Time // Time instance when the last node data fetch was started ignored *set.Set // Set of hashes not to request (didn't have previously) @@ -73,6 +78,7 @@ type peer struct { getBlockBodies blockBodyFetcherFn // [eth/62] Method to retrieve a batch of block bodies getReceipts receiptFetcherFn // [eth/63] Method to retrieve a batch of block transaction receipts + getNodeData stateFetcherFn // [eth/63] Method to retrieve a batch of state trie data version int // Eth protocol version number to switch strategies } @@ -82,12 +88,13 @@ type peer struct { func newPeer(id string, version int, head common.Hash, getRelHashes relativeHashFetcherFn, getAbsHashes absoluteHashFetcherFn, getBlocks blockFetcherFn, // eth/61 callbacks, remove when upgrading getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn, - getReceipts receiptFetcherFn) *peer { + getReceipts receiptFetcherFn, getNodeData stateFetcherFn) *peer { return &peer{ id: id, head: head, blockCapacity: 1, receiptCapacity: 1, + stateCapacity: 1, ignored: set.New(), getRelHashes: getRelHashes, @@ -99,6 +106,7 @@ func newPeer(id string, version int, head common.Hash, getBlockBodies: getBlockBodies, getReceipts: getReceipts, + getNodeData: getNodeData, version: version, } @@ -110,6 +118,7 @@ func (p *peer) Reset() { atomic.StoreInt32(&p.receiptIdle, 0) atomic.StoreInt32(&p.blockCapacity, 1) atomic.StoreInt32(&p.receiptCapacity, 1) + atomic.StoreInt32(&p.stateCapacity, 1) p.ignored.Clear() } @@ -167,6 +176,24 @@ func (p *peer) FetchReceipts(request *fetchRequest) error { return nil } +// FetchNodeData sends a node state data retrieval request to the remote peer. +func (p *peer) FetchNodeData(request *fetchRequest) error { + // Short circuit if the peer is already fetching + if !atomic.CompareAndSwapInt32(&p.stateIdle, 0, 1) { + return errAlreadyFetching + } + p.stateStarted = time.Now() + + // Convert the hash set to a retrievable slice + hashes := make([]common.Hash, 0, len(request.Hashes)) + for hash, _ := range request.Hashes { + hashes = append(hashes, hash) + } + go p.getNodeData(hashes) + + return nil +} + // SetBlocksIdle sets the peer to idle, allowing it to execute new retrieval requests. // Its block retrieval allowance will also be updated either up- or downwards, // depending on whether the previous fetch completed in time or not. @@ -188,6 +215,13 @@ func (p *peer) SetReceiptsIdle() { p.setIdle(p.receiptStarted, receiptSoftTTL, receiptHardTTL, MaxReceiptFetch, &p.receiptCapacity, &p.receiptIdle) } +// SetNodeDataIdle sets the peer to idle, allowing it to execute new retrieval +// requests. Its node data retrieval allowance will also be updated either up- or +// downwards, depending on whether the previous fetch completed in time or not. +func (p *peer) SetNodeDataIdle() { + p.setIdle(p.stateStarted, stateSoftTTL, stateSoftTTL, MaxStateFetch, &p.stateCapacity, &p.stateIdle) +} + // setIdle sets the peer to idle, allowing it to execute new retrieval requests. // Its data retrieval allowance will also be updated either up- or downwards, // depending on whether the previous fetch completed in time or not. @@ -230,6 +264,12 @@ func (p *peer) ReceiptCapacity() int { return int(atomic.LoadInt32(&p.receiptCapacity)) } +// NodeDataCapacity retrieves the peers block download allowance based on its +// previously discovered bandwidth capacity. +func (p *peer) NodeDataCapacity() int { + return int(atomic.LoadInt32(&p.stateCapacity)) +} + // Promote increases the peer's reputation. func (p *peer) Promote() { atomic.AddInt32(&p.rep, 1) @@ -340,39 +380,50 @@ func (ps *peerSet) AllPeers() []*peer { // BlockIdlePeers retrieves a flat list of all the currently idle peers within the // active peer set, ordered by their reputation. -func (ps *peerSet) BlockIdlePeers(version int) ([]*peer, int) { - ps.lock.RLock() - defer ps.lock.RUnlock() - - idle, total := make([]*peer, 0, len(ps.peers)), 0 - for _, p := range ps.peers { - if (version == 61 && p.version == 61) || (version >= 62 && p.version >= 62) { - if atomic.LoadInt32(&p.blockIdle) == 0 { - idle = append(idle, p) - } - total++ - } +func (ps *peerSet) BlockIdlePeers() ([]*peer, int) { + idle := func(p *peer) bool { + return atomic.LoadInt32(&p.blockIdle) == 0 } - for i := 0; i < len(idle); i++ { - for j := i + 1; j < len(idle); j++ { - if atomic.LoadInt32(&idle[i].rep) < atomic.LoadInt32(&idle[j].rep) { - idle[i], idle[j] = idle[j], idle[i] - } - } - } - return idle, total + return ps.idlePeers(61, 61, idle) } -// ReceiptIdlePeers retrieves a flat list of all the currently idle peers within the -// active peer set, ordered by their reputation. +// BodyIdlePeers retrieves a flat list of all the currently body-idle peers within +// the active peer set, ordered by their reputation. +func (ps *peerSet) BodyIdlePeers() ([]*peer, int) { + idle := func(p *peer) bool { + return atomic.LoadInt32(&p.blockIdle) == 0 + } + return ps.idlePeers(62, 64, idle) +} + +// ReceiptIdlePeers retrieves a flat list of all the currently receipt-idle peers +// within the active peer set, ordered by their reputation. func (ps *peerSet) ReceiptIdlePeers() ([]*peer, int) { + idle := func(p *peer) bool { + return atomic.LoadInt32(&p.receiptIdle) == 0 + } + return ps.idlePeers(63, 64, idle) +} + +// NodeDataIdlePeers retrieves a flat list of all the currently node-data-idle +// peers within the active peer set, ordered by their reputation. +func (ps *peerSet) NodeDataIdlePeers() ([]*peer, int) { + idle := func(p *peer) bool { + return atomic.LoadInt32(&p.stateIdle) == 0 + } + return ps.idlePeers(63, 64, idle) +} + +// idlePeers retrieves a flat list of all currently idle peers satisfying the +// protocol version constraints, using the provided function to check idleness. +func (ps *peerSet) idlePeers(minProtocol, maxProtocol int, idleCheck func(*peer) bool) ([]*peer, int) { ps.lock.RLock() defer ps.lock.RUnlock() idle, total := make([]*peer, 0, len(ps.peers)), 0 for _, p := range ps.peers { - if p.version >= 63 { - if atomic.LoadInt32(&p.receiptIdle) == 0 { + if p.version >= minProtocol && p.version <= maxProtocol { + if idleCheck(p) { idle = append(idle, p) } total++ diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index c53ad939e..942ed0d63 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -26,9 +26,13 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" + "github.com/ethereum/go-ethereum/trie" "github.com/rcrowley/go-metrics" "gopkg.in/karalabe/cookiejar.v2/collections/prque" ) @@ -39,13 +43,14 @@ var ( var ( errNoFetchesPending = errors.New("no fetches pending") + errStateSyncPending = errors.New("state trie sync already scheduled") errStaleDelivery = errors.New("stale delivery") ) // fetchRequest is a currently running data retrieval operation. type fetchRequest struct { Peer *peer // Peer to which the request was sent - Hashes map[common.Hash]int // [eth/61] Requested block with their insertion index (priority) + Hashes map[common.Hash]int // [eth/61] Requested hashes with their insertion index (priority) Headers []*types.Header // [eth/62] Requested headers, sorted by request order Time time.Time // Time when the request was made } @@ -64,6 +69,9 @@ type fetchResult struct { // queue represents hashes that are either need fetching or are being fetched type queue struct { + mode SyncMode // Synchronisation mode to decide on the block parts to schedule for fetching + fastSyncPivot uint64 // Block number where the fast sync pivots into archive synchronisation mode + hashPool map[common.Hash]int // [eth/61] Pending hashes, mapping to their insertion index (priority) hashQueue *prque.Prque // [eth/61] Priority queue of the block hashes to fetch hashCounter int // [eth/61] Counter indexing the added hashes to ensure retrieval order @@ -80,15 +88,22 @@ type queue struct { receiptPendPool map[string]*fetchRequest // [eth/63] Currently pending receipt retrieval operations receiptDonePool map[common.Hash]struct{} // [eth/63] Set of the completed receipt fetches + stateTaskIndex int // [eth/63] Counter indexing the added hashes to ensure prioritized retrieval order + stateTaskPool map[common.Hash]int // [eth/63] Pending node data retrieval tasks, mapping to their priority + stateTaskQueue *prque.Prque // [eth/63] Priority queue of the hashes to fetch the node data for + statePendPool map[string]*fetchRequest // [eth/63] Currently pending node data retrieval operations + + stateDatabase ethdb.Database // [eth/63] Trie database to populate during state reassembly + stateScheduler *state.StateSync // [eth/63] State trie synchronisation scheduler and integrator + resultCache []*fetchResult // Downloaded but not yet delivered fetch results resultOffset uint64 // Offset of the first cached fetch result in the block-chain - resultParts int // Number of fetch components required to complete an item lock sync.RWMutex } // newQueue creates a new download queue for scheduling block retrieval. -func newQueue() *queue { +func newQueue(stateDb ethdb.Database) *queue { return &queue{ hashPool: make(map[common.Hash]int), hashQueue: prque.New(), @@ -100,6 +115,10 @@ func newQueue() *queue { receiptTaskQueue: prque.New(), receiptPendPool: make(map[string]*fetchRequest), receiptDonePool: make(map[common.Hash]struct{}), + stateTaskPool: make(map[common.Hash]int), + stateTaskQueue: prque.New(), + statePendPool: make(map[string]*fetchRequest), + stateDatabase: stateDb, resultCache: make([]*fetchResult, blockCacheLimit), } } @@ -109,6 +128,9 @@ func (q *queue) Reset() { q.lock.Lock() defer q.lock.Unlock() + q.mode = FullSync + q.fastSyncPivot = 0 + q.hashPool = make(map[common.Hash]int) q.hashQueue.Reset() q.hashCounter = 0 @@ -125,9 +147,14 @@ func (q *queue) Reset() { q.receiptPendPool = make(map[string]*fetchRequest) q.receiptDonePool = make(map[common.Hash]struct{}) + q.stateTaskIndex = 0 + q.stateTaskPool = make(map[common.Hash]int) + q.stateTaskQueue.Reset() + q.statePendPool = make(map[string]*fetchRequest) + q.stateScheduler = nil + q.resultCache = make([]*fetchResult, blockCacheLimit) q.resultOffset = 0 - q.resultParts = 0 } // PendingBlocks retrieves the number of block (body) requests pending for retrieval. @@ -146,12 +173,20 @@ func (q *queue) PendingReceipts() int { return q.receiptTaskQueue.Size() } +// PendingNodeData retrieves the number of node data entries pending for retrieval. +func (q *queue) PendingNodeData() int { + q.lock.RLock() + defer q.lock.RUnlock() + + return q.stateTaskQueue.Size() +} + // InFlight retrieves the number of fetch requests currently in flight. func (q *queue) InFlight() int { q.lock.RLock() defer q.lock.RUnlock() - return len(q.blockPendPool) + len(q.receiptPendPool) + return len(q.blockPendPool) + len(q.receiptPendPool) + len(q.statePendPool) } // Idle returns if the queue is fully idle or has some data still inside. This @@ -160,8 +195,8 @@ func (q *queue) Idle() bool { q.lock.RLock() defer q.lock.RUnlock() - queued := q.hashQueue.Size() + q.blockTaskQueue.Size() + q.receiptTaskQueue.Size() - pending := len(q.blockPendPool) + len(q.receiptPendPool) + queued := q.hashQueue.Size() + q.blockTaskQueue.Size() + q.receiptTaskQueue.Size() + q.stateTaskQueue.Size() + pending := len(q.blockPendPool) + len(q.receiptPendPool) + len(q.statePendPool) cached := len(q.blockDonePool) + len(q.receiptDonePool) return (queued + pending + cached) == 0 @@ -227,7 +262,7 @@ func (q *queue) Schedule61(hashes []common.Hash, fifo bool) []common.Hash { // Schedule adds a set of headers for the download queue for scheduling, returning // the new headers encountered. -func (q *queue) Schedule(headers []*types.Header, from uint64, receipts bool) []*types.Header { +func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header { q.lock.Lock() defer q.lock.Unlock() @@ -256,10 +291,21 @@ func (q *queue) Schedule(headers []*types.Header, from uint64, receipts bool) [] // Queue the header for content retrieval q.blockTaskPool[hash] = header q.blockTaskQueue.Push(header, -float32(header.Number.Uint64())) - if receipts { + + if q.mode == FastSync && header.Number.Uint64() <= q.fastSyncPivot { + // Fast phase of the fast sync, retrieve receipts too q.receiptTaskPool[hash] = header q.receiptTaskQueue.Push(header, -float32(header.Number.Uint64())) } + if q.mode == FastSync && header.Number.Uint64() == q.fastSyncPivot { + // Pivoting point of the fast sync, retrieve the state tries + q.stateScheduler = state.NewStateSync(header.Root, q.stateDatabase) + for _, hash := range q.stateScheduler.Missing(0) { + q.stateTaskPool[hash] = q.stateTaskIndex + q.stateTaskQueue.Push(hash, -float32(q.stateTaskIndex)) + q.stateTaskIndex++ + } + } inserts = append(inserts, header) q.headerHead = hash from++ @@ -279,6 +325,9 @@ func (q *queue) GetHeadResult() *fetchResult { if q.resultCache[0].Pending > 0 { return nil } + if q.mode == FastSync && q.resultCache[0].Header.Number.Uint64() == q.fastSyncPivot && len(q.stateTaskPool) > 0 { + return nil + } return q.resultCache[0] } @@ -291,9 +340,18 @@ func (q *queue) TakeResults() []*fetchResult { // Accumulate all available results results := []*fetchResult{} for _, result := range q.resultCache { + // Stop if no more results are ready if result == nil || result.Pending > 0 { break } + // The fast sync pivot block may only be processed after state fetch completes + if q.mode == FastSync && result.Header.Number.Uint64() == q.fastSyncPivot && len(q.stateTaskPool) > 0 { + break + } + // If we've just inserted the fast sync pivot, stop as the following batch needs different insertion + if q.mode == FastSync && result.Header.Number.Uint64() == q.fastSyncPivot+1 && len(results) > 0 { + break + } results = append(results, result) hash := result.Header.Hash() @@ -312,31 +370,45 @@ func (q *queue) TakeResults() []*fetchResult { return results } -// Reserve61 reserves a set of hashes for the given peer, skipping any previously -// failed download. -func (q *queue) Reserve61(p *peer, count int) *fetchRequest { +// ReserveBlocks reserves a set of block hashes for the given peer, skipping any +// previously failed download. +func (q *queue) ReserveBlocks(p *peer, count int) *fetchRequest { + return q.reserveHashes(p, count, q.hashQueue, q.blockPendPool, len(q.resultCache)-len(q.blockDonePool)) +} + +// ReserveNodeData reserves a set of node data hashes for the given peer, skipping +// any previously failed download. +func (q *queue) ReserveNodeData(p *peer, count int) *fetchRequest { + return q.reserveHashes(p, count, q.stateTaskQueue, q.statePendPool, 0) +} + +// reserveHashes reserves a set of hashes for the given peer, skipping previously +// failed ones. +func (q *queue) reserveHashes(p *peer, count int, taskQueue *prque.Prque, pendPool map[string]*fetchRequest, maxPending int) *fetchRequest { q.lock.Lock() defer q.lock.Unlock() // Short circuit if the pool has been depleted, or if the peer's already // downloading something (sanity check not to corrupt state) - if q.hashQueue.Empty() { + if taskQueue.Empty() { return nil } - if _, ok := q.blockPendPool[p.id]; ok { + if _, ok := pendPool[p.id]; ok { return nil } // Calculate an upper limit on the hashes we might fetch (i.e. throttling) - space := len(q.resultCache) - len(q.blockDonePool) - for _, request := range q.blockPendPool { - space -= len(request.Hashes) + allowance := maxPending + if allowance > 0 { + for _, request := range pendPool { + allowance -= len(request.Hashes) + } } // Retrieve a batch of hashes, skipping previously failed ones send := make(map[common.Hash]int) skip := make(map[common.Hash]int) - for proc := 0; proc < space && len(send) < count && !q.hashQueue.Empty(); proc++ { - hash, priority := q.hashQueue.Pop() + for proc := 0; (allowance == 0 || proc < allowance) && len(send) < count && !taskQueue.Empty(); proc++ { + hash, priority := taskQueue.Pop() if p.ignored.Has(hash) { skip[hash.(common.Hash)] = int(priority) } else { @@ -345,7 +417,7 @@ func (q *queue) Reserve61(p *peer, count int) *fetchRequest { } // Merge all the skipped hashes back for hash, index := range skip { - q.hashQueue.Push(hash, float32(index)) + taskQueue.Push(hash, float32(index)) } // Assemble and return the block download request if len(send) == 0 { @@ -356,19 +428,19 @@ func (q *queue) Reserve61(p *peer, count int) *fetchRequest { Hashes: send, Time: time.Now(), } - q.blockPendPool[p.id] = request + pendPool[p.id] = request return request } -// ReserveBlocks reserves a set of body fetches for the given peer, skipping any +// ReserveBodies reserves a set of body fetches for the given peer, skipping any // previously failed downloads. Beside the next batch of needed fetches, it also // returns a flag whether empty blocks were queued requiring processing. -func (q *queue) ReserveBlocks(p *peer, count int) (*fetchRequest, bool, error) { +func (q *queue) ReserveBodies(p *peer, count int) (*fetchRequest, bool, error) { noop := func(header *types.Header) bool { return header.TxHash == types.EmptyRootHash && header.UncleHash == types.EmptyUncleHash } - return q.reserveFetch(p, count, q.blockTaskPool, q.blockTaskQueue, q.blockPendPool, q.blockDonePool, noop) + return q.reserveHeaders(p, count, q.blockTaskPool, q.blockTaskQueue, q.blockPendPool, q.blockDonePool, noop) } // ReserveReceipts reserves a set of receipt fetches for the given peer, skipping @@ -378,13 +450,13 @@ func (q *queue) ReserveReceipts(p *peer, count int) (*fetchRequest, bool, error) noop := func(header *types.Header) bool { return header.ReceiptHash == types.EmptyRootHash } - return q.reserveFetch(p, count, q.receiptTaskPool, q.receiptTaskQueue, q.receiptPendPool, q.receiptDonePool, noop) + return q.reserveHeaders(p, count, q.receiptTaskPool, q.receiptTaskQueue, q.receiptPendPool, q.receiptDonePool, noop) } -// reserveFetch reserves a set of data download operations for a given peer, +// reserveHeaders reserves a set of data download operations for a given peer, // skipping any previously failed ones. This method is a generic version used // by the individual special reservation functions. -func (q *queue) reserveFetch(p *peer, count int, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque, +func (q *queue) reserveHeaders(p *peer, count int, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque, pendPool map[string]*fetchRequest, donePool map[common.Hash]struct{}, noop func(*types.Header) bool) (*fetchRequest, bool, error) { q.lock.Lock() defer q.lock.Unlock() @@ -416,8 +488,12 @@ func (q *queue) reserveFetch(p *peer, count int, taskPool map[common.Hash]*types return nil, false, errInvalidChain } if q.resultCache[index] == nil { + components := 1 + if q.mode == FastSync && header.Number.Uint64() <= q.fastSyncPivot { + components = 2 + } q.resultCache[index] = &fetchResult{ - Pending: q.resultParts, + Pending: components, Header: header, } } @@ -456,30 +532,36 @@ func (q *queue) reserveFetch(p *peer, count int, taskPool map[common.Hash]*types return request, progress, nil } -// Cancel61 aborts a fetch request, returning all pending hashes to the queue. -func (q *queue) Cancel61(request *fetchRequest) { - q.cancel(request, nil, q.blockPendPool) +// CancelBlocks aborts a fetch request, returning all pending hashes to the queue. +func (q *queue) CancelBlocks(request *fetchRequest) { + q.cancel(request, q.hashQueue, q.blockPendPool) } -// CancelBlocks aborts a body fetch request, returning all pending hashes to the +// CancelBodies aborts a body fetch request, returning all pending headers to the // task queue. -func (q *queue) CancelBlocks(request *fetchRequest) { +func (q *queue) CancelBodies(request *fetchRequest) { q.cancel(request, q.blockTaskQueue, q.blockPendPool) } -// CancelReceipts aborts a body fetch request, returning all pending hashes to +// CancelReceipts aborts a body fetch request, returning all pending headers to // the task queue. func (q *queue) CancelReceipts(request *fetchRequest) { q.cancel(request, q.receiptTaskQueue, q.receiptPendPool) } +// CancelNodeData aborts a node state data fetch request, returning all pending +// hashes to the task queue. +func (q *queue) CancelNodeData(request *fetchRequest) { + q.cancel(request, q.stateTaskQueue, q.statePendPool) +} + // Cancel aborts a fetch request, returning all pending hashes to the task queue. func (q *queue) cancel(request *fetchRequest, taskQueue *prque.Prque, pendPool map[string]*fetchRequest) { q.lock.Lock() defer q.lock.Unlock() for hash, index := range request.Hashes { - q.hashQueue.Push(hash, float32(index)) + taskQueue.Push(hash, float32(index)) } for _, header := range request.Headers { taskQueue.Push(header, -float32(header.Number.Uint64())) @@ -509,29 +591,41 @@ func (q *queue) Revoke(peerId string) { } delete(q.receiptPendPool, peerId) } + if request, ok := q.statePendPool[peerId]; ok { + for hash, index := range request.Hashes { + q.stateTaskQueue.Push(hash, float32(index)) + } + delete(q.statePendPool, peerId) + } } -// Expire61 checks for in flight requests that exceeded a timeout allowance, +// ExpireBlocks checks for in flight requests that exceeded a timeout allowance, // canceling them and returning the responsible peers for penalization. -func (q *queue) Expire61(timeout time.Duration) []string { - return q.expire(timeout, q.blockPendPool, nil) +func (q *queue) ExpireBlocks(timeout time.Duration) []string { + return q.expire(timeout, q.blockPendPool, q.hashQueue, blockTimeoutMeter) } -// ExpireBlocks checks for in flight block body requests that exceeded a timeout +// ExpireBodies checks for in flight block body requests that exceeded a timeout // allowance, canceling them and returning the responsible peers for penalization. -func (q *queue) ExpireBlocks(timeout time.Duration) []string { - return q.expire(timeout, q.blockPendPool, q.blockTaskQueue) +func (q *queue) ExpireBodies(timeout time.Duration) []string { + return q.expire(timeout, q.blockPendPool, q.blockTaskQueue, bodyTimeoutMeter) } // ExpireReceipts checks for in flight receipt requests that exceeded a timeout // allowance, canceling them and returning the responsible peers for penalization. func (q *queue) ExpireReceipts(timeout time.Duration) []string { - return q.expire(timeout, q.receiptPendPool, q.receiptTaskQueue) + return q.expire(timeout, q.receiptPendPool, q.receiptTaskQueue, receiptTimeoutMeter) +} + +// ExpireNodeData checks for in flight node data requests that exceeded a timeout +// allowance, canceling them and returning the responsible peers for penalization. +func (q *queue) ExpireNodeData(timeout time.Duration) []string { + return q.expire(timeout, q.statePendPool, q.stateTaskQueue, stateTimeoutMeter) } // expire is the generic check that move expired tasks from a pending pool back // into a task pool, returning all entities caught with expired tasks. -func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, taskQueue *prque.Prque) []string { +func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, taskQueue *prque.Prque, timeoutMeter metrics.Meter) []string { q.lock.Lock() defer q.lock.Unlock() @@ -540,14 +634,11 @@ func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, for id, request := range pendPool { if time.Since(request.Time) > timeout { // Update the metrics with the timeout - if len(request.Hashes) > 0 { - blockTimeoutMeter.Mark(1) - } else { - bodyTimeoutMeter.Mark(1) - } + timeoutMeter.Mark(1) + // Return any non satisfied requests to the pool for hash, index := range request.Hashes { - q.hashQueue.Push(hash, float32(index)) + taskQueue.Push(hash, float32(index)) } for _, header := range request.Headers { taskQueue.Push(header, -float32(header.Number.Uint64())) @@ -562,8 +653,8 @@ func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, return peers } -// Deliver61 injects a block retrieval response into the download queue. -func (q *queue) Deliver61(id string, blocks []*types.Block) (err error) { +// DeliverBlocks injects a block retrieval response into the download queue. +func (q *queue) DeliverBlocks(id string, blocks []*types.Block) error { q.lock.Lock() defer q.lock.Unlock() @@ -626,8 +717,8 @@ func (q *queue) Deliver61(id string, blocks []*types.Block) (err error) { } } -// DeliverBlocks injects a block (body) retrieval response into the results queue. -func (q *queue) DeliverBlocks(id string, txLists [][]*types.Transaction, uncleLists [][]*types.Header) error { +// DeliverBodies injects a block body retrieval response into the results queue. +func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, uncleLists [][]*types.Header) error { reconstruct := func(header *types.Header, index int, result *fetchResult) error { if types.DeriveSha(types.Transactions(txLists[index])) != header.TxHash || types.CalcUncleHash(uncleLists[index]) != header.UncleHash { return errInvalidBody @@ -717,14 +808,84 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ } } +// DeliverNodeData injects a node state data retrieval response into the queue. +func (q *queue) DeliverNodeData(id string, data [][]byte) (int, int, error) { + q.lock.Lock() + defer q.lock.Unlock() + + // Short circuit if the data was never requested + request := q.statePendPool[id] + if request == nil { + return 0, 0, errNoFetchesPending + } + stateReqTimer.UpdateSince(request.Time) + delete(q.statePendPool, id) + + // If no data was retrieved, mark them as unavailable for the origin peer + if len(data) == 0 { + for hash, _ := range request.Hashes { + request.Peer.ignored.Add(hash) + } + } + // Iterate over the downloaded data and verify each of them + errs := make([]error, 0) + processed := 0 + for _, blob := range data { + // Skip any blocks that were not requested + hash := common.BytesToHash(crypto.Sha3(blob)) + if _, ok := request.Hashes[hash]; !ok { + errs = append(errs, fmt.Errorf("non-requested state data %x", hash)) + continue + } + // Inject the next state trie item into the database + if err := q.stateScheduler.Process([]trie.SyncResult{{hash, blob}}); err != nil { + errs = []error{err} + break + } + processed++ + + delete(request.Hashes, hash) + delete(q.stateTaskPool, hash) + } + // Return all failed or missing fetches to the queue + for hash, index := range request.Hashes { + q.stateTaskQueue.Push(hash, float32(index)) + } + // Also enqueue any newly required state trie nodes + discovered := 0 + if len(q.stateTaskPool) < maxQueuedStates { + for _, hash := range q.stateScheduler.Missing(4 * MaxStateFetch) { + q.stateTaskPool[hash] = q.stateTaskIndex + q.stateTaskQueue.Push(hash, -float32(q.stateTaskIndex)) + q.stateTaskIndex++ + discovered++ + } + } + // If none of the data items were good, it's a stale delivery + switch { + case len(errs) == 0: + return processed, discovered, nil + + case len(errs) == len(request.Hashes): + return processed, discovered, errStaleDelivery + + default: + return processed, discovered, fmt.Errorf("multiple failures: %v", errs) + } +} + // Prepare configures the result cache to allow accepting and caching inbound // fetch results. -func (q *queue) Prepare(offset uint64, parts int) { +func (q *queue) Prepare(offset uint64, mode SyncMode, pivot uint64) { q.lock.Lock() defer q.lock.Unlock() if q.resultOffset < offset { q.resultOffset = offset } - q.resultParts = parts + q.fastSyncPivot = 0 + if mode == FastSync { + q.fastSyncPivot = pivot + } + q.mode = mode } diff --git a/eth/downloader/types.go b/eth/downloader/types.go new file mode 100644 index 000000000..221ef38f6 --- /dev/null +++ b/eth/downloader/types.go @@ -0,0 +1,137 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package downloader + +import ( + "fmt" + "math/big" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" +) + +// headerCheckFn is a callback type for verifying a header's presence in the local chain. +type headerCheckFn func(common.Hash) bool + +// blockCheckFn is a callback type for verifying a block's presence in the local chain. +type blockCheckFn func(common.Hash) bool + +// headerRetrievalFn is a callback type for retrieving a header from the local chain. +type headerRetrievalFn func(common.Hash) *types.Header + +// blockRetrievalFn is a callback type for retrieving a block from the local chain. +type blockRetrievalFn func(common.Hash) *types.Block + +// headHeaderRetrievalFn is a callback type for retrieving the head header from the local chain. +type headHeaderRetrievalFn func() *types.Header + +// headBlockRetrievalFn is a callback type for retrieving the head block from the local chain. +type headBlockRetrievalFn func() *types.Block + +// headFastBlockRetrievalFn is a callback type for retrieving the head fast block from the local chain. +type headFastBlockRetrievalFn func() *types.Block + +// headBlockCommitterFn is a callback for directly committing the head block to a certain entity. +type headBlockCommitterFn func(common.Hash) error + +// tdRetrievalFn is a callback type for retrieving the total difficulty of a local block. +type tdRetrievalFn func(common.Hash) *big.Int + +// headerChainInsertFn is a callback type to insert a batch of headers into the local chain. +type headerChainInsertFn func([]*types.Header, bool) (int, error) + +// blockChainInsertFn is a callback type to insert a batch of blocks into the local chain. +type blockChainInsertFn func(types.Blocks) (int, error) + +// receiptChainInsertFn is a callback type to insert a batch of receipts into the local chain. +type receiptChainInsertFn func(types.Blocks, []types.Receipts) (int, error) + +// peerDropFn is a callback type for dropping a peer detected as malicious. +type peerDropFn func(id string) + +// dataPack is a data message returned by a peer for some query. +type dataPack interface { + PeerId() string + Items() int + Stats() string +} + +// hashPack is a batch of block hashes returned by a peer (eth/61). +type hashPack struct { + peerId string + hashes []common.Hash +} + +func (p *hashPack) PeerId() string { return p.peerId } +func (p *hashPack) Items() int { return len(p.hashes) } +func (p *hashPack) Stats() string { return fmt.Sprintf("%d", len(p.hashes)) } + +// blockPack is a batch of blocks returned by a peer (eth/61). +type blockPack struct { + peerId string + blocks []*types.Block +} + +func (p *blockPack) PeerId() string { return p.peerId } +func (p *blockPack) Items() int { return len(p.blocks) } +func (p *blockPack) Stats() string { return fmt.Sprintf("%d", len(p.blocks)) } + +// headerPack is a batch of block headers returned by a peer. +type headerPack struct { + peerId string + headers []*types.Header +} + +func (p *headerPack) PeerId() string { return p.peerId } +func (p *headerPack) Items() int { return len(p.headers) } +func (p *headerPack) Stats() string { return fmt.Sprintf("%d", len(p.headers)) } + +// bodyPack is a batch of block bodies returned by a peer. +type bodyPack struct { + peerId string + transactions [][]*types.Transaction + uncles [][]*types.Header +} + +func (p *bodyPack) PeerId() string { return p.peerId } +func (p *bodyPack) Items() int { + if len(p.transactions) <= len(p.uncles) { + return len(p.transactions) + } + return len(p.uncles) +} +func (p *bodyPack) Stats() string { return fmt.Sprintf("%d:%d", len(p.transactions), len(p.uncles)) } + +// receiptPack is a batch of receipts returned by a peer. +type receiptPack struct { + peerId string + receipts [][]*types.Receipt +} + +func (p *receiptPack) PeerId() string { return p.peerId } +func (p *receiptPack) Items() int { return len(p.receipts) } +func (p *receiptPack) Stats() string { return fmt.Sprintf("%d", len(p.receipts)) } + +// statePack is a batch of states returned by a peer. +type statePack struct { + peerId string + states [][]byte +} + +func (p *statePack) PeerId() string { return p.peerId } +func (p *statePack) Items() int { return len(p.states) } +func (p *statePack) Stats() string { return fmt.Sprintf("%d", len(p.states)) } diff --git a/eth/handler.go b/eth/handler.go index 1117cb1b7..b0916d50b 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -129,9 +129,9 @@ func NewProtocolManager(mode Mode, networkId int, mux *event.TypeMux, txpool txP case LightMode: syncMode = downloader.LightSync } - manager.downloader = downloader.New(syncMode, manager.eventMux, blockchain.HasHeader, blockchain.HasBlock, blockchain.GetHeader, - blockchain.GetBlock, blockchain.CurrentHeader, blockchain.CurrentBlock, blockchain.CurrentFastBlock, blockchain.GetTd, - blockchain.InsertHeaderChain, blockchain.InsertChain, blockchain.InsertReceiptChain, manager.removePeer) + manager.downloader = downloader.New(syncMode, chaindb, manager.eventMux, blockchain.HasHeader, blockchain.HasBlock, blockchain.GetHeader, + blockchain.GetBlock, blockchain.CurrentHeader, blockchain.CurrentBlock, blockchain.CurrentFastBlock, blockchain.FastSyncCommitHead, + blockchain.GetTd, blockchain.InsertHeaderChain, blockchain.InsertChain, blockchain.InsertReceiptChain, manager.removePeer) validator := func(block *types.Block, parent *types.Block) error { return core.ValidateHeader(pow, block.Header(), parent.Header(), true, false) @@ -220,8 +220,8 @@ func (pm *ProtocolManager) handle(p *peer) error { // Register the peer in the downloader. If the downloader considers it banned, we disconnect if err := pm.downloader.RegisterPeer(p.id, p.version, p.Head(), - p.RequestHashes, p.RequestHashesFromNumber, p.RequestBlocks, - p.RequestHeadersByHash, p.RequestHeadersByNumber, p.RequestBodies, p.RequestReceipts); err != nil { + p.RequestHashes, p.RequestHashesFromNumber, p.RequestBlocks, p.RequestHeadersByHash, + p.RequestHeadersByNumber, p.RequestBodies, p.RequestReceipts, p.RequestNodeData); err != nil { return err } // Propagate existing transactions. new transactions appearing @@ -307,7 +307,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { break } // Deliver them all to the downloader for queuing - err := pm.downloader.DeliverHashes61(p.id, hashes) + err := pm.downloader.DeliverHashes(p.id, hashes) if err != nil { glog.V(logger.Debug).Infoln(err) } @@ -353,7 +353,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } // Filter out any explicitly requested blocks, deliver the rest to the downloader if blocks := pm.fetcher.FilterBlocks(blocks); len(blocks) > 0 { - pm.downloader.DeliverBlocks61(p.id, blocks) + pm.downloader.DeliverBlocks(p.id, blocks) } // Block header query, collect the requested headers and reply @@ -515,6 +515,17 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } return p.SendNodeData(data) + case p.version >= eth63 && msg.Code == NodeDataMsg: + // A batch of node state data arrived to one of our previous requests + var data [][]byte + if err := msg.Decode(&data); err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + // Deliver all to the downloader + if err := pm.downloader.DeliverNodeData(p.id, data); err != nil { + glog.V(logger.Debug).Infof("failed to deliver node state data: %v", err) + } + case p.version >= eth63 && msg.Code == GetReceiptsMsg: // Decode the retrieval message msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size)) diff --git a/eth/peer.go b/eth/peer.go index e24be97f1..68ce903a6 100644 --- a/eth/peer.go +++ b/eth/peer.go @@ -191,7 +191,7 @@ func (p *peer) SendBlockBodiesRLP(bodies []rlp.RawValue) error { return p2p.Send(p.rw, BlockBodiesMsg, bodies) } -// SendNodeData sends a batch of arbitrary internal data, corresponding to the +// SendNodeDataRLP sends a batch of arbitrary internal data, corresponding to the // hashes requested. func (p *peer) SendNodeData(data [][]byte) error { return p2p.Send(p.rw, NodeDataMsg, data) diff --git a/trie/sync.go b/trie/sync.go new file mode 100644 index 000000000..65cfd6ed8 --- /dev/null +++ b/trie/sync.go @@ -0,0 +1,233 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package trie + +import ( + "fmt" + + "github.com/ethereum/go-ethereum/common" + "gopkg.in/karalabe/cookiejar.v2/collections/prque" +) + +// request represents a scheduled or already in-flight state retrieval request. +type request struct { + hash common.Hash // Hash of the node data content to retrieve + data []byte // Data content of the node, cached until all subtrees complete + object *node // Target node to populate with retrieved data (hashnode originally) + + parents []*request // Parent state nodes referencing this entry (notify all upon completion) + depth int // Depth level within the trie the node is located to prioritize DFS + deps int // Number of dependencies before allowed to commit this node + + callback TrieSyncLeafCallback // Callback to invoke if a leaf node it reached on this branch +} + +// SyncResult is a simple list to return missing nodes along with their request +// hashes. +type SyncResult struct { + Hash common.Hash // Hash of the originally unknown trie node + Data []byte // Data content of the retrieved node +} + +// TrieSyncLeafCallback is a callback type invoked when a trie sync reaches a +// leaf node. It's used by state syncing to check if the leaf node requires some +// further data syncing. +type TrieSyncLeafCallback func(leaf []byte, parent common.Hash) error + +// TrieSync is the main state trie synchronisation scheduler, which provides yet +// unknown trie hashes to retrieve, accepts node data associated with said hashes +// and reconstructs the trie steb by step until all is done. +type TrieSync struct { + database Database // State database for storing all the assembled node data + requests map[common.Hash]*request // Pending requests pertaining to a key hash + queue *prque.Prque // Priority queue with the pending requests +} + +// NewTrieSync creates a new trie data download scheduler. +func NewTrieSync(root common.Hash, database Database, callback TrieSyncLeafCallback) *TrieSync { + ts := &TrieSync{ + database: database, + requests: make(map[common.Hash]*request), + queue: prque.New(), + } + ts.AddSubTrie(root, 0, common.Hash{}, callback) + return ts +} + +// AddSubTrie registers a new trie to the sync code, rooted at the designated parent. +func (s *TrieSync) AddSubTrie(root common.Hash, depth int, parent common.Hash, callback TrieSyncLeafCallback) { + // Short circuit if the trie is empty + if root == emptyRoot { + return + } + // Assemble the new sub-trie sync request + node := node(hashNode(root.Bytes())) + req := &request{ + object: &node, + hash: root, + depth: depth, + callback: callback, + } + // If this sub-trie has a designated parent, link them together + if parent != (common.Hash{}) { + ancestor := s.requests[parent] + if ancestor == nil { + panic(fmt.Sprintf("sub-trie ancestor not found: %x", parent)) + } + ancestor.deps++ + req.parents = append(req.parents, ancestor) + } + s.schedule(req) +} + +// Missing retrieves the known missing nodes from the trie for retrieval. +func (s *TrieSync) Missing(max int) []common.Hash { + requests := []common.Hash{} + for !s.queue.Empty() && (max == 0 || len(requests) < max) { + requests = append(requests, s.queue.PopItem().(common.Hash)) + } + return requests +} + +// Process injects a batch of retrieved trie nodes data. +func (s *TrieSync) Process(results []SyncResult) (int, error) { + for i, item := range results { + // If the item was not requested, bail out + request := s.requests[item.Hash] + if request == nil { + return i, fmt.Errorf("not requested: %x", item.Hash) + } + // Decode the node data content and update the request + node, err := decodeNode(item.Data) + if err != nil { + return i, err + } + *request.object = node + request.data = item.Data + + // Create and schedule a request for all the children nodes + requests, err := s.children(request) + if err != nil { + return i, err + } + if len(requests) == 0 && request.deps == 0 { + s.commit(request) + continue + } + request.deps += len(requests) + for _, child := range requests { + s.schedule(child) + } + } + return 0, nil +} + +// schedule inserts a new state retrieval request into the fetch queue. If there +// is already a pending request for this node, the new request will be discarded +// and only a parent reference added to the old one. +func (s *TrieSync) schedule(req *request) { + // If we're already requesting this node, add a new reference and stop + if old, ok := s.requests[req.hash]; ok { + old.parents = append(old.parents, req.parents...) + return + } + // Schedule the request for future retrieval + s.queue.Push(req.hash, float32(req.depth)) + s.requests[req.hash] = req +} + +// children retrieves all the missing children of a state trie entry for future +// retrieval scheduling. +func (s *TrieSync) children(req *request) ([]*request, error) { + // Gather all the children of the node, irrelevant whether known or not + type child struct { + node *node + depth int + } + children := []child{} + + switch node := (*req.object).(type) { + case shortNode: + children = []child{{ + node: &node.Val, + depth: req.depth + len(node.Key), + }} + case fullNode: + for i := 0; i < 17; i++ { + if node[i] != nil { + children = append(children, child{ + node: &node[i], + depth: req.depth + 1, + }) + } + } + default: + panic(fmt.Sprintf("unknown node: %+v", node)) + } + // Iterate over the children, and request all unknown ones + requests := make([]*request, 0, len(children)) + for _, child := range children { + // Notify any external watcher of a new key/value node + if req.callback != nil { + if node, ok := (*child.node).(valueNode); ok { + if err := req.callback(node, req.hash); err != nil { + return nil, err + } + } + } + // If the child references another node, resolve or schedule + if node, ok := (*child.node).(hashNode); ok { + // Try to resolve the node from the local database + blob, _ := s.database.Get(node) + if local, err := decodeNode(blob); local != nil && err == nil { + *child.node = local + continue + } + // Locally unknown node, schedule for retrieval + requests = append(requests, &request{ + object: child.node, + hash: common.BytesToHash(node), + parents: []*request{req}, + depth: child.depth, + callback: req.callback, + }) + } + } + return requests, nil +} + +// commit finalizes a retrieval request and stores it into the database. If any +// of the referencing parent requests complete due to this commit, they are also +// committed themselves. +func (s *TrieSync) commit(req *request) error { + // Write the node content to disk + if err := s.database.Put(req.hash[:], req.data); err != nil { + return err + } + delete(s.requests, req.hash) + + // Check all parents for completion + for _, parent := range req.parents { + parent.deps-- + if parent.deps == 0 { + if err := s.commit(parent); err != nil { + return err + } + } + } + return nil +} diff --git a/trie/sync_test.go b/trie/sync_test.go new file mode 100644 index 000000000..9c036a3a9 --- /dev/null +++ b/trie/sync_test.go @@ -0,0 +1,257 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package trie + +import ( + "bytes" + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/ethdb" +) + +// makeTestTrie create a sample test trie to test node-wise reconstruction. +func makeTestTrie() (ethdb.Database, *Trie, map[string][]byte) { + // Create an empty trie + db, _ := ethdb.NewMemDatabase() + trie, _ := New(common.Hash{}, db) + + // Fill it with some arbitrary data + content := make(map[string][]byte) + for i := byte(0); i < 255; i++ { + key, val := common.LeftPadBytes([]byte{1, i}, 32), []byte{i} + content[string(key)] = val + trie.Update(key, val) + + key, val = common.LeftPadBytes([]byte{2, i}, 32), []byte{i} + content[string(key)] = val + trie.Update(key, val) + } + trie.Commit() + + // Return the generated trie + return db, trie, content +} + +// checkTrieContents cross references a reconstructed trie with an expected data +// content map. +func checkTrieContents(t *testing.T, db Database, root []byte, content map[string][]byte) { + trie, err := New(common.BytesToHash(root), db) + if err != nil { + t.Fatalf("failed to create trie at %x: %v", root, err) + } + for key, val := range content { + if have := trie.Get([]byte(key)); bytes.Compare(have, val) != 0 { + t.Errorf("entry %x: content mismatch: have %x, want %x", key, have, val) + } + } +} + +// Tests that an empty trie is not scheduled for syncing. +func TestEmptyTrieSync(t *testing.T) { + emptyA, _ := New(common.Hash{}, nil) + emptyB, _ := New(emptyRoot, nil) + + for i, trie := range []*Trie{emptyA, emptyB} { + db, _ := ethdb.NewMemDatabase() + if req := NewTrieSync(common.BytesToHash(trie.Root()), db, nil).Missing(1); len(req) != 0 { + t.Errorf("test %d: content requested for empty trie: %v", i, req) + } + } +} + +// Tests that given a root hash, a trie can sync iteratively on a single thread, +// requesting retrieval tasks and returning all of them in one go. +func TestIterativeTrieSyncIndividual(t *testing.T) { testIterativeTrieSync(t, 1) } +func TestIterativeTrieSyncBatched(t *testing.T) { testIterativeTrieSync(t, 100) } + +func testIterativeTrieSync(t *testing.T, batch int) { + // Create a random trie to copy + srcDb, srcTrie, srcData := makeTestTrie() + + // Create a destination trie and sync with the scheduler + dstDb, _ := ethdb.NewMemDatabase() + sched := NewTrieSync(common.BytesToHash(srcTrie.Root()), dstDb, nil) + + queue := append([]common.Hash{}, sched.Missing(batch)...) + for len(queue) > 0 { + results := make([]SyncResult, len(queue)) + for i, hash := range queue { + data, err := srcDb.Get(hash.Bytes()) + if err != nil { + t.Fatalf("failed to retrieve node data for %x: %v", hash, err) + } + results[i] = SyncResult{hash, data} + } + if index, err := sched.Process(results); err != nil { + t.Fatalf("failed to process result #%d: %v", index, err) + } + queue = append(queue[:0], sched.Missing(batch)...) + } + // Cross check that the two tries re in sync + checkTrieContents(t, dstDb, srcTrie.Root(), srcData) +} + +// Tests that the trie scheduler can correctly reconstruct the state even if only +// partial results are returned, and the others sent only later. +func TestIterativeDelayedTrieSync(t *testing.T) { + // Create a random trie to copy + srcDb, srcTrie, srcData := makeTestTrie() + + // Create a destination trie and sync with the scheduler + dstDb, _ := ethdb.NewMemDatabase() + sched := NewTrieSync(common.BytesToHash(srcTrie.Root()), dstDb, nil) + + queue := append([]common.Hash{}, sched.Missing(10000)...) + for len(queue) > 0 { + // Sync only half of the scheduled nodes + results := make([]SyncResult, len(queue)/2+1) + for i, hash := range queue[:len(results)] { + data, err := srcDb.Get(hash.Bytes()) + if err != nil { + t.Fatalf("failed to retrieve node data for %x: %v", hash, err) + } + results[i] = SyncResult{hash, data} + } + if index, err := sched.Process(results); err != nil { + t.Fatalf("failed to process result #%d: %v", index, err) + } + queue = append(queue[len(results):], sched.Missing(10000)...) + } + // Cross check that the two tries re in sync + checkTrieContents(t, dstDb, srcTrie.Root(), srcData) +} + +// Tests that given a root hash, a trie can sync iteratively on a single thread, +// requesting retrieval tasks and returning all of them in one go, however in a +// random order. +func TestIterativeRandomTrieSyncIndividual(t *testing.T) { testIterativeRandomTrieSync(t, 1) } +func TestIterativeRandomTrieSyncBatched(t *testing.T) { testIterativeRandomTrieSync(t, 100) } + +func testIterativeRandomTrieSync(t *testing.T, batch int) { + // Create a random trie to copy + srcDb, srcTrie, srcData := makeTestTrie() + + // Create a destination trie and sync with the scheduler + dstDb, _ := ethdb.NewMemDatabase() + sched := NewTrieSync(common.BytesToHash(srcTrie.Root()), dstDb, nil) + + queue := make(map[common.Hash]struct{}) + for _, hash := range sched.Missing(batch) { + queue[hash] = struct{}{} + } + for len(queue) > 0 { + // Fetch all the queued nodes in a random order + results := make([]SyncResult, 0, len(queue)) + for hash, _ := range queue { + data, err := srcDb.Get(hash.Bytes()) + if err != nil { + t.Fatalf("failed to retrieve node data for %x: %v", hash, err) + } + results = append(results, SyncResult{hash, data}) + } + // Feed the retrieved results back and queue new tasks + if index, err := sched.Process(results); err != nil { + t.Fatalf("failed to process result #%d: %v", index, err) + } + queue = make(map[common.Hash]struct{}) + for _, hash := range sched.Missing(batch) { + queue[hash] = struct{}{} + } + } + // Cross check that the two tries re in sync + checkTrieContents(t, dstDb, srcTrie.Root(), srcData) +} + +// Tests that the trie scheduler can correctly reconstruct the state even if only +// partial results are returned (Even those randomly), others sent only later. +func TestIterativeRandomDelayedTrieSync(t *testing.T) { + // Create a random trie to copy + srcDb, srcTrie, srcData := makeTestTrie() + + // Create a destination trie and sync with the scheduler + dstDb, _ := ethdb.NewMemDatabase() + sched := NewTrieSync(common.BytesToHash(srcTrie.Root()), dstDb, nil) + + queue := make(map[common.Hash]struct{}) + for _, hash := range sched.Missing(10000) { + queue[hash] = struct{}{} + } + for len(queue) > 0 { + // Sync only half of the scheduled nodes, even those in random order + results := make([]SyncResult, 0, len(queue)/2+1) + for hash, _ := range queue { + data, err := srcDb.Get(hash.Bytes()) + if err != nil { + t.Fatalf("failed to retrieve node data for %x: %v", hash, err) + } + results = append(results, SyncResult{hash, data}) + + if len(results) >= cap(results) { + break + } + } + // Feed the retrieved results back and queue new tasks + if index, err := sched.Process(results); err != nil { + t.Fatalf("failed to process result #%d: %v", index, err) + } + for _, result := range results { + delete(queue, result.Hash) + } + for _, hash := range sched.Missing(10000) { + queue[hash] = struct{}{} + } + } + // Cross check that the two tries re in sync + checkTrieContents(t, dstDb, srcTrie.Root(), srcData) +} + +// Tests that a trie sync will not request nodes multiple times, even if they +// have such references. +func TestDuplicateAvoidanceTrieSync(t *testing.T) { + // Create a random trie to copy + srcDb, srcTrie, srcData := makeTestTrie() + + // Create a destination trie and sync with the scheduler + dstDb, _ := ethdb.NewMemDatabase() + sched := NewTrieSync(common.BytesToHash(srcTrie.Root()), dstDb, nil) + + queue := append([]common.Hash{}, sched.Missing(0)...) + requested := make(map[common.Hash]struct{}) + + for len(queue) > 0 { + results := make([]SyncResult, len(queue)) + for i, hash := range queue { + data, err := srcDb.Get(hash.Bytes()) + if err != nil { + t.Fatalf("failed to retrieve node data for %x: %v", hash, err) + } + if _, ok := requested[hash]; ok { + t.Errorf("hash %x already requested once", hash) + } + requested[hash] = struct{}{} + + results[i] = SyncResult{hash, data} + } + if index, err := sched.Process(results); err != nil { + t.Fatalf("failed to process result #%d: %v", index, err) + } + queue = append(queue[:0], sched.Missing(0)...) + } + // Cross check that the two tries re in sync + checkTrieContents(t, dstDb, srcTrie.Root(), srcData) +}