feat: support for multi gRPC query clients serve with old binary (#25565)

Co-authored-by: Alex | Cosmos Labs <alex@cosmoslabs.io>
This commit is contained in:
mmsqe 2025-11-19 23:46:50 +08:00 committed by GitHub
parent 2667feb515
commit f4e2ce0ea4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 644 additions and 28 deletions

View File

@ -56,6 +56,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
* (crypto) [#24861](https://github.com/cosmos/cosmos-sdk/pull/24861) add `PubKeyFromCometTypeAndBytes` helper function to convert from `comet/v2` PubKeys to the `cryptotypes.Pubkey` interface.
* (abci_utils) [#25008](https://github.com/cosmos/cosmos-sdk/pull/25008) add the ability to assign a custom signer extraction adapter in `DefaultProposalHandler`.
* (crypto/ledger) [#25435](https://github.com/cosmos/cosmos-sdk/pull/25435) Add SetDERConversion to reset skipDERConversion and App name for ledger.
* (gRPC) [#25565](https://github.com/cosmos/cosmos-sdk/pull/25565) Support for multi gRPC query clients serve with historical binaries to serve proper historical state.
### Improvements

View File

@ -30,6 +30,7 @@ type Context struct {
FromAddress sdk.AccAddress
Client CometRPC
GRPCClient *grpc.ClientConn
GRPCConnProvider *GRPCConnProvider
ChainID string
Codec codec.Codec
InterfaceRegistry codectypes.InterfaceRegistry
@ -155,6 +156,22 @@ func (ctx Context) WithGRPCClient(grpcClient *grpc.ClientConn) Context {
return ctx
}
// WithGRPCConnProvider returns a copy of the context with an updated GRPCConnProvider.
func (ctx Context) WithGRPCConnProvider(provider *GRPCConnProvider) Context {
ctx.GRPCConnProvider = provider
return ctx
}
// GetGRPCConn returns the appropriate gRPC connection for the given height.
// If GRPCConnProvider is set, it uses it to determine the connection.
// Otherwise, it falls back to the default GRPCClient.
func (ctx Context) GetGRPCConn(height int64) *grpc.ClientConn {
if ctx.GRPCConnProvider != nil {
return ctx.GRPCConnProvider.GetGRPCConn(height)
}
return ctx.GRPCClient
}
// WithUseLedger returns a copy of the context with an updated UseLedger flag.
func (ctx Context) WithUseLedger(useLedger bool) Context {
ctx.UseLedger = useLedger

View File

@ -16,11 +16,56 @@ import (
"github.com/cosmos/cosmos-sdk/codec"
"github.com/cosmos/cosmos-sdk/codec/types"
"github.com/cosmos/cosmos-sdk/server/config"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
grpctypes "github.com/cosmos/cosmos-sdk/types/grpc"
"github.com/cosmos/cosmos-sdk/types/tx"
)
// GRPCConn provides a method to get the appropriate gRPC connection based on block height.
type GRPCConn interface {
GetGRPCConn(height int64) *grpc.ClientConn
}
// GRPCConnProvider manages gRPC connections with optional historical connections for historical queries.
type GRPCConnProvider struct {
// DefaultConn is the primary gRPC connection
DefaultConn *grpc.ClientConn
// HistoricalConns maps block ranges to historical gRPC connections for routing historical queries
HistoricalConns config.HistoricalGRPCConnections
}
// NewGRPCConnProvider creates a new GRPCConnProvider with the given connections.
func NewGRPCConnProvider(defaultConn *grpc.ClientConn, historicalConns config.HistoricalGRPCConnections) *GRPCConnProvider {
if historicalConns == nil {
historicalConns = make(config.HistoricalGRPCConnections)
}
return &GRPCConnProvider{
DefaultConn: defaultConn,
HistoricalConns: historicalConns,
}
}
// GetGRPCConn returns the appropriate gRPC connection based on the block height.
// For height <= 0 (latest block), it returns the default connection.
// For positive heights, it checks if a historical connection exists for that height range.
func (g *GRPCConnProvider) GetGRPCConn(height int64) *grpc.ClientConn {
// height = 0 means latest block, use the default connection
if height <= 0 {
return g.DefaultConn
}
// Check if there's a historical connection for this height
for blockRange, conn := range g.HistoricalConns {
if int64(blockRange[0]) <= height && int64(blockRange[1]) >= height {
return conn
}
}
// Default to the primary connection if no historical matches
return g.DefaultConn
}
var _ gogogrpc.ClientConn = Context{}
// fallBackCodec is used by Context in case Codec is not set.
@ -28,6 +73,27 @@ var _ gogogrpc.ClientConn = Context{}
// interfaces in their types.
var fallBackCodec = codec.NewProtoCodec(types.NewInterfaceRegistry())
// GetHeightFromMetadata extracts the block height from gRPC metadata in the context.
// Returns 0 if no valid height is found.
func GetHeightFromMetadata(grpcCtx gocontext.Context) int64 {
md, ok := metadata.FromOutgoingContext(grpcCtx)
if !ok {
return 0
}
heights := md.Get(grpctypes.GRPCBlockHeightHeader)
if len(heights) == 0 {
return 0
}
height, err := strconv.ParseInt(heights[0], 10, 64)
if err != nil {
return 0
}
if height < 0 {
return 0
}
return height
}
// Invoke implements the grpc ClientConn.Invoke method
func (ctx Context) Invoke(grpcCtx gocontext.Context, method string, req, reply any, opts ...grpc.CallOption) (err error) {
// Two things can happen here:
@ -58,7 +124,16 @@ func (ctx Context) Invoke(grpcCtx gocontext.Context, method string, req, reply a
if ctx.GRPCClient != nil {
// Case 2-1. Invoke grpc.
return ctx.GRPCClient.Invoke(grpcCtx, method, req, reply, opts...)
grpcConn := ctx.GRPCClient
if ctx.GRPCConnProvider != nil {
height := ctx.Height
if height <= 0 {
height = GetHeightFromMetadata(grpcCtx)
}
grpcConn = ctx.GRPCConnProvider.GetGRPCConn(height)
}
return grpcConn.Invoke(grpcCtx, method, req, reply, opts...)
}
// Case 2-2. Querying state via abci query.
@ -68,18 +143,14 @@ func (ctx Context) Invoke(grpcCtx gocontext.Context, method string, req, reply a
}
// parse height header
md, _ := metadata.FromOutgoingContext(grpcCtx)
if heights := md.Get(grpctypes.GRPCBlockHeightHeader); len(heights) > 0 {
height, err := strconv.ParseInt(heights[0], 10, 64)
if err != nil {
return err
}
if height < 0 {
return errorsmod.Wrapf(
sdkerrors.ErrInvalidRequest,
"client.Context.Invoke: height (%d) from %q must be >= 0", height, grpctypes.GRPCBlockHeightHeader)
}
height := GetHeightFromMetadata(grpcCtx)
if height < 0 {
return errorsmod.Wrapf(
sdkerrors.ErrInvalidRequest,
"client.Context.Invoke: height (%d) from %q must be >= 0", height, grpctypes.GRPCBlockHeightHeader)
}
if height > 0 {
ctx = ctx.WithHeight(height)
}
@ -104,7 +175,7 @@ func (ctx Context) Invoke(grpcCtx gocontext.Context, method string, req, reply a
// We then parse all the call options, if the call option is a
// HeaderCallOption, then we manually set the value of that header to the
// metadata.
md = metadata.Pairs(grpctypes.GRPCBlockHeightHeader, strconv.FormatInt(res.Height, 10))
md := metadata.Pairs(grpctypes.GRPCBlockHeightHeader, strconv.FormatInt(res.Height, 10))
for _, callOpt := range opts {
header, ok := callOpt.(grpc.HeaderCallOption)
if !ok {

View File

@ -7,8 +7,10 @@ import (
abci "github.com/cometbft/cometbft/abci/types"
cmtjson "github.com/cometbft/cometbft/libs/json"
dbm "github.com/cosmos/cosmos-db"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/metadata"
"cosmossdk.io/depinject"
@ -16,13 +18,16 @@ import (
"cosmossdk.io/math"
"github.com/cosmos/cosmos-sdk/baseapp"
"github.com/cosmos/cosmos-sdk/client"
"github.com/cosmos/cosmos-sdk/codec"
codectypes "github.com/cosmos/cosmos-sdk/codec/types"
"github.com/cosmos/cosmos-sdk/crypto/keys/secp256k1"
"github.com/cosmos/cosmos-sdk/runtime"
"github.com/cosmos/cosmos-sdk/server/config"
"github.com/cosmos/cosmos-sdk/testutil/sims"
"github.com/cosmos/cosmos-sdk/testutil/testdata"
sdk "github.com/cosmos/cosmos-sdk/types"
grpctypes "github.com/cosmos/cosmos-sdk/types/grpc"
"github.com/cosmos/cosmos-sdk/x/auth/testutil"
authtypes "github.com/cosmos/cosmos-sdk/x/auth/types"
bankkeeper "github.com/cosmos/cosmos-sdk/x/bank/keeper"
@ -136,3 +141,192 @@ func (s *IntegrationTestSuite) TestGRPCQuery() {
func TestIntegrationTestSuite(t *testing.T) {
suite.Run(t, new(IntegrationTestSuite))
}
func (s *IntegrationTestSuite) TestGetGRPCConnWithContext() {
defaultConn, err := grpc.NewClient("localhost:9090",
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
s.Require().NoError(err)
defer defaultConn.Close()
historicalConn, err := grpc.NewClient("localhost:9091",
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
s.Require().NoError(err)
defer historicalConn.Close()
historicalConns := config.HistoricalGRPCConnections{
config.BlockRange{100, 500}: historicalConn,
}
provider := client.NewGRPCConnProvider(defaultConn, historicalConns)
testCases := []struct {
name string
height int64
setupCtx func() client.Context
expectedConn *grpc.ClientConn
}{
{
name: "context with GRPCConnProvider and historical height",
height: 300,
setupCtx: func() client.Context {
return client.Context{}.
WithCodec(s.cdc).
WithGRPCClient(defaultConn).
WithGRPCConnProvider(provider).
WithHeight(300)
},
expectedConn: historicalConn,
},
{
name: "context with GRPCConnProvider and latest height",
height: 0,
setupCtx: func() client.Context {
return client.Context{}.
WithCodec(s.cdc).
WithGRPCClient(defaultConn).
WithGRPCConnProvider(provider).
WithHeight(0)
},
expectedConn: defaultConn,
},
{
name: "context without GRPCConnProvider",
height: 300,
setupCtx: func() client.Context {
return client.Context{}.
WithCodec(s.cdc).
WithGRPCClient(defaultConn).
WithHeight(300)
},
expectedConn: defaultConn,
},
{
name: "context with nil historical connections map",
height: 100,
setupCtx: func() client.Context {
nilProvider := client.NewGRPCConnProvider(defaultConn, nil)
return client.Context{}.
WithCodec(s.cdc).
WithGRPCClient(defaultConn).
WithGRPCConnProvider(nilProvider).
WithHeight(100)
},
expectedConn: defaultConn,
},
{
name: "context with empty historical connections map",
height: 100,
setupCtx: func() client.Context {
emptyProvider := client.NewGRPCConnProvider(defaultConn, config.HistoricalGRPCConnections{})
return client.Context{}.
WithCodec(s.cdc).
WithGRPCClient(defaultConn).
WithGRPCConnProvider(emptyProvider).
WithHeight(100)
},
expectedConn: defaultConn,
},
}
for _, tc := range testCases {
s.Run(tc.name, func() {
ctx := tc.setupCtx()
var actualConn *grpc.ClientConn
if ctx.GRPCConnProvider != nil {
actualConn = ctx.GRPCConnProvider.GetGRPCConn(ctx.Height)
} else {
actualConn = ctx.GRPCClient
}
s.Require().Equal(tc.expectedConn, actualConn)
})
}
}
func TestGetHeightFromMetadata(t *testing.T) {
tests := []struct {
name string
setupContext func() context.Context
expectedHeight int64
}{
{
name: "valid height in metadata",
setupContext: func() context.Context {
md := metadata.Pairs(grpctypes.GRPCBlockHeightHeader, "12345")
return metadata.NewOutgoingContext(context.Background(), md)
},
expectedHeight: 12345,
},
{
name: "zero height in metadata",
setupContext: func() context.Context {
md := metadata.Pairs(grpctypes.GRPCBlockHeightHeader, "0")
return metadata.NewOutgoingContext(context.Background(), md)
},
expectedHeight: 0,
},
{
name: "negative height returns zero",
setupContext: func() context.Context {
md := metadata.Pairs(grpctypes.GRPCBlockHeightHeader, "-100")
return metadata.NewOutgoingContext(context.Background(), md)
},
expectedHeight: 0,
},
{
name: "no metadata returns zero",
setupContext: context.Background,
expectedHeight: 0,
},
{
name: "empty height header returns zero",
setupContext: func() context.Context {
md := metadata.New(map[string]string{})
return metadata.NewOutgoingContext(context.Background(), md)
},
expectedHeight: 0,
},
{
name: "invalid height string returns zero",
setupContext: func() context.Context {
md := metadata.Pairs(grpctypes.GRPCBlockHeightHeader, "not-a-number")
return metadata.NewOutgoingContext(context.Background(), md)
},
expectedHeight: 0,
},
{
name: "multiple height values uses first",
setupContext: func() context.Context {
md := metadata.Pairs(
grpctypes.GRPCBlockHeightHeader, "100",
grpctypes.GRPCBlockHeightHeader, "200",
)
return metadata.NewOutgoingContext(context.Background(), md)
},
expectedHeight: 100,
},
{
name: "very large height",
setupContext: func() context.Context {
md := metadata.Pairs(grpctypes.GRPCBlockHeightHeader, "9223372036854775807") // max int64
return metadata.NewOutgoingContext(context.Background(), md)
},
expectedHeight: 9223372036854775807,
},
{
name: "height exceeding int64 returns zero",
setupContext: func() context.Context {
md := metadata.Pairs(grpctypes.GRPCBlockHeightHeader, "9223372036854775808") // max int64 + 1
return metadata.NewOutgoingContext(context.Background(), md)
},
expectedHeight: 0,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx := tt.setupContext()
height := client.GetHeightFromMetadata(ctx)
require.Equal(t, tt.expectedHeight, height)
})
}
}

View File

@ -1,10 +1,12 @@
package config
import (
"encoding/json"
"fmt"
"math"
"github.com/spf13/viper"
"google.golang.org/grpc"
pruningtypes "cosmossdk.io/store/pruning/types"
@ -124,6 +126,21 @@ type APIConfig struct {
// Ref: https://github.com/cosmos/cosmos-sdk/issues/6420
}
// BlockRange represents a range of block heights as [start_block, end_block] (inclusive).
// It is used to map gRPC historical connections to specific block height ranges for routing
// historical queries to appropriate archive nodes.
//
// Example:
// - [0, 1000] represents blocks from genesis (0) through block 1000
// - [1001, 2000] represents blocks from 1001 through 2000
//
// Both start and end blocks must be non-negative, and start must be less than or equal to end.
type BlockRange [2]int
// HistoricalGRPCConnections is a map of block ranges to gRPC client connections
// used for routing requests to different backend nodes based on block height.
type HistoricalGRPCConnections map[BlockRange]*grpc.ClientConn
// GRPCConfig defines configuration for the gRPC server.
type GRPCConfig struct {
// Enable defines if the gRPC server should be enabled.
@ -142,6 +159,9 @@ type GRPCConfig struct {
// SkipCheckHeader defines if the gRPC server should bypass header checking.
SkipCheckHeader bool `mapstructure:"skip-check-header"`
// HistoricalGRPCAddressBlockRange maps block ranges to gRPC addresses for routing historical queries.
HistoricalGRPCAddressBlockRange map[BlockRange]string `mapstructure:"-"`
}
// GRPCWebConfig defines configuration for the gRPC-web server.
@ -278,6 +298,26 @@ func GetConfig(v *viper.Viper) (Config, error) {
if err := v.Unmarshal(conf); err != nil {
return Config{}, fmt.Errorf("error extracting app config: %w", err)
}
raw := v.GetString("grpc.historical-grpc-address-block-range")
if len(raw) > 0 {
data := make(map[string]BlockRange)
if err := json.Unmarshal([]byte(raw), &data); err != nil {
return Config{}, fmt.Errorf("failed to parse historical-grpc-address-block-range as JSON: %w (value: %s)", err, raw)
}
historicalGRPCAddressBlockRange := make(map[BlockRange]string, len(data))
for address, blockRange := range data {
if blockRange[0] < 0 || blockRange[1] < 0 {
return Config{}, fmt.Errorf("invalid block range [%d, %d] for address %s: block numbers cannot be negative",
blockRange[0], blockRange[1], address)
}
if blockRange[0] > blockRange[1] {
return Config{}, fmt.Errorf("invalid block range [%d, %d] for address %s: start block must be <= end block",
blockRange[0], blockRange[1], address)
}
historicalGRPCAddressBlockRange[blockRange] = address
}
conf.GRPC.HistoricalGRPCAddressBlockRange = historicalGRPCAddressBlockRange
}
return *conf, nil
}

View File

@ -234,3 +234,208 @@ func TestAppConfig(t *testing.T) {
require.NoError(t, v.Unmarshal(appCfg))
require.EqualValues(t, appCfg, defAppConfig)
}
func TestGetConfig_HistoricalGRPCAddressBlockRange(t *testing.T) {
tests := []struct {
name string
setupViper func(*viper.Viper)
expectError bool
errorMsg string
validate func(*testing.T, Config)
}{
{
name: "valid single historical grpc address",
setupViper: func(v *viper.Viper) {
v.Set("grpc.historical-grpc-address-block-range", `{"localhost:9091": [0, 1000]}`)
},
expectError: false,
validate: func(t *testing.T, cfg Config) {
t.Helper()
require.Len(t, cfg.GRPC.HistoricalGRPCAddressBlockRange, 1)
expectedRange := BlockRange{0, 1000}
address, exists := cfg.GRPC.HistoricalGRPCAddressBlockRange[expectedRange]
require.True(t, exists, "Block range [0, 1000] should exist")
require.Equal(t, "localhost:9091", address)
},
},
{
name: "valid multiple historical grpc addresses",
setupViper: func(v *viper.Viper) {
v.Set("grpc.historical-grpc-address-block-range",
`{"localhost:9091": [0, 1000], "localhost:9092": [1001, 2000], "localhost:9093": [2001, 3000]}`)
},
expectError: false,
validate: func(t *testing.T, cfg Config) {
t.Helper()
require.Len(t, cfg.GRPC.HistoricalGRPCAddressBlockRange, 3)
testCases := []struct {
blockRange BlockRange
address string
}{
{BlockRange{0, 1000}, "localhost:9091"},
{BlockRange{1001, 2000}, "localhost:9092"},
{BlockRange{2001, 3000}, "localhost:9093"},
}
for _, tc := range testCases {
address, exists := cfg.GRPC.HistoricalGRPCAddressBlockRange[tc.blockRange]
require.True(t, exists, "Block range %v should exist", tc.blockRange)
require.Equal(t, tc.address, address)
}
},
},
{
name: "empty configuration",
setupViper: func(v *viper.Viper) {
v.Set("grpc.historical-grpc-address-block-range", "")
},
expectError: false,
validate: func(t *testing.T, cfg Config) {
t.Helper()
require.Nil(t, cfg.GRPC.HistoricalGRPCAddressBlockRange)
},
},
{
name: "no configuration set",
setupViper: func(v *viper.Viper) {},
expectError: false,
validate: func(t *testing.T, cfg Config) {
t.Helper()
require.Nil(t, cfg.GRPC.HistoricalGRPCAddressBlockRange)
},
},
{
name: "invalid JSON format",
setupViper: func(v *viper.Viper) {
// missing closing brace
v.Set("grpc.historical-grpc-address-block-range", `{"localhost:9091": [0, 1000]`)
},
expectError: true,
errorMsg: "failed to parse historical-grpc-address-block-range as JSON",
},
{
name: "negative start block",
setupViper: func(v *viper.Viper) {
v.Set("grpc.historical-grpc-address-block-range", `{"localhost:9091": [-1, 1000]}`)
},
expectError: true,
errorMsg: "block numbers cannot be negative",
},
{
name: "negative end block",
setupViper: func(v *viper.Viper) {
v.Set("grpc.historical-grpc-address-block-range", `{"localhost:9091": [0, -100]}`)
},
expectError: true,
errorMsg: "block numbers cannot be negative",
},
{
name: "start block greater than end block",
setupViper: func(v *viper.Viper) {
v.Set("grpc.historical-grpc-address-block-range", `{"localhost:9091": [1000, 500]}`)
},
expectError: true,
errorMsg: "start block must be <= end block",
},
{
name: "single block range (start equals end)",
setupViper: func(v *viper.Viper) {
v.Set("grpc.historical-grpc-address-block-range", `{"localhost:9091": [1000, 1000]}`)
},
expectError: false,
validate: func(t *testing.T, cfg Config) {
t.Helper()
require.Len(t, cfg.GRPC.HistoricalGRPCAddressBlockRange, 1)
expectedRange := BlockRange{1000, 1000}
address, exists := cfg.GRPC.HistoricalGRPCAddressBlockRange[expectedRange]
require.True(t, exists)
require.Equal(t, "localhost:9091", address)
},
},
{
name: "zero to zero range",
setupViper: func(v *viper.Viper) {
v.Set("grpc.historical-grpc-address-block-range", `{"localhost:9091": [0, 0]}`)
},
expectError: false,
validate: func(t *testing.T, cfg Config) {
t.Helper()
require.Len(t, cfg.GRPC.HistoricalGRPCAddressBlockRange, 1)
expectedRange := BlockRange{0, 0}
address, exists := cfg.GRPC.HistoricalGRPCAddressBlockRange[expectedRange]
require.True(t, exists)
require.Equal(t, "localhost:9091", address)
},
},
{
name: "large block numbers",
setupViper: func(v *viper.Viper) {
v.Set("grpc.historical-grpc-address-block-range", `{"localhost:9091": [1000000, 2000000]}`)
},
expectError: false,
validate: func(t *testing.T, cfg Config) {
t.Helper()
require.Len(t, cfg.GRPC.HistoricalGRPCAddressBlockRange, 1)
expectedRange := BlockRange{1000000, 2000000}
address, exists := cfg.GRPC.HistoricalGRPCAddressBlockRange[expectedRange]
require.True(t, exists)
require.Equal(t, "localhost:9091", address)
},
},
{
name: "invalid array length (too few elements)",
setupViper: func(v *viper.Viper) {
v.Set("grpc.historical-grpc-address-block-range", `{"localhost:9091": [100]}`)
},
expectError: true,
errorMsg: "start block must be <= end block",
},
{
name: "invalid array length (too many elements)",
setupViper: func(v *viper.Viper) {
v.Set("grpc.historical-grpc-address-block-range", `{"localhost:9091": [0, 1000, 2000]}`)
},
expectError: false,
validate: func(t *testing.T, cfg Config) {
t.Helper()
require.Len(t, cfg.GRPC.HistoricalGRPCAddressBlockRange, 1)
expectedRange := BlockRange{0, 1000}
address, exists := cfg.GRPC.HistoricalGRPCAddressBlockRange[expectedRange]
require.True(t, exists)
require.Equal(t, "localhost:9091", address)
},
},
{
name: "address with port and protocol",
setupViper: func(v *viper.Viper) {
v.Set("grpc.historical-grpc-address-block-range", `{"https://archive.example.com:9091": [0, 1000]}`)
},
expectError: false,
validate: func(t *testing.T, cfg Config) {
t.Helper()
require.Len(t, cfg.GRPC.HistoricalGRPCAddressBlockRange, 1)
expectedRange := BlockRange{0, 1000}
address, exists := cfg.GRPC.HistoricalGRPCAddressBlockRange[expectedRange]
require.True(t, exists)
require.Equal(t, "https://archive.example.com:9091", address)
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
v := viper.New()
v.Set("minimum-gas-prices", "0stake")
tt.setupViper(v)
cfg, err := GetConfig(v)
if tt.expectError {
require.Error(t, err)
require.Contains(t, err.Error(), tt.errorMsg)
} else {
require.NoError(t, err)
if tt.validate != nil {
tt.validate(t, cfg)
}
}
})
}
}

View File

@ -182,6 +182,13 @@ max-recv-msg-size = "{{ .GRPC.MaxRecvMsgSize }}"
# The default value is math.MaxInt32.
max-send-msg-size = "{{ .GRPC.MaxSendMsgSize }}"
# Historical gRPC addresses with block ranges for historical query routing.
# This should be a JSON string mapping gRPC addresses to block ranges.
# Format: '{"address1": [start_block, end_block], "address2": [start_block, end_block]}'
# Example: '{"0.0.0.0:26113": [0, 1000], "0.0.0.0:26114": [1001, 2000]}'
# Leave empty to disable historical gRPC routing.
historical-grpc-address-block-range = "{{ printf "{" }}{{ range $k, $v := .GRPC.HistoricalGRPCAddressBlockRange }}\"{{ $v }}\": [{{index $k 0 }}, {{ index $k 1}}]{{ end }}{{ printf "}" }}"
###############################################################################
### gRPC Web Configuration ###
###############################################################################

View File

@ -6,6 +6,7 @@ import (
"net"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"cosmossdk.io/log"
@ -22,6 +23,13 @@ import (
// NewGRPCServer returns a correctly configured and initialized gRPC server.
// Note, the caller is responsible for starting the server. See StartGRPCServer.
func NewGRPCServer(clientCtx client.Context, app types.Application, cfg config.GRPCConfig) (*grpc.Server, error) {
srv, _, err := NewGRPCServerAndContext(clientCtx, app, cfg, log.NewNopLogger())
return srv, err
}
// NewGRPCServerAndContext returns a correctly configured and initialized gRPC server
// along with an updated client context that may include historical gRPC connections.
func NewGRPCServerAndContext(clientCtx client.Context, app types.Application, cfg config.GRPCConfig, logger log.Logger) (*grpc.Server, client.Context, error) {
maxSendMsgSize := cfg.MaxSendMsgSize
if maxSendMsgSize == 0 {
maxSendMsgSize = config.DefaultGRPCMaxSendMsgSize
@ -32,6 +40,21 @@ func NewGRPCServer(clientCtx client.Context, app types.Application, cfg config.G
maxRecvMsgSize = config.DefaultGRPCMaxRecvMsgSize
}
// Setup historical gRPC connections if configured
if len(cfg.HistoricalGRPCAddressBlockRange) > 0 {
updatedCtx, err := setupHistoricalGRPCConnections(
clientCtx,
cfg.HistoricalGRPCAddressBlockRange,
maxRecvMsgSize,
maxSendMsgSize,
logger,
)
if err != nil {
return nil, clientCtx, fmt.Errorf("failed to setup historical gRPC connections: %w", err)
}
clientCtx = updatedCtx
}
grpcSrv := grpc.NewServer(
grpc.ForceServerCodec(codec.NewProtoCodec(clientCtx.InterfaceRegistry).GRPCCodec()),
grpc.MaxSendMsgSize(maxSendMsgSize),
@ -58,14 +81,55 @@ func NewGRPCServer(clientCtx client.Context, app types.Application, cfg config.G
InterfaceRegistry: clientCtx.InterfaceRegistry,
})
if err != nil {
return nil, fmt.Errorf("failed to register reflection service: %w", err)
return nil, clientCtx, fmt.Errorf("failed to register reflection service: %w", err)
}
// Reflection allows external clients to see what services and methods
// the gRPC server exposes.
gogoreflection.Register(grpcSrv)
return grpcSrv, nil
return grpcSrv, clientCtx, nil
}
// setupHistoricalGRPCConnections creates historical gRPC connections based on the configuration.
func setupHistoricalGRPCConnections(
clientCtx client.Context,
historicalAddresses map[config.BlockRange]string,
maxRecvMsgSize, maxSendMsgSize int,
logger log.Logger,
) (client.Context, error) {
if len(historicalAddresses) == 0 {
return clientCtx, nil
}
historicalConns := make(config.HistoricalGRPCConnections)
for blockRange, address := range historicalAddresses {
conn, err := grpc.NewClient(
address,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(
grpc.ForceCodec(codec.NewProtoCodec(clientCtx.InterfaceRegistry).GRPCCodec()),
grpc.MaxCallRecvMsgSize(maxRecvMsgSize),
grpc.MaxCallSendMsgSize(maxSendMsgSize),
),
)
if err != nil {
return clientCtx, fmt.Errorf("failed to create historical gRPC connection for %s: %w", address, err)
}
historicalConns[blockRange] = conn
}
// Get the default connection from the clientCtx
defaultConn := clientCtx.GRPCClient
if defaultConn == nil {
return clientCtx, fmt.Errorf("default gRPC client not set in clientCtx")
}
provider := client.NewGRPCConnProvider(defaultConn, historicalConns)
clientCtx = clientCtx.WithGRPCConnProvider(provider)
logger.Info("historical gRPC connections configured", "count", len(historicalConns))
return clientCtx, nil
}
// StartGRPCServer starts the provided gRPC server on the address specified in cfg.

View File

@ -96,11 +96,12 @@ const (
// gRPC-related flags
flagGRPCOnly = "grpc-only"
flagGRPCEnable = "grpc.enable"
flagGRPCAddress = "grpc.address"
flagGRPCWebEnable = "grpc-web.enable"
flagGRPCSkipCheckHeader = "grpc.skip-check-header"
flagGRPCOnly = "grpc-only"
flagGRPCEnable = "grpc.enable"
flagGRPCAddress = "grpc.address"
flagGRPCWebEnable = "grpc-web.enable"
flagGRPCSkipCheckHeader = "grpc.skip-check-header"
flagHistoricalGRPCAddressBlockRange = "grpc.historical-grpc-address-block-range"
// mempool flags
@ -277,7 +278,7 @@ func startStandAlone(svrCtx *Context, svrCfg serverconfig.Config, clientCtx clie
app.RegisterNodeService(clientCtx, svrCfg)
}
grpcSrv, clientCtx, err := startGrpcServer(ctx, g, svrCfg.GRPC, clientCtx, svrCtx, app)
grpcSrv, clientCtx, err := StartGrpcServer(ctx, g, svrCfg.GRPC, clientCtx, svrCtx, app)
if err != nil {
return err
}
@ -343,7 +344,7 @@ func startInProcess(svrCtx *Context, svrCfg serverconfig.Config, clientCtx clien
}
}
grpcSrv, clientCtx, err := startGrpcServer(ctx, g, svrCfg.GRPC, clientCtx, svrCtx, app)
grpcSrv, clientCtx, err := StartGrpcServer(ctx, g, svrCfg.GRPC, clientCtx, svrCtx, app)
if err != nil {
return fmt.Errorf("failed to start grpc server: %w", err)
}
@ -451,7 +452,17 @@ func setupTraceWriter(svrCtx *Context) (traceWriter io.WriteCloser, cleanup func
return traceWriter, cleanup, nil
}
func startGrpcServer(
// StartGrpcServer starts a gRPC server with the provided configuration.
// It returns the gRPC server instance, updated client context with historical connections,
// and any error encountered during setup.
//
// The function will:
// - Create a gRPC client connection
// - Setup historical gRPC connections if configured
// - Start the gRPC server in a goroutine
//
// Note: The provided context will ensure that the server is gracefully shut down.
func StartGrpcServer(
ctx context.Context,
g *errgroup.Group,
config serverconfig.GRPCConfig,
@ -479,7 +490,7 @@ func startGrpcServer(
}
// if gRPC is enabled, configure gRPC client for gRPC gateway
grpcClient, err := grpc.Dial( //nolint: staticcheck // ignore this line for this linter
grpcClient, err := grpc.NewClient(
config.Address,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(
@ -495,7 +506,9 @@ func startGrpcServer(
clientCtx = clientCtx.WithGRPCClient(grpcClient)
svrCtx.Logger.Debug("gRPC client assigned to client context", "target", config.Address)
grpcSrv, err := servergrpc.NewGRPCServer(clientCtx, app, config)
logger := svrCtx.Logger.With("module", "grpc-server")
var grpcSrv *grpc.Server
grpcSrv, clientCtx, err = servergrpc.NewGRPCServerAndContext(clientCtx, app, config, logger)
if err != nil {
return nil, clientCtx, err
}
@ -503,7 +516,7 @@ func startGrpcServer(
// Start the gRPC server in a goroutine. Note, the provided ctx will ensure
// that the server is gracefully shut down.
g.Go(func() error {
return servergrpc.StartGRPCServer(ctx, svrCtx.Logger.With("module", "grpc-server"), config, grpcSrv)
return servergrpc.StartGRPCServer(ctx, logger, config, grpcSrv)
})
return grpcSrv, clientCtx, nil
}
@ -994,6 +1007,7 @@ func addStartNodeFlags(cmd *cobra.Command, opts StartCmdOptions) {
cmd.Flags().Bool(flagGRPCEnable, true, "Define if the gRPC server should be enabled")
cmd.Flags().String(flagGRPCAddress, serverconfig.DefaultGRPCAddress, "the gRPC server address to listen on")
cmd.Flags().Bool(flagGRPCWebEnable, true, "Define if the gRPC-Web server should be enabled. (Note: gRPC must also be enabled)")
cmd.Flags().String(flagHistoricalGRPCAddressBlockRange, "", "Define if historical grpc and block range is available")
cmd.Flags().Uint64(FlagStateSyncSnapshotInterval, 0, "State sync snapshot interval")
cmd.Flags().Uint32(FlagStateSyncSnapshotKeepRecent, 2, "State sync snapshot to keep")
cmd.Flags().Bool(FlagDisableIAVLFastNode, false, "Disable fast node for IAVL tree")

View File

@ -17,6 +17,7 @@ import (
cmttypes "github.com/cometbft/cometbft/types"
cmttime "github.com/cometbft/cometbft/types/time"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"cosmossdk.io/log"
@ -97,7 +98,9 @@ func startInProcess(cfg Config, val *Validator) error {
grpcCfg := val.AppConfig.GRPC
if grpcCfg.Enable {
grpcSrv, err := servergrpc.NewGRPCServer(val.ClientCtx, app, grpcCfg)
grpcLogger := logger.With(log.ModuleKey, "grpc-server")
var grpcSrv *grpc.Server
grpcSrv, val.ClientCtx, err = servergrpc.NewGRPCServerAndContext(val.ClientCtx, app, grpcCfg, grpcLogger)
if err != nil {
return err
}
@ -105,7 +108,7 @@ func startInProcess(cfg Config, val *Validator) error {
// Start the gRPC server in a goroutine. Note, the provided ctx will ensure
// that the server is gracefully shut down.
val.errGroup.Go(func() error {
return servergrpc.StartGRPCServer(ctx, logger.With(log.ModuleKey, "grpc-server"), grpcCfg, grpcSrv)
return servergrpc.StartGRPCServer(ctx, grpcLogger, grpcCfg, grpcSrv)
})
val.grpc = grpcSrv