diff --git a/CHANGELOG.md b/CHANGELOG.md index 0b7748d602..50e391a371 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -55,6 +55,7 @@ Ref: https://keepachangelog.com/en/1.0.0/ * (x/auth) [#13210](https://github.com/cosmos/cosmos-sdk/pull/13210) Add `Query/AccountInfo` endpoint for simplified access to basic account info. * (x/consensus) [#12905](https://github.com/cosmos/cosmos-sdk/pull/12905) Create a new `x/consensus` module that is now responsible for maintaining Tendermint consensus parameters instead of `x/param`. Legacy types remain in order to facilitate parameter migration from the deprecated `x/params`. App developers should ensure that they execute `baseapp.MigrateParams` during their chain upgrade. These legacy types will be removed in a future release. * (client/tx) [#13670](https://github.com/cosmos/cosmos-sdk/pull/13670) Add validation in `BuildUnsignedTx` to prevent simple inclusion of valid mnemonics +* [#13473](https://github.com/cosmos/cosmos-sdk/pull/13473) ADR-038: Go plugin system proposal ### Improvements diff --git a/docs/architecture/adr-038-state-listening.md b/docs/architecture/adr-038-state-listening.md index 74f92d2f67..b6a5cd1252 100644 --- a/docs/architecture/adr-038-state-listening.md +++ b/docs/architecture/adr-038-state-listening.md @@ -3,10 +3,11 @@ ## Changelog * 11/23/2020: Initial draft +* 10/06/2022: Introduce plugin system based on hashicorp/go-plugin * 10/14/2022: * Add `ListenCommit`, flatten the state writes in a block to a single batch. * Remove listeners from cache stores, should only listen to `rootmulti.Store`. - * Remove `HaltAppOnDeliveryError()`, the errors are propogated by default, the implementations should return nil if don't want to propogate errors. + * Remove `HaltAppOnDeliveryError()`, the errors are propagated by default, the implementations should return nil if don't want to propogate errors. ## Status @@ -28,28 +29,42 @@ In addition to these request/response queries, it would be beneficial to have a We will modify the `CommitMultiStore` interface and its concrete (`rootmulti`) implementations and introduce a new `listenkv.Store` to allow listening to state changes in underlying KVStores. We don't need to listen to cache stores, because we can't be sure that the writes will be committed eventually, and the writes are duplicated in `rootmulti.Store` eventually, so we should only listen to `rootmulti.Store`. We will introduce a plugin system for configuring and running streaming services that write these state changes and their surrounding ABCI message context to different destinations. -### Listening interface +### Listening -In a new file, `store/types/listening.go`, we will create a `WriteListener` interface for streaming out state changes from a KVStore. +In a new file, `store/types/listening.go`, we will create a `MemoryListener` struct for streaming out protobuf encoded KV pairs state changes from a KVStore. +The `MemoryListener` will be used internally by the concrete `rootmulti` implementation to collect state changes from KVStores. ```go -// WriteListener interface for streaming data out from a listenkv.Store -type WriteListener interface { - // if value is nil then it was deleted - // storeKey indicates the source KVStore, to facilitate using the same WriteListener across separate KVStores - // delete bool indicates if it was a delete; true: delete, false: set - OnWrite(storeKey StoreKey, key []byte, value []byte, delete bool) error +// MemoryListener listens to the state writes and accumulate the records in memory. +type MemoryListener struct { + stateCache []StoreKVPair +} + +// NewMemoryListener creates a listener that accumulate the state writes in memory. +func NewMemoryListener() *MemoryListener { + return &MemoryListener{} +} + +// OnWrite writes state change events to the internal cache +func (fl *MemoryListener) OnWrite(storeKey StoreKey, key []byte, value []byte, delete bool) { + fl.stateCache = append(fl.stateCache, StoreKVPair{ + StoreKey: storeKey.Name(), + Delete: delete, + Key: key, + Value: value, + }) +} + +// PopStateCache returns the current state caches and set to nil +func (fl *MemoryListener) PopStateCache() []StoreKVPair { + res := fl.stateCache + fl.stateCache = nil + return res } ``` -### Listener type - -We will create two concrete implementations of the `WriteListener` interface in `store/types/listening.go`, that writes out protobuf -encoded KV pairs to an underlying `io.Writer`, and simply accumulate them in memory. - -This will include defining a simple protobuf type for the KV pairs. In addition to the key and value fields this message -will include the StoreKey for the originating KVStore so that we can write out from separate KVStores to the same stream/file -and determine the source of each KV pair. +We will also define a protobuf type for the KV pairs. In addition to the key and value fields this message +will include the StoreKey for the originating KVStore so that we can collect information from separate KVStores and determine the source of each KV pair. ```protobuf message StoreKVPair { @@ -60,80 +75,10 @@ message StoreKVPair { } ``` -```go -// StoreKVPairWriteListener is used to configure listening to a KVStore by writing out length-prefixed -// protobuf encoded StoreKVPairs to an underlying io.Writer -type StoreKVPairWriteListener struct { - writer io.Writer - marshaller codec.BinaryCodec -} - -// NewStoreKVPairWriteListener wraps creates a StoreKVPairWriteListener with a provdied io.Writer and codec.BinaryCodec -func NewStoreKVPairWriteListener(w io.Writer, m codec.BinaryCodec) *StoreKVPairWriteListener { - return &StoreKVPairWriteListener{ - writer: w, - marshaller: m, - } -} - -// OnWrite satisfies the WriteListener interface by writing length-prefixed protobuf encoded StoreKVPairs -func (wl *StoreKVPairWriteListener) OnWrite(storeKey types.StoreKey, key []byte, value []byte, delete bool) error error { - kvPair := new(types.StoreKVPair) - kvPair.StoreKey = storeKey.Name() - kvPair.Delete = Delete - kvPair.Key = key - kvPair.Value = value - by, err := wl.marshaller.MarshalBinaryLengthPrefixed(kvPair) - if err != nil { - return err - } - if _, err := wl.writer.Write(by); err != nil { - return err - } - return nil -} -``` - -```golang -// MemoryListener listens to the state writes and accumulate the records in memory. -type MemoryListener struct { - key StoreKey - stateCache []StoreKVPair -} - -// NewMemoryListener creates a listener that accumulate the state writes in memory. -func NewMemoryListener(key StoreKey) *MemoryListener { - return &MemoryListener{key: key} -} - -// OnWrite implements WriteListener interface -func (fl *MemoryListener) OnWrite(storeKey StoreKey, key []byte, value []byte, delete bool) error { - fl.stateCache = append(fl.stateCache, StoreKVPair{ - StoreKey: storeKey.Name(), - Delete: delete, - Key: key, - Value: value, - }) - return nil -} - -// PopStateCache returns the current state caches and set to nil -func (fl *MemoryListener) PopStateCache() []StoreKVPair { - res := fl.stateCache - fl.stateCache = nil - return res -} - -// StoreKey returns the storeKey it listens to -func (fl *MemoryListener) StoreKey() StoreKey { - return fl.key -} -``` - ### ListenKVStore -We will create a new `Store` type `listenkv.Store` that the `MultiStore` wraps around a `KVStore` to enable state listening. -We can configure the `Store` with a set of `WriteListener`s which stream the output to specific destinations. +We will create a new `Store` type `listenkv.Store` that the `rootmulti` store will use to wrap a `KVStore` to enable state listening. +We will configure the `Store` with a `MemoryListener` which will collect state changes for output to specific destinations. ```go // Store implements the KVStore interface with listening enabled. @@ -141,14 +86,14 @@ We can configure the `Store` with a set of `WriteListener`s which stream the out // underlying listeners with the proper key and operation permissions type Store struct { parent types.KVStore - listeners []types.WriteListener + listener *types.MemoryListener parentStoreKey types.StoreKey } // NewStore returns a reference to a new traceKVStore given a parent // KVStore implementation and a buffered writer. -func NewStore(parent types.KVStore, psk types.StoreKey, listeners []types.WriteListener) *Store { - return &Store{parent: parent, listeners: listeners, parentStoreKey: psk} +func NewStore(parent types.KVStore, psk types.StoreKey, listener *types.MemoryListener) *Store { + return &Store{parent: parent, listener: listener, parentStoreKey: psk} } // Set implements the KVStore interface. It traces a write operation and @@ -156,47 +101,38 @@ func NewStore(parent types.KVStore, psk types.StoreKey, listeners []types.WriteL func (s *Store) Set(key []byte, value []byte) { types.AssertValidKey(key) s.parent.Set(key, value) - s.onWrite(false, key, value) + s.listener.OnWrite(s.parentStoreKey, key, value, false) } // Delete implements the KVStore interface. It traces a write operation and // delegates the Delete call to the parent KVStore. func (s *Store) Delete(key []byte) { s.parent.Delete(key) - s.onWrite(true, key, nil) -} - -// onWrite writes a KVStore operation to all the WriteListeners -func (s *Store) onWrite(delete bool, key, value []byte) { - for _, l := range s.listeners { - if err := l.OnWrite(s.parentStoreKey, key, value, delete); err != nil { - // log error - } - } + s.listener.OnWrite(s.parentStoreKey, key, nil, true) } ``` ### MultiStore interface updates -We will update the `CommitMultiStore` interface to allow us to wrap a set of listeners around a specific `KVStore`. +We will update the `CommitMultiStore` interface to allow us to wrap a `Memorylistener` to a specific `KVStore`. +Note that the `MemoryListener` will be attached internally by the concrete `rootmulti` implementation. ```go type CommitMultiStore interface { ... - // ListeningEnabled returns if listening is enabled for the KVStore belonging the provided StoreKey - ListeningEnabled(key StoreKey) bool + // AddListeners adds a listener for the KVStore belonging to the provided StoreKey + AddListeners(keys []StoreKey) - // AddListeners adds WriteListeners for the KVStore belonging to the provided StoreKey - // It appends the listeners to a current set, if one already exists - AddListeners(key StoreKey, listeners []WriteListener) + // PopStateCache returns the accumulated state change messages from MemoryListener + PopStateCache() []StoreKVPair } ``` + ### MultiStore implementation updates -We will modify all of the `CommitMultiStore` implementations to satisfy these new interfaces, and adjust the `rootmulti` `GetKVStore` method -to wrap the returned `KVStore` with a `listenkv.Store` if listening is turned on for that `Store`. +We will adjust the `rootmulti` `GetKVStore` method to wrap the returned `KVStore` with a `listenkv.Store` if listening is turned on for that `Store`. ```go func (rs *Store) GetKVStore(key types.StoreKey) types.KVStore { @@ -206,338 +142,653 @@ func (rs *Store) GetKVStore(key types.StoreKey) types.KVStore { store = tracekv.NewStore(store, rs.traceWriter, rs.traceContext) } if rs.ListeningEnabled(key) { - store = listenkv.NewStore(key, store, rs.listeners[key]) + store = listenkv.NewStore(store, key, rs.listeners[key]) } return store } ``` -We will also adjust the `rootmulti` `CacheMultiStore` method to wrap the stores with `listenkv.Store` to enable listening when the cache layer writes. +We will implement `AddListeners` to manage KVStore listeners internally and implement `PopStateCache` +for a means of retrieving the current state. + +```go +// AddListeners adds state change listener for a specific KVStore +func (rs *Store) AddListeners(keys []types.StoreKey) { + listener := types.NewMemoryListener() + for i := range keys { + rs.listeners[keys[i]] = listener + } +} +``` + +```go +func (rs *Store) PopStateCache() []types.StoreKVPair { + var cache []types.StoreKVPair + for _, ls := range rs.listeners { + cache = append(cache, ls.PopStateCache()...) + } + sort.SliceStable(cache, func(i, j int) bool { + return cache[i].StoreKey < cache[j].StoreKey + }) + return cache +} +``` + +We will also adjust the `rootmulti` `CacheMultiStore` and `CacheMultiStoreWithVersion` methods to enable listening in +the cache layer. ```go func (rs *Store) CacheMultiStore() types.CacheMultiStore { - stores := make(map[types.StoreKey]types.CacheWrapper) - for k, v := range rs.stores { - store := v.(types.KVStore) - // Wire the listenkv.Store to allow listeners to observe the writes from the cache store, - // set same listeners on cache store will observe duplicated writes. - if rs.ListeningEnabled(k) { - store = listenkv.NewStore(store, k, rs.listeners[k]) - } - stores[k] = store - } - return cachemulti.NewStore(rs.db, stores, rs.keysByName, rs.traceWriter, rs.getTracingContext()) + stores := make(map[types.StoreKey]types.CacheWrapper) + for k, v := range rs.stores { + store := v.(types.KVStore) + // Wire the listenkv.Store to allow listeners to observe the writes from the cache store, + // set same listeners on cache store will observe duplicated writes. + if rs.ListeningEnabled(k) { + store = listenkv.NewStore(store, k, rs.listeners[k]) + } + stores[k] = store + } + return cachemulti.NewStore(rs.db, stores, rs.keysByName, rs.traceWriter, rs.getTracingContext()) +} +``` + +```go +func (rs *Store) CacheMultiStoreWithVersion(version int64) (types.CacheMultiStore, error) { + // ... + + // Wire the listenkv.Store to allow listeners to observe the writes from the cache store, + // set same listeners on cache store will observe duplicated writes. + if rs.ListeningEnabled(key) { + cacheStore = listenkv.NewStore(cacheStore, key, rs.listeners[key]) + } + + cachedStores[key] = cacheStore + } + + return cachemulti.NewStore(rs.db, cachedStores, rs.keysByName, rs.traceWriter, rs.getTracingContext()), nil } ``` ### Exposing the data -#### Streaming service +#### Streaming Service -We will introduce a new `StreamingService` interface for exposing `WriteListener` data streams to external consumers. -In addition to streaming state changes as `StoreKVPair`s, the interface satisfies an `ABCIListener` interface that plugs -into the BaseApp and relays ABCI requests and responses so that the service can observe those block metadatas as well. - -The `WriteListener`s of `StreamingService` listens to the `rootmulti.Store`, which is only written into at commit event by the cache store of `deliverState`. +We will introduce a new `ABCIListener` interface that plugs into the BaseApp and relays ABCI requests and responses +so that the service can group the state changes with the ABCI requests. ```go -// ABCIListener interface used to hook into the ABCI message processing of the BaseApp +// baseapp/streaming.go + +// ABCIListener is the interface that we're exposing as a streaming service. type ABCIListener interface { // ListenBeginBlock updates the streaming service with the latest BeginBlock messages - ListenBeginBlock(ctx types.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error + ListenBeginBlock(ctx context.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error // ListenEndBlock updates the steaming service with the latest EndBlock messages ListenEndBlock(ctx types.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) error // ListenDeliverTx updates the steaming service with the latest DeliverTx messages - ListenDeliverTx(ctx types.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) error - // ListenCommit updates the steaming service with the latest Commit message, - // All the state writes of current block should have notified before this message. - ListenCommit(ctx types.Context, res abci.ResponseCommit) error -} - -// StreamingService interface for registering WriteListeners with the BaseApp and updating the service with the ABCI messages using the hooks -type StreamingService interface { - // Stream is the streaming service loop, awaits kv pairs and writes them to a destination stream or file - Stream(wg *sync.WaitGroup) error - // Listeners returns the streaming service's listeners for the BaseApp to register - Listeners() map[types.StoreKey][]store.WriteListener - // ABCIListener interface for hooking into the ABCI messages from inside the BaseApp - ABCIListener - // Closer interface - io.Closer + ListenDeliverTx(ctx context.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) error + // ListenCommit updates the steaming service with the latest Commit messages and state changes + ListenCommit(ctx context.Context, res abci.ResponseCommit, changeSet []*store.StoreKVPair) error } ``` -#### BaseApp registration +#### BaseApp Registration We will add a new method to the `BaseApp` to enable the registration of `StreamingService`s: -```go -// SetStreamingService is used to set a streaming service into the BaseApp hooks and load the listeners into the multistore -func (app *BaseApp) SetStreamingService(s StreamingService) { - // add the listeners for each StoreKey - for key, lis := range s.Listeners() { - app.cms.AddListeners(key, lis) - } - // register the StreamingService within the BaseApp - // BaseApp will pass BeginBlock, DeliverTx, and EndBlock requests and responses to the streaming services to update their ABCI context - app.abciListeners = append(app.abciListeners, s) + ```go + // SetStreamingService is used to set a streaming service into the BaseApp hooks and load the listeners into the multistore +func (app *BaseApp) SetStreamingService(s ABCIListener) { + // register the StreamingService within the BaseApp + // BaseApp will pass BeginBlock, DeliverTx, and EndBlock requests and responses to the streaming services to update their ABCI context + app.abciListeners = append(app.abciListeners, s) } ``` -We will also modify the `BeginBlock`, `EndBlock`, and `DeliverTx` methods to pass ABCI requests and responses to any streaming service hooks registered -with the `BaseApp`. +We will add two new fields to the `BaseApp` struct: +```go +type BaseApp struct { + + ... + + // abciListenersAsync for determining if abciListeners will run asynchronously. + // When abciListenersAsync=false and stopNodeOnABCIListenerErr=false listeners will run synchronized but will not stop the node. + // When abciListenersAsync=true stopNodeOnABCIListenerErr will be ignored. + abciListenersAsync bool + + // stopNodeOnABCIListenerErr halts the node when ABCI streaming service listening results in an error. + // stopNodeOnABCIListenerErr=true must be paired with abciListenersAsync=false. + stopNodeOnABCIListenerErr bool +} +``` + +#### ABCI Event Hooks + +We will modify the `BeginBlock`, `EndBlock`, `DeliverTx` and `Commit` methods to pass ABCI requests and responses +to any streaming service hooks registered with the `BaseApp`. ```go func (app *BaseApp) BeginBlock(req abci.RequestBeginBlock) (res abci.ResponseBeginBlock) { - ... + ... - defer func() { - // call the hooks with the BeginBlock messages - for _, streamingListener := range app.abciListeners { - if err := streamingListener.ListenBeginBlock(app.deliverState.ctx, req, res); err != nil { - panic(sdkerrors.Wrapf(err, "BeginBlock listening hook failed, height: %d", req.Header.Height)) - } - } - }() + // call the streaming service hook with the BeginBlock messages + for _, abciListener := range app.abciListeners { + ctx := app.deliverState.ctx + blockHeight := ctx.BlockHeight() + if app.abciListenersAsync { + go func(req abci.RequestBeginBlock, res abci.ResponseBeginBlock) { + if err := app.abciListener.ListenBeginBlock(ctx, req, res); err != nil { + app.logger.Error("BeginBlock listening hook failed", "height", blockHeight, "err", err) + } + }(req, res) + } else { + if err := app.abciListener.ListenBeginBlock(ctx, req, res); err != nil { + app.logger.Error("BeginBlock listening hook failed", "height", blockHeight, "err", err) + if app.stopNodeOnABCIListenerErr { + os.Exit(1) + } + } + } + } - return res + return res } ``` ```go func (app *BaseApp) EndBlock(req abci.RequestEndBlock) (res abci.ResponseEndBlock) { - ... + ... - defer func() { - // Call the streaming service hooks with the EndBlock messages - for _, streamingListener := range app.abciListeners { - if err := streamingListener.ListenEndBlock(app.deliverState.ctx, req, res); err != nil { - panic(sdkerrors.Wrapf(err, "EndBlock listening hook failed, height: %d", req.Height)) - } - } - }() + // call the streaming service hook with the EndBlock messages + for _, abciListener := range app.abciListeners { + ctx := app.deliverState.ctx + blockHeight := ctx.BlockHeight() + if app.abciListenersAsync { + go func(req abci.RequestEndBlock, res abci.ResponseEndBlock) { + if err := app.abciListener.ListenEndBlock(blockHeight, req, res); err != nil { + app.logger.Error("EndBlock listening hook failed", "height", blockHeight, "err", err) + } + }(req, res) + } else { + if err := app.abciListener.ListenEndBlock(blockHeight, req, res); err != nil { + app.logger.Error("EndBlock listening hook failed", "height", blockHeight, "err", err) + if app.stopNodeOnABCIListenerErr { + os.Exit(1) + } + } + } + } - return res + return res } ``` ```go -func (app *BaseApp) DeliverTx(req abci.RequestDeliverTx) (res abci.ResponseDeliverTx) { +func (app *BaseApp) DeliverTx(req abci.RequestDeliverTx) abci.ResponseDeliverTx { - defer func() { - // call the hooks with the DeliverTx messages - for _, streamingListener := range app.abciListeners { - if err := streamingListener.ListenDeliverTx(app.deliverState.ctx, req, res); err != nil { - panic(sdkerrors.Wrap(err, "DeliverTx listening hook failed")) - } - } - }() + var abciRes abci.ResponseDeliverTx + defer func() { + // call the streaming service hook with the EndBlock messages + for _, abciListener := range app.abciListeners { + ctx := app.deliverState.ctx + blockHeight := ctx.BlockHeight() + if app.abciListenersAsync { + go func(req abci.RequestDeliverTx, res abci.ResponseDeliverTx) { + if err := app.abciListener.ListenDeliverTx(blockHeight, req, res); err != nil { + app.logger.Error("DeliverTx listening hook failed", "height", blockHeight, "err", err) + } + }(req, abciRes) + } else { + if err := app.abciListener.ListenDeliverTx(blockHeight, req, res); err != nil { + app.logger.Error("DeliverTx listening hook failed", "height", blockHeight, "err", err) + if app.stopNodeOnABCIListenerErr { + os.Exit(1) + } + } + } + } + }() - ... + ... - return res + return abciRes } ``` -```golang +```go func (app *BaseApp) Commit() abci.ResponseCommit { - header := app.deliverState.ctx.BlockHeader() - retainHeight := app.GetBlockRetentionHeight(header.Height) - // Write the DeliverTx state into branched storage and commit the MultiStore. - // The write to the DeliverTx state writes all state transitions to the root - // MultiStore (app.cms) so when Commit() is called is persists those values. - app.deliverState.ms.Write() - commitID := app.cms.Commit() + ... - res := abci.ResponseCommit{ - Data: commitID.Hash, - RetainHeight: retainHeight, - } + res := abci.ResponseCommit{ + Data: commitID.Hash, + RetainHeight: retainHeight, + } - // call the hooks with the Commit message - for _, streamingListener := range app.abciListeners { - if err := streamingListener.ListenCommit(app.deliverState.ctx, res); err != nil { - panic(sdkerrors.Wrapf(err, "Commit listening hook failed, height: %d", header.Height)) - } - } + // call the streaming service hook with the Commit messages + for _, abciListener := range app.abciListeners { + ctx := app.deliverState.ctx + blockHeight := ctx.BlockHeight() + changeSet := app.cms.PopStateCache() + if app.abciListenersAsync { + go func(res abci.ResponseCommit, changeSet []store.StoreKVPair) { + if err := app.abciListener.ListenCommit(ctx, res, changeSet); err != nil { + app.logger.Error("ListenCommit listening hook failed", "height", blockHeight, "err", err) + } + }(res, changeSet) + } else { + if err := app.abciListener.ListenCommit(ctx, res, changeSet); err != nil { + app.logger.Error("ListenCommit listening hook failed", "height", blockHeight, "err", err) + if app.stopNodeOnABCIListenerErr { + os.Exit(1) + } + } + } + } - app.logger.Info("commit synced", "commit", fmt.Sprintf("%X", commitID)) - ... + ... + + return res } ``` -#### Error Handling And Async Consumers +#### Go Plugin System -`ABCIListener`s are called synchronously inside the consensus state machine, the returned error causes panic which in turn halt the consensus state machine. The implementer should be careful not to break consensus unexpectedly or slow down it too much. - -For some async use cases, one can spawn a go-routine internanlly to avoid slow down consensus state machine, and handle the errors internally and always returns `nil` to avoid halting consensus state machine on error. - -Furthermore, for most of the cases, we only need to use the builtin file streamer to listen to state changes directly inside cosmos-sdk, the other consumers should subscribe to the file streamer output externally. - -#### File Streamer - -We provide a minimal filesystem based implementation inside cosmos-sdk, and provides options to write output files reliably, the output files can be further consumed by external consumers, so most of the state listeners actually don't need to live inside the sdk and node, which improves the node robustness and simplify sdk internals. - -The file streamer can be wired in app like this: -```golang -exposeStoreKeys := ... // decide the key list to listen -service, err := file.NewStreamingService(streamingDir, "", exposeStoreKeys, appCodec, logger) -bApp.SetStreamingService(service) -``` - -#### Plugin system - -We propose a plugin architecture to load and run `StreamingService` implementations. We will introduce a plugin -loading/preloading system that is used to load, initialize, inject, run, and stop Cosmos-SDK plugins. Each plugin -must implement the following interface: +We propose a plugin architecture to load and run `Streaming` plugins and other types of implementations. We will introduce a plugin +system over gRPC that is used to load and run Cosmos-SDK plugins. The plugin system uses [hashicorp/go-plugin](https://github.com/hashicorp/go-plugin). +Each plugin must have a struct that implements the `plugin.Plugin` interface and an `Impl` interface for processing messages over gRPC. +Each plugin must also have a message protocol defined for the gRPC service: ```go -// Plugin is the base interface for all kinds of cosmos-sdk plugins +// streaming/plugins/abci/{plugin_version}/interface.go + +// Handshake is a common handshake that is shared by streaming and host. +// This prevents users from executing bad plugins or executing a plugin +// directory. It is a UX feature, not a security feature. +var Handshake = plugin.HandshakeConfig{ + ProtocolVersion: 1, + MagicCookieKey: "ABCI_LISTENER_PLUGIN", + MagicCookieValue: "ef78114d-7bdf-411c-868f-347c99a78345", +} + +// ListenerPlugin is the base struc for all kinds of go-plugin implementations // It will be included in interfaces of different Plugins -type Plugin interface { - // Name should return unique name of the plugin - Name() string +type ABCIListenerPlugin struct { + // GRPCPlugin must still implement the Plugin interface + plugin.Plugin + // Concrete implementation, written in Go. This is only used for plugins + // that are written in Go. + Impl baseapp.ABCIListener +} - // Version returns current version of the plugin - Version() string +func (p *ListenerGRPCPlugin) GRPCServer(_ *plugin.GRPCBroker, s *grpc.Server) error { + RegisterABCIListenerServiceServer(s, &GRPCServer{Impl: p.Impl}) + return nil +} - // Init is called once when the Plugin is being loaded - // The plugin is passed the AppOptions for configuration - // A plugin will not necessarily have a functional Init - Init(env serverTypes.AppOptions) error - - // Closer interface for shutting down the plugin process - io.Closer +func (p *ListenerGRPCPlugin) GRPCClient( + _ context.Context, + _ *plugin.GRPCBroker, + c *grpc.ClientConn, +) (interface{}, error) { + return &GRPCClient{client: NewABCIListenerServiceClient(c)}, nil } ``` -The `Name` method returns a plugin's name. -The `Version` method returns a plugin's version. -The `Init` method initializes a plugin with the provided `AppOptions`. -The io.Closer is used to shut down the plugin service. +The `plugin.Plugin` interface has two methods `Client` and `Server`. For our GRPC service these are `GRPCClient` and `GRPCServer` +The `Impl` field holds the concrete implementation of our `baseapp.ABCIListener` interface written in Go. +Note: this is only used for plugin implementations written in Go. -For the purposes of this ADR we introduce a single kind of plugin- a state streaming plugin. -We will define a `StateStreamingPlugin` interface which extends the above `Plugin` interface to support a state streaming service. +The advantage of having such a plugin system is that within each plugin authors can define the message protocol in a way that fits their use case. +For example, when state change listening is desired, the `ABCIListener` message protocol can be defined as below (*for illustrative purposes only*). +When state change listening is not desired than `ListenCommit` can be omitted from the protocol. + +```protobuf +syntax = "proto3"; + +... + +message Empty {} + +message ListenBeginBlockRequest { + RequestBeginBlock req = 1; + ResponseBeginBlock res = 2; +} +message ListenEndBlockRequest { + RequestEndBlock req = 1; + ResponseEndBlock res = 2; +} +message ListenDeliverTxRequest { + int64 block_height = 1; + RequestDeliverTx req = 2; + ResponseDeliverTx res = 3; +} +message ListenCommitRequest { + int64 block_height = 1; + ResponseCommit res = 2; + repeated StoreKVPair changeSet = 3; +} + +// plugin that listens to state changes +service ABCIListenerService { + rpc ListenBeginBlock(ListenBeginBlockRequest) returns (Empty); + rpc ListenEndBlock(ListenEndBlockRequest) returns (Empty); + rpc ListenDeliverTx(ListenDeliverTxRequest) returns (Empty); + rpc ListenCommit(ListenCommitRequest) returns (Empty); +} +``` + +```protobuf +... +// plugin that doesn't listen to state changes +service ABCIListenerService { + rpc ListenBeginBlock(ListenBeginBlockRequest) returns (Empty); + rpc ListenEndBlock(ListenEndBlockRequest) returns (Empty); + rpc ListenDeliverTx(ListenDeliverTxRequest) returns (Empty); + rpc ListenCommit(ListenCommitRequest) returns (Empty); +} +``` + +Implementing the service above: +```go +// streaming/plugins/abci/{plugin_version}/grpc.go + +var ( + _ baseapp.ABCIListener = (*GRPCClient)(nil) +) + +// GRPCClient is an implementation of the ABCIListener and ABCIListenerPlugin interfaces that talks over RPC. +type GRPCClient struct { + client ABCIListenerServiceClient +} + +func (m *GRPCClient) ListenBeginBlock(ctx context.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error { + _, err := m.client.ListenBeginBlock(ctx, &ListenBeginBlockRequest{Req: req, Res: res}) + return err +} + +func (m *GRPCClient) ListenEndBlock(goCtx context.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) error { + _, err := m.client.ListenEndBlock(ctx, &ListenEndBlockRequest{Req: req, Res: res}) + return err +} + +func (m *GRPCClient) ListenDeliverTx(goCtx context.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) error { + ctx := sdk.UnwrapSDKContext(goCtx) + _, err := m.client.ListenDeliverTx(ctx, &ListenDeliverTxRequest{BlockHeight: ctx.BlockHeight(), Req: req, Res: res}) + return err +} + +func (m *GRPCClient) ListenCommit(goCtx context.Context, res abci.ResponseCommit, changeSet []store.StoreKVPair) error { + ctx := sdk.UnwrapSDKContext(goCtx) + _, err := m.client.ListenCommit(ctx, &ListenCommitRequest{BlockHeight: ctx.BlockHeight(), Res: res, ChangeSet: changeSet}) + return err +} + +// GRPCServer is the gRPC server that GRPCClient talks to. +type GRPCServer struct { + // This is the real implementation + Impl baseapp.ABCIListener +} + +func (m *GRPCServer) ListenBeginBlock(ctx context.Context, req *ListenBeginBlockRequest) (*Empty, error) { + return &Empty{}, m.Impl.ListenBeginBlock(ctx, req.Req, req.Res) +} + +func (m *GRPCServer) ListenEndBlock(ctx context.Context, req *ListenEndBlockRequest) (*Empty, error) { + return &Empty{}, m.Impl.ListenEndBlock(ctx, req.Req, req.Res) +} + +func (m *GRPCServer) ListenDeliverTx(ctx context.Context, req *ListenDeliverTxRequest) (*Empty, error) { + return &Empty{}, m.Impl.ListenDeliverTx(ctx, req.Req, req.Res) +} + +func (m *GRPCServer) ListenCommit(ctx context.Context, req *ListenCommitRequest) (*Empty, error) { + return &Empty{}, m.Impl.ListenCommit(ctx, req.Res, req.ChangeSet) +} + +``` + +And the pre-compiled Go plugin `Impl`(*this is only used for plugins that are written in Go*): ```go -// StateStreamingPlugin interface for plugins that load a baseapp.StreamingService onto a baseapp.BaseApp -type StateStreamingPlugin interface { - // Register configures and registers the plugin streaming service with the BaseApp - Register(bApp *baseapp.BaseApp, marshaller codec.BinaryCodec, keys map[string]*types.KVStoreKey) error +// streaming/plugins/abci/{plugin_version}/impl/plugin.go - // Start starts the background streaming process of the plugin streaming service - Start(wg *sync.WaitGroup) error +// Plugins are pre-compiled and loaded by the plugin system - // Plugin is the base Plugin interface - Plugin +// ABCIListener is the implementation of the baseapp.ABCIListener interface +type ABCIListener struct{} + +func (m *ABCIListenerPlugin) ListenBeginBlock(ctx context.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error { + // send data to external system +} + +func (m *ABCIListenerPlugin) ListenEndBlock(ctx context.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error { + // send data to external system +} + +func (m *ABCIListenerPlugin) ListenDeliverTxBlock(ctx context.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error { + // send data to external system +} + +func (m *ABCIListenerPlugin) ListenCommit(ctx context.Context, res abci.ResponseCommit, changeSet []store.StoreKVPair) error { + // send data to external system +} + +func main() { + plugin.Serve(&plugin.ServeConfig{ + HandshakeConfig: grpc_abci_v1.Handshake, + Plugins: map[string]plugin.Plugin{ + "grpc_plugin_v1": &grpc_abci_v1.ABCIListenerGRPCPlugin{Impl: &ABCIListenerPlugin{}}, + }, + + // A non-nil value here enables gRPC serving for this streaming... + GRPCServer: plugin.DefaultGRPCServer, + }) } ``` -The `Register` method is used during App construction to register the plugin's streaming service with an App's BaseApp using the BaseApp's `SetStreamingService` method. -The `Start` method is used during App construction to start the registered plugin streaming services and maintain synchronization with them. +We will introduce a plugin loading system that will return `(interface{}, error)`. +This provides the advantage of using versioned plugins where the plugin interface and gRPC protocol change over time. +In addition, it allows for building independent plugin that can expose different parts of the system over gRPC. + +```go +func NewStreamingPlugin(name string, logLevel string) (interface{}, error) { + logger := hclog.New(&hclog.LoggerOptions{ + Output: hclog.DefaultOutput, + Level: toHclogLevel(logLevel), + Name: fmt.Sprintf("plugin.%s", name), + }) + + // We're a host. Start by launching the streaming process. + env := os.Getenv(GetPluginEnvKey(name)) + client := plugin.NewClient(&plugin.ClientConfig{ + HandshakeConfig: HandshakeMap[name], + Plugins: PluginMap, + Cmd: exec.Command("sh", "-c", env), + Logger: logger, + AllowedProtocols: []plugin.Protocol{ + plugin.ProtocolNetRPC, plugin.ProtocolGRPC}, + }) + + // Connect via RPC + rpcClient, err := client.Client() + if err != nil { + return nil, err + } + + // Request streaming plugin + return rpcClient.Dispense(name) +} + +``` + +We propose a `RegisterStreamingPlugin` function for the App to register `NewStreamingPlugin`s with the App's BaseApp. +Streaming plugins can be of `Any` type; therefore, the function takes in an interface vs a concrete type. +For example, we could have plugins of `ABCIListener`, `WasmListener` or `IBCListener`. Note that `RegisterStreamingPluing` function +is helper function and not a requirement. Plugin registration can easily be moved from the App to the BaseApp directly. + +```go +// baseapp/streaming.go + +// RegisterStreamingPlugin registers streaming plugins with the App. +// This method returns an error if a plugin is not supported. +func RegisterStreamingPlugin( + bApp *BaseApp, + appOpts servertypes.AppOptions, + keys map[string]*types.KVStoreKey, + streamingPlugin interface{}, +) error { + switch t := streamingPlugin.(type) { + case ABCIListener: + registerABCIListenerPlugin(bApp, appOpts, keys, t) + default: + return fmt.Errorf("unexpected plugin type %T", t) + } + return nil +} +``` + +```go +func registerABCIListenerPlugin( + bApp *BaseApp, + appOpts servertypes.AppOptions, + keys map[string]*store.KVStoreKey, + abciListener ABCIListener, +) { + asyncKey := fmt.Sprintf("%s.%s.%s", StreamingTomlKey, StreamingABCITomlKey, StreamingABCIAsync) + async := cast.ToBool(appOpts.Get(asyncKey)) + stopNodeOnErrKey := fmt.Sprintf("%s.%s.%s", StreamingTomlKey, StreamingABCITomlKey, StreamingABCIStopNodeOnErrTomlKey) + stopNodeOnErr := cast.ToBool(appOpts.Get(stopNodeOnErrKey)) + keysKey := fmt.Sprintf("%s.%s.%s", StreamingTomlKey, StreamingABCITomlKey, StreamingABCIKeysTomlKey) + exposeKeysStr := cast.ToStringSlice(appOpts.Get(keysKey)) + exposedKeys := exposeStoreKeysSorted(exposeKeysStr, keys) + bApp.cms.AddListeners(exposedKeys) + bApp.SetStreamingService(abciListener) + bApp.stopNodeOnABCIListenerErr = stopNodeOnErr + bApp.abciListenersAsync = async +} +``` + +```go +func exposeAll(list []string) bool { + for _, ele := range list { + if ele == "*" { + return true + } + } + return false +} + +func exposeStoreKeys(keysStr []string, keys map[string]*types.KVStoreKey) []types.StoreKey { + var exposeStoreKeys []types.StoreKey + if exposeAll(keysStr) { + exposeStoreKeys = make([]types.StoreKey, 0, len(keys)) + for _, storeKey := range keys { + exposeStoreKeys = append(exposeStoreKeys, storeKey) + } + } else { + exposeStoreKeys = make([]types.StoreKey, 0, len(keysStr)) + for _, keyStr := range keysStr { + if storeKey, ok := keys[keyStr]; ok { + exposeStoreKeys = append(exposeStoreKeys, storeKey) + } + } + } + // sort storeKeys for deterministic output + sort.SliceStable(exposeStoreKeys, func(i, j int) bool { + return exposeStoreKeys[i].Name() < exposeStoreKeys[j].Name() + }) + + return exposeStoreKeys +} +``` + +The `NewStreamingPlugin` and `RegisterStreamingPlugin` functions are used to register a plugin with the App's BaseApp. e.g. in `NewSimApp`: ```go func NewSimApp( - logger log.Logger, - db dbm.DB, - traceStore io.Writer, - loadLatest bool, - appOpts servertypes.AppOptions, - baseAppOptions ...func(*baseapp.BaseApp), + logger log.Logger, + db dbm.DB, + traceStore io.Writer, + loadLatest bool, + appOpts servertypes.AppOptions, + baseAppOptions ...func(*baseapp.BaseApp), ) *SimApp { - ... + ... - keys := sdk.NewKVStoreKeys( - authtypes.StoreKey, banktypes.StoreKey, stakingtypes.StoreKey, - minttypes.StoreKey, distrtypes.StoreKey, slashingtypes.StoreKey, - govtypes.StoreKey, paramstypes.StoreKey, ibchost.StoreKey, upgradetypes.StoreKey, - evidencetypes.StoreKey, ibctransfertypes.StoreKey, capabilitytypes.StoreKey, - ) + keys := sdk.NewKVStoreKeys( + authtypes.StoreKey, banktypes.StoreKey, stakingtypes.StoreKey, + minttypes.StoreKey, distrtypes.StoreKey, slashingtypes.StoreKey, + govtypes.StoreKey, paramstypes.StoreKey, ibchost.StoreKey, upgradetypes.StoreKey, + evidencetypes.StoreKey, ibctransfertypes.StoreKey, capabilitytypes.StoreKey, + ) - pluginsOnKey := fmt.Sprintf("%s.%s", plugin.PLUGINS_TOML_KEY, plugin.PLUGINS_ON_TOML_KEY) - if cast.ToBool(appOpts.Get(pluginsOnKey)) { - // this loads the preloaded and any plugins found in `plugins.dir` - pluginLoader, err := loader.NewPluginLoader(appOpts, logger) - if err != nil { - // handle error - } + ... - // initialize the loaded plugins - if err := pluginLoader.Initialize(); err != nil { - // handle error - } + // register streaming services + streamingCfg := cast.ToStringMap(appOpts.Get(baseapp.StreamingTomlKey)) + for service := range streamingCfg { + pluginKey := fmt.Sprintf("%s.%s.%s", baseapp.StreamingTomlKey, service, baseapp.StreamingPluginTomlKey) + pluginName := strings.TrimSpace(cast.ToString(appOpts.Get(pluginKey))) + if len(pluginName) > 0 { + logLevel := cast.ToString(appOpts.Get(flags.FlagLogLevel)) + plugin, err := streaming.NewStreamingPlugin(pluginName, logLevel) + if err != nil { + tmos.Exit(err.Error()) + } + if err := baseapp.RegisterStreamingPlugin(bApp, appOpts, keys, plugin); err != nil { + tmos.Exit(err.Error()) + } + } + } - // register the plugin(s) with the BaseApp - if err := pluginLoader.Inject(bApp, appCodec, keys); err != nil { - // handle error - } - - // start the plugin services, optionally use wg to synchronize shutdown using io.Closer - wg := new(sync.WaitGroup) - if err := pluginLoader.Start(wg); err != nil { - // handler error - } - } - - ... - - return app -} + return app ``` - #### Configuration -The plugin system will be configured within an app's app.toml file. +The plugin system will be configured within an App's TOML configuration files. ```toml -[plugins] - on = false # turn the plugin system, as a whole, on or off - enabled = ["list", "of", "plugin", "names", "to", "enable"] - dir = "the directory to load non-preloaded plugins from; defaults to cosmos-sdk/plugin/plugins" +# gRPC streaming +[streaming] + +# ABCI streaming service +[streaming.abci] + +# The plugin version to use for ABCI listening +plugin = "abci_v1" + +# List of kv store keys to listen to for state changes. +# Set to ["*"] to expose all keys. +keys = ["*"] + +# Enable abciListeners to run asynchronously. +# When abciListenersAsync=false and stopNodeOnABCIListenerErr=false listeners will run synchronized but will not stop the node. +# When abciListenersAsync=true stopNodeOnABCIListenerErr will be ignored. +async = false + +# Whether to stop the node on message deliver error. +stop-node-on-err = true ``` -There will be three parameters for configuring the plugin system: `plugins.on`, `plugins.enabled` and `plugins.dir`. -`plugins.on` is a bool that turns on or off the plugin system at large, `plugins.dir` directs the system to a directory -to load plugins from, and `plugins.enabled` provides `opt-in` semantics to plugin names to enable (including preloaded plugins). +There will be four parameters for configuring `ABCIListener` plugin: `streaming.abci.plugin`, `streaming.abci.keys`, `streaming.abci.async` and `streaming.abci.stop-node-on-err`. +`streaming.abci.plugin` is the name of the plugin we want to use for streaming, `streaming.abci.keys` is a set of store keys for stores it listens to, +`streaming.abci.async` is bool enabling asynchronous listening and `streaming.abci.stop-node-on-err` is a bool that stops the node when true and when operating +on synchronized mode `streaming.abci.async=false`. Note that `streaming.abci.stop-node-on-err=true` will be ignored if `streaming.abci.async=true`. -Configuration of a given plugin is ultimately specific to the plugin, but we will introduce some standards here: +The configuration above support additional streaming plugins by adding the plugin to the `[streaming]` configuration section +and registering the plugin with `RegisterStreamingPlugin` helper function. -Plugin TOML configuration should be split into separate sub-tables for each kind of plugin (e.g. `plugins.streaming`). - -Within these sub-tables, the parameters for a specific plugin of that kind are included in another sub-table (e.g. `plugins.streaming.file`). -It is generally expected, but not required, that a streaming service plugin can be configured with a set of store keys -(e.g. `plugins.streaming.file.keys`) for the stores it listens to and a flag (e.g. `plugins.streaming.file.halt_app_on_delivery_error`) -that signifies whether the service operates in a fire-and-forget capacity, or stop the BaseApp when an error occurs in -any of `ListenBeginBlock`, `ListenEndBlock` and `ListenDeliverTx`. - -e.g. - -```toml -[plugins] - on = false # turn the plugin system, as a whole, on or off - enabled = ["list", "of", "plugin", "names", "to", "enable"] - dir = "the directory to load non-preloaded plugins from; defaults to " - [plugins.streaming] # a mapping of plugin-specific streaming service parameters, mapped to their plugin name - [plugins.streaming.file] # the specific parameters for the file streaming service plugin - keys = ["list", "of", "store", "keys", "we", "want", "to", "expose", "for", "this", "streaming", "service"] - write_dir = "path to the write directory" - prefix = "optional prefix to prepend to the generated file names" - halt_app_on_delivery_error = "false" # false == fire-and-forget; true == stop the application - [plugins.streaming.kafka] - keys = [] - topic_prefix = "block" # Optional prefix for topic names where data will be stored. - flush_timeout_ms = 5000 # Flush and wait for outstanding messages and requests to complete delivery when calling `StreamingService.Close(). (milliseconds) - halt_app_on_delivery_error = true # Whether or not to halt the application when plugin fails to deliver message(s). - ... -``` +Note the that each plugin must include `streaming.{service}.plugin` property as it is a requirement for doing the lookup and registration of the plugin +with the App. All other properties are unique to the individual services. #### Encoding and decoding streams @@ -553,7 +804,7 @@ These changes will provide a means of subscribing to KVStore state changes in re ### Backwards Compatibility -* This ADR changes the `CommitMultiStore` interface, implementations supporting the previous version of these interfaces will not support the new ones +* This ADR changes the `CommitMultiStore` interface, implementations supporting the previous version of this interface will not support the new one ### Positive @@ -561,7 +812,7 @@ These changes will provide a means of subscribing to KVStore state changes in re ### Negative -* Changes `CommitMultiStore`interface +* Changes `CommitMultiStore` interface and its implementations ### Neutral