docs: update streaming docs (#16311)

This commit is contained in:
Julien Robert 2023-05-26 15:47:46 +02:00 committed by GitHub
parent 60977e6dbd
commit 0360c3d87f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 33 additions and 315 deletions

View File

@ -8,7 +8,7 @@
* Add `ListenCommit`, flatten the state writes in a block to a single batch.
* Remove listeners from cache stores, should only listen to `rootmulti.Store`.
* Remove `HaltAppOnDeliveryError()`, the errors are propagated by default, the implementations should return nil if don't want to propogate errors.
* 26/05/2023: Update with ABCI 2.0
## Status
@ -223,14 +223,10 @@ so that the service can group the state changes with the ABCI requests.
// ABCIListener is the interface that we're exposing as a streaming service.
type ABCIListener interface {
// ListenBeginBlock updates the streaming service with the latest BeginBlock messages
ListenBeginBlock(ctx context.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error
// ListenEndBlock updates the steaming service with the latest EndBlock messages
ListenEndBlock(ctx types.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) error
// ListenDeliverTx updates the steaming service with the latest DeliverTx messages
ListenDeliverTx(ctx context.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) error
// ListenCommit updates the steaming service with the latest Commit messages and state changes
ListenCommit(ctx context.Context, res abci.ResponseCommit, changeSet []*store.StoreKVPair) error
// ListenFinalizeBlock updates the streaming service with the latest FinalizeBlock messages
ListenFinalizeBlock(ctx context.Context, req abci.RequestFinalizeBlock, res abci.ResponseFinalizeBlock) error
// ListenCommit updates the steaming service with the latest Commit messages and state changes
ListenCommit(ctx context.Context, res abci.ResponseCommit, changeSet []*StoreKVPair) error
}
```
@ -267,85 +263,27 @@ type BaseApp struct {
#### ABCI Event Hooks
We will modify the `BeginBlock`, `EndBlock`, `DeliverTx` and `Commit` methods to pass ABCI requests and responses
We will modify the `FinalizeBlock` and `Commit` methods to pass ABCI requests and responses
to any streaming service hooks registered with the `BaseApp`.
```go
func (app *BaseApp) BeginBlock(req abci.RequestBeginBlock) (res abci.ResponseBeginBlock) {
func (app *BaseApp) FinalizeBlock(req abci.RequestFinalizeBlock) abci.ResponseFinalizeBlock {
...
// call the streaming service hook with the BeginBlock messages
for _, abciListener := range app.abciListeners {
ctx := app.deliverState.ctx
blockHeight := ctx.BlockHeight()
if app.abciListenersAsync {
go func(req abci.RequestBeginBlock, res abci.ResponseBeginBlock) {
if err := app.abciListener.ListenBeginBlock(ctx, req, res); err != nil {
app.logger.Error("BeginBlock listening hook failed", "height", blockHeight, "err", err)
}
}(req, res)
} else {
if err := app.abciListener.ListenBeginBlock(ctx, req, res); err != nil {
app.logger.Error("BeginBlock listening hook failed", "height", blockHeight, "err", err)
if app.stopNodeOnABCIListenerErr {
os.Exit(1)
}
}
}
}
return res
}
```
```go
func (app *BaseApp) EndBlock(req abci.RequestEndBlock) (res abci.ResponseEndBlock) {
...
// call the streaming service hook with the EndBlock messages
for _, abciListener := range app.abciListeners {
ctx := app.deliverState.ctx
blockHeight := ctx.BlockHeight()
if app.abciListenersAsync {
go func(req abci.RequestEndBlock, res abci.ResponseEndBlock) {
if err := app.abciListener.ListenEndBlock(blockHeight, req, res); err != nil {
app.logger.Error("EndBlock listening hook failed", "height", blockHeight, "err", err)
}
}(req, res)
} else {
if err := app.abciListener.ListenEndBlock(blockHeight, req, res); err != nil {
app.logger.Error("EndBlock listening hook failed", "height", blockHeight, "err", err)
if app.stopNodeOnABCIListenerErr {
os.Exit(1)
}
}
}
}
return res
}
```
```go
func (app *BaseApp) DeliverTx(req abci.RequestDeliverTx) abci.ResponseDeliverTx {
var abciRes abci.ResponseDeliverTx
var abciRes abci.ResponseFinalizeBlock
defer func() {
// call the streaming service hook with the EndBlock messages
// call the streaming service hook with the FinalizeBlock messages
for _, abciListener := range app.abciListeners {
ctx := app.deliverState.ctx
ctx := app.finalizeState.ctx
blockHeight := ctx.BlockHeight()
if app.abciListenersAsync {
go func(req abci.RequestDeliverTx, res abci.ResponseDeliverTx) {
if err := app.abciListener.ListenDeliverTx(blockHeight, req, res); err != nil {
app.logger.Error("DeliverTx listening hook failed", "height", blockHeight, "err", err)
go func(req abci.RequestFinalizeBlock, res abci.ResponseFinalizeBlock) {
if err := app.abciListener.FinalizeBlock(blockHeight, req, res); err != nil {
app.logger.Error("FinalizeBlock listening hook failed", "height", blockHeight, "err", err)
}
}(req, abciRes)
} else {
if err := app.abciListener.ListenDeliverTx(blockHeight, req, res); err != nil {
app.logger.Error("DeliverTx listening hook failed", "height", blockHeight, "err", err)
if err := app.abciListener.ListenFinalizeBlock(blockHeight, req, res); err != nil {
app.logger.Error("FinalizeBlock listening hook failed", "height", blockHeight, "err", err)
if app.stopNodeOnABCIListenerErr {
os.Exit(1)
}
@ -455,18 +393,9 @@ syntax = "proto3";
message Empty {}
message ListenBeginBlockRequest {
RequestBeginBlock req = 1;
ResponseBeginBlock res = 2;
}
message ListenEndBlockRequest {
RequestEndBlock req = 1;
ResponseEndBlock res = 2;
}
message ListenDeliverTxRequest {
int64 block_height = 1;
RequestDeliverTx req = 2;
ResponseDeliverTx res = 3;
message ListenFinalizeBlockRequest {
RequestFinalizeBlock req = 1;
ResponseFinalizeBlock res = 2;
}
message ListenCommitRequest {
int64 block_height = 1;
@ -476,9 +405,7 @@ message ListenCommitRequest {
// plugin that listens to state changes
service ABCIListenerService {
rpc ListenBeginBlock(ListenBeginBlockRequest) returns (Empty);
rpc ListenEndBlock(ListenEndBlockRequest) returns (Empty);
rpc ListenDeliverTx(ListenDeliverTxRequest) returns (Empty);
rpc ListenFinalizeBlock(ListenFinalizeBlockRequest) returns (Empty);
rpc ListenCommit(ListenCommitRequest) returns (Empty);
}
```
@ -487,9 +414,7 @@ service ABCIListenerService {
...
// plugin that doesn't listen to state changes
service ABCIListenerService {
rpc ListenBeginBlock(ListenBeginBlockRequest) returns (Empty);
rpc ListenEndBlock(ListenEndBlockRequest) returns (Empty);
rpc ListenDeliverTx(ListenDeliverTxRequest) returns (Empty);
rpc ListenFinalizeBlock(ListenFinalizeBlockRequest) returns (Empty);
rpc ListenCommit(ListenCommitRequest) returns (Empty);
}
```
@ -508,17 +433,7 @@ type GRPCClient struct {
client ABCIListenerServiceClient
}
func (m *GRPCClient) ListenBeginBlock(ctx context.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error {
_, err := m.client.ListenBeginBlock(ctx, &ListenBeginBlockRequest{Req: req, Res: res})
return err
}
func (m *GRPCClient) ListenEndBlock(goCtx context.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) error {
_, err := m.client.ListenEndBlock(ctx, &ListenEndBlockRequest{Req: req, Res: res})
return err
}
func (m *GRPCClient) ListenDeliverTx(goCtx context.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) error {
func (m *GRPCClient) ListenFinalizeBlock(goCtx context.Context, req abci.RequestFinalizeBlock, res abci.ResponseFinalizeBlock) error {
ctx := sdk.UnwrapSDKContext(goCtx)
_, err := m.client.ListenDeliverTx(ctx, &ListenDeliverTxRequest{BlockHeight: ctx.BlockHeight(), Req: req, Res: res})
return err
@ -536,16 +451,8 @@ type GRPCServer struct {
Impl baseapp.ABCIListener
}
func (m *GRPCServer) ListenBeginBlock(ctx context.Context, req *ListenBeginBlockRequest) (*Empty, error) {
return &Empty{}, m.Impl.ListenBeginBlock(ctx, req.Req, req.Res)
}
func (m *GRPCServer) ListenEndBlock(ctx context.Context, req *ListenEndBlockRequest) (*Empty, error) {
return &Empty{}, m.Impl.ListenEndBlock(ctx, req.Req, req.Res)
}
func (m *GRPCServer) ListenDeliverTx(ctx context.Context, req *ListenDeliverTxRequest) (*Empty, error) {
return &Empty{}, m.Impl.ListenDeliverTx(ctx, req.Req, req.Res)
func (m *GRPCServer) ListenFinalizeBlock(ctx context.Context, req *ListenFinalizeBlockRequest) (*Empty, error) {
return &Empty{}, m.Impl.ListenFinalizeBlock(ctx, req.Req, req.Res)
}
func (m *GRPCServer) ListenCommit(ctx context.Context, req *ListenCommitRequest) (*Empty, error) {
@ -564,15 +471,7 @@ And the pre-compiled Go plugin `Impl`(*this is only used for plugins that are wr
// ABCIListener is the implementation of the baseapp.ABCIListener interface
type ABCIListener struct{}
func (m *ABCIListenerPlugin) ListenBeginBlock(ctx context.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error {
// send data to external system
}
func (m *ABCIListenerPlugin) ListenEndBlock(ctx context.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error {
// send data to external system
}
func (m *ABCIListenerPlugin) ListenDeliverTxBlock(ctx context.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error {
func (m *ABCIListenerPlugin) ListenFinalizeBlock(ctx context.Context, req abci.RequestFinalizeBlock, res abci.ResponseFinalizeBlock) error {
// send data to external system
}

View File

@ -13,55 +13,10 @@ In this section we describe the implementation of the `ABCIListener` interface a
### Service Protocol
The companion service protocol for the `ABCIListener` interface is described below.
See [proto/cosmos/store/streaming/abci/grpc.proto](https://github.com/cosmos/cosmos-sdk/blob/main/proto/cosmos/store/streaming/grpc.proto) for full details.
See [proto/cosmos/store/streaming/abci/grpc.proto](https://github.com/cosmos/cosmos-sdk/blob/main/proto/cosmos/store/streaming/abci/grpc.proto) for full details.
```protobuf
syntax = "proto3";
package cosmos.store.streaming.abci;
import "tendermint/abci/types.proto";
import "cosmos/base/store/v1beta1/listening.proto";
option go_package = "cosmossdk.io/store/streaming/abci";
// Empty is the response message for incoming requests
message Empty {}
// ListenBeginBlockRequest sends BeginBlock requests and responses to server
message ListenBeginBlockRequest {
tendermint.abci.RequestBeginBlock req = 1;
tendermint.abci.ResponseBeginBlock res = 2;
}
// ListenEndBlockRequest sends EndBlock requests and responses to server
message ListenEndBlockRequest {
tendermint.abci.RequestEndBlock req = 1;
tendermint.abci.ResponseEndBlock res = 2;
}
// ListenDeliverTxRequest sends DeliverTx requests and responses to server
message ListenDeliverTxRequest {
// explicitly pass in block height as neither RequestDeliverTx or ResponseDeliverTx contain it
int64 block_height = 1;
tendermint.abci.RequestDeliverTx req = 2;
tendermint.abci.ResponseDeliverTx res = 3;
}
// ListenCommitRequest sends Commit responses and state changes to server
message ListenCommitRequest {
// explicitly pass in block height as ResponseCommit does not contain this info
int64 block_height = 1;
tendermint.abci.ResponseCommit res = 2;
repeated cosmos.base.store.v1beta1.StoreKVPair change_set = 3;
}
service ABCIListenerService {
rpc ListenBeginBlock(ListenBeginBlockRequest) returns (Empty);
rpc ListenEndBlock(ListenEndBlockRequest) returns (Empty);
rpc ListenDeliverTx(ListenDeliverTxRequest) returns (Empty);
rpc ListenCommit(ListenCommitRequest) returns (Empty);
}
```protobuf reference
https://github.com/cosmos/cosmos-sdk/blob/6cee22df52eb0cbb30e351fbb41f66d26c1f8300/proto/cosmos/store/streaming/abci/grpc.proto#L1-L36
```
### Generating the Code
@ -90,142 +45,16 @@ of the `ABCIListener` plugin written in Go.
The `BaseApp` `ABCIListener` interface will be what will define the plugins capabilities.
Boilerplate RPC implementation example of the `ABCIListener` interface. ([store/streaming/abci/grpc.go](grpc.go))
Boilerplate RPC implementation example of the `ABCIListener` interface. ([store/streaming/abci/grpc.go](https://github.com/cosmos/cosmos-sdk/blob/main/store/streaming/abci/grpc.go))
```go
...
var (
_ storetypes.ABCIListener = (*GRPCClient)(nil)
)
// GRPCClient is an implementation of the ABCIListener interface that talks over gRPC.
type GRPCClient struct {
client ABCIListenerServiceClient
}
func (m GRPCClient) ListenBeginBlock(ctx context.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error {
_, err := m.client.ListenBeginBlock(ctx, &ListenBeginBlockRequest{
Req: &req,
Res: &res,
})
return err
}
func (m GRPCClient) ListenEndBlock(ctx context.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) error {
_, err := m.client.ListenEndBlock(ctx, &ListenEndBlockRequest{
Req: &req,
Res: &res,
})
return err
}
func (m GRPCClient) ListenDeliverTx(goCtx context.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) error {
ctx := sdk.UnwrapSDKContext(goCtx)
_, err := m.client.ListenDeliverTx(ctx, &ListenDeliverTxRequest{
BlockHeight: ctx.BlockHeight(),
Req: &req,
Res: &res,
})
return err
}
func (m GRPCClient) ListenCommit(goCtx context.Context, res abci.ResponseCommit, changeSet []*store.StoreKVPair) error {
ctx := sdk.UnwrapSDKContext(goCtx)
_, err := m.client.ListenCommit(ctx, &ListenCommitRequest{
BlockHeight: ctx.BlockHeight(),
Res: &res,
ChangeSet: changeSet,
})
return err
}
// GRPCServer is the gRPC server that GRPCClient talks to.
type GRPCServer struct {
// This is the real implementation
Impl storetypes.ABCIListener
}
func (m GRPCServer) ListenBeginBlock(ctx context.Context, request *ListenBeginBlockRequest) (*Empty, error) {
if err := m.Impl.ListenBeginBlock(ctx, *request.Req, *request.Res); err != nil {
return nil, err
}
return &Empty{}, nil
}
func (m GRPCServer) ListenEndBlock(ctx context.Context, request *ListenEndBlockRequest) (*Empty, error) {
if err := m.Impl.ListenEndBlock(ctx, *request.Req, *request.Res); err != nil {
return nil, err
}
return &Empty{}, nil
}
func (m GRPCServer) ListenDeliverTx(ctx context.Context, request *ListenDeliverTxRequest) (*Empty, error) {
if err := m.Impl.ListenDeliverTx(ctx, *request.Req, *request.Res); err != nil {
return nil, err
}
return &Empty{}, nil
}
func (m GRPCServer) ListenCommit(ctx context.Context, request *ListenCommitRequest) (*Empty, error) {
if err := m.Impl.ListenCommit(ctx, *request.Res, request.ChangeSet); err != nil {
return nil, err
}
return &Empty{}, nil
}
```go reference
https://github.com/cosmos/cosmos-sdk/blob/f851e188b3b9d46e7c63fa514ad137e6d558fdd9/store/streaming/abci/grpc.go#L13-L79
```
Our `ABCIlistener` service plugin. ([store/streaming/plugins/abci/v1/interface.go](interface.go))
```go
...
// Handshake is a common handshake that is shared by streaming and host.
// This prevents users from executing bad plugins or executing a plugin
// directory. It is a UX feature, not a security feature.
var Handshake = plugin.HandshakeConfig{
// This isn't required when using VersionedPlugins
ProtocolVersion: 1,
MagicCookieKey: "ABCI_LISTENER_PLUGIN",
MagicCookieValue: "ef78114d-7bdf-411c-868f-347c99a78345",
}
var (
_ plugin.GRPCPlugin = (*ABCIListenerGRPCPlugin)(nil)
)
// ListenerGRPCPlugin is the implementation of plugin.Plugin so we can serve/consume this
//
// This has two methods: Server must return an RPC server for this plugin
// type. We construct a GreeterRPCServer for this.
//
// Client must return an implementation of our interface that communicates
// over an RPC client. We return GreeterRPC for this.
//
// Ignore MuxBroker. That is used to create more multiplexed streams on our
// plugin connection and is a more advanced use case.
//
// description: copied from hashicorp plugin documentation.
type ListenerGRPCPlugin struct {
// GRPCPlugin must still implement the Plugin interface
plugin.Plugin
// Concrete implementation, written in Go. This is only used for plugins
// that are written in Go.
Impl storetypes.ABCIListener
}
func (p *ListenerGRPCPlugin) GRPCServer(_ *plugin.GRPCBroker, s *grpc.Server) error {
RegisterABCIListenerServiceServer(s, &GRPCServer{Impl: p.Impl})
return nil
}
func (p *ListenerGRPCPlugin) GRPCClient(
_ context.Context,
_ *plugin.GRPCBroker,
c *grpc.ClientConn,
) (interface{}, error) {
return &GRPCClient{client: NewABCIListenerServiceClient(c)}, nil
}
```go reference
https://github.com/cosmos/cosmos-sdk/blob/f851e188b3b9d46e7c63fa514ad137e6d558fdd9/store/streaming/abci/interface.go#L13-L45
```
#### Plugin Implementation
@ -248,21 +77,11 @@ type MyPlugin struct {
...
}
func (a FilePlugin) ListenBeginBlock(ctx context.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error {
func (a FilePlugin) ListenFinalizeBlock(ctx context.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error {
// process data
return nil
}
func (a FilePlugin) ListenEndBlock(ctx context.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) error {
// process data
return nil
}
func (a FilePlugin) ListenDeliverTx(ctx context.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) error {
// process data
return nil
}
func (a FilePlugin) ListenCommit(ctx context.Context, res abci.ResponseCommit, changeSet []*store.StoreKVPair) error {
// process data
return nil