diff --git a/cmd/geth/admin.go b/cmd/geth/admin.go index 2b9956638..17d711297 100644 --- a/cmd/geth/admin.go +++ b/cmd/geth/admin.go @@ -275,14 +275,13 @@ func (js *jsre) verbosity(call otto.FunctionCall) otto.Value { } func (js *jsre) startMining(call otto.FunctionCall) otto.Value { - _, err := call.Argument(0).ToInteger() + threads, err := call.Argument(0).ToInteger() if err != nil { fmt.Println(err) return otto.FalseValue() } - // threads now ignored - err = js.ethereum.StartMining() + err = js.ethereum.StartMining(int(threads)) if err != nil { fmt.Println(err) return otto.FalseValue() diff --git a/cmd/geth/main.go b/cmd/geth/main.go index fd7aae4c2..5da59ff3b 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -401,7 +401,7 @@ func startEth(ctx *cli.Context, eth *eth.Ethereum) { } } if ctx.GlobalBool(utils.MiningEnabledFlag.Name) { - if err := eth.StartMining(); err != nil { + if err := eth.StartMining(ctx.GlobalInt(utils.MinerThreadsFlag.Name)); err != nil { utils.Fatalf("%v", err) } } diff --git a/cmd/mist/ui_lib.go b/cmd/mist/ui_lib.go index a604e87ba..4653e0980 100644 --- a/cmd/mist/ui_lib.go +++ b/cmd/mist/ui_lib.go @@ -159,7 +159,7 @@ func (self *UiLib) RemoveLocalTransaction(id int) { func (self *UiLib) ToggleMining() bool { if !self.eth.IsMining() { - err := self.eth.StartMining() + err := self.eth.StartMining(4) return err == nil } else { self.eth.StopMining() diff --git a/eth/backend.go b/eth/backend.go index cdbe35b26..6be871138 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -267,7 +267,7 @@ func New(config *Config) (*Ethereum, error) { eth.txPool = core.NewTxPool(eth.EventMux(), eth.chainManager.State, eth.chainManager.GasLimit) eth.blockProcessor = core.NewBlockProcessor(stateDb, extraDb, eth.pow, eth.txPool, eth.chainManager, eth.EventMux()) eth.chainManager.SetProcessor(eth.blockProcessor) - eth.miner = miner.New(eth, eth.pow, config.MinerThreads) + eth.miner = miner.New(eth, eth.pow) eth.miner.SetGasPrice(config.GasPrice) eth.protocolManager = NewProtocolManager(config.ProtocolVersion, config.NetworkId, eth.eventMux, eth.txPool, eth.chainManager, eth.downloader) @@ -368,7 +368,7 @@ func (s *Ethereum) ResetWithGenesisBlock(gb *types.Block) { s.chainManager.ResetWithGenesisBlock(gb) } -func (s *Ethereum) StartMining() error { +func (s *Ethereum) StartMining(threads int) error { eb, err := s.Etherbase() if err != nil { err = fmt.Errorf("Cannot start mining without etherbase address: %v", err) @@ -376,7 +376,7 @@ func (s *Ethereum) StartMining() error { return err } - go s.miner.Start(eb) + go s.miner.Start(eb, threads) return nil } @@ -461,13 +461,13 @@ done: case <-ticker.C: // don't change the order of database flushes if err := s.extraDb.Flush(); err != nil { - glog.Fatalf("fatal error: flush extraDb: %v\n", err) + glog.Fatalf("fatal error: flush extraDb: %v (Restart your node. We are aware of this issue)\n", err) } if err := s.stateDb.Flush(); err != nil { - glog.Fatalf("fatal error: flush stateDb: %v\n", err) + glog.Fatalf("fatal error: flush stateDb: %v (Restart your node. We are aware of this issue)\n", err) } if err := s.blockDb.Flush(); err != nil { - glog.Fatalf("fatal error: flush blockDb: %v\n", err) + glog.Fatalf("fatal error: flush blockDb: %v (Restart your node. We are aware of this issue)\n", err) } case <-s.shutdownChan: break done diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 4c7ebe646..577152a21 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -28,7 +28,7 @@ var ( errUnknownPeer = errors.New("peer's unknown or unhealthy") errBadPeer = errors.New("action from bad peer ignored") errNoPeers = errors.New("no peers to keep download active") - errPendingQueue = errors.New("pending items in queue") + ErrPendingQueue = errors.New("pending items in queue") ErrTimeout = errors.New("timeout") errEmptyHashSet = errors.New("empty hash set by peer") errPeersUnavailable = errors.New("no peers available or all peers tried for block download process") @@ -129,7 +129,7 @@ func (d *Downloader) Synchronise(id string, hash common.Hash) error { // Abort if the queue still contains some leftover data if _, cached := d.queue.Size(); cached > 0 && d.queue.GetHeadBlock() != nil { - return errPendingQueue + return ErrPendingQueue } // Reset the queue and peer set to clean any internal leftover state d.queue.Reset() diff --git a/eth/sync.go b/eth/sync.go index d955eaa50..00b571782 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -98,7 +98,8 @@ func (pm *ProtocolManager) synchronise(peer *peer) { case downloader.ErrTimeout: glog.V(logger.Debug).Infof("Removing peer %v due to sync timeout", peer.id) pm.removePeer(peer) - + case downloader.ErrPendingQueue: + glog.V(logger.Debug).Infoln("Synchronisation aborted:", err) default: glog.V(logger.Warn).Infof("Synchronisation failed: %v", err) } diff --git a/miner/agent.go b/miner/agent.go index b2f89aaab..939f63fef 100644 --- a/miner/agent.go +++ b/miner/agent.go @@ -10,7 +10,7 @@ import ( "github.com/ethereum/go-ethereum/pow" ) -type CpuMiner struct { +type CpuAgent struct { chMu sync.Mutex c chan *types.Block quit chan struct{} @@ -21,8 +21,8 @@ type CpuMiner struct { pow pow.PoW } -func NewCpuMiner(index int, pow pow.PoW) *CpuMiner { - miner := &CpuMiner{ +func NewCpuAgent(index int, pow pow.PoW) *CpuAgent { + miner := &CpuAgent{ pow: pow, index: index, } @@ -30,16 +30,16 @@ func NewCpuMiner(index int, pow pow.PoW) *CpuMiner { return miner } -func (self *CpuMiner) Work() chan<- *types.Block { return self.c } -func (self *CpuMiner) Pow() pow.PoW { return self.pow } -func (self *CpuMiner) SetReturnCh(ch chan<- *types.Block) { self.returnCh = ch } +func (self *CpuAgent) Work() chan<- *types.Block { return self.c } +func (self *CpuAgent) Pow() pow.PoW { return self.pow } +func (self *CpuAgent) SetReturnCh(ch chan<- *types.Block) { self.returnCh = ch } -func (self *CpuMiner) Stop() { +func (self *CpuAgent) Stop() { close(self.quit) close(self.quitCurrentOp) } -func (self *CpuMiner) Start() { +func (self *CpuAgent) Start() { self.quit = make(chan struct{}) self.quitCurrentOp = make(chan struct{}, 1) self.c = make(chan *types.Block, 1) @@ -47,7 +47,7 @@ func (self *CpuMiner) Start() { go self.update() } -func (self *CpuMiner) update() { +func (self *CpuAgent) update() { out: for { select { @@ -76,7 +76,7 @@ done: } } -func (self *CpuMiner) mine(block *types.Block) { +func (self *CpuAgent) mine(block *types.Block) { glog.V(logger.Debug).Infof("(re)started agent[%d]. mining...\n", self.index) // Reset the channel @@ -95,6 +95,6 @@ func (self *CpuMiner) mine(block *types.Block) { } } -func (self *CpuMiner) GetHashRate() int64 { +func (self *CpuAgent) GetHashRate() int64 { return self.pow.GetHashrate() } diff --git a/miner/miner.go b/miner/miner.go index efe6d3051..09342e250 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -23,16 +23,8 @@ type Miner struct { pow pow.PoW } -func New(eth core.Backend, pow pow.PoW, minerThreads int) *Miner { - // note: minerThreads is currently ignored because - // ethash is not thread safe. - miner := &Miner{eth: eth, pow: pow, worker: newWorker(common.Address{}, eth)} - for i := 0; i < minerThreads; i++ { - miner.worker.register(NewCpuMiner(i, pow)) - } - miner.threads = minerThreads - - return miner +func New(eth core.Backend, pow pow.PoW) *Miner { + return &Miner{eth: eth, pow: pow, worker: newWorker(common.Address{}, eth)} } func (self *Miner) Mining() bool { @@ -48,15 +40,27 @@ func (m *Miner) SetGasPrice(price *big.Int) { m.worker.gasPrice = price } -func (self *Miner) Start(coinbase common.Address) { - glog.V(logger.Info).Infoln("Starting mining operation") +func (self *Miner) Start(coinbase common.Address, threads int) { self.mining = true + + for i := 0; i < threads; i++ { + self.worker.register(NewCpuAgent(i, self.pow)) + } + self.threads = threads + + glog.V(logger.Info).Infof("Starting mining operation (CPU=%d TOT=%d)\n", threads, len(self.worker.agents)) + self.worker.coinbase = coinbase self.worker.start() self.worker.commitNewWork() } +func (self *Miner) Stop() { + self.worker.stop() + self.mining = false +} + func (self *Miner) Register(agent Agent) { if self.mining { agent.Start() @@ -65,11 +69,6 @@ func (self *Miner) Register(agent Agent) { self.worker.register(agent) } -func (self *Miner) Stop() { - self.mining = false - self.worker.stop() -} - func (self *Miner) HashRate() int64 { return self.worker.HashRate() } diff --git a/miner/worker.go b/miner/worker.go index e3dbae717..d801a9839 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -141,7 +141,6 @@ func (self *worker) start() { for _, agent := range self.agents { agent.Start() } - } func (self *worker) stop() { @@ -149,10 +148,16 @@ func (self *worker) stop() { defer self.mu.Unlock() if atomic.LoadInt32(&self.mining) == 1 { + var keep []Agent // stop all agents for _, agent := range self.agents { agent.Stop() + // keep all that's not a cpu agent + if _, ok := agent.(*CpuAgent); !ok { + keep = append(keep, agent) + } } + self.agents = keep } atomic.StoreInt32(&self.mining, 0) diff --git a/rpc/api.go b/rpc/api.go index 309c161ad..d53a9917d 100644 --- a/rpc/api.go +++ b/rpc/api.go @@ -391,7 +391,7 @@ func (api *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) err } *reply = NewLogsRes(api.xeth().AllLogs(args.Earliest, args.Latest, args.Skip, args.Max, args.Address, args.Topics)) case "eth_getWork": - api.xeth().SetMining(true) + api.xeth().SetMining(true, 0) *reply = api.xeth().RemoteMining().GetWork() case "eth_submitWork": args := new(SubmitWorkArgs) diff --git a/xeth/xeth.go b/xeth/xeth.go index 47b833a34..bf5844770 100644 --- a/xeth/xeth.go +++ b/xeth/xeth.go @@ -425,10 +425,10 @@ func (self *XEth) ClientVersion() string { return self.backend.ClientVersion() } -func (self *XEth) SetMining(shouldmine bool) bool { +func (self *XEth) SetMining(shouldmine bool, threads int) bool { ismining := self.backend.IsMining() if shouldmine && !ismining { - err := self.backend.StartMining() + err := self.backend.StartMining(threads) return err == nil } if ismining && !shouldmine {