feat(server/v2): wire telemetry + server refactors (#21746)
This commit is contained in:
parent
9d631d355c
commit
7fe95fc3f9
@ -4,14 +4,9 @@ import "math"
|
||||
|
||||
func DefaultConfig() *Config {
|
||||
return &Config{
|
||||
Enable: true,
|
||||
// DefaultGRPCAddress defines the default address to bind the gRPC server to.
|
||||
Address: "localhost:9090",
|
||||
// DefaultGRPCMaxRecvMsgSize defines the default gRPC max message size in
|
||||
// bytes the server can receive.
|
||||
Enable: true,
|
||||
Address: "localhost:9090",
|
||||
MaxRecvMsgSize: 1024 * 1024 * 10,
|
||||
// DefaultGRPCMaxSendMsgSize defines the default gRPC max message size in
|
||||
// bytes the server can send.
|
||||
MaxSendMsgSize: math.MaxInt32,
|
||||
}
|
||||
}
|
||||
|
||||
@ -148,7 +148,7 @@ func (s *Server[T]) Name() string {
|
||||
}
|
||||
|
||||
func (s *Server[T]) Config() any {
|
||||
if s.config == nil || s.config == (&Config{}) {
|
||||
if s.config == nil || s.config.Address == "" {
|
||||
cfg := DefaultConfig()
|
||||
// overwrite the default config with the provided options
|
||||
for _, opt := range s.cfgOptions {
|
||||
@ -163,6 +163,7 @@ func (s *Server[T]) Config() any {
|
||||
|
||||
func (s *Server[T]) Start(ctx context.Context) error {
|
||||
if !s.config.Enable {
|
||||
s.logger.Info(fmt.Sprintf("%s server is disabled via config", s.Name()))
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -171,24 +172,12 @@ func (s *Server[T]) Start(ctx context.Context) error {
|
||||
return fmt.Errorf("failed to listen on address %s: %w", s.config.Address, err)
|
||||
}
|
||||
|
||||
errCh := make(chan error)
|
||||
|
||||
// Start the gRPC in an external goroutine as Serve is blocking and will return
|
||||
// an error upon failure, which we'll send on the error channel that will be
|
||||
// consumed by the for block below.
|
||||
go func() {
|
||||
s.logger.Info("starting gRPC server...", "address", s.config.Address)
|
||||
errCh <- s.grpcSrv.Serve(listener)
|
||||
}()
|
||||
|
||||
// Start a blocking select to wait for an indication to stop the server or that
|
||||
// the server failed to start properly.
|
||||
err = <-errCh
|
||||
if err != nil {
|
||||
s.logger.Error("failed to start gRPC server", "err", err)
|
||||
s.logger.Info("starting gRPC server...", "address", s.config.Address)
|
||||
if err := s.grpcSrv.Serve(listener); err != nil {
|
||||
return fmt.Errorf("failed to start gRPC server: %w", err)
|
||||
}
|
||||
|
||||
return err
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Server[T]) Stop(ctx context.Context) error {
|
||||
@ -198,6 +187,10 @@ func (s *Server[T]) Stop(ctx context.Context) error {
|
||||
|
||||
s.logger.Info("stopping gRPC server...", "address", s.config.Address)
|
||||
s.grpcSrv.GracefulStop()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetGRPCServer returns the underlying gRPC server.
|
||||
func (s *Server[T]) GetGRPCServer() *grpc.Server {
|
||||
return s.grpcSrv
|
||||
}
|
||||
|
||||
@ -2,13 +2,17 @@ package grpcgateway
|
||||
|
||||
func DefaultConfig() *Config {
|
||||
return &Config{
|
||||
Enable: true,
|
||||
Enable: true,
|
||||
Address: "localhost:1317",
|
||||
}
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
// Enable defines if the gRPC-gateway should be enabled.
|
||||
Enable bool `mapstructure:"enable" toml:"enable" comment:"Enable defines if the gRPC-gateway should be enabled."`
|
||||
|
||||
// Address defines the address the gRPC-gateway server binds to.
|
||||
Address string `mapstructure:"address" toml:"address" comment:"Address defines the address the gRPC-gateway server binds to."`
|
||||
}
|
||||
|
||||
type CfgOption func(*Config)
|
||||
|
||||
@ -8,7 +8,6 @@ import (
|
||||
|
||||
gateway "github.com/cosmos/gogogateway"
|
||||
"github.com/cosmos/gogoproto/jsonpb"
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/grpc-ecosystem/grpc-gateway/runtime"
|
||||
"google.golang.org/grpc"
|
||||
|
||||
@ -17,26 +16,25 @@ import (
|
||||
serverv2 "cosmossdk.io/server/v2"
|
||||
)
|
||||
|
||||
var _ serverv2.ServerComponent[transaction.Tx] = (*GRPCGatewayServer[transaction.Tx])(nil)
|
||||
|
||||
const (
|
||||
ServerName = "grpc-gateway"
|
||||
|
||||
// GRPCBlockHeightHeader is the gRPC header for block height.
|
||||
GRPCBlockHeightHeader = "x-cosmos-block-height"
|
||||
var (
|
||||
_ serverv2.ServerComponent[transaction.Tx] = (*Server[transaction.Tx])(nil)
|
||||
_ serverv2.HasConfig = (*Server[transaction.Tx])(nil)
|
||||
)
|
||||
|
||||
type GRPCGatewayServer[T transaction.Tx] struct {
|
||||
const ServerName = "grpc-gateway"
|
||||
|
||||
type Server[T transaction.Tx] struct {
|
||||
logger log.Logger
|
||||
config *Config
|
||||
cfgOptions []CfgOption
|
||||
|
||||
GRPCSrv *grpc.Server
|
||||
GRPCGatewayRouter *runtime.ServeMux
|
||||
server *http.Server
|
||||
gRPCSrv *grpc.Server
|
||||
gRPCGatewayRouter *runtime.ServeMux
|
||||
}
|
||||
|
||||
// New creates a new gRPC-gateway server.
|
||||
func New[T transaction.Tx](grpcSrv *grpc.Server, ir jsonpb.AnyResolver, cfgOptions ...CfgOption) *GRPCGatewayServer[T] {
|
||||
func New[T transaction.Tx](grpcSrv *grpc.Server, ir jsonpb.AnyResolver, cfgOptions ...CfgOption) *Server[T] {
|
||||
// The default JSON marshaller used by the gRPC-Gateway is unable to marshal non-nullable non-scalar fields.
|
||||
// Using the gogo/gateway package with the gRPC-Gateway WithMarshaler option fixes the scalar field marshaling issue.
|
||||
marshalerOption := &gateway.JSONPb{
|
||||
@ -46,9 +44,9 @@ func New[T transaction.Tx](grpcSrv *grpc.Server, ir jsonpb.AnyResolver, cfgOptio
|
||||
AnyResolver: ir,
|
||||
}
|
||||
|
||||
return &GRPCGatewayServer[T]{
|
||||
GRPCSrv: grpcSrv,
|
||||
GRPCGatewayRouter: runtime.NewServeMux(
|
||||
return &Server[T]{
|
||||
gRPCSrv: grpcSrv,
|
||||
gRPCGatewayRouter: runtime.NewServeMux(
|
||||
// Custom marshaler option is required for gogo proto
|
||||
runtime.WithMarshalerOption(runtime.MIMEWildcard, marshalerOption),
|
||||
|
||||
@ -64,12 +62,12 @@ func New[T transaction.Tx](grpcSrv *grpc.Server, ir jsonpb.AnyResolver, cfgOptio
|
||||
}
|
||||
}
|
||||
|
||||
func (g *GRPCGatewayServer[T]) Name() string {
|
||||
func (s *Server[T]) Name() string {
|
||||
return ServerName
|
||||
}
|
||||
|
||||
func (s *GRPCGatewayServer[T]) Config() any {
|
||||
if s.config == nil || s.config == (&Config{}) {
|
||||
func (s *Server[T]) Config() any {
|
||||
if s.config == nil || s.config.Address == "" {
|
||||
cfg := DefaultConfig()
|
||||
// overwrite the default config with the provided options
|
||||
for _, opt := range s.cfgOptions {
|
||||
@ -82,7 +80,7 @@ func (s *GRPCGatewayServer[T]) Config() any {
|
||||
return s.config
|
||||
}
|
||||
|
||||
func (s *GRPCGatewayServer[T]) Init(appI serverv2.AppI[transaction.Tx], cfg map[string]any, logger log.Logger) error {
|
||||
func (s *Server[T]) Init(appI serverv2.AppI[transaction.Tx], cfg map[string]any, logger log.Logger) error {
|
||||
serverCfg := s.Config().(*Config)
|
||||
if len(cfg) > 0 {
|
||||
if err := serverv2.UnmarshalSubConfig(cfg, s.Name(), &serverCfg); err != nil {
|
||||
@ -90,42 +88,43 @@ func (s *GRPCGatewayServer[T]) Init(appI serverv2.AppI[transaction.Tx], cfg map[
|
||||
}
|
||||
}
|
||||
|
||||
// Register the gRPC-Gateway server.
|
||||
// appI.RegisterGRPCGatewayRoutes(s.GRPCGatewayRouter, s.GRPCSrv)
|
||||
// TODO: register the gRPC-Gateway routes
|
||||
|
||||
s.logger = logger
|
||||
s.logger = logger.With(log.ModuleKey, s.Name())
|
||||
s.config = serverCfg
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *GRPCGatewayServer[T]) Start(ctx context.Context) error {
|
||||
func (s *Server[T]) Start(ctx context.Context) error {
|
||||
if !s.config.Enable {
|
||||
s.logger.Info(fmt.Sprintf("%s server is disabled via config", s.Name()))
|
||||
return nil
|
||||
}
|
||||
|
||||
// TODO start a normal Go http server (and do not leverage comet's like https://github.com/cosmos/cosmos-sdk/blob/9df6019de6ee7999fe9864bac836deb2f36dd44a/server/api/server.go#L98)
|
||||
mux := http.NewServeMux()
|
||||
mux.Handle("/", s.gRPCGatewayRouter)
|
||||
|
||||
return nil
|
||||
}
|
||||
s.server = &http.Server{
|
||||
Addr: s.config.Address,
|
||||
Handler: mux,
|
||||
}
|
||||
|
||||
func (s *GRPCGatewayServer[T]) Stop(ctx context.Context) error {
|
||||
if !s.config.Enable {
|
||||
return nil
|
||||
s.logger.Info("starting gRPC-Gateway server...", "address", s.config.Address)
|
||||
if err := s.server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
|
||||
return fmt.Errorf("failed to start gRPC-Gateway server: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Register implements registers a grpc-gateway server
|
||||
func (s *GRPCGatewayServer[T]) Register(r mux.Router) error {
|
||||
// configure grpc-gatway server
|
||||
r.PathPrefix("/").Handler(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
||||
// Fall back to grpc gateway server.
|
||||
s.GRPCGatewayRouter.ServeHTTP(w, req)
|
||||
}))
|
||||
func (s *Server[T]) Stop(ctx context.Context) error {
|
||||
if !s.config.Enable {
|
||||
return nil
|
||||
}
|
||||
|
||||
return nil
|
||||
s.logger.Info("stopping gRPC-Gateway server...", "address", s.config.Address)
|
||||
return s.server.Shutdown(ctx)
|
||||
}
|
||||
|
||||
// CustomGRPCHeaderMatcher for mapping request headers to
|
||||
@ -134,6 +133,9 @@ func (s *GRPCGatewayServer[T]) Register(r mux.Router) error {
|
||||
// gRPC metadata after removing prefix 'Grpc-Metadata-'. We can use this
|
||||
// CustomGRPCHeaderMatcher if headers don't start with `Grpc-Metadata-`
|
||||
func CustomGRPCHeaderMatcher(key string) (string, bool) {
|
||||
// GRPCBlockHeightHeader is the gRPC header for block height.
|
||||
const GRPCBlockHeightHeader = "x-cosmos-block-height"
|
||||
|
||||
switch strings.ToLower(key) {
|
||||
case GRPCBlockHeightHeader:
|
||||
return GRPCBlockHeightHeader, true
|
||||
|
||||
@ -1,13 +1,32 @@
|
||||
package telemetry
|
||||
|
||||
type Config struct {
|
||||
// Prefixed with keys to separate services
|
||||
ServiceName string `mapstructure:"service-name" toml:"service-name" comment:"Prefixed with keys to separate services."`
|
||||
func DefaultConfig() *Config {
|
||||
return &Config{
|
||||
Enable: true,
|
||||
Address: "localhost:1318",
|
||||
ServiceName: "",
|
||||
EnableHostname: false,
|
||||
EnableHostnameLabel: false,
|
||||
EnableServiceLabel: false,
|
||||
PrometheusRetentionTime: 0,
|
||||
GlobalLabels: nil,
|
||||
MetricsSink: "",
|
||||
StatsdAddr: "",
|
||||
DatadogHostname: "",
|
||||
}
|
||||
}
|
||||
|
||||
// Enabled enables the application telemetry functionality. When enabled,
|
||||
type Config struct {
|
||||
// Enable enables the application telemetry functionality. When enabled,
|
||||
// an in-memory sink is also enabled by default. Operators may also enabled
|
||||
// other sinks such as Prometheus.
|
||||
Enabled bool `mapstructure:"enabled" toml:"enabled" comment:"Enabled enables the application telemetry functionality. When enabled, an in-memory sink is also enabled by default. Operators may also enabled other sinks such as Prometheus."`
|
||||
Enable bool `mapstructure:"enable" toml:"enable" comment:"Enable enables the application telemetry functionality. When enabled, an in-memory sink is also enabled by default. Operators may also enabled other sinks such as Prometheus."`
|
||||
|
||||
// Address defines the API server to listen on
|
||||
Address string `mapstructure:"address" toml:"address" comment:"Address defines the metrics server address to bind to."`
|
||||
|
||||
// Prefixed with keys to separate services
|
||||
ServiceName string `mapstructure:"service-name" toml:"service-name" comment:"Prefixed with keys to separate services."`
|
||||
|
||||
// Enable prefixing gauge values with hostname
|
||||
EnableHostname bool `mapstructure:"enable-hostname" toml:"enable-hostname" comment:"Enable prefixing gauge values with hostname."`
|
||||
|
||||
@ -58,11 +58,7 @@ type GatherResponse struct {
|
||||
}
|
||||
|
||||
// New creates a new instance of Metrics
|
||||
func New(cfg Config) (_ *Metrics, rerr error) {
|
||||
if !cfg.Enabled {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func NewMetrics(cfg *Config) (*Metrics, error) {
|
||||
if numGlobalLabels := len(cfg.GlobalLabels); numGlobalLabels > 0 {
|
||||
parsedGlobalLabels := make([]metrics.Label, numGlobalLabels)
|
||||
for i, gl := range cfg.GlobalLabels {
|
||||
@ -89,12 +85,11 @@ func New(cfg Config) (_ *Metrics, rerr error) {
|
||||
sink = memSink
|
||||
inMemSig := metrics.DefaultInmemSignal(memSink)
|
||||
defer func() {
|
||||
if rerr != nil {
|
||||
if err != nil {
|
||||
inMemSig.Stop()
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -1,47 +1,126 @@
|
||||
package telemetry
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strings"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
"cosmossdk.io/core/transaction"
|
||||
"cosmossdk.io/log"
|
||||
serverv2 "cosmossdk.io/server/v2"
|
||||
)
|
||||
|
||||
func RegisterMetrics(r mux.Router, cfg Config) (*Metrics, error) {
|
||||
m, err := New(cfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
var (
|
||||
_ serverv2.ServerComponent[transaction.Tx] = (*Server[transaction.Tx])(nil)
|
||||
_ serverv2.HasConfig = (*Server[transaction.Tx])(nil)
|
||||
)
|
||||
|
||||
const ServerName = "telemetry"
|
||||
|
||||
type Server[T transaction.Tx] struct {
|
||||
config *Config
|
||||
logger log.Logger
|
||||
server *http.Server
|
||||
metrics *Metrics
|
||||
}
|
||||
|
||||
// New creates a new telemetry server.
|
||||
func New[T transaction.Tx]() *Server[T] {
|
||||
return &Server[T]{}
|
||||
}
|
||||
|
||||
// Name returns the server name.
|
||||
func (s *Server[T]) Name() string {
|
||||
return ServerName
|
||||
}
|
||||
|
||||
func (s *Server[T]) Config() any {
|
||||
if s.config == nil || s.config.Address == "" {
|
||||
return DefaultConfig()
|
||||
}
|
||||
|
||||
metricsHandler := func(w http.ResponseWriter, r *http.Request) {
|
||||
format := strings.TrimSpace(r.FormValue("format"))
|
||||
return s.config
|
||||
}
|
||||
|
||||
gr, err := m.Gather(format)
|
||||
// Init implements serverv2.ServerComponent.
|
||||
func (s *Server[T]) Init(appI serverv2.AppI[T], cfg map[string]any, logger log.Logger) error {
|
||||
serverCfg := s.Config().(*Config)
|
||||
if len(cfg) > 0 {
|
||||
if err := serverv2.UnmarshalSubConfig(cfg, s.Name(), &serverCfg); err != nil {
|
||||
return fmt.Errorf("failed to unmarshal config: %w", err)
|
||||
}
|
||||
}
|
||||
s.config = serverCfg
|
||||
s.logger = logger.With(log.ModuleKey, s.Name())
|
||||
|
||||
metrics, err := NewMetrics(s.config)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to initialize metrics: %w", err)
|
||||
}
|
||||
s.metrics = metrics
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Server[T]) Start(ctx context.Context) error {
|
||||
if !s.config.Enable {
|
||||
s.logger.Info(fmt.Sprintf("%s server is disabled via config", s.Name()))
|
||||
return nil
|
||||
}
|
||||
|
||||
mux := http.NewServeMux()
|
||||
mux.HandleFunc("/", s.metricsHandler)
|
||||
// keeping /metrics for backwards compatibility
|
||||
mux.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
|
||||
http.Redirect(w, r, "/", http.StatusMovedPermanently)
|
||||
})
|
||||
|
||||
s.server = &http.Server{
|
||||
Addr: s.config.Address,
|
||||
Handler: mux,
|
||||
}
|
||||
|
||||
s.logger.Info("starting telemetry server...", "address", s.config.Address)
|
||||
if err := s.server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
|
||||
return fmt.Errorf("failed to start telemetry server: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Server[T]) Stop(ctx context.Context) error {
|
||||
if !s.config.Enable || s.server == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
s.logger.Info("stopping telemetry server...", "address", s.config.Address)
|
||||
return s.server.Shutdown(ctx)
|
||||
}
|
||||
|
||||
func (s *Server[T]) metricsHandler(w http.ResponseWriter, r *http.Request) {
|
||||
format := strings.TrimSpace(r.FormValue("format"))
|
||||
|
||||
// errorResponse defines the attributes of a JSON error response.
|
||||
type errorResponse struct {
|
||||
Code int `json:"code,omitempty"`
|
||||
Error string `json:"error"`
|
||||
}
|
||||
|
||||
gr, err := s.metrics.Gather(format)
|
||||
if err != nil {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
bz, err := json.Marshal(errorResponse{Code: 400, Error: fmt.Sprintf("failed to gather metrics: %s", err)})
|
||||
if err != nil {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
bz, err := json.Marshal(errorResponse{Code: 400, Error: fmt.Sprintf("failed to gather metrics: %s", err)})
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
_, _ = w.Write(bz)
|
||||
|
||||
return
|
||||
}
|
||||
_, _ = w.Write(bz)
|
||||
|
||||
w.Header().Set("Content-Type", gr.ContentType)
|
||||
_, _ = w.Write(gr.Metrics)
|
||||
return
|
||||
}
|
||||
|
||||
r.HandleFunc("/metrics", metricsHandler).Methods("GET")
|
||||
|
||||
return m, nil
|
||||
}
|
||||
|
||||
// errorResponse defines the attributes of a JSON error response.
|
||||
type errorResponse struct {
|
||||
Code int `json:"code,omitempty"`
|
||||
Error string `json:"error"`
|
||||
w.Header().Set("Content-Type", gr.ContentType)
|
||||
_, _ = w.Write(gr.Metrics)
|
||||
}
|
||||
|
||||
@ -271,7 +271,7 @@ func (s *CometBFTServer[T]) CLICommands() serverv2.CLIConfig {
|
||||
|
||||
// Config returns the (app.toml) server configuration.
|
||||
func (s *CometBFTServer[T]) Config() any {
|
||||
if s.config.AppTomlConfig == nil || s.config.AppTomlConfig == (&AppTomlConfig{}) {
|
||||
if s.config.AppTomlConfig == nil || s.config.AppTomlConfig.Address == "" {
|
||||
cfg := &Config{AppTomlConfig: DefaultAppTomlConfig()}
|
||||
// overwrite the default config with the provided options
|
||||
for _, opt := range s.cfgOptions {
|
||||
|
||||
@ -22,7 +22,6 @@ require (
|
||||
github.com/cosmos/gogogateway v1.2.0
|
||||
github.com/cosmos/gogoproto v1.7.0
|
||||
github.com/golang/protobuf v1.5.4
|
||||
github.com/gorilla/mux v1.8.1
|
||||
github.com/grpc-ecosystem/grpc-gateway v1.16.0
|
||||
github.com/hashicorp/go-hclog v1.6.3
|
||||
github.com/hashicorp/go-metrics v0.5.3
|
||||
|
||||
@ -150,8 +150,6 @@ github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeN
|
||||
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
|
||||
github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
|
||||
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY=
|
||||
github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ=
|
||||
github.com/grpc-ecosystem/grpc-gateway v1.16.0 h1:gmcG1KaJ57LophUzW0Hy8NmPhnMZb4M0+kPpLofRdBo=
|
||||
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
|
||||
github.com/hashicorp/go-cleanhttp v0.5.0/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
|
||||
|
||||
@ -17,7 +17,7 @@ import (
|
||||
)
|
||||
|
||||
// QueryBlockResultsCmd implements the default command for a BlockResults query.
|
||||
func (s *StoreComponent[T]) PrunesCmd() *cobra.Command {
|
||||
func (s *Server[T]) PrunesCmd() *cobra.Command {
|
||||
cmd := &cobra.Command{
|
||||
Use: "prune [pruning-method]",
|
||||
Short: "Prune app history states by keeping the recent heights and deleting old heights",
|
||||
|
||||
@ -12,49 +12,49 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
_ serverv2.ServerComponent[transaction.Tx] = (*StoreComponent[transaction.Tx])(nil)
|
||||
_ serverv2.HasConfig = (*StoreComponent[transaction.Tx])(nil)
|
||||
_ serverv2.HasCLICommands = (*StoreComponent[transaction.Tx])(nil)
|
||||
_ serverv2.ServerComponent[transaction.Tx] = (*Server[transaction.Tx])(nil)
|
||||
_ serverv2.HasConfig = (*Server[transaction.Tx])(nil)
|
||||
_ serverv2.HasCLICommands = (*Server[transaction.Tx])(nil)
|
||||
)
|
||||
|
||||
const ServerName = "store"
|
||||
|
||||
// StoreComponent manages store config
|
||||
// and contains prune & snapshot commands
|
||||
type StoreComponent[T transaction.Tx] struct {
|
||||
// StoreComponent manages store config and contains prune & snapshot commands
|
||||
type Server[T transaction.Tx] struct {
|
||||
config *Config
|
||||
// saving appCreator for only RestoreSnapshotCmd
|
||||
appCreator serverv2.AppCreator[T]
|
||||
}
|
||||
|
||||
func New[T transaction.Tx](appCreator serverv2.AppCreator[T]) *StoreComponent[T] {
|
||||
return &StoreComponent[T]{appCreator: appCreator}
|
||||
func New[T transaction.Tx](appCreator serverv2.AppCreator[T]) *Server[T] {
|
||||
return &Server[T]{appCreator: appCreator}
|
||||
}
|
||||
|
||||
func (s *StoreComponent[T]) Init(appI serverv2.AppI[T], cfg map[string]any, logger log.Logger) error {
|
||||
serverCfg := DefaultConfig()
|
||||
func (s *Server[T]) Init(appI serverv2.AppI[T], cfg map[string]any, logger log.Logger) error {
|
||||
serverCfg := s.Config().(*Config)
|
||||
if len(cfg) > 0 {
|
||||
if err := serverv2.UnmarshalSubConfig(cfg, s.Name(), &serverCfg); err != nil {
|
||||
return fmt.Errorf("failed to unmarshal config: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
s.config = serverCfg
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *StoreComponent[T]) Name() string {
|
||||
func (s *Server[T]) Name() string {
|
||||
return ServerName
|
||||
}
|
||||
|
||||
func (s *StoreComponent[T]) Start(ctx context.Context) error {
|
||||
func (s *Server[T]) Start(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *StoreComponent[T]) Stop(ctx context.Context) error {
|
||||
func (s *Server[T]) Stop(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *StoreComponent[T]) CLICommands() serverv2.CLIConfig {
|
||||
func (s *Server[T]) CLICommands() serverv2.CLIConfig {
|
||||
return serverv2.CLIConfig{
|
||||
Commands: []*cobra.Command{
|
||||
s.PrunesCmd(),
|
||||
@ -68,10 +68,10 @@ func (s *StoreComponent[T]) CLICommands() serverv2.CLIConfig {
|
||||
}
|
||||
}
|
||||
|
||||
func (g *StoreComponent[T]) Config() any {
|
||||
if g.config == nil || g.config == (&Config{}) {
|
||||
func (s *Server[T]) Config() any {
|
||||
if s.config == nil || s.config.AppDBBackend == "" {
|
||||
return DefaultConfig()
|
||||
}
|
||||
|
||||
return g.config
|
||||
return s.config
|
||||
}
|
||||
|
||||
@ -25,7 +25,7 @@ import (
|
||||
const SnapshotFileName = "_snapshot"
|
||||
|
||||
// QueryBlockResultsCmd implements the default command for a BlockResults query.
|
||||
func (s *StoreComponent[T]) ExportSnapshotCmd() *cobra.Command {
|
||||
func (s *Server[T]) ExportSnapshotCmd() *cobra.Command {
|
||||
cmd := &cobra.Command{
|
||||
Use: "export",
|
||||
Short: "Export app state to snapshot store",
|
||||
@ -76,7 +76,7 @@ func (s *StoreComponent[T]) ExportSnapshotCmd() *cobra.Command {
|
||||
}
|
||||
|
||||
// RestoreSnapshotCmd returns a command to restore a snapshot
|
||||
func (s *StoreComponent[T]) RestoreSnapshotCmd(newApp serverv2.AppCreator[T]) *cobra.Command {
|
||||
func (s *Server[T]) RestoreSnapshotCmd(newApp serverv2.AppCreator[T]) *cobra.Command {
|
||||
cmd := &cobra.Command{
|
||||
Use: "restore <height> <format>",
|
||||
Short: "Restore app state from local snapshot",
|
||||
@ -113,7 +113,7 @@ func (s *StoreComponent[T]) RestoreSnapshotCmd(newApp serverv2.AppCreator[T]) *c
|
||||
}
|
||||
|
||||
// ListSnapshotsCmd returns the command to list local snapshots
|
||||
func (s *StoreComponent[T]) ListSnapshotsCmd() *cobra.Command {
|
||||
func (s *Server[T]) ListSnapshotsCmd() *cobra.Command {
|
||||
cmd := &cobra.Command{
|
||||
Use: "list",
|
||||
Short: "List local snapshots",
|
||||
@ -139,7 +139,7 @@ func (s *StoreComponent[T]) ListSnapshotsCmd() *cobra.Command {
|
||||
}
|
||||
|
||||
// DeleteSnapshotCmd returns the command to delete a local snapshot
|
||||
func (s *StoreComponent[T]) DeleteSnapshotCmd() *cobra.Command {
|
||||
func (s *Server[T]) DeleteSnapshotCmd() *cobra.Command {
|
||||
return &cobra.Command{
|
||||
Use: "delete <height> <format>",
|
||||
Short: "Delete a local snapshot",
|
||||
@ -167,7 +167,7 @@ func (s *StoreComponent[T]) DeleteSnapshotCmd() *cobra.Command {
|
||||
}
|
||||
|
||||
// DumpArchiveCmd returns a command to dump the snapshot as portable archive format
|
||||
func (s *StoreComponent[T]) DumpArchiveCmd() *cobra.Command {
|
||||
func (s *Server[T]) DumpArchiveCmd() *cobra.Command {
|
||||
cmd := &cobra.Command{
|
||||
Use: "dump <height> <format>",
|
||||
Short: "Dump the snapshot as portable archive format",
|
||||
@ -260,7 +260,7 @@ func (s *StoreComponent[T]) DumpArchiveCmd() *cobra.Command {
|
||||
}
|
||||
|
||||
// LoadArchiveCmd load a portable archive format snapshot into snapshot store
|
||||
func (s *StoreComponent[T]) LoadArchiveCmd() *cobra.Command {
|
||||
func (s *Server[T]) LoadArchiveCmd() *cobra.Command {
|
||||
return &cobra.Command{
|
||||
Use: "load <archive-file>",
|
||||
Short: "Load a snapshot archive file (.tar.gz) into snapshot store",
|
||||
|
||||
@ -15,6 +15,7 @@ import (
|
||||
runtimev2 "cosmossdk.io/runtime/v2"
|
||||
serverv2 "cosmossdk.io/server/v2"
|
||||
"cosmossdk.io/server/v2/api/grpc"
|
||||
"cosmossdk.io/server/v2/api/telemetry"
|
||||
"cosmossdk.io/server/v2/cometbft"
|
||||
"cosmossdk.io/server/v2/store"
|
||||
"cosmossdk.io/simapp/v2"
|
||||
@ -81,6 +82,7 @@ func initRootCmd[T transaction.Tx](
|
||||
),
|
||||
grpc.New[T](),
|
||||
store.New[T](newApp),
|
||||
telemetry.New[T](),
|
||||
); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user