diff --git a/statediff/indexer/database/file/batch_tx.go b/statediff/indexer/database/file/batch_tx.go index 39e5d3713..d38bd1211 100644 --- a/statediff/indexer/database/file/batch_tx.go +++ b/statediff/indexer/database/file/batch_tx.go @@ -18,7 +18,7 @@ package file // BatchTx wraps a void with the state necessary for building the tx concurrently during trie difference iteration type BatchTx struct { - BlockNumber uint64 + BlockNumber string submit func(blockTx *BatchTx, err error) error } diff --git a/statediff/indexer/database/file/indexer.go b/statediff/indexer/database/file/indexer.go index 870c1f259..045ac238a 100644 --- a/statediff/indexer/database/file/indexer.go +++ b/statediff/indexer/database/file/indexer.go @@ -23,6 +23,7 @@ import ( "math/big" "os" "sync" + "sync/atomic" "time" "github.com/ipfs/go-cid" @@ -53,10 +54,12 @@ var ( // StateDiffIndexer satisfies the indexer.StateDiffIndexer interface for ethereum statediff objects on top of a void type StateDiffIndexer struct { - fileWriter *SQLWriter - chainConfig *params.ChainConfig - nodeID string - wg *sync.WaitGroup + fileWriter *SQLWriter + chainConfig *params.ChainConfig + nodeID string + wg *sync.WaitGroup + blockNumber string + removedCacheFlag *uint32 } // NewStateDiffIndexer creates a void implementation of interfaces.StateDiffIndexer @@ -77,7 +80,6 @@ func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, c wg := new(sync.WaitGroup) w.Loop() w.upsertNode(config.NodeInfo) - w.upsertIPLDDirect(shared.RemovedNodeMhKey, []byte{}) return &StateDiffIndexer{ fileWriter: w, chainConfig: chainConfig, @@ -92,6 +94,8 @@ func (sdi *StateDiffIndexer) ReportDBMetrics(time.Duration, <-chan bool) {} // PushBlock pushes and indexes block data in sql, except state & storage nodes (includes header, uncles, transactions & receipts) // Returns an initiated DB transaction which must be Closed via defer to commit or rollback func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts 
types.Receipts, totalDifficulty *big.Int) (interfaces.Batch, error) { + sdi.removedCacheFlag = new(uint32) + sdi.blockNumber = block.Number().String() start, t := time.Now(), time.Now() blockHash := block.Hash() blockHashStr := blockHash.String() @@ -127,7 +131,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip t = time.Now() blockTx := &BatchTx{ - BlockNumber: height, + BlockNumber: sdi.blockNumber, submit: func(self *BatchTx, err error) error { tDiff := time.Since(t) indexerMetrics.tStateStoreCodeProcessing.Update(tDiff) @@ -189,7 +193,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip // processHeader write a header IPLD insert SQL stmt to a file // it returns the headerID func (sdi *StateDiffIndexer) processHeader(header *types.Header, headerNode node.Node, reward, td *big.Int) string { - sdi.fileWriter.upsertIPLDNode(headerNode) + sdi.fileWriter.upsertIPLDNode(sdi.blockNumber, headerNode) var baseFee *string if header.BaseFee != nil { @@ -202,7 +206,7 @@ func (sdi *StateDiffIndexer) processHeader(header *types.Header, headerNode node CID: headerNode.Cid().String(), MhKey: shared.MultihashKeyFromCID(headerNode.Cid()), ParentHash: header.ParentHash.String(), - BlockNumber: header.Number.String(), + BlockNumber: sdi.blockNumber, BlockHash: headerID, TotalDifficulty: td.String(), Reward: reward.String(), @@ -221,7 +225,7 @@ func (sdi *StateDiffIndexer) processHeader(header *types.Header, headerNode node func (sdi *StateDiffIndexer) processUncles(headerID string, blockNumber uint64, uncleNodes []*ipld2.EthHeader) { // publish and index uncles for _, uncleNode := range uncleNodes { - sdi.fileWriter.upsertIPLDNode(uncleNode) + sdi.fileWriter.upsertIPLDNode(sdi.blockNumber, uncleNode) var uncleReward *big.Int // in PoA networks uncle reward is 0 if sdi.chainConfig.Clique != nil { @@ -230,12 +234,13 @@ func (sdi *StateDiffIndexer) processUncles(headerID string, blockNumber uint64, uncleReward = 
shared.CalcUncleMinerReward(blockNumber, uncleNode.Number.Uint64()) } sdi.fileWriter.upsertUncleCID(models.UncleModel{ - HeaderID: headerID, - CID: uncleNode.Cid().String(), - MhKey: shared.MultihashKeyFromCID(uncleNode.Cid()), - ParentHash: uncleNode.ParentHash.String(), - BlockHash: uncleNode.Hash().String(), - Reward: uncleReward.String(), + BlockNumber: sdi.blockNumber, + HeaderID: headerID, + CID: uncleNode.Cid().String(), + MhKey: shared.MultihashKeyFromCID(uncleNode.Cid()), + ParentHash: uncleNode.ParentHash.String(), + BlockHash: uncleNode.Hash().String(), + Reward: uncleReward.String(), }) } } @@ -261,10 +266,10 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error { signer := types.MakeSigner(sdi.chainConfig, args.blockNumber) for i, receipt := range args.receipts { for _, logTrieNode := range args.logTrieNodes[i] { - sdi.fileWriter.upsertIPLDNode(logTrieNode) + sdi.fileWriter.upsertIPLDNode(sdi.blockNumber, logTrieNode) } txNode := args.txNodes[i] - sdi.fileWriter.upsertIPLDNode(txNode) + sdi.fileWriter.upsertIPLDNode(sdi.blockNumber, txNode) // index tx trx := args.txs[i] @@ -281,16 +286,17 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error { return fmt.Errorf("error deriving tx sender: %v", err) } txModel := models.TxModel{ - HeaderID: args.headerID, - Dst: shared.HandleZeroAddrPointer(trx.To()), - Src: shared.HandleZeroAddr(from), - TxHash: txID, - Index: int64(i), - Data: trx.Data(), - CID: txNode.Cid().String(), - MhKey: shared.MultihashKeyFromCID(txNode.Cid()), - Type: trx.Type(), - Value: val, + BlockNumber: sdi.blockNumber, + HeaderID: args.headerID, + Dst: shared.HandleZeroAddrPointer(trx.To()), + Src: shared.HandleZeroAddr(from), + TxHash: txID, + Index: int64(i), + Data: trx.Data(), + CID: txNode.Cid().String(), + MhKey: shared.MultihashKeyFromCID(txNode.Cid()), + Type: trx.Type(), + Value: val, } sdi.fileWriter.upsertTransactionCID(txModel) @@ -301,6 +307,7 @@ func (sdi *StateDiffIndexer) 
processReceiptsAndTxs(args processArgs) error { storageKeys[k] = storageKey.Hex() } accessListElementModel := models.AccessListElementModel{ + BlockNumber: sdi.blockNumber, TxID: txID, Index: int64(j), Address: accessListElement.Address.Hex(), @@ -322,6 +329,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error { } rctModel := &models.ReceiptModel{ + BlockNumber: sdi.blockNumber, TxID: txID, Contract: contract, ContractHash: contractHash, @@ -349,16 +357,17 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error { } logDataSet[idx] = &models.LogsModel{ - ReceiptID: txID, - Address: l.Address.String(), - Index: int64(l.Index), - Data: l.Data, - LeafCID: args.logLeafNodeCIDs[i][idx].String(), - LeafMhKey: shared.MultihashKeyFromCID(args.logLeafNodeCIDs[i][idx]), - Topic0: topicSet[0], - Topic1: topicSet[1], - Topic2: topicSet[2], - Topic3: topicSet[3], + BlockNumber: sdi.blockNumber, + ReceiptID: txID, + Address: l.Address.String(), + Index: int64(l.Index), + Data: l.Data, + LeafCID: args.logLeafNodeCIDs[i][idx].String(), + LeafMhKey: shared.MultihashKeyFromCID(args.logLeafNodeCIDs[i][idx]), + Topic0: topicSet[0], + Topic1: topicSet[1], + Topic2: topicSet[2], + Topic3: topicSet[3], } } sdi.fileWriter.upsertLogCID(logDataSet) @@ -366,8 +375,8 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error { // publish trie nodes, these aren't indexed directly for i, n := range args.txTrieNodes { - sdi.fileWriter.upsertIPLDNode(n) - sdi.fileWriter.upsertIPLDNode(args.rctTrieNodes[i]) + sdi.fileWriter.upsertIPLDNode(sdi.blockNumber, n) + sdi.fileWriter.upsertIPLDNode(sdi.blockNumber, args.rctTrieNodes[i]) } return nil @@ -377,30 +386,34 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error { func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdtypes.StateNode, headerID string) error { // publish the state node if stateNode.NodeType == sdtypes.Removed { - // 
short circuit if it is a Removed node - // this assumes the db has been initialized and a public.blocks entry for the Removed node is present + if atomic.LoadUint32(sdi.removedCacheFlag) == 0 { + atomic.StoreUint32(sdi.removedCacheFlag, 1) + sdi.fileWriter.upsertIPLDDirect(sdi.blockNumber, shared.RemovedNodeMhKey, []byte{}) + } stateModel := models.StateNodeModel{ - HeaderID: headerID, - Path: stateNode.Path, - StateKey: common.BytesToHash(stateNode.LeafKey).String(), - CID: shared.RemovedNodeStateCID, - MhKey: shared.RemovedNodeMhKey, - NodeType: stateNode.NodeType.Int(), + BlockNumber: sdi.blockNumber, + HeaderID: headerID, + Path: stateNode.Path, + StateKey: common.BytesToHash(stateNode.LeafKey).String(), + CID: shared.RemovedNodeStateCID, + MhKey: shared.RemovedNodeMhKey, + NodeType: stateNode.NodeType.Int(), } sdi.fileWriter.upsertStateCID(stateModel) return nil } - stateCIDStr, stateMhKey, err := sdi.fileWriter.upsertIPLDRaw(ipld2.MEthStateTrie, multihash.KECCAK_256, stateNode.NodeValue) + stateCIDStr, stateMhKey, err := sdi.fileWriter.upsertIPLDRaw(sdi.blockNumber, ipld2.MEthStateTrie, multihash.KECCAK_256, stateNode.NodeValue) if err != nil { return fmt.Errorf("error generating and cacheing state node IPLD: %v", err) } stateModel := models.StateNodeModel{ - HeaderID: headerID, - Path: stateNode.Path, - StateKey: common.BytesToHash(stateNode.LeafKey).String(), - CID: stateCIDStr, - MhKey: stateMhKey, - NodeType: stateNode.NodeType.Int(), + BlockNumber: sdi.blockNumber, + HeaderID: headerID, + Path: stateNode.Path, + StateKey: common.BytesToHash(stateNode.LeafKey).String(), + CID: stateCIDStr, + MhKey: stateMhKey, + NodeType: stateNode.NodeType.Int(), } // index the state node sdi.fileWriter.upsertStateCID(stateModel) @@ -418,6 +431,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt return fmt.Errorf("error decoding state account rlp: %s", err.Error()) } accountModel := models.StateAccountModel{ + BlockNumber: 
sdi.blockNumber, HeaderID: headerID, StatePath: stateNode.Path, Balance: account.Balance.String(), @@ -430,32 +444,36 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt // if there are any storage nodes associated with this node, publish and index them for _, storageNode := range stateNode.StorageNodes { if storageNode.NodeType == sdtypes.Removed { - // short circuit if it is a Removed node - // this assumes the db has been initialized and a public.blocks entry for the Removed node is present + if atomic.LoadUint32(sdi.removedCacheFlag) == 0 { + atomic.StoreUint32(sdi.removedCacheFlag, 1) + sdi.fileWriter.upsertIPLDDirect(sdi.blockNumber, shared.RemovedNodeMhKey, []byte{}) + } storageModel := models.StorageNodeModel{ - HeaderID: headerID, - StatePath: stateNode.Path, - Path: storageNode.Path, - StorageKey: common.BytesToHash(storageNode.LeafKey).String(), - CID: shared.RemovedNodeStorageCID, - MhKey: shared.RemovedNodeMhKey, - NodeType: storageNode.NodeType.Int(), + BlockNumber: sdi.blockNumber, + HeaderID: headerID, + StatePath: stateNode.Path, + Path: storageNode.Path, + StorageKey: common.BytesToHash(storageNode.LeafKey).String(), + CID: shared.RemovedNodeStorageCID, + MhKey: shared.RemovedNodeMhKey, + NodeType: storageNode.NodeType.Int(), } sdi.fileWriter.upsertStorageCID(storageModel) continue } - storageCIDStr, storageMhKey, err := sdi.fileWriter.upsertIPLDRaw(ipld2.MEthStorageTrie, multihash.KECCAK_256, storageNode.NodeValue) + storageCIDStr, storageMhKey, err := sdi.fileWriter.upsertIPLDRaw(sdi.blockNumber, ipld2.MEthStorageTrie, multihash.KECCAK_256, storageNode.NodeValue) if err != nil { return fmt.Errorf("error generating and cacheing storage node IPLD: %v", err) } storageModel := models.StorageNodeModel{ - HeaderID: headerID, - StatePath: stateNode.Path, - Path: storageNode.Path, - StorageKey: common.BytesToHash(storageNode.LeafKey).String(), - CID: storageCIDStr, - MhKey: storageMhKey, - NodeType: 
storageNode.NodeType.Int(), + BlockNumber: sdi.blockNumber, + HeaderID: headerID, + StatePath: stateNode.Path, + Path: storageNode.Path, + StorageKey: common.BytesToHash(storageNode.LeafKey).String(), + CID: storageCIDStr, + MhKey: storageMhKey, + NodeType: storageNode.NodeType.Int(), } sdi.fileWriter.upsertStorageCID(storageModel) } @@ -470,7 +488,7 @@ func (sdi *StateDiffIndexer) PushCodeAndCodeHash(batch interfaces.Batch, codeAnd if err != nil { return fmt.Errorf("error deriving multihash key from codehash: %v", err) } - sdi.fileWriter.upsertIPLDDirect(mhKey, codeAndCodeHash.Code) + sdi.fileWriter.upsertIPLDDirect(sdi.blockNumber, mhKey, codeAndCodeHash.Code) return nil } diff --git a/statediff/indexer/database/file/writer.go b/statediff/indexer/database/file/writer.go index 48de0853d..3c11b5eea 100644 --- a/statediff/indexer/database/file/writer.go +++ b/statediff/indexer/database/file/writer.go @@ -127,34 +127,35 @@ const ( nodeInsert = "INSERT INTO nodes (genesis_block, network_id, node_id, client_name, chain_id) VALUES " + "('%s', '%s', '%s', '%s', %d);\n" - ipldInsert = "INSERT INTO public.blocks (key, data) VALUES ('%s', '\\x%x');\n" + ipldInsert = "INSERT INTO public.blocks (block_number, key, data) VALUES ('%s', '%s', '\\x%x');\n" headerInsert = "INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, " + "state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase) VALUES " + "('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '\\x%x', %d, '%s', %d, '%s');\n" - uncleInsert = "INSERT INTO eth.uncle_cids (block_hash, header_id, parent_hash, cid, reward, mh_key) VALUES " + - "('%s', '%s', '%s', '%s', '%s', '%s');\n" + uncleInsert = "INSERT INTO eth.uncle_cids (block_number, block_hash, header_id, parent_hash, cid, reward, mh_key) VALUES " + + "('%s', '%s', '%s', '%s', '%s', '%s', '%s');\n" - txInsert = "INSERT INTO eth.transaction_cids (header_id, tx_hash, cid, 
dst, src, index, mh_key, tx_data, tx_type, " + - "value) VALUES ('%s', '%s', '%s', '%s', '%s', %d, '%s', '\\x%x', %d, '%s');\n" + txInsert = "INSERT INTO eth.transaction_cids (block_number, header_id, tx_hash, cid, dst, src, index, mh_key, tx_data, tx_type, " + + "value) VALUES ('%s', '%s', '%s', '%s', '%s', '%s', %d, '%s', '\\x%x', %d, '%s');\n" - alInsert = "INSERT INTO eth.access_list_elements (tx_id, index, address, storage_keys) VALUES ('%s', %d, '%s', '%s');\n" + alInsert = "INSERT INTO eth.access_list_elements (block_number, tx_id, index, address, storage_keys) VALUES " + + "('%s', '%s', %d, '%s', '%s');\n" - rctInsert = "INSERT INTO eth.receipt_cids (tx_id, leaf_cid, contract, contract_hash, leaf_mh_key, post_state, " + - "post_status, log_root) VALUES ('%s', '%s', '%s', '%s', '%s', '%s', %d, '%s');\n" + rctInsert = "INSERT INTO eth.receipt_cids (block_number, tx_id, leaf_cid, contract, contract_hash, leaf_mh_key, post_state, " + + "post_status, log_root) VALUES ('%s', '%s', '%s', '%s', '%s', '%s', '%s', %d, '%s');\n" - logInsert = "INSERT INTO eth.log_cids (leaf_cid, leaf_mh_key, rct_id, address, index, topic0, topic1, topic2, " + - "topic3, log_data) VALUES ('%s', '%s', '%s', '%s', %d, '%s', '%s', '%s', '%s', '\\x%x');\n" + logInsert = "INSERT INTO eth.log_cids (block_number, leaf_cid, leaf_mh_key, rct_id, address, index, topic0, topic1, topic2, " + + "topic3, log_data) VALUES ('%s', '%s', '%s', '%s', '%s', %d, '%s', '%s', '%s', '%s', '\\x%x');\n" - stateInsert = "INSERT INTO eth.state_cids (header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) " + - "VALUES ('%s', '%s', '%s', '\\x%x', %d, %t, '%s');\n" + stateInsert = "INSERT INTO eth.state_cids (block_number, header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) " + + "VALUES ('%s', '%s', '%s', '%s', '\\x%x', %d, %t, '%s');\n" - accountInsert = "INSERT INTO eth.state_accounts (header_id, state_path, balance, nonce, code_hash, storage_root) " + - "VALUES ('%s', '\\x%x', 
'%s', %d, '\\x%x', '%s');\n" + accountInsert = "INSERT INTO eth.state_accounts (block_number, header_id, state_path, balance, nonce, code_hash, storage_root) " + + "VALUES ('%s', '%s', '\\x%x', '%s', %d, '\\x%x', '%s');\n" - storageInsert = "INSERT INTO eth.storage_cids (header_id, state_path, storage_leaf_key, cid, storage_path, " + - "node_type, diff, mh_key) VALUES ('%s', '\\x%x', '%s', '%s', '\\x%x', %d, %t, '%s');\n" + storageInsert = "INSERT INTO eth.storage_cids (block_number, header_id, state_path, storage_leaf_key, cid, storage_path, " + + "node_type, diff, mh_key) VALUES ('%s', '%s', '\\x%x', '%s', '%s', '\\x%x', %d, %t, '%s');\n" ) func (sqw *SQLWriter) upsertNode(node nodeinfo.Info) { @@ -162,32 +163,35 @@ func (sqw *SQLWriter) upsertNode(node nodeinfo.Info) { } func (sqw *SQLWriter) upsertIPLD(ipld models.IPLDModel) { - sqw.stmts <- []byte(fmt.Sprintf(ipldInsert, ipld.Key, ipld.Data)) + sqw.stmts <- []byte(fmt.Sprintf(ipldInsert, ipld.BlockNumber, ipld.Key, ipld.Data)) } -func (sqw *SQLWriter) upsertIPLDDirect(key string, value []byte) { +func (sqw *SQLWriter) upsertIPLDDirect(blockNumber, key string, value []byte) { sqw.upsertIPLD(models.IPLDModel{ - Key: key, - Data: value, + BlockNumber: blockNumber, + Key: key, + Data: value, }) } -func (sqw *SQLWriter) upsertIPLDNode(i node.Node) { +func (sqw *SQLWriter) upsertIPLDNode(blockNumber string, i node.Node) { sqw.upsertIPLD(models.IPLDModel{ - Key: blockstore.BlockPrefix.String() + dshelp.MultihashToDsKey(i.Cid().Hash()).String(), - Data: i.RawData(), + BlockNumber: blockNumber, + Key: blockstore.BlockPrefix.String() + dshelp.MultihashToDsKey(i.Cid().Hash()).String(), + Data: i.RawData(), }) } -func (sqw *SQLWriter) upsertIPLDRaw(codec, mh uint64, raw []byte) (string, string, error) { +func (sqw *SQLWriter) upsertIPLDRaw(blockNumber string, codec, mh uint64, raw []byte) (string, string, error) { c, err := ipld.RawdataToCid(codec, raw, mh) if err != nil { return "", "", err } prefixedKey := 
blockstore.BlockPrefix.String() + dshelp.MultihashToDsKey(c.Hash()).String() sqw.upsertIPLD(models.IPLDModel{ - Key: prefixedKey, - Data: raw, + BlockNumber: blockNumber, + Key: prefixedKey, + Data: raw, }) return c.String(), prefixedKey, err } @@ -201,31 +205,31 @@ func (sqw *SQLWriter) upsertHeaderCID(header models.HeaderModel) { } func (sqw *SQLWriter) upsertUncleCID(uncle models.UncleModel) { - sqw.stmts <- []byte(fmt.Sprintf(uncleInsert, uncle.BlockHash, uncle.HeaderID, uncle.ParentHash, uncle.CID, + sqw.stmts <- []byte(fmt.Sprintf(uncleInsert, uncle.BlockNumber, uncle.BlockHash, uncle.HeaderID, uncle.ParentHash, uncle.CID, uncle.Reward, uncle.MhKey)) } func (sqw *SQLWriter) upsertTransactionCID(transaction models.TxModel) { - sqw.stmts <- []byte(fmt.Sprintf(txInsert, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst, + sqw.stmts <- []byte(fmt.Sprintf(txInsert, transaction.BlockNumber, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst, transaction.Src, transaction.Index, transaction.MhKey, transaction.Data, transaction.Type, transaction.Value)) indexerMetrics.transactions.Inc(1) } func (sqw *SQLWriter) upsertAccessListElement(accessListElement models.AccessListElementModel) { - sqw.stmts <- []byte(fmt.Sprintf(alInsert, accessListElement.TxID, accessListElement.Index, accessListElement.Address, + sqw.stmts <- []byte(fmt.Sprintf(alInsert, accessListElement.BlockNumber, accessListElement.TxID, accessListElement.Index, accessListElement.Address, formatPostgresStringArray(accessListElement.StorageKeys))) indexerMetrics.accessListEntries.Inc(1) } func (sqw *SQLWriter) upsertReceiptCID(rct *models.ReceiptModel) { - sqw.stmts <- []byte(fmt.Sprintf(rctInsert, rct.TxID, rct.LeafCID, rct.Contract, rct.ContractHash, rct.LeafMhKey, + sqw.stmts <- []byte(fmt.Sprintf(rctInsert, rct.BlockNumber, rct.TxID, rct.LeafCID, rct.Contract, rct.ContractHash, rct.LeafMhKey, rct.PostState, rct.PostStatus, rct.LogRoot)) 
indexerMetrics.receipts.Inc(1) } func (sqw *SQLWriter) upsertLogCID(logs []*models.LogsModel) { for _, l := range logs { - sqw.stmts <- []byte(fmt.Sprintf(logInsert, l.LeafCID, l.LeafMhKey, l.ReceiptID, l.Address, l.Index, l.Topic0, + sqw.stmts <- []byte(fmt.Sprintf(logInsert, l.BlockNumber, l.LeafCID, l.LeafMhKey, l.ReceiptID, l.Address, l.Index, l.Topic0, l.Topic1, l.Topic2, l.Topic3, l.Data)) indexerMetrics.logs.Inc(1) } @@ -236,12 +240,12 @@ func (sqw *SQLWriter) upsertStateCID(stateNode models.StateNodeModel) { if stateNode.StateKey != nullHash.String() { stateKey = stateNode.StateKey } - sqw.stmts <- []byte(fmt.Sprintf(stateInsert, stateNode.HeaderID, stateKey, stateNode.CID, stateNode.Path, + sqw.stmts <- []byte(fmt.Sprintf(stateInsert, stateNode.BlockNumber, stateNode.HeaderID, stateKey, stateNode.CID, stateNode.Path, stateNode.NodeType, true, stateNode.MhKey)) } func (sqw *SQLWriter) upsertStateAccount(stateAccount models.StateAccountModel) { - sqw.stmts <- []byte(fmt.Sprintf(accountInsert, stateAccount.HeaderID, stateAccount.StatePath, stateAccount.Balance, + sqw.stmts <- []byte(fmt.Sprintf(accountInsert, stateAccount.BlockNumber, stateAccount.HeaderID, stateAccount.StatePath, stateAccount.Balance, stateAccount.Nonce, stateAccount.CodeHash, stateAccount.StorageRoot)) } @@ -250,6 +254,6 @@ func (sqw *SQLWriter) upsertStorageCID(storageCID models.StorageNodeModel) { if storageCID.StorageKey != nullHash.String() { storageKey = storageCID.StorageKey } - sqw.stmts <- []byte(fmt.Sprintf(storageInsert, storageCID.HeaderID, storageCID.StatePath, storageKey, storageCID.CID, + sqw.stmts <- []byte(fmt.Sprintf(storageInsert, storageCID.BlockNumber, storageCID.HeaderID, storageCID.StatePath, storageKey, storageCID.CID, storageCID.Path, storageCID.NodeType, true, storageCID.MhKey)) } diff --git a/statediff/indexer/database/sql/batch_tx.go b/statediff/indexer/database/sql/batch_tx.go index a7c9a474a..06bb49c9e 100644 --- a/statediff/indexer/database/sql/batch_tx.go +++ 
b/statediff/indexer/database/sql/batch_tx.go @@ -18,6 +18,7 @@ package sql import ( "context" + "sync/atomic" blockstore "github.com/ipfs/go-ipfs-blockstore" dshelp "github.com/ipfs/go-ipfs-ds-help" @@ -33,13 +34,14 @@ const startingCacheCapacity = 1024 * 24 // BatchTx wraps a sql tx with the state necessary for building the tx concurrently during trie difference iteration type BatchTx struct { - BlockNumber string - ctx context.Context - dbtx Tx - stm string - quit chan struct{} - iplds chan models.IPLDModel - ipldCache models.IPLDBatch + BlockNumber string + ctx context.Context + dbtx Tx + stm string + quit chan struct{} + iplds chan models.IPLDModel + ipldCache models.IPLDBatch + removedCacheFlag *uint32 submit func(blockTx *BatchTx, err error) error } @@ -104,6 +106,19 @@ func (tx *BatchTx) cacheRaw(codec, mh uint64, raw []byte) (string, string, error return c.String(), prefixedKey, err } +// cacheRemoved sends the given key/value pair, tagged with the batch's block number, on tx.iplds the first time it is called for this batch +// and is a no-op on every later call. The flag is flipped with a single compare-and-swap (rather than a separate load and store) +// so that concurrent callers cannot both observe it unset and send the same entry twice. +func (tx *BatchTx) cacheRemoved(key string, value []byte) { + if atomic.CompareAndSwapUint32(tx.removedCacheFlag, 0, 1) { + tx.iplds <- models.IPLDModel{ + BlockNumber: tx.BlockNumber, + Key: key, + Data: value, + } + } +} + // rollback sql transaction and log any error func rollback(ctx context.Context, tx Tx) { if err := tx.Rollback(ctx); err != nil { diff --git a/statediff/indexer/database/sql/indexer.go b/statediff/indexer/database/sql/indexer.go index 4db268dc7..15fcd60b2 100644 --- a/statediff/indexer/database/sql/indexer.go +++ b/statediff/indexer/database/sql/indexer.go @@ -55,14 +55,11 @@ type StateDiffIndexer struct { ctx context.Context chainConfig *params.ChainConfig dbWriter *Writer + blockNumber string } // NewStateDiffIndexer creates a sql implementation of interfaces.StateDiffIndexer func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, db Database) (*StateDiffIndexer, error) { - // Write the removed node to the db on init - if _, err := db.Exec(ctx, db.InsertIPLDStm(), shared.RemovedNodeMhKey, 
[]byte{}); err != nil { - return nil, err - } return &StateDiffIndexer{ ctx: ctx, chainConfig: chainConfig, @@ -93,6 +90,7 @@ func (sdi *StateDiffIndexer) ReportDBMetrics(delay time.Duration, quit <-chan bo // Returns an initiated DB transaction which must be Closed via defer to commit or rollback func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (interfaces.Batch, error) { start, t := time.Now(), time.Now() + sdi.blockNumber = block.Number().String() blockHash := block.Hash() blockHashStr := blockHash.String() height := block.NumberU64() @@ -140,11 +138,12 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip } }() blockTx := &BatchTx{ - ctx: sdi.ctx, - BlockNumber: block.Number().String(), - stm: sdi.dbWriter.db.InsertIPLDsStm(), - iplds: make(chan models.IPLDModel), - quit: make(chan struct{}), + removedCacheFlag: new(uint32), + ctx: sdi.ctx, + BlockNumber: sdi.blockNumber, + stm: sdi.dbWriter.db.InsertIPLDsStm(), + iplds: make(chan models.IPLDModel), + quit: make(chan struct{}), ipldCache: models.IPLDBatch{ BlockNumbers: make([]string, 0, startingCacheCapacity), Keys: make([]string, 0, startingCacheCapacity), @@ -204,7 +203,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff.String()) t = time.Now() // Publish and index uncles - err = sdi.processUncles(blockTx, headerID, block.Number(), uncleNodes) + err = sdi.processUncles(blockTx, headerID, block.NumberU64(), uncleNodes) if err != nil { return nil, err } @@ -253,7 +252,7 @@ func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, he CID: headerNode.Cid().String(), MhKey: shared.MultihashKeyFromCID(headerNode.Cid()), ParentHash: header.ParentHash.String(), - BlockNumber: header.Number.String(), + BlockNumber: sdi.blockNumber, BlockHash: headerID, TotalDifficulty: td.String(), Reward: reward.String(), @@ 
-268,7 +267,7 @@ func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, he } // processUncles publishes and indexes uncle IPLDs in Postgres -func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID string, blockNumber *big.Int, uncleNodes []*ipld2.EthHeader) error { +func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID string, blockNumber uint64, uncleNodes []*ipld2.EthHeader) error { // publish and index uncles for _, uncleNode := range uncleNodes { tx.cacheIPLD(uncleNode) @@ -277,10 +276,10 @@ func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID string, blockNu if sdi.chainConfig.Clique != nil { uncleReward = big.NewInt(0) } else { - uncleReward = shared.CalcUncleMinerReward(blockNumber.Uint64(), uncleNode.Number.Uint64()) + uncleReward = shared.CalcUncleMinerReward(blockNumber, uncleNode.Number.Uint64()) } uncle := models.UncleModel{ - BlockNumber: blockNumber.String(), + BlockNumber: sdi.blockNumber, HeaderID: headerID, CID: uncleNode.Cid().String(), MhKey: shared.MultihashKeyFromCID(uncleNode.Cid()), @@ -336,7 +335,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs return fmt.Errorf("error deriving tx sender: %v", err) } txModel := models.TxModel{ - BlockNumber: args.blockNumber.String(), + BlockNumber: sdi.blockNumber, HeaderID: args.headerID, Dst: shared.HandleZeroAddrPointer(trx.To()), Src: shared.HandleZeroAddr(from), @@ -359,7 +358,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs storageKeys[k] = storageKey.Hex() } accessListElementModel := models.AccessListElementModel{ - BlockNumber: args.blockNumber.String(), + BlockNumber: sdi.blockNumber, TxID: txID, Index: int64(j), Address: accessListElement.Address.Hex(), @@ -383,7 +382,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs } rctModel := &models.ReceiptModel{ - BlockNumber: args.blockNumber.String(), + BlockNumber: sdi.blockNumber, TxID: 
txID, Contract: contract, ContractHash: contractHash, @@ -414,7 +413,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs } logDataSet[idx] = &models.LogsModel{ - BlockNumber: args.blockNumber.String(), + BlockNumber: sdi.blockNumber, ReceiptID: txID, Address: l.Address.String(), Index: int64(l.Index), @@ -443,17 +442,16 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs } // PushStateNode publishes and indexes a state diff node object (including any child storage nodes) in the IPLD sql -func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdtypes.StateNode, blockNumber, headerID string) error { +func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdtypes.StateNode, headerID string) error { tx, ok := batch.(*BatchTx) if !ok { return fmt.Errorf("sql batch is expected to be of type %T, got %T", &BatchTx{}, batch) } // publish the state node if stateNode.NodeType == sdtypes.Removed { - // short circuit if it is a Removed node - // this assumes the db has been initialized and a public.blocks entry for the Removed node is present + tx.cacheRemoved(shared.RemovedNodeMhKey, []byte{}) stateModel := models.StateNodeModel{ - BlockNumber: blockNumber, + BlockNumber: sdi.blockNumber, HeaderID: headerID, Path: stateNode.Path, StateKey: common.BytesToHash(stateNode.LeafKey).String(), @@ -468,7 +466,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt return fmt.Errorf("error generating and cacheing state node IPLD: %v", err) } stateModel := models.StateNodeModel{ - BlockNumber: blockNumber, + BlockNumber: sdi.blockNumber, HeaderID: headerID, Path: stateNode.Path, StateKey: common.BytesToHash(stateNode.LeafKey).String(), @@ -494,7 +492,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt return fmt.Errorf("error decoding state account rlp: %s", err.Error()) } accountModel := models.StateAccountModel{ - 
BlockNumber: blockNumber, + BlockNumber: sdi.blockNumber, HeaderID: headerID, StatePath: stateNode.Path, Balance: account.Balance.String(), @@ -509,10 +507,9 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt // if there are any storage nodes associated with this node, publish and index them for _, storageNode := range stateNode.StorageNodes { if storageNode.NodeType == sdtypes.Removed { - // short circuit if it is a Removed node - // this assumes the db has been initialized and a public.blocks entry for the Removed node is present + tx.cacheRemoved(shared.RemovedNodeMhKey, []byte{}) storageModel := models.StorageNodeModel{ - BlockNumber: blockNumber, + BlockNumber: sdi.blockNumber, HeaderID: headerID, StatePath: stateNode.Path, Path: storageNode.Path, @@ -531,7 +528,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt return fmt.Errorf("error generating and cacheing storage node IPLD: %v", err) } storageModel := models.StorageNodeModel{ - BlockNumber: blockNumber, + BlockNumber: sdi.blockNumber, HeaderID: headerID, StatePath: stateNode.Path, Path: storageNode.Path, diff --git a/statediff/indexer/interfaces/interfaces.go b/statediff/indexer/interfaces/interfaces.go index 4ad9175a5..8f951230d 100644 --- a/statediff/indexer/interfaces/interfaces.go +++ b/statediff/indexer/interfaces/interfaces.go @@ -29,7 +29,7 @@ import ( // StateDiffIndexer interface required to index statediff data type StateDiffIndexer interface { PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (Batch, error) - PushStateNode(tx Batch, stateNode sdtypes.StateNode, blockNumber, headerID string) error + PushStateNode(tx Batch, stateNode sdtypes.StateNode, headerID string) error PushCodeAndCodeHash(tx Batch, codeAndCodeHash sdtypes.CodeAndCodeHash) error ReportDBMetrics(delay time.Duration, quit <-chan bool) io.Closer