subscription endpoint for retrieving all the codehash=>code mappings … #28
@ -63,11 +63,10 @@ func (api *PublicStateDiffAPI) Stream(ctx context.Context, params Params) (*rpc.
|
||||
for {
|
||||
select {
|
||||
case payload := <-payloadChannel:
|
||||
if notifyErr := notifier.Notify(rpcSub.ID, payload); notifyErr != nil {
|
||||
log.Error("Failed to send state diff packet; error: " + notifyErr.Error())
|
||||
unSubErr := api.sds.Unsubscribe(rpcSub.ID)
|
||||
if unSubErr != nil {
|
||||
log.Error("Failed to unsubscribe from the state diff service; error: " + unSubErr.Error())
|
||||
if err := notifier.Notify(rpcSub.ID, payload); err != nil {
|
||||
log.Error("Failed to send state diff packet; error: " + err.Error())
|
||||
if err := api.sds.Unsubscribe(rpcSub.ID); err != nil {
|
||||
log.Error("Failed to unsubscribe from the state diff service; error: " + err.Error())
|
||||
}
|
||||
return
|
||||
}
|
||||
@ -99,3 +98,36 @@ func (api *PublicStateDiffAPI) StateDiffAt(ctx context.Context, blockNumber uint
|
||||
func (api *PublicStateDiffAPI) StateTrieAt(ctx context.Context, blockNumber uint64, params Params) (*Payload, error) {
|
||||
return api.sds.StateTrieAt(blockNumber, params)
|
||||
}
|
||||
|
||||
// StreamCodeAndCodeHash writes all of the codehash=>code pairs out to a websocket channel
|
||||
func (api *PublicStateDiffAPI) StreamCodeAndCodeHash(ctx context.Context, blockNumber uint64) (*rpc.Subscription, error) {
|
||||
// ensure that the RPC connection supports subscriptions
|
||||
notifier, supported := rpc.NotifierFromContext(ctx)
|
||||
if !supported {
|
||||
return nil, rpc.ErrNotificationsUnsupported
|
||||
}
|
||||
|
||||
// create subscription and start waiting for events
|
||||
rpcSub := notifier.CreateSubscription()
|
||||
payloadChan := make(chan CodeAndCodeHash, chainEventChanSize)
|
||||
quitChan := make(chan bool)
|
||||
api.sds.StreamCodeAndCodeHash(blockNumber, payloadChan, quitChan)
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case payload := <-payloadChan:
|
||||
if err := notifier.Notify(rpcSub.ID, payload); err != nil {
|
||||
log.Error("Failed to send code and codehash packet", "err", err)
|
||||
return
|
||||
}
|
||||
case err := <-rpcSub.Err():
|
||||
log.Error("State diff service rpcSub error", "err", err)
|
||||
return
|
||||
case <-quitChan:
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return rpcSub, nil
|
||||
}
|
||||
|
@ -124,10 +124,9 @@ func (sdb *builder) buildStateTrie(it trie.NodeIterator) ([]StateNode, []CodeAnd
|
||||
node.StorageNodes = storageNodes
|
||||
// emit codehash => code mappings for cod
|
||||
codeHash := common.BytesToHash(account.CodeHash)
|
||||
addrHash := common.BytesToHash(leafKey)
|
||||
code, err := sdb.stateCache.ContractCode(addrHash, codeHash)
|
||||
code, err := sdb.stateCache.ContractCode(common.Hash{}, codeHash)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to retrieve code for codehash %s for account with leafkey %s\r\n error: %v", codeHash.String(), addrHash.String(), err)
|
||||
return nil, nil, fmt.Errorf("failed to retrieve code for codehash %s\r\n error: %v", codeHash.String(), err)
|
||||
}
|
||||
codeAndCodeHashes = append(codeAndCodeHashes, CodeAndCodeHash{
|
||||
Hash: codeHash,
|
||||
@ -509,10 +508,9 @@ func (sdb *builder) buildAccountCreations(accounts AccountMap, watchedStorageKey
|
||||
diff.StorageNodes = storageDiffs
|
||||
// emit codehash => code mappings for cod
|
||||
codeHash := common.BytesToHash(val.Account.CodeHash)
|
||||
addrHash := common.BytesToHash(val.LeafKey)
|
||||
code, err := sdb.stateCache.ContractCode(addrHash, codeHash)
|
||||
code, err := sdb.stateCache.ContractCode(common.Hash{}, codeHash)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to retrieve code for codehash %s for account with leafkey %s\r\n error: %v", codeHash.String(), addrHash.String(), err)
|
||||
return nil, nil, fmt.Errorf("failed to retrieve code for codehash %s\r\n error: %v", codeHash.String(), err)
|
||||
}
|
||||
codeAndCodeHashes = append(codeAndCodeHashes, CodeAndCodeHash{
|
||||
Hash: codeHash,
|
||||
|
@ -25,6 +25,7 @@ import (
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/core"
|
||||
"github.com/ethereum/go-ethereum/core/state"
|
||||
"github.com/ethereum/go-ethereum/core/types"
|
||||
"github.com/ethereum/go-ethereum/crypto"
|
||||
"github.com/ethereum/go-ethereum/event"
|
||||
@ -33,6 +34,7 @@ import (
|
||||
"github.com/ethereum/go-ethereum/p2p"
|
||||
"github.com/ethereum/go-ethereum/rlp"
|
||||
"github.com/ethereum/go-ethereum/rpc"
|
||||
"github.com/ethereum/go-ethereum/trie"
|
||||
)
|
||||
|
||||
const chainEventChanSize = 20000
|
||||
@ -44,6 +46,7 @@ type blockChain interface {
|
||||
GetReceiptsByHash(hash common.Hash) types.Receipts
|
||||
GetTdByHash(hash common.Hash) *big.Int
|
||||
UnlockTrie(root common.Hash)
|
||||
StateCache() state.Database
|
||||
}
|
||||
|
||||
// IService is the state-diffing service interface
|
||||
@ -60,6 +63,8 @@ type IService interface {
|
||||
StateDiffAt(blockNumber uint64, params Params) (*Payload, error)
|
||||
// Method to get state trie object at specific block
|
||||
StateTrieAt(blockNumber uint64, params Params) (*Payload, error)
|
||||
// Method to stream out all code and codehash pairs
|
||||
StreamCodeAndCodeHash(blockNumber uint64, outChan chan<- CodeAndCodeHash, quitChan chan<- bool)
|
||||
}
|
||||
|
||||
// Service is the underlying struct for the state diffing service
|
||||
@ -361,3 +366,42 @@ func sendNonBlockingQuit(id rpc.ID, sub Subscription) {
|
||||
log.Info("unable to close subscription %s; channel has no receiver", id)
|
||||
}
|
||||
}
|
||||
|
||||
// StreamCodeAndCodeHash subscription method for extracting all the codehash=>code mappings that exist in the trie at the provided height
|
||||
func (sds *Service) StreamCodeAndCodeHash(blockNumber uint64, outChan chan<- CodeAndCodeHash, quitChan chan<- bool) {
|
||||
current := sds.BlockChain.GetBlockByNumber(blockNumber)
|
||||
log.Info(fmt.Sprintf("sending code and codehash at block %d", blockNumber))
|
||||
currentTrie, err := sds.BlockChain.StateCache().OpenTrie(current.Root())
|
||||
if err != nil {
|
||||
log.Error("error creating trie for block", "number", current.Number(), "err", err)
|
||||
close(quitChan)
|
||||
return
|
||||
}
|
||||
it := currentTrie.NodeIterator([]byte{})
|
||||
leafIt := trie.NewIterator(it)
|
||||
go func() {
|
||||
defer close(quitChan)
|
||||
for leafIt.Next() {
|
||||
select {
|
||||
case <-sds.QuitChan:
|
||||
return
|
||||
default:
|
||||
}
|
||||
account := new(state.Account)
|
||||
if err := rlp.DecodeBytes(leafIt.Value, account); err != nil {
|
||||
log.Error("error decoding state account", "err", err)
|
||||
return
|
||||
}
|
||||
codeHash := common.BytesToHash(account.CodeHash)
|
||||
code, err := sds.BlockChain.StateCache().ContractCode(common.Hash{}, codeHash)
|
||||
if err != nil {
|
||||
log.Error("error collecting contract code", "err", err)
|
||||
return
|
||||
}
|
||||
outChan <- CodeAndCodeHash{
|
||||
Hash: codeHash,
|
||||
Code: code,
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
@ -21,6 +21,8 @@ import (
|
||||
"math/big"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/core/state"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/core"
|
||||
"github.com/ethereum/go-ethereum/core/types"
|
||||
@ -126,3 +128,7 @@ func (blockChain *BlockChain) SetTdByHash(hash common.Hash, td *big.Int) {
|
||||
}
|
||||
|
||||
func (blockChain *BlockChain) UnlockTrie(root common.Hash) {}
|
||||
|
||||
func (BlockChain *BlockChain) StateCache() state.Database {
|
||||
return nil
|
||||
}
|
||||
|
@ -276,6 +276,10 @@ func (sds *MockStateDiffService) closeType(subType common.Hash) {
|
||||
delete(sds.SubscriptionTypes, subType)
|
||||
}
|
||||
|
||||
func (sds *MockStateDiffService) StreamCodeAndCodeHash(blockNumber uint64, outChan chan<- statediff.CodeAndCodeHash, quitChan chan<- bool) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func sendNonBlockingQuit(id rpc.ID, sub statediff.Subscription) {
|
||||
select {
|
||||
case sub.QuitChan <- true:
|
||||
|
@ -92,8 +92,8 @@ type StateObject struct {
|
||||
// CodeAndCodeHash struct for holding codehash => code mappings
|
||||
// we can't use an actual map because they are not rlp serializable
|
||||
type CodeAndCodeHash struct {
|
||||
Hash common.Hash
|
||||
Code []byte
|
||||
Hash common.Hash `json:"codeHash"`
|
||||
Code []byte `json:"code"`
|
||||
}
|
||||
|
||||
// StateNode holds the data for a single state diff node
|
||||
|
Loading…
Reference in New Issue
Block a user