Queued approach to delivering chain events

This commit is contained in:
obscuren 2015-03-06 15:50:44 +01:00
parent b72ca57775
commit 8d9be18b29
4 changed files with 86 additions and 36 deletions

View File

@ -19,11 +19,6 @@ var (
jsonlogger = logger.NewJsonLogger() jsonlogger = logger.NewJsonLogger()
) )
type ChainEvent struct {
Block *types.Block
Td *big.Int
}
type StateQuery interface { type StateQuery interface {
GetAccount(addr []byte) *state.StateObject GetAccount(addr []byte) *state.StateObject
} }
@ -93,13 +88,16 @@ type ChainManager struct {
transState *state.StateDB transState *state.StateDB
txState *state.StateDB txState *state.StateDB
quit chan struct{}
} }
func NewChainManager(db ethutil.Database, mux *event.TypeMux) *ChainManager { func NewChainManager(db ethutil.Database, mux *event.TypeMux) *ChainManager {
bc := &ChainManager{db: db, genesisBlock: GenesisBlock(db), eventMux: mux} bc := &ChainManager{db: db, genesisBlock: GenesisBlock(db), eventMux: mux, quit: make(chan struct{})}
bc.setLastBlock() bc.setLastBlock()
bc.transState = bc.State().Copy() bc.transState = bc.State().Copy()
bc.txState = bc.State().Copy() bc.txState = bc.State().Copy()
go bc.update()
return bc return bc
} }
@ -388,16 +386,24 @@ func (self *ChainManager) CalcTotalDiff(block *types.Block) (*big.Int, error) {
} }
func (bc *ChainManager) Stop() { func (bc *ChainManager) Stop() {
if bc.CurrentBlock != nil { close(bc.quit)
chainlogger.Infoln("Stopped") }
}
type queueEvent struct {
queue []interface{}
canonicalCount int
sideCount int
splitCount int
} }
func (self *ChainManager) InsertChain(chain types.Blocks) error { func (self *ChainManager) InsertChain(chain types.Blocks) error {
self.tsmu.Lock() //self.tsmu.Lock()
defer self.tsmu.Unlock() //defer self.tsmu.Unlock()
for _, block := range chain { // A queued approach to delivering events. This is generally faster than direct delivery and requires much less mutex acquiring.
var queue = make([]interface{}, len(chain))
var queueEvent = queueEvent{queue: queue}
for i, block := range chain {
// Call in to the block processor and check for errors. It's likely that if one block fails // Call in to the block processor and check for errors. It's likely that if one block fails
// all others will fail too (unless a known block is returned). // all others will fail too (unless a known block is returned).
td, err := self.processor.Process(block) td, err := self.processor.Process(block)
@ -414,7 +420,6 @@ func (self *ChainManager) InsertChain(chain types.Blocks) error {
} }
block.Td = td block.Td = td
var canonical, split bool
self.mu.Lock() self.mu.Lock()
cblock := self.currentBlock cblock := self.currentBlock
{ {
@ -426,41 +431,75 @@ func (self *ChainManager) InsertChain(chain types.Blocks) error {
if td.Cmp(self.td) > 0 { if td.Cmp(self.td) > 0 {
if block.Header().Number.Cmp(new(big.Int).Add(cblock.Header().Number, ethutil.Big1)) < 0 { if block.Header().Number.Cmp(new(big.Int).Add(cblock.Header().Number, ethutil.Big1)) < 0 {
chainlogger.Infof("Split detected. New head #%v (%x) TD=%v, was #%v (%x) TD=%v\n", block.Header().Number, block.Hash()[:4], td, cblock.Header().Number, cblock.Hash()[:4], self.td) chainlogger.Infof("Split detected. New head #%v (%x) TD=%v, was #%v (%x) TD=%v\n", block.Header().Number, block.Hash()[:4], td, cblock.Header().Number, cblock.Hash()[:4], self.td)
split = true
queue[i] = ChainSplitEvent{block}
queueEvent.splitCount++
} }
self.setTotalDifficulty(td) self.setTotalDifficulty(td)
self.insert(block) self.insert(block)
canonical = true /*
jsonlogger.LogJson(&logger.EthChainNewHead{
BlockHash: ethutil.Bytes2Hex(block.Hash()),
BlockNumber: block.Number(),
ChainHeadHash: ethutil.Bytes2Hex(cblock.Hash()),
BlockPrevHash: ethutil.Bytes2Hex(block.ParentHash()),
})
*/
self.setTransState(state.New(block.Root(), self.db))
queue[i] = ChainEvent{block}
queueEvent.canonicalCount++
} else {
queue[i] = ChainSideEvent{block}
queueEvent.sideCount++
} }
} }
self.mu.Unlock() self.mu.Unlock()
if canonical {
/*
jsonlogger.LogJson(&logger.EthChainNewHead{
BlockHash: ethutil.Bytes2Hex(block.Hash()),
BlockNumber: block.Number(),
ChainHeadHash: ethutil.Bytes2Hex(cblock.Hash()),
BlockPrevHash: ethutil.Bytes2Hex(block.ParentHash()),
})
*/
self.setTransState(state.New(block.Root(), self.db))
self.eventMux.Post(ChainEvent{block, td})
} else {
//self.eventMux.
}
if split {
self.setTxState(state.New(block.Root(), self.db))
self.eventMux.Post(ChainSplitEvent{block})
}
} }
// XXX put this in a goroutine?
go self.eventMux.Post(queueEvent)
return nil return nil
} }
func (self *ChainManager) update() {
events := self.eventMux.Subscribe(queueEvent{})
out:
for {
select {
case ev := <-events.Chan():
switch ev := ev.(type) {
case queueEvent:
for i, event := range ev.queue {
switch event := event.(type) {
case ChainEvent:
// We need some control over the mining operation. Acquiring locks and waiting for the miner to create new block takes too long
// and in most cases isn't even necessary.
if i == ev.canonicalCount {
self.eventMux.Post(ChainHeadEvent{event.Block})
}
case ChainSplitEvent:
// On chain splits we need to reset the transaction state. We can't be sure whether the actual
// state of the accounts are still valid.
if i == ev.splitCount {
self.setTxState(state.New(event.Block.Root(), self.db))
}
}
self.eventMux.Post(event)
}
}
case <-self.quit:
break out
}
}
}
// Satisfy state query interface // Satisfy state query interface
func (self *ChainManager) GetAccount(addr []byte) *state.StateObject { func (self *ChainManager) GetAccount(addr []byte) *state.StateObject {
return self.State().GetAccount(addr) return self.State().GetAccount(addr)

View File

@ -16,3 +16,13 @@ type NewMinedBlockEvent struct{ Block *types.Block }
// ChainSplit is posted when a new head is detected // ChainSplit is posted when a new head is detected
type ChainSplitEvent struct{ Block *types.Block } type ChainSplitEvent struct{ Block *types.Block }
type ChainEvent struct{ Block *types.Block }
type ChainSideEvent struct{ Block *types.Block }
type ChainHeadEvent struct{ Block *types.Block }
// Mining operation events
type StartMining struct{}
type TopMining struct{}

View File

@ -30,6 +30,7 @@ func New(coinbase []byte, eth core.Backend, pow pow.PoW, minerThreads int) *Mine
pow: pow, pow: pow,
} }
minerThreads = 1
for i := 0; i < minerThreads; i++ { for i := 0; i < minerThreads; i++ {
miner.worker.register(NewCpuMiner(i, miner.pow)) miner.worker.register(NewCpuMiner(i, miner.pow))
} }

View File

@ -116,7 +116,7 @@ func (self *worker) register(agent Agent) {
} }
func (self *worker) update() { func (self *worker) update() {
events := self.mux.Subscribe(core.ChainEvent{}, core.NewMinedBlockEvent{}) events := self.mux.Subscribe(core.ChainHeadEvent{}, core.NewMinedBlockEvent{})
timer := time.NewTicker(2 * time.Second) timer := time.NewTicker(2 * time.Second)
@ -125,7 +125,7 @@ out:
select { select {
case event := <-events.Chan(): case event := <-events.Chan():
switch ev := event.(type) { switch ev := event.(type) {
case core.ChainEvent: case core.ChainHeadEvent:
if self.current.block != ev.Block { if self.current.block != ev.Block {
self.commitNewWork() self.commitNewWork()
} }