Documentation!

This commit is contained in:
Austin Roberts 2021-07-14 15:45:36 -05:00
parent 7ed35c32bd
commit c58a596a53
2 changed files with 61 additions and 21 deletions

View File

@ -253,6 +253,9 @@ And the client could then access this with an rpc call to `mynaespace_helloWorld
maps the hash of each account address to the SlimRLP encoding of the account maps the hash of each account address to the SlimRLP encoding of the account
data. `storage` maps the hash of each account to a map of that account's data. `storage` maps the hash of each account to a map of that account's
stored data. stored data.
* **Warning**: StateUpdate is only called if Geth is running with
`--snapshot=true`. This is the default behavior for Geth, but if you are
explicitly running with `--snapshot=false` this function will not be invoked.
#### AppendAncient #### AppendAncient
* **Name**: AppendAncient * **Name**: AppendAncient
@ -357,3 +360,13 @@ people who are building something. If you're trying to do something that isn't
supported by the current plugin system, we're happy to help. Reach out to us on supported by the current plugin system, we're happy to help. Reach out to us on
[Discord](https://discord.gg/Epf7b7Gr) and we'll help you figure out how to [Discord](https://discord.gg/Epf7b7Gr) and we'll help you figure out how to
make it work. make it work.
# Existing Plugins
We currently provide the following plugins:
* [BlockUpdates](./plugins/packages/blockupdates/main.go): A good reference
plugin, which leverages several hooks to provide a new BlockUpdates hook,
which plugins can use to get more cohesive updates about new blocks than can
easily be achieved with the standard PluGeth hooks.

View File

@ -31,28 +31,36 @@ var (
blockEvents event.Feed blockEvents event.Feed
) )
// stateUpdate will be used to track state updates
type stateUpdate struct { type stateUpdate struct {
Destructs map[common.Hash]struct{} Destructs map[common.Hash]struct{}
Accounts map[common.Hash][]byte Accounts map[common.Hash][]byte
Storage map[common.Hash]map[common.Hash][]byte Storage map[common.Hash]map[common.Hash][]byte
} }
// kvpair is used for RLP encoding of maps, as maps cannot be RLP encoded directly
type kvpair struct { type kvpair struct {
Key common.Hash Key common.Hash
Value []byte Value []byte
} }
// storage is used for RLP encoding two layers of maps, as maps cannot be RLP encoded directly
type storage struct { type storage struct {
Account common.Hash Account common.Hash
Data []kvpair Data []kvpair
} }
// storedStateUpdate is an RLP encodable version of stateUpdate
type storedStateUpdate struct { type storedStateUpdate struct {
Destructs []common.Hash Destructs []common.Hash
Accounts []kvpair Accounts []kvpair
Storage []storage Storage []storage
} }
// MarshalJSON represents the stateUpdate as JSON for RPC calls
func (su *stateUpdate) MarshalJSON() ([]byte, error) { func (su *stateUpdate) MarshalJSON() ([]byte, error) {
result := make(map[string]interface{}) result := make(map[string]interface{})
destructs := make([]common.Hash, 0, len(su.Destructs)) destructs := make([]common.Hash, 0, len(su.Destructs))
@ -76,6 +84,7 @@ func (su *stateUpdate) MarshalJSON() ([]byte, error) {
return json.Marshal(result) return json.Marshal(result)
} }
// EncodeRLP converts the stateUpdate to a storedStateUpdate, and RLP encodes the result for storage
func (su *stateUpdate) EncodeRLP(w io.Writer) error { func (su *stateUpdate) EncodeRLP(w io.Writer) error {
destructs := make([]common.Hash, 0, len(su.Destructs)) destructs := make([]common.Hash, 0, len(su.Destructs))
for k := range su.Destructs { for k := range su.Destructs {
@ -96,6 +105,7 @@ func (su *stateUpdate) EncodeRLP(w io.Writer) error {
return rlp.Encode(w, storedStateUpdate{destructs, accounts, s}) return rlp.Encode(w, storedStateUpdate{destructs, accounts, s})
} }
// DecodeRLP takes a byte stream, decodes it to a storedStateUpdate, the n converts that into a stateUpdate object
func (su *stateUpdate) DecodeRLP(s *rlp.Stream) error { func (su *stateUpdate) DecodeRLP(s *rlp.Stream) error {
ssu := storedStateUpdate{} ssu := storedStateUpdate{}
if err := s.Decode(&ssu); err != nil { return err } if err := s.Decode(&ssu); err != nil { return err }
@ -117,6 +127,8 @@ func (su *stateUpdate) DecodeRLP(s *rlp.Stream) error {
return nil return nil
} }
// Initialize does initial setup of variables as the plugin is loaded.
func Initialize(ctx *cli.Context, loader *plugins.PluginLoader) { func Initialize(ctx *cli.Context, loader *plugins.PluginLoader) {
pl = loader pl = loader
cache, _ = lru.New(128) // TODO: Make size configurable cache, _ = lru.New(128) // TODO: Make size configurable
@ -127,18 +139,16 @@ func Initialize(ctx *cli.Context, loader *plugins.PluginLoader) {
} }
// TODO: // InitializeNode is invoked by the plugin loader when the node and Backend are
// x Record StateUpdates in the database as they are written. // ready. We will track the backend to provide access to blocks and other
// x Keep an LRU cache of the most recent state updates. // useful information.
// x Prune the StateUpdates in the database as corresponding blocks move to the freezer.
// * Add an RPC endpoint for getting the StateUpdates of a block
// * Invoke other plugins with each block and its respective StateUpdates.
func InitializeNode(stack *node.Node, b interfaces.Backend) { func InitializeNode(stack *node.Node, b interfaces.Backend) {
backend = b backend = b
} }
// StateUpdate gives us updates about state changes made in each block. We
// cache them for short term use, and write them to disk for the longer term.
func StateUpdate(blockRoot common.Hash, parentRoot common.Hash, destructs map[common.Hash]struct{}, accounts map[common.Hash][]byte, storage map[common.Hash]map[common.Hash][]byte) { func StateUpdate(blockRoot common.Hash, parentRoot common.Hash, destructs map[common.Hash]struct{}, accounts map[common.Hash][]byte, storage map[common.Hash]map[common.Hash][]byte) {
su := &stateUpdate{ su := &stateUpdate{
Destructs: destructs, Destructs: destructs,
@ -150,6 +160,10 @@ func StateUpdate(blockRoot common.Hash, parentRoot common.Hash, destructs map[co
backend.ChainDb().Put(append([]byte("su"), blockRoot.Bytes()...), data) backend.ChainDb().Put(append([]byte("su"), blockRoot.Bytes()...), data)
} }
// AppendAncient removes our state update records from leveldb as the
// corresponding blocks are moved from leveldb to the ancients database. At
// some point in the future, we may want to look at a way to move the state
// updates to an ancients table of their own for longer term retention.
func AppendAncient(number uint64, hash, headerBytes, body, receipts, td []byte) { func AppendAncient(number uint64, hash, headerBytes, body, receipts, td []byte) {
header := new(types.Header) header := new(types.Header)
if err := rlp.Decode(bytes.NewReader(headerBytes), header); err != nil { if err := rlp.Decode(bytes.NewReader(headerBytes), header); err != nil {
@ -160,6 +174,11 @@ func AppendAncient(number uint64, hash, headerBytes, body, receipts, td []byte)
} }
// NewHead is invoked when a new block becomes the latest recognized block. We
// use this to notify the blockEvents channel of new blocks, as well as invoke
// the BlockUpdates hook on downstream plugins.
// TODO: We're not necessarily handling reorgs properly, which may result in
// some blocks not being emitted through this hook.
func NewHead(block *types.Block, hash common.Hash, logs []*types.Log) { func NewHead(block *types.Block, hash common.Hash, logs []*types.Log) {
if pl == nil { if pl == nil {
log.Warn("Attempting to emit NewHead, but default PluginLoader has not been initialized") log.Warn("Attempting to emit NewHead, but default PluginLoader has not been initialized")
@ -176,16 +195,17 @@ func NewHead(block *types.Block, hash common.Hash, logs []*types.Log) {
var su *stateUpdate var su *stateUpdate
if v, ok := cache.Get(block.Root()); ok { if v, ok := cache.Get(block.Root()); ok {
su = v.(*stateUpdate) su = v.(*stateUpdate)
} } else {
data, err := backend.ChainDb().Get(append([]byte("su"), block.Root().Bytes()...)) data, err := backend.ChainDb().Get(append([]byte("su"), block.Root().Bytes()...))
if err != nil { if err != nil {
log.Error("StateUpdate unavailable for block", "hash", block.Hash()) log.Error("StateUpdate unavailable for block", "hash", block.Hash())
return return
} }
su = &stateUpdate{} su = &stateUpdate{}
if err := rlp.DecodeBytes(data, su); err != nil { if err := rlp.DecodeBytes(data, su); err != nil {
log.Error("StateUpdate unavailable for block", "hash", block.Hash()) log.Error("StateUpdate unavailable for block", "hash", block.Hash())
return return
}
} }
fnList := pl.Lookup("BlockUpdates", func(item interface{}) bool { fnList := pl.Lookup("BlockUpdates", func(item interface{}) bool {
_, ok := item.(func(*types.Block, []*types.Log, types.Receipts, map[common.Hash]struct{}, map[common.Hash][]byte, map[common.Hash]map[common.Hash][]byte)) _, ok := item.(func(*types.Block, []*types.Log, types.Receipts, map[common.Hash]struct{}, map[common.Hash][]byte, map[common.Hash]map[common.Hash][]byte))
@ -196,15 +216,16 @@ func NewHead(block *types.Block, hash common.Hash, logs []*types.Log) {
fn(block, logs, receipts, su.Destructs, su.Accounts, su.Storage) fn(block, logs, receipts, su.Destructs, su.Accounts, su.Storage)
} }
} }
//TODO: Get plugins to invoke, invoke them
} }
// BlockUpdates is a service that lets clients query for block updates for a
// given block by hash or number, or subscribe to new block upates.
type BlockUpdates struct{ type BlockUpdates struct{
backend interfaces.Backend backend interfaces.Backend
} }
// blockUpdate handles the serialization of a block
func blockUpdates(ctx context.Context, block *types.Block) (map[string]interface{}, error) { func blockUpdates(ctx context.Context, block *types.Block) (map[string]interface{}, error) {
result, err := ethapi.RPCMarshalBlock(block, true, true) result, err := ethapi.RPCMarshalBlock(block, true, true)
if err != nil { return nil, err } if err != nil { return nil, err }
@ -222,18 +243,24 @@ func blockUpdates(ctx context.Context, block *types.Block) (map[string]interface
return result, nil return result, nil
} }
// BlockUpdatesByNumber retrieves a block by number, gets receipts and state
// updates, and serializes the response.
func (b *BlockUpdates) BlockUpdatesByNumber(ctx context.Context, number rpc.BlockNumber) (map[string]interface{}, error) { func (b *BlockUpdates) BlockUpdatesByNumber(ctx context.Context, number rpc.BlockNumber) (map[string]interface{}, error) {
block, err := b.backend.BlockByNumber(ctx, number) block, err := b.backend.BlockByNumber(ctx, number)
if err != nil { return nil, err } if err != nil { return nil, err }
return blockUpdates(ctx, block) return blockUpdates(ctx, block)
} }
// BlockUPdatesByHash retrieves a block by hash, gets receipts and state
// updates, and serializes the response.
func (b *BlockUpdates) BlockUpdatesByHash(ctx context.Context, hash common.Hash) (map[string]interface{}, error) { func (b *BlockUpdates) BlockUpdatesByHash(ctx context.Context, hash common.Hash) (map[string]interface{}, error) {
block, err := b.backend.BlockByHash(ctx, hash) block, err := b.backend.BlockByHash(ctx, hash)
if err != nil { return nil, err } if err != nil { return nil, err }
return blockUpdates(ctx, block) return blockUpdates(ctx, block)
} }
// BlockUpdates allows clients to subscribe to notifications of new blocks
// along with receipts and state updates.
func (b *BlockUpdates) BlockUpdates(ctx context.Context) (*rpc.Subscription, error) { func (b *BlockUpdates) BlockUpdates(ctx context.Context) (*rpc.Subscription, error) {
notifier, supported := rpc.NotifierFromContext(ctx) notifier, supported := rpc.NotifierFromContext(ctx)
if !supported { if !supported {
@ -266,7 +293,7 @@ func (b *BlockUpdates) BlockUpdates(ctx context.Context) (*rpc.Subscription, err
} }
// GetAPIs exposes the BlockUpdates service under the cardinal namespace.
func GetAPIs(stack *node.Node, backend interfaces.Backend) []rpc.API { func GetAPIs(stack *node.Node, backend interfaces.Backend) []rpc.API {
return []rpc.API{ return []rpc.API{
{ {