Add COPY support for inserting multiple rows in a single command.

This commit is contained in:
Thomas E Lackey 2023-03-07 12:24:30 -06:00
parent 67fc4acaf1
commit f102b4ef8e
9 changed files with 142 additions and 16 deletions

View File

@ -248,6 +248,9 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) {
if ctx.IsSet(utils.StateDiffLogStatements.Name) {
pgConfig.LogStatements = ctx.Bool(utils.StateDiffLogStatements.Name)
}
if ctx.IsSet(utils.StateDiffCopyFrom.Name) {
pgConfig.CopyFrom = ctx.Bool(utils.StateDiffCopyFrom.Name)
}
indexerConfig = pgConfig
case shared.DUMP:
dumpTypeStr := ctx.String(utils.StateDiffDBDumpDst.Name)

View File

@ -1100,6 +1100,13 @@ Please note that --` + MetricsHTTPFlag.Name + ` must be set to start the server.
Usage: "Should the statediff service log all database statements? (Note: pgx only)",
Value: false,
}
StateDiffCopyFrom = &cli.BoolFlag{
Name: "statediff.db.copyfrom",
Usage: "Should the statediff service use COPY FROM for multiple inserts? (Note: pgx only)",
Value: false,
}
StateDiffWritingFlag = &cli.BoolFlag{
Name: "statediff.writing",
Usage: "Activates progressive writing of state diffs to database as new block are synced",

View File

@ -31,6 +31,7 @@ type Database interface {
// Driver interface has all the methods required by a driver implementation to support the sql indexer
type Driver interface {
UseCopyFrom() bool
QueryRow(ctx context.Context, sql string, args ...interface{}) ScannableRow
Exec(ctx context.Context, sql string, args ...interface{}) (Result, error)
Select(ctx context.Context, dest interface{}, query string, args ...interface{}) error
@ -50,8 +51,12 @@ type Statements interface {
InsertAccessListElementStm() string
InsertRctStm() string
InsertLogStm() string
StateTableName() []string
StateColumnNames() []string
InsertStateStm() string
InsertAccountStm() string
StorageTableName() []string
StorageColumnNames() []string
InsertStorageStm() string
InsertIPLDStm() string
InsertIPLDsStm() string
@ -62,6 +67,7 @@ type Statements interface {
type Tx interface {
QueryRow(ctx context.Context, sql string, args ...interface{}) ScannableRow
Exec(ctx context.Context, sql string, args ...interface{}) (Result, error)
CopyFrom(ctx context.Context, tableName []string, columnNames []string, rows [][]interface{}) (int64, error)
Commit(ctx context.Context) error
Rollback(ctx context.Context) error
}

View File

@ -2,10 +2,12 @@ package sql
import (
"context"
"github.com/ethereum/go-ethereum/log"
"reflect"
)
type DelayedTx struct {
cache []cachedStmt
cache []interface{}
db Database
}
type cachedStmt struct {
@ -13,6 +15,12 @@ type cachedStmt struct {
args []interface{}
}
type copyFrom struct {
tableName []string
columnNames []string
rows [][]interface{}
}
func NewDelayedTx(db Database) *DelayedTx {
return &DelayedTx{db: db}
}
@ -21,12 +29,32 @@ func (tx *DelayedTx) QueryRow(ctx context.Context, sql string, args ...interface
return tx.db.QueryRow(ctx, sql, args...)
}
func (tx *DelayedTx) CopyFrom(ctx context.Context, tableName []string, columnNames []string, rows [][]interface{}) (int64, error) {
appendedToExisting := false
if len(tx.cache) > 0 {
prevCopy, ok := tx.cache[len(tx.cache)-1].(copyFrom)
if ok && reflect.DeepEqual(prevCopy.tableName, tableName) && reflect.DeepEqual(prevCopy.columnNames, columnNames) {
log.Info("statediff lazy_tx : Appending rows to COPY", "table", tableName,
"current", len(prevCopy.rows), "append", len(rows))
prevCopy.rows = append(prevCopy.rows, rows...)
appendedToExisting = true
}
}
if !appendedToExisting {
tx.cache = append(tx.cache, copyFrom{tableName, columnNames, rows})
}
return 0, nil
}
func (tx *DelayedTx) Exec(ctx context.Context, sql string, args ...interface{}) (Result, error) {
tx.cache = append(tx.cache, cachedStmt{sql, args})
return nil, nil
}
func (tx *DelayedTx) Commit(ctx context.Context) error {
base, err := tx.db.Begin(ctx)
if err != nil {
return err
@ -39,10 +67,21 @@ func (tx *DelayedTx) Commit(ctx context.Context) error {
rollback(ctx, base)
}
}()
for _, stmt := range tx.cache {
_, err := base.Exec(ctx, stmt.sql, stmt.args...)
if err != nil {
return err
for _, item := range tx.cache {
switch item.(type) {
case copyFrom:
copy := item.(copyFrom)
log.Info("statediff lazy_tx : COPY", "table", copy.tableName, "rows", len(copy.rows))
_, err := base.CopyFrom(ctx, copy.tableName, copy.columnNames, copy.rows)
if err != nil {
return err
}
case cachedStmt:
stmt := item.(cachedStmt)
_, err := base.Exec(ctx, stmt.sql, stmt.args...)
if err != nil {
return err
}
}
}
tx.cache = nil

View File

@ -81,6 +81,9 @@ type Config struct {
// toggle on/off upserts
Upsert bool
// toggle on/off CopyFrom
CopyFrom bool
}
// Type satisfies interfaces.Config

View File

@ -121,3 +121,19 @@ func (db *DB) InsertKnownGapsStm() string {
ON CONFLICT (starting_block_number) DO UPDATE SET (ending_block_number, processing_key) = ($2, $4)
WHERE eth_meta.known_gaps.ending_block_number <= $2`
}
func (db *DB) StateTableName() []string {
return []string{"eth", "state_cids"}
}
func (db *DB) StorageTableName() []string {
return []string{"eth", "storage_cids"}
}
func (db *DB) StateColumnNames() []string {
return []string{"block_number", "header_id", "state_leaf_key", "cid", "state_path", "node_type", "diff", "mh_key"}
}
func (db *DB) StorageColumnNames() []string {
return []string{"block_number", "header_id", "state_path", "storage_leaf_key", "cid", "storage_path", "node_type", "diff", "mh_key"}
}

View File

@ -38,6 +38,7 @@ type PGXDriver struct {
pool *pgxpool.Pool
nodeInfo node.Info
nodeID string
config Config
}
// NewPGXDriver returns a new pgx driver
@ -51,7 +52,7 @@ func NewPGXDriver(ctx context.Context, config Config, node node.Info) (*PGXDrive
if err != nil {
return nil, ErrDBConnectionFailed(err)
}
pg := &PGXDriver{ctx: ctx, pool: dbPool, nodeInfo: node}
pg := &PGXDriver{ctx: ctx, pool: dbPool, nodeInfo: node, config: config}
nodeErr := pg.createNode()
if nodeErr != nil {
return &PGXDriver{}, ErrUnableToSetNode(nodeErr)
@ -161,6 +162,11 @@ func (pgx *PGXDriver) Context() context.Context {
return pgx.ctx
}
// HasCopy satisfies sql.Database
func (pgx *PGXDriver) UseCopyFrom() bool {
return pgx.config.CopyFrom
}
type resultWrapper struct {
ct pgconn.CommandTag
}
@ -239,3 +245,7 @@ func (t pgxTxWrapper) Commit(ctx context.Context) error {
func (t pgxTxWrapper) Rollback(ctx context.Context) error {
return t.tx.Rollback(ctx)
}
func (t pgxTxWrapper) CopyFrom(ctx context.Context, tableName []string, columnNames []string, rows [][]interface{}) (int64, error) {
return t.tx.CopyFrom(ctx, tableName, columnNames, pgx.CopyFromRows(rows))
}

View File

@ -19,6 +19,7 @@ package postgres
import (
"context"
coresql "database/sql"
"errors"
"time"
"github.com/jmoiron/sqlx"
@ -119,6 +120,12 @@ func (driver *SQLXDriver) Context() context.Context {
return driver.ctx
}
// HasCopy satisfies sql.Database
func (driver *SQLXDriver) UseCopyFrom() bool {
// sqlx does not currently support COPY.
return false
}
type sqlxStatsWrapper struct {
stats coresql.DBStats
}
@ -186,3 +193,7 @@ func (t sqlxTxWrapper) Commit(ctx context.Context) error {
func (t sqlxTxWrapper) Rollback(ctx context.Context) error {
return t.tx.Rollback()
}
func (t sqlxTxWrapper) CopyFrom(ctx context.Context, tableName []string, columnNames []string, rows [][]interface{}) (int64, error) {
return 0, errors.New("Unsupported Operation")
}

View File

@ -18,6 +18,7 @@ package sql
import (
"fmt"
"strconv"
"github.com/ethereum/go-ethereum/statediff/indexer/database/metrics"
@ -147,11 +148,26 @@ func (w *Writer) upsertStateCID(tx Tx, stateNode models.StateNodeModel) error {
if stateNode.StateKey != nullHash.String() {
stateKey = stateNode.StateKey
}
_, err := tx.Exec(w.db.Context(), w.db.InsertStateStm(),
stateNode.BlockNumber, stateNode.HeaderID, stateKey, stateNode.CID, stateNode.Path, stateNode.NodeType, true,
stateNode.MhKey)
if err != nil {
return insertError{"eth.state_cids", err, w.db.InsertStateStm(), stateNode}
if w.db.UseCopyFrom() {
var row []interface{}
blockNum, _ := strconv.ParseInt(stateNode.BlockNumber, 10, 64)
row = append(row, blockNum, stateNode.HeaderID, stateKey, stateNode.CID,
stateNode.Path, stateNode.NodeType, true, stateNode.MhKey)
var rows [][]interface{}
rows = append(rows, row)
_, err := tx.CopyFrom(w.db.Context(), w.db.StateTableName(), w.db.StateColumnNames(), rows)
if err != nil {
return insertError{"eth.state_cids", err, "COPY", stateNode}
}
} else {
_, err := tx.Exec(w.db.Context(), w.db.InsertStateStm(),
stateNode.BlockNumber, stateNode.HeaderID, stateKey, stateNode.CID, stateNode.Path, stateNode.NodeType, true,
stateNode.MhKey)
if err != nil {
return insertError{"eth.state_cids", err, w.db.InsertStateStm(), stateNode}
}
}
return nil
}
@ -179,11 +195,26 @@ func (w *Writer) upsertStorageCID(tx Tx, storageCID models.StorageNodeModel) err
if storageCID.StorageKey != nullHash.String() {
storageKey = storageCID.StorageKey
}
_, err := tx.Exec(w.db.Context(), w.db.InsertStorageStm(),
storageCID.BlockNumber, storageCID.HeaderID, storageCID.StatePath, storageKey, storageCID.CID, storageCID.Path,
storageCID.NodeType, true, storageCID.MhKey)
if err != nil {
return insertError{"eth.storage_cids", err, w.db.InsertStorageStm(), storageCID}
if w.db.UseCopyFrom() {
var row []interface{}
blockNum, _ := strconv.ParseInt(storageCID.BlockNumber, 10, 64)
row = append(row, blockNum, storageCID.HeaderID, storageCID.StatePath, storageKey, storageCID.CID,
storageCID.Path, storageCID.NodeType, true, storageCID.MhKey)
var rows [][]interface{}
rows = append(rows, row)
_, err := tx.CopyFrom(w.db.Context(), w.db.StateTableName(), w.db.StateColumnNames(), rows)
if err != nil {
return insertError{"eth.state_cids", err, "COPY", storageCID}
}
} else {
_, err := tx.Exec(w.db.Context(), w.db.InsertStorageStm(),
storageCID.BlockNumber, storageCID.HeaderID, storageCID.StatePath, storageKey, storageCID.CID, storageCID.Path,
storageCID.NodeType, true, storageCID.MhKey)
if err != nil {
return insertError{"eth.storage_cids", err, w.db.InsertStorageStm(), storageCID}
}
}
return nil
}