ipld-eth-beacon-indexer/pkg/database/sql/postgres/pgx.go
Abdul Rabbani 24fc6358d6
Testing for Batch Processing (#56)
* Set starting slot and improve error gap capturing

* Set starting slot and improve error gap capturing

* Tests + Significant Refactor

The code for historical processing has been significantly refactored to use a context to signify a shutdown.

There have also been many tests added for historical and knownGaps processing.

* Update MhKeys in test

* Update correct values

* Update Max Retry

Genesis is not working as expected.

* Ensure we release locks properly

* Add ordered testing

* Include system tests

* Update workflow calls

* Add secrets

* Add required secrets

* update path

* Try using the absolute path

* Remove volumes at the end.

* Update system-tests.yml

* Update system-tests.yml

* Update test err

* Update and test the shutdown

* rename ethcl --> eth-beacon

* Try forcing /bin/bash for docker-compose

* Update system-tests.yml

* Update system-tests.yml

* Update system-tests.yml

* Update system-tests.yml

* Update system-tests.yml

* Update system-tests.yml

* Use single quote cron

* Dont run generic on schedule
2022-06-09 17:32:46 -04:00

206 lines
5.5 KiB
Go

// VulcanizeDB
// Copyright © 2022 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package postgres
import (
"context"
"time"
"github.com/georgysavva/scany/pgxscan"
"github.com/jackc/pgconn"
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/pgxpool"
"github.com/vulcanize/ipld-eth-beacon-indexer/pkg/database/sql"
)
// pgxDriver driver, implements sql.Driver
type pgxDriver struct {
ctx context.Context
pool *pgxpool.Pool
}
// newPGXDriver returns a new pgx driver.
// It initializes the connection pool.
func newPGXDriver(ctx context.Context, config Config) (*pgxDriver, error) {
pgConf, err := makeConfig(config)
if err != nil {
return nil, err
}
dbPool, err := pgxpool.ConnectConfig(ctx, pgConf)
if err != nil {
return nil, sql.ErrDBConnectionFailed(err)
}
pg := &pgxDriver{ctx: ctx, pool: dbPool}
return pg, nil
}
// makeConfig creates a pgxpool.Config from the provided Config
func makeConfig(config Config) (*pgxpool.Config, error) {
conf, err := pgxpool.ParseConfig("")
if err != nil {
return nil, err
}
//conf.ConnConfig.BuildStatementCache = nil
conf.ConnConfig.Config.Host = config.Hostname
conf.ConnConfig.Config.Port = uint16(config.Port)
conf.ConnConfig.Config.Database = config.DatabaseName
conf.ConnConfig.Config.User = config.Username
conf.ConnConfig.Config.Password = config.Password
if config.ConnTimeout != 0 {
conf.ConnConfig.Config.ConnectTimeout = config.ConnTimeout
}
if config.MaxConns != 0 {
conf.MaxConns = int32(config.MaxConns)
}
if config.MinConns != 0 {
conf.MinConns = int32(config.MinConns)
}
if config.MaxConnLifetime != 0 {
conf.MaxConnLifetime = config.MaxConnLifetime
}
if config.MaxConnIdleTime != 0 {
conf.MaxConnIdleTime = config.MaxConnIdleTime
}
return conf, nil
}
// QueryRow satisfies sql.Database
func (pgx *pgxDriver) QueryRow(ctx context.Context, sql string, args ...interface{}) sql.ScannableRow {
return pgx.pool.QueryRow(ctx, sql, args...)
}
// Exec satisfies sql.Database
func (pgx *pgxDriver) Exec(ctx context.Context, sql string, args ...interface{}) (sql.Result, error) {
res, err := pgx.pool.Exec(ctx, sql, args...)
return resultWrapper{ct: res}, err
}
// Select satisfies sql.Database
func (pgx *pgxDriver) Select(ctx context.Context, dest interface{}, query string, args ...interface{}) error {
return pgxscan.Select(ctx, pgx.pool, dest, query, args...)
}
// Get satisfies sql.Database
func (pgx *pgxDriver) Get(ctx context.Context, dest interface{}, query string, args ...interface{}) error {
return pgxscan.Get(ctx, pgx.pool, dest, query, args...)
}
// Begin satisfies sql.Database
func (pgx *pgxDriver) Begin(ctx context.Context) (sql.Tx, error) {
tx, err := pgx.pool.Begin(ctx)
if err != nil {
return nil, err
}
return pgxTxWrapper{tx: tx}, nil
}
func (pgx *pgxDriver) Stats() sql.Stats {
stats := pgx.pool.Stat()
return pgxStatsWrapper{stats: stats}
}
// Close satisfies sql.Database/io.Closer
func (pgx *pgxDriver) Close() error {
pgx.pool.Close()
return nil
}
// Context satisfies sql.Database
func (pgx *pgxDriver) Context() context.Context {
return pgx.ctx
}
type resultWrapper struct {
ct pgconn.CommandTag
}
// RowsAffected satisfies sql.Result
func (r resultWrapper) RowsAffected() (int64, error) {
return r.ct.RowsAffected(), nil
}
type pgxStatsWrapper struct {
stats *pgxpool.Stat
}
// MaxOpen satisfies sql.Stats
func (s pgxStatsWrapper) MaxOpen() int64 {
return int64(s.stats.MaxConns())
}
// Open satisfies sql.Stats
func (s pgxStatsWrapper) Open() int64 {
return int64(s.stats.TotalConns())
}
// InUse satisfies sql.Stats
func (s pgxStatsWrapper) InUse() int64 {
return int64(s.stats.AcquiredConns())
}
// Idle satisfies sql.Stats
func (s pgxStatsWrapper) Idle() int64 {
return int64(s.stats.IdleConns())
}
// WaitCount satisfies sql.Stats
func (s pgxStatsWrapper) WaitCount() int64 {
return s.stats.EmptyAcquireCount()
}
// WaitDuration satisfies sql.Stats
func (s pgxStatsWrapper) WaitDuration() time.Duration {
return s.stats.AcquireDuration()
}
// MaxIdleClosed satisfies sql.Stats
func (s pgxStatsWrapper) MaxIdleClosed() int64 {
// this stat isn't supported by pgxpool, but we don't want to panic
return 0
}
// MaxLifetimeClosed satisfies sql.Stats
func (s pgxStatsWrapper) MaxLifetimeClosed() int64 {
return s.stats.CanceledAcquireCount()
}
type pgxTxWrapper struct {
tx pgx.Tx
}
// QueryRow satisfies sql.Tx
func (t pgxTxWrapper) QueryRow(ctx context.Context, sql string, args ...interface{}) sql.ScannableRow {
return t.tx.QueryRow(ctx, sql, args...)
}
// Exec satisfies sql.Tx
func (t pgxTxWrapper) Exec(ctx context.Context, sql string, args ...interface{}) (sql.Result, error) {
res, err := t.tx.Exec(ctx, sql, args...)
return resultWrapper{ct: res}, err
}
// Commit satisfies sql.Tx
func (t pgxTxWrapper) Commit(ctx context.Context) error {
return t.tx.Commit(ctx)
}
// Rollback satisfies sql.Tx
func (t pgxTxWrapper) Rollback(ctx context.Context) error {
return t.tx.Rollback(ctx)
}