diff --git a/docs/architecture/adr-038-state-listening.md b/docs/architecture/adr-038-state-listening.md index 3c718cf206..cb824c00d6 100644 --- a/docs/architecture/adr-038-state-listening.md +++ b/docs/architecture/adr-038-state-listening.md @@ -73,19 +73,19 @@ func NewStoreKVPairWriteListener(w io.Writer, m codec.BinaryCodec) *StoreKVPairW // OnWrite satisfies the WriteListener interface by writing length-prefixed protobuf encoded StoreKVPairs func (wl *StoreKVPairWriteListener) OnWrite(storeKey types.StoreKey, key []byte, value []byte, delete bool) error error { - kvPair := new(types.StoreKVPair) - kvPair.StoreKey = storeKey.Name() - kvPair.Delete = Delete - kvPair.Key = key - kvPair.Value = value - by, err := wl.marshaller.MarshalBinaryLengthPrefixed(kvPair) - if err != nil { - return err - } - if _, err := wl.writer.Write(by); err != nil { - return err - } - return nil + kvPair := new(types.StoreKVPair) + kvPair.StoreKey = storeKey.Name() + kvPair.Delete = Delete + kvPair.Key = key + kvPair.Value = value + by, err := wl.marshaller.MarshalBinaryLengthPrefixed(kvPair) + if err != nil { + return err + } + if _, err := wl.writer.Write(by); err != nil { + return err + } + return nil } ``` @@ -99,39 +99,39 @@ We can configure the `Store` with a set of `WriteListener`s which stream the out // Operations are traced on each core KVStore call and written to any of the // underlying listeners with the proper key and operation permissions type Store struct { - parent types.KVStore - listeners []types.WriteListener - parentStoreKey types.StoreKey + parent types.KVStore + listeners []types.WriteListener + parentStoreKey types.StoreKey } // NewStore returns a reference to a new traceKVStore given a parent // KVStore implementation and a buffered writer. 
func NewStore(parent types.KVStore, psk types.StoreKey, listeners []types.WriteListener) *Store { - return &Store{parent: parent, listeners: listeners, parentStoreKey: psk} + return &Store{parent: parent, listeners: listeners, parentStoreKey: psk} } // Set implements the KVStore interface. It traces a write operation and // delegates the Set call to the parent KVStore. func (s *Store) Set(key []byte, value []byte) { - types.AssertValidKey(key) - s.parent.Set(key, value) - s.onWrite(false, key, value) + types.AssertValidKey(key) + s.parent.Set(key, value) + s.onWrite(false, key, value) } // Delete implements the KVStore interface. It traces a write operation and // delegates the Delete call to the parent KVStore. func (s *Store) Delete(key []byte) { - s.parent.Delete(key) - s.onWrite(true, key, nil) + s.parent.Delete(key) + s.onWrite(true, key, nil) } // onWrite writes a KVStore operation to all the WriteListeners func (s *Store) onWrite(delete bool, key, value []byte) { - for _, l := range s.listeners { - if err := l.OnWrite(s.parentStoreKey, key, value, delete); err != nil { - // log error - } - } + for _, l := range s.listeners { + if err := l.OnWrite(s.parentStoreKey, key, value, delete); err != nil { + // log error + } + } } ``` @@ -142,30 +142,30 @@ Additionally, we will update the `CacheWrap` and `CacheWrapper` interfaces to en ```go type MultiStore interface { - ... + ... 
- // ListeningEnabled returns if listening is enabled for the KVStore belonging the provided StoreKey - ListeningEnabled(key StoreKey) bool + // ListeningEnabled returns if listening is enabled for the KVStore belonging the provided StoreKey + ListeningEnabled(key StoreKey) bool - // AddListeners adds WriteListeners for the KVStore belonging to the provided StoreKey - // It appends the listeners to a current set, if one already exists - AddListeners(key StoreKey, listeners []WriteListener) + // AddListeners adds WriteListeners for the KVStore belonging to the provided StoreKey + // It appends the listeners to a current set, if one already exists + AddListeners(key StoreKey, listeners []WriteListener) } ``` ```go type CacheWrap interface { - ... + ... - // CacheWrapWithListeners recursively wraps again with listening enabled - CacheWrapWithListeners(storeKey types.StoreKey, listeners []WriteListener) CacheWrap + // CacheWrapWithListeners recursively wraps again with listening enabled + CacheWrapWithListeners(storeKey types.StoreKey, listeners []WriteListener) CacheWrap } type CacheWrapper interface { - ... + ... 
- // CacheWrapWithListeners recursively wraps again with listening enabled - CacheWrapWithListeners(storeKey types.StoreKey, listeners []WriteListener) CacheWrap + // CacheWrapWithListeners recursively wraps again with listening enabled + CacheWrapWithListeners(storeKey types.StoreKey, listeners []WriteListener) CacheWrap } ``` @@ -176,16 +176,16 @@ to wrap the returned `KVStore` with a `listenkv.Store` if listening is turned on ```go func (rs *Store) GetKVStore(key types.StoreKey) types.KVStore { - store := rs.stores[key].(types.KVStore) + store := rs.stores[key].(types.KVStore) - if rs.TracingEnabled() { - store = tracekv.NewStore(store, rs.traceWriter, rs.traceContext) - } - if rs.ListeningEnabled(key) { - store = listenkv.NewStore(key, store, rs.listeners[key]) - } + if rs.TracingEnabled() { + store = tracekv.NewStore(store, rs.traceWriter, rs.traceContext) + } + if rs.ListeningEnabled(key) { + store = listenkv.NewStore(key, store, rs.listeners[key]) + } - return store + return store } ``` @@ -194,11 +194,11 @@ to and enable listening in the cache layer. ```go func (rs *Store) CacheMultiStore() types.CacheMultiStore { - stores := make(map[types.StoreKey]types.CacheWrapper) - for k, v := range rs.stores { - stores[k] = v - } - return cachemulti.NewStore(rs.db, stores, rs.keysByName, rs.traceWriter, rs.traceContext, rs.listeners) + stores := make(map[types.StoreKey]types.CacheWrapper) + for k, v := range rs.stores { + stores[k] = v + } + return cachemulti.NewStore(rs.db, stores, rs.keysByName, rs.traceWriter, rs.traceContext, rs.listeners) } ``` @@ -216,27 +216,31 @@ receipt from the `StreamingService`. 
```go // ABCIListener interface used to hook into the ABCI message processing of the BaseApp type ABCIListener interface { - // ListenBeginBlock updates the streaming service with the latest BeginBlock messages - ListenBeginBlock(ctx types.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error - // ListenEndBlock updates the steaming service with the latest EndBlock messages - ListenEndBlock(ctx types.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) error - // ListenDeliverTx updates the steaming service with the latest DeliverTx messages - ListenDeliverTx(ctx types.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) error - // ListenSuccess returns a chan that is used to acknowledge successful receipt of messages by the external service - // after some configurable delay, `false` is sent to this channel from the service to signify failure of receipt - ListenSuccess() <-chan bool + // ListenBeginBlock updates the streaming service with the latest BeginBlock messages + ListenBeginBlock(ctx types.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error + // ListenEndBlock updates the streaming service with the latest EndBlock messages + ListenEndBlock(ctx types.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) error + // ListenDeliverTx updates the streaming service with the latest DeliverTx messages + ListenDeliverTx(ctx types.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) error + // HaltAppOnDeliveryError reports whether or not to halt the application when delivery of messages fails + // in ListenBeginBlock, ListenEndBlock, ListenDeliverTx. When `false`, the app will operate in fire-and-forget mode. + // When `true`, the app will gracefully halt and stop the running node. Uncommitted blocks will + // be replayed to all listeners when the node restarts and all successful listeners that received data + // prior to the halt will receive duplicate data.
Whether or not a listener operates in a fire-and-forget mode + // is determined by the listener's configuration property `halt_app_on_delivery_error = true|false`. + HaltAppOnDeliveryError() bool } // StreamingService interface for registering WriteListeners with the BaseApp and updating the service with the ABCI messages using the hooks type StreamingService interface { - // Stream is the streaming service loop, awaits kv pairs and writes them to a destination stream or file - Stream(wg *sync.WaitGroup) error - // Listeners returns the streaming service's listeners for the BaseApp to register - Listeners() map[types.StoreKey][]store.WriteListener - // ABCIListener interface for hooking into the ABCI messages from inside the BaseApp - ABCIListener - // Closer interface - io.Closer + // Stream is the streaming service loop, awaits kv pairs and writes them to a destination stream or file + Stream(wg *sync.WaitGroup) error + // Listeners returns the streaming service's listeners for the BaseApp to register + Listeners() map[types.StoreKey][]store.WriteListener + // ABCIListener interface for hooking into the ABCI messages from inside the BaseApp + ABCIListener + // Closer interface + io.Closer } ``` @@ -257,15 +261,6 @@ func (app *BaseApp) SetStreamingService(s StreamingService) { } ``` -We will add a new method to the `BaseApp` that is used to configure a global wait limit for receiving positive acknowledgement -of message receipt from the integrated `StreamingService`s. - -```go -func (app *BaseApp) SetGlobalWaitLimit(t time.Duration) { - app.globalWaitLimit = t -} -``` - We will also modify the `BeginBlock`, `EndBlock`, and `DeliverTx` methods to pass ABCI requests and responses to any streaming service hooks registered with the `BaseApp`. @@ -274,10 +269,32 @@ func (app *BaseApp) BeginBlock(req abci.RequestBeginBlock) (res abci.ResponseBeg ... 
- // Call the streaming service hooks with the BeginBlock messages - for _, listener := range app.abciListeners { - listener.ListenBeginBlock(app.deliverState.ctx, req, res) + // call the hooks with the BeginBlock messages + wg := new(sync.WaitGroup) + for _, streamingListener := range app.abciListeners { + streamingListener := streamingListener // https://go.dev/doc/faq#closures_and_goroutines + if streamingListener.HaltAppOnDeliveryError() { + // increment the wait group counter + wg.Add(1) + go func() { + // decrement the counter when the go routine completes + defer wg.Done() + if err := streamingListener.ListenBeginBlock(app.deliverState.ctx, req, res); err != nil { + app.logger.Error("BeginBlock listening hook failed", "height", req.Header.Height, "err", err) + app.halt() + } + }() + } else { + // fire and forget semantics + go func() { + if err := streamingListener.ListenBeginBlock(app.deliverState.ctx, req, res); err != nil { + app.logger.Error("BeginBlock listening hook failed", "height", req.Header.Height, "err", err) + } + }() + } } + // wait for all the listener calls to finish + wg.Wait() return res } @@ -289,9 +306,31 @@ func (app *BaseApp) EndBlock(req abci.RequestEndBlock) (res abci.ResponseEndBloc ... 
// Call the streaming service hooks with the EndBlock messages - for _, listener := range app.abciListeners { - listener.ListenEndBlock(app.deliverState.ctx, req, res) + wg := new(sync.WaitGroup) + for _, streamingListener := range app.abciListeners { + streamingListener := streamingListener // https://go.dev/doc/faq#closures_and_goroutines + if streamingListener.HaltAppOnDeliveryError() { + // increment the wait group counter + wg.Add(1) + go func() { + // decrement the counter when the go routine completes + defer wg.Done() + if err := streamingListener.ListenEndBlock(app.deliverState.ctx, req, res); err != nil { + app.logger.Error("EndBlock listening hook failed", "height", req.Height, "err", err) + app.halt() + } + }() + } else { + // fire and forget semantics + go func() { + if err := streamingListener.ListenEndBlock(app.deliverState.ctx, req, res); err != nil { + app.logger.Error("EndBlock listening hook failed", "height", req.Height, "err", err) + } + }() + } } + // wait for all the listener calls to finish + wg.Wait() return res } @@ -299,84 +338,40 @@ func (app *BaseApp) EndBlock(req abci.RequestEndBlock) (res abci.ResponseEndBloc ```go func (app *BaseApp) DeliverTx(req abci.RequestDeliverTx) abci.ResponseDeliverTx { - - ... 
- - gInfo, result, err := app.runTx(runTxModeDeliver, req.Tx) - if err != nil { - resultStr = "failed" - res := sdkerrors.ResponseDeliverTx(err, gInfo.GasWanted, gInfo.GasUsed, app.trace) - // If we throw an error, be sure to still call the streaming service's hook - for _, listener := range app.abciListeners { - listener.ListenDeliverTx(app.deliverState.ctx, req, res) + + var abciRes abci.ResponseDeliverTx + defer func() { + // call the hooks with the DeliverTx messages + wg := new(sync.WaitGroup) + for _, streamingListener := range app.abciListeners { + streamingListener := streamingListener // https://go.dev/doc/faq#closures_and_goroutines + if streamingListener.HaltAppOnDeliveryError() { + // increment the wait group counter + wg.Add(1) + go func() { + // decrement the counter when the go routine completes + defer wg.Done() + if err := streamingListener.ListenDeliverTx(app.deliverState.ctx, req, abciRes); err != nil { + app.logger.Error("DeliverTx listening hook failed", "err", err) + app.halt() + } + }() + } else { + // fire and forget semantics + go func() { + if err := streamingListener.ListenDeliverTx(app.deliverState.ctx, req, abciRes); err != nil { + app.logger.Error("DeliverTx listening hook failed", "err", err) + } + }() + } } - return res - } - - res := abci.ResponseDeliverTx{ - GasWanted: int64(gInfo.GasWanted), // TODO: Should type accept unsigned ints? - GasUsed: int64(gInfo.GasUsed), // TODO: Should type accept unsigned ints? - Log: result.Log, - Data: result.Data, - Events: sdk.MarkEventsToIndex(result.Events, app.indexEvents), - } - - // Call the streaming service hooks with the DeliverTx messages - for _, listener := range app.abciListeners { - listener.ListenDeliverTx(app.deliverState.ctx, req, res) - } - - return res -} -``` - -We will also modify the `Commit` method to process `success/failure` signals from the integrated `StreamingService`s using -the `ABCIListener.ListenSuccess()` method.
Each `StreamingService` has an internal wait threshold after which it sends -`false` to the `ListenSuccess()` channel, and the BaseApp also imposes a configurable global wait limit. -If the `StreamingService` is operating in a "fire-and-forget" mode, `ListenSuccess()` should immediately return `true` -off the channel despite the success status of the service. - -```go -func (app *BaseApp) Commit() (res abci.ResponseCommit) { + // wait for all the listener calls to finish + wg.Wait() + }() ... - var halt bool - - switch { - case app.haltHeight > 0 && uint64(header.Height) >= app.haltHeight: - halt = true - - case app.haltTime > 0 && header.Time.Unix() >= int64(app.haltTime): - halt = true - } - - // each listener has an internal wait threshold after which it sends `false` to the ListenSuccess() channel - // but the BaseApp also imposes a global wait limit - maxWait := time.NewTicker(app.globalWaitLimit) - for _, lis := range app.abciListeners { - select { - case success := <- lis.ListenSuccess(): - if success == false { - halt = true - break - } - case <- maxWait.C: - halt = true - break - } - } - - if halt { - // Halt the binary and allow Tendermint to receive the ResponseCommit - // response with the commit ID hash. This will allow the node to successfully - // restart and process blocks assuming the halt configuration has been - // reset or moved to a more distant value. - app.halt() - } - - ... - + return res } ``` @@ -421,7 +416,7 @@ type StateStreamingPlugin interface { Register(bApp *baseapp.BaseApp, marshaller codec.BinaryCodec, keys map[string]*types.KVStoreKey) error // Start starts the background streaming process of the plugin streaming service - Start(wg *sync.WaitGroup) + Start(wg *sync.WaitGroup) error // Plugin is the base Plugin interface Plugin @@ -440,34 +435,37 @@ func NewSimApp( ... 
- // this loads the preloaded and any plugins found in `plugins.dir` - pluginLoader, err := loader.NewPluginLoader(appOpts, logger) - if err != nil { - // handle error - } - - // initialize the loaded plugins - if err := pluginLoader.Initialize(); err != nil { - // hanlde error - } - keys := sdk.NewKVStoreKeys( - authtypes.StoreKey, banktypes.StoreKey, stakingtypes.StoreKey, - minttypes.StoreKey, distrtypes.StoreKey, slashingtypes.StoreKey, - govtypes.StoreKey, paramstypes.StoreKey, ibchost.StoreKey, upgradetypes.StoreKey, - evidencetypes.StoreKey, ibctransfertypes.StoreKey, capabilitytypes.StoreKey, + authtypes.StoreKey, banktypes.StoreKey, stakingtypes.StoreKey, + minttypes.StoreKey, distrtypes.StoreKey, slashingtypes.StoreKey, + govtypes.StoreKey, paramstypes.StoreKey, ibchost.StoreKey, upgradetypes.StoreKey, + evidencetypes.StoreKey, ibctransfertypes.StoreKey, capabilitytypes.StoreKey, ) - // register the plugin(s) with the BaseApp - if err := pluginLoader.Inject(bApp, appCodec, keys); err != nil { - // handle error - } + pluginsOnKey := fmt.Sprintf("%s.%s", plugin.PLUGINS_TOML_KEY, plugin.PLUGINS_ON_TOML_KEY) + if cast.ToBool(appOpts.Get(pluginsOnKey)) { + // this loads the preloaded and any plugins found in `plugins.dir` + pluginLoader, err := loader.NewPluginLoader(appOpts, logger) + if err != nil { + // handle error + } - // start the plugin services, optionally use wg to synchronize shutdown using io.Closer - wg := new(sync.WaitGroup) - if err := pluginLoader.Start(wg); err != nil { - // handler error - } + // initialize the loaded plugins + if err := pluginLoader.Initialize(); err != nil { + // handle error + } + + // register the plugin(s) with the BaseApp + if err := pluginLoader.Inject(bApp, appCodec, keys); err != nil { + // handle error + } + + // start the plugin services, optionally use wg to synchronize shutdown using io.Closer + wg := new(sync.WaitGroup) + if err := pluginLoader.Start(wg); err != nil { + // handle error + } + } ...
@@ -483,39 +481,42 @@ The plugin system will be configured within an app's app.toml file. ```toml [plugins] on = false # turn the plugin system, as a whole, on or off - disabled = ["list", "of", "plugin", "names", "to", "disable"] + enabled = ["list", "of", "plugin", "names", "to", "enable"] dir = "the directory to load non-preloaded plugins from; defaults to cosmos-sdk/plugin/plugins" ``` -There will be three parameters for configuring the plugin system: `plugins.on`, `plugins.disabled` and `plugins.dir`. +There will be three parameters for configuring the plugin system: `plugins.on`, `plugins.enabled` and `plugins.dir`. `plugins.on` is a bool that turns on or off the plugin system at large, `plugins.dir` directs the system to a directory -to load plugins from, and `plugins.disabled` is a list of names for the plugins we want to disable (useful for disabling preloaded plugins). +to load plugins from, and `plugins.enabled` provides `opt-in` semantics to plugin names to enable (including preloaded plugins). Configuration of a given plugin is ultimately specific to the plugin, but we will introduce some standards here: Plugin TOML configuration should be split into separate sub-tables for each kind of plugin (e.g. `plugins.streaming`). + Within these sub-tables, the parameters for a specific plugin of that kind are included in another sub-table (e.g. `plugins.streaming.file`). It is generally expected, but not required, that a streaming service plugin can be configured with a set of store keys -(e.g. `plugins.streaming.file.keys`) for the stores it listens to and a mode (e.g. `plugins.streaming.file.mode`) -that signifies whether the service operates in a fire-and-forget capacity (`faf`) or the BaseApp should require positive -acknowledgement of message receipt by the service (`ack`). +(e.g. `plugins.streaming.file.keys`) for the stores it listens to and a flag (e.g. 
`plugins.streaming.file.halt_app_on_delivery_error`) +that signifies whether the service operates in a fire-and-forget capacity, or stops the BaseApp when an error occurs in +any of `ListenBeginBlock`, `ListenEndBlock` and `ListenDeliverTx`. e.g. ```toml [plugins] on = false # turn the plugin system, as a whole, on or off - disabled = ["list", "of", "plugin", "names", "to", "disable"] + enabled = ["list", "of", "plugin", "names", "to", "enable"] dir = "the directory to load non-preloaded plugins from; defaults to " [plugins.streaming] # a mapping of plugin-specific streaming service parameters, mapped to their plugin name [plugins.streaming.file] # the specific parameters for the file streaming service plugin keys = ["list", "of", "store", "keys", "we", "want", "to", "expose", "for", "this", "streaming", "service"] - writeDir = "path to the write directory" + write_dir = "path to the write directory" prefix = "optional prefix to prepend to the generated file names" - mode = "faf" # faf == fire-and-forget; ack == require positive acknowledge of receipt + halt_app_on_delivery_error = false # false == fire-and-forget; true == stop the application [plugins.streaming.kafka] - ... - [plugins.modules] + keys = [] + topic_prefix = "block" # Optional prefix for topic names where data will be stored. + flush_timeout_ms = 5000 # Flush and wait for outstanding messages and requests to complete delivery when calling `StreamingService.Close()`. (milliseconds) + halt_app_on_delivery_error = true # Whether or not to halt the application when plugin fails to deliver message(s). ... ```