diff --git a/core/plugin_hooks.go b/core/plugin_hooks.go index a8f1bee0c..86d246823 100644 --- a/core/plugin_hooks.go +++ b/core/plugin_hooks.go @@ -319,6 +319,7 @@ func PluginSetTrieFlushIntervalClone(pl *plugins.PluginLoader, flushInterval tim func pluginSetTrieFlushIntervalClone(flushInterval time.Duration) time.Duration { if plugins.DefaultPluginLoader == nil { log.Warn("Attempting setTreiFlushIntervalClone, but default PluginLoader has not been initialized") + return flushInterval } return PluginSetTrieFlushIntervalClone(plugins.DefaultPluginLoader, flushInterval) } \ No newline at end of file diff --git a/go.mod b/go.mod index 59c646648..4596d45c0 100644 --- a/go.mod +++ b/go.mod @@ -50,7 +50,7 @@ require ( github.com/mattn/go-isatty v0.0.16 github.com/naoina/toml v0.1.2-0.20170918210437-9fafd6967416 github.com/olekukonko/tablewriter v0.0.5 - github.com/openrelayxyz/plugeth-utils v0.0.22 + github.com/openrelayxyz/plugeth-utils v0.0.23 github.com/peterh/liner v1.1.1-0.20190123174540-a2c9a5303de7 github.com/prometheus/tsdb v0.7.1 github.com/rs/cors v1.7.0 @@ -71,8 +71,6 @@ require ( gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce ) -require github.com/hashicorp/golang-lru v0.5.1 - require ( github.com/Azure/azure-sdk-for-go/sdk/azcore v0.21.1 // indirect github.com/Azure/azure-sdk-for-go/sdk/internal v0.8.3 // indirect diff --git a/go.sum b/go.sum index 2cf7ebdc7..e405bc759 100644 --- a/go.sum +++ b/go.sum @@ -292,7 +292,6 @@ github.com/hashicorp/go-bexpr v0.1.10 h1:9kuI5PFotCboP3dkDYFr/wi0gg0QVbSNz5oFRpx github.com/hashicorp/go-bexpr v0.1.10/go.mod h1:oxlubA2vC/gFVfX1A6JGp7ls7uCDlfJn732ehYYg+g0= github.com/hashicorp/go-version v1.2.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= -github.com/hashicorp/golang-lru v0.5.1 h1:0hERBMJE1eitiLkihrMvRVBYAkpHzc/J3QdDN+dAcgU= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/holiman/big v0.0.0-20221017200358-a027dc42d04e h1:pIYdhNkDh+YENVNi3gto8n9hAmRxKxoar0iE6BLucjw= @@ -452,8 +451,8 @@ github.com/onsi/ginkgo v1.14.0/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9k github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= github.com/onsi/gomega v1.10.1 h1:o0+MgICZLuZ7xjH7Vx6zS/zcu93/BEp1VwkIW1mEXCE= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= -github.com/openrelayxyz/plugeth-utils v0.0.22 h1:0sO1gA+m48uIvpys5wyv5ZqByqdaBZZdRzkujq0gZik= -github.com/openrelayxyz/plugeth-utils v0.0.22/go.mod h1:zGm3/elx1rCcrYAFUQ5/He7AG/n039/3xYg0/Q8nqcQ= +github.com/openrelayxyz/plugeth-utils v0.0.23 h1:TKNGnRPWZ3mXNUHfjr1iRjsto2r6ngNZb95dJLj2j5w= +github.com/openrelayxyz/plugeth-utils v0.0.23/go.mod h1:zGm3/elx1rCcrYAFUQ5/He7AG/n039/3xYg0/Q8nqcQ= github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/opentracing/opentracing-go v1.0.3-0.20180606204148-bd9c31933947/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/opentracing/opentracing-go v1.1.0 h1:pWlfV3Bxv7k65HYwkikxat0+s3pV4bsqf19k25Ur8rU= diff --git a/plugins/packages/blockupdates/main.go b/plugins/packages/blockupdates/main.go deleted file mode 100644 index 27798f82c..000000000 --- a/plugins/packages/blockupdates/main.go +++ /dev/null @@ -1,306 +0,0 @@ -package main - -import ( - "bytes" - "fmt" - "context" - "encoding/json" - lru "github.com/hashicorp/golang-lru" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/common/hexutil" - "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/event" - "github.com/ethereum/go-ethereum/plugins" - "github.com/ethereum/go-ethereum/node" - "github.com/ethereum/go-ethereum/plugins/interfaces" - "github.com/ethereum/go-ethereum/cmd/utils" - "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/rlp" - "github.com/ethereum/go-ethereum/rpc" - "github.com/ethereum/go-ethereum/internal/ethapi" - "github.com/urfave/cli/v2" - "io" -) - - -var ( - pl *plugins.PluginLoader - backend interfaces.Backend - lastBlock common.Hash - cache *lru.Cache - blockEvents event.Feed -) - - -// stateUpdate will be used to track state updates -type stateUpdate struct { - Destructs map[common.Hash]struct{} - Accounts map[common.Hash][]byte - Storage map[common.Hash]map[common.Hash][]byte -} - - -// kvpair is used for RLP encoding of maps, as maps cannot be RLP encoded directly -type kvpair struct { - Key common.Hash - Value []byte -} - -// storage is used for RLP encoding two layers of maps, as maps cannot be RLP encoded directly -type storage struct { - Account common.Hash - Data []kvpair -} - -// storedStateUpdate is an RLP encodable version of stateUpdate -type storedStateUpdate struct { - Destructs []common.Hash - Accounts []kvpair - Storage []storage -} - - -// MarshalJSON represents the stateUpdate as JSON for RPC calls -func (su *stateUpdate) MarshalJSON() ([]byte, error) { - result := make(map[string]interface{}) - destructs := make([]common.Hash, 0, len(su.Destructs)) - for k := range su.Destructs { - destructs = append(destructs, k) - } - result["destructs"] = destructs - accounts := make(map[common.Hash]hexutil.Bytes) - for k, v := range su.Accounts { - accounts[k] = hexutil.Bytes(v) - } - result["accounts"] = accounts - storage := make(map[common.Hash]map[common.Hash]hexutil.Bytes) - for m, s := range su.Storage { - storage[m] = make(map[common.Hash]hexutil.Bytes) - for k, v := range s { - storage[m][k] = hexutil.Bytes(v) - } - } - result["storage"] = storage - return json.Marshal(result) -} - -// EncodeRLP converts the stateUpdate to a storedStateUpdate, and RLP encodes the result for storage -func (su *stateUpdate) EncodeRLP(w io.Writer) error { - destructs := make([]common.Hash, 0, len(su.Destructs)) - for k := range su.Destructs { - destructs = append(destructs, k) - } - accounts := make([]kvpair, 0, len(su.Accounts)) - for k, v := range su.Accounts { - accounts = append(accounts, kvpair{k, v}) - } - s := make([]storage, 0, len(su.Storage)) - for a, m := range su.Storage { - accountStorage := storage{a, make([]kvpair, 0, len(m))} - for k, v := range m { - accountStorage.Data = append(accountStorage.Data, kvpair{k, v}) - } - s = append(s, accountStorage) - } - return rlp.Encode(w, storedStateUpdate{destructs, accounts, s}) -} - -// DecodeRLP takes a byte stream, decodes it to a storedStateUpdate, the n converts that into a stateUpdate object -func (su *stateUpdate) DecodeRLP(s *rlp.Stream) error { - ssu := storedStateUpdate{} - if err := s.Decode(&ssu); err != nil { return err } - su.Destructs = make(map[common.Hash]struct{}) - for _, s := range ssu.Destructs { - su.Destructs[s] = struct{}{} - } - su.Accounts = make(map[common.Hash][]byte) - for _, kv := range ssu.Accounts { - su.Accounts[kv.Key] = kv.Value - } - su.Storage = make(map[common.Hash]map[common.Hash][]byte) - for _, s := range ssu.Storage { - su.Storage[s.Account] = make(map[common.Hash][]byte) - for _, kv := range s.Data { - su.Storage[s.Account][kv.Key] = kv.Value - } - } - return nil -} - - -// Initialize does initial setup of variables as the plugin is loaded. -func Initialize(ctx *cli.Context, loader *plugins.PluginLoader) { - pl = loader - cache, _ = lru.New(128) // TODO: Make size configurable - if !ctx.GlobalBool(utils.SnapshotFlag.Name) { - log.Warn("Snapshots are required for StateUpdate plugins, but are currently disabled. State Updates will be unavailable") - } - log.Info("Loaded block updater plugin") -} - - -// InitializeNode is invoked by the plugin loader when the node and Backend are -// ready. We will track the backend to provide access to blocks and other -// useful information. -func InitializeNode(stack *node.Node, b interfaces.Backend) { - backend = b -} - - -// StateUpdate gives us updates about state changes made in each block. We -// cache them for short term use, and write them to disk for the longer term. -func StateUpdate(blockRoot common.Hash, parentRoot common.Hash, destructs map[common.Hash]struct{}, accounts map[common.Hash][]byte, storage map[common.Hash]map[common.Hash][]byte) { - su := &stateUpdate{ - Destructs: destructs, - Accounts: accounts, - Storage: storage, - } - cache.Add(blockRoot, su) - data, _ := rlp.EncodeToBytes(su) - backend.ChainDb().Put(append([]byte("su"), blockRoot.Bytes()...), data) -} - -// AppendAncient removes our state update records from leveldb as the -// corresponding blocks are moved from leveldb to the ancients database. At -// some point in the future, we may want to look at a way to move the state -// updates to an ancients table of their own for longer term retention. -func AppendAncient(number uint64, hash, headerBytes, body, receipts, td []byte) { - header := new(types.Header) - if err := rlp.Decode(bytes.NewReader(headerBytes), header); err != nil { - log.Warn("Could not decode ancient header", "block", number) - return - } - backend.ChainDb().Delete(append([]byte("su"), header.Root.Bytes()...)) -} - - -// NewHead is invoked when a new block becomes the latest recognized block. We -// use this to notify the blockEvents channel of new blocks, as well as invoke -// the BlockUpdates hook on downstream plugins. -// TODO: We're not necessarily handling reorgs properly, which may result in -// some blocks not being emitted through this hook. -func NewHead(block *types.Block, hash common.Hash, logs []*types.Log) { - if pl == nil { - log.Warn("Attempting to emit NewHead, but default PluginLoader has not been initialized") - return - } - result, err := blockUpdates(context.Background(), block) - if err != nil { - log.Error("Could not serialize block", "err", err, "hash", block.Hash()) - return - } - blockEvents.Send(result) - - receipts, err := backend.GetReceipts(context.Background(), block.Hash()) - var su *stateUpdate - if v, ok := cache.Get(block.Root()); ok { - su = v.(*stateUpdate) - } else { - data, err := backend.ChainDb().Get(append([]byte("su"), block.Root().Bytes()...)) - if err != nil { - log.Error("StateUpdate unavailable for block", "hash", block.Hash()) - return - } - su = &stateUpdate{} - if err := rlp.DecodeBytes(data, su); err != nil { - log.Error("StateUpdate unavailable for block", "hash", block.Hash()) - return - } - } - fnList := pl.Lookup("BlockUpdates", func(item interface{}) bool { - _, ok := item.(func(*types.Block, []*types.Log, types.Receipts, map[common.Hash]struct{}, map[common.Hash][]byte, map[common.Hash]map[common.Hash][]byte)) - return ok - }) - for _, fni := range fnList { - if fn, ok := fni.(func(*types.Block, []*types.Log, types.Receipts, map[common.Hash]struct{}, map[common.Hash][]byte, map[common.Hash]map[common.Hash][]byte)); ok { - fn(block, logs, receipts, su.Destructs, su.Accounts, su.Storage) - } - } -} - - -// BlockUpdates is a service that lets clients query for block updates for a -// given block by hash or number, or subscribe to new block upates. -type BlockUpdates struct{ - backend interfaces.Backend -} - -// blockUpdate handles the serialization of a block -func blockUpdates(ctx context.Context, block *types.Block) (map[string]interface{}, error) { - result, err := ethapi.RPCMarshalBlock(block, true, true) - if err != nil { return nil, err } - result["receipts"], err = backend.GetReceipts(ctx, block.Hash()) - if err != nil { return nil, err } - if v, ok := cache.Get(block.Root()); ok { - result["stateUpdates"] = v - return result, nil - } - data, err := backend.ChainDb().Get(append([]byte("su"), block.Root().Bytes()...)) - if err != nil { return nil, fmt.Errorf("State Updates unavailable for block %#x", block.Hash())} - su := &stateUpdate{} - if err := rlp.DecodeBytes(data, su); err != nil { return nil, fmt.Errorf("State updates unavailable for block %#x", block.Hash()) } - result["stateUpdates"] = su - return result, nil -} - -// BlockUpdatesByNumber retrieves a block by number, gets receipts and state -// updates, and serializes the response. -func (b *BlockUpdates) BlockUpdatesByNumber(ctx context.Context, number rpc.BlockNumber) (map[string]interface{}, error) { - block, err := b.backend.BlockByNumber(ctx, number) - if err != nil { return nil, err } - return blockUpdates(ctx, block) -} - -// BlockUpdatesByHash retrieves a block by hash, gets receipts and state -// updates, and serializes the response. -func (b *BlockUpdates) BlockUpdatesByHash(ctx context.Context, hash common.Hash) (map[string]interface{}, error) { - block, err := b.backend.BlockByHash(ctx, hash) - if err != nil { return nil, err } - return blockUpdates(ctx, block) -} - -// BlockUpdates allows clients to subscribe to notifications of new blocks -// along with receipts and state updates. -func (b *BlockUpdates) BlockUpdates(ctx context.Context) (*rpc.Subscription, error) { - notifier, supported := rpc.NotifierFromContext(ctx) - if !supported { - return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported - } - - var ( - rpcSub = notifier.CreateSubscription() - blockDataChan = make(chan map[string]interface{}, 1000) - ) - - sub := blockEvents.Subscribe(blockDataChan) - go func() { - - for { - select { - case b := <-blockDataChan: - notifier.Notify(rpcSub.ID, b) - case <-rpcSub.Err(): // client send an unsubscribe request - sub.Unsubscribe() - return - case <-notifier.Closed(): // connection dropped - sub.Unsubscribe() - return - } - } - }() - - return rpcSub, nil -} - - -// GetAPIs exposes the BlockUpdates service under the cardinal namespace. -func GetAPIs(stack *node.Node, backend interfaces.Backend) []rpc.API { - return []rpc.API{ - { - Namespace: "cardinal", - Version: "1.0", - Service: &BlockUpdates{backend}, - Public: true, - }, - } -}