This commit is contained in:
i-norden 2022-01-31 12:36:48 -06:00
parent 4519813341
commit 27e96c4aed
19 changed files with 193 additions and 130 deletions

View File

@ -216,6 +216,7 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) {
ClientName: clientName, ClientName: clientName,
Driver: v2DriverType, Driver: v2DriverType,
} }
fmt.Printf("v2 config: %+v\r\n", v2PgConfig)
if ctx.GlobalIsSet(utils.StateDiffV2DBMinConns.Name) { if ctx.GlobalIsSet(utils.StateDiffV2DBMinConns.Name) {
v2PgConfig.MinConns = ctx.GlobalInt(utils.StateDiffV2DBMinConns.Name) v2PgConfig.MinConns = ctx.GlobalInt(utils.StateDiffV2DBMinConns.Name)
} }
@ -235,7 +236,7 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) {
v2PgConfig.ConnTimeout = ctx.GlobalDuration(utils.StateDiffV2DBConnTimeout.Name) * time.Second v2PgConfig.ConnTimeout = ctx.GlobalDuration(utils.StateDiffV2DBConnTimeout.Name) * time.Second
} }
v3DriverTypeStr := ctx.GlobalString(utils.StateDiffV2DBDriverTypeFlag.Name) v3DriverTypeStr := ctx.GlobalString(utils.StateDiffV3DBDriverTypeFlag.Name)
v3DriverType, err := postgres.ResolveDriverType(v3DriverTypeStr) v3DriverType, err := postgres.ResolveDriverType(v3DriverTypeStr)
if err != nil { if err != nil {
utils.Fatalf("%v", err) utils.Fatalf("%v", err)
@ -250,6 +251,7 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) {
ClientName: clientName, ClientName: clientName,
Driver: v3DriverType, Driver: v3DriverType,
} }
fmt.Printf("v3 config: %+v\r\n", v3PgConfig)
if ctx.GlobalIsSet(utils.StateDiffV3DBMinConns.Name) { if ctx.GlobalIsSet(utils.StateDiffV3DBMinConns.Name) {
v3PgConfig.MinConns = ctx.GlobalInt(utils.StateDiffV3DBMinConns.Name) v3PgConfig.MinConns = ctx.GlobalInt(utils.StateDiffV3DBMinConns.Name)
} }

View File

@ -158,19 +158,31 @@ var (
utils.MinerNotifyFullFlag, utils.MinerNotifyFullFlag,
utils.StateDiffFlag, utils.StateDiffFlag,
utils.StateDiffDBTypeFlag, utils.StateDiffDBTypeFlag,
utils.StateDiffDBDriverTypeFlag,
utils.StateDiffDBDumpDst, utils.StateDiffDBDumpDst,
utils.StateDiffDBNameFlag, utils.StateDiffV2DBDriverTypeFlag,
utils.StateDiffDBPasswordFlag, utils.StateDiffV2DBNameFlag,
utils.StateDiffDBUserFlag, utils.StateDiffV2DBPasswordFlag,
utils.StateDiffDBHostFlag, utils.StateDiffV2DBUserFlag,
utils.StateDiffDBPortFlag, utils.StateDiffV2DBHostFlag,
utils.StateDiffDBMaxConnLifetime, utils.StateDiffV2DBPortFlag,
utils.StateDiffDBMaxConnIdleTime, utils.StateDiffV2DBMaxConnLifetime,
utils.StateDiffDBMaxConns, utils.StateDiffV2DBMaxConnIdleTime,
utils.StateDiffDBMinConns, utils.StateDiffV2DBMaxConns,
utils.StateDiffDBMaxIdleConns, utils.StateDiffV2DBMinConns,
utils.StateDiffDBConnTimeout, utils.StateDiffV2DBMaxIdleConns,
utils.StateDiffV2DBConnTimeout,
utils.StateDiffV3DBDriverTypeFlag,
utils.StateDiffV3DBNameFlag,
utils.StateDiffV3DBPasswordFlag,
utils.StateDiffV3DBUserFlag,
utils.StateDiffV3DBHostFlag,
utils.StateDiffV3DBPortFlag,
utils.StateDiffV3DBMaxConnLifetime,
utils.StateDiffV3DBMaxConnIdleTime,
utils.StateDiffV3DBMaxConns,
utils.StateDiffV3DBMinConns,
utils.StateDiffV3DBMaxIdleConns,
utils.StateDiffV3DBConnTimeout,
utils.StateDiffDBNodeIDFlag, utils.StateDiffDBNodeIDFlag,
utils.StateDiffDBClientNameFlag, utils.StateDiffDBClientNameFlag,
utils.StateDiffWritingFlag, utils.StateDiffWritingFlag,

View File

@ -228,19 +228,31 @@ var AppHelpFlagGroups = []flags.FlagGroup{
Flags: []cli.Flag{ Flags: []cli.Flag{
utils.StateDiffFlag, utils.StateDiffFlag,
utils.StateDiffDBTypeFlag, utils.StateDiffDBTypeFlag,
utils.StateDiffDBDriverTypeFlag,
utils.StateDiffDBDumpDst, utils.StateDiffDBDumpDst,
utils.StateDiffDBNameFlag, utils.StateDiffV2DBDriverTypeFlag,
utils.StateDiffDBPasswordFlag, utils.StateDiffV2DBNameFlag,
utils.StateDiffDBUserFlag, utils.StateDiffV2DBPasswordFlag,
utils.StateDiffDBHostFlag, utils.StateDiffV2DBUserFlag,
utils.StateDiffDBPortFlag, utils.StateDiffV2DBHostFlag,
utils.StateDiffDBMaxConnLifetime, utils.StateDiffV2DBPortFlag,
utils.StateDiffDBMaxConnIdleTime, utils.StateDiffV2DBMaxConnLifetime,
utils.StateDiffDBMaxConns, utils.StateDiffV2DBMaxConnIdleTime,
utils.StateDiffDBMinConns, utils.StateDiffV2DBMaxConns,
utils.StateDiffDBMaxIdleConns, utils.StateDiffV2DBMinConns,
utils.StateDiffDBConnTimeout, utils.StateDiffV2DBMaxIdleConns,
utils.StateDiffV2DBConnTimeout,
utils.StateDiffV3DBDriverTypeFlag,
utils.StateDiffV3DBNameFlag,
utils.StateDiffV3DBPasswordFlag,
utils.StateDiffV3DBUserFlag,
utils.StateDiffV3DBHostFlag,
utils.StateDiffV3DBPortFlag,
utils.StateDiffV3DBMaxConnLifetime,
utils.StateDiffV3DBMaxConnIdleTime,
utils.StateDiffV3DBMaxConns,
utils.StateDiffV3DBMinConns,
utils.StateDiffV3DBMaxIdleConns,
utils.StateDiffV3DBConnTimeout,
utils.StateDiffDBNodeIDFlag, utils.StateDiffDBNodeIDFlag,
utils.StateDiffDBClientNameFlag, utils.StateDiffDBClientNameFlag,
utils.StateDiffWritingFlag, utils.StateDiffWritingFlag,

2
go.mod
View File

@ -47,9 +47,7 @@ require (
github.com/ipfs/go-ipfs-blockstore v1.0.1 github.com/ipfs/go-ipfs-blockstore v1.0.1
github.com/ipfs/go-ipfs-ds-help v1.0.0 github.com/ipfs/go-ipfs-ds-help v1.0.0
github.com/ipfs/go-ipld-format v0.2.0 github.com/ipfs/go-ipld-format v0.2.0
github.com/jackc/fake v0.0.0-20150926172116-812a484cc733 // indirect
github.com/jackc/pgconn v1.10.0 github.com/jackc/pgconn v1.10.0
github.com/jackc/pgx v3.6.2+incompatible
github.com/jackc/pgx/v4 v4.13.0 github.com/jackc/pgx/v4 v4.13.0
github.com/jackpal/go-nat-pmp v1.0.2-0.20160603034137-1fa385a6f458 github.com/jackpal/go-nat-pmp v1.0.2-0.20160603034137-1fa385a6f458
github.com/jedisct1/go-minisign v0.0.0-20190909160543-45766022959e github.com/jedisct1/go-minisign v0.0.0-20190909160543-45766022959e

2
go.sum
View File

@ -305,8 +305,6 @@ github.com/jackc/chunkreader v1.0.0/go.mod h1:RT6O25fNZIuasFJRyZ4R/Y2BbhasbmZXF9
github.com/jackc/chunkreader/v2 v2.0.0/go.mod h1:odVSm741yZoC3dpHEUXIqA9tQRhFrgOHwnPIn9lDKlk= github.com/jackc/chunkreader/v2 v2.0.0/go.mod h1:odVSm741yZoC3dpHEUXIqA9tQRhFrgOHwnPIn9lDKlk=
github.com/jackc/chunkreader/v2 v2.0.1 h1:i+RDz65UE+mmpjTfyz0MoVTnzeYxroil2G82ki7MGG8= github.com/jackc/chunkreader/v2 v2.0.1 h1:i+RDz65UE+mmpjTfyz0MoVTnzeYxroil2G82ki7MGG8=
github.com/jackc/chunkreader/v2 v2.0.1/go.mod h1:odVSm741yZoC3dpHEUXIqA9tQRhFrgOHwnPIn9lDKlk= github.com/jackc/chunkreader/v2 v2.0.1/go.mod h1:odVSm741yZoC3dpHEUXIqA9tQRhFrgOHwnPIn9lDKlk=
github.com/jackc/fake v0.0.0-20150926172116-812a484cc733 h1:vr3AYkKovP8uR8AvSGGUK1IDqRa5lAAvEkZG1LKaCRc=
github.com/jackc/fake v0.0.0-20150926172116-812a484cc733/go.mod h1:WrMFNQdiFJ80sQsxDoMokWK1W5TQtxBFNpzWTD84ibQ=
github.com/jackc/pgconn v0.0.0-20190420214824-7e0022ef6ba3/go.mod h1:jkELnwuX+w9qN5YIfX0fl88Ehu4XC3keFuOJJk9pcnA= github.com/jackc/pgconn v0.0.0-20190420214824-7e0022ef6ba3/go.mod h1:jkELnwuX+w9qN5YIfX0fl88Ehu4XC3keFuOJJk9pcnA=
github.com/jackc/pgconn v0.0.0-20190824142844-760dd75542eb/go.mod h1:lLjNuW/+OfW9/pnVKPazfWOgNfH2aPem8YQ7ilXGvJE= github.com/jackc/pgconn v0.0.0-20190824142844-760dd75542eb/go.mod h1:lLjNuW/+OfW9/pnVKPazfWOgNfH2aPem8YQ7ilXGvJE=
github.com/jackc/pgconn v0.0.0-20190831204454-2fabfa3c18b7/go.mod h1:ZJKsE/KZfsUgOEh9hBm+xYTstcNHg7UPMVJqRfQxq4s= github.com/jackc/pgconn v0.0.0-20190831204454-2fabfa3c18b7/go.mod h1:ZJKsE/KZfsUgOEh9hBm+xYTstcNHg7UPMVJqRfQxq4s=

View File

@ -33,7 +33,7 @@ type StateNode struct {
NodeType NodeType `json:"nodeType" gencodec:"required"` NodeType NodeType `json:"nodeType" gencodec:"required"`
Path []byte `json:"path" gencodec:"required"` Path []byte `json:"path" gencodec:"required"`
NodeValue []byte `json:"value" gencodec:"required"` NodeValue []byte `json:"value" gencodec:"required"`
StorageNodes []StorageNode `json:"storage"` StorageNodes []StorageNode `json:"storage"`
LeafKey []byte `json:"leafKey"` LeafKey []byte `json:"leafKey"`
} }
@ -80,29 +80,42 @@ This service introduces a CLI flag namespace `statediff`
`--statediff.writing` is used to tell the service to write state diff objects it produces from synced ChainEvents directly to a configured Postgres database `--statediff.writing` is used to tell the service to write state diff objects it produces from synced ChainEvents directly to a configured Postgres database
`--statediff.workers` is used to set the number of concurrent workers to process state diff objects and write them into the database `--statediff.workers` is used to set the number of concurrent workers to process state diff objects and write them into the database
`--statediff.db.type` is the type of database we write out to (current options: postgres, dump, file) `--statediff.db.type` is the type of database we write out to (current options: postgres, dump, file)
`--statediff.file.path` full path (including filename) to write statediff data out to when operating in file mode
`--statediff.dump.dst` is the destination to write to when operating in database dump mode (stdout, stderr, discard) `--statediff.dump.dst` is the destination to write to when operating in database dump mode (stdout, stderr, discard)
`--statediff.db.driver` is the specific driver to use for the database (current options for postgres: pgx and sqlx)
`--statediff.db.host` is the hostname/ip to dial to connect to the database
`--statediff.db.port` is the port to dial to connect to the database
`--statediff.db.name` is the name of the database to connect to
`--statediff.db.user` is the user to connect to the database as
`--statediff.db.password` is the password to use to connect to the database
`--statediff.db.conntimeout` is the connection timeout (in seconds)
`--statediff.db.maxconns` is the maximum number of database connections
`--statediff.db.minconns` is the minimum number of database connections
`--statediff.db.maxidleconns` is the maximum number of idle connections
`--statediff.db.maxconnidletime` is the maximum lifetime for an idle connection (in seconds)
`--statediff.db.maxconnlifetime` is the maximum lifetime for a connection (in seconds)
`--statediff.db.nodeid` is the node id to use in the Postgres database `--statediff.db.nodeid` is the node id to use in the Postgres database
`--statediff.db.clientname` is the client name to use in the Postgres database `--statediff.db.clientname` is the client name to use in the Postgres database
`--statediff.file.path` full path (including filename) to write statediff data out to when operating in file mode `--statediff.db.v2.driver` is the specific driver to use for the v2 database (current options for postgres: pgx and sqlx)
`--statediff.db.v2.host` is the hostname/ip to dial to connect to the v2 database
`--statediff.db.v2.port` is the port to dial to connect to the v2 database
`--statediff.db.v2.name` is the name of the v2 database to connect to
`--statediff.db.v2.user` is the user to connect to the v2 database as
`--statediff.db.v2.password` is the password to use to connect to the v2 database
`--statediff.db.v2.conntimeout` is the connection timeout (in seconds) for the v2 database
`--statediff.db.v2.maxconns` is the maximum number of database connections for the v2 database
`--statediff.db.v2.minconns` is the minimum number of database connections for the v2 database
`--statediff.db.v2.maxidleconns` is the maximum number of idle connections for the v2 database
`--statediff.db.v2.maxconnidletime` is the maximum lifetime for an idle connection (in seconds) for the v2 database
`--statediff.db.v2.maxconnlifetime` is the maximum lifetime for a connection (in seconds) for the v2 database
`--statediff.db.v3.driver` is the specific driver to use for the v3 database (current options for postgres: pgx and sqlx)
`--statediff.db.v3.host` is the hostname/ip to dial to connect to the v3 database
`--statediff.db.v3.port` is the port to dial to connect to the v3 database
`--statediff.db.v3.name` is the name of the v3 database to connect to
`--statediff.db.v3.user` is the user to connect to the v3 database as
`--statediff.db.v3.password` is the password to use to connect to the v3 database
`--statediff.db.v3.conntimeout` is the connection timeout (in seconds) for the v3 database
`--statediff.db.v3.maxconns` is the maximum number of database connections for the v3 database
`--statediff.db.v3.minconns` is the minimum number of database connections for the v3 database
`--statediff.db.v3.maxidleconns` is the maximum number of idle connections for the v3 database
`--statediff.db.v3.maxconnidletime` is the maximum lifetime for an idle connection (in seconds) for the v3 database
`--statediff.db.v3.maxconnlifetime` is the maximum lifetime for a connection (in seconds) for the v3 database
The service can only operate in full sync mode (`--syncmode=full`), but only the historical RPC endpoints require an archive node (`--gcmode=archive`) The service can only operate in full sync mode (`--syncmode=full`), but only the historical RPC endpoints require an archive node (`--gcmode=archive`)
e.g. e.g.
` `./build/bin/geth --syncmode=full --gcmode=archive --statediff --statediff.writing --statediff.db.type=postgres --statediff.db.nodeid=nodeid --statediff.db.v2.driver=sqlx
./build/bin/geth --syncmode=full --gcmode=archive --statediff --statediff.writing --statediff.db.type=postgres --statediff.db.driver=sqlx --statediff.db.host=localhost --statediff.db.port=5432 --statediff.db.name=vulcanize_test --statediff.db.user=postgres --statediff.db.nodeid=nodeid --statediff.db.clientname=clientname --statediff.db.v3.driver=sqlx --statediff.db.v2.host=localhost --statediff.db.v3.host=localhost --statediff.db.v2.port=5432 --statediff.db.v3.port=5432
` --statediff.db.v2.name=vulcanize_dual_v2 --statediff.db.v3.name=vulcanize_dual_v3 --statediff.db.v2.user=postgres --statediff.db.v3.user=postgres
--statediff.db.clientname=clientname --statediff.workers=20`
When operating in `--statediff.db.type=file` mode, the service will write SQL statements out to the file designated by When operating in `--statediff.db.type=file` mode, the service will write SQL statements out to the file designated by
`--statediff.file.path`. Please note that it writes out SQL statements with all `ON CONFLICT` constraint checks dropped. `--statediff.file.path`. Please note that it writes out SQL statements with all `ON CONFLICT` constraint checks dropped.

View File

@ -66,7 +66,7 @@ func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, n
default: default:
return nil, fmt.Errorf("unrecongized Postgres driver type: %s", pgc.V2.Driver) return nil, fmt.Errorf("unrecongized Postgres driver type: %s", pgc.V2.Driver)
} }
switch pgc.V2.Driver { switch pgc.V3.Driver {
case postgres.PGX: case postgres.PGX:
newDriver, err = postgres.NewPGXDriver(ctx, pgc.V3) newDriver, err = postgres.NewPGXDriver(ctx, pgc.V3)
if err != nil { if err != nil {

View File

@ -20,6 +20,8 @@ import (
"fmt" "fmt"
"io" "io"
sharedModels "github.com/ethereum/go-ethereum/statediff/indexer/models/shared"
"github.com/ethereum/go-ethereum/statediff/indexer/ipld" "github.com/ethereum/go-ethereum/statediff/indexer/ipld"
blockstore "github.com/ipfs/go-ipfs-blockstore" blockstore "github.com/ipfs/go-ipfs-blockstore"
@ -32,8 +34,8 @@ type BatchTx struct {
BlockNumber uint64 BlockNumber uint64
dump io.Writer dump io.Writer
quit chan struct{} quit chan struct{}
iplds chan v3.IPLDModel iplds chan sharedModels.IPLDModel
ipldCache v3.IPLDBatch ipldCache sharedModels.IPLDBatch
submit func(blockTx *BatchTx, err error) error submit func(blockTx *BatchTx, err error) error
} }
@ -47,7 +49,7 @@ func (tx *BatchTx) flush() error {
if _, err := fmt.Fprintf(tx.dump, "%+v\r\n", tx.ipldCache); err != nil { if _, err := fmt.Fprintf(tx.dump, "%+v\r\n", tx.ipldCache); err != nil {
return err return err
} }
tx.ipldCache = v3.IPLDBatch{} tx.ipldCache = sharedModels.IPLDBatch{}
return nil return nil
} }
@ -59,21 +61,21 @@ func (tx *BatchTx) cache() {
tx.ipldCache.Keys = append(tx.ipldCache.Keys, i.Key) tx.ipldCache.Keys = append(tx.ipldCache.Keys, i.Key)
tx.ipldCache.Values = append(tx.ipldCache.Values, i.Data) tx.ipldCache.Values = append(tx.ipldCache.Values, i.Data)
case <-tx.quit: case <-tx.quit:
tx.ipldCache = v3.IPLDBatch{} tx.ipldCache = sharedModels.IPLDBatch{}
return return
} }
} }
} }
func (tx *BatchTx) cacheDirect(key string, value []byte) { func (tx *BatchTx) cacheDirect(key string, value []byte) {
tx.iplds <- v3.IPLDModel{ tx.iplds <- sharedModels.IPLDModel{
Key: key, Key: key,
Data: value, Data: value,
} }
} }
func (tx *BatchTx) cacheIPLD(i node.Node) { func (tx *BatchTx) cacheIPLD(i node.Node) {
tx.iplds <- v3.IPLDModel{ tx.iplds <- sharedModels.IPLDModel{
Key: blockstore.BlockPrefix.String() + dshelp.MultihashToDsKey(i.Cid().Hash()).String(), Key: blockstore.BlockPrefix.String() + dshelp.MultihashToDsKey(i.Cid().Hash()).String(),
Data: i.RawData(), Data: i.RawData(),
} }
@ -85,7 +87,7 @@ func (tx *BatchTx) cacheRaw(codec, mh uint64, raw []byte) (string, string, error
return "", "", err return "", "", err
} }
prefixedKey := blockstore.BlockPrefix.String() + dshelp.MultihashToDsKey(c.Hash()).String() prefixedKey := blockstore.BlockPrefix.String() + dshelp.MultihashToDsKey(c.Hash()).String()
tx.iplds <- v3.IPLDModel{ tx.iplds <- sharedModels.IPLDModel{
Key: prefixedKey, Key: prefixedKey,
Data: raw, Data: raw,
} }

View File

@ -24,8 +24,6 @@ import (
"os" "os"
"testing" "testing"
"github.com/ethereum/go-ethereum/statediff/indexer/models/v2"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/statediff/indexer/shared" "github.com/ethereum/go-ethereum/statediff/indexer/shared"

View File

@ -18,6 +18,7 @@ package sql
import ( import (
"context" "context"
"fmt"
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces" "github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
@ -53,11 +54,11 @@ func (tx *BatchTx) Submit(err error) error {
func (tx *BatchTx) flush() error { func (tx *BatchTx) flush() error {
_, err := tx.oldDBTx.Exec(tx.ctx, tx.oldStmt, pq.Array(tx.ipldCache.Keys), pq.Array(tx.ipldCache.Values)) _, err := tx.oldDBTx.Exec(tx.ctx, tx.oldStmt, pq.Array(tx.ipldCache.Keys), pq.Array(tx.ipldCache.Values))
if err != nil { if err != nil {
return err return fmt.Errorf("error flushing IPLD cache to old DB: %v", err)
} }
_, err = tx.newDBTx.Exec(tx.ctx, tx.newStmt, pq.Array(tx.ipldCache.Keys), pq.Array(tx.ipldCache.Values)) _, err = tx.newDBTx.Exec(tx.ctx, tx.newStmt, pq.Array(tx.ipldCache.Keys), pq.Array(tx.ipldCache.Values))
if err != nil { if err != nil {
return err return fmt.Errorf("error flushing IPLD cache to new DB: %v", err)
} }
tx.ipldCache = modelsShared.IPLDBatch{} tx.ipldCache = modelsShared.IPLDBatch{}
return nil return nil

View File

@ -27,6 +27,8 @@ import (
"strings" "strings"
"time" "time"
metrics2 "github.com/ethereum/go-ethereum/statediff/indexer/database/sql/metrics"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
node "github.com/ipfs/go-ipld-format" node "github.com/ipfs/go-ipld-format"
"github.com/multiformats/go-multihash" "github.com/multiformats/go-multihash"
@ -53,8 +55,8 @@ import (
var _ interfaces.StateDiffIndexer = &StateDiffIndexer{} var _ interfaces.StateDiffIndexer = &StateDiffIndexer{}
var ( var (
indexerMetrics = RegisterIndexerMetrics(metrics.DefaultRegistry) indexerMetrics = metrics2.RegisterIndexerMetrics(metrics.DefaultRegistry)
dbMetrics = RegisterDBMetrics(metrics.DefaultRegistry) dbMetrics = metrics2.RegisterDBMetrics(metrics.DefaultRegistry)
) )
// StateDiffIndexer satisfies the indexer.StateDiffIndexer interface for ethereum statediff objects on top of an SQL sql // StateDiffIndexer satisfies the indexer.StateDiffIndexer interface for ethereum statediff objects on top of an SQL sql
@ -69,19 +71,19 @@ type StateDiffIndexer struct {
func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, info nodeInfo.Info, old, new interfaces.Database) (*StateDiffIndexer, error) { func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, info nodeInfo.Info, old, new interfaces.Database) (*StateDiffIndexer, error) {
// Write the removed node to the db on init // Write the removed node to the db on init
if _, err := old.Exec(ctx, old.InsertIPLDStm(), shared.RemovedNodeMhKey, []byte{}); err != nil { if _, err := old.Exec(ctx, old.InsertIPLDStm(), shared.RemovedNodeMhKey, []byte{}); err != nil {
return nil, err return nil, fmt.Errorf("unable to write removed node IPLD to old DB: %v", err)
} }
if _, err := new.Exec(ctx, new.InsertIPLDStm(), shared.RemovedNodeMhKey, []byte{}); err != nil { if _, err := new.Exec(ctx, new.InsertIPLDStm(), shared.RemovedNodeMhKey, []byte{}); err != nil {
return nil, err return nil, fmt.Errorf("unable to write removed node IPLD to new DB: %v", err)
} }
// Write node info to the db on init // Write node info to the db on init
oldWriter := v2Writer.NewWriter(old) oldWriter := v2Writer.NewWriter(old)
newWriter := v3Writer.NewWriter(new) newWriter := v3Writer.NewWriter(new)
if err := oldWriter.InsertNodeInfo(info); err != nil { if err := oldWriter.InsertNodeInfo(info); err != nil {
return nil, err return nil, fmt.Errorf("unable to write node info to old DB: %v", err)
} }
if err := newWriter.InsertNodeInfo(info); err != nil { if err := newWriter.InsertNodeInfo(info); err != nil {
return nil, err return nil, fmt.Errorf("unable to write node info to new DB: %v", err)
} }
return &StateDiffIndexer{ return &StateDiffIndexer{
ctx: ctx, ctx: ctx,
@ -171,31 +173,26 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
if err != nil { if err != nil {
return nil, 0, err return nil, 0, err
} }
defer func() {
if p := recover(); p != nil {
rollback(sdi.ctx, oldTx)
panic(p)
} else if err != nil {
rollback(sdi.ctx, oldTx)
}
}()
newTx, err := sdi.newDBWriter.DB.Begin(sdi.ctx) newTx, err := sdi.newDBWriter.DB.Begin(sdi.ctx)
if err != nil { if err != nil {
rollback(sdi.ctx, oldTx)
return nil, 0, err return nil, 0, err
} }
defer func() { defer func() {
if p := recover(); p != nil { if p := recover(); p != nil {
rollback(sdi.ctx, newTx) rollback(sdi.ctx, newTx)
rollback(sdi.ctx, oldTx)
panic(p) panic(p)
} else if err != nil { } else if err != nil {
rollback(sdi.ctx, newTx) rollback(sdi.ctx, newTx)
rollback(sdi.ctx, oldTx)
} }
}() }()
blockTx := &BatchTx{ blockTx := &BatchTx{
ctx: sdi.ctx, ctx: sdi.ctx,
BlockNumber: height, BlockNumber: height,
oldStmt: sdi.oldDBWriter.DB.InsertIPLDsStm(), oldStmt: sdi.oldDBWriter.DB.InsertIPLDsStm(),
newStmt: sdi.newDBWriter.DB.InsertStateStm(), newStmt: sdi.newDBWriter.DB.InsertIPLDsStm(),
iplds: make(chan sharedModels.IPLDModel), iplds: make(chan sharedModels.IPLDModel),
quit: make(chan struct{}), quit: make(chan struct{}),
ipldCache: sharedModels.IPLDBatch{}, ipldCache: sharedModels.IPLDBatch{},
@ -310,10 +307,11 @@ func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, he
baseFee = new(string) baseFee = new(string)
*baseFee = header.BaseFee.String() *baseFee = header.BaseFee.String()
} }
mhKey := shared.MultihashKeyFromCID(headerNode.Cid())
// index header // index header
headerID, err := sdi.oldDBWriter.InsertHeaderCID(tx.oldDBTx, &v2Models.HeaderModel{ headerID, err := sdi.oldDBWriter.InsertHeaderCID(tx.oldDBTx, &v2Models.HeaderModel{
CID: headerNode.Cid().String(), CID: headerNode.Cid().String(),
MhKey: shared.MultihashKeyFromCID(headerNode.Cid()), MhKey: mhKey,
ParentHash: header.ParentHash.String(), ParentHash: header.ParentHash.String(),
BlockNumber: header.Number.String(), BlockNumber: header.Number.String(),
BlockHash: header.Hash().String(), BlockHash: header.Hash().String(),
@ -330,9 +328,9 @@ func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, he
if err != nil { if err != nil {
return 0, err return 0, err
} }
if err := sdi.newDBWriter.InsertHeaderCID(tx.newDBTx, v3Models.HeaderModel{ if err := sdi.newDBWriter.InsertHeaderCID(tx.newDBTx, &v3Models.HeaderModel{
CID: headerNode.Cid().String(), CID: headerNode.Cid().String(),
MhKey: shared.MultihashKeyFromCID(headerNode.Cid()), MhKey: mhKey,
ParentHash: header.ParentHash.String(), ParentHash: header.ParentHash.String(),
BlockNumber: header.Number.String(), BlockNumber: header.Number.String(),
BlockHash: header.Hash().String(), BlockHash: header.Hash().String(),
@ -363,10 +361,11 @@ func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerHash string, heade
} else { } else {
uncleReward = shared.CalcUncleMinerReward(blockNumber, uncleNode.Number.Uint64()) uncleReward = shared.CalcUncleMinerReward(blockNumber, uncleNode.Number.Uint64())
} }
mhKey := shared.MultihashKeyFromCID(uncleNode.Cid())
if err := sdi.oldDBWriter.InsertUncleCID(tx.oldDBTx, &v2Models.UncleModel{ if err := sdi.oldDBWriter.InsertUncleCID(tx.oldDBTx, &v2Models.UncleModel{
HeaderID: headerID, HeaderID: headerID,
CID: uncleNode.Cid().String(), CID: uncleNode.Cid().String(),
MhKey: shared.MultihashKeyFromCID(uncleNode.Cid()), MhKey: mhKey,
ParentHash: uncleNode.ParentHash.String(), ParentHash: uncleNode.ParentHash.String(),
BlockHash: uncleNode.Hash().String(), BlockHash: uncleNode.Hash().String(),
Reward: uncleReward.String(), Reward: uncleReward.String(),
@ -376,7 +375,7 @@ func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerHash string, heade
if err := sdi.newDBWriter.InsertUncleCID(tx.newDBTx, &v3Models.UncleModel{ if err := sdi.newDBWriter.InsertUncleCID(tx.newDBTx, &v3Models.UncleModel{
HeaderID: headerHash, HeaderID: headerHash,
CID: uncleNode.Cid().String(), CID: uncleNode.Cid().String(),
MhKey: shared.MultihashKeyFromCID(uncleNode.Cid()), MhKey: mhKey,
ParentHash: uncleNode.ParentHash.String(), ParentHash: uncleNode.ParentHash.String(),
BlockHash: uncleNode.Hash().String(), BlockHash: uncleNode.Hash().String(),
Reward: uncleReward.String(), Reward: uncleReward.String(),
@ -428,15 +427,18 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
if err != nil { if err != nil {
return fmt.Errorf("error deriving tx sender: %v", err) return fmt.Errorf("error deriving tx sender: %v", err)
} }
mhKey := shared.MultihashKeyFromCID(txNode.Cid())
dst := shared.HandleZeroAddrPointer(trx.To())
src := shared.HandleZeroAddr(from)
txID, err := sdi.oldDBWriter.InsertTransactionCID(tx.oldDBTx, &v2Models.TxModel{ txID, err := sdi.oldDBWriter.InsertTransactionCID(tx.oldDBTx, &v2Models.TxModel{
HeaderID: args.headerID, HeaderID: args.headerID,
Dst: shared.HandleZeroAddrPointer(trx.To()), Dst: dst,
Src: shared.HandleZeroAddr(from), Src: src,
TxHash: txHash, TxHash: txHash,
Index: int64(i), Index: int64(i),
Data: trx.Data(), Data: trx.Data(),
CID: txNode.Cid().String(), CID: txNode.Cid().String(),
MhKey: shared.MultihashKeyFromCID(txNode.Cid()), MhKey: mhKey,
Type: trx.Type(), Type: trx.Type(),
}) })
if err != nil { if err != nil {
@ -444,13 +446,13 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
} }
if err := sdi.newDBWriter.InsertTransactionCID(tx.newDBTx, &v3Models.TxModel{ if err := sdi.newDBWriter.InsertTransactionCID(tx.newDBTx, &v3Models.TxModel{
HeaderID: args.headerHash, HeaderID: args.headerHash,
Dst: shared.HandleZeroAddrPointer(trx.To()), Dst: dst,
Src: shared.HandleZeroAddr(from), Src: src,
TxHash: txHash, TxHash: txHash,
Index: int64(i), Index: int64(i),
Data: trx.Data(), Data: trx.Data(),
CID: txNode.Cid().String(), CID: txNode.Cid().String(),
MhKey: shared.MultihashKeyFromCID(txNode.Cid()), MhKey: mhKey,
Type: trx.Type(), Type: trx.Type(),
Value: val, Value: val,
}); err != nil { }); err != nil {
@ -500,12 +502,13 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
postState = common.Bytes2Hex(receipt.PostState) postState = common.Bytes2Hex(receipt.PostState)
} }
rctMhKey := shared.MultihashKeyFromCID(args.rctLeafNodeCIDs[i])
rctID, err := sdi.oldDBWriter.InsertReceiptCID(tx.oldDBTx, &v2Models.ReceiptModel{ rctID, err := sdi.oldDBWriter.InsertReceiptCID(tx.oldDBTx, &v2Models.ReceiptModel{
TxID: txID, TxID: txID,
Contract: contract, Contract: contract,
ContractHash: contractHash, ContractHash: contractHash,
LeafCID: args.rctLeafNodeCIDs[i].String(), LeafCID: args.rctLeafNodeCIDs[i].String(),
LeafMhKey: shared.MultihashKeyFromCID(args.rctLeafNodeCIDs[i]), LeafMhKey: rctMhKey,
LogRoot: args.rctNodes[i].LogRoot.String(), LogRoot: args.rctNodes[i].LogRoot.String(),
PostState: postState, PostState: postState,
PostStatus: postStatus, PostStatus: postStatus,
@ -518,7 +521,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
Contract: contract, Contract: contract,
ContractHash: contractHash, ContractHash: contractHash,
LeafCID: args.rctLeafNodeCIDs[i].String(), LeafCID: args.rctLeafNodeCIDs[i].String(),
LeafMhKey: shared.MultihashKeyFromCID(args.rctLeafNodeCIDs[i]), LeafMhKey: rctMhKey,
LogRoot: args.rctNodes[i].LogRoot.String(), LogRoot: args.rctNodes[i].LogRoot.String(),
PostState: postState, PostState: postState,
PostStatus: postStatus, PostStatus: postStatus,
@ -540,13 +543,14 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
return fmt.Errorf("invalid log cid") return fmt.Errorf("invalid log cid")
} }
logMhKey := shared.MultihashKeyFromCID(args.logLeafNodeCIDs[i][idx])
oldLogDataSet[idx] = &v2Models.LogsModel{ oldLogDataSet[idx] = &v2Models.LogsModel{
ReceiptID: rctID, ReceiptID: rctID,
Address: l.Address.String(), Address: l.Address.String(),
Index: int64(l.Index), Index: int64(l.Index),
Data: l.Data, Data: l.Data,
LeafCID: args.logLeafNodeCIDs[i][idx].String(), LeafCID: args.logLeafNodeCIDs[i][idx].String(),
LeafMhKey: shared.MultihashKeyFromCID(args.logLeafNodeCIDs[i][idx]), LeafMhKey: logMhKey,
Topic0: topicSet[0], Topic0: topicSet[0],
Topic1: topicSet[1], Topic1: topicSet[1],
Topic2: topicSet[2], Topic2: topicSet[2],
@ -558,7 +562,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
Index: int64(l.Index), Index: int64(l.Index),
Data: l.Data, Data: l.Data,
LeafCID: args.logLeafNodeCIDs[i][idx].String(), LeafCID: args.logLeafNodeCIDs[i][idx].String(),
LeafMhKey: shared.MultihashKeyFromCID(args.logLeafNodeCIDs[i][idx]), LeafMhKey: logMhKey,
Topic0: topicSet[0], Topic0: topicSet[0],
Topic1: topicSet[1], Topic1: topicSet[1],
Topic2: topicSet[2], Topic2: topicSet[2],

View File

@ -14,7 +14,7 @@
// You should have received a copy of the GNU Affero General Public License // You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>. // along with this program. If not, see <http://www.gnu.org/licenses/>.
package sql package metrics
import ( import (
"strings" "strings"
@ -41,7 +41,7 @@ func metricName(subsystem, name string) string {
return strings.Join(parts, "/") return strings.Join(parts, "/")
} }
type IndexerMetricsHandles struct { type WriterMetricsHandles struct {
// The total number of processed blocks // The total number of processed blocks
Blocks metrics.Counter Blocks metrics.Counter
// The total number of processed transactions // The total number of processed transactions
@ -52,6 +52,9 @@ type IndexerMetricsHandles struct {
Logs metrics.Counter Logs metrics.Counter
// The total number of access list entries processed // The total number of access list entries processed
AccessListEntries metrics.Counter AccessListEntries metrics.Counter
}
type IndexerMetricsHandles struct {
// Time spent waiting for free postgres tx // Time spent waiting for free postgres tx
TimeFreePostgres metrics.Timer TimeFreePostgres metrics.Timer
// Postgres transaction commit duration // Postgres transaction commit duration
@ -66,13 +69,25 @@ type IndexerMetricsHandles struct {
TimeStateStoreCodeProcessing metrics.Timer TimeStateStoreCodeProcessing metrics.Timer
} }
func RegisterWriterMetrics(reg metrics.Registry, version string) WriterMetricsHandles {
ctx := WriterMetricsHandles{
Blocks: metrics.NewCounter(),
Transactions: metrics.NewCounter(),
Receipts: metrics.NewCounter(),
Logs: metrics.NewCounter(),
AccessListEntries: metrics.NewCounter(),
}
subsys := "writer"
reg.Register(metricName(subsys, version+"/"+"blocks"), ctx.Blocks)
reg.Register(metricName(subsys, version+"/"+"transactions"), ctx.Transactions)
reg.Register(metricName(subsys, version+"/"+"receipts"), ctx.Receipts)
reg.Register(metricName(subsys, version+"/"+"logs"), ctx.Logs)
reg.Register(metricName(subsys, version+"/"+"access_list_entries"), ctx.AccessListEntries)
return ctx
}
func RegisterIndexerMetrics(reg metrics.Registry) IndexerMetricsHandles { func RegisterIndexerMetrics(reg metrics.Registry) IndexerMetricsHandles {
ctx := IndexerMetricsHandles{ ctx := IndexerMetricsHandles{
Blocks: metrics.NewCounter(),
Transactions: metrics.NewCounter(),
Receipts: metrics.NewCounter(),
Logs: metrics.NewCounter(),
AccessListEntries: metrics.NewCounter(),
TimeFreePostgres: metrics.NewTimer(), TimeFreePostgres: metrics.NewTimer(),
TimePostgresCommit: metrics.NewTimer(), TimePostgresCommit: metrics.NewTimer(),
TimeHeaderProcessing: metrics.NewTimer(), TimeHeaderProcessing: metrics.NewTimer(),
@ -81,11 +96,6 @@ func RegisterIndexerMetrics(reg metrics.Registry) IndexerMetricsHandles {
TimeStateStoreCodeProcessing: metrics.NewTimer(), TimeStateStoreCodeProcessing: metrics.NewTimer(),
} }
subsys := "indexer" subsys := "indexer"
reg.Register(metricName(subsys, "blocks"), ctx.Blocks)
reg.Register(metricName(subsys, "transactions"), ctx.Transactions)
reg.Register(metricName(subsys, "receipts"), ctx.Receipts)
reg.Register(metricName(subsys, "logs"), ctx.Logs)
reg.Register(metricName(subsys, "access_list_entries"), ctx.AccessListEntries)
reg.Register(metricName(subsys, "t_free_postgres"), ctx.TimeFreePostgres) reg.Register(metricName(subsys, "t_free_postgres"), ctx.TimeFreePostgres)
reg.Register(metricName(subsys, "t_postgres_commit"), ctx.TimePostgresCommit) reg.Register(metricName(subsys, "t_postgres_commit"), ctx.TimePostgresCommit)
reg.Register(metricName(subsys, "t_header_processing"), ctx.TimeHeaderProcessing) reg.Register(metricName(subsys, "t_header_processing"), ctx.TimeHeaderProcessing)

View File

@ -25,6 +25,7 @@ import (
"github.com/jackc/pgx/v4" "github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/pgxpool" "github.com/jackc/pgx/v4/pgxpool"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces" "github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
) )
@ -37,6 +38,7 @@ type PGXDriver struct {
// NewPGXDriver returns a new pgx driver // NewPGXDriver returns a new pgx driver
// it initializes the connection pool and creates the node info table // it initializes the connection pool and creates the node info table
func NewPGXDriver(ctx context.Context, config Config) (*PGXDriver, error) { func NewPGXDriver(ctx context.Context, config Config) (*PGXDriver, error) {
log.Info("connecting to database", "connection string", config.DbConnectionString())
pgConf, err := MakeConfig(config) pgConf, err := MakeConfig(config)
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -18,12 +18,14 @@ package postgres
import ( import (
"context" "context"
coresql "database/sql"
"time" "time"
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces" coresql "database/sql"
"github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
) )
// SQLXDriver driver, implements sql.Driver // SQLXDriver driver, implements sql.Driver
@ -35,7 +37,9 @@ type SQLXDriver struct {
// NewSQLXDriver returns a new sqlx driver for Postgres // NewSQLXDriver returns a new sqlx driver for Postgres
// it initializes the connection pool and creates the node info table // it initializes the connection pool and creates the node info table
func NewSQLXDriver(ctx context.Context, config Config) (*SQLXDriver, error) { func NewSQLXDriver(ctx context.Context, config Config) (*SQLXDriver, error) {
db, err := sqlx.ConnectContext(ctx, "postgres", config.DbConnectionString()) connStr := config.DbConnectionString()
log.Info("connecting to database", "connection string", connStr)
db, err := sqlx.ConnectContext(ctx, "postgres", connStr)
if err != nil { if err != nil {
return &SQLXDriver{}, ErrDBConnectionFailed(err) return &SQLXDriver{}, ErrDBConnectionFailed(err)
} }

View File

@ -37,7 +37,9 @@ type DB struct {
// InsertNodeInfoStm satisfies interfaces.Statements // InsertNodeInfoStm satisfies interfaces.Statements
func (db *DB) InsertNodeInfoStm() string { func (db *DB) InsertNodeInfoStm() string {
return `INSERT INTO nodes (genesis_block, network_id, node_id, client_name, chain_id) VALUES ($1, $2, $3, $4, $5) return `INSERT INTO nodes (genesis_block, network_id, node_id, client_name, chain_id) VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (genesis_block, network_id, node_id, chain_id) DO NOTHING` ON CONFLICT (genesis_block, network_id, node_id, chain_id)
DO UPDATE SET client_name = $4
RETURNING ID`
} }
// InsertHeaderStm satisfies the interfaces.Statements // InsertHeaderStm satisfies the interfaces.Statements

View File

@ -19,10 +19,12 @@ package sql
import ( import (
"context" "context"
"testing" "testing"
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
) )
// TearDownDB is used to tear down the watcher dbs after tests // TearDownDB is used to tear down the watcher dbs after tests
func TearDownDB(t *testing.T, db Database) { func TearDownDB(t *testing.T, db interfaces.Database) {
ctx := context.Background() ctx := context.Background()
tx, err := db.Begin(ctx) tx, err := db.Begin(ctx)
if err != nil { if err != nil {

View File

@ -20,21 +20,22 @@ import (
"fmt" "fmt"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql" "github.com/ethereum/go-ethereum/metrics"
metrics2 "github.com/ethereum/go-ethereum/statediff/indexer/database/sql/metrics"
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces" "github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
"github.com/ethereum/go-ethereum/statediff/indexer/models/v2" "github.com/ethereum/go-ethereum/statediff/indexer/models/v2"
"github.com/ethereum/go-ethereum/statediff/indexer/node" "github.com/ethereum/go-ethereum/statediff/indexer/node"
) )
var ( var (
nullHash = common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000000") nullHash = common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000000")
writerV2Metrics = metrics2.RegisterWriterMetrics(metrics.DefaultRegistry, "v3")
) )
// Writer handles processing and writing of indexed IPLD objects to Postgres // Writer handles processing and writing of indexed IPLD objects to Postgres
type Writer struct { type Writer struct {
DB interfaces.Database DB interfaces.Database
metrics sql.IndexerMetricsHandles nodeID int64
nodeID int64
} }
// NewWriter creates a new pointer to a Writer // NewWriter creates a new pointer to a Writer
@ -52,7 +53,7 @@ func (w *Writer) Close() error {
/* /*
InsertNodeInfo inserts a node info model InsertNodeInfo inserts a node info model
INSERT INTO nodes (genesis_block, network_id, node_id, client_name, chain_id) VALUES ($1, $2, $3, $4, $5) INSERT INTO nodes (genesis_block, network_id, node_id, client_name, chain_id) VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (genesis_block, network_id, node_id, chain_id) DO NOTHING ON CONFLICT (genesis_block, network_id, node_id, chain_id) DO NOTHING RETURNING ID
*/ */
func (w *Writer) InsertNodeInfo(info node.Info) error { func (w *Writer) InsertNodeInfo(info node.Info) error {
var nodeID int64 var nodeID int64
@ -79,7 +80,7 @@ func (w *Writer) InsertHeaderCID(tx interfaces.Tx, header *models.HeaderModel) (
if err != nil { if err != nil {
return 0, fmt.Errorf("error inserting header_cids entry: %v", err) return 0, fmt.Errorf("error inserting header_cids entry: %v", err)
} }
w.metrics.Blocks.Inc(1) writerV2Metrics.Blocks.Inc(1)
return headerID, nil return headerID, nil
} }
@ -110,7 +111,7 @@ func (w *Writer) InsertTransactionCID(tx interfaces.Tx, transaction *models.TxMo
if err != nil { if err != nil {
return 0, fmt.Errorf("error inserting transaction_cids entry: %v", err) return 0, fmt.Errorf("error inserting transaction_cids entry: %v", err)
} }
w.metrics.Transactions.Inc(1) writerV2Metrics.Transactions.Inc(1)
return txID, nil return txID, nil
} }
@ -125,7 +126,7 @@ func (w *Writer) InsertAccessListElement(tx interfaces.Tx, accessListElement *mo
if err != nil { if err != nil {
return fmt.Errorf("error inserting access_list_element entry: %v", err) return fmt.Errorf("error inserting access_list_element entry: %v", err)
} }
w.metrics.AccessListEntries.Inc(1) writerV2Metrics.AccessListEntries.Inc(1)
return nil return nil
} }
@ -141,7 +142,7 @@ func (w *Writer) InsertReceiptCID(tx interfaces.Tx, rct *models.ReceiptModel) (i
if err != nil { if err != nil {
return 0, fmt.Errorf("error inserting receipt_cids entry: %w", err) return 0, fmt.Errorf("error inserting receipt_cids entry: %w", err)
} }
w.metrics.Receipts.Inc(1) writerV2Metrics.Receipts.Inc(1)
return receiptID, nil return receiptID, nil
} }
@ -158,7 +159,7 @@ func (w *Writer) InsertLogCID(tx interfaces.Tx, logs []*models.LogsModel) error
if err != nil { if err != nil {
return fmt.Errorf("error inserting logs entry: %w", err) return fmt.Errorf("error inserting logs entry: %w", err)
} }
w.metrics.Logs.Inc(1) writerV2Metrics.Logs.Inc(1)
} }
return nil return nil
} }

View File

@ -20,21 +20,22 @@ import (
"fmt" "fmt"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql" "github.com/ethereum/go-ethereum/metrics"
metrics2 "github.com/ethereum/go-ethereum/statediff/indexer/database/sql/metrics"
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces" "github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
"github.com/ethereum/go-ethereum/statediff/indexer/models/v3" "github.com/ethereum/go-ethereum/statediff/indexer/models/v3"
"github.com/ethereum/go-ethereum/statediff/indexer/node" "github.com/ethereum/go-ethereum/statediff/indexer/node"
) )
var ( var (
nullHash = common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000000") nullHash = common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000000")
writerV3Metrics = metrics2.RegisterWriterMetrics(metrics.DefaultRegistry, "v3")
) )
// Writer handles processing and writing of indexed IPLD objects to Postgres // Writer handles processing and writing of indexed IPLD objects to Postgres
type Writer struct { type Writer struct {
DB interfaces.Database DB interfaces.Database
metrics sql.IndexerMetricsHandles nodeID string
nodeID string
} }
// NewWriter creates a new pointer to a Writer // NewWriter creates a new pointer to a Writer
@ -69,7 +70,7 @@ INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, nod
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)
ON CONFLICT (block_hash) DO UPDATE SET (block_number, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase) = ($1, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, eth.header_cids.times_validated + 1, $16) ON CONFLICT (block_hash) DO UPDATE SET (block_number, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase) = ($1, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, eth.header_cids.times_validated + 1, $16)
*/ */
func (w *Writer) InsertHeaderCID(tx interfaces.Tx, header models.HeaderModel) error { func (w *Writer) InsertHeaderCID(tx interfaces.Tx, header *models.HeaderModel) error {
_, err := tx.Exec(w.DB.Context(), w.DB.InsertHeaderStm(), _, err := tx.Exec(w.DB.Context(), w.DB.InsertHeaderStm(),
header.BlockNumber, header.BlockHash, header.ParentHash, header.CID, header.TotalDifficulty, w.nodeID, header.BlockNumber, header.BlockHash, header.ParentHash, header.CID, header.TotalDifficulty, w.nodeID,
header.Reward, header.StateRoot, header.TxRoot, header.RctRoot, header.UncleRoot, header.Bloom, header.Reward, header.StateRoot, header.TxRoot, header.RctRoot, header.UncleRoot, header.Bloom,
@ -77,7 +78,7 @@ func (w *Writer) InsertHeaderCID(tx interfaces.Tx, header models.HeaderModel) er
if err != nil { if err != nil {
return fmt.Errorf("error inserting header_cids entry: %v", err) return fmt.Errorf("error inserting header_cids entry: %v", err)
} }
w.metrics.Blocks.Inc(1) writerV3Metrics.Blocks.Inc(1)
return nil return nil
} }
@ -107,7 +108,7 @@ func (w *Writer) InsertTransactionCID(tx interfaces.Tx, transaction *models.TxMo
if err != nil { if err != nil {
return fmt.Errorf("error inserting transaction_cids entry: %v", err) return fmt.Errorf("error inserting transaction_cids entry: %v", err)
} }
w.metrics.Transactions.Inc(1) writerV3Metrics.Transactions.Inc(1)
return nil return nil
} }
@ -122,7 +123,7 @@ func (w *Writer) InsertAccessListElement(tx interfaces.Tx, accessListElement *mo
if err != nil { if err != nil {
return fmt.Errorf("error inserting access_list_element entry: %v", err) return fmt.Errorf("error inserting access_list_element entry: %v", err)
} }
w.metrics.AccessListEntries.Inc(1) writerV3Metrics.AccessListEntries.Inc(1)
return nil return nil
} }
@ -137,7 +138,7 @@ func (w *Writer) InsertReceiptCID(tx interfaces.Tx, rct *models.ReceiptModel) er
if err != nil { if err != nil {
return fmt.Errorf("error inserting receipt_cids entry: %w", err) return fmt.Errorf("error inserting receipt_cids entry: %w", err)
} }
w.metrics.Receipts.Inc(1) writerV3Metrics.Receipts.Inc(1)
return nil return nil
} }
@ -154,7 +155,7 @@ func (w *Writer) InsertLogCID(tx interfaces.Tx, logs []*models.LogsModel) error
if err != nil { if err != nil {
return fmt.Errorf("error inserting logs entry: %w", err) return fmt.Errorf("error inserting logs entry: %w", err)
} }
w.metrics.Logs.Inc(1) writerV3Metrics.Logs.Inc(1)
} }
return nil return nil
} }

View File

@ -18,6 +18,7 @@ package statediff
import ( import (
"bytes" "bytes"
"fmt"
"math/big" "math/big"
"strconv" "strconv"
"strings" "strings"
@ -162,7 +163,7 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params
var err error var err error
indexer, err = ind.NewStateDiffIndexer(params.Context, blockChain.Config(), info, params.IndexerConfig) indexer, err = ind.NewStateDiffIndexer(params.Context, blockChain.Config(), info, params.IndexerConfig)
if err != nil { if err != nil {
return err return fmt.Errorf("unable to initialize a new statediff indexer: %v", err)
} }
indexer.ReportOldDBMetrics(10*time.Second, quitCh) indexer.ReportOldDBMetrics(10*time.Second, quitCh)
indexer.ReportNewDBMetrics(10*time.Second, quitCh) indexer.ReportNewDBMetrics(10*time.Second, quitCh)
@ -235,7 +236,7 @@ func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) {
chainEventSub := sds.BlockChain.SubscribeChainEvent(chainEventCh) chainEventSub := sds.BlockChain.SubscribeChainEvent(chainEventCh)
defer chainEventSub.Unsubscribe() defer chainEventSub.Unsubscribe()
errCh := chainEventSub.Err() errCh := chainEventSub.Err()
var wg sync.WaitGroup wg := new(sync.WaitGroup)
// Process metrics for chain events, then forward to workers // Process metrics for chain events, then forward to workers
chainEventFwd := make(chan core.ChainEvent, chainEventChanSize) chainEventFwd := make(chan core.ChainEvent, chainEventChanSize)
wg.Add(1) wg.Add(1)
@ -266,7 +267,7 @@ func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) {
}() }()
wg.Add(int(sds.numWorkers)) wg.Add(int(sds.numWorkers))
for worker := uint(0); worker < sds.numWorkers; worker++ { for worker := uint(0); worker < sds.numWorkers; worker++ {
params := workerParams{chainEventCh: chainEventFwd, wg: &wg, id: worker} params := workerParams{chainEventCh: chainEventFwd, wg: wg, id: worker}
go sds.writeLoopWorker(params) go sds.writeLoopWorker(params)
} }
wg.Wait() wg.Wait()