diff --git a/baseapp/streaming.go b/baseapp/streaming.go index 67ba9d086d..78b19d5337 100644 --- a/baseapp/streaming.go +++ b/baseapp/streaming.go @@ -38,10 +38,11 @@ const ( // types of streaming listeners. func (app *BaseApp) EnableIndexer(indexerOpts interface{}, keys map[string]*storetypes.KVStoreKey, appModules map[string]any) error { listener, err := indexer.StartIndexing(indexer.IndexingOptions{ - Config: indexerOpts, - Resolver: decoding.ModuleSetDecoderResolver(appModules), - SyncSource: nil, - Logger: app.logger.With(log.ModuleKey, "indexer"), + Config: indexerOpts, + Resolver: decoding.ModuleSetDecoderResolver(appModules), + Logger: app.logger.With(log.ModuleKey, "indexer"), + SyncSource: nil, // TODO: Support catch-up syncs + AddressCodec: app.interfaceRegistry.SigningContext().AddressCodec(), }) if err != nil { return err diff --git a/collections/pair.go b/collections/pair.go index 955cfe3d22..bf22a6711d 100644 --- a/collections/pair.go +++ b/collections/pair.go @@ -245,17 +245,43 @@ func (p pairKeyCodec[K1, K2]) SchemaCodec() (codec.SchemaCodec[Pair[K1, K2]], er return codec.SchemaCodec[Pair[K1, K2]]{}, fmt.Errorf("error getting key2 field: %w", err) } + codec1, err := codec.KeySchemaCodec(p.keyCodec1) + if err != nil { + return codec.SchemaCodec[Pair[K1, K2]]{}, fmt.Errorf("error getting key1 schema codec: %w", err) + } + + codec2, err := codec.KeySchemaCodec(p.keyCodec2) + if err != nil { + return codec.SchemaCodec[Pair[K1, K2]]{}, fmt.Errorf("error getting key2 schema codec: %w", err) + } + return codec.SchemaCodec[Pair[K1, K2]]{ Fields: []schema.Field{field1, field2}, ToSchemaType: func(pair Pair[K1, K2]) (any, error) { - return []interface{}{pair.K1(), pair.K2()}, nil + k1, err := toKeySchemaType(codec1, pair.K1()) + if err != nil { + return nil, err + } + k2, err := toKeySchemaType(codec2, pair.K2()) + if err != nil { + return nil, err + } + return []interface{}{k1, k2}, nil }, FromSchemaType: func(a any) (Pair[K1, K2], error) { aSlice, ok := a.([]interface{}) if !ok || len(aSlice) != 2 { return Pair[K1, K2]{}, fmt.Errorf("expected slice of length 2, got %T", a) } - return Join(aSlice[0].(K1), aSlice[1].(K2)), nil + k1, err := fromKeySchemaType(codec1, aSlice[0]) + if err != nil { + return Pair[K1, K2]{}, err + } + k2, err := fromKeySchemaType(codec2, aSlice[1]) + if err != nil { + return Pair[K1, K2]{}, err + } + return Join(k1, k2), nil }, }, nil } @@ -273,6 +299,25 @@ func getNamedKeyField[T any](keyCdc codec.KeyCodec[T], name string) (schema.Fiel return field, nil } +func toKeySchemaType[T any](cdc codec.SchemaCodec[T], key T) (any, error) { + if cdc.ToSchemaType != nil { + return cdc.ToSchemaType(key) + } + return key, nil +} + +func fromKeySchemaType[T any](cdc codec.SchemaCodec[T], key any) (T, error) { + if cdc.FromSchemaType != nil { + return cdc.FromSchemaType(key) + } + tKey, ok := key.(T) + if !ok { + var zero T + return zero, fmt.Errorf("expected type %T, got %T", zero, key) + } + return tKey, nil +} + // NewPrefixUntilPairRange defines a collection query which ranges until the provided Pair prefix. // Unstable: this API might change in the future. func NewPrefixUntilPairRange[K1, K2 any](prefix K1) *PairRange[K1, K2] { diff --git a/server/v2/cometbft/server.go b/server/v2/cometbft/server.go index 1182924a4f..02cf66dbf3 100644 --- a/server/v2/cometbft/server.go +++ b/server/v2/cometbft/server.go @@ -151,9 +151,11 @@ func New[T transaction.Tx]( var listener *appdata.Listener if indexerCfg := srv.config.AppTomlConfig.Indexer; len(indexerCfg.Target) > 0 { indexingTarget, err := indexer.StartIndexing(indexer.IndexingOptions{ - Config: indexerCfg, - Resolver: decoderResolver, - Logger: logger.With(log.ModuleKey, "indexer"), + Config: indexerCfg, + Resolver: decoderResolver, + Logger: logger.With(log.ModuleKey, "indexer"), + SyncSource: nil, // TODO: Support catch-up syncs + AddressCodec: appCodecs.AppCodec.InterfaceRegistry().SigningContext().AddressCodec(), }) if err != nil { return nil, fmt.Errorf("failed to start indexing: %w", err) diff --git a/types/collections.go b/types/collections.go index 18ed47ef45..f132112659 100644 --- a/types/collections.go +++ b/types/collections.go @@ -9,6 +9,7 @@ import ( "cosmossdk.io/collections" collcodec "cosmossdk.io/collections/codec" "cosmossdk.io/math" + "cosmossdk.io/schema" ) var ( @@ -120,6 +121,28 @@ func (a genericAddressKey[T]) SizeNonTerminal(key T) int { return collections.BytesKey.SizeNonTerminal(key) } +func (a genericAddressKey[T]) SchemaCodec() (collcodec.SchemaCodec[T], error) { + return collcodec.SchemaCodec[T]{ + Fields: []schema.Field{{Kind: schema.AddressKind}}, + ToSchemaType: func(t T) (any, error) { + if len(t) == 0 { + return nil, fmt.Errorf("invalid empty address") + } + return t, nil + }, + FromSchemaType: func(s any) (T, error) { + addr, ok := s.([]byte) + if !ok { + return nil, fmt.Errorf("expected []byte, got %T", s) + } + if len(addr) == 0 { + return nil, fmt.Errorf("invalid empty address") + } + return T(addr), nil + }, + }, nil +} + // Deprecated: lengthPrefixedAddressKey is a special key codec used to retain state backwards compatibility // when a generic address key (be: AccAddress, ValAddress, ConsAddress), is used as an index key. // More docs can be found in the LengthPrefixedAddressKey function. @@ -222,6 +245,26 @@ func (i intValueCodec) ValueType() string { return Int } +func (i intValueCodec) SchemaCodec() (collcodec.SchemaCodec[math.Int], error) { + return collcodec.SchemaCodec[math.Int]{ + Fields: []schema.Field{{Kind: schema.IntegerKind}}, + ToSchemaType: func(t math.Int) (any, error) { + return t.String(), nil + }, + FromSchemaType: func(s any) (math.Int, error) { + sz, ok := s.(string) + if !ok { + return math.Int{}, fmt.Errorf("expected string, got %T", s) + } + t, ok := math.NewIntFromString(sz) + if !ok { + return math.Int{}, fmt.Errorf("failed to parse Int from string: %s", sz) + } + return t, nil + }, + }, nil +} + type uintValueCodec struct{} func (i uintValueCodec) Encode(value math.Uint) ([]byte, error) {