From 2f362509b813573f533a5be437c140355ddec7fc Mon Sep 17 00:00:00 2001 From: obscuren Date: Wed, 10 Sep 2014 11:22:19 +0200 Subject: [PATCH] New p2p protocol. NOTE: Needs major refactoring. See #50 --- ethwire/messaging.go | 32 +++++------ peer.go | 127 ++++++++++++++++++++++++++++++------------- 2 files changed, 103 insertions(+), 56 deletions(-) diff --git a/ethwire/messaging.go b/ethwire/messaging.go index 7ac0188a1..c93c717a2 100644 --- a/ethwire/messaging.go +++ b/ethwire/messaging.go @@ -27,24 +27,20 @@ const ( // Values are given explicitly instead of by iota because these values are // defined by the wire protocol spec; it is easier for humans to ensure // correctness when values are explicit. - MsgHandshakeTy = 0x00 - MsgDiscTy = 0x01 - MsgPingTy = 0x02 - MsgPongTy = 0x03 - MsgGetPeersTy = 0x10 - MsgPeersTy = 0x11 + MsgHandshakeTy = 0x00 + MsgDiscTy = 0x01 + MsgPingTy = 0x02 + MsgPongTy = 0x03 + MsgGetPeersTy = 0x04 + MsgPeersTy = 0x05 + + MsgStatusTy = 0x10 + MsgGetTxsTy = 0x11 MsgTxTy = 0x12 - MsgGetChainTy = 0x14 - MsgNotInChainTy = 0x15 - MsgGetTxsTy = 0x16 - MsgGetBlockHashesTy = 0x17 - MsgBlockHashesTy = 0x18 - MsgGetBlocksTy = 0x19 - MsgBlockTy = 0x13 - - MsgOldBlockTy = 0xbb - - MsgTalkTy = 0xff + MsgGetBlockHashesTy = 0x13 + MsgBlockHashesTy = 0x14 + MsgGetBlocksTy = 0x15 + MsgBlockTy = 0x16 ) var msgTypeToString = map[MsgType]string{ @@ -56,9 +52,7 @@ var msgTypeToString = map[MsgType]string{ MsgPeersTy: "Peers", MsgTxTy: "Transactions", MsgBlockTy: "Blocks", - MsgGetChainTy: "Get chain", MsgGetTxsTy: "Get Txs", - MsgNotInChainTy: "Not in chain", MsgGetBlockHashesTy: "Get block hashes", MsgBlockHashesTy: "Block hashes", MsgGetBlocksTy: "Get blocks", diff --git a/peer.go b/peer.go index 79a58d28e..a7259d712 100644 --- a/peer.go +++ b/peer.go @@ -25,6 +25,8 @@ const ( outputBufferSize = 50 // Current protocol version ProtocolVersion = 28 + // Current P2P version + P2PVersion = 0 // Interval for ping/pong message pingPongTimer = 2 * time.Second ) @@ -122,6 +124,7 @@ type Peer struct { // This flag is used by writeMessage to check if messages are allowed // to be send or not. If no version is known all messages are ignored. versionKnown bool + statusKnown bool // Last received pong message lastPong int64 @@ -271,6 +274,14 @@ func (p *Peer) writeMessage(msg *ethwire.Msg) { default: // Anything but ack is allowed return } + } else { + if !p.statusKnown { + switch msg.Type { + case ethwire.MsgStatusTy: // Ok + default: // Anything but ack is allowed + return + } + } } peerlogger.DebugDetailf("(%v) <= %v %v\n", p.conn.RemoteAddr(), msg.Type, msg.Data) @@ -356,9 +367,9 @@ func (p *Peer) HandleInbound() { // Version message p.handleHandshake(msg) - if p.caps.IsCap(CapPeerDiscTy) { - p.QueueMessage(ethwire.NewMessage(ethwire.MsgGetPeersTy, "")) - } + //if p.caps.IsCap(CapPeerDiscTy) { + p.QueueMessage(ethwire.NewMessage(ethwire.MsgGetPeersTy, "")) + //} case ethwire.MsgDiscTy: p.Stop() @@ -396,6 +407,10 @@ func (p *Peer) HandleInbound() { // Connect to the list of peers p.ethereum.ProcessPeerList(peers) + + case ethwire.MsgStatusTy: + // Handle peer's status msg + p.handleStatus(msg) case ethwire.MsgGetTxsTy: // Get the current transactions of the pool txs := p.ethereum.TxPool().CurrentTransactions() @@ -581,6 +596,7 @@ func (p *Peer) Stop() { p.ethereum.RemovePeer(p) } +/* func (p *Peer) pushHandshake() error { pubkey := p.ethereum.KeyManager().PublicKey() msg := ethwire.NewMessage(ethwire.MsgHandshakeTy, []interface{}{ @@ -592,6 +608,7 @@ func (p *Peer) pushHandshake() error { return nil } +*/ func (p *Peer) peersMessage() *ethwire.Msg { outPeers := make([]interface{}, len(p.ethereum.InOutPeers())) @@ -612,13 +629,72 @@ func (p *Peer) pushPeers() { p.QueueMessage(p.peersMessage()) } +func (p *Peer) pushHandshake() error { + pubkey := p.ethereum.KeyManager().PublicKey() + msg := ethwire.NewMessage(ethwire.MsgHandshakeTy, []interface{}{ + uint32(0), []byte(p.version), []string{"eth"}, p.port, pubkey[1:], + }) + + p.QueueMessage(msg) + + return nil +} + +func (self *Peer) pushStatus() { + const netVersion = 0 + msg := ethwire.NewMessage(ethwire.MsgStatusTy, []interface{}{ + uint32(ProtocolVersion), + netVersion, + self.ethereum.BlockChain().TD.Uint64(), + self.ethereum.BlockChain().CurrentBlock.Hash(), + self.ethereum.BlockChain().Genesis().Hash(), + }) + + self.QueueMessage(msg) +} + +func (self *Peer) handleStatus(msg *ethwire.Msg) { + c := msg.Data + // Set the peer's caps + //p.caps = Caps(c.Get(3).Byte()) + + // Get the td and last hash + self.td = c.Get(6).BigInt() + self.bestHash = c.Get(7).Bytes() + self.lastReceivedHash = self.bestHash + + // Compare the total TD with the blockchain TD. If remote is higher + // fetch hashes from highest TD node. + if self.td.Cmp(self.ethereum.BlockChain().TD) > 0 { + self.ethereum.blockPool.AddHash(self.lastReceivedHash) + self.FetchHashes() + } + + ethlogger.Infof("Peer is [ETH] capable. (TD = %v ~ %x", self.td, self.bestHash) +} + func (p *Peer) handleHandshake(msg *ethwire.Msg) { c := msg.Data - // Set pubkey - p.pubkey = c.Get(5).Bytes() + var ( + p2pVersion = c.Get(0).Uint() + clientId = c.Get(1).Str() + caps = c.Get(2).Raw() + port = c.Get(3).Uint() + pub = c.Get(4).Bytes() + ) - if p.pubkey == nil { + fmt.Println("PEER CAPS", caps) + + // Check correctness of p2p protocol version + if p2pVersion != P2PVersion { + peerlogger.Debugf("Invalid P2P version. Require protocol %d, received %d\n", P2PVersion, p2pVersion) + p.Stop() + return + } + + // Handle the pub key (validation, uniqueness) + if pub == nil || len(pub) == 0 { peerlogger.Warnln("Pubkey required, not supplied in handshake.") p.Stop() return @@ -627,7 +703,7 @@ func (p *Peer) handleHandshake(msg *ethwire.Msg) { usedPub := 0 // This peer is already added to the peerlist so we expect to find a double pubkey at least once eachPeer(p.ethereum.Peers(), func(peer *Peer, e *list.Element) { - if bytes.Compare(p.pubkey, peer.pubkey) == 0 { + if bytes.Compare(pub, peer.pubkey) == 0 { usedPub++ } }) @@ -637,18 +713,11 @@ func (p *Peer) handleHandshake(msg *ethwire.Msg) { p.Stop() return } - - if c.Get(0).Uint() != ProtocolVersion { - peerlogger.Debugf("Invalid peer version. Require protocol: %d. Received: %d\n", ProtocolVersion, c.Get(0).Uint()) - p.Stop() - return - } - - p.versionKnown = true + p.pubkey = pub // If this is an inbound connection send an ack back if p.inbound { - p.port = uint16(c.Get(4).Uint()) + p.port = uint16(port) // Self connect detection pubkey := p.ethereum.KeyManager().PublicKey() @@ -659,34 +728,18 @@ func (p *Peer) handleHandshake(msg *ethwire.Msg) { } } + p.SetVersion(clientId) - // Set the peer's caps - p.caps = Caps(c.Get(3).Byte()) - - // Get a reference to the peers version - versionString := c.Get(2).Str() - if len(versionString) > 0 { - p.SetVersion(c.Get(2).Str()) - } - - // Get the td and last hash - p.td = c.Get(6).BigInt() - p.bestHash = c.Get(7).Bytes() - p.lastReceivedHash = p.bestHash + p.versionKnown = true p.ethereum.PushPeer(p) p.ethereum.reactor.Post("peerList", p.ethereum.Peers()) - ethlogger.Infof("Added peer (%s) %d / %d (TD = %v ~ %x)\n", p.conn.RemoteAddr(), p.ethereum.Peers().Len(), p.ethereum.MaxPeers, p.td, p.bestHash) - - // Compare the total TD with the blockchain TD. If remote is higher - // fetch hashes from highest TD node. - if p.td.Cmp(p.ethereum.BlockChain().TD) > 0 { - p.ethereum.blockPool.AddHash(p.lastReceivedHash) - p.FetchHashes() - } + ethlogger.Infof("Added peer (%s) %d / %d \n", p.conn.RemoteAddr(), p.ethereum.Peers().Len(), p.ethereum.MaxPeers) peerlogger.Debugln(p) + + p.pushStatus() } func (p *Peer) String() string {