WIP: add rpc pubsub interface for account diffs #11
@ -517,6 +517,10 @@ func (fb *filterBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscr
|
|||||||
return fb.bc.SubscribeLogsEvent(ch)
|
return fb.bc.SubscribeLogsEvent(ch)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (fb *filterBackend) SubscribeStateDiffs(ch chan<- map[common.Address]state.Account) event.Subscription {
|
||||||
|
|||||||
|
return fb.bc.SubscribeStateDiffEvent(ch)
|
||||||
|
}
|
||||||
|
|
||||||
func (fb *filterBackend) BloomStatus() (uint64, uint64) { return 4096, 0 }
|
func (fb *filterBackend) BloomStatus() (uint64, uint64) { return 4096, 0 }
|
||||||
func (fb *filterBackend) ServiceFilter(ctx context.Context, ms *bloombits.MatcherSession) {
|
func (fb *filterBackend) ServiceFilter(ctx context.Context, ms *bloombits.MatcherSession) {
|
||||||
panic("not supported")
|
panic("not supported")
|
||||||
|
@ -665,7 +665,7 @@ func (api *RetestethAPI) AccountRange(ctx context.Context,
|
|||||||
root = statedb.IntermediateRoot(vmenv.ChainConfig().IsEIP158(block.Number()))
|
root = statedb.IntermediateRoot(vmenv.ChainConfig().IsEIP158(block.Number()))
|
||||||
if idx == int(txIndex) {
|
if idx == int(txIndex) {
|
||||||
// This is to make sure root can be opened by OpenTrie
|
// This is to make sure root can be opened by OpenTrie
|
||||||
root, err = statedb.Commit(api.chainConfig.IsEIP158(block.Number()))
|
root, _, err = statedb.Commit(api.chainConfig.IsEIP158(block.Number()))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return AccountRangeResult{}, err
|
return AccountRangeResult{}, err
|
||||||
}
|
}
|
||||||
@ -778,7 +778,7 @@ func (api *RetestethAPI) StorageRangeAt(ctx context.Context,
|
|||||||
_ = statedb.IntermediateRoot(vmenv.ChainConfig().IsEIP158(block.Number()))
|
_ = statedb.IntermediateRoot(vmenv.ChainConfig().IsEIP158(block.Number()))
|
||||||
if idx == int(txIndex) {
|
if idx == int(txIndex) {
|
||||||
// This is to make sure root can be opened by OpenTrie
|
// This is to make sure root can be opened by OpenTrie
|
||||||
_, err = statedb.Commit(vmenv.ChainConfig().IsEIP158(block.Number()))
|
_, _, err = statedb.Commit(vmenv.ChainConfig().IsEIP158(block.Number()))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return StorageRangeResult{}, err
|
return StorageRangeResult{}, err
|
||||||
}
|
}
|
||||||
|
@ -145,6 +145,7 @@ type BlockChain struct {
|
|||||||
chainHeadFeed event.Feed
|
chainHeadFeed event.Feed
|
||||||
logsFeed event.Feed
|
logsFeed event.Feed
|
||||||
blockProcFeed event.Feed
|
blockProcFeed event.Feed
|
||||||
|
stateDiffsFeed event.Feed
|
||||||
scope event.SubscriptionScope
|
scope event.SubscriptionScope
|
||||||
genesisBlock *types.Block
|
genesisBlock *types.Block
|
||||||
|
|
||||||
@ -1252,7 +1253,7 @@ func (bc *BlockChain) writeKnownBlock(block *types.Block) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// WriteBlockWithState writes the block and all associated state to the database.
|
// WriteBlockWithState writes the block and all associated state to the database.
|
||||||
func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.Receipt, state *state.StateDB) (status WriteStatus, err error) {
|
func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.Receipt, state *state.StateDB) (status WriteStatus, stateDiffs map[common.Address]state.Account, err error) {
|
||||||
bc.chainmu.Lock()
|
bc.chainmu.Lock()
|
||||||
defer bc.chainmu.Unlock()
|
defer bc.chainmu.Unlock()
|
||||||
|
|
||||||
@ -1261,14 +1262,14 @@ func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.
|
|||||||
|
|
||||||
// writeBlockWithState writes the block and all associated state to the database,
|
// writeBlockWithState writes the block and all associated state to the database,
|
||||||
// but is expects the chain mutex to be held.
|
// but is expects the chain mutex to be held.
|
||||||
func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.Receipt, state *state.StateDB) (status WriteStatus, err error) {
|
func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.Receipt, state *state.StateDB) (status WriteStatus, stateDiffs map[common.Address]state.Account, err error) {
|
||||||
bc.wg.Add(1)
|
bc.wg.Add(1)
|
||||||
defer bc.wg.Done()
|
defer bc.wg.Done()
|
||||||
|
|
||||||
// Calculate the total difficulty of the block
|
// Calculate the total difficulty of the block
|
||||||
ptd := bc.GetTd(block.ParentHash(), block.NumberU64()-1)
|
ptd := bc.GetTd(block.ParentHash(), block.NumberU64()-1)
|
||||||
if ptd == nil {
|
if ptd == nil {
|
||||||
return NonStatTy, consensus.ErrUnknownAncestor
|
return NonStatTy, stateDiffs, consensus.ErrUnknownAncestor
|
||||||
}
|
}
|
||||||
// Make sure no inconsistent state is leaked during insertion
|
// Make sure no inconsistent state is leaked during insertion
|
||||||
currentBlock := bc.CurrentBlock()
|
currentBlock := bc.CurrentBlock()
|
||||||
@ -1277,20 +1278,20 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
|
|||||||
|
|
||||||
// Irrelevant of the canonical status, write the block itself to the database
|
// Irrelevant of the canonical status, write the block itself to the database
|
||||||
if err := bc.hc.WriteTd(block.Hash(), block.NumberU64(), externTd); err != nil {
|
if err := bc.hc.WriteTd(block.Hash(), block.NumberU64(), externTd); err != nil {
|
||||||
return NonStatTy, err
|
return NonStatTy, stateDiffs, err
|
||||||
}
|
}
|
||||||
rawdb.WriteBlock(bc.db, block)
|
rawdb.WriteBlock(bc.db, block)
|
||||||
|
|
||||||
root, err := state.Commit(bc.chainConfig.IsEIP158(block.Number()))
|
root, modifiedAccounts, err := state.Commit(bc.chainConfig.IsEIP158(block.Number()))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return NonStatTy, err
|
return NonStatTy, stateDiffs, err
|
||||||
}
|
}
|
||||||
triedb := bc.stateCache.TrieDB()
|
triedb := bc.stateCache.TrieDB()
|
||||||
|
|
||||||
// If we're running an archive node, always flush
|
// If we're running an archive node, always flush
|
||||||
if bc.cacheConfig.TrieDirtyDisabled {
|
if bc.cacheConfig.TrieDirtyDisabled {
|
||||||
if err := triedb.Commit(root, false); err != nil {
|
if err := triedb.Commit(root, false); err != nil {
|
||||||
return NonStatTy, err
|
return NonStatTy, modifiedAccounts, err
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// Full but not archive node, do proper garbage collection
|
// Full but not archive node, do proper garbage collection
|
||||||
@ -1366,7 +1367,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
|
|||||||
// Reorganise the chain if the parent is not the head block
|
// Reorganise the chain if the parent is not the head block
|
||||||
if block.ParentHash() != currentBlock.Hash() {
|
if block.ParentHash() != currentBlock.Hash() {
|
||||||
if err := bc.reorg(currentBlock, block); err != nil {
|
if err := bc.reorg(currentBlock, block); err != nil {
|
||||||
return NonStatTy, err
|
return NonStatTy, modifiedAccounts, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Write the positional metadata for transaction/receipt lookups and preimages
|
// Write the positional metadata for transaction/receipt lookups and preimages
|
||||||
@ -1378,7 +1379,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
|
|||||||
status = SideStatTy
|
status = SideStatTy
|
||||||
}
|
}
|
||||||
if err := batch.Write(); err != nil {
|
if err := batch.Write(); err != nil {
|
||||||
return NonStatTy, err
|
return NonStatTy, modifiedAccounts, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set new head.
|
// Set new head.
|
||||||
@ -1386,7 +1387,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
|
|||||||
bc.insert(block)
|
bc.insert(block)
|
||||||
}
|
}
|
||||||
bc.futureBlocks.Remove(block.Hash())
|
bc.futureBlocks.Remove(block.Hash())
|
||||||
return status, nil
|
return status, modifiedAccounts, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// addFutureBlock checks if the block is within the max allowed window to get
|
// addFutureBlock checks if the block is within the max allowed window to get
|
||||||
@ -1436,11 +1437,11 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
|
|||||||
// Pre-checks passed, start the full block imports
|
// Pre-checks passed, start the full block imports
|
||||||
bc.wg.Add(1)
|
bc.wg.Add(1)
|
||||||
bc.chainmu.Lock()
|
bc.chainmu.Lock()
|
||||||
n, events, logs, err := bc.insertChain(chain, true)
|
n, events, logs, stateDiffs, err := bc.insertChain(chain, true)
|
||||||
bc.chainmu.Unlock()
|
bc.chainmu.Unlock()
|
||||||
bc.wg.Done()
|
bc.wg.Done()
|
||||||
|
|
||||||
bc.PostChainEvents(events, logs)
|
bc.PostChainEvents(events, logs, stateDiffs)
|
||||||
return n, err
|
return n, err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1452,10 +1453,10 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
|
|||||||
// racey behaviour. If a sidechain import is in progress, and the historic state
|
// racey behaviour. If a sidechain import is in progress, and the historic state
|
||||||
// is imported, but then new canon-head is added before the actual sidechain
|
// is imported, but then new canon-head is added before the actual sidechain
|
||||||
// completes, then the historic state could be pruned again
|
// completes, then the historic state could be pruned again
|
||||||
func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []interface{}, []*types.Log, error) {
|
func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []interface{}, []*types.Log, map[common.Address]state.Account, error) {
|
||||||
// If the chain is terminating, don't even bother starting up
|
// If the chain is terminating, don't even bother starting up
|
||||||
if atomic.LoadInt32(&bc.procInterrupt) == 1 {
|
if atomic.LoadInt32(&bc.procInterrupt) == 1 {
|
||||||
return 0, nil, nil, nil
|
return 0, nil, nil, nil, nil
|
||||||
}
|
}
|
||||||
// Start a parallel signature recovery (signer will fluke on fork transition, minimal perf loss)
|
// Start a parallel signature recovery (signer will fluke on fork transition, minimal perf loss)
|
||||||
senderCacher.recoverFromBlocks(types.MakeSigner(bc.chainConfig, chain[0].Number()), chain)
|
senderCacher.recoverFromBlocks(types.MakeSigner(bc.chainConfig, chain[0].Number()), chain)
|
||||||
@ -1468,6 +1469,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
|
|||||||
events = make([]interface{}, 0, len(chain))
|
events = make([]interface{}, 0, len(chain))
|
||||||
lastCanon *types.Block
|
lastCanon *types.Block
|
||||||
coalescedLogs []*types.Log
|
coalescedLogs []*types.Log
|
||||||
|
stateDiffs map[common.Address]state.Account
|
||||||
)
|
)
|
||||||
// Start the parallel header verifier
|
// Start the parallel header verifier
|
||||||
headers := make([]*types.Header, len(chain))
|
headers := make([]*types.Header, len(chain))
|
||||||
@ -1518,7 +1520,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
|
|||||||
for block != nil && err == ErrKnownBlock {
|
for block != nil && err == ErrKnownBlock {
|
||||||
log.Debug("Writing previously known block", "number", block.Number(), "hash", block.Hash())
|
log.Debug("Writing previously known block", "number", block.Number(), "hash", block.Hash())
|
||||||
if err := bc.writeKnownBlock(block); err != nil {
|
if err := bc.writeKnownBlock(block); err != nil {
|
||||||
return it.index, nil, nil, err
|
return it.index, nil, nil, nil, err
|
||||||
}
|
}
|
||||||
lastCanon = block
|
lastCanon = block
|
||||||
|
|
||||||
@ -1537,7 +1539,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
|
|||||||
for block != nil && (it.index == 0 || err == consensus.ErrUnknownAncestor) {
|
for block != nil && (it.index == 0 || err == consensus.ErrUnknownAncestor) {
|
||||||
log.Debug("Future block, postponing import", "number", block.Number(), "hash", block.Hash())
|
log.Debug("Future block, postponing import", "number", block.Number(), "hash", block.Hash())
|
||||||
if err := bc.addFutureBlock(block); err != nil {
|
if err := bc.addFutureBlock(block); err != nil {
|
||||||
return it.index, events, coalescedLogs, err
|
return it.index, events, coalescedLogs, stateDiffs, err
|
||||||
}
|
}
|
||||||
block, err = it.next()
|
block, err = it.next()
|
||||||
}
|
}
|
||||||
@ -1545,13 +1547,13 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
|
|||||||
stats.ignored += it.remaining()
|
stats.ignored += it.remaining()
|
||||||
|
|
||||||
// If there are any still remaining, mark as ignored
|
// If there are any still remaining, mark as ignored
|
||||||
return it.index, events, coalescedLogs, err
|
return it.index, events, coalescedLogs, stateDiffs, err
|
||||||
|
|
||||||
// Some other error occurred, abort
|
// Some other error occurred, abort
|
||||||
case err != nil:
|
case err != nil:
|
||||||
stats.ignored += len(it.chain)
|
stats.ignored += len(it.chain)
|
||||||
bc.reportBlock(block, nil, err)
|
bc.reportBlock(block, nil, err)
|
||||||
return it.index, events, coalescedLogs, err
|
return it.index, events, coalescedLogs, stateDiffs, err
|
||||||
}
|
}
|
||||||
// No validation errors for the first block (or chain prefix skipped)
|
// No validation errors for the first block (or chain prefix skipped)
|
||||||
for ; block != nil && err == nil || err == ErrKnownBlock; block, err = it.next() {
|
for ; block != nil && err == nil || err == ErrKnownBlock; block, err = it.next() {
|
||||||
@ -1563,7 +1565,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
|
|||||||
// If the header is a banned one, straight out abort
|
// If the header is a banned one, straight out abort
|
||||||
if BadHashes[block.Hash()] {
|
if BadHashes[block.Hash()] {
|
||||||
bc.reportBlock(block, nil, ErrBlacklistedHash)
|
bc.reportBlock(block, nil, ErrBlacklistedHash)
|
||||||
return it.index, events, coalescedLogs, ErrBlacklistedHash
|
return it.index, events, coalescedLogs, stateDiffs, ErrBlacklistedHash
|
||||||
}
|
}
|
||||||
// If the block is known (in the middle of the chain), it's a special case for
|
// If the block is known (in the middle of the chain), it's a special case for
|
||||||
// Clique blocks where they can share state among each other, so importing an
|
// Clique blocks where they can share state among each other, so importing an
|
||||||
@ -1580,7 +1582,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
|
|||||||
"root", block.Root())
|
"root", block.Root())
|
||||||
|
|
||||||
if err := bc.writeKnownBlock(block); err != nil {
|
if err := bc.writeKnownBlock(block); err != nil {
|
||||||
return it.index, nil, nil, err
|
return it.index, nil, nil, stateDiffs, err
|
||||||
}
|
}
|
||||||
stats.processed++
|
stats.processed++
|
||||||
|
|
||||||
@ -1600,7 +1602,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
|
|||||||
}
|
}
|
||||||
statedb, err := state.New(parent.Root, bc.stateCache)
|
statedb, err := state.New(parent.Root, bc.stateCache)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return it.index, events, coalescedLogs, err
|
return it.index, events, coalescedLogs, stateDiffs, err
|
||||||
}
|
}
|
||||||
// If we have a followup block, run that against the current state to pre-cache
|
// If we have a followup block, run that against the current state to pre-cache
|
||||||
// transactions and probabilistically some of the account/storage trie nodes.
|
// transactions and probabilistically some of the account/storage trie nodes.
|
||||||
@ -1625,7 +1627,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
bc.reportBlock(block, receipts, err)
|
bc.reportBlock(block, receipts, err)
|
||||||
atomic.StoreUint32(&followupInterrupt, 1)
|
atomic.StoreUint32(&followupInterrupt, 1)
|
||||||
return it.index, events, coalescedLogs, err
|
return it.index, events, coalescedLogs, stateDiffs, err
|
||||||
}
|
}
|
||||||
// Update the metrics touched during block processing
|
// Update the metrics touched during block processing
|
||||||
accountReadTimer.Update(statedb.AccountReads) // Account reads are complete, we can mark them
|
accountReadTimer.Update(statedb.AccountReads) // Account reads are complete, we can mark them
|
||||||
@ -1644,7 +1646,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
|
|||||||
if err := bc.validator.ValidateState(block, statedb, receipts, usedGas); err != nil {
|
if err := bc.validator.ValidateState(block, statedb, receipts, usedGas); err != nil {
|
||||||
bc.reportBlock(block, receipts, err)
|
bc.reportBlock(block, receipts, err)
|
||||||
atomic.StoreUint32(&followupInterrupt, 1)
|
atomic.StoreUint32(&followupInterrupt, 1)
|
||||||
return it.index, events, coalescedLogs, err
|
return it.index, events, coalescedLogs, stateDiffs, err
|
||||||
}
|
}
|
||||||
proctime := time.Since(start)
|
proctime := time.Since(start)
|
||||||
|
|
||||||
@ -1656,10 +1658,10 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
|
|||||||
|
|
||||||
// Write the block to the chain and get the status.
|
// Write the block to the chain and get the status.
|
||||||
substart = time.Now()
|
substart = time.Now()
|
||||||
status, err := bc.writeBlockWithState(block, receipts, statedb)
|
status, committedStateDiffs, err := bc.writeBlockWithState(block, receipts, statedb)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
atomic.StoreUint32(&followupInterrupt, 1)
|
atomic.StoreUint32(&followupInterrupt, 1)
|
||||||
return it.index, events, coalescedLogs, err
|
return it.index, events, coalescedLogs, stateDiffs, err
|
||||||
}
|
}
|
||||||
atomic.StoreUint32(&followupInterrupt, 1)
|
atomic.StoreUint32(&followupInterrupt, 1)
|
||||||
|
|
||||||
@ -1678,6 +1680,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
|
|||||||
"root", block.Root())
|
"root", block.Root())
|
||||||
|
|
||||||
coalescedLogs = append(coalescedLogs, logs...)
|
coalescedLogs = append(coalescedLogs, logs...)
|
||||||
|
stateDiffs = committedStateDiffs
|
||||||
events = append(events, ChainEvent{block, block.Hash(), logs})
|
events = append(events, ChainEvent{block, block.Hash(), logs})
|
||||||
lastCanon = block
|
lastCanon = block
|
||||||
|
|
||||||
@ -1708,13 +1711,13 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
|
|||||||
// Any blocks remaining here? The only ones we care about are the future ones
|
// Any blocks remaining here? The only ones we care about are the future ones
|
||||||
if block != nil && err == consensus.ErrFutureBlock {
|
if block != nil && err == consensus.ErrFutureBlock {
|
||||||
if err := bc.addFutureBlock(block); err != nil {
|
if err := bc.addFutureBlock(block); err != nil {
|
||||||
return it.index, events, coalescedLogs, err
|
return it.index, events, coalescedLogs, stateDiffs, err
|
||||||
}
|
}
|
||||||
block, err = it.next()
|
block, err = it.next()
|
||||||
|
|
||||||
for ; block != nil && err == consensus.ErrUnknownAncestor; block, err = it.next() {
|
for ; block != nil && err == consensus.ErrUnknownAncestor; block, err = it.next() {
|
||||||
if err := bc.addFutureBlock(block); err != nil {
|
if err := bc.addFutureBlock(block); err != nil {
|
||||||
return it.index, events, coalescedLogs, err
|
return it.index, events, coalescedLogs, stateDiffs, err
|
||||||
}
|
}
|
||||||
stats.queued++
|
stats.queued++
|
||||||
}
|
}
|
||||||
@ -1725,7 +1728,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
|
|||||||
if lastCanon != nil && bc.CurrentBlock().Hash() == lastCanon.Hash() {
|
if lastCanon != nil && bc.CurrentBlock().Hash() == lastCanon.Hash() {
|
||||||
events = append(events, ChainHeadEvent{lastCanon})
|
events = append(events, ChainHeadEvent{lastCanon})
|
||||||
}
|
}
|
||||||
return it.index, events, coalescedLogs, err
|
return it.index, events, coalescedLogs, stateDiffs, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// insertSideChain is called when an import batch hits upon a pruned ancestor
|
// insertSideChain is called when an import batch hits upon a pruned ancestor
|
||||||
@ -1734,7 +1737,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
|
|||||||
//
|
//
|
||||||
// The method writes all (header-and-body-valid) blocks to disk, then tries to
|
// The method writes all (header-and-body-valid) blocks to disk, then tries to
|
||||||
// switch over to the new chain if the TD exceeded the current chain.
|
// switch over to the new chain if the TD exceeded the current chain.
|
||||||
func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (int, []interface{}, []*types.Log, error) {
|
func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (int, []interface{}, []*types.Log, map[common.Address]state.Account, error) {
|
||||||
var (
|
var (
|
||||||
externTd *big.Int
|
externTd *big.Int
|
||||||
current = bc.CurrentBlock()
|
current = bc.CurrentBlock()
|
||||||
@ -1770,7 +1773,7 @@ func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (i
|
|||||||
// If someone legitimately side-mines blocks, they would still be imported as usual. However,
|
// If someone legitimately side-mines blocks, they would still be imported as usual. However,
|
||||||
// we cannot risk writing unverified blocks to disk when they obviously target the pruning
|
// we cannot risk writing unverified blocks to disk when they obviously target the pruning
|
||||||
// mechanism.
|
// mechanism.
|
||||||
return it.index, nil, nil, errors.New("sidechain ghost-state attack")
|
return it.index, nil, nil, nil, errors.New("sidechain ghost-state attack")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if externTd == nil {
|
if externTd == nil {
|
||||||
@ -1781,7 +1784,7 @@ func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (i
|
|||||||
if !bc.HasBlock(block.Hash(), block.NumberU64()) {
|
if !bc.HasBlock(block.Hash(), block.NumberU64()) {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
if err := bc.writeBlockWithoutState(block, externTd); err != nil {
|
if err := bc.writeBlockWithoutState(block, externTd); err != nil {
|
||||||
return it.index, nil, nil, err
|
return it.index, nil, nil, nil, err
|
||||||
}
|
}
|
||||||
log.Debug("Injected sidechain block", "number", block.Number(), "hash", block.Hash(),
|
log.Debug("Injected sidechain block", "number", block.Number(), "hash", block.Hash(),
|
||||||
"diff", block.Difficulty(), "elapsed", common.PrettyDuration(time.Since(start)),
|
"diff", block.Difficulty(), "elapsed", common.PrettyDuration(time.Since(start)),
|
||||||
@ -1798,7 +1801,7 @@ func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (i
|
|||||||
localTd := bc.GetTd(current.Hash(), current.NumberU64())
|
localTd := bc.GetTd(current.Hash(), current.NumberU64())
|
||||||
if localTd.Cmp(externTd) > 0 {
|
if localTd.Cmp(externTd) > 0 {
|
||||||
log.Info("Sidechain written to disk", "start", it.first().NumberU64(), "end", it.previous().Number, "sidetd", externTd, "localtd", localTd)
|
log.Info("Sidechain written to disk", "start", it.first().NumberU64(), "end", it.previous().Number, "sidetd", externTd, "localtd", localTd)
|
||||||
return it.index, nil, nil, err
|
return it.index, nil, nil, nil, err
|
||||||
}
|
}
|
||||||
// Gather all the sidechain hashes (full blocks may be memory heavy)
|
// Gather all the sidechain hashes (full blocks may be memory heavy)
|
||||||
var (
|
var (
|
||||||
@ -1813,7 +1816,7 @@ func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (i
|
|||||||
parent = bc.GetHeader(parent.ParentHash, parent.Number.Uint64()-1)
|
parent = bc.GetHeader(parent.ParentHash, parent.Number.Uint64()-1)
|
||||||
}
|
}
|
||||||
if parent == nil {
|
if parent == nil {
|
||||||
return it.index, nil, nil, errors.New("missing parent")
|
return it.index, nil, nil, nil, errors.New("missing parent")
|
||||||
}
|
}
|
||||||
// Import all the pruned blocks to make the state available
|
// Import all the pruned blocks to make the state available
|
||||||
var (
|
var (
|
||||||
@ -1832,15 +1835,15 @@ func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (i
|
|||||||
// memory here.
|
// memory here.
|
||||||
if len(blocks) >= 2048 || memory > 64*1024*1024 {
|
if len(blocks) >= 2048 || memory > 64*1024*1024 {
|
||||||
log.Info("Importing heavy sidechain segment", "blocks", len(blocks), "start", blocks[0].NumberU64(), "end", block.NumberU64())
|
log.Info("Importing heavy sidechain segment", "blocks", len(blocks), "start", blocks[0].NumberU64(), "end", block.NumberU64())
|
||||||
if _, _, _, err := bc.insertChain(blocks, false); err != nil {
|
if _, _, _, _, err := bc.insertChain(blocks, false); err != nil {
|
||||||
return 0, nil, nil, err
|
return 0, nil, nil, nil, err
|
||||||
}
|
}
|
||||||
blocks, memory = blocks[:0], 0
|
blocks, memory = blocks[:0], 0
|
||||||
|
|
||||||
// If the chain is terminating, stop processing blocks
|
// If the chain is terminating, stop processing blocks
|
||||||
if atomic.LoadInt32(&bc.procInterrupt) == 1 {
|
if atomic.LoadInt32(&bc.procInterrupt) == 1 {
|
||||||
log.Debug("Premature abort during blocks processing")
|
log.Debug("Premature abort during blocks processing")
|
||||||
return 0, nil, nil, nil
|
return 0, nil, nil, nil, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1848,7 +1851,7 @@ func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (i
|
|||||||
log.Info("Importing sidechain segment", "start", blocks[0].NumberU64(), "end", blocks[len(blocks)-1].NumberU64())
|
log.Info("Importing sidechain segment", "start", blocks[0].NumberU64(), "end", blocks[len(blocks)-1].NumberU64())
|
||||||
return bc.insertChain(blocks, false)
|
return bc.insertChain(blocks, false)
|
||||||
}
|
}
|
||||||
return 0, nil, nil, nil
|
return 0, nil, nil, nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// reorg takes two blocks, an old chain and a new chain and will reconstruct the
|
// reorg takes two blocks, an old chain and a new chain and will reconstruct the
|
||||||
@ -2003,11 +2006,15 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
|
|||||||
// PostChainEvents iterates over the events generated by a chain insertion and
|
// PostChainEvents iterates over the events generated by a chain insertion and
|
||||||
// posts them into the event feed.
|
// posts them into the event feed.
|
||||||
// TODO: Should not expose PostChainEvents. The chain events should be posted in WriteBlock.
|
// TODO: Should not expose PostChainEvents. The chain events should be posted in WriteBlock.
|
||||||
func (bc *BlockChain) PostChainEvents(events []interface{}, logs []*types.Log) {
|
func (bc *BlockChain) PostChainEvents(events []interface{}, logs []*types.Log, stateDiffs map[common.Address]state.Account) {
|
||||||
// post event logs for further processing
|
// post event logs for further processing
|
||||||
if logs != nil {
|
if logs != nil {
|
||||||
bc.logsFeed.Send(logs)
|
bc.logsFeed.Send(logs)
|
||||||
}
|
}
|
||||||
|
// post state diffs for further processing
|
||||||
|
if stateDiffs != nil {
|
||||||
|
bc.stateDiffsFeed.Send(stateDiffs)
|
||||||
|
}
|
||||||
for _, event := range events {
|
for _, event := range events {
|
||||||
switch ev := event.(type) {
|
switch ev := event.(type) {
|
||||||
case ChainEvent:
|
case ChainEvent:
|
||||||
@ -2215,3 +2222,7 @@ func (bc *BlockChain) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscript
|
|||||||
func (bc *BlockChain) SubscribeBlockProcessingEvent(ch chan<- bool) event.Subscription {
|
func (bc *BlockChain) SubscribeBlockProcessingEvent(ch chan<- bool) event.Subscription {
|
||||||
return bc.scope.Track(bc.blockProcFeed.Subscribe(ch))
|
return bc.scope.Track(bc.blockProcFeed.Subscribe(ch))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (bc *BlockChain) SubscribeStateDiffEvent(ch chan<- map[common.Address]state.Account) event.Subscription {
|
||||||
|
return bc.scope.Track(bc.stateDiffsFeed.Subscribe(ch))
|
||||||
|
}
|
||||||
|
@ -216,7 +216,7 @@ func GenerateChain(config *params.ChainConfig, parent *types.Block, engine conse
|
|||||||
block, _ := b.engine.FinalizeAndAssemble(chainreader, b.header, statedb, b.txs, b.uncles, b.receipts)
|
block, _ := b.engine.FinalizeAndAssemble(chainreader, b.header, statedb, b.txs, b.uncles, b.receipts)
|
||||||
|
|
||||||
// Write state changes to db
|
// Write state changes to db
|
||||||
root, err := statedb.Commit(config.IsEIP158(b.header.Number))
|
root, _, err := statedb.Commit(config.IsEIP158(b.header.Number))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(fmt.Sprintf("state write error: %v", err))
|
panic(fmt.Sprintf("state write error: %v", err))
|
||||||
}
|
}
|
||||||
|
@ -158,7 +158,7 @@ func TestSnapshot2(t *testing.T) {
|
|||||||
so0.deleted = false
|
so0.deleted = false
|
||||||
state.setStateObject(so0)
|
state.setStateObject(so0)
|
||||||
|
|
||||||
root, _ := state.Commit(false)
|
root, _, _ := state.Commit(false)
|
||||||
state.Reset(root)
|
state.Reset(root)
|
||||||
|
|
||||||
// and one with deleted == true
|
// and one with deleted == true
|
||||||
|
@ -686,9 +686,10 @@ func (s *StateDB) clearJournalAndRefund() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Commit writes the state to the underlying in-memory trie database.
|
// Commit writes the state to the underlying in-memory trie database.
|
||||||
func (s *StateDB) Commit(deleteEmptyObjects bool) (root common.Hash, err error) {
|
func (s *StateDB) Commit(deleteEmptyObjects bool) (root common.Hash, modifiedAccounts map[common.Address]Account, err error) {
|
||||||
defer s.clearJournalAndRefund()
|
defer s.clearJournalAndRefund()
|
||||||
|
|
||||||
|
modifiedAccounts = make(map[common.Address]Account)
|
||||||
for addr := range s.journal.dirties {
|
for addr := range s.journal.dirties {
|
||||||
s.stateObjectsDirty[addr] = struct{}{}
|
s.stateObjectsDirty[addr] = struct{}{}
|
||||||
}
|
}
|
||||||
@ -699,6 +700,7 @@ func (s *StateDB) Commit(deleteEmptyObjects bool) (root common.Hash, err error)
|
|||||||
case stateObject.suicided || (isDirty && deleteEmptyObjects && stateObject.empty()):
|
case stateObject.suicided || (isDirty && deleteEmptyObjects && stateObject.empty()):
|
||||||
// If the object has been removed, don't bother syncing it
|
// If the object has been removed, don't bother syncing it
|
||||||
// and just mark it for deletion in the trie.
|
// and just mark it for deletion in the trie.
|
||||||
|
modifiedAccounts[addr] = stateObject.data
|
||||||
s.deleteStateObject(stateObject)
|
s.deleteStateObject(stateObject)
|
||||||
case isDirty:
|
case isDirty:
|
||||||
// Write any contract code associated with the state object
|
// Write any contract code associated with the state object
|
||||||
@ -708,8 +710,9 @@ func (s *StateDB) Commit(deleteEmptyObjects bool) (root common.Hash, err error)
|
|||||||
}
|
}
|
||||||
// Write any storage changes in the state object to its storage trie.
|
// Write any storage changes in the state object to its storage trie.
|
||||||
if err := stateObject.CommitTrie(s.db); err != nil {
|
if err := stateObject.CommitTrie(s.db); err != nil {
|
||||||
return common.Hash{}, err
|
return common.Hash{}, nil, err
|
||||||
}
|
}
|
||||||
|
modifiedAccounts[addr] = stateObject.data
|
||||||
// Update the object in the main account trie.
|
// Update the object in the main account trie.
|
||||||
s.updateStateObject(stateObject)
|
s.updateStateObject(stateObject)
|
||||||
}
|
}
|
||||||
@ -733,5 +736,5 @@ func (s *StateDB) Commit(deleteEmptyObjects bool) (root common.Hash, err error)
|
|||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
return root, err
|
return root, modifiedAccounts, err
|
||||||
}
|
}
|
||||||
|
@ -98,10 +98,10 @@ func TestIntermediateLeaks(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Commit and cross check the databases.
|
// Commit and cross check the databases.
|
||||||
if _, err := transState.Commit(false); err != nil {
|
if _, _, err := transState.Commit(false); err != nil {
|
||||||
t.Fatalf("failed to commit transition state: %v", err)
|
t.Fatalf("failed to commit transition state: %v", err)
|
||||||
}
|
}
|
||||||
if _, err := finalState.Commit(false); err != nil {
|
if _, _, err := finalState.Commit(false); err != nil {
|
||||||
t.Fatalf("failed to commit final state: %v", err)
|
t.Fatalf("failed to commit final state: %v", err)
|
||||||
}
|
}
|
||||||
it := finalDb.NewIterator()
|
it := finalDb.NewIterator()
|
||||||
@ -420,7 +420,7 @@ func (test *snapshotTest) checkEqual(state, checkstate *StateDB) error {
|
|||||||
|
|
||||||
func (s *StateSuite) TestTouchDelete(c *check.C) {
|
func (s *StateSuite) TestTouchDelete(c *check.C) {
|
||||||
s.state.GetOrNewStateObject(common.Address{})
|
s.state.GetOrNewStateObject(common.Address{})
|
||||||
root, _ := s.state.Commit(false)
|
root, _, _ := s.state.Commit(false)
|
||||||
s.state.Reset(root)
|
s.state.Reset(root)
|
||||||
|
|
||||||
snapshot := s.state.Snapshot()
|
snapshot := s.state.Snapshot()
|
||||||
|
@ -62,7 +62,7 @@ func makeTestState() (Database, common.Hash, []*testAccount) {
|
|||||||
state.updateStateObject(obj)
|
state.updateStateObject(obj)
|
||||||
accounts = append(accounts, acc)
|
accounts = append(accounts, acc)
|
||||||
}
|
}
|
||||||
root, _ := state.Commit(false)
|
root, _, _ := state.Commit(false)
|
||||||
|
|
||||||
// Return the generated state
|
// Return the generated state
|
||||||
return db, root, accounts
|
return db, root, accounts
|
||||||
|
@ -159,6 +159,10 @@ func (b *EthAPIBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscri
|
|||||||
return b.eth.BlockChain().SubscribeLogsEvent(ch)
|
return b.eth.BlockChain().SubscribeLogsEvent(ch)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (b *EthAPIBackend) SubscribeStateDiffs(ch chan<- map[common.Address]state.Account) event.Subscription {
|
||||||
|
return b.eth.BlockChain().SubscribeStateDiffEvent(ch)
|
||||||
|
}
|
||||||
|
|
||||||
func (b *EthAPIBackend) SendTx(ctx context.Context, signedTx *types.Transaction) error {
|
func (b *EthAPIBackend) SendTx(ctx context.Context, signedTx *types.Transaction) error {
|
||||||
return b.eth.txPool.AddLocal(signedTx)
|
return b.eth.txPool.AddLocal(signedTx)
|
||||||
}
|
}
|
||||||
|
@ -295,7 +295,7 @@ func (api *PrivateDebugAPI) traceChain(ctx context.Context, start, end *types.Bl
|
|||||||
break
|
break
|
||||||
}
|
}
|
||||||
// Finalize the state so any modifications are written to the trie
|
// Finalize the state so any modifications are written to the trie
|
||||||
root, err := statedb.Commit(api.eth.blockchain.Config().IsEIP158(block.Number()))
|
root, _, err := statedb.Commit(api.eth.blockchain.Config().IsEIP158(block.Number()))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
failed = err
|
failed = err
|
||||||
break
|
break
|
||||||
@ -681,7 +681,7 @@ func (api *PrivateDebugAPI) computeStateDB(block *types.Block, reexec uint64) (*
|
|||||||
return nil, fmt.Errorf("processing block %d failed: %v", block.NumberU64(), err)
|
return nil, fmt.Errorf("processing block %d failed: %v", block.NumberU64(), err)
|
||||||
}
|
}
|
||||||
// Finalize the state so any modifications are written to the trie
|
// Finalize the state so any modifications are written to the trie
|
||||||
root, err := statedb.Commit(api.eth.blockchain.Config().IsEIP158(block.Number()))
|
root, _, err := statedb.Commit(api.eth.blockchain.Config().IsEIP158(block.Number()))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -28,6 +28,7 @@ import (
|
|||||||
ethereum "github.com/ethereum/go-ethereum"
|
ethereum "github.com/ethereum/go-ethereum"
|
||||||
"github.com/ethereum/go-ethereum/common"
|
"github.com/ethereum/go-ethereum/common"
|
||||||
"github.com/ethereum/go-ethereum/common/hexutil"
|
"github.com/ethereum/go-ethereum/common/hexutil"
|
||||||
|
"github.com/ethereum/go-ethereum/core/state"
|
||||||
"github.com/ethereum/go-ethereum/core/types"
|
"github.com/ethereum/go-ethereum/core/types"
|
||||||
"github.com/ethereum/go-ethereum/ethdb"
|
"github.com/ethereum/go-ethereum/ethdb"
|
||||||
"github.com/ethereum/go-ethereum/event"
|
"github.com/ethereum/go-ethereum/event"
|
||||||
@ -233,6 +234,36 @@ func (api *PublicFilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, er
|
|||||||
return rpcSub, nil
|
return rpcSub, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NewStateDiffs sent a notification each time an account changes in a block.
|
||||||
|
func (api *PublicFilterAPI) NewStateDiffs(ctx context.Context) (*rpc.Subscription, error) {
|
||||||
|
notifier, supported := rpc.NotifierFromContext(ctx)
|
||||||
|
if !supported {
|
||||||
|
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
|
||||||
|
}
|
||||||
|
|
||||||
|
rpcSub := notifier.CreateSubscription()
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
stateDiffs := make(chan map[common.Address]state.Account)
|
||||||
|
stateDiffsSub := api.events.SubscribeStateDiffs(stateDiffs)
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case d := <-stateDiffs:
|
||||||
|
notifier.Notify(rpcSub.ID, d)
|
||||||
|
case <-rpcSub.Err():
|
||||||
|
stateDiffsSub.Unsubscribe()
|
||||||
|
return
|
||||||
|
case <-notifier.Closed():
|
||||||
|
stateDiffsSub.Unsubscribe()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
return rpcSub, nil
|
||||||
|
}
|
||||||
|
|
||||||
// Logs creates a subscription that fires for all new log that match the given filter criteria.
|
// Logs creates a subscription that fires for all new log that match the given filter criteria.
|
||||||
func (api *PublicFilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc.Subscription, error) {
|
func (api *PublicFilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc.Subscription, error) {
|
||||||
notifier, supported := rpc.NotifierFromContext(ctx)
|
notifier, supported := rpc.NotifierFromContext(ctx)
|
||||||
|
@ -129,7 +129,7 @@ func benchmarkBloomBits(b *testing.B, sectionSize uint64) {
|
|||||||
if i%20 == 0 {
|
if i%20 == 0 {
|
||||||
db.Close()
|
db.Close()
|
||||||
db, _ = rawdb.NewLevelDBDatabase(benchDataDir, 128, 1024, "")
|
db, _ = rawdb.NewLevelDBDatabase(benchDataDir, 128, 1024, "")
|
||||||
backend = &testBackend{mux, db, cnt, new(event.Feed), new(event.Feed), new(event.Feed), new(event.Feed)}
|
backend = &testBackend{mux, db, cnt, new(event.Feed), new(event.Feed), new(event.Feed), new(event.Feed), new(event.Feed)}
|
||||||
}
|
}
|
||||||
var addr common.Address
|
var addr common.Address
|
||||||
addr[0] = byte(i)
|
addr[0] = byte(i)
|
||||||
@ -174,7 +174,7 @@ func BenchmarkNoBloomBits(b *testing.B) {
|
|||||||
b.Log("Running filter benchmarks...")
|
b.Log("Running filter benchmarks...")
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
mux := new(event.TypeMux)
|
mux := new(event.TypeMux)
|
||||||
backend := &testBackend{mux, db, 0, new(event.Feed), new(event.Feed), new(event.Feed), new(event.Feed)}
|
backend := &testBackend{mux, db, 0, new(event.Feed), new(event.Feed), new(event.Feed), new(event.Feed), new(event.Feed)}
|
||||||
filter := NewRangeFilter(backend, 0, int64(*headNum), []common.Address{{}}, nil)
|
filter := NewRangeFilter(backend, 0, int64(*headNum), []common.Address{{}}, nil)
|
||||||
filter.Logs(context.Background())
|
filter.Logs(context.Background())
|
||||||
d := time.Since(start)
|
d := time.Since(start)
|
||||||
|
@ -24,6 +24,7 @@ import (
|
|||||||
"github.com/ethereum/go-ethereum/common"
|
"github.com/ethereum/go-ethereum/common"
|
||||||
"github.com/ethereum/go-ethereum/core"
|
"github.com/ethereum/go-ethereum/core"
|
||||||
"github.com/ethereum/go-ethereum/core/bloombits"
|
"github.com/ethereum/go-ethereum/core/bloombits"
|
||||||
|
"github.com/ethereum/go-ethereum/core/state"
|
||||||
"github.com/ethereum/go-ethereum/core/types"
|
"github.com/ethereum/go-ethereum/core/types"
|
||||||
"github.com/ethereum/go-ethereum/ethdb"
|
"github.com/ethereum/go-ethereum/ethdb"
|
||||||
"github.com/ethereum/go-ethereum/event"
|
"github.com/ethereum/go-ethereum/event"
|
||||||
@ -42,6 +43,7 @@ type Backend interface {
|
|||||||
SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
|
SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
|
||||||
SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription
|
SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription
|
||||||
SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription
|
SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription
|
||||||
|
SubscribeStateDiffs(ch chan<- map[common.Address]state.Account) event.Subscription
|
||||||
|
|
||||||
BloomStatus() (uint64, uint64)
|
BloomStatus() (uint64, uint64)
|
||||||
ServiceFilter(ctx context.Context, session *bloombits.MatcherSession)
|
ServiceFilter(ctx context.Context, session *bloombits.MatcherSession)
|
||||||
|
@ -29,6 +29,7 @@ import (
|
|||||||
"github.com/ethereum/go-ethereum/common"
|
"github.com/ethereum/go-ethereum/common"
|
||||||
![]() I'm not sure I understand what the I'm not sure I understand what the `filters` package is for - is it just another way to subscribe to new events in the chain? I think I've previously only interacted with new event subscriptions directly from the `blockchain` package.
![]() I think it's designed to provide a public API for the blockchain's internal subscriptions, but one big question I've had is how functions in this namespace (like I think it's designed to provide a public API for the blockchain's internal subscriptions, but one big question I've had is how functions in this namespace (like `SubscribeNewHeads`) are actually called - will definitely need to sort that out before we can upstream
i-norden
commented
It looks like the filters api is being loaded under the "eth" namespace here. So I think you would call it like:
or use the EthSubscribe method since it is under that namespace. It looks like the filters api is being loaded under the "eth" namespace [here](https://github.com/vulcanize/go-ethereum/blob/pubsub-rpc-account-diffing/eth/backend.go#L316). So I think you would call it like:
```go
cli, _ := rpc.Dial("ipcPathOrWsURL")
stateDiffChan := make(chan map[common.Address]state.Account, 20000)
rpcSub, err := cli.Subscribe(context.Background(), "eth", stateDiffChan, "NewStateDiffs"})
```
or use the [EthSubscribe](https://github.com/vulcanize/go-ethereum/blob/pubsub-rpc-account-diffing/rpc/client.go#L397) method since it is under that namespace.
|
|||||||
"github.com/ethereum/go-ethereum/core"
|
"github.com/ethereum/go-ethereum/core"
|
||||||
"github.com/ethereum/go-ethereum/core/rawdb"
|
"github.com/ethereum/go-ethereum/core/rawdb"
|
||||||
|
"github.com/ethereum/go-ethereum/core/state"
|
||||||
"github.com/ethereum/go-ethereum/core/types"
|
"github.com/ethereum/go-ethereum/core/types"
|
||||||
"github.com/ethereum/go-ethereum/event"
|
"github.com/ethereum/go-ethereum/event"
|
||||||
"github.com/ethereum/go-ethereum/log"
|
"github.com/ethereum/go-ethereum/log"
|
||||||
@ -55,6 +56,8 @@ const (
|
|||||||
BlocksSubscription
|
BlocksSubscription
|
||||||
// LastSubscription keeps track of the last index
|
// LastSubscription keeps track of the last index
|
||||||
LastIndexSubscription
|
LastIndexSubscription
|
||||||
|
// StateDiffsSubscription queries for new account changes
|
||||||
|
StateDiffsSubscription
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -68,6 +71,8 @@ const (
|
|||||||
logsChanSize = 10
|
logsChanSize = 10
|
||||||
// chainEvChanSize is the size of channel listening to ChainEvent.
|
// chainEvChanSize is the size of channel listening to ChainEvent.
|
||||||
chainEvChanSize = 10
|
chainEvChanSize = 10
|
||||||
|
// stateDiffChanSize is the size of channel listening to StateDiffEvent.
|
||||||
|
stateDiffChanSize = 10
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -82,6 +87,7 @@ type subscription struct {
|
|||||||
logs chan []*types.Log
|
logs chan []*types.Log
|
||||||
hashes chan []common.Hash
|
hashes chan []common.Hash
|
||||||
headers chan *types.Header
|
headers chan *types.Header
|
||||||
|
stateDiffs chan map[common.Address]state.Account
|
||||||
installed chan struct{} // closed when the filter is installed
|
installed chan struct{} // closed when the filter is installed
|
||||||
err chan error // closed when the filter is uninstalled
|
err chan error // closed when the filter is uninstalled
|
||||||
}
|
}
|
||||||
@ -100,6 +106,7 @@ type EventSystem struct {
|
|||||||
rmLogsSub event.Subscription // Subscription for removed log event
|
rmLogsSub event.Subscription // Subscription for removed log event
|
||||||
chainSub event.Subscription // Subscription for new chain event
|
chainSub event.Subscription // Subscription for new chain event
|
||||||
pendingLogSub *event.TypeMuxSubscription // Subscription for pending log event
|
pendingLogSub *event.TypeMuxSubscription // Subscription for pending log event
|
||||||
|
stateDiffsSub event.Subscription // Subscription for new state diff events
|
||||||
|
|
||||||
// Channels
|
// Channels
|
||||||
install chan *subscription // install filter for event notification
|
install chan *subscription // install filter for event notification
|
||||||
@ -108,6 +115,7 @@ type EventSystem struct {
|
|||||||
logsCh chan []*types.Log // Channel to receive new log event
|
logsCh chan []*types.Log // Channel to receive new log event
|
||||||
rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event
|
rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event
|
||||||
chainCh chan core.ChainEvent // Channel to receive new chain event
|
chainCh chan core.ChainEvent // Channel to receive new chain event
|
||||||
|
stateDiffCh chan map[common.Address]state.Account // Channel to receive new state diff events
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewEventSystem creates a new manager that listens for event on the given mux,
|
// NewEventSystem creates a new manager that listens for event on the given mux,
|
||||||
@ -127,6 +135,7 @@ func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventS
|
|||||||
logsCh: make(chan []*types.Log, logsChanSize),
|
logsCh: make(chan []*types.Log, logsChanSize),
|
||||||
rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize),
|
rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize),
|
||||||
chainCh: make(chan core.ChainEvent, chainEvChanSize),
|
chainCh: make(chan core.ChainEvent, chainEvChanSize),
|
||||||
|
stateDiffCh: make(chan map[common.Address]state.Account, stateDiffChanSize),
|
||||||
}
|
}
|
||||||
|
|
||||||
// Subscribe events
|
// Subscribe events
|
||||||
@ -134,6 +143,7 @@ func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventS
|
|||||||
m.logsSub = m.backend.SubscribeLogsEvent(m.logsCh)
|
m.logsSub = m.backend.SubscribeLogsEvent(m.logsCh)
|
||||||
m.rmLogsSub = m.backend.SubscribeRemovedLogsEvent(m.rmLogsCh)
|
m.rmLogsSub = m.backend.SubscribeRemovedLogsEvent(m.rmLogsCh)
|
||||||
m.chainSub = m.backend.SubscribeChainEvent(m.chainCh)
|
m.chainSub = m.backend.SubscribeChainEvent(m.chainCh)
|
||||||
|
m.stateDiffsSub = m.backend.SubscribeStateDiffs(m.stateDiffCh)
|
||||||
// TODO(rjl493456442): use feed to subscribe pending log event
|
// TODO(rjl493456442): use feed to subscribe pending log event
|
||||||
m.pendingLogSub = m.mux.Subscribe(core.PendingLogsEvent{})
|
m.pendingLogSub = m.mux.Subscribe(core.PendingLogsEvent{})
|
||||||
|
|
||||||
@ -175,6 +185,7 @@ func (sub *Subscription) Unsubscribe() {
|
|||||||
case <-sub.f.logs:
|
case <-sub.f.logs:
|
||||||
case <-sub.f.hashes:
|
case <-sub.f.hashes:
|
||||||
case <-sub.f.headers:
|
case <-sub.f.headers:
|
||||||
|
case <-sub.f.stateDiffs:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -259,6 +270,7 @@ func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*typ
|
|||||||
logs: logs,
|
logs: logs,
|
||||||
hashes: make(chan []common.Hash),
|
hashes: make(chan []common.Hash),
|
||||||
headers: make(chan *types.Header),
|
headers: make(chan *types.Header),
|
||||||
|
stateDiffs: make(chan map[common.Address]state.Account),
|
||||||
installed: make(chan struct{}),
|
installed: make(chan struct{}),
|
||||||
err: make(chan error),
|
err: make(chan error),
|
||||||
}
|
}
|
||||||
@ -276,6 +288,23 @@ func (es *EventSystem) subscribePendingLogs(crit ethereum.FilterQuery, logs chan
|
|||||||
logs: logs,
|
logs: logs,
|
||||||
hashes: make(chan []common.Hash),
|
hashes: make(chan []common.Hash),
|
||||||
headers: make(chan *types.Header),
|
headers: make(chan *types.Header),
|
||||||
|
stateDiffs: make(chan map[common.Address]state.Account),
|
||||||
|
installed: make(chan struct{}),
|
||||||
|
err: make(chan error),
|
||||||
|
}
|
||||||
|
return es.subscribe(sub)
|
||||||
|
}
|
||||||
|
|
||||||
|
// SubscribeStateDiffs
|
||||||
|
func (es *EventSystem) SubscribeStateDiffs(diffs chan map[common.Address]state.Account) *Subscription {
|
||||||
|
sub := &subscription{
|
||||||
|
id: rpc.NewID(),
|
||||||
|
typ: StateDiffsSubscription,
|
||||||
|
created: time.Now(),
|
||||||
|
logs: make(chan []*types.Log),
|
||||||
|
hashes: make(chan []common.Hash),
|
||||||
|
headers: make(chan *types.Header),
|
||||||
|
stateDiffs: diffs,
|
||||||
installed: make(chan struct{}),
|
installed: make(chan struct{}),
|
||||||
err: make(chan error),
|
err: make(chan error),
|
||||||
}
|
}
|
||||||
@ -292,6 +321,7 @@ func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscripti
|
|||||||
logs: make(chan []*types.Log),
|
logs: make(chan []*types.Log),
|
||||||
hashes: make(chan []common.Hash),
|
hashes: make(chan []common.Hash),
|
||||||
headers: headers,
|
headers: headers,
|
||||||
|
stateDiffs: make(chan map[common.Address]state.Account),
|
||||||
installed: make(chan struct{}),
|
installed: make(chan struct{}),
|
||||||
err: make(chan error),
|
err: make(chan error),
|
||||||
}
|
}
|
||||||
@ -308,6 +338,7 @@ func (es *EventSystem) SubscribePendingTxs(hashes chan []common.Hash) *Subscript
|
|||||||
logs: make(chan []*types.Log),
|
logs: make(chan []*types.Log),
|
||||||
hashes: hashes,
|
hashes: hashes,
|
||||||
headers: make(chan *types.Header),
|
headers: make(chan *types.Header),
|
||||||
|
stateDiffs: make(chan map[common.Address]state.Account),
|
||||||
installed: make(chan struct{}),
|
installed: make(chan struct{}),
|
||||||
err: make(chan error),
|
err: make(chan error),
|
||||||
}
|
}
|
||||||
@ -368,6 +399,10 @@ func (es *EventSystem) broadcast(filters filterIndex, ev interface{}) {
|
|||||||
}
|
}
|
||||||
![]() How come we are adding stateDiffs to the subscribeLogs? Is it because it's in the subscription struct, but it can't be nil so we're adding an empty map? How come we are adding stateDiffs to the subscribeLogs? Is it because it's in the subscription struct, but it can't be nil so we're adding an empty map?
![]() Yeah just following the pattern here, seems like every subscription creates empty channels for all the things it's not subscribing to. Yeah just following the pattern here, seems like every subscription creates empty channels for all the things it's not subscribing to.
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
case map[common.Address]state.Account:
|
||||||
|
for _, f := range filters[StateDiffsSubscription] {
|
||||||
|
f.stateDiffs <- e
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -453,6 +488,7 @@ func (es *EventSystem) eventLoop() {
|
|||||||
es.logsSub.Unsubscribe()
|
es.logsSub.Unsubscribe()
|
||||||
es.rmLogsSub.Unsubscribe()
|
es.rmLogsSub.Unsubscribe()
|
||||||
es.chainSub.Unsubscribe()
|
es.chainSub.Unsubscribe()
|
||||||
|
es.stateDiffsSub.Unsubscribe()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
index := make(filterIndex)
|
index := make(filterIndex)
|
||||||
@ -471,6 +507,8 @@ func (es *EventSystem) eventLoop() {
|
|||||||
es.broadcast(index, ev)
|
es.broadcast(index, ev)
|
||||||
case ev := <-es.chainCh:
|
case ev := <-es.chainCh:
|
||||||
es.broadcast(index, ev)
|
es.broadcast(index, ev)
|
||||||
|
case ev := <-es.stateDiffCh:
|
||||||
|
es.broadcast(index, ev)
|
||||||
case ev, active := <-es.pendingLogSub.Chan():
|
case ev, active := <-es.pendingLogSub.Chan():
|
||||||
if !active { // system stopped
|
if !active { // system stopped
|
||||||
return
|
return
|
||||||
@ -506,6 +544,8 @@ func (es *EventSystem) eventLoop() {
|
|||||||
return
|
return
|
||||||
case <-es.chainSub.Err():
|
case <-es.chainSub.Err():
|
||||||
return
|
return
|
||||||
|
case <-es.stateDiffsSub.Err():
|
||||||
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -31,6 +31,7 @@ import (
|
|||||||
"github.com/ethereum/go-ethereum/core"
|
"github.com/ethereum/go-ethereum/core"
|
||||||
"github.com/ethereum/go-ethereum/core/bloombits"
|
"github.com/ethereum/go-ethereum/core/bloombits"
|
||||||
"github.com/ethereum/go-ethereum/core/rawdb"
|
"github.com/ethereum/go-ethereum/core/rawdb"
|
||||||
|
"github.com/ethereum/go-ethereum/core/state"
|
||||||
"github.com/ethereum/go-ethereum/core/types"
|
"github.com/ethereum/go-ethereum/core/types"
|
||||||
"github.com/ethereum/go-ethereum/ethdb"
|
"github.com/ethereum/go-ethereum/ethdb"
|
||||||
"github.com/ethereum/go-ethereum/event"
|
"github.com/ethereum/go-ethereum/event"
|
||||||
@ -46,6 +47,7 @@ type testBackend struct {
|
|||||||
rmLogsFeed *event.Feed
|
rmLogsFeed *event.Feed
|
||||||
logsFeed *event.Feed
|
logsFeed *event.Feed
|
||||||
chainFeed *event.Feed
|
chainFeed *event.Feed
|
||||||
|
stateDiffsFeed *event.Feed
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *testBackend) ChainDb() ethdb.Database {
|
func (b *testBackend) ChainDb() ethdb.Database {
|
||||||
@ -120,6 +122,10 @@ func (b *testBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subsc
|
|||||||
return b.chainFeed.Subscribe(ch)
|
return b.chainFeed.Subscribe(ch)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (b *testBackend) SubscribeStateDiffs(ch chan<- map[common.Address]state.Account) event.Subscription {
|
||||||
|
return b.stateDiffsFeed.Subscribe(ch)
|
||||||
|
}
|
||||||
|
|
||||||
func (b *testBackend) BloomStatus() (uint64, uint64) {
|
func (b *testBackend) BloomStatus() (uint64, uint64) {
|
||||||
return params.BloomBitsBlocks, b.sections
|
return params.BloomBitsBlocks, b.sections
|
||||||
}
|
}
|
||||||
@ -166,7 +172,8 @@ func TestBlockSubscription(t *testing.T) {
|
|||||||
rmLogsFeed = new(event.Feed)
|
rmLogsFeed = new(event.Feed)
|
||||||
logsFeed = new(event.Feed)
|
logsFeed = new(event.Feed)
|
||||||
chainFeed = new(event.Feed)
|
chainFeed = new(event.Feed)
|
||||||
backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
|
stateDiffsFeed = new(event.Feed)
|
||||||
|
backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed, stateDiffsFeed}
|
||||||
api = NewPublicFilterAPI(backend, false)
|
api = NewPublicFilterAPI(backend, false)
|
||||||
genesis = new(core.Genesis).MustCommit(db)
|
genesis = new(core.Genesis).MustCommit(db)
|
||||||
chain, _ = core.GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, 10, func(i int, gen *core.BlockGen) {})
|
chain, _ = core.GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, 10, func(i int, gen *core.BlockGen) {})
|
||||||
@ -223,7 +230,8 @@ func TestPendingTxFilter(t *testing.T) {
|
|||||||
rmLogsFeed = new(event.Feed)
|
rmLogsFeed = new(event.Feed)
|
||||||
logsFeed = new(event.Feed)
|
logsFeed = new(event.Feed)
|
||||||
chainFeed = new(event.Feed)
|
chainFeed = new(event.Feed)
|
||||||
backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
|
stateDiffsFeed = new(event.Feed)
|
||||||
|
backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed, stateDiffsFeed}
|
||||||
api = NewPublicFilterAPI(backend, false)
|
api = NewPublicFilterAPI(backend, false)
|
||||||
|
|
||||||
transactions = []*types.Transaction{
|
transactions = []*types.Transaction{
|
||||||
@ -283,7 +291,8 @@ func TestLogFilterCreation(t *testing.T) {
|
|||||||
rmLogsFeed = new(event.Feed)
|
rmLogsFeed = new(event.Feed)
|
||||||
logsFeed = new(event.Feed)
|
logsFeed = new(event.Feed)
|
||||||
chainFeed = new(event.Feed)
|
chainFeed = new(event.Feed)
|
||||||
backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
|
stateDiffsFeed = new(event.Feed)
|
||||||
|
backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed, stateDiffsFeed}
|
||||||
api = NewPublicFilterAPI(backend, false)
|
api = NewPublicFilterAPI(backend, false)
|
||||||
|
|
||||||
testCases = []struct {
|
testCases = []struct {
|
||||||
@ -332,7 +341,8 @@ func TestInvalidLogFilterCreation(t *testing.T) {
|
|||||||
rmLogsFeed = new(event.Feed)
|
rmLogsFeed = new(event.Feed)
|
||||||
logsFeed = new(event.Feed)
|
logsFeed = new(event.Feed)
|
||||||
chainFeed = new(event.Feed)
|
chainFeed = new(event.Feed)
|
||||||
backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
|
stateDiffsFeed = new(event.Feed)
|
||||||
|
backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed, stateDiffsFeed}
|
||||||
api = NewPublicFilterAPI(backend, false)
|
api = NewPublicFilterAPI(backend, false)
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -359,7 +369,8 @@ func TestInvalidGetLogsRequest(t *testing.T) {
|
|||||||
rmLogsFeed = new(event.Feed)
|
rmLogsFeed = new(event.Feed)
|
||||||
logsFeed = new(event.Feed)
|
logsFeed = new(event.Feed)
|
||||||
chainFeed = new(event.Feed)
|
chainFeed = new(event.Feed)
|
||||||
backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
|
stateDiffsFeed = new(event.Feed)
|
||||||
|
backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed, stateDiffsFeed}
|
||||||
api = NewPublicFilterAPI(backend, false)
|
api = NewPublicFilterAPI(backend, false)
|
||||||
blockHash = common.HexToHash("0x1111111111111111111111111111111111111111111111111111111111111111")
|
blockHash = common.HexToHash("0x1111111111111111111111111111111111111111111111111111111111111111")
|
||||||
)
|
)
|
||||||
@ -389,7 +400,8 @@ func TestLogFilter(t *testing.T) {
|
|||||||
rmLogsFeed = new(event.Feed)
|
rmLogsFeed = new(event.Feed)
|
||||||
logsFeed = new(event.Feed)
|
logsFeed = new(event.Feed)
|
||||||
chainFeed = new(event.Feed)
|
chainFeed = new(event.Feed)
|
||||||
backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
|
stateDiffsFeed = new(event.Feed)
|
||||||
|
backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed, stateDiffsFeed}
|
||||||
api = NewPublicFilterAPI(backend, false)
|
api = NewPublicFilterAPI(backend, false)
|
||||||
|
|
||||||
firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111")
|
firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111")
|
||||||
@ -508,7 +520,8 @@ func TestPendingLogsSubscription(t *testing.T) {
|
|||||||
rmLogsFeed = new(event.Feed)
|
rmLogsFeed = new(event.Feed)
|
||||||
logsFeed = new(event.Feed)
|
logsFeed = new(event.Feed)
|
||||||
chainFeed = new(event.Feed)
|
chainFeed = new(event.Feed)
|
||||||
backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
|
stateDiffsFeed = new(event.Feed)
|
||||||
|
backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed, stateDiffsFeed}
|
||||||
api = NewPublicFilterAPI(backend, false)
|
api = NewPublicFilterAPI(backend, false)
|
||||||
|
|
||||||
firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111")
|
firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111")
|
||||||
|
@ -56,7 +56,8 @@ func BenchmarkFilters(b *testing.B) {
|
|||||||
rmLogsFeed = new(event.Feed)
|
rmLogsFeed = new(event.Feed)
|
||||||
logsFeed = new(event.Feed)
|
logsFeed = new(event.Feed)
|
||||||
chainFeed = new(event.Feed)
|
chainFeed = new(event.Feed)
|
||||||
backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
|
stateDiffsFeed = new(event.Feed)
|
||||||
|
backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed, stateDiffsFeed}
|
||||||
key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
|
key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
|
||||||
addr1 = crypto.PubkeyToAddress(key1.PublicKey)
|
addr1 = crypto.PubkeyToAddress(key1.PublicKey)
|
||||||
addr2 = common.BytesToAddress([]byte("jeff"))
|
addr2 = common.BytesToAddress([]byte("jeff"))
|
||||||
@ -115,7 +116,8 @@ func TestFilters(t *testing.T) {
|
|||||||
rmLogsFeed = new(event.Feed)
|
rmLogsFeed = new(event.Feed)
|
||||||
logsFeed = new(event.Feed)
|
logsFeed = new(event.Feed)
|
||||||
chainFeed = new(event.Feed)
|
chainFeed = new(event.Feed)
|
||||||
backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
|
stateDiffsFeed = new(event.Feed)
|
||||||
|
backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed, stateDiffsFeed}
|
||||||
key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
|
key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
|
||||||
addr = crypto.PubkeyToAddress(key1.PublicKey)
|
addr = crypto.PubkeyToAddress(key1.PublicKey)
|
||||||
|
|
||||||
|
@ -78,6 +78,7 @@ type Backend interface {
|
|||||||
ServiceFilter(ctx context.Context, session *bloombits.MatcherSession)
|
ServiceFilter(ctx context.Context, session *bloombits.MatcherSession)
|
||||||
SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription
|
SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription
|
||||||
SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription
|
SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription
|
||||||
|
SubscribeStateDiffs(ch chan<- map[common.Address]state.Account) event.Subscription
|
||||||
|
|
||||||
ChainConfig() *params.ChainConfig
|
ChainConfig() *params.ChainConfig
|
||||||
CurrentBlock() *types.Block
|
CurrentBlock() *types.Block
|
||||||
|
@ -172,6 +172,10 @@ func (b *LesApiBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEven
|
|||||||
return b.eth.blockchain.SubscribeRemovedLogsEvent(ch)
|
return b.eth.blockchain.SubscribeRemovedLogsEvent(ch)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (b *LesApiBackend) SubscribeStateDiffs(ch chan<- map[common.Address]state.Account) event.Subscription {
|
||||||
|
return b.eth.blockchain.SubscribeStateDiffs(ch)
|
||||||
|
}
|
||||||
|
|
||||||
func (b *LesApiBackend) Downloader() *downloader.Downloader {
|
func (b *LesApiBackend) Downloader() *downloader.Downloader {
|
||||||
return b.eth.Downloader()
|
return b.eth.Downloader()
|
||||||
}
|
}
|
||||||
|
@ -531,6 +531,12 @@ func (lc *LightChain) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent)
|
|||||||
return lc.scope.Track(new(event.Feed).Subscribe(ch))
|
return lc.scope.Track(new(event.Feed).Subscribe(ch))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SubscribeStateDiffs implements the interface of filters.Backend
|
||||||
|
// LightChain does not sen state diffs, so return an empty subscription.
|
||||||
|
func (lc *LightChain) SubscribeStateDiffs(ch chan<- map[common.Address]state.Account) event.Subscription {
|
||||||
|
return lc.scope.Track(new(event.Feed).Subscribe(ch))
|
||||||
|
}
|
||||||
|
|
||||||
// DisableCheckFreq disables header validation. This is used for ultralight mode.
|
// DisableCheckFreq disables header validation. This is used for ultralight mode.
|
||||||
func (lc *LightChain) DisableCheckFreq() {
|
func (lc *LightChain) DisableCheckFreq() {
|
||||||
atomic.StoreInt32(&lc.disableCheckFreq, 1)
|
atomic.StoreInt32(&lc.disableCheckFreq, 1)
|
||||||
|
@ -589,7 +589,7 @@ func (w *worker) resultLoop() {
|
|||||||
logs = append(logs, receipt.Logs...)
|
logs = append(logs, receipt.Logs...)
|
||||||
}
|
}
|
||||||
// Commit block and state to database.
|
// Commit block and state to database.
|
||||||
stat, err := w.chain.WriteBlockWithState(block, receipts, task.state)
|
stat, stateDiffs, err := w.chain.WriteBlockWithState(block, receipts, task.state)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Failed writing block to chain", "err", err)
|
log.Error("Failed writing block to chain", "err", err)
|
||||||
continue
|
continue
|
||||||
@ -608,7 +608,7 @@ func (w *worker) resultLoop() {
|
|||||||
case core.SideStatTy:
|
case core.SideStatTy:
|
||||||
events = append(events, core.ChainSideEvent{Block: block})
|
events = append(events, core.ChainSideEvent{Block: block})
|
||||||
}
|
}
|
||||||
w.chain.PostChainEvents(events, logs)
|
w.chain.PostChainEvents(events, logs, stateDiffs)
|
||||||
|
|
||||||
// Insert the block into the set of pending ones to resultLoop for confirmations
|
// Insert the block into the set of pending ones to resultLoop for confirmations
|
||||||
w.unconfirmed.Insert(block.NumberU64(), block.Hash())
|
w.unconfirmed.Insert(block.NumberU64(), block.Hash())
|
||||||
|
@ -134,7 +134,7 @@ func newTestWorkerBackend(t *testing.T, chainConfig *params.ChainConfig, engine
|
|||||||
func (b *testWorkerBackend) BlockChain() *core.BlockChain { return b.chain }
|
func (b *testWorkerBackend) BlockChain() *core.BlockChain { return b.chain }
|
||||||
func (b *testWorkerBackend) TxPool() *core.TxPool { return b.txPool }
|
func (b *testWorkerBackend) TxPool() *core.TxPool { return b.txPool }
|
||||||
func (b *testWorkerBackend) PostChainEvents(events []interface{}) {
|
func (b *testWorkerBackend) PostChainEvents(events []interface{}) {
|
||||||
b.chain.PostChainEvents(events, nil)
|
b.chain.PostChainEvents(events, nil, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
func newTestWorker(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine, blocks int) (*worker, *testWorkerBackend) {
|
func newTestWorker(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine, blocks int) (*worker, *testWorkerBackend) {
|
||||||
|
@ -216,7 +216,7 @@ func MakePreState(db ethdb.Database, accounts core.GenesisAlloc) *state.StateDB
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Commit and re-open to start with a clean state.
|
// Commit and re-open to start with a clean state.
|
||||||
root, _ := statedb.Commit(false)
|
root, _, _ := statedb.Commit(false)
|
||||||
statedb, _ = state.New(root, sdb)
|
statedb, _ = state.New(root, sdb)
|
||||||
return statedb
|
return statedb
|
||||||
}
|
}
|
||||||
|
Is the reason that we're adding
SubscribeStateDiffs
to a bunch of places is because they all implement the Backend interface?It's confusing to know which backend we'll actually want to use for VDB. 🤔
Yeah, you got it. I think we want to (and do) use the
EthApiBackend
with a subscription to a full (non-fast-syncing) node, where all transactions are applied/verified.