No-UPSERT mode #300
@ -244,6 +244,9 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) {
|
|||||||
ClientName: clientName,
|
ClientName: clientName,
|
||||||
Driver: driverType,
|
Driver: driverType,
|
||||||
}
|
}
|
||||||
|
if ctx.IsSet(utils.StateDiffUpsert.Name) {
|
||||||
|
pgConfig.Upsert = ctx.Bool(utils.StateDiffUpsert.Name)
|
||||||
|
}
|
||||||
if ctx.IsSet(utils.StateDiffDBMinConns.Name) {
|
if ctx.IsSet(utils.StateDiffDBMinConns.Name) {
|
||||||
pgConfig.MinConns = ctx.Int(utils.StateDiffDBMinConns.Name)
|
pgConfig.MinConns = ctx.Int(utils.StateDiffDBMinConns.Name)
|
||||||
}
|
}
|
||||||
|
@ -179,6 +179,7 @@ var (
|
|||||||
utils.StateDiffKnownGapsFilePath,
|
utils.StateDiffKnownGapsFilePath,
|
||||||
utils.StateDiffWaitForSync,
|
utils.StateDiffWaitForSync,
|
||||||
utils.StateDiffWatchedAddressesFilePath,
|
utils.StateDiffWatchedAddressesFilePath,
|
||||||
|
utils.StateDiffUpsert,
|
||||||
configFileFlag,
|
configFileFlag,
|
||||||
}, utils.NetworkFlags, utils.DatabasePathFlags)
|
}, utils.NetworkFlags, utils.DatabasePathFlags)
|
||||||
|
|
||||||
|
@ -1078,6 +1078,11 @@ var (
|
|||||||
Usage: "Client name to use when writing state diffs to database",
|
Usage: "Client name to use when writing state diffs to database",
|
||||||
Value: "go-ethereum",
|
Value: "go-ethereum",
|
||||||
}
|
}
|
||||||
|
StateDiffUpsert = &cli.BoolFlag{
|
||||||
|
Name: "statediff.db.upsert",
|
||||||
|
Usage: "Should the statediff service overwrite data existing in the database?",
|
||||||
|
Value: false,
|
||||||
|
}
|
||||||
StateDiffWritingFlag = &cli.BoolFlag{
|
StateDiffWritingFlag = &cli.BoolFlag{
|
||||||
Name: "statediff.writing",
|
Name: "statediff.writing",
|
||||||
Usage: "Activates progressive writing of state diffs to database as new block are synced",
|
Usage: "Activates progressive writing of state diffs to database as new block are synced",
|
||||||
|
@ -118,6 +118,8 @@ This service introduces a CLI flag namespace `statediff`
|
|||||||
|
|
||||||
`--statediff.db.clientname` is the client name to use in the Postgres database
|
`--statediff.db.clientname` is the client name to use in the Postgres database
|
||||||
|
|
||||||
|
`--statediff.db.upsert` whether or not the service, when operating in a direct database writing mode, should overwrite any existing conflicting data
|
||||||
|
|
||||||
`--statediff.file.path` full path (including filename) to write statediff data out to when operating in file mode
|
`--statediff.file.path` full path (including filename) to write statediff data out to when operating in file mode
|
||||||
|
|
||||||
`--statediff.file.wapath` full path (including filename) to write statediff watched addresses out to when operating in file mode
|
`--statediff.file.wapath` full path (including filename) to write statediff watched addresses out to when operating in file mode
|
||||||
|
@ -65,7 +65,7 @@ func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, n
|
|||||||
default:
|
default:
|
||||||
return nil, nil, fmt.Errorf("unrecognized Postgres driver type: %s", pgc.Driver)
|
return nil, nil, fmt.Errorf("unrecognized Postgres driver type: %s", pgc.Driver)
|
||||||
}
|
}
|
||||||
db := postgres.NewPostgresDB(driver)
|
db := postgres.NewPostgresDB(driver, pgc.Upsert)
|
||||||
ind, err := sql.NewStateDiffIndexer(ctx, chainConfig, db)
|
ind, err := sql.NewStateDiffIndexer(ctx, chainConfig, db)
|
||||||
return db, ind, err
|
return db, ind, err
|
||||||
case shared.DUMP:
|
case shared.DUMP:
|
||||||
|
@ -77,6 +77,9 @@ type Config struct {
|
|||||||
|
|
||||||
// driver type
|
// driver type
|
||||||
Driver DriverType
|
Driver DriverType
|
||||||
|
|
||||||
|
// toggle on/off upserts
|
||||||
|
Upsert bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// Type satisfies interfaces.Config
|
// Type satisfies interfaces.Config
|
||||||
|
@ -26,21 +26,27 @@ const (
|
|||||||
)
|
)
|
||||||
|
|
||||||
// NewPostgresDB returns a postgres.DB using the provided driver
|
// NewPostgresDB returns a postgres.DB using the provided driver
|
||||||
func NewPostgresDB(driver sql.Driver) *DB {
|
func NewPostgresDB(driver sql.Driver, upsert bool) *DB {
|
||||||
return &DB{driver}
|
return &DB{upsert, driver}
|
||||||
}
|
}
|
||||||
|
|
||||||
// DB implements sql.Database using a configured driver and Postgres statement syntax
|
// DB implements sql.Database using a configured driver and Postgres statement syntax
|
||||||
type DB struct {
|
type DB struct {
|
||||||
|
upsert bool
|
||||||
sql.Driver
|
sql.Driver
|
||||||
}
|
}
|
||||||
|
|
||||||
// InsertHeaderStm satisfies the sql.Statements interface
|
// InsertHeaderStm satisfies the sql.Statements interface
|
||||||
// Stm == Statement
|
// Stm == Statement
|
||||||
func (db *DB) InsertHeaderStm() string {
|
func (db *DB) InsertHeaderStm() string {
|
||||||
return `INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase)
|
if db.upsert {
|
||||||
|
return `INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase)
|
||||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)
|
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)
|
||||||
ON CONFLICT (block_hash, block_number) DO UPDATE SET (parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase) = ($3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, eth.header_cids.times_validated + 1, $16)`
|
ON CONFLICT (block_hash, block_number) DO UPDATE SET (parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase) = ($3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, eth.header_cids.times_validated + 1, $16)`
|
||||||
|
}
|
||||||
|
return `INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase)
|
||||||
|
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)
|
||||||
|
ON CONFLICT (block_hash, block_number) DO NOTHING`
|
||||||
}
|
}
|
||||||
|
|
||||||
// InsertUncleStm satisfies the sql.Statements interface
|
// InsertUncleStm satisfies the sql.Statements interface
|
||||||
@ -75,8 +81,12 @@ func (db *DB) InsertLogStm() string {
|
|||||||
|
|
||||||
// InsertStateStm satisfies the sql.Statements interface
|
// InsertStateStm satisfies the sql.Statements interface
|
||||||
func (db *DB) InsertStateStm() string {
|
func (db *DB) InsertStateStm() string {
|
||||||
return `INSERT INTO eth.state_cids (block_number, header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
|
if db.upsert {
|
||||||
|
return `INSERT INTO eth.state_cids (block_number, header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
|
||||||
ON CONFLICT (header_id, state_path, block_number) DO UPDATE SET (block_number, state_leaf_key, cid, node_type, diff, mh_key) = ($1, $3, $4, $6, $7, $8)`
|
ON CONFLICT (header_id, state_path, block_number) DO UPDATE SET (block_number, state_leaf_key, cid, node_type, diff, mh_key) = ($1, $3, $4, $6, $7, $8)`
|
||||||
|
}
|
||||||
|
return `INSERT INTO eth.state_cids (block_number, header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
|
||||||
|
ON CONFLICT (header_id, state_path, block_number) DO NOTHING`
|
||||||
}
|
}
|
||||||
|
|
||||||
// InsertAccountStm satisfies the sql.Statements interface
|
// InsertAccountStm satisfies the sql.Statements interface
|
||||||
@ -87,8 +97,12 @@ func (db *DB) InsertAccountStm() string {
|
|||||||
|
|
||||||
// InsertStorageStm satisfies the sql.Statements interface
|
// InsertStorageStm satisfies the sql.Statements interface
|
||||||
func (db *DB) InsertStorageStm() string {
|
func (db *DB) InsertStorageStm() string {
|
||||||
return `INSERT INTO eth.storage_cids (block_number, header_id, state_path, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
|
if db.upsert {
|
||||||
|
return `INSERT INTO eth.storage_cids (block_number, header_id, state_path, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
|
||||||
ON CONFLICT (header_id, state_path, storage_path, block_number) DO UPDATE SET (block_number, storage_leaf_key, cid, node_type, diff, mh_key) = ($1, $4, $5, $7, $8, $9)`
|
ON CONFLICT (header_id, state_path, storage_path, block_number) DO UPDATE SET (block_number, storage_leaf_key, cid, node_type, diff, mh_key) = ($1, $4, $5, $7, $8, $9)`
|
||||||
|
}
|
||||||
|
return `INSERT INTO eth.storage_cids (block_number, header_id, state_path, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
|
||||||
|
ON CONFLICT (header_id, state_path, storage_path, block_number) DO NOTHING`
|
||||||
}
|
}
|
||||||
|
|
||||||
// InsertIPLDStm satisfies the sql.Statements interface
|
// InsertIPLDStm satisfies the sql.Statements interface
|
||||||
|
@ -31,7 +31,7 @@ func SetupSQLXDB() (sql.Database, error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return NewPostgresDB(driver), nil
|
return NewPostgresDB(driver, false), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetupPGXDB is used to setup a pgx db for tests
|
// SetupPGXDB is used to setup a pgx db for tests
|
||||||
@ -40,5 +40,5 @@ func SetupPGXDB() (sql.Database, error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return NewPostgresDB(driver), nil
|
return NewPostgresDB(driver, false), nil
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user