index withdrawals

- withdrawal_cids

- withdrawal iplds

- header withdrawals_root
This commit is contained in:
Roy Crihfield 2024-05-08 00:23:32 +08:00
parent e6150910af
commit 3cd43e8d14
23 changed files with 576 additions and 407 deletions

View File

@ -82,7 +82,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
}
// Generate the block iplds
txNodes, rctNodes, logNodes, err := ipld.FromBlockAndReceipts(block, receipts)
txNodes, rctNodes, logNodes, wdNodes, err := ipld.FromBlockAndReceipts(block, receipts)
if err != nil {
return nil, fmt.Errorf("error creating IPLD nodes from block and receipts: %v", err)
}
@ -123,15 +123,17 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
traceMsg += fmt.Sprintf("uncle processing time: %s\r\n", tDiff.String())
t = time.Now()
// Publish and index receipts and txs
err = sdi.processReceiptsAndTxs(blockTx, processArgs{
err = sdi.processObjects(blockTx, processArgs{
headerID: headerID,
blockNumber: block.Number(),
blockTime: block.Time(),
receipts: receipts,
txs: transactions,
withdrawals: block.Withdrawals(),
rctNodes: rctNodes,
txNodes: txNodes,
logNodes: logNodes,
wdNodes: wdNodes,
})
if err != nil {
return nil, err
@ -151,7 +153,7 @@ func (sdi *StateDiffIndexer) PushHeader(batch interfaces.Batch, header *types.He
if !ok {
return "", fmt.Errorf("sql: batch is expected to be of type %T, got %T", &BatchTx{}, batch)
}
headerNode, err := ipld.NewEthHeader(header)
headerNode, err := ipld.EncodeHeader(header)
if err != nil {
return "", err
}
@ -173,6 +175,7 @@ func (sdi *StateDiffIndexer) PushHeader(batch interfaces.Batch, header *types.He
Timestamp: header.Time,
Coinbase: header.Coinbase.String(),
Canonical: true,
WithdrawalsRoot: shared.MaybeStringHash(header.WithdrawalsHash),
}
_, err = fmt.Fprintf(sdi.dump, "%+v\r\n", mod)
return headerID, err
@ -225,13 +228,15 @@ type processArgs struct {
blockTime uint64
receipts types.Receipts
txs types.Transactions
rctNodes []*ipld.EthReceipt
txNodes []*ipld.EthTx
logNodes [][]*ipld.EthLog
withdrawals types.Withdrawals
rctNodes []ipld.IPLD
txNodes []ipld.IPLD
logNodes [][]ipld.IPLD
wdNodes []ipld.IPLD
}
// processReceiptsAndTxs publishes and indexes receipt and transaction IPLDs in Postgres
func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs) error {
// processObjects publishes and indexes receipt and transaction IPLDs in Postgres
func (sdi *StateDiffIndexer) processObjects(tx *BatchTx, args processArgs) error {
// Process receipts and txs
signer := types.MakeSigner(sdi.chainConfig, args.blockNumber, args.blockTime)
for i, receipt := range args.receipts {
@ -314,6 +319,23 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
return err
}
}
// Process withdrawals
for i, withdrawal := range args.withdrawals {
wdNode := args.wdNodes[i]
tx.cacheIPLD(wdNode)
wdModel := models.WithdrawalModel{
BlockNumber: args.blockNumber.String(),
HeaderID: args.headerID,
CID: wdNode.Cid().String(),
Index: withdrawal.Index,
Validator: withdrawal.Validator,
Address: withdrawal.Address.String(),
Amount: withdrawal.Amount,
}
if _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", wdModel); err != nil {
return err
}
}
return nil
}

View File

@ -36,22 +36,8 @@ import (
sdtypes "github.com/cerc-io/plugeth-statediff/types"
)
var (
Tables = []*schema.Table{
&schema.TableIPLDBlock,
&schema.TableNodeInfo,
&schema.TableHeader,
&schema.TableStateNode,
&schema.TableStorageNode,
&schema.TableUncle,
&schema.TableTransaction,
&schema.TableReceipt,
&schema.TableLog,
}
)
type tableRow struct {
table schema.Table
table *schema.Table
values []interface{}
}
@ -134,7 +120,7 @@ func NewCSVWriter(path string, watchedAddressesFilePath string, diff bool) (*CSV
return nil, fmt.Errorf("unable to create directory '%s': %w", path, err)
}
writers, err := makeFileWriters(path, Tables)
writers, err := makeFileWriters(path, schema.Tables)
if err != nil {
return nil, err
}
@ -164,7 +150,7 @@ func (csw *CSVWriter) Loop() {
for {
select {
case row := <-csw.rows:
err := csw.writers.write(&row.table, row.values...)
err := csw.writers.write(row.table, row.values...)
if err != nil {
panic(fmt.Sprintf("error writing csv buffer: %v", err))
}
@ -204,13 +190,13 @@ func (csw *CSVWriter) Close() error {
func (csw *CSVWriter) upsertNode(node nodeinfo.Info) {
var values []interface{}
values = append(values, node.GenesisBlock, node.NetworkID, node.ID, node.ClientName, node.ChainID)
csw.rows <- tableRow{schema.TableNodeInfo, values}
csw.rows <- tableRow{&schema.TableNodeInfo, values}
}
func (csw *CSVWriter) upsertIPLD(ipld models.IPLDModel) {
var values []interface{}
values = append(values, ipld.BlockNumber, ipld.Key, ipld.Data)
csw.rows <- tableRow{schema.TableIPLDBlock, values}
csw.rows <- tableRow{&schema.TableIPLDBlock, values}
}
func (csw *CSVWriter) upsertIPLDDirect(blockNumber, key string, value []byte) {
@ -231,11 +217,25 @@ func (csw *CSVWriter) upsertIPLDNode(blockNumber string, i ipld.IPLD) {
func (csw *CSVWriter) upsertHeaderCID(header models.HeaderModel) {
var values []interface{}
values = append(values, header.BlockNumber, header.BlockHash, header.ParentHash, header.CID,
header.TotalDifficulty, header.NodeIDs, header.Reward, header.StateRoot, header.TxRoot,
header.RctRoot, header.UnclesHash, header.Bloom, strconv.FormatUint(header.Timestamp, 10), header.Coinbase,
header.Canonical)
csw.rows <- tableRow{schema.TableHeader, values}
values = append(values,
header.BlockNumber,
header.BlockHash,
header.ParentHash,
header.CID,
header.TotalDifficulty,
header.NodeIDs,
header.Reward,
header.StateRoot,
header.TxRoot,
header.RctRoot,
header.UnclesHash,
header.Bloom,
strconv.FormatUint(header.Timestamp, 10),
header.Coinbase,
header.Canonical,
header.WithdrawalsRoot,
)
csw.rows <- tableRow{&schema.TableHeader, values}
metrics.IndexerMetrics.BlocksCounter.Inc(1)
}
@ -243,14 +243,14 @@ func (csw *CSVWriter) upsertUncleCID(uncle models.UncleModel) {
var values []interface{}
values = append(values, uncle.BlockNumber, uncle.BlockHash, uncle.HeaderID, uncle.ParentHash, uncle.CID,
uncle.Reward, uncle.Index)
csw.rows <- tableRow{schema.TableUncle, values}
csw.rows <- tableRow{&schema.TableUncle, values}
}
func (csw *CSVWriter) upsertTransactionCID(transaction models.TxModel) {
var values []interface{}
values = append(values, transaction.BlockNumber, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst,
transaction.Src, transaction.Index, transaction.Type, transaction.Value)
csw.rows <- tableRow{schema.TableTransaction, values}
csw.rows <- tableRow{&schema.TableTransaction, values}
metrics.IndexerMetrics.TransactionsCounter.Inc(1)
}
@ -258,7 +258,7 @@ func (csw *CSVWriter) upsertReceiptCID(rct *models.ReceiptModel) {
var values []interface{}
values = append(values, rct.BlockNumber, rct.HeaderID, rct.TxID, rct.CID, rct.Contract,
rct.PostState, rct.PostStatus)
csw.rows <- tableRow{schema.TableReceipt, values}
csw.rows <- tableRow{&schema.TableReceipt, values}
metrics.IndexerMetrics.ReceiptsCounter.Inc(1)
}
@ -267,11 +267,26 @@ func (csw *CSVWriter) upsertLogCID(logs []*models.LogsModel) {
var values []interface{}
values = append(values, l.BlockNumber, l.HeaderID, l.CID, l.ReceiptID, l.Address, l.Index, l.Topic0,
l.Topic1, l.Topic2, l.Topic3)
csw.rows <- tableRow{schema.TableLog, values}
csw.rows <- tableRow{&schema.TableLog, values}
metrics.IndexerMetrics.LogsCounter.Inc(1)
}
}
func (csw *CSVWriter) upsertWithdrawalCID(withdrawal models.WithdrawalModel) {
var values []interface{}
values = append(values,
withdrawal.BlockNumber,
withdrawal.HeaderID,
withdrawal.CID,
withdrawal.Index,
withdrawal.Validator,
withdrawal.Address,
withdrawal.Amount,
)
csw.rows <- tableRow{&schema.TableWithdrawal, values}
metrics.IndexerMetrics.WithdrawalsCounter.Inc(1)
}
func (csw *CSVWriter) upsertStateCID(stateNode models.StateNodeModel) {
balance := stateNode.Balance
if stateNode.Removed {
@ -281,14 +296,14 @@ func (csw *CSVWriter) upsertStateCID(stateNode models.StateNodeModel) {
var values []interface{}
values = append(values, stateNode.BlockNumber, stateNode.HeaderID, stateNode.StateKey, stateNode.CID,
csw.isDiff, balance, strconv.FormatUint(stateNode.Nonce, 10), stateNode.CodeHash, stateNode.StorageRoot, stateNode.Removed)
csw.rows <- tableRow{schema.TableStateNode, values}
csw.rows <- tableRow{&schema.TableStateNode, values}
}
func (csw *CSVWriter) upsertStorageCID(storageCID models.StorageNodeModel) {
var values []interface{}
values = append(values, storageCID.BlockNumber, storageCID.HeaderID, storageCID.StateKey, storageCID.StorageKey, storageCID.CID,
csw.isDiff, storageCID.Value, storageCID.Removed)
csw.rows <- tableRow{schema.TableStorageNode, values}
csw.rows <- tableRow{&schema.TableStorageNode, values}
}
// LoadWatchedAddresses loads watched addresses from a file

View File

@ -84,7 +84,7 @@ func NewStateDiffIndexer(chainConfig *params.ChainConfig, config Config, nodeInf
if _, err := os.Stat(outputDir); !errors.Is(err, os.ErrNotExist) {
return nil, fmt.Errorf("cannot create output directory, directory (%s) already exists", outputDir)
}
log.Info("Writing statediff CSV files to directory", "file", outputDir)
log.Info("Writing statediff CSV files", "directory", outputDir)
if watchedAddressesFilePath == "" {
watchedAddressesFilePath = defaultWatchedAddressesCSVFilePath
@ -156,7 +156,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
}
// Generate the block iplds
txNodes, rctNodes, logNodes, err := ipld.FromBlockAndReceipts(block, receipts)
txNodes, rctNodes, logNodes, wdNodes, err := ipld.FromBlockAndReceipts(block, receipts)
if err != nil {
return nil, fmt.Errorf("error creating IPLD nodes from block and receipts: %v", err)
}
@ -197,15 +197,16 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
traceMsg += fmt.Sprintf("uncle processing time: %s\r\n", tDiff.String())
t = time.Now()
// write receipts and txs
err = sdi.processReceiptsAndTxs(processArgs{
err = sdi.processObjects(processArgs{
headerID: headerID,
blockNumber: block.Number(),
receipts: receipts,
txs: transactions,
withdrawals: block.Withdrawals(),
rctNodes: rctNodes,
txNodes: txNodes,
logNodes: logNodes,
wdNodes: wdNodes,
})
if err != nil {
return nil, err
@ -222,7 +223,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
// it returns the headerID
func (sdi *StateDiffIndexer) PushHeader(_ interfaces.Batch, header *types.Header, reward, td *big.Int) (string, error) {
// Process the header
headerNode, err := ipld.NewEthHeader(header)
headerNode, err := ipld.EncodeHeader(header)
if err != nil {
return "", err
}
@ -245,6 +246,7 @@ func (sdi *StateDiffIndexer) PushHeader(_ interfaces.Batch, header *types.Header
Timestamp: header.Time,
Coinbase: header.Coinbase.String(),
Canonical: true,
WithdrawalsRoot: shared.MaybeStringHash(header.WithdrawalsHash),
})
return headerID, nil
}
@ -293,13 +295,15 @@ type processArgs struct {
blockTime uint64
receipts types.Receipts
txs types.Transactions
rctNodes []*ipld.EthReceipt
txNodes []*ipld.EthTx
logNodes [][]*ipld.EthLog
withdrawals types.Withdrawals
rctNodes []ipld.IPLD
txNodes []ipld.IPLD
logNodes [][]ipld.IPLD
wdNodes []ipld.IPLD
}
// processReceiptsAndTxs writes receipt and tx IPLD insert SQL stmts to a file
func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error {
// processObjects writes receipt and tx IPLD insert SQL stmts to a file
func (sdi *StateDiffIndexer) processObjects(args processArgs) error {
// Process receipts and txs
signer := types.MakeSigner(sdi.chainConfig, args.blockNumber, args.blockTime)
for i, receipt := range args.receipts {
@ -376,6 +380,21 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error {
}
sdi.fileWriter.upsertLogCID(logDataSet)
}
// Process withdrawals
for i, wd := range args.withdrawals {
wdNode := args.wdNodes[i]
sdi.fileWriter.upsertIPLDNode(args.blockNumber.String(), wdNode)
wdModel := models.WithdrawalModel{
BlockNumber: args.blockNumber.String(),
HeaderID: args.headerID,
CID: wdNode.Cid().String(),
Index: wd.Index,
Validator: wd.Validator,
Address: wd.Address.String(),
Amount: wd.Amount,
}
sdi.fileWriter.upsertWithdrawalCID(wdModel)
}
return nil
}

View File

@ -41,6 +41,7 @@ type FileWriter interface {
upsertTransactionCID(transaction models.TxModel)
upsertReceiptCID(rct *models.ReceiptModel)
upsertLogCID(logs []*models.LogsModel)
upsertWithdrawalCID(models.WithdrawalModel)
upsertStateCID(stateNode models.StateNodeModel)
upsertStorageCID(storageCID models.StorageNodeModel)
upsertIPLD(ipld models.IPLDModel)

View File

@ -32,6 +32,7 @@ import (
"github.com/cerc-io/plugeth-statediff/indexer/ipld"
"github.com/cerc-io/plugeth-statediff/indexer/models"
nodeinfo "github.com/cerc-io/plugeth-statediff/indexer/node"
"github.com/cerc-io/plugeth-statediff/indexer/shared/schema"
"github.com/cerc-io/plugeth-statediff/types"
)
@ -145,8 +146,8 @@ const (
ipldInsert = "INSERT INTO ipld.blocks (block_number, key, data) VALUES ('%s', '%s', '\\x%x');\n"
headerInsert = "INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_ids, reward, " +
"state_root, tx_root, receipt_root, uncles_hash, bloom, timestamp, coinbase, canonical) VALUES " +
"('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '\\x%x', %d, '%s', %t);\n"
"state_root, tx_root, receipt_root, uncles_hash, bloom, timestamp, coinbase, canonical, withdrawals_root) VALUES " +
"('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '\\x%x', %d, '%s', %t, '%s');\n"
uncleInsert = "INSERT INTO eth.uncle_cids (block_number, block_hash, header_id, parent_hash, cid, reward, index) VALUES " +
"('%s', '%s', '%s', '%s', '%s', '%s', %d);\n"
@ -167,6 +168,10 @@ const (
"removed, diff, val) VALUES ('%s', '%s', '%s', '%s', '%s', %t, %t, '\\x%x');\n"
)
var (
withdrawalsInsert = schema.TableWithdrawal.FmtStringInsert() + ";\n"
)
func (sqw *SQLWriter) upsertNode(node nodeinfo.Info) {
sqw.stmts <- []byte(fmt.Sprintf(nodeInsert, node.GenesisBlock, node.NetworkID, node.ID, node.ClientName, node.ChainID))
}
@ -192,9 +197,24 @@ func (sqw *SQLWriter) upsertIPLDNode(blockNumber string, i ipld.IPLD) {
}
func (sqw *SQLWriter) upsertHeaderCID(header models.HeaderModel) {
stmt := fmt.Sprintf(headerInsert, header.BlockNumber, header.BlockHash, header.ParentHash, header.CID,
header.TotalDifficulty, formatPostgresStringArray(header.NodeIDs), header.Reward, header.StateRoot, header.TxRoot,
header.RctRoot, header.UnclesHash, header.Bloom, header.Timestamp, header.Coinbase, header.Canonical)
stmt := fmt.Sprintf(headerInsert,
header.BlockNumber,
header.BlockHash,
header.ParentHash,
header.CID,
header.TotalDifficulty,
formatPostgresStringArray(header.NodeIDs),
header.Reward,
header.StateRoot,
header.TxRoot,
header.RctRoot,
header.UnclesHash,
header.Bloom,
header.Timestamp,
header.Coinbase,
header.Canonical,
header.WithdrawalsRoot,
)
sqw.stmts <- []byte(stmt)
metrics.IndexerMetrics.BlocksCounter.Inc(1)
}
@ -224,6 +244,19 @@ func (sqw *SQLWriter) upsertLogCID(logs []*models.LogsModel) {
}
}
func (sqw *SQLWriter) upsertWithdrawalCID(withdrawal models.WithdrawalModel) {
sqw.stmts <- []byte(fmt.Sprintf(withdrawalsInsert,
withdrawal.BlockNumber,
withdrawal.HeaderID,
withdrawal.CID,
withdrawal.Index,
withdrawal.Validator,
withdrawal.Address,
withdrawal.Amount,
))
metrics.IndexerMetrics.WithdrawalsCounter.Inc(1)
}
func (sqw *SQLWriter) upsertStateCID(stateNode models.StateNodeModel) {
balance := stateNode.Balance
if stateNode.Removed {

View File

@ -56,6 +56,8 @@ type IndexerMetricsHandles struct {
ReceiptsCounter metrics.Counter
// The total number of processed logs
LogsCounter metrics.Counter
// The total number of processed logs
WithdrawalsCounter metrics.Counter
// The total number of access list entries processed
AccessListEntriesCounter metrics.Counter
// Time spent waiting for free postgres tx
@ -90,6 +92,7 @@ func RegisterIndexerMetrics(reg metrics.Registry) IndexerMetricsHandles {
TransactionsCounter: metrics.NewCounter(),
ReceiptsCounter: metrics.NewCounter(),
LogsCounter: metrics.NewCounter(),
WithdrawalsCounter: metrics.NewCounter(),
AccessListEntriesCounter: metrics.NewCounter(),
FreePostgresTimer: metrics.NewTimer(),
PostgresCommitTimer: metrics.NewTimer(),
@ -113,6 +116,7 @@ func RegisterIndexerMetrics(reg metrics.Registry) IndexerMetricsHandles {
reg.Register(metricName(subsys, "transactions"), ctx.TransactionsCounter)
reg.Register(metricName(subsys, "receipts"), ctx.ReceiptsCounter)
reg.Register(metricName(subsys, "logs"), ctx.LogsCounter)
reg.Register(metricName(subsys, "withdrawals"), ctx.WithdrawalsCounter)
reg.Register(metricName(subsys, "access_list_entries"), ctx.AccessListEntriesCounter)
reg.Register(metricName(subsys, "t_free_postgres"), ctx.FreePostgresTimer)
reg.Register(metricName(subsys, "t_postgres_commit"), ctx.PostgresCommitTimer)

View File

@ -105,7 +105,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
}
// Generate the block iplds
txNodes, rctNodes, logNodes, err := ipld.FromBlockAndReceipts(block, receipts)
txNodes, rctNodes, logNodes, wdNodes, err := ipld.FromBlockAndReceipts(block, receipts)
if err != nil {
return nil, fmt.Errorf("error creating IPLD nodes from block and receipts: %w", err)
}
@ -148,16 +148,18 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
}
metrics2.IndexerMetrics.UncleProcessingTimer.Update(time.Since(t))
t = time.Now()
// Publish and index receipts and txs
err = sdi.processReceiptsAndTxs(batch, processArgs{
err = sdi.processObjects(batch, processArgs{
headerID: headerID,
blockNumber: block.Number(),
blockTime: block.Time(),
receipts: receipts,
withdrawals: block.Withdrawals(),
txs: transactions,
rctNodes: rctNodes,
txNodes: txNodes,
logNodes: logNodes,
wdNodes: wdNodes,
})
if err != nil {
return nil, err
@ -185,7 +187,7 @@ func (sdi *StateDiffIndexer) PushHeader(batch interfaces.Batch, header *types.He
return "", fmt.Errorf("sql: batch is expected to be of type %T, got %T", &BatchTx{}, batch)
}
// Process the header
headerNode, err := ipld.NewEthHeader(header)
headerNode, err := ipld.EncodeHeader(header)
if err != nil {
return "", err
}
@ -208,6 +210,7 @@ func (sdi *StateDiffIndexer) PushHeader(batch interfaces.Batch, header *types.He
Timestamp: header.Time,
Coinbase: header.Coinbase.String(),
Canonical: true,
WithdrawalsRoot: shared.MaybeStringHash(header.WithdrawalsHash),
})
}
@ -258,13 +261,15 @@ type processArgs struct {
blockTime uint64
receipts types.Receipts
txs types.Transactions
rctNodes []*ipld.EthReceipt
txNodes []*ipld.EthTx
logNodes [][]*ipld.EthLog
withdrawals types.Withdrawals
rctNodes []ipld.IPLD
txNodes []ipld.IPLD
logNodes [][]ipld.IPLD
wdNodes []ipld.IPLD
}
// processReceiptsAndTxs publishes and indexes receipt and transaction IPLDs in Postgres
func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs) error {
// processObjects publishes and indexes receipt and transaction IPLDs in Postgres
func (sdi *StateDiffIndexer) processObjects(tx *BatchTx, args processArgs) error {
// Process receipts and txs
signer := types.MakeSigner(sdi.chainConfig, args.blockNumber, args.blockTime)
for i, receipt := range args.receipts {
@ -348,7 +353,23 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
return err
}
}
// Process withdrawals
for i, withdrawal := range args.withdrawals {
wdNode := args.wdNodes[i]
tx.cacheIPLD(wdNode)
wdModel := models.WithdrawalModel{
BlockNumber: args.blockNumber.String(),
HeaderID: args.headerID,
CID: wdNode.Cid().String(),
Index: withdrawal.Index,
Validator: withdrawal.Validator,
Address: withdrawal.Address.String(),
Amount: withdrawal.Amount,
}
if err := sdi.dbWriter.upsertWithdrawalCID(tx.dbtx, wdModel); err != nil {
return err
}
}
return nil
}

View File

@ -54,6 +54,7 @@ type Statements interface {
InsertTxStm() string
InsertRctStm() string
InsertLogStm() string
InsertWithdrawalStm() string
InsertStateStm() string
InsertStorageStm() string
InsertIPLDStm() string

View File

@ -18,6 +18,7 @@ package postgres
import (
"fmt"
"strings"
"github.com/cerc-io/plugeth-statediff/indexer/database/sql"
"github.com/cerc-io/plugeth-statediff/indexer/shared/schema"
@ -43,7 +44,9 @@ type DB struct {
// MaxHeaderStm satisfies the sql.Statements interface
func (db *DB) MaxHeaderStm() string {
return fmt.Sprintf("SELECT block_number, block_hash, parent_hash, cid, td, node_ids, reward, state_root, tx_root, receipt_root, uncles_hash, bloom, timestamp, coinbase FROM %s ORDER BY block_number DESC LIMIT 1", schema.TableHeader.Name)
return fmt.Sprintf("SELECT %s FROM %s ORDER BY block_number DESC LIMIT 1",
strings.Join(schema.TableHeader.ColumnNames(), ","),
schema.TableHeader.Name)
}
// ExistsHeaderStm satisfies the sql.Statements interface
@ -59,7 +62,7 @@ func (db *DB) DetectGapsStm() string {
// InsertHeaderStm satisfies the sql.Statements interface
// Stm == Statement
func (db *DB) InsertHeaderStm() string {
return schema.TableHeader.ToInsertStatement(db.upsert)
return schema.TableHeader.PreparedInsert(db.upsert)
}
// SetCanonicalHeaderStm satisfies the sql.Statements interface
@ -70,37 +73,42 @@ func (db *DB) SetCanonicalHeaderStm() string {
// InsertUncleStm satisfies the sql.Statements interface
func (db *DB) InsertUncleStm() string {
return schema.TableUncle.ToInsertStatement(db.upsert)
return schema.TableUncle.PreparedInsert(db.upsert)
}
// InsertTxStm satisfies the sql.Statements interface
func (db *DB) InsertTxStm() string {
return schema.TableTransaction.ToInsertStatement(db.upsert)
return schema.TableTransaction.PreparedInsert(db.upsert)
}
// InsertRctStm satisfies the sql.Statements interface
func (db *DB) InsertRctStm() string {
return schema.TableReceipt.ToInsertStatement(db.upsert)
return schema.TableReceipt.PreparedInsert(db.upsert)
}
// InsertLogStm satisfies the sql.Statements interface
func (db *DB) InsertLogStm() string {
return schema.TableLog.ToInsertStatement(db.upsert)
return schema.TableLog.PreparedInsert(db.upsert)
}
// InsertLogStm satisfies the sql.Statements interface
func (db *DB) InsertWithdrawalStm() string {
return schema.TableWithdrawal.PreparedInsert(db.upsert)
}
// InsertStateStm satisfies the sql.Statements interface
func (db *DB) InsertStateStm() string {
return schema.TableStateNode.ToInsertStatement(db.upsert)
return schema.TableStateNode.PreparedInsert(db.upsert)
}
// InsertStorageStm satisfies the sql.Statements interface
func (db *DB) InsertStorageStm() string {
return schema.TableStorageNode.ToInsertStatement(db.upsert)
return schema.TableStorageNode.PreparedInsert(db.upsert)
}
// InsertIPLDStm satisfies the sql.Statements interface
func (db *DB) InsertIPLDStm() string {
return schema.TableIPLDBlock.ToInsertStatement(db.upsert)
return schema.TableIPLDBlock.PreparedInsert(db.upsert)
}
// InsertIPLDsStm satisfies the sql.Statements interface

View File

@ -95,6 +95,7 @@ func (w *Writer) maxHeader() (*models.HeaderModel, error) {
&model.Timestamp,
&model.Coinbase,
&model.Canonical,
&model.WithdrawalsRoot,
)
model.BlockNumber = strconv.FormatUint(number, 10)
model.TotalDifficulty = strconv.FormatUint(td, 10)
@ -125,6 +126,7 @@ func (w *Writer) upsertHeaderCID(tx Tx, header models.HeaderModel) error {
header.Timestamp,
header.Coinbase,
header.Canonical,
header.WithdrawalsRoot,
)
if err != nil {
return insertError{"eth.header_cids", err, w.db.InsertHeaderStm(), header}
@ -286,6 +288,41 @@ func (w *Writer) upsertLogCID(tx Tx, logs []*models.LogsModel) error {
return nil
}
func (w *Writer) upsertWithdrawalCID(tx Tx, withdrawal models.WithdrawalModel) error {
if w.useCopyForTx(tx) {
blockNum, err := strconv.ParseUint(withdrawal.BlockNumber, 10, 64)
if err != nil {
return insertError{"eth.withdrawal_cids", err, "COPY", withdrawal}
}
_, err = tx.CopyFrom(w.db.Context(), schema.TableWithdrawal.TableName(), schema.TableWithdrawal.ColumnNames(),
toRows(toRow(blockNum,
withdrawal.HeaderID,
withdrawal.CID,
withdrawal.Index,
withdrawal.Validator,
withdrawal.Address,
withdrawal.Amount)))
if err != nil {
return insertError{"eth.withdrawal_cids", err, "COPY", withdrawal}
}
} else {
_, err := tx.Exec(w.db.Context(), w.db.InsertWithdrawalStm(),
withdrawal.BlockNumber,
withdrawal.HeaderID,
withdrawal.CID,
withdrawal.Index,
withdrawal.Validator,
withdrawal.Address,
withdrawal.Amount)
if err != nil {
return insertError{"eth.withdrawal_cids", err, w.db.InsertWithdrawalStm(), withdrawal}
}
}
metrics.IndexerMetrics.WithdrawalsCounter.Inc(1)
return nil
}
/*
INSERT INTO eth.state_cids (block_number, header_id, state_leaf_key, cid, removed, diff, balance, nonce, code_hash, storage_root) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
ON CONFLICT (header_id, state_leaf_key, block_number) DO NOTHING

102
indexer/ipld/encode.go Normal file
View File

@ -0,0 +1,102 @@
// VulcanizeDB
// Copyright © 2024 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package ipld
import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rlp"
mh "github.com/multiformats/go-multihash"
)
// EncodeHeader converts a *types.Header into an IPLD node
func EncodeHeader(header *types.Header) (IPLD, error) {
headerRLP, err := rlp.EncodeToBytes(header)
if err != nil {
return nil, err
}
c, err := RawdataToCid(MEthHeader, headerRLP, mh.KECCAK_256)
if err != nil {
return nil, err
}
return &node{
cid: c,
rawdata: headerRLP,
}, nil
}
// encodeTx converts a *types.Transaction to an IPLD node
func encodeTx(tx *types.Transaction) (IPLD, error) {
txRaw, err := tx.MarshalBinary()
if err != nil {
return nil, err
}
c, err := RawdataToCid(MEthTx, txRaw, mh.KECCAK_256)
if err != nil {
return nil, err
}
return &node{
cid: c,
rawdata: txRaw,
}, nil
}
// encodeReceipt converts a types.Receipt to an IPLD node
func encodeReceipt(receipt *types.Receipt) (IPLD, error) {
rctRaw, err := receipt.MarshalBinary()
if err != nil {
return nil, err
}
c, err := RawdataToCid(MEthTxReceipt, rctRaw, mh.KECCAK_256)
if err != nil {
return nil, err
}
return &node{
cid: c,
rawdata: rctRaw,
}, nil
}
// encodeLog converts a Log to an IPLD node
func encodeLog(log *types.Log) (IPLD, error) {
logRaw, err := rlp.EncodeToBytes(log)
if err != nil {
return nil, err
}
c, err := RawdataToCid(MEthLog, logRaw, mh.KECCAK_256)
if err != nil {
return nil, err
}
return &node{
cid: c,
rawdata: logRaw,
}, nil
}
func encodeWithdrawal(w *types.Withdrawal) (IPLD, error) {
wRaw, err := rlp.EncodeToBytes(w)
if err != nil {
return nil, err
}
c, err := RawdataToCid(MEthWithdrawal, wRaw, mh.KECCAK_256)
if err != nil {
return nil, err
}
return &node{
cid: c,
rawdata: wRaw,
}, nil
}

View File

@ -1,60 +0,0 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package ipld
import (
"github.com/ipfs/go-cid"
mh "github.com/multiformats/go-multihash"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rlp"
)
// EthHeader (eth-block, codec 0x90), represents an ethereum block header
type EthHeader struct {
cid cid.Cid
rawdata []byte
}
// Static (compile time) check that EthHeader satisfies the node.Node interface.
var _ IPLD = (*EthHeader)(nil)
// NewEthHeader converts a *types.Header into an EthHeader IPLD node
func NewEthHeader(header *types.Header) (*EthHeader, error) {
headerRLP, err := rlp.EncodeToBytes(header)
if err != nil {
return nil, err
}
c, err := RawdataToCid(MEthHeader, headerRLP, mh.KECCAK_256)
if err != nil {
return nil, err
}
return &EthHeader{
cid: c,
rawdata: headerRLP,
}, nil
}
// RawData returns the binary of the RLP encode of the block header.
func (b *EthHeader) RawData() []byte {
return b.rawdata
}
// Cid returns the cid of the block header.
func (b *EthHeader) Cid() cid.Cid {
return b.cid
}

View File

@ -1,43 +0,0 @@
package ipld
import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ipfs/go-cid"
mh "github.com/multiformats/go-multihash"
)
// EthLog (eth-log, codec 0x9a), represents an ethereum block header
type EthLog struct {
rawData []byte
cid cid.Cid
}
// Static (compile time) check that EthLog satisfies the node.Node interface.
var _ IPLD = (*EthLog)(nil)
// NewLog create a new EthLog IPLD node
func NewLog(log *types.Log) (*EthLog, error) {
logRaw, err := rlp.EncodeToBytes(log)
if err != nil {
return nil, err
}
c, err := RawdataToCid(MEthLog, logRaw, mh.KECCAK_256)
if err != nil {
return nil, err
}
return &EthLog{
cid: c,
rawData: logRaw,
}, nil
}
// RawData returns the binary of the RLP encode of the log.
func (l *EthLog) RawData() []byte {
return l.rawData
}
// Cid returns the cid of the receipt log.
func (l *EthLog) Cid() cid.Cid {
return l.cid
}

View File

@ -22,25 +22,29 @@ import (
// FromBlockAndReceipts takes a block and processes it
// to return it a set of IPLD nodes for further processing.
func FromBlockAndReceipts(block *types.Block, receipts []*types.Receipt) ([]*EthTx, []*EthReceipt, [][]*EthLog, error) {
func FromBlockAndReceipts(block *types.Block, receipts []*types.Receipt) ([]IPLD, []IPLD, [][]IPLD, []IPLD, error) {
// Process the txs
txNodes, err := processTransactions(block.Transactions())
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}
withdrawalNodes, err := processWithdrawals(block.Withdrawals())
if err != nil {
return nil, nil, nil, nil, err
}
// Process the receipts and logs
rctNodes, logNodes, err := processReceiptsAndLogs(receipts)
return txNodes, rctNodes, logNodes, err
return txNodes, rctNodes, logNodes, withdrawalNodes, err
}
// processTransactions will take the found transactions in a parsed block body
// to return IPLD node slices for eth-tx
func processTransactions(txs []*types.Transaction) ([]*EthTx, error) {
var ethTxNodes []*EthTx
func processTransactions(txs []*types.Transaction) ([]IPLD, error) {
var ethTxNodes []IPLD
for _, tx := range txs {
ethTx, err := NewEthTx(tx)
ethTx, err := encodeTx(tx)
if err != nil {
return nil, err
}
@ -50,12 +54,25 @@ func processTransactions(txs []*types.Transaction) ([]*EthTx, error) {
return ethTxNodes, nil
}
func processWithdrawals(withdrawals []*types.Withdrawal) ([]IPLD, error) {
var withdrawalNodes []IPLD
for _, withdrawal := range withdrawals {
ethW, err := encodeWithdrawal(withdrawal)
if err != nil {
return nil, err
}
withdrawalNodes = append(withdrawalNodes, ethW)
}
return withdrawalNodes, nil
}
// processReceiptsAndLogs will take in receipts
// to return IPLD node slices for eth-rct and eth-log
func processReceiptsAndLogs(rcts []*types.Receipt) ([]*EthReceipt, [][]*EthLog, error) {
func processReceiptsAndLogs(rcts []*types.Receipt) ([]IPLD, [][]IPLD, error) {
// Pre allocating memory.
ethRctNodes := make([]*EthReceipt, len(rcts))
ethLogNodes := make([][]*EthLog, len(rcts))
ethRctNodes := make([]IPLD, len(rcts))
ethLogNodes := make([][]IPLD, len(rcts))
for idx, rct := range rcts {
logNodes, err := processLogs(rct.Logs)
@ -63,7 +80,7 @@ func processReceiptsAndLogs(rcts []*types.Receipt) ([]*EthReceipt, [][]*EthLog,
return nil, nil, err
}
ethRct, err := NewReceipt(rct)
ethRct, err := encodeReceipt(rct)
if err != nil {
return nil, nil, err
}
@ -75,10 +92,10 @@ func processReceiptsAndLogs(rcts []*types.Receipt) ([]*EthReceipt, [][]*EthLog,
return ethRctNodes, ethLogNodes, nil
}
func processLogs(logs []*types.Log) ([]*EthLog, error) {
logNodes := make([]*EthLog, len(logs))
func processLogs(logs []*types.Log) ([]IPLD, error) {
logNodes := make([]IPLD, len(logs))
for idx, log := range logs {
logNode, err := NewLog(log)
logNode, err := encodeLog(log)
if err != nil {
return nil, err
}

View File

@ -1,58 +0,0 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package ipld
import (
"github.com/ipfs/go-cid"
mh "github.com/multiformats/go-multihash"
"github.com/ethereum/go-ethereum/core/types"
)
type EthReceipt struct {
rawdata []byte
cid cid.Cid
}
// Static (compile time) check that EthReceipt satisfies the node.Node interface.
var _ IPLD = (*EthReceipt)(nil)
// NewReceipt converts a types.ReceiptForStorage to an EthReceipt IPLD node
func NewReceipt(receipt *types.Receipt) (*EthReceipt, error) {
rctRaw, err := receipt.MarshalBinary()
if err != nil {
return nil, err
}
c, err := RawdataToCid(MEthTxReceipt, rctRaw, mh.KECCAK_256)
if err != nil {
return nil, err
}
return &EthReceipt{
cid: c,
rawdata: rctRaw,
}, nil
}
// RawData returns the binary of the RLP encode of the receipt.
func (r *EthReceipt) RawData() []byte {
return r.rawdata
}
// Cid returns the cid of the receipt.
func (r *EthReceipt) Cid() cid.Cid {
return r.cid
}

View File

@ -1,59 +0,0 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package ipld
import (
"github.com/ipfs/go-cid"
mh "github.com/multiformats/go-multihash"
"github.com/ethereum/go-ethereum/core/types"
)
// EthTx (eth-tx codec 0x93) represents an ethereum transaction
type EthTx struct {
cid cid.Cid
rawdata []byte
}
// Static (compile time) check that EthTx satisfies the node.Node interface.
var _ IPLD = (*EthTx)(nil)
// NewEthTx converts a *types.Transaction to an EthTx IPLD node
func NewEthTx(tx *types.Transaction) (*EthTx, error) {
txRaw, err := tx.MarshalBinary()
if err != nil {
return nil, err
}
c, err := RawdataToCid(MEthTx, txRaw, mh.KECCAK_256)
if err != nil {
return nil, err
}
return &EthTx{
cid: c,
rawdata: txRaw,
}, nil
}
// RawData returns the binary of the RLP encode of the transaction.
func (t *EthTx) RawData() []byte {
return t.rawdata
}
// Cid returns the cid of the transaction.
func (t *EthTx) Cid() cid.Cid {
return t.cid
}

View File

@ -2,7 +2,25 @@ package ipld
import "github.com/ipfs/go-cid"
// Check that node satisfies the IPLD Node interface.
var _ IPLD = (*node)(nil)
type node struct {
cid cid.Cid
rawdata []byte
}
type IPLD interface {
Cid() cid.Cid
RawData() []byte
}
// RawData returns the RLP encoded bytes of the node.
func (b node) RawData() []byte {
return b.rawdata
}
// Cid returns the CID of the node.
func (b node) Cid() cid.Cid {
return b.cid
}

View File

@ -37,6 +37,7 @@ const (
MEthStorageTrie = 0x98
MEthLogTrie = 0x99
MEthLog = 0x9a
MEthWithdrawal = 0x9b // TODO
)
// RawdataToCid takes the desired codec and a slice of bytes

View File

@ -42,6 +42,7 @@ type HeaderModel struct {
Timestamp uint64 `db:"timestamp"`
Coinbase string `db:"coinbase"`
Canonical bool `db:"canonical"`
WithdrawalsRoot string `db:"withdrawals_root"`
}
// UncleModel is the db model for eth.uncle_cids
@ -105,7 +106,7 @@ type StorageNodeModel struct {
Value []byte `db:"val"`
}
// LogsModel is the db model for eth.logs
// LogsModel is the db model for eth.log_cids
type LogsModel struct {
BlockNumber string `db:"block_number"`
HeaderID string `db:"header_id"`
@ -118,3 +119,14 @@ type LogsModel struct {
Topic2 string `db:"topic2"`
Topic3 string `db:"topic3"`
}
// WithdrawalModel is the db model for eth.withdrawal_cids
type WithdrawalModel struct {
BlockNumber string `db:"block_number"`
HeaderID string `db:"header_id"`
CID string `db:"cid"`
Index uint64 `db:"index"`
Validator uint64 `db:"validator"`
Address string `db:"address"`
Amount uint64 `db:"amount"`
}

View File

@ -35,3 +35,12 @@ func HandleZeroAddr(to common.Address) string {
}
return to.String()
}
// MaybeStringHash calls String on its argument and returns a pointer to the result.
// When passed nil, it returns nil.
func MaybeStringHash(hash *common.Hash) string {
if hash == nil {
return ""
}
return hash.String()
}

View File

@ -16,6 +16,19 @@
package schema
var Tables = []*Table{
&TableIPLDBlock,
&TableNodeInfo,
&TableHeader,
&TableStateNode,
&TableStorageNode,
&TableUncle,
&TableTransaction,
&TableReceipt,
&TableLog,
&TableWithdrawal,
}
var TableIPLDBlock = Table{
Name: `ipld.blocks`,
Columns: []Column{
@ -52,9 +65,10 @@ var TableHeader = Table{
{Name: "receipt_root", Type: Dvarchar},
{Name: "uncles_hash", Type: Dvarchar},
{Name: "bloom", Type: Dbytea},
{Name: "timestamp", Type: Dnumeric},
{Name: "timestamp", Type: Dbigint},
{Name: "coinbase", Type: Dvarchar},
{Name: "canonical", Type: Dboolean},
{Name: "withdrawals_root", Type: Dvarchar},
},
UpsertClause: OnConflict("block_number", "block_hash").Set(
"parent_hash",
@ -70,6 +84,7 @@ var TableHeader = Table{
"timestamp",
"coinbase",
"canonical",
"withdrawals_root",
)}
var TableStateNode = Table{
@ -165,6 +180,20 @@ var TableLog = Table{
UpsertClause: OnConflict("block_number", "header_id", "rct_id", "index"),
}
var TableWithdrawal = Table{
Name: "eth.withdrawal_cids",
Columns: []Column{
{Name: "block_number", Type: Dbigint},
{Name: "header_id", Type: Dvarchar},
{Name: "cid", Type: Dtext},
{Name: "index", Type: Dinteger},
{Name: "validator", Type: Dinteger},
{Name: "address", Type: Dvarchar},
{Name: "amount", Type: Dinteger},
},
UpsertClause: OnConflict("block_number", "header_id", "index"),
}
var TableWatchedAddresses = Table{
Name: "eth_meta.watched_addresses",
Columns: []Column{

View File

@ -53,34 +53,6 @@ type Table struct {
UpsertClause ConflictClause
}
type colfmt = func(interface{}) string
func (tbl *Table) ToCsvRow(args ...interface{}) []string {
var row []string
for i, col := range tbl.Columns {
value := col.Type.formatter()(args[i])
if col.Array {
valueList := funk.Map(args[i], col.Type.formatter()).([]string)
value = fmt.Sprintf("{%s}", strings.Join(valueList, ","))
}
row = append(row, value)
}
return row
}
func (tbl *Table) VarcharColumns() []string {
columns := funk.Filter(tbl.Columns, func(col Column) bool {
return col.Type == Dvarchar
}).([]Column)
columnNames := funk.Map(columns, func(col Column) string {
return col.Name
}).([]string)
return columnNames
}
func OnConflict(target ...string) ConflictClause {
return ConflictClause{Target: target}
}
@ -89,35 +61,6 @@ func (c ConflictClause) Set(fields ...string) ConflictClause {
return c
}
// ToInsertStatement returns a Postgres-compatible SQL insert statement for the table
// using positional placeholders
func (tbl *Table) ToInsertStatement(upsert bool) string {
var colnames, placeholders []string
for i, col := range tbl.Columns {
colnames = append(colnames, col.Name)
placeholders = append(placeholders, fmt.Sprintf("$%d", i+1))
}
suffix := fmt.Sprintf("ON CONFLICT (%s)", strings.Join(tbl.UpsertClause.Target, ", "))
if upsert && len(tbl.UpsertClause.Update) != 0 {
var update_placeholders []string
for _, name := range tbl.UpsertClause.Update {
i := funk.IndexOf(tbl.Columns, func(col Column) bool { return col.Name == name })
update_placeholders = append(update_placeholders, fmt.Sprintf("$%d", i+1))
}
suffix += fmt.Sprintf(
" DO UPDATE SET (%s) = (%s)",
strings.Join(tbl.UpsertClause.Update, ", "), strings.Join(update_placeholders, ", "),
)
} else {
suffix += " DO NOTHING"
}
return fmt.Sprintf(
"INSERT INTO %s (%s) VALUES (%s) %s",
tbl.Name, strings.Join(colnames, ", "), strings.Join(placeholders, ", "), suffix,
)
}
// TableName returns a pgx-compatible table name.
func (tbl *Table) TableName() []string {
return strings.Split(tbl.Name, ".")
@ -132,11 +75,45 @@ func (tbl *Table) ColumnNames() []string {
return names
}
// PreparedInsert returns a pgx/sqlx-compatible SQL prepared insert statement for the table
// using positional placeholders.
// If upsert is true, include an ON CONFLICT clause handling column updates.
func (tbl *Table) PreparedInsert(upsert bool) string {
var colnames, placeholders []string
for i, col := range tbl.Columns {
colnames = append(colnames, col.Name)
placeholders = append(placeholders, fmt.Sprintf("$%d", i+1))
}
suffix := " ON CONFLICT"
if len(tbl.UpsertClause.Target) > 0 {
suffix += fmt.Sprintf(" (%s)", strings.Join(tbl.UpsertClause.Target, ", "))
}
if upsert && len(tbl.UpsertClause.Update) != 0 {
var update_placeholders []string
for _, name := range tbl.UpsertClause.Update {
update_placeholders = append(update_placeholders, "EXCLUDED."+name)
}
suffix += fmt.Sprintf(
" DO UPDATE SET (%s) = ROW(%s)",
strings.Join(tbl.UpsertClause.Update, ", "), strings.Join(update_placeholders, ", "),
)
} else {
suffix += " DO NOTHING"
}
return fmt.Sprintf(
"INSERT INTO %s (%s) VALUES (%s)",
tbl.Name, strings.Join(colnames, ", "), strings.Join(placeholders, ", "),
) + suffix
}
type colfmt = func(interface{}) string
func sprintf(f string) colfmt {
return func(x interface{}) string { return fmt.Sprintf(f, x) }
}
func (typ colType) formatter() colfmt {
func (typ colType) csvFormatter() colfmt {
switch typ {
case Dinteger:
return sprintf("%d")
@ -157,6 +134,61 @@ func (typ colType) formatter() colfmt {
return sprintf("%s")
case Dtext:
return sprintf("%s")
default:
panic("invalid column type")
}
panic("unreachable")
}
// ToCsvRow converts a list of values to a list of strings suitable for CSV output.
func (tbl *Table) ToCsvRow(args ...interface{}) []string {
var row []string
for i, col := range tbl.Columns {
value := col.Type.csvFormatter()(args[i])
if col.Array {
valueList := funk.Map(args[i], col.Type.csvFormatter()).([]string)
value = fmt.Sprintf("{%s}", strings.Join(valueList, ","))
}
row = append(row, value)
}
return row
}
// VarcharColumns returns the names of columns with type VARCHAR.
func (tbl *Table) VarcharColumns() []string {
columns := funk.Filter(tbl.Columns, func(col Column) bool {
return col.Type == Dvarchar
}).([]Column)
columnNames := funk.Map(columns, func(col Column) string {
return col.Name
}).([]string)
return columnNames
}
func formatSpec(typ colType) string {
switch typ {
case Dinteger:
return "%d"
case Dboolean:
return "%t"
case Dbytea:
return `'\x%x'`
default:
return "'%s'"
}
}
// FmtStringInsert returns a format string for creating a Postgres insert statement.
func (tbl *Table) FmtStringInsert() string {
var colnames, placeholders []string
for _, col := range tbl.Columns {
colnames = append(colnames, col.Name)
placeholders = append(placeholders, formatSpec(col.Type))
}
return fmt.Sprintf(
"INSERT INTO %s (%s) VALUES (%s);",
tbl.Name, strings.Join(colnames, ", "), strings.Join(placeholders, ", "),
)
}

View File

@ -8,47 +8,55 @@ import (
. "github.com/cerc-io/plugeth-statediff/indexer/shared/schema"
)
var testHeaderTable = Table{
Name: "eth.header_cids",
Columns: []Column{
{Name: "block_number", Type: Dbigint},
{Name: "block_hash", Type: Dvarchar},
{Name: "parent_hash", Type: Dvarchar},
{Name: "cid", Type: Dtext},
{Name: "td", Type: Dnumeric},
{Name: "node_id", Type: Dvarchar},
{Name: "reward", Type: Dnumeric},
{Name: "state_root", Type: Dvarchar},
{Name: "tx_root", Type: Dvarchar},
{Name: "receipt_root", Type: Dvarchar},
{Name: "uncle_root", Type: Dvarchar},
{Name: "bloom", Type: Dbytea},
{Name: "timestamp", Type: Dnumeric},
{Name: "mh_key", Type: Dtext},
{Name: "times_validated", Type: Dinteger},
{Name: "coinbase", Type: Dvarchar},
},
UpsertClause: OnConflict("block_hash", "block_number").Set(
"parent_hash",
"cid",
"td",
"node_id",
"reward",
"state_root",
"tx_root",
"receipt_root",
"uncle_root",
"bloom",
"timestamp",
"mh_key",
"times_validated",
"coinbase",
),
}
var (
testTable = Table{
Name: "test_table",
Columns: []Column{
{Name: "id", Type: Dbigint},
{Name: "name", Type: Dvarchar},
{Name: "age", Type: Dinteger},
},
}
testTableWithConflictClause = Table{
Name: "test_table_conflict",
Columns: []Column{
{Name: "id", Type: Dbigint},
{Name: "name", Type: Dvarchar},
{Name: "age", Type: Dinteger},
},
UpsertClause: OnConflict("id").Set("name", "age"),
}
)
const (
expectedHeaderPreparedWithUpsert = "INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_ids, reward, state_root, tx_root, receipt_root, uncles_hash, bloom, timestamp, coinbase, canonical, withdrawals_root) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16) ON CONFLICT (block_number, block_hash) DO UPDATE SET (parent_hash, cid, td, node_ids, reward, state_root, tx_root, receipt_root, uncles_hash, bloom, timestamp, coinbase, canonical, withdrawals_root) = ROW(EXCLUDED.parent_hash, EXCLUDED.cid, EXCLUDED.td, EXCLUDED.node_ids, EXCLUDED.reward, EXCLUDED.state_root, EXCLUDED.tx_root, EXCLUDED.receipt_root, EXCLUDED.uncles_hash, EXCLUDED.bloom, EXCLUDED.timestamp, EXCLUDED.coinbase, EXCLUDED.canonical, EXCLUDED.withdrawals_root)"
expectedHeaderPreparedWithoutUpsert = "INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_ids, reward, state_root, tx_root, receipt_root, uncles_hash, bloom, timestamp, coinbase, canonical, withdrawals_root) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16) ON CONFLICT (block_number, block_hash) DO NOTHING"
expectedHeaderFmtString = `INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_ids, reward, state_root, tx_root, receipt_root, uncles_hash, bloom, timestamp, coinbase, canonical, withdrawals_root) VALUES ('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '\x%x', '%s', '%s', %t, '%s');`
)
func TestTable(t *testing.T) {
headerUpsert := `INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16) ON CONFLICT (block_hash, block_number) DO UPDATE SET (parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase) = ($3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)`
headerNoUpsert := `INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16) ON CONFLICT (block_hash, block_number) DO NOTHING`
require.Equal(t, headerNoUpsert, testHeaderTable.ToInsertStatement(false))
require.Equal(t, headerUpsert, testHeaderTable.ToInsertStatement(true))
require.Equal(t,
"INSERT INTO test_table (id, name, age) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING",
testTable.PreparedInsert(true),
)
require.Equal(t,
"INSERT INTO test_table (id, name, age) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING",
testTable.PreparedInsert(false),
)
require.Equal(t, "INSERT INTO test_table (id, name, age) VALUES ('%s', '%s', %d);", testTable.FmtStringInsert())
require.Equal(t,
"INSERT INTO test_table_conflict (id, name, age) VALUES ($1, $2, $3) ON CONFLICT (id) DO UPDATE SET (name, age) = ROW(EXCLUDED.name, EXCLUDED.age)",
testTableWithConflictClause.PreparedInsert(true),
)
require.Equal(t,
"INSERT INTO test_table_conflict (id, name, age) VALUES ($1, $2, $3) ON CONFLICT (id) DO NOTHING",
testTableWithConflictClause.PreparedInsert(false),
)
require.Equal(t, expectedHeaderPreparedWithUpsert, TableHeader.PreparedInsert(true))
require.Equal(t, expectedHeaderPreparedWithoutUpsert, TableHeader.PreparedInsert(false))
require.Equal(t, expectedHeaderFmtString, TableHeader.FmtStringInsert())
}