diff --git a/README.md b/README.md index 311e4d3ec..25215ce84 100644 --- a/README.md +++ b/README.md @@ -253,6 +253,9 @@ And the client could then access this with an rpc call to `mynaespace_helloWorld maps the hash of each account address to the SlimRLP encoding of the account data. `storage` maps the hash of each account to a map of that account's stored data. +* **Warning**: StateUpdate is only called if Geth is running with + `--snapshot=true`. This is the default behavior for Geth, but if you are + explicitly running with `--snapshot=false` this function will not be invoked. #### AppendAncient * **Name**: AppendAncient @@ -357,3 +360,13 @@ people who are building something. If you're trying to do something that isn't supported by the current plugin system, we're happy to help. Reach out to us on [Discord](https://discord.gg/Epf7b7Gr) and we'll help you figure out how to make it work. + + +# Existing Plugins + +We currently provide the following plugins: + +* [BlockUpdates](./plugins/packages/blockupdates/main.go): A good reference + plugin, which leverages several hooks to provide a new BlockUpdates hook, + which plugins can use to get more cohesive updates about new blocks than can + easily be achieved with the standard PluGeth hooks. diff --git a/plugins/packages/blockupdates/main.go b/plugins/packages/blockupdates/main.go index f2a269035..5dbc36469 100644 --- a/plugins/packages/blockupdates/main.go +++ b/plugins/packages/blockupdates/main.go @@ -31,28 +31,36 @@ var ( blockEvents event.Feed ) + +// stateUpdate will be used to track state updates type stateUpdate struct { Destructs map[common.Hash]struct{} Accounts map[common.Hash][]byte Storage map[common.Hash]map[common.Hash][]byte } + +// kvpair is used for RLP encoding of maps, as maps cannot be RLP encoded directly type kvpair struct { Key common.Hash Value []byte } +// storage is used for RLP encoding two layers of maps, as maps cannot be RLP encoded directly type storage struct { Account common.Hash Data []kvpair } +// storedStateUpdate is an RLP encodable version of stateUpdate type storedStateUpdate struct { Destructs []common.Hash Accounts []kvpair Storage []storage } + +// MarshalJSON represents the stateUpdate as JSON for RPC calls func (su *stateUpdate) MarshalJSON() ([]byte, error) { result := make(map[string]interface{}) destructs := make([]common.Hash, 0, len(su.Destructs)) @@ -76,6 +84,7 @@ func (su *stateUpdate) MarshalJSON() ([]byte, error) { return json.Marshal(result) } +// EncodeRLP converts the stateUpdate to a storedStateUpdate, and RLP encodes the result for storage func (su *stateUpdate) EncodeRLP(w io.Writer) error { destructs := make([]common.Hash, 0, len(su.Destructs)) for k := range su.Destructs { @@ -96,6 +105,7 @@ func (su *stateUpdate) EncodeRLP(w io.Writer) error { return rlp.Encode(w, storedStateUpdate{destructs, accounts, s}) } +// DecodeRLP takes a byte stream, decodes it to a storedStateUpdate, the n converts that into a stateUpdate object func (su *stateUpdate) DecodeRLP(s *rlp.Stream) error { ssu := storedStateUpdate{} if err := s.Decode(&ssu); err != nil { return err } @@ -117,6 +127,8 @@ func (su *stateUpdate) DecodeRLP(s *rlp.Stream) error { return nil } + +// Initialize does initial setup of variables as the plugin is loaded. func Initialize(ctx *cli.Context, loader *plugins.PluginLoader) { pl = loader cache, _ = lru.New(128) // TODO: Make size configurable @@ -127,18 +139,16 @@ func Initialize(ctx *cli.Context, loader *plugins.PluginLoader) { } -// TODO: -// x Record StateUpdates in the database as they are written. -// x Keep an LRU cache of the most recent state updates. -// x Prune the StateUpdates in the database as corresponding blocks move to the freezer. -// * Add an RPC endpoint for getting the StateUpdates of a block -// * Invoke other plugins with each block and its respective StateUpdates. - - +// InitializeNode is invoked by the plugin loader when the node and Backend are +// ready. We will track the backend to provide access to blocks and other +// useful information. func InitializeNode(stack *node.Node, b interfaces.Backend) { backend = b } + +// StateUpdate gives us updates about state changes made in each block. We +// cache them for short term use, and write them to disk for the longer term. func StateUpdate(blockRoot common.Hash, parentRoot common.Hash, destructs map[common.Hash]struct{}, accounts map[common.Hash][]byte, storage map[common.Hash]map[common.Hash][]byte) { su := &stateUpdate{ Destructs: destructs, @@ -150,6 +160,10 @@ func StateUpdate(blockRoot common.Hash, parentRoot common.Hash, destructs map[co backend.ChainDb().Put(append([]byte("su"), blockRoot.Bytes()...), data) } +// AppendAncient removes our state update records from leveldb as the +// corresponding blocks are moved from leveldb to the ancients database. At +// some point in the future, we may want to look at a way to move the state +// updates to an ancients table of their own for longer term retention. func AppendAncient(number uint64, hash, headerBytes, body, receipts, td []byte) { header := new(types.Header) if err := rlp.Decode(bytes.NewReader(headerBytes), header); err != nil { @@ -160,6 +174,11 @@ func AppendAncient(number uint64, hash, headerBytes, body, receipts, td []byte) } +// NewHead is invoked when a new block becomes the latest recognized block. We +// use this to notify the blockEvents channel of new blocks, as well as invoke +// the BlockUpdates hook on downstream plugins. +// TODO: We're not necessarily handling reorgs properly, which may result in +// some blocks not being emitted through this hook. func NewHead(block *types.Block, hash common.Hash, logs []*types.Log) { if pl == nil { log.Warn("Attempting to emit NewHead, but default PluginLoader has not been initialized") @@ -176,16 +195,17 @@ func NewHead(block *types.Block, hash common.Hash, logs []*types.Log) { var su *stateUpdate if v, ok := cache.Get(block.Root()); ok { su = v.(*stateUpdate) - } - data, err := backend.ChainDb().Get(append([]byte("su"), block.Root().Bytes()...)) - if err != nil { - log.Error("StateUpdate unavailable for block", "hash", block.Hash()) - return - } - su = &stateUpdate{} - if err := rlp.DecodeBytes(data, su); err != nil { - log.Error("StateUpdate unavailable for block", "hash", block.Hash()) - return + } else { + data, err := backend.ChainDb().Get(append([]byte("su"), block.Root().Bytes()...)) + if err != nil { + log.Error("StateUpdate unavailable for block", "hash", block.Hash()) + return + } + su = &stateUpdate{} + if err := rlp.DecodeBytes(data, su); err != nil { + log.Error("StateUpdate unavailable for block", "hash", block.Hash()) + return + } } fnList := pl.Lookup("BlockUpdates", func(item interface{}) bool { _, ok := item.(func(*types.Block, []*types.Log, types.Receipts, map[common.Hash]struct{}, map[common.Hash][]byte, map[common.Hash]map[common.Hash][]byte)) @@ -196,15 +216,16 @@ func NewHead(block *types.Block, hash common.Hash, logs []*types.Log) { fn(block, logs, receipts, su.Destructs, su.Accounts, su.Storage) } } - //TODO: Get plugins to invoke, invoke them } - +// BlockUpdates is a service that lets clients query for block updates for a +// given block by hash or number, or subscribe to new block upates. type BlockUpdates struct{ backend interfaces.Backend } +// blockUpdate handles the serialization of a block func blockUpdates(ctx context.Context, block *types.Block) (map[string]interface{}, error) { result, err := ethapi.RPCMarshalBlock(block, true, true) if err != nil { return nil, err } @@ -222,18 +243,24 @@ func blockUpdates(ctx context.Context, block *types.Block) (map[string]interface return result, nil } +// BlockUpdatesByNumber retrieves a block by number, gets receipts and state +// updates, and serializes the response. func (b *BlockUpdates) BlockUpdatesByNumber(ctx context.Context, number rpc.BlockNumber) (map[string]interface{}, error) { block, err := b.backend.BlockByNumber(ctx, number) if err != nil { return nil, err } return blockUpdates(ctx, block) } +// BlockUPdatesByHash retrieves a block by hash, gets receipts and state +// updates, and serializes the response. func (b *BlockUpdates) BlockUpdatesByHash(ctx context.Context, hash common.Hash) (map[string]interface{}, error) { block, err := b.backend.BlockByHash(ctx, hash) if err != nil { return nil, err } return blockUpdates(ctx, block) } +// BlockUpdates allows clients to subscribe to notifications of new blocks +// along with receipts and state updates. func (b *BlockUpdates) BlockUpdates(ctx context.Context) (*rpc.Subscription, error) { notifier, supported := rpc.NotifierFromContext(ctx) if !supported { @@ -266,7 +293,7 @@ func (b *BlockUpdates) BlockUpdates(ctx context.Context) (*rpc.Subscription, err } - +// GetAPIs exposes the BlockUpdates service under the cardinal namespace. func GetAPIs(stack *node.Node, backend interfaces.Backend) []rpc.API { return []rpc.API{ {