feat(runtime): message router service (#19571)
This commit is contained in:
parent
83a7f0ee37
commit
cfd426fdb3
@ -42,6 +42,7 @@ Every module contains its own CHANGELOG.md. Please refer to the module you are i
|
||||
|
||||
### Features
|
||||
|
||||
* (runtime) [#19571](https://github.com/cosmos/cosmos-sdk/pull/19571) Implement `core/router.Service` it in runtime. This service is present in all modules (when using depinject).
|
||||
* (types) [#19164](https://github.com/cosmos/cosmos-sdk/pull/19164) Add a ValueCodec for the math.Uint type that can be used in collections maps.
|
||||
* (types) [#19281](https://github.com/cosmos/cosmos-sdk/pull/19281) Added a new method, `IsGT`, for `types.Coin`. This method is used to check if a `types.Coin` is greater than another `types.Coin`.
|
||||
* (client) [#18557](https://github.com/cosmos/cosmos-sdk/pull/18557) Add `--qrcode` flag to `keys show` command to support displaying keys address QR code.
|
||||
|
||||
@ -281,10 +281,8 @@ func (app *BaseApp) Trace() bool {
|
||||
// MsgServiceRouter returns the MsgServiceRouter of a BaseApp.
|
||||
func (app *BaseApp) MsgServiceRouter() *MsgServiceRouter { return app.msgServiceRouter }
|
||||
|
||||
// SetMsgServiceRouter sets the MsgServiceRouter of a BaseApp.
|
||||
func (app *BaseApp) SetMsgServiceRouter(msgServiceRouter *MsgServiceRouter) {
|
||||
app.msgServiceRouter = msgServiceRouter
|
||||
}
|
||||
// GRPCQueryRouter returns the GRPCQueryRouter of a BaseApp.
|
||||
func (app *BaseApp) GRPCQueryRouter() *GRPCQueryRouter { return app.grpcQueryRouter }
|
||||
|
||||
// MountStores mounts all IAVL or DB stores to the provided keys in the BaseApp
|
||||
// multistore.
|
||||
|
||||
@ -24,6 +24,8 @@ type GRPCQueryRouter struct {
|
||||
// hybridHandlers maps the request name to the handler. It is a hybrid handler which seamlessly
|
||||
// handles both gogo and protov2 messages.
|
||||
hybridHandlers map[string][]func(ctx context.Context, req, resp protoiface.MessageV1) error
|
||||
// responseByRequestName maps the request name to the response name.
|
||||
responseByRequestName map[string]string
|
||||
// binaryCodec is used to encode/decode binary protobuf messages.
|
||||
binaryCodec codec.BinaryCodec
|
||||
// cdc is the gRPC codec used by the router to correctly unmarshal messages.
|
||||
@ -43,8 +45,9 @@ var _ gogogrpc.Server = &GRPCQueryRouter{}
|
||||
// NewGRPCQueryRouter creates a new GRPCQueryRouter
|
||||
func NewGRPCQueryRouter() *GRPCQueryRouter {
|
||||
return &GRPCQueryRouter{
|
||||
routes: map[string]GRPCQueryHandler{},
|
||||
hybridHandlers: map[string][]func(ctx context.Context, req, resp protoiface.MessageV1) error{},
|
||||
routes: map[string]GRPCQueryHandler{},
|
||||
hybridHandlers: map[string][]func(ctx context.Context, req, resp protoiface.MessageV1) error{},
|
||||
responseByRequestName: map[string]string{},
|
||||
}
|
||||
}
|
||||
|
||||
@ -133,16 +136,26 @@ func (qrt *GRPCQueryRouter) HybridHandlerByRequestName(name string) []func(ctx c
|
||||
return qrt.hybridHandlers[name]
|
||||
}
|
||||
|
||||
func (qrt *GRPCQueryRouter) ResponseNameByRequestName(requestName string) string {
|
||||
return qrt.responseByRequestName[requestName]
|
||||
}
|
||||
|
||||
func (qrt *GRPCQueryRouter) registerHybridHandler(sd *grpc.ServiceDesc, method grpc.MethodDesc, handler interface{}) error {
|
||||
// extract message name from method descriptor
|
||||
inputName, err := protocompat.RequestFullNameFromMethodDesc(sd, method)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
outputName, err := protocompat.ResponseFullNameFromMethodDesc(sd, method)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
methodHandler, err := protocompat.MakeHybridHandler(qrt.binaryCodec, sd, method, handler)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// map input name to output name
|
||||
qrt.responseByRequestName[string(inputName)] = string(outputName)
|
||||
qrt.hybridHandlers[string(inputName)] = append(qrt.hybridHandlers[string(inputName)], methodHandler)
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -20,9 +20,6 @@ import (
|
||||
grpctypes "github.com/cosmos/cosmos-sdk/types/grpc"
|
||||
)
|
||||
|
||||
// GRPCQueryRouter returns the GRPCQueryRouter of a BaseApp.
|
||||
func (app *BaseApp) GRPCQueryRouter() *GRPCQueryRouter { return app.grpcQueryRouter }
|
||||
|
||||
// RegisterGRPCServer registers gRPC services directly with the gRPC server.
|
||||
func (app *BaseApp) RegisterGRPCServer(server gogogrpc.Server) {
|
||||
// Define an interceptor for all gRPC queries: this interceptor will create
|
||||
|
||||
@ -24,6 +24,9 @@ import (
|
||||
type MessageRouter interface {
|
||||
Handler(msg sdk.Msg) MsgServiceHandler
|
||||
HandlerByTypeURL(typeURL string) MsgServiceHandler
|
||||
|
||||
ResponseNameByMsgName(msgName string) string
|
||||
HybridHandlerByMsgName(msgName string) func(ctx context.Context, req, resp protoiface.MessageV1) error
|
||||
}
|
||||
|
||||
// MsgServiceRouter routes fully-qualified Msg service methods to their handler.
|
||||
@ -31,7 +34,7 @@ type MsgServiceRouter struct {
|
||||
interfaceRegistry codectypes.InterfaceRegistry
|
||||
routes map[string]MsgServiceHandler
|
||||
hybridHandlers map[string]func(ctx context.Context, req, resp protoiface.MessageV1) error
|
||||
responseByRequest map[string]string
|
||||
responseByMsgName map[string]string
|
||||
circuitBreaker CircuitBreaker
|
||||
}
|
||||
|
||||
@ -42,7 +45,7 @@ func NewMsgServiceRouter() *MsgServiceRouter {
|
||||
return &MsgServiceRouter{
|
||||
routes: map[string]MsgServiceHandler{},
|
||||
hybridHandlers: map[string]func(ctx context.Context, req, resp protoiface.MessageV1) error{},
|
||||
responseByRequest: map[string]string{},
|
||||
responseByMsgName: map[string]string{},
|
||||
circuitBreaker: nil,
|
||||
}
|
||||
}
|
||||
@ -90,8 +93,8 @@ func (msr *MsgServiceRouter) HybridHandlerByMsgName(msgName string) func(ctx con
|
||||
return msr.hybridHandlers[msgName]
|
||||
}
|
||||
|
||||
func (msr *MsgServiceRouter) ResponseNameByRequestName(msgName string) string {
|
||||
return msr.responseByRequest[msgName]
|
||||
func (msr *MsgServiceRouter) ResponseNameByMsgName(msgName string) string {
|
||||
return msr.responseByMsgName[msgName]
|
||||
}
|
||||
|
||||
func (msr *MsgServiceRouter) registerHybridHandler(sd *grpc.ServiceDesc, method grpc.MethodDesc, handler interface{}) error {
|
||||
@ -109,7 +112,7 @@ func (msr *MsgServiceRouter) registerHybridHandler(sd *grpc.ServiceDesc, method
|
||||
return err
|
||||
}
|
||||
// map input name to output name
|
||||
msr.responseByRequest[string(inputName)] = string(outputName)
|
||||
msr.responseByMsgName[string(inputName)] = string(outputName)
|
||||
// if circuit breaker is not nil, then we decorate the hybrid handler with the circuit breaker
|
||||
if msr.circuitBreaker == nil {
|
||||
msr.hybridHandlers[string(inputName)] = hybridHandler
|
||||
|
||||
@ -385,3 +385,13 @@ func (app *BaseApp) SetStoreMetrics(gatherer metrics.StoreMetrics) {
|
||||
func (app *BaseApp) SetStreamingManager(manager storetypes.StreamingManager) {
|
||||
app.streamingManager = manager
|
||||
}
|
||||
|
||||
// SetMsgServiceRouter sets the MsgServiceRouter of a BaseApp.
|
||||
func (app *BaseApp) SetMsgServiceRouter(msgServiceRouter *MsgServiceRouter) {
|
||||
app.msgServiceRouter = msgServiceRouter
|
||||
}
|
||||
|
||||
// SetGRPCQueryRouter sets the GRPCQueryRouter of the BaseApp.
|
||||
func (app *BaseApp) SetGRPCQueryRouter(grpcQueryRouter *GRPCQueryRouter) {
|
||||
app.grpcQueryRouter = grpcQueryRouter
|
||||
}
|
||||
|
||||
@ -47,7 +47,7 @@ func (s *IntegrationTestSuite) SetupSuite() {
|
||||
s.testClient = testdata.NewQueryClient(queryHelper)
|
||||
|
||||
kvs := runtime.NewKVStoreService(keys[countertypes.StoreKey])
|
||||
counterKeeper := counterkeeper.NewKeeper(kvs, runtime.EventService{})
|
||||
counterKeeper := counterkeeper.NewKeeper(runtime.NewEnvironment(kvs, logger))
|
||||
countertypes.RegisterQueryServer(queryHelper, counterKeeper)
|
||||
s.counterClient = countertypes.NewQueryClient(queryHelper)
|
||||
}
|
||||
|
||||
@ -42,7 +42,9 @@ Ref: https://keepachangelog.com/en/1.0.0/
|
||||
* [#18457](https://github.com/cosmos/cosmos-sdk/pull/18457) Add branch.ExecuteWithGasLimit.
|
||||
* [#19041](https://github.com/cosmos/cosmos-sdk/pull/19041) Add `appmodule.Environment` interface to fetch different services
|
||||
* [#19370](https://github.com/cosmos/cosmos-sdk/pull/19370) Add `appmodule.Migrations` interface to handle migrations
|
||||
* [#19617](https://github.com/cosmos/cosmos-sdk/pull/19617) Add DataBaseService to store non-consensus data in a database
|
||||
* [#19571](https://github.com/cosmos/cosmos-sdk/pull/19571) Add `router.Service` and add it in `appmodule.Environment`
|
||||
* [#19617](https://github.com/cosmos/cosmos-sdk/pull/19617) Server/v2 compatible interface:
|
||||
* Add DataBaseService to store non-consensus data in a database
|
||||
* Create V2 appmodule with v2 api for runtime/v2
|
||||
* Introduce `Transaction.Tx` for use in runtime/v2
|
||||
* Introduce `HasUpdateValidators` interface and `ValidatorUpdate` struct for validator updates
|
||||
|
||||
@ -5,18 +5,21 @@ import (
|
||||
"cosmossdk.io/core/event"
|
||||
"cosmossdk.io/core/gas"
|
||||
"cosmossdk.io/core/header"
|
||||
"cosmossdk.io/core/router"
|
||||
"cosmossdk.io/core/store"
|
||||
"cosmossdk.io/log"
|
||||
)
|
||||
|
||||
// Environment is used to get all services to their respective module
|
||||
type Environment struct {
|
||||
BranchService branch.Service
|
||||
EventService event.Service
|
||||
GasService gas.Service
|
||||
HeaderService header.Service
|
||||
Logger log.Logger
|
||||
|
||||
BranchService branch.Service
|
||||
EventService event.Service
|
||||
GasService gas.Service
|
||||
HeaderService header.Service
|
||||
RouterService router.Service
|
||||
|
||||
KVStoreService store.KVStoreService
|
||||
MemStoreService store.MemoryStoreService
|
||||
DataBaseService store.DatabaseService
|
||||
Logger log.Logger
|
||||
}
|
||||
|
||||
24
core/router/service.go
Normal file
24
core/router/service.go
Normal file
@ -0,0 +1,24 @@
|
||||
package router
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"google.golang.org/protobuf/runtime/protoiface"
|
||||
)
|
||||
|
||||
// Service embeds a QueryRouterService and MessageRouterService.
|
||||
// Each router allows to invoke messages and queries via the corresponding router.
|
||||
type Service interface {
|
||||
QueryRouterService() Router
|
||||
MessageRouterService() Router
|
||||
}
|
||||
|
||||
// Router is the interface that wraps the basic methods for a router.
|
||||
type Router interface {
|
||||
// CanInvoke returns an error if the given request cannot be invoked.
|
||||
CanInvoke(ctx context.Context, req protoiface.MessageV1) error
|
||||
// InvokeTyped execute a message or query. It should be used when the called knows the type of the response.
|
||||
InvokeTyped(ctx context.Context, req, res protoiface.MessageV1) error
|
||||
// InvokeUntyped execute a message or query. It should be used when the called doesn't know the type of the response.
|
||||
InvokeUntyped(ctx context.Context, req protoiface.MessageV1) (res protoiface.MessageV1, err error)
|
||||
}
|
||||
@ -49,6 +49,7 @@ type App struct {
|
||||
amino *codec.LegacyAmino
|
||||
baseAppOptions []BaseAppOption
|
||||
msgServiceRouter *baseapp.MsgServiceRouter
|
||||
grpcQueryRouter *baseapp.GRPCQueryRouter
|
||||
appConfig *appv1alpha1.Config
|
||||
logger log.Logger
|
||||
// initChainer is the init chainer function defined by the app config.
|
||||
|
||||
@ -31,6 +31,7 @@ func (a *AppBuilder) Build(db dbm.DB, traceStore io.Writer, baseAppOptions ...fu
|
||||
|
||||
bApp := baseapp.NewBaseApp(a.app.config.AppName, a.app.logger, db, nil, baseAppOptions...)
|
||||
bApp.SetMsgServiceRouter(a.app.msgServiceRouter)
|
||||
bApp.SetGRPCQueryRouter(a.app.grpcQueryRouter)
|
||||
bApp.SetCommitMultiStoreTracer(traceStore)
|
||||
bApp.SetVersion(version.Version)
|
||||
bApp.SetInterfaceRegistry(a.app.interfaceRegistry)
|
||||
|
||||
@ -4,17 +4,44 @@ import (
|
||||
"cosmossdk.io/core/appmodule"
|
||||
"cosmossdk.io/core/store"
|
||||
"cosmossdk.io/log"
|
||||
|
||||
"github.com/cosmos/cosmos-sdk/baseapp"
|
||||
)
|
||||
|
||||
// NewEnvironment creates a new environment for the application
|
||||
// if memstoreservice is needed, it can be added to the environment: environment.MemStoreService = memstoreservice
|
||||
func NewEnvironment(kvService store.KVStoreService, logger log.Logger) appmodule.Environment {
|
||||
return appmodule.Environment{
|
||||
// For setting custom services that aren't set by default, use the EnvOption
|
||||
// Note: Depinject always provide an environment with all services (mandatory and optional)
|
||||
func NewEnvironment(
|
||||
kvService store.KVStoreService,
|
||||
logger log.Logger,
|
||||
opts ...EnvOption,
|
||||
) appmodule.Environment {
|
||||
env := appmodule.Environment{
|
||||
Logger: logger,
|
||||
EventService: EventService{},
|
||||
HeaderService: HeaderService{},
|
||||
BranchService: BranchService{},
|
||||
GasService: GasService{},
|
||||
KVStoreService: kvService,
|
||||
Logger: logger,
|
||||
}
|
||||
|
||||
for _, opt := range opts {
|
||||
opt(&env)
|
||||
}
|
||||
|
||||
return env
|
||||
}
|
||||
|
||||
type EnvOption func(*appmodule.Environment)
|
||||
|
||||
func EnvWithRouterService(queryServiceRouter *baseapp.GRPCQueryRouter, msgServiceRouter *baseapp.MsgServiceRouter) EnvOption {
|
||||
return func(env *appmodule.Environment) {
|
||||
env.RouterService = NewRouterService(env.KVStoreService, queryServiceRouter, msgServiceRouter)
|
||||
}
|
||||
}
|
||||
|
||||
func EnvWithMemStoreService(memStoreService store.MemoryStoreService) EnvOption {
|
||||
return func(env *appmodule.Environment) {
|
||||
env.MemStoreService = memStoreService
|
||||
}
|
||||
}
|
||||
|
||||
@ -67,7 +67,6 @@ func init() {
|
||||
ProvideMemoryStoreKey,
|
||||
ProvideGenesisTxHandler,
|
||||
ProvideEnvironment,
|
||||
ProvideMemoryStoreService,
|
||||
ProvideTransientStoreService,
|
||||
ProvideModuleManager,
|
||||
ProvideAppVersionModifier,
|
||||
@ -82,6 +81,7 @@ func ProvideApp(interfaceRegistry codectypes.InterfaceRegistry) (
|
||||
*codec.LegacyAmino,
|
||||
*AppBuilder,
|
||||
*baseapp.MsgServiceRouter,
|
||||
*baseapp.GRPCQueryRouter,
|
||||
appmodule.AppModule,
|
||||
protodesc.Resolver,
|
||||
protoregistry.MessageTypeResolver,
|
||||
@ -104,16 +104,18 @@ func ProvideApp(interfaceRegistry codectypes.InterfaceRegistry) (
|
||||
|
||||
cdc := codec.NewProtoCodec(interfaceRegistry)
|
||||
msgServiceRouter := baseapp.NewMsgServiceRouter()
|
||||
grpcQueryRouter := baseapp.NewGRPCQueryRouter()
|
||||
app := &App{
|
||||
storeKeys: nil,
|
||||
interfaceRegistry: interfaceRegistry,
|
||||
cdc: cdc,
|
||||
amino: amino,
|
||||
msgServiceRouter: msgServiceRouter,
|
||||
grpcQueryRouter: grpcQueryRouter,
|
||||
}
|
||||
appBuilder := &AppBuilder{app}
|
||||
|
||||
return cdc, amino, appBuilder, msgServiceRouter, appModule{app}, protoFiles, protoTypes, nil
|
||||
return cdc, amino, appBuilder, msgServiceRouter, grpcQueryRouter, appModule{app}, protoFiles, protoTypes, nil
|
||||
}
|
||||
|
||||
type AppInputs struct {
|
||||
@ -212,15 +214,26 @@ func ProvideGenesisTxHandler(appBuilder *AppBuilder) genesis.TxHandler {
|
||||
return appBuilder.app
|
||||
}
|
||||
|
||||
func ProvideEnvironment(config *runtimev1alpha1.Module, key depinject.ModuleKey, app *AppBuilder, logger log.Logger) (store.KVStoreService, appmodule.Environment) {
|
||||
func ProvideEnvironment(
|
||||
logger log.Logger,
|
||||
config *runtimev1alpha1.Module,
|
||||
key depinject.ModuleKey,
|
||||
app *AppBuilder,
|
||||
msgServiceRouter *baseapp.MsgServiceRouter,
|
||||
queryServiceRouter *baseapp.GRPCQueryRouter,
|
||||
) (store.KVStoreService, store.MemoryStoreService, appmodule.Environment) {
|
||||
storeKey := ProvideKVStoreKey(config, key, app)
|
||||
kvService := kvStoreService{key: storeKey}
|
||||
return kvService, NewEnvironment(kvService, logger)
|
||||
}
|
||||
|
||||
func ProvideMemoryStoreService(key depinject.ModuleKey, app *AppBuilder) store.MemoryStoreService {
|
||||
storeKey := ProvideMemoryStoreKey(key, app)
|
||||
return memStoreService{key: storeKey}
|
||||
memStoreKey := ProvideMemoryStoreKey(key, app)
|
||||
memStoreService := memStoreService{key: memStoreKey}
|
||||
|
||||
return kvService, memStoreService, NewEnvironment(
|
||||
kvService,
|
||||
logger,
|
||||
EnvWithRouterService(queryServiceRouter, msgServiceRouter),
|
||||
EnvWithMemStoreService(memStoreService),
|
||||
)
|
||||
}
|
||||
|
||||
func ProvideTransientStoreService(key depinject.ModuleKey, app *AppBuilder) store.TransientStoreService {
|
||||
|
||||
164
runtime/router.go
Normal file
164
runtime/router.go
Normal file
@ -0,0 +1,164 @@
|
||||
package runtime
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"reflect"
|
||||
|
||||
"github.com/cosmos/gogoproto/proto"
|
||||
protov2 "google.golang.org/protobuf/proto"
|
||||
"google.golang.org/protobuf/runtime/protoiface"
|
||||
|
||||
"cosmossdk.io/core/router"
|
||||
"cosmossdk.io/core/store"
|
||||
|
||||
"github.com/cosmos/cosmos-sdk/baseapp"
|
||||
)
|
||||
|
||||
// NewRouterService creates a router.Service which allows to invoke messages and queries using the msg router.
|
||||
func NewRouterService(storeService store.KVStoreService, queryRouter *baseapp.GRPCQueryRouter, msgRouter baseapp.MessageRouter) router.Service {
|
||||
return &routerService{
|
||||
queryRouterService: &queryRouterService{
|
||||
storeService: storeService, // TODO: this will be used later on as authenticating modules before routing
|
||||
router: queryRouter,
|
||||
},
|
||||
msgRouterService: &msgRouterService{
|
||||
storeService: storeService, // TODO: this will be used later on as authenticating modules before routing
|
||||
router: msgRouter,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
var _ router.Service = (*routerService)(nil)
|
||||
|
||||
type routerService struct {
|
||||
queryRouterService router.Router
|
||||
msgRouterService router.Router
|
||||
}
|
||||
|
||||
// MessageRouterService implements router.Service.
|
||||
func (r *routerService) MessageRouterService() router.Router {
|
||||
return r.msgRouterService
|
||||
}
|
||||
|
||||
// QueryRouterService implements router.Service.
|
||||
func (r *routerService) QueryRouterService() router.Router {
|
||||
return r.queryRouterService
|
||||
}
|
||||
|
||||
var _ router.Router = (*msgRouterService)(nil)
|
||||
|
||||
type msgRouterService struct {
|
||||
storeService store.KVStoreService
|
||||
router baseapp.MessageRouter
|
||||
}
|
||||
|
||||
// CanInvoke returns an error if the given message cannot be invoked.
|
||||
func (m *msgRouterService) CanInvoke(ctx context.Context, msg protoiface.MessageV1) error {
|
||||
messageName := msgTypeURL(msg)
|
||||
handler := m.router.HybridHandlerByMsgName(messageName)
|
||||
if handler == nil {
|
||||
return fmt.Errorf("unknown message: %s", messageName)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// InvokeTyped execute a message and fill-in a response.
|
||||
// The response must be known and passed as a parameter.
|
||||
// Use InvokeUntyped if the response type is not known.
|
||||
func (m *msgRouterService) InvokeTyped(ctx context.Context, msg, resp protoiface.MessageV1) error {
|
||||
messageName := msgTypeURL(msg)
|
||||
handler := m.router.HybridHandlerByMsgName(messageName)
|
||||
if handler == nil {
|
||||
return fmt.Errorf("unknown message: %s", messageName)
|
||||
}
|
||||
|
||||
return handler(ctx, msg, resp)
|
||||
}
|
||||
|
||||
// InvokeUntyped execute a message and returns a response.
|
||||
func (m *msgRouterService) InvokeUntyped(ctx context.Context, msg protoiface.MessageV1) (protoiface.MessageV1, error) {
|
||||
messageName := msgTypeURL(msg)
|
||||
respName := m.router.ResponseNameByMsgName(messageName)
|
||||
if respName == "" {
|
||||
return nil, fmt.Errorf("could not find response type for message %s (%T)", messageName, msg)
|
||||
}
|
||||
|
||||
// get response type
|
||||
typ := proto.MessageType(respName)
|
||||
if typ == nil {
|
||||
return nil, fmt.Errorf("no message type found for %s", respName)
|
||||
}
|
||||
msgResp, ok := reflect.New(typ.Elem()).Interface().(protoiface.MessageV1)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("could not create response message %s", respName)
|
||||
}
|
||||
|
||||
return msgResp, m.InvokeTyped(ctx, msg, msgResp)
|
||||
}
|
||||
|
||||
var _ router.Router = (*queryRouterService)(nil)
|
||||
|
||||
type queryRouterService struct {
|
||||
storeService store.KVStoreService
|
||||
router *baseapp.GRPCQueryRouter
|
||||
}
|
||||
|
||||
// CanInvoke returns an error if the given request cannot be invoked.
|
||||
func (m *queryRouterService) CanInvoke(ctx context.Context, req protoiface.MessageV1) error {
|
||||
reqName := msgTypeURL(req)
|
||||
handlers := m.router.HybridHandlerByRequestName(reqName)
|
||||
if len(handlers) == 0 {
|
||||
return fmt.Errorf("unknown request: %s", reqName)
|
||||
} else if len(handlers) > 1 {
|
||||
return fmt.Errorf("ambiguous request, query have multiple handlers: %s", reqName)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// InvokeTyped execute a message and fill-in a response.
|
||||
// The response must be known and passed as a parameter.
|
||||
// Use InvokeUntyped if the response type is not known.
|
||||
func (m *queryRouterService) InvokeTyped(ctx context.Context, req, resp protoiface.MessageV1) error {
|
||||
reqName := msgTypeURL(req)
|
||||
handlers := m.router.HybridHandlerByRequestName(reqName)
|
||||
if len(handlers) == 0 {
|
||||
return fmt.Errorf("unknown request: %s", reqName)
|
||||
} else if len(handlers) > 1 {
|
||||
return fmt.Errorf("ambiguous request, query have multiple handlers: %s", reqName)
|
||||
}
|
||||
|
||||
return handlers[0](ctx, req, resp)
|
||||
}
|
||||
|
||||
// InvokeUntyped execute a message and returns a response.
|
||||
func (m *queryRouterService) InvokeUntyped(ctx context.Context, req protoiface.MessageV1) (protoiface.MessageV1, error) {
|
||||
reqName := msgTypeURL(req)
|
||||
respName := m.router.ResponseNameByRequestName(reqName)
|
||||
if respName == "" {
|
||||
return nil, fmt.Errorf("could not find response type for request %s (%T)", reqName, req)
|
||||
}
|
||||
|
||||
// get response type
|
||||
typ := proto.MessageType(respName)
|
||||
if typ == nil {
|
||||
return nil, fmt.Errorf("no message type found for %s", respName)
|
||||
}
|
||||
reqResp, ok := reflect.New(typ.Elem()).Interface().(protoiface.MessageV1)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("could not create response request %s", respName)
|
||||
}
|
||||
|
||||
return reqResp, m.InvokeTyped(ctx, req, reqResp)
|
||||
}
|
||||
|
||||
// msgTypeURL returns the TypeURL of a proto message.
|
||||
func msgTypeURL(msg proto.Message) string {
|
||||
if m, ok := msg.(protov2.Message); ok {
|
||||
return string(m.ProtoReflect().Descriptor().FullName())
|
||||
}
|
||||
|
||||
return proto.MessageName(msg)
|
||||
}
|
||||
110
runtime/router_test.go
Normal file
110
runtime/router_test.go
Normal file
@ -0,0 +1,110 @@
|
||||
package runtime_test
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
bankv1beta1 "cosmossdk.io/api/cosmos/bank/v1beta1"
|
||||
counterv1 "cosmossdk.io/api/cosmos/counter/v1"
|
||||
"cosmossdk.io/log"
|
||||
storetypes "cosmossdk.io/store/types"
|
||||
|
||||
"github.com/cosmos/cosmos-sdk/baseapp"
|
||||
codectypes "github.com/cosmos/cosmos-sdk/codec/types"
|
||||
"github.com/cosmos/cosmos-sdk/runtime"
|
||||
"github.com/cosmos/cosmos-sdk/testutil"
|
||||
counterkeeper "github.com/cosmos/cosmos-sdk/x/counter/keeper"
|
||||
countertypes "github.com/cosmos/cosmos-sdk/x/counter/types"
|
||||
)
|
||||
|
||||
func TestRouterService(t *testing.T) {
|
||||
interfaceRegistry := codectypes.NewInterfaceRegistry()
|
||||
msgRouter := baseapp.NewMsgServiceRouter()
|
||||
msgRouter.SetInterfaceRegistry(interfaceRegistry)
|
||||
queryRouter := baseapp.NewGRPCQueryRouter()
|
||||
queryRouter.SetInterfaceRegistry(interfaceRegistry)
|
||||
key := storetypes.NewKVStoreKey(countertypes.StoreKey)
|
||||
storeService := runtime.NewKVStoreService(key)
|
||||
counterKeeper := counterkeeper.NewKeeper(runtime.NewEnvironment(storeService, log.NewNopLogger()))
|
||||
countertypes.RegisterInterfaces(interfaceRegistry)
|
||||
countertypes.RegisterMsgServer(msgRouter, counterKeeper)
|
||||
countertypes.RegisterQueryServer(queryRouter, counterKeeper)
|
||||
|
||||
routerService := runtime.NewRouterService(storeService, queryRouter, msgRouter)
|
||||
testCtx := testutil.DefaultContextWithDB(t, key, storetypes.NewTransientStoreKey("transient_test"))
|
||||
|
||||
// Messages
|
||||
|
||||
t.Run("invalid msg", func(t *testing.T) {
|
||||
_, err := routerService.MessageRouterService().InvokeUntyped(testCtx.Ctx, &bankv1beta1.MsgSend{})
|
||||
require.ErrorContains(t, err, "could not find response type for message cosmos.bank.v1beta1.MsgSend")
|
||||
})
|
||||
|
||||
t.Run("invoke untyped: valid msg (proto v1)", func(t *testing.T) {
|
||||
resp, err := routerService.MessageRouterService().InvokeUntyped(testCtx.Ctx, &countertypes.MsgIncreaseCounter{
|
||||
Signer: "cosmos1",
|
||||
Count: 42,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, resp)
|
||||
})
|
||||
|
||||
t.Run("invoke typed: valid msg (proto v1)", func(t *testing.T) {
|
||||
resp := &countertypes.MsgIncreaseCountResponse{}
|
||||
err := routerService.MessageRouterService().InvokeTyped(testCtx.Ctx, &countertypes.MsgIncreaseCounter{
|
||||
Signer: "cosmos1",
|
||||
Count: 42,
|
||||
}, resp)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, resp)
|
||||
})
|
||||
|
||||
t.Run("invoke typed: valid msg (proto v2)", func(t *testing.T) {
|
||||
resp := &counterv1.MsgIncreaseCountResponse{}
|
||||
err := routerService.MessageRouterService().InvokeTyped(testCtx.Ctx, &counterv1.MsgIncreaseCounter{
|
||||
Signer: "cosmos1",
|
||||
Count: 42,
|
||||
}, resp)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, resp)
|
||||
})
|
||||
|
||||
// Queries
|
||||
|
||||
t.Run("invalid query", func(t *testing.T) {
|
||||
err := routerService.QueryRouterService().InvokeTyped(testCtx.Ctx, &bankv1beta1.QueryBalanceRequest{}, &bankv1beta1.QueryBalanceResponse{})
|
||||
require.ErrorContains(t, err, "unknown request: cosmos.bank.v1beta1.QueryBalanceRequest")
|
||||
})
|
||||
|
||||
t.Run("invoke typed: valid query (proto v1)", func(t *testing.T) {
|
||||
_ = counterKeeper.CountStore.Set(testCtx.Ctx, 42)
|
||||
|
||||
resp := &countertypes.QueryGetCountResponse{}
|
||||
err := routerService.QueryRouterService().InvokeTyped(testCtx.Ctx, &countertypes.QueryGetCountRequest{}, resp)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, resp)
|
||||
require.Equal(t, int64(42), resp.TotalCount)
|
||||
})
|
||||
|
||||
t.Run("invoke typed: valid query (proto v2)", func(t *testing.T) {
|
||||
_ = counterKeeper.CountStore.Set(testCtx.Ctx, 42)
|
||||
|
||||
resp := &counterv1.QueryGetCountResponse{}
|
||||
err := routerService.QueryRouterService().InvokeTyped(testCtx.Ctx, &counterv1.QueryGetCountRequest{}, resp)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, resp)
|
||||
require.Equal(t, int64(42), resp.TotalCount)
|
||||
})
|
||||
|
||||
t.Run("invoke untyped: valid query (proto v1)", func(t *testing.T) {
|
||||
_ = counterKeeper.CountStore.Set(testCtx.Ctx, 42)
|
||||
|
||||
resp, err := routerService.QueryRouterService().InvokeUntyped(testCtx.Ctx, &countertypes.QueryGetCountRequest{})
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, resp)
|
||||
respVal, ok := resp.(*countertypes.QueryGetCountResponse)
|
||||
require.True(t, ok)
|
||||
require.Equal(t, int64(42), respVal.TotalCount)
|
||||
})
|
||||
}
|
||||
@ -75,13 +75,13 @@ func NewIntegrationApp(
|
||||
return moduleManager.EndBlock(sdkCtx)
|
||||
})
|
||||
|
||||
router := baseapp.NewMsgServiceRouter()
|
||||
router.SetInterfaceRegistry(interfaceRegistry)
|
||||
bApp.SetMsgServiceRouter(router)
|
||||
msgRouter := baseapp.NewMsgServiceRouter()
|
||||
msgRouter.SetInterfaceRegistry(interfaceRegistry)
|
||||
bApp.SetMsgServiceRouter(msgRouter)
|
||||
|
||||
if keys[consensusparamtypes.StoreKey] != nil {
|
||||
// set baseApp param store
|
||||
consensusParamsKeeper := consensusparamkeeper.NewKeeper(appCodec, runtime.NewEnvironment(runtime.NewKVStoreService(keys[consensusparamtypes.StoreKey]), log.NewNopLogger()), authtypes.NewModuleAddress("gov").String())
|
||||
consensusParamsKeeper := consensusparamkeeper.NewKeeper(appCodec, runtime.NewEnvironment(runtime.NewKVStoreService(keys[consensusparamtypes.StoreKey]), log.NewNopLogger(), runtime.EnvWithRouterService(baseapp.NewGRPCQueryRouter(), msgRouter)), authtypes.NewModuleAddress("gov").String())
|
||||
bApp.SetParamStore(consensusParamsKeeper.ParamsStore)
|
||||
|
||||
if err := bApp.LoadLatestVersion(); err != nil {
|
||||
|
||||
@ -46,7 +46,7 @@ type QueryRouter interface {
|
||||
// MsgRouter represents a router which can be used to route messages to the correct module.
|
||||
type MsgRouter interface {
|
||||
HybridHandlerByMsgName(msgName string) func(ctx context.Context, req, resp implementation.ProtoMsg) error
|
||||
ResponseNameByRequestName(name string) string
|
||||
ResponseNameByMsgName(name string) string
|
||||
}
|
||||
|
||||
// SignerProvider defines an interface used to get the expected sender from a message.
|
||||
@ -102,9 +102,9 @@ type Keeper struct {
|
||||
// deps coming from the runtime
|
||||
environment appmodule.Environment
|
||||
addressCodec address.Codec
|
||||
msgRouter MsgRouter
|
||||
msgRouter MsgRouter // todo use env
|
||||
signerProvider SignerProvider
|
||||
queryRouter QueryRouter
|
||||
queryRouter QueryRouter // todo use env
|
||||
makeSendCoinsMsg coinsTransferMsgFunc
|
||||
|
||||
accounts map[string]implementation.Implementation
|
||||
@ -344,7 +344,7 @@ func (k Keeper) sendAnyMessages(ctx context.Context, sender []byte, anyMessages
|
||||
func (k Keeper) sendModuleMessageUntyped(ctx context.Context, sender []byte, msg implementation.ProtoMsg) (implementation.ProtoMsg, error) {
|
||||
// we need to fetch the response type from the request message type.
|
||||
// this is because the response type is not known.
|
||||
respName := k.msgRouter.ResponseNameByRequestName(implementation.MessageName(msg))
|
||||
respName := k.msgRouter.ResponseNameByMsgName(implementation.MessageName(msg))
|
||||
if respName == "" {
|
||||
return nil, fmt.Errorf("could not find response type for message %T", msg)
|
||||
}
|
||||
|
||||
@ -89,6 +89,6 @@ func (m mockExec) HybridHandlerByMsgName(_ string) func(ctx context.Context, req
|
||||
}
|
||||
}
|
||||
|
||||
func (m mockExec) ResponseNameByRequestName(name string) string {
|
||||
func (m mockExec) ResponseNameByMsgName(name string) string {
|
||||
return name + "Response"
|
||||
}
|
||||
|
||||
@ -3,8 +3,6 @@ package counter
|
||||
import (
|
||||
modulev1 "cosmossdk.io/api/cosmos/counter/module/v1"
|
||||
"cosmossdk.io/core/appmodule"
|
||||
"cosmossdk.io/core/event"
|
||||
storetypes "cosmossdk.io/core/store"
|
||||
"cosmossdk.io/depinject"
|
||||
"cosmossdk.io/depinject/appconfig"
|
||||
|
||||
@ -26,9 +24,8 @@ func init() {
|
||||
type ModuleInputs struct {
|
||||
depinject.In
|
||||
|
||||
Config *modulev1.Module
|
||||
StoreService storetypes.KVStoreService
|
||||
EventManager event.Service
|
||||
Config *modulev1.Module
|
||||
Environment appmodule.Environment
|
||||
}
|
||||
|
||||
type ModuleOutputs struct {
|
||||
@ -39,7 +36,7 @@ type ModuleOutputs struct {
|
||||
}
|
||||
|
||||
func ProvideModule(in ModuleInputs) ModuleOutputs {
|
||||
k := keeper.NewKeeper(in.StoreService, in.EventManager)
|
||||
k := keeper.NewKeeper(in.Environment)
|
||||
m := NewAppModule(k)
|
||||
|
||||
return ModuleOutputs{
|
||||
|
||||
@ -9,8 +9,8 @@ import (
|
||||
"google.golang.org/grpc/status"
|
||||
|
||||
"cosmossdk.io/collections"
|
||||
"cosmossdk.io/core/appmodule"
|
||||
"cosmossdk.io/core/event"
|
||||
storetypes "cosmossdk.io/core/store"
|
||||
|
||||
"github.com/cosmos/cosmos-sdk/x/counter/types"
|
||||
)
|
||||
@ -18,15 +18,15 @@ import (
|
||||
var StoreKey = "Counter"
|
||||
|
||||
type Keeper struct {
|
||||
event event.Service
|
||||
env appmodule.Environment
|
||||
|
||||
CountStore collections.Item[int64]
|
||||
}
|
||||
|
||||
func NewKeeper(storeService storetypes.KVStoreService, em event.Service) Keeper {
|
||||
sb := collections.NewSchemaBuilder(storeService)
|
||||
func NewKeeper(env appmodule.Environment) Keeper {
|
||||
sb := collections.NewSchemaBuilder(env.KVStoreService)
|
||||
return Keeper{
|
||||
event: em,
|
||||
env: env,
|
||||
CountStore: collections.NewItem(sb, collections.NewPrefix(0), "count", collections.Int64Value),
|
||||
}
|
||||
}
|
||||
@ -67,7 +67,7 @@ func (k Keeper) IncreaseCount(ctx context.Context, msg *types.MsgIncreaseCounter
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := k.event.EventManager(ctx).EmitKV(
|
||||
if err := k.env.EventService.EventManager(ctx).EmitKV(
|
||||
"increase_counter",
|
||||
event.NewAttribute("signer", msg.Signer),
|
||||
event.NewAttribute("new count", fmt.Sprint(num+msg.Count))); err != nil {
|
||||
|
||||
Loading…
Reference in New Issue
Block a user