WIP: add rpc pubsub interface for account diffs #11

Closed
rmulhol wants to merge 2 commits from pubsub-rpc-account-diffing into master
22 changed files with 333 additions and 212 deletions

View File

@ -517,6 +517,10 @@ func (fb *filterBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscr
return fb.bc.SubscribeLogsEvent(ch)
}
func (fb *filterBackend) SubscribeStateDiffs(ch chan<- map[common.Address]state.Account) event.Subscription {
elizabethengelman commented 2019-09-12 14:26:02 +00:00 (Migrated from github.com)
Review

Is the reason that we're adding SubscribeStateDiffs to a bunch of places is because they all implement the Backend interface?

It's confusing to know which backend we'll actually want to use for VDB. 🤔

Is the reason that we're adding `SubscribeStateDiffs` to a bunch of places is because they all implement the Backend interface? It's confusing to know which backend we'll actually want to use for VDB. 🤔
rmulhol commented 2019-09-12 16:21:28 +00:00 (Migrated from github.com)
Review

Yeah, you got it. I think we want to (and do) use the EthApiBackend with a subscription to a full (non-fast-syncing) node, where all transactions are applied/verified.

Yeah, you got it. I think we want to (and do) use the `EthApiBackend` with a subscription to a full (non-fast-syncing) node, where all transactions are applied/verified.
return fb.bc.SubscribeStateDiffEvent(ch)
}
func (fb *filterBackend) BloomStatus() (uint64, uint64) { return 4096, 0 }
func (fb *filterBackend) ServiceFilter(ctx context.Context, ms *bloombits.MatcherSession) {
panic("not supported")

View File

@ -665,7 +665,7 @@ func (api *RetestethAPI) AccountRange(ctx context.Context,
root = statedb.IntermediateRoot(vmenv.ChainConfig().IsEIP158(block.Number()))
if idx == int(txIndex) {
// This is to make sure root can be opened by OpenTrie
root, err = statedb.Commit(api.chainConfig.IsEIP158(block.Number()))
root, _, err = statedb.Commit(api.chainConfig.IsEIP158(block.Number()))
if err != nil {
return AccountRangeResult{}, err
}
@ -778,7 +778,7 @@ func (api *RetestethAPI) StorageRangeAt(ctx context.Context,
_ = statedb.IntermediateRoot(vmenv.ChainConfig().IsEIP158(block.Number()))
if idx == int(txIndex) {
// This is to make sure root can be opened by OpenTrie
_, err = statedb.Commit(vmenv.ChainConfig().IsEIP158(block.Number()))
_, _, err = statedb.Commit(vmenv.ChainConfig().IsEIP158(block.Number()))
if err != nil {
return StorageRangeResult{}, err
}

View File

@ -138,15 +138,16 @@ type BlockChain struct {
triegc *prque.Prque // Priority queue mapping block numbers to tries to gc
gcproc time.Duration // Accumulates canonical block processing for trie dumping
hc *HeaderChain
rmLogsFeed event.Feed
chainFeed event.Feed
chainSideFeed event.Feed
chainHeadFeed event.Feed
logsFeed event.Feed
blockProcFeed event.Feed
scope event.SubscriptionScope
genesisBlock *types.Block
hc *HeaderChain
rmLogsFeed event.Feed
chainFeed event.Feed
chainSideFeed event.Feed
chainHeadFeed event.Feed
logsFeed event.Feed
blockProcFeed event.Feed
stateDiffsFeed event.Feed
scope event.SubscriptionScope
genesisBlock *types.Block
chainmu sync.RWMutex // blockchain insertion lock
@ -1252,7 +1253,7 @@ func (bc *BlockChain) writeKnownBlock(block *types.Block) error {
}
// WriteBlockWithState writes the block and all associated state to the database.
func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.Receipt, state *state.StateDB) (status WriteStatus, err error) {
func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.Receipt, state *state.StateDB) (status WriteStatus, stateDiffs map[common.Address]state.Account, err error) {
bc.chainmu.Lock()
defer bc.chainmu.Unlock()
@ -1261,14 +1262,14 @@ func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.
// writeBlockWithState writes the block and all associated state to the database,
// but is expects the chain mutex to be held.
func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.Receipt, state *state.StateDB) (status WriteStatus, err error) {
func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.Receipt, state *state.StateDB) (status WriteStatus, stateDiffs map[common.Address]state.Account, err error) {
bc.wg.Add(1)
defer bc.wg.Done()
// Calculate the total difficulty of the block
ptd := bc.GetTd(block.ParentHash(), block.NumberU64()-1)
if ptd == nil {
return NonStatTy, consensus.ErrUnknownAncestor
return NonStatTy, stateDiffs, consensus.ErrUnknownAncestor
}
// Make sure no inconsistent state is leaked during insertion
currentBlock := bc.CurrentBlock()
@ -1277,20 +1278,20 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
// Irrelevant of the canonical status, write the block itself to the database
if err := bc.hc.WriteTd(block.Hash(), block.NumberU64(), externTd); err != nil {
return NonStatTy, err
return NonStatTy, stateDiffs, err
}
rawdb.WriteBlock(bc.db, block)
root, err := state.Commit(bc.chainConfig.IsEIP158(block.Number()))
root, modifiedAccounts, err := state.Commit(bc.chainConfig.IsEIP158(block.Number()))
if err != nil {
return NonStatTy, err
return NonStatTy, stateDiffs, err
}
triedb := bc.stateCache.TrieDB()
// If we're running an archive node, always flush
if bc.cacheConfig.TrieDirtyDisabled {
if err := triedb.Commit(root, false); err != nil {
return NonStatTy, err
return NonStatTy, modifiedAccounts, err
}
} else {
// Full but not archive node, do proper garbage collection
@ -1366,7 +1367,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
// Reorganise the chain if the parent is not the head block
if block.ParentHash() != currentBlock.Hash() {
if err := bc.reorg(currentBlock, block); err != nil {
return NonStatTy, err
return NonStatTy, modifiedAccounts, err
}
}
// Write the positional metadata for transaction/receipt lookups and preimages
@ -1378,7 +1379,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
status = SideStatTy
}
if err := batch.Write(); err != nil {
return NonStatTy, err
return NonStatTy, modifiedAccounts, err
}
// Set new head.
@ -1386,7 +1387,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
bc.insert(block)
}
bc.futureBlocks.Remove(block.Hash())
return status, nil
return status, modifiedAccounts, nil
}
// addFutureBlock checks if the block is within the max allowed window to get
@ -1436,11 +1437,11 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
// Pre-checks passed, start the full block imports
bc.wg.Add(1)
bc.chainmu.Lock()
n, events, logs, err := bc.insertChain(chain, true)
n, events, logs, stateDiffs, err := bc.insertChain(chain, true)
bc.chainmu.Unlock()
bc.wg.Done()
bc.PostChainEvents(events, logs)
bc.PostChainEvents(events, logs, stateDiffs)
return n, err
}
@ -1452,10 +1453,10 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
// racey behaviour. If a sidechain import is in progress, and the historic state
// is imported, but then new canon-head is added before the actual sidechain
// completes, then the historic state could be pruned again
func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []interface{}, []*types.Log, error) {
func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []interface{}, []*types.Log, map[common.Address]state.Account, error) {
// If the chain is terminating, don't even bother starting up
if atomic.LoadInt32(&bc.procInterrupt) == 1 {
return 0, nil, nil, nil
return 0, nil, nil, nil, nil
}
// Start a parallel signature recovery (signer will fluke on fork transition, minimal perf loss)
senderCacher.recoverFromBlocks(types.MakeSigner(bc.chainConfig, chain[0].Number()), chain)
@ -1468,6 +1469,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
events = make([]interface{}, 0, len(chain))
lastCanon *types.Block
coalescedLogs []*types.Log
stateDiffs map[common.Address]state.Account
)
// Start the parallel header verifier
headers := make([]*types.Header, len(chain))
@ -1518,7 +1520,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
for block != nil && err == ErrKnownBlock {
log.Debug("Writing previously known block", "number", block.Number(), "hash", block.Hash())
if err := bc.writeKnownBlock(block); err != nil {
return it.index, nil, nil, err
return it.index, nil, nil, nil, err
}
lastCanon = block
@ -1537,7 +1539,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
for block != nil && (it.index == 0 || err == consensus.ErrUnknownAncestor) {
log.Debug("Future block, postponing import", "number", block.Number(), "hash", block.Hash())
if err := bc.addFutureBlock(block); err != nil {
return it.index, events, coalescedLogs, err
return it.index, events, coalescedLogs, stateDiffs, err
}
block, err = it.next()
}
@ -1545,13 +1547,13 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
stats.ignored += it.remaining()
// If there are any still remaining, mark as ignored
return it.index, events, coalescedLogs, err
return it.index, events, coalescedLogs, stateDiffs, err
// Some other error occurred, abort
case err != nil:
stats.ignored += len(it.chain)
bc.reportBlock(block, nil, err)
return it.index, events, coalescedLogs, err
return it.index, events, coalescedLogs, stateDiffs, err
}
// No validation errors for the first block (or chain prefix skipped)
for ; block != nil && err == nil || err == ErrKnownBlock; block, err = it.next() {
@ -1563,7 +1565,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
// If the header is a banned one, straight out abort
if BadHashes[block.Hash()] {
bc.reportBlock(block, nil, ErrBlacklistedHash)
return it.index, events, coalescedLogs, ErrBlacklistedHash
return it.index, events, coalescedLogs, stateDiffs, ErrBlacklistedHash
}
// If the block is known (in the middle of the chain), it's a special case for
// Clique blocks where they can share state among each other, so importing an
@ -1580,7 +1582,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
"root", block.Root())
if err := bc.writeKnownBlock(block); err != nil {
return it.index, nil, nil, err
return it.index, nil, nil, stateDiffs, err
}
stats.processed++
@ -1600,7 +1602,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
}
statedb, err := state.New(parent.Root, bc.stateCache)
if err != nil {
return it.index, events, coalescedLogs, err
return it.index, events, coalescedLogs, stateDiffs, err
}
// If we have a followup block, run that against the current state to pre-cache
// transactions and probabilistically some of the account/storage trie nodes.
@ -1625,7 +1627,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
if err != nil {
bc.reportBlock(block, receipts, err)
atomic.StoreUint32(&followupInterrupt, 1)
return it.index, events, coalescedLogs, err
return it.index, events, coalescedLogs, stateDiffs, err
}
// Update the metrics touched during block processing
accountReadTimer.Update(statedb.AccountReads) // Account reads are complete, we can mark them
@ -1644,7 +1646,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
if err := bc.validator.ValidateState(block, statedb, receipts, usedGas); err != nil {
bc.reportBlock(block, receipts, err)
atomic.StoreUint32(&followupInterrupt, 1)
return it.index, events, coalescedLogs, err
return it.index, events, coalescedLogs, stateDiffs, err
}
proctime := time.Since(start)
@ -1656,10 +1658,10 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
// Write the block to the chain and get the status.
substart = time.Now()
status, err := bc.writeBlockWithState(block, receipts, statedb)
status, committedStateDiffs, err := bc.writeBlockWithState(block, receipts, statedb)
if err != nil {
atomic.StoreUint32(&followupInterrupt, 1)
return it.index, events, coalescedLogs, err
return it.index, events, coalescedLogs, stateDiffs, err
}
atomic.StoreUint32(&followupInterrupt, 1)
@ -1678,6 +1680,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
"root", block.Root())
coalescedLogs = append(coalescedLogs, logs...)
stateDiffs = committedStateDiffs
events = append(events, ChainEvent{block, block.Hash(), logs})
lastCanon = block
@ -1708,13 +1711,13 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
// Any blocks remaining here? The only ones we care about are the future ones
if block != nil && err == consensus.ErrFutureBlock {
if err := bc.addFutureBlock(block); err != nil {
return it.index, events, coalescedLogs, err
return it.index, events, coalescedLogs, stateDiffs, err
}
block, err = it.next()
for ; block != nil && err == consensus.ErrUnknownAncestor; block, err = it.next() {
if err := bc.addFutureBlock(block); err != nil {
return it.index, events, coalescedLogs, err
return it.index, events, coalescedLogs, stateDiffs, err
}
stats.queued++
}
@ -1725,7 +1728,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
if lastCanon != nil && bc.CurrentBlock().Hash() == lastCanon.Hash() {
events = append(events, ChainHeadEvent{lastCanon})
}
return it.index, events, coalescedLogs, err
return it.index, events, coalescedLogs, stateDiffs, err
}
// insertSideChain is called when an import batch hits upon a pruned ancestor
@ -1734,7 +1737,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
//
// The method writes all (header-and-body-valid) blocks to disk, then tries to
// switch over to the new chain if the TD exceeded the current chain.
func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (int, []interface{}, []*types.Log, error) {
func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (int, []interface{}, []*types.Log, map[common.Address]state.Account, error) {
var (
externTd *big.Int
current = bc.CurrentBlock()
@ -1770,7 +1773,7 @@ func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (i
// If someone legitimately side-mines blocks, they would still be imported as usual. However,
// we cannot risk writing unverified blocks to disk when they obviously target the pruning
// mechanism.
return it.index, nil, nil, errors.New("sidechain ghost-state attack")
return it.index, nil, nil, nil, errors.New("sidechain ghost-state attack")
}
}
if externTd == nil {
@ -1781,7 +1784,7 @@ func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (i
if !bc.HasBlock(block.Hash(), block.NumberU64()) {
start := time.Now()
if err := bc.writeBlockWithoutState(block, externTd); err != nil {
return it.index, nil, nil, err
return it.index, nil, nil, nil, err
}
log.Debug("Injected sidechain block", "number", block.Number(), "hash", block.Hash(),
"diff", block.Difficulty(), "elapsed", common.PrettyDuration(time.Since(start)),
@ -1798,7 +1801,7 @@ func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (i
localTd := bc.GetTd(current.Hash(), current.NumberU64())
if localTd.Cmp(externTd) > 0 {
log.Info("Sidechain written to disk", "start", it.first().NumberU64(), "end", it.previous().Number, "sidetd", externTd, "localtd", localTd)
return it.index, nil, nil, err
return it.index, nil, nil, nil, err
}
// Gather all the sidechain hashes (full blocks may be memory heavy)
var (
@ -1813,7 +1816,7 @@ func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (i
parent = bc.GetHeader(parent.ParentHash, parent.Number.Uint64()-1)
}
if parent == nil {
return it.index, nil, nil, errors.New("missing parent")
return it.index, nil, nil, nil, errors.New("missing parent")
}
// Import all the pruned blocks to make the state available
var (
@ -1832,15 +1835,15 @@ func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (i
// memory here.
if len(blocks) >= 2048 || memory > 64*1024*1024 {
log.Info("Importing heavy sidechain segment", "blocks", len(blocks), "start", blocks[0].NumberU64(), "end", block.NumberU64())
if _, _, _, err := bc.insertChain(blocks, false); err != nil {
return 0, nil, nil, err
if _, _, _, _, err := bc.insertChain(blocks, false); err != nil {
return 0, nil, nil, nil, err
}
blocks, memory = blocks[:0], 0
// If the chain is terminating, stop processing blocks
if atomic.LoadInt32(&bc.procInterrupt) == 1 {
log.Debug("Premature abort during blocks processing")
return 0, nil, nil, nil
return 0, nil, nil, nil, nil
}
}
}
@ -1848,7 +1851,7 @@ func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (i
log.Info("Importing sidechain segment", "start", blocks[0].NumberU64(), "end", blocks[len(blocks)-1].NumberU64())
return bc.insertChain(blocks, false)
}
return 0, nil, nil, nil
return 0, nil, nil, nil, nil
}
// reorg takes two blocks, an old chain and a new chain and will reconstruct the
@ -2003,11 +2006,15 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
// PostChainEvents iterates over the events generated by a chain insertion and
// posts them into the event feed.
// TODO: Should not expose PostChainEvents. The chain events should be posted in WriteBlock.
func (bc *BlockChain) PostChainEvents(events []interface{}, logs []*types.Log) {
func (bc *BlockChain) PostChainEvents(events []interface{}, logs []*types.Log, stateDiffs map[common.Address]state.Account) {
// post event logs for further processing
if logs != nil {
bc.logsFeed.Send(logs)
}
// post state diffs for further processing
if stateDiffs != nil {
bc.stateDiffsFeed.Send(stateDiffs)
}
for _, event := range events {
switch ev := event.(type) {
case ChainEvent:
@ -2215,3 +2222,7 @@ func (bc *BlockChain) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscript
func (bc *BlockChain) SubscribeBlockProcessingEvent(ch chan<- bool) event.Subscription {
return bc.scope.Track(bc.blockProcFeed.Subscribe(ch))
}
func (bc *BlockChain) SubscribeStateDiffEvent(ch chan<- map[common.Address]state.Account) event.Subscription {
return bc.scope.Track(bc.stateDiffsFeed.Subscribe(ch))
}

View File

@ -216,7 +216,7 @@ func GenerateChain(config *params.ChainConfig, parent *types.Block, engine conse
block, _ := b.engine.FinalizeAndAssemble(chainreader, b.header, statedb, b.txs, b.uncles, b.receipts)
// Write state changes to db
root, err := statedb.Commit(config.IsEIP158(b.header.Number))
root, _, err := statedb.Commit(config.IsEIP158(b.header.Number))
if err != nil {
panic(fmt.Sprintf("state write error: %v", err))
}

View File

@ -158,7 +158,7 @@ func TestSnapshot2(t *testing.T) {
so0.deleted = false
state.setStateObject(so0)
root, _ := state.Commit(false)
root, _, _ := state.Commit(false)
state.Reset(root)
// and one with deleted == true

View File

@ -686,9 +686,10 @@ func (s *StateDB) clearJournalAndRefund() {
}
// Commit writes the state to the underlying in-memory trie database.
func (s *StateDB) Commit(deleteEmptyObjects bool) (root common.Hash, err error) {
func (s *StateDB) Commit(deleteEmptyObjects bool) (root common.Hash, modifiedAccounts map[common.Address]Account, err error) {
defer s.clearJournalAndRefund()
modifiedAccounts = make(map[common.Address]Account)
for addr := range s.journal.dirties {
s.stateObjectsDirty[addr] = struct{}{}
}
@ -699,6 +700,7 @@ func (s *StateDB) Commit(deleteEmptyObjects bool) (root common.Hash, err error)
case stateObject.suicided || (isDirty && deleteEmptyObjects && stateObject.empty()):
// If the object has been removed, don't bother syncing it
// and just mark it for deletion in the trie.
modifiedAccounts[addr] = stateObject.data
s.deleteStateObject(stateObject)
case isDirty:
// Write any contract code associated with the state object
@ -708,8 +710,9 @@ func (s *StateDB) Commit(deleteEmptyObjects bool) (root common.Hash, err error)
}
// Write any storage changes in the state object to its storage trie.
if err := stateObject.CommitTrie(s.db); err != nil {
return common.Hash{}, err
return common.Hash{}, nil, err
}
modifiedAccounts[addr] = stateObject.data
// Update the object in the main account trie.
s.updateStateObject(stateObject)
}
@ -733,5 +736,5 @@ func (s *StateDB) Commit(deleteEmptyObjects bool) (root common.Hash, err error)
}
return nil
})
return root, err
return root, modifiedAccounts, err
}

View File

@ -98,10 +98,10 @@ func TestIntermediateLeaks(t *testing.T) {
}
// Commit and cross check the databases.
if _, err := transState.Commit(false); err != nil {
if _, _, err := transState.Commit(false); err != nil {
t.Fatalf("failed to commit transition state: %v", err)
}
if _, err := finalState.Commit(false); err != nil {
if _, _, err := finalState.Commit(false); err != nil {
t.Fatalf("failed to commit final state: %v", err)
}
it := finalDb.NewIterator()
@ -420,7 +420,7 @@ func (test *snapshotTest) checkEqual(state, checkstate *StateDB) error {
func (s *StateSuite) TestTouchDelete(c *check.C) {
s.state.GetOrNewStateObject(common.Address{})
root, _ := s.state.Commit(false)
root, _, _ := s.state.Commit(false)
s.state.Reset(root)
snapshot := s.state.Snapshot()

View File

@ -62,7 +62,7 @@ func makeTestState() (Database, common.Hash, []*testAccount) {
state.updateStateObject(obj)
accounts = append(accounts, acc)
}
root, _ := state.Commit(false)
root, _, _ := state.Commit(false)
// Return the generated state
return db, root, accounts

View File

@ -159,6 +159,10 @@ func (b *EthAPIBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscri
return b.eth.BlockChain().SubscribeLogsEvent(ch)
}
func (b *EthAPIBackend) SubscribeStateDiffs(ch chan<- map[common.Address]state.Account) event.Subscription {
return b.eth.BlockChain().SubscribeStateDiffEvent(ch)
}
func (b *EthAPIBackend) SendTx(ctx context.Context, signedTx *types.Transaction) error {
return b.eth.txPool.AddLocal(signedTx)
}

View File

@ -295,7 +295,7 @@ func (api *PrivateDebugAPI) traceChain(ctx context.Context, start, end *types.Bl
break
}
// Finalize the state so any modifications are written to the trie
root, err := statedb.Commit(api.eth.blockchain.Config().IsEIP158(block.Number()))
root, _, err := statedb.Commit(api.eth.blockchain.Config().IsEIP158(block.Number()))
if err != nil {
failed = err
break
@ -681,7 +681,7 @@ func (api *PrivateDebugAPI) computeStateDB(block *types.Block, reexec uint64) (*
return nil, fmt.Errorf("processing block %d failed: %v", block.NumberU64(), err)
}
// Finalize the state so any modifications are written to the trie
root, err := statedb.Commit(api.eth.blockchain.Config().IsEIP158(block.Number()))
root, _, err := statedb.Commit(api.eth.blockchain.Config().IsEIP158(block.Number()))
if err != nil {
return nil, err
}

View File

@ -28,6 +28,7 @@ import (
ethereum "github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
@ -233,6 +234,36 @@ func (api *PublicFilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, er
return rpcSub, nil
}
// NewStateDiffs sent a notification each time an account changes in a block.
func (api *PublicFilterAPI) NewStateDiffs(ctx context.Context) (*rpc.Subscription, error) {
notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
}
rpcSub := notifier.CreateSubscription()
go func() {
stateDiffs := make(chan map[common.Address]state.Account)
stateDiffsSub := api.events.SubscribeStateDiffs(stateDiffs)
for {
select {
case d := <-stateDiffs:
notifier.Notify(rpcSub.ID, d)
case <-rpcSub.Err():
stateDiffsSub.Unsubscribe()
return
case <-notifier.Closed():
stateDiffsSub.Unsubscribe()
return
}
}
}()
return rpcSub, nil
}
// Logs creates a subscription that fires for all new log that match the given filter criteria.
func (api *PublicFilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc.Subscription, error) {
notifier, supported := rpc.NotifierFromContext(ctx)

View File

@ -129,7 +129,7 @@ func benchmarkBloomBits(b *testing.B, sectionSize uint64) {
if i%20 == 0 {
db.Close()
db, _ = rawdb.NewLevelDBDatabase(benchDataDir, 128, 1024, "")
backend = &testBackend{mux, db, cnt, new(event.Feed), new(event.Feed), new(event.Feed), new(event.Feed)}
backend = &testBackend{mux, db, cnt, new(event.Feed), new(event.Feed), new(event.Feed), new(event.Feed), new(event.Feed)}
}
var addr common.Address
addr[0] = byte(i)
@ -174,7 +174,7 @@ func BenchmarkNoBloomBits(b *testing.B) {
b.Log("Running filter benchmarks...")
start := time.Now()
mux := new(event.TypeMux)
backend := &testBackend{mux, db, 0, new(event.Feed), new(event.Feed), new(event.Feed), new(event.Feed)}
backend := &testBackend{mux, db, 0, new(event.Feed), new(event.Feed), new(event.Feed), new(event.Feed), new(event.Feed)}
filter := NewRangeFilter(backend, 0, int64(*headNum), []common.Address{{}}, nil)
filter.Logs(context.Background())
d := time.Since(start)

View File

@ -24,6 +24,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/bloombits"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
@ -42,6 +43,7 @@ type Backend interface {
SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription
SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription
SubscribeStateDiffs(ch chan<- map[common.Address]state.Account) event.Subscription
BloomStatus() (uint64, uint64)
ServiceFilter(ctx context.Context, session *bloombits.MatcherSession)

View File

@ -29,6 +29,7 @@ import (
"github.com/ethereum/go-ethereum/common"
elizabethengelman commented 2019-09-12 15:17:15 +00:00 (Migrated from github.com)
Review

I'm not sure I understand what the filters package is for - is it just another way to subscribe to new events in the chain? I think I've previously only interacted with new event subscriptions directly from the blockchain package.

I'm not sure I understand what the `filters` package is for - is it just another way to subscribe to new events in the chain? I think I've previously only interacted with new event subscriptions directly from the `blockchain` package.
rmulhol commented 2019-09-12 16:23:29 +00:00 (Migrated from github.com)
Review

I think it's designed to provide a public API for the blockchain's internal subscriptions, but one big question I've had is how functions in this namespace (like SubscribeNewHeads) are actually called - will definitely need to sort that out before we can upstream

I think it's designed to provide a public API for the blockchain's internal subscriptions, but one big question I've had is how functions in this namespace (like `SubscribeNewHeads`) are actually called - will definitely need to sort that out before we can upstream
Review

It looks like the filters api is being loaded under the "eth" namespace here. So I think you would call it like:

cli, _ := rpc.Dial("ipcPathOrWsURL")
stateDiffChan := make(chan map[common.Address]state.Account, 20000)
rpcSub, err := cli.Subscribe(context.Background(), "eth", stateDiffChan, "NewStateDiffs"})

or use the EthSubscribe method since it is under that namespace.

It looks like the filters api is being loaded under the "eth" namespace [here](https://github.com/vulcanize/go-ethereum/blob/pubsub-rpc-account-diffing/eth/backend.go#L316). So I think you would call it like: ```go cli, _ := rpc.Dial("ipcPathOrWsURL") stateDiffChan := make(chan map[common.Address]state.Account, 20000) rpcSub, err := cli.Subscribe(context.Background(), "eth", stateDiffChan, "NewStateDiffs"}) ``` or use the [EthSubscribe](https://github.com/vulcanize/go-ethereum/blob/pubsub-rpc-account-diffing/rpc/client.go#L397) method since it is under that namespace.
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
@ -55,6 +56,8 @@ const (
BlocksSubscription
// LastSubscription keeps track of the last index
LastIndexSubscription
// StateDiffsSubscription queries for new account changes
StateDiffsSubscription
)
const (
@ -68,6 +71,8 @@ const (
logsChanSize = 10
// chainEvChanSize is the size of channel listening to ChainEvent.
chainEvChanSize = 10
// stateDiffChanSize is the size of channel listening to StateDiffEvent.
stateDiffChanSize = 10
)
var (
@ -75,15 +80,16 @@ var (
)
type subscription struct {
id rpc.ID
typ Type
created time.Time
logsCrit ethereum.FilterQuery
logs chan []*types.Log
hashes chan []common.Hash
headers chan *types.Header
installed chan struct{} // closed when the filter is installed
err chan error // closed when the filter is uninstalled
id rpc.ID
typ Type
created time.Time
logsCrit ethereum.FilterQuery
logs chan []*types.Log
hashes chan []common.Hash
headers chan *types.Header
stateDiffs chan map[common.Address]state.Account
installed chan struct{} // closed when the filter is installed
err chan error // closed when the filter is uninstalled
}
// EventSystem creates subscriptions, processes events and broadcasts them to the
@ -100,14 +106,16 @@ type EventSystem struct {
rmLogsSub event.Subscription // Subscription for removed log event
chainSub event.Subscription // Subscription for new chain event
pendingLogSub *event.TypeMuxSubscription // Subscription for pending log event
stateDiffsSub event.Subscription // Subscription for new state diff events
// Channels
install chan *subscription // install filter for event notification
uninstall chan *subscription // remove filter for event notification
txsCh chan core.NewTxsEvent // Channel to receive new transactions event
logsCh chan []*types.Log // Channel to receive new log event
rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event
chainCh chan core.ChainEvent // Channel to receive new chain event
install chan *subscription // install filter for event notification
uninstall chan *subscription // remove filter for event notification
txsCh chan core.NewTxsEvent // Channel to receive new transactions event
logsCh chan []*types.Log // Channel to receive new log event
rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event
chainCh chan core.ChainEvent // Channel to receive new chain event
stateDiffCh chan map[common.Address]state.Account // Channel to receive new state diff events
}
// NewEventSystem creates a new manager that listens for event on the given mux,
@ -118,15 +126,16 @@ type EventSystem struct {
// or by stopping the given mux.
func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventSystem {
m := &EventSystem{
mux: mux,
backend: backend,
lightMode: lightMode,
install: make(chan *subscription),
uninstall: make(chan *subscription),
txsCh: make(chan core.NewTxsEvent, txChanSize),
logsCh: make(chan []*types.Log, logsChanSize),
rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize),
chainCh: make(chan core.ChainEvent, chainEvChanSize),
mux: mux,
backend: backend,
lightMode: lightMode,
install: make(chan *subscription),
uninstall: make(chan *subscription),
txsCh: make(chan core.NewTxsEvent, txChanSize),
logsCh: make(chan []*types.Log, logsChanSize),
rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize),
chainCh: make(chan core.ChainEvent, chainEvChanSize),
stateDiffCh: make(chan map[common.Address]state.Account, stateDiffChanSize),
}
// Subscribe events
@ -134,6 +143,7 @@ func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventS
m.logsSub = m.backend.SubscribeLogsEvent(m.logsCh)
m.rmLogsSub = m.backend.SubscribeRemovedLogsEvent(m.rmLogsCh)
m.chainSub = m.backend.SubscribeChainEvent(m.chainCh)
m.stateDiffsSub = m.backend.SubscribeStateDiffs(m.stateDiffCh)
// TODO(rjl493456442): use feed to subscribe pending log event
m.pendingLogSub = m.mux.Subscribe(core.PendingLogsEvent{})
@ -175,6 +185,7 @@ func (sub *Subscription) Unsubscribe() {
case <-sub.f.logs:
case <-sub.f.hashes:
case <-sub.f.headers:
case <-sub.f.stateDiffs:
}
}
@ -252,15 +263,16 @@ func (es *EventSystem) subscribeMinedPendingLogs(crit ethereum.FilterQuery, logs
// given criteria to the given logs channel.
func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription {
sub := &subscription{
id: rpc.NewID(),
typ: LogsSubscription,
logsCrit: crit,
created: time.Now(),
logs: logs,
hashes: make(chan []common.Hash),
headers: make(chan *types.Header),
installed: make(chan struct{}),
err: make(chan error),
id: rpc.NewID(),
typ: LogsSubscription,
logsCrit: crit,
created: time.Now(),
logs: logs,
hashes: make(chan []common.Hash),
headers: make(chan *types.Header),
stateDiffs: make(chan map[common.Address]state.Account),
installed: make(chan struct{}),
err: make(chan error),
}
return es.subscribe(sub)
}
@ -269,15 +281,32 @@ func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*typ
// transactions that enter the transaction pool.
func (es *EventSystem) subscribePendingLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription {
sub := &subscription{
id: rpc.NewID(),
typ: PendingLogsSubscription,
logsCrit: crit,
created: time.Now(),
logs: logs,
hashes: make(chan []common.Hash),
headers: make(chan *types.Header),
installed: make(chan struct{}),
err: make(chan error),
id: rpc.NewID(),
typ: PendingLogsSubscription,
logsCrit: crit,
created: time.Now(),
logs: logs,
hashes: make(chan []common.Hash),
headers: make(chan *types.Header),
stateDiffs: make(chan map[common.Address]state.Account),
installed: make(chan struct{}),
err: make(chan error),
}
return es.subscribe(sub)
}
// SubscribeStateDiffs
func (es *EventSystem) SubscribeStateDiffs(diffs chan map[common.Address]state.Account) *Subscription {
sub := &subscription{
id: rpc.NewID(),
typ: StateDiffsSubscription,
created: time.Now(),
logs: make(chan []*types.Log),
hashes: make(chan []common.Hash),
headers: make(chan *types.Header),
stateDiffs: diffs,
installed: make(chan struct{}),
err: make(chan error),
}
return es.subscribe(sub)
}
@ -286,14 +315,15 @@ func (es *EventSystem) subscribePendingLogs(crit ethereum.FilterQuery, logs chan
// imported in the chain.
func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscription {
sub := &subscription{
id: rpc.NewID(),
typ: BlocksSubscription,
created: time.Now(),
logs: make(chan []*types.Log),
hashes: make(chan []common.Hash),
headers: headers,
installed: make(chan struct{}),
err: make(chan error),
id: rpc.NewID(),
typ: BlocksSubscription,
created: time.Now(),
logs: make(chan []*types.Log),
hashes: make(chan []common.Hash),
headers: headers,
stateDiffs: make(chan map[common.Address]state.Account),
installed: make(chan struct{}),
err: make(chan error),
}
return es.subscribe(sub)
}
@ -302,14 +332,15 @@ func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscripti
// transactions that enter the transaction pool.
func (es *EventSystem) SubscribePendingTxs(hashes chan []common.Hash) *Subscription {
sub := &subscription{
id: rpc.NewID(),
typ: PendingTransactionsSubscription,
created: time.Now(),
logs: make(chan []*types.Log),
hashes: hashes,
headers: make(chan *types.Header),
installed: make(chan struct{}),
err: make(chan error),
id: rpc.NewID(),
typ: PendingTransactionsSubscription,
created: time.Now(),
logs: make(chan []*types.Log),
hashes: hashes,
headers: make(chan *types.Header),
stateDiffs: make(chan map[common.Address]state.Account),
installed: make(chan struct{}),
err: make(chan error),
}
return es.subscribe(sub)
}
@ -368,6 +399,10 @@ func (es *EventSystem) broadcast(filters filterIndex, ev interface{}) {
}
elizabethengelman commented 2019-09-12 14:43:08 +00:00 (Migrated from github.com)
Review

How come we are adding stateDiffs to the subscribeLogs? Is it because it's in the subscription struct, but it can't be nil so we're adding an empty map?

How come we are adding stateDiffs to the subscribeLogs? Is it because it's in the subscription struct, but it can't be nil so we're adding an empty map?
rmulhol commented 2019-09-12 16:22:06 +00:00 (Migrated from github.com)
Review

Yeah just following the pattern here, seems like every subscription creates empty channels for all the things it's not subscribing to.

Yeah just following the pattern here, seems like every subscription creates empty channels for all the things it's not subscribing to.
})
}
case map[common.Address]state.Account:
for _, f := range filters[StateDiffsSubscription] {
f.stateDiffs <- e
}
}
}
@ -453,6 +488,7 @@ func (es *EventSystem) eventLoop() {
es.logsSub.Unsubscribe()
es.rmLogsSub.Unsubscribe()
es.chainSub.Unsubscribe()
es.stateDiffsSub.Unsubscribe()
}()
index := make(filterIndex)
@ -471,6 +507,8 @@ func (es *EventSystem) eventLoop() {
es.broadcast(index, ev)
case ev := <-es.chainCh:
es.broadcast(index, ev)
case ev := <-es.stateDiffCh:
es.broadcast(index, ev)
case ev, active := <-es.pendingLogSub.Chan():
if !active { // system stopped
return
@ -506,6 +544,8 @@ func (es *EventSystem) eventLoop() {
return
case <-es.chainSub.Err():
return
case <-es.stateDiffsSub.Err():
return
}
}
}

View File

@ -31,6 +31,7 @@ import (
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/bloombits"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
@ -39,13 +40,14 @@ import (
)
type testBackend struct {
mux *event.TypeMux
db ethdb.Database
sections uint64
txFeed *event.Feed
rmLogsFeed *event.Feed
logsFeed *event.Feed
chainFeed *event.Feed
mux *event.TypeMux
db ethdb.Database
sections uint64
txFeed *event.Feed
rmLogsFeed *event.Feed
logsFeed *event.Feed
chainFeed *event.Feed
stateDiffsFeed *event.Feed
}
func (b *testBackend) ChainDb() ethdb.Database {
@ -120,6 +122,10 @@ func (b *testBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subsc
return b.chainFeed.Subscribe(ch)
}
func (b *testBackend) SubscribeStateDiffs(ch chan<- map[common.Address]state.Account) event.Subscription {
return b.stateDiffsFeed.Subscribe(ch)
}
func (b *testBackend) BloomStatus() (uint64, uint64) {
return params.BloomBitsBlocks, b.sections
}
@ -160,17 +166,18 @@ func TestBlockSubscription(t *testing.T) {
t.Parallel()
var (
mux = new(event.TypeMux)
db = rawdb.NewMemoryDatabase()
txFeed = new(event.Feed)
rmLogsFeed = new(event.Feed)
logsFeed = new(event.Feed)
chainFeed = new(event.Feed)
backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
api = NewPublicFilterAPI(backend, false)
genesis = new(core.Genesis).MustCommit(db)
chain, _ = core.GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, 10, func(i int, gen *core.BlockGen) {})
chainEvents = []core.ChainEvent{}
mux = new(event.TypeMux)
db = rawdb.NewMemoryDatabase()
txFeed = new(event.Feed)
rmLogsFeed = new(event.Feed)
logsFeed = new(event.Feed)
chainFeed = new(event.Feed)
stateDiffsFeed = new(event.Feed)
backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed, stateDiffsFeed}
api = NewPublicFilterAPI(backend, false)
genesis = new(core.Genesis).MustCommit(db)
chain, _ = core.GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, 10, func(i int, gen *core.BlockGen) {})
chainEvents = []core.ChainEvent{}
)
for _, blk := range chain {
@ -217,14 +224,15 @@ func TestPendingTxFilter(t *testing.T) {
t.Parallel()
var (
mux = new(event.TypeMux)
db = rawdb.NewMemoryDatabase()
txFeed = new(event.Feed)
rmLogsFeed = new(event.Feed)
logsFeed = new(event.Feed)
chainFeed = new(event.Feed)
backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
api = NewPublicFilterAPI(backend, false)
mux = new(event.TypeMux)
db = rawdb.NewMemoryDatabase()
txFeed = new(event.Feed)
rmLogsFeed = new(event.Feed)
logsFeed = new(event.Feed)
chainFeed = new(event.Feed)
stateDiffsFeed = new(event.Feed)
backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed, stateDiffsFeed}
api = NewPublicFilterAPI(backend, false)
transactions = []*types.Transaction{
types.NewTransaction(0, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil),
@ -277,14 +285,15 @@ func TestPendingTxFilter(t *testing.T) {
// If not it must return an error.
func TestLogFilterCreation(t *testing.T) {
var (
mux = new(event.TypeMux)
db = rawdb.NewMemoryDatabase()
txFeed = new(event.Feed)
rmLogsFeed = new(event.Feed)
logsFeed = new(event.Feed)
chainFeed = new(event.Feed)
backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
api = NewPublicFilterAPI(backend, false)
mux = new(event.TypeMux)
db = rawdb.NewMemoryDatabase()
txFeed = new(event.Feed)
rmLogsFeed = new(event.Feed)
logsFeed = new(event.Feed)
chainFeed = new(event.Feed)
stateDiffsFeed = new(event.Feed)
backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed, stateDiffsFeed}
api = NewPublicFilterAPI(backend, false)
testCases = []struct {
crit FilterCriteria
@ -326,14 +335,15 @@ func TestInvalidLogFilterCreation(t *testing.T) {
t.Parallel()
var (
mux = new(event.TypeMux)
db = rawdb.NewMemoryDatabase()
txFeed = new(event.Feed)
rmLogsFeed = new(event.Feed)
logsFeed = new(event.Feed)
chainFeed = new(event.Feed)
backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
api = NewPublicFilterAPI(backend, false)
mux = new(event.TypeMux)
db = rawdb.NewMemoryDatabase()
txFeed = new(event.Feed)
rmLogsFeed = new(event.Feed)
logsFeed = new(event.Feed)
chainFeed = new(event.Feed)
stateDiffsFeed = new(event.Feed)
backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed, stateDiffsFeed}
api = NewPublicFilterAPI(backend, false)
)
// different situations where log filter creation should fail.
@ -353,15 +363,16 @@ func TestInvalidLogFilterCreation(t *testing.T) {
func TestInvalidGetLogsRequest(t *testing.T) {
var (
mux = new(event.TypeMux)
db = rawdb.NewMemoryDatabase()
txFeed = new(event.Feed)
rmLogsFeed = new(event.Feed)
logsFeed = new(event.Feed)
chainFeed = new(event.Feed)
backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
api = NewPublicFilterAPI(backend, false)
blockHash = common.HexToHash("0x1111111111111111111111111111111111111111111111111111111111111111")
mux = new(event.TypeMux)
db = rawdb.NewMemoryDatabase()
txFeed = new(event.Feed)
rmLogsFeed = new(event.Feed)
logsFeed = new(event.Feed)
chainFeed = new(event.Feed)
stateDiffsFeed = new(event.Feed)
backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed, stateDiffsFeed}
api = NewPublicFilterAPI(backend, false)
blockHash = common.HexToHash("0x1111111111111111111111111111111111111111111111111111111111111111")
)
// Reason: Cannot specify both BlockHash and FromBlock/ToBlock)
@ -383,14 +394,15 @@ func TestLogFilter(t *testing.T) {
t.Parallel()
var (
mux = new(event.TypeMux)
db = rawdb.NewMemoryDatabase()
txFeed = new(event.Feed)
rmLogsFeed = new(event.Feed)
logsFeed = new(event.Feed)
chainFeed = new(event.Feed)
backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
api = NewPublicFilterAPI(backend, false)
mux = new(event.TypeMux)
db = rawdb.NewMemoryDatabase()
txFeed = new(event.Feed)
rmLogsFeed = new(event.Feed)
logsFeed = new(event.Feed)
chainFeed = new(event.Feed)
stateDiffsFeed = new(event.Feed)
backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed, stateDiffsFeed}
api = NewPublicFilterAPI(backend, false)
firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111")
secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222")
@ -502,14 +514,15 @@ func TestPendingLogsSubscription(t *testing.T) {
t.Parallel()
var (
mux = new(event.TypeMux)
db = rawdb.NewMemoryDatabase()
txFeed = new(event.Feed)
rmLogsFeed = new(event.Feed)
logsFeed = new(event.Feed)
chainFeed = new(event.Feed)
backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
api = NewPublicFilterAPI(backend, false)
mux = new(event.TypeMux)
db = rawdb.NewMemoryDatabase()
txFeed = new(event.Feed)
rmLogsFeed = new(event.Feed)
logsFeed = new(event.Feed)
chainFeed = new(event.Feed)
stateDiffsFeed = new(event.Feed)
backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed, stateDiffsFeed}
api = NewPublicFilterAPI(backend, false)
firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111")
secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222")

View File

@ -50,18 +50,19 @@ func BenchmarkFilters(b *testing.B) {
defer os.RemoveAll(dir)
var (
db, _ = rawdb.NewLevelDBDatabase(dir, 0, 0, "")
mux = new(event.TypeMux)
txFeed = new(event.Feed)
rmLogsFeed = new(event.Feed)
logsFeed = new(event.Feed)
chainFeed = new(event.Feed)
backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
addr1 = crypto.PubkeyToAddress(key1.PublicKey)
addr2 = common.BytesToAddress([]byte("jeff"))
addr3 = common.BytesToAddress([]byte("ethereum"))
addr4 = common.BytesToAddress([]byte("random addresses please"))
db, _ = rawdb.NewLevelDBDatabase(dir, 0, 0, "")
mux = new(event.TypeMux)
txFeed = new(event.Feed)
rmLogsFeed = new(event.Feed)
logsFeed = new(event.Feed)
chainFeed = new(event.Feed)
stateDiffsFeed = new(event.Feed)
backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed, stateDiffsFeed}
key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
addr1 = crypto.PubkeyToAddress(key1.PublicKey)
addr2 = common.BytesToAddress([]byte("jeff"))
addr3 = common.BytesToAddress([]byte("ethereum"))
addr4 = common.BytesToAddress([]byte("random addresses please"))
)
defer db.Close()
@ -109,15 +110,16 @@ func TestFilters(t *testing.T) {
defer os.RemoveAll(dir)
var (
db, _ = rawdb.NewLevelDBDatabase(dir, 0, 0, "")
mux = new(event.TypeMux)
txFeed = new(event.Feed)
rmLogsFeed = new(event.Feed)
logsFeed = new(event.Feed)
chainFeed = new(event.Feed)
backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
addr = crypto.PubkeyToAddress(key1.PublicKey)
db, _ = rawdb.NewLevelDBDatabase(dir, 0, 0, "")
mux = new(event.TypeMux)
txFeed = new(event.Feed)
rmLogsFeed = new(event.Feed)
logsFeed = new(event.Feed)
chainFeed = new(event.Feed)
stateDiffsFeed = new(event.Feed)
backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed, stateDiffsFeed}
key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
addr = crypto.PubkeyToAddress(key1.PublicKey)
hash1 = common.BytesToHash([]byte("topic1"))
hash2 = common.BytesToHash([]byte("topic2"))

View File

@ -78,6 +78,7 @@ type Backend interface {
ServiceFilter(ctx context.Context, session *bloombits.MatcherSession)
SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription
SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription
SubscribeStateDiffs(ch chan<- map[common.Address]state.Account) event.Subscription
ChainConfig() *params.ChainConfig
CurrentBlock() *types.Block

View File

@ -172,6 +172,10 @@ func (b *LesApiBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEven
return b.eth.blockchain.SubscribeRemovedLogsEvent(ch)
}
func (b *LesApiBackend) SubscribeStateDiffs(ch chan<- map[common.Address]state.Account) event.Subscription {
return b.eth.blockchain.SubscribeStateDiffs(ch)
}
func (b *LesApiBackend) Downloader() *downloader.Downloader {
return b.eth.Downloader()
}

View File

@ -531,6 +531,12 @@ func (lc *LightChain) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent)
return lc.scope.Track(new(event.Feed).Subscribe(ch))
}
// SubscribeStateDiffs implements the interface of filters.Backend
// LightChain does not sen state diffs, so return an empty subscription.
func (lc *LightChain) SubscribeStateDiffs(ch chan<- map[common.Address]state.Account) event.Subscription {
return lc.scope.Track(new(event.Feed).Subscribe(ch))
}
// DisableCheckFreq disables header validation. This is used for ultralight mode.
func (lc *LightChain) DisableCheckFreq() {
atomic.StoreInt32(&lc.disableCheckFreq, 1)

View File

@ -589,7 +589,7 @@ func (w *worker) resultLoop() {
logs = append(logs, receipt.Logs...)
}
// Commit block and state to database.
stat, err := w.chain.WriteBlockWithState(block, receipts, task.state)
stat, stateDiffs, err := w.chain.WriteBlockWithState(block, receipts, task.state)
if err != nil {
log.Error("Failed writing block to chain", "err", err)
continue
@ -608,7 +608,7 @@ func (w *worker) resultLoop() {
case core.SideStatTy:
events = append(events, core.ChainSideEvent{Block: block})
}
w.chain.PostChainEvents(events, logs)
w.chain.PostChainEvents(events, logs, stateDiffs)
// Insert the block into the set of pending ones to resultLoop for confirmations
w.unconfirmed.Insert(block.NumberU64(), block.Hash())

View File

@ -134,7 +134,7 @@ func newTestWorkerBackend(t *testing.T, chainConfig *params.ChainConfig, engine
func (b *testWorkerBackend) BlockChain() *core.BlockChain { return b.chain }
func (b *testWorkerBackend) TxPool() *core.TxPool { return b.txPool }
func (b *testWorkerBackend) PostChainEvents(events []interface{}) {
b.chain.PostChainEvents(events, nil)
b.chain.PostChainEvents(events, nil, nil)
}
func newTestWorker(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine, blocks int) (*worker, *testWorkerBackend) {

View File

@ -216,7 +216,7 @@ func MakePreState(db ethdb.Database, accounts core.GenesisAlloc) *state.StateDB
}
}
// Commit and re-open to start with a clean state.
root, _ := statedb.Commit(false)
root, _, _ := statedb.Commit(false)
statedb, _ = state.New(root, sdb)
return statedb
}