From 328756dc00c873954971d96d4930fdfeedc4311d Mon Sep 17 00:00:00 2001 From: Austin Roberts Date: Wed, 14 Jul 2021 13:06:10 -0500 Subject: [PATCH] Fix initialize function of plugin loader, update blockupdates plugin --- plugins/packages/blockhandler/main.go | 273 ------------------------- plugins/packages/blockupdates/main.go | 279 ++++++++++++++++++++++++++ plugins/plugin_loader.go | 2 +- 3 files changed, 280 insertions(+), 274 deletions(-) delete mode 100644 plugins/packages/blockhandler/main.go create mode 100644 plugins/packages/blockupdates/main.go diff --git a/plugins/packages/blockhandler/main.go b/plugins/packages/blockhandler/main.go deleted file mode 100644 index a535c3f09..000000000 --- a/plugins/packages/blockhandler/main.go +++ /dev/null @@ -1,273 +0,0 @@ -package main - -import ( - "encoding/json" - lru "github.com/hashicorp/golang-lru" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/common/hexutil" - "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/events" - "github.com/ethereum/go-ethereum/plugins" - "github.com/ethereum/go-ethereum/node" - "github.com/ethereum/go-ethereum/plugins/interfaces" - "github.com/ethereum/go-ethereum/cmd/utils" - "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/rlp" - "github.com/ethereum/go-ethereum/rpc" - "github.com/ethereum/go-ethereum/internal/ethapi" - "gopkg.in/urfave/cli.v1" - "io" -) - - -var ( - pl *plugins.PluginLoader - backend interfaces.Backend - lastBlock common.Hash - cache *lru.Cache - blockEvents events.Feed -) - -type stateUpdate struct { - Destructs map[common.Hash]struct{} - Accounts map[common.Hash][]byte - Storage map[common.Hash]map[common.Hash][]byte -} - -type kvpair struct { - Key common.Hash - Value []byte -} - -type storage struct { - Account common.Hash - Data []kvpair -} - -type storedStateUpdate struct { - Destructs []common.Hash - Accounts []kvpair - Storage []storage -} - -func (su *stateUpdate) MarshalJSON() ([]byte, error) { - result := make(map[string]interface{}, 0, len(su.Destructs)) - result["destructs"] = []common.Hash{} - for k := range su.Destructs { - result["destructs"] = append(result["destructs"], k) - } - accounts = make(map[common.Hash]hexutil.Bytes) - for k, v := range su.Accounts { - result["accounts"][k] = hexutil.Bytes(v) - } - result["accounts"] = accounts - storage = make(map[common.Hash]map[common.Hash]hexutil.Bytes) - for m, s := range su.Storage { - storage[m] = make(map[common.Hash]hexutil.Bytes) - for k, v := range s { - storage[m][s] = hexutil.Bytes(v) - } - } - result["storage"] = storage - return json.Marshal(result) -} - -func (su *stateUpdate) EncodeRLP(w io.Writer) error { - destructs = make([]common.Hash, 0, len(su.Destructs)) - for k := range su.Destructs { - destructs = append(destructs, k) - } - accounts := make([]kvpair, 0, len(accounts)) - for k, v := range su.Accounts { - accounts = append(accounts, kvpair{k, v}) - } - s := make([]storage, 0, len(storage)) - for a, m := range su.Storage { - accountStorage = storage{a, make([]kvpair, 0, len(m))} - for k, v := range m { - accountStorage.Data = append(accountStorage.Data, []kvpair{k, v}) - } - s = append(s, accountStorage) - } - return rlp.Encode(w, storedStateUpdate{destructs, accounts, s}) -} - -func (su *stateUpdate) DecodeRLP(s *rlp.Stream) error { - ssu := storedStateUpdate{} - if err := s.Decode(&ssu); err != nil { return err } - su.Destructs = make(map[common.Hash]struct{}) - for _, s := range ssu.Destructs { - su.Destructs[s] = struct{}{} - } - su.Accounts = make(map[common.Hash][]byte) - for _, kv := range ssu.Accounts { - su.Accounts[kv.Key] = kv.Value - } - su.Storage = make(map[common.Hash]map[common.Hash][]byte) - for _, s := range ssu.Storage { - su.Storage[s.Account] = make(map[common.Hash][]byte) - for _, kv := range s.Data { - su.Storage[s.Account][kv.Key] = kv.Value - } - } - return nil -} - -func Initialize(ctx *cli.Context, loader *PluginLoader) { - pl = loader - cache, _ = lru.New(128) // TODO: Make size configurable - if !ctx.GlobalBool(utils.SnapshotFlag.Name) { - log.Warn("Snapshots are required for StateUpdate plugins, but are currently disabled. State Updates will be unavailable") - } -} - - -// TODO: -// x Record StateUpdates in the database as they are written. -// x Keep an LRU cache of the most recent state updates. -// x Prune the StateUpdates in the database as corresponding blocks move to the freezer. -// * Add an RPC endpoint for getting the StateUpdates of a block -// * Invoke other plugins with each block and its respective StateUpdates. - - -func InitializeNode(stack *node.Node, b interfaces.Backend) { - backend = b -} - -func StateUpdate(blockRoot common.Hash, parentRoot common.Hash, destructs map[common.Hash]struct{}, accounts map[common.Hash][]byte, storage map[common.Hash]map[common.Hash][]byte) { - su := stateUpdate{ - destructs: destructs, - accounts: accounts, - storage: storage, - } - cache.Add(blockRoot, su) - data, _ := rlp.EncodeToBytes(su) - backend.ChainDb().Put(append([]byte("su"), blockRoot...), data) -} - -func AppendAncient(number uint64, hash, header, body, receipts, td []byte) { - header := new(types.Header) - if err := rlp.Decode(bytes.NewReader(data), header); err != nil { - log.Warn("Could not decode ancient header", "block", number) - return - } - backend.Chaindb().Delete(append([]byte("su"), header.Root()...)) -} - - -func NewHead(block *types.Block, hash common.Hash, logs []*types.Log) { - if pl == nil { - log.Warn("Attempting to emit NewHead, but default PluginLoader has not been initialized") - return - } - result, err := blockUpdates(block) - if err != nil { - log.Error("Could not serialize block", "err", err, "hash", block.Hash()) - return - } - blockEvents.Send(result) - - - - receipts, err = backend.GetReceipts(ctx, block.Hash()) (types.Receipts, error) - var su *stateUpdate - if v, ok := cache.Get(block.Root()); ok { - su = v.(stateUpdate) - } - data, err := backend.ChainDb().Get(append([]byte("su"), block.Root()...)) - if err != nil { - log.Error("StateUpdate unavailable for block", "hash", block.Hash()) - return - } - su = &stateUpdate{} - if err := rlp.DecodeBytes(data, su); err != nil { - log.Error("StateUpdate unavailable for block", "hash", block.Hash()) - return - } - //TODO: Get plugins to invoke, invoke them - return result -} - - - -type BlockUpdates struct{ - backend plugin.Backend -} - -func blockUpdates(block *types.Block) (map[string]interface{}, error) { - result := ethapi.RPCMarshalBlock(block, true, true) - var ( - err error - ) - result["receipts"], err = backend.GetReceipts(ctx, block.Hash()) (types.Receipts, error) - if err != nil { return nil, err } - if v, ok := cache.Get(block.Root); ok { - result["stateUpdates"] = v - return result - } - data, err := backend.ChainDb().Get(append([]byte("su"), block.Root()...)) - if err != nil { return nil, fmt.Errorf("State Updates unavailable for block %#x", block.Hash())} - su := &stateUpdate{} - if err := rlp.DecodeBytes(data, su); err != nil { return nil, fmt.Errorf("State updates unavailable for block %#x", block.Hash()) } - result["stateUpdates"] = su - return result -} - -func (b *BlockUpdates) BlockUpdatesByNumber(ctx context.Context, number rpc.BlockNumber) (map[string]interface{}, error) { - block := b.backend.BlockByNumber(ctx, number) - return blockUpdates(block) -} - -func (b *BlockUpdates) BlockUpdatesByHash(ctx context.Context, hash common.Hash) (map[string]interface{}, error) { - block := b.backend.BlockByHash(ctx, hash) - return blockUpdates(block) -} - -func (b *BlockUpdates) BlockUpdates(ctx context.Context) (*rpc.Subscription, error) { - notifier, supported := rpc.NotifierFromContext(ctx) - if !supported { - return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported - } - - var ( - rpcSub = notifier.CreateSubscription() - blockDataChan = make(chan map[string]interface{}, 1000) - ) - - sub := blockEvents.Send(blockDataChan) - if err != nil { - return nil, err - } - - go func() { - - for { - select { - case hash := <-hashCh: - b.blockUpdates() - notifier.Notify(rpcSub.ID,) - case <-rpcSub.Err(): // client send an unsubscribe request - sub.Unsubscribe() - return - case <-notifier.Closed(): // connection dropped - sub.Unsubscribe() - return - } - } - }() - - return rpcSub, nil -} - - - -func GetAPIs(stack *node.Node, backend plugins.Backend) []rpc.API { - return []rpc.API{ - { - Namespace: "cardinal", - Version: "1.0", - Service: &BlockUpdates{backend}, - Public: true, - }, - } -} diff --git a/plugins/packages/blockupdates/main.go b/plugins/packages/blockupdates/main.go new file mode 100644 index 000000000..f2a269035 --- /dev/null +++ b/plugins/packages/blockupdates/main.go @@ -0,0 +1,279 @@ +package main + +import ( + "bytes" + "fmt" + "context" + "encoding/json" + lru "github.com/hashicorp/golang-lru" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/plugins" + "github.com/ethereum/go-ethereum/node" + "github.com/ethereum/go-ethereum/plugins/interfaces" + "github.com/ethereum/go-ethereum/cmd/utils" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/rpc" + "github.com/ethereum/go-ethereum/internal/ethapi" + "gopkg.in/urfave/cli.v1" + "io" +) + + +var ( + pl *plugins.PluginLoader + backend interfaces.Backend + lastBlock common.Hash + cache *lru.Cache + blockEvents event.Feed +) + +type stateUpdate struct { + Destructs map[common.Hash]struct{} + Accounts map[common.Hash][]byte + Storage map[common.Hash]map[common.Hash][]byte +} + +type kvpair struct { + Key common.Hash + Value []byte +} + +type storage struct { + Account common.Hash + Data []kvpair +} + +type storedStateUpdate struct { + Destructs []common.Hash + Accounts []kvpair + Storage []storage +} + +func (su *stateUpdate) MarshalJSON() ([]byte, error) { + result := make(map[string]interface{}) + destructs := make([]common.Hash, 0, len(su.Destructs)) + for k := range su.Destructs { + destructs = append(destructs, k) + } + result["destructs"] = destructs + accounts := make(map[common.Hash]hexutil.Bytes) + for k, v := range su.Accounts { + accounts[k] = hexutil.Bytes(v) + } + result["accounts"] = accounts + storage := make(map[common.Hash]map[common.Hash]hexutil.Bytes) + for m, s := range su.Storage { + storage[m] = make(map[common.Hash]hexutil.Bytes) + for k, v := range s { + storage[m][k] = hexutil.Bytes(v) + } + } + result["storage"] = storage + return json.Marshal(result) +} + +func (su *stateUpdate) EncodeRLP(w io.Writer) error { + destructs := make([]common.Hash, 0, len(su.Destructs)) + for k := range su.Destructs { + destructs = append(destructs, k) + } + accounts := make([]kvpair, 0, len(su.Accounts)) + for k, v := range su.Accounts { + accounts = append(accounts, kvpair{k, v}) + } + s := make([]storage, 0, len(su.Storage)) + for a, m := range su.Storage { + accountStorage := storage{a, make([]kvpair, 0, len(m))} + for k, v := range m { + accountStorage.Data = append(accountStorage.Data, kvpair{k, v}) + } + s = append(s, accountStorage) + } + return rlp.Encode(w, storedStateUpdate{destructs, accounts, s}) +} + +func (su *stateUpdate) DecodeRLP(s *rlp.Stream) error { + ssu := storedStateUpdate{} + if err := s.Decode(&ssu); err != nil { return err } + su.Destructs = make(map[common.Hash]struct{}) + for _, s := range ssu.Destructs { + su.Destructs[s] = struct{}{} + } + su.Accounts = make(map[common.Hash][]byte) + for _, kv := range ssu.Accounts { + su.Accounts[kv.Key] = kv.Value + } + su.Storage = make(map[common.Hash]map[common.Hash][]byte) + for _, s := range ssu.Storage { + su.Storage[s.Account] = make(map[common.Hash][]byte) + for _, kv := range s.Data { + su.Storage[s.Account][kv.Key] = kv.Value + } + } + return nil +} + +func Initialize(ctx *cli.Context, loader *plugins.PluginLoader) { + pl = loader + cache, _ = lru.New(128) // TODO: Make size configurable + if !ctx.GlobalBool(utils.SnapshotFlag.Name) { + log.Warn("Snapshots are required for StateUpdate plugins, but are currently disabled. State Updates will be unavailable") + } + log.Info("Loaded block updater plugin") +} + + +// TODO: +// x Record StateUpdates in the database as they are written. +// x Keep an LRU cache of the most recent state updates. +// x Prune the StateUpdates in the database as corresponding blocks move to the freezer. +// * Add an RPC endpoint for getting the StateUpdates of a block +// * Invoke other plugins with each block and its respective StateUpdates. + + +func InitializeNode(stack *node.Node, b interfaces.Backend) { + backend = b +} + +func StateUpdate(blockRoot common.Hash, parentRoot common.Hash, destructs map[common.Hash]struct{}, accounts map[common.Hash][]byte, storage map[common.Hash]map[common.Hash][]byte) { + su := &stateUpdate{ + Destructs: destructs, + Accounts: accounts, + Storage: storage, + } + cache.Add(blockRoot, su) + data, _ := rlp.EncodeToBytes(su) + backend.ChainDb().Put(append([]byte("su"), blockRoot.Bytes()...), data) +} + +func AppendAncient(number uint64, hash, headerBytes, body, receipts, td []byte) { + header := new(types.Header) + if err := rlp.Decode(bytes.NewReader(headerBytes), header); err != nil { + log.Warn("Could not decode ancient header", "block", number) + return + } + backend.ChainDb().Delete(append([]byte("su"), header.Root.Bytes()...)) +} + + +func NewHead(block *types.Block, hash common.Hash, logs []*types.Log) { + if pl == nil { + log.Warn("Attempting to emit NewHead, but default PluginLoader has not been initialized") + return + } + result, err := blockUpdates(context.Background(), block) + if err != nil { + log.Error("Could not serialize block", "err", err, "hash", block.Hash()) + return + } + blockEvents.Send(result) + + receipts, err := backend.GetReceipts(context.Background(), block.Hash()) + var su *stateUpdate + if v, ok := cache.Get(block.Root()); ok { + su = v.(*stateUpdate) + } + data, err := backend.ChainDb().Get(append([]byte("su"), block.Root().Bytes()...)) + if err != nil { + log.Error("StateUpdate unavailable for block", "hash", block.Hash()) + return + } + su = &stateUpdate{} + if err := rlp.DecodeBytes(data, su); err != nil { + log.Error("StateUpdate unavailable for block", "hash", block.Hash()) + return + } + fnList := pl.Lookup("BlockUpdates", func(item interface{}) bool { + _, ok := item.(func(*types.Block, []*types.Log, types.Receipts, map[common.Hash]struct{}, map[common.Hash][]byte, map[common.Hash]map[common.Hash][]byte)) + return ok + }) + for _, fni := range fnList { + if fn, ok := fni.(func(*types.Block, []*types.Log, types.Receipts, map[common.Hash]struct{}, map[common.Hash][]byte, map[common.Hash]map[common.Hash][]byte)); ok { + fn(block, logs, receipts, su.Destructs, su.Accounts, su.Storage) + } + } + //TODO: Get plugins to invoke, invoke them +} + + + +type BlockUpdates struct{ + backend interfaces.Backend +} + +func blockUpdates(ctx context.Context, block *types.Block) (map[string]interface{}, error) { + result, err := ethapi.RPCMarshalBlock(block, true, true) + if err != nil { return nil, err } + result["receipts"], err = backend.GetReceipts(ctx, block.Hash()) + if err != nil { return nil, err } + if v, ok := cache.Get(block.Root()); ok { + result["stateUpdates"] = v + return result, nil + } + data, err := backend.ChainDb().Get(append([]byte("su"), block.Root().Bytes()...)) + if err != nil { return nil, fmt.Errorf("State Updates unavailable for block %#x", block.Hash())} + su := &stateUpdate{} + if err := rlp.DecodeBytes(data, su); err != nil { return nil, fmt.Errorf("State updates unavailable for block %#x", block.Hash()) } + result["stateUpdates"] = su + return result, nil +} + +func (b *BlockUpdates) BlockUpdatesByNumber(ctx context.Context, number rpc.BlockNumber) (map[string]interface{}, error) { + block, err := b.backend.BlockByNumber(ctx, number) + if err != nil { return nil, err } + return blockUpdates(ctx, block) +} + +func (b *BlockUpdates) BlockUpdatesByHash(ctx context.Context, hash common.Hash) (map[string]interface{}, error) { + block, err := b.backend.BlockByHash(ctx, hash) + if err != nil { return nil, err } + return blockUpdates(ctx, block) +} + +func (b *BlockUpdates) BlockUpdates(ctx context.Context) (*rpc.Subscription, error) { + notifier, supported := rpc.NotifierFromContext(ctx) + if !supported { + return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported + } + + var ( + rpcSub = notifier.CreateSubscription() + blockDataChan = make(chan map[string]interface{}, 1000) + ) + + sub := blockEvents.Subscribe(blockDataChan) + go func() { + + for { + select { + case b := <-blockDataChan: + notifier.Notify(rpcSub.ID, b) + case <-rpcSub.Err(): // client send an unsubscribe request + sub.Unsubscribe() + return + case <-notifier.Closed(): // connection dropped + sub.Unsubscribe() + return + } + } + }() + + return rpcSub, nil +} + + + +func GetAPIs(stack *node.Node, backend interfaces.Backend) []rpc.API { + return []rpc.API{ + { + Namespace: "cardinal", + Version: "1.0", + Service: &BlockUpdates{backend}, + Public: true, + }, + } +} diff --git a/plugins/plugin_loader.go b/plugins/plugin_loader.go index 55382dcab..a9cfb87e1 100644 --- a/plugins/plugin_loader.go +++ b/plugins/plugin_loader.go @@ -113,7 +113,7 @@ func Initialize(target string, ctx *cli.Context) (err error) { func (pl *PluginLoader) Initialize(ctx *cli.Context) { fns := pl.Lookup("Initialize", func(i interface{}) bool { - _, ok := i.(func(*cli.Context, *PluginLoader) error) + _, ok := i.(func(*cli.Context, *PluginLoader)) return ok }) for _, fni := range fns {