From b36b3f83cb794dd487596180c0a0aede392563eb Mon Sep 17 00:00:00 2001 From: i-norden Date: Wed, 17 Nov 2021 10:25:11 -0600 Subject: [PATCH] update to use new schema; fix pgx driver --- statediff/builder.go | 6 +- statediff/indexer/database/dump/indexer.go | 117 +++++++------- statediff/indexer/database/sql/batch_tx.go | 1 - statediff/indexer/database/sql/indexer.go | 147 +++++++++--------- .../indexer/database/sql/postgres/config.go | 3 + .../indexer/database/sql/postgres/database.go | 28 ++-- .../indexer/database/sql/postgres/pgx.go | 5 - .../indexer/database/sql/postgres/sqlx.go | 5 - statediff/indexer/database/sql/writer.go | 101 ++++++++---- statediff/indexer/interfaces/interfaces.go | 2 +- statediff/indexer/models/batch.go | 55 ++++--- statediff/indexer/models/models.go | 31 ++-- statediff/service.go | 2 +- 13 files changed, 263 insertions(+), 240 deletions(-) diff --git a/statediff/builder.go b/statediff/builder.go index eacfeca15..8dc3cece8 100644 --- a/statediff/builder.go +++ b/statediff/builder.go @@ -23,16 +23,14 @@ import ( "bytes" "fmt" - "github.com/ethereum/go-ethereum/statediff/trie_helpers" - - types2 "github.com/ethereum/go-ethereum/statediff/types" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/statediff/trie_helpers" + types2 "github.com/ethereum/go-ethereum/statediff/types" "github.com/ethereum/go-ethereum/trie" ) diff --git a/statediff/indexer/database/dump/indexer.go b/statediff/indexer/database/dump/indexer.go index 357a78ece..b75fb1af9 100644 --- a/statediff/indexer/database/dump/indexer.go +++ b/statediff/indexer/database/dump/indexer.go @@ -136,7 +136,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip t = time.Now() // Publish and index header, collect headerID - var headerID int64 + var headerID string headerID, err = sdi.processHeader(blockTx, block.Header(), headerNode, reward, totalDifficulty) if err != nil { return nil, err @@ -181,7 +181,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip // processHeader publishes and indexes a header IPLD in Postgres // it returns the headerID -func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, headerNode node.Node, reward, td *big.Int) (int64, error) { +func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, headerNode node.Node, reward, td *big.Int) (string, error) { tx.cacheIPLD(headerNode) var baseFee *int64 @@ -190,12 +190,13 @@ func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, he *baseFee = header.BaseFee.Int64() } + headerID := header.Hash().String() mod := models.HeaderModel{ CID: headerNode.Cid().String(), MhKey: shared.MultihashKeyFromCID(headerNode.Cid()), ParentHash: header.ParentHash.String(), BlockNumber: header.Number.String(), - BlockHash: header.Hash().String(), + BlockHash: headerID, TotalDifficulty: td.String(), Reward: reward.String(), Bloom: header.Bloom.Bytes(), @@ -207,11 +208,11 @@ func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, he BaseFee: baseFee, } _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", mod) - return 0, err + return headerID, err } // processUncles publishes and indexes uncle IPLDs in Postgres -func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID int64, blockNumber uint64, uncleNodes []*ipld2.EthHeader) error { +func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID string, blockNumber uint64, uncleNodes []*ipld2.EthHeader) error { // publish and index uncles for _, uncleNode := range uncleNodes { tx.cacheIPLD(uncleNode) @@ -223,6 +224,7 @@ func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID int64, blockNum uncleReward = shared.CalcUncleMinerReward(blockNumber, uncleNode.Number.Uint64()) } uncle := models.UncleModel{ + HeaderID: headerID, CID: uncleNode.Cid().String(), MhKey: shared.MultihashKeyFromCID(uncleNode.Cid()), ParentHash: uncleNode.ParentHash.String(), @@ -238,7 +240,7 @@ func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID int64, blockNum // processArgs bundles arguments to processReceiptsAndTxs type processArgs struct { - headerID int64 + headerID string blockNumber *big.Int receipts types.Receipts txs types.Transactions @@ -263,59 +265,24 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs tx.cacheIPLD(txNode) // Indexing - // extract topic and contract data from the receipt for indexing - mappedContracts := make(map[string]bool) // use map to avoid duplicate addresses - logDataSet := make([]*models.LogsModel, len(receipt.Logs)) - for idx, l := range receipt.Logs { - topicSet := make([]string, 4) - for ti, topic := range l.Topics { - topicSet[ti] = topic.Hex() - } - - if !args.logLeafNodeCIDs[i][idx].Defined() { - return fmt.Errorf("invalid log cid") - } - - mappedContracts[l.Address.String()] = true - logDataSet[idx] = &models.LogsModel{ - Address: l.Address.String(), - Index: int64(l.Index), - Data: l.Data, - LeafCID: args.logLeafNodeCIDs[i][idx].String(), - LeafMhKey: shared.MultihashKeyFromCID(args.logLeafNodeCIDs[i][idx]), - Topic0: topicSet[0], - Topic1: topicSet[1], - Topic2: topicSet[2], - Topic3: topicSet[3], - } - } - // these are the contracts seen in the logs - logContracts := make([]string, 0, len(mappedContracts)) - for addr := range mappedContracts { - logContracts = append(logContracts, addr) - } - // this is the contract address if this receipt is for a contract creation tx - contract := shared.HandleZeroAddr(receipt.ContractAddress) - var contractHash string - if contract != "" { - contractHash = crypto.Keccak256Hash(common.HexToAddress(contract).Bytes()).String() - } - // index tx first so that the receipt can reference it by FK + // index tx trx := args.txs[i] + trxID := trx.Hash().String() // derive sender for the tx that corresponds with this receipt from, err := types.Sender(signer, trx) if err != nil { return fmt.Errorf("error deriving tx sender: %v", err) } txModel := models.TxModel{ - Dst: shared.HandleZeroAddrPointer(trx.To()), - Src: shared.HandleZeroAddr(from), - TxHash: trx.Hash().String(), - Index: int64(i), - Data: trx.Data(), - CID: txNode.Cid().String(), - MhKey: shared.MultihashKeyFromCID(txNode.Cid()), - Type: trx.Type(), + HeaderID: args.headerID, + Dst: shared.HandleZeroAddrPointer(trx.To()), + Src: shared.HandleZeroAddr(from), + TxHash: trxID, + Index: int64(i), + Data: trx.Data(), + CID: txNode.Cid().String(), + MhKey: shared.MultihashKeyFromCID(txNode.Cid()), + Type: trx.Type(), } if _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", txModel); err != nil { return err @@ -328,6 +295,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs storageKeys[k] = storageKey.Hex() } accessListElementModel := models.AccessListElementModel{ + TxID: trxID, Index: int64(j), Address: accessListElement.Address.Hex(), StorageKeys: storageKeys, @@ -337,12 +305,20 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs } } + // this is the contract address if this receipt is for a contract creation tx + contract := shared.HandleZeroAddr(receipt.ContractAddress) + var contractHash string + if contract != "" { + contractHash = crypto.Keccak256Hash(common.HexToAddress(contract).Bytes()).String() + } + // index the receipt if !args.rctLeafNodeCIDs[i].Defined() { return fmt.Errorf("invalid receipt leaf node cid") } rctModel := &models.ReceiptModel{ + TxID: trxID, Contract: contract, ContractHash: contractHash, LeafCID: args.rctLeafNodeCIDs[i].String(), @@ -359,6 +335,31 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs return err } + logDataSet := make([]*models.LogsModel, len(receipt.Logs)) + for idx, l := range receipt.Logs { + topicSet := make([]string, 4) + for ti, topic := range l.Topics { + topicSet[ti] = topic.Hex() + } + + if !args.logLeafNodeCIDs[i][idx].Defined() { + return fmt.Errorf("invalid log cid") + } + + logDataSet[idx] = &models.LogsModel{ + ReceiptID: trxID, + Address: l.Address.String(), + Index: int64(l.Index), + Data: l.Data, + LeafCID: args.logLeafNodeCIDs[i][idx].String(), + LeafMhKey: shared.MultihashKeyFromCID(args.logLeafNodeCIDs[i][idx]), + Topic0: topicSet[0], + Topic1: topicSet[1], + Topic2: topicSet[2], + Topic3: topicSet[3], + } + } + if _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", logDataSet); err != nil { return err } @@ -374,7 +375,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs } // PushStateNode publishes and indexes a state diff node object (including any child storage nodes) in the IPLD sql -func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdtypes.StateNode) error { +func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdtypes.StateNode, headerID string) error { tx, ok := batch.(*BatchTx) if !ok { return fmt.Errorf("sql batch is expected to be of type %T, got %T", &BatchTx{}, batch) @@ -384,6 +385,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt // short circuit if it is a Removed node // this assumes the db has been initialized and a public.blocks entry for the Removed node is present stateModel := models.StateNodeModel{ + HeaderID: headerID, Path: stateNode.Path, StateKey: common.BytesToHash(stateNode.LeafKey).String(), CID: shared.RemovedNodeStateCID, @@ -398,6 +400,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt return fmt.Errorf("error generating and cacheing state node IPLD: %v", err) } stateModel := models.StateNodeModel{ + HeaderID: headerID, Path: stateNode.Path, StateKey: common.BytesToHash(stateNode.LeafKey).String(), CID: stateCIDStr, @@ -422,6 +425,8 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt return fmt.Errorf("error decoding state account rlp: %s", err.Error()) } accountModel := models.StateAccountModel{ + HeaderID: headerID, + StatePath: stateNode.Path, Balance: account.Balance.String(), Nonce: account.Nonce, CodeHash: account.CodeHash, @@ -437,6 +442,8 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt // short circuit if it is a Removed node // this assumes the db has been initialized and a public.blocks entry for the Removed node is present storageModel := models.StorageNodeModel{ + HeaderID: headerID, + StatePath: stateNode.Path, Path: storageNode.Path, StorageKey: common.BytesToHash(storageNode.LeafKey).String(), CID: shared.RemovedNodeStorageCID, @@ -453,6 +460,8 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt return fmt.Errorf("error generating and cacheing storage node IPLD: %v", err) } storageModel := models.StorageNodeModel{ + HeaderID: headerID, + StatePath: stateNode.Path, Path: storageNode.Path, StorageKey: common.BytesToHash(storageNode.LeafKey).String(), CID: storageCIDStr, @@ -482,7 +491,7 @@ func (sdi *StateDiffIndexer) PushCodeAndCodeHash(batch interfaces.Batch, codeAnd return nil } -// Close satisfied io.Closer +// Close satisfies io.Closer func (sdi *StateDiffIndexer) Close() error { return sdi.dump.Close() } diff --git a/statediff/indexer/database/sql/batch_tx.go b/statediff/indexer/database/sql/batch_tx.go index ff847eec6..fb1b289a1 100644 --- a/statediff/indexer/database/sql/batch_tx.go +++ b/statediff/indexer/database/sql/batch_tx.go @@ -34,7 +34,6 @@ type BatchTx struct { BlockNumber uint64 ctx context.Context dbtx Tx - headerID int64 stm string quit chan struct{} iplds chan models.IPLDModel diff --git a/statediff/indexer/database/sql/indexer.go b/statediff/indexer/database/sql/indexer.go index fad68bf96..1e89f92ff 100644 --- a/statediff/indexer/database/sql/indexer.go +++ b/statediff/indexer/database/sql/indexer.go @@ -187,7 +187,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip t = time.Now() // Publish and index header, collect headerID - var headerID int64 + var headerID string headerID, err = sdi.processHeader(blockTx, block.Header(), headerNode, reward, totalDifficulty) if err != nil { return nil, err @@ -227,13 +227,12 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip traceMsg += fmt.Sprintf("tx and receipt processing time: %s\r\n", tDiff.String()) t = time.Now() - blockTx.headerID = headerID return blockTx, err } // processHeader publishes and indexes a header IPLD in Postgres // it returns the headerID -func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, headerNode node.Node, reward, td *big.Int) (int64, error) { +func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, headerNode node.Node, reward, td *big.Int) (string, error) { tx.cacheIPLD(headerNode) var baseFee *int64 @@ -241,14 +240,14 @@ func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, he baseFee = new(int64) *baseFee = header.BaseFee.Int64() } - + headerID := header.Hash().String() // index header - return sdi.dbWriter.upsertHeaderCID(tx.dbtx, models.HeaderModel{ + return headerID, sdi.dbWriter.upsertHeaderCID(tx.dbtx, models.HeaderModel{ CID: headerNode.Cid().String(), MhKey: shared.MultihashKeyFromCID(headerNode.Cid()), ParentHash: header.ParentHash.String(), BlockNumber: header.Number.String(), - BlockHash: header.Hash().String(), + BlockHash: headerID, TotalDifficulty: td.String(), Reward: reward.String(), Bloom: header.Bloom.Bytes(), @@ -262,7 +261,7 @@ func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, he } // processUncles publishes and indexes uncle IPLDs in Postgres -func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID int64, blockNumber uint64, uncleNodes []*ipld2.EthHeader) error { +func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID string, blockNumber uint64, uncleNodes []*ipld2.EthHeader) error { // publish and index uncles for _, uncleNode := range uncleNodes { tx.cacheIPLD(uncleNode) @@ -274,13 +273,14 @@ func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID int64, blockNum uncleReward = shared.CalcUncleMinerReward(blockNumber, uncleNode.Number.Uint64()) } uncle := models.UncleModel{ + HeaderID: headerID, CID: uncleNode.Cid().String(), MhKey: shared.MultihashKeyFromCID(uncleNode.Cid()), ParentHash: uncleNode.ParentHash.String(), BlockHash: uncleNode.Hash().String(), Reward: uncleReward.String(), } - if err := sdi.dbWriter.upsertUncleCID(tx.dbtx, uncle, headerID); err != nil { + if err := sdi.dbWriter.upsertUncleCID(tx.dbtx, uncle); err != nil { return err } } @@ -289,7 +289,7 @@ func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID int64, blockNum // processArgs bundles arguments to processReceiptsAndTxs type processArgs struct { - headerID int64 + headerID string blockNumber *big.Int receipts types.Receipts txs types.Transactions @@ -313,63 +313,26 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs txNode := args.txNodes[i] tx.cacheIPLD(txNode) - // Indexing - // extract topic and contract data from the receipt for indexing - mappedContracts := make(map[string]bool) // use map to avoid duplicate addresses - logDataSet := make([]*models.LogsModel, len(receipt.Logs)) - for idx, l := range receipt.Logs { - topicSet := make([]string, 4) - for ti, topic := range l.Topics { - topicSet[ti] = topic.Hex() - } - - if !args.logLeafNodeCIDs[i][idx].Defined() { - return fmt.Errorf("invalid log cid") - } - - mappedContracts[l.Address.String()] = true - logDataSet[idx] = &models.LogsModel{ - Address: l.Address.String(), - Index: int64(l.Index), - Data: l.Data, - LeafCID: args.logLeafNodeCIDs[i][idx].String(), - LeafMhKey: shared.MultihashKeyFromCID(args.logLeafNodeCIDs[i][idx]), - Topic0: topicSet[0], - Topic1: topicSet[1], - Topic2: topicSet[2], - Topic3: topicSet[3], - } - } - // these are the contracts seen in the logs - logContracts := make([]string, 0, len(mappedContracts)) - for addr := range mappedContracts { - logContracts = append(logContracts, addr) - } - // this is the contract address if this receipt is for a contract creation tx - contract := shared.HandleZeroAddr(receipt.ContractAddress) - var contractHash string - if contract != "" { - contractHash = crypto.Keccak256Hash(common.HexToAddress(contract).Bytes()).String() - } - // index tx first so that the receipt can reference it by FK + // index tx trx := args.txs[i] + txID := trx.Hash().String() // derive sender for the tx that corresponds with this receipt from, err := types.Sender(signer, trx) if err != nil { return fmt.Errorf("error deriving tx sender: %v", err) } txModel := models.TxModel{ - Dst: shared.HandleZeroAddrPointer(trx.To()), - Src: shared.HandleZeroAddr(from), - TxHash: trx.Hash().String(), - Index: int64(i), - Data: trx.Data(), - CID: txNode.Cid().String(), - MhKey: shared.MultihashKeyFromCID(txNode.Cid()), - Type: trx.Type(), + HeaderID: args.headerID, + Dst: shared.HandleZeroAddrPointer(trx.To()), + Src: shared.HandleZeroAddr(from), + TxHash: txID, + Index: int64(i), + Data: trx.Data(), + CID: txNode.Cid().String(), + MhKey: shared.MultihashKeyFromCID(txNode.Cid()), + Type: trx.Type(), } - txID, err := sdi.dbWriter.upsertTransactionCID(tx.dbtx, txModel, args.headerID) - if err != nil { + if err := sdi.dbWriter.upsertTransactionCID(tx.dbtx, txModel); err != nil { return err } @@ -380,21 +343,30 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs storageKeys[k] = storageKey.Hex() } accessListElementModel := models.AccessListElementModel{ + TxID: txID, Index: int64(j), Address: accessListElement.Address.Hex(), StorageKeys: storageKeys, } - if err := sdi.dbWriter.upsertAccessListElement(tx.dbtx, accessListElementModel, txID); err != nil { + if err := sdi.dbWriter.upsertAccessListElement(tx.dbtx, accessListElementModel); err != nil { return err } } - // index the receipt + // this is the contract address if this receipt is for a contract creation tx + contract := shared.HandleZeroAddr(receipt.ContractAddress) + var contractHash string + if contract != "" { + contractHash = crypto.Keccak256Hash(common.HexToAddress(contract).Bytes()).String() + } + + // index receipt if !args.rctLeafNodeCIDs[i].Defined() { return fmt.Errorf("invalid receipt leaf node cid") } rctModel := &models.ReceiptModel{ + TxID: txID, Contract: contract, ContractHash: contractHash, LeafCID: args.rctLeafNodeCIDs[i].String(), @@ -407,12 +379,37 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs rctModel.PostState = common.Bytes2Hex(receipt.PostState) } - receiptID, err := sdi.dbWriter.upsertReceiptCID(tx.dbtx, rctModel, txID) - if err != nil { + if err := sdi.dbWriter.upsertReceiptCID(tx.dbtx, rctModel); err != nil { return err } - if err = sdi.dbWriter.upsertLogCID(tx.dbtx, logDataSet, receiptID); err != nil { + // index logs + logDataSet := make([]*models.LogsModel, len(receipt.Logs)) + for idx, l := range receipt.Logs { + topicSet := make([]string, 4) + for ti, topic := range l.Topics { + topicSet[ti] = topic.Hex() + } + + if !args.logLeafNodeCIDs[i][idx].Defined() { + return fmt.Errorf("invalid log cid") + } + + logDataSet[idx] = &models.LogsModel{ + ReceiptID: txID, + Address: l.Address.String(), + Index: int64(l.Index), + Data: l.Data, + LeafCID: args.logLeafNodeCIDs[i][idx].String(), + LeafMhKey: shared.MultihashKeyFromCID(args.logLeafNodeCIDs[i][idx]), + Topic0: topicSet[0], + Topic1: topicSet[1], + Topic2: topicSet[2], + Topic3: topicSet[3], + } + } + + if err := sdi.dbWriter.upsertLogCID(tx.dbtx, logDataSet); err != nil { return err } } @@ -427,7 +424,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs } // PushStateNode publishes and indexes a state diff node object (including any child storage nodes) in the IPLD sql -func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdtypes.StateNode) error { +func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdtypes.StateNode, headerID string) error { tx, ok := batch.(*BatchTx) if !ok { return fmt.Errorf("sql batch is expected to be of type %T, got %T", &BatchTx{}, batch) @@ -437,29 +434,29 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt // short circuit if it is a Removed node // this assumes the db has been initialized and a public.blocks entry for the Removed node is present stateModel := models.StateNodeModel{ + HeaderID: headerID, Path: stateNode.Path, StateKey: common.BytesToHash(stateNode.LeafKey).String(), CID: shared.RemovedNodeStateCID, MhKey: shared.RemovedNodeMhKey, NodeType: stateNode.NodeType.Int(), } - _, err := sdi.dbWriter.upsertStateCID(tx.dbtx, stateModel, tx.headerID) - return err + return sdi.dbWriter.upsertStateCID(tx.dbtx, stateModel) } stateCIDStr, stateMhKey, err := tx.cacheRaw(ipld2.MEthStateTrie, multihash.KECCAK_256, stateNode.NodeValue) if err != nil { return fmt.Errorf("error generating and cacheing state node IPLD: %v", err) } stateModel := models.StateNodeModel{ + HeaderID: headerID, Path: stateNode.Path, StateKey: common.BytesToHash(stateNode.LeafKey).String(), CID: stateCIDStr, MhKey: stateMhKey, NodeType: stateNode.NodeType.Int(), } - // index the state node, collect the stateID to reference by FK - stateID, err := sdi.dbWriter.upsertStateCID(tx.dbtx, stateModel, tx.headerID) - if err != nil { + // index the state node + if err := sdi.dbWriter.upsertStateCID(tx.dbtx, stateModel); err != nil { return err } // if we have a leaf, decode and index the account data @@ -476,12 +473,14 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt return fmt.Errorf("error decoding state account rlp: %s", err.Error()) } accountModel := models.StateAccountModel{ + HeaderID: headerID, + StatePath: stateNode.Path, Balance: account.Balance.String(), Nonce: account.Nonce, CodeHash: account.CodeHash, StorageRoot: account.Root.String(), } - if err := sdi.dbWriter.upsertStateAccount(tx.dbtx, accountModel, stateID); err != nil { + if err := sdi.dbWriter.upsertStateAccount(tx.dbtx, accountModel); err != nil { return err } } @@ -491,13 +490,15 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt // short circuit if it is a Removed node // this assumes the db has been initialized and a public.blocks entry for the Removed node is present storageModel := models.StorageNodeModel{ + HeaderID: headerID, + StatePath: stateNode.Path, Path: storageNode.Path, StorageKey: common.BytesToHash(storageNode.LeafKey).String(), CID: shared.RemovedNodeStorageCID, MhKey: shared.RemovedNodeMhKey, NodeType: storageNode.NodeType.Int(), } - if err := sdi.dbWriter.upsertStorageCID(tx.dbtx, storageModel, stateID); err != nil { + if err := sdi.dbWriter.upsertStorageCID(tx.dbtx, storageModel); err != nil { return err } continue @@ -507,13 +508,15 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt return fmt.Errorf("error generating and cacheing storage node IPLD: %v", err) } storageModel := models.StorageNodeModel{ + HeaderID: headerID, + StatePath: stateNode.Path, Path: storageNode.Path, StorageKey: common.BytesToHash(storageNode.LeafKey).String(), CID: storageCIDStr, MhKey: storageMhKey, NodeType: storageNode.NodeType.Int(), } - if err := sdi.dbWriter.upsertStorageCID(tx.dbtx, storageModel, stateID); err != nil { + if err := sdi.dbWriter.upsertStorageCID(tx.dbtx, storageModel); err != nil { return err } } diff --git a/statediff/indexer/database/sql/postgres/config.go b/statediff/indexer/database/sql/postgres/config.go index a7c7cc9b4..825e50163 100644 --- a/statediff/indexer/database/sql/postgres/config.go +++ b/statediff/indexer/database/sql/postgres/config.go @@ -30,6 +30,7 @@ type DriverType string const ( PGX DriverType = "PGX" SQLX DriverType = "SQLX" + FILE DriverType = "File" Unknown DriverType = "Unknown" ) @@ -40,6 +41,8 @@ func ResolveDriverType(str string) (DriverType, error) { return PGX, nil case "sqlx": return SQLX, nil + case "file": + return FILE, nil default: return Unknown, fmt.Errorf("unrecognized driver type string: %s", str) } diff --git a/statediff/indexer/database/sql/postgres/database.go b/statediff/indexer/database/sql/postgres/database.go index 3fe7f652e..ef091760d 100644 --- a/statediff/indexer/database/sql/postgres/database.go +++ b/statediff/indexer/database/sql/postgres/database.go @@ -46,59 +46,55 @@ type DB struct { func (db *DB) InsertHeaderStm() string { return `INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16) - ON CONFLICT (block_number, block_hash) DO UPDATE SET (parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee) = ($3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, eth.header_cids.times_validated + 1, $16) - RETURNING id` + ON CONFLICT (block_hash) DO UPDATE SET (parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee) = ($3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, eth.header_cids.times_validated + 1, $16)` } // InsertUncleStm satisfies the sql.Statements interface func (db *DB) InsertUncleStm() string { return `INSERT INTO eth.uncle_cids (block_hash, header_id, parent_hash, cid, reward, mh_key) VALUES ($1, $2, $3, $4, $5, $6) - ON CONFLICT (header_id, block_hash) DO UPDATE SET (parent_hash, cid, reward, mh_key) = ($3, $4, $5, $6)` + ON CONFLICT (block_hash) DO NOTHING` } // InsertTxStm satisfies the sql.Statements interface func (db *DB) InsertTxStm() string { return `INSERT INTO eth.transaction_cids (header_id, tx_hash, cid, dst, src, index, mh_key, tx_data, tx_type) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) - ON CONFLICT (header_id, tx_hash) DO UPDATE SET (cid, dst, src, index, mh_key, tx_data, tx_type) = ($3, $4, $5, $6, $7, $8, $9) - RETURNING id` + ON CONFLICT (tx_hash) DO NOTHING` } // InsertAccessListElementStm satisfies the sql.Statements interface func (db *DB) InsertAccessListElementStm() string { return `INSERT INTO eth.access_list_element (tx_id, index, address, storage_keys) VALUES ($1, $2, $3, $4) - ON CONFLICT (tx_id, index) DO UPDATE SET (address, storage_keys) = ($3, $4)` + ON CONFLICT (tx_id, index) DO NOTHING` } // InsertRctStm satisfies the sql.Statements interface func (db *DB) InsertRctStm() string { return `INSERT INTO eth.receipt_cids (tx_id, leaf_cid, contract, contract_hash, leaf_mh_key, post_state, post_status, log_root) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) - ON CONFLICT (tx_id) DO UPDATE SET (leaf_cid, contract, contract_hash, leaf_mh_key, post_state, post_status, log_root) = ($2, $3, $4, $5, $6, $7, $8) - RETURNING id` + ON CONFLICT (tx_id) DO NOTHING` } // InsertLogStm satisfies the sql.Statements interface func (db *DB) InsertLogStm() string { - return `INSERT INTO eth.log_cids (leaf_cid, leaf_mh_key, receipt_id, address, index, topic0, topic1, topic2, topic3, log_data) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) - ON CONFLICT (receipt_id, index) DO UPDATE SET (leaf_cid, leaf_mh_key, address, topic0, topic1, topic2, topic3, log_data) = ($1, $2, $4, $6, $7, $8, $9, $10)` + return `INSERT INTO eth.log_cids (leaf_cid, leaf_mh_key, rct_id, address, index, topic0, topic1, topic2, topic3, log_data) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) + ON CONFLICT (rct_id, index) DO NOTHING` } // InsertStateStm satisfies the sql.Statements interface func (db *DB) InsertStateStm() string { return `INSERT INTO eth.state_cids (header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7) - ON CONFLICT (header_id, state_path) DO UPDATE SET (state_leaf_key, cid, node_type, diff, mh_key) = ($2, $3, $5, $6, $7) - RETURNING id` + ON CONFLICT (header_id, state_path) DO UPDATE SET (state_leaf_key, cid, node_type, diff, mh_key) = ($2, $3, $5, $6, $7)` } // InsertAccountStm satisfies the sql.Statements interface func (db *DB) InsertAccountStm() string { - return `INSERT INTO eth.state_accounts (state_id, balance, nonce, code_hash, storage_root) VALUES ($1, $2, $3, $4, $5) - ON CONFLICT (state_id) DO UPDATE SET (balance, nonce, code_hash, storage_root) = ($2, $3, $4, $5)` + return `INSERT INTO eth.state_accounts (header_id, state_path, balance, nonce, code_hash, storage_root) VALUES ($1, $2, $3, $4, $5, $6) + ON CONFLICT (header_id, state_path) DO NOTHING` } // InsertStorageStm satisfies the sql.Statements interface func (db *DB) InsertStorageStm() string { - return `INSERT INTO eth.storage_cids (state_id, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7) - ON CONFLICT (state_id, storage_path) DO UPDATE SET (storage_leaf_key, cid, node_type, diff, mh_key) = ($2, $3, $5, $6, $7)` + return `INSERT INTO eth.storage_cids (header_id, state_path, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + ON CONFLICT (header_id, state_path, storage_path) DO UPDATE SET (storage_leaf_key, cid, node_type, diff, mh_key) = ($3, $4, $6, $7, $8)` } // InsertIPLDStm satisfies the sql.Statements interface diff --git a/statediff/indexer/database/sql/postgres/pgx.go b/statediff/indexer/database/sql/postgres/pgx.go index 9f6701400..fa9b84dd0 100644 --- a/statediff/indexer/database/sql/postgres/pgx.go +++ b/statediff/indexer/database/sql/postgres/pgx.go @@ -138,11 +138,6 @@ func (pgx *PGXDriver) Stats() sql.Stats { return pgxStatsWrapper{stats: stats} } -// NodeInfo satisfies sql.Database -func (pgx *PGXDriver) NodeInfo() node.Info { - return pgx.nodeInfo -} - // NodeID satisfies sql.Database func (pgx *PGXDriver) NodeID() int64 { return pgx.nodeID diff --git a/statediff/indexer/database/sql/postgres/sqlx.go b/statediff/indexer/database/sql/postgres/sqlx.go index 684fc7bf0..0bbd0d9e9 100644 --- a/statediff/indexer/database/sql/postgres/sqlx.go +++ b/statediff/indexer/database/sql/postgres/sqlx.go @@ -107,11 +107,6 @@ func (driver *SQLXDriver) Stats() sql.Stats { return sqlxStatsWrapper{stats: stats} } -// NodeInfo satisfies sql.Database -func (driver *SQLXDriver) NodeInfo() node.Info { - return driver.nodeInfo -} - // NodeID satisfies sql.Database func (driver *SQLXDriver) NodeID() int64 { return driver.nodeID diff --git a/statediff/indexer/database/sql/writer.go b/statediff/indexer/database/sql/writer.go index ea276dfbf..96d13d956 100644 --- a/statediff/indexer/database/sql/writer.go +++ b/statediff/indexer/database/sql/writer.go @@ -39,41 +39,56 @@ func NewWriter(db Database) *Writer { } } -func (in *Writer) upsertHeaderCID(tx Tx, header models.HeaderModel) (int64, error) { - var headerID int64 - err := tx.QueryRow(in.db.Context(), in.db.InsertHeaderStm(), +/* +INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee) +VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16) +ON CONFLICT (block_hash) DO UPDATE SET (parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee) = ($3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, eth.header_cids.times_validated + 1, $16) +*/ +func (in *Writer) upsertHeaderCID(tx Tx, header models.HeaderModel) error { + _, err := tx.Exec(in.db.Context(), in.db.InsertHeaderStm(), header.BlockNumber, header.BlockHash, header.ParentHash, header.CID, header.TotalDifficulty, in.db.NodeID(), header.Reward, header.StateRoot, header.TxRoot, - header.RctRoot, header.UncleRoot, header.Bloom, header.Timestamp, header.MhKey, 1, header.BaseFee).Scan(&headerID) + header.RctRoot, header.UncleRoot, header.Bloom, header.Timestamp, header.MhKey, 1, header.BaseFee) if err != nil { - return 0, fmt.Errorf("error upserting header_cids entry: %v", err) + return fmt.Errorf("error upserting header_cids entry: %v", err) } indexerMetrics.blocks.Inc(1) - return headerID, nil + return nil } -func (in *Writer) upsertUncleCID(tx Tx, uncle models.UncleModel, headerID int64) error { +/* +INSERT INTO eth.uncle_cids (block_hash, header_id, parent_hash, cid, reward, mh_key) VALUES ($1, $2, $3, $4, $5, $6) +ON CONFLICT (block_hash) DO NOTHING +*/ +func (in *Writer) upsertUncleCID(tx Tx, uncle models.UncleModel) error { _, err := tx.Exec(in.db.Context(), in.db.InsertUncleStm(), - uncle.BlockHash, headerID, uncle.ParentHash, uncle.CID, uncle.Reward, uncle.MhKey) + uncle.BlockHash, uncle.HeaderID, uncle.ParentHash, uncle.CID, uncle.Reward, uncle.MhKey) if err != nil { return fmt.Errorf("error upserting uncle_cids entry: %v", err) } return nil } -func (in *Writer) upsertTransactionCID(tx Tx, transaction models.TxModel, headerID int64) (int64, error) { - var txID int64 - err := tx.QueryRow(in.db.Context(), in.db.InsertTxStm(), - headerID, transaction.TxHash, transaction.CID, transaction.Dst, transaction.Src, transaction.Index, transaction.MhKey, transaction.Data, transaction.Type).Scan(&txID) +/* +INSERT INTO eth.transaction_cids (header_id, tx_hash, cid, dst, src, index, mh_key, tx_data, tx_type) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) +ON CONFLICT (tx_hash) DO NOTHING +*/ +func (in *Writer) upsertTransactionCID(tx Tx, transaction models.TxModel) error { + _, err := tx.Exec(in.db.Context(), in.db.InsertTxStm(), + transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst, transaction.Src, transaction.Index, transaction.MhKey, transaction.Data, transaction.Type) if err != nil { - return 0, fmt.Errorf("error upserting transaction_cids entry: %v", err) + return fmt.Errorf("error upserting transaction_cids entry: %v", err) } indexerMetrics.transactions.Inc(1) - return txID, nil + return nil } -func (in *Writer) upsertAccessListElement(tx Tx, accessListElement models.AccessListElementModel, txID int64) error { +/* +INSERT INTO eth.access_list_element (tx_id, index, address, storage_keys) VALUES ($1, $2, $3, $4) +ON CONFLICT (tx_id, index) DO NOTHING +*/ +func (in *Writer) upsertAccessListElement(tx Tx, accessListElement models.AccessListElementModel) error { _, err := tx.Exec(in.db.Context(), in.db.InsertAccessListElementStm(), - txID, accessListElement.Index, accessListElement.Address, accessListElement.StorageKeys) + accessListElement.TxID, accessListElement.Index, accessListElement.Address, accessListElement.StorageKeys) if err != nil { return fmt.Errorf("error upserting access_list_element entry: %v", err) } @@ -81,21 +96,28 @@ func (in *Writer) upsertAccessListElement(tx Tx, accessListElement models.Access return nil } -func (in *Writer) upsertReceiptCID(tx Tx, rct *models.ReceiptModel, txID int64) (int64, error) { - var receiptID int64 - err := tx.QueryRow(in.db.Context(), in.db.InsertRctStm(), - txID, rct.LeafCID, rct.Contract, rct.ContractHash, rct.LeafMhKey, rct.PostState, rct.PostStatus, rct.LogRoot).Scan(&receiptID) +/* +INSERT INTO eth.receipt_cids (tx_id, leaf_cid, contract, contract_hash, leaf_mh_key, post_state, post_status, log_root) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) +ON CONFLICT (tx_id) DO NOTHING +*/ +func (in *Writer) upsertReceiptCID(tx Tx, rct *models.ReceiptModel) error { + _, err := tx.Exec(in.db.Context(), in.db.InsertRctStm(), + rct.TxID, rct.LeafCID, rct.Contract, rct.ContractHash, rct.LeafMhKey, rct.PostState, rct.PostStatus, rct.LogRoot) if err != nil { - return 0, fmt.Errorf("error upserting receipt_cids entry: %w", err) + return fmt.Errorf("error upserting receipt_cids entry: %w", err) } indexerMetrics.receipts.Inc(1) - return receiptID, nil + return nil } -func (in *Writer) upsertLogCID(tx Tx, logs []*models.LogsModel, receiptID int64) error { +/* +INSERT INTO eth.log_cids (leaf_cid, leaf_mh_key, rct_id, address, index, topic0, topic1, topic2, topic3, log_data) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) +ON CONFLICT (rct_id, index) DO NOTHING +*/ +func (in *Writer) upsertLogCID(tx Tx, logs []*models.LogsModel) error { for _, log := range logs { _, err := tx.Exec(in.db.Context(), in.db.InsertLogStm(), - log.LeafCID, log.LeafMhKey, receiptID, log.Address, log.Index, log.Topic0, log.Topic1, log.Topic2, log.Topic3, log.Data) + log.LeafCID, log.LeafMhKey, log.ReceiptID, log.Address, log.Index, log.Topic0, log.Topic1, log.Topic2, log.Topic3, log.Data) if err != nil { return fmt.Errorf("error upserting logs entry: %w", err) } @@ -104,36 +126,47 @@ func (in *Writer) upsertLogCID(tx Tx, logs []*models.LogsModel, receiptID int64) return nil } -func (in *Writer) upsertStateCID(tx Tx, stateNode models.StateNodeModel, headerID int64) (int64, error) { - var stateID int64 +/* +INSERT INTO eth.state_cids (header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7) +ON CONFLICT (header_id, state_path) DO UPDATE SET (state_leaf_key, cid, node_type, diff, mh_key) = ($2, $3, $5, $6, $7) +*/ +func (in *Writer) upsertStateCID(tx Tx, stateNode models.StateNodeModel) error { var stateKey string if stateNode.StateKey != nullHash.String() { stateKey = stateNode.StateKey } - err := tx.QueryRow(in.db.Context(), in.db.InsertStateStm(), - headerID, stateKey, stateNode.CID, stateNode.Path, stateNode.NodeType, true, stateNode.MhKey).Scan(&stateID) + _, err := tx.Exec(in.db.Context(), in.db.InsertStateStm(), + stateNode.HeaderID, stateKey, stateNode.CID, stateNode.Path, stateNode.NodeType, true, stateNode.MhKey) if err != nil { - return 0, fmt.Errorf("error upserting state_cids entry: %v", err) + return fmt.Errorf("error upserting state_cids entry: %v", err) } - return stateID, nil + return nil } -func (in *Writer) upsertStateAccount(tx Tx, stateAccount models.StateAccountModel, stateID int64) error { +/* +INSERT INTO eth.state_accounts (header_id, state_path, balance, nonce, code_hash, storage_root) VALUES ($1, $2, $3, $4, $5, $6) +ON CONFLICT (header_id, state_path) DO NOTHING +*/ +func (in *Writer) upsertStateAccount(tx Tx, stateAccount models.StateAccountModel) error { _, err := tx.Exec(in.db.Context(), in.db.InsertAccountStm(), - stateID, stateAccount.Balance, stateAccount.Nonce, stateAccount.CodeHash, stateAccount.StorageRoot) + stateAccount.HeaderID, stateAccount.StatePath, stateAccount.Balance, stateAccount.Nonce, stateAccount.CodeHash, stateAccount.StorageRoot) if err != nil { return fmt.Errorf("error upserting state_accounts entry: %v", err) } return nil } -func (in *Writer) upsertStorageCID(tx Tx, storageCID models.StorageNodeModel, stateID int64) error { +/* +INSERT INTO eth.storage_cids (header_id, state_path, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) +ON CONFLICT (header_id, state_path, storage_path) DO UPDATE SET (storage_leaf_key, cid, node_type, diff, mh_key) = ($3, $4, $6, $7, $8) +*/ +func (in *Writer) upsertStorageCID(tx Tx, storageCID models.StorageNodeModel) error { var storageKey string if storageCID.StorageKey != nullHash.String() { storageKey = storageCID.StorageKey } _, err := tx.Exec(in.db.Context(), in.db.InsertStorageStm(), - stateID, storageKey, storageCID.CID, storageCID.Path, storageCID.NodeType, true, storageCID.MhKey) + storageCID.HeaderID, storageCID.StatePath, storageKey, storageCID.CID, storageCID.Path, storageCID.NodeType, true, storageCID.MhKey) if err != nil { return fmt.Errorf("error upserting storage_cids entry: %v", err) } diff --git a/statediff/indexer/interfaces/interfaces.go b/statediff/indexer/interfaces/interfaces.go index d32c117eb..8f951230d 100644 --- a/statediff/indexer/interfaces/interfaces.go +++ b/statediff/indexer/interfaces/interfaces.go @@ -29,7 +29,7 @@ import ( // StateDiffIndexer interface required to index statediff data type StateDiffIndexer interface { PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (Batch, error) - PushStateNode(tx Batch, stateNode sdtypes.StateNode) error + PushStateNode(tx Batch, stateNode sdtypes.StateNode, headerID string) error PushCodeAndCodeHash(tx Batch, codeAndCodeHash sdtypes.CodeAndCodeHash) error ReportDBMetrics(delay time.Duration, quit <-chan bool) io.Closer diff --git a/statediff/indexer/models/batch.go b/statediff/indexer/models/batch.go index 48b2944e0..16096f292 100644 --- a/statediff/indexer/models/batch.go +++ b/statediff/indexer/models/batch.go @@ -26,7 +26,7 @@ type IPLDBatch struct { // UncleBatch holds the arguments for a batch insert of uncle data type UncleBatch struct { - HeaderID []int64 + HeaderID []string BlockHashes []string ParentHashes []string CIDs []string @@ -36,7 +36,7 @@ type UncleBatch struct { // TxBatch holds the arguments for a batch insert of tx data type TxBatch struct { - HeaderID int64 + HeaderID string Indexes []int64 TxHashes []string CIDs []string @@ -44,20 +44,20 @@ type TxBatch struct { Dsts []string Srcs []string Datas [][]byte - Types []*uint8 + Types []uint8 } // AccessListBatch holds the arguments for a batch insert of access list data type AccessListBatch struct { Indexes []int64 - TxIDs []int64 + TxIDs []string Addresses []string StorageKeysSets []pq.StringArray } // ReceiptBatch holds the arguments for a batch insert of receipt data type ReceiptBatch struct { - TxIDs []int64 + TxIDs []string LeafCIDs []string LeafMhKeys []string PostStatuses []uint64 @@ -71,7 +71,7 @@ type ReceiptBatch struct { type LogBatch struct { LeafCIDs []string LeafMhKeys []string - ReceiptIDs []int64 + ReceiptIDs []string Addresses []string Indexes []int64 Datas [][]byte @@ -83,34 +83,33 @@ type LogBatch struct { // StateBatch holds the arguments for a batch insert of state data type StateBatch struct { - ID int64 - HeaderID int64 - Path []byte - StateKey string - NodeType int - CID string - MhKey string - Diff bool + HeaderID string + Paths [][]byte + StateKeys []string + NodeTypes []int + CIDs []string + MhKeys []string + Diff bool } // AccountBatch holds the arguments for a batch insert of account data type AccountBatch struct { - ID int64 - StateID int64 - Balance string - Nonce uint64 - CodeHash []byte - StorageRoot string + HeaderID string + StatePaths [][]byte + Balances []string + Nonces []uint64 + CodeHashes [][]byte + StorageRoots []string } // StorageBatch holds the arguments for a batch insert of storage data type StorageBatch struct { - ID int64 - StateID int64 - Path []byte - StorageKey string - NodeType int - CID string - MhKey string - Diff bool + HeaderID string + StatePaths [][]string + Paths [][]byte + StorageKeys []string + NodeTypes []int + CIDs []string + MhKeys []string + Diff bool } diff --git a/statediff/indexer/models/models.go b/statediff/indexer/models/models.go index 5e849193e..7d2fc43b6 100644 --- a/statediff/indexer/models/models.go +++ b/statediff/indexer/models/models.go @@ -26,7 +26,6 @@ type IPLDModel struct { // HeaderModel is the db model for eth.header_cids type HeaderModel struct { - ID int64 `db:"id"` BlockNumber string `db:"block_number"` BlockHash string `db:"block_hash"` ParentHash string `db:"parent_hash"` @@ -47,8 +46,7 @@ type HeaderModel struct { // UncleModel is the db model for eth.uncle_cids type UncleModel struct { - ID int64 `db:"id"` - HeaderID int64 `db:"header_id"` + HeaderID string `db:"header_id"` BlockHash string `db:"block_hash"` ParentHash string `db:"parent_hash"` CID string `db:"cid"` @@ -58,8 +56,7 @@ type UncleModel struct { // TxModel is the db model for eth.transaction_cids type TxModel struct { - ID int64 `db:"id"` - HeaderID int64 `db:"header_id"` + HeaderID string `db:"header_id"` Index int64 `db:"index"` TxHash string `db:"tx_hash"` CID string `db:"cid"` @@ -72,17 +69,15 @@ type TxModel struct { // AccessListElementModel is the db model for eth.access_list_entry type AccessListElementModel struct { - ID int64 `db:"id"` Index int64 `db:"index"` - TxID int64 `db:"tx_id"` + TxID string `db:"tx_id"` Address string `db:"address"` StorageKeys pq.StringArray `db:"storage_keys"` } // ReceiptModel is the db model for eth.receipt_cids type ReceiptModel struct { - ID int64 `db:"id"` - TxID int64 `db:"tx_id"` + TxID string `db:"tx_id"` LeafCID string `db:"leaf_cid"` LeafMhKey string `db:"leaf_mh_key"` PostStatus uint64 `db:"post_status"` @@ -94,8 +89,7 @@ type ReceiptModel struct { // StateNodeModel is the db model for eth.state_cids type StateNodeModel struct { - ID int64 `db:"id"` - HeaderID int64 `db:"header_id"` + HeaderID string `db:"header_id"` Path []byte `db:"state_path"` StateKey string `db:"state_leaf_key"` NodeType int `db:"node_type"` @@ -106,8 +100,8 @@ type StateNodeModel struct { // StorageNodeModel is the db model for eth.storage_cids type StorageNodeModel struct { - ID int64 `db:"id"` - StateID int64 `db:"state_id"` + HeaderID string `db:"header_id""` + StatePath []byte `db:"state_path"` Path []byte `db:"storage_path"` StorageKey string `db:"storage_leaf_key"` NodeType int `db:"node_type"` @@ -118,8 +112,8 @@ type StorageNodeModel struct { // StorageNodeWithStateKeyModel is a db model for eth.storage_cids + eth.state_cids.state_key type StorageNodeWithStateKeyModel struct { - ID int64 `db:"id"` - StateID int64 `db:"state_id"` + HeaderID string `db:"header_id""` + StatePath []byte `db:"state_path"` Path []byte `db:"storage_path"` StateKey string `db:"state_leaf_key"` StorageKey string `db:"storage_leaf_key"` @@ -131,8 +125,8 @@ type StorageNodeWithStateKeyModel struct { // StateAccountModel is a db model for an eth state account (decoded value of state leaf node) type StateAccountModel struct { - ID int64 `db:"id"` - StateID int64 `db:"state_id"` + HeaderID string `db:"header_id"` + StatePath []byte `db:"state_path"` Balance string `db:"balance"` Nonce uint64 `db:"nonce"` CodeHash []byte `db:"code_hash"` @@ -141,10 +135,9 @@ type StateAccountModel struct { // LogsModel is the db model for eth.logs type LogsModel struct { - ID int64 `db:"id"` + ReceiptID string `db:"rct_id"` LeafCID string `db:"leaf_cid"` LeafMhKey string `db:"leaf_mh_key"` - ReceiptID int64 `db:"receipt_id"` Address string `db:"address"` Index int64 `db:"index"` Data []byte `db:"log_data"` diff --git a/statediff/service.go b/statediff/service.go index 31a56b809..3fc8ac60c 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -664,7 +664,7 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p } }() output := func(node types2.StateNode) error { - return sds.indexer.PushStateNode(tx, node) + return sds.indexer.PushStateNode(tx, node, block.Hash().String()) } codeOutput := func(c types2.CodeAndCodeHash) error { return sds.indexer.PushCodeAndCodeHash(tx, c)