Added return to trie flush hook, updated to utils v0.0.23.
This commit is contained in:
parent
aee0d470b1
commit
2c14e954cf
@ -319,6 +319,7 @@ func PluginSetTrieFlushIntervalClone(pl *plugins.PluginLoader, flushInterval tim
|
||||
func pluginSetTrieFlushIntervalClone(flushInterval time.Duration) time.Duration {
|
||||
if plugins.DefaultPluginLoader == nil {
|
||||
log.Warn("Attempting setTreiFlushIntervalClone, but default PluginLoader has not been initialized")
|
||||
return flushInterval
|
||||
}
|
||||
return PluginSetTrieFlushIntervalClone(plugins.DefaultPluginLoader, flushInterval)
|
||||
}
|
4
go.mod
4
go.mod
@ -50,7 +50,7 @@ require (
|
||||
github.com/mattn/go-isatty v0.0.16
|
||||
github.com/naoina/toml v0.1.2-0.20170918210437-9fafd6967416
|
||||
github.com/olekukonko/tablewriter v0.0.5
|
||||
github.com/openrelayxyz/plugeth-utils v0.0.22
|
||||
github.com/openrelayxyz/plugeth-utils v0.0.23
|
||||
github.com/peterh/liner v1.1.1-0.20190123174540-a2c9a5303de7
|
||||
github.com/prometheus/tsdb v0.7.1
|
||||
github.com/rs/cors v1.7.0
|
||||
@ -71,8 +71,6 @@ require (
|
||||
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce
|
||||
)
|
||||
|
||||
require github.com/hashicorp/golang-lru v0.5.1
|
||||
|
||||
require (
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azcore v0.21.1 // indirect
|
||||
github.com/Azure/azure-sdk-for-go/sdk/internal v0.8.3 // indirect
|
||||
|
5
go.sum
5
go.sum
@ -292,7 +292,6 @@ github.com/hashicorp/go-bexpr v0.1.10 h1:9kuI5PFotCboP3dkDYFr/wi0gg0QVbSNz5oFRpx
|
||||
github.com/hashicorp/go-bexpr v0.1.10/go.mod h1:oxlubA2vC/gFVfX1A6JGp7ls7uCDlfJn732ehYYg+g0=
|
||||
github.com/hashicorp/go-version v1.2.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA=
|
||||
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
|
||||
github.com/hashicorp/golang-lru v0.5.1 h1:0hERBMJE1eitiLkihrMvRVBYAkpHzc/J3QdDN+dAcgU=
|
||||
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
|
||||
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
|
||||
github.com/holiman/big v0.0.0-20221017200358-a027dc42d04e h1:pIYdhNkDh+YENVNi3gto8n9hAmRxKxoar0iE6BLucjw=
|
||||
@ -452,8 +451,8 @@ github.com/onsi/ginkgo v1.14.0/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9k
|
||||
github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
|
||||
github.com/onsi/gomega v1.10.1 h1:o0+MgICZLuZ7xjH7Vx6zS/zcu93/BEp1VwkIW1mEXCE=
|
||||
github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
|
||||
github.com/openrelayxyz/plugeth-utils v0.0.22 h1:0sO1gA+m48uIvpys5wyv5ZqByqdaBZZdRzkujq0gZik=
|
||||
github.com/openrelayxyz/plugeth-utils v0.0.22/go.mod h1:zGm3/elx1rCcrYAFUQ5/He7AG/n039/3xYg0/Q8nqcQ=
|
||||
github.com/openrelayxyz/plugeth-utils v0.0.23 h1:TKNGnRPWZ3mXNUHfjr1iRjsto2r6ngNZb95dJLj2j5w=
|
||||
github.com/openrelayxyz/plugeth-utils v0.0.23/go.mod h1:zGm3/elx1rCcrYAFUQ5/He7AG/n039/3xYg0/Q8nqcQ=
|
||||
github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
|
||||
github.com/opentracing/opentracing-go v1.0.3-0.20180606204148-bd9c31933947/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
|
||||
github.com/opentracing/opentracing-go v1.1.0 h1:pWlfV3Bxv7k65HYwkikxat0+s3pV4bsqf19k25Ur8rU=
|
||||
|
@ -1,306 +0,0 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"context"
|
||||
"encoding/json"
|
||||
lru "github.com/hashicorp/golang-lru"
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/common/hexutil"
|
||||
"github.com/ethereum/go-ethereum/core/types"
|
||||
"github.com/ethereum/go-ethereum/event"
|
||||
"github.com/ethereum/go-ethereum/plugins"
|
||||
"github.com/ethereum/go-ethereum/node"
|
||||
"github.com/ethereum/go-ethereum/plugins/interfaces"
|
||||
"github.com/ethereum/go-ethereum/cmd/utils"
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
"github.com/ethereum/go-ethereum/rlp"
|
||||
"github.com/ethereum/go-ethereum/rpc"
|
||||
"github.com/ethereum/go-ethereum/internal/ethapi"
|
||||
"github.com/urfave/cli/v2"
|
||||
"io"
|
||||
)
|
||||
|
||||
|
||||
var (
|
||||
pl *plugins.PluginLoader
|
||||
backend interfaces.Backend
|
||||
lastBlock common.Hash
|
||||
cache *lru.Cache
|
||||
blockEvents event.Feed
|
||||
)
|
||||
|
||||
|
||||
// stateUpdate will be used to track state updates
|
||||
type stateUpdate struct {
|
||||
Destructs map[common.Hash]struct{}
|
||||
Accounts map[common.Hash][]byte
|
||||
Storage map[common.Hash]map[common.Hash][]byte
|
||||
}
|
||||
|
||||
|
||||
// kvpair is used for RLP encoding of maps, as maps cannot be RLP encoded directly
|
||||
type kvpair struct {
|
||||
Key common.Hash
|
||||
Value []byte
|
||||
}
|
||||
|
||||
// storage is used for RLP encoding two layers of maps, as maps cannot be RLP encoded directly
|
||||
type storage struct {
|
||||
Account common.Hash
|
||||
Data []kvpair
|
||||
}
|
||||
|
||||
// storedStateUpdate is an RLP encodable version of stateUpdate
|
||||
type storedStateUpdate struct {
|
||||
Destructs []common.Hash
|
||||
Accounts []kvpair
|
||||
Storage []storage
|
||||
}
|
||||
|
||||
|
||||
// MarshalJSON represents the stateUpdate as JSON for RPC calls
|
||||
func (su *stateUpdate) MarshalJSON() ([]byte, error) {
|
||||
result := make(map[string]interface{})
|
||||
destructs := make([]common.Hash, 0, len(su.Destructs))
|
||||
for k := range su.Destructs {
|
||||
destructs = append(destructs, k)
|
||||
}
|
||||
result["destructs"] = destructs
|
||||
accounts := make(map[common.Hash]hexutil.Bytes)
|
||||
for k, v := range su.Accounts {
|
||||
accounts[k] = hexutil.Bytes(v)
|
||||
}
|
||||
result["accounts"] = accounts
|
||||
storage := make(map[common.Hash]map[common.Hash]hexutil.Bytes)
|
||||
for m, s := range su.Storage {
|
||||
storage[m] = make(map[common.Hash]hexutil.Bytes)
|
||||
for k, v := range s {
|
||||
storage[m][k] = hexutil.Bytes(v)
|
||||
}
|
||||
}
|
||||
result["storage"] = storage
|
||||
return json.Marshal(result)
|
||||
}
|
||||
|
||||
// EncodeRLP converts the stateUpdate to a storedStateUpdate, and RLP encodes the result for storage
|
||||
func (su *stateUpdate) EncodeRLP(w io.Writer) error {
|
||||
destructs := make([]common.Hash, 0, len(su.Destructs))
|
||||
for k := range su.Destructs {
|
||||
destructs = append(destructs, k)
|
||||
}
|
||||
accounts := make([]kvpair, 0, len(su.Accounts))
|
||||
for k, v := range su.Accounts {
|
||||
accounts = append(accounts, kvpair{k, v})
|
||||
}
|
||||
s := make([]storage, 0, len(su.Storage))
|
||||
for a, m := range su.Storage {
|
||||
accountStorage := storage{a, make([]kvpair, 0, len(m))}
|
||||
for k, v := range m {
|
||||
accountStorage.Data = append(accountStorage.Data, kvpair{k, v})
|
||||
}
|
||||
s = append(s, accountStorage)
|
||||
}
|
||||
return rlp.Encode(w, storedStateUpdate{destructs, accounts, s})
|
||||
}
|
||||
|
||||
// DecodeRLP takes a byte stream, decodes it to a storedStateUpdate, the n converts that into a stateUpdate object
|
||||
func (su *stateUpdate) DecodeRLP(s *rlp.Stream) error {
|
||||
ssu := storedStateUpdate{}
|
||||
if err := s.Decode(&ssu); err != nil { return err }
|
||||
su.Destructs = make(map[common.Hash]struct{})
|
||||
for _, s := range ssu.Destructs {
|
||||
su.Destructs[s] = struct{}{}
|
||||
}
|
||||
su.Accounts = make(map[common.Hash][]byte)
|
||||
for _, kv := range ssu.Accounts {
|
||||
su.Accounts[kv.Key] = kv.Value
|
||||
}
|
||||
su.Storage = make(map[common.Hash]map[common.Hash][]byte)
|
||||
for _, s := range ssu.Storage {
|
||||
su.Storage[s.Account] = make(map[common.Hash][]byte)
|
||||
for _, kv := range s.Data {
|
||||
su.Storage[s.Account][kv.Key] = kv.Value
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
// Initialize does initial setup of variables as the plugin is loaded.
|
||||
func Initialize(ctx *cli.Context, loader *plugins.PluginLoader) {
|
||||
pl = loader
|
||||
cache, _ = lru.New(128) // TODO: Make size configurable
|
||||
if !ctx.GlobalBool(utils.SnapshotFlag.Name) {
|
||||
log.Warn("Snapshots are required for StateUpdate plugins, but are currently disabled. State Updates will be unavailable")
|
||||
}
|
||||
log.Info("Loaded block updater plugin")
|
||||
}
|
||||
|
||||
|
||||
// InitializeNode is invoked by the plugin loader when the node and Backend are
|
||||
// ready. We will track the backend to provide access to blocks and other
|
||||
// useful information.
|
||||
func InitializeNode(stack *node.Node, b interfaces.Backend) {
|
||||
backend = b
|
||||
}
|
||||
|
||||
|
||||
// StateUpdate gives us updates about state changes made in each block. We
|
||||
// cache them for short term use, and write them to disk for the longer term.
|
||||
func StateUpdate(blockRoot common.Hash, parentRoot common.Hash, destructs map[common.Hash]struct{}, accounts map[common.Hash][]byte, storage map[common.Hash]map[common.Hash][]byte) {
|
||||
su := &stateUpdate{
|
||||
Destructs: destructs,
|
||||
Accounts: accounts,
|
||||
Storage: storage,
|
||||
}
|
||||
cache.Add(blockRoot, su)
|
||||
data, _ := rlp.EncodeToBytes(su)
|
||||
backend.ChainDb().Put(append([]byte("su"), blockRoot.Bytes()...), data)
|
||||
}
|
||||
|
||||
// AppendAncient removes our state update records from leveldb as the
|
||||
// corresponding blocks are moved from leveldb to the ancients database. At
|
||||
// some point in the future, we may want to look at a way to move the state
|
||||
// updates to an ancients table of their own for longer term retention.
|
||||
func AppendAncient(number uint64, hash, headerBytes, body, receipts, td []byte) {
|
||||
header := new(types.Header)
|
||||
if err := rlp.Decode(bytes.NewReader(headerBytes), header); err != nil {
|
||||
log.Warn("Could not decode ancient header", "block", number)
|
||||
return
|
||||
}
|
||||
backend.ChainDb().Delete(append([]byte("su"), header.Root.Bytes()...))
|
||||
}
|
||||
|
||||
|
||||
// NewHead is invoked when a new block becomes the latest recognized block. We
|
||||
// use this to notify the blockEvents channel of new blocks, as well as invoke
|
||||
// the BlockUpdates hook on downstream plugins.
|
||||
// TODO: We're not necessarily handling reorgs properly, which may result in
|
||||
// some blocks not being emitted through this hook.
|
||||
func NewHead(block *types.Block, hash common.Hash, logs []*types.Log) {
|
||||
if pl == nil {
|
||||
log.Warn("Attempting to emit NewHead, but default PluginLoader has not been initialized")
|
||||
return
|
||||
}
|
||||
result, err := blockUpdates(context.Background(), block)
|
||||
if err != nil {
|
||||
log.Error("Could not serialize block", "err", err, "hash", block.Hash())
|
||||
return
|
||||
}
|
||||
blockEvents.Send(result)
|
||||
|
||||
receipts, err := backend.GetReceipts(context.Background(), block.Hash())
|
||||
var su *stateUpdate
|
||||
if v, ok := cache.Get(block.Root()); ok {
|
||||
su = v.(*stateUpdate)
|
||||
} else {
|
||||
data, err := backend.ChainDb().Get(append([]byte("su"), block.Root().Bytes()...))
|
||||
if err != nil {
|
||||
log.Error("StateUpdate unavailable for block", "hash", block.Hash())
|
||||
return
|
||||
}
|
||||
su = &stateUpdate{}
|
||||
if err := rlp.DecodeBytes(data, su); err != nil {
|
||||
log.Error("StateUpdate unavailable for block", "hash", block.Hash())
|
||||
return
|
||||
}
|
||||
}
|
||||
fnList := pl.Lookup("BlockUpdates", func(item interface{}) bool {
|
||||
_, ok := item.(func(*types.Block, []*types.Log, types.Receipts, map[common.Hash]struct{}, map[common.Hash][]byte, map[common.Hash]map[common.Hash][]byte))
|
||||
return ok
|
||||
})
|
||||
for _, fni := range fnList {
|
||||
if fn, ok := fni.(func(*types.Block, []*types.Log, types.Receipts, map[common.Hash]struct{}, map[common.Hash][]byte, map[common.Hash]map[common.Hash][]byte)); ok {
|
||||
fn(block, logs, receipts, su.Destructs, su.Accounts, su.Storage)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// BlockUpdates is a service that lets clients query for block updates for a
|
||||
// given block by hash or number, or subscribe to new block upates.
|
||||
type BlockUpdates struct{
|
||||
backend interfaces.Backend
|
||||
}
|
||||
|
||||
// blockUpdate handles the serialization of a block
|
||||
func blockUpdates(ctx context.Context, block *types.Block) (map[string]interface{}, error) {
|
||||
result, err := ethapi.RPCMarshalBlock(block, true, true)
|
||||
if err != nil { return nil, err }
|
||||
result["receipts"], err = backend.GetReceipts(ctx, block.Hash())
|
||||
if err != nil { return nil, err }
|
||||
if v, ok := cache.Get(block.Root()); ok {
|
||||
result["stateUpdates"] = v
|
||||
return result, nil
|
||||
}
|
||||
data, err := backend.ChainDb().Get(append([]byte("su"), block.Root().Bytes()...))
|
||||
if err != nil { return nil, fmt.Errorf("State Updates unavailable for block %#x", block.Hash())}
|
||||
su := &stateUpdate{}
|
||||
if err := rlp.DecodeBytes(data, su); err != nil { return nil, fmt.Errorf("State updates unavailable for block %#x", block.Hash()) }
|
||||
result["stateUpdates"] = su
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// BlockUpdatesByNumber retrieves a block by number, gets receipts and state
|
||||
// updates, and serializes the response.
|
||||
func (b *BlockUpdates) BlockUpdatesByNumber(ctx context.Context, number rpc.BlockNumber) (map[string]interface{}, error) {
|
||||
block, err := b.backend.BlockByNumber(ctx, number)
|
||||
if err != nil { return nil, err }
|
||||
return blockUpdates(ctx, block)
|
||||
}
|
||||
|
||||
// BlockUpdatesByHash retrieves a block by hash, gets receipts and state
|
||||
// updates, and serializes the response.
|
||||
func (b *BlockUpdates) BlockUpdatesByHash(ctx context.Context, hash common.Hash) (map[string]interface{}, error) {
|
||||
block, err := b.backend.BlockByHash(ctx, hash)
|
||||
if err != nil { return nil, err }
|
||||
return blockUpdates(ctx, block)
|
||||
}
|
||||
|
||||
// BlockUpdates allows clients to subscribe to notifications of new blocks
|
||||
// along with receipts and state updates.
|
||||
func (b *BlockUpdates) BlockUpdates(ctx context.Context) (*rpc.Subscription, error) {
|
||||
notifier, supported := rpc.NotifierFromContext(ctx)
|
||||
if !supported {
|
||||
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
|
||||
}
|
||||
|
||||
var (
|
||||
rpcSub = notifier.CreateSubscription()
|
||||
blockDataChan = make(chan map[string]interface{}, 1000)
|
||||
)
|
||||
|
||||
sub := blockEvents.Subscribe(blockDataChan)
|
||||
go func() {
|
||||
|
||||
for {
|
||||
select {
|
||||
case b := <-blockDataChan:
|
||||
notifier.Notify(rpcSub.ID, b)
|
||||
case <-rpcSub.Err(): // client send an unsubscribe request
|
||||
sub.Unsubscribe()
|
||||
return
|
||||
case <-notifier.Closed(): // connection dropped
|
||||
sub.Unsubscribe()
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return rpcSub, nil
|
||||
}
|
||||
|
||||
|
||||
// GetAPIs exposes the BlockUpdates service under the cardinal namespace.
|
||||
func GetAPIs(stack *node.Node, backend interfaces.Backend) []rpc.API {
|
||||
return []rpc.API{
|
||||
{
|
||||
Namespace: "cardinal",
|
||||
Version: "1.0",
|
||||
Service: &BlockUpdates{backend},
|
||||
Public: true,
|
||||
},
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user