diff --git a/accounts/abi/bind/backends/simulated.go b/accounts/abi/bind/backends/simulated.go index e30572e0a..de25a5cb8 100644 --- a/accounts/abi/bind/backends/simulated.go +++ b/accounts/abi/bind/backends/simulated.go @@ -517,6 +517,10 @@ func (fb *filterBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscr return fb.bc.SubscribeLogsEvent(ch) } +func (fb *filterBackend) SubscribeStateDiffs(ch chan<- map[common.Address]state.Account) event.Subscription { + return fb.bc.SubscribeStateDiffEvent(ch) +} + func (fb *filterBackend) BloomStatus() (uint64, uint64) { return 4096, 0 } func (fb *filterBackend) ServiceFilter(ctx context.Context, ms *bloombits.MatcherSession) { panic("not supported") diff --git a/cmd/geth/retesteth.go b/cmd/geth/retesteth.go index 9469c9f5f..bb2f40090 100644 --- a/cmd/geth/retesteth.go +++ b/cmd/geth/retesteth.go @@ -665,7 +665,7 @@ func (api *RetestethAPI) AccountRange(ctx context.Context, root = statedb.IntermediateRoot(vmenv.ChainConfig().IsEIP158(block.Number())) if idx == int(txIndex) { // This is to make sure root can be opened by OpenTrie - root, err = statedb.Commit(api.chainConfig.IsEIP158(block.Number())) + root, _, err = statedb.Commit(api.chainConfig.IsEIP158(block.Number())) if err != nil { return AccountRangeResult{}, err } @@ -778,7 +778,7 @@ func (api *RetestethAPI) StorageRangeAt(ctx context.Context, _ = statedb.IntermediateRoot(vmenv.ChainConfig().IsEIP158(block.Number())) if idx == int(txIndex) { // This is to make sure root can be opened by OpenTrie - _, err = statedb.Commit(vmenv.ChainConfig().IsEIP158(block.Number())) + _, _, err = statedb.Commit(vmenv.ChainConfig().IsEIP158(block.Number())) if err != nil { return StorageRangeResult{}, err } diff --git a/core/blockchain.go b/core/blockchain.go index 833de3bc7..cd8fec010 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -138,15 +138,16 @@ type BlockChain struct { triegc *prque.Prque // Priority queue mapping block numbers to tries to gc gcproc time.Duration // Accumulates canonical block processing for trie dumping - hc *HeaderChain - rmLogsFeed event.Feed - chainFeed event.Feed - chainSideFeed event.Feed - chainHeadFeed event.Feed - logsFeed event.Feed - blockProcFeed event.Feed - scope event.SubscriptionScope - genesisBlock *types.Block + hc *HeaderChain + rmLogsFeed event.Feed + chainFeed event.Feed + chainSideFeed event.Feed + chainHeadFeed event.Feed + logsFeed event.Feed + blockProcFeed event.Feed + stateDiffsFeed event.Feed + scope event.SubscriptionScope + genesisBlock *types.Block chainmu sync.RWMutex // blockchain insertion lock @@ -1252,7 +1253,7 @@ func (bc *BlockChain) writeKnownBlock(block *types.Block) error { } // WriteBlockWithState writes the block and all associated state to the database. -func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.Receipt, state *state.StateDB) (status WriteStatus, err error) { +func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.Receipt, state *state.StateDB) (status WriteStatus, stateDiffs map[common.Address]state.Account, err error) { bc.chainmu.Lock() defer bc.chainmu.Unlock() @@ -1261,14 +1262,14 @@ func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types. // writeBlockWithState writes the block and all associated state to the database, // but is expects the chain mutex to be held. -func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.Receipt, state *state.StateDB) (status WriteStatus, err error) { +func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.Receipt, state *state.StateDB) (status WriteStatus, stateDiffs map[common.Address]state.Account, err error) { bc.wg.Add(1) defer bc.wg.Done() // Calculate the total difficulty of the block ptd := bc.GetTd(block.ParentHash(), block.NumberU64()-1) if ptd == nil { - return NonStatTy, consensus.ErrUnknownAncestor + return NonStatTy, stateDiffs, consensus.ErrUnknownAncestor } // Make sure no inconsistent state is leaked during insertion currentBlock := bc.CurrentBlock() @@ -1277,20 +1278,20 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. // Irrelevant of the canonical status, write the block itself to the database if err := bc.hc.WriteTd(block.Hash(), block.NumberU64(), externTd); err != nil { - return NonStatTy, err + return NonStatTy, stateDiffs, err } rawdb.WriteBlock(bc.db, block) - root, err := state.Commit(bc.chainConfig.IsEIP158(block.Number())) + root, modifiedAccounts, err := state.Commit(bc.chainConfig.IsEIP158(block.Number())) if err != nil { - return NonStatTy, err + return NonStatTy, stateDiffs, err } triedb := bc.stateCache.TrieDB() // If we're running an archive node, always flush if bc.cacheConfig.TrieDirtyDisabled { if err := triedb.Commit(root, false); err != nil { - return NonStatTy, err + return NonStatTy, modifiedAccounts, err } } else { // Full but not archive node, do proper garbage collection @@ -1366,7 +1367,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. // Reorganise the chain if the parent is not the head block if block.ParentHash() != currentBlock.Hash() { if err := bc.reorg(currentBlock, block); err != nil { - return NonStatTy, err + return NonStatTy, modifiedAccounts, err } } // Write the positional metadata for transaction/receipt lookups and preimages @@ -1378,7 +1379,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. status = SideStatTy } if err := batch.Write(); err != nil { - return NonStatTy, err + return NonStatTy, modifiedAccounts, err } // Set new head. @@ -1386,7 +1387,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. bc.insert(block) } bc.futureBlocks.Remove(block.Hash()) - return status, nil + return status, modifiedAccounts, nil } // addFutureBlock checks if the block is within the max allowed window to get @@ -1436,11 +1437,11 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) { // Pre-checks passed, start the full block imports bc.wg.Add(1) bc.chainmu.Lock() - n, events, logs, err := bc.insertChain(chain, true) + n, events, logs, stateDiffs, err := bc.insertChain(chain, true) bc.chainmu.Unlock() bc.wg.Done() - bc.PostChainEvents(events, logs) + bc.PostChainEvents(events, logs, stateDiffs) return n, err } @@ -1452,10 +1453,10 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) { // racey behaviour. If a sidechain import is in progress, and the historic state // is imported, but then new canon-head is added before the actual sidechain // completes, then the historic state could be pruned again -func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []interface{}, []*types.Log, error) { +func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []interface{}, []*types.Log, map[common.Address]state.Account, error) { // If the chain is terminating, don't even bother starting up if atomic.LoadInt32(&bc.procInterrupt) == 1 { - return 0, nil, nil, nil + return 0, nil, nil, nil, nil } // Start a parallel signature recovery (signer will fluke on fork transition, minimal perf loss) senderCacher.recoverFromBlocks(types.MakeSigner(bc.chainConfig, chain[0].Number()), chain) @@ -1468,6 +1469,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] events = make([]interface{}, 0, len(chain)) lastCanon *types.Block coalescedLogs []*types.Log + stateDiffs map[common.Address]state.Account ) // Start the parallel header verifier headers := make([]*types.Header, len(chain)) @@ -1518,7 +1520,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] for block != nil && err == ErrKnownBlock { log.Debug("Writing previously known block", "number", block.Number(), "hash", block.Hash()) if err := bc.writeKnownBlock(block); err != nil { - return it.index, nil, nil, err + return it.index, nil, nil, nil, err } lastCanon = block @@ -1537,7 +1539,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] for block != nil && (it.index == 0 || err == consensus.ErrUnknownAncestor) { log.Debug("Future block, postponing import", "number", block.Number(), "hash", block.Hash()) if err := bc.addFutureBlock(block); err != nil { - return it.index, events, coalescedLogs, err + return it.index, events, coalescedLogs, stateDiffs, err } block, err = it.next() } @@ -1545,13 +1547,13 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] stats.ignored += it.remaining() // If there are any still remaining, mark as ignored - return it.index, events, coalescedLogs, err + return it.index, events, coalescedLogs, stateDiffs, err // Some other error occurred, abort case err != nil: stats.ignored += len(it.chain) bc.reportBlock(block, nil, err) - return it.index, events, coalescedLogs, err + return it.index, events, coalescedLogs, stateDiffs, err } // No validation errors for the first block (or chain prefix skipped) for ; block != nil && err == nil || err == ErrKnownBlock; block, err = it.next() { @@ -1563,7 +1565,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] // If the header is a banned one, straight out abort if BadHashes[block.Hash()] { bc.reportBlock(block, nil, ErrBlacklistedHash) - return it.index, events, coalescedLogs, ErrBlacklistedHash + return it.index, events, coalescedLogs, stateDiffs, ErrBlacklistedHash } // If the block is known (in the middle of the chain), it's a special case for // Clique blocks where they can share state among each other, so importing an @@ -1580,7 +1582,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] "root", block.Root()) if err := bc.writeKnownBlock(block); err != nil { - return it.index, nil, nil, err + return it.index, nil, nil, stateDiffs, err } stats.processed++ @@ -1600,7 +1602,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] } statedb, err := state.New(parent.Root, bc.stateCache) if err != nil { - return it.index, events, coalescedLogs, err + return it.index, events, coalescedLogs, stateDiffs, err } // If we have a followup block, run that against the current state to pre-cache // transactions and probabilistically some of the account/storage trie nodes. @@ -1625,7 +1627,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] if err != nil { bc.reportBlock(block, receipts, err) atomic.StoreUint32(&followupInterrupt, 1) - return it.index, events, coalescedLogs, err + return it.index, events, coalescedLogs, stateDiffs, err } // Update the metrics touched during block processing accountReadTimer.Update(statedb.AccountReads) // Account reads are complete, we can mark them @@ -1644,7 +1646,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] if err := bc.validator.ValidateState(block, statedb, receipts, usedGas); err != nil { bc.reportBlock(block, receipts, err) atomic.StoreUint32(&followupInterrupt, 1) - return it.index, events, coalescedLogs, err + return it.index, events, coalescedLogs, stateDiffs, err } proctime := time.Since(start) @@ -1656,10 +1658,10 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] // Write the block to the chain and get the status. substart = time.Now() - status, err := bc.writeBlockWithState(block, receipts, statedb) + status, committedStateDiffs, err := bc.writeBlockWithState(block, receipts, statedb) if err != nil { atomic.StoreUint32(&followupInterrupt, 1) - return it.index, events, coalescedLogs, err + return it.index, events, coalescedLogs, stateDiffs, err } atomic.StoreUint32(&followupInterrupt, 1) @@ -1678,6 +1680,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] "root", block.Root()) coalescedLogs = append(coalescedLogs, logs...) + stateDiffs = committedStateDiffs events = append(events, ChainEvent{block, block.Hash(), logs}) lastCanon = block @@ -1708,13 +1711,13 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] // Any blocks remaining here? The only ones we care about are the future ones if block != nil && err == consensus.ErrFutureBlock { if err := bc.addFutureBlock(block); err != nil { - return it.index, events, coalescedLogs, err + return it.index, events, coalescedLogs, stateDiffs, err } block, err = it.next() for ; block != nil && err == consensus.ErrUnknownAncestor; block, err = it.next() { if err := bc.addFutureBlock(block); err != nil { - return it.index, events, coalescedLogs, err + return it.index, events, coalescedLogs, stateDiffs, err } stats.queued++ } @@ -1725,7 +1728,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] if lastCanon != nil && bc.CurrentBlock().Hash() == lastCanon.Hash() { events = append(events, ChainHeadEvent{lastCanon}) } - return it.index, events, coalescedLogs, err + return it.index, events, coalescedLogs, stateDiffs, err } // insertSideChain is called when an import batch hits upon a pruned ancestor @@ -1734,7 +1737,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] // // The method writes all (header-and-body-valid) blocks to disk, then tries to // switch over to the new chain if the TD exceeded the current chain. -func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (int, []interface{}, []*types.Log, error) { +func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (int, []interface{}, []*types.Log, map[common.Address]state.Account, error) { var ( externTd *big.Int current = bc.CurrentBlock() @@ -1770,7 +1773,7 @@ func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (i // If someone legitimately side-mines blocks, they would still be imported as usual. However, // we cannot risk writing unverified blocks to disk when they obviously target the pruning // mechanism. - return it.index, nil, nil, errors.New("sidechain ghost-state attack") + return it.index, nil, nil, nil, errors.New("sidechain ghost-state attack") } } if externTd == nil { @@ -1781,7 +1784,7 @@ func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (i if !bc.HasBlock(block.Hash(), block.NumberU64()) { start := time.Now() if err := bc.writeBlockWithoutState(block, externTd); err != nil { - return it.index, nil, nil, err + return it.index, nil, nil, nil, err } log.Debug("Injected sidechain block", "number", block.Number(), "hash", block.Hash(), "diff", block.Difficulty(), "elapsed", common.PrettyDuration(time.Since(start)), @@ -1798,7 +1801,7 @@ func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (i localTd := bc.GetTd(current.Hash(), current.NumberU64()) if localTd.Cmp(externTd) > 0 { log.Info("Sidechain written to disk", "start", it.first().NumberU64(), "end", it.previous().Number, "sidetd", externTd, "localtd", localTd) - return it.index, nil, nil, err + return it.index, nil, nil, nil, err } // Gather all the sidechain hashes (full blocks may be memory heavy) var ( @@ -1813,7 +1816,7 @@ func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (i parent = bc.GetHeader(parent.ParentHash, parent.Number.Uint64()-1) } if parent == nil { - return it.index, nil, nil, errors.New("missing parent") + return it.index, nil, nil, nil, errors.New("missing parent") } // Import all the pruned blocks to make the state available var ( @@ -1832,15 +1835,15 @@ func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (i // memory here. if len(blocks) >= 2048 || memory > 64*1024*1024 { log.Info("Importing heavy sidechain segment", "blocks", len(blocks), "start", blocks[0].NumberU64(), "end", block.NumberU64()) - if _, _, _, err := bc.insertChain(blocks, false); err != nil { - return 0, nil, nil, err + if _, _, _, _, err := bc.insertChain(blocks, false); err != nil { + return 0, nil, nil, nil, err } blocks, memory = blocks[:0], 0 // If the chain is terminating, stop processing blocks if atomic.LoadInt32(&bc.procInterrupt) == 1 { log.Debug("Premature abort during blocks processing") - return 0, nil, nil, nil + return 0, nil, nil, nil, nil } } } @@ -1848,7 +1851,7 @@ func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (i log.Info("Importing sidechain segment", "start", blocks[0].NumberU64(), "end", blocks[len(blocks)-1].NumberU64()) return bc.insertChain(blocks, false) } - return 0, nil, nil, nil + return 0, nil, nil, nil, nil } // reorg takes two blocks, an old chain and a new chain and will reconstruct the @@ -2003,11 +2006,15 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { // PostChainEvents iterates over the events generated by a chain insertion and // posts them into the event feed. // TODO: Should not expose PostChainEvents. The chain events should be posted in WriteBlock. -func (bc *BlockChain) PostChainEvents(events []interface{}, logs []*types.Log) { +func (bc *BlockChain) PostChainEvents(events []interface{}, logs []*types.Log, stateDiffs map[common.Address]state.Account) { // post event logs for further processing if logs != nil { bc.logsFeed.Send(logs) } + // post state diffs for further processing + if stateDiffs != nil { + bc.stateDiffsFeed.Send(stateDiffs) + } for _, event := range events { switch ev := event.(type) { case ChainEvent: @@ -2215,3 +2222,7 @@ func (bc *BlockChain) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscript func (bc *BlockChain) SubscribeBlockProcessingEvent(ch chan<- bool) event.Subscription { return bc.scope.Track(bc.blockProcFeed.Subscribe(ch)) } + +func (bc *BlockChain) SubscribeStateDiffEvent(ch chan<- map[common.Address]state.Account) event.Subscription { + return bc.scope.Track(bc.stateDiffsFeed.Subscribe(ch)) +} diff --git a/core/chain_makers.go b/core/chain_makers.go index 17f404211..d1e9cf92d 100644 --- a/core/chain_makers.go +++ b/core/chain_makers.go @@ -216,7 +216,7 @@ func GenerateChain(config *params.ChainConfig, parent *types.Block, engine conse block, _ := b.engine.FinalizeAndAssemble(chainreader, b.header, statedb, b.txs, b.uncles, b.receipts) // Write state changes to db - root, err := statedb.Commit(config.IsEIP158(b.header.Number)) + root, _, err := statedb.Commit(config.IsEIP158(b.header.Number)) if err != nil { panic(fmt.Sprintf("state write error: %v", err)) } diff --git a/core/state/state_test.go b/core/state/state_test.go index d6ff714ee..849cef1f8 100644 --- a/core/state/state_test.go +++ b/core/state/state_test.go @@ -158,7 +158,7 @@ func TestSnapshot2(t *testing.T) { so0.deleted = false state.setStateObject(so0) - root, _ := state.Commit(false) + root, _, _ := state.Commit(false) state.Reset(root) // and one with deleted == true diff --git a/core/state/statedb.go b/core/state/statedb.go index b07f08fd2..d3231eac8 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -686,9 +686,10 @@ func (s *StateDB) clearJournalAndRefund() { } // Commit writes the state to the underlying in-memory trie database. -func (s *StateDB) Commit(deleteEmptyObjects bool) (root common.Hash, err error) { +func (s *StateDB) Commit(deleteEmptyObjects bool) (root common.Hash, modifiedAccounts map[common.Address]Account, err error) { defer s.clearJournalAndRefund() + modifiedAccounts = make(map[common.Address]Account) for addr := range s.journal.dirties { s.stateObjectsDirty[addr] = struct{}{} } @@ -699,6 +700,7 @@ func (s *StateDB) Commit(deleteEmptyObjects bool) (root common.Hash, err error) case stateObject.suicided || (isDirty && deleteEmptyObjects && stateObject.empty()): // If the object has been removed, don't bother syncing it // and just mark it for deletion in the trie. + modifiedAccounts[addr] = stateObject.data s.deleteStateObject(stateObject) case isDirty: // Write any contract code associated with the state object @@ -708,8 +710,9 @@ func (s *StateDB) Commit(deleteEmptyObjects bool) (root common.Hash, err error) } // Write any storage changes in the state object to its storage trie. if err := stateObject.CommitTrie(s.db); err != nil { - return common.Hash{}, err + return common.Hash{}, nil, err } + modifiedAccounts[addr] = stateObject.data // Update the object in the main account trie. s.updateStateObject(stateObject) } @@ -733,5 +736,5 @@ func (s *StateDB) Commit(deleteEmptyObjects bool) (root common.Hash, err error) } return nil }) - return root, err + return root, modifiedAccounts, err } diff --git a/core/state/statedb_test.go b/core/state/statedb_test.go index bf073bc94..32608ded4 100644 --- a/core/state/statedb_test.go +++ b/core/state/statedb_test.go @@ -98,10 +98,10 @@ func TestIntermediateLeaks(t *testing.T) { } // Commit and cross check the databases. - if _, err := transState.Commit(false); err != nil { + if _, _, err := transState.Commit(false); err != nil { t.Fatalf("failed to commit transition state: %v", err) } - if _, err := finalState.Commit(false); err != nil { + if _, _, err := finalState.Commit(false); err != nil { t.Fatalf("failed to commit final state: %v", err) } it := finalDb.NewIterator() @@ -420,7 +420,7 @@ func (test *snapshotTest) checkEqual(state, checkstate *StateDB) error { func (s *StateSuite) TestTouchDelete(c *check.C) { s.state.GetOrNewStateObject(common.Address{}) - root, _ := s.state.Commit(false) + root, _, _ := s.state.Commit(false) s.state.Reset(root) snapshot := s.state.Snapshot() diff --git a/core/state/sync_test.go b/core/state/sync_test.go index de098dce0..ea2e1322d 100644 --- a/core/state/sync_test.go +++ b/core/state/sync_test.go @@ -62,7 +62,7 @@ func makeTestState() (Database, common.Hash, []*testAccount) { state.updateStateObject(obj) accounts = append(accounts, acc) } - root, _ := state.Commit(false) + root, _, _ := state.Commit(false) // Return the generated state return db, root, accounts diff --git a/eth/api_backend.go b/eth/api_backend.go index 69904a70f..7184cb91b 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -159,6 +159,10 @@ func (b *EthAPIBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscri return b.eth.BlockChain().SubscribeLogsEvent(ch) } +func (b *EthAPIBackend) SubscribeStateDiffs(ch chan<- map[common.Address]state.Account) event.Subscription { + return b.eth.BlockChain().SubscribeStateDiffEvent(ch) +} + func (b *EthAPIBackend) SendTx(ctx context.Context, signedTx *types.Transaction) error { return b.eth.txPool.AddLocal(signedTx) } diff --git a/eth/api_tracer.go b/eth/api_tracer.go index ce211cbd9..05dc74dec 100644 --- a/eth/api_tracer.go +++ b/eth/api_tracer.go @@ -295,7 +295,7 @@ func (api *PrivateDebugAPI) traceChain(ctx context.Context, start, end *types.Bl break } // Finalize the state so any modifications are written to the trie - root, err := statedb.Commit(api.eth.blockchain.Config().IsEIP158(block.Number())) + root, _, err := statedb.Commit(api.eth.blockchain.Config().IsEIP158(block.Number())) if err != nil { failed = err break @@ -681,7 +681,7 @@ func (api *PrivateDebugAPI) computeStateDB(block *types.Block, reexec uint64) (* return nil, fmt.Errorf("processing block %d failed: %v", block.NumberU64(), err) } // Finalize the state so any modifications are written to the trie - root, err := statedb.Commit(api.eth.blockchain.Config().IsEIP158(block.Number())) + root, _, err := statedb.Commit(api.eth.blockchain.Config().IsEIP158(block.Number())) if err != nil { return nil, err } diff --git a/eth/filters/api.go b/eth/filters/api.go index 5ed80a887..90e29ea5d 100644 --- a/eth/filters/api.go +++ b/eth/filters/api.go @@ -28,6 +28,7 @@ import ( ethereum "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" @@ -233,6 +234,36 @@ func (api *PublicFilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, er return rpcSub, nil } +// NewStateDiffs sent a notification each time an account changes in a block. +func (api *PublicFilterAPI) NewStateDiffs(ctx context.Context) (*rpc.Subscription, error) { + notifier, supported := rpc.NotifierFromContext(ctx) + if !supported { + return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported + } + + rpcSub := notifier.CreateSubscription() + + go func() { + stateDiffs := make(chan map[common.Address]state.Account) + stateDiffsSub := api.events.SubscribeStateDiffs(stateDiffs) + + for { + select { + case d := <-stateDiffs: + notifier.Notify(rpcSub.ID, d) + case <-rpcSub.Err(): + stateDiffsSub.Unsubscribe() + return + case <-notifier.Closed(): + stateDiffsSub.Unsubscribe() + return + } + } + }() + + return rpcSub, nil +} + // Logs creates a subscription that fires for all new log that match the given filter criteria. func (api *PublicFilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc.Subscription, error) { notifier, supported := rpc.NotifierFromContext(ctx) diff --git a/eth/filters/bench_test.go b/eth/filters/bench_test.go index fc7e6f527..e55e62ce5 100644 --- a/eth/filters/bench_test.go +++ b/eth/filters/bench_test.go @@ -129,7 +129,7 @@ func benchmarkBloomBits(b *testing.B, sectionSize uint64) { if i%20 == 0 { db.Close() db, _ = rawdb.NewLevelDBDatabase(benchDataDir, 128, 1024, "") - backend = &testBackend{mux, db, cnt, new(event.Feed), new(event.Feed), new(event.Feed), new(event.Feed)} + backend = &testBackend{mux, db, cnt, new(event.Feed), new(event.Feed), new(event.Feed), new(event.Feed), new(event.Feed)} } var addr common.Address addr[0] = byte(i) @@ -174,7 +174,7 @@ func BenchmarkNoBloomBits(b *testing.B) { b.Log("Running filter benchmarks...") start := time.Now() mux := new(event.TypeMux) - backend := &testBackend{mux, db, 0, new(event.Feed), new(event.Feed), new(event.Feed), new(event.Feed)} + backend := &testBackend{mux, db, 0, new(event.Feed), new(event.Feed), new(event.Feed), new(event.Feed), new(event.Feed)} filter := NewRangeFilter(backend, 0, int64(*headNum), []common.Address{{}}, nil) filter.Logs(context.Background()) d := time.Since(start) diff --git a/eth/filters/filter.go b/eth/filters/filter.go index 071613ad7..b566baf5b 100644 --- a/eth/filters/filter.go +++ b/eth/filters/filter.go @@ -24,6 +24,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/bloombits" + "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" @@ -42,6 +43,7 @@ type Backend interface { SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription + SubscribeStateDiffs(ch chan<- map[common.Address]state.Account) event.Subscription BloomStatus() (uint64, uint64) ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index 70139c1a9..2c9b6a12c 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -29,6 +29,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" @@ -55,6 +56,8 @@ const ( BlocksSubscription // LastSubscription keeps track of the last index LastIndexSubscription + // StateDiffsSubscription queries for new account changes + StateDiffsSubscription ) const ( @@ -68,6 +71,8 @@ const ( logsChanSize = 10 // chainEvChanSize is the size of channel listening to ChainEvent. chainEvChanSize = 10 + // stateDiffChanSize is the size of channel listening to StateDiffEvent. + stateDiffChanSize = 10 ) var ( @@ -75,15 +80,16 @@ var ( ) type subscription struct { - id rpc.ID - typ Type - created time.Time - logsCrit ethereum.FilterQuery - logs chan []*types.Log - hashes chan []common.Hash - headers chan *types.Header - installed chan struct{} // closed when the filter is installed - err chan error // closed when the filter is uninstalled + id rpc.ID + typ Type + created time.Time + logsCrit ethereum.FilterQuery + logs chan []*types.Log + hashes chan []common.Hash + headers chan *types.Header + stateDiffs chan map[common.Address]state.Account + installed chan struct{} // closed when the filter is installed + err chan error // closed when the filter is uninstalled } // EventSystem creates subscriptions, processes events and broadcasts them to the @@ -100,14 +106,16 @@ type EventSystem struct { rmLogsSub event.Subscription // Subscription for removed log event chainSub event.Subscription // Subscription for new chain event pendingLogSub *event.TypeMuxSubscription // Subscription for pending log event + stateDiffsSub event.Subscription // Subscription for new state diff events // Channels - install chan *subscription // install filter for event notification - uninstall chan *subscription // remove filter for event notification - txsCh chan core.NewTxsEvent // Channel to receive new transactions event - logsCh chan []*types.Log // Channel to receive new log event - rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event - chainCh chan core.ChainEvent // Channel to receive new chain event + install chan *subscription // install filter for event notification + uninstall chan *subscription // remove filter for event notification + txsCh chan core.NewTxsEvent // Channel to receive new transactions event + logsCh chan []*types.Log // Channel to receive new log event + rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event + chainCh chan core.ChainEvent // Channel to receive new chain event + stateDiffCh chan map[common.Address]state.Account // Channel to receive new state diff events } // NewEventSystem creates a new manager that listens for event on the given mux, @@ -118,15 +126,16 @@ type EventSystem struct { // or by stopping the given mux. func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventSystem { m := &EventSystem{ - mux: mux, - backend: backend, - lightMode: lightMode, - install: make(chan *subscription), - uninstall: make(chan *subscription), - txsCh: make(chan core.NewTxsEvent, txChanSize), - logsCh: make(chan []*types.Log, logsChanSize), - rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize), - chainCh: make(chan core.ChainEvent, chainEvChanSize), + mux: mux, + backend: backend, + lightMode: lightMode, + install: make(chan *subscription), + uninstall: make(chan *subscription), + txsCh: make(chan core.NewTxsEvent, txChanSize), + logsCh: make(chan []*types.Log, logsChanSize), + rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize), + chainCh: make(chan core.ChainEvent, chainEvChanSize), + stateDiffCh: make(chan map[common.Address]state.Account, stateDiffChanSize), } // Subscribe events @@ -134,6 +143,7 @@ func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventS m.logsSub = m.backend.SubscribeLogsEvent(m.logsCh) m.rmLogsSub = m.backend.SubscribeRemovedLogsEvent(m.rmLogsCh) m.chainSub = m.backend.SubscribeChainEvent(m.chainCh) + m.stateDiffsSub = m.backend.SubscribeStateDiffs(m.stateDiffCh) // TODO(rjl493456442): use feed to subscribe pending log event m.pendingLogSub = m.mux.Subscribe(core.PendingLogsEvent{}) @@ -175,6 +185,7 @@ func (sub *Subscription) Unsubscribe() { case <-sub.f.logs: case <-sub.f.hashes: case <-sub.f.headers: + case <-sub.f.stateDiffs: } } @@ -252,15 +263,16 @@ func (es *EventSystem) subscribeMinedPendingLogs(crit ethereum.FilterQuery, logs // given criteria to the given logs channel. func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription { sub := &subscription{ - id: rpc.NewID(), - typ: LogsSubscription, - logsCrit: crit, - created: time.Now(), - logs: logs, - hashes: make(chan []common.Hash), - headers: make(chan *types.Header), - installed: make(chan struct{}), - err: make(chan error), + id: rpc.NewID(), + typ: LogsSubscription, + logsCrit: crit, + created: time.Now(), + logs: logs, + hashes: make(chan []common.Hash), + headers: make(chan *types.Header), + stateDiffs: make(chan map[common.Address]state.Account), + installed: make(chan struct{}), + err: make(chan error), } return es.subscribe(sub) } @@ -269,15 +281,32 @@ func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*typ // transactions that enter the transaction pool. func (es *EventSystem) subscribePendingLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription { sub := &subscription{ - id: rpc.NewID(), - typ: PendingLogsSubscription, - logsCrit: crit, - created: time.Now(), - logs: logs, - hashes: make(chan []common.Hash), - headers: make(chan *types.Header), - installed: make(chan struct{}), - err: make(chan error), + id: rpc.NewID(), + typ: PendingLogsSubscription, + logsCrit: crit, + created: time.Now(), + logs: logs, + hashes: make(chan []common.Hash), + headers: make(chan *types.Header), + stateDiffs: make(chan map[common.Address]state.Account), + installed: make(chan struct{}), + err: make(chan error), + } + return es.subscribe(sub) +} + +// SubscribeStateDiffs +func (es *EventSystem) SubscribeStateDiffs(diffs chan map[common.Address]state.Account) *Subscription { + sub := &subscription{ + id: rpc.NewID(), + typ: StateDiffsSubscription, + created: time.Now(), + logs: make(chan []*types.Log), + hashes: make(chan []common.Hash), + headers: make(chan *types.Header), + stateDiffs: diffs, + installed: make(chan struct{}), + err: make(chan error), } return es.subscribe(sub) } @@ -286,14 +315,15 @@ func (es *EventSystem) subscribePendingLogs(crit ethereum.FilterQuery, logs chan // imported in the chain. func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscription { sub := &subscription{ - id: rpc.NewID(), - typ: BlocksSubscription, - created: time.Now(), - logs: make(chan []*types.Log), - hashes: make(chan []common.Hash), - headers: headers, - installed: make(chan struct{}), - err: make(chan error), + id: rpc.NewID(), + typ: BlocksSubscription, + created: time.Now(), + logs: make(chan []*types.Log), + hashes: make(chan []common.Hash), + headers: headers, + stateDiffs: make(chan map[common.Address]state.Account), + installed: make(chan struct{}), + err: make(chan error), } return es.subscribe(sub) } @@ -302,14 +332,15 @@ func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscripti // transactions that enter the transaction pool. func (es *EventSystem) SubscribePendingTxs(hashes chan []common.Hash) *Subscription { sub := &subscription{ - id: rpc.NewID(), - typ: PendingTransactionsSubscription, - created: time.Now(), - logs: make(chan []*types.Log), - hashes: hashes, - headers: make(chan *types.Header), - installed: make(chan struct{}), - err: make(chan error), + id: rpc.NewID(), + typ: PendingTransactionsSubscription, + created: time.Now(), + logs: make(chan []*types.Log), + hashes: hashes, + headers: make(chan *types.Header), + stateDiffs: make(chan map[common.Address]state.Account), + installed: make(chan struct{}), + err: make(chan error), } return es.subscribe(sub) } @@ -368,6 +399,10 @@ func (es *EventSystem) broadcast(filters filterIndex, ev interface{}) { } }) } + case map[common.Address]state.Account: + for _, f := range filters[StateDiffsSubscription] { + f.stateDiffs <- e + } } } @@ -453,6 +488,7 @@ func (es *EventSystem) eventLoop() { es.logsSub.Unsubscribe() es.rmLogsSub.Unsubscribe() es.chainSub.Unsubscribe() + es.stateDiffsSub.Unsubscribe() }() index := make(filterIndex) @@ -471,6 +507,8 @@ func (es *EventSystem) eventLoop() { es.broadcast(index, ev) case ev := <-es.chainCh: es.broadcast(index, ev) + case ev := <-es.stateDiffCh: + es.broadcast(index, ev) case ev, active := <-es.pendingLogSub.Chan(): if !active { // system stopped return @@ -506,6 +544,8 @@ func (es *EventSystem) eventLoop() { return case <-es.chainSub.Err(): return + case <-es.stateDiffsSub.Err(): + return } } } diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go index 93cb43123..0715c4549 100644 --- a/eth/filters/filter_system_test.go +++ b/eth/filters/filter_system_test.go @@ -31,6 +31,7 @@ import ( "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/bloombits" "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" @@ -39,13 +40,14 @@ import ( ) type testBackend struct { - mux *event.TypeMux - db ethdb.Database - sections uint64 - txFeed *event.Feed - rmLogsFeed *event.Feed - logsFeed *event.Feed - chainFeed *event.Feed + mux *event.TypeMux + db ethdb.Database + sections uint64 + txFeed *event.Feed + rmLogsFeed *event.Feed + logsFeed *event.Feed + chainFeed *event.Feed + stateDiffsFeed *event.Feed } func (b *testBackend) ChainDb() ethdb.Database { @@ -120,6 +122,10 @@ func (b *testBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subsc return b.chainFeed.Subscribe(ch) } +func (b *testBackend) SubscribeStateDiffs(ch chan<- map[common.Address]state.Account) event.Subscription { + return b.stateDiffsFeed.Subscribe(ch) +} + func (b *testBackend) BloomStatus() (uint64, uint64) { return params.BloomBitsBlocks, b.sections } @@ -160,17 +166,18 @@ func TestBlockSubscription(t *testing.T) { t.Parallel() var ( - mux = new(event.TypeMux) - db = rawdb.NewMemoryDatabase() - txFeed = new(event.Feed) - rmLogsFeed = new(event.Feed) - logsFeed = new(event.Feed) - chainFeed = new(event.Feed) - backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed} - api = NewPublicFilterAPI(backend, false) - genesis = new(core.Genesis).MustCommit(db) - chain, _ = core.GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, 10, func(i int, gen *core.BlockGen) {}) - chainEvents = []core.ChainEvent{} + mux = new(event.TypeMux) + db = rawdb.NewMemoryDatabase() + txFeed = new(event.Feed) + rmLogsFeed = new(event.Feed) + logsFeed = new(event.Feed) + chainFeed = new(event.Feed) + stateDiffsFeed = new(event.Feed) + backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed, stateDiffsFeed} + api = NewPublicFilterAPI(backend, false) + genesis = new(core.Genesis).MustCommit(db) + chain, _ = core.GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, 10, func(i int, gen *core.BlockGen) {}) + chainEvents = []core.ChainEvent{} ) for _, blk := range chain { @@ -217,14 +224,15 @@ func TestPendingTxFilter(t *testing.T) { t.Parallel() var ( - mux = new(event.TypeMux) - db = rawdb.NewMemoryDatabase() - txFeed = new(event.Feed) - rmLogsFeed = new(event.Feed) - logsFeed = new(event.Feed) - chainFeed = new(event.Feed) - backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed} - api = NewPublicFilterAPI(backend, false) + mux = new(event.TypeMux) + db = rawdb.NewMemoryDatabase() + txFeed = new(event.Feed) + rmLogsFeed = new(event.Feed) + logsFeed = new(event.Feed) + chainFeed = new(event.Feed) + stateDiffsFeed = new(event.Feed) + backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed, stateDiffsFeed} + api = NewPublicFilterAPI(backend, false) transactions = []*types.Transaction{ types.NewTransaction(0, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil), @@ -277,14 +285,15 @@ func TestPendingTxFilter(t *testing.T) { // If not it must return an error. func TestLogFilterCreation(t *testing.T) { var ( - mux = new(event.TypeMux) - db = rawdb.NewMemoryDatabase() - txFeed = new(event.Feed) - rmLogsFeed = new(event.Feed) - logsFeed = new(event.Feed) - chainFeed = new(event.Feed) - backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed} - api = NewPublicFilterAPI(backend, false) + mux = new(event.TypeMux) + db = rawdb.NewMemoryDatabase() + txFeed = new(event.Feed) + rmLogsFeed = new(event.Feed) + logsFeed = new(event.Feed) + chainFeed = new(event.Feed) + stateDiffsFeed = new(event.Feed) + backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed, stateDiffsFeed} + api = NewPublicFilterAPI(backend, false) testCases = []struct { crit FilterCriteria @@ -326,14 +335,15 @@ func TestInvalidLogFilterCreation(t *testing.T) { t.Parallel() var ( - mux = new(event.TypeMux) - db = rawdb.NewMemoryDatabase() - txFeed = new(event.Feed) - rmLogsFeed = new(event.Feed) - logsFeed = new(event.Feed) - chainFeed = new(event.Feed) - backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed} - api = NewPublicFilterAPI(backend, false) + mux = new(event.TypeMux) + db = rawdb.NewMemoryDatabase() + txFeed = new(event.Feed) + rmLogsFeed = new(event.Feed) + logsFeed = new(event.Feed) + chainFeed = new(event.Feed) + stateDiffsFeed = new(event.Feed) + backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed, stateDiffsFeed} + api = NewPublicFilterAPI(backend, false) ) // different situations where log filter creation should fail. @@ -353,15 +363,16 @@ func TestInvalidLogFilterCreation(t *testing.T) { func TestInvalidGetLogsRequest(t *testing.T) { var ( - mux = new(event.TypeMux) - db = rawdb.NewMemoryDatabase() - txFeed = new(event.Feed) - rmLogsFeed = new(event.Feed) - logsFeed = new(event.Feed) - chainFeed = new(event.Feed) - backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed} - api = NewPublicFilterAPI(backend, false) - blockHash = common.HexToHash("0x1111111111111111111111111111111111111111111111111111111111111111") + mux = new(event.TypeMux) + db = rawdb.NewMemoryDatabase() + txFeed = new(event.Feed) + rmLogsFeed = new(event.Feed) + logsFeed = new(event.Feed) + chainFeed = new(event.Feed) + stateDiffsFeed = new(event.Feed) + backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed, stateDiffsFeed} + api = NewPublicFilterAPI(backend, false) + blockHash = common.HexToHash("0x1111111111111111111111111111111111111111111111111111111111111111") ) // Reason: Cannot specify both BlockHash and FromBlock/ToBlock) @@ -383,14 +394,15 @@ func TestLogFilter(t *testing.T) { t.Parallel() var ( - mux = new(event.TypeMux) - db = rawdb.NewMemoryDatabase() - txFeed = new(event.Feed) - rmLogsFeed = new(event.Feed) - logsFeed = new(event.Feed) - chainFeed = new(event.Feed) - backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed} - api = NewPublicFilterAPI(backend, false) + mux = new(event.TypeMux) + db = rawdb.NewMemoryDatabase() + txFeed = new(event.Feed) + rmLogsFeed = new(event.Feed) + logsFeed = new(event.Feed) + chainFeed = new(event.Feed) + stateDiffsFeed = new(event.Feed) + backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed, stateDiffsFeed} + api = NewPublicFilterAPI(backend, false) firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111") secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222") @@ -502,14 +514,15 @@ func TestPendingLogsSubscription(t *testing.T) { t.Parallel() var ( - mux = new(event.TypeMux) - db = rawdb.NewMemoryDatabase() - txFeed = new(event.Feed) - rmLogsFeed = new(event.Feed) - logsFeed = new(event.Feed) - chainFeed = new(event.Feed) - backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed} - api = NewPublicFilterAPI(backend, false) + mux = new(event.TypeMux) + db = rawdb.NewMemoryDatabase() + txFeed = new(event.Feed) + rmLogsFeed = new(event.Feed) + logsFeed = new(event.Feed) + chainFeed = new(event.Feed) + stateDiffsFeed = new(event.Feed) + backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed, stateDiffsFeed} + api = NewPublicFilterAPI(backend, false) firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111") secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222") diff --git a/eth/filters/filter_test.go b/eth/filters/filter_test.go index eafa19cdf..36d787532 100644 --- a/eth/filters/filter_test.go +++ b/eth/filters/filter_test.go @@ -50,18 +50,19 @@ func BenchmarkFilters(b *testing.B) { defer os.RemoveAll(dir) var ( - db, _ = rawdb.NewLevelDBDatabase(dir, 0, 0, "") - mux = new(event.TypeMux) - txFeed = new(event.Feed) - rmLogsFeed = new(event.Feed) - logsFeed = new(event.Feed) - chainFeed = new(event.Feed) - backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed} - key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") - addr1 = crypto.PubkeyToAddress(key1.PublicKey) - addr2 = common.BytesToAddress([]byte("jeff")) - addr3 = common.BytesToAddress([]byte("ethereum")) - addr4 = common.BytesToAddress([]byte("random addresses please")) + db, _ = rawdb.NewLevelDBDatabase(dir, 0, 0, "") + mux = new(event.TypeMux) + txFeed = new(event.Feed) + rmLogsFeed = new(event.Feed) + logsFeed = new(event.Feed) + chainFeed = new(event.Feed) + stateDiffsFeed = new(event.Feed) + backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed, stateDiffsFeed} + key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") + addr1 = crypto.PubkeyToAddress(key1.PublicKey) + addr2 = common.BytesToAddress([]byte("jeff")) + addr3 = common.BytesToAddress([]byte("ethereum")) + addr4 = common.BytesToAddress([]byte("random addresses please")) ) defer db.Close() @@ -109,15 +110,16 @@ func TestFilters(t *testing.T) { defer os.RemoveAll(dir) var ( - db, _ = rawdb.NewLevelDBDatabase(dir, 0, 0, "") - mux = new(event.TypeMux) - txFeed = new(event.Feed) - rmLogsFeed = new(event.Feed) - logsFeed = new(event.Feed) - chainFeed = new(event.Feed) - backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed} - key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") - addr = crypto.PubkeyToAddress(key1.PublicKey) + db, _ = rawdb.NewLevelDBDatabase(dir, 0, 0, "") + mux = new(event.TypeMux) + txFeed = new(event.Feed) + rmLogsFeed = new(event.Feed) + logsFeed = new(event.Feed) + chainFeed = new(event.Feed) + stateDiffsFeed = new(event.Feed) + backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed, stateDiffsFeed} + key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") + addr = crypto.PubkeyToAddress(key1.PublicKey) hash1 = common.BytesToHash([]byte("topic1")) hash2 = common.BytesToHash([]byte("topic2")) diff --git a/internal/ethapi/backend.go b/internal/ethapi/backend.go index 06c6db33b..8a64bd2c4 100644 --- a/internal/ethapi/backend.go +++ b/internal/ethapi/backend.go @@ -78,6 +78,7 @@ type Backend interface { ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription + SubscribeStateDiffs(ch chan<- map[common.Address]state.Account) event.Subscription ChainConfig() *params.ChainConfig CurrentBlock() *types.Block diff --git a/les/api_backend.go b/les/api_backend.go index 5cd432dcf..ba7ccd964 100644 --- a/les/api_backend.go +++ b/les/api_backend.go @@ -172,6 +172,10 @@ func (b *LesApiBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEven return b.eth.blockchain.SubscribeRemovedLogsEvent(ch) } +func (b *LesApiBackend) SubscribeStateDiffs(ch chan<- map[common.Address]state.Account) event.Subscription { + return b.eth.blockchain.SubscribeStateDiffs(ch) +} + func (b *LesApiBackend) Downloader() *downloader.Downloader { return b.eth.Downloader() } diff --git a/light/lightchain.go b/light/lightchain.go index 7f64d1c28..82eed9576 100644 --- a/light/lightchain.go +++ b/light/lightchain.go @@ -531,6 +531,12 @@ func (lc *LightChain) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) return lc.scope.Track(new(event.Feed).Subscribe(ch)) } +// SubscribeStateDiffs implements the interface of filters.Backend +// LightChain does not sen state diffs, so return an empty subscription. +func (lc *LightChain) SubscribeStateDiffs(ch chan<- map[common.Address]state.Account) event.Subscription { + return lc.scope.Track(new(event.Feed).Subscribe(ch)) +} + // DisableCheckFreq disables header validation. This is used for ultralight mode. func (lc *LightChain) DisableCheckFreq() { atomic.StoreInt32(&lc.disableCheckFreq, 1) diff --git a/miner/worker.go b/miner/worker.go index 4a9528c39..5ba40aa9a 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -589,7 +589,7 @@ func (w *worker) resultLoop() { logs = append(logs, receipt.Logs...) } // Commit block and state to database. - stat, err := w.chain.WriteBlockWithState(block, receipts, task.state) + stat, stateDiffs, err := w.chain.WriteBlockWithState(block, receipts, task.state) if err != nil { log.Error("Failed writing block to chain", "err", err) continue @@ -608,7 +608,7 @@ func (w *worker) resultLoop() { case core.SideStatTy: events = append(events, core.ChainSideEvent{Block: block}) } - w.chain.PostChainEvents(events, logs) + w.chain.PostChainEvents(events, logs, stateDiffs) // Insert the block into the set of pending ones to resultLoop for confirmations w.unconfirmed.Insert(block.NumberU64(), block.Hash()) diff --git a/miner/worker_test.go b/miner/worker_test.go index 1604e988d..4a474b233 100644 --- a/miner/worker_test.go +++ b/miner/worker_test.go @@ -134,7 +134,7 @@ func newTestWorkerBackend(t *testing.T, chainConfig *params.ChainConfig, engine func (b *testWorkerBackend) BlockChain() *core.BlockChain { return b.chain } func (b *testWorkerBackend) TxPool() *core.TxPool { return b.txPool } func (b *testWorkerBackend) PostChainEvents(events []interface{}) { - b.chain.PostChainEvents(events, nil) + b.chain.PostChainEvents(events, nil, nil) } func newTestWorker(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine, blocks int) (*worker, *testWorkerBackend) { diff --git a/tests/state_test_util.go b/tests/state_test_util.go index 59ebcb6e1..8d8cf0be5 100644 --- a/tests/state_test_util.go +++ b/tests/state_test_util.go @@ -216,7 +216,7 @@ func MakePreState(db ethdb.Database, accounts core.GenesisAlloc) *state.StateDB } } // Commit and re-open to start with a clean state. - root, _ := statedb.Commit(false) + root, _, _ := statedb.Commit(false) statedb, _ = state.New(root, sdb) return statedb }