update statediffing service

This commit is contained in:
i-norden 2023-03-15 18:08:55 -05:00
parent 8a5befde44
commit 1eef72d1e9

View File

@ -26,6 +26,8 @@ import (
"sync/atomic"
"time"
"github.com/ethereum/go-ethereum/trie"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/state"
@ -100,6 +102,8 @@ type IService interface {
WriteLoop(chainEventCh chan core.ChainEvent)
// WatchAddress method to change the addresses being watched in write loop params
WatchAddress(operation types2.OperationType, args []types2.WatchAddressArg) error
// StreamCodeAndCodeHash method to export all the codehash => code mappings at a block height
StreamCodeAndCodeHash(blockNumber uint64, outChan chan<- types2.CodeAndCodeHash, quitChan chan<- bool)
// SubscribeWriteStatus method to subscribe to receive state diff processing output
SubscribeWriteStatus(id rpc.ID, sub chan<- JobStatus, quitChan chan<- bool)
@ -793,7 +797,7 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p
return sds.indexer.PushStateNode(tx, node, block.Hash().String())
}
ipldOutput := func(c types2.IPLD) error {
return sds.indexer.PushCodeAndCodeHash(tx, c)
return sds.indexer.PushIPLD(tx, c)
}
err = sds.Builder.WriteStateDiffObject(types2.StateRoots{
@ -854,6 +858,45 @@ func (sds *Service) UnsubscribeWriteStatus(id rpc.ID) error {
return nil
}
// StreamCodeAndCodeHash subscription method for extracting all the codehash=>code mappings that exist in the trie at the provided height
func (sds *Service) StreamCodeAndCodeHash(blockNumber uint64, outChan chan<- types2.CodeAndCodeHash, quitChan chan<- bool) {
current := sds.BlockChain.GetBlockByNumber(blockNumber)
log.Info("sending code and codehash", "block height", blockNumber)
currentTrie, err := sds.BlockChain.StateCache().OpenTrie(current.Root())
if err != nil {
log.Error("error creating trie for block", "block height", current.Number(), "err", err)
close(quitChan)
return
}
it := currentTrie.NodeIterator([]byte{})
leafIt := trie.NewIterator(it)
go func() {
defer close(quitChan)
for leafIt.Next() {
select {
case <-sds.QuitChan:
return
default:
}
account := new(types.StateAccount)
if err := rlp.DecodeBytes(leafIt.Value, account); err != nil {
log.Error("error decoding state account", "err", err)
return
}
codeHash := common.BytesToHash(account.CodeHash)
code, err := sds.BlockChain.StateCache().ContractCode(common.Hash{}, codeHash)
if err != nil {
log.Error("error collecting contract code", "err", err)
return
}
outChan <- types2.CodeAndCodeHash{
Hash: codeHash,
Code: code,
}
}
}()
}
// WatchAddress performs one of following operations on the watched addresses in writeLoopParams and the db:
// add | remove | set | clear
func (sds *Service) WatchAddress(operation types2.OperationType, args []types2.WatchAddressArg) error {