From d8590d97902ff9c9ddc4ce56856f64b417b64489 Mon Sep 17 00:00:00 2001 From: Jeffrey Wilcke Date: Tue, 4 Aug 2015 20:38:57 +0200 Subject: [PATCH] miner: fixed worker race condition --- miner/worker.go | 87 +++++++++++++++++++++++++------------------------ 1 file changed, 44 insertions(+), 43 deletions(-) diff --git a/miner/worker.go b/miner/worker.go index 7fb40af78..a703baada 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -263,8 +263,9 @@ func (self *worker) wait() { continue } block := result.Block + work := result.Work - self.current.state.Sync() + work.state.Sync() if self.fullValidation { if _, err := self.chain.InsertChain(types.Blocks{block}); err != nil { glog.V(logger.Error).Infoln("mining err", err) @@ -292,7 +293,7 @@ func (self *worker) wait() { // This puts transactions in a extra db for rpc core.PutTransactions(self.extraDb, block, block.Transactions()) // store the receipts - core.PutReceipts(self.extraDb, self.current.receipts) + core.PutReceipts(self.extraDb, work.receipts) } // broadcast before waiting for validation @@ -303,7 +304,7 @@ func (self *worker) wait() { self.mux.Post(core.ChainHeadEvent{block}) self.mux.Post(logs) } - }(block, self.current.state.Logs()) + }(block, work.state.Logs()) } // check staleness and display confirmation @@ -313,7 +314,7 @@ func (self *worker) wait() { stale = "stale " } else { confirm = "Wait 5 blocks for confirmation" - self.current.localMinedBlocks = newLocalMinedBlock(block.Number().Uint64(), self.current.localMinedBlocks) + work.localMinedBlocks = newLocalMinedBlock(block.Number().Uint64(), work.localMinedBlocks) } glog.V(logger.Info).Infof("🔨 Mined %sblock (#%v / %x). %s", stale, block.Number(), block.Hash().Bytes()[:4], confirm) @@ -322,9 +323,9 @@ func (self *worker) wait() { } } -func (self *worker) push() { +func (self *worker) push(work *Work) { if atomic.LoadInt32(&self.mining) == 1 { - if core.Canary(self.current.state) { + if core.Canary(work.state) { glog.Infoln("Toxicity levels rising to deadly levels. Your canary has died. You can go back or continue down the mineshaft --more--") glog.Infoln("You turn back and abort mining") return @@ -335,7 +336,7 @@ func (self *worker) push() { atomic.AddInt32(&self.atWork, 1) if agent.Work() != nil { - agent.Work() <- self.current + agent.Work() <- work } } } @@ -344,7 +345,7 @@ func (self *worker) push() { // makeCurrent creates a new environment for the current cycle. func (self *worker) makeCurrent(parent *types.Block, header *types.Header) { state := state.New(parent.Root(), self.eth.StateDb()) - current := &Work{ + work := &Work{ state: state, ancestors: set.New(), family: set.New(), @@ -357,23 +358,23 @@ func (self *worker) makeCurrent(parent *types.Block, header *types.Header) { // when 08 is processed ancestors contain 07 (quick block) for _, ancestor := range self.chain.GetBlocksFromHash(parent.Hash(), 7) { for _, uncle := range ancestor.Uncles() { - current.family.Add(uncle.Hash()) + work.family.Add(uncle.Hash()) } - current.family.Add(ancestor.Hash()) - current.ancestors.Add(ancestor.Hash()) + work.family.Add(ancestor.Hash()) + work.ancestors.Add(ancestor.Hash()) } accounts, _ := self.eth.AccountManager().Accounts() // Keep track of transactions which return errors so they can be removed - current.remove = set.New() - current.tcount = 0 - current.ignoredTransactors = set.New() - current.lowGasTransactors = set.New() - current.ownedAccounts = accountAddressesSet(accounts) + work.remove = set.New() + work.tcount = 0 + work.ignoredTransactors = set.New() + work.lowGasTransactors = set.New() + work.ownedAccounts = accountAddressesSet(accounts) if self.current != nil { - current.localMinedBlocks = self.current.localMinedBlocks + work.localMinedBlocks = self.current.localMinedBlocks } - self.current = current + self.current = work } func (w *worker) setGasPrice(p *big.Int) { @@ -387,13 +388,13 @@ func (w *worker) setGasPrice(p *big.Int) { w.mux.Post(core.GasPriceChanged{w.gasPrice}) } -func (self *worker) isBlockLocallyMined(deepBlockNum uint64) bool { +func (self *worker) isBlockLocallyMined(current *Work, deepBlockNum uint64) bool { //Did this instance mine a block at {deepBlockNum} ? var isLocal = false - for idx, blockNum := range self.current.localMinedBlocks.ints { + for idx, blockNum := range current.localMinedBlocks.ints { if deepBlockNum == blockNum { isLocal = true - self.current.localMinedBlocks.ints[idx] = 0 //prevent showing duplicate logs + current.localMinedBlocks.ints[idx] = 0 //prevent showing duplicate logs break } } @@ -407,12 +408,12 @@ func (self *worker) isBlockLocallyMined(deepBlockNum uint64) bool { return block != nil && block.Coinbase() == self.coinbase } -func (self *worker) logLocalMinedBlocks(previous *Work) { - if previous != nil && self.current.localMinedBlocks != nil { - nextBlockNum := self.current.Block.NumberU64() +func (self *worker) logLocalMinedBlocks(current, previous *Work) { + if previous != nil && current.localMinedBlocks != nil { + nextBlockNum := current.Block.NumberU64() for checkBlockNum := previous.Block.NumberU64(); checkBlockNum < nextBlockNum; checkBlockNum++ { inspectBlockNum := checkBlockNum - miningLogAtDepth - if self.isBlockLocallyMined(inspectBlockNum) { + if self.isBlockLocallyMined(current, inspectBlockNum) { glog.V(logger.Info).Infof("🔨 🔗 Mined %d blocks back: block #%v", miningLogAtDepth, inspectBlockNum) } } @@ -454,14 +455,14 @@ func (self *worker) commitNewWork() { previous := self.current self.makeCurrent(parent, header) - current := self.current + work := self.current // commit transactions for this run. transactions := self.eth.TxPool().GetTransactions() sort.Sort(types.TxByNonce{transactions}) - current.coinbase.SetGasLimit(header.GasLimit) - current.commitTransactions(transactions, self.gasPrice, self.proc) - self.eth.TxPool().RemoveTransactions(current.lowGasTxs) + work.coinbase.SetGasLimit(header.GasLimit) + work.commitTransactions(transactions, self.gasPrice, self.proc) + self.eth.TxPool().RemoveTransactions(work.lowGasTxs) // compute uncles for the new block. var ( @@ -472,7 +473,7 @@ func (self *worker) commitNewWork() { if len(uncles) == 2 { break } - if err := self.commitUncle(uncle.Header()); err != nil { + if err := self.commitUncle(work, uncle.Header()); err != nil { if glog.V(logger.Ridiculousness) { glog.V(logger.Detail).Infof("Bad uncle found and will be removed (%x)\n", hash[:4]) glog.V(logger.Detail).Infoln(uncle) @@ -489,36 +490,36 @@ func (self *worker) commitNewWork() { if atomic.LoadInt32(&self.mining) == 1 { // commit state root after all state transitions. - core.AccumulateRewards(self.current.state, header, uncles) - current.state.SyncObjects() - header.Root = current.state.Root() + core.AccumulateRewards(work.state, header, uncles) + work.state.SyncObjects() + header.Root = work.state.Root() } // create the new block whose nonce will be mined. - current.Block = types.NewBlock(header, current.txs, uncles, current.receipts) - self.current.Block.Td = new(big.Int).Set(core.CalcTD(self.current.Block, self.chain.GetBlock(self.current.Block.ParentHash()))) + work.Block = types.NewBlock(header, work.txs, uncles, work.receipts) + work.Block.Td = new(big.Int).Set(core.CalcTD(work.Block, self.chain.GetBlock(work.Block.ParentHash()))) // We only care about logging if we're actually mining. if atomic.LoadInt32(&self.mining) == 1 { - glog.V(logger.Info).Infof("commit new work on block %v with %d txs & %d uncles. Took %v\n", current.Block.Number(), current.tcount, len(uncles), time.Since(tstart)) - self.logLocalMinedBlocks(previous) + glog.V(logger.Info).Infof("commit new work on block %v with %d txs & %d uncles. Took %v\n", work.Block.Number(), work.tcount, len(uncles), time.Since(tstart)) + self.logLocalMinedBlocks(work, previous) } - self.push() + self.push(work) } -func (self *worker) commitUncle(uncle *types.Header) error { +func (self *worker) commitUncle(work *Work, uncle *types.Header) error { hash := uncle.Hash() - if self.current.uncles.Has(hash) { + if work.uncles.Has(hash) { return core.UncleError("Uncle not unique") } - if !self.current.ancestors.Has(uncle.ParentHash) { + if !work.ancestors.Has(uncle.ParentHash) { return core.UncleError(fmt.Sprintf("Uncle's parent unknown (%x)", uncle.ParentHash[0:4])) } - if self.current.family.Has(hash) { + if work.family.Has(hash) { return core.UncleError(fmt.Sprintf("Uncle already in family (%x)", hash)) } - self.current.uncles.Add(uncle.Hash()) + work.uncles.Add(uncle.Hash()) return nil }