fixed chain event. Closes #529
This commit is contained in:
		
							parent
							
								
									b52807f0c0
								
							
						
					
					
						commit
						14a2f42f37
					
				| @ -7,8 +7,8 @@ import ( | |||||||
| 	"sync" | 	"sync" | ||||||
| 	"time" | 	"time" | ||||||
| 
 | 
 | ||||||
| 	"github.com/ethereum/go-ethereum/core/types" |  | ||||||
| 	"github.com/ethereum/go-ethereum/common" | 	"github.com/ethereum/go-ethereum/common" | ||||||
|  | 	"github.com/ethereum/go-ethereum/core/types" | ||||||
| 	"github.com/ethereum/go-ethereum/event" | 	"github.com/ethereum/go-ethereum/event" | ||||||
| 	"github.com/ethereum/go-ethereum/logger" | 	"github.com/ethereum/go-ethereum/logger" | ||||||
| 	"github.com/ethereum/go-ethereum/pow" | 	"github.com/ethereum/go-ethereum/pow" | ||||||
| @ -17,10 +17,6 @@ import ( | |||||||
| 	"gopkg.in/fatih/set.v0" | 	"gopkg.in/fatih/set.v0" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| type PendingBlockEvent struct { |  | ||||||
| 	Block *types.Block |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| var statelogger = logger.NewLogger("BLOCK") | var statelogger = logger.NewLogger("BLOCK") | ||||||
| 
 | 
 | ||||||
| type BlockProcessor struct { | type BlockProcessor struct { | ||||||
| @ -137,7 +133,7 @@ func (self *BlockProcessor) ApplyTransactions(coinbase *state.StateObject, state | |||||||
| 	block.Header().GasUsed = totalUsedGas | 	block.Header().GasUsed = totalUsedGas | ||||||
| 
 | 
 | ||||||
| 	if transientProcess { | 	if transientProcess { | ||||||
| 		go self.eventMux.Post(PendingBlockEvent{block}) | 		go self.eventMux.Post(PendingBlockEvent{block, statedb.Logs()}) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	return receipts, handled, unhandled, erroneous, err | 	return receipts, handled, unhandled, erroneous, err | ||||||
| @ -146,25 +142,25 @@ func (self *BlockProcessor) ApplyTransactions(coinbase *state.StateObject, state | |||||||
| // Process block will attempt to process the given block's transactions and applies them
 | // Process block will attempt to process the given block's transactions and applies them
 | ||||||
| // on top of the block's parent state (given it exists) and will return wether it was
 | // on top of the block's parent state (given it exists) and will return wether it was
 | ||||||
| // successful or not.
 | // successful or not.
 | ||||||
| func (sm *BlockProcessor) Process(block *types.Block) (td *big.Int, err error) { | func (sm *BlockProcessor) Process(block *types.Block) (td *big.Int, logs state.Logs, err error) { | ||||||
| 	// Processing a blocks may never happen simultaneously
 | 	// Processing a blocks may never happen simultaneously
 | ||||||
| 	sm.mutex.Lock() | 	sm.mutex.Lock() | ||||||
| 	defer sm.mutex.Unlock() | 	defer sm.mutex.Unlock() | ||||||
| 
 | 
 | ||||||
| 	header := block.Header() | 	header := block.Header() | ||||||
| 	if sm.bc.HasBlock(header.Hash()) { | 	if sm.bc.HasBlock(header.Hash()) { | ||||||
| 		return nil, &KnownBlockError{header.Number, header.Hash()} | 		return nil, nil, &KnownBlockError{header.Number, header.Hash()} | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	if !sm.bc.HasBlock(header.ParentHash) { | 	if !sm.bc.HasBlock(header.ParentHash) { | ||||||
| 		return nil, ParentError(header.ParentHash) | 		return nil, nil, ParentError(header.ParentHash) | ||||||
| 	} | 	} | ||||||
| 	parent := sm.bc.GetBlock(header.ParentHash) | 	parent := sm.bc.GetBlock(header.ParentHash) | ||||||
| 
 | 
 | ||||||
| 	return sm.processWithParent(block, parent) | 	return sm.processWithParent(block, parent) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (sm *BlockProcessor) processWithParent(block, parent *types.Block) (td *big.Int, err error) { | func (sm *BlockProcessor) processWithParent(block, parent *types.Block) (td *big.Int, logs state.Logs, err error) { | ||||||
| 	sm.lastAttemptedBlock = block | 	sm.lastAttemptedBlock = block | ||||||
| 
 | 
 | ||||||
| 	// Create a new state based on the parent's root (e.g., create copy)
 | 	// Create a new state based on the parent's root (e.g., create copy)
 | ||||||
| @ -177,7 +173,7 @@ func (sm *BlockProcessor) processWithParent(block, parent *types.Block) (td *big | |||||||
| 
 | 
 | ||||||
| 	// There can be at most two uncles
 | 	// There can be at most two uncles
 | ||||||
| 	if len(block.Uncles()) > 2 { | 	if len(block.Uncles()) > 2 { | ||||||
| 		return nil, ValidationError("Block can only contain one uncle (contained %v)", len(block.Uncles())) | 		return nil, nil, ValidationError("Block can only contain one uncle (contained %v)", len(block.Uncles())) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	receipts, err := sm.TransitionState(state, parent, block, false) | 	receipts, err := sm.TransitionState(state, parent, block, false) | ||||||
| @ -236,7 +232,7 @@ func (sm *BlockProcessor) processWithParent(block, parent *types.Block) (td *big | |||||||
| 
 | 
 | ||||||
| 	chainlogger.Infof("processed block #%d (%x...)\n", header.Number, block.Hash()[0:4]) | 	chainlogger.Infof("processed block #%d (%x...)\n", header.Number, block.Hash()[0:4]) | ||||||
| 
 | 
 | ||||||
| 	return td, nil | 	return td, state.Logs(), nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // Validates the current block. Returns an error if the block was invalid,
 | // Validates the current block. Returns an error if the block was invalid,
 | ||||||
|  | |||||||
| @ -4,8 +4,8 @@ import ( | |||||||
| 	"fmt" | 	"fmt" | ||||||
| 	"math/big" | 	"math/big" | ||||||
| 
 | 
 | ||||||
| 	"github.com/ethereum/go-ethereum/core/types" |  | ||||||
| 	"github.com/ethereum/go-ethereum/common" | 	"github.com/ethereum/go-ethereum/common" | ||||||
|  | 	"github.com/ethereum/go-ethereum/core/types" | ||||||
| 	"github.com/ethereum/go-ethereum/event" | 	"github.com/ethereum/go-ethereum/event" | ||||||
| 	"github.com/ethereum/go-ethereum/pow" | 	"github.com/ethereum/go-ethereum/pow" | ||||||
| 	"github.com/ethereum/go-ethereum/state" | 	"github.com/ethereum/go-ethereum/state" | ||||||
| @ -93,7 +93,7 @@ func makeChain(bman *BlockProcessor, parent *types.Block, max int, db common.Dat | |||||||
| 	blocks := make(types.Blocks, max) | 	blocks := make(types.Blocks, max) | ||||||
| 	for i := 0; i < max; i++ { | 	for i := 0; i < max; i++ { | ||||||
| 		block := makeBlock(bman, parent, i, db, seed) | 		block := makeBlock(bman, parent, i, db, seed) | ||||||
| 		td, err := bman.processWithParent(block, parent) | 		td, _, err := bman.processWithParent(block, parent) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			fmt.Println("process with parent failed", err) | 			fmt.Println("process with parent failed", err) | ||||||
| 			panic(err) | 			panic(err) | ||||||
|  | |||||||
| @ -6,8 +6,8 @@ import ( | |||||||
| 	"math/big" | 	"math/big" | ||||||
| 	"sync" | 	"sync" | ||||||
| 
 | 
 | ||||||
| 	"github.com/ethereum/go-ethereum/core/types" |  | ||||||
| 	"github.com/ethereum/go-ethereum/common" | 	"github.com/ethereum/go-ethereum/common" | ||||||
|  | 	"github.com/ethereum/go-ethereum/core/types" | ||||||
| 	"github.com/ethereum/go-ethereum/event" | 	"github.com/ethereum/go-ethereum/event" | ||||||
| 	"github.com/ethereum/go-ethereum/logger" | 	"github.com/ethereum/go-ethereum/logger" | ||||||
| 	"github.com/ethereum/go-ethereum/rlp" | 	"github.com/ethereum/go-ethereum/rlp" | ||||||
| @ -411,7 +411,7 @@ func (self *ChainManager) InsertChain(chain types.Blocks) error { | |||||||
| 	for i, block := range chain { | 	for i, block := range chain { | ||||||
| 		// Call in to the block processor and check for errors. It's likely that if one block fails
 | 		// Call in to the block processor and check for errors. It's likely that if one block fails
 | ||||||
| 		// all others will fail too (unless a known block is returned).
 | 		// all others will fail too (unless a known block is returned).
 | ||||||
| 		td, err := self.processor.Process(block) | 		td, logs, err := self.processor.Process(block) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			if IsKnownBlockErr(err) { | 			if IsKnownBlockErr(err) { | ||||||
| 				continue | 				continue | ||||||
| @ -437,7 +437,7 @@ func (self *ChainManager) InsertChain(chain types.Blocks) error { | |||||||
| 				if block.Header().Number.Cmp(new(big.Int).Add(cblock.Header().Number, common.Big1)) < 0 { | 				if block.Header().Number.Cmp(new(big.Int).Add(cblock.Header().Number, common.Big1)) < 0 { | ||||||
| 					chainlogger.Infof("Split detected. New head #%v (%x) TD=%v, was #%v (%x) TD=%v\n", block.Header().Number, block.Hash()[:4], td, cblock.Header().Number, cblock.Hash()[:4], self.td) | 					chainlogger.Infof("Split detected. New head #%v (%x) TD=%v, was #%v (%x) TD=%v\n", block.Header().Number, block.Hash()[:4], td, cblock.Header().Number, cblock.Hash()[:4], self.td) | ||||||
| 
 | 
 | ||||||
| 					queue[i] = ChainSplitEvent{block} | 					queue[i] = ChainSplitEvent{block, logs} | ||||||
| 					queueEvent.splitCount++ | 					queueEvent.splitCount++ | ||||||
| 				} | 				} | ||||||
| 
 | 
 | ||||||
| @ -456,10 +456,10 @@ func (self *ChainManager) InsertChain(chain types.Blocks) error { | |||||||
| 				self.setTransState(state.New(block.Root(), self.stateDb)) | 				self.setTransState(state.New(block.Root(), self.stateDb)) | ||||||
| 				self.setTxState(state.New(block.Root(), self.stateDb)) | 				self.setTxState(state.New(block.Root(), self.stateDb)) | ||||||
| 
 | 
 | ||||||
| 				queue[i] = ChainEvent{block} | 				queue[i] = ChainEvent{block, logs} | ||||||
| 				queueEvent.canonicalCount++ | 				queueEvent.canonicalCount++ | ||||||
| 			} else { | 			} else { | ||||||
| 				queue[i] = ChainSideEvent{block} | 				queue[i] = ChainSideEvent{block, logs} | ||||||
| 				queueEvent.sideCount++ | 				queueEvent.sideCount++ | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
|  | |||||||
| @ -1,6 +1,9 @@ | |||||||
| package core | package core | ||||||
| 
 | 
 | ||||||
| import "github.com/ethereum/go-ethereum/core/types" | import ( | ||||||
|  | 	"github.com/ethereum/go-ethereum/core/types" | ||||||
|  | 	"github.com/ethereum/go-ethereum/state" | ||||||
|  | ) | ||||||
| 
 | 
 | ||||||
| // TxPreEvent is posted when a transaction enters the transaction pool.
 | // TxPreEvent is posted when a transaction enters the transaction pool.
 | ||||||
| type TxPreEvent struct{ Tx *types.Transaction } | type TxPreEvent struct{ Tx *types.Transaction } | ||||||
| @ -15,11 +18,25 @@ type NewBlockEvent struct{ Block *types.Block } | |||||||
| type NewMinedBlockEvent struct{ Block *types.Block } | type NewMinedBlockEvent struct{ Block *types.Block } | ||||||
| 
 | 
 | ||||||
| // ChainSplit is posted when a new head is detected
 | // ChainSplit is posted when a new head is detected
 | ||||||
| type ChainSplitEvent struct{ Block *types.Block } | type ChainSplitEvent struct { | ||||||
|  | 	Block *types.Block | ||||||
|  | 	Logs  state.Logs | ||||||
|  | } | ||||||
| 
 | 
 | ||||||
| type ChainEvent struct{ Block *types.Block } | type ChainEvent struct { | ||||||
|  | 	Block *types.Block | ||||||
|  | 	Logs  state.Logs | ||||||
|  | } | ||||||
| 
 | 
 | ||||||
| type ChainSideEvent struct{ Block *types.Block } | type ChainSideEvent struct { | ||||||
|  | 	Block *types.Block | ||||||
|  | 	Logs  state.Logs | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | type PendingBlockEvent struct { | ||||||
|  | 	Block *types.Block | ||||||
|  | 	Logs  state.Logs | ||||||
|  | } | ||||||
| 
 | 
 | ||||||
| type ChainHeadEvent struct{ Block *types.Block } | type ChainHeadEvent struct{ Block *types.Block } | ||||||
| 
 | 
 | ||||||
|  | |||||||
| @ -33,8 +33,8 @@ type Filter struct { | |||||||
| 	max      int | 	max      int | ||||||
| 	topics   [][][]byte | 	topics   [][][]byte | ||||||
| 
 | 
 | ||||||
| 	BlockCallback   func(*types.Block) | 	BlockCallback   func(*types.Block, state.Logs) | ||||||
| 	PendingCallback func(*types.Block) | 	PendingCallback func(*types.Block, state.Logs) | ||||||
| 	LogsCallback    func(state.Logs) | 	LogsCallback    func(state.Logs) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | |||||||
| @ -1,7 +1,11 @@ | |||||||
| package types | package types | ||||||
| 
 | 
 | ||||||
| import "math/big" | import ( | ||||||
|  | 	"math/big" | ||||||
|  | 
 | ||||||
|  | 	"github.com/ethereum/go-ethereum/state" | ||||||
|  | ) | ||||||
| 
 | 
 | ||||||
| type BlockProcessor interface { | type BlockProcessor interface { | ||||||
| 	Process(*Block) (*big.Int, error) | 	Process(*Block) (*big.Int, state.Logs, error) | ||||||
| } | } | ||||||
|  | |||||||
| @ -63,7 +63,7 @@ func (self *FilterManager) filterLoop() { | |||||||
| 	// Subscribe to events
 | 	// Subscribe to events
 | ||||||
| 	events := self.eventMux.Subscribe( | 	events := self.eventMux.Subscribe( | ||||||
| 		core.PendingBlockEvent{}, | 		core.PendingBlockEvent{}, | ||||||
| 		//core.ChainEvent{},
 | 		core.ChainEvent{}, | ||||||
| 		state.Logs(nil)) | 		state.Logs(nil)) | ||||||
| 
 | 
 | ||||||
| out: | out: | ||||||
| @ -77,7 +77,7 @@ out: | |||||||
| 				self.filterMu.RLock() | 				self.filterMu.RLock() | ||||||
| 				for _, filter := range self.filters { | 				for _, filter := range self.filters { | ||||||
| 					if filter.BlockCallback != nil { | 					if filter.BlockCallback != nil { | ||||||
| 						filter.BlockCallback(event.Block) | 						filter.BlockCallback(event.Block, event.Logs) | ||||||
| 					} | 					} | ||||||
| 				} | 				} | ||||||
| 				self.filterMu.RUnlock() | 				self.filterMu.RUnlock() | ||||||
| @ -86,7 +86,7 @@ out: | |||||||
| 				self.filterMu.RLock() | 				self.filterMu.RLock() | ||||||
| 				for _, filter := range self.filters { | 				for _, filter := range self.filters { | ||||||
| 					if filter.PendingCallback != nil { | 					if filter.PendingCallback != nil { | ||||||
| 						filter.PendingCallback(event.Block) | 						filter.PendingCallback(event.Block, event.Logs) | ||||||
| 					} | 					} | ||||||
| 				} | 				} | ||||||
| 				self.filterMu.RUnlock() | 				self.filterMu.RUnlock() | ||||||
|  | |||||||
							
								
								
									
										15
									
								
								rpc/api.go
									
									
									
									
									
								
							
							
						
						
									
										15
									
								
								rpc/api.go
									
									
									
									
									
								
							| @ -86,7 +86,7 @@ func (self *EthereumApi) getStateWithNum(num int64) *xeth.State { | |||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (self *EthereumApi) start() { | func (self *EthereumApi) start() { | ||||||
| 	timer := time.NewTicker(filterTickerTime) | 	timer := time.NewTicker(2 * time.Second) | ||||||
| done: | done: | ||||||
| 	for { | 	for { | ||||||
| 		select { | 		select { | ||||||
| @ -94,20 +94,20 @@ done: | |||||||
| 			self.logMut.Lock() | 			self.logMut.Lock() | ||||||
| 			self.messagesMut.Lock() | 			self.messagesMut.Lock() | ||||||
| 			for id, filter := range self.logs { | 			for id, filter := range self.logs { | ||||||
| 				if time.Since(filter.timeout) > 20*time.Second { | 				if time.Since(filter.timeout) > filterTickerTime { | ||||||
| 					self.filterManager.UninstallFilter(id) | 					self.filterManager.UninstallFilter(id) | ||||||
| 					delete(self.logs, id) | 					delete(self.logs, id) | ||||||
| 				} | 				} | ||||||
| 			} | 			} | ||||||
| 
 | 
 | ||||||
| 			for id, filter := range self.messages { | 			for id, filter := range self.messages { | ||||||
| 				if time.Since(filter.timeout) > 20*time.Second { | 				if time.Since(filter.timeout) > filterTickerTime { | ||||||
| 					self.xeth().Whisper().Unwatch(id) | 					self.xeth().Whisper().Unwatch(id) | ||||||
| 					delete(self.messages, id) | 					delete(self.messages, id) | ||||||
| 				} | 				} | ||||||
| 			} | 			} | ||||||
| 			self.logMut.Unlock() |  | ||||||
| 			self.messagesMut.Unlock() | 			self.messagesMut.Unlock() | ||||||
|  | 			self.logMut.Unlock() | ||||||
| 		case <-self.quit: | 		case <-self.quit: | ||||||
| 			break done | 			break done | ||||||
| 		} | 		} | ||||||
| @ -180,10 +180,13 @@ func (self *EthereumApi) NewFilterString(args *FilterStringArgs, reply *interfac | |||||||
| 	var id int | 	var id int | ||||||
| 	filter := core.NewFilter(self.xeth().Backend()) | 	filter := core.NewFilter(self.xeth().Backend()) | ||||||
| 
 | 
 | ||||||
| 	callback := func(block *types.Block) { | 	callback := func(block *types.Block, logs state.Logs) { | ||||||
| 		self.logMut.Lock() | 		self.logMut.Lock() | ||||||
| 		defer self.logMut.Unlock() | 		defer self.logMut.Unlock() | ||||||
| 
 | 
 | ||||||
|  | 		for _, log := range logs { | ||||||
|  | 			self.logs[id].add(log) | ||||||
|  | 		} | ||||||
| 		self.logs[id].add(&state.StateLog{}) | 		self.logs[id].add(&state.StateLog{}) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| @ -483,7 +486,7 @@ func (p *EthereumApi) GetBlockUncleCountByNumber(blocknum int64) (int64, error) | |||||||
| 
 | 
 | ||||||
| func (p *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) error { | func (p *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) error { | ||||||
| 	// Spec at https://github.com/ethereum/wiki/wiki/Generic-JSON-RPC
 | 	// Spec at https://github.com/ethereum/wiki/wiki/Generic-JSON-RPC
 | ||||||
| 	rpclogger.Debugf("%s %s", req.Method, req.Params) | 	rpclogger.Infof("%s %s", req.Method, req.Params) | ||||||
| 	switch req.Method { | 	switch req.Method { | ||||||
| 	case "web3_sha3": | 	case "web3_sha3": | ||||||
| 		args := new(Sha3Args) | 		args := new(Sha3Args) | ||||||
|  | |||||||
		Loading…
	
		Reference in New Issue
	
	Block a user