No-UPSERT mode #300
@ -244,6 +244,9 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) {
|
||||
ClientName: clientName,
|
||||
Driver: driverType,
|
||||
}
|
||||
if ctx.IsSet(utils.StateDiffUpsert.Name) {
|
||||
pgConfig.Upsert = ctx.Bool(utils.StateDiffUpsert.Name)
|
||||
}
|
||||
if ctx.IsSet(utils.StateDiffDBMinConns.Name) {
|
||||
pgConfig.MinConns = ctx.Int(utils.StateDiffDBMinConns.Name)
|
||||
}
|
||||
|
@ -179,6 +179,7 @@ var (
|
||||
utils.StateDiffKnownGapsFilePath,
|
||||
utils.StateDiffWaitForSync,
|
||||
utils.StateDiffWatchedAddressesFilePath,
|
||||
utils.StateDiffUpsert,
|
||||
configFileFlag,
|
||||
}, utils.NetworkFlags, utils.DatabasePathFlags)
|
||||
|
||||
|
@ -1078,6 +1078,11 @@ var (
|
||||
Usage: "Client name to use when writing state diffs to database",
|
||||
Value: "go-ethereum",
|
||||
}
|
||||
StateDiffUpsert = &cli.BoolFlag{
|
||||
Name: "statediff.db.upsert",
|
||||
Usage: "Should the statediff service overwrite data existing in the database?",
|
||||
Value: false,
|
||||
}
|
||||
StateDiffWritingFlag = &cli.BoolFlag{
|
||||
Name: "statediff.writing",
|
||||
Usage: "Activates progressive writing of state diffs to database as new block are synced",
|
||||
|
@ -118,6 +118,8 @@ This service introduces a CLI flag namespace `statediff`
|
||||
|
||||
`--statediff.db.clientname` is the client name to use in the Postgres database
|
||||
|
||||
`--statediff.db.upsert` whether or not the service, when operating in a direct database writing mode, should overwrite any existing conflicting data
|
||||
|
||||
`--statediff.file.path` full path (including filename) to write statediff data out to when operating in file mode
|
||||
|
||||
`--statediff.file.wapath` full path (including filename) to write statediff watched addresses out to when operating in file mode
|
||||
|
@ -65,7 +65,7 @@ func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, n
|
||||
default:
|
||||
return nil, nil, fmt.Errorf("unrecognized Postgres driver type: %s", pgc.Driver)
|
||||
}
|
||||
db := postgres.NewPostgresDB(driver)
|
||||
db := postgres.NewPostgresDB(driver, pgc.Upsert)
|
||||
ind, err := sql.NewStateDiffIndexer(ctx, chainConfig, db)
|
||||
return db, ind, err
|
||||
case shared.DUMP:
|
||||
|
@ -77,6 +77,9 @@ type Config struct {
|
||||
|
||||
// driver type
|
||||
Driver DriverType
|
||||
|
||||
// toggle on/off upserts
|
||||
Upsert bool
|
||||
}
|
||||
|
||||
// Type satisfies interfaces.Config
|
||||
|
@ -26,21 +26,27 @@ const (
|
||||
)
|
||||
|
||||
// NewPostgresDB returns a postgres.DB using the provided driver
|
||||
func NewPostgresDB(driver sql.Driver) *DB {
|
||||
return &DB{driver}
|
||||
func NewPostgresDB(driver sql.Driver, upsert bool) *DB {
|
||||
return &DB{upsert, driver}
|
||||
}
|
||||
|
||||
// DB implements sql.Database using a configured driver and Postgres statement syntax
|
||||
type DB struct {
|
||||
upsert bool
|
||||
sql.Driver
|
||||
}
|
||||
|
||||
// InsertHeaderStm satisfies the sql.Statements interface
|
||||
// Stm == Statement
|
||||
func (db *DB) InsertHeaderStm() string {
|
||||
return `INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase)
|
||||
if db.upsert {
|
||||
return `INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)
|
||||
ON CONFLICT (block_hash, block_number) DO UPDATE SET (parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase) = ($3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, eth.header_cids.times_validated + 1, $16)`
|
||||
}
|
||||
return `INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)
|
||||
ON CONFLICT (block_hash, block_number) DO NOTHING`
|
||||
}
|
||||
|
||||
// InsertUncleStm satisfies the sql.Statements interface
|
||||
@ -75,8 +81,12 @@ func (db *DB) InsertLogStm() string {
|
||||
|
||||
// InsertStateStm satisfies the sql.Statements interface
|
||||
func (db *DB) InsertStateStm() string {
|
||||
return `INSERT INTO eth.state_cids (block_number, header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
|
||||
if db.upsert {
|
||||
return `INSERT INTO eth.state_cids (block_number, header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
|
||||
ON CONFLICT (header_id, state_path, block_number) DO UPDATE SET (block_number, state_leaf_key, cid, node_type, diff, mh_key) = ($1, $3, $4, $6, $7, $8)`
|
||||
}
|
||||
return `INSERT INTO eth.state_cids (block_number, header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
|
||||
ON CONFLICT (header_id, state_path, block_number) DO NOTHING`
|
||||
}
|
||||
|
||||
// InsertAccountStm satisfies the sql.Statements interface
|
||||
@ -87,8 +97,12 @@ func (db *DB) InsertAccountStm() string {
|
||||
|
||||
// InsertStorageStm satisfies the sql.Statements interface
|
||||
func (db *DB) InsertStorageStm() string {
|
||||
return `INSERT INTO eth.storage_cids (block_number, header_id, state_path, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
|
||||
if db.upsert {
|
||||
return `INSERT INTO eth.storage_cids (block_number, header_id, state_path, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
|
||||
ON CONFLICT (header_id, state_path, storage_path, block_number) DO UPDATE SET (block_number, storage_leaf_key, cid, node_type, diff, mh_key) = ($1, $4, $5, $7, $8, $9)`
|
||||
}
|
||||
return `INSERT INTO eth.storage_cids (block_number, header_id, state_path, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
|
||||
ON CONFLICT (header_id, state_path, storage_path, block_number) DO NOTHING`
|
||||
}
|
||||
|
||||
// InsertIPLDStm satisfies the sql.Statements interface
|
||||
|
@ -31,7 +31,7 @@ func SetupSQLXDB() (sql.Database, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return NewPostgresDB(driver), nil
|
||||
return NewPostgresDB(driver, false), nil
|
||||
}
|
||||
|
||||
// SetupPGXDB is used to setup a pgx db for tests
|
||||
@ -40,5 +40,5 @@ func SetupPGXDB() (sql.Database, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return NewPostgresDB(driver), nil
|
||||
return NewPostgresDB(driver, false), nil
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user