Update indexer to include block hash in receipts and logs (#256)

* Update indexer to include block hash in receipts and logs

* Upgrade ipld-eth-db image in docker-compose to run tests
This commit is contained in:
prathamesh0 2022-07-12 09:32:54 +05:30 committed by GitHub
parent 558a187391
commit 7b4ef34de2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 37 additions and 25 deletions

View File

@ -5,7 +5,7 @@ services:
restart: on-failure restart: on-failure
depends_on: depends_on:
- ipld-eth-db - ipld-eth-db
image: vulcanize/ipld-eth-db:v4.1.4-alpha image: vulcanize/ipld-eth-db:v4.2.0-alpha
environment: environment:
DATABASE_USER: "vdbm" DATABASE_USER: "vdbm"
DATABASE_NAME: "vulcanize_testing" DATABASE_NAME: "vulcanize_testing"

View File

@ -323,6 +323,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
rctModel := &models.ReceiptModel{ rctModel := &models.ReceiptModel{
BlockNumber: args.blockNumber.String(), BlockNumber: args.blockNumber.String(),
HeaderID: args.headerID,
TxID: trxID, TxID: trxID,
Contract: contract, Contract: contract,
ContractHash: contractHash, ContractHash: contractHash,
@ -353,6 +354,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
logDataSet[idx] = &models.LogsModel{ logDataSet[idx] = &models.LogsModel{
BlockNumber: args.blockNumber.String(), BlockNumber: args.blockNumber.String(),
HeaderID: args.headerID,
ReceiptID: trxID, ReceiptID: trxID,
Address: l.Address.String(), Address: l.Address.String(),
Index: int64(l.Index), Index: int64(l.Index),

View File

@ -281,7 +281,7 @@ func (csw *CSVWriter) upsertAccessListElement(accessListElement models.AccessLis
func (csw *CSVWriter) upsertReceiptCID(rct *models.ReceiptModel) { func (csw *CSVWriter) upsertReceiptCID(rct *models.ReceiptModel) {
var values []interface{} var values []interface{}
values = append(values, rct.BlockNumber, rct.TxID, rct.LeafCID, rct.Contract, rct.ContractHash, rct.LeafMhKey, values = append(values, rct.BlockNumber, rct.HeaderID, rct.TxID, rct.LeafCID, rct.Contract, rct.ContractHash, rct.LeafMhKey,
rct.PostState, rct.PostStatus, rct.LogRoot) rct.PostState, rct.PostStatus, rct.LogRoot)
csw.rows <- tableRow{types.TableReceipt, values} csw.rows <- tableRow{types.TableReceipt, values}
indexerMetrics.receipts.Inc(1) indexerMetrics.receipts.Inc(1)
@ -290,7 +290,7 @@ func (csw *CSVWriter) upsertReceiptCID(rct *models.ReceiptModel) {
func (csw *CSVWriter) upsertLogCID(logs []*models.LogsModel) { func (csw *CSVWriter) upsertLogCID(logs []*models.LogsModel) {
for _, l := range logs { for _, l := range logs {
var values []interface{} var values []interface{}
values = append(values, l.BlockNumber, l.LeafCID, l.LeafMhKey, l.ReceiptID, l.Address, l.Index, l.Topic0, values = append(values, l.BlockNumber, l.HeaderID, l.LeafCID, l.LeafMhKey, l.ReceiptID, l.Address, l.Index, l.Topic0,
l.Topic1, l.Topic2, l.Topic3, l.Data) l.Topic1, l.Topic2, l.Topic3, l.Data)
csw.rows <- tableRow{types.TableLog, values} csw.rows <- tableRow{types.TableLog, values}
indexerMetrics.logs.Inc(1) indexerMetrics.logs.Inc(1)

View File

@ -371,6 +371,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error {
rctModel := &models.ReceiptModel{ rctModel := &models.ReceiptModel{
BlockNumber: args.blockNumber.String(), BlockNumber: args.blockNumber.String(),
HeaderID: args.headerID,
TxID: txID, TxID: txID,
Contract: contract, Contract: contract,
ContractHash: contractHash, ContractHash: contractHash,
@ -399,6 +400,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error {
logDataSet[idx] = &models.LogsModel{ logDataSet[idx] = &models.LogsModel{
BlockNumber: args.blockNumber.String(), BlockNumber: args.blockNumber.String(),
HeaderID: args.headerID,
ReceiptID: txID, ReceiptID: txID,
Address: l.Address.String(), Address: l.Address.String(),
Index: int64(l.Index), Index: int64(l.Index),

View File

@ -155,11 +155,11 @@ const (
alInsert = "INSERT INTO eth.access_list_elements (block_number, tx_id, index, address, storage_keys) VALUES " + alInsert = "INSERT INTO eth.access_list_elements (block_number, tx_id, index, address, storage_keys) VALUES " +
"('%s', '%s', %d, '%s', '%s');\n" "('%s', '%s', %d, '%s', '%s');\n"
rctInsert = "INSERT INTO eth.receipt_cids (block_number, tx_id, leaf_cid, contract, contract_hash, leaf_mh_key, post_state, " + rctInsert = "INSERT INTO eth.receipt_cids (block_number, header_id, tx_id, leaf_cid, contract, contract_hash, leaf_mh_key, post_state, " +
"post_status, log_root) VALUES ('%s', '%s', '%s', '%s', '%s', '%s', '%s', %d, '%s');\n" "post_status, log_root) VALUES ('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', %d, '%s');\n"
logInsert = "INSERT INTO eth.log_cids (block_number, leaf_cid, leaf_mh_key, rct_id, address, index, topic0, topic1, topic2, " + logInsert = "INSERT INTO eth.log_cids (block_number, header_id, leaf_cid, leaf_mh_key, rct_id, address, index, topic0, topic1, topic2, " +
"topic3, log_data) VALUES ('%s', '%s', '%s', '%s', '%s', %d, '%s', '%s', '%s', '%s', '\\x%x');\n" "topic3, log_data) VALUES ('%s', '%s', '%s', '%s', '%s', '%s', %d, '%s', '%s', '%s', '%s', '\\x%x');\n"
stateInsert = "INSERT INTO eth.state_cids (block_number, header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) " + stateInsert = "INSERT INTO eth.state_cids (block_number, header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) " +
"VALUES ('%s', '%s', '%s', '%s', '\\x%x', %d, %t, '%s');\n" "VALUES ('%s', '%s', '%s', '%s', '\\x%x', %d, %t, '%s');\n"
@ -235,14 +235,14 @@ func (sqw *SQLWriter) upsertAccessListElement(accessListElement models.AccessLis
} }
func (sqw *SQLWriter) upsertReceiptCID(rct *models.ReceiptModel) { func (sqw *SQLWriter) upsertReceiptCID(rct *models.ReceiptModel) {
sqw.stmts <- []byte(fmt.Sprintf(rctInsert, rct.BlockNumber, rct.TxID, rct.LeafCID, rct.Contract, rct.ContractHash, rct.LeafMhKey, sqw.stmts <- []byte(fmt.Sprintf(rctInsert, rct.BlockNumber, rct.HeaderID, rct.TxID, rct.LeafCID, rct.Contract, rct.ContractHash, rct.LeafMhKey,
rct.PostState, rct.PostStatus, rct.LogRoot)) rct.PostState, rct.PostStatus, rct.LogRoot))
indexerMetrics.receipts.Inc(1) indexerMetrics.receipts.Inc(1)
} }
func (sqw *SQLWriter) upsertLogCID(logs []*models.LogsModel) { func (sqw *SQLWriter) upsertLogCID(logs []*models.LogsModel) {
for _, l := range logs { for _, l := range logs {
sqw.stmts <- []byte(fmt.Sprintf(logInsert, l.BlockNumber, l.LeafCID, l.LeafMhKey, l.ReceiptID, l.Address, l.Index, l.Topic0, sqw.stmts <- []byte(fmt.Sprintf(logInsert, l.BlockNumber, l.HeaderID, l.LeafCID, l.LeafMhKey, l.ReceiptID, l.Address, l.Index, l.Topic0,
l.Topic1, l.Topic2, l.Topic3, l.Data)) l.Topic1, l.Topic2, l.Topic3, l.Data))
indexerMetrics.logs.Inc(1) indexerMetrics.logs.Inc(1)
} }

View File

@ -132,6 +132,7 @@ var TableReceipt = Table{
"eth.receipt_cids", "eth.receipt_cids",
[]column{ []column{
{name: "block_number", dbType: bigint}, {name: "block_number", dbType: bigint},
{name: "header_id", dbType: varchar},
{name: "tx_id", dbType: varchar}, {name: "tx_id", dbType: varchar},
{name: "leaf_cid", dbType: text}, {name: "leaf_cid", dbType: text},
{name: "contract", dbType: varchar}, {name: "contract", dbType: varchar},
@ -147,6 +148,7 @@ var TableLog = Table{
"eth.log_cids", "eth.log_cids",
[]column{ []column{
{name: "block_number", dbType: bigint}, {name: "block_number", dbType: bigint},
{name: "header_id", dbType: varchar},
{name: "leaf_cid", dbType: text}, {name: "leaf_cid", dbType: text},
{name: "leaf_mh_key", dbType: text}, {name: "leaf_mh_key", dbType: text},
{name: "rct_id", dbType: varchar}, {name: "rct_id", dbType: varchar},

View File

@ -381,6 +381,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
rctModel := &models.ReceiptModel{ rctModel := &models.ReceiptModel{
BlockNumber: args.blockNumber.String(), BlockNumber: args.blockNumber.String(),
HeaderID: args.headerID,
TxID: txID, TxID: txID,
Contract: contract, Contract: contract,
ContractHash: contractHash, ContractHash: contractHash,
@ -412,6 +413,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
logDataSet[idx] = &models.LogsModel{ logDataSet[idx] = &models.LogsModel{
BlockNumber: args.blockNumber.String(), BlockNumber: args.blockNumber.String(),
HeaderID: args.headerID,
ReceiptID: txID, ReceiptID: txID,
Address: l.Address.String(), Address: l.Address.String(),
Index: int64(l.Index), Index: int64(l.Index),

View File

@ -52,7 +52,7 @@ func (db *DB) InsertUncleStm() string {
// InsertTxStm satisfies the sql.Statements interface // InsertTxStm satisfies the sql.Statements interface
func (db *DB) InsertTxStm() string { func (db *DB) InsertTxStm() string {
return `INSERT INTO eth.transaction_cids (block_number, header_id, tx_hash, cid, dst, src, index, mh_key, tx_data, tx_type, value) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) return `INSERT INTO eth.transaction_cids (block_number, header_id, tx_hash, cid, dst, src, index, mh_key, tx_data, tx_type, value) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
ON CONFLICT (tx_hash, block_number) DO NOTHING` ON CONFLICT (tx_hash, header_id, block_number) DO NOTHING`
} }
// InsertAccessListElementStm satisfies the sql.Statements interface // InsertAccessListElementStm satisfies the sql.Statements interface
@ -63,14 +63,14 @@ func (db *DB) InsertAccessListElementStm() string {
// InsertRctStm satisfies the sql.Statements interface // InsertRctStm satisfies the sql.Statements interface
func (db *DB) InsertRctStm() string { func (db *DB) InsertRctStm() string {
return `INSERT INTO eth.receipt_cids (block_number, tx_id, leaf_cid, contract, contract_hash, leaf_mh_key, post_state, post_status, log_root) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) return `INSERT INTO eth.receipt_cids (block_number, header_id, tx_id, leaf_cid, contract, contract_hash, leaf_mh_key, post_state, post_status, log_root) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
ON CONFLICT (tx_id, block_number) DO NOTHING` ON CONFLICT (tx_id, header_id, block_number) DO NOTHING`
} }
// InsertLogStm satisfies the sql.Statements interface // InsertLogStm satisfies the sql.Statements interface
func (db *DB) InsertLogStm() string { func (db *DB) InsertLogStm() string {
return `INSERT INTO eth.log_cids (block_number, leaf_cid, leaf_mh_key, rct_id, address, index, topic0, topic1, topic2, topic3, log_data) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) return `INSERT INTO eth.log_cids (block_number, header_id, leaf_cid, leaf_mh_key, rct_id, address, index, topic0, topic1, topic2, topic3, log_data) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
ON CONFLICT (rct_id, index, block_number) DO NOTHING` ON CONFLICT (rct_id, index, header_id, block_number) DO NOTHING`
} }
// InsertStateStm satisfies the sql.Statements interface // InsertStateStm satisfies the sql.Statements interface

View File

@ -76,7 +76,7 @@ func (w *Writer) upsertUncleCID(tx Tx, uncle models.UncleModel) error {
/* /*
INSERT INTO eth.transaction_cids (block_number, header_id, tx_hash, cid, dst, src, index, mh_key, tx_data, tx_type, value) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) INSERT INTO eth.transaction_cids (block_number, header_id, tx_hash, cid, dst, src, index, mh_key, tx_data, tx_type, value) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
ON CONFLICT (tx_hash, block_number) DO NOTHING ON CONFLICT (tx_hash, header_id, block_number) DO NOTHING
*/ */
func (w *Writer) upsertTransactionCID(tx Tx, transaction models.TxModel) error { func (w *Writer) upsertTransactionCID(tx Tx, transaction models.TxModel) error {
_, err := tx.Exec(w.db.Context(), w.db.InsertTxStm(), _, err := tx.Exec(w.db.Context(), w.db.InsertTxStm(),
@ -105,12 +105,12 @@ func (w *Writer) upsertAccessListElement(tx Tx, accessListElement models.AccessL
} }
/* /*
INSERT INTO eth.receipt_cids (block_number, tx_id, leaf_cid, contract, contract_hash, leaf_mh_key, post_state, post_status, log_root) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) INSERT INTO eth.receipt_cids (block_number, header_id, tx_id, leaf_cid, contract, contract_hash, leaf_mh_key, post_state, post_status, log_root) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
ON CONFLICT (tx_id, block_number) DO NOTHING ON CONFLICT (tx_id, header_id, block_number) DO NOTHING
*/ */
func (w *Writer) upsertReceiptCID(tx Tx, rct *models.ReceiptModel) error { func (w *Writer) upsertReceiptCID(tx Tx, rct *models.ReceiptModel) error {
_, err := tx.Exec(w.db.Context(), w.db.InsertRctStm(), _, err := tx.Exec(w.db.Context(), w.db.InsertRctStm(),
rct.BlockNumber, rct.TxID, rct.LeafCID, rct.Contract, rct.ContractHash, rct.LeafMhKey, rct.PostState, rct.BlockNumber, rct.HeaderID, rct.TxID, rct.LeafCID, rct.Contract, rct.ContractHash, rct.LeafMhKey, rct.PostState,
rct.PostStatus, rct.LogRoot) rct.PostStatus, rct.LogRoot)
if err != nil { if err != nil {
return fmt.Errorf("error upserting receipt_cids entry: %w", err) return fmt.Errorf("error upserting receipt_cids entry: %w", err)
@ -120,13 +120,13 @@ func (w *Writer) upsertReceiptCID(tx Tx, rct *models.ReceiptModel) error {
} }
/* /*
INSERT INTO eth.log_cids (block_number, leaf_cid, leaf_mh_key, rct_id, address, index, topic0, topic1, topic2, topic3, log_data) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) INSERT INTO eth.log_cids (block_number, header_id, leaf_cid, leaf_mh_key, rct_id, address, index, topic0, topic1, topic2, topic3, log_data) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
ON CONFLICT (rct_id, index, block_number) DO NOTHING ON CONFLICT (rct_id, index, header_id, block_number) DO NOTHING
*/ */
func (w *Writer) upsertLogCID(tx Tx, logs []*models.LogsModel) error { func (w *Writer) upsertLogCID(tx Tx, logs []*models.LogsModel) error {
for _, log := range logs { for _, log := range logs {
_, err := tx.Exec(w.db.Context(), w.db.InsertLogStm(), _, err := tx.Exec(w.db.Context(), w.db.InsertLogStm(),
log.BlockNumber, log.LeafCID, log.LeafMhKey, log.ReceiptID, log.Address, log.Index, log.Topic0, log.Topic1, log.BlockNumber, log.HeaderID, log.LeafCID, log.LeafMhKey, log.ReceiptID, log.Address, log.Index, log.Topic0, log.Topic1,
log.Topic2, log.Topic3, log.Data) log.Topic2, log.Topic3, log.Data)
if err != nil { if err != nil {
return fmt.Errorf("error upserting logs entry: %w", err) return fmt.Errorf("error upserting logs entry: %w", err)

View File

@ -39,7 +39,7 @@ type UncleBatch struct {
// TxBatch holds the arguments for a batch insert of tx data // TxBatch holds the arguments for a batch insert of tx data
type TxBatch struct { type TxBatch struct {
BlockNumbers []string BlockNumbers []string
HeaderID string HeaderIDs []string
Indexes []int64 Indexes []int64
TxHashes []string TxHashes []string
CIDs []string CIDs []string
@ -62,6 +62,7 @@ type AccessListBatch struct {
// ReceiptBatch holds the arguments for a batch insert of receipt data // ReceiptBatch holds the arguments for a batch insert of receipt data
type ReceiptBatch struct { type ReceiptBatch struct {
BlockNumbers []string BlockNumbers []string
HeaderIDs []string
TxIDs []string TxIDs []string
LeafCIDs []string LeafCIDs []string
LeafMhKeys []string LeafMhKeys []string
@ -75,6 +76,7 @@ type ReceiptBatch struct {
// LogBatch holds the arguments for a batch insert of log data // LogBatch holds the arguments for a batch insert of log data
type LogBatch struct { type LogBatch struct {
BlockNumbers []string BlockNumbers []string
HeaderIDs []string
LeafCIDs []string LeafCIDs []string
LeafMhKeys []string LeafMhKeys []string
ReceiptIDs []string ReceiptIDs []string
@ -90,7 +92,7 @@ type LogBatch struct {
// StateBatch holds the arguments for a batch insert of state data // StateBatch holds the arguments for a batch insert of state data
type StateBatch struct { type StateBatch struct {
BlockNumbers []string BlockNumbers []string
HeaderID string HeaderIDs []string
Paths [][]byte Paths [][]byte
StateKeys []string StateKeys []string
NodeTypes []int NodeTypes []int
@ -102,7 +104,7 @@ type StateBatch struct {
// AccountBatch holds the arguments for a batch insert of account data // AccountBatch holds the arguments for a batch insert of account data
type AccountBatch struct { type AccountBatch struct {
BlockNumbers []string BlockNumbers []string
HeaderID string HeaderIDs []string
StatePaths [][]byte StatePaths [][]byte
Balances []string Balances []string
Nonces []uint64 Nonces []uint64
@ -113,7 +115,7 @@ type AccountBatch struct {
// StorageBatch holds the arguments for a batch insert of storage data // StorageBatch holds the arguments for a batch insert of storage data
type StorageBatch struct { type StorageBatch struct {
BlockNumbers []string BlockNumbers []string
HeaderID string HeaderIDs []string
StatePaths [][]string StatePaths [][]string
Paths [][]byte Paths [][]byte
StorageKeys []string StorageKeys []string

View File

@ -83,6 +83,7 @@ type AccessListElementModel struct {
// ReceiptModel is the db model for eth.receipt_cids // ReceiptModel is the db model for eth.receipt_cids
type ReceiptModel struct { type ReceiptModel struct {
BlockNumber string `db:"block_number"` BlockNumber string `db:"block_number"`
HeaderID string `db:"header_id"`
TxID string `db:"tx_id"` TxID string `db:"tx_id"`
LeafCID string `db:"leaf_cid"` LeafCID string `db:"leaf_cid"`
LeafMhKey string `db:"leaf_mh_key"` LeafMhKey string `db:"leaf_mh_key"`
@ -146,6 +147,7 @@ type StateAccountModel struct {
// LogsModel is the db model for eth.logs // LogsModel is the db model for eth.logs
type LogsModel struct { type LogsModel struct {
BlockNumber string `db:"block_number"` BlockNumber string `db:"block_number"`
HeaderID string `db:"header_id"`
ReceiptID string `db:"rct_id"` ReceiptID string `db:"rct_id"`
LeafCID string `db:"leaf_cid"` LeafCID string `db:"leaf_cid"`
LeafMhKey string `db:"leaf_mh_key"` LeafMhKey string `db:"leaf_mh_key"`