diff --git a/accounts/abi/bind/backends/simulated.go b/accounts/abi/bind/backends/simulated.go index e30572e0a..de25a5cb8 100644 --- a/accounts/abi/bind/backends/simulated.go +++ b/accounts/abi/bind/backends/simulated.go @@ -517,6 +517,10 @@ func (fb *filterBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscr return fb.bc.SubscribeLogsEvent(ch) } +func (fb *filterBackend) SubscribeStateDiffs(ch chan<- map[common.Address]state.Account) event.Subscription { + return fb.bc.SubscribeStateDiffEvent(ch) +} + func (fb *filterBackend) BloomStatus() (uint64, uint64) { return 4096, 0 } func (fb *filterBackend) ServiceFilter(ctx context.Context, ms *bloombits.MatcherSession) { panic("not supported") diff --git a/core/blockchain.go b/core/blockchain.go index 833de3bc7..2594ba955 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -138,15 +138,16 @@ type BlockChain struct { triegc *prque.Prque // Priority queue mapping block numbers to tries to gc gcproc time.Duration // Accumulates canonical block processing for trie dumping - hc *HeaderChain - rmLogsFeed event.Feed - chainFeed event.Feed - chainSideFeed event.Feed - chainHeadFeed event.Feed - logsFeed event.Feed - blockProcFeed event.Feed - scope event.SubscriptionScope - genesisBlock *types.Block + hc *HeaderChain + rmLogsFeed event.Feed + chainFeed event.Feed + chainSideFeed event.Feed + chainHeadFeed event.Feed + logsFeed event.Feed + blockProcFeed event.Feed + stateDiffsFeed event.Feed + scope event.SubscriptionScope + genesisBlock *types.Block chainmu sync.RWMutex // blockchain insertion lock @@ -1436,11 +1437,11 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) { // Pre-checks passed, start the full block imports bc.wg.Add(1) bc.chainmu.Lock() - n, events, logs, err := bc.insertChain(chain, true) + n, events, logs, stateDiffs, err := bc.insertChain(chain, true) bc.chainmu.Unlock() bc.wg.Done() - bc.PostChainEvents(events, logs) + bc.PostChainEvents(events, logs, stateDiffs) return n, err } @@ -1452,10 +1453,10 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) { // racey behaviour. If a sidechain import is in progress, and the historic state // is imported, but then new canon-head is added before the actual sidechain // completes, then the historic state could be pruned again -func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []interface{}, []*types.Log, error) { +func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []interface{}, []*types.Log, map[common.Address]state.Account, error) { // If the chain is terminating, don't even bother starting up if atomic.LoadInt32(&bc.procInterrupt) == 1 { - return 0, nil, nil, nil + return 0, nil, nil, nil, nil } // Start a parallel signature recovery (signer will fluke on fork transition, minimal perf loss) senderCacher.recoverFromBlocks(types.MakeSigner(bc.chainConfig, chain[0].Number()), chain) @@ -1468,6 +1469,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] events = make([]interface{}, 0, len(chain)) lastCanon *types.Block coalescedLogs []*types.Log + stateDiffs map[common.Address]state.Account ) // Start the parallel header verifier headers := make([]*types.Header, len(chain)) @@ -1518,7 +1520,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] for block != nil && err == ErrKnownBlock { log.Debug("Writing previously known block", "number", block.Number(), "hash", block.Hash()) if err := bc.writeKnownBlock(block); err != nil { - return it.index, nil, nil, err + return it.index, nil, nil, nil, err } lastCanon = block @@ -1537,7 +1539,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] for block != nil && (it.index == 0 || err == consensus.ErrUnknownAncestor) { log.Debug("Future block, postponing import", "number", block.Number(), "hash", block.Hash()) if err := bc.addFutureBlock(block); err != nil { - return it.index, events, coalescedLogs, err + return it.index, events, coalescedLogs, stateDiffs, err } block, err = it.next() } @@ -1545,13 +1547,13 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] stats.ignored += it.remaining() // If there are any still remaining, mark as ignored - return it.index, events, coalescedLogs, err + return it.index, events, coalescedLogs, stateDiffs, err // Some other error occurred, abort case err != nil: stats.ignored += len(it.chain) bc.reportBlock(block, nil, err) - return it.index, events, coalescedLogs, err + return it.index, events, coalescedLogs, stateDiffs, err } // No validation errors for the first block (or chain prefix skipped) for ; block != nil && err == nil || err == ErrKnownBlock; block, err = it.next() { @@ -1563,7 +1565,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] // If the header is a banned one, straight out abort if BadHashes[block.Hash()] { bc.reportBlock(block, nil, ErrBlacklistedHash) - return it.index, events, coalescedLogs, ErrBlacklistedHash + return it.index, events, coalescedLogs, stateDiffs, ErrBlacklistedHash } // If the block is known (in the middle of the chain), it's a special case for // Clique blocks where they can share state among each other, so importing an @@ -1580,7 +1582,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] "root", block.Root()) if err := bc.writeKnownBlock(block); err != nil { - return it.index, nil, nil, err + return it.index, nil, nil, stateDiffs, err } stats.processed++ @@ -1600,7 +1602,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] } statedb, err := state.New(parent.Root, bc.stateCache) if err != nil { - return it.index, events, coalescedLogs, err + return it.index, events, coalescedLogs, stateDiffs, err } // If we have a followup block, run that against the current state to pre-cache // transactions and probabilistically some of the account/storage trie nodes. @@ -1621,11 +1623,11 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] } // Process block using the parent state as reference point substart := time.Now() - receipts, logs, usedGas, err := bc.processor.Process(block, statedb, bc.vmConfig) + receipts, logs, processedStateDiffs, usedGas, err := bc.processor.Process(block, statedb, bc.vmConfig) if err != nil { bc.reportBlock(block, receipts, err) atomic.StoreUint32(&followupInterrupt, 1) - return it.index, events, coalescedLogs, err + return it.index, events, coalescedLogs, stateDiffs, err } // Update the metrics touched during block processing accountReadTimer.Update(statedb.AccountReads) // Account reads are complete, we can mark them @@ -1644,7 +1646,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] if err := bc.validator.ValidateState(block, statedb, receipts, usedGas); err != nil { bc.reportBlock(block, receipts, err) atomic.StoreUint32(&followupInterrupt, 1) - return it.index, events, coalescedLogs, err + return it.index, events, coalescedLogs, stateDiffs, err } proctime := time.Since(start) @@ -1659,7 +1661,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] status, err := bc.writeBlockWithState(block, receipts, statedb) if err != nil { atomic.StoreUint32(&followupInterrupt, 1) - return it.index, events, coalescedLogs, err + return it.index, events, coalescedLogs, stateDiffs, err } atomic.StoreUint32(&followupInterrupt, 1) @@ -1678,6 +1680,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] "root", block.Root()) coalescedLogs = append(coalescedLogs, logs...) + stateDiffs = processedStateDiffs events = append(events, ChainEvent{block, block.Hash(), logs}) lastCanon = block @@ -1708,13 +1711,13 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] // Any blocks remaining here? The only ones we care about are the future ones if block != nil && err == consensus.ErrFutureBlock { if err := bc.addFutureBlock(block); err != nil { - return it.index, events, coalescedLogs, err + return it.index, events, coalescedLogs, stateDiffs, err } block, err = it.next() for ; block != nil && err == consensus.ErrUnknownAncestor; block, err = it.next() { if err := bc.addFutureBlock(block); err != nil { - return it.index, events, coalescedLogs, err + return it.index, events, coalescedLogs, stateDiffs, err } stats.queued++ } @@ -1725,7 +1728,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] if lastCanon != nil && bc.CurrentBlock().Hash() == lastCanon.Hash() { events = append(events, ChainHeadEvent{lastCanon}) } - return it.index, events, coalescedLogs, err + return it.index, events, coalescedLogs, stateDiffs, err } // insertSideChain is called when an import batch hits upon a pruned ancestor @@ -1734,7 +1737,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] // // The method writes all (header-and-body-valid) blocks to disk, then tries to // switch over to the new chain if the TD exceeded the current chain. -func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (int, []interface{}, []*types.Log, error) { +func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (int, []interface{}, []*types.Log, map[common.Address]state.Account, error) { var ( externTd *big.Int current = bc.CurrentBlock() @@ -1770,7 +1773,7 @@ func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (i // If someone legitimately side-mines blocks, they would still be imported as usual. However, // we cannot risk writing unverified blocks to disk when they obviously target the pruning // mechanism. - return it.index, nil, nil, errors.New("sidechain ghost-state attack") + return it.index, nil, nil, nil, errors.New("sidechain ghost-state attack") } } if externTd == nil { @@ -1781,7 +1784,7 @@ func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (i if !bc.HasBlock(block.Hash(), block.NumberU64()) { start := time.Now() if err := bc.writeBlockWithoutState(block, externTd); err != nil { - return it.index, nil, nil, err + return it.index, nil, nil, nil, err } log.Debug("Injected sidechain block", "number", block.Number(), "hash", block.Hash(), "diff", block.Difficulty(), "elapsed", common.PrettyDuration(time.Since(start)), @@ -1798,7 +1801,7 @@ func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (i localTd := bc.GetTd(current.Hash(), current.NumberU64()) if localTd.Cmp(externTd) > 0 { log.Info("Sidechain written to disk", "start", it.first().NumberU64(), "end", it.previous().Number, "sidetd", externTd, "localtd", localTd) - return it.index, nil, nil, err + return it.index, nil, nil, nil, err } // Gather all the sidechain hashes (full blocks may be memory heavy) var ( @@ -1813,7 +1816,7 @@ func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (i parent = bc.GetHeader(parent.ParentHash, parent.Number.Uint64()-1) } if parent == nil { - return it.index, nil, nil, errors.New("missing parent") + return it.index, nil, nil, nil, errors.New("missing parent") } // Import all the pruned blocks to make the state available var ( @@ -1832,15 +1835,15 @@ func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (i // memory here. if len(blocks) >= 2048 || memory > 64*1024*1024 { log.Info("Importing heavy sidechain segment", "blocks", len(blocks), "start", blocks[0].NumberU64(), "end", block.NumberU64()) - if _, _, _, err := bc.insertChain(blocks, false); err != nil { - return 0, nil, nil, err + if _, _, _, _, err := bc.insertChain(blocks, false); err != nil { + return 0, nil, nil, nil, err } blocks, memory = blocks[:0], 0 // If the chain is terminating, stop processing blocks if atomic.LoadInt32(&bc.procInterrupt) == 1 { log.Debug("Premature abort during blocks processing") - return 0, nil, nil, nil + return 0, nil, nil, nil, nil } } } @@ -1848,7 +1851,7 @@ func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (i log.Info("Importing sidechain segment", "start", blocks[0].NumberU64(), "end", blocks[len(blocks)-1].NumberU64()) return bc.insertChain(blocks, false) } - return 0, nil, nil, nil + return 0, nil, nil, nil, nil } // reorg takes two blocks, an old chain and a new chain and will reconstruct the @@ -2003,11 +2006,15 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { // PostChainEvents iterates over the events generated by a chain insertion and // posts them into the event feed. // TODO: Should not expose PostChainEvents. The chain events should be posted in WriteBlock. -func (bc *BlockChain) PostChainEvents(events []interface{}, logs []*types.Log) { +func (bc *BlockChain) PostChainEvents(events []interface{}, logs []*types.Log, stateDiffs map[common.Address]state.Account) { // post event logs for further processing if logs != nil { bc.logsFeed.Send(logs) } + // post state diffs for further processing + if stateDiffs != nil { + bc.stateDiffsFeed.Send(stateDiffs) + } for _, event := range events { switch ev := event.(type) { case ChainEvent: @@ -2215,3 +2222,7 @@ func (bc *BlockChain) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscript func (bc *BlockChain) SubscribeBlockProcessingEvent(ch chan<- bool) event.Subscription { return bc.scope.Track(bc.blockProcFeed.Subscribe(ch)) } + +func (bc *BlockChain) SubscribeStateDiffEvent(ch chan<- map[common.Address]state.Account) event.Subscription { + return bc.scope.Track(bc.stateDiffsFeed.Subscribe(ch)) +} diff --git a/core/blockchain_test.go b/core/blockchain_test.go index db624c4dc..965e356cf 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -147,7 +147,7 @@ func testBlockChainImport(chain types.Blocks, blockchain *BlockChain) error { if err != nil { return err } - receipts, _, usedGas, err := blockchain.processor.Process(block, statedb, vm.Config{}) + receipts, _, _, usedGas, err := blockchain.processor.Process(block, statedb, vm.Config{}) if err != nil { blockchain.reportBlock(block, receipts, err) return err diff --git a/core/state/statedb.go b/core/state/statedb.go index b07f08fd2..7efe1c162 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -685,6 +685,23 @@ func (s *StateDB) clearJournalAndRefund() { s.refund = 0 } +func (s *StateDB) GetDirtyAccounts() map[common.Address]Account { + for addr := range s.journal.dirties { + s.stateObjectsDirty[addr] = struct{}{} + } + + results := make(map[common.Address]Account) + + for addr, stateObject := range s.stateObjects { + _, isDirty := s.stateObjectsDirty[addr] + if isDirty { + results[addr] = stateObject.data + } + } + + return results +} + // Commit writes the state to the underlying in-memory trie database. func (s *StateDB) Commit(deleteEmptyObjects bool) (root common.Hash, err error) { defer s.clearJournalAndRefund() diff --git a/core/state_processor.go b/core/state_processor.go index bed6a0730..18860eb2b 100644 --- a/core/state_processor.go +++ b/core/state_processor.go @@ -53,7 +53,7 @@ func NewStateProcessor(config *params.ChainConfig, bc *BlockChain, engine consen // Process returns the receipts and logs accumulated during the process and // returns the amount of gas that was used in the process. If any of the // transactions failed to execute due to insufficient gas it will return an error. -func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg vm.Config) (types.Receipts, []*types.Log, uint64, error) { +func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg vm.Config) (types.Receipts, []*types.Log, map[common.Address]state.Account, uint64, error) { var ( receipts types.Receipts usedGas = new(uint64) @@ -70,7 +70,7 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg statedb.Prepare(tx.Hash(), block.Hash(), i) receipt, _, err := ApplyTransaction(p.config, p.bc, nil, gp, statedb, header, tx, usedGas, cfg) if err != nil { - return nil, nil, 0, err + return nil, nil, nil, 0, err } receipts = append(receipts, receipt) allLogs = append(allLogs, receipt.Logs...) @@ -78,7 +78,9 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg // Finalize the block, applying any consensus engine specific extras (e.g. block rewards) p.engine.Finalize(p.bc, header, statedb, block.Transactions(), block.Uncles()) - return receipts, allLogs, *usedGas, nil + stateDiffs := statedb.GetDirtyAccounts() + + return receipts, allLogs, stateDiffs, *usedGas, nil } // ApplyTransaction attempts to apply a transaction to the given state database diff --git a/core/types.go b/core/types.go index 4c5b74a49..50cd1bed6 100644 --- a/core/types.go +++ b/core/types.go @@ -17,6 +17,7 @@ package core import ( + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" @@ -47,5 +48,5 @@ type Processor interface { // Process processes the state changes according to the Ethereum rules by running // the transaction messages using the statedb and applying any rewards to both // the processor (coinbase) and any included uncles. - Process(block *types.Block, statedb *state.StateDB, cfg vm.Config) (types.Receipts, []*types.Log, uint64, error) + Process(block *types.Block, statedb *state.StateDB, cfg vm.Config) (types.Receipts, []*types.Log, map[common.Address]state.Account, uint64, error) } diff --git a/eth/api_backend.go b/eth/api_backend.go index 69904a70f..7184cb91b 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -159,6 +159,10 @@ func (b *EthAPIBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscri return b.eth.BlockChain().SubscribeLogsEvent(ch) } +func (b *EthAPIBackend) SubscribeStateDiffs(ch chan<- map[common.Address]state.Account) event.Subscription { + return b.eth.BlockChain().SubscribeStateDiffEvent(ch) +} + func (b *EthAPIBackend) SendTx(ctx context.Context, signedTx *types.Transaction) error { return b.eth.txPool.AddLocal(signedTx) } diff --git a/eth/api_tracer.go b/eth/api_tracer.go index ce211cbd9..a3bf2db06 100644 --- a/eth/api_tracer.go +++ b/eth/api_tracer.go @@ -289,7 +289,7 @@ func (api *PrivateDebugAPI) traceChain(ctx context.Context, start, end *types.Bl traced += uint64(len(txs)) } // Generate the next state snapshot fast without tracing - _, _, _, err := api.eth.blockchain.Processor().Process(block, statedb, vm.Config{}) + _, _, _, _, err := api.eth.blockchain.Processor().Process(block, statedb, vm.Config{}) if err != nil { failed = err break @@ -676,7 +676,7 @@ func (api *PrivateDebugAPI) computeStateDB(block *types.Block, reexec uint64) (* if block = api.eth.blockchain.GetBlockByNumber(block.NumberU64() + 1); block == nil { return nil, fmt.Errorf("block #%d not found", block.NumberU64()+1) } - _, _, _, err := api.eth.blockchain.Processor().Process(block, statedb, vm.Config{}) + _, _, _, _, err := api.eth.blockchain.Processor().Process(block, statedb, vm.Config{}) if err != nil { return nil, fmt.Errorf("processing block %d failed: %v", block.NumberU64(), err) } diff --git a/eth/filters/api.go b/eth/filters/api.go index 5ed80a887..90e29ea5d 100644 --- a/eth/filters/api.go +++ b/eth/filters/api.go @@ -28,6 +28,7 @@ import ( ethereum "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" @@ -233,6 +234,36 @@ func (api *PublicFilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, er return rpcSub, nil } +// NewStateDiffs sent a notification each time an account changes in a block. +func (api *PublicFilterAPI) NewStateDiffs(ctx context.Context) (*rpc.Subscription, error) { + notifier, supported := rpc.NotifierFromContext(ctx) + if !supported { + return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported + } + + rpcSub := notifier.CreateSubscription() + + go func() { + stateDiffs := make(chan map[common.Address]state.Account) + stateDiffsSub := api.events.SubscribeStateDiffs(stateDiffs) + + for { + select { + case d := <-stateDiffs: + notifier.Notify(rpcSub.ID, d) + case <-rpcSub.Err(): + stateDiffsSub.Unsubscribe() + return + case <-notifier.Closed(): + stateDiffsSub.Unsubscribe() + return + } + } + }() + + return rpcSub, nil +} + // Logs creates a subscription that fires for all new log that match the given filter criteria. func (api *PublicFilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc.Subscription, error) { notifier, supported := rpc.NotifierFromContext(ctx) diff --git a/eth/filters/bench_test.go b/eth/filters/bench_test.go index fc7e6f527..e55e62ce5 100644 --- a/eth/filters/bench_test.go +++ b/eth/filters/bench_test.go @@ -129,7 +129,7 @@ func benchmarkBloomBits(b *testing.B, sectionSize uint64) { if i%20 == 0 { db.Close() db, _ = rawdb.NewLevelDBDatabase(benchDataDir, 128, 1024, "") - backend = &testBackend{mux, db, cnt, new(event.Feed), new(event.Feed), new(event.Feed), new(event.Feed)} + backend = &testBackend{mux, db, cnt, new(event.Feed), new(event.Feed), new(event.Feed), new(event.Feed), new(event.Feed)} } var addr common.Address addr[0] = byte(i) @@ -174,7 +174,7 @@ func BenchmarkNoBloomBits(b *testing.B) { b.Log("Running filter benchmarks...") start := time.Now() mux := new(event.TypeMux) - backend := &testBackend{mux, db, 0, new(event.Feed), new(event.Feed), new(event.Feed), new(event.Feed)} + backend := &testBackend{mux, db, 0, new(event.Feed), new(event.Feed), new(event.Feed), new(event.Feed), new(event.Feed)} filter := NewRangeFilter(backend, 0, int64(*headNum), []common.Address{{}}, nil) filter.Logs(context.Background()) d := time.Since(start) diff --git a/eth/filters/filter.go b/eth/filters/filter.go index 071613ad7..b566baf5b 100644 --- a/eth/filters/filter.go +++ b/eth/filters/filter.go @@ -24,6 +24,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/bloombits" + "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" @@ -42,6 +43,7 @@ type Backend interface { SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription + SubscribeStateDiffs(ch chan<- map[common.Address]state.Account) event.Subscription BloomStatus() (uint64, uint64) ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index 70139c1a9..2c9b6a12c 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -29,6 +29,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" @@ -55,6 +56,8 @@ const ( BlocksSubscription // LastSubscription keeps track of the last index LastIndexSubscription + // StateDiffsSubscription queries for new account changes + StateDiffsSubscription ) const ( @@ -68,6 +71,8 @@ const ( logsChanSize = 10 // chainEvChanSize is the size of channel listening to ChainEvent. chainEvChanSize = 10 + // stateDiffChanSize is the size of channel listening to StateDiffEvent. + stateDiffChanSize = 10 ) var ( @@ -75,15 +80,16 @@ var ( ) type subscription struct { - id rpc.ID - typ Type - created time.Time - logsCrit ethereum.FilterQuery - logs chan []*types.Log - hashes chan []common.Hash - headers chan *types.Header - installed chan struct{} // closed when the filter is installed - err chan error // closed when the filter is uninstalled + id rpc.ID + typ Type + created time.Time + logsCrit ethereum.FilterQuery + logs chan []*types.Log + hashes chan []common.Hash + headers chan *types.Header + stateDiffs chan map[common.Address]state.Account + installed chan struct{} // closed when the filter is installed + err chan error // closed when the filter is uninstalled } // EventSystem creates subscriptions, processes events and broadcasts them to the @@ -100,14 +106,16 @@ type EventSystem struct { rmLogsSub event.Subscription // Subscription for removed log event chainSub event.Subscription // Subscription for new chain event pendingLogSub *event.TypeMuxSubscription // Subscription for pending log event + stateDiffsSub event.Subscription // Subscription for new state diff events // Channels - install chan *subscription // install filter for event notification - uninstall chan *subscription // remove filter for event notification - txsCh chan core.NewTxsEvent // Channel to receive new transactions event - logsCh chan []*types.Log // Channel to receive new log event - rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event - chainCh chan core.ChainEvent // Channel to receive new chain event + install chan *subscription // install filter for event notification + uninstall chan *subscription // remove filter for event notification + txsCh chan core.NewTxsEvent // Channel to receive new transactions event + logsCh chan []*types.Log // Channel to receive new log event + rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event + chainCh chan core.ChainEvent // Channel to receive new chain event + stateDiffCh chan map[common.Address]state.Account // Channel to receive new state diff events } // NewEventSystem creates a new manager that listens for event on the given mux, @@ -118,15 +126,16 @@ type EventSystem struct { // or by stopping the given mux. func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventSystem { m := &EventSystem{ - mux: mux, - backend: backend, - lightMode: lightMode, - install: make(chan *subscription), - uninstall: make(chan *subscription), - txsCh: make(chan core.NewTxsEvent, txChanSize), - logsCh: make(chan []*types.Log, logsChanSize), - rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize), - chainCh: make(chan core.ChainEvent, chainEvChanSize), + mux: mux, + backend: backend, + lightMode: lightMode, + install: make(chan *subscription), + uninstall: make(chan *subscription), + txsCh: make(chan core.NewTxsEvent, txChanSize), + logsCh: make(chan []*types.Log, logsChanSize), + rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize), + chainCh: make(chan core.ChainEvent, chainEvChanSize), + stateDiffCh: make(chan map[common.Address]state.Account, stateDiffChanSize), } // Subscribe events @@ -134,6 +143,7 @@ func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventS m.logsSub = m.backend.SubscribeLogsEvent(m.logsCh) m.rmLogsSub = m.backend.SubscribeRemovedLogsEvent(m.rmLogsCh) m.chainSub = m.backend.SubscribeChainEvent(m.chainCh) + m.stateDiffsSub = m.backend.SubscribeStateDiffs(m.stateDiffCh) // TODO(rjl493456442): use feed to subscribe pending log event m.pendingLogSub = m.mux.Subscribe(core.PendingLogsEvent{}) @@ -175,6 +185,7 @@ func (sub *Subscription) Unsubscribe() { case <-sub.f.logs: case <-sub.f.hashes: case <-sub.f.headers: + case <-sub.f.stateDiffs: } } @@ -252,15 +263,16 @@ func (es *EventSystem) subscribeMinedPendingLogs(crit ethereum.FilterQuery, logs // given criteria to the given logs channel. func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription { sub := &subscription{ - id: rpc.NewID(), - typ: LogsSubscription, - logsCrit: crit, - created: time.Now(), - logs: logs, - hashes: make(chan []common.Hash), - headers: make(chan *types.Header), - installed: make(chan struct{}), - err: make(chan error), + id: rpc.NewID(), + typ: LogsSubscription, + logsCrit: crit, + created: time.Now(), + logs: logs, + hashes: make(chan []common.Hash), + headers: make(chan *types.Header), + stateDiffs: make(chan map[common.Address]state.Account), + installed: make(chan struct{}), + err: make(chan error), } return es.subscribe(sub) } @@ -269,15 +281,32 @@ func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*typ // transactions that enter the transaction pool. func (es *EventSystem) subscribePendingLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription { sub := &subscription{ - id: rpc.NewID(), - typ: PendingLogsSubscription, - logsCrit: crit, - created: time.Now(), - logs: logs, - hashes: make(chan []common.Hash), - headers: make(chan *types.Header), - installed: make(chan struct{}), - err: make(chan error), + id: rpc.NewID(), + typ: PendingLogsSubscription, + logsCrit: crit, + created: time.Now(), + logs: logs, + hashes: make(chan []common.Hash), + headers: make(chan *types.Header), + stateDiffs: make(chan map[common.Address]state.Account), + installed: make(chan struct{}), + err: make(chan error), + } + return es.subscribe(sub) +} + +// SubscribeStateDiffs +func (es *EventSystem) SubscribeStateDiffs(diffs chan map[common.Address]state.Account) *Subscription { + sub := &subscription{ + id: rpc.NewID(), + typ: StateDiffsSubscription, + created: time.Now(), + logs: make(chan []*types.Log), + hashes: make(chan []common.Hash), + headers: make(chan *types.Header), + stateDiffs: diffs, + installed: make(chan struct{}), + err: make(chan error), } return es.subscribe(sub) } @@ -286,14 +315,15 @@ func (es *EventSystem) subscribePendingLogs(crit ethereum.FilterQuery, logs chan // imported in the chain. func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscription { sub := &subscription{ - id: rpc.NewID(), - typ: BlocksSubscription, - created: time.Now(), - logs: make(chan []*types.Log), - hashes: make(chan []common.Hash), - headers: headers, - installed: make(chan struct{}), - err: make(chan error), + id: rpc.NewID(), + typ: BlocksSubscription, + created: time.Now(), + logs: make(chan []*types.Log), + hashes: make(chan []common.Hash), + headers: headers, + stateDiffs: make(chan map[common.Address]state.Account), + installed: make(chan struct{}), + err: make(chan error), } return es.subscribe(sub) } @@ -302,14 +332,15 @@ func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscripti // transactions that enter the transaction pool. func (es *EventSystem) SubscribePendingTxs(hashes chan []common.Hash) *Subscription { sub := &subscription{ - id: rpc.NewID(), - typ: PendingTransactionsSubscription, - created: time.Now(), - logs: make(chan []*types.Log), - hashes: hashes, - headers: make(chan *types.Header), - installed: make(chan struct{}), - err: make(chan error), + id: rpc.NewID(), + typ: PendingTransactionsSubscription, + created: time.Now(), + logs: make(chan []*types.Log), + hashes: hashes, + headers: make(chan *types.Header), + stateDiffs: make(chan map[common.Address]state.Account), + installed: make(chan struct{}), + err: make(chan error), } return es.subscribe(sub) } @@ -368,6 +399,10 @@ func (es *EventSystem) broadcast(filters filterIndex, ev interface{}) { } }) } + case map[common.Address]state.Account: + for _, f := range filters[StateDiffsSubscription] { + f.stateDiffs <- e + } } } @@ -453,6 +488,7 @@ func (es *EventSystem) eventLoop() { es.logsSub.Unsubscribe() es.rmLogsSub.Unsubscribe() es.chainSub.Unsubscribe() + es.stateDiffsSub.Unsubscribe() }() index := make(filterIndex) @@ -471,6 +507,8 @@ func (es *EventSystem) eventLoop() { es.broadcast(index, ev) case ev := <-es.chainCh: es.broadcast(index, ev) + case ev := <-es.stateDiffCh: + es.broadcast(index, ev) case ev, active := <-es.pendingLogSub.Chan(): if !active { // system stopped return @@ -506,6 +544,8 @@ func (es *EventSystem) eventLoop() { return case <-es.chainSub.Err(): return + case <-es.stateDiffsSub.Err(): + return } } } diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go index 93cb43123..0715c4549 100644 --- a/eth/filters/filter_system_test.go +++ b/eth/filters/filter_system_test.go @@ -31,6 +31,7 @@ import ( "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/bloombits" "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" @@ -39,13 +40,14 @@ import ( ) type testBackend struct { - mux *event.TypeMux - db ethdb.Database - sections uint64 - txFeed *event.Feed - rmLogsFeed *event.Feed - logsFeed *event.Feed - chainFeed *event.Feed + mux *event.TypeMux + db ethdb.Database + sections uint64 + txFeed *event.Feed + rmLogsFeed *event.Feed + logsFeed *event.Feed + chainFeed *event.Feed + stateDiffsFeed *event.Feed } func (b *testBackend) ChainDb() ethdb.Database { @@ -120,6 +122,10 @@ func (b *testBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subsc return b.chainFeed.Subscribe(ch) } +func (b *testBackend) SubscribeStateDiffs(ch chan<- map[common.Address]state.Account) event.Subscription { + return b.stateDiffsFeed.Subscribe(ch) +} + func (b *testBackend) BloomStatus() (uint64, uint64) { return params.BloomBitsBlocks, b.sections } @@ -160,17 +166,18 @@ func TestBlockSubscription(t *testing.T) { t.Parallel() var ( - mux = new(event.TypeMux) - db = rawdb.NewMemoryDatabase() - txFeed = new(event.Feed) - rmLogsFeed = new(event.Feed) - logsFeed = new(event.Feed) - chainFeed = new(event.Feed) - backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed} - api = NewPublicFilterAPI(backend, false) - genesis = new(core.Genesis).MustCommit(db) - chain, _ = core.GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, 10, func(i int, gen *core.BlockGen) {}) - chainEvents = []core.ChainEvent{} + mux = new(event.TypeMux) + db = rawdb.NewMemoryDatabase() + txFeed = new(event.Feed) + rmLogsFeed = new(event.Feed) + logsFeed = new(event.Feed) + chainFeed = new(event.Feed) + stateDiffsFeed = new(event.Feed) + backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed, stateDiffsFeed} + api = NewPublicFilterAPI(backend, false) + genesis = new(core.Genesis).MustCommit(db) + chain, _ = core.GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, 10, func(i int, gen *core.BlockGen) {}) + chainEvents = []core.ChainEvent{} ) for _, blk := range chain { @@ -217,14 +224,15 @@ func TestPendingTxFilter(t *testing.T) { t.Parallel() var ( - mux = new(event.TypeMux) - db = rawdb.NewMemoryDatabase() - txFeed = new(event.Feed) - rmLogsFeed = new(event.Feed) - logsFeed = new(event.Feed) - chainFeed = new(event.Feed) - backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed} - api = NewPublicFilterAPI(backend, false) + mux = new(event.TypeMux) + db = rawdb.NewMemoryDatabase() + txFeed = new(event.Feed) + rmLogsFeed = new(event.Feed) + logsFeed = new(event.Feed) + chainFeed = new(event.Feed) + stateDiffsFeed = new(event.Feed) + backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed, stateDiffsFeed} + api = NewPublicFilterAPI(backend, false) transactions = []*types.Transaction{ types.NewTransaction(0, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil), @@ -277,14 +285,15 @@ func TestPendingTxFilter(t *testing.T) { // If not it must return an error. func TestLogFilterCreation(t *testing.T) { var ( - mux = new(event.TypeMux) - db = rawdb.NewMemoryDatabase() - txFeed = new(event.Feed) - rmLogsFeed = new(event.Feed) - logsFeed = new(event.Feed) - chainFeed = new(event.Feed) - backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed} - api = NewPublicFilterAPI(backend, false) + mux = new(event.TypeMux) + db = rawdb.NewMemoryDatabase() + txFeed = new(event.Feed) + rmLogsFeed = new(event.Feed) + logsFeed = new(event.Feed) + chainFeed = new(event.Feed) + stateDiffsFeed = new(event.Feed) + backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed, stateDiffsFeed} + api = NewPublicFilterAPI(backend, false) testCases = []struct { crit FilterCriteria @@ -326,14 +335,15 @@ func TestInvalidLogFilterCreation(t *testing.T) { t.Parallel() var ( - mux = new(event.TypeMux) - db = rawdb.NewMemoryDatabase() - txFeed = new(event.Feed) - rmLogsFeed = new(event.Feed) - logsFeed = new(event.Feed) - chainFeed = new(event.Feed) - backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed} - api = NewPublicFilterAPI(backend, false) + mux = new(event.TypeMux) + db = rawdb.NewMemoryDatabase() + txFeed = new(event.Feed) + rmLogsFeed = new(event.Feed) + logsFeed = new(event.Feed) + chainFeed = new(event.Feed) + stateDiffsFeed = new(event.Feed) + backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed, stateDiffsFeed} + api = NewPublicFilterAPI(backend, false) ) // different situations where log filter creation should fail. @@ -353,15 +363,16 @@ func TestInvalidLogFilterCreation(t *testing.T) { func TestInvalidGetLogsRequest(t *testing.T) { var ( - mux = new(event.TypeMux) - db = rawdb.NewMemoryDatabase() - txFeed = new(event.Feed) - rmLogsFeed = new(event.Feed) - logsFeed = new(event.Feed) - chainFeed = new(event.Feed) - backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed} - api = NewPublicFilterAPI(backend, false) - blockHash = common.HexToHash("0x1111111111111111111111111111111111111111111111111111111111111111") + mux = new(event.TypeMux) + db = rawdb.NewMemoryDatabase() + txFeed = new(event.Feed) + rmLogsFeed = new(event.Feed) + logsFeed = new(event.Feed) + chainFeed = new(event.Feed) + stateDiffsFeed = new(event.Feed) + backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed, stateDiffsFeed} + api = NewPublicFilterAPI(backend, false) + blockHash = common.HexToHash("0x1111111111111111111111111111111111111111111111111111111111111111") ) // Reason: Cannot specify both BlockHash and FromBlock/ToBlock) @@ -383,14 +394,15 @@ func TestLogFilter(t *testing.T) { t.Parallel() var ( - mux = new(event.TypeMux) - db = rawdb.NewMemoryDatabase() - txFeed = new(event.Feed) - rmLogsFeed = new(event.Feed) - logsFeed = new(event.Feed) - chainFeed = new(event.Feed) - backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed} - api = NewPublicFilterAPI(backend, false) + mux = new(event.TypeMux) + db = rawdb.NewMemoryDatabase() + txFeed = new(event.Feed) + rmLogsFeed = new(event.Feed) + logsFeed = new(event.Feed) + chainFeed = new(event.Feed) + stateDiffsFeed = new(event.Feed) + backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed, stateDiffsFeed} + api = NewPublicFilterAPI(backend, false) firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111") secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222") @@ -502,14 +514,15 @@ func TestPendingLogsSubscription(t *testing.T) { t.Parallel() var ( - mux = new(event.TypeMux) - db = rawdb.NewMemoryDatabase() - txFeed = new(event.Feed) - rmLogsFeed = new(event.Feed) - logsFeed = new(event.Feed) - chainFeed = new(event.Feed) - backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed} - api = NewPublicFilterAPI(backend, false) + mux = new(event.TypeMux) + db = rawdb.NewMemoryDatabase() + txFeed = new(event.Feed) + rmLogsFeed = new(event.Feed) + logsFeed = new(event.Feed) + chainFeed = new(event.Feed) + stateDiffsFeed = new(event.Feed) + backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed, stateDiffsFeed} + api = NewPublicFilterAPI(backend, false) firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111") secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222") diff --git a/eth/filters/filter_test.go b/eth/filters/filter_test.go index eafa19cdf..36d787532 100644 --- a/eth/filters/filter_test.go +++ b/eth/filters/filter_test.go @@ -50,18 +50,19 @@ func BenchmarkFilters(b *testing.B) { defer os.RemoveAll(dir) var ( - db, _ = rawdb.NewLevelDBDatabase(dir, 0, 0, "") - mux = new(event.TypeMux) - txFeed = new(event.Feed) - rmLogsFeed = new(event.Feed) - logsFeed = new(event.Feed) - chainFeed = new(event.Feed) - backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed} - key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") - addr1 = crypto.PubkeyToAddress(key1.PublicKey) - addr2 = common.BytesToAddress([]byte("jeff")) - addr3 = common.BytesToAddress([]byte("ethereum")) - addr4 = common.BytesToAddress([]byte("random addresses please")) + db, _ = rawdb.NewLevelDBDatabase(dir, 0, 0, "") + mux = new(event.TypeMux) + txFeed = new(event.Feed) + rmLogsFeed = new(event.Feed) + logsFeed = new(event.Feed) + chainFeed = new(event.Feed) + stateDiffsFeed = new(event.Feed) + backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed, stateDiffsFeed} + key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") + addr1 = crypto.PubkeyToAddress(key1.PublicKey) + addr2 = common.BytesToAddress([]byte("jeff")) + addr3 = common.BytesToAddress([]byte("ethereum")) + addr4 = common.BytesToAddress([]byte("random addresses please")) ) defer db.Close() @@ -109,15 +110,16 @@ func TestFilters(t *testing.T) { defer os.RemoveAll(dir) var ( - db, _ = rawdb.NewLevelDBDatabase(dir, 0, 0, "") - mux = new(event.TypeMux) - txFeed = new(event.Feed) - rmLogsFeed = new(event.Feed) - logsFeed = new(event.Feed) - chainFeed = new(event.Feed) - backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed} - key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") - addr = crypto.PubkeyToAddress(key1.PublicKey) + db, _ = rawdb.NewLevelDBDatabase(dir, 0, 0, "") + mux = new(event.TypeMux) + txFeed = new(event.Feed) + rmLogsFeed = new(event.Feed) + logsFeed = new(event.Feed) + chainFeed = new(event.Feed) + stateDiffsFeed = new(event.Feed) + backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed, stateDiffsFeed} + key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") + addr = crypto.PubkeyToAddress(key1.PublicKey) hash1 = common.BytesToHash([]byte("topic1")) hash2 = common.BytesToHash([]byte("topic2")) diff --git a/internal/ethapi/backend.go b/internal/ethapi/backend.go index 06c6db33b..8a64bd2c4 100644 --- a/internal/ethapi/backend.go +++ b/internal/ethapi/backend.go @@ -78,6 +78,7 @@ type Backend interface { ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription + SubscribeStateDiffs(ch chan<- map[common.Address]state.Account) event.Subscription ChainConfig() *params.ChainConfig CurrentBlock() *types.Block diff --git a/les/api_backend.go b/les/api_backend.go index 5cd432dcf..ba7ccd964 100644 --- a/les/api_backend.go +++ b/les/api_backend.go @@ -172,6 +172,10 @@ func (b *LesApiBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEven return b.eth.blockchain.SubscribeRemovedLogsEvent(ch) } +func (b *LesApiBackend) SubscribeStateDiffs(ch chan<- map[common.Address]state.Account) event.Subscription { + return b.eth.blockchain.SubscribeStateDiffs(ch) +} + func (b *LesApiBackend) Downloader() *downloader.Downloader { return b.eth.Downloader() } diff --git a/light/lightchain.go b/light/lightchain.go index 7f64d1c28..82eed9576 100644 --- a/light/lightchain.go +++ b/light/lightchain.go @@ -531,6 +531,12 @@ func (lc *LightChain) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) return lc.scope.Track(new(event.Feed).Subscribe(ch)) } +// SubscribeStateDiffs implements the interface of filters.Backend +// LightChain does not sen state diffs, so return an empty subscription. +func (lc *LightChain) SubscribeStateDiffs(ch chan<- map[common.Address]state.Account) event.Subscription { + return lc.scope.Track(new(event.Feed).Subscribe(ch)) +} + // DisableCheckFreq disables header validation. This is used for ultralight mode. func (lc *LightChain) DisableCheckFreq() { atomic.StoreInt32(&lc.disableCheckFreq, 1) diff --git a/miner/worker.go b/miner/worker.go index 4a9528c39..d44551916 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -608,7 +608,7 @@ func (w *worker) resultLoop() { case core.SideStatTy: events = append(events, core.ChainSideEvent{Block: block}) } - w.chain.PostChainEvents(events, logs) + w.chain.PostChainEvents(events, logs, task.state.GetDirtyAccounts()) // Insert the block into the set of pending ones to resultLoop for confirmations w.unconfirmed.Insert(block.NumberU64(), block.Hash()) diff --git a/miner/worker_test.go b/miner/worker_test.go index 1604e988d..4a474b233 100644 --- a/miner/worker_test.go +++ b/miner/worker_test.go @@ -134,7 +134,7 @@ func newTestWorkerBackend(t *testing.T, chainConfig *params.ChainConfig, engine func (b *testWorkerBackend) BlockChain() *core.BlockChain { return b.chain } func (b *testWorkerBackend) TxPool() *core.TxPool { return b.txPool } func (b *testWorkerBackend) PostChainEvents(events []interface{}) { - b.chain.PostChainEvents(events, nil) + b.chain.PostChainEvents(events, nil, nil) } func newTestWorker(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine, blocks int) (*worker, *testWorkerBackend) {