From 248b558e9659f6d461a0f84d1d4de56103afe590 Mon Sep 17 00:00:00 2001 From: i-norden Date: Tue, 10 Jan 2023 14:53:35 -0600 Subject: [PATCH 1/2] indexer/database/postgres.DB updates to toggle no-UPSERT mode --- .../indexer/database/sql/postgres/database.go | 24 +++++++++++++++---- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/statediff/indexer/database/sql/postgres/database.go b/statediff/indexer/database/sql/postgres/database.go index 5484ff8d8..27f89ab83 100644 --- a/statediff/indexer/database/sql/postgres/database.go +++ b/statediff/indexer/database/sql/postgres/database.go @@ -26,21 +26,27 @@ const ( ) // NewPostgresDB returns a postgres.DB using the provided driver -func NewPostgresDB(driver sql.Driver) *DB { - return &DB{driver} +func NewPostgresDB(driver sql.Driver, upsert bool) *DB { + return &DB{upsert, driver} } // DB implements sql.Database using a configured driver and Postgres statement syntax type DB struct { + upsert bool sql.Driver } // InsertHeaderStm satisfies the sql.Statements interface // Stm == Statement func (db *DB) InsertHeaderStm() string { - return `INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase) + if db.upsert { + return `INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16) ON CONFLICT (block_hash, block_number) DO UPDATE SET (parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase) = ($3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, eth.header_cids.times_validated + 1, $16)` + } + return `INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16) + ON CONFLICT (block_hash, block_number) DO NOTHING` } // InsertUncleStm satisfies the sql.Statements interface @@ -75,8 +81,12 @@ func (db *DB) InsertLogStm() string { // InsertStateStm satisfies the sql.Statements interface func (db *DB) InsertStateStm() string { - return `INSERT INTO eth.state_cids (block_number, header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + if db.upsert { + return `INSERT INTO eth.state_cids (block_number, header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) ON CONFLICT (header_id, state_path, block_number) DO UPDATE SET (block_number, state_leaf_key, cid, node_type, diff, mh_key) = ($1, $3, $4, $6, $7, $8)` + } + return `INSERT INTO eth.state_cids (block_number, header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + ON CONFLICT (header_id, state_path, block_number) DO NOTHING` } // InsertAccountStm satisfies the sql.Statements interface @@ -87,8 +97,12 @@ func (db *DB) InsertAccountStm() string { // InsertStorageStm satisfies the sql.Statements interface func (db *DB) InsertStorageStm() string { - return `INSERT INTO eth.storage_cids (block_number, header_id, state_path, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) + if db.upsert { + return `INSERT INTO eth.storage_cids (block_number, header_id, state_path, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT (header_id, state_path, storage_path, block_number) DO UPDATE SET (block_number, storage_leaf_key, cid, node_type, diff, mh_key) = ($1, $4, $5, $7, $8, $9)` + } + return `INSERT INTO eth.storage_cids (block_number, header_id, state_path, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) + ON CONFLICT (header_id, state_path, storage_path, block_number) DO NOTHING` } // InsertIPLDStm satisfies the sql.Statements interface From 47297d67241c8cd3611e3bfb17ea3a08f7b8c8dd Mon Sep 17 00:00:00 2001 From: i-norden Date: Tue, 10 Jan 2023 15:16:12 -0600 Subject: [PATCH 2/2] integrate upsert mode toggle into CLI --- cmd/geth/config.go | 3 +++ cmd/geth/main.go | 1 + cmd/utils/flags.go | 5 +++++ statediff/README.md | 2 ++ statediff/indexer/constructor.go | 2 +- statediff/indexer/database/sql/postgres/config.go | 3 +++ statediff/indexer/database/sql/postgres/test_helpers.go | 4 ++-- 7 files changed, 17 insertions(+), 3 deletions(-) diff --git a/cmd/geth/config.go b/cmd/geth/config.go index 7c1bb2dc5..33b2d79db 100644 --- a/cmd/geth/config.go +++ b/cmd/geth/config.go @@ -244,6 +244,9 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) { ClientName: clientName, Driver: driverType, } + if ctx.IsSet(utils.StateDiffUpsert.Name) { + pgConfig.Upsert = ctx.Bool(utils.StateDiffUpsert.Name) + } if ctx.IsSet(utils.StateDiffDBMinConns.Name) { pgConfig.MinConns = ctx.Int(utils.StateDiffDBMinConns.Name) } diff --git a/cmd/geth/main.go b/cmd/geth/main.go index c267d122c..b09c566f2 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -179,6 +179,7 @@ var ( utils.StateDiffKnownGapsFilePath, utils.StateDiffWaitForSync, utils.StateDiffWatchedAddressesFilePath, + utils.StateDiffUpsert, configFileFlag, }, utils.NetworkFlags, utils.DatabasePathFlags) diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index f7477c710..6cf9d162e 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -1078,6 +1078,11 @@ var ( Usage: "Client name to use when writing state diffs to database", Value: "go-ethereum", } + StateDiffUpsert = &cli.BoolFlag{ + Name: "statediff.db.upsert", + Usage: "Should the statediff service overwrite data existing in the database?", + Value: false, + } StateDiffWritingFlag = &cli.BoolFlag{ Name: "statediff.writing", Usage: "Activates progressive writing of state diffs to database as new block are synced", diff --git a/statediff/README.md b/statediff/README.md index f262d7a8e..56a2fffd2 100644 --- a/statediff/README.md +++ b/statediff/README.md @@ -118,6 +118,8 @@ This service introduces a CLI flag namespace `statediff` `--statediff.db.clientname` is the client name to use in the Postgres database +`--statediff.db.upsert` whether or not the service, when operating in a direct database writing mode, should overwrite any existing conflicting data + `--statediff.file.path` full path (including filename) to write statediff data out to when operating in file mode `--statediff.file.wapath` full path (including filename) to write statediff watched addresses out to when operating in file mode diff --git a/statediff/indexer/constructor.go b/statediff/indexer/constructor.go index 1a4f64001..0f07e7410 100644 --- a/statediff/indexer/constructor.go +++ b/statediff/indexer/constructor.go @@ -65,7 +65,7 @@ func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, n default: return nil, nil, fmt.Errorf("unrecognized Postgres driver type: %s", pgc.Driver) } - db := postgres.NewPostgresDB(driver) + db := postgres.NewPostgresDB(driver, pgc.Upsert) ind, err := sql.NewStateDiffIndexer(ctx, chainConfig, db) return db, ind, err case shared.DUMP: diff --git a/statediff/indexer/database/sql/postgres/config.go b/statediff/indexer/database/sql/postgres/config.go index 27f78f8f4..07c426811 100644 --- a/statediff/indexer/database/sql/postgres/config.go +++ b/statediff/indexer/database/sql/postgres/config.go @@ -77,6 +77,9 @@ type Config struct { // driver type Driver DriverType + + // toggle on/off upserts + Upsert bool } // Type satisfies interfaces.Config diff --git a/statediff/indexer/database/sql/postgres/test_helpers.go b/statediff/indexer/database/sql/postgres/test_helpers.go index 0eab778ae..f8311b413 100644 --- a/statediff/indexer/database/sql/postgres/test_helpers.go +++ b/statediff/indexer/database/sql/postgres/test_helpers.go @@ -31,7 +31,7 @@ func SetupSQLXDB() (sql.Database, error) { if err != nil { return nil, err } - return NewPostgresDB(driver), nil + return NewPostgresDB(driver, false), nil } // SetupPGXDB is used to setup a pgx db for tests @@ -40,5 +40,5 @@ func SetupPGXDB() (sql.Database, error) { if err != nil { return nil, err } - return NewPostgresDB(driver), nil + return NewPostgresDB(driver, false), nil }