No-UPSERT mode #300

Merged
telackey merged 2 commits from ian_dev into v1.10.26-statediff-v4 2023-01-11 00:23:31 +00:00
8 changed files with 36 additions and 8 deletions

View File

@ -244,6 +244,9 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) {
ClientName: clientName, ClientName: clientName,
Driver: driverType, Driver: driverType,
} }
if ctx.IsSet(utils.StateDiffUpsert.Name) {
pgConfig.Upsert = ctx.Bool(utils.StateDiffUpsert.Name)
}
if ctx.IsSet(utils.StateDiffDBMinConns.Name) { if ctx.IsSet(utils.StateDiffDBMinConns.Name) {
pgConfig.MinConns = ctx.Int(utils.StateDiffDBMinConns.Name) pgConfig.MinConns = ctx.Int(utils.StateDiffDBMinConns.Name)
} }

View File

@ -179,6 +179,7 @@ var (
utils.StateDiffKnownGapsFilePath, utils.StateDiffKnownGapsFilePath,
utils.StateDiffWaitForSync, utils.StateDiffWaitForSync,
utils.StateDiffWatchedAddressesFilePath, utils.StateDiffWatchedAddressesFilePath,
utils.StateDiffUpsert,
configFileFlag, configFileFlag,
}, utils.NetworkFlags, utils.DatabasePathFlags) }, utils.NetworkFlags, utils.DatabasePathFlags)

View File

@ -1078,6 +1078,11 @@ var (
Usage: "Client name to use when writing state diffs to database", Usage: "Client name to use when writing state diffs to database",
Value: "go-ethereum", Value: "go-ethereum",
} }
StateDiffUpsert = &cli.BoolFlag{
Name: "statediff.db.upsert",
Usage: "Should the statediff service overwrite data existing in the database?",
Value: false,
}
StateDiffWritingFlag = &cli.BoolFlag{ StateDiffWritingFlag = &cli.BoolFlag{
Name: "statediff.writing", Name: "statediff.writing",
Usage: "Activates progressive writing of state diffs to database as new block are synced", Usage: "Activates progressive writing of state diffs to database as new block are synced",

View File

@ -118,6 +118,8 @@ This service introduces a CLI flag namespace `statediff`
`--statediff.db.clientname` is the client name to use in the Postgres database `--statediff.db.clientname` is the client name to use in the Postgres database
`--statediff.db.upsert` whether or not the service, when operating in a direct database writing mode, should overwrite any existing conflicting data
`--statediff.file.path` full path (including filename) to write statediff data out to when operating in file mode `--statediff.file.path` full path (including filename) to write statediff data out to when operating in file mode
`--statediff.file.wapath` full path (including filename) to write statediff watched addresses out to when operating in file mode `--statediff.file.wapath` full path (including filename) to write statediff watched addresses out to when operating in file mode

View File

@ -65,7 +65,7 @@ func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, n
default: default:
return nil, nil, fmt.Errorf("unrecognized Postgres driver type: %s", pgc.Driver) return nil, nil, fmt.Errorf("unrecognized Postgres driver type: %s", pgc.Driver)
} }
db := postgres.NewPostgresDB(driver) db := postgres.NewPostgresDB(driver, pgc.Upsert)
ind, err := sql.NewStateDiffIndexer(ctx, chainConfig, db) ind, err := sql.NewStateDiffIndexer(ctx, chainConfig, db)
return db, ind, err return db, ind, err
case shared.DUMP: case shared.DUMP:

View File

@ -77,6 +77,9 @@ type Config struct {
// driver type // driver type
Driver DriverType Driver DriverType
// toggle on/off upserts
Upsert bool
} }
// Type satisfies interfaces.Config // Type satisfies interfaces.Config

View File

@ -26,21 +26,27 @@ const (
) )
// NewPostgresDB returns a postgres.DB using the provided driver // NewPostgresDB returns a postgres.DB using the provided driver
func NewPostgresDB(driver sql.Driver) *DB { func NewPostgresDB(driver sql.Driver, upsert bool) *DB {
return &DB{driver} return &DB{upsert, driver}
} }
// DB implements sql.Database using a configured driver and Postgres statement syntax // DB implements sql.Database using a configured driver and Postgres statement syntax
type DB struct { type DB struct {
upsert bool
sql.Driver sql.Driver
} }
// InsertHeaderStm satisfies the sql.Statements interface // InsertHeaderStm satisfies the sql.Statements interface
// Stm == Statement // Stm == Statement
func (db *DB) InsertHeaderStm() string { func (db *DB) InsertHeaderStm() string {
return `INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase) if db.upsert {
return `INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)
ON CONFLICT (block_hash, block_number) DO UPDATE SET (parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase) = ($3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, eth.header_cids.times_validated + 1, $16)` ON CONFLICT (block_hash, block_number) DO UPDATE SET (parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase) = ($3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, eth.header_cids.times_validated + 1, $16)`
}
return `INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)
ON CONFLICT (block_hash, block_number) DO NOTHING`
} }
// InsertUncleStm satisfies the sql.Statements interface // InsertUncleStm satisfies the sql.Statements interface
@ -75,8 +81,12 @@ func (db *DB) InsertLogStm() string {
// InsertStateStm satisfies the sql.Statements interface // InsertStateStm satisfies the sql.Statements interface
func (db *DB) InsertStateStm() string { func (db *DB) InsertStateStm() string {
return `INSERT INTO eth.state_cids (block_number, header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) if db.upsert {
return `INSERT INTO eth.state_cids (block_number, header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT (header_id, state_path, block_number) DO UPDATE SET (block_number, state_leaf_key, cid, node_type, diff, mh_key) = ($1, $3, $4, $6, $7, $8)` ON CONFLICT (header_id, state_path, block_number) DO UPDATE SET (block_number, state_leaf_key, cid, node_type, diff, mh_key) = ($1, $3, $4, $6, $7, $8)`
}
return `INSERT INTO eth.state_cids (block_number, header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT (header_id, state_path, block_number) DO NOTHING`
} }
// InsertAccountStm satisfies the sql.Statements interface // InsertAccountStm satisfies the sql.Statements interface
@ -87,8 +97,12 @@ func (db *DB) InsertAccountStm() string {
// InsertStorageStm satisfies the sql.Statements interface // InsertStorageStm satisfies the sql.Statements interface
func (db *DB) InsertStorageStm() string { func (db *DB) InsertStorageStm() string {
return `INSERT INTO eth.storage_cids (block_number, header_id, state_path, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) if db.upsert {
return `INSERT INTO eth.storage_cids (block_number, header_id, state_path, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
ON CONFLICT (header_id, state_path, storage_path, block_number) DO UPDATE SET (block_number, storage_leaf_key, cid, node_type, diff, mh_key) = ($1, $4, $5, $7, $8, $9)` ON CONFLICT (header_id, state_path, storage_path, block_number) DO UPDATE SET (block_number, storage_leaf_key, cid, node_type, diff, mh_key) = ($1, $4, $5, $7, $8, $9)`
}
return `INSERT INTO eth.storage_cids (block_number, header_id, state_path, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
ON CONFLICT (header_id, state_path, storage_path, block_number) DO NOTHING`
} }
// InsertIPLDStm satisfies the sql.Statements interface // InsertIPLDStm satisfies the sql.Statements interface

View File

@ -31,7 +31,7 @@ func SetupSQLXDB() (sql.Database, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
return NewPostgresDB(driver), nil return NewPostgresDB(driver, false), nil
} }
// SetupPGXDB is used to setup a pgx db for tests // SetupPGXDB is used to setup a pgx db for tests
@ -40,5 +40,5 @@ func SetupPGXDB() (sql.Database, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
return NewPostgresDB(driver), nil return NewPostgresDB(driver, false), nil
} }