From 644d77837207e46a61dffc552934c0ae655eae10 Mon Sep 17 00:00:00 2001 From: Roy Crihfield Date: Wed, 8 May 2024 00:23:32 +0800 Subject: [PATCH] index withdrawals - withdrawal_cids - withdrawal iplds - header withdrawals_root --- indexer/database/dump/indexer.go | 38 ++++-- indexer/database/file/csv_writer.go | 75 ++++++----- indexer/database/file/indexer.go | 39 ++++-- indexer/database/file/interfaces.go | 1 + indexer/database/file/sql_writer.go | 43 ++++++- indexer/database/metrics/metrics.go | 4 + indexer/database/sql/indexer.go | 41 ++++-- indexer/database/sql/interfaces.go | 1 + indexer/database/sql/postgres/database.go | 26 ++-- indexer/database/sql/writer.go | 37 ++++++ indexer/ipld/encode.go | 102 +++++++++++++++ indexer/ipld/eth_header.go | 60 --------- indexer/ipld/eth_log.go | 43 ------- indexer/ipld/eth_parser.go | 43 +++++-- indexer/ipld/eth_receipt.go | 58 --------- indexer/ipld/eth_tx.go | 59 --------- indexer/ipld/interface.go | 18 +++ indexer/ipld/shared.go | 1 + indexer/models/models.go | 14 +- indexer/shared/functions.go | 9 ++ indexer/shared/schema/schema.go | 31 ++++- indexer/shared/schema/table.go | 150 +++++++++++++--------- indexer/shared/schema/table_test.go | 90 +++++++------ 23 files changed, 576 insertions(+), 407 deletions(-) create mode 100644 indexer/ipld/encode.go delete mode 100644 indexer/ipld/eth_header.go delete mode 100644 indexer/ipld/eth_log.go delete mode 100644 indexer/ipld/eth_receipt.go delete mode 100644 indexer/ipld/eth_tx.go diff --git a/indexer/database/dump/indexer.go b/indexer/database/dump/indexer.go index 34f7ec0..cf84afa 100644 --- a/indexer/database/dump/indexer.go +++ b/indexer/database/dump/indexer.go @@ -82,7 +82,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip } // Generate the block iplds - txNodes, rctNodes, logNodes, err := ipld.FromBlockAndReceipts(block, receipts) + txNodes, rctNodes, logNodes, wdNodes, err := ipld.FromBlockAndReceipts(block, receipts) if err != nil { return nil, fmt.Errorf("error creating IPLD nodes from block and receipts: %v", err) } @@ -123,15 +123,17 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip traceMsg += fmt.Sprintf("uncle processing time: %s\r\n", tDiff.String()) t = time.Now() // Publish and index receipts and txs - err = sdi.processReceiptsAndTxs(blockTx, processArgs{ + err = sdi.processObjects(blockTx, processArgs{ headerID: headerID, blockNumber: block.Number(), blockTime: block.Time(), receipts: receipts, txs: transactions, + withdrawals: block.Withdrawals(), rctNodes: rctNodes, txNodes: txNodes, logNodes: logNodes, + wdNodes: wdNodes, }) if err != nil { return nil, err @@ -151,7 +153,7 @@ func (sdi *StateDiffIndexer) PushHeader(batch interfaces.Batch, header *types.He if !ok { return "", fmt.Errorf("sql: batch is expected to be of type %T, got %T", &BatchTx{}, batch) } - headerNode, err := ipld.NewEthHeader(header) + headerNode, err := ipld.EncodeHeader(header) if err != nil { return "", err } @@ -173,6 +175,7 @@ func (sdi *StateDiffIndexer) PushHeader(batch interfaces.Batch, header *types.He Timestamp: header.Time, Coinbase: header.Coinbase.String(), Canonical: true, + WithdrawalsRoot: shared.MaybeStringHash(header.WithdrawalsHash), } _, err = fmt.Fprintf(sdi.dump, "%+v\r\n", mod) return headerID, err @@ -225,13 +228,15 @@ type processArgs struct { blockTime uint64 receipts types.Receipts txs types.Transactions - rctNodes []*ipld.EthReceipt - txNodes []*ipld.EthTx - logNodes [][]*ipld.EthLog + withdrawals types.Withdrawals + rctNodes []ipld.IPLD + txNodes []ipld.IPLD + logNodes [][]ipld.IPLD + wdNodes []ipld.IPLD } -// processReceiptsAndTxs publishes and indexes receipt and transaction IPLDs in Postgres -func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs) error { +// processObjects publishes and indexes receipt and transaction IPLDs in Postgres +func (sdi *StateDiffIndexer) processObjects(tx *BatchTx, args processArgs) error { // Process receipts and txs signer := types.MakeSigner(sdi.chainConfig, args.blockNumber, args.blockTime) for i, receipt := range args.receipts { @@ -314,6 +319,23 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs return err } } + // Process withdrawals + for i, withdrawal := range args.withdrawals { + wdNode := args.wdNodes[i] + tx.cacheIPLD(wdNode) + wdModel := models.WithdrawalModel{ + BlockNumber: args.blockNumber.String(), + HeaderID: args.headerID, + CID: wdNode.Cid().String(), + Index: withdrawal.Index, + Validator: withdrawal.Validator, + Address: withdrawal.Address.String(), + Amount: withdrawal.Amount, + } + if _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", wdModel); err != nil { + return err + } + } return nil } diff --git a/indexer/database/file/csv_writer.go b/indexer/database/file/csv_writer.go index a634906..ad8a034 100644 --- a/indexer/database/file/csv_writer.go +++ b/indexer/database/file/csv_writer.go @@ -36,22 +36,8 @@ import ( sdtypes "github.com/cerc-io/plugeth-statediff/types" ) -var ( - Tables = []*schema.Table{ - &schema.TableIPLDBlock, - &schema.TableNodeInfo, - &schema.TableHeader, - &schema.TableStateNode, - &schema.TableStorageNode, - &schema.TableUncle, - &schema.TableTransaction, - &schema.TableReceipt, - &schema.TableLog, - } -) - type tableRow struct { - table schema.Table + table *schema.Table values []interface{} } @@ -134,7 +120,7 @@ func NewCSVWriter(path string, watchedAddressesFilePath string, diff bool) (*CSV return nil, fmt.Errorf("unable to create directory '%s': %w", path, err) } - writers, err := makeFileWriters(path, Tables) + writers, err := makeFileWriters(path, schema.Tables) if err != nil { return nil, err } @@ -164,7 +150,7 @@ func (csw *CSVWriter) Loop() { for { select { case row := <-csw.rows: - err := csw.writers.write(&row.table, row.values...) + err := csw.writers.write(row.table, row.values...) if err != nil { panic(fmt.Sprintf("error writing csv buffer: %v", err)) } @@ -204,13 +190,13 @@ func (csw *CSVWriter) Close() error { func (csw *CSVWriter) upsertNode(node nodeinfo.Info) { var values []interface{} values = append(values, node.GenesisBlock, node.NetworkID, node.ID, node.ClientName, node.ChainID) - csw.rows <- tableRow{schema.TableNodeInfo, values} + csw.rows <- tableRow{&schema.TableNodeInfo, values} } func (csw *CSVWriter) upsertIPLD(ipld models.IPLDModel) { var values []interface{} values = append(values, ipld.BlockNumber, ipld.Key, ipld.Data) - csw.rows <- tableRow{schema.TableIPLDBlock, values} + csw.rows <- tableRow{&schema.TableIPLDBlock, values} } func (csw *CSVWriter) upsertIPLDDirect(blockNumber, key string, value []byte) { @@ -231,11 +217,25 @@ func (csw *CSVWriter) upsertIPLDNode(blockNumber string, i ipld.IPLD) { func (csw *CSVWriter) upsertHeaderCID(header models.HeaderModel) { var values []interface{} - values = append(values, header.BlockNumber, header.BlockHash, header.ParentHash, header.CID, - header.TotalDifficulty, header.NodeIDs, header.Reward, header.StateRoot, header.TxRoot, - header.RctRoot, header.UnclesHash, header.Bloom, strconv.FormatUint(header.Timestamp, 10), header.Coinbase, - header.Canonical) - csw.rows <- tableRow{schema.TableHeader, values} + values = append(values, + header.BlockNumber, + header.BlockHash, + header.ParentHash, + header.CID, + header.TotalDifficulty, + header.NodeIDs, + header.Reward, + header.StateRoot, + header.TxRoot, + header.RctRoot, + header.UnclesHash, + header.Bloom, + strconv.FormatUint(header.Timestamp, 10), + header.Coinbase, + header.Canonical, + header.WithdrawalsRoot, + ) + csw.rows <- tableRow{&schema.TableHeader, values} metrics.IndexerMetrics.BlocksCounter.Inc(1) } @@ -243,14 +243,14 @@ func (csw *CSVWriter) upsertUncleCID(uncle models.UncleModel) { var values []interface{} values = append(values, uncle.BlockNumber, uncle.BlockHash, uncle.HeaderID, uncle.ParentHash, uncle.CID, uncle.Reward, uncle.Index) - csw.rows <- tableRow{schema.TableUncle, values} + csw.rows <- tableRow{&schema.TableUncle, values} } func (csw *CSVWriter) upsertTransactionCID(transaction models.TxModel) { var values []interface{} values = append(values, transaction.BlockNumber, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst, transaction.Src, transaction.Index, transaction.Type, transaction.Value) - csw.rows <- tableRow{schema.TableTransaction, values} + csw.rows <- tableRow{&schema.TableTransaction, values} metrics.IndexerMetrics.TransactionsCounter.Inc(1) } @@ -258,7 +258,7 @@ func (csw *CSVWriter) upsertReceiptCID(rct *models.ReceiptModel) { var values []interface{} values = append(values, rct.BlockNumber, rct.HeaderID, rct.TxID, rct.CID, rct.Contract, rct.PostState, rct.PostStatus) - csw.rows <- tableRow{schema.TableReceipt, values} + csw.rows <- tableRow{&schema.TableReceipt, values} metrics.IndexerMetrics.ReceiptsCounter.Inc(1) } @@ -267,11 +267,26 @@ func (csw *CSVWriter) upsertLogCID(logs []*models.LogsModel) { var values []interface{} values = append(values, l.BlockNumber, l.HeaderID, l.CID, l.ReceiptID, l.Address, l.Index, l.Topic0, l.Topic1, l.Topic2, l.Topic3) - csw.rows <- tableRow{schema.TableLog, values} + csw.rows <- tableRow{&schema.TableLog, values} metrics.IndexerMetrics.LogsCounter.Inc(1) } } +func (csw *CSVWriter) upsertWithdrawalCID(withdrawal models.WithdrawalModel) { + var values []interface{} + values = append(values, + withdrawal.BlockNumber, + withdrawal.HeaderID, + withdrawal.CID, + withdrawal.Index, + withdrawal.Validator, + withdrawal.Address, + withdrawal.Amount, + ) + csw.rows <- tableRow{&schema.TableWithdrawal, values} + metrics.IndexerMetrics.WithdrawalsCounter.Inc(1) +} + func (csw *CSVWriter) upsertStateCID(stateNode models.StateNodeModel) { balance := stateNode.Balance if stateNode.Removed { @@ -281,14 +296,14 @@ func (csw *CSVWriter) upsertStateCID(stateNode models.StateNodeModel) { var values []interface{} values = append(values, stateNode.BlockNumber, stateNode.HeaderID, stateNode.StateKey, stateNode.CID, csw.isDiff, balance, strconv.FormatUint(stateNode.Nonce, 10), stateNode.CodeHash, stateNode.StorageRoot, stateNode.Removed) - csw.rows <- tableRow{schema.TableStateNode, values} + csw.rows <- tableRow{&schema.TableStateNode, values} } func (csw *CSVWriter) upsertStorageCID(storageCID models.StorageNodeModel) { var values []interface{} values = append(values, storageCID.BlockNumber, storageCID.HeaderID, storageCID.StateKey, storageCID.StorageKey, storageCID.CID, csw.isDiff, storageCID.Value, storageCID.Removed) - csw.rows <- tableRow{schema.TableStorageNode, values} + csw.rows <- tableRow{&schema.TableStorageNode, values} } // LoadWatchedAddresses loads watched addresses from a file diff --git a/indexer/database/file/indexer.go b/indexer/database/file/indexer.go index ccd3f0e..e6bfa52 100644 --- a/indexer/database/file/indexer.go +++ b/indexer/database/file/indexer.go @@ -84,7 +84,7 @@ func NewStateDiffIndexer(chainConfig *params.ChainConfig, config Config, nodeInf if _, err := os.Stat(outputDir); !errors.Is(err, os.ErrNotExist) { return nil, fmt.Errorf("cannot create output directory, directory (%s) already exists", outputDir) } - log.Info("Writing statediff CSV files to directory", "file", outputDir) + log.Info("Writing statediff CSV files", "directory", outputDir) if watchedAddressesFilePath == "" { watchedAddressesFilePath = defaultWatchedAddressesCSVFilePath @@ -156,7 +156,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip } // Generate the block iplds - txNodes, rctNodes, logNodes, err := ipld.FromBlockAndReceipts(block, receipts) + txNodes, rctNodes, logNodes, wdNodes, err := ipld.FromBlockAndReceipts(block, receipts) if err != nil { return nil, fmt.Errorf("error creating IPLD nodes from block and receipts: %v", err) } @@ -197,15 +197,16 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip traceMsg += fmt.Sprintf("uncle processing time: %s\r\n", tDiff.String()) t = time.Now() - // write receipts and txs - err = sdi.processReceiptsAndTxs(processArgs{ + err = sdi.processObjects(processArgs{ headerID: headerID, blockNumber: block.Number(), receipts: receipts, txs: transactions, + withdrawals: block.Withdrawals(), rctNodes: rctNodes, txNodes: txNodes, logNodes: logNodes, + wdNodes: wdNodes, }) if err != nil { return nil, err @@ -222,7 +223,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip // it returns the headerID func (sdi *StateDiffIndexer) PushHeader(_ interfaces.Batch, header *types.Header, reward, td *big.Int) (string, error) { // Process the header - headerNode, err := ipld.NewEthHeader(header) + headerNode, err := ipld.EncodeHeader(header) if err != nil { return "", err } @@ -245,6 +246,7 @@ func (sdi *StateDiffIndexer) PushHeader(_ interfaces.Batch, header *types.Header Timestamp: header.Time, Coinbase: header.Coinbase.String(), Canonical: true, + WithdrawalsRoot: shared.MaybeStringHash(header.WithdrawalsHash), }) return headerID, nil } @@ -293,13 +295,15 @@ type processArgs struct { blockTime uint64 receipts types.Receipts txs types.Transactions - rctNodes []*ipld.EthReceipt - txNodes []*ipld.EthTx - logNodes [][]*ipld.EthLog + withdrawals types.Withdrawals + rctNodes []ipld.IPLD + txNodes []ipld.IPLD + logNodes [][]ipld.IPLD + wdNodes []ipld.IPLD } -// processReceiptsAndTxs writes receipt and tx IPLD insert SQL stmts to a file -func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error { +// processObjects writes receipt and tx IPLD insert SQL stmts to a file +func (sdi *StateDiffIndexer) processObjects(args processArgs) error { // Process receipts and txs signer := types.MakeSigner(sdi.chainConfig, args.blockNumber, args.blockTime) for i, receipt := range args.receipts { @@ -376,6 +380,21 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error { } sdi.fileWriter.upsertLogCID(logDataSet) } + // Process withdrawals + for i, wd := range args.withdrawals { + wdNode := args.wdNodes[i] + sdi.fileWriter.upsertIPLDNode(args.blockNumber.String(), wdNode) + wdModel := models.WithdrawalModel{ + BlockNumber: args.blockNumber.String(), + HeaderID: args.headerID, + CID: wdNode.Cid().String(), + Index: wd.Index, + Validator: wd.Validator, + Address: wd.Address.String(), + Amount: wd.Amount, + } + sdi.fileWriter.upsertWithdrawalCID(wdModel) + } return nil } diff --git a/indexer/database/file/interfaces.go b/indexer/database/file/interfaces.go index ba38954..9ae4a8b 100644 --- a/indexer/database/file/interfaces.go +++ b/indexer/database/file/interfaces.go @@ -41,6 +41,7 @@ type FileWriter interface { upsertTransactionCID(transaction models.TxModel) upsertReceiptCID(rct *models.ReceiptModel) upsertLogCID(logs []*models.LogsModel) + upsertWithdrawalCID(models.WithdrawalModel) upsertStateCID(stateNode models.StateNodeModel) upsertStorageCID(storageCID models.StorageNodeModel) upsertIPLD(ipld models.IPLDModel) diff --git a/indexer/database/file/sql_writer.go b/indexer/database/file/sql_writer.go index 5326b6f..b4430b0 100644 --- a/indexer/database/file/sql_writer.go +++ b/indexer/database/file/sql_writer.go @@ -32,6 +32,7 @@ import ( "github.com/cerc-io/plugeth-statediff/indexer/ipld" "github.com/cerc-io/plugeth-statediff/indexer/models" nodeinfo "github.com/cerc-io/plugeth-statediff/indexer/node" + "github.com/cerc-io/plugeth-statediff/indexer/shared/schema" "github.com/cerc-io/plugeth-statediff/types" ) @@ -145,8 +146,8 @@ const ( ipldInsert = "INSERT INTO ipld.blocks (block_number, key, data) VALUES ('%s', '%s', '\\x%x');\n" headerInsert = "INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_ids, reward, " + - "state_root, tx_root, receipt_root, uncles_hash, bloom, timestamp, coinbase, canonical) VALUES " + - "('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '\\x%x', %d, '%s', %t);\n" + "state_root, tx_root, receipt_root, uncles_hash, bloom, timestamp, coinbase, canonical, withdrawals_root) VALUES " + + "('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '\\x%x', %d, '%s', %t, '%s');\n" uncleInsert = "INSERT INTO eth.uncle_cids (block_number, block_hash, header_id, parent_hash, cid, reward, index) VALUES " + "('%s', '%s', '%s', '%s', '%s', '%s', %d);\n" @@ -167,6 +168,10 @@ const ( "removed, diff, val) VALUES ('%s', '%s', '%s', '%s', '%s', %t, %t, '\\x%x');\n" ) +var ( + withdrawalsInsert = schema.TableWithdrawal.FmtStringInsert() + ";\n" +) + func (sqw *SQLWriter) upsertNode(node nodeinfo.Info) { sqw.stmts <- []byte(fmt.Sprintf(nodeInsert, node.GenesisBlock, node.NetworkID, node.ID, node.ClientName, node.ChainID)) } @@ -192,9 +197,24 @@ func (sqw *SQLWriter) upsertIPLDNode(blockNumber string, i ipld.IPLD) { } func (sqw *SQLWriter) upsertHeaderCID(header models.HeaderModel) { - stmt := fmt.Sprintf(headerInsert, header.BlockNumber, header.BlockHash, header.ParentHash, header.CID, - header.TotalDifficulty, formatPostgresStringArray(header.NodeIDs), header.Reward, header.StateRoot, header.TxRoot, - header.RctRoot, header.UnclesHash, header.Bloom, header.Timestamp, header.Coinbase, header.Canonical) + stmt := fmt.Sprintf(headerInsert, + header.BlockNumber, + header.BlockHash, + header.ParentHash, + header.CID, + header.TotalDifficulty, + formatPostgresStringArray(header.NodeIDs), + header.Reward, + header.StateRoot, + header.TxRoot, + header.RctRoot, + header.UnclesHash, + header.Bloom, + header.Timestamp, + header.Coinbase, + header.Canonical, + header.WithdrawalsRoot, + ) sqw.stmts <- []byte(stmt) metrics.IndexerMetrics.BlocksCounter.Inc(1) } @@ -224,6 +244,19 @@ func (sqw *SQLWriter) upsertLogCID(logs []*models.LogsModel) { } } +func (sqw *SQLWriter) upsertWithdrawalCID(withdrawal models.WithdrawalModel) { + sqw.stmts <- []byte(fmt.Sprintf(withdrawalsInsert, + withdrawal.BlockNumber, + withdrawal.HeaderID, + withdrawal.CID, + withdrawal.Index, + withdrawal.Validator, + withdrawal.Address, + withdrawal.Amount, + )) + metrics.IndexerMetrics.WithdrawalsCounter.Inc(1) +} + func (sqw *SQLWriter) upsertStateCID(stateNode models.StateNodeModel) { balance := stateNode.Balance if stateNode.Removed { diff --git a/indexer/database/metrics/metrics.go b/indexer/database/metrics/metrics.go index 92e56e6..97527aa 100644 --- a/indexer/database/metrics/metrics.go +++ b/indexer/database/metrics/metrics.go @@ -56,6 +56,8 @@ type IndexerMetricsHandles struct { ReceiptsCounter metrics.Counter // The total number of processed logs LogsCounter metrics.Counter + // The total number of processed logs + WithdrawalsCounter metrics.Counter // The total number of access list entries processed AccessListEntriesCounter metrics.Counter // Time spent waiting for free postgres tx @@ -90,6 +92,7 @@ func RegisterIndexerMetrics(reg metrics.Registry) IndexerMetricsHandles { TransactionsCounter: metrics.NewCounter(), ReceiptsCounter: metrics.NewCounter(), LogsCounter: metrics.NewCounter(), + WithdrawalsCounter: metrics.NewCounter(), AccessListEntriesCounter: metrics.NewCounter(), FreePostgresTimer: metrics.NewTimer(), PostgresCommitTimer: metrics.NewTimer(), @@ -113,6 +116,7 @@ func RegisterIndexerMetrics(reg metrics.Registry) IndexerMetricsHandles { reg.Register(metricName(subsys, "transactions"), ctx.TransactionsCounter) reg.Register(metricName(subsys, "receipts"), ctx.ReceiptsCounter) reg.Register(metricName(subsys, "logs"), ctx.LogsCounter) + reg.Register(metricName(subsys, "withdrawals"), ctx.WithdrawalsCounter) reg.Register(metricName(subsys, "access_list_entries"), ctx.AccessListEntriesCounter) reg.Register(metricName(subsys, "t_free_postgres"), ctx.FreePostgresTimer) reg.Register(metricName(subsys, "t_postgres_commit"), ctx.PostgresCommitTimer) diff --git a/indexer/database/sql/indexer.go b/indexer/database/sql/indexer.go index 8ac4a98..776e920 100644 --- a/indexer/database/sql/indexer.go +++ b/indexer/database/sql/indexer.go @@ -105,7 +105,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip } // Generate the block iplds - txNodes, rctNodes, logNodes, err := ipld.FromBlockAndReceipts(block, receipts) + txNodes, rctNodes, logNodes, wdNodes, err := ipld.FromBlockAndReceipts(block, receipts) if err != nil { return nil, fmt.Errorf("error creating IPLD nodes from block and receipts: %w", err) } @@ -148,16 +148,18 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip } metrics2.IndexerMetrics.UncleProcessingTimer.Update(time.Since(t)) t = time.Now() - // Publish and index receipts and txs - err = sdi.processReceiptsAndTxs(batch, processArgs{ + + err = sdi.processObjects(batch, processArgs{ headerID: headerID, blockNumber: block.Number(), blockTime: block.Time(), receipts: receipts, + withdrawals: block.Withdrawals(), txs: transactions, rctNodes: rctNodes, txNodes: txNodes, logNodes: logNodes, + wdNodes: wdNodes, }) if err != nil { return nil, err @@ -185,7 +187,7 @@ func (sdi *StateDiffIndexer) PushHeader(batch interfaces.Batch, header *types.He return "", fmt.Errorf("sql: batch is expected to be of type %T, got %T", &BatchTx{}, batch) } // Process the header - headerNode, err := ipld.NewEthHeader(header) + headerNode, err := ipld.EncodeHeader(header) if err != nil { return "", err } @@ -208,6 +210,7 @@ func (sdi *StateDiffIndexer) PushHeader(batch interfaces.Batch, header *types.He Timestamp: header.Time, Coinbase: header.Coinbase.String(), Canonical: true, + WithdrawalsRoot: shared.MaybeStringHash(header.WithdrawalsHash), }) } @@ -258,13 +261,15 @@ type processArgs struct { blockTime uint64 receipts types.Receipts txs types.Transactions - rctNodes []*ipld.EthReceipt - txNodes []*ipld.EthTx - logNodes [][]*ipld.EthLog + withdrawals types.Withdrawals + rctNodes []ipld.IPLD + txNodes []ipld.IPLD + logNodes [][]ipld.IPLD + wdNodes []ipld.IPLD } -// processReceiptsAndTxs publishes and indexes receipt and transaction IPLDs in Postgres -func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs) error { +// processObjects publishes and indexes receipt and transaction IPLDs in Postgres +func (sdi *StateDiffIndexer) processObjects(tx *BatchTx, args processArgs) error { // Process receipts and txs signer := types.MakeSigner(sdi.chainConfig, args.blockNumber, args.blockTime) for i, receipt := range args.receipts { @@ -348,7 +353,23 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs return err } } - + // Process withdrawals + for i, withdrawal := range args.withdrawals { + wdNode := args.wdNodes[i] + tx.cacheIPLD(wdNode) + wdModel := models.WithdrawalModel{ + BlockNumber: args.blockNumber.String(), + HeaderID: args.headerID, + CID: wdNode.Cid().String(), + Index: withdrawal.Index, + Validator: withdrawal.Validator, + Address: withdrawal.Address.String(), + Amount: withdrawal.Amount, + } + if err := sdi.dbWriter.upsertWithdrawalCID(tx.dbtx, wdModel); err != nil { + return err + } + } return nil } diff --git a/indexer/database/sql/interfaces.go b/indexer/database/sql/interfaces.go index 845f603..e8c732b 100644 --- a/indexer/database/sql/interfaces.go +++ b/indexer/database/sql/interfaces.go @@ -54,6 +54,7 @@ type Statements interface { InsertTxStm() string InsertRctStm() string InsertLogStm() string + InsertWithdrawalStm() string InsertStateStm() string InsertStorageStm() string InsertIPLDStm() string diff --git a/indexer/database/sql/postgres/database.go b/indexer/database/sql/postgres/database.go index f73b882..0249778 100644 --- a/indexer/database/sql/postgres/database.go +++ b/indexer/database/sql/postgres/database.go @@ -18,6 +18,7 @@ package postgres import ( "fmt" + "strings" "github.com/cerc-io/plugeth-statediff/indexer/database/sql" "github.com/cerc-io/plugeth-statediff/indexer/shared/schema" @@ -43,7 +44,9 @@ type DB struct { // MaxHeaderStm satisfies the sql.Statements interface func (db *DB) MaxHeaderStm() string { - return fmt.Sprintf("SELECT block_number, block_hash, parent_hash, cid, td, node_ids, reward, state_root, tx_root, receipt_root, uncles_hash, bloom, timestamp, coinbase FROM %s ORDER BY block_number DESC LIMIT 1", schema.TableHeader.Name) + return fmt.Sprintf("SELECT %s FROM %s ORDER BY block_number DESC LIMIT 1", + strings.Join(schema.TableHeader.ColumnNames(), ","), + schema.TableHeader.Name) } // ExistsHeaderStm satisfies the sql.Statements interface @@ -59,7 +62,7 @@ func (db *DB) DetectGapsStm() string { // InsertHeaderStm satisfies the sql.Statements interface // Stm == Statement func (db *DB) InsertHeaderStm() string { - return schema.TableHeader.ToInsertStatement(db.upsert) + return schema.TableHeader.PreparedInsert(db.upsert) } // SetCanonicalHeaderStm satisfies the sql.Statements interface @@ -70,37 +73,42 @@ func (db *DB) SetCanonicalHeaderStm() string { // InsertUncleStm satisfies the sql.Statements interface func (db *DB) InsertUncleStm() string { - return schema.TableUncle.ToInsertStatement(db.upsert) + return schema.TableUncle.PreparedInsert(db.upsert) } // InsertTxStm satisfies the sql.Statements interface func (db *DB) InsertTxStm() string { - return schema.TableTransaction.ToInsertStatement(db.upsert) + return schema.TableTransaction.PreparedInsert(db.upsert) } // InsertRctStm satisfies the sql.Statements interface func (db *DB) InsertRctStm() string { - return schema.TableReceipt.ToInsertStatement(db.upsert) + return schema.TableReceipt.PreparedInsert(db.upsert) } // InsertLogStm satisfies the sql.Statements interface func (db *DB) InsertLogStm() string { - return schema.TableLog.ToInsertStatement(db.upsert) + return schema.TableLog.PreparedInsert(db.upsert) +} + +// InsertLogStm satisfies the sql.Statements interface +func (db *DB) InsertWithdrawalStm() string { + return schema.TableWithdrawal.PreparedInsert(db.upsert) } // InsertStateStm satisfies the sql.Statements interface func (db *DB) InsertStateStm() string { - return schema.TableStateNode.ToInsertStatement(db.upsert) + return schema.TableStateNode.PreparedInsert(db.upsert) } // InsertStorageStm satisfies the sql.Statements interface func (db *DB) InsertStorageStm() string { - return schema.TableStorageNode.ToInsertStatement(db.upsert) + return schema.TableStorageNode.PreparedInsert(db.upsert) } // InsertIPLDStm satisfies the sql.Statements interface func (db *DB) InsertIPLDStm() string { - return schema.TableIPLDBlock.ToInsertStatement(db.upsert) + return schema.TableIPLDBlock.PreparedInsert(db.upsert) } // InsertIPLDsStm satisfies the sql.Statements interface diff --git a/indexer/database/sql/writer.go b/indexer/database/sql/writer.go index 752761b..3373166 100644 --- a/indexer/database/sql/writer.go +++ b/indexer/database/sql/writer.go @@ -95,6 +95,7 @@ func (w *Writer) maxHeader() (*models.HeaderModel, error) { &model.Timestamp, &model.Coinbase, &model.Canonical, + &model.WithdrawalsRoot, ) model.BlockNumber = strconv.FormatUint(number, 10) model.TotalDifficulty = strconv.FormatUint(td, 10) @@ -125,6 +126,7 @@ func (w *Writer) upsertHeaderCID(tx Tx, header models.HeaderModel) error { header.Timestamp, header.Coinbase, header.Canonical, + header.WithdrawalsRoot, ) if err != nil { return insertError{"eth.header_cids", err, w.db.InsertHeaderStm(), header} @@ -286,6 +288,41 @@ func (w *Writer) upsertLogCID(tx Tx, logs []*models.LogsModel) error { return nil } +func (w *Writer) upsertWithdrawalCID(tx Tx, withdrawal models.WithdrawalModel) error { + if w.useCopyForTx(tx) { + blockNum, err := strconv.ParseUint(withdrawal.BlockNumber, 10, 64) + if err != nil { + return insertError{"eth.withdrawal_cids", err, "COPY", withdrawal} + } + + _, err = tx.CopyFrom(w.db.Context(), schema.TableWithdrawal.TableName(), schema.TableWithdrawal.ColumnNames(), + toRows(toRow(blockNum, + withdrawal.HeaderID, + withdrawal.CID, + withdrawal.Index, + withdrawal.Validator, + withdrawal.Address, + withdrawal.Amount))) + if err != nil { + return insertError{"eth.withdrawal_cids", err, "COPY", withdrawal} + } + } else { + _, err := tx.Exec(w.db.Context(), w.db.InsertWithdrawalStm(), + withdrawal.BlockNumber, + withdrawal.HeaderID, + withdrawal.CID, + withdrawal.Index, + withdrawal.Validator, + withdrawal.Address, + withdrawal.Amount) + if err != nil { + return insertError{"eth.withdrawal_cids", err, w.db.InsertWithdrawalStm(), withdrawal} + } + } + metrics.IndexerMetrics.WithdrawalsCounter.Inc(1) + return nil +} + /* INSERT INTO eth.state_cids (block_number, header_id, state_leaf_key, cid, removed, diff, balance, nonce, code_hash, storage_root) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) ON CONFLICT (header_id, state_leaf_key, block_number) DO NOTHING diff --git a/indexer/ipld/encode.go b/indexer/ipld/encode.go new file mode 100644 index 0000000..8108f8e --- /dev/null +++ b/indexer/ipld/encode.go @@ -0,0 +1,102 @@ +// VulcanizeDB +// Copyright © 2024 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package ipld + +import ( + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/rlp" + mh "github.com/multiformats/go-multihash" +) + +// EncodeHeader converts a *types.Header into an IPLD node +func EncodeHeader(header *types.Header) (IPLD, error) { + headerRLP, err := rlp.EncodeToBytes(header) + if err != nil { + return nil, err + } + c, err := RawdataToCid(MEthHeader, headerRLP, mh.KECCAK_256) + if err != nil { + return nil, err + } + return &node{ + cid: c, + rawdata: headerRLP, + }, nil +} + +// encodeTx converts a *types.Transaction to an IPLD node +func encodeTx(tx *types.Transaction) (IPLD, error) { + txRaw, err := tx.MarshalBinary() + if err != nil { + return nil, err + } + c, err := RawdataToCid(MEthTx, txRaw, mh.KECCAK_256) + if err != nil { + return nil, err + } + return &node{ + cid: c, + rawdata: txRaw, + }, nil +} + +// encodeReceipt converts a types.Receipt to an IPLD node +func encodeReceipt(receipt *types.Receipt) (IPLD, error) { + rctRaw, err := receipt.MarshalBinary() + if err != nil { + return nil, err + } + c, err := RawdataToCid(MEthTxReceipt, rctRaw, mh.KECCAK_256) + if err != nil { + return nil, err + } + return &node{ + cid: c, + rawdata: rctRaw, + }, nil +} + +// encodeLog converts a Log to an IPLD node +func encodeLog(log *types.Log) (IPLD, error) { + logRaw, err := rlp.EncodeToBytes(log) + if err != nil { + return nil, err + } + c, err := RawdataToCid(MEthLog, logRaw, mh.KECCAK_256) + if err != nil { + return nil, err + } + return &node{ + cid: c, + rawdata: logRaw, + }, nil +} + +func encodeWithdrawal(w *types.Withdrawal) (IPLD, error) { + wRaw, err := rlp.EncodeToBytes(w) + if err != nil { + return nil, err + } + c, err := RawdataToCid(MEthWithdrawal, wRaw, mh.KECCAK_256) + if err != nil { + return nil, err + } + return &node{ + cid: c, + rawdata: wRaw, + }, nil +} diff --git a/indexer/ipld/eth_header.go b/indexer/ipld/eth_header.go deleted file mode 100644 index d71ea4d..0000000 --- a/indexer/ipld/eth_header.go +++ /dev/null @@ -1,60 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package ipld - -import ( - "github.com/ipfs/go-cid" - mh "github.com/multiformats/go-multihash" - - "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/rlp" -) - -// EthHeader (eth-block, codec 0x90), represents an ethereum block header -type EthHeader struct { - cid cid.Cid - rawdata []byte -} - -// Static (compile time) check that EthHeader satisfies the node.Node interface. -var _ IPLD = (*EthHeader)(nil) - -// NewEthHeader converts a *types.Header into an EthHeader IPLD node -func NewEthHeader(header *types.Header) (*EthHeader, error) { - headerRLP, err := rlp.EncodeToBytes(header) - if err != nil { - return nil, err - } - c, err := RawdataToCid(MEthHeader, headerRLP, mh.KECCAK_256) - if err != nil { - return nil, err - } - return &EthHeader{ - cid: c, - rawdata: headerRLP, - }, nil -} - -// RawData returns the binary of the RLP encode of the block header. -func (b *EthHeader) RawData() []byte { - return b.rawdata -} - -// Cid returns the cid of the block header. -func (b *EthHeader) Cid() cid.Cid { - return b.cid -} diff --git a/indexer/ipld/eth_log.go b/indexer/ipld/eth_log.go deleted file mode 100644 index 71db98a..0000000 --- a/indexer/ipld/eth_log.go +++ /dev/null @@ -1,43 +0,0 @@ -package ipld - -import ( - "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/rlp" - "github.com/ipfs/go-cid" - mh "github.com/multiformats/go-multihash" -) - -// EthLog (eth-log, codec 0x9a), represents an ethereum block header -type EthLog struct { - rawData []byte - cid cid.Cid -} - -// Static (compile time) check that EthLog satisfies the node.Node interface. -var _ IPLD = (*EthLog)(nil) - -// NewLog create a new EthLog IPLD node -func NewLog(log *types.Log) (*EthLog, error) { - logRaw, err := rlp.EncodeToBytes(log) - if err != nil { - return nil, err - } - c, err := RawdataToCid(MEthLog, logRaw, mh.KECCAK_256) - if err != nil { - return nil, err - } - return &EthLog{ - cid: c, - rawData: logRaw, - }, nil -} - -// RawData returns the binary of the RLP encode of the log. -func (l *EthLog) RawData() []byte { - return l.rawData -} - -// Cid returns the cid of the receipt log. -func (l *EthLog) Cid() cid.Cid { - return l.cid -} diff --git a/indexer/ipld/eth_parser.go b/indexer/ipld/eth_parser.go index 5ec8bf9..c247df6 100644 --- a/indexer/ipld/eth_parser.go +++ b/indexer/ipld/eth_parser.go @@ -22,25 +22,29 @@ import ( // FromBlockAndReceipts takes a block and processes it // to return it a set of IPLD nodes for further processing. -func FromBlockAndReceipts(block *types.Block, receipts []*types.Receipt) ([]*EthTx, []*EthReceipt, [][]*EthLog, error) { +func FromBlockAndReceipts(block *types.Block, receipts []*types.Receipt) ([]IPLD, []IPLD, [][]IPLD, []IPLD, error) { // Process the txs txNodes, err := processTransactions(block.Transactions()) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, nil, err + } + withdrawalNodes, err := processWithdrawals(block.Withdrawals()) + if err != nil { + return nil, nil, nil, nil, err } // Process the receipts and logs rctNodes, logNodes, err := processReceiptsAndLogs(receipts) - return txNodes, rctNodes, logNodes, err + return txNodes, rctNodes, logNodes, withdrawalNodes, err } // processTransactions will take the found transactions in a parsed block body // to return IPLD node slices for eth-tx -func processTransactions(txs []*types.Transaction) ([]*EthTx, error) { - var ethTxNodes []*EthTx +func processTransactions(txs []*types.Transaction) ([]IPLD, error) { + var ethTxNodes []IPLD for _, tx := range txs { - ethTx, err := NewEthTx(tx) + ethTx, err := encodeTx(tx) if err != nil { return nil, err } @@ -50,12 +54,25 @@ func processTransactions(txs []*types.Transaction) ([]*EthTx, error) { return ethTxNodes, nil } +func processWithdrawals(withdrawals []*types.Withdrawal) ([]IPLD, error) { + var withdrawalNodes []IPLD + for _, withdrawal := range withdrawals { + ethW, err := encodeWithdrawal(withdrawal) + if err != nil { + return nil, err + } + withdrawalNodes = append(withdrawalNodes, ethW) + } + + return withdrawalNodes, nil +} + // processReceiptsAndLogs will take in receipts // to return IPLD node slices for eth-rct and eth-log -func processReceiptsAndLogs(rcts []*types.Receipt) ([]*EthReceipt, [][]*EthLog, error) { +func processReceiptsAndLogs(rcts []*types.Receipt) ([]IPLD, [][]IPLD, error) { // Pre allocating memory. - ethRctNodes := make([]*EthReceipt, len(rcts)) - ethLogNodes := make([][]*EthLog, len(rcts)) + ethRctNodes := make([]IPLD, len(rcts)) + ethLogNodes := make([][]IPLD, len(rcts)) for idx, rct := range rcts { logNodes, err := processLogs(rct.Logs) @@ -63,7 +80,7 @@ func processReceiptsAndLogs(rcts []*types.Receipt) ([]*EthReceipt, [][]*EthLog, return nil, nil, err } - ethRct, err := NewReceipt(rct) + ethRct, err := encodeReceipt(rct) if err != nil { return nil, nil, err } @@ -75,10 +92,10 @@ func processReceiptsAndLogs(rcts []*types.Receipt) ([]*EthReceipt, [][]*EthLog, return ethRctNodes, ethLogNodes, nil } -func processLogs(logs []*types.Log) ([]*EthLog, error) { - logNodes := make([]*EthLog, len(logs)) +func processLogs(logs []*types.Log) ([]IPLD, error) { + logNodes := make([]IPLD, len(logs)) for idx, log := range logs { - logNode, err := NewLog(log) + logNode, err := encodeLog(log) if err != nil { return nil, err } diff --git a/indexer/ipld/eth_receipt.go b/indexer/ipld/eth_receipt.go deleted file mode 100644 index eac2ba6..0000000 --- a/indexer/ipld/eth_receipt.go +++ /dev/null @@ -1,58 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package ipld - -import ( - "github.com/ipfs/go-cid" - mh "github.com/multiformats/go-multihash" - - "github.com/ethereum/go-ethereum/core/types" -) - -type EthReceipt struct { - rawdata []byte - cid cid.Cid -} - -// Static (compile time) check that EthReceipt satisfies the node.Node interface. -var _ IPLD = (*EthReceipt)(nil) - -// NewReceipt converts a types.ReceiptForStorage to an EthReceipt IPLD node -func NewReceipt(receipt *types.Receipt) (*EthReceipt, error) { - rctRaw, err := receipt.MarshalBinary() - if err != nil { - return nil, err - } - c, err := RawdataToCid(MEthTxReceipt, rctRaw, mh.KECCAK_256) - if err != nil { - return nil, err - } - return &EthReceipt{ - cid: c, - rawdata: rctRaw, - }, nil -} - -// RawData returns the binary of the RLP encode of the receipt. -func (r *EthReceipt) RawData() []byte { - return r.rawdata -} - -// Cid returns the cid of the receipt. -func (r *EthReceipt) Cid() cid.Cid { - return r.cid -} diff --git a/indexer/ipld/eth_tx.go b/indexer/ipld/eth_tx.go deleted file mode 100644 index ca5fe65..0000000 --- a/indexer/ipld/eth_tx.go +++ /dev/null @@ -1,59 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package ipld - -import ( - "github.com/ipfs/go-cid" - mh "github.com/multiformats/go-multihash" - - "github.com/ethereum/go-ethereum/core/types" -) - -// EthTx (eth-tx codec 0x93) represents an ethereum transaction -type EthTx struct { - cid cid.Cid - rawdata []byte -} - -// Static (compile time) check that EthTx satisfies the node.Node interface. -var _ IPLD = (*EthTx)(nil) - -// NewEthTx converts a *types.Transaction to an EthTx IPLD node -func NewEthTx(tx *types.Transaction) (*EthTx, error) { - txRaw, err := tx.MarshalBinary() - if err != nil { - return nil, err - } - c, err := RawdataToCid(MEthTx, txRaw, mh.KECCAK_256) - if err != nil { - return nil, err - } - return &EthTx{ - cid: c, - rawdata: txRaw, - }, nil -} - -// RawData returns the binary of the RLP encode of the transaction. -func (t *EthTx) RawData() []byte { - return t.rawdata -} - -// Cid returns the cid of the transaction. -func (t *EthTx) Cid() cid.Cid { - return t.cid -} diff --git a/indexer/ipld/interface.go b/indexer/ipld/interface.go index 73a4bed..3909c63 100644 --- a/indexer/ipld/interface.go +++ b/indexer/ipld/interface.go @@ -2,7 +2,25 @@ package ipld import "github.com/ipfs/go-cid" +// Check that node satisfies the IPLD Node interface. +var _ IPLD = (*node)(nil) + +type node struct { + cid cid.Cid + rawdata []byte +} + type IPLD interface { Cid() cid.Cid RawData() []byte } + +// RawData returns the RLP encoded bytes of the node. +func (b node) RawData() []byte { + return b.rawdata +} + +// Cid returns the CID of the node. +func (b node) Cid() cid.Cid { + return b.cid +} diff --git a/indexer/ipld/shared.go b/indexer/ipld/shared.go index 7758f32..6e5920e 100644 --- a/indexer/ipld/shared.go +++ b/indexer/ipld/shared.go @@ -37,6 +37,7 @@ const ( MEthStorageTrie = 0x98 MEthLogTrie = 0x99 MEthLog = 0x9a + MEthWithdrawal = 0x9b // TODO ) // RawdataToCid takes the desired codec and a slice of bytes diff --git a/indexer/models/models.go b/indexer/models/models.go index 0fcc964..124b431 100644 --- a/indexer/models/models.go +++ b/indexer/models/models.go @@ -42,6 +42,7 @@ type HeaderModel struct { Timestamp uint64 `db:"timestamp"` Coinbase string `db:"coinbase"` Canonical bool `db:"canonical"` + WithdrawalsRoot string `db:"withdrawals_root"` } // UncleModel is the db model for eth.uncle_cids @@ -105,7 +106,7 @@ type StorageNodeModel struct { Value []byte `db:"val"` } -// LogsModel is the db model for eth.logs +// LogsModel is the db model for eth.log_cids type LogsModel struct { BlockNumber string `db:"block_number"` HeaderID string `db:"header_id"` @@ -118,3 +119,14 @@ type LogsModel struct { Topic2 string `db:"topic2"` Topic3 string `db:"topic3"` } + +// WithdrawalModel is the db model for eth.withdrawal_cids +type WithdrawalModel struct { + BlockNumber string `db:"block_number"` + HeaderID string `db:"header_id"` + CID string `db:"cid"` + Index uint64 `db:"index"` + Validator uint64 `db:"validator"` + Address string `db:"address"` + Amount uint64 `db:"amount"` +} diff --git a/indexer/shared/functions.go b/indexer/shared/functions.go index 58306bd..1797fc2 100644 --- a/indexer/shared/functions.go +++ b/indexer/shared/functions.go @@ -35,3 +35,12 @@ func HandleZeroAddr(to common.Address) string { } return to.String() } + +// MaybeStringHash calls String on its argument and returns a pointer to the result. +// When passed nil, it returns nil. +func MaybeStringHash(hash *common.Hash) string { + if hash == nil { + return "" + } + return hash.String() +} diff --git a/indexer/shared/schema/schema.go b/indexer/shared/schema/schema.go index 1fbc54e..4db2baa 100644 --- a/indexer/shared/schema/schema.go +++ b/indexer/shared/schema/schema.go @@ -16,6 +16,19 @@ package schema +var Tables = []*Table{ + &TableIPLDBlock, + &TableNodeInfo, + &TableHeader, + &TableStateNode, + &TableStorageNode, + &TableUncle, + &TableTransaction, + &TableReceipt, + &TableLog, + &TableWithdrawal, +} + var TableIPLDBlock = Table{ Name: `ipld.blocks`, Columns: []Column{ @@ -52,9 +65,10 @@ var TableHeader = Table{ {Name: "receipt_root", Type: Dvarchar}, {Name: "uncles_hash", Type: Dvarchar}, {Name: "bloom", Type: Dbytea}, - {Name: "timestamp", Type: Dnumeric}, + {Name: "timestamp", Type: Dbigint}, {Name: "coinbase", Type: Dvarchar}, {Name: "canonical", Type: Dboolean}, + {Name: "withdrawals_root", Type: Dvarchar}, }, UpsertClause: OnConflict("block_number", "block_hash").Set( "parent_hash", @@ -70,6 +84,7 @@ var TableHeader = Table{ "timestamp", "coinbase", "canonical", + "withdrawals_root", )} var TableStateNode = Table{ @@ -165,6 +180,20 @@ var TableLog = Table{ UpsertClause: OnConflict("block_number", "header_id", "rct_id", "index"), } +var TableWithdrawal = Table{ + Name: "eth.withdrawal_cids", + Columns: []Column{ + {Name: "block_number", Type: Dbigint}, + {Name: "header_id", Type: Dvarchar}, + {Name: "cid", Type: Dtext}, + {Name: "index", Type: Dinteger}, + {Name: "validator", Type: Dinteger}, + {Name: "address", Type: Dvarchar}, + {Name: "amount", Type: Dinteger}, + }, + UpsertClause: OnConflict("block_number", "header_id", "index"), +} + var TableWatchedAddresses = Table{ Name: "eth_meta.watched_addresses", Columns: []Column{ diff --git a/indexer/shared/schema/table.go b/indexer/shared/schema/table.go index bf6968e..9cc1c2d 100644 --- a/indexer/shared/schema/table.go +++ b/indexer/shared/schema/table.go @@ -53,34 +53,6 @@ type Table struct { UpsertClause ConflictClause } -type colfmt = func(interface{}) string - -func (tbl *Table) ToCsvRow(args ...interface{}) []string { - var row []string - for i, col := range tbl.Columns { - value := col.Type.formatter()(args[i]) - - if col.Array { - valueList := funk.Map(args[i], col.Type.formatter()).([]string) - value = fmt.Sprintf("{%s}", strings.Join(valueList, ",")) - } - - row = append(row, value) - } - return row -} - -func (tbl *Table) VarcharColumns() []string { - columns := funk.Filter(tbl.Columns, func(col Column) bool { - return col.Type == Dvarchar - }).([]Column) - - columnNames := funk.Map(columns, func(col Column) string { - return col.Name - }).([]string) - return columnNames -} - func OnConflict(target ...string) ConflictClause { return ConflictClause{Target: target} } @@ -89,35 +61,6 @@ func (c ConflictClause) Set(fields ...string) ConflictClause { return c } -// ToInsertStatement returns a Postgres-compatible SQL insert statement for the table -// using positional placeholders -func (tbl *Table) ToInsertStatement(upsert bool) string { - var colnames, placeholders []string - for i, col := range tbl.Columns { - colnames = append(colnames, col.Name) - placeholders = append(placeholders, fmt.Sprintf("$%d", i+1)) - } - suffix := fmt.Sprintf("ON CONFLICT (%s)", strings.Join(tbl.UpsertClause.Target, ", ")) - if upsert && len(tbl.UpsertClause.Update) != 0 { - var update_placeholders []string - for _, name := range tbl.UpsertClause.Update { - i := funk.IndexOf(tbl.Columns, func(col Column) bool { return col.Name == name }) - update_placeholders = append(update_placeholders, fmt.Sprintf("$%d", i+1)) - } - suffix += fmt.Sprintf( - " DO UPDATE SET (%s) = (%s)", - strings.Join(tbl.UpsertClause.Update, ", "), strings.Join(update_placeholders, ", "), - ) - } else { - suffix += " DO NOTHING" - } - - return fmt.Sprintf( - "INSERT INTO %s (%s) VALUES (%s) %s", - tbl.Name, strings.Join(colnames, ", "), strings.Join(placeholders, ", "), suffix, - ) -} - // TableName returns a pgx-compatible table name. func (tbl *Table) TableName() []string { return strings.Split(tbl.Name, ".") @@ -132,11 +75,45 @@ func (tbl *Table) ColumnNames() []string { return names } +// PreparedInsert returns a pgx/sqlx-compatible SQL prepared insert statement for the table +// using positional placeholders. +// If upsert is true, include an ON CONFLICT clause handling column updates. +func (tbl *Table) PreparedInsert(upsert bool) string { + var colnames, placeholders []string + for i, col := range tbl.Columns { + colnames = append(colnames, col.Name) + placeholders = append(placeholders, fmt.Sprintf("$%d", i+1)) + } + suffix := " ON CONFLICT" + if len(tbl.UpsertClause.Target) > 0 { + suffix += fmt.Sprintf(" (%s)", strings.Join(tbl.UpsertClause.Target, ", ")) + } + if upsert && len(tbl.UpsertClause.Update) != 0 { + var update_placeholders []string + for _, name := range tbl.UpsertClause.Update { + update_placeholders = append(update_placeholders, "EXCLUDED."+name) + } + suffix += fmt.Sprintf( + " DO UPDATE SET (%s) = ROW(%s)", + strings.Join(tbl.UpsertClause.Update, ", "), strings.Join(update_placeholders, ", "), + ) + } else { + suffix += " DO NOTHING" + } + + return fmt.Sprintf( + "INSERT INTO %s (%s) VALUES (%s)", + tbl.Name, strings.Join(colnames, ", "), strings.Join(placeholders, ", "), + ) + suffix +} + +type colfmt = func(interface{}) string + func sprintf(f string) colfmt { return func(x interface{}) string { return fmt.Sprintf(f, x) } } -func (typ colType) formatter() colfmt { +func (typ colType) csvFormatter() colfmt { switch typ { case Dinteger: return sprintf("%d") @@ -157,6 +134,61 @@ func (typ colType) formatter() colfmt { return sprintf("%s") case Dtext: return sprintf("%s") + default: + panic("invalid column type") } - panic("unreachable") +} + +// ToCsvRow converts a list of values to a list of strings suitable for CSV output. +func (tbl *Table) ToCsvRow(args ...interface{}) []string { + var row []string + for i, col := range tbl.Columns { + value := col.Type.csvFormatter()(args[i]) + + if col.Array { + valueList := funk.Map(args[i], col.Type.csvFormatter()).([]string) + value = fmt.Sprintf("{%s}", strings.Join(valueList, ",")) + } + + row = append(row, value) + } + return row +} + +// VarcharColumns returns the names of columns with type VARCHAR. +func (tbl *Table) VarcharColumns() []string { + columns := funk.Filter(tbl.Columns, func(col Column) bool { + return col.Type == Dvarchar + }).([]Column) + + columnNames := funk.Map(columns, func(col Column) string { + return col.Name + }).([]string) + return columnNames +} + +func formatSpec(typ colType) string { + switch typ { + case Dinteger: + return "%d" + case Dboolean: + return "%t" + case Dbytea: + return `'\x%x'` + default: + return "'%s'" + } +} + +// FmtStringInsert returns a format string for creating a Postgres insert statement. +func (tbl *Table) FmtStringInsert() string { + var colnames, placeholders []string + for _, col := range tbl.Columns { + colnames = append(colnames, col.Name) + placeholders = append(placeholders, formatSpec(col.Type)) + } + return fmt.Sprintf( + "INSERT INTO %s (%s) VALUES (%s);", + tbl.Name, strings.Join(colnames, ", "), strings.Join(placeholders, ", "), + ) } diff --git a/indexer/shared/schema/table_test.go b/indexer/shared/schema/table_test.go index aaa026b..f31e10b 100644 --- a/indexer/shared/schema/table_test.go +++ b/indexer/shared/schema/table_test.go @@ -8,47 +8,55 @@ import ( . "github.com/cerc-io/plugeth-statediff/indexer/shared/schema" ) -var testHeaderTable = Table{ - Name: "eth.header_cids", - Columns: []Column{ - {Name: "block_number", Type: Dbigint}, - {Name: "block_hash", Type: Dvarchar}, - {Name: "parent_hash", Type: Dvarchar}, - {Name: "cid", Type: Dtext}, - {Name: "td", Type: Dnumeric}, - {Name: "node_id", Type: Dvarchar}, - {Name: "reward", Type: Dnumeric}, - {Name: "state_root", Type: Dvarchar}, - {Name: "tx_root", Type: Dvarchar}, - {Name: "receipt_root", Type: Dvarchar}, - {Name: "uncle_root", Type: Dvarchar}, - {Name: "bloom", Type: Dbytea}, - {Name: "timestamp", Type: Dnumeric}, - {Name: "mh_key", Type: Dtext}, - {Name: "times_validated", Type: Dinteger}, - {Name: "coinbase", Type: Dvarchar}, - }, - UpsertClause: OnConflict("block_hash", "block_number").Set( - "parent_hash", - "cid", - "td", - "node_id", - "reward", - "state_root", - "tx_root", - "receipt_root", - "uncle_root", - "bloom", - "timestamp", - "mh_key", - "times_validated", - "coinbase", - ), -} +var ( + testTable = Table{ + Name: "test_table", + Columns: []Column{ + {Name: "id", Type: Dbigint}, + {Name: "name", Type: Dvarchar}, + {Name: "age", Type: Dinteger}, + }, + } + testTableWithConflictClause = Table{ + Name: "test_table_conflict", + Columns: []Column{ + {Name: "id", Type: Dbigint}, + {Name: "name", Type: Dvarchar}, + {Name: "age", Type: Dinteger}, + }, + UpsertClause: OnConflict("id").Set("name", "age"), + } +) + +const ( + expectedHeaderPreparedWithUpsert = "INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_ids, reward, state_root, tx_root, receipt_root, uncles_hash, bloom, timestamp, coinbase, canonical, withdrawals_root) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16) ON CONFLICT (block_number, block_hash) DO UPDATE SET (parent_hash, cid, td, node_ids, reward, state_root, tx_root, receipt_root, uncles_hash, bloom, timestamp, coinbase, canonical, withdrawals_root) = ROW(EXCLUDED.parent_hash, EXCLUDED.cid, EXCLUDED.td, EXCLUDED.node_ids, EXCLUDED.reward, EXCLUDED.state_root, EXCLUDED.tx_root, EXCLUDED.receipt_root, EXCLUDED.uncles_hash, EXCLUDED.bloom, EXCLUDED.timestamp, EXCLUDED.coinbase, EXCLUDED.canonical, EXCLUDED.withdrawals_root)" + + expectedHeaderPreparedWithoutUpsert = "INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_ids, reward, state_root, tx_root, receipt_root, uncles_hash, bloom, timestamp, coinbase, canonical, withdrawals_root) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16) ON CONFLICT (block_number, block_hash) DO NOTHING" + + expectedHeaderFmtString = `INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_ids, reward, state_root, tx_root, receipt_root, uncles_hash, bloom, timestamp, coinbase, canonical, withdrawals_root) VALUES ('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '\x%x', '%s', '%s', %t, '%s');` +) func TestTable(t *testing.T) { - headerUpsert := `INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16) ON CONFLICT (block_hash, block_number) DO UPDATE SET (parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase) = ($3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)` - headerNoUpsert := `INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16) ON CONFLICT (block_hash, block_number) DO NOTHING` - require.Equal(t, headerNoUpsert, testHeaderTable.ToInsertStatement(false)) - require.Equal(t, headerUpsert, testHeaderTable.ToInsertStatement(true)) + require.Equal(t, + "INSERT INTO test_table (id, name, age) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING", + testTable.PreparedInsert(true), + ) + require.Equal(t, + "INSERT INTO test_table (id, name, age) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING", + testTable.PreparedInsert(false), + ) + require.Equal(t, "INSERT INTO test_table (id, name, age) VALUES ('%s', '%s', %d);", testTable.FmtStringInsert()) + + require.Equal(t, + "INSERT INTO test_table_conflict (id, name, age) VALUES ($1, $2, $3) ON CONFLICT (id) DO UPDATE SET (name, age) = ROW(EXCLUDED.name, EXCLUDED.age)", + testTableWithConflictClause.PreparedInsert(true), + ) + require.Equal(t, + "INSERT INTO test_table_conflict (id, name, age) VALUES ($1, $2, $3) ON CONFLICT (id) DO NOTHING", + testTableWithConflictClause.PreparedInsert(false), + ) + + require.Equal(t, expectedHeaderPreparedWithUpsert, TableHeader.PreparedInsert(true)) + require.Equal(t, expectedHeaderPreparedWithoutUpsert, TableHeader.PreparedInsert(false)) + require.Equal(t, expectedHeaderFmtString, TableHeader.FmtStringInsert()) }