cleanup
This commit is contained in:
parent
f4392d1009
commit
9f973d5cbf
@ -68,9 +68,9 @@ type BlockTx struct {
|
||||
Close func() error
|
||||
}
|
||||
|
||||
// Pushes and indexes block data in database, excluding state & storage nodes (header, uncles, transactions & receipts)
|
||||
// Pushes and indexes block data in database, except state & storage nodes (includes header, uncles, transactions & receipts)
|
||||
// Returns an initiated DB transaction which must be Closed via defer to commit or rollback
|
||||
func (sdt *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (*BlockTx, error) {
|
||||
func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (*BlockTx, error) {
|
||||
start, t := time.Now(), time.Now()
|
||||
blockHash := block.Hash()
|
||||
blockHashStr := blockHash.String()
|
||||
@ -78,7 +78,7 @@ func (sdt *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
|
||||
traceMsg := fmt.Sprintf("indexer stats for statediff at %d with hash %s:\r\n", height, blockHashStr)
|
||||
transactions := block.Transactions()
|
||||
// Derive any missing fields
|
||||
if err := receipts.DeriveFields(sdt.chainConfig, blockHash, height, transactions); err != nil {
|
||||
if err := receipts.DeriveFields(sdi.chainConfig, blockHash, height, transactions); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Generate the block iplds
|
||||
@ -94,7 +94,7 @@ func (sdt *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
|
||||
traceMsg += fmt.Sprintf("payload decoding duration: %s\r\n", time.Now().Sub(t).String())
|
||||
t = time.Now()
|
||||
// Begin new db tx for everything
|
||||
tx, err := sdt.dbWriter.db.Beginx()
|
||||
tx, err := sdi.dbWriter.db.Beginx()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -119,20 +119,20 @@ func (sdt *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
|
||||
t = time.Now()
|
||||
|
||||
// Publish and index header, collect headerID
|
||||
headerID, err := sdt.processHeader(tx, block.Header(), headerNode, reward, totalDifficulty)
|
||||
headerID, err := sdi.processHeader(tx, block.Header(), headerNode, reward, totalDifficulty)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
traceMsg += fmt.Sprintf("header processing duration: %s\r\n", time.Now().Sub(t).String())
|
||||
t = time.Now()
|
||||
// Publish and index uncles
|
||||
if err := sdt.processUncles(tx, headerID, height, uncleNodes); err != nil {
|
||||
if err := sdi.processUncles(tx, headerID, height, uncleNodes); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
traceMsg += fmt.Sprintf("uncle processing duration: %s\r\n", time.Now().Sub(t).String())
|
||||
t = time.Now()
|
||||
// Publish and index receipts and txs
|
||||
if err := sdt.processReceiptsAndTxs(tx, processArgs{
|
||||
if err := sdi.processReceiptsAndTxs(tx, processArgs{
|
||||
headerID: headerID,
|
||||
blockNumber: block.Number(),
|
||||
receipts: receipts,
|
||||
@ -153,13 +153,13 @@ func (sdt *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
|
||||
|
||||
// processHeader publishes and indexes a header IPLD in Postgres
|
||||
// it returns the headerID
|
||||
func (sdt *StateDiffIndexer) processHeader(tx *sqlx.Tx, header *types.Header, headerNode node.Node, reward, td *big.Int) (int64, error) {
|
||||
func (sdi *StateDiffIndexer) processHeader(tx *sqlx.Tx, header *types.Header, headerNode node.Node, reward, td *big.Int) (int64, error) {
|
||||
// publish header
|
||||
if err := shared.PublishIPLD(tx, headerNode); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
// index header
|
||||
return sdt.dbWriter.upsertHeaderCID(tx, models.HeaderModel{
|
||||
return sdi.dbWriter.upsertHeaderCID(tx, models.HeaderModel{
|
||||
CID: headerNode.Cid().String(),
|
||||
MhKey: shared.MultihashKeyFromCID(headerNode.Cid()),
|
||||
ParentHash: header.ParentHash.String(),
|
||||
@ -176,7 +176,7 @@ func (sdt *StateDiffIndexer) processHeader(tx *sqlx.Tx, header *types.Header, he
|
||||
})
|
||||
}
|
||||
|
||||
func (sdt *StateDiffIndexer) processUncles(tx *sqlx.Tx, headerID int64, blockNumber uint64, uncleNodes []*ipld.EthHeader) error {
|
||||
func (sdi *StateDiffIndexer) processUncles(tx *sqlx.Tx, headerID int64, blockNumber uint64, uncleNodes []*ipld.EthHeader) error {
|
||||
// publish and index uncles
|
||||
for _, uncleNode := range uncleNodes {
|
||||
if err := shared.PublishIPLD(tx, uncleNode); err != nil {
|
||||
@ -190,7 +190,7 @@ func (sdt *StateDiffIndexer) processUncles(tx *sqlx.Tx, headerID int64, blockNum
|
||||
BlockHash: uncleNode.Hash().String(),
|
||||
Reward: uncleReward.String(),
|
||||
}
|
||||
if err := sdt.dbWriter.upsertUncleCID(tx, uncle, headerID); err != nil {
|
||||
if err := sdi.dbWriter.upsertUncleCID(tx, uncle, headerID); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
@ -210,9 +210,9 @@ type processArgs struct {
|
||||
}
|
||||
|
||||
// processReceiptsAndTxs publishes and indexes receipt and transaction IPLDs in Postgres
|
||||
func (sdt *StateDiffIndexer) processReceiptsAndTxs(tx *sqlx.Tx, args processArgs) error {
|
||||
func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *sqlx.Tx, args processArgs) error {
|
||||
// Process receipts and txs
|
||||
signer := types.MakeSigner(sdt.chainConfig, args.blockNumber)
|
||||
signer := types.MakeSigner(sdi.chainConfig, args.blockNumber)
|
||||
for i, receipt := range args.receipts {
|
||||
// tx that corresponds with this receipt
|
||||
trx := args.txs[i]
|
||||
@ -278,7 +278,7 @@ func (sdt *StateDiffIndexer) processReceiptsAndTxs(tx *sqlx.Tx, args processArgs
|
||||
CID: txNode.Cid().String(),
|
||||
MhKey: shared.MultihashKeyFromCID(txNode.Cid()),
|
||||
}
|
||||
txID, err := sdt.dbWriter.upsertTransactionCID(tx, txModel, args.headerID)
|
||||
txID, err := sdi.dbWriter.upsertTransactionCID(tx, txModel, args.headerID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -294,14 +294,14 @@ func (sdt *StateDiffIndexer) processReceiptsAndTxs(tx *sqlx.Tx, args processArgs
|
||||
CID: rctNode.Cid().String(),
|
||||
MhKey: shared.MultihashKeyFromCID(rctNode.Cid()),
|
||||
}
|
||||
if err := sdt.dbWriter.upsertReceiptCID(tx, rctModel, txID); err != nil {
|
||||
if err := sdi.dbWriter.upsertReceiptCID(tx, rctModel, txID); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (sdt *StateDiffIndexer) PushStateNode(tx *BlockTx, stateNode sdtypes.StateNode) error {
|
||||
func (sdi *StateDiffIndexer) PushStateNode(tx *BlockTx, stateNode sdtypes.StateNode) error {
|
||||
// publish the state node
|
||||
stateCIDStr, err := shared.PublishRaw(tx.dbtx, ipld.MEthStateTrie, multihash.KECCAK_256, stateNode.NodeValue)
|
||||
if err != nil {
|
||||
@ -316,7 +316,7 @@ func (sdt *StateDiffIndexer) PushStateNode(tx *BlockTx, stateNode sdtypes.StateN
|
||||
NodeType: ResolveFromNodeType(stateNode.NodeType),
|
||||
}
|
||||
// index the state node, collect the stateID to reference by FK
|
||||
stateID, err := sdt.dbWriter.upsertStateCID(tx.dbtx, stateModel, tx.headerID)
|
||||
stateID, err := sdi.dbWriter.upsertStateCID(tx.dbtx, stateModel, tx.headerID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -339,7 +339,7 @@ func (sdt *StateDiffIndexer) PushStateNode(tx *BlockTx, stateNode sdtypes.StateN
|
||||
CodeHash: account.CodeHash,
|
||||
StorageRoot: account.Root.String(),
|
||||
}
|
||||
if err := sdt.dbWriter.upsertStateAccount(tx.dbtx, accountModel, stateID); err != nil {
|
||||
if err := sdi.dbWriter.upsertStateAccount(tx.dbtx, accountModel, stateID); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
@ -357,7 +357,7 @@ func (sdt *StateDiffIndexer) PushStateNode(tx *BlockTx, stateNode sdtypes.StateN
|
||||
MhKey: mhKey,
|
||||
NodeType: ResolveFromNodeType(storageNode.NodeType),
|
||||
}
|
||||
if err := sdt.dbWriter.upsertStorageCID(tx.dbtx, storageModel, stateID); err != nil {
|
||||
if err := sdi.dbWriter.upsertStorageCID(tx.dbtx, storageModel, stateID); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user