diff --git a/docs/architecture/adr-038-state-listening.md b/docs/architecture/adr-038-state-listening.md index 0b2c03090e..81bfa5a712 100644 --- a/docs/architecture/adr-038-state-listening.md +++ b/docs/architecture/adr-038-state-listening.md @@ -8,7 +8,7 @@ * Add `ListenCommit`, flatten the state writes in a block to a single batch. * Remove listeners from cache stores, should only listen to `rootmulti.Store`. * Remove `HaltAppOnDeliveryError()`, the errors are propagated by default, the implementations should return nil if don't want to propogate errors. - +* 26/05/2023: Update with ABCI 2.0 ## Status @@ -223,14 +223,10 @@ so that the service can group the state changes with the ABCI requests. // ABCIListener is the interface that we're exposing as a streaming service. type ABCIListener interface { - // ListenBeginBlock updates the streaming service with the latest BeginBlock messages - ListenBeginBlock(ctx context.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error - // ListenEndBlock updates the steaming service with the latest EndBlock messages - ListenEndBlock(ctx types.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) error - // ListenDeliverTx updates the steaming service with the latest DeliverTx messages - ListenDeliverTx(ctx context.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) error - // ListenCommit updates the steaming service with the latest Commit messages and state changes - ListenCommit(ctx context.Context, res abci.ResponseCommit, changeSet []*store.StoreKVPair) error + // ListenFinalizeBlock updates the streaming service with the latest FinalizeBlock messages + ListenFinalizeBlock(ctx context.Context, req abci.RequestFinalizeBlock, res abci.ResponseFinalizeBlock) error + // ListenCommit updates the steaming service with the latest Commit messages and state changes + ListenCommit(ctx context.Context, res abci.ResponseCommit, changeSet []*StoreKVPair) error } ``` @@ -267,85 +263,27 @@ type BaseApp struct { #### ABCI Event Hooks -We will modify the `BeginBlock`, `EndBlock`, `DeliverTx` and `Commit` methods to pass ABCI requests and responses +We will modify the `FinalizeBlock` and `Commit` methods to pass ABCI requests and responses to any streaming service hooks registered with the `BaseApp`. ```go -func (app *BaseApp) BeginBlock(req abci.RequestBeginBlock) (res abci.ResponseBeginBlock) { +func (app *BaseApp) FinalizeBlock(req abci.RequestFinalizeBlock) abci.ResponseFinalizeBlock { - ... - - // call the streaming service hook with the BeginBlock messages - for _, abciListener := range app.abciListeners { - ctx := app.deliverState.ctx - blockHeight := ctx.BlockHeight() - if app.abciListenersAsync { - go func(req abci.RequestBeginBlock, res abci.ResponseBeginBlock) { - if err := app.abciListener.ListenBeginBlock(ctx, req, res); err != nil { - app.logger.Error("BeginBlock listening hook failed", "height", blockHeight, "err", err) - } - }(req, res) - } else { - if err := app.abciListener.ListenBeginBlock(ctx, req, res); err != nil { - app.logger.Error("BeginBlock listening hook failed", "height", blockHeight, "err", err) - if app.stopNodeOnABCIListenerErr { - os.Exit(1) - } - } - } - } - - return res -} -``` - -```go -func (app *BaseApp) EndBlock(req abci.RequestEndBlock) (res abci.ResponseEndBlock) { - - ... - - // call the streaming service hook with the EndBlock messages - for _, abciListener := range app.abciListeners { - ctx := app.deliverState.ctx - blockHeight := ctx.BlockHeight() - if app.abciListenersAsync { - go func(req abci.RequestEndBlock, res abci.ResponseEndBlock) { - if err := app.abciListener.ListenEndBlock(blockHeight, req, res); err != nil { - app.logger.Error("EndBlock listening hook failed", "height", blockHeight, "err", err) - } - }(req, res) - } else { - if err := app.abciListener.ListenEndBlock(blockHeight, req, res); err != nil { - app.logger.Error("EndBlock listening hook failed", "height", blockHeight, "err", err) - if app.stopNodeOnABCIListenerErr { - os.Exit(1) - } - } - } - } - - return res -} -``` - -```go -func (app *BaseApp) DeliverTx(req abci.RequestDeliverTx) abci.ResponseDeliverTx { - - var abciRes abci.ResponseDeliverTx + var abciRes abci.ResponseFinalizeBlock defer func() { - // call the streaming service hook with the EndBlock messages + // call the streaming service hook with the FinalizeBlock messages for _, abciListener := range app.abciListeners { - ctx := app.deliverState.ctx + ctx := app.finalizeState.ctx blockHeight := ctx.BlockHeight() if app.abciListenersAsync { - go func(req abci.RequestDeliverTx, res abci.ResponseDeliverTx) { - if err := app.abciListener.ListenDeliverTx(blockHeight, req, res); err != nil { - app.logger.Error("DeliverTx listening hook failed", "height", blockHeight, "err", err) + go func(req abci.RequestFinalizeBlock, res abci.ResponseFinalizeBlock) { + if err := app.abciListener.FinalizeBlock(blockHeight, req, res); err != nil { + app.logger.Error("FinalizeBlock listening hook failed", "height", blockHeight, "err", err) } }(req, abciRes) } else { - if err := app.abciListener.ListenDeliverTx(blockHeight, req, res); err != nil { - app.logger.Error("DeliverTx listening hook failed", "height", blockHeight, "err", err) + if err := app.abciListener.ListenFinalizeBlock(blockHeight, req, res); err != nil { + app.logger.Error("FinalizeBlock listening hook failed", "height", blockHeight, "err", err) if app.stopNodeOnABCIListenerErr { os.Exit(1) } @@ -455,18 +393,9 @@ syntax = "proto3"; message Empty {} -message ListenBeginBlockRequest { - RequestBeginBlock req = 1; - ResponseBeginBlock res = 2; -} -message ListenEndBlockRequest { - RequestEndBlock req = 1; - ResponseEndBlock res = 2; -} -message ListenDeliverTxRequest { - int64 block_height = 1; - RequestDeliverTx req = 2; - ResponseDeliverTx res = 3; +message ListenFinalizeBlockRequest { + RequestFinalizeBlock req = 1; + ResponseFinalizeBlock res = 2; } message ListenCommitRequest { int64 block_height = 1; @@ -476,9 +405,7 @@ message ListenCommitRequest { // plugin that listens to state changes service ABCIListenerService { - rpc ListenBeginBlock(ListenBeginBlockRequest) returns (Empty); - rpc ListenEndBlock(ListenEndBlockRequest) returns (Empty); - rpc ListenDeliverTx(ListenDeliverTxRequest) returns (Empty); + rpc ListenFinalizeBlock(ListenFinalizeBlockRequest) returns (Empty); rpc ListenCommit(ListenCommitRequest) returns (Empty); } ``` @@ -487,9 +414,7 @@ service ABCIListenerService { ... // plugin that doesn't listen to state changes service ABCIListenerService { - rpc ListenBeginBlock(ListenBeginBlockRequest) returns (Empty); - rpc ListenEndBlock(ListenEndBlockRequest) returns (Empty); - rpc ListenDeliverTx(ListenDeliverTxRequest) returns (Empty); + rpc ListenFinalizeBlock(ListenFinalizeBlockRequest) returns (Empty); rpc ListenCommit(ListenCommitRequest) returns (Empty); } ``` @@ -508,17 +433,7 @@ type GRPCClient struct { client ABCIListenerServiceClient } -func (m *GRPCClient) ListenBeginBlock(ctx context.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error { - _, err := m.client.ListenBeginBlock(ctx, &ListenBeginBlockRequest{Req: req, Res: res}) - return err -} - -func (m *GRPCClient) ListenEndBlock(goCtx context.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) error { - _, err := m.client.ListenEndBlock(ctx, &ListenEndBlockRequest{Req: req, Res: res}) - return err -} - -func (m *GRPCClient) ListenDeliverTx(goCtx context.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) error { +func (m *GRPCClient) ListenFinalizeBlock(goCtx context.Context, req abci.RequestFinalizeBlock, res abci.ResponseFinalizeBlock) error { ctx := sdk.UnwrapSDKContext(goCtx) _, err := m.client.ListenDeliverTx(ctx, &ListenDeliverTxRequest{BlockHeight: ctx.BlockHeight(), Req: req, Res: res}) return err @@ -536,16 +451,8 @@ type GRPCServer struct { Impl baseapp.ABCIListener } -func (m *GRPCServer) ListenBeginBlock(ctx context.Context, req *ListenBeginBlockRequest) (*Empty, error) { - return &Empty{}, m.Impl.ListenBeginBlock(ctx, req.Req, req.Res) -} - -func (m *GRPCServer) ListenEndBlock(ctx context.Context, req *ListenEndBlockRequest) (*Empty, error) { - return &Empty{}, m.Impl.ListenEndBlock(ctx, req.Req, req.Res) -} - -func (m *GRPCServer) ListenDeliverTx(ctx context.Context, req *ListenDeliverTxRequest) (*Empty, error) { - return &Empty{}, m.Impl.ListenDeliverTx(ctx, req.Req, req.Res) +func (m *GRPCServer) ListenFinalizeBlock(ctx context.Context, req *ListenFinalizeBlockRequest) (*Empty, error) { + return &Empty{}, m.Impl.ListenFinalizeBlock(ctx, req.Req, req.Res) } func (m *GRPCServer) ListenCommit(ctx context.Context, req *ListenCommitRequest) (*Empty, error) { @@ -564,15 +471,7 @@ And the pre-compiled Go plugin `Impl`(*this is only used for plugins that are wr // ABCIListener is the implementation of the baseapp.ABCIListener interface type ABCIListener struct{} -func (m *ABCIListenerPlugin) ListenBeginBlock(ctx context.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error { - // send data to external system -} - -func (m *ABCIListenerPlugin) ListenEndBlock(ctx context.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error { - // send data to external system -} - -func (m *ABCIListenerPlugin) ListenDeliverTxBlock(ctx context.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error { +func (m *ABCIListenerPlugin) ListenFinalizeBlock(ctx context.Context, req abci.RequestFinalizeBlock, res abci.ResponseFinalizeBlock) error { // send data to external system } diff --git a/store/streaming/abci/README.md b/store/streaming/abci/README.md index 821689e588..08aaf12e8a 100644 --- a/store/streaming/abci/README.md +++ b/store/streaming/abci/README.md @@ -13,55 +13,10 @@ In this section we describe the implementation of the `ABCIListener` interface a ### Service Protocol The companion service protocol for the `ABCIListener` interface is described below. -See [proto/cosmos/store/streaming/abci/grpc.proto](https://github.com/cosmos/cosmos-sdk/blob/main/proto/cosmos/store/streaming/grpc.proto) for full details. +See [proto/cosmos/store/streaming/abci/grpc.proto](https://github.com/cosmos/cosmos-sdk/blob/main/proto/cosmos/store/streaming/abci/grpc.proto) for full details. -```protobuf -syntax = "proto3"; - -package cosmos.store.streaming.abci; - -import "tendermint/abci/types.proto"; -import "cosmos/base/store/v1beta1/listening.proto"; - -option go_package = "cosmossdk.io/store/streaming/abci"; - -// Empty is the response message for incoming requests -message Empty {} - -// ListenBeginBlockRequest sends BeginBlock requests and responses to server -message ListenBeginBlockRequest { - tendermint.abci.RequestBeginBlock req = 1; - tendermint.abci.ResponseBeginBlock res = 2; -} - -// ListenEndBlockRequest sends EndBlock requests and responses to server -message ListenEndBlockRequest { - tendermint.abci.RequestEndBlock req = 1; - tendermint.abci.ResponseEndBlock res = 2; -} - -// ListenDeliverTxRequest sends DeliverTx requests and responses to server -message ListenDeliverTxRequest { - // explicitly pass in block height as neither RequestDeliverTx or ResponseDeliverTx contain it - int64 block_height = 1; - tendermint.abci.RequestDeliverTx req = 2; - tendermint.abci.ResponseDeliverTx res = 3; -} - -// ListenCommitRequest sends Commit responses and state changes to server -message ListenCommitRequest { - // explicitly pass in block height as ResponseCommit does not contain this info - int64 block_height = 1; - tendermint.abci.ResponseCommit res = 2; - repeated cosmos.base.store.v1beta1.StoreKVPair change_set = 3; -} - -service ABCIListenerService { - rpc ListenBeginBlock(ListenBeginBlockRequest) returns (Empty); - rpc ListenEndBlock(ListenEndBlockRequest) returns (Empty); - rpc ListenDeliverTx(ListenDeliverTxRequest) returns (Empty); - rpc ListenCommit(ListenCommitRequest) returns (Empty); -} +```protobuf reference +https://github.com/cosmos/cosmos-sdk/blob/6cee22df52eb0cbb30e351fbb41f66d26c1f8300/proto/cosmos/store/streaming/abci/grpc.proto#L1-L36 ``` ### Generating the Code @@ -90,142 +45,16 @@ of the `ABCIListener` plugin written in Go. The `BaseApp` `ABCIListener` interface will be what will define the plugins capabilities. -Boilerplate RPC implementation example of the `ABCIListener` interface. ([store/streaming/abci/grpc.go](grpc.go)) +Boilerplate RPC implementation example of the `ABCIListener` interface. ([store/streaming/abci/grpc.go](https://github.com/cosmos/cosmos-sdk/blob/main/store/streaming/abci/grpc.go)) -```go -... - -var ( - _ storetypes.ABCIListener = (*GRPCClient)(nil) -) - -// GRPCClient is an implementation of the ABCIListener interface that talks over gRPC. -type GRPCClient struct { - client ABCIListenerServiceClient -} - -func (m GRPCClient) ListenBeginBlock(ctx context.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error { - _, err := m.client.ListenBeginBlock(ctx, &ListenBeginBlockRequest{ - Req: &req, - Res: &res, - }) - return err -} - -func (m GRPCClient) ListenEndBlock(ctx context.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) error { - _, err := m.client.ListenEndBlock(ctx, &ListenEndBlockRequest{ - Req: &req, - Res: &res, - }) - return err -} - -func (m GRPCClient) ListenDeliverTx(goCtx context.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) error { - ctx := sdk.UnwrapSDKContext(goCtx) - _, err := m.client.ListenDeliverTx(ctx, &ListenDeliverTxRequest{ - BlockHeight: ctx.BlockHeight(), - Req: &req, - Res: &res, - }) - return err -} - -func (m GRPCClient) ListenCommit(goCtx context.Context, res abci.ResponseCommit, changeSet []*store.StoreKVPair) error { - ctx := sdk.UnwrapSDKContext(goCtx) - _, err := m.client.ListenCommit(ctx, &ListenCommitRequest{ - BlockHeight: ctx.BlockHeight(), - Res: &res, - ChangeSet: changeSet, - }) - return err -} - -// GRPCServer is the gRPC server that GRPCClient talks to. -type GRPCServer struct { - // This is the real implementation - Impl storetypes.ABCIListener -} - -func (m GRPCServer) ListenBeginBlock(ctx context.Context, request *ListenBeginBlockRequest) (*Empty, error) { - if err := m.Impl.ListenBeginBlock(ctx, *request.Req, *request.Res); err != nil { - return nil, err - } - return &Empty{}, nil -} - -func (m GRPCServer) ListenEndBlock(ctx context.Context, request *ListenEndBlockRequest) (*Empty, error) { - if err := m.Impl.ListenEndBlock(ctx, *request.Req, *request.Res); err != nil { - return nil, err - } - return &Empty{}, nil -} - -func (m GRPCServer) ListenDeliverTx(ctx context.Context, request *ListenDeliverTxRequest) (*Empty, error) { - if err := m.Impl.ListenDeliverTx(ctx, *request.Req, *request.Res); err != nil { - return nil, err - } - return &Empty{}, nil -} - -func (m GRPCServer) ListenCommit(ctx context.Context, request *ListenCommitRequest) (*Empty, error) { - if err := m.Impl.ListenCommit(ctx, *request.Res, request.ChangeSet); err != nil { - return nil, err - } - return &Empty{}, nil -} +```go reference +https://github.com/cosmos/cosmos-sdk/blob/f851e188b3b9d46e7c63fa514ad137e6d558fdd9/store/streaming/abci/grpc.go#L13-L79 ``` Our `ABCIlistener` service plugin. ([store/streaming/plugins/abci/v1/interface.go](interface.go)) -```go -... - -// Handshake is a common handshake that is shared by streaming and host. -// This prevents users from executing bad plugins or executing a plugin -// directory. It is a UX feature, not a security feature. -var Handshake = plugin.HandshakeConfig{ - // This isn't required when using VersionedPlugins - ProtocolVersion: 1, - MagicCookieKey: "ABCI_LISTENER_PLUGIN", - MagicCookieValue: "ef78114d-7bdf-411c-868f-347c99a78345", -} - -var ( - _ plugin.GRPCPlugin = (*ABCIListenerGRPCPlugin)(nil) -) - -// ListenerGRPCPlugin is the implementation of plugin.Plugin so we can serve/consume this -// -// This has two methods: Server must return an RPC server for this plugin -// type. We construct a GreeterRPCServer for this. -// -// Client must return an implementation of our interface that communicates -// over an RPC client. We return GreeterRPC for this. -// -// Ignore MuxBroker. That is used to create more multiplexed streams on our -// plugin connection and is a more advanced use case. -// -// description: copied from hashicorp plugin documentation. -type ListenerGRPCPlugin struct { - // GRPCPlugin must still implement the Plugin interface - plugin.Plugin - // Concrete implementation, written in Go. This is only used for plugins - // that are written in Go. - Impl storetypes.ABCIListener -} - -func (p *ListenerGRPCPlugin) GRPCServer(_ *plugin.GRPCBroker, s *grpc.Server) error { - RegisterABCIListenerServiceServer(s, &GRPCServer{Impl: p.Impl}) - return nil -} - -func (p *ListenerGRPCPlugin) GRPCClient( - _ context.Context, - _ *plugin.GRPCBroker, - c *grpc.ClientConn, -) (interface{}, error) { - return &GRPCClient{client: NewABCIListenerServiceClient(c)}, nil -} +```go reference +https://github.com/cosmos/cosmos-sdk/blob/f851e188b3b9d46e7c63fa514ad137e6d558fdd9/store/streaming/abci/interface.go#L13-L45 ``` #### Plugin Implementation @@ -248,21 +77,11 @@ type MyPlugin struct { ... } -func (a FilePlugin) ListenBeginBlock(ctx context.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error { +func (a FilePlugin) ListenFinalizeBlock(ctx context.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error { // process data return nil } -func (a FilePlugin) ListenEndBlock(ctx context.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) error { - // process data - return nil -} - -func (a FilePlugin) ListenDeliverTx(ctx context.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) error { - // process data - return nil -} - func (a FilePlugin) ListenCommit(ctx context.Context, res abci.ResponseCommit, changeSet []*store.StoreKVPair) error { // process data return nil