docs: update adr038 (#12629)

This commit is contained in:
Marko 2022-07-19 15:29:42 +02:00 committed by GitHub
parent da32265513
commit 0f8f4ec9a8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -73,19 +73,19 @@ func NewStoreKVPairWriteListener(w io.Writer, m codec.BinaryCodec) *StoreKVPairW
// OnWrite satisfies the WriteListener interface by writing length-prefixed protobuf encoded StoreKVPairs
func (wl *StoreKVPairWriteListener) OnWrite(storeKey types.StoreKey, key []byte, value []byte, delete bool) error error {
kvPair := new(types.StoreKVPair)
kvPair.StoreKey = storeKey.Name()
kvPair.Delete = Delete
kvPair.Key = key
kvPair.Value = value
by, err := wl.marshaller.MarshalBinaryLengthPrefixed(kvPair)
if err != nil {
return err
}
if _, err := wl.writer.Write(by); err != nil {
return err
}
return nil
kvPair := new(types.StoreKVPair)
kvPair.StoreKey = storeKey.Name()
kvPair.Delete = Delete
kvPair.Key = key
kvPair.Value = value
by, err := wl.marshaller.MarshalBinaryLengthPrefixed(kvPair)
if err != nil {
return err
}
if _, err := wl.writer.Write(by); err != nil {
return err
}
return nil
}
```
@ -99,39 +99,39 @@ We can configure the `Store` with a set of `WriteListener`s which stream the out
// Operations are traced on each core KVStore call and written to any of the
// underlying listeners with the proper key and operation permissions
type Store struct {
parent types.KVStore
listeners []types.WriteListener
parentStoreKey types.StoreKey
parent types.KVStore
listeners []types.WriteListener
parentStoreKey types.StoreKey
}
// NewStore returns a reference to a new traceKVStore given a parent
// KVStore implementation and a buffered writer.
func NewStore(parent types.KVStore, psk types.StoreKey, listeners []types.WriteListener) *Store {
return &Store{parent: parent, listeners: listeners, parentStoreKey: psk}
return &Store{parent: parent, listeners: listeners, parentStoreKey: psk}
}
// Set implements the KVStore interface. It traces a write operation and
// delegates the Set call to the parent KVStore.
func (s *Store) Set(key []byte, value []byte) {
types.AssertValidKey(key)
s.parent.Set(key, value)
s.onWrite(false, key, value)
types.AssertValidKey(key)
s.parent.Set(key, value)
s.onWrite(false, key, value)
}
// Delete implements the KVStore interface. It traces a write operation and
// delegates the Delete call to the parent KVStore.
func (s *Store) Delete(key []byte) {
s.parent.Delete(key)
s.onWrite(true, key, nil)
s.parent.Delete(key)
s.onWrite(true, key, nil)
}
// onWrite writes a KVStore operation to all the WriteListeners
func (s *Store) onWrite(delete bool, key, value []byte) {
for _, l := range s.listeners {
if err := l.OnWrite(s.parentStoreKey, key, value, delete); err != nil {
// log error
}
}
for _, l := range s.listeners {
if err := l.OnWrite(s.parentStoreKey, key, value, delete); err != nil {
// log error
}
}
}
```
@ -142,30 +142,30 @@ Additionally, we will update the `CacheWrap` and `CacheWrapper` interfaces to en
```go
type MultiStore interface {
...
...
// ListeningEnabled returns if listening is enabled for the KVStore belonging the provided StoreKey
ListeningEnabled(key StoreKey) bool
// ListeningEnabled returns if listening is enabled for the KVStore belonging the provided StoreKey
ListeningEnabled(key StoreKey) bool
// AddListeners adds WriteListeners for the KVStore belonging to the provided StoreKey
// It appends the listeners to a current set, if one already exists
AddListeners(key StoreKey, listeners []WriteListener)
// AddListeners adds WriteListeners for the KVStore belonging to the provided StoreKey
// It appends the listeners to a current set, if one already exists
AddListeners(key StoreKey, listeners []WriteListener)
}
```
```go
type CacheWrap interface {
...
...
// CacheWrapWithListeners recursively wraps again with listening enabled
CacheWrapWithListeners(storeKey types.StoreKey, listeners []WriteListener) CacheWrap
// CacheWrapWithListeners recursively wraps again with listening enabled
CacheWrapWithListeners(storeKey types.StoreKey, listeners []WriteListener) CacheWrap
}
type CacheWrapper interface {
...
...
// CacheWrapWithListeners recursively wraps again with listening enabled
CacheWrapWithListeners(storeKey types.StoreKey, listeners []WriteListener) CacheWrap
// CacheWrapWithListeners recursively wraps again with listening enabled
CacheWrapWithListeners(storeKey types.StoreKey, listeners []WriteListener) CacheWrap
}
```
@ -176,16 +176,16 @@ to wrap the returned `KVStore` with a `listenkv.Store` if listening is turned on
```go
func (rs *Store) GetKVStore(key types.StoreKey) types.KVStore {
store := rs.stores[key].(types.KVStore)
store := rs.stores[key].(types.KVStore)
if rs.TracingEnabled() {
store = tracekv.NewStore(store, rs.traceWriter, rs.traceContext)
}
if rs.ListeningEnabled(key) {
store = listenkv.NewStore(key, store, rs.listeners[key])
}
if rs.TracingEnabled() {
store = tracekv.NewStore(store, rs.traceWriter, rs.traceContext)
}
if rs.ListeningEnabled(key) {
store = listenkv.NewStore(key, store, rs.listeners[key])
}
return store
return store
}
```
@ -194,11 +194,11 @@ to and enable listening in the cache layer.
```go
func (rs *Store) CacheMultiStore() types.CacheMultiStore {
stores := make(map[types.StoreKey]types.CacheWrapper)
for k, v := range rs.stores {
stores[k] = v
}
return cachemulti.NewStore(rs.db, stores, rs.keysByName, rs.traceWriter, rs.traceContext, rs.listeners)
stores := make(map[types.StoreKey]types.CacheWrapper)
for k, v := range rs.stores {
stores[k] = v
}
return cachemulti.NewStore(rs.db, stores, rs.keysByName, rs.traceWriter, rs.traceContext, rs.listeners)
}
```
@ -216,27 +216,31 @@ receipt from the `StreamingService`.
```go
// ABCIListener interface used to hook into the ABCI message processing of the BaseApp
type ABCIListener interface {
// ListenBeginBlock updates the streaming service with the latest BeginBlock messages
ListenBeginBlock(ctx types.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error
// ListenEndBlock updates the steaming service with the latest EndBlock messages
ListenEndBlock(ctx types.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) error
// ListenDeliverTx updates the steaming service with the latest DeliverTx messages
ListenDeliverTx(ctx types.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) error
// ListenSuccess returns a chan that is used to acknowledge successful receipt of messages by the external service
// after some configurable delay, `false` is sent to this channel from the service to signify failure of receipt
ListenSuccess() <-chan bool
// ListenBeginBlock updates the streaming service with the latest BeginBlock messages
ListenBeginBlock(ctx types.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error
// ListenEndBlock updates the steaming service with the latest EndBlock messages
ListenEndBlock(ctx types.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) error
// ListenDeliverTx updates the steaming service with the latest DeliverTx messages
ListenDeliverTx(ctx types.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) error
// HaltAppOnDeliveryError whether or not to halt the application when delivery of massages fails
// in ListenBeginBlock, ListenEndBlock, ListenDeliverTx. When `false, the app will operate in fire-and-forget mode.
// When `true`, the app will gracefully halt and stop the running node. Uncommitted blocks will
// be replayed to all listeners when the node restarts and all successful listeners that received data
// prior to the halt will receive duplicate data. Whether or not a listener operates in a fire-and-forget mode
// is determined by the listener's configuration property `halt_app_on_delivery_error = true|false`.
HaltAppOnDeliveryError() bool
}
// StreamingService interface for registering WriteListeners with the BaseApp and updating the service with the ABCI messages using the hooks
type StreamingService interface {
// Stream is the streaming service loop, awaits kv pairs and writes them to a destination stream or file
Stream(wg *sync.WaitGroup) error
// Listeners returns the streaming service's listeners for the BaseApp to register
Listeners() map[types.StoreKey][]store.WriteListener
// ABCIListener interface for hooking into the ABCI messages from inside the BaseApp
ABCIListener
// Closer interface
io.Closer
// Stream is the streaming service loop, awaits kv pairs and writes them to a destination stream or file
Stream(wg *sync.WaitGroup) error
// Listeners returns the streaming service's listeners for the BaseApp to register
Listeners() map[types.StoreKey][]store.WriteListener
// ABCIListener interface for hooking into the ABCI messages from inside the BaseApp
ABCIListener
// Closer interface
io.Closer
}
```
@ -257,15 +261,6 @@ func (app *BaseApp) SetStreamingService(s StreamingService) {
}
```
We will add a new method to the `BaseApp` that is used to configure a global wait limit for receiving positive acknowledgement
of message receipt from the integrated `StreamingService`s.
```go
func (app *BaseApp) SetGlobalWaitLimit(t time.Duration) {
app.globalWaitLimit = t
}
```
We will also modify the `BeginBlock`, `EndBlock`, and `DeliverTx` methods to pass ABCI requests and responses to any streaming service hooks registered
with the `BaseApp`.
@ -274,10 +269,32 @@ func (app *BaseApp) BeginBlock(req abci.RequestBeginBlock) (res abci.ResponseBeg
...
// Call the streaming service hooks with the BeginBlock messages
for _, listener := range app.abciListeners {
listener.ListenBeginBlock(app.deliverState.ctx, req, res)
// call the hooks with the BeginBlock messages
wg := new(sync.WaitGroup)
for _, streamingListener := range app.abciListeners {
streamingListener := streamingListener // https://go.dev/doc/faq#closures_and_goroutines
if streamingListener.HaltAppOnDeliveryError() {
// increment the wait group counter
wg.Add(1)
go func() {
// decrement the counter when the go routine completes
defer wg.Done()
if err := streamingListener.ListenBeginBlock(app.deliverState.ctx, req, res); err != nil {
app.logger.Error("BeginBlock listening hook failed", "height", req.Header.Height, "err", err)
app.halt()
}
}()
} else {
// fire and forget semantics
go func() {
if err := streamingListener.ListenBeginBlock(app.deliverState.ctx, req, res); err != nil {
app.logger.Error("BeginBlock listening hook failed", "height", req.Header.Height, "err", err)
}
}()
}
}
// wait for all the listener calls to finish
wg.Wait()
return res
}
@ -289,9 +306,31 @@ func (app *BaseApp) EndBlock(req abci.RequestEndBlock) (res abci.ResponseEndBloc
...
// Call the streaming service hooks with the EndBlock messages
for _, listener := range app.abciListeners {
listener.ListenEndBlock(app.deliverState.ctx, req, res)
wg := new(sync.WaitGroup)
for _, streamingListener := range app.abciListeners {
streamingListener := streamingListener // https://go.dev/doc/faq#closures_and_goroutines
if streamingListener.HaltAppOnDeliveryError() {
// increment the wait group counter
wg.Add(1)
go func() {
// decrement the counter when the go routine completes
defer wg.Done()
if err := streamingListener.ListenEndBlock(app.deliverState.ctx, req, res); err != nil {
app.logger.Error("EndBlock listening hook failed", "height", req.Height, "err", err)
app.halt()
}
}()
} else {
// fire and forget semantics
go func() {
if err := streamingListener.ListenEndBlock(app.deliverState.ctx, req, res); err != nil {
app.logger.Error("EndBlock listening hook failed", "height", req.Height, "err", err)
}
}()
}
}
// wait for all the listener calls to finish
wg.Wait()
return res
}
@ -299,84 +338,40 @@ func (app *BaseApp) EndBlock(req abci.RequestEndBlock) (res abci.ResponseEndBloc
```go
func (app *BaseApp) DeliverTx(req abci.RequestDeliverTx) abci.ResponseDeliverTx {
...
gInfo, result, err := app.runTx(runTxModeDeliver, req.Tx)
if err != nil {
resultStr = "failed"
res := sdkerrors.ResponseDeliverTx(err, gInfo.GasWanted, gInfo.GasUsed, app.trace)
// If we throw an error, be sure to still call the streaming service's hook
for _, listener := range app.abciListeners {
listener.ListenDeliverTx(app.deliverState.ctx, req, res)
var abciRes abci.ResponseDeliverTx
defer func() {
// call the hooks with the BeginBlock messages
wg := new(sync.WaitGroup)
for _, streamingListener := range app.abciListeners {
streamingListener := streamingListener // https://go.dev/doc/faq#closures_and_goroutines
if streamingListener.HaltAppOnDeliveryError() {
// increment the wait group counter
wg.Add(1)
go func() {
// decrement the counter when the go routine completes
defer wg.Done()
if err := streamingListener.ListenDeliverTx(app.deliverState.ctx, req, abciRes); err != nil {
app.logger.Error("DeliverTx listening hook failed", "err", err)
app.halt()
}
}()
} else {
// fire and forget semantics
go func() {
if err := streamingListener.ListenDeliverTx(app.deliverState.ctx, req, abciRes); err != nil {
app.logger.Error("DeliverTx listening hook failed", "err", err)
}
}()
}
}
return res
}
res := abci.ResponseDeliverTx{
GasWanted: int64(gInfo.GasWanted), // TODO: Should type accept unsigned ints?
GasUsed: int64(gInfo.GasUsed), // TODO: Should type accept unsigned ints?
Log: result.Log,
Data: result.Data,
Events: sdk.MarkEventsToIndex(result.Events, app.indexEvents),
}
// Call the streaming service hooks with the DeliverTx messages
for _, listener := range app.abciListeners {
listener.ListenDeliverTx(app.deliverState.ctx, req, res)
}
return res
}
```
We will also modify the `Commit` method to process `success/failure` signals from the integrated `StreamingService`s using
the `ABCIListener.ListenSuccess()` method. Each `StreamingService` has an internal wait threshold after which it sends
`false` to the `ListenSuccess()` channel, and the BaseApp also imposes a configurable global wait limit.
If the `StreamingService` is operating in a "fire-and-forget" mode, `ListenSuccess()` should immediately return `true`
off the channel despite the success status of the service.
```go
func (app *BaseApp) Commit() (res abci.ResponseCommit) {
// wait for all the listener calls to finish
wg.Wait()
}()
...
var halt bool
switch {
case app.haltHeight > 0 && uint64(header.Height) >= app.haltHeight:
halt = true
case app.haltTime > 0 && header.Time.Unix() >= int64(app.haltTime):
halt = true
}
// each listener has an internal wait threshold after which it sends `false` to the ListenSuccess() channel
// but the BaseApp also imposes a global wait limit
maxWait := time.NewTicker(app.globalWaitLimit)
for _, lis := range app.abciListeners {
select {
case success := <- lis.ListenSuccess():
if success == false {
halt = true
break
}
case <- maxWait.C:
halt = true
break
}
}
if halt {
// Halt the binary and allow Tendermint to receive the ResponseCommit
// response with the commit ID hash. This will allow the node to successfully
// restart and process blocks assuming the halt configuration has been
// reset or moved to a more distant value.
app.halt()
}
...
return res
}
```
@ -421,7 +416,7 @@ type StateStreamingPlugin interface {
Register(bApp *baseapp.BaseApp, marshaller codec.BinaryCodec, keys map[string]*types.KVStoreKey) error
// Start starts the background streaming process of the plugin streaming service
Start(wg *sync.WaitGroup)
Start(wg *sync.WaitGroup) error
// Plugin is the base Plugin interface
Plugin
@ -440,34 +435,37 @@ func NewSimApp(
...
// this loads the preloaded and any plugins found in `plugins.dir`
pluginLoader, err := loader.NewPluginLoader(appOpts, logger)
if err != nil {
// handle error
}
// initialize the loaded plugins
if err := pluginLoader.Initialize(); err != nil {
// hanlde error
}
keys := sdk.NewKVStoreKeys(
authtypes.StoreKey, banktypes.StoreKey, stakingtypes.StoreKey,
minttypes.StoreKey, distrtypes.StoreKey, slashingtypes.StoreKey,
govtypes.StoreKey, paramstypes.StoreKey, ibchost.StoreKey, upgradetypes.StoreKey,
evidencetypes.StoreKey, ibctransfertypes.StoreKey, capabilitytypes.StoreKey,
authtypes.StoreKey, banktypes.StoreKey, stakingtypes.StoreKey,
minttypes.StoreKey, distrtypes.StoreKey, slashingtypes.StoreKey,
govtypes.StoreKey, paramstypes.StoreKey, ibchost.StoreKey, upgradetypes.StoreKey,
evidencetypes.StoreKey, ibctransfertypes.StoreKey, capabilitytypes.StoreKey,
)
// register the plugin(s) with the BaseApp
if err := pluginLoader.Inject(bApp, appCodec, keys); err != nil {
// handle error
}
pluginsOnKey := fmt.Sprintf("%s.%s", plugin.PLUGINS_TOML_KEY, plugin.PLUGINS_ON_TOML_KEY)
if cast.ToBool(appOpts.Get(pluginsOnKey)) {
// this loads the preloaded and any plugins found in `plugins.dir`
pluginLoader, err := loader.NewPluginLoader(appOpts, logger)
if err != nil {
// handle error
}
// start the plugin services, optionally use wg to synchronize shutdown using io.Closer
wg := new(sync.WaitGroup)
if err := pluginLoader.Start(wg); err != nil {
// handler error
}
// initialize the loaded plugins
if err := pluginLoader.Initialize(); err != nil {
// handle error
}
// register the plugin(s) with the BaseApp
if err := pluginLoader.Inject(bApp, appCodec, keys); err != nil {
// handle error
}
// start the plugin services, optionally use wg to synchronize shutdown using io.Closer
wg := new(sync.WaitGroup)
if err := pluginLoader.Start(wg); err != nil {
// handler error
}
}
...
@ -483,39 +481,42 @@ The plugin system will be configured within an app's app.toml file.
```toml
[plugins]
on = false # turn the plugin system, as a whole, on or off
disabled = ["list", "of", "plugin", "names", "to", "disable"]
enabled = ["list", "of", "plugin", "names", "to", "enable"]
dir = "the directory to load non-preloaded plugins from; defaults to cosmos-sdk/plugin/plugins"
```
There will be three parameters for configuring the plugin system: `plugins.on`, `plugins.disabled` and `plugins.dir`.
There will be three parameters for configuring the plugin system: `plugins.on`, `plugins.enabled` and `plugins.dir`.
`plugins.on` is a bool that turns on or off the plugin system at large, `plugins.dir` directs the system to a directory
to load plugins from, and `plugins.disabled` is a list of names for the plugins we want to disable (useful for disabling preloaded plugins).
to load plugins from, and `plugins.enabled` provides `opt-in` semantics to plugin names to enable (including preloaded plugins).
Configuration of a given plugin is ultimately specific to the plugin, but we will introduce some standards here:
Plugin TOML configuration should be split into separate sub-tables for each kind of plugin (e.g. `plugins.streaming`).
Within these sub-tables, the parameters for a specific plugin of that kind are included in another sub-table (e.g. `plugins.streaming.file`).
It is generally expected, but not required, that a streaming service plugin can be configured with a set of store keys
(e.g. `plugins.streaming.file.keys`) for the stores it listens to and a mode (e.g. `plugins.streaming.file.mode`)
that signifies whether the service operates in a fire-and-forget capacity (`faf`) or the BaseApp should require positive
acknowledgement of message receipt by the service (`ack`).
(e.g. `plugins.streaming.file.keys`) for the stores it listens to and a flag (e.g. `plugins.streaming.file.halt_app_on_delivery_error`)
that signifies whether the service operates in a fire-and-forget capacity, or stop the BaseApp when an error occurs in
any of `ListenBeginBlock`, `ListenEndBlock` and `ListenDeliverTx`.
e.g.
```toml
[plugins]
on = false # turn the plugin system, as a whole, on or off
disabled = ["list", "of", "plugin", "names", "to", "disable"]
enabled = ["list", "of", "plugin", "names", "to", "enable"]
dir = "the directory to load non-preloaded plugins from; defaults to "
[plugins.streaming] # a mapping of plugin-specific streaming service parameters, mapped to their plugin name
[plugins.streaming.file] # the specific parameters for the file streaming service plugin
keys = ["list", "of", "store", "keys", "we", "want", "to", "expose", "for", "this", "streaming", "service"]
writeDir = "path to the write directory"
write_dir = "path to the write directory"
prefix = "optional prefix to prepend to the generated file names"
mode = "faf" # faf == fire-and-forget; ack == require positive acknowledge of receipt
halt_app_on_delivery_error = "false" # false == fire-and-forget; true == stop the application
[plugins.streaming.kafka]
...
[plugins.modules]
keys = []
topic_prefix = "block" # Optional prefix for topic names where data will be stored.
flush_timeout_ms = 5000 # Flush and wait for outstanding messages and requests to complete delivery when calling `StreamingService.Close(). (milliseconds)
halt_app_on_delivery_error = true # Whether or not to halt the application when plugin fails to deliver message(s).
...
```