Add COPY support for inserting multiple rows in a single operation. (#328)

* Add COPY support for inserting multiple rows in a single command.
Thomas E Lackey 2023-03-09 20:02:40 -06:00 committed by GitHub
parent 2b073c1a51
commit 9c4ead5580
13 changed files with 334 additions and 46 deletions
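
For context: the pgx driver's COPY support, which this change builds on, streams many rows to PostgreSQL in a single operation instead of issuing one INSERT per row. A minimal standalone sketch of that primitive, assuming pgx v4 and a hypothetical example.items(id, name) table (not part of this commit):

package copydemo

import (
    "context"

    "github.com/jackc/pgx/v4"
)

func copyExample(ctx context.Context, conn *pgx.Conn) (int64, error) {
    // Rows are staged in memory and sent in one COPY ... FROM STDIN round trip.
    rows := [][]interface{}{
        {int64(1), "first"},
        {int64(2), "second"},
    }
    return conn.CopyFrom(
        ctx,
        pgx.Identifier{"example", "items"}, // hypothetical schema-qualified table
        []string{"id", "name"},             // hypothetical column names
        pgx.CopyFromRows(rows),
    )
}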


@ -248,6 +248,9 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) {
     if ctx.IsSet(utils.StateDiffLogStatements.Name) {
         pgConfig.LogStatements = ctx.Bool(utils.StateDiffLogStatements.Name)
     }
+    if ctx.IsSet(utils.StateDiffCopyFrom.Name) {
+        pgConfig.CopyFrom = ctx.Bool(utils.StateDiffCopyFrom.Name)
+    }
     indexerConfig = pgConfig
 case shared.DUMP:
     dumpTypeStr := ctx.String(utils.StateDiffDBDumpDst.Name)


@ -175,6 +175,7 @@ var (
     utils.StateDiffWatchedAddressesFilePath,
     utils.StateDiffUpsert,
     utils.StateDiffLogStatements,
+    utils.StateDiffCopyFrom,
     configFileFlag,
 }, utils.NetworkFlags, utils.DatabasePathFlags)


@ -1100,6 +1100,13 @@ Please note that --` + MetricsHTTPFlag.Name + ` must be set to start the server.
Usage: "Should the statediff service log all database statements? (Note: pgx only)", Usage: "Should the statediff service log all database statements? (Note: pgx only)",
Value: false, Value: false,
} }
StateDiffCopyFrom = &cli.BoolFlag{
Name: "statediff.db.copyfrom",
Usage: "Should the statediff service use COPY FROM for multiple inserts? (Note: pgx only)",
Value: false,
}
StateDiffWritingFlag = &cli.BoolFlag{ StateDiffWritingFlag = &cli.BoolFlag{
Name: "statediff.writing", Name: "statediff.writing",
Usage: "Activates progressive writing of state diffs to database as new block are synced", Usage: "Activates progressive writing of state diffs to database as new block are synced",


@ -31,6 +31,7 @@ type Database interface {
 // Driver interface has all the methods required by a driver implementation to support the sql indexer
 type Driver interface {
+    UseCopyFrom() bool
     QueryRow(ctx context.Context, sql string, args ...interface{}) ScannableRow
     Exec(ctx context.Context, sql string, args ...interface{}) (Result, error)
     Select(ctx context.Context, dest interface{}, query string, args ...interface{}) error
@ -56,12 +57,27 @@ type Statements interface {
     InsertIPLDStm() string
     InsertIPLDsStm() string
     InsertKnownGapsStm() string
+
+    // Table/column descriptions for use with CopyFrom and similar commands.
+    AccountTableName() []string
+    AccountColumnNames() []string
+    LogTableName() []string
+    LogColumnNames() []string
+    RctTableName() []string
+    RctColumnNames() []string
+    StateTableName() []string
+    StateColumnNames() []string
+    StorageTableName() []string
+    StorageColumnNames() []string
+    TxTableName() []string
+    TxColumnNames() []string
 }

 // Tx interface to accommodate different concrete SQL transaction types
 type Tx interface {
     QueryRow(ctx context.Context, sql string, args ...interface{}) ScannableRow
     Exec(ctx context.Context, sql string, args ...interface{}) (Result, error)
+    CopyFrom(ctx context.Context, tableName []string, columnNames []string, rows [][]interface{}) (int64, error)
     Commit(ctx context.Context) error
     Rollback(ctx context.Context) error
 }
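
The writer-side pattern these interface additions enable is: batch rows and hand them to CopyFrom when the driver supports it, otherwise fall back to per-row Exec (the concrete dispatch lands further down in the writer as useCopyForTx). A simplified, self-contained sketch of that dispatch, using local stand-in types rather than the package's real interfaces:

package copydispatch

import "context"

// miniTx is a local stand-in for the Tx interface above (illustration only).
type miniTx interface {
    Exec(ctx context.Context, sql string, args ...interface{}) error
    CopyFrom(ctx context.Context, tableName []string, columnNames []string, rows [][]interface{}) (int64, error)
}

// insertRows batches via COPY when the driver reports support for it, and
// otherwise falls back to one INSERT statement per row.
func insertRows(ctx context.Context, tx miniTx, useCopy bool, insertStmt string, table, columns []string, rows [][]interface{}) error {
    if useCopy {
        _, err := tx.CopyFrom(ctx, table, columns, rows)
        return err
    }
    for _, row := range rows {
        if err := tx.Exec(ctx, insertStmt, row...); err != nil {
            return err
        }
    }
    return nil
}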


@ -2,10 +2,16 @@ package sql
 import (
     "context"
+    "reflect"
+
+    "github.com/ethereum/go-ethereum/log"
 )

+// Changing this to 1 would make sure only sequential COPYs were combined.
+const copyFromCheckLimit = 100
+
 type DelayedTx struct {
-    cache []cachedStmt
+    cache []interface{}
     db    Database
 }

 type cachedStmt struct {
@ -13,6 +19,20 @@ type cachedStmt struct {
     args []interface{}
 }

+type copyFrom struct {
+    tableName   []string
+    columnNames []string
+    rows        [][]interface{}
+}
+
+func (cf *copyFrom) appendRows(rows [][]interface{}) {
+    cf.rows = append(cf.rows, rows...)
+}
+
+func (cf *copyFrom) matches(tableName []string, columnNames []string) bool {
+    return reflect.DeepEqual(cf.tableName, tableName) && reflect.DeepEqual(cf.columnNames, columnNames)
+}
+
 func NewDelayedTx(db Database) *DelayedTx {
     return &DelayedTx{db: db}
 }
@ -21,6 +41,28 @@ func (tx *DelayedTx) QueryRow(ctx context.Context, sql string, args ...interface
     return tx.db.QueryRow(ctx, sql, args...)
 }

+func (tx *DelayedTx) findPrevCopyFrom(tableName []string, columnNames []string, limit int) (*copyFrom, int) {
+    for pos, count := len(tx.cache)-1, 0; pos >= 0 && count < limit; pos, count = pos-1, count+1 {
+        prevCopy, ok := tx.cache[pos].(*copyFrom)
+        if ok && prevCopy.matches(tableName, columnNames) {
+            return prevCopy, count
+        }
+    }
+    return nil, -1
+}
+
+func (tx *DelayedTx) CopyFrom(ctx context.Context, tableName []string, columnNames []string, rows [][]interface{}) (int64, error) {
+    if prevCopy, distance := tx.findPrevCopyFrom(tableName, columnNames, copyFromCheckLimit); nil != prevCopy {
+        log.Trace("statediff lazy_tx : Appending to COPY", "table", tableName,
+            "current", len(prevCopy.rows), "new", len(rows), "distance", distance)
+        prevCopy.appendRows(rows)
+    } else {
+        tx.cache = append(tx.cache, &copyFrom{tableName, columnNames, rows})
+    }
+    return 0, nil
+}
+
 func (tx *DelayedTx) Exec(ctx context.Context, sql string, args ...interface{}) (Result, error) {
     tx.cache = append(tx.cache, cachedStmt{sql, args})
     return nil, nil
@ -39,12 +81,21 @@ func (tx *DelayedTx) Commit(ctx context.Context) error {
             rollback(ctx, base)
         }
     }()
-    for _, stmt := range tx.cache {
-        _, err := base.Exec(ctx, stmt.sql, stmt.args...)
-        if err != nil {
-            return err
-        }
-    }
+    for _, item := range tx.cache {
+        switch item := item.(type) {
+        case *copyFrom:
+            _, err := base.CopyFrom(ctx, item.tableName, item.columnNames, item.rows)
+            if err != nil {
+                log.Error("COPY error", "table", item.tableName, "err", err)
+                return err
+            }
+        case cachedStmt:
+            _, err := base.Exec(ctx, item.sql, item.args...)
+            if err != nil {
+                return err
+            }
+        }
+    }
     tx.cache = nil
     return base.Commit(ctx)
 }
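
The effect of the CopyFrom/findPrevCopyFrom pair above: repeated CopyFrom calls against the same table and column list are merged into one pending COPY (looking back at most copyFromCheckLimit cached entries), so Commit later issues a single COPY per table instead of one per call. A self-contained sketch of that merge behavior, with simplified types standing in for the cache:

package main

import (
    "fmt"
    "reflect"
)

// copyOp is a simplified stand-in for the cached copyFrom entries above.
type copyOp struct {
    table   []string
    columns []string
    rows    [][]interface{}
}

// appendOrMerge mimics DelayedTx.CopyFrom: merge into a matching pending COPY
// if one exists, otherwise cache a new one.
func appendOrMerge(cache []*copyOp, table, columns []string, rows [][]interface{}) []*copyOp {
    for i := len(cache) - 1; i >= 0; i-- {
        if reflect.DeepEqual(cache[i].table, table) && reflect.DeepEqual(cache[i].columns, columns) {
            cache[i].rows = append(cache[i].rows, rows...)
            return cache
        }
    }
    return append(cache, &copyOp{table, columns, rows})
}

func main() {
    var cache []*copyOp
    cache = appendOrMerge(cache, []string{"eth", "log_cids"}, []string{"block_number"}, [][]interface{}{{1}})
    cache = appendOrMerge(cache, []string{"eth", "log_cids"}, []string{"block_number"}, [][]interface{}{{2}})
    fmt.Println(len(cache), len(cache[0].rows)) // prints: 1 2 (one pending COPY holding both rows)
}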


@ -28,7 +28,7 @@ import (
 )

 func setupLegacyPGXIndexer(t *testing.T) {
-    db, err = postgres.SetupPGXDB()
+    db, err = postgres.SetupPGXDB(postgres.DefaultConfig)
     if err != nil {
         t.Fatal(err)
     }


@ -29,8 +29,8 @@ import (
"github.com/ethereum/go-ethereum/statediff/indexer/test" "github.com/ethereum/go-ethereum/statediff/indexer/test"
) )
func setupPGXIndexer(t *testing.T) { func setupPGXIndexer(t *testing.T, config postgres.Config) {
db, err = postgres.SetupPGXDB() db, err = postgres.SetupPGXDB(config)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -39,12 +39,16 @@ func setupPGXIndexer(t *testing.T) {
 }

 func setupPGX(t *testing.T) {
-    setupPGXIndexer(t)
+    setupPGXWithConfig(t, postgres.DefaultConfig)
+}
+
+func setupPGXWithConfig(t *testing.T, config postgres.Config) {
+    setupPGXIndexer(t, config)
     test.SetupTestData(t, ind)
 }

 func setupPGXNonCanonical(t *testing.T) {
-    setupPGXIndexer(t)
+    setupPGXIndexer(t, postgres.DefaultConfig)
     test.SetupTestDataNonCanonical(t, ind)
 }
@ -97,6 +101,20 @@ func TestPGXIndexer(t *testing.T) {
         test.TestPublishAndIndexStorageIPLDs(t, db)
     })
+
+    t.Run("Publish and index with CopyFrom enabled.", func(t *testing.T) {
+        config := postgres.DefaultConfig
+        config.CopyFrom = true
+        setupPGXWithConfig(t, config)
+        defer tearDown(t)
+        defer checkTxClosure(t, 1, 0, 1)
+
+        test.TestPublishAndIndexStateIPLDs(t, db)
+        test.TestPublishAndIndexStorageIPLDs(t, db)
+        test.TestPublishAndIndexReceiptIPLDs(t, db)
+        test.TestPublishAndIndexLogIPLDs(t, db)
+    })
 }

 // Test indexer for a canonical + a non-canonical block at London height + a non-canonical block at London height + 1
@ -151,7 +169,7 @@ func TestPGXIndexerNonCanonical(t *testing.T) {
 }

 func TestPGXWatchAddressMethods(t *testing.T) {
-    setupPGXIndexer(t)
+    setupPGXIndexer(t, postgres.DefaultConfig)
     defer tearDown(t)
     defer checkTxClosure(t, 1, 0, 1)


@ -81,6 +81,9 @@ type Config struct {
     // toggle on/off upserts
     Upsert bool
+
+    // toggle on/off CopyFrom
+    CopyFrom bool
 }

 // Type satisfies interfaces.Config
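
Programmatically, the feature is enabled by setting this field before constructing the pgx driver, which is exactly what the pgx indexer tests below do. A small sketch of that wiring; the import path is assumed from the repository layout and is not part of this diff:

package main

import (
    "log"

    // Path assumed from the surrounding tree; DefaultConfig, CopyFrom and
    // SetupPGXDB are the pieces introduced or touched by this commit.
    "github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres"
)

func main() {
    config := postgres.DefaultConfig
    config.CopyFrom = true // opt in to COPY-based multi-row inserts (pgx driver only)

    db, err := postgres.SetupPGXDB(config)
    if err != nil {
        log.Fatal(err)
    }
    _ = db
}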


@ -121,3 +121,50 @@ func (db *DB) InsertKnownGapsStm() string {
 ON CONFLICT (starting_block_number) DO UPDATE SET (ending_block_number, processing_key) = ($2, $4)
 WHERE eth_meta.known_gaps.ending_block_number <= $2`
 }
+
+func (db *DB) AccountTableName() []string {
+    return []string{"eth", "state_accounts"}
+}
+
+func (db *DB) AccountColumnNames() []string {
+    return []string{"block_number", "header_id", "state_path", "balance", "nonce", "code_hash", "storage_root"}
+}
+
+func (db *DB) LogTableName() []string {
+    return []string{"eth", "log_cids"}
+}
+
+func (db *DB) LogColumnNames() []string {
+    return []string{"block_number", "header_id", "leaf_cid", "leaf_mh_key", "rct_id", "address", "index", "topic0", "topic1", "topic2", "topic3", "log_data"}
+}
+
+func (db *DB) RctTableName() []string {
+    return []string{"eth", "receipt_cids"}
+}
+
+func (db *DB) RctColumnNames() []string {
+    return []string{"block_number", "header_id", "tx_id", "leaf_cid", "contract", "contract_hash", "leaf_mh_key", "post_state", "post_status", "log_root"}
+}
+
+func (db *DB) StateTableName() []string {
+    return []string{"eth", "state_cids"}
+}
+
+func (db *DB) StateColumnNames() []string {
+    return []string{"block_number", "header_id", "state_leaf_key", "cid", "state_path", "node_type", "diff", "mh_key"}
+}
+
+func (db *DB) StorageTableName() []string {
+    return []string{"eth", "storage_cids"}
+}
+
+func (db *DB) StorageColumnNames() []string {
+    return []string{"block_number", "header_id", "state_path", "storage_leaf_key", "cid", "storage_path", "node_type", "diff", "mh_key"}
+}
+
+func (db *DB) TxTableName() []string {
+    return []string{"eth", "transaction_cids"}
+}
+
+func (db *DB) TxColumnNames() []string {
+    return []string{"block_number", "header_id", "tx_hash", "cid", "dst", "src", "index", "mh_key", "tx_data", "tx_type", "value"}
+}
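
These helpers return the schema-qualified table name as a two-element slice plus the column list in COPY order; on the pgx side a []string like this is directly assignable to pgx.Identifier (a defined []string type), which pgx sanitizes into the quoted name used in the generated COPY statement. A tiny standalone illustration, assuming pgx v4:

package main

import (
    "fmt"

    "github.com/jackc/pgx/v4"
)

func main() {
    // Same value as AccountTableName() above; pgx.Identifier's underlying type is []string.
    table := pgx.Identifier{"eth", "state_accounts"}
    fmt.Println(table.Sanitize()) // "eth"."state_accounts"
}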


@ -38,6 +38,7 @@ type PGXDriver struct {
     pool     *pgxpool.Pool
     nodeInfo node.Info
     nodeID   string
+    config   Config
 }

 // NewPGXDriver returns a new pgx driver
@ -51,7 +52,7 @@ func NewPGXDriver(ctx context.Context, config Config, node node.Info) (*PGXDrive
     if err != nil {
         return nil, ErrDBConnectionFailed(err)
     }
-    pg := &PGXDriver{ctx: ctx, pool: dbPool, nodeInfo: node}
+    pg := &PGXDriver{ctx: ctx, pool: dbPool, nodeInfo: node, config: config}
     nodeErr := pg.createNode()
     if nodeErr != nil {
         return &PGXDriver{}, ErrUnableToSetNode(nodeErr)
@ -161,6 +162,11 @@ func (pgx *PGXDriver) Context() context.Context {
     return pgx.ctx
 }

+// UseCopyFrom satisfies sql.Database
+func (pgx *PGXDriver) UseCopyFrom() bool {
+    return pgx.config.CopyFrom
+}
+
 type resultWrapper struct {
     ct pgconn.CommandTag
 }
@ -239,3 +245,7 @@ func (t pgxTxWrapper) Commit(ctx context.Context) error {
 func (t pgxTxWrapper) Rollback(ctx context.Context) error {
     return t.tx.Rollback(ctx)
 }
+
+func (t pgxTxWrapper) CopyFrom(ctx context.Context, tableName []string, columnNames []string, rows [][]interface{}) (int64, error) {
+    return t.tx.CopyFrom(ctx, tableName, columnNames, pgx.CopyFromRows(rows))
+}


@ -19,6 +19,7 @@ package postgres
 import (
     "context"
     coresql "database/sql"
+    "errors"
     "time"

     "github.com/jmoiron/sqlx"
@ -119,6 +120,12 @@ func (driver *SQLXDriver) Context() context.Context {
     return driver.ctx
 }

+// UseCopyFrom satisfies sql.Database
+func (driver *SQLXDriver) UseCopyFrom() bool {
+    // sqlx does not currently support COPY.
+    return false
+}
+
 type sqlxStatsWrapper struct {
     stats coresql.DBStats
 }
@ -186,3 +193,7 @@ func (t sqlxTxWrapper) Commit(ctx context.Context) error {
 func (t sqlxTxWrapper) Rollback(ctx context.Context) error {
     return t.tx.Rollback()
 }
+
+func (t sqlxTxWrapper) CopyFrom(ctx context.Context, tableName []string, columnNames []string, rows [][]interface{}) (int64, error) {
+    return 0, errors.New("Unsupported Operation")
+}


@ -35,8 +35,8 @@ func SetupSQLXDB() (sql.Database, error) {
 }

 // SetupPGXDB is used to setup a pgx db for tests
-func SetupPGXDB() (sql.Database, error) {
-    driver, err := NewPGXDriver(context.Background(), DefaultConfig, node.Info{})
+func SetupPGXDB(config Config) (sql.Database, error) {
+    driver, err := NewPGXDriver(context.Background(), config, node.Info{})
     if err != nil {
         return nil, err
     }


@ -18,6 +18,7 @@ package sql
 import (
     "fmt"
+    "strconv"

     "github.com/ethereum/go-ethereum/statediff/indexer/database/metrics"
@ -81,12 +82,31 @@ INSERT INTO eth.transaction_cids (block_number, header_id, tx_hash, cid, dst, sr
 ON CONFLICT (tx_hash, header_id, block_number) DO NOTHING
 */
 func (w *Writer) upsertTransactionCID(tx Tx, transaction models.TxModel) error {
+    if w.useCopyForTx(tx) {
+        blockNum, err := strconv.ParseInt(transaction.BlockNumber, 10, 64)
+        if err != nil {
+            return insertError{"eth.transaction_cids", err, "COPY", transaction}
+        }
+        value, err := strconv.ParseFloat(transaction.Value, 64)
+        if err != nil {
+            return insertError{"eth.transaction_cids", err, "COPY", transaction}
+        }
+        _, err = tx.CopyFrom(w.db.Context(), w.db.TxTableName(), w.db.TxColumnNames(),
+            toRows(toRow(blockNum, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst,
+                transaction.Src, transaction.Index, transaction.MhKey, transaction.Data, int(transaction.Type), value)))
+        if err != nil {
+            return insertError{"eth.transaction_cids", err, "COPY", transaction}
+        }
+    } else {
         _, err := tx.Exec(w.db.Context(), w.db.InsertTxStm(),
             transaction.BlockNumber, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst, transaction.Src,
             transaction.Index, transaction.MhKey, transaction.Data, transaction.Type, transaction.Value)
         if err != nil {
             return insertError{"eth.transaction_cids", err, w.db.InsertTxStm(), transaction}
         }
+    }
     metrics.IndexerMetrics.TransactionsCounter.Inc(1)
     return nil
 }
@ -111,12 +131,26 @@ INSERT INTO eth.receipt_cids (block_number, header_id, tx_id, leaf_cid, contract
 ON CONFLICT (tx_id, header_id, block_number) DO NOTHING
 */
 func (w *Writer) upsertReceiptCID(tx Tx, rct *models.ReceiptModel) error {
+    if w.useCopyForTx(tx) {
+        blockNum, err := strconv.ParseInt(rct.BlockNumber, 10, 64)
+        if err != nil {
+            return insertError{"eth.receipt_cids", err, "COPY", rct}
+        }
+        _, err = tx.CopyFrom(w.db.Context(), w.db.RctTableName(), w.db.RctColumnNames(),
+            toRows(toRow(blockNum, rct.HeaderID, rct.TxID, rct.LeafCID, rct.Contract, rct.ContractHash,
+                rct.LeafMhKey, rct.PostState, int(rct.PostStatus), rct.LogRoot)))
+        if err != nil {
+            return insertError{"eth.receipt_cids", err, "COPY", rct}
+        }
+    } else {
         _, err := tx.Exec(w.db.Context(), w.db.InsertRctStm(),
             rct.BlockNumber, rct.HeaderID, rct.TxID, rct.LeafCID, rct.Contract, rct.ContractHash, rct.LeafMhKey, rct.PostState,
             rct.PostStatus, rct.LogRoot)
         if err != nil {
             return insertError{"eth.receipt_cids", err, w.db.InsertRctStm(), *rct}
         }
+    }
     metrics.IndexerMetrics.ReceiptsCounter.Inc(1)
     return nil
 }
@ -126,6 +160,25 @@ INSERT INTO eth.log_cids (block_number, header_id, leaf_cid, leaf_mh_key, rct_id
 ON CONFLICT (rct_id, index, header_id, block_number) DO NOTHING
 */
 func (w *Writer) upsertLogCID(tx Tx, logs []*models.LogsModel) error {
+    if w.useCopyForTx(tx) {
+        var rows [][]interface{}
+        for _, log := range logs {
+            blockNum, err := strconv.ParseInt(log.BlockNumber, 10, 64)
+            if err != nil {
+                return insertError{"eth.log_cids", err, "COPY", log}
+            }
+            rows = append(rows, toRow(blockNum, log.HeaderID, log.LeafCID, log.LeafMhKey, log.ReceiptID,
+                log.Address, log.Index, log.Topic0, log.Topic1, log.Topic2, log.Topic3, log.Data))
+        }
+        if nil != rows && len(rows) >= 0 {
+            _, err := tx.CopyFrom(w.db.Context(), w.db.LogTableName(), w.db.LogColumnNames(), rows)
+            if err != nil {
+                return insertError{"eth.log_cids", err, "COPY", rows}
+            }
+            metrics.IndexerMetrics.LogsCounter.Inc(int64(len(rows)))
+        }
+    } else {
         for _, log := range logs {
             _, err := tx.Exec(w.db.Context(), w.db.InsertLogStm(),
                 log.BlockNumber, log.HeaderID, log.LeafCID, log.LeafMhKey, log.ReceiptID, log.Address, log.Index, log.Topic0, log.Topic1,
@ -135,6 +188,7 @@ func (w *Writer) upsertLogCID(tx Tx, logs []*models.LogsModel) error {
             }
             metrics.IndexerMetrics.LogsCounter.Inc(1)
         }
+    }
     return nil
 }
@ -147,12 +201,26 @@ func (w *Writer) upsertStateCID(tx Tx, stateNode models.StateNodeModel) error {
     if stateNode.StateKey != nullHash.String() {
         stateKey = stateNode.StateKey
     }
+    if w.useCopyForTx(tx) {
+        blockNum, err := strconv.ParseInt(stateNode.BlockNumber, 10, 64)
+        if err != nil {
+            return insertError{"eth.state_cids", err, "COPY", stateNode}
+        }
+        _, err = tx.CopyFrom(w.db.Context(), w.db.StateTableName(), w.db.StateColumnNames(),
+            toRows(toRow(blockNum, stateNode.HeaderID, stateKey, stateNode.CID, stateNode.Path,
+                stateNode.NodeType, true, stateNode.MhKey)))
+        if err != nil {
+            return insertError{"eth.state_cids", err, "COPY", stateNode}
+        }
+    } else {
         _, err := tx.Exec(w.db.Context(), w.db.InsertStateStm(),
             stateNode.BlockNumber, stateNode.HeaderID, stateKey, stateNode.CID, stateNode.Path, stateNode.NodeType, true,
             stateNode.MhKey)
         if err != nil {
             return insertError{"eth.state_cids", err, w.db.InsertStateStm(), stateNode}
         }
+    }
     return nil
 }
@ -161,12 +229,30 @@ INSERT INTO eth.state_accounts (block_number, header_id, state_path, balance, no
 ON CONFLICT (header_id, state_path, block_number) DO NOTHING
 */
 func (w *Writer) upsertStateAccount(tx Tx, stateAccount models.StateAccountModel) error {
+    if w.useCopyForTx(tx) {
+        blockNum, err := strconv.ParseInt(stateAccount.BlockNumber, 10, 64)
+        if err != nil {
+            return insertError{"eth.state_accounts", err, "COPY", stateAccount}
+        }
+        balance, err := strconv.ParseFloat(stateAccount.Balance, 64)
+        if err != nil {
+            return insertError{"eth.state_accounts", err, "COPY", stateAccount}
+        }
+        _, err = tx.CopyFrom(w.db.Context(), w.db.AccountTableName(), w.db.AccountColumnNames(),
+            toRows(toRow(blockNum, stateAccount.HeaderID, stateAccount.StatePath, balance, stateAccount.Nonce,
+                stateAccount.CodeHash, stateAccount.StorageRoot)))
+        if err != nil {
+            return insertError{"eth.state_accounts", err, "COPY", stateAccount}
+        }
+    } else {
         _, err := tx.Exec(w.db.Context(), w.db.InsertAccountStm(),
             stateAccount.BlockNumber, stateAccount.HeaderID, stateAccount.StatePath, stateAccount.Balance,
             stateAccount.Nonce, stateAccount.CodeHash, stateAccount.StorageRoot)
         if err != nil {
             return insertError{"eth.state_accounts", err, w.db.InsertAccountStm(), stateAccount}
         }
+    }
     return nil
 }
@ -179,15 +265,50 @@ func (w *Writer) upsertStorageCID(tx Tx, storageCID models.StorageNodeModel) err
     if storageCID.StorageKey != nullHash.String() {
         storageKey = storageCID.StorageKey
     }
+    if w.useCopyForTx(tx) {
+        blockNum, err := strconv.ParseInt(storageCID.BlockNumber, 10, 64)
+        if err != nil {
+            return insertError{"eth.storage_cids", err, "COPY", storageCID}
+        }
+        _, err = tx.CopyFrom(w.db.Context(), w.db.StorageTableName(), w.db.StorageColumnNames(),
+            toRows(toRow(blockNum, storageCID.HeaderID, storageCID.StatePath, storageKey, storageCID.CID,
+                storageCID.Path, storageCID.NodeType, true, storageCID.MhKey)))
+        if err != nil {
+            return insertError{"eth.storage_cids", err, "COPY", storageCID}
+        }
+    } else {
         _, err := tx.Exec(w.db.Context(), w.db.InsertStorageStm(),
             storageCID.BlockNumber, storageCID.HeaderID, storageCID.StatePath, storageKey, storageCID.CID, storageCID.Path,
             storageCID.NodeType, true, storageCID.MhKey)
         if err != nil {
             return insertError{"eth.storage_cids", err, w.db.InsertStorageStm(), storageCID}
         }
+    }
     return nil
 }

+func (w *Writer) useCopyForTx(tx Tx) bool {
+    // Using COPY instead of INSERT only makes much sense if also using a DelayedTx, so that operations
+    // can be collected over time and then all submitted within a single TX.
+    if _, ok := tx.(*DelayedTx); ok {
+        return w.db.UseCopyFrom()
+    }
+    return false
+}
+
+// combine args into a row
+func toRow(args ...interface{}) []interface{} {
+    var row []interface{}
+    row = append(row, args...)
+    return row
+}
+
+// combine row (or rows) into a slice of rows for CopyFrom
+func toRows(rows ...[]interface{}) [][]interface{} {
+    return rows
+}
+
 type insertError struct {
     table string
     err   error