From f102b4ef8e2c98a67dc81d291e40b00f65181e87 Mon Sep 17 00:00:00 2001 From: Thomas E Lackey Date: Tue, 7 Mar 2023 12:24:30 -0600 Subject: [PATCH] Add COPY support for inserting multiple rows in a single command. --- cmd/geth/config.go | 3 ++ cmd/utils/flags.go | 7 +++ statediff/indexer/database/sql/interfaces.go | 6 +++ statediff/indexer/database/sql/lazy_tx.go | 49 ++++++++++++++++-- .../indexer/database/sql/postgres/config.go | 3 ++ .../indexer/database/sql/postgres/database.go | 16 ++++++ .../indexer/database/sql/postgres/pgx.go | 12 ++++- .../indexer/database/sql/postgres/sqlx.go | 11 ++++ statediff/indexer/database/sql/writer.go | 51 +++++++++++++++---- 9 files changed, 142 insertions(+), 16 deletions(-) diff --git a/cmd/geth/config.go b/cmd/geth/config.go index 89d3ef450..36724cb17 100644 --- a/cmd/geth/config.go +++ b/cmd/geth/config.go @@ -248,6 +248,9 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) { if ctx.IsSet(utils.StateDiffLogStatements.Name) { pgConfig.LogStatements = ctx.Bool(utils.StateDiffLogStatements.Name) } + if ctx.IsSet(utils.StateDiffCopyFrom.Name) { + pgConfig.CopyFrom = ctx.Bool(utils.StateDiffCopyFrom.Name) + } indexerConfig = pgConfig case shared.DUMP: dumpTypeStr := ctx.String(utils.StateDiffDBDumpDst.Name) diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index f97dce8ac..7c4f957a3 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -1100,6 +1100,13 @@ Please note that --` + MetricsHTTPFlag.Name + ` must be set to start the server. Usage: "Should the statediff service log all database statements? (Note: pgx only)", Value: false, } + + StateDiffCopyFrom = &cli.BoolFlag{ + Name: "statediff.db.copyfrom", + Usage: "Should the statediff service use COPY FROM for multiple inserts? (Note: pgx only)", + Value: false, + } + StateDiffWritingFlag = &cli.BoolFlag{ Name: "statediff.writing", Usage: "Activates progressive writing of state diffs to database as new block are synced", diff --git a/statediff/indexer/database/sql/interfaces.go b/statediff/indexer/database/sql/interfaces.go index 1e7278db6..7ad295c98 100644 --- a/statediff/indexer/database/sql/interfaces.go +++ b/statediff/indexer/database/sql/interfaces.go @@ -31,6 +31,7 @@ type Database interface { // Driver interface has all the methods required by a driver implementation to support the sql indexer type Driver interface { + UseCopyFrom() bool QueryRow(ctx context.Context, sql string, args ...interface{}) ScannableRow Exec(ctx context.Context, sql string, args ...interface{}) (Result, error) Select(ctx context.Context, dest interface{}, query string, args ...interface{}) error @@ -50,8 +51,12 @@ type Statements interface { InsertAccessListElementStm() string InsertRctStm() string InsertLogStm() string + StateTableName() []string + StateColumnNames() []string InsertStateStm() string InsertAccountStm() string + StorageTableName() []string + StorageColumnNames() []string InsertStorageStm() string InsertIPLDStm() string InsertIPLDsStm() string @@ -62,6 +67,7 @@ type Statements interface { type Tx interface { QueryRow(ctx context.Context, sql string, args ...interface{}) ScannableRow Exec(ctx context.Context, sql string, args ...interface{}) (Result, error) + CopyFrom(ctx context.Context, tableName []string, columnNames []string, rows [][]interface{}) (int64, error) Commit(ctx context.Context) error Rollback(ctx context.Context) error } diff --git a/statediff/indexer/database/sql/lazy_tx.go b/statediff/indexer/database/sql/lazy_tx.go index 922bf84a0..55c2afad6 100644 --- a/statediff/indexer/database/sql/lazy_tx.go +++ b/statediff/indexer/database/sql/lazy_tx.go @@ -2,10 +2,12 @@ package sql import ( "context" + "github.com/ethereum/go-ethereum/log" + "reflect" ) type DelayedTx struct { - cache []cachedStmt + cache []interface{} db Database } type cachedStmt struct { @@ -13,6 +15,12 @@ type cachedStmt struct { args []interface{} } +type copyFrom struct { + tableName []string + columnNames []string + rows [][]interface{} +} + func NewDelayedTx(db Database) *DelayedTx { return &DelayedTx{db: db} } @@ -21,12 +29,32 @@ func (tx *DelayedTx) QueryRow(ctx context.Context, sql string, args ...interface return tx.db.QueryRow(ctx, sql, args...) } +func (tx *DelayedTx) CopyFrom(ctx context.Context, tableName []string, columnNames []string, rows [][]interface{}) (int64, error) { + appendedToExisting := false + if len(tx.cache) > 0 { + prevCopy, ok := tx.cache[len(tx.cache)-1].(copyFrom) + if ok && reflect.DeepEqual(prevCopy.tableName, tableName) && reflect.DeepEqual(prevCopy.columnNames, columnNames) { + log.Info("statediff lazy_tx : Appending rows to COPY", "table", tableName, + "current", len(prevCopy.rows), "append", len(rows)) + prevCopy.rows = append(prevCopy.rows, rows...) + appendedToExisting = true + } + } + + if !appendedToExisting { + tx.cache = append(tx.cache, copyFrom{tableName, columnNames, rows}) + } + + return 0, nil +} + func (tx *DelayedTx) Exec(ctx context.Context, sql string, args ...interface{}) (Result, error) { tx.cache = append(tx.cache, cachedStmt{sql, args}) return nil, nil } func (tx *DelayedTx) Commit(ctx context.Context) error { + base, err := tx.db.Begin(ctx) if err != nil { return err @@ -39,10 +67,21 @@ func (tx *DelayedTx) Commit(ctx context.Context) error { rollback(ctx, base) } }() - for _, stmt := range tx.cache { - _, err := base.Exec(ctx, stmt.sql, stmt.args...) - if err != nil { - return err + for _, item := range tx.cache { + switch item.(type) { + case copyFrom: + copy := item.(copyFrom) + log.Info("statediff lazy_tx : COPY", "table", copy.tableName, "rows", len(copy.rows)) + _, err := base.CopyFrom(ctx, copy.tableName, copy.columnNames, copy.rows) + if err != nil { + return err + } + case cachedStmt: + stmt := item.(cachedStmt) + _, err := base.Exec(ctx, stmt.sql, stmt.args...) + if err != nil { + return err + } } } tx.cache = nil diff --git a/statediff/indexer/database/sql/postgres/config.go b/statediff/indexer/database/sql/postgres/config.go index b5cdc02ab..e50e836c7 100644 --- a/statediff/indexer/database/sql/postgres/config.go +++ b/statediff/indexer/database/sql/postgres/config.go @@ -81,6 +81,9 @@ type Config struct { // toggle on/off upserts Upsert bool + + // toggle on/off CopyFrom + CopyFrom bool } // Type satisfies interfaces.Config diff --git a/statediff/indexer/database/sql/postgres/database.go b/statediff/indexer/database/sql/postgres/database.go index 27f89ab83..4a0b7d9e4 100644 --- a/statediff/indexer/database/sql/postgres/database.go +++ b/statediff/indexer/database/sql/postgres/database.go @@ -121,3 +121,19 @@ func (db *DB) InsertKnownGapsStm() string { ON CONFLICT (starting_block_number) DO UPDATE SET (ending_block_number, processing_key) = ($2, $4) WHERE eth_meta.known_gaps.ending_block_number <= $2` } + +func (db *DB) StateTableName() []string { + return []string{"eth", "state_cids"} +} + +func (db *DB) StorageTableName() []string { + return []string{"eth", "storage_cids"} +} + +func (db *DB) StateColumnNames() []string { + return []string{"block_number", "header_id", "state_leaf_key", "cid", "state_path", "node_type", "diff", "mh_key"} +} + +func (db *DB) StorageColumnNames() []string { + return []string{"block_number", "header_id", "state_path", "storage_leaf_key", "cid", "storage_path", "node_type", "diff", "mh_key"} +} diff --git a/statediff/indexer/database/sql/postgres/pgx.go b/statediff/indexer/database/sql/postgres/pgx.go index 6b75559df..17a08191f 100644 --- a/statediff/indexer/database/sql/postgres/pgx.go +++ b/statediff/indexer/database/sql/postgres/pgx.go @@ -38,6 +38,7 @@ type PGXDriver struct { pool *pgxpool.Pool nodeInfo node.Info nodeID string + config Config } // NewPGXDriver returns a new pgx driver @@ -51,7 +52,7 @@ func NewPGXDriver(ctx context.Context, config Config, node node.Info) (*PGXDrive if err != nil { return nil, ErrDBConnectionFailed(err) } - pg := &PGXDriver{ctx: ctx, pool: dbPool, nodeInfo: node} + pg := &PGXDriver{ctx: ctx, pool: dbPool, nodeInfo: node, config: config} nodeErr := pg.createNode() if nodeErr != nil { return &PGXDriver{}, ErrUnableToSetNode(nodeErr) @@ -161,6 +162,11 @@ func (pgx *PGXDriver) Context() context.Context { return pgx.ctx } +// HasCopy satisfies sql.Database +func (pgx *PGXDriver) UseCopyFrom() bool { + return pgx.config.CopyFrom +} + type resultWrapper struct { ct pgconn.CommandTag } @@ -239,3 +245,7 @@ func (t pgxTxWrapper) Commit(ctx context.Context) error { func (t pgxTxWrapper) Rollback(ctx context.Context) error { return t.tx.Rollback(ctx) } + +func (t pgxTxWrapper) CopyFrom(ctx context.Context, tableName []string, columnNames []string, rows [][]interface{}) (int64, error) { + return t.tx.CopyFrom(ctx, tableName, columnNames, pgx.CopyFromRows(rows)) +} diff --git a/statediff/indexer/database/sql/postgres/sqlx.go b/statediff/indexer/database/sql/postgres/sqlx.go index 529e7f7c8..bc134210e 100644 --- a/statediff/indexer/database/sql/postgres/sqlx.go +++ b/statediff/indexer/database/sql/postgres/sqlx.go @@ -19,6 +19,7 @@ package postgres import ( "context" coresql "database/sql" + "errors" "time" "github.com/jmoiron/sqlx" @@ -119,6 +120,12 @@ func (driver *SQLXDriver) Context() context.Context { return driver.ctx } +// HasCopy satisfies sql.Database +func (driver *SQLXDriver) UseCopyFrom() bool { + // sqlx does not currently support COPY. + return false +} + type sqlxStatsWrapper struct { stats coresql.DBStats } @@ -186,3 +193,7 @@ func (t sqlxTxWrapper) Commit(ctx context.Context) error { func (t sqlxTxWrapper) Rollback(ctx context.Context) error { return t.tx.Rollback() } + +func (t sqlxTxWrapper) CopyFrom(ctx context.Context, tableName []string, columnNames []string, rows [][]interface{}) (int64, error) { + return 0, errors.New("Unsupported Operation") +} diff --git a/statediff/indexer/database/sql/writer.go b/statediff/indexer/database/sql/writer.go index 36b0703dc..9e4003745 100644 --- a/statediff/indexer/database/sql/writer.go +++ b/statediff/indexer/database/sql/writer.go @@ -18,6 +18,7 @@ package sql import ( "fmt" + "strconv" "github.com/ethereum/go-ethereum/statediff/indexer/database/metrics" @@ -147,11 +148,26 @@ func (w *Writer) upsertStateCID(tx Tx, stateNode models.StateNodeModel) error { if stateNode.StateKey != nullHash.String() { stateKey = stateNode.StateKey } - _, err := tx.Exec(w.db.Context(), w.db.InsertStateStm(), - stateNode.BlockNumber, stateNode.HeaderID, stateKey, stateNode.CID, stateNode.Path, stateNode.NodeType, true, - stateNode.MhKey) - if err != nil { - return insertError{"eth.state_cids", err, w.db.InsertStateStm(), stateNode} + if w.db.UseCopyFrom() { + var row []interface{} + blockNum, _ := strconv.ParseInt(stateNode.BlockNumber, 10, 64) + row = append(row, blockNum, stateNode.HeaderID, stateKey, stateNode.CID, + stateNode.Path, stateNode.NodeType, true, stateNode.MhKey) + + var rows [][]interface{} + rows = append(rows, row) + + _, err := tx.CopyFrom(w.db.Context(), w.db.StateTableName(), w.db.StateColumnNames(), rows) + if err != nil { + return insertError{"eth.state_cids", err, "COPY", stateNode} + } + } else { + _, err := tx.Exec(w.db.Context(), w.db.InsertStateStm(), + stateNode.BlockNumber, stateNode.HeaderID, stateKey, stateNode.CID, stateNode.Path, stateNode.NodeType, true, + stateNode.MhKey) + if err != nil { + return insertError{"eth.state_cids", err, w.db.InsertStateStm(), stateNode} + } } return nil } @@ -179,11 +195,26 @@ func (w *Writer) upsertStorageCID(tx Tx, storageCID models.StorageNodeModel) err if storageCID.StorageKey != nullHash.String() { storageKey = storageCID.StorageKey } - _, err := tx.Exec(w.db.Context(), w.db.InsertStorageStm(), - storageCID.BlockNumber, storageCID.HeaderID, storageCID.StatePath, storageKey, storageCID.CID, storageCID.Path, - storageCID.NodeType, true, storageCID.MhKey) - if err != nil { - return insertError{"eth.storage_cids", err, w.db.InsertStorageStm(), storageCID} + if w.db.UseCopyFrom() { + var row []interface{} + blockNum, _ := strconv.ParseInt(storageCID.BlockNumber, 10, 64) + row = append(row, blockNum, storageCID.HeaderID, storageCID.StatePath, storageKey, storageCID.CID, + storageCID.Path, storageCID.NodeType, true, storageCID.MhKey) + + var rows [][]interface{} + rows = append(rows, row) + + _, err := tx.CopyFrom(w.db.Context(), w.db.StateTableName(), w.db.StateColumnNames(), rows) + if err != nil { + return insertError{"eth.state_cids", err, "COPY", storageCID} + } + } else { + _, err := tx.Exec(w.db.Context(), w.db.InsertStorageStm(), + storageCID.BlockNumber, storageCID.HeaderID, storageCID.StatePath, storageKey, storageCID.CID, storageCID.Path, + storageCID.NodeType, true, storageCID.MhKey) + if err != nil { + return insertError{"eth.storage_cids", err, w.db.InsertStorageStm(), storageCID} + } } return nil }