From 27e96c4aedf502cd3ecb4b17070025c7dd05f3e8 Mon Sep 17 00:00:00 2001 From: i-norden Date: Mon, 31 Jan 2022 12:36:48 -0600 Subject: [PATCH] fixes --- cmd/geth/config.go | 4 +- cmd/geth/main.go | 36 +++++++---- cmd/geth/usage.go | 36 +++++++---- go.mod | 2 - go.sum | 2 - statediff/README.md | 47 +++++++++----- statediff/indexer/constructor.go | 2 +- statediff/indexer/database/dump/batch_tx.go | 16 +++-- .../indexer/database/file/indexer_test.go | 2 - statediff/indexer/database/sql/batch_tx.go | 5 +- statediff/indexer/database/sql/indexer.go | 64 ++++++++++--------- .../database/sql/{ => metrics}/metrics.go | 34 ++++++---- .../indexer/database/sql/postgres/pgx.go | 2 + .../indexer/database/sql/postgres/sqlx.go | 10 ++- .../database/sql/postgres/v2/database.go | 4 +- .../indexer/database/sql/test_helpers.go | 4 +- statediff/indexer/database/sql/v2/writer.go | 23 +++---- statediff/indexer/database/sql/v3/writer.go | 23 +++---- statediff/service.go | 7 +- 19 files changed, 193 insertions(+), 130 deletions(-) rename statediff/indexer/database/sql/{ => metrics}/metrics.go (86%) diff --git a/cmd/geth/config.go b/cmd/geth/config.go index 35fce6ce8..dcdd07768 100644 --- a/cmd/geth/config.go +++ b/cmd/geth/config.go @@ -216,6 +216,7 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) { ClientName: clientName, Driver: v2DriverType, } + fmt.Printf("v2 config: %+v\r\n", v2PgConfig) if ctx.GlobalIsSet(utils.StateDiffV2DBMinConns.Name) { v2PgConfig.MinConns = ctx.GlobalInt(utils.StateDiffV2DBMinConns.Name) } @@ -235,7 +236,7 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) { v2PgConfig.ConnTimeout = ctx.GlobalDuration(utils.StateDiffV2DBConnTimeout.Name) * time.Second } - v3DriverTypeStr := ctx.GlobalString(utils.StateDiffV2DBDriverTypeFlag.Name) + v3DriverTypeStr := ctx.GlobalString(utils.StateDiffV3DBDriverTypeFlag.Name) v3DriverType, err := postgres.ResolveDriverType(v3DriverTypeStr) if err != nil { utils.Fatalf("%v", err) @@ -250,6 +251,7 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) { ClientName: clientName, Driver: v3DriverType, } + fmt.Printf("v3 config: %+v\r\n", v3PgConfig) if ctx.GlobalIsSet(utils.StateDiffV3DBMinConns.Name) { v3PgConfig.MinConns = ctx.GlobalInt(utils.StateDiffV3DBMinConns.Name) } diff --git a/cmd/geth/main.go b/cmd/geth/main.go index fc8202c11..476d39582 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -158,19 +158,31 @@ var ( utils.MinerNotifyFullFlag, utils.StateDiffFlag, utils.StateDiffDBTypeFlag, - utils.StateDiffDBDriverTypeFlag, utils.StateDiffDBDumpDst, - utils.StateDiffDBNameFlag, - utils.StateDiffDBPasswordFlag, - utils.StateDiffDBUserFlag, - utils.StateDiffDBHostFlag, - utils.StateDiffDBPortFlag, - utils.StateDiffDBMaxConnLifetime, - utils.StateDiffDBMaxConnIdleTime, - utils.StateDiffDBMaxConns, - utils.StateDiffDBMinConns, - utils.StateDiffDBMaxIdleConns, - utils.StateDiffDBConnTimeout, + utils.StateDiffV2DBDriverTypeFlag, + utils.StateDiffV2DBNameFlag, + utils.StateDiffV2DBPasswordFlag, + utils.StateDiffV2DBUserFlag, + utils.StateDiffV2DBHostFlag, + utils.StateDiffV2DBPortFlag, + utils.StateDiffV2DBMaxConnLifetime, + utils.StateDiffV2DBMaxConnIdleTime, + utils.StateDiffV2DBMaxConns, + utils.StateDiffV2DBMinConns, + utils.StateDiffV2DBMaxIdleConns, + utils.StateDiffV2DBConnTimeout, + utils.StateDiffV3DBDriverTypeFlag, + utils.StateDiffV3DBNameFlag, + utils.StateDiffV3DBPasswordFlag, + utils.StateDiffV3DBUserFlag, + utils.StateDiffV3DBHostFlag, + utils.StateDiffV3DBPortFlag, + utils.StateDiffV3DBMaxConnLifetime, + utils.StateDiffV3DBMaxConnIdleTime, + utils.StateDiffV3DBMaxConns, + utils.StateDiffV3DBMinConns, + utils.StateDiffV3DBMaxIdleConns, + utils.StateDiffV3DBConnTimeout, utils.StateDiffDBNodeIDFlag, utils.StateDiffDBClientNameFlag, utils.StateDiffWritingFlag, diff --git a/cmd/geth/usage.go b/cmd/geth/usage.go index 51b17083b..52e82b60e 100644 --- a/cmd/geth/usage.go +++ b/cmd/geth/usage.go @@ -228,19 +228,31 @@ var AppHelpFlagGroups = []flags.FlagGroup{ Flags: []cli.Flag{ utils.StateDiffFlag, utils.StateDiffDBTypeFlag, - utils.StateDiffDBDriverTypeFlag, utils.StateDiffDBDumpDst, - utils.StateDiffDBNameFlag, - utils.StateDiffDBPasswordFlag, - utils.StateDiffDBUserFlag, - utils.StateDiffDBHostFlag, - utils.StateDiffDBPortFlag, - utils.StateDiffDBMaxConnLifetime, - utils.StateDiffDBMaxConnIdleTime, - utils.StateDiffDBMaxConns, - utils.StateDiffDBMinConns, - utils.StateDiffDBMaxIdleConns, - utils.StateDiffDBConnTimeout, + utils.StateDiffV2DBDriverTypeFlag, + utils.StateDiffV2DBNameFlag, + utils.StateDiffV2DBPasswordFlag, + utils.StateDiffV2DBUserFlag, + utils.StateDiffV2DBHostFlag, + utils.StateDiffV2DBPortFlag, + utils.StateDiffV2DBMaxConnLifetime, + utils.StateDiffV2DBMaxConnIdleTime, + utils.StateDiffV2DBMaxConns, + utils.StateDiffV2DBMinConns, + utils.StateDiffV2DBMaxIdleConns, + utils.StateDiffV2DBConnTimeout, + utils.StateDiffV3DBDriverTypeFlag, + utils.StateDiffV3DBNameFlag, + utils.StateDiffV3DBPasswordFlag, + utils.StateDiffV3DBUserFlag, + utils.StateDiffV3DBHostFlag, + utils.StateDiffV3DBPortFlag, + utils.StateDiffV3DBMaxConnLifetime, + utils.StateDiffV3DBMaxConnIdleTime, + utils.StateDiffV3DBMaxConns, + utils.StateDiffV3DBMinConns, + utils.StateDiffV3DBMaxIdleConns, + utils.StateDiffV3DBConnTimeout, utils.StateDiffDBNodeIDFlag, utils.StateDiffDBClientNameFlag, utils.StateDiffWritingFlag, diff --git a/go.mod b/go.mod index da726b7c4..86a7aaa2f 100644 --- a/go.mod +++ b/go.mod @@ -47,9 +47,7 @@ require ( github.com/ipfs/go-ipfs-blockstore v1.0.1 github.com/ipfs/go-ipfs-ds-help v1.0.0 github.com/ipfs/go-ipld-format v0.2.0 - github.com/jackc/fake v0.0.0-20150926172116-812a484cc733 // indirect github.com/jackc/pgconn v1.10.0 - github.com/jackc/pgx v3.6.2+incompatible github.com/jackc/pgx/v4 v4.13.0 github.com/jackpal/go-nat-pmp v1.0.2-0.20160603034137-1fa385a6f458 github.com/jedisct1/go-minisign v0.0.0-20190909160543-45766022959e diff --git a/go.sum b/go.sum index cf5e867db..df08fa841 100644 --- a/go.sum +++ b/go.sum @@ -305,8 +305,6 @@ github.com/jackc/chunkreader v1.0.0/go.mod h1:RT6O25fNZIuasFJRyZ4R/Y2BbhasbmZXF9 github.com/jackc/chunkreader/v2 v2.0.0/go.mod h1:odVSm741yZoC3dpHEUXIqA9tQRhFrgOHwnPIn9lDKlk= github.com/jackc/chunkreader/v2 v2.0.1 h1:i+RDz65UE+mmpjTfyz0MoVTnzeYxroil2G82ki7MGG8= github.com/jackc/chunkreader/v2 v2.0.1/go.mod h1:odVSm741yZoC3dpHEUXIqA9tQRhFrgOHwnPIn9lDKlk= -github.com/jackc/fake v0.0.0-20150926172116-812a484cc733 h1:vr3AYkKovP8uR8AvSGGUK1IDqRa5lAAvEkZG1LKaCRc= -github.com/jackc/fake v0.0.0-20150926172116-812a484cc733/go.mod h1:WrMFNQdiFJ80sQsxDoMokWK1W5TQtxBFNpzWTD84ibQ= github.com/jackc/pgconn v0.0.0-20190420214824-7e0022ef6ba3/go.mod h1:jkELnwuX+w9qN5YIfX0fl88Ehu4XC3keFuOJJk9pcnA= github.com/jackc/pgconn v0.0.0-20190824142844-760dd75542eb/go.mod h1:lLjNuW/+OfW9/pnVKPazfWOgNfH2aPem8YQ7ilXGvJE= github.com/jackc/pgconn v0.0.0-20190831204454-2fabfa3c18b7/go.mod h1:ZJKsE/KZfsUgOEh9hBm+xYTstcNHg7UPMVJqRfQxq4s= diff --git a/statediff/README.md b/statediff/README.md index 35d50e39d..897ad74a2 100644 --- a/statediff/README.md +++ b/statediff/README.md @@ -33,7 +33,7 @@ type StateNode struct { NodeType NodeType `json:"nodeType" gencodec:"required"` Path []byte `json:"path" gencodec:"required"` NodeValue []byte `json:"value" gencodec:"required"` - StorageNodes []StorageNode `json:"storage"` + StorageNodes []StorageNode `json:"storage"` LeafKey []byte `json:"leafKey"` } @@ -80,29 +80,42 @@ This service introduces a CLI flag namespace `statediff` `--statediff.writing` is used to tell the service to write state diff objects it produces from synced ChainEvents directly to a configured Postgres database `--statediff.workers` is used to set the number of concurrent workers to process state diff objects and write them into the database `--statediff.db.type` is the type of database we write out to (current options: postgres, dump, file) +`--statediff.file.path` full path (including filename) to write statediff data out to when operating in file mode `--statediff.dump.dst` is the destination to write to when operating in database dump mode (stdout, stderr, discard) -`--statediff.db.driver` is the specific driver to use for the database (current options for postgres: pgx and sqlx) -`--statediff.db.host` is the hostname/ip to dial to connect to the database -`--statediff.db.port` is the port to dial to connect to the database -`--statediff.db.name` is the name of the database to connect to -`--statediff.db.user` is the user to connect to the database as -`--statediff.db.password` is the password to use to connect to the database -`--statediff.db.conntimeout` is the connection timeout (in seconds) -`--statediff.db.maxconns` is the maximum number of database connections -`--statediff.db.minconns` is the minimum number of database connections -`--statediff.db.maxidleconns` is the maximum number of idle connections -`--statediff.db.maxconnidletime` is the maximum lifetime for an idle connection (in seconds) -`--statediff.db.maxconnlifetime` is the maximum lifetime for a connection (in seconds) `--statediff.db.nodeid` is the node id to use in the Postgres database `--statediff.db.clientname` is the client name to use in the Postgres database -`--statediff.file.path` full path (including filename) to write statediff data out to when operating in file mode +`--statediff.db.v2.driver` is the specific driver to use for the v2 database (current options for postgres: pgx and sqlx) +`--statediff.db.v2.host` is the hostname/ip to dial to connect to the v2 database +`--statediff.db.v2.port` is the port to dial to connect to the v2 database +`--statediff.db.v2.name` is the name of the v2 database to connect to +`--statediff.db.v2.user` is the user to connect to the v2 database as +`--statediff.db.v2.password` is the password to use to connect to the v2 database +`--statediff.db.v2.conntimeout` is the connection timeout (in seconds) for v2 database +`--statediff.db.v2.maxconns` is the maximum number of database connections for v2 database +`--statediff.db.v2.minconns` is the minimum number of database connections for v2 database +`--statediff.db.v2.maxidleconns` is the maximum number of idle connections for v2 database +`--statediff.db.v2.maxconnidletime` is the maximum lifetime for an idle connection (in seconds) for v2 database +`--statediff.db.v2.maxconnlifetime` is the maximum lifetime for a connection (in seconds) for v2 database +`--statediff.db.v3.driver` is the specific driver to use for the v3 database (current options for postgres: pgx and sqlx) +`--statediff.db.v3.host` is the hostname/ip to dial to connect to the v3 database +`--statediff.db.v3.port` is the port to dial to connect to the v3 database +`--statediff.db.v3.name` is the name of the v3 database to connect to +`--statediff.db.v3.user` is the user to connect to the v3 database as +`--statediff.db.v3.password` is the password to use to connect to the v3 database +`--statediff.db.v3.conntimeout` is the connection timeout (in seconds) for v3 database +`--statediff.db.v3.maxconns` is the maximum number of database connections for v3 database +`--statediff.db.v3.minconns` is the minimum number of database connections for v3 database +`--statediff.db.v3.maxidleconns` is the maximum number of idle connections for v3 database +`--statediff.db.v3.maxconnidletime` is the maximum lifetime for an idle connection (in seconds) for v3 database +`--statediff.db.v3.maxconnlifetime` is the maximum lifetime for a connection (in seconds) for v3 database The service can only operate in full sync mode (`--syncmode=full`), but only the historical RPC endpoints require an archive node (`--gcmode=archive`) e.g. -` -./build/bin/geth --syncmode=full --gcmode=archive --statediff --statediff.writing --statediff.db.type=postgres --statediff.db.driver=sqlx --statediff.db.host=localhost --statediff.db.port=5432 --statediff.db.name=vulcanize_test --statediff.db.user=postgres --statediff.db.nodeid=nodeid --statediff.db.clientname=clientname -` +`./build/bin/geth --syncmode=full --gcmode=archive --statediff --statediff.writing --statediff.db.type=postgres --statediff.db.nodeid=nodeid --statediff.db.v2.driver=sqlx +--statediff.db.v3.driver=sqlx --statediff.db.v2.host=localhost --statediff.db.v3.host=localhost --statediff.db.v2.port=5432 --statediff.db.v3.port=5432 +--statediff.db.v2.name=vulcanize_dual_v2 --statediff.db.v3.name=vulcanize_dual_v3 --statediff.db.v2.user=postgres --statediff.db.v3.user=postgres +--statediff.db.clientname=clientname --statediff.workers=20` When operating in `--statediff.db.type=file` mode, the service will write SQL statements out to the file designated by `--statediff.file.path`. Please note that it writes out SQL statements with all `ON CONFLICT` constraint checks dropped. diff --git a/statediff/indexer/constructor.go b/statediff/indexer/constructor.go index a8f2d5211..f00af67e1 100644 --- a/statediff/indexer/constructor.go +++ b/statediff/indexer/constructor.go @@ -66,7 +66,7 @@ func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, n default: return nil, fmt.Errorf("unrecongized Postgres driver type: %s", pgc.V2.Driver) } - switch pgc.V2.Driver { + switch pgc.V3.Driver { case postgres.PGX: newDriver, err = postgres.NewPGXDriver(ctx, pgc.V3) if err != nil { diff --git a/statediff/indexer/database/dump/batch_tx.go b/statediff/indexer/database/dump/batch_tx.go index 9e001dbca..0d7e5f3b3 100644 --- a/statediff/indexer/database/dump/batch_tx.go +++ b/statediff/indexer/database/dump/batch_tx.go @@ -20,6 +20,8 @@ import ( "fmt" "io" + sharedModels "github.com/ethereum/go-ethereum/statediff/indexer/models/shared" + "github.com/ethereum/go-ethereum/statediff/indexer/ipld" blockstore "github.com/ipfs/go-ipfs-blockstore" @@ -32,8 +34,8 @@ type BatchTx struct { BlockNumber uint64 dump io.Writer quit chan struct{} - iplds chan v3.IPLDModel - ipldCache v3.IPLDBatch + iplds chan sharedModels.IPLDModel + ipldCache sharedModels.IPLDBatch submit func(blockTx *BatchTx, err error) error } @@ -47,7 +49,7 @@ func (tx *BatchTx) flush() error { if _, err := fmt.Fprintf(tx.dump, "%+v\r\n", tx.ipldCache); err != nil { return err } - tx.ipldCache = v3.IPLDBatch{} + tx.ipldCache = sharedModels.IPLDBatch{} return nil } @@ -59,21 +61,21 @@ func (tx *BatchTx) cache() { tx.ipldCache.Keys = append(tx.ipldCache.Keys, i.Key) tx.ipldCache.Values = append(tx.ipldCache.Values, i.Data) case <-tx.quit: - tx.ipldCache = v3.IPLDBatch{} + tx.ipldCache = sharedModels.IPLDBatch{} return } } } func (tx *BatchTx) cacheDirect(key string, value []byte) { - tx.iplds <- v3.IPLDModel{ + tx.iplds <- sharedModels.IPLDModel{ Key: key, Data: value, } } func (tx *BatchTx) cacheIPLD(i node.Node) { - tx.iplds <- v3.IPLDModel{ + tx.iplds <- sharedModels.IPLDModel{ Key: blockstore.BlockPrefix.String() + dshelp.MultihashToDsKey(i.Cid().Hash()).String(), Data: i.RawData(), } @@ -85,7 +87,7 @@ func (tx *BatchTx) cacheRaw(codec, mh uint64, raw []byte) (string, string, error return "", "", err } prefixedKey := blockstore.BlockPrefix.String() + dshelp.MultihashToDsKey(c.Hash()).String() - tx.iplds <- v3.IPLDModel{ + tx.iplds <- sharedModels.IPLDModel{ Key: prefixedKey, Data: raw, } diff --git a/statediff/indexer/database/file/indexer_test.go b/statediff/indexer/database/file/indexer_test.go index e5a030dcf..52bcfea68 100644 --- a/statediff/indexer/database/file/indexer_test.go +++ b/statediff/indexer/database/file/indexer_test.go @@ -24,8 +24,6 @@ import ( "os" "testing" - "github.com/ethereum/go-ethereum/statediff/indexer/models/v2" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/statediff/indexer/shared" diff --git a/statediff/indexer/database/sql/batch_tx.go b/statediff/indexer/database/sql/batch_tx.go index deec4f07b..d72c7a72c 100644 --- a/statediff/indexer/database/sql/batch_tx.go +++ b/statediff/indexer/database/sql/batch_tx.go @@ -18,6 +18,7 @@ package sql import ( "context" + "fmt" "github.com/ethereum/go-ethereum/statediff/indexer/interfaces" @@ -53,11 +54,11 @@ func (tx *BatchTx) Submit(err error) error { func (tx *BatchTx) flush() error { _, err := tx.oldDBTx.Exec(tx.ctx, tx.oldStmt, pq.Array(tx.ipldCache.Keys), pq.Array(tx.ipldCache.Values)) if err != nil { - return err + return fmt.Errorf("error flushing IPLD cache to old DB: %v", err) } _, err = tx.newDBTx.Exec(tx.ctx, tx.newStmt, pq.Array(tx.ipldCache.Keys), pq.Array(tx.ipldCache.Values)) if err != nil { - return err + return fmt.Errorf("error flushing IPLD cache to new DB: %v", err) } tx.ipldCache = modelsShared.IPLDBatch{} return nil diff --git a/statediff/indexer/database/sql/indexer.go b/statediff/indexer/database/sql/indexer.go index 2b67832f1..010a675bb 100644 --- a/statediff/indexer/database/sql/indexer.go +++ b/statediff/indexer/database/sql/indexer.go @@ -27,6 +27,8 @@ import ( "strings" "time" + metrics2 "github.com/ethereum/go-ethereum/statediff/indexer/database/sql/metrics" + "github.com/ipfs/go-cid" node "github.com/ipfs/go-ipld-format" "github.com/multiformats/go-multihash" @@ -53,8 +55,8 @@ import ( var _ interfaces.StateDiffIndexer = &StateDiffIndexer{} var ( - indexerMetrics = RegisterIndexerMetrics(metrics.DefaultRegistry) - dbMetrics = RegisterDBMetrics(metrics.DefaultRegistry) + indexerMetrics = metrics2.RegisterIndexerMetrics(metrics.DefaultRegistry) + dbMetrics = metrics2.RegisterDBMetrics(metrics.DefaultRegistry) ) // StateDiffIndexer satisfies the indexer.StateDiffIndexer interface for ethereum statediff objects on top of an SQL sql @@ -69,19 +71,19 @@ type StateDiffIndexer struct { func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, info nodeInfo.Info, old, new interfaces.Database) (*StateDiffIndexer, error) { // Write the removed node to the db on init if _, err := old.Exec(ctx, old.InsertIPLDStm(), shared.RemovedNodeMhKey, []byte{}); err != nil { - return nil, err + return nil, fmt.Errorf("unable to write removed node IPLD to old DB: %v", err) } if _, err := new.Exec(ctx, new.InsertIPLDStm(), shared.RemovedNodeMhKey, []byte{}); err != nil { - return nil, err + return nil, fmt.Errorf("unable to write removed node IPLD to new DB: %v", err) } // Write node info to the db on init oldWriter := v2Writer.NewWriter(old) newWriter := v3Writer.NewWriter(new) if err := oldWriter.InsertNodeInfo(info); err != nil { - return nil, err + return nil, fmt.Errorf("unable to write node info to old DB: %v", err) } if err := newWriter.InsertNodeInfo(info); err != nil { - return nil, err + return nil, fmt.Errorf("unable to write node info to new DB: %v", err) } return &StateDiffIndexer{ ctx: ctx, @@ -171,31 +173,26 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip if err != nil { return nil, 0, err } - defer func() { - if p := recover(); p != nil { - rollback(sdi.ctx, oldTx) - panic(p) - } else if err != nil { - rollback(sdi.ctx, oldTx) - } - }() newTx, err := sdi.newDBWriter.DB.Begin(sdi.ctx) if err != nil { + rollback(sdi.ctx, oldTx) return nil, 0, err } defer func() { if p := recover(); p != nil { rollback(sdi.ctx, newTx) + rollback(sdi.ctx, oldTx) panic(p) } else if err != nil { rollback(sdi.ctx, newTx) + rollback(sdi.ctx, oldTx) } }() blockTx := &BatchTx{ ctx: sdi.ctx, BlockNumber: height, oldStmt: sdi.oldDBWriter.DB.InsertIPLDsStm(), - newStmt: sdi.newDBWriter.DB.InsertStateStm(), + newStmt: sdi.newDBWriter.DB.InsertIPLDsStm(), iplds: make(chan sharedModels.IPLDModel), quit: make(chan struct{}), ipldCache: sharedModels.IPLDBatch{}, @@ -310,10 +307,11 @@ func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, he baseFee = new(string) *baseFee = header.BaseFee.String() } + mhKey := shared.MultihashKeyFromCID(headerNode.Cid()) // index header headerID, err := sdi.oldDBWriter.InsertHeaderCID(tx.oldDBTx, &v2Models.HeaderModel{ CID: headerNode.Cid().String(), - MhKey: shared.MultihashKeyFromCID(headerNode.Cid()), + MhKey: mhKey, ParentHash: header.ParentHash.String(), BlockNumber: header.Number.String(), BlockHash: header.Hash().String(), @@ -330,9 +328,9 @@ func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, he if err != nil { return 0, err } - if err := sdi.newDBWriter.InsertHeaderCID(tx.newDBTx, v3Models.HeaderModel{ + if err := sdi.newDBWriter.InsertHeaderCID(tx.newDBTx, &v3Models.HeaderModel{ CID: headerNode.Cid().String(), - MhKey: shared.MultihashKeyFromCID(headerNode.Cid()), + MhKey: mhKey, ParentHash: header.ParentHash.String(), BlockNumber: header.Number.String(), BlockHash: header.Hash().String(), @@ -363,10 +361,11 @@ func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerHash string, heade } else { uncleReward = shared.CalcUncleMinerReward(blockNumber, uncleNode.Number.Uint64()) } + mhKey := shared.MultihashKeyFromCID(uncleNode.Cid()) if err := sdi.oldDBWriter.InsertUncleCID(tx.oldDBTx, &v2Models.UncleModel{ HeaderID: headerID, CID: uncleNode.Cid().String(), - MhKey: shared.MultihashKeyFromCID(uncleNode.Cid()), + MhKey: mhKey, ParentHash: uncleNode.ParentHash.String(), BlockHash: uncleNode.Hash().String(), Reward: uncleReward.String(), @@ -376,7 +375,7 @@ func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerHash string, heade if err := sdi.newDBWriter.InsertUncleCID(tx.newDBTx, &v3Models.UncleModel{ HeaderID: headerHash, CID: uncleNode.Cid().String(), - MhKey: shared.MultihashKeyFromCID(uncleNode.Cid()), + MhKey: mhKey, ParentHash: uncleNode.ParentHash.String(), BlockHash: uncleNode.Hash().String(), Reward: uncleReward.String(), @@ -428,15 +427,18 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs if err != nil { return fmt.Errorf("error deriving tx sender: %v", err) } + mhKey := shared.MultihashKeyFromCID(txNode.Cid()) + dst := shared.HandleZeroAddrPointer(trx.To()) + src := shared.HandleZeroAddr(from) txID, err := sdi.oldDBWriter.InsertTransactionCID(tx.oldDBTx, &v2Models.TxModel{ HeaderID: args.headerID, - Dst: shared.HandleZeroAddrPointer(trx.To()), - Src: shared.HandleZeroAddr(from), + Dst: dst, + Src: src, TxHash: txHash, Index: int64(i), Data: trx.Data(), CID: txNode.Cid().String(), - MhKey: shared.MultihashKeyFromCID(txNode.Cid()), + MhKey: mhKey, Type: trx.Type(), }) if err != nil { @@ -444,13 +446,13 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs } if err := sdi.newDBWriter.InsertTransactionCID(tx.newDBTx, &v3Models.TxModel{ HeaderID: args.headerHash, - Dst: shared.HandleZeroAddrPointer(trx.To()), - Src: shared.HandleZeroAddr(from), + Dst: dst, + Src: src, TxHash: txHash, Index: int64(i), Data: trx.Data(), CID: txNode.Cid().String(), - MhKey: shared.MultihashKeyFromCID(txNode.Cid()), + MhKey: mhKey, Type: trx.Type(), Value: val, }); err != nil { @@ -500,12 +502,13 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs postState = common.Bytes2Hex(receipt.PostState) } + rctMhKey := shared.MultihashKeyFromCID(args.rctLeafNodeCIDs[i]) rctID, err := sdi.oldDBWriter.InsertReceiptCID(tx.oldDBTx, &v2Models.ReceiptModel{ TxID: txID, Contract: contract, ContractHash: contractHash, LeafCID: args.rctLeafNodeCIDs[i].String(), - LeafMhKey: shared.MultihashKeyFromCID(args.rctLeafNodeCIDs[i]), + LeafMhKey: rctMhKey, LogRoot: args.rctNodes[i].LogRoot.String(), PostState: postState, PostStatus: postStatus, @@ -518,7 +521,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs Contract: contract, ContractHash: contractHash, LeafCID: args.rctLeafNodeCIDs[i].String(), - LeafMhKey: shared.MultihashKeyFromCID(args.rctLeafNodeCIDs[i]), + LeafMhKey: rctMhKey, LogRoot: args.rctNodes[i].LogRoot.String(), PostState: postState, PostStatus: postStatus, @@ -540,13 +543,14 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs return fmt.Errorf("invalid log cid") } + logMhKey := shared.MultihashKeyFromCID(args.logLeafNodeCIDs[i][idx]) oldLogDataSet[idx] = &v2Models.LogsModel{ ReceiptID: rctID, Address: l.Address.String(), Index: int64(l.Index), Data: l.Data, LeafCID: args.logLeafNodeCIDs[i][idx].String(), - LeafMhKey: shared.MultihashKeyFromCID(args.logLeafNodeCIDs[i][idx]), + LeafMhKey: logMhKey, Topic0: topicSet[0], Topic1: topicSet[1], Topic2: topicSet[2], @@ -558,7 +562,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs Index: int64(l.Index), Data: l.Data, LeafCID: args.logLeafNodeCIDs[i][idx].String(), - LeafMhKey: shared.MultihashKeyFromCID(args.logLeafNodeCIDs[i][idx]), + LeafMhKey: logMhKey, Topic0: topicSet[0], Topic1: topicSet[1], Topic2: topicSet[2], diff --git a/statediff/indexer/database/sql/metrics.go b/statediff/indexer/database/sql/metrics/metrics.go similarity index 86% rename from statediff/indexer/database/sql/metrics.go rename to statediff/indexer/database/sql/metrics/metrics.go index f59edcf14..9afc5f4b1 100644 --- a/statediff/indexer/database/sql/metrics.go +++ b/statediff/indexer/database/sql/metrics/metrics.go @@ -14,7 +14,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -package sql +package metrics import ( "strings" @@ -41,7 +41,7 @@ func metricName(subsystem, name string) string { return strings.Join(parts, "/") } -type IndexerMetricsHandles struct { +type WriterMetricsHandles struct { // The total number of processed blocks Blocks metrics.Counter // The total number of processed transactions @@ -52,6 +52,9 @@ type IndexerMetricsHandles struct { Logs metrics.Counter // The total number of access list entries processed AccessListEntries metrics.Counter +} + +type IndexerMetricsHandles struct { // Time spent waiting for free postgres tx TimeFreePostgres metrics.Timer // Postgres transaction commit duration @@ -66,13 +69,25 @@ type IndexerMetricsHandles struct { TimeStateStoreCodeProcessing metrics.Timer } +func RegisterWriterMetrics(reg metrics.Registry, version string) WriterMetricsHandles { + ctx := WriterMetricsHandles{ + Blocks: metrics.NewCounter(), + Transactions: metrics.NewCounter(), + Receipts: metrics.NewCounter(), + Logs: metrics.NewCounter(), + AccessListEntries: metrics.NewCounter(), + } + subsys := "writer" + reg.Register(metricName(subsys, version+"/"+"blocks"), ctx.Blocks) + reg.Register(metricName(subsys, version+"/"+"transactions"), ctx.Transactions) + reg.Register(metricName(subsys, version+"/"+"receipts"), ctx.Receipts) + reg.Register(metricName(subsys, version+"/"+"logs"), ctx.Logs) + reg.Register(metricName(subsys, version+"/"+"access_list_entries"), ctx.AccessListEntries) + return ctx +} + func RegisterIndexerMetrics(reg metrics.Registry) IndexerMetricsHandles { ctx := IndexerMetricsHandles{ - Blocks: metrics.NewCounter(), - Transactions: metrics.NewCounter(), - Receipts: metrics.NewCounter(), - Logs: metrics.NewCounter(), - AccessListEntries: metrics.NewCounter(), TimeFreePostgres: metrics.NewTimer(), TimePostgresCommit: metrics.NewTimer(), TimeHeaderProcessing: metrics.NewTimer(), @@ -81,11 +96,6 @@ func RegisterIndexerMetrics(reg metrics.Registry) IndexerMetricsHandles { TimeStateStoreCodeProcessing: metrics.NewTimer(), } subsys := "indexer" - reg.Register(metricName(subsys, "blocks"), ctx.Blocks) - reg.Register(metricName(subsys, "transactions"), ctx.Transactions) - reg.Register(metricName(subsys, "receipts"), ctx.Receipts) - reg.Register(metricName(subsys, "logs"), ctx.Logs) - reg.Register(metricName(subsys, "access_list_entries"), ctx.AccessListEntries) reg.Register(metricName(subsys, "t_free_postgres"), ctx.TimeFreePostgres) reg.Register(metricName(subsys, "t_postgres_commit"), ctx.TimePostgresCommit) reg.Register(metricName(subsys, "t_header_processing"), ctx.TimeHeaderProcessing) diff --git a/statediff/indexer/database/sql/postgres/pgx.go b/statediff/indexer/database/sql/postgres/pgx.go index b720f7fe5..9911d45bf 100644 --- a/statediff/indexer/database/sql/postgres/pgx.go +++ b/statediff/indexer/database/sql/postgres/pgx.go @@ -25,6 +25,7 @@ import ( "github.com/jackc/pgx/v4" "github.com/jackc/pgx/v4/pgxpool" + "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/statediff/indexer/interfaces" ) @@ -37,6 +38,7 @@ type PGXDriver struct { // NewPGXDriver returns a new pgx driver // it initializes the connection pool and creates the node info table func NewPGXDriver(ctx context.Context, config Config) (*PGXDriver, error) { + log.Info("connecting to database", "connection string", config.DbConnectionString()) pgConf, err := MakeConfig(config) if err != nil { return nil, err diff --git a/statediff/indexer/database/sql/postgres/sqlx.go b/statediff/indexer/database/sql/postgres/sqlx.go index 733c35734..6187c9f38 100644 --- a/statediff/indexer/database/sql/postgres/sqlx.go +++ b/statediff/indexer/database/sql/postgres/sqlx.go @@ -18,12 +18,14 @@ package postgres import ( "context" - coresql "database/sql" "time" - "github.com/ethereum/go-ethereum/statediff/indexer/interfaces" + coresql "database/sql" "github.com/jmoiron/sqlx" + + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/statediff/indexer/interfaces" ) // SQLXDriver driver, implements sql.Driver @@ -35,7 +37,9 @@ type SQLXDriver struct { // NewSQLXDriver returns a new sqlx driver for Postgres // it initializes the connection pool and creates the node info table func NewSQLXDriver(ctx context.Context, config Config) (*SQLXDriver, error) { - db, err := sqlx.ConnectContext(ctx, "postgres", config.DbConnectionString()) + connStr := config.DbConnectionString() + log.Info("connecting to database", "connection string", connStr) + db, err := sqlx.ConnectContext(ctx, "postgres", connStr) if err != nil { return &SQLXDriver{}, ErrDBConnectionFailed(err) } diff --git a/statediff/indexer/database/sql/postgres/v2/database.go b/statediff/indexer/database/sql/postgres/v2/database.go index f632e09c5..d4ca1efc8 100644 --- a/statediff/indexer/database/sql/postgres/v2/database.go +++ b/statediff/indexer/database/sql/postgres/v2/database.go @@ -37,7 +37,9 @@ type DB struct { // InsertNodeInfoStm satisfies interfaces.Statements func (db *DB) InsertNodeInfoStm() string { return `INSERT INTO nodes (genesis_block, network_id, node_id, client_name, chain_id) VALUES ($1, $2, $3, $4, $5) - ON CONFLICT (genesis_block, network_id, node_id, chain_id) DO NOTHING` + ON CONFLICT (genesis_block, network_id, node_id, chain_id) + DO UPDATE SET client_name = $4 + RETURNING ID` } // InsertHeaderStm satisfies the interfaces.Statements diff --git a/statediff/indexer/database/sql/test_helpers.go b/statediff/indexer/database/sql/test_helpers.go index b1032f8ff..1f392b93f 100644 --- a/statediff/indexer/database/sql/test_helpers.go +++ b/statediff/indexer/database/sql/test_helpers.go @@ -19,10 +19,12 @@ package sql import ( "context" "testing" + + "github.com/ethereum/go-ethereum/statediff/indexer/interfaces" ) // TearDownDB is used to tear down the watcher dbs after tests -func TearDownDB(t *testing.T, db Database) { +func TearDownDB(t *testing.T, db interfaces.Database) { ctx := context.Background() tx, err := db.Begin(ctx) if err != nil { diff --git a/statediff/indexer/database/sql/v2/writer.go b/statediff/indexer/database/sql/v2/writer.go index feeb49c56..7b4e2a758 100644 --- a/statediff/indexer/database/sql/v2/writer.go +++ b/statediff/indexer/database/sql/v2/writer.go @@ -20,21 +20,22 @@ import ( "fmt" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/statediff/indexer/database/sql" + "github.com/ethereum/go-ethereum/metrics" + metrics2 "github.com/ethereum/go-ethereum/statediff/indexer/database/sql/metrics" "github.com/ethereum/go-ethereum/statediff/indexer/interfaces" "github.com/ethereum/go-ethereum/statediff/indexer/models/v2" "github.com/ethereum/go-ethereum/statediff/indexer/node" ) var ( - nullHash = common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000000") + nullHash = common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000000") + writerV2Metrics = metrics2.RegisterWriterMetrics(metrics.DefaultRegistry, "v3") ) // Writer handles processing and writing of indexed IPLD objects to Postgres type Writer struct { - DB interfaces.Database - metrics sql.IndexerMetricsHandles - nodeID int64 + DB interfaces.Database + nodeID int64 } // NewWriter creates a new pointer to a Writer @@ -52,7 +53,7 @@ func (w *Writer) Close() error { /* InsertNodeInfo inserts a node info model INSERT INTO nodes (genesis_block, network_id, node_id, client_name, chain_id) VALUES ($1, $2, $3, $4, $5) -ON CONFLICT (genesis_block, network_id, node_id, chain_id) DO NOTHING +ON CONFLICT (genesis_block, network_id, node_id, chain_id) DO NOTHING RETURNING ID */ func (w *Writer) InsertNodeInfo(info node.Info) error { var nodeID int64 @@ -79,7 +80,7 @@ func (w *Writer) InsertHeaderCID(tx interfaces.Tx, header *models.HeaderModel) ( if err != nil { return 0, fmt.Errorf("error inserting header_cids entry: %v", err) } - w.metrics.Blocks.Inc(1) + writerV2Metrics.Blocks.Inc(1) return headerID, nil } @@ -110,7 +111,7 @@ func (w *Writer) InsertTransactionCID(tx interfaces.Tx, transaction *models.TxMo if err != nil { return 0, fmt.Errorf("error inserting transaction_cids entry: %v", err) } - w.metrics.Transactions.Inc(1) + writerV2Metrics.Transactions.Inc(1) return txID, nil } @@ -125,7 +126,7 @@ func (w *Writer) InsertAccessListElement(tx interfaces.Tx, accessListElement *mo if err != nil { return fmt.Errorf("error inserting access_list_element entry: %v", err) } - w.metrics.AccessListEntries.Inc(1) + writerV2Metrics.AccessListEntries.Inc(1) return nil } @@ -141,7 +142,7 @@ func (w *Writer) InsertReceiptCID(tx interfaces.Tx, rct *models.ReceiptModel) (i if err != nil { return 0, fmt.Errorf("error inserting receipt_cids entry: %w", err) } - w.metrics.Receipts.Inc(1) + writerV2Metrics.Receipts.Inc(1) return receiptID, nil } @@ -158,7 +159,7 @@ func (w *Writer) InsertLogCID(tx interfaces.Tx, logs []*models.LogsModel) error if err != nil { return fmt.Errorf("error inserting logs entry: %w", err) } - w.metrics.Logs.Inc(1) + writerV2Metrics.Logs.Inc(1) } return nil } diff --git a/statediff/indexer/database/sql/v3/writer.go b/statediff/indexer/database/sql/v3/writer.go index 593cb3339..2cda907ad 100644 --- a/statediff/indexer/database/sql/v3/writer.go +++ b/statediff/indexer/database/sql/v3/writer.go @@ -20,21 +20,22 @@ import ( "fmt" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/statediff/indexer/database/sql" + "github.com/ethereum/go-ethereum/metrics" + metrics2 "github.com/ethereum/go-ethereum/statediff/indexer/database/sql/metrics" "github.com/ethereum/go-ethereum/statediff/indexer/interfaces" "github.com/ethereum/go-ethereum/statediff/indexer/models/v3" "github.com/ethereum/go-ethereum/statediff/indexer/node" ) var ( - nullHash = common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000000") + nullHash = common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000000") + writerV3Metrics = metrics2.RegisterWriterMetrics(metrics.DefaultRegistry, "v3") ) // Writer handles processing and writing of indexed IPLD objects to Postgres type Writer struct { - DB interfaces.Database - metrics sql.IndexerMetricsHandles - nodeID string + DB interfaces.Database + nodeID string } // NewWriter creates a new pointer to a Writer @@ -69,7 +70,7 @@ INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, nod VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16) ON CONFLICT (block_hash) DO UPDATE SET (block_number, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase) = ($1, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, eth.header_cids.times_validated + 1, $16) */ -func (w *Writer) InsertHeaderCID(tx interfaces.Tx, header models.HeaderModel) error { +func (w *Writer) InsertHeaderCID(tx interfaces.Tx, header *models.HeaderModel) error { _, err := tx.Exec(w.DB.Context(), w.DB.InsertHeaderStm(), header.BlockNumber, header.BlockHash, header.ParentHash, header.CID, header.TotalDifficulty, w.nodeID, header.Reward, header.StateRoot, header.TxRoot, header.RctRoot, header.UncleRoot, header.Bloom, @@ -77,7 +78,7 @@ func (w *Writer) InsertHeaderCID(tx interfaces.Tx, header models.HeaderModel) er if err != nil { return fmt.Errorf("error inserting header_cids entry: %v", err) } - w.metrics.Blocks.Inc(1) + writerV3Metrics.Blocks.Inc(1) return nil } @@ -107,7 +108,7 @@ func (w *Writer) InsertTransactionCID(tx interfaces.Tx, transaction *models.TxMo if err != nil { return fmt.Errorf("error inserting transaction_cids entry: %v", err) } - w.metrics.Transactions.Inc(1) + writerV3Metrics.Transactions.Inc(1) return nil } @@ -122,7 +123,7 @@ func (w *Writer) InsertAccessListElement(tx interfaces.Tx, accessListElement *mo if err != nil { return fmt.Errorf("error inserting access_list_element entry: %v", err) } - w.metrics.AccessListEntries.Inc(1) + writerV3Metrics.AccessListEntries.Inc(1) return nil } @@ -137,7 +138,7 @@ func (w *Writer) InsertReceiptCID(tx interfaces.Tx, rct *models.ReceiptModel) er if err != nil { return fmt.Errorf("error inserting receipt_cids entry: %w", err) } - w.metrics.Receipts.Inc(1) + writerV3Metrics.Receipts.Inc(1) return nil } @@ -154,7 +155,7 @@ func (w *Writer) InsertLogCID(tx interfaces.Tx, logs []*models.LogsModel) error if err != nil { return fmt.Errorf("error inserting logs entry: %w", err) } - w.metrics.Logs.Inc(1) + writerV3Metrics.Logs.Inc(1) } return nil } diff --git a/statediff/service.go b/statediff/service.go index 88186b99d..46bcb9213 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -18,6 +18,7 @@ package statediff import ( "bytes" + "fmt" "math/big" "strconv" "strings" @@ -162,7 +163,7 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params var err error indexer, err = ind.NewStateDiffIndexer(params.Context, blockChain.Config(), info, params.IndexerConfig) if err != nil { - return err + return fmt.Errorf("unable to initialize a new statediff indexer: %v", err) } indexer.ReportOldDBMetrics(10*time.Second, quitCh) indexer.ReportNewDBMetrics(10*time.Second, quitCh) @@ -235,7 +236,7 @@ func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) { chainEventSub := sds.BlockChain.SubscribeChainEvent(chainEventCh) defer chainEventSub.Unsubscribe() errCh := chainEventSub.Err() - var wg sync.WaitGroup + wg := new(sync.WaitGroup) // Process metrics for chain events, then forward to workers chainEventFwd := make(chan core.ChainEvent, chainEventChanSize) wg.Add(1) @@ -266,7 +267,7 @@ func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) { }() wg.Add(int(sds.numWorkers)) for worker := uint(0); worker < sds.numWorkers; worker++ { - params := workerParams{chainEventCh: chainEventFwd, wg: &wg, id: worker} + params := workerParams{chainEventCh: chainEventFwd, wg: wg, id: worker} go sds.writeLoopWorker(params) } wg.Wait()