Add test utilities for dependents #30

Merged
roysc merged 5 commits from utils-for-server-tests into main 2024-07-13 11:09:40 +00:00
13 changed files with 81 additions and 104 deletions

View File

@ -70,12 +70,12 @@ func NewStateDiffIndexer(
var driver sql.Driver var driver sql.Driver
switch pgc.Driver { switch pgc.Driver {
case postgres.PGX: case postgres.PGX:
driver, err = postgres.NewPGXDriver(ctx, pgc, nodeInfo) driver, err = postgres.ConnectPGXDriver(ctx, pgc, nodeInfo)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
case postgres.SQLX: case postgres.SQLX:
driver, err = postgres.NewSQLXDriver(ctx, pgc, nodeInfo) driver, err = postgres.ConnectSQLXDriver(ctx, pgc, nodeInfo)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }

View File

@ -67,7 +67,7 @@ func dumpCSVFileData(t *testing.T) {
localOutputDir := filepath.Join(workingDir, file.CSVTestConfig.OutputDir) localOutputDir := filepath.Join(workingDir, file.CSVTestConfig.OutputDir)
for _, tbl := range schema.Tables { for _, tbl := range schema.EthTables {
err := test_helpers.DedupFile(file.TableFilePath(localOutputDir, tbl.Name)) err := test_helpers.DedupFile(file.TableFilePath(localOutputDir, tbl.Name))
require.NoError(t, err) require.NoError(t, err)

View File

@ -120,7 +120,7 @@ func NewCSVWriter(path string, watchedAddressesFilePath string, diff bool) (*CSV
return nil, fmt.Errorf("unable to create directory '%s': %w", path, err) return nil, fmt.Errorf("unable to create directory '%s': %w", path, err)
} }
writers, err := makeFileWriters(path, schema.Tables) writers, err := makeFileWriters(path, schema.EthTables)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -37,7 +37,6 @@ type PGXDriver struct {
ctx context.Context ctx context.Context
pool *pgxpool.Pool pool *pgxpool.Pool
nodeInfo node.Info nodeInfo node.Info
nodeID string
config Config config Config
} }
@ -50,21 +49,25 @@ func ConnectPGX(ctx context.Context, config Config) (*pgxpool.Pool, error) {
return pgxpool.ConnectConfig(ctx, pgConf) return pgxpool.ConnectConfig(ctx, pgConf)
} }
// NewPGXDriver returns a new pgx driver // ConnectPGXDriver returns a new pgx driver
// it initializes the connection pool and creates the node info table // it initializes the connection pool and creates the node info table
func NewPGXDriver(ctx context.Context, config Config, node node.Info) (*PGXDriver, error) { func ConnectPGXDriver(ctx context.Context, config Config, node node.Info) (*PGXDriver, error) {
dbPool, err := ConnectPGX(ctx, config) dbPool, err := ConnectPGX(ctx, config)
if err != nil { if err != nil {
return nil, ErrDBConnectionFailed(err) return nil, ErrDBConnectionFailed(err)
} }
pg := &PGXDriver{ctx: ctx, pool: dbPool, nodeInfo: node, config: config} pg := NewPGXDriver(ctx, dbPool, config)
nodeErr := pg.createNode() nodeErr := pg.createNode(node)
if nodeErr != nil { if nodeErr != nil {
return &PGXDriver{}, ErrUnableToSetNode(nodeErr) return &PGXDriver{}, ErrUnableToSetNode(nodeErr)
} }
return pg, nil return pg, nil
} }
func NewPGXDriver(ctx context.Context, pool *pgxpool.Pool, config Config) *PGXDriver {
return &PGXDriver{ctx: ctx, pool: pool, config: config}
}
// MakeConfig creates a pgxpool.Config from the provided Config // MakeConfig creates a pgxpool.Config from the provided Config
func MakeConfig(config Config) (*pgxpool.Config, error) { func MakeConfig(config Config) (*pgxpool.Config, error) {
conf, err := pgxpool.ParseConfig("") conf, err := pgxpool.ParseConfig("")
@ -102,19 +105,19 @@ func MakeConfig(config Config) (*pgxpool.Config, error) {
return conf, nil return conf, nil
} }
func (pgx *PGXDriver) createNode() error { func (pgx *PGXDriver) createNode(nodeInfo node.Info) error {
_, err := pgx.pool.Exec( _, err := pgx.pool.Exec(
pgx.ctx, pgx.ctx,
createNodeStm, createNodeStm,
pgx.nodeInfo.GenesisBlock, nodeInfo.GenesisBlock,
pgx.nodeInfo.NetworkID, nodeInfo.NetworkID,
pgx.nodeInfo.ID, nodeInfo.ID,
pgx.nodeInfo.ClientName, nodeInfo.ClientName,
pgx.nodeInfo.ChainID) nodeInfo.ChainID)
if err != nil { if err != nil {
return ErrUnableToSetNode(err) return ErrUnableToSetNode(err)
} }
pgx.nodeID = pgx.nodeInfo.ID pgx.nodeInfo = nodeInfo
return nil return nil
} }
@ -155,7 +158,7 @@ func (pgx *PGXDriver) Stats() metrics.DbStats {
// NodeID satisfies sql.Database // NodeID satisfies sql.Database
func (pgx *PGXDriver) NodeID() string { func (pgx *PGXDriver) NodeID() string {
return pgx.nodeID return pgx.nodeInfo.ID
} }
// Close satisfies sql.Database/io.Closer // Close satisfies sql.Database/io.Closer

View File

@ -94,7 +94,7 @@ func TestPostgresPGX(t *testing.T) {
t.Run("throws error when can't connect to the database", func(t *testing.T) { t.Run("throws error when can't connect to the database", func(t *testing.T) {
goodInfo := node.Info{GenesisBlock: "GENESIS", NetworkID: "1", ID: "x123", ClientName: "geth"} goodInfo := node.Info{GenesisBlock: "GENESIS", NetworkID: "1", ID: "x123", ClientName: "geth"}
_, err := postgres.NewPGXDriver(ctx, postgres.Config{}, goodInfo) _, err := postgres.ConnectPGXDriver(ctx, postgres.Config{}, goodInfo)
if err == nil { if err == nil {
t.Fatal("Expected an error") t.Fatal("Expected an error")
} }
@ -106,7 +106,7 @@ func TestPostgresPGX(t *testing.T) {
badHash := fmt.Sprintf("x %s", strings.Repeat("1", 100)) badHash := fmt.Sprintf("x %s", strings.Repeat("1", 100))
badInfo := node.Info{GenesisBlock: badHash, NetworkID: "1", ID: "x123", ClientName: "geth"} badInfo := node.Info{GenesisBlock: badHash, NetworkID: "1", ID: "x123", ClientName: "geth"}
_, err := postgres.NewPGXDriver(ctx, pgConfig, badInfo) _, err := postgres.ConnectPGXDriver(ctx, pgConfig, badInfo)
if err == nil { if err == nil {
t.Fatal("Expected an error") t.Fatal("Expected an error")
} }

View File

@ -34,7 +34,6 @@ type SQLXDriver struct {
ctx context.Context ctx context.Context
db *sqlx.DB db *sqlx.DB
nodeInfo node.Info nodeInfo node.Info
nodeID string
} }
// ConnectSQLX initializes and returns a SQLX connection pool for postgres // ConnectSQLX initializes and returns a SQLX connection pool for postgres
@ -53,32 +52,36 @@ func ConnectSQLX(ctx context.Context, config Config) (*sqlx.DB, error) {
return db, nil return db, nil
} }
// NewSQLXDriver returns a new sqlx driver for Postgres // ConnectSQLXDriver returns a new sqlx driver for Postgres
// it initializes the connection pool and creates the node info table // it initializes the connection pool and creates the node info table
func NewSQLXDriver(ctx context.Context, config Config, node node.Info) (*SQLXDriver, error) { func ConnectSQLXDriver(ctx context.Context, config Config, node node.Info) (*SQLXDriver, error) {
db, err := ConnectSQLX(ctx, config) db, err := ConnectSQLX(ctx, config)
if err != nil { if err != nil {
return nil, err return nil, err
} }
driver := &SQLXDriver{ctx: ctx, db: db, nodeInfo: node} driver := NewSQLXDriver(ctx, db)
if err := driver.createNode(); err != nil { if err := driver.createNode(node); err != nil {
return nil, err return nil, err
} }
return driver, nil return driver, nil
} }
func (driver *SQLXDriver) createNode() error { func NewSQLXDriver(ctx context.Context, db *sqlx.DB) *SQLXDriver {
return &SQLXDriver{ctx: ctx, db: db}
}
func (driver *SQLXDriver) createNode(nodeInfo node.Info) error {
_, err := driver.db.Exec( _, err := driver.db.Exec(
createNodeStm, createNodeStm,
driver.nodeInfo.GenesisBlock, nodeInfo.GenesisBlock,
driver.nodeInfo.NetworkID, nodeInfo.NetworkID,
driver.nodeInfo.ID, nodeInfo.ID,
driver.nodeInfo.ClientName, nodeInfo.ClientName,
driver.nodeInfo.ChainID) nodeInfo.ChainID)
if err != nil { if err != nil {
return ErrUnableToSetNode(err) return ErrUnableToSetNode(err)
} }
driver.nodeID = driver.nodeInfo.ID driver.nodeInfo = nodeInfo
return nil return nil
} }
@ -118,7 +121,7 @@ func (driver *SQLXDriver) Stats() metrics.DbStats {
// NodeID satisfies sql.Database // NodeID satisfies sql.Database
func (driver *SQLXDriver) NodeID() string { func (driver *SQLXDriver) NodeID() string {
return driver.nodeID return driver.nodeInfo.ID
} }
// Close satisfies sql.Database/io.Closer // Close satisfies sql.Database/io.Closer

View File

@ -97,7 +97,7 @@ func TestPostgresSQLX(t *testing.T) {
t.Run("throws error when can't connect to the database", func(t *testing.T) { t.Run("throws error when can't connect to the database", func(t *testing.T) {
goodInfo := node.Info{GenesisBlock: "GENESIS", NetworkID: "1", ID: "x123", ClientName: "geth"} goodInfo := node.Info{GenesisBlock: "GENESIS", NetworkID: "1", ID: "x123", ClientName: "geth"}
_, err := postgres.NewSQLXDriver(ctx, postgres.Config{}, goodInfo) _, err := postgres.ConnectSQLXDriver(ctx, postgres.Config{}, goodInfo)
if err == nil { if err == nil {
t.Fatal("Expected an error") t.Fatal("Expected an error")
} }
@ -109,7 +109,7 @@ func TestPostgresSQLX(t *testing.T) {
badHash := fmt.Sprintf("x %s", strings.Repeat("1", 100)) badHash := fmt.Sprintf("x %s", strings.Repeat("1", 100))
badInfo := node.Info{GenesisBlock: badHash, NetworkID: "1", ID: "x123", ClientName: "geth"} badInfo := node.Info{GenesisBlock: badHash, NetworkID: "1", ID: "x123", ClientName: "geth"}
_, err := postgres.NewSQLXDriver(ctx, pgConfig, badInfo) _, err := postgres.ConnectSQLXDriver(ctx, pgConfig, badInfo)
if err == nil { if err == nil {
t.Fatal("Expected an error") t.Fatal("Expected an error")
} }

View File

@ -30,7 +30,7 @@ func SetupSQLXDB() (sql.Database, error) {
return nil, err return nil, err
} }
conf.MaxIdle = 0 conf.MaxIdle = 0
driver, err := NewSQLXDriver(context.Background(), conf, node.Info{}) driver, err := ConnectSQLXDriver(context.Background(), conf, node.Info{})
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -39,7 +39,7 @@ func SetupSQLXDB() (sql.Database, error) {
// SetupPGXDB is used to setup a pgx db for tests // SetupPGXDB is used to setup a pgx db for tests
func SetupPGXDB(config Config) (sql.Database, error) { func SetupPGXDB(config Config) (sql.Database, error) {
driver, err := NewPGXDriver(context.Background(), config, node.Info{}) driver, err := ConnectPGXDriver(context.Background(), config, node.Info{})
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -38,8 +38,8 @@ func EncodeHeader(header *types.Header) (IPLD, error) {
}, nil }, nil
} }
// encodeTx converts a *types.Transaction to an IPLD node // EncodeTx converts a *types.Transaction to an IPLD node
func encodeTx(tx *types.Transaction) (IPLD, error) { func EncodeTx(tx *types.Transaction) (IPLD, error) {
txRaw, err := tx.MarshalBinary() txRaw, err := tx.MarshalBinary()
if err != nil { if err != nil {
return nil, err return nil, err
@ -54,8 +54,8 @@ func encodeTx(tx *types.Transaction) (IPLD, error) {
}, nil }, nil
} }
// encodeReceipt converts a types.Receipt to an IPLD node // EncodeReceipt converts a types.Receipt to an IPLD node
func encodeReceipt(receipt *types.Receipt) (IPLD, error) { func EncodeReceipt(receipt *types.Receipt) (IPLD, error) {
rctRaw, err := receipt.MarshalBinary() rctRaw, err := receipt.MarshalBinary()
if err != nil { if err != nil {
return nil, err return nil, err
@ -70,8 +70,8 @@ func encodeReceipt(receipt *types.Receipt) (IPLD, error) {
}, nil }, nil
} }
// encodeLog converts a Log to an IPLD node // EncodeLog converts a Log to an IPLD node
func encodeLog(log *types.Log) (IPLD, error) { func EncodeLog(log *types.Log) (IPLD, error) {
logRaw, err := rlp.EncodeToBytes(log) logRaw, err := rlp.EncodeToBytes(log)
if err != nil { if err != nil {
return nil, err return nil, err
@ -86,7 +86,7 @@ func encodeLog(log *types.Log) (IPLD, error) {
}, nil }, nil
} }
func encodeWithdrawal(w *types.Withdrawal) (IPLD, error) { func EncodeWithdrawal(w *types.Withdrawal) (IPLD, error) {
wRaw, err := rlp.EncodeToBytes(w) wRaw, err := rlp.EncodeToBytes(w)
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -44,7 +44,7 @@ func FromBlockAndReceipts(block *types.Block, receipts []*types.Receipt) ([]IPLD
func processTransactions(txs []*types.Transaction) ([]IPLD, error) { func processTransactions(txs []*types.Transaction) ([]IPLD, error) {
var ethTxNodes []IPLD var ethTxNodes []IPLD
for _, tx := range txs { for _, tx := range txs {
ethTx, err := encodeTx(tx) ethTx, err := EncodeTx(tx)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -57,7 +57,7 @@ func processTransactions(txs []*types.Transaction) ([]IPLD, error) {
func processWithdrawals(withdrawals []*types.Withdrawal) ([]IPLD, error) { func processWithdrawals(withdrawals []*types.Withdrawal) ([]IPLD, error) {
var withdrawalNodes []IPLD var withdrawalNodes []IPLD
for _, withdrawal := range withdrawals { for _, withdrawal := range withdrawals {
ethW, err := encodeWithdrawal(withdrawal) ethW, err := EncodeWithdrawal(withdrawal)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -80,7 +80,7 @@ func processReceiptsAndLogs(rcts []*types.Receipt) ([]IPLD, [][]IPLD, error) {
return nil, nil, err return nil, nil, err
} }
ethRct, err := encodeReceipt(rct) ethRct, err := EncodeReceipt(rct)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
@ -95,7 +95,7 @@ func processReceiptsAndLogs(rcts []*types.Receipt) ([]IPLD, [][]IPLD, error) {
func processLogs(logs []*types.Log) ([]IPLD, error) { func processLogs(logs []*types.Log) ([]IPLD, error) {
logNodes := make([]IPLD, len(logs)) logNodes := make([]IPLD, len(logs))
for idx, log := range logs { for idx, log := range logs {
logNode, err := encodeLog(log) logNode, err := EncodeLog(log)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -16,7 +16,7 @@
package schema package schema
var Tables = []*Table{ var EthTables = []*Table{
&TableIPLDBlock, &TableIPLDBlock,
&TableNodeInfo, &TableNodeInfo,
&TableHeader, &TableHeader,
@ -29,6 +29,11 @@ var Tables = []*Table{
&TableWithdrawal, &TableWithdrawal,
} }
var AllTables = append(
EthTables,
&TableWatchedAddresses,
)
var TableIPLDBlock = Table{ var TableIPLDBlock = Table{
Name: `ipld.blocks`, Name: `ipld.blocks`,
Columns: []Column{ Columns: []Column{

View File

@ -19,10 +19,14 @@ package test_helpers
import ( import (
"bufio" "bufio"
"context" "context"
"fmt"
"os" "os"
"testing" "testing"
"github.com/cerc-io/plugeth-statediff/indexer/database/sql" "github.com/cerc-io/plugeth-statediff/indexer/database/sql"
"github.com/cerc-io/plugeth-statediff/indexer/database/sql/postgres"
"github.com/cerc-io/plugeth-statediff/indexer/shared/schema"
"github.com/jmoiron/sqlx"
) )
// DedupFile removes duplicates from the given file // DedupFile removes duplicates from the given file
@ -39,9 +43,6 @@ func DedupFile(filePath string) error {
s := sc.Text() s := sc.Text()
stmts[s] = struct{}{} stmts[s] = struct{}{}
} }
if err != nil {
return err
}
f.Close() f.Close()
@ -60,31 +61,30 @@ func DedupFile(filePath string) error {
// TearDownDB is used to tear down the watcher dbs after tests // TearDownDB is used to tear down the watcher dbs after tests
func TearDownDB(t *testing.T, db sql.Database) { func TearDownDB(t *testing.T, db sql.Database) {
ctx := context.Background() err := ClearDB(db)
tx, err := db.Begin(ctx)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
}
statements := []string{ func ClearSqlxDB(sqlxdb *sqlx.DB) error {
`TRUNCATE nodes`, driver := postgres.NewSQLXDriver(context.Background(), sqlxdb)
`TRUNCATE ipld.blocks`, db := postgres.NewPostgresDB(driver, false)
`TRUNCATE eth.header_cids`, return ClearDB(db)
`TRUNCATE eth.uncle_cids`, }
`TRUNCATE eth.transaction_cids`,
`TRUNCATE eth.receipt_cids`, func ClearDB(db sql.Database) error {
`TRUNCATE eth.state_cids`, ctx := context.Background()
`TRUNCATE eth.storage_cids`, tx, err := db.Begin(ctx)
`TRUNCATE eth.log_cids`, if err != nil {
`TRUNCATE eth.withdrawal_cids`, return err
`TRUNCATE eth_meta.watched_addresses`,
} }
for _, stm := range statements {
for _, tbl := range schema.AllTables {
stm := fmt.Sprintf("TRUNCATE %s", tbl.Name)
if _, err = tx.Exec(ctx, stm); err != nil { if _, err = tx.Exec(ctx, stm); err != nil {
t.Fatal(err) return fmt.Errorf("error executing `%s`: %w", stm, err)
} }
} }
if err = tx.Commit(ctx); err != nil { return tx.Commit(ctx)
t.Fatal(err)
}
} }

View File

@ -1,34 +0,0 @@
package test_helpers
import (
"fmt"
"github.com/jmoiron/sqlx"
)
// ClearDB is used to empty the IPLD-ETH tables after tests
func ClearDB(db *sqlx.DB) error {
tx, err := db.Beginx()
if err != nil {
return err
}
statements := []string{
`TRUNCATE nodes`,
`TRUNCATE ipld.blocks`,
`TRUNCATE eth.header_cids`,
`TRUNCATE eth.uncle_cids`,
`TRUNCATE eth.transaction_cids`,
`TRUNCATE eth.receipt_cids`,
`TRUNCATE eth.state_cids`,
`TRUNCATE eth.storage_cids`,
`TRUNCATE eth.log_cids`,
`TRUNCATE eth.withdrawal_cids`,
`TRUNCATE eth_meta.watched_addresses`,
}
for _, stm := range statements {
if _, err = tx.Exec(stm); err != nil {
return fmt.Errorf("error executing `%s`: %w", stm, err)
}
}
return tx.Commit()
}