refactor: State Streaming Docs + Explicit Config Support (#13894)
This commit is contained in:
parent
6dbf113efd
commit
2c0d445ad3
@ -58,6 +58,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
|
||||
|
||||
### Improvements
|
||||
|
||||
* (config) [#13894](https://github.com/cosmos/cosmos-sdk/pull/13894) Support state streaming configuration in `app.toml` template and default configuration.
|
||||
* (x/nft) [#13836](https://github.com/cosmos/cosmos-sdk/pull/13836) Remove the validation for `classID` and `nftID` from the NFT module.
|
||||
* [#13789](https://github.com/cosmos/cosmos-sdk/pull/13789) Add tx `encode` and `decode` endpoints to tx service.
|
||||
> Note: This endpoint will only encode proto messages, Amino encoding is not supported.
|
||||
|
||||
@ -33,6 +33,9 @@ const (
|
||||
// DefaultGRPCMaxSendMsgSize defines the default gRPC max message size in
|
||||
// bytes the server can send.
|
||||
DefaultGRPCMaxSendMsgSize = math.MaxInt32
|
||||
|
||||
// FileStreamer defines the store streaming type for file streaming.
|
||||
FileStreamer = "file"
|
||||
)
|
||||
|
||||
// BaseConfig defines the server's basic configuration
|
||||
@ -196,6 +199,28 @@ type StateSyncConfig struct {
|
||||
SnapshotKeepRecent uint32 `mapstructure:"snapshot-keep-recent"`
|
||||
}
|
||||
|
||||
type (
|
||||
// StoreConfig defines application configuration for state streaming and other
|
||||
// storage related operations.
|
||||
StoreConfig struct {
|
||||
Streamers []string `mapstructure:"streamers"`
|
||||
}
|
||||
|
||||
// StreamersConfig defines concrete state streaming configuration options. These
|
||||
// fields are required to be set when state streaming is enabled via a non-empty
|
||||
// list defined by 'StoreConfig.Streamers'.
|
||||
StreamersConfig struct {
|
||||
File FileStreamerConfig `mapstructure:"file"`
|
||||
}
|
||||
|
||||
// FileStreamerConfig defines the file streaming configuration options.
|
||||
FileStreamerConfig struct {
|
||||
Keys []string `mapstructure:"keys"`
|
||||
WriteDir string `mapstructure:"write_dir"`
|
||||
Prefix string `mapstructure:"prefix"`
|
||||
}
|
||||
)
|
||||
|
||||
// Config defines the server's top level configuration
|
||||
type Config struct {
|
||||
BaseConfig `mapstructure:",squash"`
|
||||
@ -207,6 +232,8 @@ type Config struct {
|
||||
Rosetta RosettaConfig `mapstructure:"rosetta"`
|
||||
GRPCWeb GRPCWebConfig `mapstructure:"grpc-web"`
|
||||
StateSync StateSyncConfig `mapstructure:"state-sync"`
|
||||
Store StoreConfig `mapstructure:"store"`
|
||||
Streamers StreamersConfig `mapstructure:"streamers"`
|
||||
}
|
||||
|
||||
// SetMinGasPrices sets the validator's minimum gas prices.
|
||||
@ -288,6 +315,14 @@ func DefaultConfig() *Config {
|
||||
SnapshotInterval: 0,
|
||||
SnapshotKeepRecent: 2,
|
||||
},
|
||||
Store: StoreConfig{
|
||||
Streamers: []string{},
|
||||
},
|
||||
Streamers: StreamersConfig{
|
||||
File: FileStreamerConfig{
|
||||
Keys: []string{"*"},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -6,7 +6,6 @@ import (
|
||||
"testing"
|
||||
|
||||
"github.com/spf13/viper"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
sdk "github.com/cosmos/cosmos-sdk/types"
|
||||
@ -32,28 +31,54 @@ func TestIndexEventsMarshalling(t *testing.T) {
|
||||
err := configTemplate.Execute(&buffer, cfg)
|
||||
require.NoError(t, err, "executing template")
|
||||
actual := buffer.String()
|
||||
assert.Contains(t, actual, expectedIn, "config file contents")
|
||||
require.Contains(t, actual, expectedIn, "config file contents")
|
||||
}
|
||||
|
||||
func TestParseStoreStreaming(t *testing.T) {
|
||||
expectedContents := `[store]
|
||||
streamers = ["file", ]
|
||||
|
||||
[streamers]
|
||||
[streamers.file]
|
||||
keys = ["*", ]
|
||||
write_dir = "/foo/bar"
|
||||
prefix = ""`
|
||||
|
||||
cfg := DefaultConfig()
|
||||
cfg.Store.Streamers = []string{FileStreamer}
|
||||
cfg.Streamers.File.Keys = []string{"*"}
|
||||
cfg.Streamers.File.WriteDir = "/foo/bar"
|
||||
|
||||
var buffer bytes.Buffer
|
||||
require.NoError(t, configTemplate.Execute(&buffer, cfg), "executing template")
|
||||
require.Contains(t, buffer.String(), expectedContents, "config file contents")
|
||||
}
|
||||
|
||||
func TestIndexEventsWriteRead(t *testing.T) {
|
||||
expected := []string{"key3", "key4"}
|
||||
|
||||
// Create config with two IndexEvents entries, and write it to a file.
|
||||
confFile := filepath.Join(t.TempDir(), "app.toml")
|
||||
conf := DefaultConfig()
|
||||
conf.IndexEvents = expected
|
||||
|
||||
WriteConfigFile(confFile, conf)
|
||||
|
||||
// Read that file into viper.
|
||||
// read the file into Viper
|
||||
vpr := viper.New()
|
||||
vpr.SetConfigFile(confFile)
|
||||
|
||||
err := vpr.ReadInConfig()
|
||||
require.NoError(t, err, "reading config file into viper")
|
||||
|
||||
// Check that the raw viper value is correct.
|
||||
actualRaw := vpr.GetStringSlice("index-events")
|
||||
require.Equal(t, expected, actualRaw, "viper's index events")
|
||||
|
||||
// Check that it is parsed into the config correctly.
|
||||
cfg, perr := ParseConfig(vpr)
|
||||
require.NoError(t, perr, "parsing config")
|
||||
|
||||
actual := cfg.IndexEvents
|
||||
require.Equal(t, expected, actual, "config value")
|
||||
}
|
||||
@ -62,7 +87,7 @@ func TestGlobalLabelsEventsMarshalling(t *testing.T) {
|
||||
expectedIn := `global-labels = [
|
||||
["labelname1", "labelvalue1"],
|
||||
["labelname2", "labelvalue2"],
|
||||
]` + "\n"
|
||||
]`
|
||||
cfg := DefaultConfig()
|
||||
cfg.Telemetry.GlobalLabels = [][]string{{"labelname1", "labelvalue1"}, {"labelname2", "labelvalue2"}}
|
||||
var buffer bytes.Buffer
|
||||
@ -70,7 +95,7 @@ func TestGlobalLabelsEventsMarshalling(t *testing.T) {
|
||||
err := configTemplate.Execute(&buffer, cfg)
|
||||
require.NoError(t, err, "executing template")
|
||||
actual := buffer.String()
|
||||
assert.Contains(t, actual, expectedIn, "config file contents")
|
||||
require.Contains(t, actual, expectedIn, "config file contents")
|
||||
}
|
||||
|
||||
func TestGlobalLabelsWriteRead(t *testing.T) {
|
||||
|
||||
@ -235,6 +235,19 @@ snapshot-interval = {{ .StateSync.SnapshotInterval }}
|
||||
|
||||
# snapshot-keep-recent specifies the number of recent snapshots to keep and serve (0 to keep all).
|
||||
snapshot-keep-recent = {{ .StateSync.SnapshotKeepRecent }}
|
||||
|
||||
###############################################################################
|
||||
### Store / State Streaming ###
|
||||
###############################################################################
|
||||
|
||||
[store]
|
||||
streamers = [{{ range .Store.Streamers }}{{ printf "%q, " . }}{{end}}]
|
||||
|
||||
[streamers]
|
||||
[streamers.file]
|
||||
keys = [{{ range .Streamers.File.Keys }}{{ printf "%q, " . }}{{end}}]
|
||||
write_dir = "{{ .Streamers.File.WriteDir }}"
|
||||
prefix = "{{ .Streamers.File.Prefix }}"
|
||||
`
|
||||
|
||||
var configTemplate *template.Template
|
||||
|
||||
@ -14,6 +14,7 @@ import (
|
||||
dbm "github.com/tendermint/tm-db"
|
||||
|
||||
"cosmossdk.io/depinject"
|
||||
|
||||
"github.com/cosmos/cosmos-sdk/baseapp"
|
||||
"github.com/cosmos/cosmos-sdk/client"
|
||||
"github.com/cosmos/cosmos-sdk/codec"
|
||||
@ -266,10 +267,9 @@ func NewSimApp(
|
||||
|
||||
app.App = appBuilder.Build(logger, db, traceStore, baseAppOptions...)
|
||||
|
||||
// configure state listening capabilities using AppOptions
|
||||
// we are doing nothing with the returned streamingServices and waitGroup in this case
|
||||
// load state streaming if enabled
|
||||
if _, _, err := streaming.LoadStreamingServices(app.App.BaseApp, appOpts, app.appCodec, app.keys); err != nil {
|
||||
fmt.Println(err.Error())
|
||||
fmt.Printf("failed to load state streaming: %s", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
|
||||
@ -17,6 +17,7 @@ import (
|
||||
dbm "github.com/tendermint/tm-db"
|
||||
|
||||
simappparams "cosmossdk.io/simapp/params"
|
||||
|
||||
"github.com/cosmos/cosmos-sdk/baseapp"
|
||||
"github.com/cosmos/cosmos-sdk/client"
|
||||
"github.com/cosmos/cosmos-sdk/client/flags"
|
||||
@ -246,10 +247,9 @@ func NewSimApp(
|
||||
// not include this key.
|
||||
memKeys := sdk.NewMemoryStoreKeys(capabilitytypes.MemStoreKey, "testingkey")
|
||||
|
||||
// configure state listening capabilities using AppOptions
|
||||
// we are doing nothing with the returned streamingServices and waitGroup in this case
|
||||
// load state streaming if enabled
|
||||
if _, _, err := streaming.LoadStreamingServices(bApp, appOpts, appCodec, keys); err != nil {
|
||||
fmt.Println(err.Error())
|
||||
fmt.Printf("failed to load state streaming: %s", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
|
||||
@ -1,29 +1,38 @@
|
||||
# State Streaming Service
|
||||
|
||||
This package contains the constructors for the `StreamingService`s used to write state changes out from individual KVStores to a
|
||||
file or stream, as described in [ADR-038](https://github.com/cosmos/cosmos-sdk/blob/main/docs/architecture/adr-038-state-listening.md) and defined in [types/streaming.go](https://github.com/cosmos/cosmos-sdk/blob/main/baseapp/streaming.go).
|
||||
This package contains the constructors for the `StreamingService`s used to write
|
||||
state changes out from individual KVStores to a file or stream, as described in
|
||||
[ADR-038](https://github.com/cosmos/cosmos-sdk/blob/main/docs/architecture/adr-038-state-listening.md)
|
||||
and defined in [types/streaming.go](https://github.com/cosmos/cosmos-sdk/blob/main/baseapp/streaming.go).
|
||||
The child directories contain the implementations for specific output destinations.
|
||||
|
||||
Currently, a `StreamingService` implementation that writes state changes out to files is supported, in the future support for additional
|
||||
output destinations can be added.
|
||||
Currently, a `StreamingService` implementation that writes state changes out to
|
||||
files is supported, in the future support for additional output destinations can
|
||||
be added.
|
||||
|
||||
The `StreamingService` is configured from within an App using the `AppOptions` loaded from the app.toml file:
|
||||
The `StreamingService` is configured from within an App using the `AppOptions`
|
||||
loaded from the `app.toml` file:
|
||||
|
||||
```toml
|
||||
# ...
|
||||
|
||||
[store]
|
||||
streamers = [ # if len(streamers) > 0 we are streaming
|
||||
"file", # name of the streaming service, used by constructor
|
||||
]
|
||||
# streaming is enabled if one or more streamers are defined
|
||||
streamers = [
|
||||
# name of the streaming service, used by constructor
|
||||
"file"
|
||||
]
|
||||
|
||||
[streamers]
|
||||
[streamers.file]
|
||||
keys = ["list", "of", "store", "keys", "we", "want", "to", "expose", "for", "this", "streaming", "service"]
|
||||
write_dir = "path to the write directory"
|
||||
prefix = "optional prefix to prepend to the generated file names"
|
||||
[streamers.file]
|
||||
keys = ["list", "of", "store", "keys", "we", "want", "to", "expose", "for", "this", "streaming", "service"]
|
||||
write_dir = "path to the write directory"
|
||||
prefix = "optional prefix to prepend to the generated file names"
|
||||
```
|
||||
|
||||
`store.streamers` contains a list of the names of the `StreamingService` implementations to employ which are used by `ServiceTypeFromString`
|
||||
to return the `ServiceConstructor` for that particular implementation:
|
||||
The `store.streamers` field contains a list of the names of the `StreamingService`
|
||||
implementations to employ which are used by `ServiceTypeFromString` to return
|
||||
the `ServiceConstructor` for that particular implementation:
|
||||
|
||||
```go
|
||||
listeners := cast.ToStringSlice(appOpts.Get("store.streamers"))
|
||||
@ -35,18 +44,27 @@ for _, listenerName := range listeners {
|
||||
}
|
||||
```
|
||||
|
||||
`streamers` contains a mapping of the specific `StreamingService` implementation name to the configuration parameters for that specific service.
|
||||
`streamers.x.keys` contains the list of `StoreKey` names for the KVStores to expose using this service and is required by every type of `StreamingService`.
|
||||
In order to expose *all* KVStores, we can include `*` in this list. An empty list is equivalent to turning the service off.
|
||||
The `streamers` field contains a mapping of the specific `StreamingService`
|
||||
implementation name to the configuration parameters for that specific service.
|
||||
|
||||
The `streamers.x.keys` field contains the list of `StoreKey` names for the
|
||||
KVStores to expose using this service and is required by every type of
|
||||
`StreamingService`. In order to expose *ALL* KVStores, we can include `*` in
|
||||
this list. An empty list is equivalent to turning the service off.
|
||||
|
||||
Additional configuration parameters are optional and specific to the implementation.
|
||||
In the case of the file streaming service, `streamers.file.write_dir` contains the path to the
|
||||
directory to write the files to, and `streamers.file.prefix` contains an optional prefix to prepend to the output files to prevent potential collisions
|
||||
with other App `StreamingService` output files.
|
||||
In the case of the file streaming service, the `streamers.file.write_dir` field
|
||||
contains the path to the directory to write the files to, and `streamers.file.prefix`
|
||||
contains an optional prefix to prepend to the output files to prevent potential
|
||||
collisions with other App `StreamingService` output files.
|
||||
|
||||
The `ServiceConstructor` accepts `AppOptions`, the store keys collected using `streamers.x.keys`, a `BinaryMarshaller` and
|
||||
returns a `StreamingService` implementation. The `AppOptions` are passed in to provide access to any implementation specific configuration options,
|
||||
e.g. in the case of the file streaming service the `streamers.file.write_dir` and `streamers.file.prefix`.
|
||||
The `ServiceConstructor` accepts `AppOptions`, the store keys collected using
|
||||
`streamers.x.keys`, a `BinaryMarshaller` and returns a `StreamingService
|
||||
implementation.
|
||||
|
||||
The `AppOptions` are passed in to provide access to any implementation specific
|
||||
configuration options, e.g. in the case of the file streaming service the
|
||||
`streamers.file.write_dir` and `streamers.file.prefix`.
|
||||
|
||||
```go
|
||||
streamingService, err := constructor(appOpts, exposeStoreKeys, appCodec)
|
||||
@ -55,9 +73,12 @@ if err != nil {
|
||||
}
|
||||
```
|
||||
|
||||
The returned `StreamingService` is loaded into the BaseApp using the BaseApp's `SetStreamingService` method.
|
||||
The `Stream` method is called on the service to begin the streaming process. Depending on the implementation this process
|
||||
may be synchronous or asynchronous with the message processing of the state machine.
|
||||
The returned `StreamingService` is loaded into the BaseApp using the BaseApp's
|
||||
`SetStreamingService` method.
|
||||
|
||||
The `Stream` method is called on the service to begin the streaming process.
|
||||
Depending on the implementation this process may be synchronous or asynchronous
|
||||
with the message processing of the state machine.
|
||||
|
||||
```go
|
||||
bApp.SetStreamingService(streamingService)
|
||||
|
||||
Loading…
Reference in New Issue
Block a user