From dc399a85fd5fda3dc209a09a070dc02f5adfa450 Mon Sep 17 00:00:00 2001 From: Ian Norden Date: Tue, 13 Oct 2020 22:59:53 -0500 Subject: [PATCH] subscription endpoint for retrieving all the codehash=>code mappings that exist at provided height --- statediff/api.go | 42 +++++++++++++++++++--- statediff/builder.go | 10 +++--- statediff/service.go | 44 +++++++++++++++++++++++ statediff/testhelpers/mocks/blockchain.go | 6 ++++ statediff/testhelpers/mocks/service.go | 4 +++ statediff/types.go | 4 +-- 6 files changed, 97 insertions(+), 13 deletions(-) diff --git a/statediff/api.go b/statediff/api.go index 3be07b73e..b395557bb 100644 --- a/statediff/api.go +++ b/statediff/api.go @@ -63,11 +63,10 @@ func (api *PublicStateDiffAPI) Stream(ctx context.Context, params Params) (*rpc. for { select { case payload := <-payloadChannel: - if notifyErr := notifier.Notify(rpcSub.ID, payload); notifyErr != nil { - log.Error("Failed to send state diff packet; error: " + notifyErr.Error()) - unSubErr := api.sds.Unsubscribe(rpcSub.ID) - if unSubErr != nil { - log.Error("Failed to unsubscribe from the state diff service; error: " + unSubErr.Error()) + if err := notifier.Notify(rpcSub.ID, payload); err != nil { + log.Error("Failed to send state diff packet; error: " + err.Error()) + if err := api.sds.Unsubscribe(rpcSub.ID); err != nil { + log.Error("Failed to unsubscribe from the state diff service; error: " + err.Error()) } return } @@ -99,3 +98,36 @@ func (api *PublicStateDiffAPI) StateDiffAt(ctx context.Context, blockNumber uint func (api *PublicStateDiffAPI) StateTrieAt(ctx context.Context, blockNumber uint64, params Params) (*Payload, error) { return api.sds.StateTrieAt(blockNumber, params) } + +// StreamCodeAndCodeHash writes all of the codehash=>code pairs out to a websocket channel +func (api *PublicStateDiffAPI) StreamCodeAndCodeHash(ctx context.Context, blockNumber uint64) (*rpc.Subscription, error) { + // ensure that the RPC connection supports subscriptions + notifier, supported := rpc.NotifierFromContext(ctx) + if !supported { + return nil, rpc.ErrNotificationsUnsupported + } + + // create subscription and start waiting for events + rpcSub := notifier.CreateSubscription() + payloadChan := make(chan CodeAndCodeHash, chainEventChanSize) + quitChan := make(chan bool) + api.sds.StreamCodeAndCodeHash(blockNumber, payloadChan, quitChan) + go func() { + for { + select { + case payload := <-payloadChan: + if err := notifier.Notify(rpcSub.ID, payload); err != nil { + log.Error("Failed to send code and codehash packet", "err", err) + return + } + case err := <-rpcSub.Err(): + log.Error("State diff service rpcSub error", "err", err) + return + case <-quitChan: + return + } + } + }() + + return rpcSub, nil +} diff --git a/statediff/builder.go b/statediff/builder.go index c9ddbf03b..c84678d51 100644 --- a/statediff/builder.go +++ b/statediff/builder.go @@ -124,10 +124,9 @@ func (sdb *builder) buildStateTrie(it trie.NodeIterator) ([]StateNode, []CodeAnd node.StorageNodes = storageNodes // emit codehash => code mappings for cod codeHash := common.BytesToHash(account.CodeHash) - addrHash := common.BytesToHash(leafKey) - code, err := sdb.stateCache.ContractCode(addrHash, codeHash) + code, err := sdb.stateCache.ContractCode(common.Hash{}, codeHash) if err != nil { - return nil, nil, fmt.Errorf("failed to retrieve code for codehash %s for account with leafkey %s\r\n error: %v", codeHash.String(), addrHash.String(), err) + return nil, nil, fmt.Errorf("failed to retrieve code for codehash %s\r\n error: %v", codeHash.String(), err) } codeAndCodeHashes = append(codeAndCodeHashes, CodeAndCodeHash{ Hash: codeHash, @@ -509,10 +508,9 @@ func (sdb *builder) buildAccountCreations(accounts AccountMap, watchedStorageKey diff.StorageNodes = storageDiffs // emit codehash => code mappings for cod codeHash := common.BytesToHash(val.Account.CodeHash) - addrHash := common.BytesToHash(val.LeafKey) - code, err := sdb.stateCache.ContractCode(addrHash, codeHash) + code, err := sdb.stateCache.ContractCode(common.Hash{}, codeHash) if err != nil { - return nil, nil, fmt.Errorf("failed to retrieve code for codehash %s for account with leafkey %s\r\n error: %v", codeHash.String(), addrHash.String(), err) + return nil, nil, fmt.Errorf("failed to retrieve code for codehash %s\r\n error: %v", codeHash.String(), err) } codeAndCodeHashes = append(codeAndCodeHashes, CodeAndCodeHash{ Hash: codeHash, diff --git a/statediff/service.go b/statediff/service.go index 96ad03a48..aec5a34ba 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -25,6 +25,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/event" @@ -33,6 +34,7 @@ import ( "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rpc" + "github.com/ethereum/go-ethereum/trie" ) const chainEventChanSize = 20000 @@ -44,6 +46,7 @@ type blockChain interface { GetReceiptsByHash(hash common.Hash) types.Receipts GetTdByHash(hash common.Hash) *big.Int UnlockTrie(root common.Hash) + StateCache() state.Database } // IService is the state-diffing service interface @@ -60,6 +63,8 @@ type IService interface { StateDiffAt(blockNumber uint64, params Params) (*Payload, error) // Method to get state trie object at specific block StateTrieAt(blockNumber uint64, params Params) (*Payload, error) + // Method to stream out all code and codehash pairs + StreamCodeAndCodeHash(blockNumber uint64, outChan chan<- CodeAndCodeHash, quitChan chan<- bool) } // Service is the underlying struct for the state diffing service @@ -361,3 +366,42 @@ func sendNonBlockingQuit(id rpc.ID, sub Subscription) { log.Info("unable to close subscription %s; channel has no receiver", id) } } + +// StreamCodeAndCodeHash subscription method for extracting all the codehash=>code mappings that exist in the trie at the provided height +func (sds *Service) StreamCodeAndCodeHash(blockNumber uint64, outChan chan<- CodeAndCodeHash, quitChan chan<- bool) { + current := sds.BlockChain.GetBlockByNumber(blockNumber) + log.Info(fmt.Sprintf("sending code and codehash at block %d", blockNumber)) + currentTrie, err := sds.BlockChain.StateCache().OpenTrie(current.Root()) + if err != nil { + log.Error("error creating trie for block", "number", current.Number(), "err", err) + close(quitChan) + return + } + it := currentTrie.NodeIterator([]byte{}) + leafIt := trie.NewIterator(it) + go func() { + defer close(quitChan) + for leafIt.Next() { + select { + case <-sds.QuitChan: + return + default: + } + account := new(state.Account) + if err := rlp.DecodeBytes(leafIt.Value, account); err != nil { + log.Error("error decoding state account", "err", err) + return + } + codeHash := common.BytesToHash(account.CodeHash) + code, err := sds.BlockChain.StateCache().ContractCode(common.Hash{}, codeHash) + if err != nil { + log.Error("error collecting contract code", "err", err) + return + } + outChan <- CodeAndCodeHash{ + Hash: codeHash, + Code: code, + } + } + }() +} diff --git a/statediff/testhelpers/mocks/blockchain.go b/statediff/testhelpers/mocks/blockchain.go index a995e47c6..b0111e64c 100644 --- a/statediff/testhelpers/mocks/blockchain.go +++ b/statediff/testhelpers/mocks/blockchain.go @@ -21,6 +21,8 @@ import ( "math/big" "time" + "github.com/ethereum/go-ethereum/core/state" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" @@ -126,3 +128,7 @@ func (blockChain *BlockChain) SetTdByHash(hash common.Hash, td *big.Int) { } func (blockChain *BlockChain) UnlockTrie(root common.Hash) {} + +func (BlockChain *BlockChain) StateCache() state.Database { + return nil +} diff --git a/statediff/testhelpers/mocks/service.go b/statediff/testhelpers/mocks/service.go index 9b886d947..6f04da8fb 100644 --- a/statediff/testhelpers/mocks/service.go +++ b/statediff/testhelpers/mocks/service.go @@ -276,6 +276,10 @@ func (sds *MockStateDiffService) closeType(subType common.Hash) { delete(sds.SubscriptionTypes, subType) } +func (sds *MockStateDiffService) StreamCodeAndCodeHash(blockNumber uint64, outChan chan<- statediff.CodeAndCodeHash, quitChan chan<- bool) { + panic("implement me") +} + func sendNonBlockingQuit(id rpc.ID, sub statediff.Subscription) { select { case sub.QuitChan <- true: diff --git a/statediff/types.go b/statediff/types.go index 86812846b..f3ff8818a 100644 --- a/statediff/types.go +++ b/statediff/types.go @@ -92,8 +92,8 @@ type StateObject struct { // CodeAndCodeHash struct for holding codehash => code mappings // we can't use an actual map because they are not rlp serializable type CodeAndCodeHash struct { - Hash common.Hash - Code []byte + Hash common.Hash `json:"codeHash"` + Code []byte `json:"code"` } // StateNode holds the data for a single state diff node