plugeth/plugins/packages/blockupdates/main.go

307 lines
9.9 KiB
Go

package main
import (
"bytes"
"fmt"
"context"
"encoding/json"
lru "github.com/hashicorp/golang-lru"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/plugins"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/plugins/interfaces"
"github.com/ethereum/go-ethereum/cmd/utils"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/internal/ethapi"
"github.com/urfave/cli/v2"
"io"
)
var (
pl *plugins.PluginLoader
backend interfaces.Backend
lastBlock common.Hash
cache *lru.Cache
blockEvents event.Feed
)
// stateUpdate will be used to track state updates
type stateUpdate struct {
Destructs map[common.Hash]struct{}
Accounts map[common.Hash][]byte
Storage map[common.Hash]map[common.Hash][]byte
}
// kvpair is used for RLP encoding of maps, as maps cannot be RLP encoded directly
type kvpair struct {
Key common.Hash
Value []byte
}
// storage is used for RLP encoding two layers of maps, as maps cannot be RLP encoded directly
type storage struct {
Account common.Hash
Data []kvpair
}
// storedStateUpdate is an RLP encodable version of stateUpdate
type storedStateUpdate struct {
Destructs []common.Hash
Accounts []kvpair
Storage []storage
}
// MarshalJSON represents the stateUpdate as JSON for RPC calls
func (su *stateUpdate) MarshalJSON() ([]byte, error) {
result := make(map[string]interface{})
destructs := make([]common.Hash, 0, len(su.Destructs))
for k := range su.Destructs {
destructs = append(destructs, k)
}
result["destructs"] = destructs
accounts := make(map[common.Hash]hexutil.Bytes)
for k, v := range su.Accounts {
accounts[k] = hexutil.Bytes(v)
}
result["accounts"] = accounts
storage := make(map[common.Hash]map[common.Hash]hexutil.Bytes)
for m, s := range su.Storage {
storage[m] = make(map[common.Hash]hexutil.Bytes)
for k, v := range s {
storage[m][k] = hexutil.Bytes(v)
}
}
result["storage"] = storage
return json.Marshal(result)
}
// EncodeRLP converts the stateUpdate to a storedStateUpdate, and RLP encodes the result for storage
func (su *stateUpdate) EncodeRLP(w io.Writer) error {
destructs := make([]common.Hash, 0, len(su.Destructs))
for k := range su.Destructs {
destructs = append(destructs, k)
}
accounts := make([]kvpair, 0, len(su.Accounts))
for k, v := range su.Accounts {
accounts = append(accounts, kvpair{k, v})
}
s := make([]storage, 0, len(su.Storage))
for a, m := range su.Storage {
accountStorage := storage{a, make([]kvpair, 0, len(m))}
for k, v := range m {
accountStorage.Data = append(accountStorage.Data, kvpair{k, v})
}
s = append(s, accountStorage)
}
return rlp.Encode(w, storedStateUpdate{destructs, accounts, s})
}
// DecodeRLP takes a byte stream, decodes it to a storedStateUpdate, the n converts that into a stateUpdate object
func (su *stateUpdate) DecodeRLP(s *rlp.Stream) error {
ssu := storedStateUpdate{}
if err := s.Decode(&ssu); err != nil { return err }
su.Destructs = make(map[common.Hash]struct{})
for _, s := range ssu.Destructs {
su.Destructs[s] = struct{}{}
}
su.Accounts = make(map[common.Hash][]byte)
for _, kv := range ssu.Accounts {
su.Accounts[kv.Key] = kv.Value
}
su.Storage = make(map[common.Hash]map[common.Hash][]byte)
for _, s := range ssu.Storage {
su.Storage[s.Account] = make(map[common.Hash][]byte)
for _, kv := range s.Data {
su.Storage[s.Account][kv.Key] = kv.Value
}
}
return nil
}
// Initialize does initial setup of variables as the plugin is loaded.
func Initialize(ctx *cli.Context, loader *plugins.PluginLoader) {
pl = loader
cache, _ = lru.New(128) // TODO: Make size configurable
if !ctx.GlobalBool(utils.SnapshotFlag.Name) {
log.Warn("Snapshots are required for StateUpdate plugins, but are currently disabled. State Updates will be unavailable")
}
log.Info("Loaded block updater plugin")
}
// InitializeNode is invoked by the plugin loader when the node and Backend are
// ready. We will track the backend to provide access to blocks and other
// useful information.
func InitializeNode(stack *node.Node, b interfaces.Backend) {
backend = b
}
// StateUpdate gives us updates about state changes made in each block. We
// cache them for short term use, and write them to disk for the longer term.
func StateUpdate(blockRoot common.Hash, parentRoot common.Hash, destructs map[common.Hash]struct{}, accounts map[common.Hash][]byte, storage map[common.Hash]map[common.Hash][]byte) {
su := &stateUpdate{
Destructs: destructs,
Accounts: accounts,
Storage: storage,
}
cache.Add(blockRoot, su)
data, _ := rlp.EncodeToBytes(su)
backend.ChainDb().Put(append([]byte("su"), blockRoot.Bytes()...), data)
}
// AppendAncient removes our state update records from leveldb as the
// corresponding blocks are moved from leveldb to the ancients database. At
// some point in the future, we may want to look at a way to move the state
// updates to an ancients table of their own for longer term retention.
func AppendAncient(number uint64, hash, headerBytes, body, receipts, td []byte) {
header := new(types.Header)
if err := rlp.Decode(bytes.NewReader(headerBytes), header); err != nil {
log.Warn("Could not decode ancient header", "block", number)
return
}
backend.ChainDb().Delete(append([]byte("su"), header.Root.Bytes()...))
}
// NewHead is invoked when a new block becomes the latest recognized block. We
// use this to notify the blockEvents channel of new blocks, as well as invoke
// the BlockUpdates hook on downstream plugins.
// TODO: We're not necessarily handling reorgs properly, which may result in
// some blocks not being emitted through this hook.
func NewHead(block *types.Block, hash common.Hash, logs []*types.Log) {
if pl == nil {
log.Warn("Attempting to emit NewHead, but default PluginLoader has not been initialized")
return
}
result, err := blockUpdates(context.Background(), block)
if err != nil {
log.Error("Could not serialize block", "err", err, "hash", block.Hash())
return
}
blockEvents.Send(result)
receipts, err := backend.GetReceipts(context.Background(), block.Hash())
var su *stateUpdate
if v, ok := cache.Get(block.Root()); ok {
su = v.(*stateUpdate)
} else {
data, err := backend.ChainDb().Get(append([]byte("su"), block.Root().Bytes()...))
if err != nil {
log.Error("StateUpdate unavailable for block", "hash", block.Hash())
return
}
su = &stateUpdate{}
if err := rlp.DecodeBytes(data, su); err != nil {
log.Error("StateUpdate unavailable for block", "hash", block.Hash())
return
}
}
fnList := pl.Lookup("BlockUpdates", func(item interface{}) bool {
_, ok := item.(func(*types.Block, []*types.Log, types.Receipts, map[common.Hash]struct{}, map[common.Hash][]byte, map[common.Hash]map[common.Hash][]byte))
return ok
})
for _, fni := range fnList {
if fn, ok := fni.(func(*types.Block, []*types.Log, types.Receipts, map[common.Hash]struct{}, map[common.Hash][]byte, map[common.Hash]map[common.Hash][]byte)); ok {
fn(block, logs, receipts, su.Destructs, su.Accounts, su.Storage)
}
}
}
// BlockUpdates is a service that lets clients query for block updates for a
// given block by hash or number, or subscribe to new block upates.
type BlockUpdates struct{
backend interfaces.Backend
}
// blockUpdate handles the serialization of a block
func blockUpdates(ctx context.Context, block *types.Block) (map[string]interface{}, error) {
result, err := ethapi.RPCMarshalBlock(block, true, true)
if err != nil { return nil, err }
result["receipts"], err = backend.GetReceipts(ctx, block.Hash())
if err != nil { return nil, err }
if v, ok := cache.Get(block.Root()); ok {
result["stateUpdates"] = v
return result, nil
}
data, err := backend.ChainDb().Get(append([]byte("su"), block.Root().Bytes()...))
if err != nil { return nil, fmt.Errorf("State Updates unavailable for block %#x", block.Hash())}
su := &stateUpdate{}
if err := rlp.DecodeBytes(data, su); err != nil { return nil, fmt.Errorf("State updates unavailable for block %#x", block.Hash()) }
result["stateUpdates"] = su
return result, nil
}
// BlockUpdatesByNumber retrieves a block by number, gets receipts and state
// updates, and serializes the response.
func (b *BlockUpdates) BlockUpdatesByNumber(ctx context.Context, number rpc.BlockNumber) (map[string]interface{}, error) {
block, err := b.backend.BlockByNumber(ctx, number)
if err != nil { return nil, err }
return blockUpdates(ctx, block)
}
// BlockUpdatesByHash retrieves a block by hash, gets receipts and state
// updates, and serializes the response.
func (b *BlockUpdates) BlockUpdatesByHash(ctx context.Context, hash common.Hash) (map[string]interface{}, error) {
block, err := b.backend.BlockByHash(ctx, hash)
if err != nil { return nil, err }
return blockUpdates(ctx, block)
}
// BlockUpdates allows clients to subscribe to notifications of new blocks
// along with receipts and state updates.
func (b *BlockUpdates) BlockUpdates(ctx context.Context) (*rpc.Subscription, error) {
notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
}
var (
rpcSub = notifier.CreateSubscription()
blockDataChan = make(chan map[string]interface{}, 1000)
)
sub := blockEvents.Subscribe(blockDataChan)
go func() {
for {
select {
case b := <-blockDataChan:
notifier.Notify(rpcSub.ID, b)
case <-rpcSub.Err(): // client send an unsubscribe request
sub.Unsubscribe()
return
case <-notifier.Closed(): // connection dropped
sub.Unsubscribe()
return
}
}
}()
return rpcSub, nil
}
// GetAPIs exposes the BlockUpdates service under the cardinal namespace.
func GetAPIs(stack *node.Node, backend interfaces.Backend) []rpc.API {
return []rpc.API{
{
Namespace: "cardinal",
Version: "1.0",
Service: &BlockUpdates{backend},
Public: true,
},
}
}