feat(server/v2): init the indexer in server/v2 (#22218)
Co-authored-by: Julien Robert <julien@rbrt.fr>
This commit is contained in:
parent
dd2369daf2
commit
e84c0eb86b
@ -8,6 +8,7 @@ import (
|
||||
"cosmossdk.io/core/registry"
|
||||
"cosmossdk.io/core/transaction"
|
||||
"cosmossdk.io/log"
|
||||
"cosmossdk.io/schema/decoding"
|
||||
"cosmossdk.io/server/v2/appmanager"
|
||||
"cosmossdk.io/server/v2/stf"
|
||||
)
|
||||
@ -96,3 +97,12 @@ func (a *App[T]) GetAppManager() *appmanager.AppManager[T] {
|
||||
func (a *App[T]) GetQueryHandlers() map[string]appmodulev2.Handler {
|
||||
return a.QueryHandlers
|
||||
}
|
||||
|
||||
// GetSchemaDecoderResolver returns the module schema resolver.
|
||||
func (a *App[T]) GetSchemaDecoderResolver() decoding.DecoderResolver {
|
||||
moduleSet := map[string]any{}
|
||||
for moduleName, module := range a.moduleManager.Modules() {
|
||||
moduleSet[moduleName] = module
|
||||
}
|
||||
return decoding.ModuleSetDecoderResolver(moduleSet)
|
||||
}
|
||||
|
||||
@ -16,6 +16,7 @@ require (
|
||||
cosmossdk.io/core v1.0.0-alpha.4
|
||||
cosmossdk.io/depinject v1.0.0
|
||||
cosmossdk.io/log v1.4.1
|
||||
cosmossdk.io/schema v0.3.0
|
||||
cosmossdk.io/server/v2/appmanager v0.0.0-00010101000000-000000000000
|
||||
cosmossdk.io/server/v2/stf v0.0.0-00010101000000-000000000000
|
||||
cosmossdk.io/store/v2 v2.0.0-00010101000000-000000000000
|
||||
@ -30,7 +31,6 @@ require (
|
||||
buf.build/gen/go/cosmos/gogo-proto/protocolbuffers/go v1.34.2-20240130113600-88ef6483f90f.2 // indirect
|
||||
cosmossdk.io/core/testing v0.0.0-20240923163230-04da382a9f29 // indirect
|
||||
cosmossdk.io/errors/v2 v2.0.0-20240731132947-df72853b3ca5 // indirect
|
||||
cosmossdk.io/schema v0.3.0 // indirect
|
||||
github.com/DataDog/zstd v1.5.5 // indirect
|
||||
github.com/beorn7/perks v1.0.1 // indirect
|
||||
github.com/cespare/xxhash/v2 v2.3.0 // indirect
|
||||
|
||||
@ -51,11 +51,11 @@ type IndexingOptions struct {
|
||||
// IndexingConfig is the configuration of the indexer manager and contains the configuration for each indexer target.
|
||||
type IndexingConfig struct {
|
||||
// Target is a map of named indexer targets to their configuration.
|
||||
Target map[string]Config
|
||||
Target map[string]Config `mapstructure:"target" toml:"target" json:"target" comment:"Target is a map of named indexer targets to their configuration."`
|
||||
|
||||
// ChannelBufferSize is the buffer size of the channels used for buffering data sent to indexer go routines.
|
||||
// It defaults to 1024.
|
||||
ChannelBufferSize *int `json:"channel_buffer_size,omitempty"`
|
||||
ChannelBufferSize int `mapstructure:"channel_buffer_size" toml:"channel_buffer_size" json:"channel_buffer_size,omitempty" comment:"Buffer size of the channels used for buffering data sent to indexer go routines."`
|
||||
}
|
||||
|
||||
// IndexingTarget returns the indexing target listener and associated data.
|
||||
@ -142,8 +142,8 @@ func StartIndexing(opts IndexingOptions) (IndexingTarget, error) {
|
||||
}
|
||||
|
||||
bufSize := 1024
|
||||
if cfg.ChannelBufferSize != nil {
|
||||
bufSize = *cfg.ChannelBufferSize
|
||||
if cfg.ChannelBufferSize != 0 {
|
||||
bufSize = cfg.ChannelBufferSize
|
||||
}
|
||||
asyncOpts := appdata.AsyncListenerOptions{
|
||||
Context: ctx,
|
||||
|
||||
@ -114,11 +114,6 @@ func (c *Consensus[T]) SetStreamingManager(sm streaming.Manager) {
|
||||
c.streaming = sm
|
||||
}
|
||||
|
||||
// SetListener sets the listener for the consensus module.
|
||||
func (c *Consensus[T]) SetListener(l *appdata.Listener) {
|
||||
c.listener = l
|
||||
}
|
||||
|
||||
// RegisterSnapshotExtensions registers the given extensions with the consensus module's snapshot manager.
|
||||
// It allows additional snapshotter implementations to be used for creating and restoring snapshots.
|
||||
func (c *Consensus[T]) RegisterSnapshotExtensions(extensions ...snapshots.ExtensionSnapshotter) error {
|
||||
|
||||
@ -3,6 +3,7 @@ package cometbft
|
||||
import (
|
||||
cmtcfg "github.com/cometbft/cometbft/config"
|
||||
|
||||
"cosmossdk.io/schema/indexer"
|
||||
"cosmossdk.io/server/v2/cometbft/mempool"
|
||||
)
|
||||
|
||||
@ -23,6 +24,10 @@ func DefaultAppTomlConfig() *AppTomlConfig {
|
||||
Trace: false,
|
||||
Standalone: false,
|
||||
Mempool: mempool.DefaultConfig(),
|
||||
Indexer: indexer.IndexingConfig{
|
||||
Target: make(map[string]indexer.Config),
|
||||
ChannelBufferSize: 1024,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
@ -37,7 +42,8 @@ type AppTomlConfig struct {
|
||||
Standalone bool `mapstructure:"standalone" toml:"standalone" comment:"standalone starts the application without the CometBFT node. The node should be started separately."`
|
||||
|
||||
// Sub configs
|
||||
Mempool mempool.Config `mapstructure:"mempool" toml:"mempool" comment:"mempool defines the configuration for the SDK built-in app-side mempool implementations."`
|
||||
Mempool mempool.Config `mapstructure:"mempool" toml:"mempool" comment:"mempool defines the configuration for the SDK built-in app-side mempool implementations."`
|
||||
Indexer indexer.IndexingConfig `mapstructure:"indexer" toml:"indexer" comment:"indexer defines the configuration for the SDK built-in indexer implementation."`
|
||||
}
|
||||
|
||||
// CfgOption is a function that allows to overwrite the default server configuration.
|
||||
|
||||
@ -22,7 +22,7 @@ require (
|
||||
cosmossdk.io/core v1.0.0-alpha.4
|
||||
cosmossdk.io/errors/v2 v2.0.0-20240731132947-df72853b3ca5
|
||||
cosmossdk.io/log v1.4.1
|
||||
cosmossdk.io/schema v0.3.1-0.20240930054013-7c6e0388a3f9
|
||||
cosmossdk.io/schema v0.3.1-0.20241010135032-192601639cac
|
||||
cosmossdk.io/server/v2 v2.0.0-00010101000000-000000000000
|
||||
cosmossdk.io/server/v2/appmanager v0.0.0-20240802110823-cffeedff643d
|
||||
cosmossdk.io/server/v2/stf v0.0.0-20240708142107-25e99c54bac1
|
||||
|
||||
@ -20,8 +20,8 @@ cosmossdk.io/log v1.4.1 h1:wKdjfDRbDyZRuWa8M+9nuvpVYxrEOwbD/CA8hvhU8QM=
|
||||
cosmossdk.io/log v1.4.1/go.mod h1:k08v0Pyq+gCP6phvdI6RCGhLf/r425UT6Rk/m+o74rU=
|
||||
cosmossdk.io/math v1.3.0 h1:RC+jryuKeytIiictDslBP9i1fhkVm6ZDmZEoNP316zE=
|
||||
cosmossdk.io/math v1.3.0/go.mod h1:vnRTxewy+M7BtXBNFybkuhSH4WfedVAAnERHgVFhp3k=
|
||||
cosmossdk.io/schema v0.3.1-0.20240930054013-7c6e0388a3f9 h1:DmOoS/1PeY6Ih0hAVlJ69kLMUrLV+TCbfICrZtB1vdU=
|
||||
cosmossdk.io/schema v0.3.1-0.20240930054013-7c6e0388a3f9/go.mod h1:RDAhxIeNB4bYqAlF4NBJwRrgtnciMcyyg0DOKnhNZQQ=
|
||||
cosmossdk.io/schema v0.3.1-0.20241010135032-192601639cac h1:3joNZZWZ3k7fMsrBDL1ktuQ2xQwYLZOaDhkruadDFmc=
|
||||
cosmossdk.io/schema v0.3.1-0.20241010135032-192601639cac/go.mod h1:RDAhxIeNB4bYqAlF4NBJwRrgtnciMcyyg0DOKnhNZQQ=
|
||||
filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA=
|
||||
filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4=
|
||||
github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 h1:/vQbFIOMbk2FiG/kXiLl8BRyzTWDw7gX/Hz7Dd5eDMs=
|
||||
|
||||
@ -20,6 +20,7 @@ import (
|
||||
|
||||
"cosmossdk.io/core/transaction"
|
||||
"cosmossdk.io/log"
|
||||
"cosmossdk.io/schema/indexer"
|
||||
serverv2 "cosmossdk.io/server/v2"
|
||||
cometlog "cosmossdk.io/server/v2/cometbft/log"
|
||||
"cosmossdk.io/server/v2/cometbft/mempool"
|
||||
@ -131,6 +132,19 @@ func (s *CometBFTServer[T]) Init(appI serverv2.AppI[T], cfg map[string]any, logg
|
||||
}
|
||||
consensus.snapshotManager = snapshots.NewManager(snapshotStore, s.serverOptions.SnapshotOptions(cfg), sc, ss, nil, s.logger)
|
||||
|
||||
// initialize the indexer
|
||||
if indexerCfg := s.config.AppTomlConfig.Indexer; len(indexerCfg.Target) > 0 {
|
||||
listener, err := indexer.StartIndexing(indexer.IndexingOptions{
|
||||
Config: indexerCfg,
|
||||
Resolver: appI.GetSchemaDecoderResolver(),
|
||||
Logger: s.logger.With(log.ModuleKey, "indexer"),
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to start indexing: %w", err)
|
||||
}
|
||||
consensus.listener = &listener.Listener
|
||||
}
|
||||
|
||||
s.Consensus = consensus
|
||||
|
||||
return nil
|
||||
|
||||
@ -44,11 +44,8 @@ func ReadConfig(configPath string) (*viper.Viper, error) {
|
||||
func UnmarshalSubConfig(cfg map[string]any, subName string, target any) error {
|
||||
var sub any
|
||||
if subName != "" {
|
||||
for k, val := range cfg {
|
||||
if k == subName {
|
||||
sub = val
|
||||
break
|
||||
}
|
||||
if val, ok := cfg[subName]; ok {
|
||||
sub = val
|
||||
}
|
||||
} else {
|
||||
sub = cfg
|
||||
|
||||
@ -16,6 +16,7 @@ require (
|
||||
cosmossdk.io/core v1.0.0-alpha.4
|
||||
cosmossdk.io/core/testing v0.0.0-20240923163230-04da382a9f29
|
||||
cosmossdk.io/log v1.4.1
|
||||
cosmossdk.io/schema v0.3.1-0.20241010135032-192601639cac
|
||||
cosmossdk.io/server/v2/appmanager v0.0.0-00010101000000-000000000000
|
||||
cosmossdk.io/store/v2 v2.0.0-00010101000000-000000000000
|
||||
github.com/cosmos/cosmos-proto v1.0.0-beta.5
|
||||
@ -42,7 +43,6 @@ require (
|
||||
|
||||
require (
|
||||
cosmossdk.io/errors/v2 v2.0.0-20240731132947-df72853b3ca5 // indirect
|
||||
cosmossdk.io/schema v0.3.0 // indirect
|
||||
github.com/DataDog/datadog-go v4.8.3+incompatible // indirect
|
||||
github.com/DataDog/zstd v1.5.5 // indirect
|
||||
github.com/Microsoft/go-winio v0.6.1 // indirect
|
||||
|
||||
@ -8,8 +8,8 @@ cosmossdk.io/errors/v2 v2.0.0-20240731132947-df72853b3ca5 h1:IQNdY2kB+k+1OM2DvqF
|
||||
cosmossdk.io/errors/v2 v2.0.0-20240731132947-df72853b3ca5/go.mod h1:0CuYKkFHxc1vw2JC+t21THBCALJVROrWVR/3PQ1urpc=
|
||||
cosmossdk.io/log v1.4.1 h1:wKdjfDRbDyZRuWa8M+9nuvpVYxrEOwbD/CA8hvhU8QM=
|
||||
cosmossdk.io/log v1.4.1/go.mod h1:k08v0Pyq+gCP6phvdI6RCGhLf/r425UT6Rk/m+o74rU=
|
||||
cosmossdk.io/schema v0.3.0 h1:01lcaM4trhzZ1HQTfTV8z6Ma1GziOZ/YmdzBN3F720c=
|
||||
cosmossdk.io/schema v0.3.0/go.mod h1:RDAhxIeNB4bYqAlF4NBJwRrgtnciMcyyg0DOKnhNZQQ=
|
||||
cosmossdk.io/schema v0.3.1-0.20241010135032-192601639cac h1:3joNZZWZ3k7fMsrBDL1ktuQ2xQwYLZOaDhkruadDFmc=
|
||||
cosmossdk.io/schema v0.3.1-0.20241010135032-192601639cac/go.mod h1:RDAhxIeNB4bYqAlF4NBJwRrgtnciMcyyg0DOKnhNZQQ=
|
||||
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
|
||||
github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ=
|
||||
github.com/DataDog/datadog-go v4.8.3+incompatible h1:fNGaYSuObuQb5nzeTQqowRAd9bpDIRRV4/gUtIBjh8Q=
|
||||
|
||||
@ -7,6 +7,7 @@ import (
|
||||
"cosmossdk.io/core/server"
|
||||
"cosmossdk.io/core/transaction"
|
||||
"cosmossdk.io/log"
|
||||
"cosmossdk.io/schema/decoding"
|
||||
"cosmossdk.io/server/v2/appmanager"
|
||||
"cosmossdk.io/store/v2"
|
||||
)
|
||||
@ -19,4 +20,5 @@ type AppI[T transaction.Tx] interface {
|
||||
GetAppManager() *appmanager.AppManager[T]
|
||||
GetQueryHandlers() map[string]appmodulev2.Handler
|
||||
GetStore() store.RootStore
|
||||
GetSchemaDecoderResolver() decoding.DecoderResolver
|
||||
}
|
||||
|
||||
@ -61,7 +61,7 @@ require (
|
||||
cosmossdk.io/core/testing v0.0.0-20240923163230-04da382a9f29 // indirect
|
||||
cosmossdk.io/errors v1.0.1 // indirect
|
||||
cosmossdk.io/errors/v2 v2.0.0-20240731132947-df72853b3ca5 // indirect
|
||||
cosmossdk.io/schema v0.3.1-0.20240930054013-7c6e0388a3f9 // indirect
|
||||
cosmossdk.io/schema v0.3.1-0.20241010135032-192601639cac // indirect
|
||||
cosmossdk.io/server/v2/appmanager v0.0.0-20240802110823-cffeedff643d // indirect
|
||||
cosmossdk.io/server/v2/stf v0.0.0-20240708142107-25e99c54bac1 // indirect
|
||||
cosmossdk.io/store v1.1.1 // indirect
|
||||
|
||||
@ -206,8 +206,8 @@ cosmossdk.io/log v1.4.1 h1:wKdjfDRbDyZRuWa8M+9nuvpVYxrEOwbD/CA8hvhU8QM=
|
||||
cosmossdk.io/log v1.4.1/go.mod h1:k08v0Pyq+gCP6phvdI6RCGhLf/r425UT6Rk/m+o74rU=
|
||||
cosmossdk.io/math v1.3.0 h1:RC+jryuKeytIiictDslBP9i1fhkVm6ZDmZEoNP316zE=
|
||||
cosmossdk.io/math v1.3.0/go.mod h1:vnRTxewy+M7BtXBNFybkuhSH4WfedVAAnERHgVFhp3k=
|
||||
cosmossdk.io/schema v0.3.1-0.20240930054013-7c6e0388a3f9 h1:DmOoS/1PeY6Ih0hAVlJ69kLMUrLV+TCbfICrZtB1vdU=
|
||||
cosmossdk.io/schema v0.3.1-0.20240930054013-7c6e0388a3f9/go.mod h1:RDAhxIeNB4bYqAlF4NBJwRrgtnciMcyyg0DOKnhNZQQ=
|
||||
cosmossdk.io/schema v0.3.1-0.20241010135032-192601639cac h1:3joNZZWZ3k7fMsrBDL1ktuQ2xQwYLZOaDhkruadDFmc=
|
||||
cosmossdk.io/schema v0.3.1-0.20241010135032-192601639cac/go.mod h1:RDAhxIeNB4bYqAlF4NBJwRrgtnciMcyyg0DOKnhNZQQ=
|
||||
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
|
||||
filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA=
|
||||
filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4=
|
||||
|
||||
@ -21,6 +21,14 @@ standalone = false
|
||||
# max-txs defines the maximum number of transactions that can be in the mempool. A value of 0 indicates an unbounded mempool, a negative value disables the app-side mempool.
|
||||
max-txs = -1
|
||||
|
||||
# indexer defines the configuration for the SDK built-in indexer implementation.
|
||||
[comet.indexer]
|
||||
# Buffer size of the channels used for buffering data sent to indexer go routines.
|
||||
channel_buffer_size = 1024
|
||||
|
||||
# Target is a map of named indexer targets to their configuration.
|
||||
[comet.indexer.target]
|
||||
|
||||
[grpc]
|
||||
# Enable defines if the gRPC server should be enabled.
|
||||
enable = true
|
||||
@ -71,7 +79,7 @@ skip-fast-storage-upgrade = true
|
||||
# Enable enables the application telemetry functionality. When enabled, an in-memory sink is also enabled by default. Operators may also enabled other sinks such as Prometheus.
|
||||
enable = true
|
||||
# Address defines the metrics server address to bind to.
|
||||
address = 'localhost:1338'
|
||||
address = 'localhost:1318'
|
||||
# Prefixed with keys to separate services.
|
||||
service-name = ''
|
||||
# Enable prefixing gauge values with hostname.
|
||||
|
||||
Loading…
Reference in New Issue
Block a user