miner: fix state commit, track old work packages too (#17490)

* miner: commit the state that corresponds to the sealing result

* consensus, core, miner, mobile: introduce sealHash interface

* miner: evict stale pending tasks beyond a threshold

* miner: go fmt
Authored by gary rong on 2018-08-23 21:02:57 +08:00; committed by Péter Szilágyi
parent c3f7e3be3b
commit 40a71f28cf
9 changed files with 112 additions and 73 deletions
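The central new concept is Engine.SealHash: the hash miners actually grind against, computed over the header with the nonce and mix digest left out, so a work package stays stable while nonces change. Below is a minimal standalone sketch of that idea, not the go-ethereum implementation itself: the header type is stripped down, and golang.org/x/crypto/sha3 plus the go-ethereum rlp package stand in for the vendored crypto/sha3 import used in the diff; names like presealHeader and sealHash are illustrative only.

package main

import (
    "fmt"
    "math/big"

    "github.com/ethereum/go-ethereum/rlp"
    "golang.org/x/crypto/sha3"
)

// presealHeader carries only the fields that feed the seal hash; the real
// types.Header also has MixDigest and Nonce, which are deliberately left out
// so the hash stays constant while miners grind nonces against it.
type presealHeader struct {
    ParentHash [32]byte
    Number     *big.Int
    Difficulty *big.Int
    Time       *big.Int
    Extra      []byte
}

// sealHash mirrors the idea behind Engine.SealHash: Keccak-256 over the RLP
// encoding of the pre-seal header fields.
func sealHash(h *presealHeader) (hash [32]byte) {
    hasher := sha3.NewLegacyKeccak256()
    rlp.Encode(hasher, []interface{}{
        h.ParentHash, h.Number, h.Difficulty, h.Time, h.Extra,
    })
    hasher.Sum(hash[:0])
    return hash
}

func main() {
    h := &presealHeader{Number: big.NewInt(1), Difficulty: big.NewInt(100), Time: big.NewInt(0)}
    fmt.Printf("sealhash: %x\n", sealHash(h))
}

The ethash SealHash added in the diff below hashes the full set of pre-seal header fields in exactly this fashion.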

consensus/clique/clique.go

@@ -673,6 +673,11 @@ func CalcDifficulty(snap *Snapshot, signer common.Address) *big.Int {
     return new(big.Int).Set(diffNoTurn)
 }
 
+// SealHash returns the hash of a block prior to it being sealed.
+func (c *Clique) SealHash(header *types.Header) common.Hash {
+    return sigHash(header)
+}
+
 // Close implements consensus.Engine. It's a noop for clique as there are no background threads.
 func (c *Clique) Close() error {
     return nil

consensus/consensus.go

@@ -90,6 +90,9 @@ type Engine interface {
     // seal place on top.
     Seal(chain ChainReader, block *types.Block, stop <-chan struct{}) (*types.Block, error)
 
+    // SealHash returns the hash of a block prior to it being sealed.
+    SealHash(header *types.Header) common.Hash
+
     // CalcDifficulty is the difficulty adjustment algorithm. It returns the difficulty
     // that a new block should have.
     CalcDifficulty(chain ChainReader, time uint64, parent *types.Header) *big.Int

consensus/ethash/consensus.go

@@ -31,7 +31,9 @@ import (
     "github.com/ethereum/go-ethereum/consensus/misc"
     "github.com/ethereum/go-ethereum/core/state"
     "github.com/ethereum/go-ethereum/core/types"
+    "github.com/ethereum/go-ethereum/crypto/sha3"
     "github.com/ethereum/go-ethereum/params"
+    "github.com/ethereum/go-ethereum/rlp"
 )
 
 // Ethash proof-of-work protocol constants.

@@ -495,7 +497,7 @@ func (ethash *Ethash) verifySeal(chain consensus.ChainReader, header *types.Head
     if fulldag {
         dataset := ethash.dataset(number, true)
         if dataset.generated() {
-            digest, result = hashimotoFull(dataset.dataset, header.HashNoNonce().Bytes(), header.Nonce.Uint64())
+            digest, result = hashimotoFull(dataset.dataset, ethash.SealHash(header).Bytes(), header.Nonce.Uint64())
 
             // Datasets are unmapped in a finalizer. Ensure that the dataset stays alive
             // until after the call to hashimotoFull so it's not unmapped while being used.

@@ -513,7 +515,7 @@ func (ethash *Ethash) verifySeal(chain consensus.ChainReader, header *types.Head
         if ethash.config.PowMode == ModeTest {
             size = 32 * 1024
         }
-        digest, result = hashimotoLight(size, cache.cache, header.HashNoNonce().Bytes(), header.Nonce.Uint64())
+        digest, result = hashimotoLight(size, cache.cache, ethash.SealHash(header).Bytes(), header.Nonce.Uint64())
 
         // Caches are unmapped in a finalizer. Ensure that the cache stays alive
         // until after the call to hashimotoLight so it's not unmapped while being used.

@@ -552,6 +554,29 @@ func (ethash *Ethash) Finalize(chain consensus.ChainReader, header *types.Header
     return types.NewBlock(header, txs, uncles, receipts), nil
 }
 
+// SealHash returns the hash of a block prior to it being sealed.
+func (ethash *Ethash) SealHash(header *types.Header) (hash common.Hash) {
+    hasher := sha3.NewKeccak256()
+
+    rlp.Encode(hasher, []interface{}{
+        header.ParentHash,
+        header.UncleHash,
+        header.Coinbase,
+        header.Root,
+        header.TxHash,
+        header.ReceiptHash,
+        header.Bloom,
+        header.Difficulty,
+        header.Number,
+        header.GasLimit,
+        header.GasUsed,
+        header.Time,
+        header.Extra,
+    })
+    hasher.Sum(hash[:0])
+    return hash
+}
+
 // Some weird constants to avoid constant memory allocs for them.
 var (
     big8  = big.NewInt(8)

consensus/ethash/ethash_test.go

@@ -94,6 +94,7 @@ func TestRemoteSealer(t *testing.T) {
     }
     header := &types.Header{Number: big.NewInt(1), Difficulty: big.NewInt(100)}
     block := types.NewBlockWithHeader(header)
+    sealhash := ethash.SealHash(header)
 
     // Push new work.
     ethash.Seal(nil, block, nil)

@@ -102,27 +103,29 @@
         work [3]string
         err  error
     )
-    if work, err = api.GetWork(); err != nil || work[0] != block.HashNoNonce().Hex() {
+    if work, err = api.GetWork(); err != nil || work[0] != sealhash.Hex() {
         t.Error("expect to return a mining work has same hash")
     }
 
-    if res := api.SubmitWork(types.BlockNonce{}, block.HashNoNonce(), common.Hash{}); res {
+    if res := api.SubmitWork(types.BlockNonce{}, sealhash, common.Hash{}); res {
         t.Error("expect to return false when submit a fake solution")
     }
     // Push new block with same block number to replace the original one.
     header = &types.Header{Number: big.NewInt(1), Difficulty: big.NewInt(1000)}
     block = types.NewBlockWithHeader(header)
+    sealhash = ethash.SealHash(header)
 
     ethash.Seal(nil, block, nil)
-    if work, err = api.GetWork(); err != nil || work[0] != block.HashNoNonce().Hex() {
+    if work, err = api.GetWork(); err != nil || work[0] != sealhash.Hex() {
         t.Error("expect to return the latest pushed work")
     }
     // Push block with higher block number.
     newHead := &types.Header{Number: big.NewInt(2), Difficulty: big.NewInt(100)}
     newBlock := types.NewBlockWithHeader(newHead)
+    newSealhash := ethash.SealHash(newHead)
 
     ethash.Seal(nil, newBlock, nil)
-    if res := api.SubmitWork(types.BlockNonce{}, block.HashNoNonce(), common.Hash{}); res {
+    if res := api.SubmitWork(types.BlockNonce{}, newSealhash, common.Hash{}); res {
         t.Error("expect to return false when submit a stale solution")
     }
 }

consensus/ethash/sealer.go

@@ -111,7 +111,7 @@ func (ethash *Ethash) mine(block *types.Block, id int, seed uint64, abort chan s
     // Extract some data from the header
     var (
         header  = block.Header()
-        hash    = header.HashNoNonce().Bytes()
+        hash    = ethash.SealHash(header).Bytes()
         target  = new(big.Int).Div(two256, header.Difficulty)
         number  = header.Number.Uint64()
         dataset = ethash.dataset(number, false)

@@ -213,7 +213,7 @@ func (ethash *Ethash) remote(notify []string) {
     //   result[1], 32 bytes hex encoded seed hash used for DAG
     //   result[2], 32 bytes hex encoded boundary condition ("target"), 2^256/difficulty
     makeWork := func(block *types.Block) {
-        hash := block.HashNoNonce()
+        hash := ethash.SealHash(block.Header())
 
         currentWork[0] = hash.Hex()
         currentWork[1] = common.BytesToHash(SeedHash(block.NumberU64())).Hex()
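As the comments above note, the third work-package field is the boundary condition ("target"), 2^256 divided by the header difficulty; a sealing result is valid when, read as a big-endian integer, it falls below that boundary. A small standalone sketch of just that computation (standard library only; the target function name is illustrative, not go-ethereum's):

package main

import (
    "fmt"
    "math/big"
)

// target returns the 256-bit boundary condition for a given difficulty:
// a proof-of-work result is valid when it is <= 2^256 / difficulty.
func target(difficulty *big.Int) *big.Int {
    two256 := new(big.Int).Exp(big.NewInt(2), big.NewInt(256), nil)
    return new(big.Int).Div(two256, difficulty)
}

func main() {
    // A difficulty of 100 leaves roughly a 1-in-100 chance per hash attempt.
    fmt.Printf("target: %064x\n", target(big.NewInt(100)))
}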

consensus/ethash/sealer_test.go

@@ -51,7 +51,7 @@ func TestRemoteNotify(t *testing.T) {
     ethash.Seal(nil, block, nil)
     select {
     case work := <-sink:
-        if want := header.HashNoNonce().Hex(); work[0] != want {
+        if want := ethash.SealHash(header).Hex(); work[0] != want {
             t.Errorf("work packet hash mismatch: have %s, want %s", work[0], want)
         }
         if want := common.BytesToHash(SeedHash(header.Number.Uint64())).Hex(); work[1] != want {

core/types/block.go

@@ -102,25 +102,6 @@ func (h *Header) Hash() common.Hash {
     return rlpHash(h)
 }
 
-// HashNoNonce returns the hash which is used as input for the proof-of-work search.
-func (h *Header) HashNoNonce() common.Hash {
-    return rlpHash([]interface{}{
-        h.ParentHash,
-        h.UncleHash,
-        h.Coinbase,
-        h.Root,
-        h.TxHash,
-        h.ReceiptHash,
-        h.Bloom,
-        h.Difficulty,
-        h.Number,
-        h.GasLimit,
-        h.GasUsed,
-        h.Time,
-        h.Extra,
-    })
-}
-
 // Size returns the approximate memory used by all internal contents. It is used
 // to approximate and limit the memory consumption of various caches.
 func (h *Header) Size() common.StorageSize {

@@ -324,10 +305,6 @@ func (b *Block) Header() *Header { return CopyHeader(b.header) }
 // Body returns the non-header content of the block.
 func (b *Block) Body() *Body { return &Body{b.transactions, b.uncles} }
 
-func (b *Block) HashNoNonce() common.Hash {
-    return b.header.HashNoNonce()
-}
-
 // Size returns the true RLP encoded storage size of the block, either by encoding
 // and returning it, or returning a previously cached value.
 func (b *Block) Size() common.StorageSize {

miner/worker.go

@@ -72,6 +72,9 @@ const (
     // intervalAdjustBias is applied during the new resubmit interval calculation in favor of
     // increasing upper limit or decreasing lower limit so that the limit can be reachable.
     intervalAdjustBias = 200 * 1000.0 * 1000.0
+
+    // staleThreshold is the maximum distance of the acceptable stale block.
+    staleThreshold = 7
 )
 
 // environment is the worker's current environment and holds all of the current state information.

@@ -150,6 +153,9 @@ type worker struct {
     coinbase common.Address
     extra    []byte
 
+    pendingMu    sync.RWMutex
+    pendingTasks map[common.Hash]*task
+
     snapshotMu    sync.RWMutex // The lock used to protect the block snapshot and state snapshot
     snapshotBlock *types.Block
     snapshotState *state.StateDB

@@ -174,6 +180,7 @@ func newWorker(config *params.ChainConfig, engine consensus.Engine, eth Backend,
         chain:          eth.BlockChain(),
         possibleUncles: make(map[common.Hash]*types.Block),
         unconfirmed:    newUnconfirmedBlocks(eth.BlockChain(), miningLogAtDepth),
+        pendingTasks:   make(map[common.Hash]*task),
         txsCh:          make(chan core.NewTxsEvent, txChanSize),
         chainHeadCh:    make(chan core.ChainHeadEvent, chainHeadChanSize),
         chainSideCh:    make(chan core.ChainSideEvent, chainSideChanSize),

@@ -317,13 +324,25 @@ func (w *worker) newWorkLoop(recommit time.Duration) {
         }
         recommit = time.Duration(int64(next))
     }
+    // clearPending cleans the stale pending tasks.
+    clearPending := func(number uint64) {
+        w.pendingMu.Lock()
+        for h, t := range w.pendingTasks {
+            if t.block.NumberU64()+staleThreshold <= number {
+                delete(w.pendingTasks, h)
+            }
+        }
+        w.pendingMu.Unlock()
+    }
 
     for {
         select {
         case <-w.startCh:
+            clearPending(w.chain.CurrentBlock().NumberU64())
             commit(false, commitInterruptNewHead)
 
-        case <-w.chainHeadCh:
+        case head := <-w.chainHeadCh:
+            clearPending(head.Block.NumberU64())
             commit(false, commitInterruptNewHead)
 
         case <-timer.C:

@@ -454,29 +473,38 @@ func (w *worker) mainLoop() {
 // seal pushes a sealing task to consensus engine and submits the result.
 func (w *worker) seal(t *task, stop <-chan struct{}) {
-    var (
-        err error
-        res *task
-    )
     if w.skipSealHook != nil && w.skipSealHook(t) {
         return
     }
+    // The reason for caching task first is:
+    // A previous sealing action will be canceled by subsequent actions,
+    // however, remote miner may submit a result based on the cancelled task.
+    // So we should only submit the pending state corresponding to the seal result.
+    // TODO(rjl493456442) Replace the seal-wait logic structure
+    w.pendingMu.Lock()
+    w.pendingTasks[w.engine.SealHash(t.block.Header())] = t
+    w.pendingMu.Unlock()
 
-    if t.block, err = w.engine.Seal(w.chain, t.block, stop); t.block != nil {
-        log.Info("Successfully sealed new block", "number", t.block.Number(), "hash", t.block.Hash(),
-            "elapsed", common.PrettyDuration(time.Since(t.createdAt)))
-        res = t
-    } else {
-        if err != nil {
-            log.Warn("Block sealing failed", "err", err)
-        }
-        res = nil
-    }
-    select {
-    case w.resultCh <- res:
-    case <-w.exitCh:
+    if block, err := w.engine.Seal(w.chain, t.block, stop); block != nil {
+        sealhash := w.engine.SealHash(block.Header())
+        w.pendingMu.RLock()
+        task, exist := w.pendingTasks[sealhash]
+        w.pendingMu.RUnlock()
+        if !exist {
+            log.Error("Block found but no relative pending task", "number", block.Number(), "sealhash", sealhash, "hash", block.Hash())
+            return
+        }
+        // Assemble sealing result
+        task.block = block
+        log.Info("Successfully sealed new block", "number", block.Number(), "sealhash", sealhash, "hash", block.Hash(),
+            "elapsed", common.PrettyDuration(time.Since(task.createdAt)))
+        select {
+        case w.resultCh <- task:
+        case <-w.exitCh:
+        }
+    } else if err != nil {
+        log.Warn("Block sealing failed", "err", err)
     }
 }
 
 // taskLoop is a standalone goroutine to fetch sealing task from the generator and

@@ -501,12 +529,13 @@ func (w *worker) taskLoop() {
             if w.newTaskHook != nil {
                 w.newTaskHook(task)
             }
             // Reject duplicate sealing work due to resubmitting.
-            if task.block.HashNoNonce() == prev {
+            sealHash := w.engine.SealHash(task.block.Header())
+            if sealHash == prev {
                 continue
             }
             interrupt()
             stopCh = make(chan struct{})
-            prev = task.block.HashNoNonce()
+            prev = sealHash
             go w.seal(task, stopCh)
         case <-w.exitCh:
             interrupt()

@@ -928,8 +957,8 @@ func (w *worker) commit(uncles []*types.Header, interval func(), update bool, st
         }
         feesEth := new(big.Float).Quo(new(big.Float).SetInt(feesWei), new(big.Float).SetInt(big.NewInt(params.Ether)))
 
-        log.Info("Commit new mining work", "number", block.Number(), "uncles", len(uncles), "txs", w.current.tcount,
-            "gas", block.GasUsed(), "fees", feesEth, "elapsed", common.PrettyDuration(time.Since(start)))
+        log.Info("Commit new mining work", "number", block.Number(), "sealhash", w.engine.SealHash(block.Header()),
+            "uncles", len(uncles), "txs", w.current.tcount, "gas", block.GasUsed(), "fees", feesEth, "elapsed", common.PrettyDuration(time.Since(start)))
 
     case <-w.exitCh:
         log.Info("Worker has exited")
mobile/types.go

@@ -183,10 +183,7 @@ func (b *Block) GetTime() int64 { return b.block.Time().Int64() }
 func (b *Block) GetExtra() []byte { return b.block.Extra() }
 func (b *Block) GetMixDigest() *Hash { return &Hash{b.block.MixDigest()} }
 func (b *Block) GetNonce() int64 { return int64(b.block.Nonce()) }
 func (b *Block) GetHash() *Hash { return &Hash{b.block.Hash()} }
-func (b *Block) GetHashNoNonce() *Hash { return &Hash{b.block.HashNoNonce()} }
 func (b *Block) GetHeader() *Header { return &Header{b.block.Header()} }
 func (b *Block) GetUncles() *Headers { return &Headers{b.block.Uncles()} }
 func (b *Block) GetTransactions() *Transactions { return &Transactions{b.block.Transactions()} }