Feature/4 db and lh connection #6
51
pkg/dbtools/sql/postgres/database.go
Normal file
51
pkg/dbtools/sql/postgres/database.go
Normal file
@ -0,0 +1,51 @@
|
|||||||
|
package postgres
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/vulcanize/ipld-ethcl-indexer/pkg/dbtools/sql"
|
||||||
|
)
|
||||||
|
|
||||||
|
var _ sql.Database = &DB{}
|
||||||
|
|
||||||
|
// TODO: Make NewPostgresDB accept a string and Config. IT should
|
||||||
|
// Create a driver of its own.
|
||||||
|
// This will make sure that if you want a driver, it conforms to the interface.
|
||||||
|
|
||||||
|
// NewPostgresDB returns a postgres.DB using the provided Config and driver type.
|
||||||
|
func NewPostgresDB(c Config, driverName string) (*DB, error) {
|
||||||
|
var driver *pgxDriver
|
||||||
|
|
||||||
|
driverType, err := ResolveDriverType(driverName)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
driver, err = createDriver(c, driverType)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &DB{driver}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func createDriver(c Config, driverType DriverType) (*pgxDriver, error) {
|
||||||
|
switch driverType {
|
||||||
|
case PGX:
|
||||||
|
driver, err := newPGXDriver(context.Background(), c)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("Error Creating Driver, err: %e", err)
|
||||||
|
}
|
||||||
|
return driver, nil
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("Can't find a driver to create")
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// DB implements sql.Database using a configured driver and Postgres statement syntax
|
||||||
|
type DB struct {
|
||||||
|
sql.Driver
|
||||||
|
}
|
@ -11,16 +11,16 @@ import (
|
|||||||
"github.com/vulcanize/ipld-ethcl-indexer/pkg/dbtools/sql"
|
"github.com/vulcanize/ipld-ethcl-indexer/pkg/dbtools/sql"
|
||||||
)
|
)
|
||||||
|
|
||||||
// PGXDriver driver, implements sql.Driver
|
// pgxDriver driver, implements sql.Driver
|
||||||
type PGXDriver struct {
|
type pgxDriver struct {
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
pool *pgxpool.Pool
|
pool *pgxpool.Pool
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewPGXDriver returns a new pgx driver.
|
// newPGXDriver returns a new pgx driver.
|
||||||
// It initializes the connection pool.
|
// It initializes the connection pool.
|
||||||
func NewPGXDriver(ctx context.Context, config Config) (*PGXDriver, error) {
|
func newPGXDriver(ctx context.Context, config Config) (*pgxDriver, error) {
|
||||||
pgConf, err := MakeConfig(config)
|
pgConf, err := makeConfig(config)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -28,12 +28,12 @@ func NewPGXDriver(ctx context.Context, config Config) (*PGXDriver, error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, sql.ErrDBConnectionFailed(err)
|
return nil, sql.ErrDBConnectionFailed(err)
|
||||||
}
|
}
|
||||||
pg := &PGXDriver{ctx: ctx, pool: dbPool}
|
pg := &pgxDriver{ctx: ctx, pool: dbPool}
|
||||||
return pg, nil
|
return pg, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// MakeConfig creates a pgxpool.Config from the provided Config
|
// makeConfig creates a pgxpool.Config from the provided Config
|
||||||
func MakeConfig(config Config) (*pgxpool.Config, error) {
|
func makeConfig(config Config) (*pgxpool.Config, error) {
|
||||||
conf, err := pgxpool.ParseConfig("")
|
conf, err := pgxpool.ParseConfig("")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -65,28 +65,28 @@ func MakeConfig(config Config) (*pgxpool.Config, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// QueryRow satisfies sql.Database
|
// QueryRow satisfies sql.Database
|
||||||
func (pgx *PGXDriver) QueryRow(ctx context.Context, sql string, args ...interface{}) sql.ScannableRow {
|
func (pgx *pgxDriver) QueryRow(ctx context.Context, sql string, args ...interface{}) sql.ScannableRow {
|
||||||
return pgx.pool.QueryRow(ctx, sql, args...)
|
return pgx.pool.QueryRow(ctx, sql, args...)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Exec satisfies sql.Database
|
// Exec satisfies sql.Database
|
||||||
func (pgx *PGXDriver) Exec(ctx context.Context, sql string, args ...interface{}) (sql.Result, error) {
|
func (pgx *pgxDriver) Exec(ctx context.Context, sql string, args ...interface{}) (sql.Result, error) {
|
||||||
res, err := pgx.pool.Exec(ctx, sql, args...)
|
res, err := pgx.pool.Exec(ctx, sql, args...)
|
||||||
return resultWrapper{ct: res}, err
|
return resultWrapper{ct: res}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Select satisfies sql.Database
|
// Select satisfies sql.Database
|
||||||
func (pgx *PGXDriver) Select(ctx context.Context, dest interface{}, query string, args ...interface{}) error {
|
func (pgx *pgxDriver) Select(ctx context.Context, dest interface{}, query string, args ...interface{}) error {
|
||||||
return pgxscan.Select(ctx, pgx.pool, dest, query, args...)
|
return pgxscan.Select(ctx, pgx.pool, dest, query, args...)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get satisfies sql.Database
|
// Get satisfies sql.Database
|
||||||
func (pgx *PGXDriver) Get(ctx context.Context, dest interface{}, query string, args ...interface{}) error {
|
func (pgx *pgxDriver) Get(ctx context.Context, dest interface{}, query string, args ...interface{}) error {
|
||||||
return pgxscan.Get(ctx, pgx.pool, dest, query, args...)
|
return pgxscan.Get(ctx, pgx.pool, dest, query, args...)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Begin satisfies sql.Database
|
// Begin satisfies sql.Database
|
||||||
func (pgx *PGXDriver) Begin(ctx context.Context) (sql.Tx, error) {
|
func (pgx *pgxDriver) Begin(ctx context.Context) (sql.Tx, error) {
|
||||||
tx, err := pgx.pool.Begin(ctx)
|
tx, err := pgx.pool.Begin(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -94,19 +94,19 @@ func (pgx *PGXDriver) Begin(ctx context.Context) (sql.Tx, error) {
|
|||||||
return pgxTxWrapper{tx: tx}, nil
|
return pgxTxWrapper{tx: tx}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pgx *PGXDriver) Stats() sql.Stats {
|
func (pgx *pgxDriver) Stats() sql.Stats {
|
||||||
stats := pgx.pool.Stat()
|
stats := pgx.pool.Stat()
|
||||||
return pgxStatsWrapper{stats: stats}
|
return pgxStatsWrapper{stats: stats}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close satisfies sql.Database/io.Closer
|
// Close satisfies sql.Database/io.Closer
|
||||||
func (pgx *PGXDriver) Close() error {
|
func (pgx *pgxDriver) Close() error {
|
||||||
pgx.pool.Close()
|
pgx.pool.Close()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Context satisfies sql.Database
|
// Context satisfies sql.Database
|
||||||
func (pgx *PGXDriver) Context() context.Context {
|
func (pgx *pgxDriver) Context() context.Context {
|
||||||
return pgx.ctx
|
return pgx.ctx
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -12,7 +12,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
pgConfig, _ = MakeConfig(DefaultConfig)
|
pgConfig, _ = makeConfig(DefaultConfig)
|
||||||
ctx = context.Background()
|
ctx = context.Background()
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -79,11 +79,20 @@ func TestPostgresPGX(t *testing.T) {
|
|||||||
})
|
})
|
||||||
|
|
||||||
t.Run("throws error when can't connect to the database", func(t *testing.T) {
|
t.Run("throws error when can't connect to the database", func(t *testing.T) {
|
||||||
_, err := NewPGXDriver(ctx, Config{})
|
_, err := NewPostgresDB(Config{}, "PGX")
|
||||||
if err == nil {
|
if err == nil {
|
||||||
t.Fatal("Expected an error")
|
t.Fatal("Expected an error")
|
||||||
}
|
}
|
||||||
|
|
||||||
expectContainsSubstring(t, err.Error(), sql.DbConnectionFailedMsg)
|
expectContainsSubstring(t, err.Error(), sql.DbConnectionFailedMsg)
|
||||||
})
|
})
|
||||||
|
t.Run("Connect to the database", func(t *testing.T) {
|
||||||
|
driver, err := NewPostgresDB(DefaultConfig, "pgx")
|
||||||
|
defer driver.Close()
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("Error creating the postgres driver")
|
||||||
|
}
|
||||||
|
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user