subscription endpoint for retrieving all the codehash=>code mappings … #28

Merged
telackey merged 1 commits from codeAndCodeHash into statediff_at_anyblock-1.9.11 2020-10-14 12:53:32 +00:00
6 changed files with 97 additions and 13 deletions

View File

@ -63,11 +63,10 @@ func (api *PublicStateDiffAPI) Stream(ctx context.Context, params Params) (*rpc.
for {
select {
case payload := <-payloadChannel:
if notifyErr := notifier.Notify(rpcSub.ID, payload); notifyErr != nil {
log.Error("Failed to send state diff packet; error: " + notifyErr.Error())
unSubErr := api.sds.Unsubscribe(rpcSub.ID)
if unSubErr != nil {
log.Error("Failed to unsubscribe from the state diff service; error: " + unSubErr.Error())
if err := notifier.Notify(rpcSub.ID, payload); err != nil {
log.Error("Failed to send state diff packet; error: " + err.Error())
if err := api.sds.Unsubscribe(rpcSub.ID); err != nil {
log.Error("Failed to unsubscribe from the state diff service; error: " + err.Error())
}
return
}
@ -99,3 +98,36 @@ func (api *PublicStateDiffAPI) StateDiffAt(ctx context.Context, blockNumber uint
func (api *PublicStateDiffAPI) StateTrieAt(ctx context.Context, blockNumber uint64, params Params) (*Payload, error) {
return api.sds.StateTrieAt(blockNumber, params)
}
// StreamCodeAndCodeHash writes all of the codehash=>code pairs out to a websocket channel
func (api *PublicStateDiffAPI) StreamCodeAndCodeHash(ctx context.Context, blockNumber uint64) (*rpc.Subscription, error) {
// ensure that the RPC connection supports subscriptions
notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
return nil, rpc.ErrNotificationsUnsupported
}
// create subscription and start waiting for events
rpcSub := notifier.CreateSubscription()
payloadChan := make(chan CodeAndCodeHash, chainEventChanSize)
quitChan := make(chan bool)
api.sds.StreamCodeAndCodeHash(blockNumber, payloadChan, quitChan)
go func() {
for {
select {
case payload := <-payloadChan:
if err := notifier.Notify(rpcSub.ID, payload); err != nil {
log.Error("Failed to send code and codehash packet", "err", err)
return
}
case err := <-rpcSub.Err():
log.Error("State diff service rpcSub error", "err", err)
return
case <-quitChan:
return
}
}
}()
return rpcSub, nil
}

View File

@ -124,10 +124,9 @@ func (sdb *builder) buildStateTrie(it trie.NodeIterator) ([]StateNode, []CodeAnd
node.StorageNodes = storageNodes
// emit codehash => code mappings for cod
codeHash := common.BytesToHash(account.CodeHash)
addrHash := common.BytesToHash(leafKey)
code, err := sdb.stateCache.ContractCode(addrHash, codeHash)
code, err := sdb.stateCache.ContractCode(common.Hash{}, codeHash)
if err != nil {
return nil, nil, fmt.Errorf("failed to retrieve code for codehash %s for account with leafkey %s\r\n error: %v", codeHash.String(), addrHash.String(), err)
return nil, nil, fmt.Errorf("failed to retrieve code for codehash %s\r\n error: %v", codeHash.String(), err)
}
codeAndCodeHashes = append(codeAndCodeHashes, CodeAndCodeHash{
Hash: codeHash,
@ -509,10 +508,9 @@ func (sdb *builder) buildAccountCreations(accounts AccountMap, watchedStorageKey
diff.StorageNodes = storageDiffs
// emit codehash => code mappings for cod
codeHash := common.BytesToHash(val.Account.CodeHash)
addrHash := common.BytesToHash(val.LeafKey)
code, err := sdb.stateCache.ContractCode(addrHash, codeHash)
code, err := sdb.stateCache.ContractCode(common.Hash{}, codeHash)
if err != nil {
return nil, nil, fmt.Errorf("failed to retrieve code for codehash %s for account with leafkey %s\r\n error: %v", codeHash.String(), addrHash.String(), err)
return nil, nil, fmt.Errorf("failed to retrieve code for codehash %s\r\n error: %v", codeHash.String(), err)
}
codeAndCodeHashes = append(codeAndCodeHashes, CodeAndCodeHash{
Hash: codeHash,

View File

@ -25,6 +25,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/event"
@ -33,6 +34,7 @@ import (
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/trie"
)
const chainEventChanSize = 20000
@ -44,6 +46,7 @@ type blockChain interface {
GetReceiptsByHash(hash common.Hash) types.Receipts
GetTdByHash(hash common.Hash) *big.Int
UnlockTrie(root common.Hash)
StateCache() state.Database
}
// IService is the state-diffing service interface
@ -60,6 +63,8 @@ type IService interface {
StateDiffAt(blockNumber uint64, params Params) (*Payload, error)
// Method to get state trie object at specific block
StateTrieAt(blockNumber uint64, params Params) (*Payload, error)
// Method to stream out all code and codehash pairs
StreamCodeAndCodeHash(blockNumber uint64, outChan chan<- CodeAndCodeHash, quitChan chan<- bool)
}
// Service is the underlying struct for the state diffing service
@ -361,3 +366,42 @@ func sendNonBlockingQuit(id rpc.ID, sub Subscription) {
log.Info("unable to close subscription %s; channel has no receiver", id)
}
}
// StreamCodeAndCodeHash subscription method for extracting all the codehash=>code mappings that exist in the trie at the provided height
func (sds *Service) StreamCodeAndCodeHash(blockNumber uint64, outChan chan<- CodeAndCodeHash, quitChan chan<- bool) {
current := sds.BlockChain.GetBlockByNumber(blockNumber)
log.Info(fmt.Sprintf("sending code and codehash at block %d", blockNumber))
currentTrie, err := sds.BlockChain.StateCache().OpenTrie(current.Root())
if err != nil {
log.Error("error creating trie for block", "number", current.Number(), "err", err)
close(quitChan)
return
}
it := currentTrie.NodeIterator([]byte{})
leafIt := trie.NewIterator(it)
go func() {
defer close(quitChan)
for leafIt.Next() {
select {
case <-sds.QuitChan:
return
default:
}
account := new(state.Account)
if err := rlp.DecodeBytes(leafIt.Value, account); err != nil {
log.Error("error decoding state account", "err", err)
return
}
codeHash := common.BytesToHash(account.CodeHash)
code, err := sds.BlockChain.StateCache().ContractCode(common.Hash{}, codeHash)
if err != nil {
log.Error("error collecting contract code", "err", err)
return
}
outChan <- CodeAndCodeHash{
Hash: codeHash,
Code: code,
}
}
}()
}

View File

@ -21,6 +21,8 @@ import (
"math/big"
"time"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
@ -126,3 +128,7 @@ func (blockChain *BlockChain) SetTdByHash(hash common.Hash, td *big.Int) {
}
func (blockChain *BlockChain) UnlockTrie(root common.Hash) {}
func (BlockChain *BlockChain) StateCache() state.Database {
return nil
}

View File

@ -276,6 +276,10 @@ func (sds *MockStateDiffService) closeType(subType common.Hash) {
delete(sds.SubscriptionTypes, subType)
}
func (sds *MockStateDiffService) StreamCodeAndCodeHash(blockNumber uint64, outChan chan<- statediff.CodeAndCodeHash, quitChan chan<- bool) {
panic("implement me")
}
func sendNonBlockingQuit(id rpc.ID, sub statediff.Subscription) {
select {
case sub.QuitChan <- true:

View File

@ -92,8 +92,8 @@ type StateObject struct {
// CodeAndCodeHash struct for holding codehash => code mappings
// we can't use an actual map because they are not rlp serializable
type CodeAndCodeHash struct {
Hash common.Hash
Code []byte
Hash common.Hash `json:"codeHash"`
Code []byte `json:"code"`
}
// StateNode holds the data for a single state diff node