fix uncle processing for file indexer

This commit is contained in:
i-norden 2022-08-10 11:54:37 -05:00
parent c8dd85f417
commit 531e37ccfa
4 changed files with 41 additions and 21 deletions

View File

@ -252,7 +252,7 @@ func (csw *CSVWriter) upsertHeaderCID(header models.HeaderModel) {
var values []interface{}
values = append(values, header.BlockNumber, header.BlockHash, header.ParentHash, header.CID,
header.TotalDifficulty, header.NodeID, header.Reward, header.StateRoot, header.TxRoot,
header.RctRoot, header.UncleRoot, header.Bloom, strconv.FormatUint(header.Timestamp, 10), header.MhKey, 1, header.Coinbase)
header.RctRoot, header.UnclesHash, header.Bloom, strconv.FormatUint(header.Timestamp, 10), header.MhKey, 1, header.Coinbase)
csw.rows <- tableRow{types.TableHeader, values}
indexerMetrics.blocks.Inc(1)
}
@ -260,7 +260,7 @@ func (csw *CSVWriter) upsertHeaderCID(header models.HeaderModel) {
func (csw *CSVWriter) upsertUncleCID(uncle models.UncleModel) {
var values []interface{}
values = append(values, uncle.BlockNumber, uncle.BlockHash, uncle.HeaderID, uncle.ParentHash, uncle.CID,
uncle.Reward, uncle.MhKey)
uncle.Reward, uncle.MhKey, uncle.Index)
csw.rows <- tableRow{types.TableUncle, values}
}

View File

@ -17,6 +17,7 @@
package file
import (
"bytes"
"context"
"errors"
"fmt"
@ -26,6 +27,9 @@ import (
"sync/atomic"
"time"
blockstore "github.com/ipfs/go-ipfs-blockstore"
dshelp "github.com/ipfs/go-ipfs-ds-help"
"github.com/ipfs/go-cid"
node "github.com/ipfs/go-ipld-format"
"github.com/multiformats/go-multihash"
@ -149,7 +153,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
}
// Generate the block iplds
headerNode, uncleNodes, txNodes, txTrieNodes, rctNodes, rctTrieNodes, logTrieNodes, logLeafNodeCIDs, rctLeafNodeCIDs, err := ipld2.FromBlockAndReceipts(block, receipts)
headerNode, txNodes, txTrieNodes, rctNodes, rctTrieNodes, logTrieNodes, logLeafNodeCIDs, rctLeafNodeCIDs, err := ipld2.FromBlockAndReceipts(block, receipts)
if err != nil {
return nil, fmt.Errorf("error creating IPLD nodes from block and receipts: %v", err)
}
@ -200,7 +204,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
t = time.Now()
// write uncles
sdi.processUncles(headerID, block.Number(), uncleNodes)
sdi.processUncles(headerID, block.Number(), block.UncleHash(), block.Uncles())
tDiff = time.Since(t)
indexerMetrics.tUncleProcessing.Update(tDiff)
traceMsg += fmt.Sprintf("uncle processing time: %s\r\n", tDiff.String())
@ -255,35 +259,50 @@ func (sdi *StateDiffIndexer) processHeader(header *types.Header, headerNode node
StateRoot: header.Root.String(),
RctRoot: header.ReceiptHash.String(),
TxRoot: header.TxHash.String(),
UncleRoot: header.UncleHash.String(),
UnclesHash: header.UncleHash.String(),
Timestamp: header.Time,
Coinbase: header.Coinbase.String(),
})
return headerID
}
// processUncles writes uncle IPLD insert SQL stmts to a file
func (sdi *StateDiffIndexer) processUncles(headerID string, blockNumber *big.Int, uncleNodes []*ipld2.EthHeader) {
// processUncles publishes and indexes uncle IPLDs in Postgres
func (sdi *StateDiffIndexer) processUncles(headerID string, blockNumber *big.Int, unclesHash common.Hash, uncles []*types.Header) error {
// publish and index uncles
for _, uncleNode := range uncleNodes {
sdi.fileWriter.upsertIPLDNode(blockNumber.String(), uncleNode)
uncleEncoding, err := rlp.EncodeToBytes(uncles)
if err != nil {
return err
}
preparedHash := crypto.Keccak256Hash(uncleEncoding)
if !bytes.Equal(preparedHash.Bytes(), unclesHash.Bytes()) {
return fmt.Errorf("derived uncles hash (%s) does not match the hash in the header (%s)", preparedHash.Hex(), unclesHash.Hex())
}
unclesCID, err := ipld2.RawdataToCid(ipld2.MEthHeaderList, uncleEncoding, multihash.KECCAK_256)
if err != nil {
return err
}
prefixedKey := blockstore.BlockPrefix.String() + dshelp.MultihashToDsKey(unclesCID.Hash()).String()
sdi.fileWriter.upsertIPLDDirect(blockNumber.String(), prefixedKey, uncleEncoding)
for i, uncle := range uncles {
var uncleReward *big.Int
// in PoA networks uncle reward is 0
if sdi.chainConfig.Clique != nil {
uncleReward = big.NewInt(0)
} else {
uncleReward = shared.CalcUncleMinerReward(blockNumber.Uint64(), uncleNode.Number.Uint64())
uncleReward = shared.CalcUncleMinerReward(blockNumber.Uint64(), uncle.Number.Uint64())
}
sdi.fileWriter.upsertUncleCID(models.UncleModel{
BlockNumber: blockNumber.String(),
HeaderID: headerID,
CID: uncleNode.Cid().String(),
MhKey: shared.MultihashKeyFromCID(uncleNode.Cid()),
ParentHash: uncleNode.ParentHash.String(),
BlockHash: uncleNode.Hash().String(),
CID: unclesCID.String(),
MhKey: shared.MultihashKeyFromCID(unclesCID),
ParentHash: uncle.ParentHash.String(),
BlockHash: uncle.Hash().String(),
Reward: uncleReward.String(),
Index: int64(i),
})
}
return nil
}
// processArgs bundles arguments to processReceiptsAndTxs

View File

@ -143,11 +143,11 @@ const (
ipldInsert = "INSERT INTO public.blocks (block_number, key, data) VALUES ('%s', '%s', '\\x%x');\n"
headerInsert = "INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, " +
"state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase) VALUES " +
"state_root, tx_root, receipt_root, uncles_hash, bloom, timestamp, mh_key, times_validated, coinbase) VALUES " +
"('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '\\x%x', %d, '%s', %d, '%s');\n"
uncleInsert = "INSERT INTO eth.uncle_cids (block_number, block_hash, header_id, parent_hash, cid, reward, mh_key) VALUES " +
"('%s', '%s', '%s', '%s', '%s', '%s', '%s');\n"
uncleInsert = "INSERT INTO eth.uncle_cids (block_number, block_hash, header_id, parent_hash, cid, reward, mh_key, index) VALUES " +
"('%s', '%s', '%s', '%s', '%s', '%s', '%s', %d);\n"
txInsert = "INSERT INTO eth.transaction_cids (block_number, header_id, tx_hash, cid, dst, src, index, mh_key, tx_data, tx_type, " +
"value) VALUES ('%s', '%s', '%s', '%s', '%s', '%s', %d, '%s', '\\x%x', %d, '%s');\n"
@ -212,14 +212,14 @@ func (sqw *SQLWriter) upsertIPLDRaw(blockNumber string, codec, mh uint64, raw []
func (sqw *SQLWriter) upsertHeaderCID(header models.HeaderModel) {
stmt := fmt.Sprintf(headerInsert, header.BlockNumber, header.BlockHash, header.ParentHash, header.CID,
header.TotalDifficulty, header.NodeID, header.Reward, header.StateRoot, header.TxRoot,
header.RctRoot, header.UncleRoot, header.Bloom, header.Timestamp, header.MhKey, 1, header.Coinbase)
header.RctRoot, header.UnclesHash, header.Bloom, header.Timestamp, header.MhKey, 1, header.Coinbase)
sqw.stmts <- []byte(stmt)
indexerMetrics.blocks.Inc(1)
}
func (sqw *SQLWriter) upsertUncleCID(uncle models.UncleModel) {
sqw.stmts <- []byte(fmt.Sprintf(uncleInsert, uncle.BlockNumber, uncle.BlockHash, uncle.HeaderID, uncle.ParentHash, uncle.CID,
uncle.Reward, uncle.MhKey))
uncle.Reward, uncle.MhKey, uncle.Index))
}
func (sqw *SQLWriter) upsertTransactionCID(transaction models.TxModel) {

View File

@ -49,7 +49,7 @@ var TableHeader = Table{
{name: "state_root", dbType: varchar},
{name: "tx_root", dbType: varchar},
{name: "receipt_root", dbType: varchar},
{name: "uncle_root", dbType: varchar},
{name: "uncles_hash", dbType: varchar},
{name: "bloom", dbType: bytea},
{name: "timestamp", dbType: numeric},
{name: "mh_key", dbType: text},
@ -97,6 +97,7 @@ var TableUncle = Table{
{name: "cid", dbType: text},
{name: "reward", dbType: numeric},
{name: "mh_key", dbType: text},
{name: "index", dbType: integer},
},
}
@ -170,7 +171,7 @@ var TableStateAccount = Table{
{name: "state_path", dbType: bytea},
{name: "balance", dbType: numeric},
{name: "nonce", dbType: bigint},
{name: "code_hash", dbType: bytea},
{name: "code_hash", dbType: varchar},
{name: "storage_root", dbType: varchar},
},
}