diff --git a/block_pool.go b/block_pool.go
deleted file mode 100644
index 803927f21..000000000
--- a/block_pool.go
+++ /dev/null
@@ -1,351 +0,0 @@
-package eth
-
-import (
- "bytes"
- "container/list"
- "fmt"
- "math"
- "math/big"
- "sync"
- "time"
-
- "github.com/ethereum/go-ethereum/core/types"
- "github.com/ethereum/go-ethereum/ethutil"
- "github.com/ethereum/go-ethereum/logger"
- "github.com/ethereum/go-ethereum/wire"
-)
-
-var poollogger = logger.NewLogger("BPOOL")
-
-type block struct {
- from *Peer
- peer *Peer
- block *types.Block
- reqAt time.Time
- requested int
-}
-
-type BlockPool struct {
- mut sync.Mutex
-
- eth *Ethereum
-
- hashes [][]byte
- pool map[string]*block
-
- td *big.Int
- quit chan bool
-
- fetchingHashes bool
- downloadStartedAt time.Time
-
- ChainLength, BlocksProcessed int
-
- peer *Peer
-}
-
-func NewBlockPool(eth *Ethereum) *BlockPool {
- return &BlockPool{
- eth: eth,
- pool: make(map[string]*block),
- td: ethutil.Big0,
- quit: make(chan bool),
- }
-}
-
-func (self *BlockPool) Len() int {
- return len(self.hashes)
-}
-
-func (self *BlockPool) Reset() {
- self.pool = make(map[string]*block)
- self.hashes = nil
-}
-
-func (self *BlockPool) HasLatestHash() bool {
- self.mut.Lock()
- defer self.mut.Unlock()
-
- return self.pool[string(self.eth.ChainManager().CurrentBlock.Hash())] != nil
-}
-
-func (self *BlockPool) HasCommonHash(hash []byte) bool {
- return self.eth.ChainManager().GetBlock(hash) != nil
-}
-
-func (self *BlockPool) Blocks() (blocks types.Blocks) {
- for _, item := range self.pool {
- if item.block != nil {
- blocks = append(blocks, item.block)
- }
- }
-
- return
-}
-
-func (self *BlockPool) FetchHashes(peer *Peer) bool {
- highestTd := self.eth.HighestTDPeer()
-
- if (self.peer == nil && peer.td.Cmp(highestTd) >= 0) || (self.peer != nil && peer.td.Cmp(self.peer.td) > 0) || self.peer == peer {
- if self.peer != peer {
- poollogger.Infof("Found better suitable peer (%v vs %v)\n", self.td, peer.td)
-
- if self.peer != nil {
- self.peer.doneFetchingHashes = true
- }
- }
-
- self.peer = peer
- self.td = peer.td
-
- if !self.HasLatestHash() {
- self.fetchHashes()
- }
-
- return true
- }
-
- return false
-}
-
-func (self *BlockPool) fetchHashes() {
- peer := self.peer
-
- peer.doneFetchingHashes = false
-
- const amount = 256
- peerlogger.Debugf("Fetching hashes (%d) %x...\n", amount, peer.lastReceivedHash[0:4])
- peer.QueueMessage(wire.NewMessage(wire.MsgGetBlockHashesTy, []interface{}{peer.lastReceivedHash, uint32(amount)}))
-}
-
-func (self *BlockPool) AddHash(hash []byte, peer *Peer) {
- self.mut.Lock()
- defer self.mut.Unlock()
-
- if self.pool[string(hash)] == nil {
- self.pool[string(hash)] = &block{peer, nil, nil, time.Now(), 0}
-
- self.hashes = append([][]byte{hash}, self.hashes...)
- }
-}
-
-func (self *BlockPool) Add(b *types.Block, peer *Peer) {
- self.addBlock(b, peer, false)
-}
-
-func (self *BlockPool) AddNew(b *types.Block, peer *Peer) {
- self.addBlock(b, peer, true)
-}
-
-func (self *BlockPool) addBlock(b *types.Block, peer *Peer, newBlock bool) {
- self.mut.Lock()
- defer self.mut.Unlock()
-
- hash := string(b.Hash())
-
- if self.pool[hash] == nil && !self.eth.ChainManager().HasBlock(b.Hash()) {
- poollogger.Infof("Got unrequested block (%x...)\n", hash[0:4])
-
- self.hashes = append(self.hashes, b.Hash())
- self.pool[hash] = &block{peer, peer, b, time.Now(), 0}
-
- // The following is only performed on an unrequested new block
- if newBlock {
- fmt.Println("1.", !self.eth.ChainManager().HasBlock(b.PrevHash), ethutil.Bytes2Hex(b.Hash()[0:4]), ethutil.Bytes2Hex(b.PrevHash[0:4]))
- fmt.Println("2.", self.pool[string(b.PrevHash)] == nil)
- fmt.Println("3.", !self.fetchingHashes)
- if !self.eth.ChainManager().HasBlock(b.PrevHash) /*&& self.pool[string(b.PrevHash)] == nil*/ && !self.fetchingHashes {
- poollogger.Infof("Unknown chain, requesting (%x...)\n", b.PrevHash[0:4])
- peer.QueueMessage(wire.NewMessage(wire.MsgGetBlockHashesTy, []interface{}{b.Hash(), uint32(256)}))
- }
- }
- } else if self.pool[hash] != nil {
- self.pool[hash].block = b
- }
-
- self.BlocksProcessed++
-}
-
-func (self *BlockPool) Remove(hash []byte) {
- self.mut.Lock()
- defer self.mut.Unlock()
-
- self.hashes = ethutil.DeleteFromByteSlice(self.hashes, hash)
- delete(self.pool, string(hash))
-}
-
-func (self *BlockPool) DistributeHashes() {
- self.mut.Lock()
- defer self.mut.Unlock()
-
- var (
- peerLen = self.eth.peers.Len()
- amount = 256 * peerLen
- dist = make(map[*Peer][][]byte)
- )
-
- num := int(math.Min(float64(amount), float64(len(self.pool))))
- for i, j := 0, 0; i < len(self.hashes) && j < num; i++ {
- hash := self.hashes[i]
- item := self.pool[string(hash)]
-
- if item != nil && item.block == nil {
- var peer *Peer
- lastFetchFailed := time.Since(item.reqAt) > 5*time.Second
-
- // Handle failed requests
- if lastFetchFailed && item.requested > 5 && item.peer != nil {
- if item.requested < 100 {
- // Select peer the hash was retrieved off
- peer = item.from
- } else {
- // Remove it
- self.hashes = ethutil.DeleteFromByteSlice(self.hashes, hash)
- delete(self.pool, string(hash))
- }
- } else if lastFetchFailed || item.peer == nil {
- // Find a suitable, available peer
- eachPeer(self.eth.peers, func(p *Peer, v *list.Element) {
- if peer == nil && len(dist[p]) < amount/peerLen && p.statusKnown {
- peer = p
- }
- })
- }
-
- if peer != nil {
- item.reqAt = time.Now()
- item.peer = peer
- item.requested++
-
- dist[peer] = append(dist[peer], hash)
- }
- }
- }
-
- for peer, hashes := range dist {
- peer.FetchBlocks(hashes)
- }
-
- if len(dist) > 0 {
- self.downloadStartedAt = time.Now()
- }
-}
-
-func (self *BlockPool) Start() {
- go self.downloadThread()
- go self.chainThread()
-}
-
-func (self *BlockPool) Stop() {
- close(self.quit)
-}
-
-func (self *BlockPool) downloadThread() {
- serviceTimer := time.NewTicker(100 * time.Millisecond)
-out:
- for {
- select {
- case <-self.quit:
- break out
- case <-serviceTimer.C:
- // Check if we're catching up. If not distribute the hashes to
- // the peers and download the blockchain
- self.fetchingHashes = false
- eachPeer(self.eth.peers, func(p *Peer, v *list.Element) {
- if p.statusKnown && p.FetchingHashes() {
- self.fetchingHashes = true
- }
- })
-
- if len(self.hashes) > 0 {
- self.DistributeHashes()
- }
-
- if self.ChainLength < len(self.hashes) {
- self.ChainLength = len(self.hashes)
- }
-
- if self.peer != nil &&
- !self.peer.doneFetchingHashes &&
- time.Since(self.peer.lastHashAt) > 10*time.Second &&
- time.Since(self.peer.lastHashRequestedAt) > 5*time.Second {
- self.fetchHashes()
- }
-
- /*
- if !self.fetchingHashes {
- blocks := self.Blocks()
- chain.BlockBy(chain.Number).Sort(blocks)
-
- if len(blocks) > 0 {
- if !self.eth.ChainManager().HasBlock(b.PrevHash) && self.pool[string(b.PrevHash)] == nil && !self.fetchingHashes {
- }
- }
- }
- */
- }
- }
-}
-
-func (self *BlockPool) chainThread() {
- procTimer := time.NewTicker(500 * time.Millisecond)
-out:
- for {
- select {
- case <-self.quit:
- break out
- case <-procTimer.C:
- blocks := self.Blocks()
- types.BlockBy(types.Number).Sort(blocks)
-
- // Find common block
- for i, block := range blocks {
- if self.eth.ChainManager().HasBlock(block.PrevHash) {
- blocks = blocks[i:]
- break
- }
- }
-
- if len(blocks) > 0 {
- if self.eth.ChainManager().HasBlock(blocks[0].PrevHash) {
- for i, block := range blocks[1:] {
- // NOTE: The Ith element in this loop refers to the previous block in
- // outer "blocks"
- if bytes.Compare(block.PrevHash, blocks[i].Hash()) != 0 {
- blocks = blocks[:i]
-
- break
- }
- }
- } else {
- blocks = nil
- }
- }
-
- if len(blocks) > 0 {
- chainman := self.eth.ChainManager()
-
- err := chainman.InsertChain(blocks)
- if err != nil {
- poollogger.Debugln(err)
-
- self.Reset()
-
- if self.peer != nil && self.peer.conn != nil {
- poollogger.Debugf("Punishing peer for supplying bad chain (%v)\n", self.peer.conn.RemoteAddr())
- }
-
- // This peer gave us bad hashes and made us fetch a bad chain, therefor he shall be punished.
- self.eth.BlacklistPeer(self.peer)
- self.peer.StopWithReason(DiscBadPeer)
- self.td = ethutil.Big0
- self.peer = nil
- }
-
- for _, block := range blocks {
- self.Remove(block.Hash())
- }
- }
- }
- }
-}
diff --git a/ethereum b/ethereum
new file mode 100755
index 000000000..7e17d95a4
Binary files /dev/null and b/ethereum differ
diff --git a/ethereum.go b/ethereum.go
deleted file mode 100644
index e8b1a9500..000000000
--- a/ethereum.go
+++ /dev/null
@@ -1,659 +0,0 @@
-package eth
-
-import (
- "container/list"
- "encoding/json"
- "fmt"
- "math/big"
- "math/rand"
- "net"
- "path"
- "strconv"
- "strings"
- "sync"
- "sync/atomic"
- "time"
-
- "github.com/ethereum/go-ethereum/core"
- "github.com/ethereum/go-ethereum/crypto"
- "github.com/ethereum/go-ethereum/ethutil"
- "github.com/ethereum/go-ethereum/event"
- "github.com/ethereum/go-ethereum/logger"
- "github.com/ethereum/go-ethereum/rpc"
- "github.com/ethereum/go-ethereum/state"
- "github.com/ethereum/go-ethereum/wire"
-)
-
-const (
- seedTextFileUri string = "http://www.ethereum.org/servers.poc3.txt"
- seedNodeAddress = "poc-7.ethdev.com:30303"
-)
-
-var loggerger = logger.NewLogger("SERV")
-
-func eachPeer(peers *list.List, callback func(*Peer, *list.Element)) {
- // Loop thru the peers and close them (if we had them)
- for e := peers.Front(); e != nil; e = e.Next() {
- callback(e.Value.(*Peer), e)
- }
-}
-
-const (
- processReapingTimeout = 60 // TODO increase
-)
-
-type Ethereum struct {
- // Channel for shutting down the ethereum
- shutdownChan chan bool
- quit chan bool
-
- // DB interface
- db ethutil.Database
- // State manager for processing new blocks and managing the over all states
- blockManager *core.BlockManager
- // The transaction pool. Transaction can be pushed on this pool
- // for later including in the blocks
- txPool *core.TxPool
- // The canonical chain
- blockChain *core.ChainManager
- // The block pool
- blockPool *BlockPool
- // Eventer
- eventMux event.TypeMux
- // Peers
- peers *list.List
- // Nonce
- Nonce uint64
-
- Addr net.Addr
- Port string
-
- blacklist [][]byte
-
- peerMut sync.Mutex
-
- // Capabilities for outgoing peers
- serverCaps Caps
-
- nat NAT
-
- // Specifies the desired amount of maximum peers
- MaxPeers int
-
- Mining bool
-
- listening bool
-
- RpcServer *rpc.JsonRpcServer
-
- keyManager *crypto.KeyManager
-
- clientIdentity wire.ClientIdentity
-
- isUpToDate bool
-
- filterMu sync.RWMutex
- filterId int
- filters map[int]*core.Filter
-}
-
-func New(db ethutil.Database, clientIdentity wire.ClientIdentity, keyManager *crypto.KeyManager, caps Caps, usePnp bool) (*Ethereum, error) {
- var err error
- var nat NAT
-
- if usePnp {
- nat, err = Discover()
- if err != nil {
- loggerger.Debugln("UPnP failed", err)
- }
- }
-
- bootstrapDb(db)
-
- ethutil.Config.Db = db
-
- nonce, _ := ethutil.RandomUint64()
- ethereum := &Ethereum{
- shutdownChan: make(chan bool),
- quit: make(chan bool),
- db: db,
- peers: list.New(),
- Nonce: nonce,
- serverCaps: caps,
- nat: nat,
- keyManager: keyManager,
- clientIdentity: clientIdentity,
- isUpToDate: true,
- filters: make(map[int]*core.Filter),
- }
-
- ethereum.blockPool = NewBlockPool(ethereum)
- ethereum.txPool = core.NewTxPool(ethereum)
- ethereum.blockChain = core.NewChainManager(ethereum.EventMux())
- ethereum.blockManager = core.NewBlockManager(ethereum)
- ethereum.blockChain.SetProcessor(ethereum.blockManager)
-
- // Start the tx pool
- ethereum.txPool.Start()
-
- return ethereum, nil
-}
-
-func (s *Ethereum) KeyManager() *crypto.KeyManager {
- return s.keyManager
-}
-
-func (s *Ethereum) ClientIdentity() wire.ClientIdentity {
- return s.clientIdentity
-}
-
-func (s *Ethereum) ChainManager() *core.ChainManager {
- return s.blockChain
-}
-
-func (s *Ethereum) BlockManager() *core.BlockManager {
- return s.blockManager
-}
-
-func (s *Ethereum) TxPool() *core.TxPool {
- return s.txPool
-}
-func (s *Ethereum) BlockPool() *BlockPool {
- return s.blockPool
-}
-func (s *Ethereum) EventMux() *event.TypeMux {
- return &s.eventMux
-}
-func (self *Ethereum) Db() ethutil.Database {
- return self.db
-}
-
-func (s *Ethereum) ServerCaps() Caps {
- return s.serverCaps
-}
-func (s *Ethereum) IsMining() bool {
- return s.Mining
-}
-func (s *Ethereum) PeerCount() int {
- return s.peers.Len()
-}
-func (s *Ethereum) IsUpToDate() bool {
- upToDate := true
- eachPeer(s.peers, func(peer *Peer, e *list.Element) {
- if atomic.LoadInt32(&peer.connected) == 1 {
- if peer.catchingUp == true && peer.versionKnown {
- upToDate = false
- }
- }
- })
- return upToDate
-}
-func (s *Ethereum) PushPeer(peer *Peer) {
- s.peers.PushBack(peer)
-}
-func (s *Ethereum) IsListening() bool {
- return s.listening
-}
-
-func (s *Ethereum) HighestTDPeer() (td *big.Int) {
- td = big.NewInt(0)
-
- eachPeer(s.peers, func(p *Peer, v *list.Element) {
- if p.td.Cmp(td) > 0 {
- td = p.td
- }
- })
-
- return
-}
-
-func (self *Ethereum) BlacklistPeer(peer *Peer) {
- self.blacklist = append(self.blacklist, peer.pubkey)
-}
-
-func (s *Ethereum) AddPeer(conn net.Conn) {
- peer := NewPeer(conn, s, true)
-
- if peer != nil {
- if s.peers.Len() < s.MaxPeers {
- peer.Start()
- } else {
- loggerger.Debugf("Max connected peers reached. Not adding incoming peer.")
- }
- }
-}
-
-func (s *Ethereum) ProcessPeerList(addrs []string) {
- for _, addr := range addrs {
- // TODO Probably requires some sanity checks
- s.ConnectToPeer(addr)
- }
-}
-
-func (s *Ethereum) ConnectToPeer(addr string) error {
- if s.peers.Len() < s.MaxPeers {
- var alreadyConnected bool
-
- ahost, aport, _ := net.SplitHostPort(addr)
- var chost string
-
- ips, err := net.LookupIP(ahost)
-
- if err != nil {
- return err
- } else {
- // If more then one ip is available try stripping away the ipv6 ones
- if len(ips) > 1 {
- var ipsv4 []net.IP
- // For now remove the ipv6 addresses
- for _, ip := range ips {
- if strings.Contains(ip.String(), "::") {
- continue
- } else {
- ipsv4 = append(ipsv4, ip)
- }
- }
- if len(ipsv4) == 0 {
- return fmt.Errorf("[SERV] No IPV4 addresses available for hostname")
- }
-
- // Pick a random ipv4 address, simulating round-robin DNS.
- rand.Seed(time.Now().UTC().UnixNano())
- i := rand.Intn(len(ipsv4))
- chost = ipsv4[i].String()
- } else {
- if len(ips) == 0 {
- return fmt.Errorf("[SERV] No IPs resolved for the given hostname")
- return nil
- }
- chost = ips[0].String()
- }
- }
-
- eachPeer(s.peers, func(p *Peer, v *list.Element) {
- if p.conn == nil {
- return
- }
- phost, pport, _ := net.SplitHostPort(p.conn.RemoteAddr().String())
-
- if phost == chost && pport == aport {
- alreadyConnected = true
- //loggerger.Debugf("Peer %s already added.\n", chost)
- return
- }
- })
-
- if alreadyConnected {
- return nil
- }
-
- NewOutboundPeer(addr, s, s.serverCaps)
- }
-
- return nil
-}
-
-func (s *Ethereum) OutboundPeers() []*Peer {
- // Create a new peer slice with at least the length of the total peers
- outboundPeers := make([]*Peer, s.peers.Len())
- length := 0
- eachPeer(s.peers, func(p *Peer, e *list.Element) {
- if !p.inbound && p.conn != nil {
- outboundPeers[length] = p
- length++
- }
- })
-
- return outboundPeers[:length]
-}
-
-func (s *Ethereum) InboundPeers() []*Peer {
- // Create a new peer slice with at least the length of the total peers
- inboundPeers := make([]*Peer, s.peers.Len())
- length := 0
- eachPeer(s.peers, func(p *Peer, e *list.Element) {
- if p.inbound {
- inboundPeers[length] = p
- length++
- }
- })
-
- return inboundPeers[:length]
-}
-
-func (s *Ethereum) InOutPeers() []*Peer {
- // Reap the dead peers first
- s.reapPeers()
-
- // Create a new peer slice with at least the length of the total peers
- inboundPeers := make([]*Peer, s.peers.Len())
- length := 0
- eachPeer(s.peers, func(p *Peer, e *list.Element) {
- // Only return peers with an actual ip
- if len(p.host) > 0 {
- inboundPeers[length] = p
- length++
- }
- })
-
- return inboundPeers[:length]
-}
-
-func (s *Ethereum) Broadcast(msgType wire.MsgType, data []interface{}) {
- msg := wire.NewMessage(msgType, data)
- s.BroadcastMsg(msg)
-}
-
-func (s *Ethereum) BroadcastMsg(msg *wire.Msg) {
- eachPeer(s.peers, func(p *Peer, e *list.Element) {
- p.QueueMessage(msg)
- })
-}
-
-func (s *Ethereum) Peers() *list.List {
- return s.peers
-}
-
-func (s *Ethereum) reapPeers() {
- eachPeer(s.peers, func(p *Peer, e *list.Element) {
- if atomic.LoadInt32(&p.disconnect) == 1 || (p.inbound && (time.Now().Unix()-p.lastPong) > int64(5*time.Minute)) {
- s.removePeerElement(e)
- }
- })
-}
-
-func (s *Ethereum) removePeerElement(e *list.Element) {
- s.peerMut.Lock()
- defer s.peerMut.Unlock()
-
- s.peers.Remove(e)
-
- s.eventMux.Post(PeerListEvent{s.peers})
-}
-
-func (s *Ethereum) RemovePeer(p *Peer) {
- eachPeer(s.peers, func(peer *Peer, e *list.Element) {
- if peer == p {
- s.removePeerElement(e)
- }
- })
-}
-
-func (s *Ethereum) reapDeadPeerHandler() {
- reapTimer := time.NewTicker(processReapingTimeout * time.Second)
-
- for {
- select {
- case <-reapTimer.C:
- s.reapPeers()
- }
- }
-}
-
-// Start the ethereum
-func (s *Ethereum) Start(seed bool) {
- s.blockPool.Start()
-
- // Bind to addr and port
- ln, err := net.Listen("tcp", ":"+s.Port)
- if err != nil {
- loggerger.Warnf("Port %s in use. Connection listening disabled. Acting as client", s.Port)
- s.listening = false
- } else {
- s.listening = true
- // Starting accepting connections
- loggerger.Infoln("Ready and accepting connections")
- // Start the peer handler
- go s.peerHandler(ln)
- }
-
- if s.nat != nil {
- go s.upnpUpdateThread()
- }
-
- // Start the reaping processes
- go s.reapDeadPeerHandler()
- go s.update()
- go s.filterLoop()
-
- if seed {
- s.Seed()
- }
- s.ConnectToPeer("localhost:40404")
- loggerger.Infoln("Server started")
-}
-
-func (s *Ethereum) Seed() {
- // Sorry Py person. I must blacklist. you perform badly
- s.blacklist = append(s.blacklist, ethutil.Hex2Bytes("64656330303561383532336435376331616537643864663236623336313863373537353163636634333530626263396330346237336262623931383064393031"))
- ips := PastPeers()
- if len(ips) > 0 {
- for _, ip := range ips {
- loggerger.Infoln("Connecting to previous peer ", ip)
- s.ConnectToPeer(ip)
- }
- } else {
- loggerger.Debugln("Retrieving seed nodes")
-
- // Eth-Go Bootstrapping
- ips, er := net.LookupIP("seed.bysh.me")
- if er == nil {
- peers := []string{}
- for _, ip := range ips {
- node := fmt.Sprintf("%s:%d", ip.String(), 30303)
- loggerger.Debugln("Found DNS Go Peer:", node)
- peers = append(peers, node)
- }
- s.ProcessPeerList(peers)
- }
-
- // Official DNS Bootstrapping
- _, nodes, err := net.LookupSRV("eth", "tcp", "ethereum.org")
- if err == nil {
- peers := []string{}
- // Iterate SRV nodes
- for _, n := range nodes {
- target := n.Target
- port := strconv.Itoa(int(n.Port))
- // Resolve target to ip (Go returns list, so may resolve to multiple ips?)
- addr, err := net.LookupHost(target)
- if err == nil {
- for _, a := range addr {
- // Build string out of SRV port and Resolved IP
- peer := net.JoinHostPort(a, port)
- loggerger.Debugln("Found DNS Bootstrap Peer:", peer)
- peers = append(peers, peer)
- }
- } else {
- loggerger.Debugln("Couldn't resolve :", target)
- }
- }
- // Connect to Peer list
- s.ProcessPeerList(peers)
- }
-
- s.ConnectToPeer(seedNodeAddress)
- }
-}
-
-func (s *Ethereum) peerHandler(listener net.Listener) {
- for {
- conn, err := listener.Accept()
- if err != nil {
- loggerger.Debugln(err)
-
- continue
- }
-
- go s.AddPeer(conn)
- }
-}
-
-func (s *Ethereum) Stop() {
- // Stop eventMux first, it will close all subscriptions.
- s.eventMux.Stop()
-
- // Close the database
- defer s.db.Close()
-
- var ips []string
- eachPeer(s.peers, func(p *Peer, e *list.Element) {
- ips = append(ips, p.conn.RemoteAddr().String())
- })
-
- if len(ips) > 0 {
- d, _ := json.MarshalIndent(ips, "", " ")
- ethutil.WriteFile(path.Join(ethutil.Config.ExecPath, "known_peers.json"), d)
- }
-
- eachPeer(s.peers, func(p *Peer, e *list.Element) {
- p.Stop()
- })
-
- close(s.quit)
-
- if s.RpcServer != nil {
- s.RpcServer.Stop()
- }
- s.txPool.Stop()
- s.blockPool.Stop()
-
- loggerger.Infoln("Server stopped")
- close(s.shutdownChan)
-}
-
-// This function will wait for a shutdown and resumes main thread execution
-func (s *Ethereum) WaitForShutdown() {
- <-s.shutdownChan
-}
-
-func (s *Ethereum) upnpUpdateThread() {
- // Go off immediately to prevent code duplication, thereafter we renew
- // lease every 15 minutes.
- timer := time.NewTimer(5 * time.Minute)
- lport, _ := strconv.ParseInt(s.Port, 10, 16)
- first := true
-out:
- for {
- select {
- case <-timer.C:
- var err error
- _, err = s.nat.AddPortMapping("TCP", int(lport), int(lport), "eth listen port", 20*60)
- if err != nil {
- loggerger.Debugln("can't add UPnP port mapping:", err)
- break out
- }
- if first && err == nil {
- _, err = s.nat.GetExternalAddress()
- if err != nil {
- loggerger.Debugln("UPnP can't get external address:", err)
- continue out
- }
- first = false
- }
- timer.Reset(time.Minute * 15)
- case <-s.quit:
- break out
- }
- }
-
- timer.Stop()
-
- if err := s.nat.DeletePortMapping("TCP", int(lport), int(lport)); err != nil {
- loggerger.Debugln("unable to remove UPnP port mapping:", err)
- } else {
- loggerger.Debugln("succesfully disestablished UPnP port mapping")
- }
-}
-
-func (self *Ethereum) update() {
- upToDateTimer := time.NewTicker(1 * time.Second)
-
-out:
- for {
- select {
- case <-upToDateTimer.C:
- if self.IsUpToDate() && !self.isUpToDate {
- self.eventMux.Post(ChainSyncEvent{false})
- self.isUpToDate = true
- } else if !self.IsUpToDate() && self.isUpToDate {
- self.eventMux.Post(ChainSyncEvent{true})
- self.isUpToDate = false
- }
- case <-self.quit:
- break out
- }
- }
-}
-
-// InstallFilter adds filter for blockchain events.
-// The filter's callbacks will run for matching blocks and messages.
-// The filter should not be modified after it has been installed.
-func (self *Ethereum) InstallFilter(filter *core.Filter) (id int) {
- self.filterMu.Lock()
- id = self.filterId
- self.filters[id] = filter
- self.filterId++
- self.filterMu.Unlock()
- return id
-}
-
-func (self *Ethereum) UninstallFilter(id int) {
- self.filterMu.Lock()
- delete(self.filters, id)
- self.filterMu.Unlock()
-}
-
-// GetFilter retrieves a filter installed using InstallFilter.
-// The filter may not be modified.
-func (self *Ethereum) GetFilter(id int) *core.Filter {
- self.filterMu.RLock()
- defer self.filterMu.RUnlock()
- return self.filters[id]
-}
-
-func (self *Ethereum) filterLoop() {
- // Subscribe to events
- events := self.eventMux.Subscribe(core.NewBlockEvent{}, state.Messages(nil))
- for event := range events.Chan() {
- switch event := event.(type) {
- case core.NewBlockEvent:
- self.filterMu.RLock()
- for _, filter := range self.filters {
- if filter.BlockCallback != nil {
- filter.BlockCallback(event.Block)
- }
- }
- self.filterMu.RUnlock()
-
- case state.Messages:
- self.filterMu.RLock()
- for _, filter := range self.filters {
- if filter.MessageCallback != nil {
- msgs := filter.FilterMessages(event)
- if len(msgs) > 0 {
- filter.MessageCallback(msgs)
- }
- }
- }
- self.filterMu.RUnlock()
- }
- }
-}
-
-func bootstrapDb(db ethutil.Database) {
- d, _ := db.Get([]byte("ProtocolVersion"))
- protov := ethutil.NewValue(d).Uint()
-
- if protov == 0 {
- db.Put([]byte("ProtocolVersion"), ethutil.NewValue(ProtocolVersion).Bytes())
- }
-}
-
-func PastPeers() []string {
- var ips []string
- data, _ := ethutil.ReadAllFile(path.Join(ethutil.Config.ExecPath, "known_peers.json"))
- json.Unmarshal([]byte(data), &ips)
-
- return ips
-}
diff --git a/events.go b/events.go
deleted file mode 100644
index 5fff1d831..000000000
--- a/events.go
+++ /dev/null
@@ -1,11 +0,0 @@
-package eth
-
-import "container/list"
-
-type PeerListEvent struct {
- Peers *list.List
-}
-
-type ChainSyncEvent struct {
- InSync bool
-}
diff --git a/nat.go b/nat.go
deleted file mode 100644
index 999308eb2..000000000
--- a/nat.go
+++ /dev/null
@@ -1,12 +0,0 @@
-package eth
-
-import (
- "net"
-)
-
-// protocol is either "udp" or "tcp"
-type NAT interface {
- GetExternalAddress() (addr net.IP, err error)
- AddPortMapping(protocol string, externalPort, internalPort int, description string, timeout int) (mappedExternalPort int, err error)
- DeletePortMapping(protocol string, externalPort, internalPort int) (err error)
-}
diff --git a/natpmp.go b/natpmp.go
deleted file mode 100644
index 489342a4b..000000000
--- a/natpmp.go
+++ /dev/null
@@ -1,55 +0,0 @@
-package eth
-
-import (
- "fmt"
- "net"
-
- natpmp "github.com/jackpal/go-nat-pmp"
-)
-
-// Adapt the NAT-PMP protocol to the NAT interface
-
-// TODO:
-// + Register for changes to the external address.
-// + Re-register port mapping when router reboots.
-// + A mechanism for keeping a port mapping registered.
-
-type natPMPClient struct {
- client *natpmp.Client
-}
-
-func NewNatPMP(gateway net.IP) (nat NAT) {
- return &natPMPClient{natpmp.NewClient(gateway)}
-}
-
-func (n *natPMPClient) GetExternalAddress() (addr net.IP, err error) {
- response, err := n.client.GetExternalAddress()
- if err != nil {
- return
- }
- ip := response.ExternalIPAddress
- addr = net.IPv4(ip[0], ip[1], ip[2], ip[3])
- return
-}
-
-func (n *natPMPClient) AddPortMapping(protocol string, externalPort, internalPort int,
- description string, timeout int) (mappedExternalPort int, err error) {
- if timeout <= 0 {
- err = fmt.Errorf("timeout must not be <= 0")
- return
- }
- // Note order of port arguments is switched between our AddPortMapping and the client's AddPortMapping.
- response, err := n.client.AddPortMapping(protocol, internalPort, externalPort, timeout)
- if err != nil {
- return
- }
- mappedExternalPort = int(response.MappedExternalPort)
- return
-}
-
-func (n *natPMPClient) DeletePortMapping(protocol string, externalPort, internalPort int) (err error) {
- // To destroy a mapping, send an add-port with
- // an internalPort of the internal port to destroy, an external port of zero and a time of zero.
- _, err = n.client.AddPortMapping(protocol, internalPort, 0, 0)
- return
-}
diff --git a/natupnp.go b/natupnp.go
deleted file mode 100644
index c7f9eeb62..000000000
--- a/natupnp.go
+++ /dev/null
@@ -1,338 +0,0 @@
-package eth
-
-// Just enough UPnP to be able to forward ports
-//
-
-import (
- "bytes"
- "encoding/xml"
- "errors"
- "net"
- "net/http"
- "os"
- "strconv"
- "strings"
- "time"
-)
-
-type upnpNAT struct {
- serviceURL string
- ourIP string
-}
-
-func Discover() (nat NAT, err error) {
- ssdp, err := net.ResolveUDPAddr("udp4", "239.255.255.250:1900")
- if err != nil {
- return
- }
- conn, err := net.ListenPacket("udp4", ":0")
- if err != nil {
- return
- }
- socket := conn.(*net.UDPConn)
- defer socket.Close()
-
- err = socket.SetDeadline(time.Now().Add(10 * time.Second))
- if err != nil {
- return
- }
-
- st := "ST: urn:schemas-upnp-org:device:InternetGatewayDevice:1\r\n"
- buf := bytes.NewBufferString(
- "M-SEARCH * HTTP/1.1\r\n" +
- "HOST: 239.255.255.250:1900\r\n" +
- st +
- "MAN: \"ssdp:discover\"\r\n" +
- "MX: 2\r\n\r\n")
- message := buf.Bytes()
- answerBytes := make([]byte, 1024)
- for i := 0; i < 3; i++ {
- _, err = socket.WriteToUDP(message, ssdp)
- if err != nil {
- return
- }
- var n int
- n, _, err = socket.ReadFromUDP(answerBytes)
- if err != nil {
- continue
- // socket.Close()
- // return
- }
- answer := string(answerBytes[0:n])
- if strings.Index(answer, "\r\n"+st) < 0 {
- continue
- }
- // HTTP header field names are case-insensitive.
- // http://www.w3.org/Protocols/rfc2616/rfc2616-sec4.html#sec4.2
- locString := "\r\nlocation: "
- answer = strings.ToLower(answer)
- locIndex := strings.Index(answer, locString)
- if locIndex < 0 {
- continue
- }
- loc := answer[locIndex+len(locString):]
- endIndex := strings.Index(loc, "\r\n")
- if endIndex < 0 {
- continue
- }
- locURL := loc[0:endIndex]
- var serviceURL string
- serviceURL, err = getServiceURL(locURL)
- if err != nil {
- return
- }
- var ourIP string
- ourIP, err = getOurIP()
- if err != nil {
- return
- }
- nat = &upnpNAT{serviceURL: serviceURL, ourIP: ourIP}
- return
- }
- err = errors.New("UPnP port discovery failed.")
- return
-}
-
-// service represents the Service type in an UPnP xml description.
-// Only the parts we care about are present and thus the xml may have more
-// fields than present in the structure.
-type service struct {
- ServiceType string `xml:"serviceType"`
- ControlURL string `xml:"controlURL"`
-}
-
-// deviceList represents the deviceList type in an UPnP xml description.
-// Only the parts we care about are present and thus the xml may have more
-// fields than present in the structure.
-type deviceList struct {
- XMLName xml.Name `xml:"deviceList"`
- Device []device `xml:"device"`
-}
-
-// serviceList represents the serviceList type in an UPnP xml description.
-// Only the parts we care about are present and thus the xml may have more
-// fields than present in the structure.
-type serviceList struct {
- XMLName xml.Name `xml:"serviceList"`
- Service []service `xml:"service"`
-}
-
-// device represents the device type in an UPnP xml description.
-// Only the parts we care about are present and thus the xml may have more
-// fields than present in the structure.
-type device struct {
- XMLName xml.Name `xml:"device"`
- DeviceType string `xml:"deviceType"`
- DeviceList deviceList `xml:"deviceList"`
- ServiceList serviceList `xml:"serviceList"`
-}
-
-// specVersion represents the specVersion in a UPnP xml description.
-// Only the parts we care about are present and thus the xml may have more
-// fields than present in the structure.
-type specVersion struct {
- XMLName xml.Name `xml:"specVersion"`
- Major int `xml:"major"`
- Minor int `xml:"minor"`
-}
-
-// root represents the Root document for a UPnP xml description.
-// Only the parts we care about are present and thus the xml may have more
-// fields than present in the structure.
-type root struct {
- XMLName xml.Name `xml:"root"`
- SpecVersion specVersion
- Device device
-}
-
-func getChildDevice(d *device, deviceType string) *device {
- dl := d.DeviceList.Device
- for i := 0; i < len(dl); i++ {
- if dl[i].DeviceType == deviceType {
- return &dl[i]
- }
- }
- return nil
-}
-
-func getChildService(d *device, serviceType string) *service {
- sl := d.ServiceList.Service
- for i := 0; i < len(sl); i++ {
- if sl[i].ServiceType == serviceType {
- return &sl[i]
- }
- }
- return nil
-}
-
-func getOurIP() (ip string, err error) {
- hostname, err := os.Hostname()
- if err != nil {
- return
- }
- p, err := net.LookupIP(hostname)
- if err != nil && len(p) > 0 {
- return
- }
- return p[0].String(), nil
-}
-
-func getServiceURL(rootURL string) (url string, err error) {
- r, err := http.Get(rootURL)
- if err != nil {
- return
- }
- defer r.Body.Close()
- if r.StatusCode >= 400 {
- err = errors.New(string(r.StatusCode))
- return
- }
- var root root
- err = xml.NewDecoder(r.Body).Decode(&root)
-
- if err != nil {
- return
- }
- a := &root.Device
- if a.DeviceType != "urn:schemas-upnp-org:device:InternetGatewayDevice:1" {
- err = errors.New("No InternetGatewayDevice")
- return
- }
- b := getChildDevice(a, "urn:schemas-upnp-org:device:WANDevice:1")
- if b == nil {
- err = errors.New("No WANDevice")
- return
- }
- c := getChildDevice(b, "urn:schemas-upnp-org:device:WANConnectionDevice:1")
- if c == nil {
- err = errors.New("No WANConnectionDevice")
- return
- }
- d := getChildService(c, "urn:schemas-upnp-org:service:WANIPConnection:1")
- if d == nil {
- err = errors.New("No WANIPConnection")
- return
- }
- url = combineURL(rootURL, d.ControlURL)
- return
-}
-
-func combineURL(rootURL, subURL string) string {
- protocolEnd := "://"
- protoEndIndex := strings.Index(rootURL, protocolEnd)
- a := rootURL[protoEndIndex+len(protocolEnd):]
- rootIndex := strings.Index(a, "/")
- return rootURL[0:protoEndIndex+len(protocolEnd)+rootIndex] + subURL
-}
-
-func soapRequest(url, function, message string) (r *http.Response, err error) {
- fullMessage := "" +
- "\r\n" +
- "" + message + ""
-
- req, err := http.NewRequest("POST", url, strings.NewReader(fullMessage))
- if err != nil {
- return nil, err
- }
- req.Header.Set("Content-Type", "text/xml ; charset=\"utf-8\"")
- req.Header.Set("User-Agent", "Darwin/10.0.0, UPnP/1.0, MiniUPnPc/1.3")
- //req.Header.Set("Transfer-Encoding", "chunked")
- req.Header.Set("SOAPAction", "\"urn:schemas-upnp-org:service:WANIPConnection:1#"+function+"\"")
- req.Header.Set("Connection", "Close")
- req.Header.Set("Cache-Control", "no-cache")
- req.Header.Set("Pragma", "no-cache")
-
- // log.Stderr("soapRequest ", req)
- //fmt.Println(fullMessage)
-
- r, err = http.DefaultClient.Do(req)
- if err != nil {
- return
- }
-
- if r.Body != nil {
- defer r.Body.Close()
- }
-
- if r.StatusCode >= 400 {
- // log.Stderr(function, r.StatusCode)
- err = errors.New("Error " + strconv.Itoa(r.StatusCode) + " for " + function)
- r = nil
- return
- }
- return
-}
-
-type statusInfo struct {
- externalIpAddress string
-}
-
-func (n *upnpNAT) getStatusInfo() (info statusInfo, err error) {
-
- message := "\r\n" +
- ""
-
- var response *http.Response
- response, err = soapRequest(n.serviceURL, "GetStatusInfo", message)
- if err != nil {
- return
- }
-
- // TODO: Write a soap reply parser. It has to eat the Body and envelope tags...
-
- response.Body.Close()
- return
-}
-
-func (n *upnpNAT) GetExternalAddress() (addr net.IP, err error) {
- info, err := n.getStatusInfo()
- if err != nil {
- return
- }
- addr = net.ParseIP(info.externalIpAddress)
- return
-}
-
-func (n *upnpNAT) AddPortMapping(protocol string, externalPort, internalPort int, description string, timeout int) (mappedExternalPort int, err error) {
- // A single concatenation would break ARM compilation.
- message := "\r\n" +
- "" + strconv.Itoa(externalPort)
- message += "" + protocol + ""
- message += "" + strconv.Itoa(internalPort) + "" +
- "" + n.ourIP + "" +
- "1"
- message += description +
- "" + strconv.Itoa(timeout) +
- ""
-
- var response *http.Response
- response, err = soapRequest(n.serviceURL, "AddPortMapping", message)
- if err != nil {
- return
- }
-
- // TODO: check response to see if the port was forwarded
- // log.Println(message, response)
- mappedExternalPort = externalPort
- _ = response
- return
-}
-
-func (n *upnpNAT) DeletePortMapping(protocol string, externalPort, internalPort int) (err error) {
-
- message := "\r\n" +
- "" + strconv.Itoa(externalPort) +
- "" + protocol + "" +
- ""
-
- var response *http.Response
- response, err = soapRequest(n.serviceURL, "DeletePortMapping", message)
- if err != nil {
- return
- }
-
- // TODO: check response to see if the port was deleted
- // log.Println(message, response)
- _ = response
- return
-}
diff --git a/peer.go b/peer.go
deleted file mode 100644
index 331e9de37..000000000
--- a/peer.go
+++ /dev/null
@@ -1,881 +0,0 @@
-package eth
-
-import (
- "bytes"
- "container/list"
- "fmt"
- "math"
- "math/big"
- "net"
- "strconv"
- "strings"
- "sync/atomic"
- "time"
-
- "github.com/ethereum/go-ethereum/core/types"
- "github.com/ethereum/go-ethereum/ethutil"
- "github.com/ethereum/go-ethereum/logger"
- "github.com/ethereum/go-ethereum/wire"
-)
-
-var peerlogger = logger.NewLogger("PEER")
-
-const (
- // The size of the output buffer for writing messages
- outputBufferSize = 50
- // Current protocol version
- ProtocolVersion = 49
- // Current P2P version
- P2PVersion = 2
- // Ethereum network version
- NetVersion = 0
- // Interval for ping/pong message
- pingPongTimer = 2 * time.Second
-)
-
-type DiscReason byte
-
-const (
- // Values are given explicitly instead of by iota because these values are
- // defined by the wire protocol spec; it is easier for humans to ensure
- // correctness when values are explicit.
- DiscRequested DiscReason = iota
- DiscReTcpSysErr
- DiscBadProto
- DiscBadPeer
- DiscTooManyPeers
- DiscConnDup
- DiscGenesisErr
- DiscProtoErr
- DiscQuitting
-)
-
-var discReasonToString = []string{
- "requested",
- "TCP sys error",
- "bad protocol",
- "useless peer",
- "too many peers",
- "already connected",
- "wrong genesis block",
- "incompatible network",
- "quitting",
-}
-
-func (d DiscReason) String() string {
- if len(discReasonToString) < int(d) {
- return "Unknown"
- }
-
- return discReasonToString[d]
-}
-
-// Peer capabilities
-type Caps byte
-
-const (
- CapPeerDiscTy Caps = 1 << iota
- CapTxTy
- CapChainTy
-
- CapDefault = CapChainTy | CapTxTy | CapPeerDiscTy
-)
-
-var capsToString = map[Caps]string{
- CapPeerDiscTy: "Peer discovery",
- CapTxTy: "Transaction relaying",
- CapChainTy: "Block chain relaying",
-}
-
-func (c Caps) IsCap(cap Caps) bool {
- return c&cap > 0
-}
-
-func (c Caps) String() string {
- var caps []string
- if c.IsCap(CapPeerDiscTy) {
- caps = append(caps, capsToString[CapPeerDiscTy])
- }
- if c.IsCap(CapChainTy) {
- caps = append(caps, capsToString[CapChainTy])
- }
- if c.IsCap(CapTxTy) {
- caps = append(caps, capsToString[CapTxTy])
- }
-
- return strings.Join(caps, " | ")
-}
-
-type Peer struct {
- // Ethereum interface
- ethereum *Ethereum
- // Net connection
- conn net.Conn
- // Output queue which is used to communicate and handle messages
- outputQueue chan *wire.Msg
- // Quit channel
- quit chan bool
- // Determines whether it's an inbound or outbound peer
- inbound bool
- // Flag for checking the peer's connectivity state
- connected int32
- disconnect int32
- // Last known message send
- lastSend time.Time
- // Indicated whether a verack has been send or not
- // This flag is used by writeMessage to check if messages are allowed
- // to be send or not. If no version is known all messages are ignored.
- versionKnown bool
- statusKnown bool
-
- // Last received pong message
- lastPong int64
- lastBlockReceived time.Time
- doneFetchingHashes bool
- lastHashAt time.Time
- lastHashRequestedAt time.Time
-
- host []byte
- port uint16
- caps Caps
- td *big.Int
- bestHash []byte
- lastReceivedHash []byte
- requestedHashes [][]byte
-
- // This peer's public key
- pubkey []byte
-
- // Indicated whether the node is catching up or not
- catchingUp bool
- diverted bool
- blocksRequested int
-
- version string
-
- // We use this to give some kind of pingtime to a node, not very accurate, could be improved.
- pingTime time.Duration
- pingStartTime time.Time
-
- lastRequestedBlock *types.Block
-
- protocolCaps *ethutil.Value
-}
-
-func NewPeer(conn net.Conn, ethereum *Ethereum, inbound bool) *Peer {
- pubkey := ethereum.KeyManager().PublicKey()[1:]
-
- return &Peer{
- outputQueue: make(chan *wire.Msg, outputBufferSize),
- quit: make(chan bool),
- ethereum: ethereum,
- conn: conn,
- inbound: inbound,
- disconnect: 0,
- connected: 1,
- port: 30303,
- pubkey: pubkey,
- blocksRequested: 10,
- caps: ethereum.ServerCaps(),
- version: ethereum.ClientIdentity().String(),
- protocolCaps: ethutil.NewValue(nil),
- td: big.NewInt(0),
- doneFetchingHashes: true,
- }
-}
-
-func NewOutboundPeer(addr string, ethereum *Ethereum, caps Caps) *Peer {
- p := &Peer{
- outputQueue: make(chan *wire.Msg, outputBufferSize),
- quit: make(chan bool),
- ethereum: ethereum,
- inbound: false,
- connected: 0,
- disconnect: 0,
- port: 30303,
- caps: caps,
- version: ethereum.ClientIdentity().String(),
- protocolCaps: ethutil.NewValue(nil),
- td: big.NewInt(0),
- doneFetchingHashes: true,
- }
-
- // Set up the connection in another goroutine so we don't block the main thread
- go func() {
- conn, err := p.Connect(addr)
- if err != nil {
- //peerlogger.Debugln("Connection to peer failed. Giving up.", err)
- p.Stop()
- return
- }
- p.conn = conn
-
- // Atomically set the connection state
- atomic.StoreInt32(&p.connected, 1)
- atomic.StoreInt32(&p.disconnect, 0)
-
- p.Start()
- }()
-
- return p
-}
-
-func (self *Peer) Connect(addr string) (conn net.Conn, err error) {
- const maxTries = 3
- for attempts := 0; attempts < maxTries; attempts++ {
- conn, err = net.DialTimeout("tcp", addr, 10*time.Second)
- if err != nil {
- time.Sleep(time.Duration(attempts*20) * time.Second)
- continue
- }
-
- // Success
- return
- }
-
- return
-}
-
-// Getters
-func (p *Peer) PingTime() string {
- return p.pingTime.String()
-}
-func (p *Peer) Inbound() bool {
- return p.inbound
-}
-func (p *Peer) LastSend() time.Time {
- return p.lastSend
-}
-func (p *Peer) LastPong() int64 {
- return p.lastPong
-}
-func (p *Peer) Host() []byte {
- return p.host
-}
-func (p *Peer) Port() uint16 {
- return p.port
-}
-func (p *Peer) Version() string {
- return p.version
-}
-func (p *Peer) Connected() *int32 {
- return &p.connected
-}
-
-// Setters
-func (p *Peer) SetVersion(version string) {
- p.version = version
-}
-
-// Outputs any RLP encoded data to the peer
-func (p *Peer) QueueMessage(msg *wire.Msg) {
- if atomic.LoadInt32(&p.connected) != 1 {
- return
- }
- p.outputQueue <- msg
-}
-
-func (p *Peer) writeMessage(msg *wire.Msg) {
- // Ignore the write if we're not connected
- if atomic.LoadInt32(&p.connected) != 1 {
- return
- }
-
- if !p.versionKnown {
- switch msg.Type {
- case wire.MsgHandshakeTy: // Ok
- default: // Anything but ack is allowed
- return
- }
- } else {
- /*
- if !p.statusKnown {
- switch msg.Type {
- case wire.MsgStatusTy: // Ok
- default: // Anything but ack is allowed
- return
- }
- }
- */
- }
-
- peerlogger.DebugDetailf("(%v) <= %v\n", p.conn.RemoteAddr(), formatMessage(msg))
-
- err := wire.WriteMessage(p.conn, msg)
- if err != nil {
- peerlogger.Debugln(" Can't send message:", err)
- // Stop the client if there was an error writing to it
- p.Stop()
- return
- }
-}
-
-// Outbound message handler. Outbound messages are handled here
-func (p *Peer) HandleOutbound() {
- // The ping timer. Makes sure that every 2 minutes a ping is send to the peer
- pingTimer := time.NewTicker(pingPongTimer)
- serviceTimer := time.NewTicker(10 * time.Second)
-
-out:
- for {
- skip:
- select {
- // Main message queue. All outbound messages are processed through here
- case msg := <-p.outputQueue:
- if !p.statusKnown {
- switch msg.Type {
- case wire.MsgTxTy, wire.MsgGetBlockHashesTy, wire.MsgBlockHashesTy, wire.MsgGetBlocksTy, wire.MsgBlockTy:
- break skip
- }
- }
-
- switch msg.Type {
- case wire.MsgGetBlockHashesTy:
- p.lastHashRequestedAt = time.Now()
- }
-
- p.writeMessage(msg)
- p.lastSend = time.Now()
-
- // Ping timer
- case <-pingTimer.C:
- p.writeMessage(wire.NewMessage(wire.MsgPingTy, ""))
- p.pingStartTime = time.Now()
-
- // Service timer takes care of peer broadcasting, transaction
- // posting or block posting
- case <-serviceTimer.C:
- p.QueueMessage(wire.NewMessage(wire.MsgGetPeersTy, ""))
-
- case <-p.quit:
- // Break out of the for loop if a quit message is posted
- break out
- }
- }
-
-clean:
- // This loop is for draining the output queue and anybody waiting for us
- for {
- select {
- case <-p.outputQueue:
- // TODO
- default:
- break clean
- }
- }
-}
-
-func formatMessage(msg *wire.Msg) (ret string) {
- ret = fmt.Sprintf("%v %v", msg.Type, msg.Data)
-
- /*
- XXX Commented out because I need the log level here to determine
- if i should or shouldn't generate this message
- */
- /*
- switch msg.Type {
- case wire.MsgPeersTy:
- ret += fmt.Sprintf("(%d entries)", msg.Data.Len())
- case wire.MsgBlockTy:
- b1, b2 := chain.NewBlockFromRlpValue(msg.Data.Get(0)), ethchain.NewBlockFromRlpValue(msg.Data.Get(msg.Data.Len()-1))
- ret += fmt.Sprintf("(%d entries) %x - %x", msg.Data.Len(), b1.Hash()[0:4], b2.Hash()[0:4])
- case wire.MsgBlockHashesTy:
- h1, h2 := msg.Data.Get(0).Bytes(), msg.Data.Get(msg.Data.Len()-1).Bytes()
- ret += fmt.Sprintf("(%d entries) %x - %x", msg.Data.Len(), h1, h2)
- }
- */
-
- return
-}
-
-// Inbound handler. Inbound messages are received here and passed to the appropriate methods
-func (p *Peer) HandleInbound() {
- for atomic.LoadInt32(&p.disconnect) == 0 {
-
- // HMM?
- time.Sleep(50 * time.Millisecond)
- // Wait for a message from the peer
- msgs, err := wire.ReadMessages(p.conn)
- if err != nil {
- peerlogger.Debugln(err)
- }
- for _, msg := range msgs {
- peerlogger.DebugDetailf("(%v) => %v\n", p.conn.RemoteAddr(), formatMessage(msg))
-
- switch msg.Type {
- case wire.MsgHandshakeTy:
- // Version message
- p.handleHandshake(msg)
-
- //if p.caps.IsCap(CapPeerDiscTy) {
- p.QueueMessage(wire.NewMessage(wire.MsgGetPeersTy, ""))
- //}
-
- case wire.MsgDiscTy:
- p.Stop()
- peerlogger.Infoln("Disconnect peer: ", DiscReason(msg.Data.Get(0).Uint()))
- case wire.MsgPingTy:
- // Respond back with pong
- p.QueueMessage(wire.NewMessage(wire.MsgPongTy, ""))
- case wire.MsgPongTy:
- // If we received a pong back from a peer we set the
- // last pong so the peer handler knows this peer is still
- // active.
- p.lastPong = time.Now().Unix()
- p.pingTime = time.Since(p.pingStartTime)
- case wire.MsgTxTy:
- // If the message was a transaction queue the transaction
- // in the TxPool where it will undergo validation and
- // processing when a new block is found
- for i := 0; i < msg.Data.Len(); i++ {
- tx := types.NewTransactionFromValue(msg.Data.Get(i))
- err := p.ethereum.TxPool().Add(tx)
- if err != nil {
- peerlogger.Infoln(err)
- } else {
- peerlogger.Infof("tx OK (%x)\n", tx.Hash()[0:4])
- }
- }
- case wire.MsgGetPeersTy:
- // Peer asked for list of connected peers
- //p.pushPeers()
- case wire.MsgPeersTy:
- // Received a list of peers (probably because MsgGetPeersTy was send)
- data := msg.Data
- // Create new list of possible peers for the ethereum to process
- peers := make([]string, data.Len())
- // Parse each possible peer
- for i := 0; i < data.Len(); i++ {
- value := data.Get(i)
- peers[i] = unpackAddr(value.Get(0), value.Get(1).Uint())
- }
-
- // Connect to the list of peers
- p.ethereum.ProcessPeerList(peers)
-
- case wire.MsgStatusTy:
- // Handle peer's status msg
- p.handleStatus(msg)
- }
-
- // TMP
- if p.statusKnown {
- switch msg.Type {
-
- case wire.MsgGetBlockHashesTy:
- if msg.Data.Len() < 2 {
- peerlogger.Debugln("err: argument length invalid ", msg.Data.Len())
- }
-
- hash := msg.Data.Get(0).Bytes()
- amount := msg.Data.Get(1).Uint()
-
- hashes := p.ethereum.ChainManager().GetChainHashesFromHash(hash, amount)
-
- p.QueueMessage(wire.NewMessage(wire.MsgBlockHashesTy, ethutil.ByteSliceToInterface(hashes)))
-
- case wire.MsgGetBlocksTy:
- // Limit to max 300 blocks
- max := int(math.Min(float64(msg.Data.Len()), 300.0))
- var blocks []interface{}
-
- for i := 0; i < max; i++ {
- hash := msg.Data.Get(i).Bytes()
- block := p.ethereum.ChainManager().GetBlock(hash)
- if block != nil {
- blocks = append(blocks, block.Value().Raw())
- }
- }
-
- p.QueueMessage(wire.NewMessage(wire.MsgBlockTy, blocks))
-
- case wire.MsgBlockHashesTy:
- p.catchingUp = true
-
- blockPool := p.ethereum.blockPool
-
- foundCommonHash := false
- p.lastHashAt = time.Now()
-
- it := msg.Data.NewIterator()
- for it.Next() {
- hash := it.Value().Bytes()
- p.lastReceivedHash = hash
-
- if blockPool.HasCommonHash(hash) {
- foundCommonHash = true
-
- break
- }
-
- blockPool.AddHash(hash, p)
- }
-
- if !foundCommonHash {
- p.FetchHashes()
- } else {
- peerlogger.Infof("Found common hash (%x...)\n", p.lastReceivedHash[0:4])
- p.doneFetchingHashes = true
- }
-
- case wire.MsgBlockTy:
- p.catchingUp = true
-
- blockPool := p.ethereum.blockPool
-
- it := msg.Data.NewIterator()
- for it.Next() {
- block := types.NewBlockFromRlpValue(it.Value())
- blockPool.Add(block, p)
-
- p.lastBlockReceived = time.Now()
- }
- case wire.MsgNewBlockTy:
- var (
- blockPool = p.ethereum.blockPool
- block = types.NewBlockFromRlpValue(msg.Data.Get(0))
- td = msg.Data.Get(1).BigInt()
- )
-
- if td.Cmp(blockPool.td) > 0 {
- p.ethereum.blockPool.AddNew(block, p)
- }
- }
-
- }
- }
- }
-
- p.Stop()
-}
-
-func (self *Peer) FetchBlocks(hashes [][]byte) {
- if len(hashes) > 0 {
- peerlogger.Debugf("Fetching blocks (%d)\n", len(hashes))
-
- self.QueueMessage(wire.NewMessage(wire.MsgGetBlocksTy, ethutil.ByteSliceToInterface(hashes)))
- }
-}
-
-func (self *Peer) FetchHashes() bool {
- blockPool := self.ethereum.blockPool
-
- return blockPool.FetchHashes(self)
-}
-
-func (self *Peer) FetchingHashes() bool {
- return !self.doneFetchingHashes
-}
-
-// General update method
-func (self *Peer) update() {
- serviceTimer := time.NewTicker(100 * time.Millisecond)
-
-out:
- for {
- select {
- case <-serviceTimer.C:
- if self.IsCap("eth") {
- var (
- sinceBlock = time.Since(self.lastBlockReceived)
- )
-
- if sinceBlock > 5*time.Second {
- self.catchingUp = false
- }
- }
- case <-self.quit:
- break out
- }
- }
-
- serviceTimer.Stop()
-}
-
-func (p *Peer) Start() {
- peerHost, peerPort, _ := net.SplitHostPort(p.conn.LocalAddr().String())
- servHost, servPort, _ := net.SplitHostPort(p.conn.RemoteAddr().String())
-
- if p.inbound {
- p.host, p.port = packAddr(peerHost, peerPort)
- } else {
- p.host, p.port = packAddr(servHost, servPort)
- }
-
- err := p.pushHandshake()
- if err != nil {
- peerlogger.Debugln("Peer can't send outbound version ack", err)
-
- p.Stop()
-
- return
- }
-
- go p.HandleOutbound()
- // Run the inbound handler in a new goroutine
- go p.HandleInbound()
- // Run the general update handler
- go p.update()
-
- // Wait a few seconds for startup and then ask for an initial ping
- time.Sleep(2 * time.Second)
- p.writeMessage(wire.NewMessage(wire.MsgPingTy, ""))
- p.pingStartTime = time.Now()
-
-}
-
-func (p *Peer) Stop() {
- p.StopWithReason(DiscRequested)
-}
-
-func (p *Peer) StopWithReason(reason DiscReason) {
- if atomic.AddInt32(&p.disconnect, 1) != 1 {
- return
- }
-
- // Pre-emptively remove the peer; don't wait for reaping. We already know it's dead if we are here
- p.ethereum.RemovePeer(p)
-
- close(p.quit)
- if atomic.LoadInt32(&p.connected) != 0 {
- p.writeMessage(wire.NewMessage(wire.MsgDiscTy, reason))
- p.conn.Close()
- }
-}
-
-func (p *Peer) peersMessage() *wire.Msg {
- outPeers := make([]interface{}, len(p.ethereum.InOutPeers()))
- // Serialise each peer
- for i, peer := range p.ethereum.InOutPeers() {
- // Don't return localhost as valid peer
- if !net.ParseIP(peer.conn.RemoteAddr().String()).IsLoopback() {
- outPeers[i] = peer.RlpData()
- }
- }
-
- // Return the message to the peer with the known list of connected clients
- return wire.NewMessage(wire.MsgPeersTy, outPeers)
-}
-
-// Pushes the list of outbound peers to the client when requested
-func (p *Peer) pushPeers() {
- p.QueueMessage(p.peersMessage())
-}
-
-func (self *Peer) pushStatus() {
- msg := wire.NewMessage(wire.MsgStatusTy, []interface{}{
- uint32(ProtocolVersion),
- uint32(NetVersion),
- self.ethereum.ChainManager().TD,
- self.ethereum.ChainManager().CurrentBlock.Hash(),
- self.ethereum.ChainManager().Genesis().Hash(),
- })
-
- self.QueueMessage(msg)
-}
-
-func (self *Peer) handleStatus(msg *wire.Msg) {
- c := msg.Data
-
- var (
- //protoVersion = c.Get(0).Uint()
- netVersion = c.Get(1).Uint()
- td = c.Get(2).BigInt()
- bestHash = c.Get(3).Bytes()
- genesis = c.Get(4).Bytes()
- )
-
- if bytes.Compare(self.ethereum.ChainManager().Genesis().Hash(), genesis) != 0 {
- loggerger.Warnf("Invalid genisis hash %x. Disabling [eth]\n", genesis)
- return
- }
-
- if netVersion != NetVersion {
- loggerger.Warnf("Invalid network version %d. Disabling [eth]\n", netVersion)
- return
- }
-
- /*
- if protoVersion != ProtocolVersion {
- loggerger.Warnf("Invalid protocol version %d. Disabling [eth]\n", protoVersion)
- return
- }
- */
-
- // Get the td and last hash
- self.td = td
- self.bestHash = bestHash
- self.lastReceivedHash = bestHash
-
- self.statusKnown = true
-
- // Compare the total TD with the blockchain TD. If remote is higher
- // fetch hashes from highest TD node.
- self.FetchHashes()
-
- loggerger.Infof("Peer is [eth] capable. (TD = %v ~ %x)", self.td, self.bestHash)
-
-}
-
-func (p *Peer) pushHandshake() error {
- pubkey := p.ethereum.KeyManager().PublicKey()
- msg := wire.NewMessage(wire.MsgHandshakeTy, []interface{}{
- P2PVersion, []byte(p.version), []interface{}{[]interface{}{"eth", ProtocolVersion}}, p.port, pubkey[1:],
- })
-
- p.QueueMessage(msg)
-
- return nil
-}
-
-func (p *Peer) handleHandshake(msg *wire.Msg) {
- c := msg.Data
-
- var (
- p2pVersion = c.Get(0).Uint()
- clientId = c.Get(1).Str()
- caps = c.Get(2)
- port = c.Get(3).Uint()
- pub = c.Get(4).Bytes()
- )
-
- // Check correctness of p2p protocol version
- if p2pVersion != P2PVersion {
- peerlogger.Debugf("Invalid P2P version. Require protocol %d, received %d\n", P2PVersion, p2pVersion)
- p.Stop()
- return
- }
-
- // Handle the pub key (validation, uniqueness)
- if len(pub) == 0 {
- peerlogger.Warnln("Pubkey required, not supplied in handshake.")
- p.Stop()
- return
- }
-
- // Self connect detection
- pubkey := p.ethereum.KeyManager().PublicKey()
- if bytes.Compare(pubkey[1:], pub) == 0 {
- p.Stop()
-
- return
- }
-
- // Check for blacklisting
- for _, pk := range p.ethereum.blacklist {
- if bytes.Compare(pk, pub) == 0 {
- peerlogger.Debugf("Blacklisted peer tried to connect (%x...)\n", pubkey[0:4])
- p.StopWithReason(DiscBadPeer)
-
- return
- }
- }
-
- usedPub := 0
- // This peer is already added to the peerlist so we expect to find a double pubkey at least once
- eachPeer(p.ethereum.Peers(), func(peer *Peer, e *list.Element) {
- if bytes.Compare(pub, peer.pubkey) == 0 {
- usedPub++
- }
- })
-
- if usedPub > 0 {
- peerlogger.Debugf("Pubkey %x found more then once. Already connected to client.", p.pubkey)
- p.Stop()
- return
- }
- p.pubkey = pub
-
- // If this is an inbound connection send an ack back
- if p.inbound {
- p.port = uint16(port)
- }
-
- p.SetVersion(clientId)
-
- p.versionKnown = true
-
- p.ethereum.PushPeer(p)
- p.ethereum.eventMux.Post(PeerListEvent{p.ethereum.Peers()})
-
- p.protocolCaps = caps
-
- it := caps.NewIterator()
- var capsStrs []string
- for it.Next() {
- cap := it.Value().Get(0).Str()
- ver := it.Value().Get(1).Uint()
- switch cap {
- case "eth":
- if ver != ProtocolVersion {
- loggerger.Warnf("Invalid protocol version %d. Disabling [eth]\n", ver)
- continue
- }
- p.pushStatus()
- }
-
- capsStrs = append(capsStrs, fmt.Sprintf("%s/%d", cap, ver))
- }
-
- peerlogger.Infof("Added peer (%s) %d / %d (%v)\n", p.conn.RemoteAddr(), p.ethereum.Peers().Len(), p.ethereum.MaxPeers, capsStrs)
-
- peerlogger.Debugln(p)
-}
-
-func (self *Peer) IsCap(cap string) bool {
- capsIt := self.protocolCaps.NewIterator()
- for capsIt.Next() {
- if capsIt.Value().Str() == cap {
- return true
- }
- }
-
- return false
-}
-
-func (self *Peer) Caps() *ethutil.Value {
- return self.protocolCaps
-}
-
-func (p *Peer) String() string {
- var strBoundType string
- if p.inbound {
- strBoundType = "inbound"
- } else {
- strBoundType = "outbound"
- }
- var strConnectType string
- if atomic.LoadInt32(&p.disconnect) == 0 {
- strConnectType = "connected"
- } else {
- strConnectType = "disconnected"
- }
-
- return fmt.Sprintf("[%s] (%s) %v %s", strConnectType, strBoundType, p.conn.RemoteAddr(), p.version)
-
-}
-
-func (p *Peer) RlpData() []interface{} {
- return []interface{}{p.host, p.port, p.pubkey}
-}
-
-func packAddr(address, _port string) (host []byte, port uint16) {
- p, _ := strconv.Atoi(_port)
- port = uint16(p)
-
- h := net.ParseIP(address)
- if ip := h.To4(); ip != nil {
- host = []byte(ip)
- } else {
- host = []byte(h)
- }
-
- return
-}
-
-func unpackAddr(value *ethutil.Value, p uint64) string {
- host, _ := net.IP(value.Bytes()).MarshalText()
- prt := strconv.Itoa(int(p))
-
- return net.JoinHostPort(string(host), prt)
-}