No-UPSERT mode #300

Merged
telackey merged 2 commits from ian_dev into v1.10.26-statediff-v4 2023-01-11 00:23:31 +00:00
8 changed files with 36 additions and 8 deletions

View File

@ -244,6 +244,9 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) {
ClientName: clientName,
Driver: driverType,
}
if ctx.IsSet(utils.StateDiffUpsert.Name) {
pgConfig.Upsert = ctx.Bool(utils.StateDiffUpsert.Name)
}
if ctx.IsSet(utils.StateDiffDBMinConns.Name) {
pgConfig.MinConns = ctx.Int(utils.StateDiffDBMinConns.Name)
}

View File

@ -179,6 +179,7 @@ var (
utils.StateDiffKnownGapsFilePath,
utils.StateDiffWaitForSync,
utils.StateDiffWatchedAddressesFilePath,
utils.StateDiffUpsert,
configFileFlag,
}, utils.NetworkFlags, utils.DatabasePathFlags)

View File

@ -1078,6 +1078,11 @@ var (
Usage: "Client name to use when writing state diffs to database",
Value: "go-ethereum",
}
StateDiffUpsert = &cli.BoolFlag{
Name: "statediff.db.upsert",
Usage: "Should the statediff service overwrite data existing in the database?",
Value: false,
}
StateDiffWritingFlag = &cli.BoolFlag{
Name: "statediff.writing",
Usage: "Activates progressive writing of state diffs to database as new block are synced",

View File

@ -118,6 +118,8 @@ This service introduces a CLI flag namespace `statediff`
`--statediff.db.clientname` is the client name to use in the Postgres database
`--statediff.db.upsert` whether or not the service, when operating in a direct database writing mode, should overwrite any existing conflicting data
`--statediff.file.path` full path (including filename) to write statediff data out to when operating in file mode
`--statediff.file.wapath` full path (including filename) to write statediff watched addresses out to when operating in file mode

View File

@ -65,7 +65,7 @@ func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, n
default:
return nil, nil, fmt.Errorf("unrecognized Postgres driver type: %s", pgc.Driver)
}
db := postgres.NewPostgresDB(driver)
db := postgres.NewPostgresDB(driver, pgc.Upsert)
ind, err := sql.NewStateDiffIndexer(ctx, chainConfig, db)
return db, ind, err
case shared.DUMP:

View File

@ -77,6 +77,9 @@ type Config struct {
// driver type
Driver DriverType
// toggle on/off upserts
Upsert bool
}
// Type satisfies interfaces.Config

View File

@ -26,21 +26,27 @@ const (
)
// NewPostgresDB returns a postgres.DB using the provided driver
func NewPostgresDB(driver sql.Driver) *DB {
return &DB{driver}
func NewPostgresDB(driver sql.Driver, upsert bool) *DB {
return &DB{upsert, driver}
}
// DB implements sql.Database using a configured driver and Postgres statement syntax
type DB struct {
upsert bool
sql.Driver
}
// InsertHeaderStm satisfies the sql.Statements interface
// Stm == Statement
func (db *DB) InsertHeaderStm() string {
return `INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase)
if db.upsert {
return `INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)
ON CONFLICT (block_hash, block_number) DO UPDATE SET (parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase) = ($3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, eth.header_cids.times_validated + 1, $16)`
}
return `INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)
ON CONFLICT (block_hash, block_number) DO NOTHING`
}
// InsertUncleStm satisfies the sql.Statements interface
@ -75,8 +81,12 @@ func (db *DB) InsertLogStm() string {
// InsertStateStm satisfies the sql.Statements interface
func (db *DB) InsertStateStm() string {
return `INSERT INTO eth.state_cids (block_number, header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
if db.upsert {
return `INSERT INTO eth.state_cids (block_number, header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT (header_id, state_path, block_number) DO UPDATE SET (block_number, state_leaf_key, cid, node_type, diff, mh_key) = ($1, $3, $4, $6, $7, $8)`
}
return `INSERT INTO eth.state_cids (block_number, header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT (header_id, state_path, block_number) DO NOTHING`
}
// InsertAccountStm satisfies the sql.Statements interface
@ -87,8 +97,12 @@ func (db *DB) InsertAccountStm() string {
// InsertStorageStm satisfies the sql.Statements interface
func (db *DB) InsertStorageStm() string {
return `INSERT INTO eth.storage_cids (block_number, header_id, state_path, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
if db.upsert {
return `INSERT INTO eth.storage_cids (block_number, header_id, state_path, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
ON CONFLICT (header_id, state_path, storage_path, block_number) DO UPDATE SET (block_number, storage_leaf_key, cid, node_type, diff, mh_key) = ($1, $4, $5, $7, $8, $9)`
}
return `INSERT INTO eth.storage_cids (block_number, header_id, state_path, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
ON CONFLICT (header_id, state_path, storage_path, block_number) DO NOTHING`
}
// InsertIPLDStm satisfies the sql.Statements interface

View File

@ -31,7 +31,7 @@ func SetupSQLXDB() (sql.Database, error) {
if err != nil {
return nil, err
}
return NewPostgresDB(driver), nil
return NewPostgresDB(driver, false), nil
}
// SetupPGXDB is used to setup a pgx db for tests
@ -40,5 +40,5 @@ func SetupPGXDB() (sql.Database, error) {
if err != nil {
return nil, err
}
return NewPostgresDB(driver), nil
return NewPostgresDB(driver, false), nil
}