From 01dc1c13942867d0579f5010a560da4073ece05e Mon Sep 17 00:00:00 2001 From: zelig Date: Sun, 14 Dec 2014 18:08:18 +0000 Subject: [PATCH] blockpool rewritten , tests broken FIXME --- eth/block_pool.go | 8 ++ eth/protocol.go | 292 ---------------------------------------------- 2 files changed, 8 insertions(+), 292 deletions(-) diff --git a/eth/block_pool.go b/eth/block_pool.go index 7cfbc63f8..a5cda7b58 100644 --- a/eth/block_pool.go +++ b/eth/block_pool.go @@ -55,7 +55,11 @@ type BlockPool struct { // the minimal interface with blockchain hasBlock func(hash []byte) bool insertChain func(types.Blocks) error +<<<<<<< HEAD verifyPoW func(pow.Block) bool +======= + verifyPoW func(*types.Block) bool +>>>>>>> blockpool rewritten , tests broken FIXME } type peerInfo struct { @@ -74,7 +78,11 @@ type peerInfo struct { quitC chan bool } +<<<<<<< HEAD func NewBlockPool(hasBlock func(hash []byte) bool, insertChain func(types.Blocks) error, verifyPoW func(pow.Block) bool, +======= +func NewBlockPool(hasBlock func(hash []byte) bool, insertChain func(types.Blocks) error, verifyPoW func(*types.Block) bool, +>>>>>>> blockpool rewritten , tests broken FIXME ) *BlockPool { return &BlockPool{ hasBlock: hasBlock, diff --git a/eth/protocol.go b/eth/protocol.go index 37e642fd0..3b5b49696 100644 --- a/eth/protocol.go +++ b/eth/protocol.go @@ -68,8 +68,6 @@ type newBlockMsgData struct { type getBlockHashesMsgData struct { Hash []byte -<<<<<<< HEAD -<<<<<<< HEAD Amount uint64 } @@ -78,47 +76,18 @@ type getBlockHashesMsgData struct { // the Dev p2p layer then runs the protocol instance on each peer func EthProtocol(txPool txPool, chainManager chainManager, blockPool blockPool) p2p.Protocol { return p2p.Protocol{ -======= - Amount uint32 -======= - Amount uint64 ->>>>>>> protocol -} - -// main entrypoint, wrappers starting a server running the eth protocol -// use this constructor to attach the protocol ("class") to server caps -// the Dev p2p layer then runs the protocol instance on each peer -<<<<<<< HEAD -func EthProtocol(eth backend) *p2p.Protocol { - return &p2p.Protocol{ ->>>>>>> initial commit for eth-p2p integration -======= -func EthProtocol(txPool txPool, chainManager chainManager, blockPool blockPool) p2p.Protocol { - return p2p.Protocol{ ->>>>>>> protocol Name: "eth", Version: ProtocolVersion, Length: ProtocolLength, Run: func(peer *p2p.Peer, rw p2p.MsgReadWriter) error { -<<<<<<< HEAD -<<<<<<< HEAD return runEthProtocol(txPool, chainManager, blockPool, peer, rw) -======= - return runEthProtocol(eth, peer, rw) ->>>>>>> initial commit for eth-p2p integration -======= - return runEthProtocol(txPool, chainManager, blockPool, peer, rw) ->>>>>>> protocol }, } } -<<<<<<< HEAD -<<<<<<< HEAD // the main loop that handles incoming messages // note RemovePeer in the post-disconnect hook func runEthProtocol(txPool txPool, chainManager chainManager, blockPool blockPool, peer *p2p.Peer, rw p2p.MsgReadWriter) (err error) { -<<<<<<< HEAD self := ðProtocol{ txPool: txPool, chainManager: chainManager, @@ -126,30 +95,6 @@ func runEthProtocol(txPool txPool, chainManager chainManager, blockPool blockPoo rw: rw, peer: peer, id: (string)(peer.Identity().Pubkey()), -======= -======= -// the main loop that handles incoming messages -// note RemovePeer in the post-disconnect hook ->>>>>>> eth protocol changes -func runEthProtocol(eth backend, peer *p2p.Peer, rw p2p.MsgReadWriter) (err error) { - self := ðProtocol{ - eth: eth, - rw: rw, - peer: peer, -<<<<<<< HEAD ->>>>>>> initial commit for eth-p2p integration -======= - id: (string)(peer.Identity().Pubkey()), ->>>>>>> eth protocol changes -======= - self := ðProtocol{ - txPool: txPool, - chainManager: chainManager, - blockPool: blockPool, - rw: rw, - peer: peer, - id: (string)(peer.Identity().Pubkey()), ->>>>>>> protocol } err = self.handleStatus() if err == nil { @@ -157,18 +102,7 @@ func runEthProtocol(eth backend, peer *p2p.Peer, rw p2p.MsgReadWriter) (err erro for { err = self.handle() if err != nil { -<<<<<<< HEAD -<<<<<<< HEAD -<<<<<<< HEAD self.blockPool.RemovePeer(self.id) -======= ->>>>>>> initial commit for eth-p2p integration -======= - self.eth.RemovePeer(self.id) ->>>>>>> eth protocol changes -======= - self.blockPool.RemovePeer(self.id) ->>>>>>> protocol break } } @@ -193,71 +127,30 @@ func (self *ethProtocol) handle() error { case StatusMsg: return ProtocolError(ErrExtraStatusMsg, "") -<<<<<<< HEAD -<<<<<<< HEAD case TxMsg: // TODO: rework using lazy RLP stream -======= - case GetTxMsg: - txs := self.eth.GetTransactions() - // TODO: rewrite using rlp flat - txsInterface := make([]interface{}, len(txs)) - for i, tx := range txs { - txsInterface[i] = tx.RlpData() - } - return self.rw.EncodeMsg(TxMsg, txsInterface...) - -======= ->>>>>>> protocol - case TxMsg: -<<<<<<< HEAD ->>>>>>> initial commit for eth-p2p integration -======= - // TODO: rework using lazy RLP stream ->>>>>>> eth protocol changes var txs []*types.Transaction if err := msg.Decode(&txs); err != nil { return ProtocolError(ErrDecode, "%v", err) } -<<<<<<< HEAD -<<<<<<< HEAD self.txPool.AddTransactions(txs) -======= - self.eth.AddTransactions(txs) ->>>>>>> initial commit for eth-p2p integration -======= - self.txPool.AddTransactions(txs) ->>>>>>> protocol case GetBlockHashesMsg: var request getBlockHashesMsgData if err := msg.Decode(&request); err != nil { return ProtocolError(ErrDecode, "%v", err) } -<<<<<<< HEAD -<<<<<<< HEAD hashes := self.chainManager.GetBlockHashesFromHash(request.Hash, request.Amount) -======= - hashes := self.eth.GetBlockHashes(request.Hash, request.Amount) ->>>>>>> initial commit for eth-p2p integration -======= - hashes := self.chainManager.GetBlockHashesFromHash(request.Hash, request.Amount) ->>>>>>> protocol return self.rw.EncodeMsg(BlockHashesMsg, ethutil.ByteSliceToInterface(hashes)...) case BlockHashesMsg: // TODO: redo using lazy decode , this way very inefficient on known chains -<<<<<<< HEAD -<<<<<<< HEAD -======= ->>>>>>> eth protocol changes msgStream := rlp.NewListStream(msg.Payload, uint64(msg.Size)) var err error iter := func() (hash []byte, ok bool) { hash, err = msgStream.Bytes() if err == nil { ok = true -<<<<<<< HEAD } return } @@ -267,60 +160,17 @@ func (self *ethProtocol) handle() error { } case GetBlocksMsg: -======= - // s := rlp.NewListStream(msg.Payload, uint64(msg.Size)) var blockHashes [][]byte if err := msg.Decode(&blockHashes); err != nil { return ProtocolError(ErrDecode, "%v", err) } - fetchMore := true - for _, hash := range blockHashes { - fetchMore = self.eth.AddHash(hash, self.peer) - if !fetchMore { - break -======= ->>>>>>> eth protocol changes - } - return - } - self.blockPool.AddBlockHashes(iter, self.id) - if err != nil && err != rlp.EOL { - return ProtocolError(ErrDecode, "%v", err) - } - - case GetBlocksMsg: -<<<<<<< HEAD - // Limit to max 300 blocks ->>>>>>> initial commit for eth-p2p integration -======= ->>>>>>> eth protocol changes - var blockHashes [][]byte - if err := msg.Decode(&blockHashes); err != nil { - return ProtocolError(ErrDecode, "%v", err) - } -<<<<<<< HEAD -<<<<<<< HEAD max := int(math.Min(float64(len(blockHashes)), blockHashesBatchSize)) -======= - max := int(math.Min(float64(len(blockHashes)), 300.0)) ->>>>>>> initial commit for eth-p2p integration -======= - max := int(math.Min(float64(len(blockHashes)), blockHashesBatchSize)) ->>>>>>> eth protocol changes var blocks []interface{} for i, hash := range blockHashes { if i >= max { break } -<<<<<<< HEAD -<<<<<<< HEAD block := self.chainManager.GetBlock(hash) -======= - block := self.eth.GetBlock(hash) ->>>>>>> initial commit for eth-p2p integration -======= - block := self.chainManager.GetBlock(hash) ->>>>>>> protocol if block != nil { blocks = append(blocks, block.Value().Raw()) } @@ -328,10 +178,6 @@ func (self *ethProtocol) handle() error { return self.rw.EncodeMsg(BlocksMsg, blocks...) case BlocksMsg: -<<<<<<< HEAD -<<<<<<< HEAD -======= ->>>>>>> eth protocol changes msgStream := rlp.NewListStream(msg.Payload, uint64(msg.Size)) for { var block *types.Block @@ -340,37 +186,9 @@ func (self *ethProtocol) handle() error { break } else { return ProtocolError(ErrDecode, "%v", err) -<<<<<<< HEAD } } self.blockPool.AddBlock(block, self.id) -======= - var blocks []*types.Block - if err := msg.Decode(&blocks); err != nil { - return ProtocolError(ErrDecode, "%v", err) - } - for _, block := range blocks { - fetchHashes, err := self.eth.AddBlock(nil, block, self.peer) - if err != nil { - return ProtocolError(ErrInvalidBlock, "%v", err) - } - if fetchHashes { - if err := self.FetchHashes(block.Hash()); err != nil { - return err - } - } ->>>>>>> initial commit for eth-p2p integration -======= - } - } -<<<<<<< HEAD - if err := self.eth.AddBlock(block, self.id); err != nil { - return ProtocolError(ErrInvalidBlock, "%v", err) - } ->>>>>>> eth protocol changes -======= - self.blockPool.AddBlock(block, self.id) ->>>>>>> protocol } case NewBlockMsg: @@ -378,23 +196,11 @@ func (self *ethProtocol) handle() error { if err := msg.Decode(&request); err != nil { return ProtocolError(ErrDecode, "%v", err) } -<<<<<<< HEAD -<<<<<<< HEAD -======= ->>>>>>> eth protocol changes hash := request.Block.Hash() // to simplify backend interface adding a new block // uses AddPeer followed by AddHashes, AddBlock only if peer is the best peer // (or selected as new best peer) -<<<<<<< HEAD -<<<<<<< HEAD if self.blockPool.AddPeer(request.TD, hash, self.id, self.requestBlockHashes, self.requestBlocks, self.protoErrorDisconnect) { -======= - if self.eth.AddPeer(request.TD, hash, self.id, self.requestBlockHashes, self.requestBlocks, self.invalidBlock) { ->>>>>>> eth protocol changes -======= - if self.blockPool.AddPeer(request.TD, hash, self.id, self.requestBlockHashes, self.requestBlocks, self.protoErrorDisconnect) { ->>>>>>> protocol called := true iter := func() (hash []byte, ok bool) { if called { @@ -404,29 +210,8 @@ func (self *ethProtocol) handle() error { return } } -<<<<<<< HEAD -<<<<<<< HEAD self.blockPool.AddBlockHashes(iter, self.id) self.blockPool.AddBlock(request.Block, self.id) -======= - var fetchHashes bool - // this should reset td and offer blockpool as candidate new peer? - if fetchHashes, err = self.eth.AddBlock(request.TD, request.Block, self.peer); err != nil { - return ProtocolError(ErrInvalidBlock, "%v", err) - } - if fetchHashes { - return self.FetchHashes(request.Block.Hash()) ->>>>>>> initial commit for eth-p2p integration -======= - self.eth.AddBlockHashes(iter, self.id) - if err := self.eth.AddBlock(request.Block, self.id); err != nil { - return ProtocolError(ErrInvalidBlock, "%v", err) - } ->>>>>>> eth protocol changes -======= - self.blockPool.AddBlockHashes(iter, self.id) - self.blockPool.AddBlock(request.Block, self.id) ->>>>>>> protocol } default: @@ -444,15 +229,7 @@ type statusMsgData struct { } func (self *ethProtocol) statusMsg() p2p.Msg { -<<<<<<< HEAD -<<<<<<< HEAD td, currentBlock, genesisBlock := self.chainManager.Status() -======= - td, currentBlock, genesisBlock := self.eth.Status() ->>>>>>> initial commit for eth-p2p integration -======= - td, currentBlock, genesisBlock := self.chainManager.Status() ->>>>>>> protocol return p2p.NewMsg(StatusMsg, uint32(ProtocolVersion), @@ -488,15 +265,7 @@ func (self *ethProtocol) handleStatus() error { return ProtocolError(ErrDecode, "%v", err) } -<<<<<<< HEAD -<<<<<<< HEAD _, _, genesisBlock := self.chainManager.Status() -======= - _, _, genesisBlock := self.eth.Status() ->>>>>>> initial commit for eth-p2p integration -======= - _, _, genesisBlock := self.chainManager.Status() ->>>>>>> protocol if bytes.Compare(status.GenesisBlock, genesisBlock) != 0 { return ProtocolError(ErrGenesisBlockMismatch, "%x (!= %x)", status.GenesisBlock, genesisBlock) @@ -510,33 +279,13 @@ func (self *ethProtocol) handleStatus() error { return ProtocolError(ErrProtocolVersionMismatch, "%d (!= %d)", status.ProtocolVersion, ProtocolVersion) } -<<<<<<< HEAD -<<<<<<< HEAD self.peer.Infof("Peer is [eth] capable (%d/%d). TD = %v ~ %x", status.ProtocolVersion, status.NetworkId, status.CurrentBlock) self.blockPool.AddPeer(status.TD, status.CurrentBlock, self.id, self.requestBlockHashes, self.requestBlocks, self.protoErrorDisconnect) -======= - logger.Infof("Peer is [eth] capable (%d/%d). TD = %v ~ %x", status.ProtocolVersion, status.NetworkId, status.CurrentBlock) - - if self.eth.AddPeer(status.TD, status.CurrentBlock, self.peer) { - return self.FetchHashes(status.CurrentBlock) - } ->>>>>>> initial commit for eth-p2p integration -======= - self.peer.Infof("Peer is [eth] capable (%d/%d). TD = %v ~ %x", status.ProtocolVersion, status.NetworkId, status.CurrentBlock) - -<<<<<<< HEAD - self.eth.AddPeer(status.TD, status.CurrentBlock, self.id, self.requestBlockHashes, self.requestBlocks, self.invalidBlock) ->>>>>>> eth protocol changes -======= - self.blockPool.AddPeer(status.TD, status.CurrentBlock, self.id, self.requestBlockHashes, self.requestBlocks, self.protoErrorDisconnect) ->>>>>>> protocol return nil } -<<<<<<< HEAD -<<<<<<< HEAD func (self *ethProtocol) requestBlockHashes(from []byte) error { self.peer.Debugf("fetching hashes (%d) %x...\n", blockHashesBatchSize, from[0:4]) return self.rw.EncodeMsg(GetBlockHashesMsg, from, blockHashesBatchSize) @@ -567,44 +316,3 @@ func (self *ethProtocol) protoErrorDisconnect(code int, format string, params .. } } -======= -func (self *ethProtocol) FetchHashes(from []byte) error { - logger.Debugf("Fetching hashes (%d) %x...\n", blockHashesBatchSize, from[0:4]) - return self.rw.EncodeMsg(GetBlockHashesMsg, from, blockHashesBatchSize) -} ->>>>>>> initial commit for eth-p2p integration -======= -func (self *ethProtocol) requestBlockHashes(from []byte) error { - self.peer.Debugf("fetching hashes (%d) %x...\n", blockHashesBatchSize, from[0:4]) - return self.rw.EncodeMsg(GetBlockHashesMsg, from, blockHashesBatchSize) -} - -func (self *ethProtocol) requestBlocks(hashes [][]byte) error { - self.peer.Debugf("fetching %v blocks", len(hashes)) - return self.rw.EncodeMsg(GetBlocksMsg, ethutil.ByteSliceToInterface(hashes)) -} - -func (self *ethProtocol) protoError(code int, format string, params ...interface{}) (err *protocolError) { - err = ProtocolError(code, format, params...) - if err.Fatal() { - self.peer.Errorln(err) - } else { - self.peer.Debugln(err) - } - return -} -<<<<<<< HEAD ->>>>>>> eth protocol changes -======= - -func (self *ethProtocol) protoErrorDisconnect(code int, format string, params ...interface{}) { - err := ProtocolError(code, format, params...) - if err.Fatal() { - self.peer.Errorln(err) - // disconnect - } else { - self.peer.Debugln(err) - } - -} ->>>>>>> protocol