Merge pull request #248 from deep-stack/pm-disassociate-blocknumber

Disassociate block number from the indexer object
This commit is contained in:
Ashwin Phatak 2022-06-23 10:51:09 +05:30 committed by GitHub
commit 5e3de63be6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 130 additions and 113 deletions

View File

@ -5,7 +5,7 @@ on:
env: env:
stack-orchestrator-ref: ${{ github.event.inputs.stack-orchestrator-ref || '382aca8e42bc5e33f301f77cdd2e09cc80602fc3'}} stack-orchestrator-ref: ${{ github.event.inputs.stack-orchestrator-ref || '382aca8e42bc5e33f301f77cdd2e09cc80602fc3'}}
ipld-eth-db-ref: ${{ github.event.inputs.ipld-ethcl-db-ref || '48eb594ea95763bda8e51590f105f7a2657ac6d4' }} ipld-eth-db-ref: ${{ github.event.inputs.ipld-ethcl-db-ref || '65b7bee7a6757c1fc527c8bfdc4f99ab915fcf36' }}
GOPATH: /tmp/go GOPATH: /tmp/go
jobs: jobs:

View File

@ -30,7 +30,7 @@ import (
// BatchTx wraps a void with the state necessary for building the tx concurrently during trie difference iteration // BatchTx wraps a void with the state necessary for building the tx concurrently during trie difference iteration
type BatchTx struct { type BatchTx struct {
BlockNumber uint64 BlockNumber string
dump io.Writer dump io.Writer
quit chan struct{} quit chan struct{}
iplds chan models.IPLDModel iplds chan models.IPLDModel
@ -68,15 +68,17 @@ func (tx *BatchTx) cache() {
func (tx *BatchTx) cacheDirect(key string, value []byte) { func (tx *BatchTx) cacheDirect(key string, value []byte) {
tx.iplds <- models.IPLDModel{ tx.iplds <- models.IPLDModel{
Key: key, BlockNumber: tx.BlockNumber,
Data: value, Key: key,
Data: value,
} }
} }
func (tx *BatchTx) cacheIPLD(i node.Node) { func (tx *BatchTx) cacheIPLD(i node.Node) {
tx.iplds <- models.IPLDModel{ tx.iplds <- models.IPLDModel{
Key: blockstore.BlockPrefix.String() + dshelp.MultihashToDsKey(i.Cid().Hash()).String(), BlockNumber: tx.BlockNumber,
Data: i.RawData(), Key: blockstore.BlockPrefix.String() + dshelp.MultihashToDsKey(i.Cid().Hash()).String(),
Data: i.RawData(),
} }
} }
@ -87,8 +89,9 @@ func (tx *BatchTx) cacheRaw(codec, mh uint64, raw []byte) (string, string, error
} }
prefixedKey := blockstore.BlockPrefix.String() + dshelp.MultihashToDsKey(c.Hash()).String() prefixedKey := blockstore.BlockPrefix.String() + dshelp.MultihashToDsKey(c.Hash()).String()
tx.iplds <- models.IPLDModel{ tx.iplds <- models.IPLDModel{
Key: prefixedKey, BlockNumber: tx.BlockNumber,
Data: raw, Key: prefixedKey,
Data: raw,
} }
return c.String(), prefixedKey, err return c.String(), prefixedKey, err
} }

View File

@ -102,7 +102,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
t = time.Now() t = time.Now()
blockTx := &BatchTx{ blockTx := &BatchTx{
BlockNumber: height, BlockNumber: block.Number().String(),
dump: sdi.dump, dump: sdi.dump,
iplds: make(chan models.IPLDModel), iplds: make(chan models.IPLDModel),
quit: make(chan struct{}), quit: make(chan struct{}),
@ -146,7 +146,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff.String()) traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff.String())
t = time.Now() t = time.Now()
// Publish and index uncles // Publish and index uncles
err = sdi.processUncles(blockTx, headerID, height, uncleNodes) err = sdi.processUncles(blockTx, headerID, block.Number(), uncleNodes)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -206,7 +206,7 @@ func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, he
} }
// processUncles publishes and indexes uncle IPLDs in Postgres // processUncles publishes and indexes uncle IPLDs in Postgres
func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID string, blockNumber uint64, uncleNodes []*ipld2.EthHeader) error { func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID string, blockNumber *big.Int, uncleNodes []*ipld2.EthHeader) error {
// publish and index uncles // publish and index uncles
for _, uncleNode := range uncleNodes { for _, uncleNode := range uncleNodes {
tx.cacheIPLD(uncleNode) tx.cacheIPLD(uncleNode)
@ -215,15 +215,16 @@ func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID string, blockNu
if sdi.chainConfig.Clique != nil { if sdi.chainConfig.Clique != nil {
uncleReward = big.NewInt(0) uncleReward = big.NewInt(0)
} else { } else {
uncleReward = shared.CalcUncleMinerReward(blockNumber, uncleNode.Number.Uint64()) uncleReward = shared.CalcUncleMinerReward(blockNumber.Uint64(), uncleNode.Number.Uint64())
} }
uncle := models.UncleModel{ uncle := models.UncleModel{
HeaderID: headerID, BlockNumber: blockNumber.String(),
CID: uncleNode.Cid().String(), HeaderID: headerID,
MhKey: shared.MultihashKeyFromCID(uncleNode.Cid()), CID: uncleNode.Cid().String(),
ParentHash: uncleNode.ParentHash.String(), MhKey: shared.MultihashKeyFromCID(uncleNode.Cid()),
BlockHash: uncleNode.Hash().String(), ParentHash: uncleNode.ParentHash.String(),
Reward: uncleReward.String(), BlockHash: uncleNode.Hash().String(),
Reward: uncleReward.String(),
} }
if _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", uncle); err != nil { if _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", uncle); err != nil {
return err return err
@ -274,16 +275,17 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
return fmt.Errorf("error deriving tx sender: %v", err) return fmt.Errorf("error deriving tx sender: %v", err)
} }
txModel := models.TxModel{ txModel := models.TxModel{
HeaderID: args.headerID, BlockNumber: args.blockNumber.String(),
Dst: shared.HandleZeroAddrPointer(trx.To()), HeaderID: args.headerID,
Src: shared.HandleZeroAddr(from), Dst: shared.HandleZeroAddrPointer(trx.To()),
TxHash: trxID, Src: shared.HandleZeroAddr(from),
Index: int64(i), TxHash: trxID,
Data: trx.Data(), Index: int64(i),
CID: txNode.Cid().String(), Data: trx.Data(),
MhKey: shared.MultihashKeyFromCID(txNode.Cid()), CID: txNode.Cid().String(),
Type: trx.Type(), MhKey: shared.MultihashKeyFromCID(txNode.Cid()),
Value: val, Type: trx.Type(),
Value: val,
} }
if _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", txModel); err != nil { if _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", txModel); err != nil {
return err return err
@ -296,6 +298,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
storageKeys[k] = storageKey.Hex() storageKeys[k] = storageKey.Hex()
} }
accessListElementModel := models.AccessListElementModel{ accessListElementModel := models.AccessListElementModel{
BlockNumber: args.blockNumber.String(),
TxID: trxID, TxID: trxID,
Index: int64(j), Index: int64(j),
Address: accessListElement.Address.Hex(), Address: accessListElement.Address.Hex(),
@ -319,6 +322,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
} }
rctModel := &models.ReceiptModel{ rctModel := &models.ReceiptModel{
BlockNumber: args.blockNumber.String(),
TxID: trxID, TxID: trxID,
Contract: contract, Contract: contract,
ContractHash: contractHash, ContractHash: contractHash,
@ -348,16 +352,17 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
} }
logDataSet[idx] = &models.LogsModel{ logDataSet[idx] = &models.LogsModel{
ReceiptID: trxID, BlockNumber: args.blockNumber.String(),
Address: l.Address.String(), ReceiptID: trxID,
Index: int64(l.Index), Address: l.Address.String(),
Data: l.Data, Index: int64(l.Index),
LeafCID: args.logLeafNodeCIDs[i][idx].String(), Data: l.Data,
LeafMhKey: shared.MultihashKeyFromCID(args.logLeafNodeCIDs[i][idx]), LeafCID: args.logLeafNodeCIDs[i][idx].String(),
Topic0: topicSet[0], LeafMhKey: shared.MultihashKeyFromCID(args.logLeafNodeCIDs[i][idx]),
Topic1: topicSet[1], Topic0: topicSet[0],
Topic2: topicSet[2], Topic1: topicSet[1],
Topic3: topicSet[3], Topic2: topicSet[2],
Topic3: topicSet[3],
} }
} }
@ -379,7 +384,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdtypes.StateNode, headerID string) error { func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdtypes.StateNode, headerID string) error {
tx, ok := batch.(*BatchTx) tx, ok := batch.(*BatchTx)
if !ok { if !ok {
return fmt.Errorf("sql batch is expected to be of type %T, got %T", &BatchTx{}, batch) return fmt.Errorf("dump: batch is expected to be of type %T, got %T", &BatchTx{}, batch)
} }
// publish the state node // publish the state node
var stateModel models.StateNodeModel var stateModel models.StateNodeModel
@ -387,12 +392,13 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
// short circuit if it is a Removed node // short circuit if it is a Removed node
// this assumes the db has been initialized and a public.blocks entry for the Removed node is present // this assumes the db has been initialized and a public.blocks entry for the Removed node is present
stateModel = models.StateNodeModel{ stateModel = models.StateNodeModel{
HeaderID: headerID, BlockNumber: tx.BlockNumber,
Path: stateNode.Path, HeaderID: headerID,
StateKey: common.BytesToHash(stateNode.LeafKey).String(), Path: stateNode.Path,
CID: shared.RemovedNodeStateCID, StateKey: common.BytesToHash(stateNode.LeafKey).String(),
MhKey: shared.RemovedNodeMhKey, CID: shared.RemovedNodeStateCID,
NodeType: stateNode.NodeType.Int(), MhKey: shared.RemovedNodeMhKey,
NodeType: stateNode.NodeType.Int(),
} }
} else { } else {
stateCIDStr, stateMhKey, err := tx.cacheRaw(ipld2.MEthStateTrie, multihash.KECCAK_256, stateNode.NodeValue) stateCIDStr, stateMhKey, err := tx.cacheRaw(ipld2.MEthStateTrie, multihash.KECCAK_256, stateNode.NodeValue)
@ -400,12 +406,13 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
return fmt.Errorf("error generating and cacheing state node IPLD: %v", err) return fmt.Errorf("error generating and cacheing state node IPLD: %v", err)
} }
stateModel = models.StateNodeModel{ stateModel = models.StateNodeModel{
HeaderID: headerID, BlockNumber: tx.BlockNumber,
Path: stateNode.Path, HeaderID: headerID,
StateKey: common.BytesToHash(stateNode.LeafKey).String(), Path: stateNode.Path,
CID: stateCIDStr, StateKey: common.BytesToHash(stateNode.LeafKey).String(),
MhKey: stateMhKey, CID: stateCIDStr,
NodeType: stateNode.NodeType.Int(), MhKey: stateMhKey,
NodeType: stateNode.NodeType.Int(),
} }
} }
@ -428,6 +435,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
return fmt.Errorf("error decoding state account rlp: %s", err.Error()) return fmt.Errorf("error decoding state account rlp: %s", err.Error())
} }
accountModel := models.StateAccountModel{ accountModel := models.StateAccountModel{
BlockNumber: tx.BlockNumber,
HeaderID: headerID, HeaderID: headerID,
StatePath: stateNode.Path, StatePath: stateNode.Path,
Balance: account.Balance.String(), Balance: account.Balance.String(),
@ -446,13 +454,14 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
// short circuit if it is a Removed node // short circuit if it is a Removed node
// this assumes the db has been initialized and a public.blocks entry for the Removed node is present // this assumes the db has been initialized and a public.blocks entry for the Removed node is present
storageModel := models.StorageNodeModel{ storageModel := models.StorageNodeModel{
HeaderID: headerID, BlockNumber: tx.BlockNumber,
StatePath: stateNode.Path, HeaderID: headerID,
Path: storageNode.Path, StatePath: stateNode.Path,
StorageKey: common.BytesToHash(storageNode.LeafKey).String(), Path: storageNode.Path,
CID: shared.RemovedNodeStorageCID, StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
MhKey: shared.RemovedNodeMhKey, CID: shared.RemovedNodeStorageCID,
NodeType: storageNode.NodeType.Int(), MhKey: shared.RemovedNodeMhKey,
NodeType: storageNode.NodeType.Int(),
} }
if _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", storageModel); err != nil { if _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", storageModel); err != nil {
return err return err
@ -464,13 +473,14 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
return fmt.Errorf("error generating and cacheing storage node IPLD: %v", err) return fmt.Errorf("error generating and cacheing storage node IPLD: %v", err)
} }
storageModel := models.StorageNodeModel{ storageModel := models.StorageNodeModel{
HeaderID: headerID, BlockNumber: tx.BlockNumber,
StatePath: stateNode.Path, HeaderID: headerID,
Path: storageNode.Path, StatePath: stateNode.Path,
StorageKey: common.BytesToHash(storageNode.LeafKey).String(), Path: storageNode.Path,
CID: storageCIDStr, StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
MhKey: storageMhKey, CID: storageCIDStr,
NodeType: storageNode.NodeType.Int(), MhKey: storageMhKey,
NodeType: storageNode.NodeType.Int(),
} }
if _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", storageModel); err != nil { if _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", storageModel); err != nil {
return err return err
@ -484,7 +494,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
func (sdi *StateDiffIndexer) PushCodeAndCodeHash(batch interfaces.Batch, codeAndCodeHash sdtypes.CodeAndCodeHash) error { func (sdi *StateDiffIndexer) PushCodeAndCodeHash(batch interfaces.Batch, codeAndCodeHash sdtypes.CodeAndCodeHash) error {
tx, ok := batch.(*BatchTx) tx, ok := batch.(*BatchTx)
if !ok { if !ok {
return fmt.Errorf("sql batch is expected to be of type %T, got %T", &BatchTx{}, batch) return fmt.Errorf("dump: batch is expected to be of type %T, got %T", &BatchTx{}, batch)
} }
// codec doesn't matter since db key is multihash-based // codec doesn't matter since db key is multihash-based
mhKey, err := shared.MultihashKeyFromKeccak256(codeAndCodeHash.Hash) mhKey, err := shared.MultihashKeyFromKeccak256(codeAndCodeHash.Hash)

View File

@ -64,7 +64,6 @@ type StateDiffIndexer struct {
chainConfig *params.ChainConfig chainConfig *params.ChainConfig
nodeID string nodeID string
wg *sync.WaitGroup wg *sync.WaitGroup
blockNumber string
removedCacheFlag *uint32 removedCacheFlag *uint32
watchedAddressesFilePath string watchedAddressesFilePath string
@ -111,7 +110,6 @@ func (sdi *StateDiffIndexer) ReportDBMetrics(time.Duration, <-chan bool) {}
// Returns an initiated DB transaction which must be Closed via defer to commit or rollback // Returns an initiated DB transaction which must be Closed via defer to commit or rollback
func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (interfaces.Batch, error) { func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (interfaces.Batch, error) {
sdi.removedCacheFlag = new(uint32) sdi.removedCacheFlag = new(uint32)
sdi.blockNumber = block.Number().String()
start, t := time.Now(), time.Now() start, t := time.Now(), time.Now()
blockHash := block.Hash() blockHash := block.Hash()
blockHashStr := blockHash.String() blockHashStr := blockHash.String()
@ -147,7 +145,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
t = time.Now() t = time.Now()
blockTx := &BatchTx{ blockTx := &BatchTx{
BlockNumber: sdi.blockNumber, BlockNumber: block.Number().String(),
submit: func(self *BatchTx, err error) error { submit: func(self *BatchTx, err error) error {
tDiff := time.Since(t) tDiff := time.Since(t)
indexerMetrics.tStateStoreCodeProcessing.Update(tDiff) indexerMetrics.tStateStoreCodeProcessing.Update(tDiff)
@ -175,7 +173,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
t = time.Now() t = time.Now()
// write uncles // write uncles
sdi.processUncles(headerID, height, uncleNodes) sdi.processUncles(headerID, block.Number(), uncleNodes)
tDiff = time.Since(t) tDiff = time.Since(t)
indexerMetrics.tUncleProcessing.Update(tDiff) indexerMetrics.tUncleProcessing.Update(tDiff)
traceMsg += fmt.Sprintf("uncle processing time: %s\r\n", tDiff.String()) traceMsg += fmt.Sprintf("uncle processing time: %s\r\n", tDiff.String())
@ -209,7 +207,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
// processHeader write a header IPLD insert SQL stmt to a file // processHeader write a header IPLD insert SQL stmt to a file
// it returns the headerID // it returns the headerID
func (sdi *StateDiffIndexer) processHeader(header *types.Header, headerNode node.Node, reward, td *big.Int) string { func (sdi *StateDiffIndexer) processHeader(header *types.Header, headerNode node.Node, reward, td *big.Int) string {
sdi.fileWriter.upsertIPLDNode(sdi.blockNumber, headerNode) sdi.fileWriter.upsertIPLDNode(header.Number.String(), headerNode)
var baseFee *string var baseFee *string
if header.BaseFee != nil { if header.BaseFee != nil {
@ -222,7 +220,7 @@ func (sdi *StateDiffIndexer) processHeader(header *types.Header, headerNode node
CID: headerNode.Cid().String(), CID: headerNode.Cid().String(),
MhKey: shared.MultihashKeyFromCID(headerNode.Cid()), MhKey: shared.MultihashKeyFromCID(headerNode.Cid()),
ParentHash: header.ParentHash.String(), ParentHash: header.ParentHash.String(),
BlockNumber: sdi.blockNumber, BlockNumber: header.Number.String(),
BlockHash: headerID, BlockHash: headerID,
TotalDifficulty: td.String(), TotalDifficulty: td.String(),
Reward: reward.String(), Reward: reward.String(),
@ -238,19 +236,19 @@ func (sdi *StateDiffIndexer) processHeader(header *types.Header, headerNode node
} }
// processUncles writes uncle IPLD insert SQL stmts to a file // processUncles writes uncle IPLD insert SQL stmts to a file
func (sdi *StateDiffIndexer) processUncles(headerID string, blockNumber uint64, uncleNodes []*ipld2.EthHeader) { func (sdi *StateDiffIndexer) processUncles(headerID string, blockNumber *big.Int, uncleNodes []*ipld2.EthHeader) {
// publish and index uncles // publish and index uncles
for _, uncleNode := range uncleNodes { for _, uncleNode := range uncleNodes {
sdi.fileWriter.upsertIPLDNode(sdi.blockNumber, uncleNode) sdi.fileWriter.upsertIPLDNode(blockNumber.String(), uncleNode)
var uncleReward *big.Int var uncleReward *big.Int
// in PoA networks uncle reward is 0 // in PoA networks uncle reward is 0
if sdi.chainConfig.Clique != nil { if sdi.chainConfig.Clique != nil {
uncleReward = big.NewInt(0) uncleReward = big.NewInt(0)
} else { } else {
uncleReward = shared.CalcUncleMinerReward(blockNumber, uncleNode.Number.Uint64()) uncleReward = shared.CalcUncleMinerReward(blockNumber.Uint64(), uncleNode.Number.Uint64())
} }
sdi.fileWriter.upsertUncleCID(models.UncleModel{ sdi.fileWriter.upsertUncleCID(models.UncleModel{
BlockNumber: sdi.blockNumber, BlockNumber: blockNumber.String(),
HeaderID: headerID, HeaderID: headerID,
CID: uncleNode.Cid().String(), CID: uncleNode.Cid().String(),
MhKey: shared.MultihashKeyFromCID(uncleNode.Cid()), MhKey: shared.MultihashKeyFromCID(uncleNode.Cid()),
@ -282,10 +280,10 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error {
signer := types.MakeSigner(sdi.chainConfig, args.blockNumber) signer := types.MakeSigner(sdi.chainConfig, args.blockNumber)
for i, receipt := range args.receipts { for i, receipt := range args.receipts {
for _, logTrieNode := range args.logTrieNodes[i] { for _, logTrieNode := range args.logTrieNodes[i] {
sdi.fileWriter.upsertIPLDNode(sdi.blockNumber, logTrieNode) sdi.fileWriter.upsertIPLDNode(args.blockNumber.String(), logTrieNode)
} }
txNode := args.txNodes[i] txNode := args.txNodes[i]
sdi.fileWriter.upsertIPLDNode(sdi.blockNumber, txNode) sdi.fileWriter.upsertIPLDNode(args.blockNumber.String(), txNode)
// index tx // index tx
trx := args.txs[i] trx := args.txs[i]
@ -302,7 +300,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error {
return fmt.Errorf("error deriving tx sender: %v", err) return fmt.Errorf("error deriving tx sender: %v", err)
} }
txModel := models.TxModel{ txModel := models.TxModel{
BlockNumber: sdi.blockNumber, BlockNumber: args.blockNumber.String(),
HeaderID: args.headerID, HeaderID: args.headerID,
Dst: shared.HandleZeroAddrPointer(trx.To()), Dst: shared.HandleZeroAddrPointer(trx.To()),
Src: shared.HandleZeroAddr(from), Src: shared.HandleZeroAddr(from),
@ -323,7 +321,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error {
storageKeys[k] = storageKey.Hex() storageKeys[k] = storageKey.Hex()
} }
accessListElementModel := models.AccessListElementModel{ accessListElementModel := models.AccessListElementModel{
BlockNumber: sdi.blockNumber, BlockNumber: args.blockNumber.String(),
TxID: txID, TxID: txID,
Index: int64(j), Index: int64(j),
Address: accessListElement.Address.Hex(), Address: accessListElement.Address.Hex(),
@ -345,7 +343,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error {
} }
rctModel := &models.ReceiptModel{ rctModel := &models.ReceiptModel{
BlockNumber: sdi.blockNumber, BlockNumber: args.blockNumber.String(),
TxID: txID, TxID: txID,
Contract: contract, Contract: contract,
ContractHash: contractHash, ContractHash: contractHash,
@ -373,7 +371,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error {
} }
logDataSet[idx] = &models.LogsModel{ logDataSet[idx] = &models.LogsModel{
BlockNumber: sdi.blockNumber, BlockNumber: args.blockNumber.String(),
ReceiptID: txID, ReceiptID: txID,
Address: l.Address.String(), Address: l.Address.String(),
Index: int64(l.Index), Index: int64(l.Index),
@ -391,8 +389,8 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error {
// publish trie nodes, these aren't indexed directly // publish trie nodes, these aren't indexed directly
for i, n := range args.txTrieNodes { for i, n := range args.txTrieNodes {
sdi.fileWriter.upsertIPLDNode(sdi.blockNumber, n) sdi.fileWriter.upsertIPLDNode(args.blockNumber.String(), n)
sdi.fileWriter.upsertIPLDNode(sdi.blockNumber, args.rctTrieNodes[i]) sdi.fileWriter.upsertIPLDNode(args.blockNumber.String(), args.rctTrieNodes[i])
} }
return nil return nil
@ -400,15 +398,19 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error {
// PushStateNode writes a state diff node object (including any child storage nodes) IPLD insert SQL stmt to a file // PushStateNode writes a state diff node object (including any child storage nodes) IPLD insert SQL stmt to a file
func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdtypes.StateNode, headerID string) error { func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdtypes.StateNode, headerID string) error {
tx, ok := batch.(*BatchTx)
if !ok {
return fmt.Errorf("file: batch is expected to be of type %T, got %T", &BatchTx{}, batch)
}
// publish the state node // publish the state node
var stateModel models.StateNodeModel var stateModel models.StateNodeModel
if stateNode.NodeType == sdtypes.Removed { if stateNode.NodeType == sdtypes.Removed {
if atomic.LoadUint32(sdi.removedCacheFlag) == 0 { if atomic.LoadUint32(sdi.removedCacheFlag) == 0 {
atomic.StoreUint32(sdi.removedCacheFlag, 1) atomic.StoreUint32(sdi.removedCacheFlag, 1)
sdi.fileWriter.upsertIPLDDirect(sdi.blockNumber, shared.RemovedNodeMhKey, []byte{}) sdi.fileWriter.upsertIPLDDirect(tx.BlockNumber, shared.RemovedNodeMhKey, []byte{})
} }
stateModel = models.StateNodeModel{ stateModel = models.StateNodeModel{
BlockNumber: sdi.blockNumber, BlockNumber: tx.BlockNumber,
HeaderID: headerID, HeaderID: headerID,
Path: stateNode.Path, Path: stateNode.Path,
StateKey: common.BytesToHash(stateNode.LeafKey).String(), StateKey: common.BytesToHash(stateNode.LeafKey).String(),
@ -417,12 +419,12 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
NodeType: stateNode.NodeType.Int(), NodeType: stateNode.NodeType.Int(),
} }
} else { } else {
stateCIDStr, stateMhKey, err := sdi.fileWriter.upsertIPLDRaw(sdi.blockNumber, ipld2.MEthStateTrie, multihash.KECCAK_256, stateNode.NodeValue) stateCIDStr, stateMhKey, err := sdi.fileWriter.upsertIPLDRaw(tx.BlockNumber, ipld2.MEthStateTrie, multihash.KECCAK_256, stateNode.NodeValue)
if err != nil { if err != nil {
return fmt.Errorf("error generating and cacheing state node IPLD: %v", err) return fmt.Errorf("error generating and cacheing state node IPLD: %v", err)
} }
stateModel = models.StateNodeModel{ stateModel = models.StateNodeModel{
BlockNumber: sdi.blockNumber, BlockNumber: tx.BlockNumber,
HeaderID: headerID, HeaderID: headerID,
Path: stateNode.Path, Path: stateNode.Path,
StateKey: common.BytesToHash(stateNode.LeafKey).String(), StateKey: common.BytesToHash(stateNode.LeafKey).String(),
@ -449,7 +451,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
return fmt.Errorf("error decoding state account rlp: %s", err.Error()) return fmt.Errorf("error decoding state account rlp: %s", err.Error())
} }
accountModel := models.StateAccountModel{ accountModel := models.StateAccountModel{
BlockNumber: sdi.blockNumber, BlockNumber: tx.BlockNumber,
HeaderID: headerID, HeaderID: headerID,
StatePath: stateNode.Path, StatePath: stateNode.Path,
Balance: account.Balance.String(), Balance: account.Balance.String(),
@ -465,10 +467,10 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
if storageNode.NodeType == sdtypes.Removed { if storageNode.NodeType == sdtypes.Removed {
if atomic.LoadUint32(sdi.removedCacheFlag) == 0 { if atomic.LoadUint32(sdi.removedCacheFlag) == 0 {
atomic.StoreUint32(sdi.removedCacheFlag, 1) atomic.StoreUint32(sdi.removedCacheFlag, 1)
sdi.fileWriter.upsertIPLDDirect(sdi.blockNumber, shared.RemovedNodeMhKey, []byte{}) sdi.fileWriter.upsertIPLDDirect(tx.BlockNumber, shared.RemovedNodeMhKey, []byte{})
} }
storageModel := models.StorageNodeModel{ storageModel := models.StorageNodeModel{
BlockNumber: sdi.blockNumber, BlockNumber: tx.BlockNumber,
HeaderID: headerID, HeaderID: headerID,
StatePath: stateNode.Path, StatePath: stateNode.Path,
Path: storageNode.Path, Path: storageNode.Path,
@ -480,12 +482,12 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
sdi.fileWriter.upsertStorageCID(storageModel) sdi.fileWriter.upsertStorageCID(storageModel)
continue continue
} }
storageCIDStr, storageMhKey, err := sdi.fileWriter.upsertIPLDRaw(sdi.blockNumber, ipld2.MEthStorageTrie, multihash.KECCAK_256, storageNode.NodeValue) storageCIDStr, storageMhKey, err := sdi.fileWriter.upsertIPLDRaw(tx.BlockNumber, ipld2.MEthStorageTrie, multihash.KECCAK_256, storageNode.NodeValue)
if err != nil { if err != nil {
return fmt.Errorf("error generating and cacheing storage node IPLD: %v", err) return fmt.Errorf("error generating and cacheing storage node IPLD: %v", err)
} }
storageModel := models.StorageNodeModel{ storageModel := models.StorageNodeModel{
BlockNumber: sdi.blockNumber, BlockNumber: tx.BlockNumber,
HeaderID: headerID, HeaderID: headerID,
StatePath: stateNode.Path, StatePath: stateNode.Path,
Path: storageNode.Path, Path: storageNode.Path,
@ -502,12 +504,16 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
// PushCodeAndCodeHash writes code and codehash pairs insert SQL stmts to a file // PushCodeAndCodeHash writes code and codehash pairs insert SQL stmts to a file
func (sdi *StateDiffIndexer) PushCodeAndCodeHash(batch interfaces.Batch, codeAndCodeHash sdtypes.CodeAndCodeHash) error { func (sdi *StateDiffIndexer) PushCodeAndCodeHash(batch interfaces.Batch, codeAndCodeHash sdtypes.CodeAndCodeHash) error {
tx, ok := batch.(*BatchTx)
if !ok {
return fmt.Errorf("file: batch is expected to be of type %T, got %T", &BatchTx{}, batch)
}
// codec doesn't matter since db key is multihash-based // codec doesn't matter since db key is multihash-based
mhKey, err := shared.MultihashKeyFromKeccak256(codeAndCodeHash.Hash) mhKey, err := shared.MultihashKeyFromKeccak256(codeAndCodeHash.Hash)
if err != nil { if err != nil {
return fmt.Errorf("error deriving multihash key from codehash: %v", err) return fmt.Errorf("error deriving multihash key from codehash: %v", err)
} }
sdi.fileWriter.upsertIPLDDirect(sdi.blockNumber, mhKey, codeAndCodeHash.Code) sdi.fileWriter.upsertIPLDDirect(tx.BlockNumber, mhKey, codeAndCodeHash.Code)
return nil return nil
} }

View File

@ -55,7 +55,6 @@ type StateDiffIndexer struct {
ctx context.Context ctx context.Context
chainConfig *params.ChainConfig chainConfig *params.ChainConfig
dbWriter *Writer dbWriter *Writer
blockNumber string
} }
// NewStateDiffIndexer creates a sql implementation of interfaces.StateDiffIndexer // NewStateDiffIndexer creates a sql implementation of interfaces.StateDiffIndexer
@ -90,7 +89,6 @@ func (sdi *StateDiffIndexer) ReportDBMetrics(delay time.Duration, quit <-chan bo
// Returns an initiated DB transaction which must be Closed via defer to commit or rollback // Returns an initiated DB transaction which must be Closed via defer to commit or rollback
func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (interfaces.Batch, error) { func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (interfaces.Batch, error) {
start, t := time.Now(), time.Now() start, t := time.Now(), time.Now()
sdi.blockNumber = block.Number().String()
blockHash := block.Hash() blockHash := block.Hash()
blockHashStr := blockHash.String() blockHashStr := blockHash.String()
height := block.NumberU64() height := block.NumberU64()
@ -140,7 +138,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
blockTx := &BatchTx{ blockTx := &BatchTx{
removedCacheFlag: new(uint32), removedCacheFlag: new(uint32),
ctx: sdi.ctx, ctx: sdi.ctx,
BlockNumber: sdi.blockNumber, BlockNumber: block.Number().String(),
stm: sdi.dbWriter.db.InsertIPLDsStm(), stm: sdi.dbWriter.db.InsertIPLDsStm(),
iplds: make(chan models.IPLDModel), iplds: make(chan models.IPLDModel),
quit: make(chan struct{}), quit: make(chan struct{}),
@ -203,7 +201,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff.String()) traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff.String())
t = time.Now() t = time.Now()
// Publish and index uncles // Publish and index uncles
err = sdi.processUncles(blockTx, headerID, block.NumberU64(), uncleNodes) err = sdi.processUncles(blockTx, headerID, block.Number(), uncleNodes)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -252,7 +250,7 @@ func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, he
CID: headerNode.Cid().String(), CID: headerNode.Cid().String(),
MhKey: shared.MultihashKeyFromCID(headerNode.Cid()), MhKey: shared.MultihashKeyFromCID(headerNode.Cid()),
ParentHash: header.ParentHash.String(), ParentHash: header.ParentHash.String(),
BlockNumber: sdi.blockNumber, BlockNumber: header.Number.String(),
BlockHash: headerID, BlockHash: headerID,
TotalDifficulty: td.String(), TotalDifficulty: td.String(),
Reward: reward.String(), Reward: reward.String(),
@ -267,7 +265,7 @@ func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, he
} }
// processUncles publishes and indexes uncle IPLDs in Postgres // processUncles publishes and indexes uncle IPLDs in Postgres
func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID string, blockNumber uint64, uncleNodes []*ipld2.EthHeader) error { func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID string, blockNumber *big.Int, uncleNodes []*ipld2.EthHeader) error {
// publish and index uncles // publish and index uncles
for _, uncleNode := range uncleNodes { for _, uncleNode := range uncleNodes {
tx.cacheIPLD(uncleNode) tx.cacheIPLD(uncleNode)
@ -276,10 +274,10 @@ func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID string, blockNu
if sdi.chainConfig.Clique != nil { if sdi.chainConfig.Clique != nil {
uncleReward = big.NewInt(0) uncleReward = big.NewInt(0)
} else { } else {
uncleReward = shared.CalcUncleMinerReward(blockNumber, uncleNode.Number.Uint64()) uncleReward = shared.CalcUncleMinerReward(blockNumber.Uint64(), uncleNode.Number.Uint64())
} }
uncle := models.UncleModel{ uncle := models.UncleModel{
BlockNumber: sdi.blockNumber, BlockNumber: blockNumber.String(),
HeaderID: headerID, HeaderID: headerID,
CID: uncleNode.Cid().String(), CID: uncleNode.Cid().String(),
MhKey: shared.MultihashKeyFromCID(uncleNode.Cid()), MhKey: shared.MultihashKeyFromCID(uncleNode.Cid()),
@ -335,7 +333,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
return fmt.Errorf("error deriving tx sender: %v", err) return fmt.Errorf("error deriving tx sender: %v", err)
} }
txModel := models.TxModel{ txModel := models.TxModel{
BlockNumber: sdi.blockNumber, BlockNumber: args.blockNumber.String(),
HeaderID: args.headerID, HeaderID: args.headerID,
Dst: shared.HandleZeroAddrPointer(trx.To()), Dst: shared.HandleZeroAddrPointer(trx.To()),
Src: shared.HandleZeroAddr(from), Src: shared.HandleZeroAddr(from),
@ -358,7 +356,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
storageKeys[k] = storageKey.Hex() storageKeys[k] = storageKey.Hex()
} }
accessListElementModel := models.AccessListElementModel{ accessListElementModel := models.AccessListElementModel{
BlockNumber: sdi.blockNumber, BlockNumber: args.blockNumber.String(),
TxID: txID, TxID: txID,
Index: int64(j), Index: int64(j),
Address: accessListElement.Address.Hex(), Address: accessListElement.Address.Hex(),
@ -382,7 +380,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
} }
rctModel := &models.ReceiptModel{ rctModel := &models.ReceiptModel{
BlockNumber: sdi.blockNumber, BlockNumber: args.blockNumber.String(),
TxID: txID, TxID: txID,
Contract: contract, Contract: contract,
ContractHash: contractHash, ContractHash: contractHash,
@ -413,7 +411,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
} }
logDataSet[idx] = &models.LogsModel{ logDataSet[idx] = &models.LogsModel{
BlockNumber: sdi.blockNumber, BlockNumber: args.blockNumber.String(),
ReceiptID: txID, ReceiptID: txID,
Address: l.Address.String(), Address: l.Address.String(),
Index: int64(l.Index), Index: int64(l.Index),
@ -445,14 +443,14 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdtypes.StateNode, headerID string) error { func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdtypes.StateNode, headerID string) error {
tx, ok := batch.(*BatchTx) tx, ok := batch.(*BatchTx)
if !ok { if !ok {
return fmt.Errorf("sql batch is expected to be of type %T, got %T", &BatchTx{}, batch) return fmt.Errorf("sql: batch is expected to be of type %T, got %T", &BatchTx{}, batch)
} }
// publish the state node // publish the state node
var stateModel models.StateNodeModel var stateModel models.StateNodeModel
if stateNode.NodeType == sdtypes.Removed { if stateNode.NodeType == sdtypes.Removed {
tx.cacheRemoved(shared.RemovedNodeMhKey, []byte{}) tx.cacheRemoved(shared.RemovedNodeMhKey, []byte{})
stateModel = models.StateNodeModel{ stateModel = models.StateNodeModel{
BlockNumber: sdi.blockNumber, BlockNumber: tx.BlockNumber,
HeaderID: headerID, HeaderID: headerID,
Path: stateNode.Path, Path: stateNode.Path,
StateKey: common.BytesToHash(stateNode.LeafKey).String(), StateKey: common.BytesToHash(stateNode.LeafKey).String(),
@ -466,7 +464,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
return fmt.Errorf("error generating and cacheing state node IPLD: %v", err) return fmt.Errorf("error generating and cacheing state node IPLD: %v", err)
} }
stateModel = models.StateNodeModel{ stateModel = models.StateNodeModel{
BlockNumber: sdi.blockNumber, BlockNumber: tx.BlockNumber,
HeaderID: headerID, HeaderID: headerID,
Path: stateNode.Path, Path: stateNode.Path,
StateKey: common.BytesToHash(stateNode.LeafKey).String(), StateKey: common.BytesToHash(stateNode.LeafKey).String(),
@ -495,7 +493,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
return fmt.Errorf("error decoding state account rlp: %s", err.Error()) return fmt.Errorf("error decoding state account rlp: %s", err.Error())
} }
accountModel := models.StateAccountModel{ accountModel := models.StateAccountModel{
BlockNumber: sdi.blockNumber, BlockNumber: tx.BlockNumber,
HeaderID: headerID, HeaderID: headerID,
StatePath: stateNode.Path, StatePath: stateNode.Path,
Balance: account.Balance.String(), Balance: account.Balance.String(),
@ -513,7 +511,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
if storageNode.NodeType == sdtypes.Removed { if storageNode.NodeType == sdtypes.Removed {
tx.cacheRemoved(shared.RemovedNodeMhKey, []byte{}) tx.cacheRemoved(shared.RemovedNodeMhKey, []byte{})
storageModel := models.StorageNodeModel{ storageModel := models.StorageNodeModel{
BlockNumber: sdi.blockNumber, BlockNumber: tx.BlockNumber,
HeaderID: headerID, HeaderID: headerID,
StatePath: stateNode.Path, StatePath: stateNode.Path,
Path: storageNode.Path, Path: storageNode.Path,
@ -532,7 +530,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
return fmt.Errorf("error generating and cacheing storage node IPLD: %v", err) return fmt.Errorf("error generating and cacheing storage node IPLD: %v", err)
} }
storageModel := models.StorageNodeModel{ storageModel := models.StorageNodeModel{
BlockNumber: sdi.blockNumber, BlockNumber: tx.BlockNumber,
HeaderID: headerID, HeaderID: headerID,
StatePath: stateNode.Path, StatePath: stateNode.Path,
Path: storageNode.Path, Path: storageNode.Path,
@ -553,7 +551,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
func (sdi *StateDiffIndexer) PushCodeAndCodeHash(batch interfaces.Batch, codeAndCodeHash sdtypes.CodeAndCodeHash) error { func (sdi *StateDiffIndexer) PushCodeAndCodeHash(batch interfaces.Batch, codeAndCodeHash sdtypes.CodeAndCodeHash) error {
tx, ok := batch.(*BatchTx) tx, ok := batch.(*BatchTx)
if !ok { if !ok {
return fmt.Errorf("sql batch is expected to be of type %T, got %T", &BatchTx{}, batch) return fmt.Errorf("sql: batch is expected to be of type %T, got %T", &BatchTx{}, batch)
} }
// codec doesn't matter since db key is multihash-based // codec doesn't matter since db key is multihash-based
mhKey, err := shared.MultihashKeyFromKeccak256(codeAndCodeHash.Hash) mhKey, err := shared.MultihashKeyFromKeccak256(codeAndCodeHash.Hash)