From f102b4ef8e2c98a67dc81d291e40b00f65181e87 Mon Sep 17 00:00:00 2001 From: Thomas E Lackey Date: Tue, 7 Mar 2023 12:24:30 -0600 Subject: [PATCH 01/16] Add COPY support for inserting multiple rows in a single command. --- cmd/geth/config.go | 3 ++ cmd/utils/flags.go | 7 +++ statediff/indexer/database/sql/interfaces.go | 6 +++ statediff/indexer/database/sql/lazy_tx.go | 49 ++++++++++++++++-- .../indexer/database/sql/postgres/config.go | 3 ++ .../indexer/database/sql/postgres/database.go | 16 ++++++ .../indexer/database/sql/postgres/pgx.go | 12 ++++- .../indexer/database/sql/postgres/sqlx.go | 11 ++++ statediff/indexer/database/sql/writer.go | 51 +++++++++++++++---- 9 files changed, 142 insertions(+), 16 deletions(-) diff --git a/cmd/geth/config.go b/cmd/geth/config.go index 89d3ef450..36724cb17 100644 --- a/cmd/geth/config.go +++ b/cmd/geth/config.go @@ -248,6 +248,9 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) { if ctx.IsSet(utils.StateDiffLogStatements.Name) { pgConfig.LogStatements = ctx.Bool(utils.StateDiffLogStatements.Name) } + if ctx.IsSet(utils.StateDiffCopyFrom.Name) { + pgConfig.CopyFrom = ctx.Bool(utils.StateDiffCopyFrom.Name) + } indexerConfig = pgConfig case shared.DUMP: dumpTypeStr := ctx.String(utils.StateDiffDBDumpDst.Name) diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index f97dce8ac..7c4f957a3 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -1100,6 +1100,13 @@ Please note that --` + MetricsHTTPFlag.Name + ` must be set to start the server. Usage: "Should the statediff service log all database statements? (Note: pgx only)", Value: false, } + + StateDiffCopyFrom = &cli.BoolFlag{ + Name: "statediff.db.copyfrom", + Usage: "Should the statediff service use COPY FROM for multiple inserts? 
(Note: pgx only)", + Value: false, + } + StateDiffWritingFlag = &cli.BoolFlag{ Name: "statediff.writing", Usage: "Activates progressive writing of state diffs to database as new block are synced", diff --git a/statediff/indexer/database/sql/interfaces.go b/statediff/indexer/database/sql/interfaces.go index 1e7278db6..7ad295c98 100644 --- a/statediff/indexer/database/sql/interfaces.go +++ b/statediff/indexer/database/sql/interfaces.go @@ -31,6 +31,7 @@ type Database interface { // Driver interface has all the methods required by a driver implementation to support the sql indexer type Driver interface { + UseCopyFrom() bool QueryRow(ctx context.Context, sql string, args ...interface{}) ScannableRow Exec(ctx context.Context, sql string, args ...interface{}) (Result, error) Select(ctx context.Context, dest interface{}, query string, args ...interface{}) error @@ -50,8 +51,12 @@ type Statements interface { InsertAccessListElementStm() string InsertRctStm() string InsertLogStm() string + StateTableName() []string + StateColumnNames() []string InsertStateStm() string InsertAccountStm() string + StorageTableName() []string + StorageColumnNames() []string InsertStorageStm() string InsertIPLDStm() string InsertIPLDsStm() string @@ -62,6 +67,7 @@ type Statements interface { type Tx interface { QueryRow(ctx context.Context, sql string, args ...interface{}) ScannableRow Exec(ctx context.Context, sql string, args ...interface{}) (Result, error) + CopyFrom(ctx context.Context, tableName []string, columnNames []string, rows [][]interface{}) (int64, error) Commit(ctx context.Context) error Rollback(ctx context.Context) error } diff --git a/statediff/indexer/database/sql/lazy_tx.go b/statediff/indexer/database/sql/lazy_tx.go index 922bf84a0..55c2afad6 100644 --- a/statediff/indexer/database/sql/lazy_tx.go +++ b/statediff/indexer/database/sql/lazy_tx.go @@ -2,10 +2,12 @@ package sql import ( "context" + "github.com/ethereum/go-ethereum/log" + "reflect" ) type DelayedTx struct { - 
cache []cachedStmt + cache []interface{} db Database } type cachedStmt struct { @@ -13,6 +15,12 @@ type cachedStmt struct { args []interface{} } +type copyFrom struct { + tableName []string + columnNames []string + rows [][]interface{} +} + func NewDelayedTx(db Database) *DelayedTx { return &DelayedTx{db: db} } @@ -21,12 +29,32 @@ func (tx *DelayedTx) QueryRow(ctx context.Context, sql string, args ...interface return tx.db.QueryRow(ctx, sql, args...) } +func (tx *DelayedTx) CopyFrom(ctx context.Context, tableName []string, columnNames []string, rows [][]interface{}) (int64, error) { + appendedToExisting := false + if len(tx.cache) > 0 { + prevCopy, ok := tx.cache[len(tx.cache)-1].(copyFrom) + if ok && reflect.DeepEqual(prevCopy.tableName, tableName) && reflect.DeepEqual(prevCopy.columnNames, columnNames) { + log.Info("statediff lazy_tx : Appending rows to COPY", "table", tableName, + "current", len(prevCopy.rows), "append", len(rows)) + prevCopy.rows = append(prevCopy.rows, rows...) + appendedToExisting = true + } + } + + if !appendedToExisting { + tx.cache = append(tx.cache, copyFrom{tableName, columnNames, rows}) + } + + return 0, nil +} + func (tx *DelayedTx) Exec(ctx context.Context, sql string, args ...interface{}) (Result, error) { tx.cache = append(tx.cache, cachedStmt{sql, args}) return nil, nil } func (tx *DelayedTx) Commit(ctx context.Context) error { + base, err := tx.db.Begin(ctx) if err != nil { return err @@ -39,10 +67,21 @@ func (tx *DelayedTx) Commit(ctx context.Context) error { rollback(ctx, base) } }() - for _, stmt := range tx.cache { - _, err := base.Exec(ctx, stmt.sql, stmt.args...) 
- if err != nil { - return err + for _, item := range tx.cache { + switch item.(type) { + case copyFrom: + copy := item.(copyFrom) + log.Info("statediff lazy_tx : COPY", "table", copy.tableName, "rows", len(copy.rows)) + _, err := base.CopyFrom(ctx, copy.tableName, copy.columnNames, copy.rows) + if err != nil { + return err + } + case cachedStmt: + stmt := item.(cachedStmt) + _, err := base.Exec(ctx, stmt.sql, stmt.args...) + if err != nil { + return err + } } } tx.cache = nil diff --git a/statediff/indexer/database/sql/postgres/config.go b/statediff/indexer/database/sql/postgres/config.go index b5cdc02ab..e50e836c7 100644 --- a/statediff/indexer/database/sql/postgres/config.go +++ b/statediff/indexer/database/sql/postgres/config.go @@ -81,6 +81,9 @@ type Config struct { // toggle on/off upserts Upsert bool + + // toggle on/off CopyFrom + CopyFrom bool } // Type satisfies interfaces.Config diff --git a/statediff/indexer/database/sql/postgres/database.go b/statediff/indexer/database/sql/postgres/database.go index 27f89ab83..4a0b7d9e4 100644 --- a/statediff/indexer/database/sql/postgres/database.go +++ b/statediff/indexer/database/sql/postgres/database.go @@ -121,3 +121,19 @@ func (db *DB) InsertKnownGapsStm() string { ON CONFLICT (starting_block_number) DO UPDATE SET (ending_block_number, processing_key) = ($2, $4) WHERE eth_meta.known_gaps.ending_block_number <= $2` } + +func (db *DB) StateTableName() []string { + return []string{"eth", "state_cids"} +} + +func (db *DB) StorageTableName() []string { + return []string{"eth", "storage_cids"} +} + +func (db *DB) StateColumnNames() []string { + return []string{"block_number", "header_id", "state_leaf_key", "cid", "state_path", "node_type", "diff", "mh_key"} +} + +func (db *DB) StorageColumnNames() []string { + return []string{"block_number", "header_id", "state_path", "storage_leaf_key", "cid", "storage_path", "node_type", "diff", "mh_key"} +} diff --git a/statediff/indexer/database/sql/postgres/pgx.go 
b/statediff/indexer/database/sql/postgres/pgx.go index 6b75559df..17a08191f 100644 --- a/statediff/indexer/database/sql/postgres/pgx.go +++ b/statediff/indexer/database/sql/postgres/pgx.go @@ -38,6 +38,7 @@ type PGXDriver struct { pool *pgxpool.Pool nodeInfo node.Info nodeID string + config Config } // NewPGXDriver returns a new pgx driver @@ -51,7 +52,7 @@ func NewPGXDriver(ctx context.Context, config Config, node node.Info) (*PGXDrive if err != nil { return nil, ErrDBConnectionFailed(err) } - pg := &PGXDriver{ctx: ctx, pool: dbPool, nodeInfo: node} + pg := &PGXDriver{ctx: ctx, pool: dbPool, nodeInfo: node, config: config} nodeErr := pg.createNode() if nodeErr != nil { return &PGXDriver{}, ErrUnableToSetNode(nodeErr) @@ -161,6 +162,11 @@ func (pgx *PGXDriver) Context() context.Context { return pgx.ctx } +// HasCopy satisfies sql.Database +func (pgx *PGXDriver) UseCopyFrom() bool { + return pgx.config.CopyFrom +} + type resultWrapper struct { ct pgconn.CommandTag } @@ -239,3 +245,7 @@ func (t pgxTxWrapper) Commit(ctx context.Context) error { func (t pgxTxWrapper) Rollback(ctx context.Context) error { return t.tx.Rollback(ctx) } + +func (t pgxTxWrapper) CopyFrom(ctx context.Context, tableName []string, columnNames []string, rows [][]interface{}) (int64, error) { + return t.tx.CopyFrom(ctx, tableName, columnNames, pgx.CopyFromRows(rows)) +} diff --git a/statediff/indexer/database/sql/postgres/sqlx.go b/statediff/indexer/database/sql/postgres/sqlx.go index 529e7f7c8..bc134210e 100644 --- a/statediff/indexer/database/sql/postgres/sqlx.go +++ b/statediff/indexer/database/sql/postgres/sqlx.go @@ -19,6 +19,7 @@ package postgres import ( "context" coresql "database/sql" + "errors" "time" "github.com/jmoiron/sqlx" @@ -119,6 +120,12 @@ func (driver *SQLXDriver) Context() context.Context { return driver.ctx } +// HasCopy satisfies sql.Database +func (driver *SQLXDriver) UseCopyFrom() bool { + // sqlx does not currently support COPY. 
+ return false +} + type sqlxStatsWrapper struct { stats coresql.DBStats } @@ -186,3 +193,7 @@ func (t sqlxTxWrapper) Commit(ctx context.Context) error { func (t sqlxTxWrapper) Rollback(ctx context.Context) error { return t.tx.Rollback() } + +func (t sqlxTxWrapper) CopyFrom(ctx context.Context, tableName []string, columnNames []string, rows [][]interface{}) (int64, error) { + return 0, errors.New("Unsupported Operation") +} diff --git a/statediff/indexer/database/sql/writer.go b/statediff/indexer/database/sql/writer.go index 36b0703dc..9e4003745 100644 --- a/statediff/indexer/database/sql/writer.go +++ b/statediff/indexer/database/sql/writer.go @@ -18,6 +18,7 @@ package sql import ( "fmt" + "strconv" "github.com/ethereum/go-ethereum/statediff/indexer/database/metrics" @@ -147,11 +148,26 @@ func (w *Writer) upsertStateCID(tx Tx, stateNode models.StateNodeModel) error { if stateNode.StateKey != nullHash.String() { stateKey = stateNode.StateKey } - _, err := tx.Exec(w.db.Context(), w.db.InsertStateStm(), - stateNode.BlockNumber, stateNode.HeaderID, stateKey, stateNode.CID, stateNode.Path, stateNode.NodeType, true, - stateNode.MhKey) - if err != nil { - return insertError{"eth.state_cids", err, w.db.InsertStateStm(), stateNode} + if w.db.UseCopyFrom() { + var row []interface{} + blockNum, _ := strconv.ParseInt(stateNode.BlockNumber, 10, 64) + row = append(row, blockNum, stateNode.HeaderID, stateKey, stateNode.CID, + stateNode.Path, stateNode.NodeType, true, stateNode.MhKey) + + var rows [][]interface{} + rows = append(rows, row) + + _, err := tx.CopyFrom(w.db.Context(), w.db.StateTableName(), w.db.StateColumnNames(), rows) + if err != nil { + return insertError{"eth.state_cids", err, "COPY", stateNode} + } + } else { + _, err := tx.Exec(w.db.Context(), w.db.InsertStateStm(), + stateNode.BlockNumber, stateNode.HeaderID, stateKey, stateNode.CID, stateNode.Path, stateNode.NodeType, true, + stateNode.MhKey) + if err != nil { + return insertError{"eth.state_cids", err, 
w.db.InsertStateStm(), stateNode} + } } return nil } @@ -179,11 +195,26 @@ func (w *Writer) upsertStorageCID(tx Tx, storageCID models.StorageNodeModel) err if storageCID.StorageKey != nullHash.String() { storageKey = storageCID.StorageKey } - _, err := tx.Exec(w.db.Context(), w.db.InsertStorageStm(), - storageCID.BlockNumber, storageCID.HeaderID, storageCID.StatePath, storageKey, storageCID.CID, storageCID.Path, - storageCID.NodeType, true, storageCID.MhKey) - if err != nil { - return insertError{"eth.storage_cids", err, w.db.InsertStorageStm(), storageCID} + if w.db.UseCopyFrom() { + var row []interface{} + blockNum, _ := strconv.ParseInt(storageCID.BlockNumber, 10, 64) + row = append(row, blockNum, storageCID.HeaderID, storageCID.StatePath, storageKey, storageCID.CID, + storageCID.Path, storageCID.NodeType, true, storageCID.MhKey) + + var rows [][]interface{} + rows = append(rows, row) + + _, err := tx.CopyFrom(w.db.Context(), w.db.StateTableName(), w.db.StateColumnNames(), rows) + if err != nil { + return insertError{"eth.state_cids", err, "COPY", storageCID} + } + } else { + _, err := tx.Exec(w.db.Context(), w.db.InsertStorageStm(), + storageCID.BlockNumber, storageCID.HeaderID, storageCID.StatePath, storageKey, storageCID.CID, storageCID.Path, + storageCID.NodeType, true, storageCID.MhKey) + if err != nil { + return insertError{"eth.storage_cids", err, w.db.InsertStorageStm(), storageCID} + } } return nil } -- 2.45.2 From 122e5dbf4a3874be350d35dcdec2dc47d0e4b21d Mon Sep 17 00:00:00 2001 From: Thomas E Lackey Date: Tue, 7 Mar 2023 12:28:08 -0600 Subject: [PATCH 02/16] spaceing --- statediff/indexer/database/sql/lazy_tx.go | 1 - 1 file changed, 1 deletion(-) diff --git a/statediff/indexer/database/sql/lazy_tx.go b/statediff/indexer/database/sql/lazy_tx.go index 55c2afad6..97c3e4abc 100644 --- a/statediff/indexer/database/sql/lazy_tx.go +++ b/statediff/indexer/database/sql/lazy_tx.go @@ -54,7 +54,6 @@ func (tx *DelayedTx) Exec(ctx context.Context, sql string, 
args ...interface{}) } func (tx *DelayedTx) Commit(ctx context.Context) error { - base, err := tx.db.Begin(ctx) if err != nil { return err -- 2.45.2 From 7ae7cf9affb5e175c6142bdb7a4740df20c8c56e Mon Sep 17 00:00:00 2001 From: Thomas E Lackey Date: Tue, 7 Mar 2023 12:29:14 -0600 Subject: [PATCH 03/16] gofmt/goimports --- statediff/indexer/database/sql/lazy_tx.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/statediff/indexer/database/sql/lazy_tx.go b/statediff/indexer/database/sql/lazy_tx.go index 97c3e4abc..e13300306 100644 --- a/statediff/indexer/database/sql/lazy_tx.go +++ b/statediff/indexer/database/sql/lazy_tx.go @@ -2,8 +2,9 @@ package sql import ( "context" - "github.com/ethereum/go-ethereum/log" "reflect" + + "github.com/ethereum/go-ethereum/log" ) type DelayedTx struct { -- 2.45.2 From 302d49d0c27103726118333154db3e1aabbe8ae2 Mon Sep 17 00:00:00 2001 From: Thomas E Lackey Date: Tue, 7 Mar 2023 12:54:27 -0600 Subject: [PATCH 04/16] Add flag --- cmd/geth/main.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cmd/geth/main.go b/cmd/geth/main.go index 8f4bb6727..b75a53dce 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -175,6 +175,7 @@ var ( utils.StateDiffWatchedAddressesFilePath, utils.StateDiffUpsert, utils.StateDiffLogStatements, + utils.StateDiffCopyFrom, configFileFlag, }, utils.NetworkFlags, utils.DatabasePathFlags) -- 2.45.2 From aff8c27bb36f8b87972eaeb6561d3cae955424eb Mon Sep 17 00:00:00 2001 From: Thomas E Lackey Date: Tue, 7 Mar 2023 14:50:50 -0600 Subject: [PATCH 05/16] Allow combining non-sequential COPYs. 
--- statediff/indexer/database/sql/lazy_tx.go | 40 +++++++++++++++-------- 1 file changed, 27 insertions(+), 13 deletions(-) diff --git a/statediff/indexer/database/sql/lazy_tx.go b/statediff/indexer/database/sql/lazy_tx.go index e13300306..a8eacefcd 100644 --- a/statediff/indexer/database/sql/lazy_tx.go +++ b/statediff/indexer/database/sql/lazy_tx.go @@ -7,6 +7,9 @@ import ( "github.com/ethereum/go-ethereum/log" ) +// Changing this to 1 would make sure only sequential COPYs were combined. +const COPY_FROM_CHECK_LIMIT = 100 + type DelayedTx struct { cache []interface{} db Database @@ -22,6 +25,14 @@ type copyFrom struct { rows [][]interface{} } +func (cf *copyFrom) appendRows(rows [][]interface{}) { + cf.rows = append(cf.rows, rows...) +} + +func (cf *copyFrom) matches(tableName []string, columnNames []string) bool { + return reflect.DeepEqual(cf.tableName, tableName) && reflect.DeepEqual(cf.columnNames, columnNames) +} + func NewDelayedTx(db Database) *DelayedTx { return &DelayedTx{db: db} } @@ -30,20 +41,23 @@ func (tx *DelayedTx) QueryRow(ctx context.Context, sql string, args ...interface return tx.db.QueryRow(ctx, sql, args...) } -func (tx *DelayedTx) CopyFrom(ctx context.Context, tableName []string, columnNames []string, rows [][]interface{}) (int64, error) { - appendedToExisting := false - if len(tx.cache) > 0 { - prevCopy, ok := tx.cache[len(tx.cache)-1].(copyFrom) - if ok && reflect.DeepEqual(prevCopy.tableName, tableName) && reflect.DeepEqual(prevCopy.columnNames, columnNames) { - log.Info("statediff lazy_tx : Appending rows to COPY", "table", tableName, - "current", len(prevCopy.rows), "append", len(rows)) - prevCopy.rows = append(prevCopy.rows, rows...) 
- appendedToExisting = true +func (tx *DelayedTx) findPrevCopyFrom(tableName []string, columnNames []string, limit int) *copyFrom { + for pos, count := len(tx.cache)-1, 0; pos >= 0 && count < limit; pos, count = pos-1, count+1 { + prevCopy, ok := tx.cache[pos].(*copyFrom) + if ok && prevCopy.matches(tableName, columnNames) { + return prevCopy } } + return nil +} - if !appendedToExisting { - tx.cache = append(tx.cache, copyFrom{tableName, columnNames, rows}) +func (tx *DelayedTx) CopyFrom(ctx context.Context, tableName []string, columnNames []string, rows [][]interface{}) (int64, error) { + if prevCopy := tx.findPrevCopyFrom(tableName, columnNames, COPY_FROM_CHECK_LIMIT); nil != prevCopy { + log.Info("statediff lazy_tx : Appending rows to COPY", "table", tableName, + "current", len(prevCopy.rows), "append", len(rows)) + prevCopy.appendRows(rows) + } else { + tx.cache = append(tx.cache, &copyFrom{tableName, columnNames, rows}) } return 0, nil @@ -69,8 +83,8 @@ func (tx *DelayedTx) Commit(ctx context.Context) error { }() for _, item := range tx.cache { switch item.(type) { - case copyFrom: - copy := item.(copyFrom) + case *copyFrom: + copy := item.(*copyFrom) log.Info("statediff lazy_tx : COPY", "table", copy.tableName, "rows", len(copy.rows)) _, err := base.CopyFrom(ctx, copy.tableName, copy.columnNames, copy.rows) if err != nil { -- 2.45.2 From a5c4091f434c33e789a6244a7f967f6105e336c8 Mon Sep 17 00:00:00 2001 From: Thomas E Lackey Date: Tue, 7 Mar 2023 14:52:22 -0600 Subject: [PATCH 06/16] Log at Trace --- statediff/indexer/database/sql/lazy_tx.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/statediff/indexer/database/sql/lazy_tx.go b/statediff/indexer/database/sql/lazy_tx.go index a8eacefcd..b69414ca8 100644 --- a/statediff/indexer/database/sql/lazy_tx.go +++ b/statediff/indexer/database/sql/lazy_tx.go @@ -53,7 +53,7 @@ func (tx *DelayedTx) findPrevCopyFrom(tableName []string, columnNames []string, func (tx *DelayedTx) CopyFrom(ctx
context.Context, tableName []string, columnNames []string, rows [][]interface{}) (int64, error) { if prevCopy := tx.findPrevCopyFrom(tableName, columnNames, COPY_FROM_CHECK_LIMIT); nil != prevCopy { - log.Info("statediff lazy_tx : Appending rows to COPY", "table", tableName, + log.Trace("statediff lazy_tx : Appending rows to COPY", "table", tableName, "current", len(prevCopy.rows), "append", len(rows)) prevCopy.appendRows(rows) } else { @@ -85,7 +85,7 @@ func (tx *DelayedTx) Commit(ctx context.Context) error { switch item.(type) { case *copyFrom: copy := item.(*copyFrom) - log.Info("statediff lazy_tx : COPY", "table", copy.tableName, "rows", len(copy.rows)) + log.Trace("statediff lazy_tx : COPY", "table", copy.tableName, "rows", len(copy.rows)) _, err := base.CopyFrom(ctx, copy.tableName, copy.columnNames, copy.rows) if err != nil { return err -- 2.45.2 From e6a99267bf434e986eb8b3770155b1ea0d2b6252 Mon Sep 17 00:00:00 2001 From: Thomas E Lackey Date: Tue, 7 Mar 2023 15:00:07 -0600 Subject: [PATCH 07/16] lint --- statediff/indexer/database/sql/lazy_tx.go | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/statediff/indexer/database/sql/lazy_tx.go b/statediff/indexer/database/sql/lazy_tx.go index b69414ca8..3e4fe4c5c 100644 --- a/statediff/indexer/database/sql/lazy_tx.go +++ b/statediff/indexer/database/sql/lazy_tx.go @@ -82,17 +82,15 @@ func (tx *DelayedTx) Commit(ctx context.Context) error { } }() for _, item := range tx.cache { - switch item.(type) { + switch item := item.(type) { case *copyFrom: - copy := item.(*copyFrom) - log.Trace("statediff lazy_tx : COPY", "table", copy.tableName, "rows", len(copy.rows)) - _, err := base.CopyFrom(ctx, copy.tableName, copy.columnNames, copy.rows) + log.Trace("statediff lazy_tx : COPY", "table", item.tableName, "rows", len(item.rows)) + _, err := base.CopyFrom(ctx, item.tableName, item.columnNames, item.rows) if err != nil { return err } case cachedStmt: - stmt := item.(cachedStmt) - _, err :=
base.Exec(ctx, stmt.sql, stmt.args...) + _, err := base.Exec(ctx, item.sql, item.args...) if err != nil { return err } -- 2.45.2 From 3604d9889189b09cfa507f0fbf80b1d19e49597e Mon Sep 17 00:00:00 2001 From: Thomas E Lackey Date: Tue, 7 Mar 2023 15:04:52 -0600 Subject: [PATCH 08/16] camelCase --- statediff/indexer/database/sql/lazy_tx.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/statediff/indexer/database/sql/lazy_tx.go b/statediff/indexer/database/sql/lazy_tx.go index 3e4fe4c5c..ad7a22857 100644 --- a/statediff/indexer/database/sql/lazy_tx.go +++ b/statediff/indexer/database/sql/lazy_tx.go @@ -8,7 +8,7 @@ import ( ) // Changing this to 1 would make sure only sequential COPYs were combined. -const COPY_FROM_CHECK_LIMIT = 100 +const copyFromCheckLimit = 100 type DelayedTx struct { cache []interface{} @@ -52,7 +52,7 @@ func (tx *DelayedTx) findPrevCopyFrom(tableName []string, columnNames []string, } func (tx *DelayedTx) CopyFrom(ctx context.Context, tableName []string, columnNames []string, rows [][]interface{}) (int64, error) { - if prevCopy := tx.findPrevCopyFrom(tableName, columnNames, COPY_FROM_CHECK_LIMIT); nil != prevCopy { + if prevCopy := tx.findPrevCopyFrom(tableName, columnNames, copyFromCheckLimit); nil != prevCopy { log.Trace("statediff lazy_tx : Appending rows to COPY", "table", tableName, "current", len(prevCopy.rows), "append", len(rows)) prevCopy.appendRows(rows) -- 2.45.2 From 9b535e2901fa0ebb7cbc11d97b426c17ef7d5f44 Mon Sep 17 00:00:00 2001 From: Thomas E Lackey Date: Tue, 7 Mar 2023 19:02:58 -0600 Subject: [PATCH 09/16] Fix storage_cids --- statediff/indexer/database/sql/lazy_tx.go | 14 +++++++------- statediff/indexer/database/sql/writer.go | 18 +++++++++++++----- 2 files changed, 20 insertions(+), 12 deletions(-) diff --git a/statediff/indexer/database/sql/lazy_tx.go b/statediff/indexer/database/sql/lazy_tx.go index ad7a22857..b2445e0d8 100644 --- a/statediff/indexer/database/sql/lazy_tx.go +++
b/statediff/indexer/database/sql/lazy_tx.go @@ -41,20 +41,20 @@ func (tx *DelayedTx) QueryRow(ctx context.Context, sql string, args ...interface return tx.db.QueryRow(ctx, sql, args...) } -func (tx *DelayedTx) findPrevCopyFrom(tableName []string, columnNames []string, limit int) *copyFrom { +func (tx *DelayedTx) findPrevCopyFrom(tableName []string, columnNames []string, limit int) (*copyFrom, int) { for pos, count := len(tx.cache)-1, 0; pos >= 0 && count < limit; pos, count = pos-1, count+1 { prevCopy, ok := tx.cache[pos].(*copyFrom) if ok && prevCopy.matches(tableName, columnNames) { - return prevCopy + return prevCopy, count } } - return nil + return nil, -1 } func (tx *DelayedTx) CopyFrom(ctx context.Context, tableName []string, columnNames []string, rows [][]interface{}) (int64, error) { - if prevCopy := tx.findPrevCopyFrom(tableName, columnNames, copyFromCheckLimit); nil != prevCopy { - log.Trace("statediff lazy_tx : Appending rows to COPY", "table", tableName, - "current", len(prevCopy.rows), "append", len(rows)) + if prevCopy, distance := tx.findPrevCopyFrom(tableName, columnNames, copyFromCheckLimit); nil != prevCopy { + log.Trace("statediff lazy_tx : Appending to COPY", "table", tableName, + "current", len(prevCopy.rows), "new", len(rows), "distance", distance) prevCopy.appendRows(rows) } else { tx.cache = append(tx.cache, &copyFrom{tableName, columnNames, rows}) @@ -84,9 +84,9 @@ func (tx *DelayedTx) Commit(ctx context.Context) error { for _, item := range tx.cache { switch item := item.(type) { case *copyFrom: - log.Trace("statediff lazy_tx : COPY", "table", item.tableName, "rows", len(item.rows)) _, err := base.CopyFrom(ctx, item.tableName, item.columnNames, item.rows) if err != nil { + log.Error("COPY error", "table", item.tableName, "err", err) return err } case cachedStmt: diff --git a/statediff/indexer/database/sql/writer.go b/statediff/indexer/database/sql/writer.go index 9e4003745..6384ccfed 100644 --- a/statediff/indexer/database/sql/writer.go +++
b/statediff/indexer/database/sql/writer.go @@ -150,14 +150,18 @@ func (w *Writer) upsertStateCID(tx Tx, stateNode models.StateNodeModel) error { } if w.db.UseCopyFrom() { var row []interface{} - blockNum, _ := strconv.ParseInt(stateNode.BlockNumber, 10, 64) + blockNum, err := strconv.ParseInt(stateNode.BlockNumber, 10, 64) + if err != nil { + return insertError{"eth.state_cids", err, "COPY", stateNode} + } + row = append(row, blockNum, stateNode.HeaderID, stateKey, stateNode.CID, stateNode.Path, stateNode.NodeType, true, stateNode.MhKey) var rows [][]interface{} rows = append(rows, row) - _, err := tx.CopyFrom(w.db.Context(), w.db.StateTableName(), w.db.StateColumnNames(), rows) + _, err = tx.CopyFrom(w.db.Context(), w.db.StateTableName(), w.db.StateColumnNames(), rows) if err != nil { return insertError{"eth.state_cids", err, "COPY", stateNode} } @@ -197,16 +201,20 @@ func (w *Writer) upsertStorageCID(tx Tx, storageCID models.StorageNodeModel) err } if w.db.UseCopyFrom() { var row []interface{} - blockNum, _ := strconv.ParseInt(storageCID.BlockNumber, 10, 64) + blockNum, err := strconv.ParseInt(storageCID.BlockNumber, 10, 64) + if err != nil { + return insertError{"eth.storage_cids", err, "COPY", storageCID} + } + row = append(row, blockNum, storageCID.HeaderID, storageCID.StatePath, storageKey, storageCID.CID, storageCID.Path, storageCID.NodeType, true, storageCID.MhKey) var rows [][]interface{} rows = append(rows, row) - _, err := tx.CopyFrom(w.db.Context(), w.db.StateTableName(), w.db.StateColumnNames(), rows) + _, err = tx.CopyFrom(w.db.Context(), w.db.StorageTableName(), w.db.StorageColumnNames(), rows) if err != nil { - return insertError{"eth.state_cids", err, "COPY", storageCID} + return insertError{"eth.storage_cids", err, "COPY", storageCID} } } else { _, err := tx.Exec(w.db.Context(), w.db.InsertStorageStm(), -- 2.45.2 From 5b9fdc16160bad3483936a00bde48e7f77a03d38 Mon Sep 17 00:00:00 2001 From: Thomas E Lackey Date: Tue, 7 Mar 2023 23:08:51 -0600 
Subject: [PATCH 10/16] More tables --- statediff/indexer/database/sql/interfaces.go | 4 ++ .../indexer/database/sql/pgx_indexer_test.go | 2 +- .../indexer/database/sql/postgres/database.go | 15 ++++ .../database/sql/postgres/test_helpers.go | 4 +- statediff/indexer/database/sql/writer.go | 70 +++++++++++++++---- 5 files changed, 80 insertions(+), 15 deletions(-) diff --git a/statediff/indexer/database/sql/interfaces.go b/statediff/indexer/database/sql/interfaces.go index 7ad295c98..890342902 100644 --- a/statediff/indexer/database/sql/interfaces.go +++ b/statediff/indexer/database/sql/interfaces.go @@ -50,10 +50,14 @@ type Statements interface { InsertTxStm() string InsertAccessListElementStm() string InsertRctStm() string + LogsTableName() []string + LogsColumnNames() []string InsertLogStm() string StateTableName() []string StateColumnNames() []string InsertStateStm() string + AccountTableName() []string + AccountColumnNames() []string InsertAccountStm() string StorageTableName() []string StorageColumnNames() []string diff --git a/statediff/indexer/database/sql/pgx_indexer_test.go b/statediff/indexer/database/sql/pgx_indexer_test.go index 1dbf2dfa0..c3a758b01 100644 --- a/statediff/indexer/database/sql/pgx_indexer_test.go +++ b/statediff/indexer/database/sql/pgx_indexer_test.go @@ -30,7 +30,7 @@ import ( ) func setupPGXIndexer(t *testing.T) { - db, err = postgres.SetupPGXDB() + db, err = postgres.SetupPGXDB(postgres.DefaultConfig) if err != nil { t.Fatal(err) } diff --git a/statediff/indexer/database/sql/postgres/database.go b/statediff/indexer/database/sql/postgres/database.go index 4a0b7d9e4..06aa9c03e 100644 --- a/statediff/indexer/database/sql/postgres/database.go +++ b/statediff/indexer/database/sql/postgres/database.go @@ -137,3 +137,18 @@ func (db *DB) StateColumnNames() []string { func (db *DB) StorageColumnNames() []string { return []string{"block_number", "header_id", "state_path", "storage_leaf_key", "cid", "storage_path", "node_type", "diff", "mh_key"} 
} + +func (db *DB) LogsTableName() []string { + return []string{"eth", "log_cids"} +} + +func (db *DB) LogsColumnNames() []string { + return []string{"block_number", "header_id", "leaf_cid", "leaf_mh_key", "rct_id", "address", "index", "topic0", "topic1", "topic2", "topic3", "log_data"} +} + +func (db *DB) AccountTableName() []string { + return []string{"eth", "state_accounts"} +} +func (db *DB) AccountColumnNames() []string { + return []string{"block_number", "header_id", "state_path", "balance", "nonce", "code_hash", "storage_root"} +} diff --git a/statediff/indexer/database/sql/postgres/test_helpers.go b/statediff/indexer/database/sql/postgres/test_helpers.go index f8311b413..cb5255429 100644 --- a/statediff/indexer/database/sql/postgres/test_helpers.go +++ b/statediff/indexer/database/sql/postgres/test_helpers.go @@ -35,8 +35,8 @@ func SetupSQLXDB() (sql.Database, error) { } // SetupPGXDB is used to setup a pgx db for tests -func SetupPGXDB() (sql.Database, error) { - driver, err := NewPGXDriver(context.Background(), DefaultConfig, node.Info{}) +func SetupPGXDB(config Config) (sql.Database, error) { + driver, err := NewPGXDriver(context.Background(), config, node.Info{}) if err != nil { return nil, err } diff --git a/statediff/indexer/database/sql/writer.go b/statediff/indexer/database/sql/writer.go index 6384ccfed..80436b44e 100644 --- a/statediff/indexer/database/sql/writer.go +++ b/statediff/indexer/database/sql/writer.go @@ -127,14 +127,37 @@ INSERT INTO eth.log_cids (block_number, header_id, leaf_cid, leaf_mh_key, rct_id ON CONFLICT (rct_id, index, header_id, block_number) DO NOTHING */ func (w *Writer) upsertLogCID(tx Tx, logs []*models.LogsModel) error { - for _, log := range logs { - _, err := tx.Exec(w.db.Context(), w.db.InsertLogStm(), - log.BlockNumber, log.HeaderID, log.LeafCID, log.LeafMhKey, log.ReceiptID, log.Address, log.Index, log.Topic0, log.Topic1, - log.Topic2, log.Topic3, log.Data) - if err != nil { - return insertError{"eth.log_cids", err, 
w.db.InsertLogStm(), *log} + if w.db.UseCopyFrom() { + var rows [][]interface{} + for _, log := range logs { + var row []interface{} + blockNum, err := strconv.ParseInt(log.BlockNumber, 10, 64) + if err != nil { + return insertError{"eth.log_cids", err, "COPY", log} + } + + row = append(row, blockNum, log.HeaderID, log.LeafCID, log.LeafMhKey, log.ReceiptID, log.Address, + log.Index, log.Topic0, log.Topic1, log.Topic2, log.Topic3, log.Data) + + rows = append(rows, row) + } + if nil != rows && len(rows) >= 0 { + _, err := tx.CopyFrom(w.db.Context(), w.db.LogsTableName(), w.db.LogsColumnNames(), rows) + if err != nil { + return insertError{"eth.log_cids", err, "COPY", rows} + } + metrics.IndexerMetrics.LogsCounter.Inc(int64(len(rows))) + } + } else { + for _, log := range logs { + _, err := tx.Exec(w.db.Context(), w.db.InsertLogStm(), + log.BlockNumber, log.HeaderID, log.LeafCID, log.LeafMhKey, log.ReceiptID, log.Address, log.Index, log.Topic0, log.Topic1, + log.Topic2, log.Topic3, log.Data) + if err != nil { + return insertError{"eth.log_cids", err, w.db.InsertLogStm(), *log} + } + metrics.IndexerMetrics.LogsCounter.Inc(1) } - metrics.IndexerMetrics.LogsCounter.Inc(1) } return nil } @@ -181,11 +204,34 @@ INSERT INTO eth.state_accounts (block_number, header_id, state_path, balance, no ON CONFLICT (header_id, state_path, block_number) DO NOTHING */ func (w *Writer) upsertStateAccount(tx Tx, stateAccount models.StateAccountModel) error { - _, err := tx.Exec(w.db.Context(), w.db.InsertAccountStm(), - stateAccount.BlockNumber, stateAccount.HeaderID, stateAccount.StatePath, stateAccount.Balance, - stateAccount.Nonce, stateAccount.CodeHash, stateAccount.StorageRoot) - if err != nil { - return insertError{"eth.state_accounts", err, w.db.InsertAccountStm(), stateAccount} + if w.db.UseCopyFrom() { + var row []interface{} + blockNum, err := strconv.ParseInt(stateAccount.BlockNumber, 10, 64) + if err != nil { + return insertError{"eth.state_accounts", err, "COPY", stateAccount} 
+ } + balance, err := strconv.ParseFloat(stateAccount.Balance, 64) + if err != nil { + return insertError{"eth.state_accounts", err, "COPY", stateAccount} + } + + row = append(row, blockNum, stateAccount.HeaderID, stateAccount.StatePath, balance, + stateAccount.Nonce, stateAccount.CodeHash, stateAccount.StorageRoot) + + var rows [][]interface{} + rows = append(rows, row) + + _, err = tx.CopyFrom(w.db.Context(), w.db.AccountTableName(), w.db.AccountColumnNames(), rows) + if err != nil { + return insertError{"eth.state_accounts", err, "COPY", stateAccount} + } + } else { + _, err := tx.Exec(w.db.Context(), w.db.InsertAccountStm(), + stateAccount.BlockNumber, stateAccount.HeaderID, stateAccount.StatePath, stateAccount.Balance, + stateAccount.Nonce, stateAccount.CodeHash, stateAccount.StorageRoot) + if err != nil { + return insertError{"eth.state_accounts", err, w.db.InsertAccountStm(), stateAccount} + } } return nil } -- 2.45.2 From af1586800eeeb806f32648d9db7f3e7bc81f4977 Mon Sep 17 00:00:00 2001 From: Thomas E Lackey Date: Tue, 7 Mar 2023 23:17:59 -0600 Subject: [PATCH 11/16] lint --- statediff/indexer/database/sql/pgx_indexer_legacy_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/statediff/indexer/database/sql/pgx_indexer_legacy_test.go b/statediff/indexer/database/sql/pgx_indexer_legacy_test.go index 292548b75..80094a8d0 100644 --- a/statediff/indexer/database/sql/pgx_indexer_legacy_test.go +++ b/statediff/indexer/database/sql/pgx_indexer_legacy_test.go @@ -28,7 +28,7 @@ import ( ) func setupLegacyPGXIndexer(t *testing.T) { - db, err = postgres.SetupPGXDB() + db, err = postgres.SetupPGXDB(postgres.DefaultConfig) if err != nil { t.Fatal(err) } -- 2.45.2 From 30fd4934649add51e64fc10cf048e833ef6f03d1 Mon Sep 17 00:00:00 2001 From: Thomas E Lackey Date: Wed, 8 Mar 2023 14:38:23 -0600 Subject: [PATCH 12/16] Add COPY support for tx and rct tables. 
--- statediff/indexer/database/sql/interfaces.go | 22 +++-- .../indexer/database/sql/postgres/database.go | 64 +++++++++------ statediff/indexer/database/sql/writer.go | 80 +++++++++++++++---- 3 files changed, 119 insertions(+), 47 deletions(-) diff --git a/statediff/indexer/database/sql/interfaces.go b/statediff/indexer/database/sql/interfaces.go index 890342902..f85f33ae4 100644 --- a/statediff/indexer/database/sql/interfaces.go +++ b/statediff/indexer/database/sql/interfaces.go @@ -50,21 +50,27 @@ type Statements interface { InsertTxStm() string InsertAccessListElementStm() string InsertRctStm() string - LogsTableName() []string - LogsColumnNames() []string InsertLogStm() string - StateTableName() []string - StateColumnNames() []string InsertStateStm() string - AccountTableName() []string - AccountColumnNames() []string InsertAccountStm() string - StorageTableName() []string - StorageColumnNames() []string InsertStorageStm() string InsertIPLDStm() string InsertIPLDsStm() string InsertKnownGapsStm() string + + // Table/column descriptions for use with CopyFrom and similar commands. 
+ AccountTableName() []string + AccountColumnNames() []string + LogTableName() []string + LogColumnNames() []string + RctTableName() []string + RctColumnNames() []string + StateTableName() []string + StateColumnNames() []string + StorageTableName() []string + StorageColumnNames() []string + TxTableName() []string + TxColumnNames() []string } // Tx interface to accommodate different concrete SQL transaction types diff --git a/statediff/indexer/database/sql/postgres/database.go b/statediff/indexer/database/sql/postgres/database.go index 06aa9c03e..8ee86251a 100644 --- a/statediff/indexer/database/sql/postgres/database.go +++ b/statediff/indexer/database/sql/postgres/database.go @@ -122,33 +122,49 @@ func (db *DB) InsertKnownGapsStm() string { WHERE eth_meta.known_gaps.ending_block_number <= $2` } -func (db *DB) StateTableName() []string { - return []string{"eth", "state_cids"} -} - -func (db *DB) StorageTableName() []string { - return []string{"eth", "storage_cids"} -} - -func (db *DB) StateColumnNames() []string { - return []string{"block_number", "header_id", "state_leaf_key", "cid", "state_path", "node_type", "diff", "mh_key"} -} - -func (db *DB) StorageColumnNames() []string { - return []string{"block_number", "header_id", "state_path", "storage_leaf_key", "cid", "storage_path", "node_type", "diff", "mh_key"} -} - -func (db *DB) LogsTableName() []string { - return []string{"eth", "log_cids"} -} - -func (db *DB) LogsColumnNames() []string { - return []string{"block_number", "header_id", "leaf_cid", "leaf_mh_key", "rct_id", "address", "index", "topic0", "topic1", "topic2", "topic3", "log_data"} -} - func (db *DB) AccountTableName() []string { return []string{"eth", "state_accounts"} } func (db *DB) AccountColumnNames() []string { return []string{"block_number", "header_id", "state_path", "balance", "nonce", "code_hash", "storage_root"} } + +func (db *DB) LogTableName() []string { + return []string{"eth", "log_cids"} +} + +func (db *DB) LogColumnNames() []string { + 
return []string{"block_number", "header_id", "leaf_cid", "leaf_mh_key", "rct_id", "address", "index", "topic0", "topic1", "topic2", "topic3", "log_data"} +} + +func (db *DB) RctTableName() []string { + return []string{"eth", "receipt_cids"} +} + +func (db *DB) RctColumnNames() []string { + return []string{"block_number", "header_id", "tx_id", "leaf_cid", "contract", "contract_hash", "leaf_mh_key", "post_state", "post_status", "log_root"} +} + +func (db *DB) StateTableName() []string { + return []string{"eth", "state_cids"} +} + +func (db *DB) StateColumnNames() []string { + return []string{"block_number", "header_id", "state_leaf_key", "cid", "state_path", "node_type", "diff", "mh_key"} +} + +func (db *DB) StorageTableName() []string { + return []string{"eth", "storage_cids"} +} + +func (db *DB) StorageColumnNames() []string { + return []string{"block_number", "header_id", "state_path", "storage_leaf_key", "cid", "storage_path", "node_type", "diff", "mh_key"} +} + +func (db *DB) TxTableName() []string { + return []string{"eth", "transaction_cids"} +} + +func (db *DB) TxColumnNames() []string { + return []string{"block_number", "header_id", "tx_hash", "cid", "dst", "src", "index", "mh_key", "tx_data", "tx_type", "value"} +} diff --git a/statediff/indexer/database/sql/writer.go b/statediff/indexer/database/sql/writer.go index 80436b44e..df059c535 100644 --- a/statediff/indexer/database/sql/writer.go +++ b/statediff/indexer/database/sql/writer.go @@ -82,11 +82,35 @@ INSERT INTO eth.transaction_cids (block_number, header_id, tx_hash, cid, dst, sr ON CONFLICT (tx_hash, header_id, block_number) DO NOTHING */ func (w *Writer) upsertTransactionCID(tx Tx, transaction models.TxModel) error { - _, err := tx.Exec(w.db.Context(), w.db.InsertTxStm(), - transaction.BlockNumber, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst, transaction.Src, - transaction.Index, transaction.MhKey, transaction.Data, transaction.Type, transaction.Value) - if err != nil { 
- return insertError{"eth.transaction_cids", err, w.db.InsertTxStm(), transaction} + if w.useCopyForTx(tx) { + var row []interface{} + blockNum, err := strconv.ParseInt(transaction.BlockNumber, 10, 64) + if err != nil { + return insertError{"eth.transaction_cids", err, "COPY", transaction} + } + + value, err := strconv.ParseInt(transaction.Value, 10, 64) + if err != nil { + return insertError{"eth.transaction_cids", err, "COPY", transaction} + } + + row = append(row, blockNum, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst, transaction.Src, + transaction.Index, transaction.MhKey, transaction.Data, int(transaction.Type), value) + + var rows [][]interface{} + rows = append(rows, row) + + _, err = tx.CopyFrom(w.db.Context(), w.db.TxTableName(), w.db.TxColumnNames(), rows) + if err != nil { + return insertError{"eth.transaction_cids", err, "COPY", transaction} + } + } else { + _, err := tx.Exec(w.db.Context(), w.db.InsertTxStm(), + transaction.BlockNumber, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst, transaction.Src, + transaction.Index, transaction.MhKey, transaction.Data, transaction.Type, transaction.Value) + if err != nil { + return insertError{"eth.transaction_cids", err, w.db.InsertTxStm(), transaction} + } } metrics.IndexerMetrics.TransactionsCounter.Inc(1) return nil @@ -112,11 +136,30 @@ INSERT INTO eth.receipt_cids (block_number, header_id, tx_id, leaf_cid, contract ON CONFLICT (tx_id, header_id, block_number) DO NOTHING */ func (w *Writer) upsertReceiptCID(tx Tx, rct *models.ReceiptModel) error { - _, err := tx.Exec(w.db.Context(), w.db.InsertRctStm(), - rct.BlockNumber, rct.HeaderID, rct.TxID, rct.LeafCID, rct.Contract, rct.ContractHash, rct.LeafMhKey, rct.PostState, - rct.PostStatus, rct.LogRoot) - if err != nil { - return insertError{"eth.receipt_cids", err, w.db.InsertRctStm(), *rct} + if w.useCopyForTx(tx) { + var row []interface{} + blockNum, err := strconv.ParseInt(rct.BlockNumber, 10, 64) + 
if err != nil { + return insertError{"eth.receipt_cids", err, "COPY", rct} + } + + row = append(row, blockNum, rct.HeaderID, rct.TxID, rct.LeafCID, rct.Contract, rct.ContractHash, + rct.LeafMhKey, rct.PostState, int(rct.PostStatus), rct.LogRoot) + + var rows [][]interface{} + rows = append(rows, row) + + _, err = tx.CopyFrom(w.db.Context(), w.db.RctTableName(), w.db.RctColumnNames(), rows) + if err != nil { + return insertError{"eth.receipt_cids", err, "COPY", rct} + } + } else { + _, err := tx.Exec(w.db.Context(), w.db.InsertRctStm(), + rct.BlockNumber, rct.HeaderID, rct.TxID, rct.LeafCID, rct.Contract, rct.ContractHash, rct.LeafMhKey, rct.PostState, + rct.PostStatus, rct.LogRoot) + if err != nil { + return insertError{"eth.receipt_cids", err, w.db.InsertRctStm(), *rct} + } } metrics.IndexerMetrics.ReceiptsCounter.Inc(1) return nil @@ -127,7 +170,7 @@ INSERT INTO eth.log_cids (block_number, header_id, leaf_cid, leaf_mh_key, rct_id ON CONFLICT (rct_id, index, header_id, block_number) DO NOTHING */ func (w *Writer) upsertLogCID(tx Tx, logs []*models.LogsModel) error { - if w.db.UseCopyFrom() { + if w.useCopyForTx(tx) { var rows [][]interface{} for _, log := range logs { var row []interface{} @@ -142,7 +185,7 @@ func (w *Writer) upsertLogCID(tx Tx, logs []*models.LogsModel) error { rows = append(rows, row) } if nil != rows && len(rows) >= 0 { - _, err := tx.CopyFrom(w.db.Context(), w.db.LogsTableName(), w.db.LogsColumnNames(), rows) + _, err := tx.CopyFrom(w.db.Context(), w.db.LogTableName(), w.db.LogColumnNames(), rows) if err != nil { return insertError{"eth.log_cids", err, "COPY", rows} } @@ -171,7 +214,7 @@ func (w *Writer) upsertStateCID(tx Tx, stateNode models.StateNodeModel) error { if stateNode.StateKey != nullHash.String() { stateKey = stateNode.StateKey } - if w.db.UseCopyFrom() { + if w.useCopyForTx(tx) { var row []interface{} blockNum, err := strconv.ParseInt(stateNode.BlockNumber, 10, 64) if err != nil { @@ -204,7 +247,7 @@ INSERT INTO eth.state_accounts 
(block_number, header_id, state_path, balance, no ON CONFLICT (header_id, state_path, block_number) DO NOTHING */ func (w *Writer) upsertStateAccount(tx Tx, stateAccount models.StateAccountModel) error { - if w.db.UseCopyFrom() { + if w.useCopyForTx(tx) { var row []interface{} blockNum, err := strconv.ParseInt(stateAccount.BlockNumber, 10, 64) if err != nil { @@ -245,7 +288,7 @@ func (w *Writer) upsertStorageCID(tx Tx, storageCID models.StorageNodeModel) err if storageCID.StorageKey != nullHash.String() { storageKey = storageCID.StorageKey } - if w.db.UseCopyFrom() { + if w.useCopyForTx(tx) { var row []interface{} blockNum, err := strconv.ParseInt(storageCID.BlockNumber, 10, 64) if err != nil { @@ -273,6 +316,13 @@ func (w *Writer) upsertStorageCID(tx Tx, storageCID models.StorageNodeModel) err return nil } +func (w *Writer) useCopyForTx(tx Tx) bool { + if _, ok := tx.(*DelayedTx); ok { + return w.db.UseCopyFrom() + } + return false +} + type insertError struct { table string err error -- 2.45.2 From ece2696c9ee489727ad4d966024533cb2a5d4121 Mon Sep 17 00:00:00 2001 From: Thomas E Lackey Date: Wed, 8 Mar 2023 15:31:05 -0600 Subject: [PATCH 13/16] use Float for PG numeric, not int64 --- statediff/indexer/database/sql/writer.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/statediff/indexer/database/sql/writer.go b/statediff/indexer/database/sql/writer.go index df059c535..e7667d6dd 100644 --- a/statediff/indexer/database/sql/writer.go +++ b/statediff/indexer/database/sql/writer.go @@ -89,7 +89,7 @@ func (w *Writer) upsertTransactionCID(tx Tx, transaction models.TxModel) error { return insertError{"eth.transaction_cids", err, "COPY", transaction} } - value, err := strconv.ParseInt(transaction.Value, 10, 64) + value, err := strconv.ParseFloat(transaction.Value, 64) if err != nil { return insertError{"eth.transaction_cids", err, "COPY", transaction} } @@ -317,6 +317,8 @@ func (w *Writer) upsertStorageCID(tx Tx, storageCID models.StorageNodeModel) 
err } func (w *Writer) useCopyForTx(tx Tx) bool { + // Using COPY instead of INSERT only makes much sense if also using a DelayedTx, so that operations + // can be collected over time and then all submitted within a single TX. if _, ok := tx.(*DelayedTx); ok { return w.db.UseCopyFrom() } -- 2.45.2 From 1b20ccf4cd1112aaa022dca2ee9638c79e2a7109 Mon Sep 17 00:00:00 2001 From: Thomas E Lackey Date: Wed, 8 Mar 2023 18:25:11 -0600 Subject: [PATCH 14/16] Add new test with CopyFrom enabled. --- .../indexer/database/sql/pgx_indexer_test.go | 28 +++++++++++++++---- 1 file changed, 23 insertions(+), 5 deletions(-) diff --git a/statediff/indexer/database/sql/pgx_indexer_test.go b/statediff/indexer/database/sql/pgx_indexer_test.go index c3a758b01..c0ce57c1f 100644 --- a/statediff/indexer/database/sql/pgx_indexer_test.go +++ b/statediff/indexer/database/sql/pgx_indexer_test.go @@ -29,8 +29,8 @@ import ( "github.com/ethereum/go-ethereum/statediff/indexer/test" ) -func setupPGXIndexer(t *testing.T) { - db, err = postgres.SetupPGXDB(postgres.DefaultConfig) +func setupPGXIndexer(t *testing.T, config postgres.Config) { + db, err = postgres.SetupPGXDB(config) if err != nil { t.Fatal(err) } @@ -39,12 +39,16 @@ func setupPGXIndexer(t *testing.T) { } func setupPGX(t *testing.T) { - setupPGXIndexer(t) + setupPGXWithConfig(t, postgres.DefaultConfig) +} + +func setupPGXWithConfig(t *testing.T, config postgres.Config) { + setupPGXIndexer(t, config) test.SetupTestData(t, ind) } func setupPGXNonCanonical(t *testing.T) { - setupPGXIndexer(t) + setupPGXIndexer(t, postgres.DefaultConfig) test.SetupTestDataNonCanonical(t, ind) } @@ -97,6 +101,20 @@ func TestPGXIndexer(t *testing.T) { test.TestPublishAndIndexStorageIPLDs(t, db) }) + + t.Run("Publish and index with CopyFrom enabled.", func(t *testing.T) { + config := postgres.DefaultConfig + config.CopyFrom = true + + setupPGXWithConfig(t, config) + defer tearDown(t) + defer checkTxClosure(t, 1, 0, 1) + + test.TestPublishAndIndexStateIPLDs(t, db) 
+ test.TestPublishAndIndexStorageIPLDs(t, db) + test.TestPublishAndIndexReceiptIPLDs(t, db) + test.TestPublishAndIndexLogIPLDs(t, db) + }) } // Test indexer for a canonical + a non-canonical block at London height + a non-canonical block at London height + 1 @@ -151,7 +169,7 @@ func TestPGXIndexerNonCanonical(t *testing.T) { } func TestPGXWatchAddressMethods(t *testing.T) { - setupPGXIndexer(t) + setupPGXIndexer(t, postgres.DefaultConfig) defer tearDown(t) defer checkTxClosure(t, 1, 0, 1) -- 2.45.2 From 37adca432a4482fc59ff50156d69e2c8d5adb503 Mon Sep 17 00:00:00 2001 From: Thomas E Lackey Date: Wed, 8 Mar 2023 22:22:50 -0600 Subject: [PATCH 15/16] Simplify code a bit --- statediff/indexer/database/sql/writer.go | 65 +++++++++--------------- 1 file changed, 25 insertions(+), 40 deletions(-) diff --git a/statediff/indexer/database/sql/writer.go b/statediff/indexer/database/sql/writer.go index e7667d6dd..6b1c0940f 100644 --- a/statediff/indexer/database/sql/writer.go +++ b/statediff/indexer/database/sql/writer.go @@ -83,7 +83,6 @@ ON CONFLICT (tx_hash, header_id, block_number) DO NOTHING */ func (w *Writer) upsertTransactionCID(tx Tx, transaction models.TxModel) error { if w.useCopyForTx(tx) { - var row []interface{} blockNum, err := strconv.ParseInt(transaction.BlockNumber, 10, 64) if err != nil { return insertError{"eth.transaction_cids", err, "COPY", transaction} @@ -94,13 +93,9 @@ func (w *Writer) upsertTransactionCID(tx Tx, transaction models.TxModel) error { return insertError{"eth.transaction_cids", err, "COPY", transaction} } - row = append(row, blockNum, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst, transaction.Src, - transaction.Index, transaction.MhKey, transaction.Data, int(transaction.Type), value) - - var rows [][]interface{} - rows = append(rows, row) - - _, err = tx.CopyFrom(w.db.Context(), w.db.TxTableName(), w.db.TxColumnNames(), rows) + _, err = tx.CopyFrom(w.db.Context(), w.db.TxTableName(), w.db.TxColumnNames(), + 
toRows(toValues(blockNum, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst, + transaction.Src, transaction.Index, transaction.MhKey, transaction.Data, int(transaction.Type), value))) if err != nil { return insertError{"eth.transaction_cids", err, "COPY", transaction} } @@ -137,19 +132,14 @@ ON CONFLICT (tx_id, header_id, block_number) DO NOTHING */ func (w *Writer) upsertReceiptCID(tx Tx, rct *models.ReceiptModel) error { if w.useCopyForTx(tx) { - var row []interface{} blockNum, err := strconv.ParseInt(rct.BlockNumber, 10, 64) if err != nil { return insertError{"eth.receipt_cids", err, "COPY", rct} } - row = append(row, blockNum, rct.HeaderID, rct.TxID, rct.LeafCID, rct.Contract, rct.ContractHash, - rct.LeafMhKey, rct.PostState, int(rct.PostStatus), rct.LogRoot) - - var rows [][]interface{} - rows = append(rows, row) - - _, err = tx.CopyFrom(w.db.Context(), w.db.RctTableName(), w.db.RctColumnNames(), rows) + _, err = tx.CopyFrom(w.db.Context(), w.db.RctTableName(), w.db.RctColumnNames(), + toRows(toValues(blockNum, rct.HeaderID, rct.TxID, rct.LeafCID, rct.Contract, rct.ContractHash, + rct.LeafMhKey, rct.PostState, int(rct.PostStatus), rct.LogRoot))) if err != nil { return insertError{"eth.receipt_cids", err, "COPY", rct} } @@ -215,19 +205,14 @@ func (w *Writer) upsertStateCID(tx Tx, stateNode models.StateNodeModel) error { stateKey = stateNode.StateKey } if w.useCopyForTx(tx) { - var row []interface{} blockNum, err := strconv.ParseInt(stateNode.BlockNumber, 10, 64) if err != nil { return insertError{"eth.state_cids", err, "COPY", stateNode} } - row = append(row, blockNum, stateNode.HeaderID, stateKey, stateNode.CID, - stateNode.Path, stateNode.NodeType, true, stateNode.MhKey) - - var rows [][]interface{} - rows = append(rows, row) - - _, err = tx.CopyFrom(w.db.Context(), w.db.StateTableName(), w.db.StateColumnNames(), rows) + _, err = tx.CopyFrom(w.db.Context(), w.db.StateTableName(), w.db.StateColumnNames(), + toRows(toValues(blockNum, 
stateNode.HeaderID, stateKey, stateNode.CID, stateNode.Path, + stateNode.NodeType, true, stateNode.MhKey))) if err != nil { return insertError{"eth.state_cids", err, "COPY", stateNode} } @@ -248,7 +233,6 @@ ON CONFLICT (header_id, state_path, block_number) DO NOTHING */ func (w *Writer) upsertStateAccount(tx Tx, stateAccount models.StateAccountModel) error { if w.useCopyForTx(tx) { - var row []interface{} blockNum, err := strconv.ParseInt(stateAccount.BlockNumber, 10, 64) if err != nil { return insertError{"eth.state_accounts", err, "COPY", stateAccount} @@ -258,13 +242,9 @@ func (w *Writer) upsertStateAccount(tx Tx, stateAccount models.StateAccountModel return insertError{"eth.state_accounts", err, "COPY", stateAccount} } - row = append(row, blockNum, stateAccount.HeaderID, stateAccount.StatePath, balance, - stateAccount.Nonce, stateAccount.CodeHash, stateAccount.StorageRoot) - - var rows [][]interface{} - rows = append(rows, row) - - _, err = tx.CopyFrom(w.db.Context(), w.db.AccountTableName(), w.db.AccountColumnNames(), rows) + _, err = tx.CopyFrom(w.db.Context(), w.db.AccountTableName(), w.db.AccountColumnNames(), + toRows(toValues(blockNum, stateAccount.HeaderID, stateAccount.StatePath, balance, stateAccount.Nonce, + stateAccount.CodeHash, stateAccount.StorageRoot))) if err != nil { return insertError{"eth.state_accounts", err, "COPY", stateAccount} } @@ -289,19 +269,14 @@ func (w *Writer) upsertStorageCID(tx Tx, storageCID models.StorageNodeModel) err storageKey = storageCID.StorageKey } if w.useCopyForTx(tx) { - var row []interface{} blockNum, err := strconv.ParseInt(storageCID.BlockNumber, 10, 64) if err != nil { return insertError{"eth.storage_cids", err, "COPY", storageCID} } - row = append(row, blockNum, storageCID.HeaderID, storageCID.StatePath, storageKey, storageCID.CID, - storageCID.Path, storageCID.NodeType, true, storageCID.MhKey) - - var rows [][]interface{} - rows = append(rows, row) - - _, err = tx.CopyFrom(w.db.Context(), 
w.db.StorageTableName(), w.db.StorageColumnNames(), rows) + _, err = tx.CopyFrom(w.db.Context(), w.db.StorageTableName(), w.db.StorageColumnNames(), + toRows(toValues(blockNum, storageCID.HeaderID, storageCID.StatePath, storageKey, storageCID.CID, + storageCID.Path, storageCID.NodeType, true, storageCID.MhKey))) if err != nil { return insertError{"eth.storage_cids", err, "COPY", storageCID} } @@ -325,6 +300,16 @@ func (w *Writer) useCopyForTx(tx Tx) bool { return false } +func toValues(args ...interface{}) []interface{} { + var row []interface{} + row = append(row, args...) + return row +} + +func toRows(rows ...[]interface{}) [][]interface{} { + return rows +} + type insertError struct { table string err error -- 2.45.2 From f5b76d40af7233bbb130f9f9c878a3231773641d Mon Sep 17 00:00:00 2001 From: Thomas E Lackey Date: Wed, 8 Mar 2023 22:34:14 -0600 Subject: [PATCH 16/16] Refactor --- statediff/indexer/database/sql/writer.go | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/statediff/indexer/database/sql/writer.go b/statediff/indexer/database/sql/writer.go index 6b1c0940f..91adffbfb 100644 --- a/statediff/indexer/database/sql/writer.go +++ b/statediff/indexer/database/sql/writer.go @@ -94,7 +94,7 @@ func (w *Writer) upsertTransactionCID(tx Tx, transaction models.TxModel) error { } _, err = tx.CopyFrom(w.db.Context(), w.db.TxTableName(), w.db.TxColumnNames(), - toRows(toValues(blockNum, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst, + toRows(toRow(blockNum, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst, transaction.Src, transaction.Index, transaction.MhKey, transaction.Data, int(transaction.Type), value))) if err != nil { return insertError{"eth.transaction_cids", err, "COPY", transaction} @@ -138,7 +138,7 @@ func (w *Writer) upsertReceiptCID(tx Tx, rct *models.ReceiptModel) error { } _, err = tx.CopyFrom(w.db.Context(), w.db.RctTableName(), w.db.RctColumnNames(), - 
toRows(toValues(blockNum, rct.HeaderID, rct.TxID, rct.LeafCID, rct.Contract, rct.ContractHash, + toRows(toRow(blockNum, rct.HeaderID, rct.TxID, rct.LeafCID, rct.Contract, rct.ContractHash, rct.LeafMhKey, rct.PostState, int(rct.PostStatus), rct.LogRoot))) if err != nil { return insertError{"eth.receipt_cids", err, "COPY", rct} @@ -163,16 +163,13 @@ func (w *Writer) upsertLogCID(tx Tx, logs []*models.LogsModel) error { if w.useCopyForTx(tx) { var rows [][]interface{} for _, log := range logs { - var row []interface{} blockNum, err := strconv.ParseInt(log.BlockNumber, 10, 64) if err != nil { return insertError{"eth.log_cids", err, "COPY", log} } - row = append(row, blockNum, log.HeaderID, log.LeafCID, log.LeafMhKey, log.ReceiptID, log.Address, - log.Index, log.Topic0, log.Topic1, log.Topic2, log.Topic3, log.Data) - - rows = append(rows, row) + rows = append(rows, toRow(blockNum, log.HeaderID, log.LeafCID, log.LeafMhKey, log.ReceiptID, + log.Address, log.Index, log.Topic0, log.Topic1, log.Topic2, log.Topic3, log.Data)) } if nil != rows && len(rows) >= 0 { _, err := tx.CopyFrom(w.db.Context(), w.db.LogTableName(), w.db.LogColumnNames(), rows) @@ -211,7 +208,7 @@ func (w *Writer) upsertStateCID(tx Tx, stateNode models.StateNodeModel) error { } _, err = tx.CopyFrom(w.db.Context(), w.db.StateTableName(), w.db.StateColumnNames(), - toRows(toValues(blockNum, stateNode.HeaderID, stateKey, stateNode.CID, stateNode.Path, + toRows(toRow(blockNum, stateNode.HeaderID, stateKey, stateNode.CID, stateNode.Path, stateNode.NodeType, true, stateNode.MhKey))) if err != nil { return insertError{"eth.state_cids", err, "COPY", stateNode} @@ -243,7 +240,7 @@ func (w *Writer) upsertStateAccount(tx Tx, stateAccount models.StateAccountModel } _, err = tx.CopyFrom(w.db.Context(), w.db.AccountTableName(), w.db.AccountColumnNames(), - toRows(toValues(blockNum, stateAccount.HeaderID, stateAccount.StatePath, balance, stateAccount.Nonce, + toRows(toRow(blockNum, stateAccount.HeaderID, 
stateAccount.StatePath, balance, stateAccount.Nonce, stateAccount.CodeHash, stateAccount.StorageRoot))) if err != nil { return insertError{"eth.state_accounts", err, "COPY", stateAccount} @@ -275,7 +272,7 @@ func (w *Writer) upsertStorageCID(tx Tx, storageCID models.StorageNodeModel) err } _, err = tx.CopyFrom(w.db.Context(), w.db.StorageTableName(), w.db.StorageColumnNames(), - toRows(toValues(blockNum, storageCID.HeaderID, storageCID.StatePath, storageKey, storageCID.CID, + toRows(toRow(blockNum, storageCID.HeaderID, storageCID.StatePath, storageKey, storageCID.CID, storageCID.Path, storageCID.NodeType, true, storageCID.MhKey))) if err != nil { return insertError{"eth.storage_cids", err, "COPY", storageCID} @@ -300,12 +297,14 @@ func (w *Writer) useCopyForTx(tx Tx) bool { return false } -func toValues(args ...interface{}) []interface{} { +// combine args into a row +func toRow(args ...interface{}) []interface{} { var row []interface{} row = append(row, args...) return row } +// combine row (or rows) into a slice of rows for CopyFrom func toRows(rows ...[]interface{}) [][]interface{} { return rows } -- 2.45.2