From 2339b0a5afe6add87188463eb21b73ffc6b5b042 Mon Sep 17 00:00:00 2001 From: i-norden Date: Thu, 11 Nov 2021 21:51:14 -0600 Subject: [PATCH] fixes and cli integration for new options --- cmd/geth/config.go | 113 ++-- cmd/geth/main.go | 3 + cmd/geth/usage.go | 3 + cmd/utils/flags.go | 21 +- statediff/README.md | 3 + statediff/indexer/database/dump/batch_tx.go | 15 +- statediff/indexer/database/dump/config.go | 48 ++ statediff/indexer/database/dump/indexer.go | 33 +- statediff/indexer/database/sql/batch_tx.go | 22 +- statediff/indexer/database/sql/indexer.go | 14 +- .../database/sql/pgx_indexer_legacy_test.go | 88 +++ .../indexer/database/sql/pgx_indexer_test.go | 609 ++++++++++++++++++ .../indexer/database/sql/postgres/config.go | 21 +- .../indexer/database/sql/postgres/pgx.go | 20 +- .../indexer/database/sql/postgres/pgx_test.go | 6 +- .../indexer/database/sql/postgres/sqlx.go | 3 +- .../database/sql/postgres/sqlx_test.go | 5 +- ...cy_test.go => sqlx_indexer_legacy_test.go} | 23 +- .../{indexer_test.go => sqlx_indexer_test.go} | 43 +- .../indexer/database/sql/test_helpers.go | 12 + statediff/indexer/shared/db_kind.go | 19 + statediff/service.go | 22 +- 22 files changed, 1008 insertions(+), 138 deletions(-) create mode 100644 statediff/indexer/database/sql/pgx_indexer_legacy_test.go create mode 100644 statediff/indexer/database/sql/pgx_indexer_test.go rename statediff/indexer/database/sql/{indexer_legacy_test.go => sqlx_indexer_legacy_test.go} (82%) rename statediff/indexer/database/sql/{indexer_test.go => sqlx_indexer_test.go} (95%) diff --git a/cmd/geth/config.go b/cmd/geth/config.go index cfc43d2ab..9a8b169be 100644 --- a/cmd/geth/config.go +++ b/cmd/geth/config.go @@ -18,6 +18,7 @@ package main import ( "bufio" + "context" "errors" "fmt" "math/big" @@ -26,10 +27,7 @@ import ( "time" "unicode" - "github.com/ethereum/go-ethereum/statediff/indexer/database/sql" - - "github.com/ethereum/go-ethereum/eth/downloader" - "github.com/ethereum/go-ethereum/statediff" + "github.com/naoina/toml" "gopkg.in/urfave/cli.v1" "github.com/ethereum/go-ethereum/accounts/external" @@ -38,13 +36,18 @@ import ( "github.com/ethereum/go-ethereum/accounts/usbwallet" "github.com/ethereum/go-ethereum/cmd/utils" "github.com/ethereum/go-ethereum/eth/catalyst" + "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/eth/ethconfig" "github.com/ethereum/go-ethereum/internal/ethapi" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/params" - "github.com/naoina/toml" + "github.com/ethereum/go-ethereum/statediff" + dumpdb "github.com/ethereum/go-ethereum/statediff/indexer/database/dump" + "github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres" + "github.com/ethereum/go-ethereum/statediff/indexer/interfaces" + "github.com/ethereum/go-ethereum/statediff/indexer/shared" ) var ( @@ -185,48 +188,82 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) { } if ctx.GlobalBool(utils.StateDiffFlag.Name) { - var dbConfig *sql.Config + var indexerConfig interfaces.Config + var clientName, nodeID string if ctx.GlobalIsSet(utils.StateDiffWritingFlag.Name) { - dbConfig = new(sql.Config) - dbConfig.Hostname = ctx.GlobalString(utils.StateDiffDBHostFlag.Name) - dbConfig.Port = ctx.GlobalInt(utils.StateDiffDBPortFlag.Name) - dbConfig.DatabaseName = ctx.GlobalString(utils.StateDiffDBNameFlag.Name) - dbConfig.Username = ctx.GlobalString(utils.StateDiffDBUserFlag.Name) - dbConfig.Password = ctx.GlobalString(utils.StateDiffDBPasswordFlag.Name) - + clientName = ctx.GlobalString(utils.StateDiffDBClientNameFlag.Name) if ctx.GlobalIsSet(utils.StateDiffDBNodeIDFlag.Name) { - dbConfig.ID = ctx.GlobalString(utils.StateDiffDBNodeIDFlag.Name) + nodeID = ctx.GlobalString(utils.StateDiffDBNodeIDFlag.Name) } else { utils.Fatalf("Must specify node ID for statediff DB output") } - if ctx.GlobalIsSet(utils.StateDiffDBClientNameFlag.Name) { - dbConfig.ClientName = ctx.GlobalString(utils.StateDiffDBClientNameFlag.Name) - } else { - utils.Fatalf("Must specify client name for statediff DB output") + dbTypeStr := ctx.GlobalString(utils.StateDiffDBTypeFlag.Name) + dbType, err := shared.ResolveDBType(dbTypeStr) + if err != nil { + utils.Fatalf("%v", err) } - - if ctx.GlobalIsSet(utils.StateDiffDBMinConns.Name) { - dbConfig.MinConns = ctx.GlobalInt(utils.StateDiffDBMinConns.Name) - } - if ctx.GlobalIsSet(utils.StateDiffDBMaxConns.Name) { - dbConfig.MaxConns = ctx.GlobalInt(utils.StateDiffDBMaxConns.Name) - } - if ctx.GlobalIsSet(utils.StateDiffDBMaxIdleConns.Name) { - dbConfig.MaxIdle = ctx.GlobalInt(utils.StateDiffDBMaxIdleConns.Name) - } - if ctx.GlobalIsSet(utils.StateDiffDBMaxConnLifetime.Name) { - dbConfig.MaxConnLifetime = ctx.GlobalDuration(utils.StateDiffDBMaxConnLifetime.Name) * time.Second - } - if ctx.GlobalIsSet(utils.StateDiffDBMaxConnIdleTime.Name) { - dbConfig.MaxConnIdleTime = ctx.GlobalDuration(utils.StateDiffDBMaxConnIdleTime.Name) * time.Second - } - if ctx.GlobalIsSet(utils.StateDiffDBConnTimeout.Name) { - dbConfig.ConnTimeout = ctx.GlobalDuration(utils.StateDiffDBConnTimeout.Name) * time.Second + switch dbType { + case shared.POSTGRES: + driverTypeStr := ctx.GlobalString(utils.StateDiffDBDriverTypeFlag.Name) + driverType, err := postgres.ResolveDriverType(driverTypeStr) + if err != nil { + utils.Fatalf("%v", err) + } + pgConfig := postgres.Config{ + Hostname: ctx.GlobalString(utils.StateDiffDBHostFlag.Name), + Port: ctx.GlobalInt(utils.StateDiffDBPortFlag.Name), + DatabaseName: ctx.GlobalString(utils.StateDiffDBNameFlag.Name), + Username: ctx.GlobalString(utils.StateDiffDBUserFlag.Name), + Password: ctx.GlobalString(utils.StateDiffDBPasswordFlag.Name), + ID: nodeID, + ClientName: clientName, + Driver: driverType, + } + if ctx.GlobalIsSet(utils.StateDiffDBMinConns.Name) { + pgConfig.MinConns = ctx.GlobalInt(utils.StateDiffDBMinConns.Name) + } + if ctx.GlobalIsSet(utils.StateDiffDBMaxConns.Name) { + pgConfig.MaxConns = ctx.GlobalInt(utils.StateDiffDBMaxConns.Name) + } + if ctx.GlobalIsSet(utils.StateDiffDBMaxIdleConns.Name) { + pgConfig.MaxIdle = ctx.GlobalInt(utils.StateDiffDBMaxIdleConns.Name) + } + if ctx.GlobalIsSet(utils.StateDiffDBMaxConnLifetime.Name) { + pgConfig.MaxConnLifetime = ctx.GlobalDuration(utils.StateDiffDBMaxConnLifetime.Name) * time.Second + } + if ctx.GlobalIsSet(utils.StateDiffDBMaxConnIdleTime.Name) { + pgConfig.MaxConnIdleTime = ctx.GlobalDuration(utils.StateDiffDBMaxConnIdleTime.Name) * time.Second + } + if ctx.GlobalIsSet(utils.StateDiffDBConnTimeout.Name) { + pgConfig.ConnTimeout = ctx.GlobalDuration(utils.StateDiffDBConnTimeout.Name) * time.Second + } + indexerConfig = pgConfig + case shared.DUMP: + dumpTypeStr := ctx.GlobalString(utils.StateDiffDBDumpDst.Name) + dumpType, err := dumpdb.ResolveDumpType(dumpTypeStr) + if err != nil { + utils.Fatalf("%v", err) + } + switch dumpType { + case dumpdb.STDERR: + indexerConfig = dumpdb.Config{Dump: os.Stdout} + case dumpdb.STDOUT: + indexerConfig = dumpdb.Config{Dump: os.Stderr} + case dumpdb.DISCARD: + indexerConfig = dumpdb.Config{Dump: dumpdb.NewDiscardWriterCloser()} + default: + utils.Fatalf("unrecognized dump destination: %s", dumpType) + } + default: + utils.Fatalf("unrecognized database type: %s", dbType) } } - p := statediff.ServiceParams{ - DBParams: dbConfig, + p := statediff.Config{ + IndexerConfig: indexerConfig, + ID: nodeID, + ClientName: clientName, + Context: context.Background(), EnableWriteLoop: ctx.GlobalBool(utils.StateDiffWritingFlag.Name), NumWorkers: ctx.GlobalUint(utils.StateDiffWorkersFlag.Name), } diff --git a/cmd/geth/main.go b/cmd/geth/main.go index 9c8dbdcfd..c92810d11 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -149,6 +149,9 @@ var ( utils.GpoIgnoreGasPriceFlag, utils.MinerNotifyFullFlag, utils.StateDiffFlag, + utils.StateDiffDBTypeFlag, + utils.StateDiffDBDriverTypeFlag, + utils.StateDiffDBDumpDst, utils.StateDiffDBNameFlag, utils.StateDiffDBPasswordFlag, utils.StateDiffDBUserFlag, diff --git a/cmd/geth/usage.go b/cmd/geth/usage.go index 1ad1b8557..68e2a3f4c 100644 --- a/cmd/geth/usage.go +++ b/cmd/geth/usage.go @@ -225,6 +225,9 @@ var AppHelpFlagGroups = []flags.FlagGroup{ Name: "STATE DIFF", Flags: []cli.Flag{ utils.StateDiffFlag, + utils.StateDiffDBTypeFlag, + utils.StateDiffDBDriverTypeFlag, + utils.StateDiffDBDumpDst, utils.StateDiffDBNameFlag, utils.StateDiffDBPasswordFlag, utils.StateDiffDBUserFlag, diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 8dfb92a7e..08f9088f5 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -786,6 +786,21 @@ var ( Name: "statediff", Usage: "Enables the processing of state diffs between each block", } + StateDiffDBTypeFlag = cli.StringFlag{ + Name: "statediff.db.type", + Usage: "Statediff database type", + Value: "postgres", + } + StateDiffDBDriverTypeFlag = cli.StringFlag{ + Name: "statediff.db.driver", + Usage: "Statediff database driver type", + Value: "pgx", + } + StateDiffDBDumpDst = cli.StringFlag{ + Name: "statediff.dump.dst", + Usage: "Statediff database dump destination (default is stdout)", + Value: "stdout", + } StateDiffDBHostFlag = cli.StringFlag{ Name: "statediff.db.host", Usage: "Statediff database hostname/ip", @@ -840,6 +855,7 @@ var ( StateDiffDBClientNameFlag = cli.StringFlag{ Name: "statediff.db.clientname", Usage: "Client name to use when writing state diffs to database", + Value: "go-ethereum", } StateDiffWritingFlag = cli.BoolFlag{ Name: "statediff.writing", @@ -847,7 +863,8 @@ var ( } StateDiffWorkersFlag = cli.UintFlag{ Name: "statediff.workers", - Usage: "Number of concurrent workers to use during statediff processing (0 = 1)", + Usage: "Number of concurrent workers to use during statediff processing (default 1)", + Value: 1, } ) @@ -1804,7 +1821,7 @@ func RegisterGraphQLService(stack *node.Node, backend ethapi.Backend, cfg node.C } // RegisterStateDiffService configures and registers a service to stream state diff data over RPC -func RegisterStateDiffService(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params statediff.ServiceParams) { +func RegisterStateDiffService(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params statediff.Config) { if err := statediff.New(stack, ethServ, cfg, params); err != nil { Fatalf("Failed to register the Statediff service: %v", err) } diff --git a/statediff/README.md b/statediff/README.md index dd2eaed7f..bd5d1d43c 100644 --- a/statediff/README.md +++ b/statediff/README.md @@ -79,6 +79,9 @@ This service introduces a CLI flag namespace `statediff` `--statediff` flag is used to turn on the service `--statediff.writing` is used to tell the service to write state diff objects it produces from synced ChainEvents directly to a configured Postgres database `--statediff.workers` is used to set the number of concurrent workers to process state diff objects and write them into the database +`--statediff.db.type` is the type of database we write out to (current options: postgres and dump) +`--statediff.dump.dst` is the destination to write to when operating in database dump mode (stdout, stderr, discard) +`--statediff.db.driver` is the specific driver to use for the database (current options for postgres: pgx and sqlx) `--statediff.db.host` is the hostname/ip to dial to connect to the database `--statediff.db.port` is the port to dial to connect to the database `--statediff.db.name` is the name of the database to connect to diff --git a/statediff/indexer/database/dump/batch_tx.go b/statediff/indexer/database/dump/batch_tx.go index a0021baf7..f1754b907 100644 --- a/statediff/indexer/database/dump/batch_tx.go +++ b/statediff/indexer/database/dump/batch_tx.go @@ -30,21 +30,22 @@ import ( // BatchTx wraps a void with the state necessary for building the tx concurrently during trie difference iteration type BatchTx struct { - dump io.Writer - quit chan struct{} - iplds chan models.IPLDModel - ipldCache models.IPLDBatch + BlockNumber uint64 + dump io.Writer + quit chan struct{} + iplds chan models.IPLDModel + ipldCache models.IPLDBatch - close func(blockTx *BatchTx, err error) error + submit func(blockTx *BatchTx, err error) error } // Submit satisfies indexer.AtomicTx func (tx *BatchTx) Submit(err error) error { - return tx.close(tx, err) + return tx.submit(tx, err) } func (tx *BatchTx) flush() error { - if _, err := fmt.Fprintf(tx.dump, "%+v", tx.ipldCache); err != nil { + if _, err := fmt.Fprintf(tx.dump, "%+v\r\n", tx.ipldCache); err != nil { return err } tx.ipldCache = models.IPLDBatch{} diff --git a/statediff/indexer/database/dump/config.go b/statediff/indexer/database/dump/config.go index fb2e6a58c..6fb1f0a9e 100644 --- a/statediff/indexer/database/dump/config.go +++ b/statediff/indexer/database/dump/config.go @@ -17,15 +17,63 @@ package dump import ( + "fmt" "io" + "strings" "github.com/ethereum/go-ethereum/statediff/indexer/shared" ) +// DumpType to explicitly type the dump destination +type DumpType string + +const ( + STDOUT = "Stdout" + STDERR = "Stderr" + DISCARD = "Discard" + UNKNOWN = "Unknown" +) + +// ResolveDumpType resolves the dump type for the provided string +func ResolveDumpType(str string) (DumpType, error) { + switch strings.ToLower(str) { + case "stdout", "out", "std out": + return STDOUT, nil + case "stderr", "err", "std err": + return STDERR, nil + case "discard", "void", "devnull", "dev null": + return DISCARD, nil + default: + return UNKNOWN, fmt.Errorf("unrecognized dump type: %s", str) + } +} + +// Config for data dump type Config struct { Dump io.WriteCloser } +// Type satisfies interfaces.Config func (c Config) Type() shared.DBType { return shared.DUMP } + +// NewDiscardWriterCloser returns a discardWrapper wrapping io.Discard +func NewDiscardWriterCloser() io.WriteCloser { + return discardWrapper{blackhole: io.Discard} +} + +// discardWrapper wraps io.Discard with io.Closer +type discardWrapper struct { + blackhole io.Writer +} + +// Write satisfies io.Writer +func (dw discardWrapper) Write(b []byte) (int, error) { + return dw.blackhole.Write(b) +} + +// Close satisfies io.Closer +func (dw discardWrapper) Close() error { + return nil +} diff --git a/statediff/indexer/database/dump/indexer.go b/statediff/indexer/database/dump/indexer.go index f815305b1..ccbe28c66 100644 --- a/statediff/indexer/database/dump/indexer.go +++ b/statediff/indexer/database/dump/indexer.go @@ -102,11 +102,12 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip t = time.Now() blockTx := &BatchTx{ - dump: sdi.dump, - iplds: make(chan models.IPLDModel), - quit: make(chan struct{}), - ipldCache: models.IPLDBatch{}, - close: func(self *BatchTx, err error) error { + BlockNumber: height, + dump: sdi.dump, + iplds: make(chan models.IPLDModel), + quit: make(chan struct{}), + ipldCache: models.IPLDBatch{}, + submit: func(self *BatchTx, err error) error { close(self.quit) close(self.iplds) tDiff := time.Since(t) @@ -205,7 +206,7 @@ func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, he Timestamp: header.Time, BaseFee: baseFee, } - _, err := fmt.Fprintf(sdi.dump, "%+v", mod) + _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", mod) return 0, err } @@ -228,7 +229,7 @@ func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID int64, blockNum BlockHash: uncleNode.Hash().String(), Reward: uncleReward.String(), } - if _, err := fmt.Fprintf(sdi.dump, "%+v", uncle); err != nil { + if _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", uncle); err != nil { return err } } @@ -319,7 +320,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs if txType != types.LegacyTxType { txModel.Type = &txType } - if _, err := fmt.Fprintf(sdi.dump, "%+v", txModel); err != nil { + if _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", txModel); err != nil { return err } @@ -334,7 +335,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs Address: accessListElement.Address.Hex(), StorageKeys: storageKeys, } - if _, err := fmt.Fprintf(sdi.dump, "%+v", accessListElementModel); err != nil { + if _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", accessListElementModel); err != nil { return err } } @@ -357,11 +358,11 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs rctModel.PostState = common.Bytes2Hex(receipt.PostState) } - if _, err := fmt.Fprintf(sdi.dump, "%+v", rctModel); err != nil { + if _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", rctModel); err != nil { return err } - if _, err := fmt.Fprintf(sdi.dump, "%+v", logDataSet); err != nil { + if _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", logDataSet); err != nil { return err } } @@ -392,7 +393,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt MhKey: shared.RemovedNodeMhKey, NodeType: stateNode.NodeType.Int(), } - _, err := fmt.Fprintf(sdi.dump, "%+v", stateModel) + _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", stateModel) return err } stateCIDStr, stateMhKey, err := tx.cacheRaw(ipld2.MEthStateTrie, multihash.KECCAK_256, stateNode.NodeValue) @@ -407,7 +408,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt NodeType: stateNode.NodeType.Int(), } // index the state node, collect the stateID to reference by FK - if _, err := fmt.Fprintf(sdi.dump, "%+v", stateModel); err != nil { + if _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", stateModel); err != nil { return err } // if we have a leaf, decode and index the account data @@ -429,7 +430,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt CodeHash: account.CodeHash, StorageRoot: account.Root.String(), } - if _, err := fmt.Fprintf(sdi.dump, "%+v", accountModel); err != nil { + if _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", accountModel); err != nil { return err } } @@ -445,7 +446,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt MhKey: shared.RemovedNodeMhKey, NodeType: storageNode.NodeType.Int(), } - if _, err := fmt.Fprintf(sdi.dump, "%+v", storageModel); err != nil { + if _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", storageModel); err != nil { return err } continue @@ -461,7 +462,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt MhKey: storageMhKey, NodeType: storageNode.NodeType.Int(), } - if _, err := fmt.Fprintf(sdi.dump, "%+v", storageModel); err != nil { + if _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", storageModel); err != nil { return err } } diff --git a/statediff/indexer/database/sql/batch_tx.go b/statediff/indexer/database/sql/batch_tx.go index 2041af1ed..ff847eec6 100644 --- a/statediff/indexer/database/sql/batch_tx.go +++ b/statediff/indexer/database/sql/batch_tx.go @@ -19,33 +19,33 @@ package sql import ( "context" - "github.com/ethereum/go-ethereum/statediff/indexer/ipld" - blockstore "github.com/ipfs/go-ipfs-blockstore" dshelp "github.com/ipfs/go-ipfs-ds-help" node "github.com/ipfs/go-ipld-format" "github.com/lib/pq" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/statediff/indexer/ipld" "github.com/ethereum/go-ethereum/statediff/indexer/models" ) // BatchTx wraps a sql tx with the state necessary for building the tx concurrently during trie difference iteration type BatchTx struct { - ctx context.Context - dbtx Tx - headerID int64 - stm string - quit chan struct{} - iplds chan models.IPLDModel - ipldCache models.IPLDBatch + BlockNumber uint64 + ctx context.Context + dbtx Tx + headerID int64 + stm string + quit chan struct{} + iplds chan models.IPLDModel + ipldCache models.IPLDBatch - close func(blockTx *BatchTx, err error) error + submit func(blockTx *BatchTx, err error) error } // Submit satisfies indexer.AtomicTx func (tx *BatchTx) Submit(err error) error { - return tx.close(tx, err) + return tx.submit(tx, err) } func (tx *BatchTx) flush() error { diff --git a/statediff/indexer/database/sql/indexer.go b/statediff/indexer/database/sql/indexer.go index 6c35cccac..b9cfd0733 100644 --- a/statediff/indexer/database/sql/indexer.go +++ b/statediff/indexer/database/sql/indexer.go @@ -141,13 +141,15 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip } }() blockTx := &BatchTx{ - stm: sdi.dbWriter.db.InsertIPLDsStm(), - iplds: make(chan models.IPLDModel), - quit: make(chan struct{}), - ipldCache: models.IPLDBatch{}, - dbtx: tx, + ctx: sdi.ctx, + BlockNumber: height, + stm: sdi.dbWriter.db.InsertIPLDsStm(), + iplds: make(chan models.IPLDModel), + quit: make(chan struct{}), + ipldCache: models.IPLDBatch{}, + dbtx: tx, // handle transaction commit or rollback for any return case - close: func(self *BatchTx, err error) error { + submit: func(self *BatchTx, err error) error { close(self.quit) close(self.iplds) if p := recover(); p != nil { diff --git a/statediff/indexer/database/sql/pgx_indexer_legacy_test.go b/statediff/indexer/database/sql/pgx_indexer_legacy_test.go new file mode 100644 index 000000000..f97f3e257 --- /dev/null +++ b/statediff/indexer/database/sql/pgx_indexer_legacy_test.go @@ -0,0 +1,88 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package sql_test + +import ( + "context" + "testing" + + "github.com/multiformats/go-multihash" + "github.com/stretchr/testify/require" + + "github.com/ethereum/go-ethereum/statediff/indexer/database/sql" + "github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres" + "github.com/ethereum/go-ethereum/statediff/indexer/interfaces" + "github.com/ethereum/go-ethereum/statediff/indexer/ipld" + "github.com/ethereum/go-ethereum/statediff/indexer/test_helpers" +) + +func setupLegacyPGX(t *testing.T) { + mockLegacyBlock = legacyData.MockBlock + legacyHeaderCID, _ = ipld.RawdataToCid(ipld.MEthHeader, legacyData.MockHeaderRlp, multihash.KECCAK_256) + + db, err = postgres.SetupPGXDB() + require.NoError(t, err) + + ind, err = sql.NewStateDiffIndexer(context.Background(), legacyData.Config, db) + require.NoError(t, err) + var tx interfaces.Batch + tx, err = ind.PushBlock( + mockLegacyBlock, + legacyData.MockReceipts, + legacyData.MockBlock.Difficulty()) + require.NoError(t, err) + + defer func() { + if err := tx.Submit(err); err != nil { + t.Fatal(err) + } + }() + for _, node := range legacyData.StateDiffs { + err = ind.PushStateNode(tx, node) + require.NoError(t, err) + } + + test_helpers.ExpectEqual(t, tx.(*sql.BatchTx).BlockNumber, legacyData.BlockNumber.Uint64()) +} + +func TestPGXIndexerLegacy(t *testing.T) { + t.Run("Publish and index header IPLDs in a legacy tx", func(t *testing.T) { + setupLegacyPGX(t) + defer tearDown(t) + pgStr := `SELECT cid, td, reward, id, base_fee + FROM eth.header_cids + WHERE block_number = $1` + // check header was properly indexed + type res struct { + CID string + TD string + Reward string + ID int + BaseFee *int64 `db:"base_fee"` + } + header := new(res) + + err = db.QueryRow(context.Background(), pgStr, legacyData.BlockNumber.Uint64()).StructScan(header) + require.NoError(t, err) + + test_helpers.ExpectEqual(t, header.CID, legacyHeaderCID.String()) + test_helpers.ExpectEqual(t, header.TD, legacyData.MockBlock.Difficulty().String()) + test_helpers.ExpectEqual(t, header.Reward, "5000000000000011250") + require.Nil(t, legacyData.MockHeader.BaseFee) + require.Nil(t, header.BaseFee) + }) +} diff --git a/statediff/indexer/database/sql/pgx_indexer_test.go b/statediff/indexer/database/sql/pgx_indexer_test.go new file mode 100644 index 000000000..730257b21 --- /dev/null +++ b/statediff/indexer/database/sql/pgx_indexer_test.go @@ -0,0 +1,609 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package sql_test + +import ( + "bytes" + "context" + "fmt" + "os" + "testing" + + "github.com/ipfs/go-cid" + blockstore "github.com/ipfs/go-ipfs-blockstore" + dshelp "github.com/ipfs/go-ipfs-ds-help" + "github.com/multiformats/go-multihash" + "github.com/stretchr/testify/require" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/statediff/indexer/database/sql" + "github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres" + "github.com/ethereum/go-ethereum/statediff/indexer/interfaces" + "github.com/ethereum/go-ethereum/statediff/indexer/ipld" + "github.com/ethereum/go-ethereum/statediff/indexer/mocks" + "github.com/ethereum/go-ethereum/statediff/indexer/models" + "github.com/ethereum/go-ethereum/statediff/indexer/shared" + "github.com/ethereum/go-ethereum/statediff/indexer/test_helpers" +) + +func init() { + if os.Getenv("MODE") != "statediff" { + fmt.Println("Skipping statediff test") + os.Exit(0) + } + + mockBlock = mocks.MockBlock + txs, rcts := mocks.MockBlock.Transactions(), mocks.MockReceipts + + buf := new(bytes.Buffer) + txs.EncodeIndex(0, buf) + tx1 = make([]byte, buf.Len()) + copy(tx1, buf.Bytes()) + buf.Reset() + + txs.EncodeIndex(1, buf) + tx2 = make([]byte, buf.Len()) + copy(tx2, buf.Bytes()) + buf.Reset() + + txs.EncodeIndex(2, buf) + tx3 = make([]byte, buf.Len()) + copy(tx3, buf.Bytes()) + buf.Reset() + + txs.EncodeIndex(3, buf) + tx4 = make([]byte, buf.Len()) + copy(tx4, buf.Bytes()) + buf.Reset() + + txs.EncodeIndex(4, buf) + tx5 = make([]byte, buf.Len()) + copy(tx5, buf.Bytes()) + buf.Reset() + + rcts.EncodeIndex(0, buf) + rct1 = make([]byte, buf.Len()) + copy(rct1, buf.Bytes()) + buf.Reset() + + rcts.EncodeIndex(1, buf) + rct2 = make([]byte, buf.Len()) + copy(rct2, buf.Bytes()) + buf.Reset() + + rcts.EncodeIndex(2, buf) + rct3 = make([]byte, buf.Len()) + copy(rct3, buf.Bytes()) + buf.Reset() + + rcts.EncodeIndex(3, buf) + rct4 = make([]byte, buf.Len()) + copy(rct4, buf.Bytes()) + buf.Reset() + + rcts.EncodeIndex(4, buf) + rct5 = make([]byte, buf.Len()) + copy(rct5, buf.Bytes()) + buf.Reset() + + headerCID, _ = ipld.RawdataToCid(ipld.MEthHeader, mocks.MockHeaderRlp, multihash.KECCAK_256) + trx1CID, _ = ipld.RawdataToCid(ipld.MEthTx, tx1, multihash.KECCAK_256) + trx2CID, _ = ipld.RawdataToCid(ipld.MEthTx, tx2, multihash.KECCAK_256) + trx3CID, _ = ipld.RawdataToCid(ipld.MEthTx, tx3, multihash.KECCAK_256) + trx4CID, _ = ipld.RawdataToCid(ipld.MEthTx, tx4, multihash.KECCAK_256) + trx5CID, _ = ipld.RawdataToCid(ipld.MEthTx, tx5, multihash.KECCAK_256) + rct1CID, _ = ipld.RawdataToCid(ipld.MEthTxReceipt, rct1, multihash.KECCAK_256) + rct2CID, _ = ipld.RawdataToCid(ipld.MEthTxReceipt, rct2, multihash.KECCAK_256) + rct3CID, _ = ipld.RawdataToCid(ipld.MEthTxReceipt, rct3, multihash.KECCAK_256) + rct4CID, _ = ipld.RawdataToCid(ipld.MEthTxReceipt, rct4, multihash.KECCAK_256) + rct5CID, _ = ipld.RawdataToCid(ipld.MEthTxReceipt, rct5, multihash.KECCAK_256) + state1CID, _ = ipld.RawdataToCid(ipld.MEthStateTrie, mocks.ContractLeafNode, multihash.KECCAK_256) + state2CID, _ = ipld.RawdataToCid(ipld.MEthStateTrie, mocks.AccountLeafNode, multihash.KECCAK_256) + storageCID, _ = ipld.RawdataToCid(ipld.MEthStorageTrie, mocks.StorageLeafNode, multihash.KECCAK_256) +} + +func setupPGX(t *testing.T) { + db, err = postgres.SetupPGXDB() + if err != nil { + t.Fatal(err) + } + ind, err = sql.NewStateDiffIndexer(context.Background(), mocks.TestConfig, db) + require.NoError(t, err) + var tx interfaces.Batch + tx, err = ind.PushBlock( + mockBlock, + mocks.MockReceipts, + mocks.MockBlock.Difficulty()) + if err != nil { + t.Fatal(err) + } + defer func() { + if err := tx.Submit(err); err != nil { + t.Fatal(err) + } + }() + for _, node := range mocks.StateDiffs { + err = ind.PushStateNode(tx, node) + if err != nil { + t.Fatal(err) + } + } + + test_helpers.ExpectEqual(t, tx.(*sql.BatchTx).BlockNumber, mocks.BlockNumber.Uint64()) +} + +func TestPGXIndexer(t *testing.T) { + t.Run("Publish and index header IPLDs in a single tx", func(t *testing.T) { + setupPGX(t) + defer tearDown(t) + pgStr := `SELECT cid, td, reward, id, base_fee + FROM eth.header_cids + WHERE block_number = $1` + // check header was properly indexed + type res struct { + CID string + TD string + Reward string + ID int + BaseFee *int64 `db:"base_fee"` + } + header := new(res) + err = db.QueryRow(context.Background(), pgStr, mocks.BlockNumber.Uint64()).StructScan(header) + if err != nil { + t.Fatal(err) + } + test_helpers.ExpectEqual(t, header.CID, headerCID.String()) + test_helpers.ExpectEqual(t, header.TD, mocks.MockBlock.Difficulty().String()) + test_helpers.ExpectEqual(t, header.Reward, "2000000000000021250") + test_helpers.ExpectEqual(t, *header.BaseFee, mocks.MockHeader.BaseFee.Int64()) + dc, err := cid.Decode(header.CID) + if err != nil { + t.Fatal(err) + } + mhKey := dshelp.MultihashToDsKey(dc.Hash()) + prefixedKey := blockstore.BlockPrefix.String() + mhKey.String() + var data []byte + err = db.Get(context.Background(), &data, ipfsPgGet, prefixedKey) + if err != nil { + t.Fatal(err) + } + test_helpers.ExpectEqual(t, data, mocks.MockHeaderRlp) + }) + + t.Run("Publish and index transaction IPLDs in a single tx", func(t *testing.T) { + setupPGX(t) + defer tearDown(t) + // check that txs were properly indexed + trxs := make([]string, 0) + pgStr := `SELECT transaction_cids.cid FROM eth.transaction_cids INNER JOIN eth.header_cids ON (transaction_cids.header_id = header_cids.id) + WHERE header_cids.block_number = $1` + err = db.Select(context.Background(), &trxs, pgStr, mocks.BlockNumber.Uint64()) + if err != nil { + t.Fatal(err) + } + test_helpers.ExpectEqual(t, len(trxs), 5) + expectTrue(t, test_helpers.ListContainsString(trxs, trx1CID.String())) + expectTrue(t, test_helpers.ListContainsString(trxs, trx2CID.String())) + expectTrue(t, test_helpers.ListContainsString(trxs, trx3CID.String())) + expectTrue(t, test_helpers.ListContainsString(trxs, trx4CID.String())) + expectTrue(t, test_helpers.ListContainsString(trxs, trx5CID.String())) + // and published + for _, c := range trxs { + dc, err := cid.Decode(c) + if err != nil { + t.Fatal(err) + } + mhKey := dshelp.MultihashToDsKey(dc.Hash()) + prefixedKey := blockstore.BlockPrefix.String() + mhKey.String() + var data []byte + err = db.Get(context.Background(), &data, ipfsPgGet, prefixedKey) + if err != nil { + t.Fatal(err) + } + switch c { + case trx1CID.String(): + test_helpers.ExpectEqual(t, data, tx1) + var txType *uint8 + pgStr = `SELECT tx_type FROM eth.transaction_cids WHERE cid = $1` + err = db.Get(context.Background(), &txType, pgStr, c) + if err != nil { + t.Fatal(err) + } + if txType != nil { + t.Fatalf("expected nil tx_type, got %d", *txType) + } + case trx2CID.String(): + test_helpers.ExpectEqual(t, data, tx2) + var txType *uint8 + pgStr = `SELECT tx_type FROM eth.transaction_cids WHERE cid = $1` + err = db.Get(context.Background(), &txType, pgStr, c) + if err != nil { + t.Fatal(err) + } + if txType != nil { + t.Fatalf("expected nil tx_type, got %d", *txType) + } + case trx3CID.String(): + test_helpers.ExpectEqual(t, data, tx3) + var txType *uint8 + pgStr = `SELECT tx_type FROM eth.transaction_cids WHERE cid = $1` + err = db.Get(context.Background(), &txType, pgStr, c) + if err != nil { + t.Fatal(err) + } + if txType != nil { + t.Fatalf("expected nil tx_type, got %d", *txType) + } + case trx4CID.String(): + test_helpers.ExpectEqual(t, data, tx4) + var txType *uint8 + pgStr = `SELECT tx_type FROM eth.transaction_cids WHERE cid = $1` + err = db.Get(context.Background(), &txType, pgStr, c) + if err != nil { + t.Fatal(err) + } + if *txType != types.AccessListTxType { + t.Fatalf("expected AccessListTxType (1), got %d", *txType) + } + accessListElementModels := make([]models.AccessListElementModel, 0) + pgStr = `SELECT access_list_element.* FROM eth.access_list_element INNER JOIN eth.transaction_cids ON (tx_id = transaction_cids.id) WHERE cid = $1 ORDER BY access_list_element.index ASC` + err = db.Select(context.Background(), &accessListElementModels, pgStr, c) + if err != nil { + t.Fatal(err) + } + if len(accessListElementModels) != 2 { + t.Fatalf("expected two access list entries, got %d", len(accessListElementModels)) + } + model1 := models.AccessListElementModel{ + Index: accessListElementModels[0].Index, + Address: accessListElementModels[0].Address, + } + model2 := models.AccessListElementModel{ + Index: accessListElementModels[1].Index, + Address: accessListElementModels[1].Address, + StorageKeys: accessListElementModels[1].StorageKeys, + } + test_helpers.ExpectEqual(t, model1, mocks.AccessListEntry1Model) + test_helpers.ExpectEqual(t, model2, mocks.AccessListEntry2Model) + case trx5CID.String(): + test_helpers.ExpectEqual(t, data, tx5) + var txType *uint8 + pgStr = `SELECT tx_type FROM eth.transaction_cids WHERE cid = $1` + err = db.Get(context.Background(), &txType, pgStr, c) + if err != nil { + t.Fatal(err) + } + if *txType != types.DynamicFeeTxType { + t.Fatalf("expected DynamicFeeTxType (2), got %d", *txType) + } + } + } + }) + + t.Run("Publish and index log IPLDs for multiple receipt of a specific block", func(t *testing.T) { + setupPGX(t) + defer tearDown(t) + + rcts := make([]string, 0) + pgStr := `SELECT receipt_cids.leaf_cid FROM eth.receipt_cids, eth.transaction_cids, eth.header_cids + WHERE receipt_cids.tx_id = transaction_cids.id + AND transaction_cids.header_id = header_cids.id + AND header_cids.block_number = $1 + ORDER BY transaction_cids.index` + err = db.Select(context.Background(), &rcts, pgStr, mocks.BlockNumber.Uint64()) + if err != nil { + t.Fatal(err) + } + + type logIPLD struct { + Index int `db:"index"` + Address string `db:"address"` + Data []byte `db:"data"` + Topic0 string `db:"topic0"` + Topic1 string `db:"topic1"` + } + for i := range rcts { + results := make([]logIPLD, 0) + pgStr = `SELECT log_cids.index, log_cids.address, log_cids.Topic0, log_cids.Topic1, data FROM eth.log_cids + INNER JOIN eth.receipt_cids ON (log_cids.receipt_id = receipt_cids.id) + INNER JOIN public.blocks ON (log_cids.leaf_mh_key = blocks.key) + WHERE receipt_cids.leaf_cid = $1 ORDER BY eth.log_cids.index ASC` + err = db.Select(context.Background(), &results, pgStr, rcts[i]) + require.NoError(t, err) + + // expecting MockLog1 and MockLog2 for mockReceipt4 + expectedLogs := mocks.MockReceipts[i].Logs + test_helpers.ExpectEqual(t, len(results), len(expectedLogs)) + + var nodeElements []interface{} + for idx, r := range results { + // Decode the log leaf node. + err = rlp.DecodeBytes(r.Data, &nodeElements) + require.NoError(t, err) + + logRaw, err := rlp.EncodeToBytes(expectedLogs[idx]) + require.NoError(t, err) + + // 2nd element of the leaf node contains the encoded log data. + test_helpers.ExpectEqual(t, logRaw, nodeElements[1].([]byte)) + } + } + }) + + t.Run("Publish and index receipt IPLDs in a single tx", func(t *testing.T) { + setupPGX(t) + defer tearDown(t) + + // check receipts were properly indexed + rcts := make([]string, 0) + pgStr := `SELECT receipt_cids.leaf_cid FROM eth.receipt_cids, eth.transaction_cids, eth.header_cids + WHERE receipt_cids.tx_id = transaction_cids.id + AND transaction_cids.header_id = header_cids.id + AND header_cids.block_number = $1 order by transaction_cids.id` + err = db.Select(context.Background(), &rcts, pgStr, mocks.BlockNumber.Uint64()) + if err != nil { + t.Fatal(err) + } + test_helpers.ExpectEqual(t, len(rcts), 5) + + for idx, rctLeafCID := range rcts { + result := make([]models.IPLDModel, 0) + pgStr = `SELECT data + FROM eth.receipt_cids + INNER JOIN public.blocks ON (receipt_cids.leaf_mh_key = public.blocks.key) + WHERE receipt_cids.leaf_cid = $1` + err = db.Select(context.Background(), &result, pgStr, rctLeafCID) + if err != nil { + t.Fatal(err) + } + + // Decode the log leaf node. + var nodeElements []interface{} + err = rlp.DecodeBytes(result[0].Data, &nodeElements) + require.NoError(t, err) + + expectedRct, err := mocks.MockReceipts[idx].MarshalBinary() + require.NoError(t, err) + + test_helpers.ExpectEqual(t, expectedRct, nodeElements[1].([]byte)) + } + + // and published + for _, c := range rcts { + dc, err := cid.Decode(c) + if err != nil { + t.Fatal(err) + } + mhKey := dshelp.MultihashToDsKey(dc.Hash()) + prefixedKey := blockstore.BlockPrefix.String() + mhKey.String() + var data []byte + err = db.Get(context.Background(), &data, ipfsPgGet, prefixedKey) + if err != nil { + t.Fatal(err) + } + + switch c { + case rct1CID.String(): + test_helpers.ExpectEqual(t, data, rct1) + var postStatus uint64 + pgStr = `SELECT post_status FROM eth.receipt_cids WHERE leaf_cid = $1` + err = db.Get(context.Background(), &postStatus, pgStr, c) + if err != nil { + t.Fatal(err) + } + test_helpers.ExpectEqual(t, postStatus, mocks.ExpectedPostStatus) + case rct2CID.String(): + test_helpers.ExpectEqual(t, data, rct2) + var postState string + pgStr = `SELECT post_state FROM eth.receipt_cids WHERE leaf_cid = $1` + err = db.Get(context.Background(), &postState, pgStr, c) + if err != nil { + t.Fatal(err) + } + test_helpers.ExpectEqual(t, postState, mocks.ExpectedPostState1) + case rct3CID.String(): + test_helpers.ExpectEqual(t, data, rct3) + var postState string + pgStr = `SELECT post_state FROM eth.receipt_cids WHERE leaf_cid = $1` + err = db.Get(context.Background(), &postState, pgStr, c) + if err != nil { + t.Fatal(err) + } + test_helpers.ExpectEqual(t, postState, mocks.ExpectedPostState2) + case rct4CID.String(): + test_helpers.ExpectEqual(t, data, rct4) + var postState string + pgStr = `SELECT post_state FROM eth.receipt_cids WHERE leaf_cid = $1` + err = db.Get(context.Background(), &postState, pgStr, c) + if err != nil { + t.Fatal(err) + } + test_helpers.ExpectEqual(t, postState, mocks.ExpectedPostState3) + case rct5CID.String(): + test_helpers.ExpectEqual(t, data, rct5) + var postState string + pgStr = `SELECT post_state FROM eth.receipt_cids WHERE leaf_cid = $1` + err = db.Get(context.Background(), &postState, pgStr, c) + if err != nil { + t.Fatal(err) + } + test_helpers.ExpectEqual(t, postState, mocks.ExpectedPostState3) + } + } + }) + + t.Run("Publish and index state IPLDs in a single tx", func(t *testing.T) { + setupPGX(t) + defer tearDown(t) + // check that state nodes were properly indexed and published + stateNodes := make([]models.StateNodeModel, 0) + pgStr := `SELECT state_cids.id, state_cids.cid, state_cids.state_leaf_key, state_cids.node_type, state_cids.state_path, state_cids.header_id + FROM eth.state_cids INNER JOIN eth.header_cids ON (state_cids.header_id = header_cids.id) + WHERE header_cids.block_number = $1 AND node_type != 3` + err = db.Select(context.Background(), &stateNodes, pgStr, mocks.BlockNumber.Uint64()) + if err != nil { + t.Fatal(err) + } + test_helpers.ExpectEqual(t, len(stateNodes), 2) + for _, stateNode := range stateNodes { + var data []byte + dc, err := cid.Decode(stateNode.CID) + if err != nil { + t.Fatal(err) + } + mhKey := dshelp.MultihashToDsKey(dc.Hash()) + prefixedKey := blockstore.BlockPrefix.String() + mhKey.String() + err = db.Get(context.Background(), &data, ipfsPgGet, prefixedKey) + if err != nil { + t.Fatal(err) + } + pgStr = `SELECT * from eth.state_accounts WHERE state_id = $1` + var account models.StateAccountModel + err = db.Get(context.Background(), &account, pgStr, stateNode.ID) + if err != nil { + t.Fatal(err) + } + if stateNode.CID == state1CID.String() { + test_helpers.ExpectEqual(t, stateNode.NodeType, 2) + test_helpers.ExpectEqual(t, stateNode.StateKey, common.BytesToHash(mocks.ContractLeafKey).Hex()) + test_helpers.ExpectEqual(t, stateNode.Path, []byte{'\x06'}) + test_helpers.ExpectEqual(t, data, mocks.ContractLeafNode) + test_helpers.ExpectEqual(t, account, models.StateAccountModel{ + ID: account.ID, + StateID: stateNode.ID, + Balance: "0", + CodeHash: mocks.ContractCodeHash.Bytes(), + StorageRoot: mocks.ContractRoot, + Nonce: 1, + }) + } + if stateNode.CID == state2CID.String() { + test_helpers.ExpectEqual(t, stateNode.NodeType, 2) + test_helpers.ExpectEqual(t, stateNode.StateKey, common.BytesToHash(mocks.AccountLeafKey).Hex()) + test_helpers.ExpectEqual(t, stateNode.Path, []byte{'\x0c'}) + test_helpers.ExpectEqual(t, data, mocks.AccountLeafNode) + test_helpers.ExpectEqual(t, account, models.StateAccountModel{ + ID: account.ID, + StateID: stateNode.ID, + Balance: "1000", + CodeHash: mocks.AccountCodeHash.Bytes(), + StorageRoot: mocks.AccountRoot, + Nonce: 0, + }) + } + } + + // check that Removed state nodes were properly indexed and published + stateNodes = make([]models.StateNodeModel, 0) + pgStr = `SELECT state_cids.id, state_cids.cid, state_cids.state_leaf_key, state_cids.node_type, state_cids.state_path, state_cids.header_id + FROM eth.state_cids INNER JOIN eth.header_cids ON (state_cids.header_id = header_cids.id) + WHERE header_cids.block_number = $1 AND node_type = 3` + err = db.Select(context.Background(), &stateNodes, pgStr, mocks.BlockNumber.Uint64()) + if err != nil { + t.Fatal(err) + } + test_helpers.ExpectEqual(t, len(stateNodes), 1) + stateNode := stateNodes[0] + var data []byte + dc, err := cid.Decode(stateNode.CID) + if err != nil { + t.Fatal(err) + } + mhKey := dshelp.MultihashToDsKey(dc.Hash()) + prefixedKey := blockstore.BlockPrefix.String() + mhKey.String() + test_helpers.ExpectEqual(t, prefixedKey, shared.RemovedNodeMhKey) + err = db.Get(context.Background(), &data, ipfsPgGet, prefixedKey) + if err != nil { + t.Fatal(err) + } + test_helpers.ExpectEqual(t, stateNode.CID, shared.RemovedNodeStateCID) + test_helpers.ExpectEqual(t, stateNode.Path, []byte{'\x02'}) + test_helpers.ExpectEqual(t, data, []byte{}) + }) + + t.Run("Publish and index storage IPLDs in a single tx", func(t *testing.T) { + setupPGX(t) + defer tearDown(t) + // check that storage nodes were properly indexed + storageNodes := make([]models.StorageNodeWithStateKeyModel, 0) + pgStr := `SELECT storage_cids.cid, state_cids.state_leaf_key, storage_cids.storage_leaf_key, storage_cids.node_type, storage_cids.storage_path + FROM eth.storage_cids, eth.state_cids, eth.header_cids + WHERE storage_cids.state_id = state_cids.id + AND state_cids.header_id = header_cids.id + AND header_cids.block_number = $1 + AND storage_cids.node_type != 3` + err = db.Select(context.Background(), &storageNodes, pgStr, mocks.BlockNumber.Uint64()) + if err != nil { + t.Fatal(err) + } + test_helpers.ExpectEqual(t, len(storageNodes), 1) + test_helpers.ExpectEqual(t, storageNodes[0], models.StorageNodeWithStateKeyModel{ + CID: storageCID.String(), + NodeType: 2, + StorageKey: common.BytesToHash(mocks.StorageLeafKey).Hex(), + StateKey: common.BytesToHash(mocks.ContractLeafKey).Hex(), + Path: []byte{}, + }) + var data []byte + dc, err := cid.Decode(storageNodes[0].CID) + if err != nil { + t.Fatal(err) + } + mhKey := dshelp.MultihashToDsKey(dc.Hash()) + prefixedKey := blockstore.BlockPrefix.String() + mhKey.String() + err = db.Get(context.Background(), &data, ipfsPgGet, prefixedKey) + if err != nil { + t.Fatal(err) + } + test_helpers.ExpectEqual(t, data, mocks.StorageLeafNode) + + // check that Removed storage nodes were properly indexed + storageNodes = make([]models.StorageNodeWithStateKeyModel, 0) + pgStr = `SELECT storage_cids.cid, state_cids.state_leaf_key, storage_cids.storage_leaf_key, storage_cids.node_type, storage_cids.storage_path + FROM eth.storage_cids, eth.state_cids, eth.header_cids + WHERE storage_cids.state_id = state_cids.id + AND state_cids.header_id = header_cids.id + AND header_cids.block_number = $1 + AND storage_cids.node_type = 3` + err = db.Select(context.Background(), &storageNodes, pgStr, mocks.BlockNumber.Uint64()) + if err != nil { + t.Fatal(err) + } + test_helpers.ExpectEqual(t, len(storageNodes), 1) + test_helpers.ExpectEqual(t, storageNodes[0], models.StorageNodeWithStateKeyModel{ + CID: shared.RemovedNodeStorageCID, + NodeType: 3, + StorageKey: common.BytesToHash(mocks.RemovedLeafKey).Hex(), + StateKey: common.BytesToHash(mocks.ContractLeafKey).Hex(), + Path: []byte{'\x03'}, + }) + dc, err = cid.Decode(storageNodes[0].CID) + if err != nil { + t.Fatal(err) + } + mhKey = dshelp.MultihashToDsKey(dc.Hash()) + prefixedKey = blockstore.BlockPrefix.String() + mhKey.String() + test_helpers.ExpectEqual(t, prefixedKey, shared.RemovedNodeMhKey) + err = db.Get(context.Background(), &data, ipfsPgGet, prefixedKey) + if err != nil { + t.Fatal(err) + } + test_helpers.ExpectEqual(t, data, []byte{}) + }) +} diff --git a/statediff/indexer/database/sql/postgres/config.go b/statediff/indexer/database/sql/postgres/config.go index 07e3dfe21..aff7ac773 100644 --- a/statediff/indexer/database/sql/postgres/config.go +++ b/statediff/indexer/database/sql/postgres/config.go @@ -18,18 +18,33 @@ package postgres import ( "fmt" + "strings" "time" "github.com/ethereum/go-ethereum/statediff/indexer/shared" ) +// DriverType to explicity type the kind of sql driver we are using type DriverType string const ( - PGX DriverType = "PGX" - SQLX DriverType = "SQLX" + PGX DriverType = "PGX" + SQLX DriverType = "SQLX" + Unknown DriverType = "Unknown" ) +// ResolveDriverType resolves a DriverType from a provided string +func ResolveDriverType(str string) (DriverType, error) { + switch strings.ToLower(str) { + case "pgx", "pgxpool": + return PGX, nil + case "sqlx": + return SQLX, nil + default: + return Unknown, fmt.Errorf("unrecognized driver type string: %s", str) + } +} + // DefaultConfig are default parameters for connecting to a Postgres sql var DefaultConfig = Config{ Hostname: "localhost", @@ -64,10 +79,12 @@ type Config struct { Driver DriverType } +// Type satisfies interfaces.Config func (c Config) Type() shared.DBType { return shared.POSTGRES } +// DbConnectionString constructs and returns the connection string from the config func (c Config) DbConnectionString() string { if len(c.Username) > 0 && len(c.Password) > 0 { return fmt.Sprintf("postgresql://%s:%s@%s:%d/%s?sslmode=disable", diff --git a/statediff/indexer/database/sql/postgres/pgx.go b/statediff/indexer/database/sql/postgres/pgx.go index d94c35083..838c78911 100644 --- a/statediff/indexer/database/sql/postgres/pgx.go +++ b/statediff/indexer/database/sql/postgres/pgx.go @@ -105,8 +105,8 @@ func (pgx *PGXDriver) createNode() error { // QueryRow satisfies sql.Database func (pgx *PGXDriver) QueryRow(ctx context.Context, sql string, args ...interface{}) sql.ScannableRow { - row := pgx.pool.QueryRow(ctx, sql, args...) - return rowWrapper{row: row} + rows, _ := pgx.pool.Query(ctx, sql, args...) + return rowsWrapper{rows: rows} } // Exec satisfies sql.Database @@ -160,18 +160,18 @@ func (pgx *PGXDriver) Context() context.Context { return pgx.ctx } -type rowWrapper struct { - row pgx.Row +type rowsWrapper struct { + rows pgx.Rows } // Scan satisfies sql.ScannableRow -func (r rowWrapper) Scan(dest ...interface{}) error { - return r.row.Scan(dest) +func (r rowsWrapper) Scan(dest ...interface{}) error { + return (pgx.Row)(r.rows).Scan(dest...) } // StructScan satisfies sql.ScannableRow -func (r rowWrapper) StructScan(dest interface{}) error { - return pgxscan.ScanRow(dest, r.row.(pgx.Rows)) +func (r rowsWrapper) StructScan(dest interface{}) error { + return pgxscan.ScanRow(dest, r.rows) } type resultWrapper struct { @@ -234,8 +234,8 @@ type pgxTxWrapper struct { // QueryRow satisfies sql.Tx func (t pgxTxWrapper) QueryRow(ctx context.Context, sql string, args ...interface{}) sql.ScannableRow { - row := t.tx.QueryRow(ctx, sql, args...) - return rowWrapper{row: row} + rows, _ := t.tx.Query(ctx, sql, args...) + return rowsWrapper{rows: rows} } // Exec satisfies sql.Tx diff --git a/statediff/indexer/database/sql/postgres/pgx_test.go b/statediff/indexer/database/sql/postgres/pgx_test.go index aadb12835..50b6817eb 100644 --- a/statediff/indexer/database/sql/postgres/pgx_test.go +++ b/statediff/indexer/database/sql/postgres/pgx_test.go @@ -86,15 +86,15 @@ func TestPostgresPGX(t *testing.T) { t.Fatal(err) } - var data pgtype.Numeric + var data pgtype.Text err = dbPool.QueryRow(ctx, `SELECT data FROM example WHERE id = 1`).Scan(&data) if err != nil { t.Fatal(err) } - test_helpers.ExpectEqual(t, bi.String(), data) + test_helpers.ExpectEqual(t, data, bi.String()) actual := new(big.Int) - actual.Set(data.Int) + actual.SetString(data.String, 10) test_helpers.ExpectEqual(t, actual, bi) }) diff --git a/statediff/indexer/database/sql/postgres/sqlx.go b/statediff/indexer/database/sql/postgres/sqlx.go index 2abf82d89..684fc7bf0 100644 --- a/statediff/indexer/database/sql/postgres/sqlx.go +++ b/statediff/indexer/database/sql/postgres/sqlx.go @@ -177,8 +177,7 @@ type sqlxTxWrapper struct { // QueryRow satisfies sql.Tx func (t sqlxTxWrapper) QueryRow(ctx context.Context, sql string, args ...interface{}) sql.ScannableRow { - row := t.tx.QueryRow(sql, args...) - return rowWrapper{row: row} + return t.tx.QueryRowx(sql, args...) } // Exec satisfies sql.Tx diff --git a/statediff/indexer/database/sql/postgres/sqlx_test.go b/statediff/indexer/database/sql/postgres/sqlx_test.go index 37164e0f7..95975a868 100644 --- a/statediff/indexer/database/sql/postgres/sqlx_test.go +++ b/statediff/indexer/database/sql/postgres/sqlx_test.go @@ -38,9 +38,8 @@ func TestPostgresSQLX(t *testing.T) { connStr := postgres.DefaultConfig.DbConnectionString() sqlxdb, err = sqlx.Connect("postgres", connStr) - if err != nil { - t.Fatalf("failed to connect to db with connection string: %s err: %v", pgConfig, err) + t.Fatalf("failed to connect to db with connection string: %s err: %v", pgConfig.ConnString(), err) } if sqlxdb == nil { t.Fatal("DB is nil") @@ -88,7 +87,7 @@ func TestPostgresSQLX(t *testing.T) { t.Fatal(err) } - test_helpers.ExpectEqual(t, bi.String(), data) + test_helpers.ExpectEqual(t, data, bi.String()) actual := new(big.Int) actual.SetString(data, 10) test_helpers.ExpectEqual(t, actual, bi) diff --git a/statediff/indexer/database/sql/indexer_legacy_test.go b/statediff/indexer/database/sql/sqlx_indexer_legacy_test.go similarity index 82% rename from statediff/indexer/database/sql/indexer_legacy_test.go rename to statediff/indexer/database/sql/sqlx_indexer_legacy_test.go index f2fdb0521..840a1ccae 100644 --- a/statediff/indexer/database/sql/indexer_legacy_test.go +++ b/statediff/indexer/database/sql/sqlx_indexer_legacy_test.go @@ -20,6 +20,9 @@ import ( "context" "testing" + "github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres" + "github.com/ethereum/go-ethereum/statediff/indexer/interfaces" + "github.com/ipfs/go-cid" "github.com/multiformats/go-multihash" "github.com/stretchr/testify/require" @@ -37,34 +40,38 @@ var ( legacyHeaderCID cid.Cid ) -func setupLegacy(t *testing.T) { +func setupLegacySQLX(t *testing.T) { mockLegacyBlock = legacyData.MockBlock legacyHeaderCID, _ = ipld.RawdataToCid(ipld.MEthHeader, legacyData.MockHeaderRlp, multihash.KECCAK_256) - db, err = test_helpers.SetupDB() + db, err = postgres.SetupSQLXDB() require.NoError(t, err) - ind, err = sql.NewSQLIndexer(context.Background(), legacyData.Config, db) + ind, err = sql.NewStateDiffIndexer(context.Background(), legacyData.Config, db) require.NoError(t, err) - var tx *sql.BlockTx + var tx interfaces.Batch tx, err = ind.PushBlock( mockLegacyBlock, legacyData.MockReceipts, legacyData.MockBlock.Difficulty()) require.NoError(t, err) - defer tx.Close(tx, err) + defer func() { + if err := tx.Submit(err); err != nil { + t.Fatal(err) + } + }() for _, node := range legacyData.StateDiffs { err = ind.PushStateNode(tx, node) require.NoError(t, err) } - test_helpers.ExpectEqual(t, tx.BlockNumber, legacyData.BlockNumber.Uint64()) + test_helpers.ExpectEqual(t, tx.(*sql.BatchTx).BlockNumber, legacyData.BlockNumber.Uint64()) } -func TestPublishAndIndexerLegacy(t *testing.T) { +func TestSQLXIndexerLegacy(t *testing.T) { t.Run("Publish and index header IPLDs in a legacy tx", func(t *testing.T) { - setupLegacy(t) + setupLegacySQLX(t) defer tearDown(t) pgStr := `SELECT cid, td, reward, id, base_fee FROM eth.header_cids diff --git a/statediff/indexer/database/sql/indexer_test.go b/statediff/indexer/database/sql/sqlx_indexer_test.go similarity index 95% rename from statediff/indexer/database/sql/indexer_test.go rename to statediff/indexer/database/sql/sqlx_indexer_test.go index 91d55f094..815d36915 100644 --- a/statediff/indexer/database/sql/indexer_test.go +++ b/statediff/indexer/database/sql/sqlx_indexer_test.go @@ -32,19 +32,20 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/rlp" - "github.com/ethereum/go-ethereum/statediff/indexer" "github.com/ethereum/go-ethereum/statediff/indexer/database/sql" + "github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres" "github.com/ethereum/go-ethereum/statediff/indexer/interfaces" "github.com/ethereum/go-ethereum/statediff/indexer/ipld" "github.com/ethereum/go-ethereum/statediff/indexer/mocks" "github.com/ethereum/go-ethereum/statediff/indexer/models" + "github.com/ethereum/go-ethereum/statediff/indexer/shared" "github.com/ethereum/go-ethereum/statediff/indexer/test_helpers" ) var ( db sql.Database err error - ind *interfaces.StateDiffIndexer + ind interfaces.StateDiffIndexer ipfsPgGet = `SELECT data FROM public.blocks WHERE key = $1` tx1, tx2, tx3, tx4, tx5, rct1, rct2, rct3, rct4, rct5 []byte @@ -136,14 +137,14 @@ func init() { storageCID, _ = ipld.RawdataToCid(ipld.MEthStorageTrie, mocks.StorageLeafNode, multihash.KECCAK_256) } -func setup(t *testing.T) { - db, err = test_helpers.SetupDB() +func setupSQLX(t *testing.T) { + db, err = postgres.SetupSQLXDB() if err != nil { t.Fatal(err) } - ind, err = indexer.NewStateDiffIndexer(context.Background(), mocks.TestConfig, db) + ind, err = sql.NewStateDiffIndexer(context.Background(), mocks.TestConfig, db) require.NoError(t, err) - var tx *sql.BlockTx + var tx interfaces.Batch tx, err = ind.PushBlock( mockBlock, mocks.MockReceipts, @@ -151,7 +152,11 @@ func setup(t *testing.T) { if err != nil { t.Fatal(err) } - defer tx.Close(tx, err) + defer func() { + if err := tx.Submit(err); err != nil { + t.Fatal(err) + } + }() for _, node := range mocks.StateDiffs { err = ind.PushStateNode(tx, node) if err != nil { @@ -159,7 +164,7 @@ func setup(t *testing.T) { } } - test_helpers.ExpectEqual(t, tx.BlockNumber, mocks.BlockNumber.Uint64()) + test_helpers.ExpectEqual(t, tx.(*sql.BatchTx).BlockNumber, mocks.BlockNumber.Uint64()) } func tearDown(t *testing.T) { @@ -169,9 +174,9 @@ func tearDown(t *testing.T) { } } -func TestPublishAndIndexer(t *testing.T) { +func TestSQLXIndexer(t *testing.T) { t.Run("Publish and index header IPLDs in a single tx", func(t *testing.T) { - setup(t) + setupSQLX(t) defer tearDown(t) pgStr := `SELECT cid, td, reward, id, base_fee FROM eth.header_cids @@ -208,7 +213,7 @@ func TestPublishAndIndexer(t *testing.T) { }) t.Run("Publish and index transaction IPLDs in a single tx", func(t *testing.T) { - setup(t) + setupSQLX(t) defer tearDown(t) // check that txs were properly indexed trxs := make([]string, 0) @@ -318,7 +323,7 @@ func TestPublishAndIndexer(t *testing.T) { }) t.Run("Publish and index log IPLDs for multiple receipt of a specific block", func(t *testing.T) { - setup(t) + setupSQLX(t) defer tearDown(t) rcts := make([]string, 0) @@ -368,7 +373,7 @@ func TestPublishAndIndexer(t *testing.T) { }) t.Run("Publish and index receipt IPLDs in a single tx", func(t *testing.T) { - setup(t) + setupSQLX(t) defer tearDown(t) // check receipts were properly indexed @@ -470,7 +475,7 @@ func TestPublishAndIndexer(t *testing.T) { }) t.Run("Publish and index state IPLDs in a single tx", func(t *testing.T) { - setup(t) + setupSQLX(t) defer tearDown(t) // check that state nodes were properly indexed and published stateNodes := make([]models.StateNodeModel, 0) @@ -548,18 +553,18 @@ func TestPublishAndIndexer(t *testing.T) { } mhKey := dshelp.MultihashToDsKey(dc.Hash()) prefixedKey := blockstore.BlockPrefix.String() + mhKey.String() - test_helpers.ExpectEqual(t, prefixedKey, sql.RemovedNodeMhKey) + test_helpers.ExpectEqual(t, prefixedKey, shared.RemovedNodeMhKey) err = db.Get(context.Background(), &data, ipfsPgGet, prefixedKey) if err != nil { t.Fatal(err) } - test_helpers.ExpectEqual(t, stateNode.CID, sql.RemovedNodeStateCID) + test_helpers.ExpectEqual(t, stateNode.CID, shared.RemovedNodeStateCID) test_helpers.ExpectEqual(t, stateNode.Path, []byte{'\x02'}) test_helpers.ExpectEqual(t, data, []byte{}) }) t.Run("Publish and index storage IPLDs in a single tx", func(t *testing.T) { - setup(t) + setupSQLX(t) defer tearDown(t) // check that storage nodes were properly indexed storageNodes := make([]models.StorageNodeWithStateKeyModel, 0) @@ -608,7 +613,7 @@ func TestPublishAndIndexer(t *testing.T) { } test_helpers.ExpectEqual(t, len(storageNodes), 1) test_helpers.ExpectEqual(t, storageNodes[0], models.StorageNodeWithStateKeyModel{ - CID: sql.RemovedNodeStorageCID, + CID: shared.RemovedNodeStorageCID, NodeType: 3, StorageKey: common.BytesToHash(mocks.RemovedLeafKey).Hex(), StateKey: common.BytesToHash(mocks.ContractLeafKey).Hex(), @@ -620,7 +625,7 @@ func TestPublishAndIndexer(t *testing.T) { } mhKey = dshelp.MultihashToDsKey(dc.Hash()) prefixedKey = blockstore.BlockPrefix.String() + mhKey.String() - test_helpers.ExpectEqual(t, prefixedKey, sql.RemovedNodeMhKey) + test_helpers.ExpectEqual(t, prefixedKey, shared.RemovedNodeMhKey) err = db.Get(context.Background(), &data, ipfsPgGet, prefixedKey) if err != nil { t.Fatal(err) diff --git a/statediff/indexer/database/sql/test_helpers.go b/statediff/indexer/database/sql/test_helpers.go index cebddb9d1..7de7beec0 100644 --- a/statediff/indexer/database/sql/test_helpers.go +++ b/statediff/indexer/database/sql/test_helpers.go @@ -49,6 +49,18 @@ func TearDownDB(t *testing.T, db Database) { if err != nil { t.Fatal(err) } + _, err = tx.Exec(ctx, `DELETE FROM eth.state_accounts`) + if err != nil { + t.Fatal(err) + } + _, err = tx.Exec(ctx, `DELETE FROM eth.access_list_element`) + if err != nil { + t.Fatal(err) + } + _, err = tx.Exec(ctx, `DELETE FROM eth.log_cids`) + if err != nil { + t.Fatal(err) + } _, err = tx.Exec(ctx, `DELETE FROM blocks`) if err != nil { t.Fatal(err) diff --git a/statediff/indexer/shared/db_kind.go b/statediff/indexer/shared/db_kind.go index 711f9d050..78aae9f7f 100644 --- a/statediff/indexer/shared/db_kind.go +++ b/statediff/indexer/shared/db_kind.go @@ -16,9 +16,28 @@ package shared +import ( + "fmt" + "strings" +) + +// DBType to explicitly type the kind of DB type DBType string const ( POSTGRES DBType = "Postgres" DUMP DBType = "Dump" + UNKOWN DBType = "Unknown" ) + +// ResolveDBType resolves a DBType from a provided string +func ResolveDBType(str string) (DBType, error) { + switch strings.ToLower(str) { + case "postgres", "pg": + return POSTGRES, nil + case "dump", "d": + return DUMP, nil + default: + return UNKOWN, fmt.Errorf("unrecognized db type string: %s", str) + } +} diff --git a/statediff/service.go b/statediff/service.go index ae2e34c6c..8557f87d0 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -218,7 +218,6 @@ func (lbc *BlockCache) getParentBlock(currentBlock *types.Block, bc blockChain) type workerParams struct { chainEventCh <-chan core.ChainEvent - errCh <-chan error wg *sync.WaitGroup id uint } @@ -239,14 +238,21 @@ func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) { statediffMetrics.lastEventHeight.Update(int64(chainEvent.Block.Number().Uint64())) statediffMetrics.writeLoopChannelLen.Update(int64(len(chainEventCh))) chainEventFwd <- chainEvent + case err := <-errCh: + log.Error("Error from chain event subscription", "error", err) + close(sds.QuitChan) case <-sds.QuitChan: + log.Info("Quitting the statediffing writing loop") + if err := sds.indexer.Close(); err != nil { + log.Error("Error closing indexer", "err", err) + } return } } }() wg.Add(int(sds.numWorkers)) for worker := uint(0); worker < sds.numWorkers; worker++ { - params := workerParams{chainEventCh: chainEventFwd, errCh: errCh, wg: &wg, id: worker} + params := workerParams{chainEventCh: chainEventFwd, wg: &wg, id: worker} go sds.writeLoopWorker(params) } wg.Wait() @@ -291,13 +297,8 @@ func (sds *Service) writeLoopWorker(params workerParams) { } // TODO: how to handle with concurrent workers statediffMetrics.lastStatediffHeight.Update(int64(currentBlock.Number().Uint64())) - case err := <-params.errCh: - log.Warn("Error from chain event subscription", "error", err, "worker", params.id) - sds.close() - return case <-sds.QuitChan: log.Info("Quitting the statediff writing process", "worker", params.id) - sds.close() return } } @@ -335,11 +336,10 @@ func (sds *Service) Loop(chainEventCh chan core.ChainEvent) { sds.streamStateDiff(currentBlock, parentBlock.Root()) case err := <-errCh: - log.Warn("Error from chain event subscription", "error", err) - sds.close() - return + log.Error("Error from chain event subscription", "error", err) + close(sds.QuitChan) case <-sds.QuitChan: - log.Info("Quitting the statediffing process") + log.Info("Quitting the statediffing listening loop") sds.close() return }