diff --git a/indexer/database/sql/interfaces.go b/indexer/database/sql/interfaces.go index db13ac7..845f603 100644 --- a/indexer/database/sql/interfaces.go +++ b/indexer/database/sql/interfaces.go @@ -58,18 +58,6 @@ type Statements interface { InsertStorageStm() string InsertIPLDStm() string InsertIPLDsStm() string - - // Table/column descriptions for use with CopyFrom and similar commands. - LogTableName() []string - LogColumnNames() []string - RctTableName() []string - RctColumnNames() []string - StateTableName() []string - StateColumnNames() []string - StorageTableName() []string - StorageColumnNames() []string - TxTableName() []string - TxColumnNames() []string } // Tx interface to accommodate different concrete SQL transaction types diff --git a/indexer/database/sql/postgres/database.go b/indexer/database/sql/postgres/database.go index ecebd7d..f73b882 100644 --- a/indexer/database/sql/postgres/database.go +++ b/indexer/database/sql/postgres/database.go @@ -107,43 +107,3 @@ func (db *DB) InsertIPLDStm() string { func (db *DB) InsertIPLDsStm() string { return `INSERT INTO ipld.blocks (block_number, key, data) VALUES (unnest($1::BIGINT[]), unnest($2::TEXT[]), unnest($3::BYTEA[])) ON CONFLICT DO NOTHING` } - -func (db *DB) LogTableName() []string { - return []string{"eth", "log_cids"} -} - -func (db *DB) LogColumnNames() []string { - return []string{"block_number", "header_id", "cid", "rct_id", "address", "index", "topic0", "topic1", "topic2", "topic3"} -} - -func (db *DB) RctTableName() []string { - return []string{"eth", "receipt_cids"} -} - -func (db *DB) RctColumnNames() []string { - return []string{"block_number", "header_id", "tx_id", "cid", "contract", "post_state", "post_status"} -} - -func (db *DB) StateTableName() []string { - return []string{"eth", "state_cids"} -} - -func (db *DB) StateColumnNames() []string { - return []string{"block_number", "header_id", "state_leaf_key", "cid", "diff", "balance", "nonce", "code_hash", "storage_root", "removed"} -} - -func (db *DB) StorageTableName() []string { - return []string{"eth", "storage_cids"} -} - -func (db *DB) StorageColumnNames() []string { - return []string{"block_number", "header_id", "state_leaf_key", "storage_leaf_key", "cid", "diff", "val", "removed"} -} - -func (db *DB) TxTableName() []string { - return []string{"eth", "transaction_cids"} -} - -func (db *DB) TxColumnNames() []string { - return []string{"block_number", "header_id", "tx_hash", "cid", "dst", "src", "index", "tx_type", "value"} -} diff --git a/indexer/database/sql/writer.go b/indexer/database/sql/writer.go index 6d6dd31..e5e0c81 100644 --- a/indexer/database/sql/writer.go +++ b/indexer/database/sql/writer.go @@ -29,6 +29,7 @@ import ( "github.com/cerc-io/plugeth-statediff/indexer/database/metrics" "github.com/cerc-io/plugeth-statediff/indexer/interfaces" "github.com/cerc-io/plugeth-statediff/indexer/models" + "github.com/cerc-io/plugeth-statediff/indexer/shared/schema" ) // Writer handles processing and writing of indexed IPLD objects to Postgres @@ -65,7 +66,8 @@ func (w *Writer) detectGaps(beginBlockNumber uint64, endBlockNumber uint64) ([]* // pgx misdetects the parameter OIDs and selects int8, which can overflow. // unfortunately there is no good place to override it, so it is safer to pass the uint64s as text // and let PG handle the cast - err := w.db.Select(w.db.Context(), &gaps, w.db.DetectGapsStm(), strconv.FormatUint(beginBlockNumber, 10), strconv.FormatUint(endBlockNumber, 10)) + err := w.db.Select(w.db.Context(), &gaps, w.db.DetectGapsStm(), + strconv.FormatUint(beginBlockNumber, 10), strconv.FormatUint(endBlockNumber, 10)) return gaps, err } @@ -177,7 +179,8 @@ func (w *Writer) upsertTransactionCID(tx Tx, transaction models.TxModel) error { return insertError{"eth.transaction_cids", err, "COPY", transaction} } - _, err = tx.CopyFrom(w.db.Context(), w.db.TxTableName(), w.db.TxColumnNames(), + _, err = tx.CopyFrom(w.db.Context(), + schema.TableTransaction.TableName(), schema.TableTransaction.ColumnNames(), toRows(toRow(blockNum, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst, transaction.Src, transaction.Index, int(transaction.Type), value))) if err != nil { @@ -213,7 +216,7 @@ func (w *Writer) upsertReceiptCID(tx Tx, rct *models.ReceiptModel) error { return insertError{"eth.receipt_cids", err, "COPY", rct} } - _, err = tx.CopyFrom(w.db.Context(), w.db.RctTableName(), w.db.RctColumnNames(), + _, err = tx.CopyFrom(w.db.Context(), schema.TableReceipt.TableName(), schema.TableReceipt.ColumnNames(), toRows(toRow(blockNum, rct.HeaderID, rct.TxID, rct.CID, rct.Contract, rct.PostState, int(rct.PostStatus)))) if err != nil { @@ -253,7 +256,7 @@ func (w *Writer) upsertLogCID(tx Tx, logs []*models.LogsModel) error { log.Address, log.Index, log.Topic0, log.Topic1, log.Topic2, log.Topic3)) } if nil != rows && len(rows) >= 0 { - _, err := tx.CopyFrom(w.db.Context(), w.db.LogTableName(), w.db.LogColumnNames(), rows) + _, err := tx.CopyFrom(w.db.Context(), schema.TableLog.TableName(), schema.TableLog.ColumnNames(), rows) if err != nil { return insertError{"eth.log_cids", err, "COPY", rows} } @@ -302,7 +305,8 @@ func (w *Writer) upsertStateCID(tx Tx, stateNode models.StateNodeModel) error { return insertError{"eth.state_cids", err, "COPY", stateNode} } - _, err = tx.CopyFrom(w.db.Context(), w.db.StateTableName(), w.db.StateColumnNames(), + _, err = tx.CopyFrom(w.db.Context(), + schema.TableStateNode.TableName(), schema.TableStateNode.ColumnNames(), toRows(toRow(blockNum, stateNode.HeaderID, stateNode.StateKey, stateNode.CID, true, balance, stateNode.Nonce, stateNode.CodeHash, stateNode.StorageRoot, stateNode.Removed))) if err != nil { @@ -339,7 +343,8 @@ func (w *Writer) upsertStorageCID(tx Tx, storageCID models.StorageNodeModel) err return insertError{"eth.storage_cids", err, "COPY", storageCID} } - _, err = tx.CopyFrom(w.db.Context(), w.db.StorageTableName(), w.db.StorageColumnNames(), + _, err = tx.CopyFrom(w.db.Context(), + schema.TableStorageNode.TableName(), schema.TableStorageNode.ColumnNames(), toRows(toRow(blockNum, storageCID.HeaderID, storageCID.StateKey, storageCID.StorageKey, storageCID.CID, true, storageCID.Value, storageCID.Removed))) if err != nil { diff --git a/indexer/shared/schema/table.go b/indexer/shared/schema/table.go index 9bc19ac..bf6968e 100644 --- a/indexer/shared/schema/table.go +++ b/indexer/shared/schema/table.go @@ -45,6 +45,7 @@ type Column struct { Type colType Array bool } + type Table struct { Name string Columns []Column @@ -117,6 +118,20 @@ func (tbl *Table) ToInsertStatement(upsert bool) string { ) } +// TableName returns a pgx-compatible table name. +func (tbl *Table) TableName() []string { + return strings.Split(tbl.Name, ".") +} + +// ColumnNames returns the ordered list of column names. +func (tbl *Table) ColumnNames() []string { + var names []string + for _, col := range tbl.Columns { + names = append(names, col.Name) + } + return names +} + func sprintf(f string) colfmt { return func(x interface{}) string { return fmt.Sprintf(f, x) } }