diff --git a/statediff/service.go b/statediff/service.go index 6e25d927f..4bc5cda84 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -26,6 +26,8 @@ import ( "sync/atomic" "time" + "github.com/ethereum/go-ethereum/trie" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/state" @@ -100,6 +102,8 @@ type IService interface { WriteLoop(chainEventCh chan core.ChainEvent) // WatchAddress method to change the addresses being watched in write loop params WatchAddress(operation types2.OperationType, args []types2.WatchAddressArg) error + // StreamCodeAndCodeHash method to export all the codehash => code mappings at a block height + StreamCodeAndCodeHash(blockNumber uint64, outChan chan<- types2.CodeAndCodeHash, quitChan chan<- bool) // SubscribeWriteStatus method to subscribe to receive state diff processing output SubscribeWriteStatus(id rpc.ID, sub chan<- JobStatus, quitChan chan<- bool) @@ -793,7 +797,7 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p return sds.indexer.PushStateNode(tx, node, block.Hash().String()) } ipldOutput := func(c types2.IPLD) error { - return sds.indexer.PushCodeAndCodeHash(tx, c) + return sds.indexer.PushIPLD(tx, c) } err = sds.Builder.WriteStateDiffObject(types2.StateRoots{ @@ -854,6 +858,45 @@ func (sds *Service) UnsubscribeWriteStatus(id rpc.ID) error { return nil } +// StreamCodeAndCodeHash subscription method for extracting all the codehash=>code mappings that exist in the trie at the provided height +func (sds *Service) StreamCodeAndCodeHash(blockNumber uint64, outChan chan<- types2.CodeAndCodeHash, quitChan chan<- bool) { + current := sds.BlockChain.GetBlockByNumber(blockNumber) + log.Info("sending code and codehash", "block height", blockNumber) + currentTrie, err := sds.BlockChain.StateCache().OpenTrie(current.Root()) + if err != nil { + log.Error("error creating trie for block", "block height", current.Number(), "err", err) + close(quitChan) + return + } + it := currentTrie.NodeIterator([]byte{}) + leafIt := trie.NewIterator(it) + go func() { + defer close(quitChan) + for leafIt.Next() { + select { + case <-sds.QuitChan: + return + default: + } + account := new(types.StateAccount) + if err := rlp.DecodeBytes(leafIt.Value, account); err != nil { + log.Error("error decoding state account", "err", err) + return + } + codeHash := common.BytesToHash(account.CodeHash) + code, err := sds.BlockChain.StateCache().ContractCode(common.Hash{}, codeHash) + if err != nil { + log.Error("error collecting contract code", "err", err) + return + } + outChan <- types2.CodeAndCodeHash{ + Hash: codeHash, + Code: code, + } + } + }() +} + // WatchAddress performs one of following operations on the watched addresses in writeLoopParams and the db: // add | remove | set | clear func (sds *Service) WatchAddress(operation types2.OperationType, args []types2.WatchAddressArg) error {