eth: added downloader for syncing up the chain

This commit is contained in:
obscuren 2015-04-13 17:22:32 +02:00
parent a8a2b2a488
commit 97d2954e22
2 changed files with 73 additions and 38 deletions

View File

@ -15,6 +15,7 @@ import (
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger"
@ -130,6 +131,7 @@ type Ethereum struct {
accountManager *accounts.Manager accountManager *accounts.Manager
whisper *whisper.Whisper whisper *whisper.Whisper
pow *ethash.Ethash pow *ethash.Ethash
downloader *downloader.Downloader
net *p2p.Server net *p2p.Server
eventMux *event.TypeMux eventMux *event.TypeMux
@ -194,6 +196,7 @@ func New(config *Config) (*Ethereum, error) {
} }
eth.chainManager = core.NewChainManager(blockDb, stateDb, eth.EventMux()) eth.chainManager = core.NewChainManager(blockDb, stateDb, eth.EventMux())
eth.downloader = downloader.New(eth.chainManager.HasBlock, eth.chainManager.InsertChain, eth.chainManager.Td)
eth.pow = ethash.New(eth.chainManager) eth.pow = ethash.New(eth.chainManager)
eth.txPool = core.NewTxPool(eth.EventMux(), eth.chainManager.State) eth.txPool = core.NewTxPool(eth.EventMux(), eth.chainManager.State)
eth.blockProcessor = core.NewBlockProcessor(stateDb, extraDb, eth.pow, eth.txPool, eth.chainManager, eth.EventMux()) eth.blockProcessor = core.NewBlockProcessor(stateDb, extraDb, eth.pow, eth.txPool, eth.chainManager, eth.EventMux())
@ -212,7 +215,7 @@ func New(config *Config) (*Ethereum, error) {
return nil, err return nil, err
} }
ethProto := EthProtocol(config.ProtocolVersion, config.NetworkId, eth.txPool, eth.chainManager, eth.blockPool) ethProto := EthProtocol(config.ProtocolVersion, config.NetworkId, eth.txPool, eth.chainManager, eth.blockPool, eth.downloader)
protocols := []p2p.Protocol{ethProto} protocols := []p2p.Protocol{ethProto}
if config.Shh { if config.Shh {
protocols = append(protocols, eth.whisper.Protocol()) protocols = append(protocols, eth.whisper.Protocol())
@ -349,6 +352,7 @@ func (s *Ethereum) ClientVersion() string { return s.clientVersio
func (s *Ethereum) EthVersion() int { return s.ethVersionId } func (s *Ethereum) EthVersion() int { return s.ethVersionId }
func (s *Ethereum) NetVersion() int { return s.netVersionId } func (s *Ethereum) NetVersion() int { return s.netVersionId }
func (s *Ethereum) ShhVersion() int { return s.shhVersionId } func (s *Ethereum) ShhVersion() int { return s.shhVersionId }
func (s *Ethereum) Downloader() *downloader.Downloader { return s.downloader }
// Start the ethereum // Start the ethereum
func (s *Ethereum) Start() error { func (s *Ethereum) Start() error {

View File

@ -7,6 +7,7 @@ import (
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/errs" "github.com/ethereum/go-ethereum/errs"
"github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/logger/glog"
"github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p"
@ -18,8 +19,8 @@ const (
NetworkId = 0 NetworkId = 0
ProtocolLength = uint64(8) ProtocolLength = uint64(8)
ProtocolMaxMsgSize = 10 * 1024 * 1024 ProtocolMaxMsgSize = 10 * 1024 * 1024
maxHashes = 256 maxHashes = 512
maxBlocks = 64 maxBlocks = 128
) )
// eth protocol message codes // eth protocol message codes
@ -64,6 +65,7 @@ type ethProtocol struct {
txPool txPool txPool txPool
chainManager chainManager chainManager chainManager
blockPool blockPool blockPool blockPool
downloader *downloader.Downloader
peer *p2p.Peer peer *p2p.Peer
id string id string
rw p2p.MsgReadWriter rw p2p.MsgReadWriter
@ -114,25 +116,26 @@ type statusMsgData struct {
// main entrypoint, wrappers starting a server running the eth protocol // main entrypoint, wrappers starting a server running the eth protocol
// use this constructor to attach the protocol ("class") to server caps // use this constructor to attach the protocol ("class") to server caps
// the Dev p2p layer then runs the protocol instance on each peer // the Dev p2p layer then runs the protocol instance on each peer
func EthProtocol(protocolVersion, networkId int, txPool txPool, chainManager chainManager, blockPool blockPool) p2p.Protocol { func EthProtocol(protocolVersion, networkId int, txPool txPool, chainManager chainManager, blockPool blockPool, downloader *downloader.Downloader) p2p.Protocol {
return p2p.Protocol{ return p2p.Protocol{
Name: "eth", Name: "eth",
Version: uint(protocolVersion), Version: uint(protocolVersion),
Length: ProtocolLength, Length: ProtocolLength,
Run: func(peer *p2p.Peer, rw p2p.MsgReadWriter) error { Run: func(peer *p2p.Peer, rw p2p.MsgReadWriter) error {
return runEthProtocol(protocolVersion, networkId, txPool, chainManager, blockPool, peer, rw) return runEthProtocol(protocolVersion, networkId, txPool, chainManager, blockPool, downloader, peer, rw)
}, },
} }
} }
// the main loop that handles incoming messages // the main loop that handles incoming messages
// note RemovePeer in the post-disconnect hook // note RemovePeer in the post-disconnect hook
func runEthProtocol(protocolVersion, networkId int, txPool txPool, chainManager chainManager, blockPool blockPool, peer *p2p.Peer, rw p2p.MsgReadWriter) (err error) { func runEthProtocol(protocolVersion, networkId int, txPool txPool, chainManager chainManager, blockPool blockPool, downloader *downloader.Downloader, peer *p2p.Peer, rw p2p.MsgReadWriter) (err error) {
id := peer.ID() id := peer.ID()
self := &ethProtocol{ self := &ethProtocol{
txPool: txPool, txPool: txPool,
chainManager: chainManager, chainManager: chainManager,
blockPool: blockPool, blockPool: blockPool,
downloader: downloader,
rw: rw, rw: rw,
peer: peer, peer: peer,
protocolVersion: protocolVersion, protocolVersion: protocolVersion,
@ -211,24 +214,33 @@ func (self *ethProtocol) handle() error {
case BlockHashesMsg: case BlockHashesMsg:
msgStream := rlp.NewStream(msg.Payload) msgStream := rlp.NewStream(msg.Payload)
if _, err := msgStream.List(); err != nil {
return err
}
var i int var hashes []common.Hash
iter := func() (hash common.Hash, ok bool) { if err := msgStream.Decode(&hashes); err != nil {
err := msgStream.Decode(&hash) break
if err == rlp.EOL { }
return common.Hash{}, false self.downloader.HashCh <- hashes
} else if err != nil {
self.protoError(ErrDecode, "msg %v: after %v hashes : %v", msg, i, err) /*
return common.Hash{}, false if _, err := msgStream.List(); err != nil {
return err
} }
i++ var i int
return hash, true iter := func() (hash common.Hash, err error) {
} err = msgStream.Decode(&hash)
self.blockPool.AddBlockHashes(iter, self.id) if err == rlp.EOL {
return common.Hash{}, err
} else if err != nil {
return common.Hash{}, fmt.Errorf("Fetching hashes err (%d): %v", i, err)
}
i++
return hash, nil
}
self.downloader.HashCh <- iter
//self.blockPool.AddBlockHashes(iter, self.id)
*/
case GetBlocksMsg: case GetBlocksMsg:
msgStream := rlp.NewStream(msg.Payload) msgStream := rlp.NewStream(msg.Payload)
@ -260,23 +272,34 @@ func (self *ethProtocol) handle() error {
case BlocksMsg: case BlocksMsg:
msgStream := rlp.NewStream(msg.Payload) msgStream := rlp.NewStream(msg.Payload)
if _, err := msgStream.List(); err != nil {
return err var blocks []*types.Block
if err := msgStream.Decode(&blocks); err != nil {
glog.V(logger.Detail).Infoln("Decode error", err)
fmt.Println("decode error", err)
blocks = nil
} }
for { self.downloader.DeliverChunk(self.id, blocks)
var block types.Block /*
if err := msgStream.Decode(&block); err != nil { msgStream := rlp.NewStream(msg.Payload)
if err == rlp.EOL { if _, err := msgStream.List(); err != nil {
break return err
} else { }
return self.protoError(ErrDecode, "msg %v: %v", msg, err) for {
var block types.Block
if err := msgStream.Decode(&block); err != nil {
if err == rlp.EOL {
break
} else {
return self.protoError(ErrDecode, "msg %v: %v", msg, err)
}
} }
if err := block.ValidateFields(); err != nil {
return self.protoError(ErrDecode, "block validation %v: %v", msg, err)
}
self.blockPool.AddBlock(&block, self.id)
} }
if err := block.ValidateFields(); err != nil { */
return self.protoError(ErrDecode, "block validation %v: %v", msg, err)
}
self.blockPool.AddBlock(&block, self.id)
}
case NewBlockMsg: case NewBlockMsg:
var request newBlockMsgData var request newBlockMsgData
@ -296,6 +319,8 @@ func (self *ethProtocol) handle() error {
BlockPrevHash: request.Block.ParentHash().Hex(), BlockPrevHash: request.Block.ParentHash().Hex(),
RemoteId: self.peer.ID().String(), RemoteId: self.peer.ID().String(),
}) })
self.downloader.AddBlock(self.id, request.Block, request.TD)
// to simplify backend interface adding a new block // to simplify backend interface adding a new block
// uses AddPeer followed by AddBlock only if peer is the best peer // uses AddPeer followed by AddBlock only if peer is the best peer
// (or selected as new best peer) // (or selected as new best peer)
@ -345,10 +370,16 @@ func (self *ethProtocol) handleStatus() error {
return self.protoError(ErrProtocolVersionMismatch, "%d (!= %d)", status.ProtocolVersion, self.protocolVersion) return self.protoError(ErrProtocolVersionMismatch, "%d (!= %d)", status.ProtocolVersion, self.protocolVersion)
} }
_, suspended := self.blockPool.AddPeer(status.TD, status.CurrentBlock, self.id, self.requestBlockHashes, self.requestBlocks, self.protoErrorDisconnect) err = self.downloader.RegisterPeer(self.id, status.TD, status.CurrentBlock, self.requestBlockHashes, self.requestBlocks)
if suspended { if err != nil {
return self.protoError(ErrSuspendedPeer, "") return self.protoError(ErrSuspendedPeer, "something")
} }
/*
_, suspended := self.blockPool.AddPeer(status.TD, status.CurrentBlock, self.id, self.requestBlockHashes, self.requestBlocks, self.protoErrorDisconnect)
if suspended {
return self.protoError(ErrSuspendedPeer, "")
}
*/
self.peer.Debugf("Peer is [eth] capable (%d/%d). TD=%v H=%x\n", status.ProtocolVersion, status.NetworkId, status.TD, status.CurrentBlock[:4]) self.peer.Debugf("Peer is [eth] capable (%d/%d). TD=%v H=%x\n", status.ProtocolVersion, status.NetworkId, status.TD, status.CurrentBlock[:4])