subscription endpoint for retrieving all the codehash=>code mappings … #28
@ -63,11 +63,10 @@ func (api *PublicStateDiffAPI) Stream(ctx context.Context, params Params) (*rpc.
|
|||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case payload := <-payloadChannel:
|
case payload := <-payloadChannel:
|
||||||
if notifyErr := notifier.Notify(rpcSub.ID, payload); notifyErr != nil {
|
if err := notifier.Notify(rpcSub.ID, payload); err != nil {
|
||||||
log.Error("Failed to send state diff packet; error: " + notifyErr.Error())
|
log.Error("Failed to send state diff packet; error: " + err.Error())
|
||||||
unSubErr := api.sds.Unsubscribe(rpcSub.ID)
|
if err := api.sds.Unsubscribe(rpcSub.ID); err != nil {
|
||||||
if unSubErr != nil {
|
log.Error("Failed to unsubscribe from the state diff service; error: " + err.Error())
|
||||||
log.Error("Failed to unsubscribe from the state diff service; error: " + unSubErr.Error())
|
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -99,3 +98,36 @@ func (api *PublicStateDiffAPI) StateDiffAt(ctx context.Context, blockNumber uint
|
|||||||
func (api *PublicStateDiffAPI) StateTrieAt(ctx context.Context, blockNumber uint64, params Params) (*Payload, error) {
|
func (api *PublicStateDiffAPI) StateTrieAt(ctx context.Context, blockNumber uint64, params Params) (*Payload, error) {
|
||||||
return api.sds.StateTrieAt(blockNumber, params)
|
return api.sds.StateTrieAt(blockNumber, params)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// StreamCodeAndCodeHash writes all of the codehash=>code pairs out to a websocket channel
|
||||||
|
func (api *PublicStateDiffAPI) StreamCodeAndCodeHash(ctx context.Context, blockNumber uint64) (*rpc.Subscription, error) {
|
||||||
|
// ensure that the RPC connection supports subscriptions
|
||||||
|
notifier, supported := rpc.NotifierFromContext(ctx)
|
||||||
|
if !supported {
|
||||||
|
return nil, rpc.ErrNotificationsUnsupported
|
||||||
|
}
|
||||||
|
|
||||||
|
// create subscription and start waiting for events
|
||||||
|
rpcSub := notifier.CreateSubscription()
|
||||||
|
payloadChan := make(chan CodeAndCodeHash, chainEventChanSize)
|
||||||
|
quitChan := make(chan bool)
|
||||||
|
api.sds.StreamCodeAndCodeHash(blockNumber, payloadChan, quitChan)
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case payload := <-payloadChan:
|
||||||
|
if err := notifier.Notify(rpcSub.ID, payload); err != nil {
|
||||||
|
log.Error("Failed to send code and codehash packet", "err", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
case err := <-rpcSub.Err():
|
||||||
|
log.Error("State diff service rpcSub error", "err", err)
|
||||||
|
return
|
||||||
|
case <-quitChan:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
return rpcSub, nil
|
||||||
|
}
|
||||||
|
@ -124,10 +124,9 @@ func (sdb *builder) buildStateTrie(it trie.NodeIterator) ([]StateNode, []CodeAnd
|
|||||||
node.StorageNodes = storageNodes
|
node.StorageNodes = storageNodes
|
||||||
// emit codehash => code mappings for cod
|
// emit codehash => code mappings for cod
|
||||||
codeHash := common.BytesToHash(account.CodeHash)
|
codeHash := common.BytesToHash(account.CodeHash)
|
||||||
addrHash := common.BytesToHash(leafKey)
|
code, err := sdb.stateCache.ContractCode(common.Hash{}, codeHash)
|
||||||
code, err := sdb.stateCache.ContractCode(addrHash, codeHash)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, fmt.Errorf("failed to retrieve code for codehash %s for account with leafkey %s\r\n error: %v", codeHash.String(), addrHash.String(), err)
|
return nil, nil, fmt.Errorf("failed to retrieve code for codehash %s\r\n error: %v", codeHash.String(), err)
|
||||||
}
|
}
|
||||||
codeAndCodeHashes = append(codeAndCodeHashes, CodeAndCodeHash{
|
codeAndCodeHashes = append(codeAndCodeHashes, CodeAndCodeHash{
|
||||||
Hash: codeHash,
|
Hash: codeHash,
|
||||||
@ -509,10 +508,9 @@ func (sdb *builder) buildAccountCreations(accounts AccountMap, watchedStorageKey
|
|||||||
diff.StorageNodes = storageDiffs
|
diff.StorageNodes = storageDiffs
|
||||||
// emit codehash => code mappings for cod
|
// emit codehash => code mappings for cod
|
||||||
codeHash := common.BytesToHash(val.Account.CodeHash)
|
codeHash := common.BytesToHash(val.Account.CodeHash)
|
||||||
addrHash := common.BytesToHash(val.LeafKey)
|
code, err := sdb.stateCache.ContractCode(common.Hash{}, codeHash)
|
||||||
code, err := sdb.stateCache.ContractCode(addrHash, codeHash)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, fmt.Errorf("failed to retrieve code for codehash %s for account with leafkey %s\r\n error: %v", codeHash.String(), addrHash.String(), err)
|
return nil, nil, fmt.Errorf("failed to retrieve code for codehash %s\r\n error: %v", codeHash.String(), err)
|
||||||
}
|
}
|
||||||
codeAndCodeHashes = append(codeAndCodeHashes, CodeAndCodeHash{
|
codeAndCodeHashes = append(codeAndCodeHashes, CodeAndCodeHash{
|
||||||
Hash: codeHash,
|
Hash: codeHash,
|
||||||
|
@ -25,6 +25,7 @@ import (
|
|||||||
|
|
||||||
"github.com/ethereum/go-ethereum/common"
|
"github.com/ethereum/go-ethereum/common"
|
||||||
"github.com/ethereum/go-ethereum/core"
|
"github.com/ethereum/go-ethereum/core"
|
||||||
|
"github.com/ethereum/go-ethereum/core/state"
|
||||||
"github.com/ethereum/go-ethereum/core/types"
|
"github.com/ethereum/go-ethereum/core/types"
|
||||||
"github.com/ethereum/go-ethereum/crypto"
|
"github.com/ethereum/go-ethereum/crypto"
|
||||||
"github.com/ethereum/go-ethereum/event"
|
"github.com/ethereum/go-ethereum/event"
|
||||||
@ -33,6 +34,7 @@ import (
|
|||||||
"github.com/ethereum/go-ethereum/p2p"
|
"github.com/ethereum/go-ethereum/p2p"
|
||||||
"github.com/ethereum/go-ethereum/rlp"
|
"github.com/ethereum/go-ethereum/rlp"
|
||||||
"github.com/ethereum/go-ethereum/rpc"
|
"github.com/ethereum/go-ethereum/rpc"
|
||||||
|
"github.com/ethereum/go-ethereum/trie"
|
||||||
)
|
)
|
||||||
|
|
||||||
const chainEventChanSize = 20000
|
const chainEventChanSize = 20000
|
||||||
@ -44,6 +46,7 @@ type blockChain interface {
|
|||||||
GetReceiptsByHash(hash common.Hash) types.Receipts
|
GetReceiptsByHash(hash common.Hash) types.Receipts
|
||||||
GetTdByHash(hash common.Hash) *big.Int
|
GetTdByHash(hash common.Hash) *big.Int
|
||||||
UnlockTrie(root common.Hash)
|
UnlockTrie(root common.Hash)
|
||||||
|
StateCache() state.Database
|
||||||
}
|
}
|
||||||
|
|
||||||
// IService is the state-diffing service interface
|
// IService is the state-diffing service interface
|
||||||
@ -60,6 +63,8 @@ type IService interface {
|
|||||||
StateDiffAt(blockNumber uint64, params Params) (*Payload, error)
|
StateDiffAt(blockNumber uint64, params Params) (*Payload, error)
|
||||||
// Method to get state trie object at specific block
|
// Method to get state trie object at specific block
|
||||||
StateTrieAt(blockNumber uint64, params Params) (*Payload, error)
|
StateTrieAt(blockNumber uint64, params Params) (*Payload, error)
|
||||||
|
// Method to stream out all code and codehash pairs
|
||||||
|
StreamCodeAndCodeHash(blockNumber uint64, outChan chan<- CodeAndCodeHash, quitChan chan<- bool)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Service is the underlying struct for the state diffing service
|
// Service is the underlying struct for the state diffing service
|
||||||
@ -361,3 +366,42 @@ func sendNonBlockingQuit(id rpc.ID, sub Subscription) {
|
|||||||
log.Info("unable to close subscription %s; channel has no receiver", id)
|
log.Info("unable to close subscription %s; channel has no receiver", id)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// StreamCodeAndCodeHash subscription method for extracting all the codehash=>code mappings that exist in the trie at the provided height
|
||||||
|
func (sds *Service) StreamCodeAndCodeHash(blockNumber uint64, outChan chan<- CodeAndCodeHash, quitChan chan<- bool) {
|
||||||
|
current := sds.BlockChain.GetBlockByNumber(blockNumber)
|
||||||
|
log.Info(fmt.Sprintf("sending code and codehash at block %d", blockNumber))
|
||||||
|
currentTrie, err := sds.BlockChain.StateCache().OpenTrie(current.Root())
|
||||||
|
if err != nil {
|
||||||
|
log.Error("error creating trie for block", "number", current.Number(), "err", err)
|
||||||
|
close(quitChan)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
it := currentTrie.NodeIterator([]byte{})
|
||||||
|
leafIt := trie.NewIterator(it)
|
||||||
|
go func() {
|
||||||
|
defer close(quitChan)
|
||||||
|
for leafIt.Next() {
|
||||||
|
select {
|
||||||
|
case <-sds.QuitChan:
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
account := new(state.Account)
|
||||||
|
if err := rlp.DecodeBytes(leafIt.Value, account); err != nil {
|
||||||
|
log.Error("error decoding state account", "err", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
codeHash := common.BytesToHash(account.CodeHash)
|
||||||
|
code, err := sds.BlockChain.StateCache().ContractCode(common.Hash{}, codeHash)
|
||||||
|
if err != nil {
|
||||||
|
log.Error("error collecting contract code", "err", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
outChan <- CodeAndCodeHash{
|
||||||
|
Hash: codeHash,
|
||||||
|
Code: code,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
@ -21,6 +21,8 @@ import (
|
|||||||
"math/big"
|
"math/big"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/ethereum/go-ethereum/core/state"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/common"
|
"github.com/ethereum/go-ethereum/common"
|
||||||
"github.com/ethereum/go-ethereum/core"
|
"github.com/ethereum/go-ethereum/core"
|
||||||
"github.com/ethereum/go-ethereum/core/types"
|
"github.com/ethereum/go-ethereum/core/types"
|
||||||
@ -126,3 +128,7 @@ func (blockChain *BlockChain) SetTdByHash(hash common.Hash, td *big.Int) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (blockChain *BlockChain) UnlockTrie(root common.Hash) {}
|
func (blockChain *BlockChain) UnlockTrie(root common.Hash) {}
|
||||||
|
|
||||||
|
func (BlockChain *BlockChain) StateCache() state.Database {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
@ -276,6 +276,10 @@ func (sds *MockStateDiffService) closeType(subType common.Hash) {
|
|||||||
delete(sds.SubscriptionTypes, subType)
|
delete(sds.SubscriptionTypes, subType)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (sds *MockStateDiffService) StreamCodeAndCodeHash(blockNumber uint64, outChan chan<- statediff.CodeAndCodeHash, quitChan chan<- bool) {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
func sendNonBlockingQuit(id rpc.ID, sub statediff.Subscription) {
|
func sendNonBlockingQuit(id rpc.ID, sub statediff.Subscription) {
|
||||||
select {
|
select {
|
||||||
case sub.QuitChan <- true:
|
case sub.QuitChan <- true:
|
||||||
|
@ -92,8 +92,8 @@ type StateObject struct {
|
|||||||
// CodeAndCodeHash struct for holding codehash => code mappings
|
// CodeAndCodeHash struct for holding codehash => code mappings
|
||||||
// we can't use an actual map because they are not rlp serializable
|
// we can't use an actual map because they are not rlp serializable
|
||||||
type CodeAndCodeHash struct {
|
type CodeAndCodeHash struct {
|
||||||
Hash common.Hash
|
Hash common.Hash `json:"codeHash"`
|
||||||
Code []byte
|
Code []byte `json:"code"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// StateNode holds the data for a single state diff node
|
// StateNode holds the data for a single state diff node
|
||||||
|
Loading…
Reference in New Issue
Block a user