update file writing mode

i-norden 2022-03-24 08:01:26 -05:00
parent 9775355d2b
commit 7bc4c7520c
6 changed files with 175 additions and 143 deletions

===== changed file 1 of 6 =====

@@ -18,7 +18,7 @@ package file
 // BatchTx wraps a void with the state necessary for building the tx concurrently during trie difference iteration
 type BatchTx struct {
-    BlockNumber uint64
+    BlockNumber string

     submit func(blockTx *BatchTx, err error) error
 }
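
The type change above drives most of this diff: the file-mode BatchTx now carries the block number as a string rather than a uint64, presumably because every generated INSERT interpolates it as text into the new block_number columns, so it is formatted once and threaded through unchanged. A minimal sketch of that convention (big.NewInt stands in here for block.Number() on *types.Block):

package main

import (
    "fmt"
    "math/big"
)

func main() {
    // Format the block number once; reuse the string for every model and
    // INSERT statement that needs a block_number value.
    blockNumber := big.NewInt(14_000_000).String()
    fmt.Println(blockNumber) // "14000000"
}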

===== changed file 2 of 6 =====

@@ -23,6 +23,7 @@ import (
     "math/big"
     "os"
     "sync"
+    "sync/atomic"
     "time"

     "github.com/ipfs/go-cid"
@@ -53,10 +54,12 @@ var (
 // StateDiffIndexer satisfies the indexer.StateDiffIndexer interface for ethereum statediff objects on top of a void
 type StateDiffIndexer struct {
     fileWriter  *SQLWriter
     chainConfig *params.ChainConfig
     nodeID      string
     wg          *sync.WaitGroup
+    blockNumber      string
+    removedCacheFlag *uint32
 }

 // NewStateDiffIndexer creates a void implementation of interfaces.StateDiffIndexer
@@ -77,7 +80,6 @@ func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, c
     wg := new(sync.WaitGroup)
     w.Loop()
     w.upsertNode(config.NodeInfo)
-    w.upsertIPLDDirect(shared.RemovedNodeMhKey, []byte{})
     return &StateDiffIndexer{
         fileWriter:  w,
         chainConfig: chainConfig,
@@ -92,6 +94,8 @@ func (sdi *StateDiffIndexer) ReportDBMetrics(time.Duration, <-chan bool) {}
 // PushBlock pushes and indexes block data in sql, except state & storage nodes (includes header, uncles, transactions & receipts)
 // Returns an initiated DB transaction which must be Closed via defer to commit or rollback
 func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (interfaces.Batch, error) {
+    sdi.removedCacheFlag = new(uint32)
+    sdi.blockNumber = block.Number().String()
     start, t := time.Now(), time.Now()
     blockHash := block.Hash()
     blockHashStr := blockHash.String()
@@ -127,7 +131,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
     t = time.Now()
     blockTx := &BatchTx{
-        BlockNumber: height,
+        BlockNumber: sdi.blockNumber,
         submit: func(self *BatchTx, err error) error {
             tDiff := time.Since(t)
             indexerMetrics.tStateStoreCodeProcessing.Update(tDiff)
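
Per the comment above, the batch returned by PushBlock must be closed via defer so the block's statements commit or roll back exactly once. A hypothetical call site, assuming the Submit(error) error method on interfaces.Batch and this repository's import paths:

package example

import (
    "math/big"

    "github.com/ethereum/go-ethereum/core/types"
    "github.com/ethereum/go-ethereum/log"
    "github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
)

// indexBlock sketches the PushBlock contract: submit the batch in a defer so
// it is finalized no matter which later step fails.
func indexBlock(sdi interfaces.StateDiffIndexer, block *types.Block, receipts types.Receipts, td *big.Int) (err error) {
    batch, err := sdi.PushBlock(block, receipts, td)
    if err != nil {
        return err
    }
    defer func() {
        if submitErr := batch.Submit(err); submitErr != nil {
            log.Error("failed to submit batch", "err", submitErr)
        }
    }()
    // ... PushStateNode / PushCodeAndCodeHash calls against batch go here ...
    return nil
}
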
@@ -189,7 +193,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
 // processHeader write a header IPLD insert SQL stmt to a file
 // it returns the headerID
 func (sdi *StateDiffIndexer) processHeader(header *types.Header, headerNode node.Node, reward, td *big.Int) string {
-    sdi.fileWriter.upsertIPLDNode(headerNode)
+    sdi.fileWriter.upsertIPLDNode(sdi.blockNumber, headerNode)

     var baseFee *string
     if header.BaseFee != nil {
@@ -202,7 +206,7 @@ func (sdi *StateDiffIndexer) processHeader(header *types.Header, headerNode node
         CID:             headerNode.Cid().String(),
         MhKey:           shared.MultihashKeyFromCID(headerNode.Cid()),
         ParentHash:      header.ParentHash.String(),
-        BlockNumber:     header.Number.String(),
+        BlockNumber:     sdi.blockNumber,
         BlockHash:       headerID,
         TotalDifficulty: td.String(),
         Reward:          reward.String(),
@@ -221,7 +225,7 @@ func (sdi *StateDiffIndexer) processHeader(header *types.Header, headerNode node
 func (sdi *StateDiffIndexer) processUncles(headerID string, blockNumber uint64, uncleNodes []*ipld2.EthHeader) {
     // publish and index uncles
     for _, uncleNode := range uncleNodes {
-        sdi.fileWriter.upsertIPLDNode(uncleNode)
+        sdi.fileWriter.upsertIPLDNode(sdi.blockNumber, uncleNode)
         var uncleReward *big.Int
         // in PoA networks uncle reward is 0
         if sdi.chainConfig.Clique != nil {
@@ -230,12 +234,13 @@ func (sdi *StateDiffIndexer) processUncles(headerID string, blockNumber uint64,
             uncleReward = shared.CalcUncleMinerReward(blockNumber, uncleNode.Number.Uint64())
         }
         sdi.fileWriter.upsertUncleCID(models.UncleModel{
-            HeaderID:   headerID,
-            CID:        uncleNode.Cid().String(),
-            MhKey:      shared.MultihashKeyFromCID(uncleNode.Cid()),
-            ParentHash: uncleNode.ParentHash.String(),
-            BlockHash:  uncleNode.Hash().String(),
-            Reward:     uncleReward.String(),
+            BlockNumber: sdi.blockNumber,
+            HeaderID:    headerID,
+            CID:         uncleNode.Cid().String(),
+            MhKey:       shared.MultihashKeyFromCID(uncleNode.Cid()),
+            ParentHash:  uncleNode.ParentHash.String(),
+            BlockHash:   uncleNode.Hash().String(),
+            Reward:      uncleReward.String(),
         })
     }
 }
@@ -261,10 +266,10 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error {
     signer := types.MakeSigner(sdi.chainConfig, args.blockNumber)
     for i, receipt := range args.receipts {
         for _, logTrieNode := range args.logTrieNodes[i] {
-            sdi.fileWriter.upsertIPLDNode(logTrieNode)
+            sdi.fileWriter.upsertIPLDNode(sdi.blockNumber, logTrieNode)
         }
         txNode := args.txNodes[i]
-        sdi.fileWriter.upsertIPLDNode(txNode)
+        sdi.fileWriter.upsertIPLDNode(sdi.blockNumber, txNode)

         // index tx
         trx := args.txs[i]
@@ -281,16 +286,17 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error {
             return fmt.Errorf("error deriving tx sender: %v", err)
         }
         txModel := models.TxModel{
-            HeaderID: args.headerID,
-            Dst:      shared.HandleZeroAddrPointer(trx.To()),
-            Src:      shared.HandleZeroAddr(from),
-            TxHash:   txID,
-            Index:    int64(i),
-            Data:     trx.Data(),
-            CID:      txNode.Cid().String(),
-            MhKey:    shared.MultihashKeyFromCID(txNode.Cid()),
-            Type:     trx.Type(),
-            Value:    val,
+            BlockNumber: sdi.blockNumber,
+            HeaderID:    args.headerID,
+            Dst:         shared.HandleZeroAddrPointer(trx.To()),
+            Src:         shared.HandleZeroAddr(from),
+            TxHash:      txID,
+            Index:       int64(i),
+            Data:        trx.Data(),
+            CID:         txNode.Cid().String(),
+            MhKey:       shared.MultihashKeyFromCID(txNode.Cid()),
+            Type:        trx.Type(),
+            Value:       val,
         }
         sdi.fileWriter.upsertTransactionCID(txModel)
@@ -301,6 +307,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error {
                 storageKeys[k] = storageKey.Hex()
             }
             accessListElementModel := models.AccessListElementModel{
+                BlockNumber: sdi.blockNumber,
                 TxID:        txID,
                 Index:       int64(j),
                 Address:     accessListElement.Address.Hex(),
@@ -322,6 +329,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error {
         }
         rctModel := &models.ReceiptModel{
+            BlockNumber:  sdi.blockNumber,
             TxID:         txID,
             Contract:     contract,
             ContractHash: contractHash,
@@ -349,16 +357,17 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error {
             }
             logDataSet[idx] = &models.LogsModel{
-                ReceiptID: txID,
-                Address:   l.Address.String(),
-                Index:     int64(l.Index),
-                Data:      l.Data,
-                LeafCID:   args.logLeafNodeCIDs[i][idx].String(),
-                LeafMhKey: shared.MultihashKeyFromCID(args.logLeafNodeCIDs[i][idx]),
-                Topic0:    topicSet[0],
-                Topic1:    topicSet[1],
-                Topic2:    topicSet[2],
-                Topic3:    topicSet[3],
+                BlockNumber: sdi.blockNumber,
+                ReceiptID:   txID,
+                Address:     l.Address.String(),
+                Index:       int64(l.Index),
+                Data:        l.Data,
+                LeafCID:     args.logLeafNodeCIDs[i][idx].String(),
+                LeafMhKey:   shared.MultihashKeyFromCID(args.logLeafNodeCIDs[i][idx]),
+                Topic0:      topicSet[0],
+                Topic1:      topicSet[1],
+                Topic2:      topicSet[2],
+                Topic3:      topicSet[3],
             }
         }
         sdi.fileWriter.upsertLogCID(logDataSet)
@@ -366,8 +375,8 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error {
     // publish trie nodes, these aren't indexed directly
     for i, n := range args.txTrieNodes {
-        sdi.fileWriter.upsertIPLDNode(n)
-        sdi.fileWriter.upsertIPLDNode(args.rctTrieNodes[i])
+        sdi.fileWriter.upsertIPLDNode(sdi.blockNumber, n)
+        sdi.fileWriter.upsertIPLDNode(sdi.blockNumber, args.rctTrieNodes[i])
     }

     return nil
@@ -377,30 +386,34 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error {
 func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdtypes.StateNode, headerID string) error {
     // publish the state node
     if stateNode.NodeType == sdtypes.Removed {
-        // short circuit if it is a Removed node
-        // this assumes the db has been initialized and a public.blocks entry for the Removed node is present
+        if atomic.LoadUint32(sdi.removedCacheFlag) == 0 {
+            atomic.StoreUint32(sdi.removedCacheFlag, 1)
+            sdi.fileWriter.upsertIPLDDirect(sdi.blockNumber, shared.RemovedNodeMhKey, []byte{})
+        }
         stateModel := models.StateNodeModel{
-            HeaderID: headerID,
-            Path:     stateNode.Path,
-            StateKey: common.BytesToHash(stateNode.LeafKey).String(),
-            CID:      shared.RemovedNodeStateCID,
-            MhKey:    shared.RemovedNodeMhKey,
-            NodeType: stateNode.NodeType.Int(),
+            BlockNumber: sdi.blockNumber,
+            HeaderID:    headerID,
+            Path:        stateNode.Path,
+            StateKey:    common.BytesToHash(stateNode.LeafKey).String(),
+            CID:         shared.RemovedNodeStateCID,
+            MhKey:       shared.RemovedNodeMhKey,
+            NodeType:    stateNode.NodeType.Int(),
         }
         sdi.fileWriter.upsertStateCID(stateModel)
         return nil
     }
-    stateCIDStr, stateMhKey, err := sdi.fileWriter.upsertIPLDRaw(ipld2.MEthStateTrie, multihash.KECCAK_256, stateNode.NodeValue)
+    stateCIDStr, stateMhKey, err := sdi.fileWriter.upsertIPLDRaw(sdi.blockNumber, ipld2.MEthStateTrie, multihash.KECCAK_256, stateNode.NodeValue)
     if err != nil {
         return fmt.Errorf("error generating and cacheing state node IPLD: %v", err)
     }
     stateModel := models.StateNodeModel{
-        HeaderID: headerID,
-        Path:     stateNode.Path,
-        StateKey: common.BytesToHash(stateNode.LeafKey).String(),
-        CID:      stateCIDStr,
-        MhKey:    stateMhKey,
-        NodeType: stateNode.NodeType.Int(),
+        BlockNumber: sdi.blockNumber,
+        HeaderID:    headerID,
+        Path:        stateNode.Path,
+        StateKey:    common.BytesToHash(stateNode.LeafKey).String(),
+        CID:         stateCIDStr,
+        MhKey:       stateMhKey,
+        NodeType:    stateNode.NodeType.Int(),
     }
     // index the state node
     sdi.fileWriter.upsertStateCID(stateModel)
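
Taken together with the constructor change earlier in this diff (the deleted w.upsertIPLDDirect(shared.RemovedNodeMhKey, []byte{}) call), the placeholder public.blocks row for Removed nodes is no longer written once at startup; it is now emitted lazily, at most once per block, guarded by the removedCacheFlag that PushBlock resets. A stripped-down sketch of that guard in isolation:

package main

import (
    "fmt"
    "sync/atomic"
)

// writeOnce mimics the removedCacheFlag pattern: the first caller in a block
// flips the flag and performs the write; subsequent callers skip it.
func writeOnce(flag *uint32, write func()) {
    if atomic.LoadUint32(flag) == 0 {
        atomic.StoreUint32(flag, 1)
        write()
    }
}

func main() {
    flag := new(uint32) // PushBlock allocates a fresh flag per block
    for i := 0; i < 3; i++ {
        writeOnce(flag, func() { fmt.Println("removed-node IPLD written") })
    }
    // Prints once, even though three Removed nodes were pushed.
}
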
@@ -418,6 +431,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
         return fmt.Errorf("error decoding state account rlp: %s", err.Error())
     }
     accountModel := models.StateAccountModel{
+        BlockNumber: sdi.blockNumber,
         HeaderID:    headerID,
         StatePath:   stateNode.Path,
         Balance:     account.Balance.String(),
@@ -430,32 +444,36 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
     // if there are any storage nodes associated with this node, publish and index them
     for _, storageNode := range stateNode.StorageNodes {
         if storageNode.NodeType == sdtypes.Removed {
-            // short circuit if it is a Removed node
-            // this assumes the db has been initialized and a public.blocks entry for the Removed node is present
+            if atomic.LoadUint32(sdi.removedCacheFlag) == 0 {
+                atomic.StoreUint32(sdi.removedCacheFlag, 1)
+                sdi.fileWriter.upsertIPLDDirect(sdi.blockNumber, shared.RemovedNodeMhKey, []byte{})
+            }
             storageModel := models.StorageNodeModel{
-                HeaderID:   headerID,
-                StatePath:  stateNode.Path,
-                Path:       storageNode.Path,
-                StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
-                CID:        shared.RemovedNodeStorageCID,
-                MhKey:      shared.RemovedNodeMhKey,
-                NodeType:   storageNode.NodeType.Int(),
+                BlockNumber: sdi.blockNumber,
+                HeaderID:    headerID,
+                StatePath:   stateNode.Path,
+                Path:        storageNode.Path,
+                StorageKey:  common.BytesToHash(storageNode.LeafKey).String(),
+                CID:         shared.RemovedNodeStorageCID,
+                MhKey:       shared.RemovedNodeMhKey,
+                NodeType:    storageNode.NodeType.Int(),
             }
             sdi.fileWriter.upsertStorageCID(storageModel)
             continue
         }
-        storageCIDStr, storageMhKey, err := sdi.fileWriter.upsertIPLDRaw(ipld2.MEthStorageTrie, multihash.KECCAK_256, storageNode.NodeValue)
+        storageCIDStr, storageMhKey, err := sdi.fileWriter.upsertIPLDRaw(sdi.blockNumber, ipld2.MEthStorageTrie, multihash.KECCAK_256, storageNode.NodeValue)
         if err != nil {
            return fmt.Errorf("error generating and cacheing storage node IPLD: %v", err)
         }
         storageModel := models.StorageNodeModel{
-            HeaderID:   headerID,
-            StatePath:  stateNode.Path,
-            Path:       storageNode.Path,
-            StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
-            CID:        storageCIDStr,
-            MhKey:      storageMhKey,
-            NodeType:   storageNode.NodeType.Int(),
+            BlockNumber: sdi.blockNumber,
+            HeaderID:    headerID,
+            StatePath:   stateNode.Path,
+            Path:        storageNode.Path,
+            StorageKey:  common.BytesToHash(storageNode.LeafKey).String(),
+            CID:         storageCIDStr,
+            MhKey:       storageMhKey,
+            NodeType:    storageNode.NodeType.Int(),
         }
         sdi.fileWriter.upsertStorageCID(storageModel)
     }
@@ -470,7 +488,7 @@ func (sdi *StateDiffIndexer) PushCodeAndCodeHash(batch interfaces.Batch, codeAnd
     if err != nil {
         return fmt.Errorf("error deriving multihash key from codehash: %v", err)
     }
-    sdi.fileWriter.upsertIPLDDirect(mhKey, codeAndCodeHash.Code)
+    sdi.fileWriter.upsertIPLDDirect(sdi.blockNumber, mhKey, codeAndCodeHash.Code)
     return nil
 }

===== changed file 3 of 6 =====

@@ -127,34 +127,35 @@ const (
     nodeInsert = "INSERT INTO nodes (genesis_block, network_id, node_id, client_name, chain_id) VALUES " +
         "('%s', '%s', '%s', '%s', %d);\n"

-    ipldInsert = "INSERT INTO public.blocks (key, data) VALUES ('%s', '\\x%x');\n"
+    ipldInsert = "INSERT INTO public.blocks (block_number, key, data) VALUES ('%s', '%s', '\\x%x');\n"

     headerInsert = "INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, " +
         "state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase) VALUES " +
         "('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '\\x%x', %d, '%s', %d, '%s');\n"

-    uncleInsert = "INSERT INTO eth.uncle_cids (block_hash, header_id, parent_hash, cid, reward, mh_key) VALUES " +
-        "('%s', '%s', '%s', '%s', '%s', '%s');\n"
+    uncleInsert = "INSERT INTO eth.uncle_cids (block_number, block_hash, header_id, parent_hash, cid, reward, mh_key) VALUES " +
+        "('%s', '%s', '%s', '%s', '%s', '%s', '%s');\n"

-    txInsert = "INSERT INTO eth.transaction_cids (header_id, tx_hash, cid, dst, src, index, mh_key, tx_data, tx_type, " +
-        "value) VALUES ('%s', '%s', '%s', '%s', '%s', %d, '%s', '\\x%x', %d, '%s');\n"
+    txInsert = "INSERT INTO eth.transaction_cids (block_number, header_id, tx_hash, cid, dst, src, index, mh_key, tx_data, tx_type, " +
+        "value) VALUES ('%s', '%s', '%s', '%s', '%s', '%s', %d, '%s', '\\x%x', %d, '%s');\n"

-    alInsert = "INSERT INTO eth.access_list_elements (tx_id, index, address, storage_keys) VALUES ('%s', %d, '%s', '%s');\n"
+    alInsert = "INSERT INTO eth.access_list_elements (block_number, tx_id, index, address, storage_keys) VALUES " +
+        "('%s', '%s', %d, '%s', '%s');\n"

-    rctInsert = "INSERT INTO eth.receipt_cids (tx_id, leaf_cid, contract, contract_hash, leaf_mh_key, post_state, " +
-        "post_status, log_root) VALUES ('%s', '%s', '%s', '%s', '%s', '%s', %d, '%s');\n"
+    rctInsert = "INSERT INTO eth.receipt_cids (block_number, tx_id, leaf_cid, contract, contract_hash, leaf_mh_key, post_state, " +
+        "post_status, log_root) VALUES ('%s', '%s', '%s', '%s', '%s', '%s', '%s', %d, '%s');\n"

-    logInsert = "INSERT INTO eth.log_cids (leaf_cid, leaf_mh_key, rct_id, address, index, topic0, topic1, topic2, " +
-        "topic3, log_data) VALUES ('%s', '%s', '%s', '%s', %d, '%s', '%s', '%s', '%s', '\\x%x');\n"
+    logInsert = "INSERT INTO eth.log_cids (block_number, leaf_cid, leaf_mh_key, rct_id, address, index, topic0, topic1, topic2, " +
+        "topic3, log_data) VALUES ('%s', '%s', '%s', '%s', '%s', %d, '%s', '%s', '%s', '%s', '\\x%x');\n"

-    stateInsert = "INSERT INTO eth.state_cids (header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) " +
-        "VALUES ('%s', '%s', '%s', '\\x%x', %d, %t, '%s');\n"
+    stateInsert = "INSERT INTO eth.state_cids (block_number, header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) " +
+        "VALUES ('%s', '%s', '%s', '%s', '\\x%x', %d, %t, '%s');\n"

-    accountInsert = "INSERT INTO eth.state_accounts (header_id, state_path, balance, nonce, code_hash, storage_root) " +
-        "VALUES ('%s', '\\x%x', '%s', %d, '\\x%x', '%s');\n"
+    accountInsert = "INSERT INTO eth.state_accounts (block_number, header_id, state_path, balance, nonce, code_hash, storage_root) " +
+        "VALUES ('%s', '%s', '\\x%x', '%s', %d, '\\x%x', '%s');\n"

-    storageInsert = "INSERT INTO eth.storage_cids (header_id, state_path, storage_leaf_key, cid, storage_path, " +
-        "node_type, diff, mh_key) VALUES ('%s', '\\x%x', '%s', '%s', '\\x%x', %d, %t, '%s');\n"
+    storageInsert = "INSERT INTO eth.storage_cids (block_number, header_id, state_path, storage_leaf_key, cid, storage_path, " +
+        "node_type, diff, mh_key) VALUES ('%s', '%s', '\\x%x', '%s', '%s', '\\x%x', %d, %t, '%s');\n"
 )
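
Every statement template now takes the block number as its first interpolated value. To make the output format concrete, here is what the new ipldInsert produces for a hypothetical key and payload (the constant is copied from the hunk above; the key and data bytes are made up):

package main

import "fmt"

const ipldInsert = "INSERT INTO public.blocks (block_number, key, data) VALUES ('%s', '%s', '\\x%x');\n"

func main() {
    stmt := fmt.Sprintf(ipldInsert, "14000000", "/blocks/EXAMPLEKEY", []byte{0xde, 0xad, 0xbe, 0xef})
    fmt.Print(stmt)
    // INSERT INTO public.blocks (block_number, key, data) VALUES ('14000000', '/blocks/EXAMPLEKEY', '\xdeadbeef');
}
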
@@ -162,32 +163,35 @@ func (sqw *SQLWriter) upsertNode(node nodeinfo.Info) {
 }

 func (sqw *SQLWriter) upsertIPLD(ipld models.IPLDModel) {
-    sqw.stmts <- []byte(fmt.Sprintf(ipldInsert, ipld.Key, ipld.Data))
+    sqw.stmts <- []byte(fmt.Sprintf(ipldInsert, ipld.BlockNumber, ipld.Key, ipld.Data))
 }

-func (sqw *SQLWriter) upsertIPLDDirect(key string, value []byte) {
+func (sqw *SQLWriter) upsertIPLDDirect(blockNumber, key string, value []byte) {
     sqw.upsertIPLD(models.IPLDModel{
-        Key:  key,
-        Data: value,
+        BlockNumber: blockNumber,
+        Key:         key,
+        Data:        value,
     })
 }

-func (sqw *SQLWriter) upsertIPLDNode(i node.Node) {
+func (sqw *SQLWriter) upsertIPLDNode(blockNumber string, i node.Node) {
     sqw.upsertIPLD(models.IPLDModel{
-        Key:  blockstore.BlockPrefix.String() + dshelp.MultihashToDsKey(i.Cid().Hash()).String(),
-        Data: i.RawData(),
+        BlockNumber: blockNumber,
+        Key:         blockstore.BlockPrefix.String() + dshelp.MultihashToDsKey(i.Cid().Hash()).String(),
+        Data:        i.RawData(),
     })
 }

-func (sqw *SQLWriter) upsertIPLDRaw(codec, mh uint64, raw []byte) (string, string, error) {
+func (sqw *SQLWriter) upsertIPLDRaw(blockNumber string, codec, mh uint64, raw []byte) (string, string, error) {
     c, err := ipld.RawdataToCid(codec, raw, mh)
     if err != nil {
         return "", "", err
     }
     prefixedKey := blockstore.BlockPrefix.String() + dshelp.MultihashToDsKey(c.Hash()).String()
     sqw.upsertIPLD(models.IPLDModel{
-        Key:  prefixedKey,
-        Data: raw,
+        BlockNumber: blockNumber,
+        Key:         prefixedKey,
+        Data:        raw,
     })
     return c.String(), prefixedKey, err
 }
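
All of these helpers share one mechanism: format the statement and send it on sqw.stmts, leaving a single Loop goroutine (started in NewStateDiffIndexer, its body not shown in this diff) to own the output file and serialize all writes. A minimal analogue of that single-writer pattern, with a temp file standing in for the SQL output file:

package main

import (
    "fmt"
    "os"
)

// writer is a stripped-down stand-in for SQLWriter: producers send formatted
// SQL on a channel; one goroutine owns the file and performs every write.
type writer struct {
    stmts chan []byte
    done  chan struct{}
    out   *os.File
}

func (w *writer) loop() {
    for stmt := range w.stmts {
        w.out.Write(stmt) // single writer: no lock needed around the file
    }
    close(w.done)
}

func main() {
    f, err := os.CreateTemp("", "statediff-*.sql")
    if err != nil {
        panic(err)
    }
    defer os.Remove(f.Name())

    w := &writer{stmts: make(chan []byte), done: make(chan struct{}), out: f}
    go w.loop()

    w.stmts <- []byte(fmt.Sprintf("INSERT INTO nodes (node_id) VALUES ('%s');\n", "test-node"))
    close(w.stmts) // no more producers; loop drains and signals done
    <-w.done
    fmt.Println("wrote", f.Name())
}
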
@@ -201,31 +205,31 @@ func (sqw *SQLWriter) upsertHeaderCID(header models.HeaderModel) {
 }

 func (sqw *SQLWriter) upsertUncleCID(uncle models.UncleModel) {
-    sqw.stmts <- []byte(fmt.Sprintf(uncleInsert, uncle.BlockHash, uncle.HeaderID, uncle.ParentHash, uncle.CID,
+    sqw.stmts <- []byte(fmt.Sprintf(uncleInsert, uncle.BlockNumber, uncle.BlockHash, uncle.HeaderID, uncle.ParentHash, uncle.CID,
         uncle.Reward, uncle.MhKey))
 }

 func (sqw *SQLWriter) upsertTransactionCID(transaction models.TxModel) {
-    sqw.stmts <- []byte(fmt.Sprintf(txInsert, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst,
+    sqw.stmts <- []byte(fmt.Sprintf(txInsert, transaction.BlockNumber, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst,
         transaction.Src, transaction.Index, transaction.MhKey, transaction.Data, transaction.Type, transaction.Value))
     indexerMetrics.transactions.Inc(1)
 }

 func (sqw *SQLWriter) upsertAccessListElement(accessListElement models.AccessListElementModel) {
-    sqw.stmts <- []byte(fmt.Sprintf(alInsert, accessListElement.TxID, accessListElement.Index, accessListElement.Address,
+    sqw.stmts <- []byte(fmt.Sprintf(alInsert, accessListElement.BlockNumber, accessListElement.TxID, accessListElement.Index, accessListElement.Address,
         formatPostgresStringArray(accessListElement.StorageKeys)))
     indexerMetrics.accessListEntries.Inc(1)
 }

 func (sqw *SQLWriter) upsertReceiptCID(rct *models.ReceiptModel) {
-    sqw.stmts <- []byte(fmt.Sprintf(rctInsert, rct.TxID, rct.LeafCID, rct.Contract, rct.ContractHash, rct.LeafMhKey,
+    sqw.stmts <- []byte(fmt.Sprintf(rctInsert, rct.BlockNumber, rct.TxID, rct.LeafCID, rct.Contract, rct.ContractHash, rct.LeafMhKey,
         rct.PostState, rct.PostStatus, rct.LogRoot))
     indexerMetrics.receipts.Inc(1)
 }

 func (sqw *SQLWriter) upsertLogCID(logs []*models.LogsModel) {
     for _, l := range logs {
-        sqw.stmts <- []byte(fmt.Sprintf(logInsert, l.LeafCID, l.LeafMhKey, l.ReceiptID, l.Address, l.Index, l.Topic0,
+        sqw.stmts <- []byte(fmt.Sprintf(logInsert, l.BlockNumber, l.LeafCID, l.LeafMhKey, l.ReceiptID, l.Address, l.Index, l.Topic0,
             l.Topic1, l.Topic2, l.Topic3, l.Data))
         indexerMetrics.logs.Inc(1)
     }
@@ -236,12 +240,12 @@ func (sqw *SQLWriter) upsertStateCID(stateNode models.StateNodeModel) {
     if stateNode.StateKey != nullHash.String() {
         stateKey = stateNode.StateKey
     }
-    sqw.stmts <- []byte(fmt.Sprintf(stateInsert, stateNode.HeaderID, stateKey, stateNode.CID, stateNode.Path,
+    sqw.stmts <- []byte(fmt.Sprintf(stateInsert, stateNode.BlockNumber, stateNode.HeaderID, stateKey, stateNode.CID, stateNode.Path,
         stateNode.NodeType, true, stateNode.MhKey))
 }

 func (sqw *SQLWriter) upsertStateAccount(stateAccount models.StateAccountModel) {
-    sqw.stmts <- []byte(fmt.Sprintf(accountInsert, stateAccount.HeaderID, stateAccount.StatePath, stateAccount.Balance,
+    sqw.stmts <- []byte(fmt.Sprintf(accountInsert, stateAccount.BlockNumber, stateAccount.HeaderID, stateAccount.StatePath, stateAccount.Balance,
         stateAccount.Nonce, stateAccount.CodeHash, stateAccount.StorageRoot))
 }
@@ -250,6 +254,6 @@ func (sqw *SQLWriter) upsertStorageCID(storageCID models.StorageNodeModel) {
     if storageCID.StorageKey != nullHash.String() {
         storageKey = storageCID.StorageKey
     }
-    sqw.stmts <- []byte(fmt.Sprintf(storageInsert, storageCID.HeaderID, storageCID.StatePath, storageKey, storageCID.CID,
+    sqw.stmts <- []byte(fmt.Sprintf(storageInsert, storageCID.BlockNumber, storageCID.HeaderID, storageCID.StatePath, storageKey, storageCID.CID,
         storageCID.Path, storageCID.NodeType, true, storageCID.MhKey))
 }

===== changed file 4 of 6 =====

@@ -18,6 +18,7 @@ package sql
 import (
     "context"
+    "sync/atomic"

     blockstore "github.com/ipfs/go-ipfs-blockstore"
     dshelp "github.com/ipfs/go-ipfs-ds-help"
@@ -33,13 +34,14 @@ const startingCacheCapacity = 1024 * 24
 // BatchTx wraps a sql tx with the state necessary for building the tx concurrently during trie difference iteration
 type BatchTx struct {
     BlockNumber string
     ctx         context.Context
     dbtx        Tx
     stm         string
     quit        chan struct{}
     iplds       chan models.IPLDModel
     ipldCache   models.IPLDBatch
+    removedCacheFlag *uint32

     submit func(blockTx *BatchTx, err error) error
 }
@@ -104,6 +106,17 @@ func (tx *BatchTx) cacheRaw(codec, mh uint64, raw []byte) (string, string, error
     return c.String(), prefixedKey, err
 }

+func (tx *BatchTx) cacheRemoved(key string, value []byte) {
+    if atomic.LoadUint32(tx.removedCacheFlag) == 0 {
+        atomic.StoreUint32(tx.removedCacheFlag, 1)
+        tx.iplds <- models.IPLDModel{
+            BlockNumber: tx.BlockNumber,
+            Key:         key,
+            Data:        value,
+        }
+    }
+}
+
 // rollback sql transaction and log any error
 func rollback(ctx context.Context, tx Tx) {
     if err := tx.Rollback(ctx); err != nil {
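
One subtlety in cacheRemoved: LoadUint32 followed by StoreUint32 is two separate atomic operations, so two goroutines that both observe zero could each send the placeholder. That looks tolerable here, since a duplicate row for the same (block_number, key) is presumably absorbed by the upsert, but if exactly-once emission ever matters, atomic.CompareAndSwapUint32 collapses the check and the set into a single step. A sketch of the difference:

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

func main() {
    flag := new(uint32)
    writes := new(uint32)
    var wg sync.WaitGroup
    for i := 0; i < 8; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            // CompareAndSwap makes check-and-set one atomic step: exactly one
            // goroutine wins the race, no matter how many observe zero.
            if atomic.CompareAndSwapUint32(flag, 0, 1) {
                atomic.AddUint32(writes, 1)
            }
        }()
    }
    wg.Wait()
    fmt.Println("writes:", atomic.LoadUint32(writes)) // always 1
}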

===== changed file 5 of 6 =====

@@ -55,14 +55,11 @@ type StateDiffIndexer struct {
     ctx         context.Context
     chainConfig *params.ChainConfig
     dbWriter    *Writer
+    blockNumber string
 }

 // NewStateDiffIndexer creates a sql implementation of interfaces.StateDiffIndexer
 func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, db Database) (*StateDiffIndexer, error) {
-    // Write the removed node to the db on init
-    if _, err := db.Exec(ctx, db.InsertIPLDStm(), shared.RemovedNodeMhKey, []byte{}); err != nil {
-        return nil, err
-    }
     return &StateDiffIndexer{
         ctx:         ctx,
         chainConfig: chainConfig,
@@ -93,6 +90,7 @@ func (sdi *StateDiffIndexer) ReportDBMetrics(delay time.Duration, quit <-chan bo
 // Returns an initiated DB transaction which must be Closed via defer to commit or rollback
 func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (interfaces.Batch, error) {
     start, t := time.Now(), time.Now()
+    sdi.blockNumber = block.Number().String()
     blockHash := block.Hash()
     blockHashStr := blockHash.String()
     height := block.NumberU64()
@@ -140,11 +138,12 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
         }
     }()
     blockTx := &BatchTx{
-        ctx:         sdi.ctx,
-        BlockNumber: block.Number().String(),
-        stm:         sdi.dbWriter.db.InsertIPLDsStm(),
-        iplds:       make(chan models.IPLDModel),
-        quit:        make(chan struct{}),
+        removedCacheFlag: new(uint32),
+        ctx:              sdi.ctx,
+        BlockNumber:      sdi.blockNumber,
+        stm:              sdi.dbWriter.db.InsertIPLDsStm(),
+        iplds:            make(chan models.IPLDModel),
+        quit:             make(chan struct{}),
         ipldCache: models.IPLDBatch{
             BlockNumbers: make([]string, 0, startingCacheCapacity),
             Keys:         make([]string, 0, startingCacheCapacity),
@@ -204,7 +203,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
     traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff.String())
     t = time.Now()
     // Publish and index uncles
-    err = sdi.processUncles(blockTx, headerID, block.Number(), uncleNodes)
+    err = sdi.processUncles(blockTx, headerID, block.NumberU64(), uncleNodes)
     if err != nil {
         return nil, err
     }
@@ -253,7 +252,7 @@ func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, he
         CID:             headerNode.Cid().String(),
         MhKey:           shared.MultihashKeyFromCID(headerNode.Cid()),
         ParentHash:      header.ParentHash.String(),
-        BlockNumber:     header.Number.String(),
+        BlockNumber:     sdi.blockNumber,
         BlockHash:       headerID,
         TotalDifficulty: td.String(),
         Reward:          reward.String(),
@@ -268,7 +267,7 @@ func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, he
 }

 // processUncles publishes and indexes uncle IPLDs in Postgres
-func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID string, blockNumber *big.Int, uncleNodes []*ipld2.EthHeader) error {
+func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID string, blockNumber uint64, uncleNodes []*ipld2.EthHeader) error {
     // publish and index uncles
     for _, uncleNode := range uncleNodes {
         tx.cacheIPLD(uncleNode)
@@ -277,10 +276,10 @@ func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID string, blockNu
         if sdi.chainConfig.Clique != nil {
             uncleReward = big.NewInt(0)
         } else {
-            uncleReward = shared.CalcUncleMinerReward(blockNumber.Uint64(), uncleNode.Number.Uint64())
+            uncleReward = shared.CalcUncleMinerReward(blockNumber, uncleNode.Number.Uint64())
         }
         uncle := models.UncleModel{
-            BlockNumber: blockNumber.String(),
+            BlockNumber: sdi.blockNumber,
             HeaderID:    headerID,
             CID:         uncleNode.Cid().String(),
             MhKey:       shared.MultihashKeyFromCID(uncleNode.Cid()),
@@ -336,7 +335,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
             return fmt.Errorf("error deriving tx sender: %v", err)
         }
         txModel := models.TxModel{
-            BlockNumber: args.blockNumber.String(),
+            BlockNumber: sdi.blockNumber,
             HeaderID:    args.headerID,
             Dst:         shared.HandleZeroAddrPointer(trx.To()),
             Src:         shared.HandleZeroAddr(from),
@@ -359,7 +358,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
                 storageKeys[k] = storageKey.Hex()
             }
             accessListElementModel := models.AccessListElementModel{
-                BlockNumber: args.blockNumber.String(),
+                BlockNumber: sdi.blockNumber,
                 TxID:        txID,
                 Index:       int64(j),
                 Address:     accessListElement.Address.Hex(),
@@ -383,7 +382,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
         }
         rctModel := &models.ReceiptModel{
-            BlockNumber:  args.blockNumber.String(),
+            BlockNumber:  sdi.blockNumber,
             TxID:         txID,
             Contract:     contract,
             ContractHash: contractHash,
@@ -414,7 +413,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
         }
         logDataSet[idx] = &models.LogsModel{
-            BlockNumber: args.blockNumber.String(),
+            BlockNumber: sdi.blockNumber,
             ReceiptID:   txID,
             Address:     l.Address.String(),
             Index:       int64(l.Index),
@@ -443,17 +442,16 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
 }

 // PushStateNode publishes and indexes a state diff node object (including any child storage nodes) in the IPLD sql
-func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdtypes.StateNode, blockNumber, headerID string) error {
+func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdtypes.StateNode, headerID string) error {
     tx, ok := batch.(*BatchTx)
     if !ok {
         return fmt.Errorf("sql batch is expected to be of type %T, got %T", &BatchTx{}, batch)
     }
     // publish the state node
     if stateNode.NodeType == sdtypes.Removed {
-        // short circuit if it is a Removed node
-        // this assumes the db has been initialized and a public.blocks entry for the Removed node is present
+        tx.cacheRemoved(shared.RemovedNodeMhKey, []byte{})
         stateModel := models.StateNodeModel{
-            BlockNumber: blockNumber,
+            BlockNumber: sdi.blockNumber,
             HeaderID:    headerID,
             Path:        stateNode.Path,
             StateKey:    common.BytesToHash(stateNode.LeafKey).String(),
@@ -468,7 +466,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
         return fmt.Errorf("error generating and cacheing state node IPLD: %v", err)
     }
     stateModel := models.StateNodeModel{
-        BlockNumber: blockNumber,
+        BlockNumber: sdi.blockNumber,
         HeaderID:    headerID,
         Path:        stateNode.Path,
         StateKey:    common.BytesToHash(stateNode.LeafKey).String(),
@@ -494,7 +492,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
         return fmt.Errorf("error decoding state account rlp: %s", err.Error())
     }
     accountModel := models.StateAccountModel{
-        BlockNumber: blockNumber,
+        BlockNumber: sdi.blockNumber,
         HeaderID:    headerID,
         StatePath:   stateNode.Path,
         Balance:     account.Balance.String(),
@@ -509,10 +507,9 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
     // if there are any storage nodes associated with this node, publish and index them
     for _, storageNode := range stateNode.StorageNodes {
         if storageNode.NodeType == sdtypes.Removed {
-            // short circuit if it is a Removed node
-            // this assumes the db has been initialized and a public.blocks entry for the Removed node is present
+            tx.cacheRemoved(shared.RemovedNodeMhKey, []byte{})
             storageModel := models.StorageNodeModel{
-                BlockNumber: blockNumber,
+                BlockNumber: sdi.blockNumber,
                 HeaderID:    headerID,
                 StatePath:   stateNode.Path,
                 Path:        storageNode.Path,
@@ -531,7 +528,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
             return fmt.Errorf("error generating and cacheing storage node IPLD: %v", err)
         }
         storageModel := models.StorageNodeModel{
-            BlockNumber: blockNumber,
+            BlockNumber: sdi.blockNumber,
             HeaderID:    headerID,
             StatePath:   stateNode.Path,
             Path:        storageNode.Path,

===== changed file 6 of 6 =====

@@ -29,7 +29,7 @@ import (
 // StateDiffIndexer interface required to index statediff data
 type StateDiffIndexer interface {
     PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (Batch, error)
-    PushStateNode(tx Batch, stateNode sdtypes.StateNode, blockNumber, headerID string) error
+    PushStateNode(tx Batch, stateNode sdtypes.StateNode, headerID string) error
     PushCodeAndCodeHash(tx Batch, codeAndCodeHash sdtypes.CodeAndCodeHash) error
     ReportDBMetrics(delay time.Duration, quit <-chan bool)
     io.Closer
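
With both implementations tracking the block number internally, PushStateNode drops its blockNumber parameter and existing callers shrink accordingly. A hypothetical caller against the updated interface (the import paths are assumed from this repository's layout):

package example

import (
    "github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
    sdtypes "github.com/ethereum/go-ethereum/statediff/types"
)

// pushAll shows the narrowed call site: block context now lives on the
// indexer, so only the batch, the node, and the headerID are passed.
func pushAll(sdi interfaces.StateDiffIndexer, batch interfaces.Batch, headerID string, nodes []sdtypes.StateNode) error {
    for _, node := range nodes {
        if err := sdi.PushStateNode(batch, node, headerID); err != nil {
            return err
        }
    }
    return nil
}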