Set state node diff field to false for snapshots #18

Merged
roysc merged 3 commits from snapshot-patch into main 2023-09-29 18:06:23 +00:00
4 changed files with 26 additions and 12 deletions
Showing only changes of commit 0b2be15cf9 - Show all commits

View File

@ -33,16 +33,19 @@ import (
) )
// NewStateDiffIndexer creates and returns an implementation of the StateDiffIndexer interface. // NewStateDiffIndexer creates and returns an implementation of the StateDiffIndexer interface.
// The returned indexer is used to process state diffs and write them to a database.
// The returned SQL database, if non-nil, is the indexer's backing database.
// `ctx` is used to cancel an underlying DB connection.
// `chainConfig` is used when processing chain state. // `chainConfig` is used when processing chain state.
// `nodeInfo` contains metadata on the Ethereum node, which is inserted with the indexed state. // `nodeInfo` contains metadata on the Ethereum node, which is inserted with the indexed state.
// `config` contains configuration specific to the indexer. // `config` contains configuration specific to the indexer.
// `diff` tells us to mark state nodes as belonging to an incremental diff, as opposed to a full snapshot. // `isDiff` means to mark state nodes as belonging to an incremental diff, as opposed to a full snapshot.
func NewStateDiffIndexer( func NewStateDiffIndexer(
ctx context.Context, ctx context.Context,
chainConfig *params.ChainConfig, chainConfig *params.ChainConfig,
nodeInfo node.Info, nodeInfo node.Info,
config interfaces.Config, config interfaces.Config,
diff bool, isDiff bool,
) ( ) (
sql.Database, sql.Database,
interfaces.StateDiffIndexer, interfaces.StateDiffIndexer,
@ -55,7 +58,7 @@ func NewStateDiffIndexer(
if !ok { if !ok {
return nil, nil, fmt.Errorf("file config is not the correct type: got %T, expected %T", config, file.Config{}) return nil, nil, fmt.Errorf("file config is not the correct type: got %T, expected %T", config, file.Config{})
} }
ind, err := file.NewStateDiffIndexer(chainConfig, fc, nodeInfo, diff) ind, err := file.NewStateDiffIndexer(chainConfig, fc, nodeInfo, isDiff)
return nil, ind, err return nil, ind, err
case shared.POSTGRES: case shared.POSTGRES:
log.Info("Starting statediff service in Postgres writing mode") log.Info("Starting statediff service in Postgres writing mode")
@ -80,7 +83,7 @@ func NewStateDiffIndexer(
return nil, nil, fmt.Errorf("unrecognized Postgres driver type: %s", pgc.Driver) return nil, nil, fmt.Errorf("unrecognized Postgres driver type: %s", pgc.Driver)
} }
db := postgres.NewPostgresDB(driver, pgc.Upsert) db := postgres.NewPostgresDB(driver, pgc.Upsert)
ind, err := sql.NewStateDiffIndexer(ctx, chainConfig, db, diff) ind, err := sql.NewStateDiffIndexer(ctx, chainConfig, db, isDiff)
return db, ind, err return db, ind, err
case shared.DUMP: case shared.DUMP:
log.Info("Starting statediff service in data dump mode") log.Info("Starting statediff service in data dump mode")

View File

@ -63,7 +63,11 @@ type StateDiffIndexer struct {
} }
// NewStateDiffIndexer creates a void implementation of interfaces.StateDiffIndexer // NewStateDiffIndexer creates a void implementation of interfaces.StateDiffIndexer
func NewStateDiffIndexer(chainConfig *params.ChainConfig, config Config, nodeInfo node.Info, diff bool) (*StateDiffIndexer, error) { // `chainConfig` is used when processing chain state.
// `config` contains configuration specific to the indexer.
// `nodeInfo` contains metadata on the Ethereum node, which is inserted with the indexed state.
// `isDiff` means to mark state nodes as belonging to an incremental diff, as opposed to a full snapshot.
func NewStateDiffIndexer(chainConfig *params.ChainConfig, config Config, nodeInfo node.Info, isDiff bool) (*StateDiffIndexer, error) {
var err error var err error
var writer FileWriter var writer FileWriter
@ -86,7 +90,7 @@ func NewStateDiffIndexer(chainConfig *params.ChainConfig, config Config, nodeInf
} }
log.Info("Writing watched addresses to file", "file", watchedAddressesFilePath) log.Info("Writing watched addresses to file", "file", watchedAddressesFilePath)
writer, err = NewCSVWriter(outputDir, watchedAddressesFilePath, diff) writer, err = NewCSVWriter(outputDir, watchedAddressesFilePath, isDiff)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -109,7 +113,7 @@ func NewStateDiffIndexer(chainConfig *params.ChainConfig, config Config, nodeInf
} }
log.Info("Writing watched addresses to file", "file", watchedAddressesFilePath) log.Info("Writing watched addresses to file", "file", watchedAddressesFilePath)
writer = NewSQLWriter(file, watchedAddressesFilePath, diff) writer = NewSQLWriter(file, watchedAddressesFilePath, isDiff)
default: default:
return nil, fmt.Errorf("unrecognized file mode: %s", config.Mode) return nil, fmt.Errorf("unrecognized file mode: %s", config.Mode)
} }

View File

@ -56,12 +56,15 @@ type SQLWriter struct {
watchedAddressesFilePath string watchedAddressesFilePath string
} }
// NewSQLWriter creates a new pointer to a Writer // NewSQLWriter creates a new Writer.
func NewSQLWriter(wc io.WriteCloser, watchedAddressesFilePath string, diff bool) *SQLWriter { // `wc` is the underlying io.WriteCloser to write to.
// `watchedAddressesFilePath` is the path to the file containing watched addresses.
// `isDiff` means to mark state nodes as belonging to an incremental diff, as opposed to a full snapshot.
func NewSQLWriter(wc io.WriteCloser, watchedAddressesFilePath string, isDiff bool) *SQLWriter {
return &SQLWriter{ return &SQLWriter{
wc: wc, wc: wc,
stmts: make(chan []byte), stmts: make(chan []byte),
isDiff: diff, isDiff: isDiff,
collatedStmt: make([]byte, writeBufferSize), collatedStmt: make([]byte, writeBufferSize),
flushChan: make(chan struct{}), flushChan: make(chan struct{}),
flushFinished: make(chan struct{}), flushFinished: make(chan struct{}),

View File

@ -51,13 +51,17 @@ type StateDiffIndexer struct {
} }
// NewStateDiffIndexer creates a sql implementation of interfaces.StateDiffIndexer // NewStateDiffIndexer creates a sql implementation of interfaces.StateDiffIndexer
// `ctx` is used to cancel the underlying DB connection.
// `chainConfig` is used when processing chain state.
// `db` is the backing database to use.
// `isDiff` means to mark state nodes as belonging to an incremental diff, as opposed to a full snapshot.
func NewStateDiffIndexer( func NewStateDiffIndexer(
ctx context.Context, chainConfig *params.ChainConfig, db Database, diff bool, ctx context.Context, chainConfig *params.ChainConfig, db Database, isDiff bool,
) (*StateDiffIndexer, error) { ) (*StateDiffIndexer, error) {
return &StateDiffIndexer{ return &StateDiffIndexer{
ctx: ctx, ctx: ctx,
chainConfig: chainConfig, chainConfig: chainConfig,
dbWriter: NewWriter(db, diff), dbWriter: NewWriter(db, isDiff),
}, nil }, nil
} }