WIP: add rpc pubsub interface for account diffs

This commit is contained in:
Rob Mulholand 2019-09-11 16:03:30 -05:00
parent 39b0b1a1a6
commit 9b0042f925
19 changed files with 332 additions and 194 deletions

View File

@ -517,6 +517,10 @@ func (fb *filterBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscr
return fb.bc.SubscribeLogsEvent(ch)
}
func (fb *filterBackend) SubscribeStateDiffs(ch chan<- map[common.Address]state.Account) event.Subscription {
return fb.bc.SubscribeStateDiffEvent(ch)
}
func (fb *filterBackend) BloomStatus() (uint64, uint64) { return 4096, 0 }
func (fb *filterBackend) ServiceFilter(ctx context.Context, ms *bloombits.MatcherSession) {
panic("not supported")

View File

@ -138,15 +138,16 @@ type BlockChain struct {
triegc *prque.Prque // Priority queue mapping block numbers to tries to gc
gcproc time.Duration // Accumulates canonical block processing for trie dumping
hc *HeaderChain
rmLogsFeed event.Feed
chainFeed event.Feed
chainSideFeed event.Feed
chainHeadFeed event.Feed
logsFeed event.Feed
blockProcFeed event.Feed
scope event.SubscriptionScope
genesisBlock *types.Block
hc *HeaderChain
rmLogsFeed event.Feed
chainFeed event.Feed
chainSideFeed event.Feed
chainHeadFeed event.Feed
logsFeed event.Feed
blockProcFeed event.Feed
stateDiffsFeed event.Feed
scope event.SubscriptionScope
genesisBlock *types.Block
chainmu sync.RWMutex // blockchain insertion lock
@ -1436,11 +1437,11 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
// Pre-checks passed, start the full block imports
bc.wg.Add(1)
bc.chainmu.Lock()
n, events, logs, err := bc.insertChain(chain, true)
n, events, logs, stateDiffs, err := bc.insertChain(chain, true)
bc.chainmu.Unlock()
bc.wg.Done()
bc.PostChainEvents(events, logs)
bc.PostChainEvents(events, logs, stateDiffs)
return n, err
}
@ -1452,10 +1453,10 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
// racey behaviour. If a sidechain import is in progress, and the historic state
// is imported, but then new canon-head is added before the actual sidechain
// completes, then the historic state could be pruned again
func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []interface{}, []*types.Log, error) {
func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []interface{}, []*types.Log, map[common.Address]state.Account, error) {
// If the chain is terminating, don't even bother starting up
if atomic.LoadInt32(&bc.procInterrupt) == 1 {
return 0, nil, nil, nil
return 0, nil, nil, nil, nil
}
// Start a parallel signature recovery (signer will fluke on fork transition, minimal perf loss)
senderCacher.recoverFromBlocks(types.MakeSigner(bc.chainConfig, chain[0].Number()), chain)
@ -1468,6 +1469,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
events = make([]interface{}, 0, len(chain))
lastCanon *types.Block
coalescedLogs []*types.Log
stateDiffs map[common.Address]state.Account
)
// Start the parallel header verifier
headers := make([]*types.Header, len(chain))
@ -1518,7 +1520,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
for block != nil && err == ErrKnownBlock {
log.Debug("Writing previously known block", "number", block.Number(), "hash", block.Hash())
if err := bc.writeKnownBlock(block); err != nil {
return it.index, nil, nil, err
return it.index, nil, nil, nil, err
}
lastCanon = block
@ -1537,7 +1539,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
for block != nil && (it.index == 0 || err == consensus.ErrUnknownAncestor) {
log.Debug("Future block, postponing import", "number", block.Number(), "hash", block.Hash())
if err := bc.addFutureBlock(block); err != nil {
return it.index, events, coalescedLogs, err
return it.index, events, coalescedLogs, stateDiffs, err
}
block, err = it.next()
}
@ -1545,13 +1547,13 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
stats.ignored += it.remaining()
// If there are any still remaining, mark as ignored
return it.index, events, coalescedLogs, err
return it.index, events, coalescedLogs, stateDiffs, err
// Some other error occurred, abort
case err != nil:
stats.ignored += len(it.chain)
bc.reportBlock(block, nil, err)
return it.index, events, coalescedLogs, err
return it.index, events, coalescedLogs, stateDiffs, err
}
// No validation errors for the first block (or chain prefix skipped)
for ; block != nil && err == nil || err == ErrKnownBlock; block, err = it.next() {
@ -1563,7 +1565,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
// If the header is a banned one, straight out abort
if BadHashes[block.Hash()] {
bc.reportBlock(block, nil, ErrBlacklistedHash)
return it.index, events, coalescedLogs, ErrBlacklistedHash
return it.index, events, coalescedLogs, stateDiffs, ErrBlacklistedHash
}
// If the block is known (in the middle of the chain), it's a special case for
// Clique blocks where they can share state among each other, so importing an
@ -1580,7 +1582,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
"root", block.Root())
if err := bc.writeKnownBlock(block); err != nil {
return it.index, nil, nil, err
return it.index, nil, nil, stateDiffs, err
}
stats.processed++
@ -1600,7 +1602,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
}
statedb, err := state.New(parent.Root, bc.stateCache)
if err != nil {
return it.index, events, coalescedLogs, err
return it.index, events, coalescedLogs, stateDiffs, err
}
// If we have a followup block, run that against the current state to pre-cache
// transactions and probabilistically some of the account/storage trie nodes.
@ -1621,11 +1623,11 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
}
// Process block using the parent state as reference point
substart := time.Now()
receipts, logs, usedGas, err := bc.processor.Process(block, statedb, bc.vmConfig)
receipts, logs, processedStateDiffs, usedGas, err := bc.processor.Process(block, statedb, bc.vmConfig)
if err != nil {
bc.reportBlock(block, receipts, err)
atomic.StoreUint32(&followupInterrupt, 1)
return it.index, events, coalescedLogs, err
return it.index, events, coalescedLogs, stateDiffs, err
}
// Update the metrics touched during block processing
accountReadTimer.Update(statedb.AccountReads) // Account reads are complete, we can mark them
@ -1644,7 +1646,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
if err := bc.validator.ValidateState(block, statedb, receipts, usedGas); err != nil {
bc.reportBlock(block, receipts, err)
atomic.StoreUint32(&followupInterrupt, 1)
return it.index, events, coalescedLogs, err
return it.index, events, coalescedLogs, stateDiffs, err
}
proctime := time.Since(start)
@ -1659,7 +1661,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
status, err := bc.writeBlockWithState(block, receipts, statedb)
if err != nil {
atomic.StoreUint32(&followupInterrupt, 1)
return it.index, events, coalescedLogs, err
return it.index, events, coalescedLogs, stateDiffs, err
}
atomic.StoreUint32(&followupInterrupt, 1)
@ -1678,6 +1680,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
"root", block.Root())
coalescedLogs = append(coalescedLogs, logs...)
stateDiffs = processedStateDiffs
events = append(events, ChainEvent{block, block.Hash(), logs})
lastCanon = block
@ -1708,13 +1711,13 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
// Any blocks remaining here? The only ones we care about are the future ones
if block != nil && err == consensus.ErrFutureBlock {
if err := bc.addFutureBlock(block); err != nil {
return it.index, events, coalescedLogs, err
return it.index, events, coalescedLogs, stateDiffs, err
}
block, err = it.next()
for ; block != nil && err == consensus.ErrUnknownAncestor; block, err = it.next() {
if err := bc.addFutureBlock(block); err != nil {
return it.index, events, coalescedLogs, err
return it.index, events, coalescedLogs, stateDiffs, err
}
stats.queued++
}
@ -1725,7 +1728,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
if lastCanon != nil && bc.CurrentBlock().Hash() == lastCanon.Hash() {
events = append(events, ChainHeadEvent{lastCanon})
}
return it.index, events, coalescedLogs, err
return it.index, events, coalescedLogs, stateDiffs, err
}
// insertSideChain is called when an import batch hits upon a pruned ancestor
@ -1734,7 +1737,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
//
// The method writes all (header-and-body-valid) blocks to disk, then tries to
// switch over to the new chain if the TD exceeded the current chain.
func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (int, []interface{}, []*types.Log, error) {
func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (int, []interface{}, []*types.Log, map[common.Address]state.Account, error) {
var (
externTd *big.Int
current = bc.CurrentBlock()
@ -1770,7 +1773,7 @@ func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (i
// If someone legitimately side-mines blocks, they would still be imported as usual. However,
// we cannot risk writing unverified blocks to disk when they obviously target the pruning
// mechanism.
return it.index, nil, nil, errors.New("sidechain ghost-state attack")
return it.index, nil, nil, nil, errors.New("sidechain ghost-state attack")
}
}
if externTd == nil {
@ -1781,7 +1784,7 @@ func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (i
if !bc.HasBlock(block.Hash(), block.NumberU64()) {
start := time.Now()
if err := bc.writeBlockWithoutState(block, externTd); err != nil {
return it.index, nil, nil, err
return it.index, nil, nil, nil, err
}
log.Debug("Injected sidechain block", "number", block.Number(), "hash", block.Hash(),
"diff", block.Difficulty(), "elapsed", common.PrettyDuration(time.Since(start)),
@ -1798,7 +1801,7 @@ func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (i
localTd := bc.GetTd(current.Hash(), current.NumberU64())
if localTd.Cmp(externTd) > 0 {
log.Info("Sidechain written to disk", "start", it.first().NumberU64(), "end", it.previous().Number, "sidetd", externTd, "localtd", localTd)
return it.index, nil, nil, err
return it.index, nil, nil, nil, err
}
// Gather all the sidechain hashes (full blocks may be memory heavy)
var (
@ -1813,7 +1816,7 @@ func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (i
parent = bc.GetHeader(parent.ParentHash, parent.Number.Uint64()-1)
}
if parent == nil {
return it.index, nil, nil, errors.New("missing parent")
return it.index, nil, nil, nil, errors.New("missing parent")
}
// Import all the pruned blocks to make the state available
var (
@ -1832,15 +1835,15 @@ func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (i
// memory here.
if len(blocks) >= 2048 || memory > 64*1024*1024 {
log.Info("Importing heavy sidechain segment", "blocks", len(blocks), "start", blocks[0].NumberU64(), "end", block.NumberU64())
if _, _, _, err := bc.insertChain(blocks, false); err != nil {
return 0, nil, nil, err
if _, _, _, _, err := bc.insertChain(blocks, false); err != nil {
return 0, nil, nil, nil, err
}
blocks, memory = blocks[:0], 0
// If the chain is terminating, stop processing blocks
if atomic.LoadInt32(&bc.procInterrupt) == 1 {
log.Debug("Premature abort during blocks processing")
return 0, nil, nil, nil
return 0, nil, nil, nil, nil
}
}
}
@ -1848,7 +1851,7 @@ func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (i
log.Info("Importing sidechain segment", "start", blocks[0].NumberU64(), "end", blocks[len(blocks)-1].NumberU64())
return bc.insertChain(blocks, false)
}
return 0, nil, nil, nil
return 0, nil, nil, nil, nil
}
// reorg takes two blocks, an old chain and a new chain and will reconstruct the
@ -2003,11 +2006,15 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
// PostChainEvents iterates over the events generated by a chain insertion and
// posts them into the event feed.
// TODO: Should not expose PostChainEvents. The chain events should be posted in WriteBlock.
func (bc *BlockChain) PostChainEvents(events []interface{}, logs []*types.Log) {
func (bc *BlockChain) PostChainEvents(events []interface{}, logs []*types.Log, stateDiffs map[common.Address]state.Account) {
// post event logs for further processing
if logs != nil {
bc.logsFeed.Send(logs)
}
// post state diffs for further processing
if stateDiffs != nil {
bc.stateDiffsFeed.Send(stateDiffs)
}
for _, event := range events {
switch ev := event.(type) {
case ChainEvent:
@ -2215,3 +2222,7 @@ func (bc *BlockChain) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscript
func (bc *BlockChain) SubscribeBlockProcessingEvent(ch chan<- bool) event.Subscription {
return bc.scope.Track(bc.blockProcFeed.Subscribe(ch))
}
func (bc *BlockChain) SubscribeStateDiffEvent(ch chan<- map[common.Address]state.Account) event.Subscription {
return bc.scope.Track(bc.stateDiffsFeed.Subscribe(ch))
}

View File

@ -147,7 +147,7 @@ func testBlockChainImport(chain types.Blocks, blockchain *BlockChain) error {
if err != nil {
return err
}
receipts, _, usedGas, err := blockchain.processor.Process(block, statedb, vm.Config{})
receipts, _, _, usedGas, err := blockchain.processor.Process(block, statedb, vm.Config{})
if err != nil {
blockchain.reportBlock(block, receipts, err)
return err

View File

@ -685,6 +685,23 @@ func (s *StateDB) clearJournalAndRefund() {
s.refund = 0
}
func (s *StateDB) GetDirtyAccounts() map[common.Address]Account {
for addr := range s.journal.dirties {
s.stateObjectsDirty[addr] = struct{}{}
}
results := make(map[common.Address]Account)
for addr, stateObject := range s.stateObjects {
_, isDirty := s.stateObjectsDirty[addr]
if isDirty {
results[addr] = stateObject.data
}
}
return results
}
// Commit writes the state to the underlying in-memory trie database.
func (s *StateDB) Commit(deleteEmptyObjects bool) (root common.Hash, err error) {
defer s.clearJournalAndRefund()

View File

@ -53,7 +53,7 @@ func NewStateProcessor(config *params.ChainConfig, bc *BlockChain, engine consen
// Process returns the receipts and logs accumulated during the process and
// returns the amount of gas that was used in the process. If any of the
// transactions failed to execute due to insufficient gas it will return an error.
func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg vm.Config) (types.Receipts, []*types.Log, uint64, error) {
func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg vm.Config) (types.Receipts, []*types.Log, map[common.Address]state.Account, uint64, error) {
var (
receipts types.Receipts
usedGas = new(uint64)
@ -70,7 +70,7 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg
statedb.Prepare(tx.Hash(), block.Hash(), i)
receipt, _, err := ApplyTransaction(p.config, p.bc, nil, gp, statedb, header, tx, usedGas, cfg)
if err != nil {
return nil, nil, 0, err
return nil, nil, nil, 0, err
}
receipts = append(receipts, receipt)
allLogs = append(allLogs, receipt.Logs...)
@ -78,7 +78,9 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg
// Finalize the block, applying any consensus engine specific extras (e.g. block rewards)
p.engine.Finalize(p.bc, header, statedb, block.Transactions(), block.Uncles())
return receipts, allLogs, *usedGas, nil
stateDiffs := statedb.GetDirtyAccounts()
return receipts, allLogs, stateDiffs, *usedGas, nil
}
// ApplyTransaction attempts to apply a transaction to the given state database

View File

@ -17,6 +17,7 @@
package core
import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
@ -47,5 +48,5 @@ type Processor interface {
// Process processes the state changes according to the Ethereum rules by running
// the transaction messages using the statedb and applying any rewards to both
// the processor (coinbase) and any included uncles.
Process(block *types.Block, statedb *state.StateDB, cfg vm.Config) (types.Receipts, []*types.Log, uint64, error)
Process(block *types.Block, statedb *state.StateDB, cfg vm.Config) (types.Receipts, []*types.Log, map[common.Address]state.Account, uint64, error)
}

View File

@ -159,6 +159,10 @@ func (b *EthAPIBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscri
return b.eth.BlockChain().SubscribeLogsEvent(ch)
}
func (b *EthAPIBackend) SubscribeStateDiffs(ch chan<- map[common.Address]state.Account) event.Subscription {
return b.eth.BlockChain().SubscribeStateDiffEvent(ch)
}
func (b *EthAPIBackend) SendTx(ctx context.Context, signedTx *types.Transaction) error {
return b.eth.txPool.AddLocal(signedTx)
}

View File

@ -289,7 +289,7 @@ func (api *PrivateDebugAPI) traceChain(ctx context.Context, start, end *types.Bl
traced += uint64(len(txs))
}
// Generate the next state snapshot fast without tracing
_, _, _, err := api.eth.blockchain.Processor().Process(block, statedb, vm.Config{})
_, _, _, _, err := api.eth.blockchain.Processor().Process(block, statedb, vm.Config{})
if err != nil {
failed = err
break
@ -676,7 +676,7 @@ func (api *PrivateDebugAPI) computeStateDB(block *types.Block, reexec uint64) (*
if block = api.eth.blockchain.GetBlockByNumber(block.NumberU64() + 1); block == nil {
return nil, fmt.Errorf("block #%d not found", block.NumberU64()+1)
}
_, _, _, err := api.eth.blockchain.Processor().Process(block, statedb, vm.Config{})
_, _, _, _, err := api.eth.blockchain.Processor().Process(block, statedb, vm.Config{})
if err != nil {
return nil, fmt.Errorf("processing block %d failed: %v", block.NumberU64(), err)
}

View File

@ -28,6 +28,7 @@ import (
ethereum "github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
@ -233,6 +234,36 @@ func (api *PublicFilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, er
return rpcSub, nil
}
// NewStateDiffs sent a notification each time an account changes in a block.
func (api *PublicFilterAPI) NewStateDiffs(ctx context.Context) (*rpc.Subscription, error) {
notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
}
rpcSub := notifier.CreateSubscription()
go func() {
stateDiffs := make(chan map[common.Address]state.Account)
stateDiffsSub := api.events.SubscribeStateDiffs(stateDiffs)
for {
select {
case d := <-stateDiffs:
notifier.Notify(rpcSub.ID, d)
case <-rpcSub.Err():
stateDiffsSub.Unsubscribe()
return
case <-notifier.Closed():
stateDiffsSub.Unsubscribe()
return
}
}
}()
return rpcSub, nil
}
// Logs creates a subscription that fires for all new log that match the given filter criteria.
func (api *PublicFilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc.Subscription, error) {
notifier, supported := rpc.NotifierFromContext(ctx)

View File

@ -129,7 +129,7 @@ func benchmarkBloomBits(b *testing.B, sectionSize uint64) {
if i%20 == 0 {
db.Close()
db, _ = rawdb.NewLevelDBDatabase(benchDataDir, 128, 1024, "")
backend = &testBackend{mux, db, cnt, new(event.Feed), new(event.Feed), new(event.Feed), new(event.Feed)}
backend = &testBackend{mux, db, cnt, new(event.Feed), new(event.Feed), new(event.Feed), new(event.Feed), new(event.Feed)}
}
var addr common.Address
addr[0] = byte(i)
@ -174,7 +174,7 @@ func BenchmarkNoBloomBits(b *testing.B) {
b.Log("Running filter benchmarks...")
start := time.Now()
mux := new(event.TypeMux)
backend := &testBackend{mux, db, 0, new(event.Feed), new(event.Feed), new(event.Feed), new(event.Feed)}
backend := &testBackend{mux, db, 0, new(event.Feed), new(event.Feed), new(event.Feed), new(event.Feed), new(event.Feed)}
filter := NewRangeFilter(backend, 0, int64(*headNum), []common.Address{{}}, nil)
filter.Logs(context.Background())
d := time.Since(start)

View File

@ -24,6 +24,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/bloombits"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
@ -42,6 +43,7 @@ type Backend interface {
SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription
SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription
SubscribeStateDiffs(ch chan<- map[common.Address]state.Account) event.Subscription
BloomStatus() (uint64, uint64)
ServiceFilter(ctx context.Context, session *bloombits.MatcherSession)

View File

@ -29,6 +29,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
@ -55,6 +56,8 @@ const (
BlocksSubscription
// LastSubscription keeps track of the last index
LastIndexSubscription
// StateDiffsSubscription queries for new account changes
StateDiffsSubscription
)
const (
@ -68,6 +71,8 @@ const (
logsChanSize = 10
// chainEvChanSize is the size of channel listening to ChainEvent.
chainEvChanSize = 10
// stateDiffChanSize is the size of channel listening to StateDiffEvent.
stateDiffChanSize = 10
)
var (
@ -75,15 +80,16 @@ var (
)
type subscription struct {
id rpc.ID
typ Type
created time.Time
logsCrit ethereum.FilterQuery
logs chan []*types.Log
hashes chan []common.Hash
headers chan *types.Header
installed chan struct{} // closed when the filter is installed
err chan error // closed when the filter is uninstalled
id rpc.ID
typ Type
created time.Time
logsCrit ethereum.FilterQuery
logs chan []*types.Log
hashes chan []common.Hash
headers chan *types.Header
stateDiffs chan map[common.Address]state.Account
installed chan struct{} // closed when the filter is installed
err chan error // closed when the filter is uninstalled
}
// EventSystem creates subscriptions, processes events and broadcasts them to the
@ -100,14 +106,16 @@ type EventSystem struct {
rmLogsSub event.Subscription // Subscription for removed log event
chainSub event.Subscription // Subscription for new chain event
pendingLogSub *event.TypeMuxSubscription // Subscription for pending log event
stateDiffsSub event.Subscription // Subscription for new state diff events
// Channels
install chan *subscription // install filter for event notification
uninstall chan *subscription // remove filter for event notification
txsCh chan core.NewTxsEvent // Channel to receive new transactions event
logsCh chan []*types.Log // Channel to receive new log event
rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event
chainCh chan core.ChainEvent // Channel to receive new chain event
install chan *subscription // install filter for event notification
uninstall chan *subscription // remove filter for event notification
txsCh chan core.NewTxsEvent // Channel to receive new transactions event
logsCh chan []*types.Log // Channel to receive new log event
rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event
chainCh chan core.ChainEvent // Channel to receive new chain event
stateDiffCh chan map[common.Address]state.Account // Channel to receive new state diff events
}
// NewEventSystem creates a new manager that listens for event on the given mux,
@ -118,15 +126,16 @@ type EventSystem struct {
// or by stopping the given mux.
func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventSystem {
m := &EventSystem{
mux: mux,
backend: backend,
lightMode: lightMode,
install: make(chan *subscription),
uninstall: make(chan *subscription),
txsCh: make(chan core.NewTxsEvent, txChanSize),
logsCh: make(chan []*types.Log, logsChanSize),
rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize),
chainCh: make(chan core.ChainEvent, chainEvChanSize),
mux: mux,
backend: backend,
lightMode: lightMode,
install: make(chan *subscription),
uninstall: make(chan *subscription),
txsCh: make(chan core.NewTxsEvent, txChanSize),
logsCh: make(chan []*types.Log, logsChanSize),
rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize),
chainCh: make(chan core.ChainEvent, chainEvChanSize),
stateDiffCh: make(chan map[common.Address]state.Account, stateDiffChanSize),
}
// Subscribe events
@ -134,6 +143,7 @@ func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventS
m.logsSub = m.backend.SubscribeLogsEvent(m.logsCh)
m.rmLogsSub = m.backend.SubscribeRemovedLogsEvent(m.rmLogsCh)
m.chainSub = m.backend.SubscribeChainEvent(m.chainCh)
m.stateDiffsSub = m.backend.SubscribeStateDiffs(m.stateDiffCh)
// TODO(rjl493456442): use feed to subscribe pending log event
m.pendingLogSub = m.mux.Subscribe(core.PendingLogsEvent{})
@ -175,6 +185,7 @@ func (sub *Subscription) Unsubscribe() {
case <-sub.f.logs:
case <-sub.f.hashes:
case <-sub.f.headers:
case <-sub.f.stateDiffs:
}
}
@ -252,15 +263,16 @@ func (es *EventSystem) subscribeMinedPendingLogs(crit ethereum.FilterQuery, logs
// given criteria to the given logs channel.
func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription {
sub := &subscription{
id: rpc.NewID(),
typ: LogsSubscription,
logsCrit: crit,
created: time.Now(),
logs: logs,
hashes: make(chan []common.Hash),
headers: make(chan *types.Header),
installed: make(chan struct{}),
err: make(chan error),
id: rpc.NewID(),
typ: LogsSubscription,
logsCrit: crit,
created: time.Now(),
logs: logs,
hashes: make(chan []common.Hash),
headers: make(chan *types.Header),
stateDiffs: make(chan map[common.Address]state.Account),
installed: make(chan struct{}),
err: make(chan error),
}
return es.subscribe(sub)
}
@ -269,15 +281,32 @@ func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*typ
// transactions that enter the transaction pool.
func (es *EventSystem) subscribePendingLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription {
sub := &subscription{
id: rpc.NewID(),
typ: PendingLogsSubscription,
logsCrit: crit,
created: time.Now(),
logs: logs,
hashes: make(chan []common.Hash),
headers: make(chan *types.Header),
installed: make(chan struct{}),
err: make(chan error),
id: rpc.NewID(),
typ: PendingLogsSubscription,
logsCrit: crit,
created: time.Now(),
logs: logs,
hashes: make(chan []common.Hash),
headers: make(chan *types.Header),
stateDiffs: make(chan map[common.Address]state.Account),
installed: make(chan struct{}),
err: make(chan error),
}
return es.subscribe(sub)
}
// SubscribeStateDiffs
func (es *EventSystem) SubscribeStateDiffs(diffs chan map[common.Address]state.Account) *Subscription {
sub := &subscription{
id: rpc.NewID(),
typ: StateDiffsSubscription,
created: time.Now(),
logs: make(chan []*types.Log),
hashes: make(chan []common.Hash),
headers: make(chan *types.Header),
stateDiffs: diffs,
installed: make(chan struct{}),
err: make(chan error),
}
return es.subscribe(sub)
}
@ -286,14 +315,15 @@ func (es *EventSystem) subscribePendingLogs(crit ethereum.FilterQuery, logs chan
// imported in the chain.
func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscription {
sub := &subscription{
id: rpc.NewID(),
typ: BlocksSubscription,
created: time.Now(),
logs: make(chan []*types.Log),
hashes: make(chan []common.Hash),
headers: headers,
installed: make(chan struct{}),
err: make(chan error),
id: rpc.NewID(),
typ: BlocksSubscription,
created: time.Now(),
logs: make(chan []*types.Log),
hashes: make(chan []common.Hash),
headers: headers,
stateDiffs: make(chan map[common.Address]state.Account),
installed: make(chan struct{}),
err: make(chan error),
}
return es.subscribe(sub)
}
@ -302,14 +332,15 @@ func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscripti
// transactions that enter the transaction pool.
func (es *EventSystem) SubscribePendingTxs(hashes chan []common.Hash) *Subscription {
sub := &subscription{
id: rpc.NewID(),
typ: PendingTransactionsSubscription,
created: time.Now(),
logs: make(chan []*types.Log),
hashes: hashes,
headers: make(chan *types.Header),
installed: make(chan struct{}),
err: make(chan error),
id: rpc.NewID(),
typ: PendingTransactionsSubscription,
created: time.Now(),
logs: make(chan []*types.Log),
hashes: hashes,
headers: make(chan *types.Header),
stateDiffs: make(chan map[common.Address]state.Account),
installed: make(chan struct{}),
err: make(chan error),
}
return es.subscribe(sub)
}
@ -368,6 +399,10 @@ func (es *EventSystem) broadcast(filters filterIndex, ev interface{}) {
}
})
}
case map[common.Address]state.Account:
for _, f := range filters[StateDiffsSubscription] {
f.stateDiffs <- e
}
}
}
@ -453,6 +488,7 @@ func (es *EventSystem) eventLoop() {
es.logsSub.Unsubscribe()
es.rmLogsSub.Unsubscribe()
es.chainSub.Unsubscribe()
es.stateDiffsSub.Unsubscribe()
}()
index := make(filterIndex)
@ -471,6 +507,8 @@ func (es *EventSystem) eventLoop() {
es.broadcast(index, ev)
case ev := <-es.chainCh:
es.broadcast(index, ev)
case ev := <-es.stateDiffCh:
es.broadcast(index, ev)
case ev, active := <-es.pendingLogSub.Chan():
if !active { // system stopped
return
@ -506,6 +544,8 @@ func (es *EventSystem) eventLoop() {
return
case <-es.chainSub.Err():
return
case <-es.stateDiffsSub.Err():
return
}
}
}

View File

@ -31,6 +31,7 @@ import (
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/bloombits"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
@ -39,13 +40,14 @@ import (
)
type testBackend struct {
mux *event.TypeMux
db ethdb.Database
sections uint64
txFeed *event.Feed
rmLogsFeed *event.Feed
logsFeed *event.Feed
chainFeed *event.Feed
mux *event.TypeMux
db ethdb.Database
sections uint64
txFeed *event.Feed
rmLogsFeed *event.Feed
logsFeed *event.Feed
chainFeed *event.Feed
stateDiffsFeed *event.Feed
}
func (b *testBackend) ChainDb() ethdb.Database {
@ -120,6 +122,10 @@ func (b *testBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subsc
return b.chainFeed.Subscribe(ch)
}
func (b *testBackend) SubscribeStateDiffs(ch chan<- map[common.Address]state.Account) event.Subscription {
return b.stateDiffsFeed.Subscribe(ch)
}
func (b *testBackend) BloomStatus() (uint64, uint64) {
return params.BloomBitsBlocks, b.sections
}
@ -160,17 +166,18 @@ func TestBlockSubscription(t *testing.T) {
t.Parallel()
var (
mux = new(event.TypeMux)
db = rawdb.NewMemoryDatabase()
txFeed = new(event.Feed)
rmLogsFeed = new(event.Feed)
logsFeed = new(event.Feed)
chainFeed = new(event.Feed)
backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
api = NewPublicFilterAPI(backend, false)
genesis = new(core.Genesis).MustCommit(db)
chain, _ = core.GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, 10, func(i int, gen *core.BlockGen) {})
chainEvents = []core.ChainEvent{}
mux = new(event.TypeMux)
db = rawdb.NewMemoryDatabase()
txFeed = new(event.Feed)
rmLogsFeed = new(event.Feed)
logsFeed = new(event.Feed)
chainFeed = new(event.Feed)
stateDiffsFeed = new(event.Feed)
backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed, stateDiffsFeed}
api = NewPublicFilterAPI(backend, false)
genesis = new(core.Genesis).MustCommit(db)
chain, _ = core.GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, 10, func(i int, gen *core.BlockGen) {})
chainEvents = []core.ChainEvent{}
)
for _, blk := range chain {
@ -217,14 +224,15 @@ func TestPendingTxFilter(t *testing.T) {
t.Parallel()
var (
mux = new(event.TypeMux)
db = rawdb.NewMemoryDatabase()
txFeed = new(event.Feed)
rmLogsFeed = new(event.Feed)
logsFeed = new(event.Feed)
chainFeed = new(event.Feed)
backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
api = NewPublicFilterAPI(backend, false)
mux = new(event.TypeMux)
db = rawdb.NewMemoryDatabase()
txFeed = new(event.Feed)
rmLogsFeed = new(event.Feed)
logsFeed = new(event.Feed)
chainFeed = new(event.Feed)
stateDiffsFeed = new(event.Feed)
backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed, stateDiffsFeed}
api = NewPublicFilterAPI(backend, false)
transactions = []*types.Transaction{
types.NewTransaction(0, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil),
@ -277,14 +285,15 @@ func TestPendingTxFilter(t *testing.T) {
// If not it must return an error.
func TestLogFilterCreation(t *testing.T) {
var (
mux = new(event.TypeMux)
db = rawdb.NewMemoryDatabase()
txFeed = new(event.Feed)
rmLogsFeed = new(event.Feed)
logsFeed = new(event.Feed)
chainFeed = new(event.Feed)
backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
api = NewPublicFilterAPI(backend, false)
mux = new(event.TypeMux)
db = rawdb.NewMemoryDatabase()
txFeed = new(event.Feed)
rmLogsFeed = new(event.Feed)
logsFeed = new(event.Feed)
chainFeed = new(event.Feed)
stateDiffsFeed = new(event.Feed)
backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed, stateDiffsFeed}
api = NewPublicFilterAPI(backend, false)
testCases = []struct {
crit FilterCriteria
@ -326,14 +335,15 @@ func TestInvalidLogFilterCreation(t *testing.T) {
t.Parallel()
var (
mux = new(event.TypeMux)
db = rawdb.NewMemoryDatabase()
txFeed = new(event.Feed)
rmLogsFeed = new(event.Feed)
logsFeed = new(event.Feed)
chainFeed = new(event.Feed)
backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
api = NewPublicFilterAPI(backend, false)
mux = new(event.TypeMux)
db = rawdb.NewMemoryDatabase()
txFeed = new(event.Feed)
rmLogsFeed = new(event.Feed)
logsFeed = new(event.Feed)
chainFeed = new(event.Feed)
stateDiffsFeed = new(event.Feed)
backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed, stateDiffsFeed}
api = NewPublicFilterAPI(backend, false)
)
// different situations where log filter creation should fail.
@ -353,15 +363,16 @@ func TestInvalidLogFilterCreation(t *testing.T) {
func TestInvalidGetLogsRequest(t *testing.T) {
var (
mux = new(event.TypeMux)
db = rawdb.NewMemoryDatabase()
txFeed = new(event.Feed)
rmLogsFeed = new(event.Feed)
logsFeed = new(event.Feed)
chainFeed = new(event.Feed)
backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
api = NewPublicFilterAPI(backend, false)
blockHash = common.HexToHash("0x1111111111111111111111111111111111111111111111111111111111111111")
mux = new(event.TypeMux)
db = rawdb.NewMemoryDatabase()
txFeed = new(event.Feed)
rmLogsFeed = new(event.Feed)
logsFeed = new(event.Feed)
chainFeed = new(event.Feed)
stateDiffsFeed = new(event.Feed)
backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed, stateDiffsFeed}
api = NewPublicFilterAPI(backend, false)
blockHash = common.HexToHash("0x1111111111111111111111111111111111111111111111111111111111111111")
)
// Reason: Cannot specify both BlockHash and FromBlock/ToBlock)
@ -383,14 +394,15 @@ func TestLogFilter(t *testing.T) {
t.Parallel()
var (
mux = new(event.TypeMux)
db = rawdb.NewMemoryDatabase()
txFeed = new(event.Feed)
rmLogsFeed = new(event.Feed)
logsFeed = new(event.Feed)
chainFeed = new(event.Feed)
backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
api = NewPublicFilterAPI(backend, false)
mux = new(event.TypeMux)
db = rawdb.NewMemoryDatabase()
txFeed = new(event.Feed)
rmLogsFeed = new(event.Feed)
logsFeed = new(event.Feed)
chainFeed = new(event.Feed)
stateDiffsFeed = new(event.Feed)
backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed, stateDiffsFeed}
api = NewPublicFilterAPI(backend, false)
firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111")
secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222")
@ -502,14 +514,15 @@ func TestPendingLogsSubscription(t *testing.T) {
t.Parallel()
var (
mux = new(event.TypeMux)
db = rawdb.NewMemoryDatabase()
txFeed = new(event.Feed)
rmLogsFeed = new(event.Feed)
logsFeed = new(event.Feed)
chainFeed = new(event.Feed)
backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
api = NewPublicFilterAPI(backend, false)
mux = new(event.TypeMux)
db = rawdb.NewMemoryDatabase()
txFeed = new(event.Feed)
rmLogsFeed = new(event.Feed)
logsFeed = new(event.Feed)
chainFeed = new(event.Feed)
stateDiffsFeed = new(event.Feed)
backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed, stateDiffsFeed}
api = NewPublicFilterAPI(backend, false)
firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111")
secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222")

View File

@ -50,18 +50,19 @@ func BenchmarkFilters(b *testing.B) {
defer os.RemoveAll(dir)
var (
db, _ = rawdb.NewLevelDBDatabase(dir, 0, 0, "")
mux = new(event.TypeMux)
txFeed = new(event.Feed)
rmLogsFeed = new(event.Feed)
logsFeed = new(event.Feed)
chainFeed = new(event.Feed)
backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
addr1 = crypto.PubkeyToAddress(key1.PublicKey)
addr2 = common.BytesToAddress([]byte("jeff"))
addr3 = common.BytesToAddress([]byte("ethereum"))
addr4 = common.BytesToAddress([]byte("random addresses please"))
db, _ = rawdb.NewLevelDBDatabase(dir, 0, 0, "")
mux = new(event.TypeMux)
txFeed = new(event.Feed)
rmLogsFeed = new(event.Feed)
logsFeed = new(event.Feed)
chainFeed = new(event.Feed)
stateDiffsFeed = new(event.Feed)
backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed, stateDiffsFeed}
key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
addr1 = crypto.PubkeyToAddress(key1.PublicKey)
addr2 = common.BytesToAddress([]byte("jeff"))
addr3 = common.BytesToAddress([]byte("ethereum"))
addr4 = common.BytesToAddress([]byte("random addresses please"))
)
defer db.Close()
@ -109,15 +110,16 @@ func TestFilters(t *testing.T) {
defer os.RemoveAll(dir)
var (
db, _ = rawdb.NewLevelDBDatabase(dir, 0, 0, "")
mux = new(event.TypeMux)
txFeed = new(event.Feed)
rmLogsFeed = new(event.Feed)
logsFeed = new(event.Feed)
chainFeed = new(event.Feed)
backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
addr = crypto.PubkeyToAddress(key1.PublicKey)
db, _ = rawdb.NewLevelDBDatabase(dir, 0, 0, "")
mux = new(event.TypeMux)
txFeed = new(event.Feed)
rmLogsFeed = new(event.Feed)
logsFeed = new(event.Feed)
chainFeed = new(event.Feed)
stateDiffsFeed = new(event.Feed)
backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed, stateDiffsFeed}
key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
addr = crypto.PubkeyToAddress(key1.PublicKey)
hash1 = common.BytesToHash([]byte("topic1"))
hash2 = common.BytesToHash([]byte("topic2"))

View File

@ -78,6 +78,7 @@ type Backend interface {
ServiceFilter(ctx context.Context, session *bloombits.MatcherSession)
SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription
SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription
SubscribeStateDiffs(ch chan<- map[common.Address]state.Account) event.Subscription
ChainConfig() *params.ChainConfig
CurrentBlock() *types.Block

View File

@ -172,6 +172,10 @@ func (b *LesApiBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEven
return b.eth.blockchain.SubscribeRemovedLogsEvent(ch)
}
func (b *LesApiBackend) SubscribeStateDiffs(ch chan<- map[common.Address]state.Account) event.Subscription {
return b.eth.blockchain.SubscribeStateDiffs(ch)
}
func (b *LesApiBackend) Downloader() *downloader.Downloader {
return b.eth.Downloader()
}

View File

@ -531,6 +531,12 @@ func (lc *LightChain) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent)
return lc.scope.Track(new(event.Feed).Subscribe(ch))
}
// SubscribeStateDiffs implements the interface of filters.Backend
// LightChain does not sen state diffs, so return an empty subscription.
func (lc *LightChain) SubscribeStateDiffs(ch chan<- map[common.Address]state.Account) event.Subscription {
return lc.scope.Track(new(event.Feed).Subscribe(ch))
}
// DisableCheckFreq disables header validation. This is used for ultralight mode.
func (lc *LightChain) DisableCheckFreq() {
atomic.StoreInt32(&lc.disableCheckFreq, 1)

View File

@ -608,7 +608,7 @@ func (w *worker) resultLoop() {
case core.SideStatTy:
events = append(events, core.ChainSideEvent{Block: block})
}
w.chain.PostChainEvents(events, logs)
w.chain.PostChainEvents(events, logs, task.state.GetDirtyAccounts())
// Insert the block into the set of pending ones to resultLoop for confirmations
w.unconfirmed.Insert(block.NumberU64(), block.Hash())

View File

@ -134,7 +134,7 @@ func newTestWorkerBackend(t *testing.T, chainConfig *params.ChainConfig, engine
func (b *testWorkerBackend) BlockChain() *core.BlockChain { return b.chain }
func (b *testWorkerBackend) TxPool() *core.TxPool { return b.txPool }
func (b *testWorkerBackend) PostChainEvents(events []interface{}) {
b.chain.PostChainEvents(events, nil)
b.chain.PostChainEvents(events, nil, nil)
}
func newTestWorker(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine, blocks int) (*worker, *testWorkerBackend) {