From 70c65835f4747d991fe8d79e7138828cd97c6ac7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Mon, 11 May 2015 14:26:20 +0300 Subject: [PATCH 1/5] eth/downloader: fix #910, thread safe peers & polishes --- eth/downloader/downloader.go | 140 ++++++++---------- eth/downloader/downloader_test.go | 3 +- eth/downloader/peer.go | 227 +++++++++++++++++++----------- eth/downloader/queue.go | 9 ++ 4 files changed, 214 insertions(+), 165 deletions(-) diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 14ca2cd3d..b1e23f58f 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -49,12 +49,6 @@ type blockPack struct { blocks []*types.Block } -type syncPack struct { - peer *peer - hash common.Hash - ignoreInitial bool -} - type hashPack struct { peerId string hashes []common.Hash @@ -63,7 +57,7 @@ type hashPack struct { type Downloader struct { mu sync.RWMutex queue *queue - peers peers + peers *peerSet activePeer string // Callbacks @@ -83,7 +77,7 @@ type Downloader struct { func New(hasBlock hashCheckFn, getBlock getBlockFn) *Downloader { downloader := &Downloader{ queue: newQueue(), - peers: make(peers), + peers: newPeerSet(), hasBlock: hasBlock, getBlock: getBlock, newPeerCh: make(chan *peer, 1), @@ -98,29 +92,26 @@ func (d *Downloader) Stats() (current int, max int) { return d.queue.Size() } -func (d *Downloader) RegisterPeer(id string, hash common.Hash, getHashes hashFetcherFn, getBlocks blockFetcherFn) error { - d.mu.Lock() - defer d.mu.Unlock() - - glog.V(logger.Detail).Infoln("Register peer", id) - - // Create a new peer and add it to the list of known peers - peer := newPeer(id, hash, getHashes, getBlocks) - // add peer to our peer set - d.peers[id] = peer - // broadcast new peer - +// RegisterPeer injects a new download peer into the set of block source to be +// used for fetching hashes and blocks from. +func (d *Downloader) RegisterPeer(id string, head common.Hash, getHashes hashFetcherFn, getBlocks blockFetcherFn) error { + glog.V(logger.Detail).Infoln("Registering peer", id) + if err := d.peers.Register(newPeer(id, head, getHashes, getBlocks)); err != nil { + glog.V(logger.Error).Infoln("Register failed:", err) + return err + } return nil } -// UnregisterPeer unregisters a peer. This will prevent any action from the specified peer. -func (d *Downloader) UnregisterPeer(id string) { - d.mu.Lock() - defer d.mu.Unlock() - - glog.V(logger.Detail).Infoln("Unregister peer", id) - - delete(d.peers, id) +// UnregisterPeer remove a peer from the known list, preventing any action from +// the specified peer. +func (d *Downloader) UnregisterPeer(id string) error { + glog.V(logger.Detail).Infoln("Unregistering peer", id) + if err := d.peers.Unregister(id); err != nil { + glog.V(logger.Error).Infoln("Unregister failed:", err) + return err + } + return nil } // Synchronise will select the peer and use it for synchronising. If an empty string is given @@ -140,15 +131,16 @@ func (d *Downloader) Synchronise(id string, hash common.Hash) error { if _, cached := d.queue.Size(); cached > 0 && d.queue.GetHeadBlock() != nil { return errPendingQueue } - // Reset the queue to clean any internal leftover state + // Reset the queue and peer set to clean any internal leftover state d.queue.Reset() + d.peers.Reset() // Retrieve the origin peer and initiate the downloading process - p := d.peers[id] + p := d.peers.Peer(id) if p == nil { return errUnknownPeer } - return d.getFromPeer(p, hash, false) + return d.syncWithPeer(p, hash) } // TakeBlocks takes blocks from the queue and yields them to the blockTaker handler @@ -167,7 +159,9 @@ func (d *Downloader) Has(hash common.Hash) bool { return d.queue.Has(hash) } -func (d *Downloader) getFromPeer(p *peer, hash common.Hash, ignoreInitial bool) (err error) { +// syncWithPeer starts a block synchronization based on the hash chain from the +// specified peer and head hash. +func (d *Downloader) syncWithPeer(p *peer, hash common.Hash) (err error) { d.activePeer = p.id defer func() { // reset on error @@ -177,21 +171,12 @@ func (d *Downloader) getFromPeer(p *peer, hash common.Hash, ignoreInitial bool) }() glog.V(logger.Debug).Infoln("Synchronizing with the network using:", p.id) - // Start the fetcher. This will block the update entirely - // interupts need to be send to the appropriate channels - // respectively. - if err = d.startFetchingHashes(p, hash, ignoreInitial); err != nil { + if err = d.fetchHashes(p, hash); err != nil { return err } - - // Start fetching blocks in paralel. The strategy is simple - // take any available peers, seserve a chunk for each peer available, - // let the peer deliver the chunkn and periodically check if a peer - // has timedout. - if err = d.startFetchingBlocks(p); err != nil { + if err = d.fetchBlocks(); err != nil { return err } - glog.V(logger.Debug).Infoln("Synchronization completed") return nil @@ -234,17 +219,14 @@ blockDone: } // XXX Make synchronous -func (d *Downloader) startFetchingHashes(p *peer, h common.Hash, ignoreInitial bool) error { +func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { glog.V(logger.Debug).Infof("Downloading hashes (%x) from %s", h[:4], p.id) start := time.Now() - // We ignore the initial hash in some cases (e.g. we received a block without it's parent) - // In such circumstances we don't need to download the block so don't add it to the queue. - if !ignoreInitial { - // Add the hash to the queue first - d.queue.Insert([]common.Hash{h}) - } + // Add the hash to the queue first + d.queue.Insert([]common.Hash{h}) + // Get the first batch of hashes p.getHashes(h) @@ -308,20 +290,18 @@ out: // Attempt to find a new peer by checking inclusion of peers best hash in our // already fetched hash list. This can't guarantee 100% correctness but does // a fair job. This is always either correct or false incorrect. - for id, peer := range d.peers { - if d.queue.Has(peer.recentHash) && !attemptedPeers[id] { + for _, peer := range d.peers.AllPeers() { + if d.queue.Has(peer.head) && !attemptedPeers[p.id] { p = peer break } } - // if all peers have been tried, abort the process entirely or if the hash is // the zero hash. if p == nil || (hash == common.Hash{}) { d.queue.Reset() return ErrTimeout } - // set p to the active peer. this will invalidate any hashes that may be returned // by our previous (delayed) peer. activePeer = p @@ -334,14 +314,11 @@ out: return nil } -func (d *Downloader) startFetchingBlocks(p *peer) error { +// fetchBlocks iteratively downloads the entire schedules block-chain, taking +// any available peers, reserving a chunk of blocks for each, wait for delivery +// and periodically checking for timeouts. +func (d *Downloader) fetchBlocks() error { glog.V(logger.Debug).Infoln("Downloading", d.queue.Pending(), "block(s)") - - // Defer the peer reset. This will empty the peer requested set - // and makes sure there are no lingering peers with an incorrect - // state - defer d.peers.reset() - start := time.Now() // default ticker for re-fetching blocks every now and then @@ -354,19 +331,19 @@ out: case blockPack := <-d.blockCh: // If the peer was previously banned and failed to deliver it's pack // in a reasonable time frame, ignore it's message. - if d.peers[blockPack.peerId] != nil { - err := d.queue.Deliver(blockPack.peerId, blockPack.blocks) - if err != nil { - glog.V(logger.Debug).Infof("deliver failed for peer %s: %v\n", blockPack.peerId, err) - // FIXME d.UnregisterPeer(blockPack.peerId) + if peer := d.peers.Peer(blockPack.peerId); peer != nil { + // Deliver the received chunk of blocks, but drop the peer if invalid + if err := d.queue.Deliver(blockPack.peerId, blockPack.blocks); err != nil { + glog.V(logger.Debug).Infof("Failed delivery for peer %s: %v\n", blockPack.peerId, err) + d.peers.Unregister(blockPack.peerId) break } - if glog.V(logger.Debug) { - glog.Infof("adding %d blocks from: %s\n", len(blockPack.blocks), blockPack.peerId) + glog.Infof("Added %d blocks from: %s\n", len(blockPack.blocks), blockPack.peerId) } - d.peers[blockPack.peerId].promote() - d.peers.setState(blockPack.peerId, idleState) + // Promote the peer and update it's idle state + peer.Promote() + peer.SetIdle() } case <-ticker.C: // Check for bad peers. Bad peers may indicate a peer not responding @@ -381,13 +358,10 @@ out: // 1) Time for them to respond; // 2) Measure their speed; // 3) Amount and availability. - if peer := d.peers[pid]; peer != nil { - peer.demote() - peer.reset() - } + d.peers.Unregister(pid) } // After removing bad peers make sure we actually have sufficient peer left to keep downloading - if len(d.peers) == 0 { + if d.peers.Peers() == 0 { d.queue.Reset() return errNoPeers } @@ -398,31 +372,29 @@ out: if d.queue.Throttle() { continue } - - availablePeers := d.peers.get(idleState) - for _, peer := range availablePeers { + // Send a download request to all idle peers + idlePeers := d.peers.IdlePeers() + for _, peer := range idlePeers { // Get a possible chunk. If nil is returned no chunk // could be returned due to no hashes available. request := d.queue.Reserve(peer, maxBlockFetch) if request == nil { continue } - // XXX make fetch blocking. // Fetch the chunk and check for error. If the peer was somehow // already fetching a chunk due to a bug, it will be returned to // the queue - if err := peer.fetch(request); err != nil { - // log for tracing - glog.V(logger.Debug).Infof("peer %s received double work (state = %v)\n", peer.id, peer.state) + if err := peer.Fetch(request); err != nil { + glog.V(logger.Error).Infof("Peer %s received double work\n", peer.id) d.queue.Cancel(request) } } - // make sure that we have peers available for fetching. If all peers have been tried + // Make sure that we have peers available for fetching. If all peers have been tried // and all failed throw an error if d.queue.InFlight() == 0 { d.queue.Reset() - return fmt.Errorf("%v peers avaialable = %d. total peers = %d. hashes needed = %d", errPeersUnavailable, len(availablePeers), len(d.peers), d.queue.Pending()) + return fmt.Errorf("%v peers available = %d. total peers = %d. hashes needed = %d", errPeersUnavailable, len(idlePeers), d.peers.Peers(), d.queue.Pending()) } } else if d.queue.InFlight() == 0 { diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index d0f8d4c8f..385ad2909 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -229,7 +229,7 @@ func TestThrottling(t *testing.T) { minDesiredPeerCount = 4 blockTtl = 1 * time.Second - targetBlocks := 4 * blockCacheLimit + targetBlocks := 16 * blockCacheLimit hashes := createHashes(0, targetBlocks) blocks := createBlocksFromHashes(hashes) tester := newTester(t, hashes, blocks) @@ -256,6 +256,7 @@ func TestThrottling(t *testing.T) { return default: took = append(took, tester.downloader.TakeBlocks()...) + time.Sleep(time.Millisecond) } } }() diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go index 45ec1cbfd..e2dec5571 100644 --- a/eth/downloader/peer.go +++ b/eth/downloader/peer.go @@ -1,63 +1,35 @@ +// Contains the active peer-set of the downloader, maintaining both failures +// as well as reputation metrics to prioritize the block retrievals. + package downloader import ( "errors" "sync" + "sync/atomic" "github.com/ethereum/go-ethereum/common" "gopkg.in/fatih/set.v0" ) -const ( - workingState = 2 - idleState = 4 -) - type hashFetcherFn func(common.Hash) error type blockFetcherFn func([]common.Hash) error -// XXX make threadsafe!!!! -type peers map[string]*peer +var ( + errAlreadyFetching = errors.New("already fetching blocks from peer") + errAlreadyRegistered = errors.New("peer is already registered") + errNotRegistered = errors.New("peer is not registered") +) -func (p peers) reset() { - for _, peer := range p { - peer.reset() - } -} - -func (p peers) get(state int) []*peer { - var peers []*peer - for _, peer := range p { - peer.mu.RLock() - if peer.state == state { - peers = append(peers, peer) - } - peer.mu.RUnlock() - } - - return peers -} - -func (p peers) setState(id string, state int) { - if peer, exist := p[id]; exist { - peer.mu.Lock() - defer peer.mu.Unlock() - peer.state = state - } -} - -func (p peers) getPeer(id string) *peer { - return p[id] -} - -// peer represents an active peer +// peer represents an active peer from which hashes and blocks are retrieved. type peer struct { - state int // Peer state (working, idle) - rep int // TODO peer reputation + id string // Unique identifier of the peer + head common.Hash // Hash of the peers latest known block - mu sync.RWMutex - id string - recentHash common.Hash + idle int32 // Current activity state of the peer (idle = 0, active = 1) + rep int32 // Simple peer reputation (not used currently) + + mu sync.RWMutex ignored *set.Set @@ -65,31 +37,31 @@ type peer struct { getBlocks blockFetcherFn } -// create a new peer -func newPeer(id string, hash common.Hash, getHashes hashFetcherFn, getBlocks blockFetcherFn) *peer { +// newPeer create a new downloader peer, with specific hash and block retrieval +// mechanisms. +func newPeer(id string, head common.Hash, getHashes hashFetcherFn, getBlocks blockFetcherFn) *peer { return &peer{ - id: id, - recentHash: hash, - getHashes: getHashes, - getBlocks: getBlocks, - state: idleState, - ignored: set.New(), + id: id, + head: head, + getHashes: getHashes, + getBlocks: getBlocks, + ignored: set.New(), } } -// fetch a chunk using the peer -func (p *peer) fetch(request *fetchRequest) error { - p.mu.Lock() - defer p.mu.Unlock() +// Reset clears the internal state of a peer entity. +func (p *peer) Reset() { + atomic.StoreInt32(&p.idle, 0) + p.ignored.Clear() +} - if p.state == workingState { - return errors.New("peer already fetching chunk") +// Fetch sends a block retrieval request to the remote peer. +func (p *peer) Fetch(request *fetchRequest) error { + // Short circuit if the peer is already fetching + if !atomic.CompareAndSwapInt32(&p.idle, 0, 1) { + return errAlreadyFetching } - - // set working state - p.state = workingState - - // Convert the hash set to a fetchable slice + // Convert the hash set to a retrievable slice hashes := make([]common.Hash, 0, len(request.Hashes)) for hash, _ := range request.Hashes { hashes = append(hashes, hash) @@ -99,27 +71,122 @@ func (p *peer) fetch(request *fetchRequest) error { return nil } -// promote increases the peer's reputation -func (p *peer) promote() { - p.mu.Lock() - defer p.mu.Unlock() - - p.rep++ +// SetIdle sets the peer to idle, allowing it to execute new retrieval requests. +func (p *peer) SetIdle() { + atomic.StoreInt32(&p.idle, 0) } -// demote decreases the peer's reputation or leaves it at 0 -func (p *peer) demote() { - p.mu.Lock() - defer p.mu.Unlock() +// Promote increases the peer's reputation. +func (p *peer) Promote() { + atomic.AddInt32(&p.rep, 1) +} - if p.rep > 1 { - p.rep -= 2 - } else { - p.rep = 0 +// Demote decreases the peer's reputation or leaves it at 0. +func (p *peer) Demote() { + for { + // Calculate the new reputation value + prev := atomic.LoadInt32(&p.rep) + next := prev - 2 + if next < 0 { + next = 0 + } + // Try to update the old value + if atomic.CompareAndSwapInt32(&p.rep, prev, next) { + return + } } } -func (p *peer) reset() { - p.state = idleState - p.ignored.Clear() +// peerSet represents the collection of active peer participating in the block +// download procedure. +type peerSet struct { + peers map[string]*peer + lock sync.RWMutex +} + +// newPeerSet creates a new peer set top track the active download sources. +func newPeerSet() *peerSet { + return &peerSet{ + peers: make(map[string]*peer), + } +} + +// Reset iterates over the current peer set, and resets each of the known peers +// to prepare for a next batch of block retrieval. +func (ps *peerSet) Reset() { + ps.lock.RLock() + defer ps.lock.RUnlock() + + for _, peer := range ps.peers { + peer.Reset() + } +} + +// Register injects a new peer into the working set, or returns an error if the +// peer is already known. +func (ps *peerSet) Register(p *peer) error { + ps.lock.Lock() + defer ps.lock.Unlock() + + if _, ok := ps.peers[p.id]; ok { + return errAlreadyRegistered + } + ps.peers[p.id] = p + return nil +} + +// Unregister removes a remote peer from the active set, disabling any further +// actions to/from that particular entity. +func (ps *peerSet) Unregister(id string) error { + ps.lock.Lock() + defer ps.lock.Unlock() + + if _, ok := ps.peers[id]; !ok { + return errNotRegistered + } + delete(ps.peers, id) + return nil +} + +// Peer retrieves the registered peer with the given id. +func (ps *peerSet) Peer(id string) *peer { + ps.lock.RLock() + defer ps.lock.RUnlock() + + return ps.peers[id] +} + +// Peers returns if the current number of peers in the set. +func (ps *peerSet) Peers() int { + ps.lock.RLock() + defer ps.lock.RUnlock() + + return len(ps.peers) +} + +// AllPeers retrieves a flat list of all the peers within the set. +func (ps *peerSet) AllPeers() []*peer { + ps.lock.RLock() + defer ps.lock.RUnlock() + + list := make([]*peer, 0, len(ps.peers)) + for _, p := range ps.peers { + list = append(list, p) + } + return list +} + +// IdlePeers retrieves a flat list of all the currently idle peers within the +// active peer set. +func (ps *peerSet) IdlePeers() []*peer { + ps.lock.RLock() + defer ps.lock.RUnlock() + + list := make([]*peer, 0, len(ps.peers)) + for _, p := range ps.peers { + if atomic.LoadInt32(&p.idle) == 0 { + list = append(list, p) + } + } + return list } diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index 515440bca..40749698c 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -1,3 +1,6 @@ +// Contains the block download scheduler to collect download tasks and schedule +// them in an ordered, and throttled way. + package downloader import ( @@ -8,6 +11,8 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/logger/glog" "gopkg.in/karalabe/cookiejar.v2/collections/prque" ) @@ -126,6 +131,10 @@ func (q *queue) Insert(hashes []common.Hash) { for i, hash := range hashes { index := q.hashCounter + i + if old, ok := q.hashPool[hash]; ok { + glog.V(logger.Warn).Infof("Hash %x already scheduled at index %v", hash, old) + continue + } q.hashPool[hash] = index q.hashQueue.Push(hash, float32(index)) // Highest gets schedules first } From d37a2559b928a8118a5eec77e2181cb6cf566be1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Mon, 11 May 2015 16:47:58 +0300 Subject: [PATCH 2/5] eth/downloader: revert to demotion, use harsher penalty --- eth/downloader/downloader.go | 12 +++++++++--- eth/downloader/peer.go | 15 ++++++++++----- 2 files changed, 19 insertions(+), 8 deletions(-) diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index b1e23f58f..5e9931f59 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -335,7 +335,7 @@ out: // Deliver the received chunk of blocks, but drop the peer if invalid if err := d.queue.Deliver(blockPack.peerId, blockPack.blocks); err != nil { glog.V(logger.Debug).Infof("Failed delivery for peer %s: %v\n", blockPack.peerId, err) - d.peers.Unregister(blockPack.peerId) + peer.Demote() break } if glog.V(logger.Debug) { @@ -358,7 +358,9 @@ out: // 1) Time for them to respond; // 2) Measure their speed; // 3) Amount and availability. - d.peers.Unregister(pid) + if peer := d.peers.Peer(pid); peer != nil { + peer.Demote() + } } // After removing bad peers make sure we actually have sufficient peer left to keep downloading if d.peers.Peers() == 0 { @@ -372,9 +374,13 @@ out: if d.queue.Throttle() { continue } - // Send a download request to all idle peers + // Send a download request to all idle peers, until throttled idlePeers := d.peers.IdlePeers() for _, peer := range idlePeers { + // Short circuit if throttling activated since above + if d.queue.Throttle() { + break + } // Get a possible chunk. If nil is returned no chunk // could be returned due to no hashes available. request := d.queue.Reserve(peer, maxBlockFetch) diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go index e2dec5571..1ff2d5149 100644 --- a/eth/downloader/peer.go +++ b/eth/downloader/peer.go @@ -86,10 +86,8 @@ func (p *peer) Demote() { for { // Calculate the new reputation value prev := atomic.LoadInt32(&p.rep) - next := prev - 2 - if next < 0 { - next = 0 - } + next := prev / 2 + // Try to update the old value if atomic.CompareAndSwapInt32(&p.rep, prev, next) { return @@ -177,7 +175,7 @@ func (ps *peerSet) AllPeers() []*peer { } // IdlePeers retrieves a flat list of all the currently idle peers within the -// active peer set. +// active peer set, ordered by their reputation. func (ps *peerSet) IdlePeers() []*peer { ps.lock.RLock() defer ps.lock.RUnlock() @@ -188,5 +186,12 @@ func (ps *peerSet) IdlePeers() []*peer { list = append(list, p) } } + for i := 0; i < len(list); i++ { + for j := i + 1; j < len(list); j++ { + if atomic.LoadInt32(&list[i].rep) < atomic.LoadInt32(&list[j].rep) { + list[i], list[j] = list[j], list[i] + } + } + } return list } From 064cf1609987bb0f6c59c1e790b7811d9a783fef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Mon, 11 May 2015 17:06:42 +0300 Subject: [PATCH 3/5] eth/downloader: use count instead of peers, clearer --- eth/downloader/downloader.go | 4 ++-- eth/downloader/peer.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 5e9931f59..4c7ebe646 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -363,7 +363,7 @@ out: } } // After removing bad peers make sure we actually have sufficient peer left to keep downloading - if d.peers.Peers() == 0 { + if d.peers.Len() == 0 { d.queue.Reset() return errNoPeers } @@ -400,7 +400,7 @@ out: if d.queue.InFlight() == 0 { d.queue.Reset() - return fmt.Errorf("%v peers available = %d. total peers = %d. hashes needed = %d", errPeersUnavailable, len(idlePeers), d.peers.Peers(), d.queue.Pending()) + return fmt.Errorf("%v peers available = %d. total peers = %d. hashes needed = %d", errPeersUnavailable, len(idlePeers), d.peers.Len(), d.queue.Pending()) } } else if d.queue.InFlight() == 0 { diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go index 1ff2d5149..4abae8d5e 100644 --- a/eth/downloader/peer.go +++ b/eth/downloader/peer.go @@ -154,8 +154,8 @@ func (ps *peerSet) Peer(id string) *peer { return ps.peers[id] } -// Peers returns if the current number of peers in the set. -func (ps *peerSet) Peers() int { +// Len returns if the current number of peers in the set. +func (ps *peerSet) Len() int { ps.lock.RLock() defer ps.lock.RUnlock() From 21e52efdfed19c4376b830f8ad0e52a9e599f633 Mon Sep 17 00:00:00 2001 From: obscuren Date: Mon, 11 May 2015 15:43:14 +0200 Subject: [PATCH 4/5] cmd/geth, miner, backend, xeth: Fixed miner threads to be settable Miner threads are now settable through the admin interface (closes #897) and specify 0 CPU worker threads when eth_getWork is called (closes #916) --- cmd/geth/admin.go | 5 ++--- cmd/geth/main.go | 2 +- cmd/mist/ui_lib.go | 2 +- eth/backend.go | 12 ++++++------ miner/agent.go | 22 +++++++++++----------- miner/miner.go | 33 ++++++++++++++++----------------- miner/worker.go | 7 ++++++- rpc/api.go | 2 +- xeth/xeth.go | 4 ++-- 9 files changed, 46 insertions(+), 43 deletions(-) diff --git a/cmd/geth/admin.go b/cmd/geth/admin.go index 2b9956638..17d711297 100644 --- a/cmd/geth/admin.go +++ b/cmd/geth/admin.go @@ -275,14 +275,13 @@ func (js *jsre) verbosity(call otto.FunctionCall) otto.Value { } func (js *jsre) startMining(call otto.FunctionCall) otto.Value { - _, err := call.Argument(0).ToInteger() + threads, err := call.Argument(0).ToInteger() if err != nil { fmt.Println(err) return otto.FalseValue() } - // threads now ignored - err = js.ethereum.StartMining() + err = js.ethereum.StartMining(int(threads)) if err != nil { fmt.Println(err) return otto.FalseValue() diff --git a/cmd/geth/main.go b/cmd/geth/main.go index fd7aae4c2..5da59ff3b 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -401,7 +401,7 @@ func startEth(ctx *cli.Context, eth *eth.Ethereum) { } } if ctx.GlobalBool(utils.MiningEnabledFlag.Name) { - if err := eth.StartMining(); err != nil { + if err := eth.StartMining(ctx.GlobalInt(utils.MinerThreadsFlag.Name)); err != nil { utils.Fatalf("%v", err) } } diff --git a/cmd/mist/ui_lib.go b/cmd/mist/ui_lib.go index a604e87ba..4653e0980 100644 --- a/cmd/mist/ui_lib.go +++ b/cmd/mist/ui_lib.go @@ -159,7 +159,7 @@ func (self *UiLib) RemoveLocalTransaction(id int) { func (self *UiLib) ToggleMining() bool { if !self.eth.IsMining() { - err := self.eth.StartMining() + err := self.eth.StartMining(4) return err == nil } else { self.eth.StopMining() diff --git a/eth/backend.go b/eth/backend.go index cdbe35b26..6be871138 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -267,7 +267,7 @@ func New(config *Config) (*Ethereum, error) { eth.txPool = core.NewTxPool(eth.EventMux(), eth.chainManager.State, eth.chainManager.GasLimit) eth.blockProcessor = core.NewBlockProcessor(stateDb, extraDb, eth.pow, eth.txPool, eth.chainManager, eth.EventMux()) eth.chainManager.SetProcessor(eth.blockProcessor) - eth.miner = miner.New(eth, eth.pow, config.MinerThreads) + eth.miner = miner.New(eth, eth.pow) eth.miner.SetGasPrice(config.GasPrice) eth.protocolManager = NewProtocolManager(config.ProtocolVersion, config.NetworkId, eth.eventMux, eth.txPool, eth.chainManager, eth.downloader) @@ -368,7 +368,7 @@ func (s *Ethereum) ResetWithGenesisBlock(gb *types.Block) { s.chainManager.ResetWithGenesisBlock(gb) } -func (s *Ethereum) StartMining() error { +func (s *Ethereum) StartMining(threads int) error { eb, err := s.Etherbase() if err != nil { err = fmt.Errorf("Cannot start mining without etherbase address: %v", err) @@ -376,7 +376,7 @@ func (s *Ethereum) StartMining() error { return err } - go s.miner.Start(eb) + go s.miner.Start(eb, threads) return nil } @@ -461,13 +461,13 @@ done: case <-ticker.C: // don't change the order of database flushes if err := s.extraDb.Flush(); err != nil { - glog.Fatalf("fatal error: flush extraDb: %v\n", err) + glog.Fatalf("fatal error: flush extraDb: %v (Restart your node. We are aware of this issue)\n", err) } if err := s.stateDb.Flush(); err != nil { - glog.Fatalf("fatal error: flush stateDb: %v\n", err) + glog.Fatalf("fatal error: flush stateDb: %v (Restart your node. We are aware of this issue)\n", err) } if err := s.blockDb.Flush(); err != nil { - glog.Fatalf("fatal error: flush blockDb: %v\n", err) + glog.Fatalf("fatal error: flush blockDb: %v (Restart your node. We are aware of this issue)\n", err) } case <-s.shutdownChan: break done diff --git a/miner/agent.go b/miner/agent.go index b2f89aaab..939f63fef 100644 --- a/miner/agent.go +++ b/miner/agent.go @@ -10,7 +10,7 @@ import ( "github.com/ethereum/go-ethereum/pow" ) -type CpuMiner struct { +type CpuAgent struct { chMu sync.Mutex c chan *types.Block quit chan struct{} @@ -21,8 +21,8 @@ type CpuMiner struct { pow pow.PoW } -func NewCpuMiner(index int, pow pow.PoW) *CpuMiner { - miner := &CpuMiner{ +func NewCpuAgent(index int, pow pow.PoW) *CpuAgent { + miner := &CpuAgent{ pow: pow, index: index, } @@ -30,16 +30,16 @@ func NewCpuMiner(index int, pow pow.PoW) *CpuMiner { return miner } -func (self *CpuMiner) Work() chan<- *types.Block { return self.c } -func (self *CpuMiner) Pow() pow.PoW { return self.pow } -func (self *CpuMiner) SetReturnCh(ch chan<- *types.Block) { self.returnCh = ch } +func (self *CpuAgent) Work() chan<- *types.Block { return self.c } +func (self *CpuAgent) Pow() pow.PoW { return self.pow } +func (self *CpuAgent) SetReturnCh(ch chan<- *types.Block) { self.returnCh = ch } -func (self *CpuMiner) Stop() { +func (self *CpuAgent) Stop() { close(self.quit) close(self.quitCurrentOp) } -func (self *CpuMiner) Start() { +func (self *CpuAgent) Start() { self.quit = make(chan struct{}) self.quitCurrentOp = make(chan struct{}, 1) self.c = make(chan *types.Block, 1) @@ -47,7 +47,7 @@ func (self *CpuMiner) Start() { go self.update() } -func (self *CpuMiner) update() { +func (self *CpuAgent) update() { out: for { select { @@ -76,7 +76,7 @@ done: } } -func (self *CpuMiner) mine(block *types.Block) { +func (self *CpuAgent) mine(block *types.Block) { glog.V(logger.Debug).Infof("(re)started agent[%d]. mining...\n", self.index) // Reset the channel @@ -95,6 +95,6 @@ func (self *CpuMiner) mine(block *types.Block) { } } -func (self *CpuMiner) GetHashRate() int64 { +func (self *CpuAgent) GetHashRate() int64 { return self.pow.GetHashrate() } diff --git a/miner/miner.go b/miner/miner.go index efe6d3051..09342e250 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -23,16 +23,8 @@ type Miner struct { pow pow.PoW } -func New(eth core.Backend, pow pow.PoW, minerThreads int) *Miner { - // note: minerThreads is currently ignored because - // ethash is not thread safe. - miner := &Miner{eth: eth, pow: pow, worker: newWorker(common.Address{}, eth)} - for i := 0; i < minerThreads; i++ { - miner.worker.register(NewCpuMiner(i, pow)) - } - miner.threads = minerThreads - - return miner +func New(eth core.Backend, pow pow.PoW) *Miner { + return &Miner{eth: eth, pow: pow, worker: newWorker(common.Address{}, eth)} } func (self *Miner) Mining() bool { @@ -48,15 +40,27 @@ func (m *Miner) SetGasPrice(price *big.Int) { m.worker.gasPrice = price } -func (self *Miner) Start(coinbase common.Address) { - glog.V(logger.Info).Infoln("Starting mining operation") +func (self *Miner) Start(coinbase common.Address, threads int) { self.mining = true + + for i := 0; i < threads; i++ { + self.worker.register(NewCpuAgent(i, self.pow)) + } + self.threads = threads + + glog.V(logger.Info).Infof("Starting mining operation (CPU=%d TOT=%d)\n", threads, len(self.worker.agents)) + self.worker.coinbase = coinbase self.worker.start() self.worker.commitNewWork() } +func (self *Miner) Stop() { + self.worker.stop() + self.mining = false +} + func (self *Miner) Register(agent Agent) { if self.mining { agent.Start() @@ -65,11 +69,6 @@ func (self *Miner) Register(agent Agent) { self.worker.register(agent) } -func (self *Miner) Stop() { - self.mining = false - self.worker.stop() -} - func (self *Miner) HashRate() int64 { return self.worker.HashRate() } diff --git a/miner/worker.go b/miner/worker.go index e3dbae717..d801a9839 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -141,7 +141,6 @@ func (self *worker) start() { for _, agent := range self.agents { agent.Start() } - } func (self *worker) stop() { @@ -149,10 +148,16 @@ func (self *worker) stop() { defer self.mu.Unlock() if atomic.LoadInt32(&self.mining) == 1 { + var keep []Agent // stop all agents for _, agent := range self.agents { agent.Stop() + // keep all that's not a cpu agent + if _, ok := agent.(*CpuAgent); !ok { + keep = append(keep, agent) + } } + self.agents = keep } atomic.StoreInt32(&self.mining, 0) diff --git a/rpc/api.go b/rpc/api.go index 309c161ad..d53a9917d 100644 --- a/rpc/api.go +++ b/rpc/api.go @@ -391,7 +391,7 @@ func (api *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) err } *reply = NewLogsRes(api.xeth().AllLogs(args.Earliest, args.Latest, args.Skip, args.Max, args.Address, args.Topics)) case "eth_getWork": - api.xeth().SetMining(true) + api.xeth().SetMining(true, 0) *reply = api.xeth().RemoteMining().GetWork() case "eth_submitWork": args := new(SubmitWorkArgs) diff --git a/xeth/xeth.go b/xeth/xeth.go index 47b833a34..bf5844770 100644 --- a/xeth/xeth.go +++ b/xeth/xeth.go @@ -425,10 +425,10 @@ func (self *XEth) ClientVersion() string { return self.backend.ClientVersion() } -func (self *XEth) SetMining(shouldmine bool) bool { +func (self *XEth) SetMining(shouldmine bool, threads int) bool { ismining := self.backend.IsMining() if shouldmine && !ismining { - err := self.backend.StartMining() + err := self.backend.StartMining(threads) return err == nil } if ismining && !shouldmine { From 48bd48876c02d1a08690b9604df09ef4bcf77838 Mon Sep 17 00:00:00 2001 From: obscuren Date: Mon, 11 May 2015 17:27:34 +0200 Subject: [PATCH 5/5] eth, eth/downloader: moved pending queue error message to debug --- eth/downloader/downloader.go | 4 ++-- eth/sync.go | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 4c7ebe646..577152a21 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -28,7 +28,7 @@ var ( errUnknownPeer = errors.New("peer's unknown or unhealthy") errBadPeer = errors.New("action from bad peer ignored") errNoPeers = errors.New("no peers to keep download active") - errPendingQueue = errors.New("pending items in queue") + ErrPendingQueue = errors.New("pending items in queue") ErrTimeout = errors.New("timeout") errEmptyHashSet = errors.New("empty hash set by peer") errPeersUnavailable = errors.New("no peers available or all peers tried for block download process") @@ -129,7 +129,7 @@ func (d *Downloader) Synchronise(id string, hash common.Hash) error { // Abort if the queue still contains some leftover data if _, cached := d.queue.Size(); cached > 0 && d.queue.GetHeadBlock() != nil { - return errPendingQueue + return ErrPendingQueue } // Reset the queue and peer set to clean any internal leftover state d.queue.Reset() diff --git a/eth/sync.go b/eth/sync.go index d955eaa50..00b571782 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -98,7 +98,8 @@ func (pm *ProtocolManager) synchronise(peer *peer) { case downloader.ErrTimeout: glog.V(logger.Debug).Infof("Removing peer %v due to sync timeout", peer.id) pm.removePeer(peer) - + case downloader.ErrPendingQueue: + glog.V(logger.Debug).Infoln("Synchronisation aborted:", err) default: glog.V(logger.Warn).Infof("Synchronisation failed: %v", err) }