Add COPY support for tx and rct tables.

This commit is contained in:
Thomas E Lackey 2023-03-08 14:38:23 -06:00
parent af1586800e
commit 30fd493464
3 changed files with 119 additions and 47 deletions

View File

@ -50,21 +50,27 @@ type Statements interface {
InsertTxStm() string
InsertAccessListElementStm() string
InsertRctStm() string
LogsTableName() []string
LogsColumnNames() []string
InsertLogStm() string
StateTableName() []string
StateColumnNames() []string
InsertStateStm() string
AccountTableName() []string
AccountColumnNames() []string
InsertAccountStm() string
StorageTableName() []string
StorageColumnNames() []string
InsertStorageStm() string
InsertIPLDStm() string
InsertIPLDsStm() string
InsertKnownGapsStm() string
// Table/column descriptions for use with CopyFrom and similar commands.
AccountTableName() []string
AccountColumnNames() []string
LogTableName() []string
LogColumnNames() []string
RctTableName() []string
RctColumnNames() []string
StateTableName() []string
StateColumnNames() []string
StorageTableName() []string
StorageColumnNames() []string
TxTableName() []string
TxColumnNames() []string
}
// Tx interface to accommodate different concrete SQL transaction types

View File

@ -122,33 +122,49 @@ func (db *DB) InsertKnownGapsStm() string {
WHERE eth_meta.known_gaps.ending_block_number <= $2`
}
func (db *DB) StateTableName() []string {
return []string{"eth", "state_cids"}
}
func (db *DB) StorageTableName() []string {
return []string{"eth", "storage_cids"}
}
func (db *DB) StateColumnNames() []string {
return []string{"block_number", "header_id", "state_leaf_key", "cid", "state_path", "node_type", "diff", "mh_key"}
}
func (db *DB) StorageColumnNames() []string {
return []string{"block_number", "header_id", "state_path", "storage_leaf_key", "cid", "storage_path", "node_type", "diff", "mh_key"}
}
func (db *DB) LogsTableName() []string {
return []string{"eth", "log_cids"}
}
func (db *DB) LogsColumnNames() []string {
return []string{"block_number", "header_id", "leaf_cid", "leaf_mh_key", "rct_id", "address", "index", "topic0", "topic1", "topic2", "topic3", "log_data"}
}
func (db *DB) AccountTableName() []string {
return []string{"eth", "state_accounts"}
}
func (db *DB) AccountColumnNames() []string {
return []string{"block_number", "header_id", "state_path", "balance", "nonce", "code_hash", "storage_root"}
}
func (db *DB) LogTableName() []string {
return []string{"eth", "log_cids"}
}
func (db *DB) LogColumnNames() []string {
return []string{"block_number", "header_id", "leaf_cid", "leaf_mh_key", "rct_id", "address", "index", "topic0", "topic1", "topic2", "topic3", "log_data"}
}
func (db *DB) RctTableName() []string {
return []string{"eth", "receipt_cids"}
}
func (db *DB) RctColumnNames() []string {
return []string{"block_number", "header_id", "tx_id", "leaf_cid", "contract", "contract_hash", "leaf_mh_key", "post_state", "post_status", "log_root"}
}
func (db *DB) StateTableName() []string {
return []string{"eth", "state_cids"}
}
func (db *DB) StateColumnNames() []string {
return []string{"block_number", "header_id", "state_leaf_key", "cid", "state_path", "node_type", "diff", "mh_key"}
}
func (db *DB) StorageTableName() []string {
return []string{"eth", "storage_cids"}
}
func (db *DB) StorageColumnNames() []string {
return []string{"block_number", "header_id", "state_path", "storage_leaf_key", "cid", "storage_path", "node_type", "diff", "mh_key"}
}
func (db *DB) TxTableName() []string {
return []string{"eth", "transaction_cids"}
}
func (db *DB) TxColumnNames() []string {
return []string{"block_number", "header_id", "tx_hash", "cid", "dst", "src", "index", "mh_key", "tx_data", "tx_type", "value"}
}

View File

@ -82,11 +82,35 @@ INSERT INTO eth.transaction_cids (block_number, header_id, tx_hash, cid, dst, sr
ON CONFLICT (tx_hash, header_id, block_number) DO NOTHING
*/
func (w *Writer) upsertTransactionCID(tx Tx, transaction models.TxModel) error {
_, err := tx.Exec(w.db.Context(), w.db.InsertTxStm(),
transaction.BlockNumber, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst, transaction.Src,
transaction.Index, transaction.MhKey, transaction.Data, transaction.Type, transaction.Value)
if err != nil {
return insertError{"eth.transaction_cids", err, w.db.InsertTxStm(), transaction}
if w.useCopyForTx(tx) {
var row []interface{}
blockNum, err := strconv.ParseInt(transaction.BlockNumber, 10, 64)
if err != nil {
return insertError{"eth.transaction_cids", err, "COPY", transaction}
}
value, err := strconv.ParseInt(transaction.Value, 10, 64)
if err != nil {
return insertError{"eth.transaction_cids", err, "COPY", transaction}
}
row = append(row, blockNum, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst, transaction.Src,
transaction.Index, transaction.MhKey, transaction.Data, int(transaction.Type), value)
var rows [][]interface{}
rows = append(rows, row)
_, err = tx.CopyFrom(w.db.Context(), w.db.TxTableName(), w.db.TxColumnNames(), rows)
if err != nil {
return insertError{"eth.transaction_cids", err, "COPY", transaction}
}
} else {
_, err := tx.Exec(w.db.Context(), w.db.InsertTxStm(),
transaction.BlockNumber, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst, transaction.Src,
transaction.Index, transaction.MhKey, transaction.Data, transaction.Type, transaction.Value)
if err != nil {
return insertError{"eth.transaction_cids", err, w.db.InsertTxStm(), transaction}
}
}
metrics.IndexerMetrics.TransactionsCounter.Inc(1)
return nil
@ -112,11 +136,30 @@ INSERT INTO eth.receipt_cids (block_number, header_id, tx_id, leaf_cid, contract
ON CONFLICT (tx_id, header_id, block_number) DO NOTHING
*/
func (w *Writer) upsertReceiptCID(tx Tx, rct *models.ReceiptModel) error {
_, err := tx.Exec(w.db.Context(), w.db.InsertRctStm(),
rct.BlockNumber, rct.HeaderID, rct.TxID, rct.LeafCID, rct.Contract, rct.ContractHash, rct.LeafMhKey, rct.PostState,
rct.PostStatus, rct.LogRoot)
if err != nil {
return insertError{"eth.receipt_cids", err, w.db.InsertRctStm(), *rct}
if w.useCopyForTx(tx) {
var row []interface{}
blockNum, err := strconv.ParseInt(rct.BlockNumber, 10, 64)
if err != nil {
return insertError{"eth.receipt_cids", err, "COPY", rct}
}
row = append(row, blockNum, rct.HeaderID, rct.TxID, rct.LeafCID, rct.Contract, rct.ContractHash,
rct.LeafMhKey, rct.PostState, int(rct.PostStatus), rct.LogRoot)
var rows [][]interface{}
rows = append(rows, row)
_, err = tx.CopyFrom(w.db.Context(), w.db.RctTableName(), w.db.RctColumnNames(), rows)
if err != nil {
return insertError{"eth.receipt_cids", err, "COPY", rct}
}
} else {
_, err := tx.Exec(w.db.Context(), w.db.InsertRctStm(),
rct.BlockNumber, rct.HeaderID, rct.TxID, rct.LeafCID, rct.Contract, rct.ContractHash, rct.LeafMhKey, rct.PostState,
rct.PostStatus, rct.LogRoot)
if err != nil {
return insertError{"eth.receipt_cids", err, w.db.InsertRctStm(), *rct}
}
}
metrics.IndexerMetrics.ReceiptsCounter.Inc(1)
return nil
@ -127,7 +170,7 @@ INSERT INTO eth.log_cids (block_number, header_id, leaf_cid, leaf_mh_key, rct_id
ON CONFLICT (rct_id, index, header_id, block_number) DO NOTHING
*/
func (w *Writer) upsertLogCID(tx Tx, logs []*models.LogsModel) error {
if w.db.UseCopyFrom() {
if w.useCopyForTx(tx) {
var rows [][]interface{}
for _, log := range logs {
var row []interface{}
@ -142,7 +185,7 @@ func (w *Writer) upsertLogCID(tx Tx, logs []*models.LogsModel) error {
rows = append(rows, row)
}
if nil != rows && len(rows) >= 0 {
_, err := tx.CopyFrom(w.db.Context(), w.db.LogsTableName(), w.db.LogsColumnNames(), rows)
_, err := tx.CopyFrom(w.db.Context(), w.db.LogTableName(), w.db.LogColumnNames(), rows)
if err != nil {
return insertError{"eth.log_cids", err, "COPY", rows}
}
@ -171,7 +214,7 @@ func (w *Writer) upsertStateCID(tx Tx, stateNode models.StateNodeModel) error {
if stateNode.StateKey != nullHash.String() {
stateKey = stateNode.StateKey
}
if w.db.UseCopyFrom() {
if w.useCopyForTx(tx) {
var row []interface{}
blockNum, err := strconv.ParseInt(stateNode.BlockNumber, 10, 64)
if err != nil {
@ -204,7 +247,7 @@ INSERT INTO eth.state_accounts (block_number, header_id, state_path, balance, no
ON CONFLICT (header_id, state_path, block_number) DO NOTHING
*/
func (w *Writer) upsertStateAccount(tx Tx, stateAccount models.StateAccountModel) error {
if w.db.UseCopyFrom() {
if w.useCopyForTx(tx) {
var row []interface{}
blockNum, err := strconv.ParseInt(stateAccount.BlockNumber, 10, 64)
if err != nil {
@ -245,7 +288,7 @@ func (w *Writer) upsertStorageCID(tx Tx, storageCID models.StorageNodeModel) err
if storageCID.StorageKey != nullHash.String() {
storageKey = storageCID.StorageKey
}
if w.db.UseCopyFrom() {
if w.useCopyForTx(tx) {
var row []interface{}
blockNum, err := strconv.ParseInt(storageCID.BlockNumber, 10, 64)
if err != nil {
@ -273,6 +316,13 @@ func (w *Writer) upsertStorageCID(tx Tx, storageCID models.StorageNodeModel) err
return nil
}
func (w *Writer) useCopyForTx(tx Tx) bool {
if _, ok := tx.(*DelayedTx); ok {
return w.db.UseCopyFrom()
}
return false
}
type insertError struct {
table string
err error