From f4e2ce0ea47971bbb8b69d8581609a4a22903a8b Mon Sep 17 00:00:00 2001 From: mmsqe Date: Wed, 19 Nov 2025 23:46:50 +0800 Subject: [PATCH] feat: support for multi gRPC query clients serve with old binary (#25565) Co-authored-by: Alex | Cosmos Labs --- CHANGELOG.md | 1 + client/context.go | 17 +++ client/grpc_query.go | 97 ++++++++++++++--- client/grpc_query_test.go | 194 +++++++++++++++++++++++++++++++++ server/config/config.go | 40 +++++++ server/config/config_test.go | 205 +++++++++++++++++++++++++++++++++++ server/config/toml.go | 7 ++ server/grpc/server.go | 68 +++++++++++- server/start.go | 36 ++++-- testutil/network/util.go | 7 +- 10 files changed, 644 insertions(+), 28 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 274b1d72f0..2b2af6210e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -56,6 +56,7 @@ Ref: https://keepachangelog.com/en/1.0.0/ * (crypto) [#24861](https://github.com/cosmos/cosmos-sdk/pull/24861) add `PubKeyFromCometTypeAndBytes` helper function to convert from `comet/v2` PubKeys to the `cryptotypes.Pubkey` interface. * (abci_utils) [#25008](https://github.com/cosmos/cosmos-sdk/pull/25008) add the ability to assign a custom signer extraction adapter in `DefaultProposalHandler`. * (crypto/ledger) [#25435](https://github.com/cosmos/cosmos-sdk/pull/25435) Add SetDERConversion to reset skipDERConversion and App name for ledger. +* (gRPC) [#25565](https://github.com/cosmos/cosmos-sdk/pull/25565) Support for multi gRPC query clients serve with historical binaries to serve proper historical state. ### Improvements diff --git a/client/context.go b/client/context.go index 4779d8a15b..d5a23db738 100644 --- a/client/context.go +++ b/client/context.go @@ -30,6 +30,7 @@ type Context struct { FromAddress sdk.AccAddress Client CometRPC GRPCClient *grpc.ClientConn + GRPCConnProvider *GRPCConnProvider ChainID string Codec codec.Codec InterfaceRegistry codectypes.InterfaceRegistry @@ -155,6 +156,22 @@ func (ctx Context) WithGRPCClient(grpcClient *grpc.ClientConn) Context { return ctx } +// WithGRPCConnProvider returns a copy of the context with an updated GRPCConnProvider. +func (ctx Context) WithGRPCConnProvider(provider *GRPCConnProvider) Context { + ctx.GRPCConnProvider = provider + return ctx +} + +// GetGRPCConn returns the appropriate gRPC connection for the given height. +// If GRPCConnProvider is set, it uses it to determine the connection. +// Otherwise, it falls back to the default GRPCClient. +func (ctx Context) GetGRPCConn(height int64) *grpc.ClientConn { + if ctx.GRPCConnProvider != nil { + return ctx.GRPCConnProvider.GetGRPCConn(height) + } + return ctx.GRPCClient +} + // WithUseLedger returns a copy of the context with an updated UseLedger flag. func (ctx Context) WithUseLedger(useLedger bool) Context { ctx.UseLedger = useLedger diff --git a/client/grpc_query.go b/client/grpc_query.go index d48f8231f3..50955b44b3 100644 --- a/client/grpc_query.go +++ b/client/grpc_query.go @@ -16,11 +16,56 @@ import ( "github.com/cosmos/cosmos-sdk/codec" "github.com/cosmos/cosmos-sdk/codec/types" + "github.com/cosmos/cosmos-sdk/server/config" sdkerrors "github.com/cosmos/cosmos-sdk/types/errors" grpctypes "github.com/cosmos/cosmos-sdk/types/grpc" "github.com/cosmos/cosmos-sdk/types/tx" ) +// GRPCConn provides a method to get the appropriate gRPC connection based on block height. +type GRPCConn interface { + GetGRPCConn(height int64) *grpc.ClientConn +} + +// GRPCConnProvider manages gRPC connections with optional historical connections for historical queries. +type GRPCConnProvider struct { + // DefaultConn is the primary gRPC connection + DefaultConn *grpc.ClientConn + // HistoricalConns maps block ranges to historical gRPC connections for routing historical queries + HistoricalConns config.HistoricalGRPCConnections +} + +// NewGRPCConnProvider creates a new GRPCConnProvider with the given connections. +func NewGRPCConnProvider(defaultConn *grpc.ClientConn, historicalConns config.HistoricalGRPCConnections) *GRPCConnProvider { + if historicalConns == nil { + historicalConns = make(config.HistoricalGRPCConnections) + } + return &GRPCConnProvider{ + DefaultConn: defaultConn, + HistoricalConns: historicalConns, + } +} + +// GetGRPCConn returns the appropriate gRPC connection based on the block height. +// For height <= 0 (latest block), it returns the default connection. +// For positive heights, it checks if a historical connection exists for that height range. +func (g *GRPCConnProvider) GetGRPCConn(height int64) *grpc.ClientConn { + // height = 0 means latest block, use the default connection + if height <= 0 { + return g.DefaultConn + } + + // Check if there's a historical connection for this height + for blockRange, conn := range g.HistoricalConns { + if int64(blockRange[0]) <= height && int64(blockRange[1]) >= height { + return conn + } + } + + // Default to the primary connection if no historical matches + return g.DefaultConn +} + var _ gogogrpc.ClientConn = Context{} // fallBackCodec is used by Context in case Codec is not set. @@ -28,6 +73,27 @@ var _ gogogrpc.ClientConn = Context{} // interfaces in their types. var fallBackCodec = codec.NewProtoCodec(types.NewInterfaceRegistry()) +// GetHeightFromMetadata extracts the block height from gRPC metadata in the context. +// Returns 0 if no valid height is found. +func GetHeightFromMetadata(grpcCtx gocontext.Context) int64 { + md, ok := metadata.FromOutgoingContext(grpcCtx) + if !ok { + return 0 + } + heights := md.Get(grpctypes.GRPCBlockHeightHeader) + if len(heights) == 0 { + return 0 + } + height, err := strconv.ParseInt(heights[0], 10, 64) + if err != nil { + return 0 + } + if height < 0 { + return 0 + } + return height +} + // Invoke implements the grpc ClientConn.Invoke method func (ctx Context) Invoke(grpcCtx gocontext.Context, method string, req, reply any, opts ...grpc.CallOption) (err error) { // Two things can happen here: @@ -58,7 +124,16 @@ func (ctx Context) Invoke(grpcCtx gocontext.Context, method string, req, reply a if ctx.GRPCClient != nil { // Case 2-1. Invoke grpc. - return ctx.GRPCClient.Invoke(grpcCtx, method, req, reply, opts...) + grpcConn := ctx.GRPCClient + if ctx.GRPCConnProvider != nil { + height := ctx.Height + if height <= 0 { + height = GetHeightFromMetadata(grpcCtx) + } + + grpcConn = ctx.GRPCConnProvider.GetGRPCConn(height) + } + return grpcConn.Invoke(grpcCtx, method, req, reply, opts...) } // Case 2-2. Querying state via abci query. @@ -68,18 +143,14 @@ func (ctx Context) Invoke(grpcCtx gocontext.Context, method string, req, reply a } // parse height header - md, _ := metadata.FromOutgoingContext(grpcCtx) - if heights := md.Get(grpctypes.GRPCBlockHeightHeader); len(heights) > 0 { - height, err := strconv.ParseInt(heights[0], 10, 64) - if err != nil { - return err - } - if height < 0 { - return errorsmod.Wrapf( - sdkerrors.ErrInvalidRequest, - "client.Context.Invoke: height (%d) from %q must be >= 0", height, grpctypes.GRPCBlockHeightHeader) - } + height := GetHeightFromMetadata(grpcCtx) + if height < 0 { + return errorsmod.Wrapf( + sdkerrors.ErrInvalidRequest, + "client.Context.Invoke: height (%d) from %q must be >= 0", height, grpctypes.GRPCBlockHeightHeader) + } + if height > 0 { ctx = ctx.WithHeight(height) } @@ -104,7 +175,7 @@ func (ctx Context) Invoke(grpcCtx gocontext.Context, method string, req, reply a // We then parse all the call options, if the call option is a // HeaderCallOption, then we manually set the value of that header to the // metadata. - md = metadata.Pairs(grpctypes.GRPCBlockHeightHeader, strconv.FormatInt(res.Height, 10)) + md := metadata.Pairs(grpctypes.GRPCBlockHeightHeader, strconv.FormatInt(res.Height, 10)) for _, callOpt := range opts { header, ok := callOpt.(grpc.HeaderCallOption) if !ok { diff --git a/client/grpc_query_test.go b/client/grpc_query_test.go index f409103c73..84bcfaacd8 100644 --- a/client/grpc_query_test.go +++ b/client/grpc_query_test.go @@ -7,8 +7,10 @@ import ( abci "github.com/cometbft/cometbft/abci/types" cmtjson "github.com/cometbft/cometbft/libs/json" dbm "github.com/cosmos/cosmos-db" + "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/metadata" "cosmossdk.io/depinject" @@ -16,13 +18,16 @@ import ( "cosmossdk.io/math" "github.com/cosmos/cosmos-sdk/baseapp" + "github.com/cosmos/cosmos-sdk/client" "github.com/cosmos/cosmos-sdk/codec" codectypes "github.com/cosmos/cosmos-sdk/codec/types" "github.com/cosmos/cosmos-sdk/crypto/keys/secp256k1" "github.com/cosmos/cosmos-sdk/runtime" + "github.com/cosmos/cosmos-sdk/server/config" "github.com/cosmos/cosmos-sdk/testutil/sims" "github.com/cosmos/cosmos-sdk/testutil/testdata" sdk "github.com/cosmos/cosmos-sdk/types" + grpctypes "github.com/cosmos/cosmos-sdk/types/grpc" "github.com/cosmos/cosmos-sdk/x/auth/testutil" authtypes "github.com/cosmos/cosmos-sdk/x/auth/types" bankkeeper "github.com/cosmos/cosmos-sdk/x/bank/keeper" @@ -136,3 +141,192 @@ func (s *IntegrationTestSuite) TestGRPCQuery() { func TestIntegrationTestSuite(t *testing.T) { suite.Run(t, new(IntegrationTestSuite)) } + +func (s *IntegrationTestSuite) TestGetGRPCConnWithContext() { + defaultConn, err := grpc.NewClient("localhost:9090", + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + s.Require().NoError(err) + defer defaultConn.Close() + + historicalConn, err := grpc.NewClient("localhost:9091", + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + s.Require().NoError(err) + defer historicalConn.Close() + + historicalConns := config.HistoricalGRPCConnections{ + config.BlockRange{100, 500}: historicalConn, + } + provider := client.NewGRPCConnProvider(defaultConn, historicalConns) + testCases := []struct { + name string + height int64 + setupCtx func() client.Context + expectedConn *grpc.ClientConn + }{ + { + name: "context with GRPCConnProvider and historical height", + height: 300, + setupCtx: func() client.Context { + return client.Context{}. + WithCodec(s.cdc). + WithGRPCClient(defaultConn). + WithGRPCConnProvider(provider). + WithHeight(300) + }, + expectedConn: historicalConn, + }, + { + name: "context with GRPCConnProvider and latest height", + height: 0, + setupCtx: func() client.Context { + return client.Context{}. + WithCodec(s.cdc). + WithGRPCClient(defaultConn). + WithGRPCConnProvider(provider). + WithHeight(0) + }, + expectedConn: defaultConn, + }, + { + name: "context without GRPCConnProvider", + height: 300, + setupCtx: func() client.Context { + return client.Context{}. + WithCodec(s.cdc). + WithGRPCClient(defaultConn). + WithHeight(300) + }, + expectedConn: defaultConn, + }, + { + name: "context with nil historical connections map", + height: 100, + setupCtx: func() client.Context { + nilProvider := client.NewGRPCConnProvider(defaultConn, nil) + return client.Context{}. + WithCodec(s.cdc). + WithGRPCClient(defaultConn). + WithGRPCConnProvider(nilProvider). + WithHeight(100) + }, + expectedConn: defaultConn, + }, + { + name: "context with empty historical connections map", + height: 100, + setupCtx: func() client.Context { + emptyProvider := client.NewGRPCConnProvider(defaultConn, config.HistoricalGRPCConnections{}) + return client.Context{}. + WithCodec(s.cdc). + WithGRPCClient(defaultConn). + WithGRPCConnProvider(emptyProvider). + WithHeight(100) + }, + expectedConn: defaultConn, + }, + } + + for _, tc := range testCases { + s.Run(tc.name, func() { + ctx := tc.setupCtx() + var actualConn *grpc.ClientConn + if ctx.GRPCConnProvider != nil { + actualConn = ctx.GRPCConnProvider.GetGRPCConn(ctx.Height) + } else { + actualConn = ctx.GRPCClient + } + s.Require().Equal(tc.expectedConn, actualConn) + }) + } +} + +func TestGetHeightFromMetadata(t *testing.T) { + tests := []struct { + name string + setupContext func() context.Context + expectedHeight int64 + }{ + { + name: "valid height in metadata", + setupContext: func() context.Context { + md := metadata.Pairs(grpctypes.GRPCBlockHeightHeader, "12345") + return metadata.NewOutgoingContext(context.Background(), md) + }, + expectedHeight: 12345, + }, + { + name: "zero height in metadata", + setupContext: func() context.Context { + md := metadata.Pairs(grpctypes.GRPCBlockHeightHeader, "0") + return metadata.NewOutgoingContext(context.Background(), md) + }, + expectedHeight: 0, + }, + { + name: "negative height returns zero", + setupContext: func() context.Context { + md := metadata.Pairs(grpctypes.GRPCBlockHeightHeader, "-100") + return metadata.NewOutgoingContext(context.Background(), md) + }, + expectedHeight: 0, + }, + { + name: "no metadata returns zero", + setupContext: context.Background, + expectedHeight: 0, + }, + { + name: "empty height header returns zero", + setupContext: func() context.Context { + md := metadata.New(map[string]string{}) + return metadata.NewOutgoingContext(context.Background(), md) + }, + expectedHeight: 0, + }, + { + name: "invalid height string returns zero", + setupContext: func() context.Context { + md := metadata.Pairs(grpctypes.GRPCBlockHeightHeader, "not-a-number") + return metadata.NewOutgoingContext(context.Background(), md) + }, + expectedHeight: 0, + }, + { + name: "multiple height values uses first", + setupContext: func() context.Context { + md := metadata.Pairs( + grpctypes.GRPCBlockHeightHeader, "100", + grpctypes.GRPCBlockHeightHeader, "200", + ) + return metadata.NewOutgoingContext(context.Background(), md) + }, + expectedHeight: 100, + }, + { + name: "very large height", + setupContext: func() context.Context { + md := metadata.Pairs(grpctypes.GRPCBlockHeightHeader, "9223372036854775807") // max int64 + return metadata.NewOutgoingContext(context.Background(), md) + }, + expectedHeight: 9223372036854775807, + }, + { + name: "height exceeding int64 returns zero", + setupContext: func() context.Context { + md := metadata.Pairs(grpctypes.GRPCBlockHeightHeader, "9223372036854775808") // max int64 + 1 + return metadata.NewOutgoingContext(context.Background(), md) + }, + expectedHeight: 0, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx := tt.setupContext() + height := client.GetHeightFromMetadata(ctx) + require.Equal(t, tt.expectedHeight, height) + }) + } +} diff --git a/server/config/config.go b/server/config/config.go index a50893e850..95230d73b7 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -1,10 +1,12 @@ package config import ( + "encoding/json" "fmt" "math" "github.com/spf13/viper" + "google.golang.org/grpc" pruningtypes "cosmossdk.io/store/pruning/types" @@ -124,6 +126,21 @@ type APIConfig struct { // Ref: https://github.com/cosmos/cosmos-sdk/issues/6420 } +// BlockRange represents a range of block heights as [start_block, end_block] (inclusive). +// It is used to map gRPC historical connections to specific block height ranges for routing +// historical queries to appropriate archive nodes. +// +// Example: +// - [0, 1000] represents blocks from genesis (0) through block 1000 +// - [1001, 2000] represents blocks from 1001 through 2000 +// +// Both start and end blocks must be non-negative, and start must be less than or equal to end. +type BlockRange [2]int + +// HistoricalGRPCConnections is a map of block ranges to gRPC client connections +// used for routing requests to different backend nodes based on block height. +type HistoricalGRPCConnections map[BlockRange]*grpc.ClientConn + // GRPCConfig defines configuration for the gRPC server. type GRPCConfig struct { // Enable defines if the gRPC server should be enabled. @@ -142,6 +159,9 @@ type GRPCConfig struct { // SkipCheckHeader defines if the gRPC server should bypass header checking. SkipCheckHeader bool `mapstructure:"skip-check-header"` + + // HistoricalGRPCAddressBlockRange maps block ranges to gRPC addresses for routing historical queries. + HistoricalGRPCAddressBlockRange map[BlockRange]string `mapstructure:"-"` } // GRPCWebConfig defines configuration for the gRPC-web server. @@ -278,6 +298,26 @@ func GetConfig(v *viper.Viper) (Config, error) { if err := v.Unmarshal(conf); err != nil { return Config{}, fmt.Errorf("error extracting app config: %w", err) } + raw := v.GetString("grpc.historical-grpc-address-block-range") + if len(raw) > 0 { + data := make(map[string]BlockRange) + if err := json.Unmarshal([]byte(raw), &data); err != nil { + return Config{}, fmt.Errorf("failed to parse historical-grpc-address-block-range as JSON: %w (value: %s)", err, raw) + } + historicalGRPCAddressBlockRange := make(map[BlockRange]string, len(data)) + for address, blockRange := range data { + if blockRange[0] < 0 || blockRange[1] < 0 { + return Config{}, fmt.Errorf("invalid block range [%d, %d] for address %s: block numbers cannot be negative", + blockRange[0], blockRange[1], address) + } + if blockRange[0] > blockRange[1] { + return Config{}, fmt.Errorf("invalid block range [%d, %d] for address %s: start block must be <= end block", + blockRange[0], blockRange[1], address) + } + historicalGRPCAddressBlockRange[blockRange] = address + } + conf.GRPC.HistoricalGRPCAddressBlockRange = historicalGRPCAddressBlockRange + } return *conf, nil } diff --git a/server/config/config_test.go b/server/config/config_test.go index f516aa826a..07aac5e0fe 100644 --- a/server/config/config_test.go +++ b/server/config/config_test.go @@ -234,3 +234,208 @@ func TestAppConfig(t *testing.T) { require.NoError(t, v.Unmarshal(appCfg)) require.EqualValues(t, appCfg, defAppConfig) } + +func TestGetConfig_HistoricalGRPCAddressBlockRange(t *testing.T) { + tests := []struct { + name string + setupViper func(*viper.Viper) + expectError bool + errorMsg string + validate func(*testing.T, Config) + }{ + { + name: "valid single historical grpc address", + setupViper: func(v *viper.Viper) { + v.Set("grpc.historical-grpc-address-block-range", `{"localhost:9091": [0, 1000]}`) + }, + expectError: false, + validate: func(t *testing.T, cfg Config) { + t.Helper() + require.Len(t, cfg.GRPC.HistoricalGRPCAddressBlockRange, 1) + expectedRange := BlockRange{0, 1000} + address, exists := cfg.GRPC.HistoricalGRPCAddressBlockRange[expectedRange] + require.True(t, exists, "Block range [0, 1000] should exist") + require.Equal(t, "localhost:9091", address) + }, + }, + { + name: "valid multiple historical grpc addresses", + setupViper: func(v *viper.Viper) { + v.Set("grpc.historical-grpc-address-block-range", + `{"localhost:9091": [0, 1000], "localhost:9092": [1001, 2000], "localhost:9093": [2001, 3000]}`) + }, + expectError: false, + validate: func(t *testing.T, cfg Config) { + t.Helper() + require.Len(t, cfg.GRPC.HistoricalGRPCAddressBlockRange, 3) + testCases := []struct { + blockRange BlockRange + address string + }{ + {BlockRange{0, 1000}, "localhost:9091"}, + {BlockRange{1001, 2000}, "localhost:9092"}, + {BlockRange{2001, 3000}, "localhost:9093"}, + } + for _, tc := range testCases { + address, exists := cfg.GRPC.HistoricalGRPCAddressBlockRange[tc.blockRange] + require.True(t, exists, "Block range %v should exist", tc.blockRange) + require.Equal(t, tc.address, address) + } + }, + }, + { + name: "empty configuration", + setupViper: func(v *viper.Viper) { + v.Set("grpc.historical-grpc-address-block-range", "") + }, + expectError: false, + validate: func(t *testing.T, cfg Config) { + t.Helper() + require.Nil(t, cfg.GRPC.HistoricalGRPCAddressBlockRange) + }, + }, + { + name: "no configuration set", + setupViper: func(v *viper.Viper) {}, + expectError: false, + validate: func(t *testing.T, cfg Config) { + t.Helper() + require.Nil(t, cfg.GRPC.HistoricalGRPCAddressBlockRange) + }, + }, + { + name: "invalid JSON format", + setupViper: func(v *viper.Viper) { + // missing closing brace + v.Set("grpc.historical-grpc-address-block-range", `{"localhost:9091": [0, 1000]`) + }, + expectError: true, + errorMsg: "failed to parse historical-grpc-address-block-range as JSON", + }, + { + name: "negative start block", + setupViper: func(v *viper.Viper) { + v.Set("grpc.historical-grpc-address-block-range", `{"localhost:9091": [-1, 1000]}`) + }, + expectError: true, + errorMsg: "block numbers cannot be negative", + }, + { + name: "negative end block", + setupViper: func(v *viper.Viper) { + v.Set("grpc.historical-grpc-address-block-range", `{"localhost:9091": [0, -100]}`) + }, + expectError: true, + errorMsg: "block numbers cannot be negative", + }, + { + name: "start block greater than end block", + setupViper: func(v *viper.Viper) { + v.Set("grpc.historical-grpc-address-block-range", `{"localhost:9091": [1000, 500]}`) + }, + expectError: true, + errorMsg: "start block must be <= end block", + }, + { + name: "single block range (start equals end)", + setupViper: func(v *viper.Viper) { + v.Set("grpc.historical-grpc-address-block-range", `{"localhost:9091": [1000, 1000]}`) + }, + expectError: false, + validate: func(t *testing.T, cfg Config) { + t.Helper() + require.Len(t, cfg.GRPC.HistoricalGRPCAddressBlockRange, 1) + expectedRange := BlockRange{1000, 1000} + address, exists := cfg.GRPC.HistoricalGRPCAddressBlockRange[expectedRange] + require.True(t, exists) + require.Equal(t, "localhost:9091", address) + }, + }, + { + name: "zero to zero range", + setupViper: func(v *viper.Viper) { + v.Set("grpc.historical-grpc-address-block-range", `{"localhost:9091": [0, 0]}`) + }, + expectError: false, + validate: func(t *testing.T, cfg Config) { + t.Helper() + require.Len(t, cfg.GRPC.HistoricalGRPCAddressBlockRange, 1) + expectedRange := BlockRange{0, 0} + address, exists := cfg.GRPC.HistoricalGRPCAddressBlockRange[expectedRange] + require.True(t, exists) + require.Equal(t, "localhost:9091", address) + }, + }, + { + name: "large block numbers", + setupViper: func(v *viper.Viper) { + v.Set("grpc.historical-grpc-address-block-range", `{"localhost:9091": [1000000, 2000000]}`) + }, + expectError: false, + validate: func(t *testing.T, cfg Config) { + t.Helper() + require.Len(t, cfg.GRPC.HistoricalGRPCAddressBlockRange, 1) + expectedRange := BlockRange{1000000, 2000000} + address, exists := cfg.GRPC.HistoricalGRPCAddressBlockRange[expectedRange] + require.True(t, exists) + require.Equal(t, "localhost:9091", address) + }, + }, + { + name: "invalid array length (too few elements)", + setupViper: func(v *viper.Viper) { + v.Set("grpc.historical-grpc-address-block-range", `{"localhost:9091": [100]}`) + }, + expectError: true, + errorMsg: "start block must be <= end block", + }, + { + name: "invalid array length (too many elements)", + setupViper: func(v *viper.Viper) { + v.Set("grpc.historical-grpc-address-block-range", `{"localhost:9091": [0, 1000, 2000]}`) + }, + expectError: false, + validate: func(t *testing.T, cfg Config) { + t.Helper() + require.Len(t, cfg.GRPC.HistoricalGRPCAddressBlockRange, 1) + expectedRange := BlockRange{0, 1000} + address, exists := cfg.GRPC.HistoricalGRPCAddressBlockRange[expectedRange] + require.True(t, exists) + require.Equal(t, "localhost:9091", address) + }, + }, + { + name: "address with port and protocol", + setupViper: func(v *viper.Viper) { + v.Set("grpc.historical-grpc-address-block-range", `{"https://archive.example.com:9091": [0, 1000]}`) + }, + expectError: false, + validate: func(t *testing.T, cfg Config) { + t.Helper() + require.Len(t, cfg.GRPC.HistoricalGRPCAddressBlockRange, 1) + expectedRange := BlockRange{0, 1000} + address, exists := cfg.GRPC.HistoricalGRPCAddressBlockRange[expectedRange] + require.True(t, exists) + require.Equal(t, "https://archive.example.com:9091", address) + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + v := viper.New() + v.Set("minimum-gas-prices", "0stake") + tt.setupViper(v) + cfg, err := GetConfig(v) + if tt.expectError { + require.Error(t, err) + require.Contains(t, err.Error(), tt.errorMsg) + } else { + require.NoError(t, err) + if tt.validate != nil { + tt.validate(t, cfg) + } + } + }) + } +} diff --git a/server/config/toml.go b/server/config/toml.go index caf01eb432..28ab5f3c45 100644 --- a/server/config/toml.go +++ b/server/config/toml.go @@ -182,6 +182,13 @@ max-recv-msg-size = "{{ .GRPC.MaxRecvMsgSize }}" # The default value is math.MaxInt32. max-send-msg-size = "{{ .GRPC.MaxSendMsgSize }}" +# Historical gRPC addresses with block ranges for historical query routing. +# This should be a JSON string mapping gRPC addresses to block ranges. +# Format: '{"address1": [start_block, end_block], "address2": [start_block, end_block]}' +# Example: '{"0.0.0.0:26113": [0, 1000], "0.0.0.0:26114": [1001, 2000]}' +# Leave empty to disable historical gRPC routing. +historical-grpc-address-block-range = "{{ printf "{" }}{{ range $k, $v := .GRPC.HistoricalGRPCAddressBlockRange }}\"{{ $v }}\": [{{index $k 0 }}, {{ index $k 1}}]{{ end }}{{ printf "}" }}" + ############################################################################### ### gRPC Web Configuration ### ############################################################################### diff --git a/server/grpc/server.go b/server/grpc/server.go index ba9c9f5fb8..54842e1727 100644 --- a/server/grpc/server.go +++ b/server/grpc/server.go @@ -6,6 +6,7 @@ import ( "net" "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" "cosmossdk.io/log" @@ -22,6 +23,13 @@ import ( // NewGRPCServer returns a correctly configured and initialized gRPC server. // Note, the caller is responsible for starting the server. See StartGRPCServer. func NewGRPCServer(clientCtx client.Context, app types.Application, cfg config.GRPCConfig) (*grpc.Server, error) { + srv, _, err := NewGRPCServerAndContext(clientCtx, app, cfg, log.NewNopLogger()) + return srv, err +} + +// NewGRPCServerAndContext returns a correctly configured and initialized gRPC server +// along with an updated client context that may include historical gRPC connections. +func NewGRPCServerAndContext(clientCtx client.Context, app types.Application, cfg config.GRPCConfig, logger log.Logger) (*grpc.Server, client.Context, error) { maxSendMsgSize := cfg.MaxSendMsgSize if maxSendMsgSize == 0 { maxSendMsgSize = config.DefaultGRPCMaxSendMsgSize @@ -32,6 +40,21 @@ func NewGRPCServer(clientCtx client.Context, app types.Application, cfg config.G maxRecvMsgSize = config.DefaultGRPCMaxRecvMsgSize } + // Setup historical gRPC connections if configured + if len(cfg.HistoricalGRPCAddressBlockRange) > 0 { + updatedCtx, err := setupHistoricalGRPCConnections( + clientCtx, + cfg.HistoricalGRPCAddressBlockRange, + maxRecvMsgSize, + maxSendMsgSize, + logger, + ) + if err != nil { + return nil, clientCtx, fmt.Errorf("failed to setup historical gRPC connections: %w", err) + } + clientCtx = updatedCtx + } + grpcSrv := grpc.NewServer( grpc.ForceServerCodec(codec.NewProtoCodec(clientCtx.InterfaceRegistry).GRPCCodec()), grpc.MaxSendMsgSize(maxSendMsgSize), @@ -58,14 +81,55 @@ func NewGRPCServer(clientCtx client.Context, app types.Application, cfg config.G InterfaceRegistry: clientCtx.InterfaceRegistry, }) if err != nil { - return nil, fmt.Errorf("failed to register reflection service: %w", err) + return nil, clientCtx, fmt.Errorf("failed to register reflection service: %w", err) } // Reflection allows external clients to see what services and methods // the gRPC server exposes. gogoreflection.Register(grpcSrv) - return grpcSrv, nil + return grpcSrv, clientCtx, nil +} + +// setupHistoricalGRPCConnections creates historical gRPC connections based on the configuration. +func setupHistoricalGRPCConnections( + clientCtx client.Context, + historicalAddresses map[config.BlockRange]string, + maxRecvMsgSize, maxSendMsgSize int, + logger log.Logger, +) (client.Context, error) { + if len(historicalAddresses) == 0 { + return clientCtx, nil + } + + historicalConns := make(config.HistoricalGRPCConnections) + for blockRange, address := range historicalAddresses { + conn, err := grpc.NewClient( + address, + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithDefaultCallOptions( + grpc.ForceCodec(codec.NewProtoCodec(clientCtx.InterfaceRegistry).GRPCCodec()), + grpc.MaxCallRecvMsgSize(maxRecvMsgSize), + grpc.MaxCallSendMsgSize(maxSendMsgSize), + ), + ) + if err != nil { + return clientCtx, fmt.Errorf("failed to create historical gRPC connection for %s: %w", address, err) + } + historicalConns[blockRange] = conn + } + + // Get the default connection from the clientCtx + defaultConn := clientCtx.GRPCClient + if defaultConn == nil { + return clientCtx, fmt.Errorf("default gRPC client not set in clientCtx") + } + + provider := client.NewGRPCConnProvider(defaultConn, historicalConns) + clientCtx = clientCtx.WithGRPCConnProvider(provider) + + logger.Info("historical gRPC connections configured", "count", len(historicalConns)) + return clientCtx, nil } // StartGRPCServer starts the provided gRPC server on the address specified in cfg. diff --git a/server/start.go b/server/start.go index aab620b275..883afa618b 100644 --- a/server/start.go +++ b/server/start.go @@ -96,11 +96,12 @@ const ( // gRPC-related flags - flagGRPCOnly = "grpc-only" - flagGRPCEnable = "grpc.enable" - flagGRPCAddress = "grpc.address" - flagGRPCWebEnable = "grpc-web.enable" - flagGRPCSkipCheckHeader = "grpc.skip-check-header" + flagGRPCOnly = "grpc-only" + flagGRPCEnable = "grpc.enable" + flagGRPCAddress = "grpc.address" + flagGRPCWebEnable = "grpc-web.enable" + flagGRPCSkipCheckHeader = "grpc.skip-check-header" + flagHistoricalGRPCAddressBlockRange = "grpc.historical-grpc-address-block-range" // mempool flags @@ -277,7 +278,7 @@ func startStandAlone(svrCtx *Context, svrCfg serverconfig.Config, clientCtx clie app.RegisterNodeService(clientCtx, svrCfg) } - grpcSrv, clientCtx, err := startGrpcServer(ctx, g, svrCfg.GRPC, clientCtx, svrCtx, app) + grpcSrv, clientCtx, err := StartGrpcServer(ctx, g, svrCfg.GRPC, clientCtx, svrCtx, app) if err != nil { return err } @@ -343,7 +344,7 @@ func startInProcess(svrCtx *Context, svrCfg serverconfig.Config, clientCtx clien } } - grpcSrv, clientCtx, err := startGrpcServer(ctx, g, svrCfg.GRPC, clientCtx, svrCtx, app) + grpcSrv, clientCtx, err := StartGrpcServer(ctx, g, svrCfg.GRPC, clientCtx, svrCtx, app) if err != nil { return fmt.Errorf("failed to start grpc server: %w", err) } @@ -451,7 +452,17 @@ func setupTraceWriter(svrCtx *Context) (traceWriter io.WriteCloser, cleanup func return traceWriter, cleanup, nil } -func startGrpcServer( +// StartGrpcServer starts a gRPC server with the provided configuration. +// It returns the gRPC server instance, updated client context with historical connections, +// and any error encountered during setup. +// +// The function will: +// - Create a gRPC client connection +// - Setup historical gRPC connections if configured +// - Start the gRPC server in a goroutine +// +// Note: The provided context will ensure that the server is gracefully shut down. +func StartGrpcServer( ctx context.Context, g *errgroup.Group, config serverconfig.GRPCConfig, @@ -479,7 +490,7 @@ func startGrpcServer( } // if gRPC is enabled, configure gRPC client for gRPC gateway - grpcClient, err := grpc.Dial( //nolint: staticcheck // ignore this line for this linter + grpcClient, err := grpc.NewClient( config.Address, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultCallOptions( @@ -495,7 +506,9 @@ func startGrpcServer( clientCtx = clientCtx.WithGRPCClient(grpcClient) svrCtx.Logger.Debug("gRPC client assigned to client context", "target", config.Address) - grpcSrv, err := servergrpc.NewGRPCServer(clientCtx, app, config) + logger := svrCtx.Logger.With("module", "grpc-server") + var grpcSrv *grpc.Server + grpcSrv, clientCtx, err = servergrpc.NewGRPCServerAndContext(clientCtx, app, config, logger) if err != nil { return nil, clientCtx, err } @@ -503,7 +516,7 @@ func startGrpcServer( // Start the gRPC server in a goroutine. Note, the provided ctx will ensure // that the server is gracefully shut down. g.Go(func() error { - return servergrpc.StartGRPCServer(ctx, svrCtx.Logger.With("module", "grpc-server"), config, grpcSrv) + return servergrpc.StartGRPCServer(ctx, logger, config, grpcSrv) }) return grpcSrv, clientCtx, nil } @@ -994,6 +1007,7 @@ func addStartNodeFlags(cmd *cobra.Command, opts StartCmdOptions) { cmd.Flags().Bool(flagGRPCEnable, true, "Define if the gRPC server should be enabled") cmd.Flags().String(flagGRPCAddress, serverconfig.DefaultGRPCAddress, "the gRPC server address to listen on") cmd.Flags().Bool(flagGRPCWebEnable, true, "Define if the gRPC-Web server should be enabled. (Note: gRPC must also be enabled)") + cmd.Flags().String(flagHistoricalGRPCAddressBlockRange, "", "Define if historical grpc and block range is available") cmd.Flags().Uint64(FlagStateSyncSnapshotInterval, 0, "State sync snapshot interval") cmd.Flags().Uint32(FlagStateSyncSnapshotKeepRecent, 2, "State sync snapshot to keep") cmd.Flags().Bool(FlagDisableIAVLFastNode, false, "Disable fast node for IAVL tree") diff --git a/testutil/network/util.go b/testutil/network/util.go index d378a057f9..77657a8bb0 100644 --- a/testutil/network/util.go +++ b/testutil/network/util.go @@ -17,6 +17,7 @@ import ( cmttypes "github.com/cometbft/cometbft/types" cmttime "github.com/cometbft/cometbft/types/time" "golang.org/x/sync/errgroup" + "google.golang.org/grpc" "cosmossdk.io/log" @@ -97,7 +98,9 @@ func startInProcess(cfg Config, val *Validator) error { grpcCfg := val.AppConfig.GRPC if grpcCfg.Enable { - grpcSrv, err := servergrpc.NewGRPCServer(val.ClientCtx, app, grpcCfg) + grpcLogger := logger.With(log.ModuleKey, "grpc-server") + var grpcSrv *grpc.Server + grpcSrv, val.ClientCtx, err = servergrpc.NewGRPCServerAndContext(val.ClientCtx, app, grpcCfg, grpcLogger) if err != nil { return err } @@ -105,7 +108,7 @@ func startInProcess(cfg Config, val *Validator) error { // Start the gRPC server in a goroutine. Note, the provided ctx will ensure // that the server is gracefully shut down. val.errGroup.Go(func() error { - return servergrpc.StartGRPCServer(ctx, logger.With(log.ModuleKey, "grpc-server"), grpcCfg, grpcSrv) + return servergrpc.StartGRPCServer(ctx, grpcLogger, grpcCfg, grpcSrv) }) val.grpc = grpcSrv