From 30fd4934649add51e64fc10cf048e833ef6f03d1 Mon Sep 17 00:00:00 2001 From: Thomas E Lackey Date: Wed, 8 Mar 2023 14:38:23 -0600 Subject: [PATCH] Add COPY support for tx and rct tables. --- statediff/indexer/database/sql/interfaces.go | 22 +++-- .../indexer/database/sql/postgres/database.go | 64 +++++++++------ statediff/indexer/database/sql/writer.go | 80 +++++++++++++++---- 3 files changed, 119 insertions(+), 47 deletions(-) diff --git a/statediff/indexer/database/sql/interfaces.go b/statediff/indexer/database/sql/interfaces.go index 890342902..f85f33ae4 100644 --- a/statediff/indexer/database/sql/interfaces.go +++ b/statediff/indexer/database/sql/interfaces.go @@ -50,21 +50,27 @@ type Statements interface { InsertTxStm() string InsertAccessListElementStm() string InsertRctStm() string - LogsTableName() []string - LogsColumnNames() []string InsertLogStm() string - StateTableName() []string - StateColumnNames() []string InsertStateStm() string - AccountTableName() []string - AccountColumnNames() []string InsertAccountStm() string - StorageTableName() []string - StorageColumnNames() []string InsertStorageStm() string InsertIPLDStm() string InsertIPLDsStm() string InsertKnownGapsStm() string + + // Table/column descriptions for use with CopyFrom and similar commands. + AccountTableName() []string + AccountColumnNames() []string + LogTableName() []string + LogColumnNames() []string + RctTableName() []string + RctColumnNames() []string + StateTableName() []string + StateColumnNames() []string + StorageTableName() []string + StorageColumnNames() []string + TxTableName() []string + TxColumnNames() []string } // Tx interface to accommodate different concrete SQL transaction types diff --git a/statediff/indexer/database/sql/postgres/database.go b/statediff/indexer/database/sql/postgres/database.go index 06aa9c03e..8ee86251a 100644 --- a/statediff/indexer/database/sql/postgres/database.go +++ b/statediff/indexer/database/sql/postgres/database.go @@ -122,33 +122,49 @@ func (db *DB) InsertKnownGapsStm() string { WHERE eth_meta.known_gaps.ending_block_number <= $2` } -func (db *DB) StateTableName() []string { - return []string{"eth", "state_cids"} -} - -func (db *DB) StorageTableName() []string { - return []string{"eth", "storage_cids"} -} - -func (db *DB) StateColumnNames() []string { - return []string{"block_number", "header_id", "state_leaf_key", "cid", "state_path", "node_type", "diff", "mh_key"} -} - -func (db *DB) StorageColumnNames() []string { - return []string{"block_number", "header_id", "state_path", "storage_leaf_key", "cid", "storage_path", "node_type", "diff", "mh_key"} -} - -func (db *DB) LogsTableName() []string { - return []string{"eth", "log_cids"} -} - -func (db *DB) LogsColumnNames() []string { - return []string{"block_number", "header_id", "leaf_cid", "leaf_mh_key", "rct_id", "address", "index", "topic0", "topic1", "topic2", "topic3", "log_data"} -} - func (db *DB) AccountTableName() []string { return []string{"eth", "state_accounts"} } func (db *DB) AccountColumnNames() []string { return []string{"block_number", "header_id", "state_path", "balance", "nonce", "code_hash", "storage_root"} } + +func (db *DB) LogTableName() []string { + return []string{"eth", "log_cids"} +} + +func (db *DB) LogColumnNames() []string { + return []string{"block_number", "header_id", "leaf_cid", "leaf_mh_key", "rct_id", "address", "index", "topic0", "topic1", "topic2", "topic3", "log_data"} +} + +func (db *DB) RctTableName() []string { + return []string{"eth", "receipt_cids"} +} + +func (db *DB) RctColumnNames() []string { + return []string{"block_number", "header_id", "tx_id", "leaf_cid", "contract", "contract_hash", "leaf_mh_key", "post_state", "post_status", "log_root"} +} + +func (db *DB) StateTableName() []string { + return []string{"eth", "state_cids"} +} + +func (db *DB) StateColumnNames() []string { + return []string{"block_number", "header_id", "state_leaf_key", "cid", "state_path", "node_type", "diff", "mh_key"} +} + +func (db *DB) StorageTableName() []string { + return []string{"eth", "storage_cids"} +} + +func (db *DB) StorageColumnNames() []string { + return []string{"block_number", "header_id", "state_path", "storage_leaf_key", "cid", "storage_path", "node_type", "diff", "mh_key"} +} + +func (db *DB) TxTableName() []string { + return []string{"eth", "transaction_cids"} +} + +func (db *DB) TxColumnNames() []string { + return []string{"block_number", "header_id", "tx_hash", "cid", "dst", "src", "index", "mh_key", "tx_data", "tx_type", "value"} +} diff --git a/statediff/indexer/database/sql/writer.go b/statediff/indexer/database/sql/writer.go index 80436b44e..df059c535 100644 --- a/statediff/indexer/database/sql/writer.go +++ b/statediff/indexer/database/sql/writer.go @@ -82,11 +82,35 @@ INSERT INTO eth.transaction_cids (block_number, header_id, tx_hash, cid, dst, sr ON CONFLICT (tx_hash, header_id, block_number) DO NOTHING */ func (w *Writer) upsertTransactionCID(tx Tx, transaction models.TxModel) error { - _, err := tx.Exec(w.db.Context(), w.db.InsertTxStm(), - transaction.BlockNumber, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst, transaction.Src, - transaction.Index, transaction.MhKey, transaction.Data, transaction.Type, transaction.Value) - if err != nil { - return insertError{"eth.transaction_cids", err, w.db.InsertTxStm(), transaction} + if w.useCopyForTx(tx) { + var row []interface{} + blockNum, err := strconv.ParseInt(transaction.BlockNumber, 10, 64) + if err != nil { + return insertError{"eth.transaction_cids", err, "COPY", transaction} + } + + value, err := strconv.ParseInt(transaction.Value, 10, 64) + if err != nil { + return insertError{"eth.transaction_cids", err, "COPY", transaction} + } + + row = append(row, blockNum, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst, transaction.Src, + transaction.Index, transaction.MhKey, transaction.Data, int(transaction.Type), value) + + var rows [][]interface{} + rows = append(rows, row) + + _, err = tx.CopyFrom(w.db.Context(), w.db.TxTableName(), w.db.TxColumnNames(), rows) + if err != nil { + return insertError{"eth.transaction_cids", err, "COPY", transaction} + } + } else { + _, err := tx.Exec(w.db.Context(), w.db.InsertTxStm(), + transaction.BlockNumber, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst, transaction.Src, + transaction.Index, transaction.MhKey, transaction.Data, transaction.Type, transaction.Value) + if err != nil { + return insertError{"eth.transaction_cids", err, w.db.InsertTxStm(), transaction} + } } metrics.IndexerMetrics.TransactionsCounter.Inc(1) return nil @@ -112,11 +136,30 @@ INSERT INTO eth.receipt_cids (block_number, header_id, tx_id, leaf_cid, contract ON CONFLICT (tx_id, header_id, block_number) DO NOTHING */ func (w *Writer) upsertReceiptCID(tx Tx, rct *models.ReceiptModel) error { - _, err := tx.Exec(w.db.Context(), w.db.InsertRctStm(), - rct.BlockNumber, rct.HeaderID, rct.TxID, rct.LeafCID, rct.Contract, rct.ContractHash, rct.LeafMhKey, rct.PostState, - rct.PostStatus, rct.LogRoot) - if err != nil { - return insertError{"eth.receipt_cids", err, w.db.InsertRctStm(), *rct} + if w.useCopyForTx(tx) { + var row []interface{} + blockNum, err := strconv.ParseInt(rct.BlockNumber, 10, 64) + if err != nil { + return insertError{"eth.receipt_cids", err, "COPY", rct} + } + + row = append(row, blockNum, rct.HeaderID, rct.TxID, rct.LeafCID, rct.Contract, rct.ContractHash, + rct.LeafMhKey, rct.PostState, int(rct.PostStatus), rct.LogRoot) + + var rows [][]interface{} + rows = append(rows, row) + + _, err = tx.CopyFrom(w.db.Context(), w.db.RctTableName(), w.db.RctColumnNames(), rows) + if err != nil { + return insertError{"eth.receipt_cids", err, "COPY", rct} + } + } else { + _, err := tx.Exec(w.db.Context(), w.db.InsertRctStm(), + rct.BlockNumber, rct.HeaderID, rct.TxID, rct.LeafCID, rct.Contract, rct.ContractHash, rct.LeafMhKey, rct.PostState, + rct.PostStatus, rct.LogRoot) + if err != nil { + return insertError{"eth.receipt_cids", err, w.db.InsertRctStm(), *rct} + } } metrics.IndexerMetrics.ReceiptsCounter.Inc(1) return nil @@ -127,7 +170,7 @@ INSERT INTO eth.log_cids (block_number, header_id, leaf_cid, leaf_mh_key, rct_id ON CONFLICT (rct_id, index, header_id, block_number) DO NOTHING */ func (w *Writer) upsertLogCID(tx Tx, logs []*models.LogsModel) error { - if w.db.UseCopyFrom() { + if w.useCopyForTx(tx) { var rows [][]interface{} for _, log := range logs { var row []interface{} @@ -142,7 +185,7 @@ func (w *Writer) upsertLogCID(tx Tx, logs []*models.LogsModel) error { rows = append(rows, row) } if nil != rows && len(rows) >= 0 { - _, err := tx.CopyFrom(w.db.Context(), w.db.LogsTableName(), w.db.LogsColumnNames(), rows) + _, err := tx.CopyFrom(w.db.Context(), w.db.LogTableName(), w.db.LogColumnNames(), rows) if err != nil { return insertError{"eth.log_cids", err, "COPY", rows} } @@ -171,7 +214,7 @@ func (w *Writer) upsertStateCID(tx Tx, stateNode models.StateNodeModel) error { if stateNode.StateKey != nullHash.String() { stateKey = stateNode.StateKey } - if w.db.UseCopyFrom() { + if w.useCopyForTx(tx) { var row []interface{} blockNum, err := strconv.ParseInt(stateNode.BlockNumber, 10, 64) if err != nil { @@ -204,7 +247,7 @@ INSERT INTO eth.state_accounts (block_number, header_id, state_path, balance, no ON CONFLICT (header_id, state_path, block_number) DO NOTHING */ func (w *Writer) upsertStateAccount(tx Tx, stateAccount models.StateAccountModel) error { - if w.db.UseCopyFrom() { + if w.useCopyForTx(tx) { var row []interface{} blockNum, err := strconv.ParseInt(stateAccount.BlockNumber, 10, 64) if err != nil { @@ -245,7 +288,7 @@ func (w *Writer) upsertStorageCID(tx Tx, storageCID models.StorageNodeModel) err if storageCID.StorageKey != nullHash.String() { storageKey = storageCID.StorageKey } - if w.db.UseCopyFrom() { + if w.useCopyForTx(tx) { var row []interface{} blockNum, err := strconv.ParseInt(storageCID.BlockNumber, 10, 64) if err != nil { @@ -273,6 +316,13 @@ func (w *Writer) upsertStorageCID(tx Tx, storageCID models.StorageNodeModel) err return nil } +func (w *Writer) useCopyForTx(tx Tx) bool { + if _, ok := tx.(*DelayedTx); ok { + return w.db.UseCopyFrom() + } + return false +} + type insertError struct { table string err error