refactor(indexer/postgres)!: use schema/indexer API and hide types/methods (#21363)
This commit is contained in:
parent
fb7775ad0c
commit
e88c138760
@ -1,7 +1,7 @@
|
||||
package postgres
|
||||
|
||||
// BaseSQL is the base SQL that is always included in the schema.
|
||||
const BaseSQL = `
|
||||
// baseSQL is the base SQL that is always included in the schema.
|
||||
const baseSQL = `
|
||||
CREATE OR REPLACE FUNCTION nanos_to_timestamptz(nanos bigint) RETURNS timestamptz AS $$
|
||||
SELECT to_timestamp(nanos / 1000000000) + (nanos / 1000000000) * INTERVAL '1 microsecond'
|
||||
$$ LANGUAGE SQL IMMUTABLE;
|
||||
|
||||
@ -8,7 +8,7 @@ import (
|
||||
)
|
||||
|
||||
// createColumnDefinition writes a column definition within a CREATE TABLE statement for the field.
|
||||
func (tm *ObjectIndexer) createColumnDefinition(writer io.Writer, field schema.Field) error {
|
||||
func (tm *objectIndexer) createColumnDefinition(writer io.Writer, field schema.Field) error {
|
||||
_, err := fmt.Fprintf(writer, "%q ", field.Name)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -110,7 +110,7 @@ func simpleColumnType(kind schema.Kind) string {
|
||||
// updatableColumnName is the name of the insertable/updatable column name for the field.
|
||||
// This is the field name in most cases, except for time columns which are stored as nanos
|
||||
// and then converted to timestamp generated columns.
|
||||
func (tm *ObjectIndexer) updatableColumnName(field schema.Field) (name string, err error) {
|
||||
func (tm *objectIndexer) updatableColumnName(field schema.Field) (name string, err error) {
|
||||
name = field.Name
|
||||
if field.Kind == schema.TimeKind {
|
||||
name = fmt.Sprintf("%s_nanos", name)
|
||||
|
||||
@ -5,8 +5,8 @@ import (
|
||||
"database/sql"
|
||||
)
|
||||
|
||||
// DBConn is an interface that abstracts the *sql.DB, *sql.Tx and *sql.Conn types.
|
||||
type DBConn interface {
|
||||
// dbConn is an interface that abstracts the *sql.DB, *sql.Tx and *sql.Conn types.
|
||||
type dbConn interface {
|
||||
ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
|
||||
PrepareContext(ctx context.Context, query string) (*sql.Stmt, error)
|
||||
QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
|
||||
|
||||
@ -7,25 +7,25 @@ import (
|
||||
"strings"
|
||||
)
|
||||
|
||||
// CreateTable creates the table for the object type.
|
||||
func (tm *ObjectIndexer) CreateTable(ctx context.Context, conn DBConn) error {
|
||||
// createTable creates the table for the object type.
|
||||
func (tm *objectIndexer) createTable(ctx context.Context, conn dbConn) error {
|
||||
buf := new(strings.Builder)
|
||||
err := tm.CreateTableSql(buf)
|
||||
err := tm.createTableSql(buf)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
sqlStr := buf.String()
|
||||
if tm.options.Logger != nil {
|
||||
tm.options.Logger(fmt.Sprintf("Creating table %s", tm.TableName()), sqlStr)
|
||||
if tm.options.logger != nil {
|
||||
tm.options.logger.Debug("Creating table %s", "table", tm.tableName(), "sql", sqlStr)
|
||||
}
|
||||
_, err = conn.ExecContext(ctx, sqlStr)
|
||||
return err
|
||||
}
|
||||
|
||||
// CreateTableSql generates a CREATE TABLE statement for the object type.
|
||||
func (tm *ObjectIndexer) CreateTableSql(writer io.Writer) error {
|
||||
_, err := fmt.Fprintf(writer, "CREATE TABLE IF NOT EXISTS %q (\n\t", tm.TableName())
|
||||
// createTableSql generates a CREATE TABLE statement for the object type.
|
||||
func (tm *objectIndexer) createTableSql(writer io.Writer) error {
|
||||
_, err := fmt.Fprintf(writer, "CREATE TABLE IF NOT EXISTS %q (\n\t", tm.tableName())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -53,7 +53,7 @@ func (tm *ObjectIndexer) CreateTableSql(writer io.Writer) error {
|
||||
}
|
||||
|
||||
// add _deleted column when we have RetainDeletions set and enabled
|
||||
if !tm.options.DisableRetainDeletions && tm.typ.RetainDeletions {
|
||||
if !tm.options.disableRetainDeletions && tm.typ.RetainDeletions {
|
||||
_, err = fmt.Fprintf(writer, "_deleted BOOLEAN NOT NULL DEFAULT FALSE,\n\t")
|
||||
if err != nil {
|
||||
return err
|
||||
@ -87,7 +87,7 @@ func (tm *ObjectIndexer) CreateTableSql(writer io.Writer) error {
|
||||
// we GRANT SELECT on the table to PUBLIC so that the table is automatically available
|
||||
// for querying using off-the-shelf tools like pg_graphql, Postgrest, Postgraphile, etc.
|
||||
// without any login permissions
|
||||
_, err = fmt.Fprintf(writer, "GRANT SELECT ON TABLE %q TO PUBLIC;", tm.TableName())
|
||||
_, err = fmt.Fprintf(writer, "GRANT SELECT ON TABLE %q TO PUBLIC;", tm.tableName())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -5,9 +5,10 @@ import (
|
||||
|
||||
"cosmossdk.io/indexer/postgres/internal/testdata"
|
||||
"cosmossdk.io/schema"
|
||||
"cosmossdk.io/schema/logutil"
|
||||
)
|
||||
|
||||
func ExampleObjectIndexer_CreateTableSql_allKinds() {
|
||||
func Example_objectIndexer_createTableSql_allKinds() {
|
||||
exampleCreateTable(testdata.AllKindsObject)
|
||||
// Output:
|
||||
// CREATE TABLE IF NOT EXISTS "test_all_kinds" (
|
||||
@ -40,7 +41,7 @@ func ExampleObjectIndexer_CreateTableSql_allKinds() {
|
||||
// GRANT SELECT ON TABLE "test_all_kinds" TO PUBLIC;
|
||||
}
|
||||
|
||||
func ExampleObjectIndexer_CreateTableSql_singleton() {
|
||||
func Example_objectIndexer_createTableSql_singleton() {
|
||||
exampleCreateTable(testdata.SingletonObject)
|
||||
// Output:
|
||||
// CREATE TABLE IF NOT EXISTS "test_singleton" (
|
||||
@ -53,7 +54,7 @@ func ExampleObjectIndexer_CreateTableSql_singleton() {
|
||||
// GRANT SELECT ON TABLE "test_singleton" TO PUBLIC;
|
||||
}
|
||||
|
||||
func ExampleObjectIndexer_CreateTableSql_vote() {
|
||||
func Example_objectIndexer_createTableSql_vote() {
|
||||
exampleCreateTable(testdata.VoteObject)
|
||||
// Output:
|
||||
// CREATE TABLE IF NOT EXISTS "test_vote" (
|
||||
@ -66,7 +67,7 @@ func ExampleObjectIndexer_CreateTableSql_vote() {
|
||||
// GRANT SELECT ON TABLE "test_vote" TO PUBLIC;
|
||||
}
|
||||
|
||||
func ExampleObjectIndexer_CreateTableSql_vote_no_retain_delete() {
|
||||
func Example_objectIndexer_createTableSql_vote_no_retain_delete() {
|
||||
exampleCreateTableOpt(testdata.VoteObject, true)
|
||||
// Output:
|
||||
// CREATE TABLE IF NOT EXISTS "test_vote" (
|
||||
@ -83,11 +84,11 @@ func exampleCreateTable(objectType schema.ObjectType) {
|
||||
}
|
||||
|
||||
func exampleCreateTableOpt(objectType schema.ObjectType, noRetainDelete bool) {
|
||||
tm := NewObjectIndexer("test", objectType, Options{
|
||||
Logger: func(msg, sql string, params ...interface{}) {},
|
||||
DisableRetainDeletions: noRetainDelete,
|
||||
tm := newObjectIndexer("test", objectType, options{
|
||||
logger: logutil.NoopLogger{},
|
||||
disableRetainDeletions: noRetainDelete,
|
||||
})
|
||||
err := tm.CreateTableSql(os.Stdout)
|
||||
err := tm.createTableSql(os.Stdout)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
@ -10,8 +10,8 @@ import (
|
||||
"cosmossdk.io/schema"
|
||||
)
|
||||
|
||||
// CreateEnumType creates an enum type in the database.
|
||||
func (m *ModuleIndexer) CreateEnumType(ctx context.Context, conn DBConn, enum schema.EnumType) error {
|
||||
// createEnumType creates an enum type in the database.
|
||||
func (m *moduleIndexer) createEnumType(ctx context.Context, conn dbConn, enum schema.EnumType) error {
|
||||
typeName := enumTypeName(m.moduleName, enum)
|
||||
row := conn.QueryRowContext(ctx, "SELECT 1 FROM pg_type WHERE typname = $1", typeName)
|
||||
var res interface{}
|
||||
@ -25,21 +25,21 @@ func (m *ModuleIndexer) CreateEnumType(ctx context.Context, conn DBConn, enum sc
|
||||
}
|
||||
|
||||
buf := new(strings.Builder)
|
||||
err := CreateEnumTypeSql(buf, m.moduleName, enum)
|
||||
err := createEnumTypeSql(buf, m.moduleName, enum)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
sqlStr := buf.String()
|
||||
if m.options.Logger != nil {
|
||||
m.options.Logger("Creating enum type", sqlStr)
|
||||
if m.options.logger != nil {
|
||||
m.options.logger.Debug("Creating enum type", "sql", sqlStr)
|
||||
}
|
||||
_, err = conn.ExecContext(ctx, sqlStr)
|
||||
return err
|
||||
}
|
||||
|
||||
// CreateEnumTypeSql generates a CREATE TYPE statement for the enum definition.
|
||||
func CreateEnumTypeSql(writer io.Writer, moduleName string, enum schema.EnumType) error {
|
||||
// createEnumTypeSql generates a CREATE TYPE statement for the enum definition.
|
||||
func createEnumTypeSql(writer io.Writer, moduleName string, enum schema.EnumType) error {
|
||||
_, err := fmt.Fprintf(writer, "CREATE TYPE %q AS ENUM (", enumTypeName(moduleName, enum))
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
@ -6,8 +6,8 @@ import (
|
||||
"cosmossdk.io/indexer/postgres/internal/testdata"
|
||||
)
|
||||
|
||||
func ExampleCreateEnumTypeSql() {
|
||||
err := CreateEnumTypeSql(os.Stdout, "test", testdata.MyEnum)
|
||||
func Example_createEnumTypeSql() {
|
||||
err := createEnumTypeSql(os.Stdout, "test", testdata.MyEnum)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
@ -3,10 +3,11 @@ package postgres
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"cosmossdk.io/schema/appdata"
|
||||
"cosmossdk.io/schema/indexer"
|
||||
"cosmossdk.io/schema/logutil"
|
||||
)
|
||||
|
||||
type Config struct {
|
||||
@ -22,9 +23,28 @@ type Config struct {
|
||||
|
||||
type SqlLogger = func(msg, sql string, params ...interface{})
|
||||
|
||||
func StartIndexer(ctx context.Context, logger SqlLogger, config Config) (appdata.Listener, error) {
|
||||
type indexerImpl struct {
|
||||
ctx context.Context
|
||||
db *sql.DB
|
||||
tx *sql.Tx
|
||||
opts options
|
||||
modules map[string]*moduleIndexer
|
||||
logger logutil.Logger
|
||||
}
|
||||
|
||||
func StartIndexer(params indexer.InitParams) (indexer.InitResult, error) {
|
||||
config, err := decodeConfig(params.Config.Config)
|
||||
if err != nil {
|
||||
return indexer.InitResult{}, err
|
||||
}
|
||||
|
||||
ctx := params.Context
|
||||
if ctx == nil {
|
||||
ctx = context.Background()
|
||||
}
|
||||
|
||||
if config.DatabaseURL == "" {
|
||||
return appdata.Listener{}, errors.New("missing database URL")
|
||||
return indexer.InitResult{}, errors.New("missing database URL")
|
||||
}
|
||||
|
||||
driver := config.DatabaseDriver
|
||||
@ -34,48 +54,51 @@ func StartIndexer(ctx context.Context, logger SqlLogger, config Config) (appdata
|
||||
|
||||
db, err := sql.Open(driver, config.DatabaseURL)
|
||||
if err != nil {
|
||||
return appdata.Listener{}, err
|
||||
return indexer.InitResult{}, err
|
||||
}
|
||||
|
||||
tx, err := db.BeginTx(ctx, nil)
|
||||
if err != nil {
|
||||
return appdata.Listener{}, err
|
||||
return indexer.InitResult{}, err
|
||||
}
|
||||
|
||||
// commit base schema
|
||||
_, err = tx.Exec(BaseSQL)
|
||||
_, err = tx.Exec(baseSQL)
|
||||
if err != nil {
|
||||
return appdata.Listener{}, err
|
||||
return indexer.InitResult{}, err
|
||||
}
|
||||
|
||||
moduleIndexers := map[string]*ModuleIndexer{}
|
||||
opts := Options{
|
||||
DisableRetainDeletions: config.DisableRetainDeletions,
|
||||
Logger: logger,
|
||||
moduleIndexers := map[string]*moduleIndexer{}
|
||||
opts := options{
|
||||
disableRetainDeletions: config.DisableRetainDeletions,
|
||||
logger: params.Logger,
|
||||
}
|
||||
|
||||
return appdata.Listener{
|
||||
InitializeModuleData: func(data appdata.ModuleInitializationData) error {
|
||||
moduleName := data.ModuleName
|
||||
modSchema := data.Schema
|
||||
_, ok := moduleIndexers[moduleName]
|
||||
if ok {
|
||||
return fmt.Errorf("module %s already initialized", moduleName)
|
||||
}
|
||||
idx := &indexerImpl{
|
||||
ctx: ctx,
|
||||
db: db,
|
||||
tx: tx,
|
||||
opts: opts,
|
||||
modules: moduleIndexers,
|
||||
logger: params.Logger,
|
||||
}
|
||||
|
||||
mm := NewModuleIndexer(moduleName, modSchema, opts)
|
||||
moduleIndexers[moduleName] = mm
|
||||
|
||||
return mm.InitializeSchema(ctx, tx)
|
||||
},
|
||||
Commit: func(data appdata.CommitData) (completionCallback func() error, err error) {
|
||||
err = tx.Commit()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
tx, err = db.BeginTx(ctx, nil)
|
||||
return nil, err
|
||||
},
|
||||
return indexer.InitResult{
|
||||
Listener: idx.listener(),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func decodeConfig(rawConfig map[string]interface{}) (*Config, error) {
|
||||
bz, err := json.Marshal(rawConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var config Config
|
||||
err = json.Unmarshal(bz, &config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &config, nil
|
||||
}
|
||||
|
||||
38
indexer/postgres/listener.go
Normal file
38
indexer/postgres/listener.go
Normal file
@ -0,0 +1,38 @@
|
||||
package postgres
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"cosmossdk.io/schema/appdata"
|
||||
)
|
||||
|
||||
func (i *indexerImpl) listener() appdata.Listener {
|
||||
return appdata.Listener{
|
||||
InitializeModuleData: func(data appdata.ModuleInitializationData) error {
|
||||
moduleName := data.ModuleName
|
||||
modSchema := data.Schema
|
||||
_, ok := i.modules[moduleName]
|
||||
if ok {
|
||||
return fmt.Errorf("module %s already initialized", moduleName)
|
||||
}
|
||||
|
||||
mm := newModuleIndexer(moduleName, modSchema, i.opts)
|
||||
i.modules[moduleName] = mm
|
||||
|
||||
return mm.initializeSchema(i.ctx, i.tx)
|
||||
},
|
||||
StartBlock: func(data appdata.StartBlockData) error {
|
||||
_, err := i.tx.Exec("INSERT INTO block (number) VALUES ($1)", data.Height)
|
||||
return err
|
||||
},
|
||||
Commit: func(data appdata.CommitData) (func() error, error) {
|
||||
err := i.tx.Commit()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
i.tx, err = i.db.BeginTx(i.ctx, nil)
|
||||
return nil, err
|
||||
},
|
||||
}
|
||||
}
|
||||
@ -7,32 +7,32 @@ import (
|
||||
"cosmossdk.io/schema"
|
||||
)
|
||||
|
||||
// ModuleIndexer manages the tables for a module.
|
||||
type ModuleIndexer struct {
|
||||
// moduleIndexer manages the tables for a module.
|
||||
type moduleIndexer struct {
|
||||
moduleName string
|
||||
schema schema.ModuleSchema
|
||||
tables map[string]*ObjectIndexer
|
||||
tables map[string]*objectIndexer
|
||||
definedEnums map[string]schema.EnumType
|
||||
options Options
|
||||
options options
|
||||
}
|
||||
|
||||
// NewModuleIndexer creates a new ModuleIndexer for the given module schema.
|
||||
func NewModuleIndexer(moduleName string, modSchema schema.ModuleSchema, options Options) *ModuleIndexer {
|
||||
return &ModuleIndexer{
|
||||
// newModuleIndexer creates a new moduleIndexer for the given module schema.
|
||||
func newModuleIndexer(moduleName string, modSchema schema.ModuleSchema, options options) *moduleIndexer {
|
||||
return &moduleIndexer{
|
||||
moduleName: moduleName,
|
||||
schema: modSchema,
|
||||
tables: map[string]*ObjectIndexer{},
|
||||
tables: map[string]*objectIndexer{},
|
||||
definedEnums: map[string]schema.EnumType{},
|
||||
options: options,
|
||||
}
|
||||
}
|
||||
|
||||
// InitializeSchema creates tables for all object types in the module schema and creates enum types.
|
||||
func (m *ModuleIndexer) InitializeSchema(ctx context.Context, conn DBConn) error {
|
||||
// initializeSchema creates tables for all object types in the module schema and creates enum types.
|
||||
func (m *moduleIndexer) initializeSchema(ctx context.Context, conn dbConn) error {
|
||||
// create enum types
|
||||
var err error
|
||||
m.schema.EnumTypes(func(enumType schema.EnumType) bool {
|
||||
err = m.CreateEnumType(ctx, conn, enumType)
|
||||
err = m.createEnumType(ctx, conn, enumType)
|
||||
return err == nil
|
||||
})
|
||||
if err != nil {
|
||||
@ -41,9 +41,9 @@ func (m *ModuleIndexer) InitializeSchema(ctx context.Context, conn DBConn) error
|
||||
|
||||
// create tables for all object types
|
||||
m.schema.ObjectTypes(func(typ schema.ObjectType) bool {
|
||||
tm := NewObjectIndexer(m.moduleName, typ, m.options)
|
||||
tm := newObjectIndexer(m.moduleName, typ, m.options)
|
||||
m.tables[typ.Name] = tm
|
||||
err = tm.CreateTable(ctx, conn)
|
||||
err = tm.createTable(ctx, conn)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("failed to create table for %s in module %s: %v", typ.Name, m.moduleName, err) //nolint:errorlint // using %v for go 1.12 compat
|
||||
}
|
||||
@ -52,8 +52,3 @@ func (m *ModuleIndexer) InitializeSchema(ctx context.Context, conn DBConn) error
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// ObjectIndexers returns the object indexers for the module.
|
||||
func (m *ModuleIndexer) ObjectIndexers() map[string]*ObjectIndexer {
|
||||
return m.tables
|
||||
}
|
||||
|
||||
@ -6,17 +6,17 @@ import (
|
||||
"cosmossdk.io/schema"
|
||||
)
|
||||
|
||||
// ObjectIndexer is a helper struct that generates SQL for a given object type.
|
||||
type ObjectIndexer struct {
|
||||
// objectIndexer is a helper struct that generates SQL for a given object type.
|
||||
type objectIndexer struct {
|
||||
moduleName string
|
||||
typ schema.ObjectType
|
||||
valueFields map[string]schema.Field
|
||||
allFields map[string]schema.Field
|
||||
options Options
|
||||
options options
|
||||
}
|
||||
|
||||
// NewObjectIndexer creates a new ObjectIndexer for the given object type.
|
||||
func NewObjectIndexer(moduleName string, typ schema.ObjectType, options Options) *ObjectIndexer {
|
||||
// newObjectIndexer creates a new objectIndexer for the given object type.
|
||||
func newObjectIndexer(moduleName string, typ schema.ObjectType, options options) *objectIndexer {
|
||||
allFields := make(map[string]schema.Field)
|
||||
valueFields := make(map[string]schema.Field)
|
||||
|
||||
@ -29,7 +29,7 @@ func NewObjectIndexer(moduleName string, typ schema.ObjectType, options Options)
|
||||
allFields[field.Name] = field
|
||||
}
|
||||
|
||||
return &ObjectIndexer{
|
||||
return &objectIndexer{
|
||||
moduleName: moduleName,
|
||||
typ: typ,
|
||||
allFields: allFields,
|
||||
@ -38,7 +38,7 @@ func NewObjectIndexer(moduleName string, typ schema.ObjectType, options Options)
|
||||
}
|
||||
}
|
||||
|
||||
// TableName returns the name of the table for the object type scoped to its module.
|
||||
func (tm *ObjectIndexer) TableName() string {
|
||||
// tableName returns the name of the table for the object type scoped to its module.
|
||||
func (tm *objectIndexer) tableName() string {
|
||||
return fmt.Sprintf("%s_%s", tm.moduleName, tm.typ.Name)
|
||||
}
|
||||
|
||||
@ -1,10 +1,12 @@
|
||||
package postgres
|
||||
|
||||
// Options are the options for module and object indexers.
|
||||
type Options struct {
|
||||
// DisableRetainDeletions disables retain deletions functionality even on object types that have it set.
|
||||
DisableRetainDeletions bool
|
||||
import "cosmossdk.io/schema/logutil"
|
||||
|
||||
// Logger is the logger for the indexer to use.
|
||||
Logger SqlLogger
|
||||
// options are the options for module and object indexers.
|
||||
type options struct {
|
||||
// disableRetainDeletions disables retain deletions functionality even on object types that have it set.
|
||||
disableRetainDeletions bool
|
||||
|
||||
// logger is the logger for the indexer to use. It may be nil.
|
||||
logger logutil.Logger
|
||||
}
|
||||
|
||||
26
indexer/postgres/tests/config.go
Normal file
26
indexer/postgres/tests/config.go
Normal file
@ -0,0 +1,26 @@
|
||||
package tests
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
|
||||
"cosmossdk.io/indexer/postgres"
|
||||
"cosmossdk.io/schema/indexer"
|
||||
)
|
||||
|
||||
func postgresConfigToIndexerConfig(cfg postgres.Config) (indexer.Config, error) {
|
||||
cfgBz, err := json.Marshal(cfg)
|
||||
if err != nil {
|
||||
return indexer.Config{}, err
|
||||
}
|
||||
|
||||
var cfgMap map[string]interface{}
|
||||
err = json.Unmarshal(cfgBz, &cfgMap)
|
||||
if err != nil {
|
||||
return indexer.Config{}, err
|
||||
}
|
||||
|
||||
return indexer.Config{
|
||||
Type: "postgres",
|
||||
Config: cfgMap,
|
||||
}, nil
|
||||
}
|
||||
@ -2,7 +2,6 @@ package tests
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"strings"
|
||||
"testing"
|
||||
@ -16,6 +15,7 @@ import (
|
||||
"cosmossdk.io/indexer/postgres"
|
||||
"cosmossdk.io/indexer/postgres/internal/testdata"
|
||||
"cosmossdk.io/schema/appdata"
|
||||
"cosmossdk.io/schema/indexer"
|
||||
)
|
||||
|
||||
func TestInitSchema(t *testing.T) {
|
||||
@ -33,24 +33,21 @@ func testInitSchema(t *testing.T, disableRetainDeletions bool, goldenFileName st
|
||||
connectionUrl := createTestDB(t)
|
||||
|
||||
buf := &strings.Builder{}
|
||||
logger := func(msg, sql string, params ...interface{}) {
|
||||
_, err := fmt.Fprintln(buf, msg)
|
||||
require.NoError(t, err)
|
||||
_, err = fmt.Fprintln(buf, sql)
|
||||
require.NoError(t, err)
|
||||
if len(params) != 0 {
|
||||
_, err = fmt.Fprintln(buf, "Params:", params)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
_, err = fmt.Fprintln(buf)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
listener, err := postgres.StartIndexer(context.Background(), logger, postgres.Config{
|
||||
|
||||
cfg, err := postgresConfigToIndexerConfig(postgres.Config{
|
||||
DatabaseURL: connectionUrl,
|
||||
DisableRetainDeletions: disableRetainDeletions,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
res, err := postgres.StartIndexer(indexer.InitParams{
|
||||
Config: cfg,
|
||||
Context: context.Background(),
|
||||
Logger: &prettyLogger{buf},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
listener := res.Listener
|
||||
|
||||
require.NotNil(t, listener.InitializeModuleData)
|
||||
require.NoError(t, listener.InitializeModuleData(appdata.ModuleInitializationData{
|
||||
ModuleName: "test",
|
||||
|
||||
44
indexer/postgres/tests/log.go
Normal file
44
indexer/postgres/tests/log.go
Normal file
@ -0,0 +1,44 @@
|
||||
package tests
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
|
||||
"cosmossdk.io/schema/logutil"
|
||||
)
|
||||
|
||||
type prettyLogger struct {
|
||||
out io.Writer
|
||||
}
|
||||
|
||||
func (l prettyLogger) Info(msg string, keyVals ...interface{}) {
|
||||
l.write("INFO", msg, keyVals...)
|
||||
}
|
||||
|
||||
func (l prettyLogger) Warn(msg string, keyVals ...interface{}) {
|
||||
l.write("WARN", msg, keyVals...)
|
||||
}
|
||||
|
||||
func (l prettyLogger) Error(msg string, keyVals ...interface{}) {
|
||||
l.write("ERROR", msg, keyVals...)
|
||||
}
|
||||
|
||||
func (l prettyLogger) Debug(msg string, keyVals ...interface{}) {
|
||||
l.write("DEBUG", msg, keyVals...)
|
||||
}
|
||||
|
||||
func (l prettyLogger) write(level, msg string, keyVals ...interface{}) {
|
||||
_, err := fmt.Fprintf(l.out, "%s: %s\n", level, msg)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
for i := 0; i < len(keyVals); i += 2 {
|
||||
_, err = fmt.Fprintf(l.out, " %s: %v\n", keyVals[i], keyVals[i+1])
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var _ logutil.Logger = &prettyLogger{}
|
||||
28
indexer/postgres/tests/testdata/init_schema.txt
vendored
28
indexer/postgres/tests/testdata/init_schema.txt
vendored
@ -1,11 +1,10 @@
|
||||
Creating enum type
|
||||
CREATE TYPE "test_my_enum" AS ENUM ('a', 'b', 'c');
|
||||
|
||||
Creating enum type
|
||||
CREATE TYPE "test_vote_type" AS ENUM ('yes', 'no', 'abstain');
|
||||
|
||||
Creating table test_all_kinds
|
||||
CREATE TABLE IF NOT EXISTS "test_all_kinds" (
|
||||
DEBUG: Creating enum type
|
||||
sql: CREATE TYPE "test_my_enum" AS ENUM ('a', 'b', 'c');
|
||||
DEBUG: Creating enum type
|
||||
sql: CREATE TYPE "test_vote_type" AS ENUM ('yes', 'no', 'abstain');
|
||||
DEBUG: Creating table %s
|
||||
table: test_all_kinds
|
||||
sql: CREATE TABLE IF NOT EXISTS "test_all_kinds" (
|
||||
"id" BIGINT NOT NULL,
|
||||
"ts" TIMESTAMPTZ GENERATED ALWAYS AS (nanos_to_timestamptz("ts_nanos")) STORED,
|
||||
"ts_nanos" BIGINT NOT NULL,
|
||||
@ -33,9 +32,9 @@ CREATE TABLE IF NOT EXISTS "test_all_kinds" (
|
||||
PRIMARY KEY ("id", "ts_nanos")
|
||||
);
|
||||
GRANT SELECT ON TABLE "test_all_kinds" TO PUBLIC;
|
||||
|
||||
Creating table test_singleton
|
||||
CREATE TABLE IF NOT EXISTS "test_singleton" (
|
||||
DEBUG: Creating table %s
|
||||
table: test_singleton
|
||||
sql: CREATE TABLE IF NOT EXISTS "test_singleton" (
|
||||
_id INTEGER NOT NULL CHECK (_id = 1),
|
||||
"foo" TEXT NOT NULL,
|
||||
"bar" INTEGER NULL,
|
||||
@ -43,9 +42,9 @@ CREATE TABLE IF NOT EXISTS "test_singleton" (
|
||||
PRIMARY KEY (_id)
|
||||
);
|
||||
GRANT SELECT ON TABLE "test_singleton" TO PUBLIC;
|
||||
|
||||
Creating table test_vote
|
||||
CREATE TABLE IF NOT EXISTS "test_vote" (
|
||||
DEBUG: Creating table %s
|
||||
table: test_vote
|
||||
sql: CREATE TABLE IF NOT EXISTS "test_vote" (
|
||||
"proposal" BIGINT NOT NULL,
|
||||
"address" TEXT NOT NULL,
|
||||
"vote" "test_vote_type" NOT NULL,
|
||||
@ -53,4 +52,3 @@ CREATE TABLE IF NOT EXISTS "test_vote" (
|
||||
PRIMARY KEY ("proposal", "address")
|
||||
);
|
||||
GRANT SELECT ON TABLE "test_vote" TO PUBLIC;
|
||||
|
||||
|
||||
@ -1,11 +1,10 @@
|
||||
Creating enum type
|
||||
CREATE TYPE "test_my_enum" AS ENUM ('a', 'b', 'c');
|
||||
|
||||
Creating enum type
|
||||
CREATE TYPE "test_vote_type" AS ENUM ('yes', 'no', 'abstain');
|
||||
|
||||
Creating table test_all_kinds
|
||||
CREATE TABLE IF NOT EXISTS "test_all_kinds" (
|
||||
DEBUG: Creating enum type
|
||||
sql: CREATE TYPE "test_my_enum" AS ENUM ('a', 'b', 'c');
|
||||
DEBUG: Creating enum type
|
||||
sql: CREATE TYPE "test_vote_type" AS ENUM ('yes', 'no', 'abstain');
|
||||
DEBUG: Creating table %s
|
||||
table: test_all_kinds
|
||||
sql: CREATE TABLE IF NOT EXISTS "test_all_kinds" (
|
||||
"id" BIGINT NOT NULL,
|
||||
"ts" TIMESTAMPTZ GENERATED ALWAYS AS (nanos_to_timestamptz("ts_nanos")) STORED,
|
||||
"ts_nanos" BIGINT NOT NULL,
|
||||
@ -33,9 +32,9 @@ CREATE TABLE IF NOT EXISTS "test_all_kinds" (
|
||||
PRIMARY KEY ("id", "ts_nanos")
|
||||
);
|
||||
GRANT SELECT ON TABLE "test_all_kinds" TO PUBLIC;
|
||||
|
||||
Creating table test_singleton
|
||||
CREATE TABLE IF NOT EXISTS "test_singleton" (
|
||||
DEBUG: Creating table %s
|
||||
table: test_singleton
|
||||
sql: CREATE TABLE IF NOT EXISTS "test_singleton" (
|
||||
_id INTEGER NOT NULL CHECK (_id = 1),
|
||||
"foo" TEXT NOT NULL,
|
||||
"bar" INTEGER NULL,
|
||||
@ -43,13 +42,12 @@ CREATE TABLE IF NOT EXISTS "test_singleton" (
|
||||
PRIMARY KEY (_id)
|
||||
);
|
||||
GRANT SELECT ON TABLE "test_singleton" TO PUBLIC;
|
||||
|
||||
Creating table test_vote
|
||||
CREATE TABLE IF NOT EXISTS "test_vote" (
|
||||
DEBUG: Creating table %s
|
||||
table: test_vote
|
||||
sql: CREATE TABLE IF NOT EXISTS "test_vote" (
|
||||
"proposal" BIGINT NOT NULL,
|
||||
"address" TEXT NOT NULL,
|
||||
"vote" "test_vote_type" NOT NULL,
|
||||
PRIMARY KEY ("proposal", "address")
|
||||
);
|
||||
GRANT SELECT ON TABLE "test_vote" TO PUBLIC;
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user