diff --git a/indexer/constructor.go b/indexer/constructor.go index 46dc52c..3f9eb42 100644 --- a/indexer/constructor.go +++ b/indexer/constructor.go @@ -70,12 +70,12 @@ func NewStateDiffIndexer( var driver sql.Driver switch pgc.Driver { case postgres.PGX: - driver, err = postgres.NewPGXDriver(ctx, pgc, nodeInfo) + driver, err = postgres.ConnectPGXDriver(ctx, pgc, nodeInfo) if err != nil { return nil, nil, err } case postgres.SQLX: - driver, err = postgres.NewSQLXDriver(ctx, pgc, nodeInfo) + driver, err = postgres.ConnectSQLXDriver(ctx, pgc, nodeInfo) if err != nil { return nil, nil, err } diff --git a/indexer/database/file/csv_indexer_legacy_test.go b/indexer/database/file/csv_indexer_legacy_test.go index 1bb0d46..2d6dbb6 100644 --- a/indexer/database/file/csv_indexer_legacy_test.go +++ b/indexer/database/file/csv_indexer_legacy_test.go @@ -67,7 +67,7 @@ func dumpCSVFileData(t *testing.T) { localOutputDir := filepath.Join(workingDir, file.CSVTestConfig.OutputDir) - for _, tbl := range schema.Tables { + for _, tbl := range schema.EthTables { err := test_helpers.DedupFile(file.TableFilePath(localOutputDir, tbl.Name)) require.NoError(t, err) diff --git a/indexer/database/file/csv_writer.go b/indexer/database/file/csv_writer.go index ad8a034..3764d90 100644 --- a/indexer/database/file/csv_writer.go +++ b/indexer/database/file/csv_writer.go @@ -120,7 +120,7 @@ func NewCSVWriter(path string, watchedAddressesFilePath string, diff bool) (*CSV return nil, fmt.Errorf("unable to create directory '%s': %w", path, err) } - writers, err := makeFileWriters(path, schema.Tables) + writers, err := makeFileWriters(path, schema.EthTables) if err != nil { return nil, err } diff --git a/indexer/database/sql/postgres/pgx.go b/indexer/database/sql/postgres/pgx.go index 928e776..021a28a 100644 --- a/indexer/database/sql/postgres/pgx.go +++ b/indexer/database/sql/postgres/pgx.go @@ -37,7 +37,6 @@ type PGXDriver struct { ctx context.Context pool *pgxpool.Pool nodeInfo node.Info - nodeID string config Config } @@ -50,21 +49,25 @@ func ConnectPGX(ctx context.Context, config Config) (*pgxpool.Pool, error) { return pgxpool.ConnectConfig(ctx, pgConf) } -// NewPGXDriver returns a new pgx driver +// ConnectPGXDriver returns a new pgx driver // it initializes the connection pool and creates the node info table -func NewPGXDriver(ctx context.Context, config Config, node node.Info) (*PGXDriver, error) { +func ConnectPGXDriver(ctx context.Context, config Config, node node.Info) (*PGXDriver, error) { dbPool, err := ConnectPGX(ctx, config) if err != nil { return nil, ErrDBConnectionFailed(err) } - pg := &PGXDriver{ctx: ctx, pool: dbPool, nodeInfo: node, config: config} - nodeErr := pg.createNode() + pg := NewPGXDriver(ctx, dbPool, config) + nodeErr := pg.createNode(node) if nodeErr != nil { return &PGXDriver{}, ErrUnableToSetNode(nodeErr) } return pg, nil } +func NewPGXDriver(ctx context.Context, pool *pgxpool.Pool, config Config) *PGXDriver { + return &PGXDriver{ctx: ctx, pool: pool, config: config} +} + // MakeConfig creates a pgxpool.Config from the provided Config func MakeConfig(config Config) (*pgxpool.Config, error) { conf, err := pgxpool.ParseConfig("") @@ -102,19 +105,19 @@ func MakeConfig(config Config) (*pgxpool.Config, error) { return conf, nil } -func (pgx *PGXDriver) createNode() error { +func (pgx *PGXDriver) createNode(nodeInfo node.Info) error { _, err := pgx.pool.Exec( pgx.ctx, createNodeStm, - pgx.nodeInfo.GenesisBlock, - pgx.nodeInfo.NetworkID, - pgx.nodeInfo.ID, - pgx.nodeInfo.ClientName, - pgx.nodeInfo.ChainID) + nodeInfo.GenesisBlock, + nodeInfo.NetworkID, + nodeInfo.ID, + nodeInfo.ClientName, + nodeInfo.ChainID) if err != nil { return ErrUnableToSetNode(err) } - pgx.nodeID = pgx.nodeInfo.ID + pgx.nodeInfo = nodeInfo return nil } @@ -155,7 +158,7 @@ func (pgx *PGXDriver) Stats() metrics.DbStats { // NodeID satisfies sql.Database func (pgx *PGXDriver) NodeID() string { - return pgx.nodeID + return pgx.nodeInfo.ID } // Close satisfies sql.Database/io.Closer diff --git a/indexer/database/sql/postgres/pgx_test.go b/indexer/database/sql/postgres/pgx_test.go index 7b01502..4e656e7 100644 --- a/indexer/database/sql/postgres/pgx_test.go +++ b/indexer/database/sql/postgres/pgx_test.go @@ -94,7 +94,7 @@ func TestPostgresPGX(t *testing.T) { t.Run("throws error when can't connect to the database", func(t *testing.T) { goodInfo := node.Info{GenesisBlock: "GENESIS", NetworkID: "1", ID: "x123", ClientName: "geth"} - _, err := postgres.NewPGXDriver(ctx, postgres.Config{}, goodInfo) + _, err := postgres.ConnectPGXDriver(ctx, postgres.Config{}, goodInfo) if err == nil { t.Fatal("Expected an error") } @@ -106,7 +106,7 @@ func TestPostgresPGX(t *testing.T) { badHash := fmt.Sprintf("x %s", strings.Repeat("1", 100)) badInfo := node.Info{GenesisBlock: badHash, NetworkID: "1", ID: "x123", ClientName: "geth"} - _, err := postgres.NewPGXDriver(ctx, pgConfig, badInfo) + _, err := postgres.ConnectPGXDriver(ctx, pgConfig, badInfo) if err == nil { t.Fatal("Expected an error") } diff --git a/indexer/database/sql/postgres/sqlx.go b/indexer/database/sql/postgres/sqlx.go index 0c7ea37..8f82ca6 100644 --- a/indexer/database/sql/postgres/sqlx.go +++ b/indexer/database/sql/postgres/sqlx.go @@ -34,7 +34,6 @@ type SQLXDriver struct { ctx context.Context db *sqlx.DB nodeInfo node.Info - nodeID string } // ConnectSQLX initializes and returns a SQLX connection pool for postgres @@ -53,32 +52,36 @@ func ConnectSQLX(ctx context.Context, config Config) (*sqlx.DB, error) { return db, nil } -// NewSQLXDriver returns a new sqlx driver for Postgres +// ConnectSQLXDriver returns a new sqlx driver for Postgres // it initializes the connection pool and creates the node info table -func NewSQLXDriver(ctx context.Context, config Config, node node.Info) (*SQLXDriver, error) { +func ConnectSQLXDriver(ctx context.Context, config Config, node node.Info) (*SQLXDriver, error) { db, err := ConnectSQLX(ctx, config) if err != nil { return nil, err } - driver := &SQLXDriver{ctx: ctx, db: db, nodeInfo: node} - if err := driver.createNode(); err != nil { + driver := NewSQLXDriver(ctx, db) + if err := driver.createNode(node); err != nil { return nil, err } return driver, nil } -func (driver *SQLXDriver) createNode() error { +func NewSQLXDriver(ctx context.Context, db *sqlx.DB) *SQLXDriver { + return &SQLXDriver{ctx: ctx, db: db} +} + +func (driver *SQLXDriver) createNode(nodeInfo node.Info) error { _, err := driver.db.Exec( createNodeStm, - driver.nodeInfo.GenesisBlock, - driver.nodeInfo.NetworkID, - driver.nodeInfo.ID, - driver.nodeInfo.ClientName, - driver.nodeInfo.ChainID) + nodeInfo.GenesisBlock, + nodeInfo.NetworkID, + nodeInfo.ID, + nodeInfo.ClientName, + nodeInfo.ChainID) if err != nil { return ErrUnableToSetNode(err) } - driver.nodeID = driver.nodeInfo.ID + driver.nodeInfo = nodeInfo return nil } @@ -118,7 +121,7 @@ func (driver *SQLXDriver) Stats() metrics.DbStats { // NodeID satisfies sql.Database func (driver *SQLXDriver) NodeID() string { - return driver.nodeID + return driver.nodeInfo.ID } // Close satisfies sql.Database/io.Closer diff --git a/indexer/database/sql/postgres/sqlx_test.go b/indexer/database/sql/postgres/sqlx_test.go index f9bd2bd..4f497c3 100644 --- a/indexer/database/sql/postgres/sqlx_test.go +++ b/indexer/database/sql/postgres/sqlx_test.go @@ -97,7 +97,7 @@ func TestPostgresSQLX(t *testing.T) { t.Run("throws error when can't connect to the database", func(t *testing.T) { goodInfo := node.Info{GenesisBlock: "GENESIS", NetworkID: "1", ID: "x123", ClientName: "geth"} - _, err := postgres.NewSQLXDriver(ctx, postgres.Config{}, goodInfo) + _, err := postgres.ConnectSQLXDriver(ctx, postgres.Config{}, goodInfo) if err == nil { t.Fatal("Expected an error") } @@ -109,7 +109,7 @@ func TestPostgresSQLX(t *testing.T) { badHash := fmt.Sprintf("x %s", strings.Repeat("1", 100)) badInfo := node.Info{GenesisBlock: badHash, NetworkID: "1", ID: "x123", ClientName: "geth"} - _, err := postgres.NewSQLXDriver(ctx, pgConfig, badInfo) + _, err := postgres.ConnectSQLXDriver(ctx, pgConfig, badInfo) if err == nil { t.Fatal("Expected an error") } diff --git a/indexer/database/sql/postgres/test_helpers.go b/indexer/database/sql/postgres/test_helpers.go index 453c5a2..41a3495 100644 --- a/indexer/database/sql/postgres/test_helpers.go +++ b/indexer/database/sql/postgres/test_helpers.go @@ -30,7 +30,7 @@ func SetupSQLXDB() (sql.Database, error) { return nil, err } conf.MaxIdle = 0 - driver, err := NewSQLXDriver(context.Background(), conf, node.Info{}) + driver, err := ConnectSQLXDriver(context.Background(), conf, node.Info{}) if err != nil { return nil, err } @@ -39,7 +39,7 @@ func SetupSQLXDB() (sql.Database, error) { // SetupPGXDB is used to setup a pgx db for tests func SetupPGXDB(config Config) (sql.Database, error) { - driver, err := NewPGXDriver(context.Background(), config, node.Info{}) + driver, err := ConnectPGXDriver(context.Background(), config, node.Info{}) if err != nil { return nil, err } diff --git a/indexer/ipld/encode.go b/indexer/ipld/encode.go index 8108f8e..68979be 100644 --- a/indexer/ipld/encode.go +++ b/indexer/ipld/encode.go @@ -38,8 +38,8 @@ func EncodeHeader(header *types.Header) (IPLD, error) { }, nil } -// encodeTx converts a *types.Transaction to an IPLD node -func encodeTx(tx *types.Transaction) (IPLD, error) { +// EncodeTx converts a *types.Transaction to an IPLD node +func EncodeTx(tx *types.Transaction) (IPLD, error) { txRaw, err := tx.MarshalBinary() if err != nil { return nil, err @@ -54,8 +54,8 @@ func encodeTx(tx *types.Transaction) (IPLD, error) { }, nil } -// encodeReceipt converts a types.Receipt to an IPLD node -func encodeReceipt(receipt *types.Receipt) (IPLD, error) { +// EncodeReceipt converts a types.Receipt to an IPLD node +func EncodeReceipt(receipt *types.Receipt) (IPLD, error) { rctRaw, err := receipt.MarshalBinary() if err != nil { return nil, err @@ -70,8 +70,8 @@ func encodeReceipt(receipt *types.Receipt) (IPLD, error) { }, nil } -// encodeLog converts a Log to an IPLD node -func encodeLog(log *types.Log) (IPLD, error) { +// EncodeLog converts a Log to an IPLD node +func EncodeLog(log *types.Log) (IPLD, error) { logRaw, err := rlp.EncodeToBytes(log) if err != nil { return nil, err @@ -86,7 +86,7 @@ func encodeLog(log *types.Log) (IPLD, error) { }, nil } -func encodeWithdrawal(w *types.Withdrawal) (IPLD, error) { +func EncodeWithdrawal(w *types.Withdrawal) (IPLD, error) { wRaw, err := rlp.EncodeToBytes(w) if err != nil { return nil, err diff --git a/indexer/ipld/eth_parser.go b/indexer/ipld/eth_parser.go index c247df6..fa5c118 100644 --- a/indexer/ipld/eth_parser.go +++ b/indexer/ipld/eth_parser.go @@ -44,7 +44,7 @@ func FromBlockAndReceipts(block *types.Block, receipts []*types.Receipt) ([]IPLD func processTransactions(txs []*types.Transaction) ([]IPLD, error) { var ethTxNodes []IPLD for _, tx := range txs { - ethTx, err := encodeTx(tx) + ethTx, err := EncodeTx(tx) if err != nil { return nil, err } @@ -57,7 +57,7 @@ func processTransactions(txs []*types.Transaction) ([]IPLD, error) { func processWithdrawals(withdrawals []*types.Withdrawal) ([]IPLD, error) { var withdrawalNodes []IPLD for _, withdrawal := range withdrawals { - ethW, err := encodeWithdrawal(withdrawal) + ethW, err := EncodeWithdrawal(withdrawal) if err != nil { return nil, err } @@ -80,7 +80,7 @@ func processReceiptsAndLogs(rcts []*types.Receipt) ([]IPLD, [][]IPLD, error) { return nil, nil, err } - ethRct, err := encodeReceipt(rct) + ethRct, err := EncodeReceipt(rct) if err != nil { return nil, nil, err } @@ -95,7 +95,7 @@ func processReceiptsAndLogs(rcts []*types.Receipt) ([]IPLD, [][]IPLD, error) { func processLogs(logs []*types.Log) ([]IPLD, error) { logNodes := make([]IPLD, len(logs)) for idx, log := range logs { - logNode, err := encodeLog(log) + logNode, err := EncodeLog(log) if err != nil { return nil, err } diff --git a/indexer/shared/schema/schema.go b/indexer/shared/schema/schema.go index 4db2baa..06e9d35 100644 --- a/indexer/shared/schema/schema.go +++ b/indexer/shared/schema/schema.go @@ -16,7 +16,7 @@ package schema -var Tables = []*Table{ +var EthTables = []*Table{ &TableIPLDBlock, &TableNodeInfo, &TableHeader, @@ -29,6 +29,11 @@ var Tables = []*Table{ &TableWithdrawal, } +var AllTables = append( + EthTables, + &TableWatchedAddresses, +) + var TableIPLDBlock = Table{ Name: `ipld.blocks`, Columns: []Column{ diff --git a/indexer/test_helpers/test_helpers.go b/indexer/test_helpers/test_helpers.go index 7e65154..ee71cb5 100644 --- a/indexer/test_helpers/test_helpers.go +++ b/indexer/test_helpers/test_helpers.go @@ -19,10 +19,14 @@ package test_helpers import ( "bufio" "context" + "fmt" "os" "testing" "github.com/cerc-io/plugeth-statediff/indexer/database/sql" + "github.com/cerc-io/plugeth-statediff/indexer/database/sql/postgres" + "github.com/cerc-io/plugeth-statediff/indexer/shared/schema" + "github.com/jmoiron/sqlx" ) // DedupFile removes duplicates from the given file @@ -39,9 +43,6 @@ func DedupFile(filePath string) error { s := sc.Text() stmts[s] = struct{}{} } - if err != nil { - return err - } f.Close() @@ -60,31 +61,30 @@ func DedupFile(filePath string) error { // TearDownDB is used to tear down the watcher dbs after tests func TearDownDB(t *testing.T, db sql.Database) { - ctx := context.Background() - tx, err := db.Begin(ctx) + err := ClearDB(db) if err != nil { t.Fatal(err) } +} - statements := []string{ - `TRUNCATE nodes`, - `TRUNCATE ipld.blocks`, - `TRUNCATE eth.header_cids`, - `TRUNCATE eth.uncle_cids`, - `TRUNCATE eth.transaction_cids`, - `TRUNCATE eth.receipt_cids`, - `TRUNCATE eth.state_cids`, - `TRUNCATE eth.storage_cids`, - `TRUNCATE eth.log_cids`, - `TRUNCATE eth.withdrawal_cids`, - `TRUNCATE eth_meta.watched_addresses`, +func ClearSqlxDB(sqlxdb *sqlx.DB) error { + driver := postgres.NewSQLXDriver(context.Background(), sqlxdb) + db := postgres.NewPostgresDB(driver, false) + return ClearDB(db) +} + +func ClearDB(db sql.Database) error { + ctx := context.Background() + tx, err := db.Begin(ctx) + if err != nil { + return err } - for _, stm := range statements { + + for _, tbl := range schema.AllTables { + stm := fmt.Sprintf("TRUNCATE %s", tbl.Name) if _, err = tx.Exec(ctx, stm); err != nil { - t.Fatal(err) + return fmt.Errorf("error executing `%s`: %w", stm, err) } } - if err = tx.Commit(ctx); err != nil { - t.Fatal(err) - } + return tx.Commit(ctx) } diff --git a/test_helpers/db.go b/test_helpers/db.go deleted file mode 100644 index 34f5462..0000000 --- a/test_helpers/db.go +++ /dev/null @@ -1,34 +0,0 @@ -package test_helpers - -import ( - "fmt" - - "github.com/jmoiron/sqlx" -) - -// ClearDB is used to empty the IPLD-ETH tables after tests -func ClearDB(db *sqlx.DB) error { - tx, err := db.Beginx() - if err != nil { - return err - } - statements := []string{ - `TRUNCATE nodes`, - `TRUNCATE ipld.blocks`, - `TRUNCATE eth.header_cids`, - `TRUNCATE eth.uncle_cids`, - `TRUNCATE eth.transaction_cids`, - `TRUNCATE eth.receipt_cids`, - `TRUNCATE eth.state_cids`, - `TRUNCATE eth.storage_cids`, - `TRUNCATE eth.log_cids`, - `TRUNCATE eth.withdrawal_cids`, - `TRUNCATE eth_meta.watched_addresses`, - } - for _, stm := range statements { - if _, err = tx.Exec(stm); err != nil { - return fmt.Errorf("error executing `%s`: %w", stm, err) - } - } - return tx.Commit() -}