Concurrent statediff iteration #12

Merged
roysc merged 14 commits from use-concurrent-iterator into main 2023-09-22 08:44:38 +00:00
4 changed files with 26 additions and 58 deletions
Showing only changes of commit afec76d65f - Show all commits

View File

@ -58,18 +58,6 @@ type Statements interface {
InsertStorageStm() string InsertStorageStm() string
InsertIPLDStm() string InsertIPLDStm() string
InsertIPLDsStm() string InsertIPLDsStm() string
// Table/column descriptions for use with CopyFrom and similar commands.
LogTableName() []string
LogColumnNames() []string
RctTableName() []string
RctColumnNames() []string
StateTableName() []string
StateColumnNames() []string
StorageTableName() []string
StorageColumnNames() []string
TxTableName() []string
TxColumnNames() []string
} }
// Tx interface to accommodate different concrete SQL transaction types // Tx interface to accommodate different concrete SQL transaction types

View File

@ -107,43 +107,3 @@ func (db *DB) InsertIPLDStm() string {
func (db *DB) InsertIPLDsStm() string { func (db *DB) InsertIPLDsStm() string {
return `INSERT INTO ipld.blocks (block_number, key, data) VALUES (unnest($1::BIGINT[]), unnest($2::TEXT[]), unnest($3::BYTEA[])) ON CONFLICT DO NOTHING` return `INSERT INTO ipld.blocks (block_number, key, data) VALUES (unnest($1::BIGINT[]), unnest($2::TEXT[]), unnest($3::BYTEA[])) ON CONFLICT DO NOTHING`
} }
func (db *DB) LogTableName() []string {
return []string{"eth", "log_cids"}
}
func (db *DB) LogColumnNames() []string {
return []string{"block_number", "header_id", "cid", "rct_id", "address", "index", "topic0", "topic1", "topic2", "topic3"}
}
func (db *DB) RctTableName() []string {
return []string{"eth", "receipt_cids"}
}
func (db *DB) RctColumnNames() []string {
return []string{"block_number", "header_id", "tx_id", "cid", "contract", "post_state", "post_status"}
}
func (db *DB) StateTableName() []string {
return []string{"eth", "state_cids"}
}
func (db *DB) StateColumnNames() []string {
return []string{"block_number", "header_id", "state_leaf_key", "cid", "diff", "balance", "nonce", "code_hash", "storage_root", "removed"}
}
func (db *DB) StorageTableName() []string {
return []string{"eth", "storage_cids"}
}
func (db *DB) StorageColumnNames() []string {
return []string{"block_number", "header_id", "state_leaf_key", "storage_leaf_key", "cid", "diff", "val", "removed"}
}
func (db *DB) TxTableName() []string {
return []string{"eth", "transaction_cids"}
}
func (db *DB) TxColumnNames() []string {
return []string{"block_number", "header_id", "tx_hash", "cid", "dst", "src", "index", "tx_type", "value"}
}

View File

@ -29,6 +29,7 @@ import (
"github.com/cerc-io/plugeth-statediff/indexer/database/metrics" "github.com/cerc-io/plugeth-statediff/indexer/database/metrics"
"github.com/cerc-io/plugeth-statediff/indexer/interfaces" "github.com/cerc-io/plugeth-statediff/indexer/interfaces"
"github.com/cerc-io/plugeth-statediff/indexer/models" "github.com/cerc-io/plugeth-statediff/indexer/models"
"github.com/cerc-io/plugeth-statediff/indexer/shared/schema"
) )
// Writer handles processing and writing of indexed IPLD objects to Postgres // Writer handles processing and writing of indexed IPLD objects to Postgres
@ -65,7 +66,8 @@ func (w *Writer) detectGaps(beginBlockNumber uint64, endBlockNumber uint64) ([]*
// pgx misdetects the parameter OIDs and selects int8, which can overflow. // pgx misdetects the parameter OIDs and selects int8, which can overflow.
// unfortunately there is no good place to override it, so it is safer to pass the uint64s as text // unfortunately there is no good place to override it, so it is safer to pass the uint64s as text
// and let PG handle the cast // and let PG handle the cast
err := w.db.Select(w.db.Context(), &gaps, w.db.DetectGapsStm(), strconv.FormatUint(beginBlockNumber, 10), strconv.FormatUint(endBlockNumber, 10)) err := w.db.Select(w.db.Context(), &gaps, w.db.DetectGapsStm(),
strconv.FormatUint(beginBlockNumber, 10), strconv.FormatUint(endBlockNumber, 10))
return gaps, err return gaps, err
} }
@ -177,7 +179,8 @@ func (w *Writer) upsertTransactionCID(tx Tx, transaction models.TxModel) error {
return insertError{"eth.transaction_cids", err, "COPY", transaction} return insertError{"eth.transaction_cids", err, "COPY", transaction}
} }
_, err = tx.CopyFrom(w.db.Context(), w.db.TxTableName(), w.db.TxColumnNames(), _, err = tx.CopyFrom(w.db.Context(),
schema.TableTransaction.TableName(), schema.TableTransaction.ColumnNames(),
toRows(toRow(blockNum, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst, toRows(toRow(blockNum, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst,
transaction.Src, transaction.Index, int(transaction.Type), value))) transaction.Src, transaction.Index, int(transaction.Type), value)))
if err != nil { if err != nil {
@ -213,7 +216,7 @@ func (w *Writer) upsertReceiptCID(tx Tx, rct *models.ReceiptModel) error {
return insertError{"eth.receipt_cids", err, "COPY", rct} return insertError{"eth.receipt_cids", err, "COPY", rct}
} }
_, err = tx.CopyFrom(w.db.Context(), w.db.RctTableName(), w.db.RctColumnNames(), _, err = tx.CopyFrom(w.db.Context(), schema.TableReceipt.TableName(), schema.TableReceipt.ColumnNames(),
toRows(toRow(blockNum, rct.HeaderID, rct.TxID, rct.CID, rct.Contract, toRows(toRow(blockNum, rct.HeaderID, rct.TxID, rct.CID, rct.Contract,
rct.PostState, int(rct.PostStatus)))) rct.PostState, int(rct.PostStatus))))
if err != nil { if err != nil {
@ -253,7 +256,7 @@ func (w *Writer) upsertLogCID(tx Tx, logs []*models.LogsModel) error {
log.Address, log.Index, log.Topic0, log.Topic1, log.Topic2, log.Topic3)) log.Address, log.Index, log.Topic0, log.Topic1, log.Topic2, log.Topic3))
} }
if nil != rows && len(rows) >= 0 { if nil != rows && len(rows) >= 0 {
_, err := tx.CopyFrom(w.db.Context(), w.db.LogTableName(), w.db.LogColumnNames(), rows) _, err := tx.CopyFrom(w.db.Context(), schema.TableLog.TableName(), schema.TableLog.ColumnNames(), rows)
if err != nil { if err != nil {
return insertError{"eth.log_cids", err, "COPY", rows} return insertError{"eth.log_cids", err, "COPY", rows}
} }
@ -302,7 +305,8 @@ func (w *Writer) upsertStateCID(tx Tx, stateNode models.StateNodeModel) error {
return insertError{"eth.state_cids", err, "COPY", stateNode} return insertError{"eth.state_cids", err, "COPY", stateNode}
} }
_, err = tx.CopyFrom(w.db.Context(), w.db.StateTableName(), w.db.StateColumnNames(), _, err = tx.CopyFrom(w.db.Context(),
schema.TableStateNode.TableName(), schema.TableStateNode.ColumnNames(),
toRows(toRow(blockNum, stateNode.HeaderID, stateNode.StateKey, stateNode.CID, toRows(toRow(blockNum, stateNode.HeaderID, stateNode.StateKey, stateNode.CID,
true, balance, stateNode.Nonce, stateNode.CodeHash, stateNode.StorageRoot, stateNode.Removed))) true, balance, stateNode.Nonce, stateNode.CodeHash, stateNode.StorageRoot, stateNode.Removed)))
if err != nil { if err != nil {
@ -339,7 +343,8 @@ func (w *Writer) upsertStorageCID(tx Tx, storageCID models.StorageNodeModel) err
return insertError{"eth.storage_cids", err, "COPY", storageCID} return insertError{"eth.storage_cids", err, "COPY", storageCID}
} }
_, err = tx.CopyFrom(w.db.Context(), w.db.StorageTableName(), w.db.StorageColumnNames(), _, err = tx.CopyFrom(w.db.Context(),
schema.TableStorageNode.TableName(), schema.TableStorageNode.ColumnNames(),
toRows(toRow(blockNum, storageCID.HeaderID, storageCID.StateKey, storageCID.StorageKey, storageCID.CID, toRows(toRow(blockNum, storageCID.HeaderID, storageCID.StateKey, storageCID.StorageKey, storageCID.CID,
true, storageCID.Value, storageCID.Removed))) true, storageCID.Value, storageCID.Removed)))
if err != nil { if err != nil {

View File

@ -45,6 +45,7 @@ type Column struct {
Type colType Type colType
Array bool Array bool
} }
type Table struct { type Table struct {
Name string Name string
Columns []Column Columns []Column
@ -117,6 +118,20 @@ func (tbl *Table) ToInsertStatement(upsert bool) string {
) )
} }
// TableName returns a pgx-compatible table name.
func (tbl *Table) TableName() []string {
return strings.Split(tbl.Name, ".")
}
// ColumnNames returns the ordered list of column names.
func (tbl *Table) ColumnNames() []string {
var names []string
for _, col := range tbl.Columns {
names = append(names, col.Name)
}
return names
}
func sprintf(f string) colfmt { func sprintf(f string) colfmt {
return func(x interface{}) string { return fmt.Sprintf(f, x) } return func(x interface{}) string { return fmt.Sprintf(f, x) }
} }