diff --git a/cmd/geth/config.go b/cmd/geth/config.go index 9a8b169be..d77e261f9 100644 --- a/cmd/geth/config.go +++ b/cmd/geth/config.go @@ -45,6 +45,7 @@ import ( "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/statediff" dumpdb "github.com/ethereum/go-ethereum/statediff/indexer/database/dump" + "github.com/ethereum/go-ethereum/statediff/indexer/database/file" "github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres" "github.com/ethereum/go-ethereum/statediff/indexer/interfaces" "github.com/ethereum/go-ethereum/statediff/indexer/shared" @@ -204,6 +205,10 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) { utils.Fatalf("%v", err) } switch dbType { + case shared.FILE: + indexerConfig = file.Config{ + FilePath: ctx.GlobalString(utils.StateDiffFilePath.Name), + } case shared.POSTGRES: driverTypeStr := ctx.GlobalString(utils.StateDiffDBDriverTypeFlag.Name) driverType, err := postgres.ResolveDriverType(driverTypeStr) diff --git a/cmd/geth/main.go b/cmd/geth/main.go index c92810d11..990b40a60 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -167,6 +167,7 @@ var ( utils.StateDiffDBClientNameFlag, utils.StateDiffWritingFlag, utils.StateDiffWorkersFlag, + utils.StateDiffFilePath, configFileFlag, utils.CatalystFlag, } diff --git a/cmd/geth/usage.go b/cmd/geth/usage.go index 68e2a3f4c..885cc2c16 100644 --- a/cmd/geth/usage.go +++ b/cmd/geth/usage.go @@ -243,6 +243,7 @@ var AppHelpFlagGroups = []flags.FlagGroup{ utils.StateDiffDBClientNameFlag, utils.StateDiffWritingFlag, utils.StateDiffWorkersFlag, + utils.StateDiffFilePath, }, }, { diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 08f9088f5..ccc9ac89e 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -788,7 +788,7 @@ var ( } StateDiffDBTypeFlag = cli.StringFlag{ Name: "statediff.db.type", - Usage: "Statediff database type", + Usage: "Statediff database type (current options: postgres, file, dump)", Value: "postgres", } StateDiffDBDriverTypeFlag = cli.StringFlag{ @@ -852,6 +852,10 @@ var ( Name: "statediff.db.nodeid", Usage: "Node ID to use when writing state diffs to database", } + StateDiffFilePath = cli.StringFlag{ + Name: "statediff.file.path", + Usage: "Full path (including filename) to write statediff data out to when operating in file mode", + } StateDiffDBClientNameFlag = cli.StringFlag{ Name: "statediff.db.clientname", Usage: "Client name to use when writing state diffs to database", diff --git a/statediff/README.md b/statediff/README.md index 97666d50a..7170363ae 100644 --- a/statediff/README.md +++ b/statediff/README.md @@ -79,7 +79,7 @@ This service introduces a CLI flag namespace `statediff` `--statediff` flag is used to turn on the service `--statediff.writing` is used to tell the service to write state diff objects it produces from synced ChainEvents directly to a configured Postgres database `--statediff.workers` is used to set the number of concurrent workers to process state diff objects and write them into the database -`--statediff.db.type` is the type of database we write out to (current options: postgres and dump) +`--statediff.db.type` is the type of database we write out to (current options: postgres, dump, file) `--statediff.dump.dst` is the destination to write to when operating in database dump mode (stdout, stderr, discard) `--statediff.db.driver` is the specific driver to use for the database (current options for postgres: pgx and sqlx) `--statediff.db.host` is the hostname/ip to dial to connect to the database @@ -95,6 +95,7 @@ This service introduces a CLI flag namespace `statediff` `--statediff.db.maxconnlifetime` is the maximum lifetime for a connection (in seconds) `--statediff.db.nodeid` is the node id to use in the Postgres database `--statediff.db.clientname` is the client name to use in the Postgres database +`--statediff.file.path` full path (including filename) to write statediff data out to when operating in file mode The service can only operate in full sync mode (`--syncmode=full`), but only the historical RPC endpoints require an archive node (`--gcmode=archive`) diff --git a/statediff/builder.go b/statediff/builder.go index eacfeca15..8dc3cece8 100644 --- a/statediff/builder.go +++ b/statediff/builder.go @@ -23,16 +23,14 @@ import ( "bytes" "fmt" - "github.com/ethereum/go-ethereum/statediff/trie_helpers" - - types2 "github.com/ethereum/go-ethereum/statediff/types" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/statediff/trie_helpers" + types2 "github.com/ethereum/go-ethereum/statediff/types" "github.com/ethereum/go-ethereum/trie" ) diff --git a/statediff/indexer/constructor.go b/statediff/indexer/constructor.go index 7a44638d0..bfb746080 100644 --- a/statediff/indexer/constructor.go +++ b/statediff/indexer/constructor.go @@ -22,6 +22,7 @@ import ( "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/statediff/indexer/database/dump" + "github.com/ethereum/go-ethereum/statediff/indexer/database/file" "github.com/ethereum/go-ethereum/statediff/indexer/database/sql" "github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres" "github.com/ethereum/go-ethereum/statediff/indexer/interfaces" @@ -32,10 +33,17 @@ import ( // NewStateDiffIndexer creates and returns an implementation of the StateDiffIndexer interface func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, nodeInfo node.Info, config interfaces.Config) (interfaces.StateDiffIndexer, error) { switch config.Type() { + case shared.FILE: + fc, ok := config.(file.Config) + if !ok { + return nil, fmt.Errorf("file config is not the correct type: got %T, expected %T", config, file.Config{}) + } + fc.NodeInfo = nodeInfo + return file.NewStateDiffIndexer(ctx, chainConfig, fc) case shared.POSTGRES: pgc, ok := config.(postgres.Config) if !ok { - return nil, fmt.Errorf("ostgres config is not the correct type: got %T, expected %T", config, postgres.Config{}) + return nil, fmt.Errorf("postgres config is not the correct type: got %T, expected %T", config, postgres.Config{}) } var err error var driver sql.Driver diff --git a/statediff/indexer/database/dump/indexer.go b/statediff/indexer/database/dump/indexer.go index 357a78ece..b75fb1af9 100644 --- a/statediff/indexer/database/dump/indexer.go +++ b/statediff/indexer/database/dump/indexer.go @@ -136,7 +136,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip t = time.Now() // Publish and index header, collect headerID - var headerID int64 + var headerID string headerID, err = sdi.processHeader(blockTx, block.Header(), headerNode, reward, totalDifficulty) if err != nil { return nil, err @@ -181,7 +181,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip // processHeader publishes and indexes a header IPLD in Postgres // it returns the headerID -func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, headerNode node.Node, reward, td *big.Int) (int64, error) { +func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, headerNode node.Node, reward, td *big.Int) (string, error) { tx.cacheIPLD(headerNode) var baseFee *int64 @@ -190,12 +190,13 @@ func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, he *baseFee = header.BaseFee.Int64() } + headerID := header.Hash().String() mod := models.HeaderModel{ CID: headerNode.Cid().String(), MhKey: shared.MultihashKeyFromCID(headerNode.Cid()), ParentHash: header.ParentHash.String(), BlockNumber: header.Number.String(), - BlockHash: header.Hash().String(), + BlockHash: headerID, TotalDifficulty: td.String(), Reward: reward.String(), Bloom: header.Bloom.Bytes(), @@ -207,11 +208,11 @@ func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, he BaseFee: baseFee, } _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", mod) - return 0, err + return headerID, err } // processUncles publishes and indexes uncle IPLDs in Postgres -func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID int64, blockNumber uint64, uncleNodes []*ipld2.EthHeader) error { +func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID string, blockNumber uint64, uncleNodes []*ipld2.EthHeader) error { // publish and index uncles for _, uncleNode := range uncleNodes { tx.cacheIPLD(uncleNode) @@ -223,6 +224,7 @@ func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID int64, blockNum uncleReward = shared.CalcUncleMinerReward(blockNumber, uncleNode.Number.Uint64()) } uncle := models.UncleModel{ + HeaderID: headerID, CID: uncleNode.Cid().String(), MhKey: shared.MultihashKeyFromCID(uncleNode.Cid()), ParentHash: uncleNode.ParentHash.String(), @@ -238,7 +240,7 @@ func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID int64, blockNum // processArgs bundles arguments to processReceiptsAndTxs type processArgs struct { - headerID int64 + headerID string blockNumber *big.Int receipts types.Receipts txs types.Transactions @@ -263,59 +265,24 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs tx.cacheIPLD(txNode) // Indexing - // extract topic and contract data from the receipt for indexing - mappedContracts := make(map[string]bool) // use map to avoid duplicate addresses - logDataSet := make([]*models.LogsModel, len(receipt.Logs)) - for idx, l := range receipt.Logs { - topicSet := make([]string, 4) - for ti, topic := range l.Topics { - topicSet[ti] = topic.Hex() - } - - if !args.logLeafNodeCIDs[i][idx].Defined() { - return fmt.Errorf("invalid log cid") - } - - mappedContracts[l.Address.String()] = true - logDataSet[idx] = &models.LogsModel{ - Address: l.Address.String(), - Index: int64(l.Index), - Data: l.Data, - LeafCID: args.logLeafNodeCIDs[i][idx].String(), - LeafMhKey: shared.MultihashKeyFromCID(args.logLeafNodeCIDs[i][idx]), - Topic0: topicSet[0], - Topic1: topicSet[1], - Topic2: topicSet[2], - Topic3: topicSet[3], - } - } - // these are the contracts seen in the logs - logContracts := make([]string, 0, len(mappedContracts)) - for addr := range mappedContracts { - logContracts = append(logContracts, addr) - } - // this is the contract address if this receipt is for a contract creation tx - contract := shared.HandleZeroAddr(receipt.ContractAddress) - var contractHash string - if contract != "" { - contractHash = crypto.Keccak256Hash(common.HexToAddress(contract).Bytes()).String() - } - // index tx first so that the receipt can reference it by FK + // index tx trx := args.txs[i] + trxID := trx.Hash().String() // derive sender for the tx that corresponds with this receipt from, err := types.Sender(signer, trx) if err != nil { return fmt.Errorf("error deriving tx sender: %v", err) } txModel := models.TxModel{ - Dst: shared.HandleZeroAddrPointer(trx.To()), - Src: shared.HandleZeroAddr(from), - TxHash: trx.Hash().String(), - Index: int64(i), - Data: trx.Data(), - CID: txNode.Cid().String(), - MhKey: shared.MultihashKeyFromCID(txNode.Cid()), - Type: trx.Type(), + HeaderID: args.headerID, + Dst: shared.HandleZeroAddrPointer(trx.To()), + Src: shared.HandleZeroAddr(from), + TxHash: trxID, + Index: int64(i), + Data: trx.Data(), + CID: txNode.Cid().String(), + MhKey: shared.MultihashKeyFromCID(txNode.Cid()), + Type: trx.Type(), } if _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", txModel); err != nil { return err @@ -328,6 +295,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs storageKeys[k] = storageKey.Hex() } accessListElementModel := models.AccessListElementModel{ + TxID: trxID, Index: int64(j), Address: accessListElement.Address.Hex(), StorageKeys: storageKeys, @@ -337,12 +305,20 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs } } + // this is the contract address if this receipt is for a contract creation tx + contract := shared.HandleZeroAddr(receipt.ContractAddress) + var contractHash string + if contract != "" { + contractHash = crypto.Keccak256Hash(common.HexToAddress(contract).Bytes()).String() + } + // index the receipt if !args.rctLeafNodeCIDs[i].Defined() { return fmt.Errorf("invalid receipt leaf node cid") } rctModel := &models.ReceiptModel{ + TxID: trxID, Contract: contract, ContractHash: contractHash, LeafCID: args.rctLeafNodeCIDs[i].String(), @@ -359,6 +335,31 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs return err } + logDataSet := make([]*models.LogsModel, len(receipt.Logs)) + for idx, l := range receipt.Logs { + topicSet := make([]string, 4) + for ti, topic := range l.Topics { + topicSet[ti] = topic.Hex() + } + + if !args.logLeafNodeCIDs[i][idx].Defined() { + return fmt.Errorf("invalid log cid") + } + + logDataSet[idx] = &models.LogsModel{ + ReceiptID: trxID, + Address: l.Address.String(), + Index: int64(l.Index), + Data: l.Data, + LeafCID: args.logLeafNodeCIDs[i][idx].String(), + LeafMhKey: shared.MultihashKeyFromCID(args.logLeafNodeCIDs[i][idx]), + Topic0: topicSet[0], + Topic1: topicSet[1], + Topic2: topicSet[2], + Topic3: topicSet[3], + } + } + if _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", logDataSet); err != nil { return err } @@ -374,7 +375,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs } // PushStateNode publishes and indexes a state diff node object (including any child storage nodes) in the IPLD sql -func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdtypes.StateNode) error { +func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdtypes.StateNode, headerID string) error { tx, ok := batch.(*BatchTx) if !ok { return fmt.Errorf("sql batch is expected to be of type %T, got %T", &BatchTx{}, batch) @@ -384,6 +385,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt // short circuit if it is a Removed node // this assumes the db has been initialized and a public.blocks entry for the Removed node is present stateModel := models.StateNodeModel{ + HeaderID: headerID, Path: stateNode.Path, StateKey: common.BytesToHash(stateNode.LeafKey).String(), CID: shared.RemovedNodeStateCID, @@ -398,6 +400,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt return fmt.Errorf("error generating and cacheing state node IPLD: %v", err) } stateModel := models.StateNodeModel{ + HeaderID: headerID, Path: stateNode.Path, StateKey: common.BytesToHash(stateNode.LeafKey).String(), CID: stateCIDStr, @@ -422,6 +425,8 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt return fmt.Errorf("error decoding state account rlp: %s", err.Error()) } accountModel := models.StateAccountModel{ + HeaderID: headerID, + StatePath: stateNode.Path, Balance: account.Balance.String(), Nonce: account.Nonce, CodeHash: account.CodeHash, @@ -437,6 +442,8 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt // short circuit if it is a Removed node // this assumes the db has been initialized and a public.blocks entry for the Removed node is present storageModel := models.StorageNodeModel{ + HeaderID: headerID, + StatePath: stateNode.Path, Path: storageNode.Path, StorageKey: common.BytesToHash(storageNode.LeafKey).String(), CID: shared.RemovedNodeStorageCID, @@ -453,6 +460,8 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt return fmt.Errorf("error generating and cacheing storage node IPLD: %v", err) } storageModel := models.StorageNodeModel{ + HeaderID: headerID, + StatePath: stateNode.Path, Path: storageNode.Path, StorageKey: common.BytesToHash(storageNode.LeafKey).String(), CID: storageCIDStr, @@ -482,7 +491,7 @@ func (sdi *StateDiffIndexer) PushCodeAndCodeHash(batch interfaces.Batch, codeAnd return nil } -// Close satisfied io.Closer +// Close satisfies io.Closer func (sdi *StateDiffIndexer) Close() error { return sdi.dump.Close() } diff --git a/statediff/indexer/database/file/batch_tx.go b/statediff/indexer/database/file/batch_tx.go new file mode 100644 index 000000000..39e5d3713 --- /dev/null +++ b/statediff/indexer/database/file/batch_tx.go @@ -0,0 +1,29 @@ +// VulcanizeDB +// Copyright © 2021 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package file + +// BatchTx wraps a void with the state necessary for building the tx concurrently during trie difference iteration +type BatchTx struct { + BlockNumber uint64 + + submit func(blockTx *BatchTx, err error) error +} + +// Submit satisfies indexer.AtomicTx +func (tx *BatchTx) Submit(err error) error { + return tx.submit(tx, err) +} diff --git a/statediff/indexer/database/file/config.go b/statediff/indexer/database/file/config.go new file mode 100644 index 000000000..2553174a3 --- /dev/null +++ b/statediff/indexer/database/file/config.go @@ -0,0 +1,33 @@ +// VulcanizeDB +// Copyright © 2021 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package file + +import ( + "github.com/ethereum/go-ethereum/statediff/indexer/node" + "github.com/ethereum/go-ethereum/statediff/indexer/shared" +) + +// Config holds params for writing sql statements out to a file +type Config struct { + FilePath string + NodeInfo node.Info +} + +// Type satisfies interfaces.Config +func (c Config) Type() shared.DBType { + return shared.FILE +} diff --git a/statediff/indexer/database/file/helpers.go b/statediff/indexer/database/file/helpers.go new file mode 100644 index 000000000..dc635110c --- /dev/null +++ b/statediff/indexer/database/file/helpers.go @@ -0,0 +1,60 @@ +// VulcanizeDB +// Copyright © 2021 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package file + +import "bytes" + +// formatPostgresStringArray parses an array of strings into the proper Postgres string representation of that array +func formatPostgresStringArray(a []string) string { + if a == nil { + return "" + } + + if n := len(a); n > 0 { + // There will be at least two curly brackets, 2*N bytes of quotes, + // and N-1 bytes of delimiters. + b := make([]byte, 1, 1+3*n) + b[0] = '{' + + b = appendArrayQuotedBytes(b, []byte(a[0])) + for i := 1; i < n; i++ { + b = append(b, ',') + b = appendArrayQuotedBytes(b, []byte(a[i])) + } + + return string(append(b, '}')) + } + + return "{}" +} + +func appendArrayQuotedBytes(b, v []byte) []byte { + b = append(b, '"') + for { + i := bytes.IndexAny(v, `"\`) + if i < 0 { + b = append(b, v...) + break + } + if i > 0 { + b = append(b, v[:i]...) + } + b = append(b, '\\', v[i]) + v = v[i+1:] + } + return append(b, '"') +} diff --git a/statediff/indexer/database/file/indexer.go b/statediff/indexer/database/file/indexer.go new file mode 100644 index 000000000..1cc19480a --- /dev/null +++ b/statediff/indexer/database/file/indexer.go @@ -0,0 +1,474 @@ +// VulcanizeDB +// Copyright © 2021 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package file + +import ( + "context" + "errors" + "fmt" + "math/big" + "os" + "sync" + "time" + + "github.com/ipfs/go-cid" + node "github.com/ipfs/go-ipld-format" + "github.com/multiformats/go-multihash" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" + "github.com/ethereum/go-ethereum/params" + "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/statediff/indexer/interfaces" + ipld2 "github.com/ethereum/go-ethereum/statediff/indexer/ipld" + "github.com/ethereum/go-ethereum/statediff/indexer/models" + "github.com/ethereum/go-ethereum/statediff/indexer/shared" + sdtypes "github.com/ethereum/go-ethereum/statediff/types" +) + +const defaultFilePath = "./statediff.sql" + +var _ interfaces.StateDiffIndexer = &StateDiffIndexer{} + +var ( + indexerMetrics = RegisterIndexerMetrics(metrics.DefaultRegistry) +) + +// StateDiffIndexer satisfies the indexer.StateDiffIndexer interface for ethereum statediff objects on top of a void +type StateDiffIndexer struct { + writer *SQLWriter + chainConfig *params.ChainConfig + nodeID string + wg *sync.WaitGroup +} + +// NewStateDiffIndexer creates a void implementation of interfaces.StateDiffIndexer +func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, config Config) (*StateDiffIndexer, error) { + filePath := config.FilePath + if filePath == "" { + filePath = defaultFilePath + } + if _, err := os.Stat(filePath); !errors.Is(err, os.ErrNotExist) { + return nil, fmt.Errorf("cannot create file, file (%s) already exists", filePath) + } + file, err := os.Create(filePath) + if err != nil { + return nil, fmt.Errorf("unable to create file (%s), err: %v", filePath, err) + } + w := NewSQLWriter(file) + wg := new(sync.WaitGroup) + w.Loop() + return &StateDiffIndexer{ + writer: w, + chainConfig: chainConfig, + nodeID: config.NodeInfo.ID, + wg: wg, + }, nil +} + +// ReportDBMetrics has nothing to report for dump +func (sdi *StateDiffIndexer) ReportDBMetrics(time.Duration, <-chan bool) {} + +// PushBlock pushes and indexes block data in sql, except state & storage nodes (includes header, uncles, transactions & receipts) +// Returns an initiated DB transaction which must be Closed via defer to commit or rollback +func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (interfaces.Batch, error) { + start, t := time.Now(), time.Now() + blockHash := block.Hash() + blockHashStr := blockHash.String() + height := block.NumberU64() + traceMsg := fmt.Sprintf("indexer stats for statediff at %d with hash %s:\r\n", height, blockHashStr) + transactions := block.Transactions() + // Derive any missing fields + if err := receipts.DeriveFields(sdi.chainConfig, blockHash, height, transactions); err != nil { + return nil, err + } + + // Generate the block iplds + headerNode, uncleNodes, txNodes, txTrieNodes, rctNodes, rctTrieNodes, logTrieNodes, logLeafNodeCIDs, rctLeafNodeCIDs, err := ipld2.FromBlockAndReceipts(block, receipts) + if err != nil { + return nil, fmt.Errorf("error creating IPLD nodes from block and receipts: %v", err) + } + + if len(txNodes) != len(rctNodes) || len(rctNodes) != len(rctLeafNodeCIDs) { + return nil, fmt.Errorf("expected number of transactions (%d), receipts (%d), and receipt trie leaf nodes (%d) to be equal", len(txNodes), len(rctNodes), len(rctLeafNodeCIDs)) + } + if len(txTrieNodes) != len(rctTrieNodes) { + return nil, fmt.Errorf("expected number of tx trie (%d) and rct trie (%d) nodes to be equal", len(txTrieNodes), len(rctTrieNodes)) + } + + // Calculate reward + var reward *big.Int + // in PoA networks block reward is 0 + if sdi.chainConfig.Clique != nil { + reward = big.NewInt(0) + } else { + reward = shared.CalcEthBlockReward(block.Header(), block.Uncles(), block.Transactions(), receipts) + } + t = time.Now() + + blockTx := &BatchTx{ + BlockNumber: height, + submit: func(self *BatchTx, err error) error { + tDiff := time.Since(t) + indexerMetrics.tStateStoreCodeProcessing.Update(tDiff) + traceMsg += fmt.Sprintf("state, storage, and code storage processing time: %s\r\n", tDiff.String()) + t = time.Now() + if err := sdi.writer.flush(); err != nil { + traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start).String()) + log.Debug(traceMsg) + return err + } + tDiff = time.Since(t) + indexerMetrics.tPostgresCommit.Update(tDiff) + traceMsg += fmt.Sprintf("postgres transaction commit duration: %s\r\n", tDiff.String()) + traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start).String()) + log.Debug(traceMsg) + return err + }, + } + tDiff := time.Since(t) + indexerMetrics.tFreePostgres.Update(tDiff) + traceMsg += fmt.Sprintf("time spent waiting for free postgres tx: %s:\r\n", tDiff.String()) + t = time.Now() + + // write header, collect headerID + headerID := sdi.processHeader(block.Header(), headerNode, reward, totalDifficulty) + tDiff = time.Since(t) + indexerMetrics.tHeaderProcessing.Update(tDiff) + traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff.String()) + t = time.Now() + + // write uncles + sdi.processUncles(headerID, height, uncleNodes) + tDiff = time.Since(t) + indexerMetrics.tUncleProcessing.Update(tDiff) + traceMsg += fmt.Sprintf("uncle processing time: %s\r\n", tDiff.String()) + t = time.Now() + + // write receipts and txs + err = sdi.processReceiptsAndTxs(processArgs{ + headerID: headerID, + blockNumber: block.Number(), + receipts: receipts, + txs: transactions, + rctNodes: rctNodes, + rctTrieNodes: rctTrieNodes, + txNodes: txNodes, + txTrieNodes: txTrieNodes, + logTrieNodes: logTrieNodes, + logLeafNodeCIDs: logLeafNodeCIDs, + rctLeafNodeCIDs: rctLeafNodeCIDs, + }) + if err != nil { + return nil, err + } + tDiff = time.Since(t) + indexerMetrics.tTxAndRecProcessing.Update(tDiff) + traceMsg += fmt.Sprintf("tx and receipt processing time: %s\r\n", tDiff.String()) + t = time.Now() + + return blockTx, err +} + +// processHeader write a header IPLD insert SQL stmt to a file +// it returns the headerID +func (sdi *StateDiffIndexer) processHeader(header *types.Header, headerNode node.Node, reward, td *big.Int) string { + sdi.writer.upsertIPLDNode(headerNode) + + var baseFee *int64 + if header.BaseFee != nil { + baseFee = new(int64) + *baseFee = header.BaseFee.Int64() + } + headerID := header.Hash().String() + sdi.writer.upsertHeaderCID(models.HeaderModel{ + NodeID: sdi.nodeID, + CID: headerNode.Cid().String(), + MhKey: shared.MultihashKeyFromCID(headerNode.Cid()), + ParentHash: header.ParentHash.String(), + BlockNumber: header.Number.String(), + BlockHash: headerID, + TotalDifficulty: td.String(), + Reward: reward.String(), + Bloom: header.Bloom.Bytes(), + StateRoot: header.Root.String(), + RctRoot: header.ReceiptHash.String(), + TxRoot: header.TxHash.String(), + UncleRoot: header.UncleHash.String(), + Timestamp: header.Time, + BaseFee: baseFee, + }) + return headerID +} + +// processUncles writes uncle IPLD insert SQL stmts to a file +func (sdi *StateDiffIndexer) processUncles(headerID string, blockNumber uint64, uncleNodes []*ipld2.EthHeader) { + // publish and index uncles + for _, uncleNode := range uncleNodes { + sdi.writer.upsertIPLDNode(uncleNode) + var uncleReward *big.Int + // in PoA networks uncle reward is 0 + if sdi.chainConfig.Clique != nil { + uncleReward = big.NewInt(0) + } else { + uncleReward = shared.CalcUncleMinerReward(blockNumber, uncleNode.Number.Uint64()) + } + sdi.writer.upsertUncleCID(models.UncleModel{ + HeaderID: headerID, + CID: uncleNode.Cid().String(), + MhKey: shared.MultihashKeyFromCID(uncleNode.Cid()), + ParentHash: uncleNode.ParentHash.String(), + BlockHash: uncleNode.Hash().String(), + Reward: uncleReward.String(), + }) + } +} + +// processArgs bundles arguments to processReceiptsAndTxs +type processArgs struct { + headerID string + blockNumber *big.Int + receipts types.Receipts + txs types.Transactions + rctNodes []*ipld2.EthReceipt + rctTrieNodes []*ipld2.EthRctTrie + txNodes []*ipld2.EthTx + txTrieNodes []*ipld2.EthTxTrie + logTrieNodes [][]*ipld2.EthLogTrie + logLeafNodeCIDs [][]cid.Cid + rctLeafNodeCIDs []cid.Cid +} + +// processReceiptsAndTxs writes receipt and tx IPLD insert SQL stmts to a file +func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error { + // Process receipts and txs + signer := types.MakeSigner(sdi.chainConfig, args.blockNumber) + for i, receipt := range args.receipts { + for _, logTrieNode := range args.logTrieNodes[i] { + sdi.writer.upsertIPLDNode(logTrieNode) + } + txNode := args.txNodes[i] + sdi.writer.upsertIPLDNode(txNode) + + // index tx + trx := args.txs[i] + txID := trx.Hash().String() + // derive sender for the tx that corresponds with this receipt + from, err := types.Sender(signer, trx) + if err != nil { + return fmt.Errorf("error deriving tx sender: %v", err) + } + txModel := models.TxModel{ + HeaderID: args.headerID, + Dst: shared.HandleZeroAddrPointer(trx.To()), + Src: shared.HandleZeroAddr(from), + TxHash: txID, + Index: int64(i), + Data: trx.Data(), + CID: txNode.Cid().String(), + MhKey: shared.MultihashKeyFromCID(txNode.Cid()), + Type: trx.Type(), + } + sdi.writer.upsertTransactionCID(txModel) + + // index access list if this is one + for j, accessListElement := range trx.AccessList() { + storageKeys := make([]string, len(accessListElement.StorageKeys)) + for k, storageKey := range accessListElement.StorageKeys { + storageKeys[k] = storageKey.Hex() + } + accessListElementModel := models.AccessListElementModel{ + TxID: txID, + Index: int64(j), + Address: accessListElement.Address.Hex(), + StorageKeys: storageKeys, + } + sdi.writer.upsertAccessListElement(accessListElementModel) + } + + // this is the contract address if this receipt is for a contract creation tx + contract := shared.HandleZeroAddr(receipt.ContractAddress) + var contractHash string + if contract != "" { + contractHash = crypto.Keccak256Hash(common.HexToAddress(contract).Bytes()).String() + } + + // index receipt + if !args.rctLeafNodeCIDs[i].Defined() { + return fmt.Errorf("invalid receipt leaf node cid") + } + + rctModel := &models.ReceiptModel{ + TxID: txID, + Contract: contract, + ContractHash: contractHash, + LeafCID: args.rctLeafNodeCIDs[i].String(), + LeafMhKey: shared.MultihashKeyFromCID(args.rctLeafNodeCIDs[i]), + LogRoot: args.rctNodes[i].LogRoot.String(), + } + if len(receipt.PostState) == 0 { + rctModel.PostStatus = receipt.Status + } else { + rctModel.PostState = common.Bytes2Hex(receipt.PostState) + } + sdi.writer.upsertReceiptCID(rctModel) + + // index logs + logDataSet := make([]*models.LogsModel, len(receipt.Logs)) + for idx, l := range receipt.Logs { + topicSet := make([]string, 4) + for ti, topic := range l.Topics { + topicSet[ti] = topic.Hex() + } + + if !args.logLeafNodeCIDs[i][idx].Defined() { + return fmt.Errorf("invalid log cid") + } + + logDataSet[idx] = &models.LogsModel{ + ReceiptID: txID, + Address: l.Address.String(), + Index: int64(l.Index), + Data: l.Data, + LeafCID: args.logLeafNodeCIDs[i][idx].String(), + LeafMhKey: shared.MultihashKeyFromCID(args.logLeafNodeCIDs[i][idx]), + Topic0: topicSet[0], + Topic1: topicSet[1], + Topic2: topicSet[2], + Topic3: topicSet[3], + } + } + sdi.writer.upsertLogCID(logDataSet) + } + + // publish trie nodes, these aren't indexed directly + for i, n := range args.txTrieNodes { + sdi.writer.upsertIPLDNode(n) + sdi.writer.upsertIPLDNode(args.rctTrieNodes[i]) + } + + return nil +} + +// PushStateNode writes a state diff node object (including any child storage nodes) IPLD insert SQL stmt to a file +func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdtypes.StateNode, headerID string) error { + // publish the state node + if stateNode.NodeType == sdtypes.Removed { + // short circuit if it is a Removed node + // this assumes the db has been initialized and a public.blocks entry for the Removed node is present + stateModel := models.StateNodeModel{ + HeaderID: headerID, + Path: stateNode.Path, + StateKey: common.BytesToHash(stateNode.LeafKey).String(), + CID: shared.RemovedNodeStateCID, + MhKey: shared.RemovedNodeMhKey, + NodeType: stateNode.NodeType.Int(), + } + sdi.writer.upsertStateCID(stateModel) + return nil + } + stateCIDStr, stateMhKey, err := sdi.writer.upsertIPLDRaw(ipld2.MEthStateTrie, multihash.KECCAK_256, stateNode.NodeValue) + if err != nil { + return fmt.Errorf("error generating and cacheing state node IPLD: %v", err) + } + stateModel := models.StateNodeModel{ + HeaderID: headerID, + Path: stateNode.Path, + StateKey: common.BytesToHash(stateNode.LeafKey).String(), + CID: stateCIDStr, + MhKey: stateMhKey, + NodeType: stateNode.NodeType.Int(), + } + // index the state node + sdi.writer.upsertStateCID(stateModel) + // if we have a leaf, decode and index the account data + if stateNode.NodeType == sdtypes.Leaf { + var i []interface{} + if err := rlp.DecodeBytes(stateNode.NodeValue, &i); err != nil { + return fmt.Errorf("error decoding state leaf node rlp: %s", err.Error()) + } + if len(i) != 2 { + return fmt.Errorf("eth IPLDPublisher expected state leaf node rlp to decode into two elements") + } + var account types.StateAccount + if err := rlp.DecodeBytes(i[1].([]byte), &account); err != nil { + return fmt.Errorf("error decoding state account rlp: %s", err.Error()) + } + accountModel := models.StateAccountModel{ + HeaderID: headerID, + StatePath: stateNode.Path, + Balance: account.Balance.String(), + Nonce: account.Nonce, + CodeHash: account.CodeHash, + StorageRoot: account.Root.String(), + } + sdi.writer.upsertStateAccount(accountModel) + } + // if there are any storage nodes associated with this node, publish and index them + for _, storageNode := range stateNode.StorageNodes { + if storageNode.NodeType == sdtypes.Removed { + // short circuit if it is a Removed node + // this assumes the db has been initialized and a public.blocks entry for the Removed node is present + storageModel := models.StorageNodeModel{ + HeaderID: headerID, + StatePath: stateNode.Path, + Path: storageNode.Path, + StorageKey: common.BytesToHash(storageNode.LeafKey).String(), + CID: shared.RemovedNodeStorageCID, + MhKey: shared.RemovedNodeMhKey, + NodeType: storageNode.NodeType.Int(), + } + sdi.writer.upsertStorageCID(storageModel) + continue + } + storageCIDStr, storageMhKey, err := sdi.writer.upsertIPLDRaw(ipld2.MEthStorageTrie, multihash.KECCAK_256, storageNode.NodeValue) + if err != nil { + return fmt.Errorf("error generating and cacheing storage node IPLD: %v", err) + } + storageModel := models.StorageNodeModel{ + HeaderID: headerID, + StatePath: stateNode.Path, + Path: storageNode.Path, + StorageKey: common.BytesToHash(storageNode.LeafKey).String(), + CID: storageCIDStr, + MhKey: storageMhKey, + NodeType: storageNode.NodeType.Int(), + } + sdi.writer.upsertStorageCID(storageModel) + } + + return nil +} + +// PushCodeAndCodeHash writes code and codehash pairs insert SQL stmts to a file +func (sdi *StateDiffIndexer) PushCodeAndCodeHash(batch interfaces.Batch, codeAndCodeHash sdtypes.CodeAndCodeHash) error { + // codec doesn't matter since db key is multihash-based + mhKey, err := shared.MultihashKeyFromKeccak256(codeAndCodeHash.Hash) + if err != nil { + return fmt.Errorf("error deriving multihash key from codehash: %v", err) + } + sdi.writer.upsertIPLDDirect(mhKey, codeAndCodeHash.Code) + return nil +} + +// Close satisfies io.Closer +func (sdi *StateDiffIndexer) Close() error { + return sdi.writer.Close() +} diff --git a/statediff/indexer/database/file/metrics.go b/statediff/indexer/database/file/metrics.go new file mode 100644 index 000000000..ca6e88f2b --- /dev/null +++ b/statediff/indexer/database/file/metrics.go @@ -0,0 +1,94 @@ +// VulcanizeDB +// Copyright © 2021 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package file + +import ( + "strings" + + "github.com/ethereum/go-ethereum/metrics" +) + +const ( + namespace = "statediff" +) + +// Build a fully qualified metric name +func metricName(subsystem, name string) string { + if name == "" { + return "" + } + parts := []string{namespace, name} + if subsystem != "" { + parts = []string{namespace, subsystem, name} + } + // Prometheus uses _ but geth metrics uses / and replaces + return strings.Join(parts, "/") +} + +type indexerMetricsHandles struct { + // The total number of processed blocks + blocks metrics.Counter + // The total number of processed transactions + transactions metrics.Counter + // The total number of processed receipts + receipts metrics.Counter + // The total number of processed logs + logs metrics.Counter + // The total number of access list entries processed + accessListEntries metrics.Counter + // Time spent waiting for free postgres tx + tFreePostgres metrics.Timer + // Postgres transaction commit duration + tPostgresCommit metrics.Timer + // Header processing time + tHeaderProcessing metrics.Timer + // Uncle processing time + tUncleProcessing metrics.Timer + // Tx and receipt processing time + tTxAndRecProcessing metrics.Timer + // State, storage, and code combined processing time + tStateStoreCodeProcessing metrics.Timer +} + +func RegisterIndexerMetrics(reg metrics.Registry) indexerMetricsHandles { + ctx := indexerMetricsHandles{ + blocks: metrics.NewCounter(), + transactions: metrics.NewCounter(), + receipts: metrics.NewCounter(), + logs: metrics.NewCounter(), + accessListEntries: metrics.NewCounter(), + tFreePostgres: metrics.NewTimer(), + tPostgresCommit: metrics.NewTimer(), + tHeaderProcessing: metrics.NewTimer(), + tUncleProcessing: metrics.NewTimer(), + tTxAndRecProcessing: metrics.NewTimer(), + tStateStoreCodeProcessing: metrics.NewTimer(), + } + subsys := "indexer" + reg.Register(metricName(subsys, "blocks"), ctx.blocks) + reg.Register(metricName(subsys, "transactions"), ctx.transactions) + reg.Register(metricName(subsys, "receipts"), ctx.receipts) + reg.Register(metricName(subsys, "logs"), ctx.logs) + reg.Register(metricName(subsys, "access_list_entries"), ctx.accessListEntries) + reg.Register(metricName(subsys, "t_free_postgres"), ctx.tFreePostgres) + reg.Register(metricName(subsys, "t_postgres_commit"), ctx.tPostgresCommit) + reg.Register(metricName(subsys, "t_header_processing"), ctx.tHeaderProcessing) + reg.Register(metricName(subsys, "t_uncle_processing"), ctx.tUncleProcessing) + reg.Register(metricName(subsys, "t_tx_receipt_processing"), ctx.tTxAndRecProcessing) + reg.Register(metricName(subsys, "t_state_store_code_processing"), ctx.tStateStoreCodeProcessing) + return ctx +} diff --git a/statediff/indexer/database/file/writer.go b/statediff/indexer/database/file/writer.go new file mode 100644 index 000000000..5ee169229 --- /dev/null +++ b/statediff/indexer/database/file/writer.go @@ -0,0 +1,248 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package file + +import ( + "fmt" + "os" + + blockstore "github.com/ipfs/go-ipfs-blockstore" + dshelp "github.com/ipfs/go-ipfs-ds-help" + node "github.com/ipfs/go-ipld-format" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/statediff/indexer/ipld" + "github.com/ethereum/go-ethereum/statediff/indexer/models" + nodeinfo "github.com/ethereum/go-ethereum/statediff/indexer/node" +) + +var ( + nullHash = common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000000") + collatedStmtSize = 65336 // min(linuxPipeSize, macOSPipeSize) +) + +// SQLWriter writes sql statements to a file +type SQLWriter struct { + file *os.File + stmts chan []byte + collatedStmt []byte + collationIndex int + + quitChan chan struct{} + doneChan chan struct{} +} + +// NewSQLWriter creates a new pointer to a Writer +func NewSQLWriter(file *os.File) *SQLWriter { + return &SQLWriter{ + file: file, + stmts: make(chan []byte), + collatedStmt: make([]byte, collatedStmtSize), + quitChan: make(chan struct{}), + doneChan: make(chan struct{}), + } +} + +// Loop enables concurrent writes to the underlying os.File +// since os.File does not buffer, it utilizes an internal buffer that is the size of a unix pipe +// by using copy() and tracking the index/size of the buffer, we require only the initial memory allocation +func (sqw *SQLWriter) Loop() { + sqw.collationIndex = 0 + go func() { + defer close(sqw.doneChan) + var l int + for { + select { + case stmt := <-sqw.stmts: + l = len(stmt) + if l+sqw.collationIndex+1 > collatedStmtSize { + if err := sqw.flush(); err != nil { + log.Error("error writing cached sql stmts to file", "err", err) + } + } + copy(sqw.collatedStmt[sqw.collationIndex:sqw.collationIndex+l-1], stmt) + sqw.collationIndex += l + case <-sqw.quitChan: + if err := sqw.flush(); err != nil { + log.Error("error writing cached sql stmts to file", "err", err) + } + return + } + } + }() +} + +// Close satisfies io.Closer +func (sqw *SQLWriter) Close() error { + close(sqw.quitChan) + <-sqw.doneChan + return nil +} + +func (sqw *SQLWriter) flush() error { + if _, err := sqw.file.Write(sqw.collatedStmt[0 : sqw.collationIndex-1]); err != nil { + return err + } + sqw.collationIndex = 0 + return nil +} + +const ( + nodeInsert = `INSERT INTO nodes (genesis_block, network_id, node_id, client_name, chain_id) VALUES (%s, %s, %s, %s, %d) + ON CONFLICT (node_id) DO NOTHING;\n` + + ipldInsert = `INSERT INTO public.blocks (key, data) VALUES (%s, %x) ON CONFLICT (key) DO NOTHING;\n` + + headerInsert = `INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee) +VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %d, %s, %d, %d) +ON CONFLICT (block_hash) DO UPDATE SET (parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee) = (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %d, %s, eth.header_cids.times_validated + 1, %d);\n` + + headerInsertWithoutBaseFee = `INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee) +VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %d, %s, %d, NULL) +ON CONFLICT (block_hash) DO UPDATE SET (parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee) = (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %d, %s, eth.header_cids.times_validated + 1, NULL);\n` + + uncleInsert = `INSERT INTO eth.uncle_cids (block_hash, header_id, parent_hash, cid, reward, mh_key) VALUES (%s, %s, %s, %s, %s, %s) +ON CONFLICT (block_hash) DO NOTHING;\n` + + txInsert = `INSERT INTO eth.transaction_cids (header_id, tx_hash, cid, dst, src, index, mh_key, tx_data, tx_type) VALUES (%s, %s, %s, %s, %s, %d, %s, %s, %d) +ON CONFLICT (tx_hash) DO NOTHING;\n` + + alInsert = `INSERT INTO eth.access_list_element (tx_id, index, address, storage_keys) VALUES (%s, %d, %s, %s) +ON CONFLICT (tx_id, index) DO NOTHING;\n` + + rctInsert = `INSERT INTO eth.receipt_cids (tx_id, leaf_cid, contract, contract_hash, leaf_mh_key, post_state, post_status, log_root) VALUES (%s, %s, %s, %s, %s, %s, %d, %s) +ON CONFLICT (tx_id) DO NOTHING;\n` + + logInsert = `INSERT INTO eth.log_cids (leaf_cid, leaf_mh_key, rct_id, address, index, topic0, topic1, topic2, topic3, log_data) VALUES (%s, %s, %s, %s, %d, %s, %s, %s, %s, %s) +ON CONFLICT (rct_id, index) DO NOTHING;\n` + + stateInsert = `INSERT INTO eth.state_cids (header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) VALUES (%s, %s, %s, %s, %d, %t, %s) +ON CONFLICT (header_id, state_path) DO UPDATE SET (state_leaf_key, cid, node_type, diff, mh_key) = (%s, %s, %d, %t, %s);\n` + + accountInsert = `INSERT INTO eth.state_accounts (header_id, state_path, balance, nonce, code_hash, storage_root) VALUES (%s, %s, %s, %d, %s, %s) +ON CONFLICT (header_id, state_path) DO NOTHING;\n` + + storageInsert = `INSERT INTO eth.storage_cids (header_id, state_path, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES (%s, %s, %s, %s, %s, %d, %t, %s) +ON CONFLICT (header_id, state_path, storage_path) DO UPDATE SET (storage_leaf_key, cid, node_type, diff, mh_key) = (%s, %s, %d, %t, %s);\n` +) + +// ON CONFLICT (node_id) DO UPDATE SET genesis_block = %s, network_id = %s, client_name = %s, chain_id = %s;\n` + +func (sqw *SQLWriter) upsertNode(node nodeinfo.Info) { + sqw.stmts <- []byte(fmt.Sprintf(nodeInsert, node.GenesisBlock, node.NetworkID, node.ID, node.ClientName, node.ChainID)) +} + +func (sqw *SQLWriter) upsertIPLD(ipld models.IPLDModel) { + sqw.stmts <- []byte(fmt.Sprintf(ipldInsert, ipld.Key, ipld.Data)) +} + +func (sqw *SQLWriter) upsertIPLDDirect(key string, value []byte) { + sqw.upsertIPLD(models.IPLDModel{ + Key: key, + Data: value, + }) +} + +func (sqw *SQLWriter) upsertIPLDNode(i node.Node) { + sqw.upsertIPLD(models.IPLDModel{ + Key: blockstore.BlockPrefix.String() + dshelp.MultihashToDsKey(i.Cid().Hash()).String(), + Data: i.RawData(), + }) +} + +func (sqw *SQLWriter) upsertIPLDRaw(codec, mh uint64, raw []byte) (string, string, error) { + c, err := ipld.RawdataToCid(codec, raw, mh) + if err != nil { + return "", "", err + } + prefixedKey := blockstore.BlockPrefix.String() + dshelp.MultihashToDsKey(c.Hash()).String() + sqw.upsertIPLD(models.IPLDModel{ + Key: prefixedKey, + Data: raw, + }) + return c.String(), prefixedKey, err +} + +func (sqw *SQLWriter) upsertHeaderCID(header models.HeaderModel) { + var stmt string + if header.BaseFee == nil { + stmt = fmt.Sprintf(headerInsertWithoutBaseFee, header.BlockNumber, header.BlockHash, header.ParentHash, header.CID, + header.TotalDifficulty, header.NodeID, header.Reward, header.StateRoot, header.TxRoot, + header.RctRoot, header.UncleRoot, header.Bloom, header.Timestamp, header.MhKey, 1, + header.ParentHash, header.CID, header.TotalDifficulty, header.NodeID, header.Reward, header.StateRoot, + header.TxRoot, header.RctRoot, header.UncleRoot, header.Bloom, header.Timestamp, header.MhKey) + } else { + stmt = fmt.Sprintf(headerInsert, header.BlockNumber, header.BlockHash, header.ParentHash, header.CID, + header.TotalDifficulty, header.NodeID, header.Reward, header.StateRoot, header.TxRoot, + header.RctRoot, header.UncleRoot, header.Bloom, header.Timestamp, header.MhKey, 1, header.BaseFee, + header.ParentHash, header.CID, header.TotalDifficulty, header.NodeID, header.Reward, header.StateRoot, + header.TxRoot, header.RctRoot, header.UncleRoot, header.Bloom, header.Timestamp, header.MhKey, header.BaseFee) + } + sqw.stmts <- []byte(stmt) + indexerMetrics.blocks.Inc(1) +} + +func (sqw *SQLWriter) upsertUncleCID(uncle models.UncleModel) { + sqw.stmts <- []byte(fmt.Sprintf(uncleInsert, uncle.BlockHash, uncle.HeaderID, uncle.ParentHash, uncle.CID, uncle.Reward, uncle.MhKey)) +} + +func (sqw *SQLWriter) upsertTransactionCID(transaction models.TxModel) { + sqw.stmts <- []byte(fmt.Sprintf(txInsert, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst, transaction.Src, transaction.Index, transaction.MhKey, transaction.Data, transaction.Type)) + indexerMetrics.transactions.Inc(1) +} + +func (sqw *SQLWriter) upsertAccessListElement(accessListElement models.AccessListElementModel) { + sqw.stmts <- []byte(fmt.Sprintf(alInsert, accessListElement.TxID, accessListElement.Index, accessListElement.Address, formatPostgresStringArray(accessListElement.StorageKeys))) + indexerMetrics.accessListEntries.Inc(1) +} + +func (sqw *SQLWriter) upsertReceiptCID(rct *models.ReceiptModel) { + sqw.stmts <- []byte(fmt.Sprintf(rctInsert, rct.TxID, rct.LeafCID, rct.Contract, rct.ContractHash, rct.LeafMhKey, rct.PostState, rct.PostStatus, rct.LogRoot)) + indexerMetrics.receipts.Inc(1) +} + +func (sqw *SQLWriter) upsertLogCID(logs []*models.LogsModel) { + for _, l := range logs { + sqw.stmts <- []byte(fmt.Sprintf(logInsert, l.LeafCID, l.LeafMhKey, l.ReceiptID, l.Address, l.Index, l.Topic0, l.Topic1, l.Topic2, l.Topic3, l.Data)) + indexerMetrics.logs.Inc(1) + } +} + +func (sqw *SQLWriter) upsertStateCID(stateNode models.StateNodeModel) { + var stateKey string + if stateNode.StateKey != nullHash.String() { + stateKey = stateNode.StateKey + } + sqw.stmts <- []byte(fmt.Sprintf(stateInsert, stateNode.HeaderID, stateKey, stateNode.CID, stateNode.Path, stateNode.NodeType, + true, stateNode.MhKey, stateKey, stateNode.CID, stateNode.NodeType, true, stateNode.MhKey)) +} + +func (sqw *SQLWriter) upsertStateAccount(stateAccount models.StateAccountModel) { + sqw.stmts <- []byte(fmt.Sprintf(accountInsert, stateAccount.HeaderID, stateAccount.StatePath, stateAccount.Balance, + stateAccount.Nonce, stateAccount.CodeHash, stateAccount.StorageRoot)) +} + +func (sqw *SQLWriter) upsertStorageCID(storageCID models.StorageNodeModel) { + var storageKey string + if storageCID.StorageKey != nullHash.String() { + storageKey = storageCID.StorageKey + } + sqw.stmts <- []byte(fmt.Sprintf(storageInsert, storageCID.HeaderID, storageCID.StatePath, storageKey, storageCID.CID, + storageCID.Path, storageCID.NodeType, true, storageCID.MhKey, storageKey, storageCID.CID, storageCID.NodeType, + true, storageCID.MhKey)) +} diff --git a/statediff/indexer/database/sql/batch_tx.go b/statediff/indexer/database/sql/batch_tx.go index ff847eec6..fb1b289a1 100644 --- a/statediff/indexer/database/sql/batch_tx.go +++ b/statediff/indexer/database/sql/batch_tx.go @@ -34,7 +34,6 @@ type BatchTx struct { BlockNumber uint64 ctx context.Context dbtx Tx - headerID int64 stm string quit chan struct{} iplds chan models.IPLDModel diff --git a/statediff/indexer/database/sql/indexer.go b/statediff/indexer/database/sql/indexer.go index fad68bf96..b557ec903 100644 --- a/statediff/indexer/database/sql/indexer.go +++ b/statediff/indexer/database/sql/indexer.go @@ -187,7 +187,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip t = time.Now() // Publish and index header, collect headerID - var headerID int64 + var headerID string headerID, err = sdi.processHeader(blockTx, block.Header(), headerNode, reward, totalDifficulty) if err != nil { return nil, err @@ -227,13 +227,12 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip traceMsg += fmt.Sprintf("tx and receipt processing time: %s\r\n", tDiff.String()) t = time.Now() - blockTx.headerID = headerID return blockTx, err } // processHeader publishes and indexes a header IPLD in Postgres // it returns the headerID -func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, headerNode node.Node, reward, td *big.Int) (int64, error) { +func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, headerNode node.Node, reward, td *big.Int) (string, error) { tx.cacheIPLD(headerNode) var baseFee *int64 @@ -241,14 +240,14 @@ func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, he baseFee = new(int64) *baseFee = header.BaseFee.Int64() } - + headerID := header.Hash().String() // index header - return sdi.dbWriter.upsertHeaderCID(tx.dbtx, models.HeaderModel{ + return headerID, sdi.dbWriter.upsertHeaderCID(tx.dbtx, models.HeaderModel{ CID: headerNode.Cid().String(), MhKey: shared.MultihashKeyFromCID(headerNode.Cid()), ParentHash: header.ParentHash.String(), BlockNumber: header.Number.String(), - BlockHash: header.Hash().String(), + BlockHash: headerID, TotalDifficulty: td.String(), Reward: reward.String(), Bloom: header.Bloom.Bytes(), @@ -262,7 +261,7 @@ func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, he } // processUncles publishes and indexes uncle IPLDs in Postgres -func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID int64, blockNumber uint64, uncleNodes []*ipld2.EthHeader) error { +func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID string, blockNumber uint64, uncleNodes []*ipld2.EthHeader) error { // publish and index uncles for _, uncleNode := range uncleNodes { tx.cacheIPLD(uncleNode) @@ -274,13 +273,14 @@ func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID int64, blockNum uncleReward = shared.CalcUncleMinerReward(blockNumber, uncleNode.Number.Uint64()) } uncle := models.UncleModel{ + HeaderID: headerID, CID: uncleNode.Cid().String(), MhKey: shared.MultihashKeyFromCID(uncleNode.Cid()), ParentHash: uncleNode.ParentHash.String(), BlockHash: uncleNode.Hash().String(), Reward: uncleReward.String(), } - if err := sdi.dbWriter.upsertUncleCID(tx.dbtx, uncle, headerID); err != nil { + if err := sdi.dbWriter.upsertUncleCID(tx.dbtx, uncle); err != nil { return err } } @@ -289,7 +289,7 @@ func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID int64, blockNum // processArgs bundles arguments to processReceiptsAndTxs type processArgs struct { - headerID int64 + headerID string blockNumber *big.Int receipts types.Receipts txs types.Transactions @@ -313,63 +313,26 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs txNode := args.txNodes[i] tx.cacheIPLD(txNode) - // Indexing - // extract topic and contract data from the receipt for indexing - mappedContracts := make(map[string]bool) // use map to avoid duplicate addresses - logDataSet := make([]*models.LogsModel, len(receipt.Logs)) - for idx, l := range receipt.Logs { - topicSet := make([]string, 4) - for ti, topic := range l.Topics { - topicSet[ti] = topic.Hex() - } - - if !args.logLeafNodeCIDs[i][idx].Defined() { - return fmt.Errorf("invalid log cid") - } - - mappedContracts[l.Address.String()] = true - logDataSet[idx] = &models.LogsModel{ - Address: l.Address.String(), - Index: int64(l.Index), - Data: l.Data, - LeafCID: args.logLeafNodeCIDs[i][idx].String(), - LeafMhKey: shared.MultihashKeyFromCID(args.logLeafNodeCIDs[i][idx]), - Topic0: topicSet[0], - Topic1: topicSet[1], - Topic2: topicSet[2], - Topic3: topicSet[3], - } - } - // these are the contracts seen in the logs - logContracts := make([]string, 0, len(mappedContracts)) - for addr := range mappedContracts { - logContracts = append(logContracts, addr) - } - // this is the contract address if this receipt is for a contract creation tx - contract := shared.HandleZeroAddr(receipt.ContractAddress) - var contractHash string - if contract != "" { - contractHash = crypto.Keccak256Hash(common.HexToAddress(contract).Bytes()).String() - } - // index tx first so that the receipt can reference it by FK + // index tx trx := args.txs[i] + txID := trx.Hash().String() // derive sender for the tx that corresponds with this receipt from, err := types.Sender(signer, trx) if err != nil { return fmt.Errorf("error deriving tx sender: %v", err) } txModel := models.TxModel{ - Dst: shared.HandleZeroAddrPointer(trx.To()), - Src: shared.HandleZeroAddr(from), - TxHash: trx.Hash().String(), - Index: int64(i), - Data: trx.Data(), - CID: txNode.Cid().String(), - MhKey: shared.MultihashKeyFromCID(txNode.Cid()), - Type: trx.Type(), + HeaderID: args.headerID, + Dst: shared.HandleZeroAddrPointer(trx.To()), + Src: shared.HandleZeroAddr(from), + TxHash: txID, + Index: int64(i), + Data: trx.Data(), + CID: txNode.Cid().String(), + MhKey: shared.MultihashKeyFromCID(txNode.Cid()), + Type: trx.Type(), } - txID, err := sdi.dbWriter.upsertTransactionCID(tx.dbtx, txModel, args.headerID) - if err != nil { + if err := sdi.dbWriter.upsertTransactionCID(tx.dbtx, txModel); err != nil { return err } @@ -380,21 +343,30 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs storageKeys[k] = storageKey.Hex() } accessListElementModel := models.AccessListElementModel{ + TxID: txID, Index: int64(j), Address: accessListElement.Address.Hex(), StorageKeys: storageKeys, } - if err := sdi.dbWriter.upsertAccessListElement(tx.dbtx, accessListElementModel, txID); err != nil { + if err := sdi.dbWriter.upsertAccessListElement(tx.dbtx, accessListElementModel); err != nil { return err } } - // index the receipt + // this is the contract address if this receipt is for a contract creation tx + contract := shared.HandleZeroAddr(receipt.ContractAddress) + var contractHash string + if contract != "" { + contractHash = crypto.Keccak256Hash(common.HexToAddress(contract).Bytes()).String() + } + + // index receipt if !args.rctLeafNodeCIDs[i].Defined() { return fmt.Errorf("invalid receipt leaf node cid") } rctModel := &models.ReceiptModel{ + TxID: txID, Contract: contract, ContractHash: contractHash, LeafCID: args.rctLeafNodeCIDs[i].String(), @@ -407,12 +379,37 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs rctModel.PostState = common.Bytes2Hex(receipt.PostState) } - receiptID, err := sdi.dbWriter.upsertReceiptCID(tx.dbtx, rctModel, txID) - if err != nil { + if err := sdi.dbWriter.upsertReceiptCID(tx.dbtx, rctModel); err != nil { return err } - if err = sdi.dbWriter.upsertLogCID(tx.dbtx, logDataSet, receiptID); err != nil { + // index logs + logDataSet := make([]*models.LogsModel, len(receipt.Logs)) + for idx, l := range receipt.Logs { + topicSet := make([]string, 4) + for ti, topic := range l.Topics { + topicSet[ti] = topic.Hex() + } + + if !args.logLeafNodeCIDs[i][idx].Defined() { + return fmt.Errorf("invalid log cid") + } + + logDataSet[idx] = &models.LogsModel{ + ReceiptID: txID, + Address: l.Address.String(), + Index: int64(l.Index), + Data: l.Data, + LeafCID: args.logLeafNodeCIDs[i][idx].String(), + LeafMhKey: shared.MultihashKeyFromCID(args.logLeafNodeCIDs[i][idx]), + Topic0: topicSet[0], + Topic1: topicSet[1], + Topic2: topicSet[2], + Topic3: topicSet[3], + } + } + + if err := sdi.dbWriter.upsertLogCID(tx.dbtx, logDataSet); err != nil { return err } } @@ -427,7 +424,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs } // PushStateNode publishes and indexes a state diff node object (including any child storage nodes) in the IPLD sql -func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdtypes.StateNode) error { +func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdtypes.StateNode, headerID string) error { tx, ok := batch.(*BatchTx) if !ok { return fmt.Errorf("sql batch is expected to be of type %T, got %T", &BatchTx{}, batch) @@ -437,29 +434,29 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt // short circuit if it is a Removed node // this assumes the db has been initialized and a public.blocks entry for the Removed node is present stateModel := models.StateNodeModel{ + HeaderID: headerID, Path: stateNode.Path, StateKey: common.BytesToHash(stateNode.LeafKey).String(), CID: shared.RemovedNodeStateCID, MhKey: shared.RemovedNodeMhKey, NodeType: stateNode.NodeType.Int(), } - _, err := sdi.dbWriter.upsertStateCID(tx.dbtx, stateModel, tx.headerID) - return err + return sdi.dbWriter.upsertStateCID(tx.dbtx, stateModel) } stateCIDStr, stateMhKey, err := tx.cacheRaw(ipld2.MEthStateTrie, multihash.KECCAK_256, stateNode.NodeValue) if err != nil { return fmt.Errorf("error generating and cacheing state node IPLD: %v", err) } stateModel := models.StateNodeModel{ + HeaderID: headerID, Path: stateNode.Path, StateKey: common.BytesToHash(stateNode.LeafKey).String(), CID: stateCIDStr, MhKey: stateMhKey, NodeType: stateNode.NodeType.Int(), } - // index the state node, collect the stateID to reference by FK - stateID, err := sdi.dbWriter.upsertStateCID(tx.dbtx, stateModel, tx.headerID) - if err != nil { + // index the state node + if err := sdi.dbWriter.upsertStateCID(tx.dbtx, stateModel); err != nil { return err } // if we have a leaf, decode and index the account data @@ -476,12 +473,14 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt return fmt.Errorf("error decoding state account rlp: %s", err.Error()) } accountModel := models.StateAccountModel{ + HeaderID: headerID, + StatePath: stateNode.Path, Balance: account.Balance.String(), Nonce: account.Nonce, CodeHash: account.CodeHash, StorageRoot: account.Root.String(), } - if err := sdi.dbWriter.upsertStateAccount(tx.dbtx, accountModel, stateID); err != nil { + if err := sdi.dbWriter.upsertStateAccount(tx.dbtx, accountModel); err != nil { return err } } @@ -491,13 +490,15 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt // short circuit if it is a Removed node // this assumes the db has been initialized and a public.blocks entry for the Removed node is present storageModel := models.StorageNodeModel{ + HeaderID: headerID, + StatePath: stateNode.Path, Path: storageNode.Path, StorageKey: common.BytesToHash(storageNode.LeafKey).String(), CID: shared.RemovedNodeStorageCID, MhKey: shared.RemovedNodeMhKey, NodeType: storageNode.NodeType.Int(), } - if err := sdi.dbWriter.upsertStorageCID(tx.dbtx, storageModel, stateID); err != nil { + if err := sdi.dbWriter.upsertStorageCID(tx.dbtx, storageModel); err != nil { return err } continue @@ -507,13 +508,15 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt return fmt.Errorf("error generating and cacheing storage node IPLD: %v", err) } storageModel := models.StorageNodeModel{ + HeaderID: headerID, + StatePath: stateNode.Path, Path: storageNode.Path, StorageKey: common.BytesToHash(storageNode.LeafKey).String(), CID: storageCIDStr, MhKey: storageMhKey, NodeType: storageNode.NodeType.Int(), } - if err := sdi.dbWriter.upsertStorageCID(tx.dbtx, storageModel, stateID); err != nil { + if err := sdi.dbWriter.upsertStorageCID(tx.dbtx, storageModel); err != nil { return err } } @@ -536,7 +539,7 @@ func (sdi *StateDiffIndexer) PushCodeAndCodeHash(batch interfaces.Batch, codeAnd return nil } -// Close satisfied io.Closer +// Close satisfies io.Closer func (sdi *StateDiffIndexer) Close() error { return sdi.dbWriter.db.Close() } diff --git a/statediff/indexer/database/sql/interfaces.go b/statediff/indexer/database/sql/interfaces.go index 3ed1a11e7..445b35d9b 100644 --- a/statediff/indexer/database/sql/interfaces.go +++ b/statediff/indexer/database/sql/interfaces.go @@ -36,7 +36,7 @@ type Driver interface { Get(ctx context.Context, dest interface{}, query string, args ...interface{}) error Begin(ctx context.Context) (Tx, error) Stats() Stats - NodeID() int64 + NodeID() string Context() context.Context io.Closer } diff --git a/statediff/indexer/database/sql/pgx_indexer_legacy_test.go b/statediff/indexer/database/sql/pgx_indexer_legacy_test.go index d08336e63..21b74b3b2 100644 --- a/statediff/indexer/database/sql/pgx_indexer_legacy_test.go +++ b/statediff/indexer/database/sql/pgx_indexer_legacy_test.go @@ -20,7 +20,6 @@ import ( "context" "testing" - "github.com/jmoiron/sqlx" "github.com/multiformats/go-multihash" "github.com/stretchr/testify/require" @@ -53,7 +52,7 @@ func setupLegacyPGX(t *testing.T) { } }() for _, node := range legacyData.StateDiffs { - err = ind.PushStateNode(tx, node) + err = ind.PushStateNode(tx, node, legacyData.MockBlock.Hash().String()) require.NoError(t, err) } @@ -64,20 +63,21 @@ func TestPGXIndexerLegacy(t *testing.T) { t.Run("Publish and index header IPLDs in a legacy tx", func(t *testing.T) { setupLegacyPGX(t) defer tearDown(t) - pgStr := `SELECT cid, td, reward, id, base_fee + pgStr := `SELECT cid, cast(td AS TEXT), cast(reward AS TEXT), block_hash, base_fee FROM eth.header_cids WHERE block_number = $1` // check header was properly indexed type res struct { - CID string - TD string - Reward string - ID int - BaseFee *int64 `db:"base_fee"` + CID string + TD string + Reward string + BlockHash string `db:"block_hash"` + BaseFee *int64 `db:"base_fee"` } header := new(res) - err = db.QueryRow(context.Background(), pgStr, legacyData.BlockNumber.Uint64()).(*sqlx.Row).StructScan(header) + err = db.QueryRow(context.Background(), pgStr, legacyData.BlockNumber.Uint64()).Scan( + &header.CID, &header.TD, &header.Reward, &header.BlockHash, &header.BaseFee) require.NoError(t, err) test_helpers.ExpectEqual(t, header.CID, legacyHeaderCID.String()) diff --git a/statediff/indexer/database/sql/pgx_indexer_test.go b/statediff/indexer/database/sql/pgx_indexer_test.go index f63efe712..a86927341 100644 --- a/statediff/indexer/database/sql/pgx_indexer_test.go +++ b/statediff/indexer/database/sql/pgx_indexer_test.go @@ -26,7 +26,6 @@ import ( "github.com/ipfs/go-cid" blockstore "github.com/ipfs/go-ipfs-blockstore" dshelp "github.com/ipfs/go-ipfs-ds-help" - "github.com/jmoiron/sqlx" "github.com/multiformats/go-multihash" "github.com/stretchr/testify/require" @@ -140,7 +139,7 @@ func setupPGX(t *testing.T) { } }() for _, node := range mocks.StateDiffs { - err = ind.PushStateNode(tx, node) + err = ind.PushStateNode(tx, node, mockBlock.Hash().String()) if err != nil { t.Fatal(err) } @@ -153,19 +152,24 @@ func TestPGXIndexer(t *testing.T) { t.Run("Publish and index header IPLDs in a single tx", func(t *testing.T) { setupPGX(t) defer tearDown(t) - pgStr := `SELECT cid, td, reward, id, base_fee + pgStr := `SELECT cid, cast(td AS TEXT), cast(reward AS TEXT), block_hash, base_fee FROM eth.header_cids WHERE block_number = $1` // check header was properly indexed type res struct { - CID string - TD string - Reward string - ID int - BaseFee *int64 `db:"base_fee"` + CID string + TD string + Reward string + BlockHash string `db:"block_hash"` + BaseFee *int64 `db:"base_fee"` } header := new(res) - err = db.QueryRow(context.Background(), pgStr, mocks.BlockNumber.Uint64()).(*sqlx.Row).StructScan(header) + err = db.QueryRow(context.Background(), pgStr, mocks.BlockNumber.Uint64()).Scan( + &header.CID, + &header.TD, + &header.Reward, + &header.BlockHash, + &header.BaseFee) if err != nil { t.Fatal(err) } @@ -192,7 +196,7 @@ func TestPGXIndexer(t *testing.T) { defer tearDown(t) // check that txs were properly indexed trxs := make([]string, 0) - pgStr := `SELECT transaction_cids.cid FROM eth.transaction_cids INNER JOIN eth.header_cids ON (transaction_cids.header_id = header_cids.id) + pgStr := `SELECT transaction_cids.cid FROM eth.transaction_cids INNER JOIN eth.header_cids ON (transaction_cids.header_id = header_cids.block_hash) WHERE header_cids.block_number = $1` err = db.Select(context.Background(), &trxs, pgStr, mocks.BlockNumber.Uint64()) if err != nil { @@ -221,46 +225,46 @@ func TestPGXIndexer(t *testing.T) { switch c { case trx1CID.String(): test_helpers.ExpectEqual(t, data, tx1) - var txType *uint8 + var txType uint8 err = db.Get(context.Background(), &txType, txTypePgStr, c) if err != nil { t.Fatal(err) } - if txType != nil { - t.Fatalf("expected nil tx_type, got %d", *txType) + if txType != 0 { + t.Fatalf("expected tx_type 0, got %d", txType) } case trx2CID.String(): test_helpers.ExpectEqual(t, data, tx2) - var txType *uint8 + var txType uint8 err = db.Get(context.Background(), &txType, txTypePgStr, c) if err != nil { t.Fatal(err) } - if txType != nil { - t.Fatalf("expected nil tx_type, got %d", *txType) + if txType != 0 { + t.Fatalf("expected tx_type 0, got %d", txType) } case trx3CID.String(): test_helpers.ExpectEqual(t, data, tx3) - var txType *uint8 + var txType uint8 err = db.Get(context.Background(), &txType, txTypePgStr, c) if err != nil { t.Fatal(err) } - if txType != nil { - t.Fatalf("expected nil tx_type, got %d", *txType) + if txType != 0 { + t.Fatalf("expected tx_type 0, got %d", txType) } case trx4CID.String(): test_helpers.ExpectEqual(t, data, tx4) - var txType *uint8 + var txType uint8 err = db.Get(context.Background(), &txType, txTypePgStr, c) if err != nil { t.Fatal(err) } - if *txType != types.AccessListTxType { - t.Fatalf("expected AccessListTxType (1), got %d", *txType) + if txType != types.AccessListTxType { + t.Fatalf("expected AccessListTxType (1), got %d", txType) } accessListElementModels := make([]models.AccessListElementModel, 0) - pgStr = `SELECT access_list_element.* FROM eth.access_list_element INNER JOIN eth.transaction_cids ON (tx_id = transaction_cids.id) WHERE cid = $1 ORDER BY access_list_element.index ASC` + pgStr = `SELECT access_list_element.* FROM eth.access_list_element INNER JOIN eth.transaction_cids ON (tx_id = transaction_cids.tx_hash) WHERE cid = $1 ORDER BY access_list_element.index ASC` err = db.Select(context.Background(), &accessListElementModels, pgStr, c) if err != nil { t.Fatal(err) @@ -299,8 +303,8 @@ func TestPGXIndexer(t *testing.T) { rcts := make([]string, 0) pgStr := `SELECT receipt_cids.leaf_cid FROM eth.receipt_cids, eth.transaction_cids, eth.header_cids - WHERE receipt_cids.tx_id = transaction_cids.id - AND transaction_cids.header_id = header_cids.id + WHERE receipt_cids.tx_id = transaction_cids.tx_hash + AND transaction_cids.header_id = header_cids.block_hash AND header_cids.block_number = $1 ORDER BY transaction_cids.index` err = db.Select(context.Background(), &rcts, pgStr, mocks.BlockNumber.Uint64()) @@ -317,8 +321,8 @@ func TestPGXIndexer(t *testing.T) { } for i := range rcts { results := make([]logIPLD, 0) - pgStr = `SELECT log_cids.index, log_cids.address, log_cids.Topic0, log_cids.Topic1, data FROM eth.log_cids - INNER JOIN eth.receipt_cids ON (log_cids.receipt_id = receipt_cids.id) + pgStr = `SELECT log_cids.index, log_cids.address, log_cids.topic0, log_cids.topic1, data FROM eth.log_cids + INNER JOIN eth.receipt_cids ON (log_cids.rct_id = receipt_cids.tx_id) INNER JOIN public.blocks ON (log_cids.leaf_mh_key = blocks.key) WHERE receipt_cids.leaf_cid = $1 ORDER BY eth.log_cids.index ASC` err = db.Select(context.Background(), &results, pgStr, rcts[i]) @@ -350,9 +354,9 @@ func TestPGXIndexer(t *testing.T) { // check receipts were properly indexed rcts := make([]string, 0) pgStr := `SELECT receipt_cids.leaf_cid FROM eth.receipt_cids, eth.transaction_cids, eth.header_cids - WHERE receipt_cids.tx_id = transaction_cids.id - AND transaction_cids.header_id = header_cids.id - AND header_cids.block_number = $1 order by transaction_cids.id` + WHERE receipt_cids.tx_id = transaction_cids.tx_hash + AND transaction_cids.header_id = header_cids.block_hash + AND header_cids.block_number = $1 order by transaction_cids.index` err = db.Select(context.Background(), &rcts, pgStr, mocks.BlockNumber.Uint64()) if err != nil { t.Fatal(err) @@ -447,8 +451,8 @@ func TestPGXIndexer(t *testing.T) { defer tearDown(t) // check that state nodes were properly indexed and published stateNodes := make([]models.StateNodeModel, 0) - pgStr := `SELECT state_cids.id, state_cids.cid, state_cids.state_leaf_key, state_cids.node_type, state_cids.state_path, state_cids.header_id - FROM eth.state_cids INNER JOIN eth.header_cids ON (state_cids.header_id = header_cids.id) + pgStr := `SELECT state_cids.cid, state_cids.state_leaf_key, state_cids.node_type, state_cids.state_path, state_cids.header_id + FROM eth.state_cids INNER JOIN eth.header_cids ON (state_cids.header_id = header_cids.block_hash) WHERE header_cids.block_number = $1 AND node_type != 3` err = db.Select(context.Background(), &stateNodes, pgStr, mocks.BlockNumber.Uint64()) if err != nil { @@ -467,9 +471,9 @@ func TestPGXIndexer(t *testing.T) { if err != nil { t.Fatal(err) } - pgStr = `SELECT * from eth.state_accounts WHERE state_id = $1` + pgStr = `SELECT header_id, state_path, cast(balance AS TEXT), nonce, code_hash, storage_root from eth.state_accounts WHERE header_id = $1 AND state_path = $2` var account models.StateAccountModel - err = db.Get(context.Background(), &account, pgStr, stateNode.ID) + err = db.Get(context.Background(), &account, pgStr, stateNode.HeaderID, stateNode.Path) if err != nil { t.Fatal(err) } @@ -479,8 +483,8 @@ func TestPGXIndexer(t *testing.T) { test_helpers.ExpectEqual(t, stateNode.Path, []byte{'\x06'}) test_helpers.ExpectEqual(t, data, mocks.ContractLeafNode) test_helpers.ExpectEqual(t, account, models.StateAccountModel{ - ID: account.ID, - StateID: stateNode.ID, + HeaderID: account.HeaderID, + StatePath: stateNode.Path, Balance: "0", CodeHash: mocks.ContractCodeHash.Bytes(), StorageRoot: mocks.ContractRoot, @@ -493,8 +497,8 @@ func TestPGXIndexer(t *testing.T) { test_helpers.ExpectEqual(t, stateNode.Path, []byte{'\x0c'}) test_helpers.ExpectEqual(t, data, mocks.AccountLeafNode) test_helpers.ExpectEqual(t, account, models.StateAccountModel{ - ID: account.ID, - StateID: stateNode.ID, + HeaderID: account.HeaderID, + StatePath: stateNode.Path, Balance: "1000", CodeHash: mocks.AccountCodeHash.Bytes(), StorageRoot: mocks.AccountRoot, @@ -505,8 +509,8 @@ func TestPGXIndexer(t *testing.T) { // check that Removed state nodes were properly indexed and published stateNodes = make([]models.StateNodeModel, 0) - pgStr = `SELECT state_cids.id, state_cids.cid, state_cids.state_leaf_key, state_cids.node_type, state_cids.state_path, state_cids.header_id - FROM eth.state_cids INNER JOIN eth.header_cids ON (state_cids.header_id = header_cids.id) + pgStr = `SELECT state_cids.cid, state_cids.state_leaf_key, state_cids.node_type, state_cids.state_path, state_cids.header_id + FROM eth.state_cids INNER JOIN eth.header_cids ON (state_cids.header_id = header_cids.block_hash) WHERE header_cids.block_number = $1 AND node_type = 3` err = db.Select(context.Background(), &stateNodes, pgStr, mocks.BlockNumber.Uint64()) if err != nil { @@ -538,8 +542,8 @@ func TestPGXIndexer(t *testing.T) { storageNodes := make([]models.StorageNodeWithStateKeyModel, 0) pgStr := `SELECT storage_cids.cid, state_cids.state_leaf_key, storage_cids.storage_leaf_key, storage_cids.node_type, storage_cids.storage_path FROM eth.storage_cids, eth.state_cids, eth.header_cids - WHERE storage_cids.state_id = state_cids.id - AND state_cids.header_id = header_cids.id + WHERE (storage_cids.state_path, storage_cids.header_id) = (state_cids.state_path, state_cids.header_id) + AND state_cids.header_id = header_cids.block_hash AND header_cids.block_number = $1 AND storage_cids.node_type != 3` err = db.Select(context.Background(), &storageNodes, pgStr, mocks.BlockNumber.Uint64()) @@ -571,8 +575,8 @@ func TestPGXIndexer(t *testing.T) { storageNodes = make([]models.StorageNodeWithStateKeyModel, 0) pgStr = `SELECT storage_cids.cid, state_cids.state_leaf_key, storage_cids.storage_leaf_key, storage_cids.node_type, storage_cids.storage_path FROM eth.storage_cids, eth.state_cids, eth.header_cids - WHERE storage_cids.state_id = state_cids.id - AND state_cids.header_id = header_cids.id + WHERE (storage_cids.state_path, storage_cids.header_id) = (state_cids.state_path, state_cids.header_id) + AND state_cids.header_id = header_cids.block_hash AND header_cids.block_number = $1 AND storage_cids.node_type = 3` err = db.Select(context.Background(), &storageNodes, pgStr, mocks.BlockNumber.Uint64()) diff --git a/statediff/indexer/database/sql/postgres/config.go b/statediff/indexer/database/sql/postgres/config.go index a7c7cc9b4..5794bd0af 100644 --- a/statediff/indexer/database/sql/postgres/config.go +++ b/statediff/indexer/database/sql/postgres/config.go @@ -49,9 +49,9 @@ func ResolveDriverType(str string) (DriverType, error) { var DefaultConfig = Config{ Hostname: "localhost", Port: 5432, - DatabaseName: "vulcanize_test", + DatabaseName: "vulcanize_testing", Username: "postgres", - Password: "", + Password: "password", } // Config holds params for a Postgres db diff --git a/statediff/indexer/database/sql/postgres/database.go b/statediff/indexer/database/sql/postgres/database.go index 3fe7f652e..213638017 100644 --- a/statediff/indexer/database/sql/postgres/database.go +++ b/statediff/indexer/database/sql/postgres/database.go @@ -22,14 +22,7 @@ var _ sql.Database = &DB{} const ( createNodeStm = `INSERT INTO nodes (genesis_block, network_id, node_id, client_name, chain_id) VALUES ($1, $2, $3, $4, $5) - ON CONFLICT (genesis_block, network_id, node_id, chain_id) - DO UPDATE - SET genesis_block = $1, - network_id = $2, - node_id = $3, - client_name = $4, - chain_id = $5 - RETURNING id` + ON CONFLICT (node_id) DO NOTHING` ) // NewPostgresDB returns a postgres.DB using the provided driver @@ -37,7 +30,7 @@ func NewPostgresDB(driver sql.Driver) *DB { return &DB{driver} } -// DB implements sql.Databse using a configured driver and Postgres statement syntax +// DB implements sql.Database using a configured driver and Postgres statement syntax type DB struct { sql.Driver } @@ -46,59 +39,55 @@ type DB struct { func (db *DB) InsertHeaderStm() string { return `INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16) - ON CONFLICT (block_number, block_hash) DO UPDATE SET (parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee) = ($3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, eth.header_cids.times_validated + 1, $16) - RETURNING id` + ON CONFLICT (block_hash) DO UPDATE SET (parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee) = ($3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, eth.header_cids.times_validated + 1, $16)` } // InsertUncleStm satisfies the sql.Statements interface func (db *DB) InsertUncleStm() string { return `INSERT INTO eth.uncle_cids (block_hash, header_id, parent_hash, cid, reward, mh_key) VALUES ($1, $2, $3, $4, $5, $6) - ON CONFLICT (header_id, block_hash) DO UPDATE SET (parent_hash, cid, reward, mh_key) = ($3, $4, $5, $6)` + ON CONFLICT (block_hash) DO NOTHING` } // InsertTxStm satisfies the sql.Statements interface func (db *DB) InsertTxStm() string { return `INSERT INTO eth.transaction_cids (header_id, tx_hash, cid, dst, src, index, mh_key, tx_data, tx_type) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) - ON CONFLICT (header_id, tx_hash) DO UPDATE SET (cid, dst, src, index, mh_key, tx_data, tx_type) = ($3, $4, $5, $6, $7, $8, $9) - RETURNING id` + ON CONFLICT (tx_hash) DO NOTHING` } // InsertAccessListElementStm satisfies the sql.Statements interface func (db *DB) InsertAccessListElementStm() string { return `INSERT INTO eth.access_list_element (tx_id, index, address, storage_keys) VALUES ($1, $2, $3, $4) - ON CONFLICT (tx_id, index) DO UPDATE SET (address, storage_keys) = ($3, $4)` + ON CONFLICT (tx_id, index) DO NOTHING` } // InsertRctStm satisfies the sql.Statements interface func (db *DB) InsertRctStm() string { return `INSERT INTO eth.receipt_cids (tx_id, leaf_cid, contract, contract_hash, leaf_mh_key, post_state, post_status, log_root) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) - ON CONFLICT (tx_id) DO UPDATE SET (leaf_cid, contract, contract_hash, leaf_mh_key, post_state, post_status, log_root) = ($2, $3, $4, $5, $6, $7, $8) - RETURNING id` + ON CONFLICT (tx_id) DO NOTHING` } // InsertLogStm satisfies the sql.Statements interface func (db *DB) InsertLogStm() string { - return `INSERT INTO eth.log_cids (leaf_cid, leaf_mh_key, receipt_id, address, index, topic0, topic1, topic2, topic3, log_data) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) - ON CONFLICT (receipt_id, index) DO UPDATE SET (leaf_cid, leaf_mh_key, address, topic0, topic1, topic2, topic3, log_data) = ($1, $2, $4, $6, $7, $8, $9, $10)` + return `INSERT INTO eth.log_cids (leaf_cid, leaf_mh_key, rct_id, address, index, topic0, topic1, topic2, topic3, log_data) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) + ON CONFLICT (rct_id, index) DO NOTHING` } // InsertStateStm satisfies the sql.Statements interface func (db *DB) InsertStateStm() string { return `INSERT INTO eth.state_cids (header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7) - ON CONFLICT (header_id, state_path) DO UPDATE SET (state_leaf_key, cid, node_type, diff, mh_key) = ($2, $3, $5, $6, $7) - RETURNING id` + ON CONFLICT (header_id, state_path) DO UPDATE SET (state_leaf_key, cid, node_type, diff, mh_key) = ($2, $3, $5, $6, $7)` } // InsertAccountStm satisfies the sql.Statements interface func (db *DB) InsertAccountStm() string { - return `INSERT INTO eth.state_accounts (state_id, balance, nonce, code_hash, storage_root) VALUES ($1, $2, $3, $4, $5) - ON CONFLICT (state_id) DO UPDATE SET (balance, nonce, code_hash, storage_root) = ($2, $3, $4, $5)` + return `INSERT INTO eth.state_accounts (header_id, state_path, balance, nonce, code_hash, storage_root) VALUES ($1, $2, $3, $4, $5, $6) + ON CONFLICT (header_id, state_path) DO NOTHING` } // InsertStorageStm satisfies the sql.Statements interface func (db *DB) InsertStorageStm() string { - return `INSERT INTO eth.storage_cids (state_id, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7) - ON CONFLICT (state_id, storage_path) DO UPDATE SET (storage_leaf_key, cid, node_type, diff, mh_key) = ($2, $3, $5, $6, $7)` + return `INSERT INTO eth.storage_cids (header_id, state_path, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + ON CONFLICT (header_id, state_path, storage_path) DO UPDATE SET (storage_leaf_key, cid, node_type, diff, mh_key) = ($3, $4, $6, $7, $8)` } // InsertIPLDStm satisfies the sql.Statements interface diff --git a/statediff/indexer/database/sql/postgres/pgx.go b/statediff/indexer/database/sql/postgres/pgx.go index 9f6701400..936a3765d 100644 --- a/statediff/indexer/database/sql/postgres/pgx.go +++ b/statediff/indexer/database/sql/postgres/pgx.go @@ -34,7 +34,7 @@ type PGXDriver struct { ctx context.Context pool *pgxpool.Pool nodeInfo node.Info - nodeID int64 + nodeID string } // NewPGXDriver returns a new pgx driver @@ -89,17 +89,16 @@ func MakeConfig(config Config) (*pgxpool.Config, error) { } func (pgx *PGXDriver) createNode() error { - var nodeID int64 - err := pgx.pool.QueryRow( + _, err := pgx.pool.Exec( pgx.ctx, createNodeStm, pgx.nodeInfo.GenesisBlock, pgx.nodeInfo.NetworkID, pgx.nodeInfo.ID, pgx.nodeInfo.ClientName, - pgx.nodeInfo.ChainID).Scan(&nodeID) + pgx.nodeInfo.ChainID) if err != nil { return ErrUnableToSetNode(err) } - pgx.nodeID = nodeID + pgx.nodeID = pgx.nodeInfo.ID return nil } @@ -138,13 +137,8 @@ func (pgx *PGXDriver) Stats() sql.Stats { return pgxStatsWrapper{stats: stats} } -// NodeInfo satisfies sql.Database -func (pgx *PGXDriver) NodeInfo() node.Info { - return pgx.nodeInfo -} - // NodeID satisfies sql.Database -func (pgx *PGXDriver) NodeID() int64 { +func (pgx *PGXDriver) NodeID() string { return pgx.nodeID } diff --git a/statediff/indexer/database/sql/postgres/sqlx.go b/statediff/indexer/database/sql/postgres/sqlx.go index 684fc7bf0..406b44a19 100644 --- a/statediff/indexer/database/sql/postgres/sqlx.go +++ b/statediff/indexer/database/sql/postgres/sqlx.go @@ -32,7 +32,7 @@ type SQLXDriver struct { ctx context.Context db *sqlx.DB nodeInfo node.Info - nodeID int64 + nodeID string } // NewSQLXDriver returns a new sqlx driver for Postgres @@ -60,16 +60,15 @@ func NewSQLXDriver(ctx context.Context, config Config, node node.Info) (*SQLXDri } func (driver *SQLXDriver) createNode() error { - var nodeID int64 - err := driver.db.QueryRowx( + _, err := driver.db.Exec( createNodeStm, driver.nodeInfo.GenesisBlock, driver.nodeInfo.NetworkID, driver.nodeInfo.ID, driver.nodeInfo.ClientName, - driver.nodeInfo.ChainID).Scan(&nodeID) + driver.nodeInfo.ChainID) if err != nil { return ErrUnableToSetNode(err) } - driver.nodeID = nodeID + driver.nodeID = driver.nodeInfo.ID return nil } @@ -107,13 +106,8 @@ func (driver *SQLXDriver) Stats() sql.Stats { return sqlxStatsWrapper{stats: stats} } -// NodeInfo satisfies sql.Database -func (driver *SQLXDriver) NodeInfo() node.Info { - return driver.nodeInfo -} - // NodeID satisfies sql.Database -func (driver *SQLXDriver) NodeID() int64 { +func (driver *SQLXDriver) NodeID() string { return driver.nodeID } diff --git a/statediff/indexer/database/sql/sqlx_indexer_legacy_test.go b/statediff/indexer/database/sql/sqlx_indexer_legacy_test.go index 2ce5f494f..4349850ed 100644 --- a/statediff/indexer/database/sql/sqlx_indexer_legacy_test.go +++ b/statediff/indexer/database/sql/sqlx_indexer_legacy_test.go @@ -62,7 +62,7 @@ func setupLegacySQLX(t *testing.T) { } }() for _, node := range legacyData.StateDiffs { - err = ind.PushStateNode(tx, node) + err = ind.PushStateNode(tx, node, mockLegacyBlock.Hash().String()) require.NoError(t, err) } @@ -73,16 +73,16 @@ func TestSQLXIndexerLegacy(t *testing.T) { t.Run("Publish and index header IPLDs in a legacy tx", func(t *testing.T) { setupLegacySQLX(t) defer tearDown(t) - pgStr := `SELECT cid, td, reward, id, base_fee + pgStr := `SELECT cid, td, reward, block_hash, base_fee FROM eth.header_cids WHERE block_number = $1` // check header was properly indexed type res struct { - CID string - TD string - Reward string - ID int - BaseFee *int64 `db:"base_fee"` + CID string + TD string + Reward string + BlockHash string `db:"block_hash"` + BaseFee *int64 `db:"base_fee"` } header := new(res) err = db.QueryRow(context.Background(), pgStr, legacyData.BlockNumber.Uint64()).(*sqlx.Row).StructScan(header) diff --git a/statediff/indexer/database/sql/sqlx_indexer_test.go b/statediff/indexer/database/sql/sqlx_indexer_test.go index 0fa4e8c1a..09ee62fa3 100644 --- a/statediff/indexer/database/sql/sqlx_indexer_test.go +++ b/statediff/indexer/database/sql/sqlx_indexer_test.go @@ -159,7 +159,7 @@ func setupSQLX(t *testing.T) { } }() for _, node := range mocks.StateDiffs { - err = ind.PushStateNode(tx, node) + err = ind.PushStateNode(tx, node, mockBlock.Hash().String()) if err != nil { t.Fatal(err) } @@ -179,16 +179,16 @@ func TestSQLXIndexer(t *testing.T) { t.Run("Publish and index header IPLDs in a single tx", func(t *testing.T) { setupSQLX(t) defer tearDown(t) - pgStr := `SELECT cid, td, reward, id, base_fee + pgStr := `SELECT cid, td, reward, block_hash, base_fee FROM eth.header_cids WHERE block_number = $1` // check header was properly indexed type res struct { - CID string - TD string - Reward string - ID int - BaseFee *int64 `db:"base_fee"` + CID string + TD string + Reward string + BlockHash string `db:"block_hash"` + BaseFee *int64 `db:"base_fee"` } header := new(res) err = db.QueryRow(context.Background(), pgStr, mocks.BlockNumber.Uint64()).(*sqlx.Row).StructScan(header) @@ -218,7 +218,7 @@ func TestSQLXIndexer(t *testing.T) { defer tearDown(t) // check that txs were properly indexed trxs := make([]string, 0) - pgStr := `SELECT transaction_cids.cid FROM eth.transaction_cids INNER JOIN eth.header_cids ON (transaction_cids.header_id = header_cids.id) + pgStr := `SELECT transaction_cids.cid FROM eth.transaction_cids INNER JOIN eth.header_cids ON (transaction_cids.header_id = header_cids.block_hash) WHERE header_cids.block_number = $1` err = db.Select(context.Background(), &trxs, pgStr, mocks.BlockNumber.Uint64()) if err != nil { @@ -286,7 +286,7 @@ func TestSQLXIndexer(t *testing.T) { t.Fatalf("expected AccessListTxType (1), got %d", txType) } accessListElementModels := make([]models.AccessListElementModel, 0) - pgStr = `SELECT access_list_element.* FROM eth.access_list_element INNER JOIN eth.transaction_cids ON (tx_id = transaction_cids.id) WHERE cid = $1 ORDER BY access_list_element.index ASC` + pgStr = `SELECT access_list_element.* FROM eth.access_list_element INNER JOIN eth.transaction_cids ON (tx_id = transaction_cids.tx_hash) WHERE cid = $1 ORDER BY access_list_element.index ASC` err = db.Select(context.Background(), &accessListElementModels, pgStr, c) if err != nil { t.Fatal(err) @@ -325,8 +325,8 @@ func TestSQLXIndexer(t *testing.T) { rcts := make([]string, 0) pgStr := `SELECT receipt_cids.leaf_cid FROM eth.receipt_cids, eth.transaction_cids, eth.header_cids - WHERE receipt_cids.tx_id = transaction_cids.id - AND transaction_cids.header_id = header_cids.id + WHERE receipt_cids.tx_id = transaction_cids.tx_hash + AND transaction_cids.header_id = header_cids.block_hash AND header_cids.block_number = $1 ORDER BY transaction_cids.index` err = db.Select(context.Background(), &rcts, pgStr, mocks.BlockNumber.Uint64()) @@ -343,8 +343,8 @@ func TestSQLXIndexer(t *testing.T) { } for i := range rcts { results := make([]logIPLD, 0) - pgStr = `SELECT log_cids.index, log_cids.address, log_cids.Topic0, log_cids.Topic1, data FROM eth.log_cids - INNER JOIN eth.receipt_cids ON (log_cids.receipt_id = receipt_cids.id) + pgStr = `SELECT log_cids.index, log_cids.address, log_cids.topic0, log_cids.topic1, data FROM eth.log_cids + INNER JOIN eth.receipt_cids ON (log_cids.rct_id = receipt_cids.tx_id) INNER JOIN public.blocks ON (log_cids.leaf_mh_key = blocks.key) WHERE receipt_cids.leaf_cid = $1 ORDER BY eth.log_cids.index ASC` err = db.Select(context.Background(), &results, pgStr, rcts[i]) @@ -376,9 +376,9 @@ func TestSQLXIndexer(t *testing.T) { // check receipts were properly indexed rcts := make([]string, 0) pgStr := `SELECT receipt_cids.leaf_cid FROM eth.receipt_cids, eth.transaction_cids, eth.header_cids - WHERE receipt_cids.tx_id = transaction_cids.id - AND transaction_cids.header_id = header_cids.id - AND header_cids.block_number = $1 order by transaction_cids.id` + WHERE receipt_cids.tx_id = transaction_cids.tx_hash + AND transaction_cids.header_id = header_cids.block_hash + AND header_cids.block_number = $1 order by transaction_cids.index` err = db.Select(context.Background(), &rcts, pgStr, mocks.BlockNumber.Uint64()) if err != nil { t.Fatal(err) @@ -472,8 +472,8 @@ func TestSQLXIndexer(t *testing.T) { defer tearDown(t) // check that state nodes were properly indexed and published stateNodes := make([]models.StateNodeModel, 0) - pgStr := `SELECT state_cids.id, state_cids.cid, state_cids.state_leaf_key, state_cids.node_type, state_cids.state_path, state_cids.header_id - FROM eth.state_cids INNER JOIN eth.header_cids ON (state_cids.header_id = header_cids.id) + pgStr := `SELECT state_cids.cid, state_cids.state_leaf_key, state_cids.node_type, state_cids.state_path, state_cids.header_id + FROM eth.state_cids INNER JOIN eth.header_cids ON (state_cids.header_id = header_cids.block_hash) WHERE header_cids.block_number = $1 AND node_type != 3` err = db.Select(context.Background(), &stateNodes, pgStr, mocks.BlockNumber.Uint64()) if err != nil { @@ -492,9 +492,9 @@ func TestSQLXIndexer(t *testing.T) { if err != nil { t.Fatal(err) } - pgStr = `SELECT * from eth.state_accounts WHERE state_id = $1` + pgStr = `SELECT * from eth.state_accounts WHERE header_id = $1 AND state_path = $2` var account models.StateAccountModel - err = db.Get(context.Background(), &account, pgStr, stateNode.ID) + err = db.Get(context.Background(), &account, pgStr, stateNode.HeaderID, stateNode.Path) if err != nil { t.Fatal(err) } @@ -504,8 +504,8 @@ func TestSQLXIndexer(t *testing.T) { test_helpers.ExpectEqual(t, stateNode.Path, []byte{'\x06'}) test_helpers.ExpectEqual(t, data, mocks.ContractLeafNode) test_helpers.ExpectEqual(t, account, models.StateAccountModel{ - ID: account.ID, - StateID: stateNode.ID, + HeaderID: account.HeaderID, + StatePath: stateNode.Path, Balance: "0", CodeHash: mocks.ContractCodeHash.Bytes(), StorageRoot: mocks.ContractRoot, @@ -518,8 +518,8 @@ func TestSQLXIndexer(t *testing.T) { test_helpers.ExpectEqual(t, stateNode.Path, []byte{'\x0c'}) test_helpers.ExpectEqual(t, data, mocks.AccountLeafNode) test_helpers.ExpectEqual(t, account, models.StateAccountModel{ - ID: account.ID, - StateID: stateNode.ID, + HeaderID: account.HeaderID, + StatePath: stateNode.Path, Balance: "1000", CodeHash: mocks.AccountCodeHash.Bytes(), StorageRoot: mocks.AccountRoot, @@ -530,8 +530,8 @@ func TestSQLXIndexer(t *testing.T) { // check that Removed state nodes were properly indexed and published stateNodes = make([]models.StateNodeModel, 0) - pgStr = `SELECT state_cids.id, state_cids.cid, state_cids.state_leaf_key, state_cids.node_type, state_cids.state_path, state_cids.header_id - FROM eth.state_cids INNER JOIN eth.header_cids ON (state_cids.header_id = header_cids.id) + pgStr = `SELECT state_cids.cid, state_cids.state_leaf_key, state_cids.node_type, state_cids.state_path, state_cids.header_id + FROM eth.state_cids INNER JOIN eth.header_cids ON (state_cids.header_id = header_cids.block_hash) WHERE header_cids.block_number = $1 AND node_type = 3` err = db.Select(context.Background(), &stateNodes, pgStr, mocks.BlockNumber.Uint64()) if err != nil { @@ -563,8 +563,8 @@ func TestSQLXIndexer(t *testing.T) { storageNodes := make([]models.StorageNodeWithStateKeyModel, 0) pgStr := `SELECT storage_cids.cid, state_cids.state_leaf_key, storage_cids.storage_leaf_key, storage_cids.node_type, storage_cids.storage_path FROM eth.storage_cids, eth.state_cids, eth.header_cids - WHERE storage_cids.state_id = state_cids.id - AND state_cids.header_id = header_cids.id + WHERE (storage_cids.state_path, storage_cids.header_id) = (state_cids.state_path, state_cids.header_id) + AND state_cids.header_id = header_cids.block_hash AND header_cids.block_number = $1 AND storage_cids.node_type != 3` err = db.Select(context.Background(), &storageNodes, pgStr, mocks.BlockNumber.Uint64()) @@ -596,8 +596,8 @@ func TestSQLXIndexer(t *testing.T) { storageNodes = make([]models.StorageNodeWithStateKeyModel, 0) pgStr = `SELECT storage_cids.cid, state_cids.state_leaf_key, storage_cids.storage_leaf_key, storage_cids.node_type, storage_cids.storage_path FROM eth.storage_cids, eth.state_cids, eth.header_cids - WHERE storage_cids.state_id = state_cids.id - AND state_cids.header_id = header_cids.id + WHERE (storage_cids.state_path, storage_cids.header_id) = (state_cids.state_path, state_cids.header_id) + AND state_cids.header_id = header_cids.block_hash AND header_cids.block_number = $1 AND storage_cids.node_type = 3` err = db.Select(context.Background(), &storageNodes, pgStr, mocks.BlockNumber.Uint64()) diff --git a/statediff/indexer/database/sql/writer.go b/statediff/indexer/database/sql/writer.go index ea276dfbf..94b38c7e1 100644 --- a/statediff/indexer/database/sql/writer.go +++ b/statediff/indexer/database/sql/writer.go @@ -39,41 +39,56 @@ func NewWriter(db Database) *Writer { } } -func (in *Writer) upsertHeaderCID(tx Tx, header models.HeaderModel) (int64, error) { - var headerID int64 - err := tx.QueryRow(in.db.Context(), in.db.InsertHeaderStm(), +/* +INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee) +VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16) +ON CONFLICT (block_hash) DO UPDATE SET (block_number, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee) = ($1, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, eth.header_cids.times_validated + 1, $16) +*/ +func (in *Writer) upsertHeaderCID(tx Tx, header models.HeaderModel) error { + _, err := tx.Exec(in.db.Context(), in.db.InsertHeaderStm(), header.BlockNumber, header.BlockHash, header.ParentHash, header.CID, header.TotalDifficulty, in.db.NodeID(), header.Reward, header.StateRoot, header.TxRoot, - header.RctRoot, header.UncleRoot, header.Bloom, header.Timestamp, header.MhKey, 1, header.BaseFee).Scan(&headerID) + header.RctRoot, header.UncleRoot, header.Bloom, header.Timestamp, header.MhKey, 1, header.BaseFee) if err != nil { - return 0, fmt.Errorf("error upserting header_cids entry: %v", err) + return fmt.Errorf("error upserting header_cids entry: %v", err) } indexerMetrics.blocks.Inc(1) - return headerID, nil + return nil } -func (in *Writer) upsertUncleCID(tx Tx, uncle models.UncleModel, headerID int64) error { +/* +INSERT INTO eth.uncle_cids (block_hash, header_id, parent_hash, cid, reward, mh_key) VALUES ($1, $2, $3, $4, $5, $6) +ON CONFLICT (block_hash) DO NOTHING +*/ +func (in *Writer) upsertUncleCID(tx Tx, uncle models.UncleModel) error { _, err := tx.Exec(in.db.Context(), in.db.InsertUncleStm(), - uncle.BlockHash, headerID, uncle.ParentHash, uncle.CID, uncle.Reward, uncle.MhKey) + uncle.BlockHash, uncle.HeaderID, uncle.ParentHash, uncle.CID, uncle.Reward, uncle.MhKey) if err != nil { return fmt.Errorf("error upserting uncle_cids entry: %v", err) } return nil } -func (in *Writer) upsertTransactionCID(tx Tx, transaction models.TxModel, headerID int64) (int64, error) { - var txID int64 - err := tx.QueryRow(in.db.Context(), in.db.InsertTxStm(), - headerID, transaction.TxHash, transaction.CID, transaction.Dst, transaction.Src, transaction.Index, transaction.MhKey, transaction.Data, transaction.Type).Scan(&txID) +/* +INSERT INTO eth.transaction_cids (header_id, tx_hash, cid, dst, src, index, mh_key, tx_data, tx_type) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) +ON CONFLICT (tx_hash) DO NOTHING +*/ +func (in *Writer) upsertTransactionCID(tx Tx, transaction models.TxModel) error { + _, err := tx.Exec(in.db.Context(), in.db.InsertTxStm(), + transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst, transaction.Src, transaction.Index, transaction.MhKey, transaction.Data, transaction.Type) if err != nil { - return 0, fmt.Errorf("error upserting transaction_cids entry: %v", err) + return fmt.Errorf("error upserting transaction_cids entry: %v", err) } indexerMetrics.transactions.Inc(1) - return txID, nil + return nil } -func (in *Writer) upsertAccessListElement(tx Tx, accessListElement models.AccessListElementModel, txID int64) error { +/* +INSERT INTO eth.access_list_element (tx_id, index, address, storage_keys) VALUES ($1, $2, $3, $4) +ON CONFLICT (tx_id, index) DO NOTHING +*/ +func (in *Writer) upsertAccessListElement(tx Tx, accessListElement models.AccessListElementModel) error { _, err := tx.Exec(in.db.Context(), in.db.InsertAccessListElementStm(), - txID, accessListElement.Index, accessListElement.Address, accessListElement.StorageKeys) + accessListElement.TxID, accessListElement.Index, accessListElement.Address, accessListElement.StorageKeys) if err != nil { return fmt.Errorf("error upserting access_list_element entry: %v", err) } @@ -81,21 +96,28 @@ func (in *Writer) upsertAccessListElement(tx Tx, accessListElement models.Access return nil } -func (in *Writer) upsertReceiptCID(tx Tx, rct *models.ReceiptModel, txID int64) (int64, error) { - var receiptID int64 - err := tx.QueryRow(in.db.Context(), in.db.InsertRctStm(), - txID, rct.LeafCID, rct.Contract, rct.ContractHash, rct.LeafMhKey, rct.PostState, rct.PostStatus, rct.LogRoot).Scan(&receiptID) +/* +INSERT INTO eth.receipt_cids (tx_id, leaf_cid, contract, contract_hash, leaf_mh_key, post_state, post_status, log_root) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) +ON CONFLICT (tx_id) DO NOTHING +*/ +func (in *Writer) upsertReceiptCID(tx Tx, rct *models.ReceiptModel) error { + _, err := tx.Exec(in.db.Context(), in.db.InsertRctStm(), + rct.TxID, rct.LeafCID, rct.Contract, rct.ContractHash, rct.LeafMhKey, rct.PostState, rct.PostStatus, rct.LogRoot) if err != nil { - return 0, fmt.Errorf("error upserting receipt_cids entry: %w", err) + return fmt.Errorf("error upserting receipt_cids entry: %w", err) } indexerMetrics.receipts.Inc(1) - return receiptID, nil + return nil } -func (in *Writer) upsertLogCID(tx Tx, logs []*models.LogsModel, receiptID int64) error { +/* +INSERT INTO eth.log_cids (leaf_cid, leaf_mh_key, rct_id, address, index, topic0, topic1, topic2, topic3, log_data) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) +ON CONFLICT (rct_id, index) DO NOTHING +*/ +func (in *Writer) upsertLogCID(tx Tx, logs []*models.LogsModel) error { for _, log := range logs { _, err := tx.Exec(in.db.Context(), in.db.InsertLogStm(), - log.LeafCID, log.LeafMhKey, receiptID, log.Address, log.Index, log.Topic0, log.Topic1, log.Topic2, log.Topic3, log.Data) + log.LeafCID, log.LeafMhKey, log.ReceiptID, log.Address, log.Index, log.Topic0, log.Topic1, log.Topic2, log.Topic3, log.Data) if err != nil { return fmt.Errorf("error upserting logs entry: %w", err) } @@ -104,36 +126,47 @@ func (in *Writer) upsertLogCID(tx Tx, logs []*models.LogsModel, receiptID int64) return nil } -func (in *Writer) upsertStateCID(tx Tx, stateNode models.StateNodeModel, headerID int64) (int64, error) { - var stateID int64 +/* +INSERT INTO eth.state_cids (header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7) +ON CONFLICT (header_id, state_path) DO UPDATE SET (state_leaf_key, cid, node_type, diff, mh_key) = ($2, $3, $5, $6, $7) +*/ +func (in *Writer) upsertStateCID(tx Tx, stateNode models.StateNodeModel) error { var stateKey string if stateNode.StateKey != nullHash.String() { stateKey = stateNode.StateKey } - err := tx.QueryRow(in.db.Context(), in.db.InsertStateStm(), - headerID, stateKey, stateNode.CID, stateNode.Path, stateNode.NodeType, true, stateNode.MhKey).Scan(&stateID) + _, err := tx.Exec(in.db.Context(), in.db.InsertStateStm(), + stateNode.HeaderID, stateKey, stateNode.CID, stateNode.Path, stateNode.NodeType, true, stateNode.MhKey) if err != nil { - return 0, fmt.Errorf("error upserting state_cids entry: %v", err) + return fmt.Errorf("error upserting state_cids entry: %v", err) } - return stateID, nil + return nil } -func (in *Writer) upsertStateAccount(tx Tx, stateAccount models.StateAccountModel, stateID int64) error { +/* +INSERT INTO eth.state_accounts (header_id, state_path, balance, nonce, code_hash, storage_root) VALUES ($1, $2, $3, $4, $5, $6) +ON CONFLICT (header_id, state_path) DO NOTHING +*/ +func (in *Writer) upsertStateAccount(tx Tx, stateAccount models.StateAccountModel) error { _, err := tx.Exec(in.db.Context(), in.db.InsertAccountStm(), - stateID, stateAccount.Balance, stateAccount.Nonce, stateAccount.CodeHash, stateAccount.StorageRoot) + stateAccount.HeaderID, stateAccount.StatePath, stateAccount.Balance, stateAccount.Nonce, stateAccount.CodeHash, stateAccount.StorageRoot) if err != nil { return fmt.Errorf("error upserting state_accounts entry: %v", err) } return nil } -func (in *Writer) upsertStorageCID(tx Tx, storageCID models.StorageNodeModel, stateID int64) error { +/* +INSERT INTO eth.storage_cids (header_id, state_path, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) +ON CONFLICT (header_id, state_path, storage_path) DO UPDATE SET (storage_leaf_key, cid, node_type, diff, mh_key) = ($3, $4, $6, $7, $8) +*/ +func (in *Writer) upsertStorageCID(tx Tx, storageCID models.StorageNodeModel) error { var storageKey string if storageCID.StorageKey != nullHash.String() { storageKey = storageCID.StorageKey } _, err := tx.Exec(in.db.Context(), in.db.InsertStorageStm(), - stateID, storageKey, storageCID.CID, storageCID.Path, storageCID.NodeType, true, storageCID.MhKey) + storageCID.HeaderID, storageCID.StatePath, storageKey, storageCID.CID, storageCID.Path, storageCID.NodeType, true, storageCID.MhKey) if err != nil { return fmt.Errorf("error upserting storage_cids entry: %v", err) } diff --git a/statediff/indexer/interfaces/interfaces.go b/statediff/indexer/interfaces/interfaces.go index d32c117eb..8f951230d 100644 --- a/statediff/indexer/interfaces/interfaces.go +++ b/statediff/indexer/interfaces/interfaces.go @@ -29,7 +29,7 @@ import ( // StateDiffIndexer interface required to index statediff data type StateDiffIndexer interface { PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (Batch, error) - PushStateNode(tx Batch, stateNode sdtypes.StateNode) error + PushStateNode(tx Batch, stateNode sdtypes.StateNode, headerID string) error PushCodeAndCodeHash(tx Batch, codeAndCodeHash sdtypes.CodeAndCodeHash) error ReportDBMetrics(delay time.Duration, quit <-chan bool) io.Closer diff --git a/statediff/indexer/models/batch.go b/statediff/indexer/models/batch.go index 48b2944e0..16096f292 100644 --- a/statediff/indexer/models/batch.go +++ b/statediff/indexer/models/batch.go @@ -26,7 +26,7 @@ type IPLDBatch struct { // UncleBatch holds the arguments for a batch insert of uncle data type UncleBatch struct { - HeaderID []int64 + HeaderID []string BlockHashes []string ParentHashes []string CIDs []string @@ -36,7 +36,7 @@ type UncleBatch struct { // TxBatch holds the arguments for a batch insert of tx data type TxBatch struct { - HeaderID int64 + HeaderID string Indexes []int64 TxHashes []string CIDs []string @@ -44,20 +44,20 @@ type TxBatch struct { Dsts []string Srcs []string Datas [][]byte - Types []*uint8 + Types []uint8 } // AccessListBatch holds the arguments for a batch insert of access list data type AccessListBatch struct { Indexes []int64 - TxIDs []int64 + TxIDs []string Addresses []string StorageKeysSets []pq.StringArray } // ReceiptBatch holds the arguments for a batch insert of receipt data type ReceiptBatch struct { - TxIDs []int64 + TxIDs []string LeafCIDs []string LeafMhKeys []string PostStatuses []uint64 @@ -71,7 +71,7 @@ type ReceiptBatch struct { type LogBatch struct { LeafCIDs []string LeafMhKeys []string - ReceiptIDs []int64 + ReceiptIDs []string Addresses []string Indexes []int64 Datas [][]byte @@ -83,34 +83,33 @@ type LogBatch struct { // StateBatch holds the arguments for a batch insert of state data type StateBatch struct { - ID int64 - HeaderID int64 - Path []byte - StateKey string - NodeType int - CID string - MhKey string - Diff bool + HeaderID string + Paths [][]byte + StateKeys []string + NodeTypes []int + CIDs []string + MhKeys []string + Diff bool } // AccountBatch holds the arguments for a batch insert of account data type AccountBatch struct { - ID int64 - StateID int64 - Balance string - Nonce uint64 - CodeHash []byte - StorageRoot string + HeaderID string + StatePaths [][]byte + Balances []string + Nonces []uint64 + CodeHashes [][]byte + StorageRoots []string } // StorageBatch holds the arguments for a batch insert of storage data type StorageBatch struct { - ID int64 - StateID int64 - Path []byte - StorageKey string - NodeType int - CID string - MhKey string - Diff bool + HeaderID string + StatePaths [][]string + Paths [][]byte + StorageKeys []string + NodeTypes []int + CIDs []string + MhKeys []string + Diff bool } diff --git a/statediff/indexer/models/models.go b/statediff/indexer/models/models.go index 5e849193e..d37aa5449 100644 --- a/statediff/indexer/models/models.go +++ b/statediff/indexer/models/models.go @@ -26,14 +26,13 @@ type IPLDModel struct { // HeaderModel is the db model for eth.header_cids type HeaderModel struct { - ID int64 `db:"id"` BlockNumber string `db:"block_number"` BlockHash string `db:"block_hash"` ParentHash string `db:"parent_hash"` CID string `db:"cid"` MhKey string `db:"mh_key"` TotalDifficulty string `db:"td"` - NodeID int64 `db:"node_id"` + NodeID string `db:"node_id"` Reward string `db:"reward"` StateRoot string `db:"state_root"` UncleRoot string `db:"uncle_root"` @@ -47,8 +46,7 @@ type HeaderModel struct { // UncleModel is the db model for eth.uncle_cids type UncleModel struct { - ID int64 `db:"id"` - HeaderID int64 `db:"header_id"` + HeaderID string `db:"header_id"` BlockHash string `db:"block_hash"` ParentHash string `db:"parent_hash"` CID string `db:"cid"` @@ -58,8 +56,7 @@ type UncleModel struct { // TxModel is the db model for eth.transaction_cids type TxModel struct { - ID int64 `db:"id"` - HeaderID int64 `db:"header_id"` + HeaderID string `db:"header_id"` Index int64 `db:"index"` TxHash string `db:"tx_hash"` CID string `db:"cid"` @@ -72,17 +69,15 @@ type TxModel struct { // AccessListElementModel is the db model for eth.access_list_entry type AccessListElementModel struct { - ID int64 `db:"id"` Index int64 `db:"index"` - TxID int64 `db:"tx_id"` + TxID string `db:"tx_id"` Address string `db:"address"` StorageKeys pq.StringArray `db:"storage_keys"` } // ReceiptModel is the db model for eth.receipt_cids type ReceiptModel struct { - ID int64 `db:"id"` - TxID int64 `db:"tx_id"` + TxID string `db:"tx_id"` LeafCID string `db:"leaf_cid"` LeafMhKey string `db:"leaf_mh_key"` PostStatus uint64 `db:"post_status"` @@ -94,8 +89,7 @@ type ReceiptModel struct { // StateNodeModel is the db model for eth.state_cids type StateNodeModel struct { - ID int64 `db:"id"` - HeaderID int64 `db:"header_id"` + HeaderID string `db:"header_id"` Path []byte `db:"state_path"` StateKey string `db:"state_leaf_key"` NodeType int `db:"node_type"` @@ -106,8 +100,8 @@ type StateNodeModel struct { // StorageNodeModel is the db model for eth.storage_cids type StorageNodeModel struct { - ID int64 `db:"id"` - StateID int64 `db:"state_id"` + HeaderID string `db:"header_id"` + StatePath []byte `db:"state_path"` Path []byte `db:"storage_path"` StorageKey string `db:"storage_leaf_key"` NodeType int `db:"node_type"` @@ -118,8 +112,8 @@ type StorageNodeModel struct { // StorageNodeWithStateKeyModel is a db model for eth.storage_cids + eth.state_cids.state_key type StorageNodeWithStateKeyModel struct { - ID int64 `db:"id"` - StateID int64 `db:"state_id"` + HeaderID string `db:"header_id"` + StatePath []byte `db:"state_path"` Path []byte `db:"storage_path"` StateKey string `db:"state_leaf_key"` StorageKey string `db:"storage_leaf_key"` @@ -131,8 +125,8 @@ type StorageNodeWithStateKeyModel struct { // StateAccountModel is a db model for an eth state account (decoded value of state leaf node) type StateAccountModel struct { - ID int64 `db:"id"` - StateID int64 `db:"state_id"` + HeaderID string `db:"header_id"` + StatePath []byte `db:"state_path"` Balance string `db:"balance"` Nonce uint64 `db:"nonce"` CodeHash []byte `db:"code_hash"` @@ -141,10 +135,9 @@ type StateAccountModel struct { // LogsModel is the db model for eth.logs type LogsModel struct { - ID int64 `db:"id"` + ReceiptID string `db:"rct_id"` LeafCID string `db:"leaf_cid"` LeafMhKey string `db:"leaf_mh_key"` - ReceiptID int64 `db:"receipt_id"` Address string `db:"address"` Index int64 `db:"index"` Data []byte `db:"log_data"` diff --git a/statediff/indexer/shared/db_kind.go b/statediff/indexer/shared/db_kind.go index 6b88164e1..7e7997f95 100644 --- a/statediff/indexer/shared/db_kind.go +++ b/statediff/indexer/shared/db_kind.go @@ -27,6 +27,7 @@ type DBType string const ( POSTGRES DBType = "Postgres" DUMP DBType = "Dump" + FILE DBType = "File" UNKNOWN DBType = "Unknown" ) @@ -37,6 +38,8 @@ func ResolveDBType(str string) (DBType, error) { return POSTGRES, nil case "dump", "d": return DUMP, nil + case "file", "f", "fs": + return FILE, nil default: return UNKNOWN, fmt.Errorf("unrecognized db type string: %s", str) } diff --git a/statediff/service.go b/statediff/service.go index 31a56b809..04aaac458 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -239,8 +239,14 @@ func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) { statediffMetrics.writeLoopChannelLen.Update(int64(len(chainEventCh))) chainEventFwd <- chainEvent case err := <-errCh: + println("here") log.Error("Error from chain event subscription", "error", err) close(sds.QuitChan) + log.Info("Quitting the statediffing writing loop") + if err := sds.indexer.Close(); err != nil { + log.Error("Error closing indexer", "err", err) + } + return case <-sds.QuitChan: log.Info("Quitting the statediffing writing loop") if err := sds.indexer.Close(); err != nil { @@ -339,6 +345,9 @@ func (sds *Service) Loop(chainEventCh chan core.ChainEvent) { case err := <-errCh: log.Error("Error from chain event subscription", "error", err) close(sds.QuitChan) + log.Info("Quitting the statediffing listening loop") + sds.close() + return case <-sds.QuitChan: log.Info("Quitting the statediffing listening loop") sds.close() @@ -664,7 +673,7 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p } }() output := func(node types2.StateNode) error { - return sds.indexer.PushStateNode(tx, node) + return sds.indexer.PushStateNode(tx, node, block.Hash().String()) } codeOutput := func(c types2.CodeAndCodeHash) error { return sds.indexer.PushCodeAndCodeHash(tx, c) diff --git a/statediff/test_helpers/mocks/service_test.go b/statediff/test_helpers/mocks/service_test.go index b3b77d4bf..dde784316 100644 --- a/statediff/test_helpers/mocks/service_test.go +++ b/statediff/test_helpers/mocks/service_test.go @@ -24,6 +24,7 @@ import ( "sort" "sync" "testing" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" @@ -150,29 +151,35 @@ func testSubscriptionAPI(t *testing.T) { id := rpc.NewID() payloadChan := make(chan statediff.Payload) quitChan := make(chan bool) + wg := new(sync.WaitGroup) + go func() { + wg.Add(1) + defer wg.Done() + sort.Slice(expectedStateDiffBytes, func(i, j int) bool { return expectedStateDiffBytes[i] < expectedStateDiffBytes[j] }) + select { + case payload := <-payloadChan: + if !bytes.Equal(payload.BlockRlp, expectedBlockRlp) { + t.Errorf("payload does not have expected block\r\nactual block rlp: %v\r\nexpected block rlp: %v", payload.BlockRlp, expectedBlockRlp) + } + sort.Slice(payload.StateObjectRlp, func(i, j int) bool { return payload.StateObjectRlp[i] < payload.StateObjectRlp[j] }) + if !bytes.Equal(payload.StateObjectRlp, expectedStateDiffBytes) { + t.Errorf("payload does not have expected state diff\r\nactual state diff rlp: %v\r\nexpected state diff rlp: %v", payload.StateObjectRlp, expectedStateDiffBytes) + } + if !bytes.Equal(expectedReceiptBytes, payload.ReceiptsRlp) { + t.Errorf("payload does not have expected receipts\r\nactual receipt rlp: %v\r\nexpected receipt rlp: %v", payload.ReceiptsRlp, expectedReceiptBytes) + } + if !bytes.Equal(payload.TotalDifficulty.Bytes(), mockTotalDifficulty.Bytes()) { + t.Errorf("payload does not have expected total difficulty\r\nactual td: %d\r\nexpected td: %d", payload.TotalDifficulty.Int64(), mockTotalDifficulty.Int64()) + } + case <-quitChan: + t.Errorf("channel quit before delivering payload") + } + }() + time.Sleep(1) mockService.Subscribe(id, payloadChan, quitChan, params) blockChan <- block1 parentBlockChain <- block0 - - sort.Slice(expectedStateDiffBytes, func(i, j int) bool { return expectedStateDiffBytes[i] < expectedStateDiffBytes[j] }) - select { - case payload := <-payloadChan: - if !bytes.Equal(payload.BlockRlp, expectedBlockRlp) { - t.Errorf("payload does not have expected block\r\nactual block rlp: %v\r\nexpected block rlp: %v", payload.BlockRlp, expectedBlockRlp) - } - sort.Slice(payload.StateObjectRlp, func(i, j int) bool { return payload.StateObjectRlp[i] < payload.StateObjectRlp[j] }) - if !bytes.Equal(payload.StateObjectRlp, expectedStateDiffBytes) { - t.Errorf("payload does not have expected state diff\r\nactual state diff rlp: %v\r\nexpected state diff rlp: %v", payload.StateObjectRlp, expectedStateDiffBytes) - } - if !bytes.Equal(expectedReceiptBytes, payload.ReceiptsRlp) { - t.Errorf("payload does not have expected receipts\r\nactual receipt rlp: %v\r\nexpected receipt rlp: %v", payload.ReceiptsRlp, expectedReceiptBytes) - } - if !bytes.Equal(payload.TotalDifficulty.Bytes(), mockTotalDifficulty.Bytes()) { - t.Errorf("payload does not have expected total difficulty\r\nactual td: %d\r\nexpected td: %d", payload.TotalDifficulty.Int64(), mockTotalDifficulty.Int64()) - } - case <-quitChan: - t.Errorf("channel quit before delivering payload") - } + wg.Wait() } func testHTTPAPI(t *testing.T) {