Set state node diff field to false for snapshots #18

Merged
roysc merged 3 commits from snapshot-patch into main 2023-09-29 18:06:23 +00:00
17 changed files with 63 additions and 34 deletions

View File

@ -33,11 +33,19 @@ import (
) )
// NewStateDiffIndexer creates and returns an implementation of the StateDiffIndexer interface. // NewStateDiffIndexer creates and returns an implementation of the StateDiffIndexer interface.
// The returned indexer is used to process state diffs and write them to a database.
// The returned SQL database, if non-nil, is the indexer's backing database.
// `ctx` is used to cancel an underlying DB connection.
// `chainConfig` is used when processing chain state.
// `nodeInfo` contains metadata on the Ethereum node, which is inserted with the indexed state.
// `config` contains configuration specific to the indexer.
roysc marked this conversation as resolved Outdated

Could you add a comment as to the purpose of the flag?
// `isDiff` means to mark state nodes as belonging to an incremental diff, as opposed to a full snapshot.
func NewStateDiffIndexer( func NewStateDiffIndexer(
ctx context.Context, ctx context.Context,
chainConfig *params.ChainConfig, chainConfig *params.ChainConfig,
nodeInfo node.Info, nodeInfo node.Info,
config interfaces.Config, config interfaces.Config,
isDiff bool,
) ( ) (
sql.Database, sql.Database,
interfaces.StateDiffIndexer, interfaces.StateDiffIndexer,
@ -50,7 +58,7 @@ func NewStateDiffIndexer(
if !ok { if !ok {
return nil, nil, fmt.Errorf("file config is not the correct type: got %T, expected %T", config, file.Config{}) return nil, nil, fmt.Errorf("file config is not the correct type: got %T, expected %T", config, file.Config{})
} }
ind, err := file.NewStateDiffIndexer(chainConfig, fc, nodeInfo) ind, err := file.NewStateDiffIndexer(chainConfig, fc, nodeInfo, isDiff)
return nil, ind, err return nil, ind, err
case shared.POSTGRES: case shared.POSTGRES:
log.Info("Starting statediff service in Postgres writing mode") log.Info("Starting statediff service in Postgres writing mode")
@ -75,7 +83,7 @@ func NewStateDiffIndexer(
return nil, nil, fmt.Errorf("unrecognized Postgres driver type: %s", pgc.Driver) return nil, nil, fmt.Errorf("unrecognized Postgres driver type: %s", pgc.Driver)
} }
db := postgres.NewPostgresDB(driver, pgc.Upsert) db := postgres.NewPostgresDB(driver, pgc.Upsert)
ind, err := sql.NewStateDiffIndexer(ctx, chainConfig, db) ind, err := sql.NewStateDiffIndexer(ctx, chainConfig, db, isDiff)
return db, ind, err return db, ind, err
case shared.DUMP: case shared.DUMP:
log.Info("Starting statediff service in data dump mode") log.Info("Starting statediff service in data dump mode")

View File

@ -43,7 +43,7 @@ func setupLegacyCSVIndexer(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
} }
ind, err = file.NewStateDiffIndexer(test.LegacyConfig, file.CSVTestConfig, test.LegacyNodeInfo) ind, err = file.NewStateDiffIndexer(test.LegacyConfig, file.CSVTestConfig, test.LegacyNodeInfo, true)
require.NoError(t, err) require.NoError(t, err)
db, err = postgres.SetupSQLXDB() db, err = postgres.SetupSQLXDB()

View File

@ -41,7 +41,7 @@ func setupCSVIndexer(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
} }
ind, err = file.NewStateDiffIndexer(mocks.TestChainConfig, file.CSVTestConfig, test.LegacyNodeInfo) ind, err = file.NewStateDiffIndexer(mocks.TestChainConfig, file.CSVTestConfig, test.LegacyNodeInfo, true)
require.NoError(t, err) require.NoError(t, err)
db, err = postgres.SetupSQLXDB() db, err = postgres.SetupSQLXDB()

View File

@ -57,7 +57,8 @@ type tableRow struct {
type CSVWriter struct { type CSVWriter struct {
// dir containing output files // dir containing output files
dir string dir string
isDiff bool
writers fileWriters writers fileWriters
watchedAddressesWriter fileWriter watchedAddressesWriter fileWriter
@ -128,7 +129,7 @@ func (tx fileWriters) flush() error {
return nil return nil
} }
func NewCSVWriter(path string, watchedAddressesFilePath string) (*CSVWriter, error) { func NewCSVWriter(path string, watchedAddressesFilePath string, diff bool) (*CSVWriter, error) {
if err := os.MkdirAll(path, 0777); err != nil { if err := os.MkdirAll(path, 0777); err != nil {
return nil, fmt.Errorf("unable to create directory '%s': %w", path, err) return nil, fmt.Errorf("unable to create directory '%s': %w", path, err)
} }
@ -147,6 +148,7 @@ func NewCSVWriter(path string, watchedAddressesFilePath string) (*CSVWriter, err
writers: writers, writers: writers,
watchedAddressesWriter: watchedAddressesWriter, watchedAddressesWriter: watchedAddressesWriter,
dir: path, dir: path,
isDiff: diff,
rows: make(chan tableRow), rows: make(chan tableRow),
flushChan: make(chan struct{}), flushChan: make(chan struct{}),
flushFinished: make(chan struct{}), flushFinished: make(chan struct{}),
@ -278,14 +280,14 @@ func (csw *CSVWriter) upsertStateCID(stateNode models.StateNodeModel) {
var values []interface{} var values []interface{}
values = append(values, stateNode.BlockNumber, stateNode.HeaderID, stateNode.StateKey, stateNode.CID, values = append(values, stateNode.BlockNumber, stateNode.HeaderID, stateNode.StateKey, stateNode.CID,
true, balance, strconv.FormatUint(stateNode.Nonce, 10), stateNode.CodeHash, stateNode.StorageRoot, stateNode.Removed) csw.isDiff, balance, strconv.FormatUint(stateNode.Nonce, 10), stateNode.CodeHash, stateNode.StorageRoot, stateNode.Removed)
csw.rows <- tableRow{schema.TableStateNode, values} csw.rows <- tableRow{schema.TableStateNode, values}
} }
func (csw *CSVWriter) upsertStorageCID(storageCID models.StorageNodeModel) { func (csw *CSVWriter) upsertStorageCID(storageCID models.StorageNodeModel) {
var values []interface{} var values []interface{}
values = append(values, storageCID.BlockNumber, storageCID.HeaderID, storageCID.StateKey, storageCID.StorageKey, storageCID.CID, values = append(values, storageCID.BlockNumber, storageCID.HeaderID, storageCID.StateKey, storageCID.StorageKey, storageCID.CID,
true, storageCID.Value, storageCID.Removed) csw.isDiff, storageCID.Value, storageCID.Removed)
csw.rows <- tableRow{schema.TableStorageNode, values} csw.rows <- tableRow{schema.TableStorageNode, values}
} }

View File

@ -63,7 +63,11 @@ type StateDiffIndexer struct {
} }
// NewStateDiffIndexer creates a void implementation of interfaces.StateDiffIndexer // NewStateDiffIndexer creates a void implementation of interfaces.StateDiffIndexer
func NewStateDiffIndexer(chainConfig *params.ChainConfig, config Config, nodeInfo node.Info) (*StateDiffIndexer, error) { // `chainConfig` is used when processing chain state.
roysc marked this conversation as resolved Outdated

Same as above.
// `config` contains configuration specific to the indexer.
// `nodeInfo` contains metadata on the Ethereum node, which is inserted with the indexed state.
// `isDiff` means to mark state nodes as belonging to an incremental diff, as opposed to a full snapshot.
func NewStateDiffIndexer(chainConfig *params.ChainConfig, config Config, nodeInfo node.Info, isDiff bool) (*StateDiffIndexer, error) {
var err error var err error
var writer FileWriter var writer FileWriter
@ -86,7 +90,7 @@ func NewStateDiffIndexer(chainConfig *params.ChainConfig, config Config, nodeInf
} }
log.Info("Writing watched addresses to file", "file", watchedAddressesFilePath) log.Info("Writing watched addresses to file", "file", watchedAddressesFilePath)
writer, err = NewCSVWriter(outputDir, watchedAddressesFilePath) writer, err = NewCSVWriter(outputDir, watchedAddressesFilePath, isDiff)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -109,7 +113,7 @@ func NewStateDiffIndexer(chainConfig *params.ChainConfig, config Config, nodeInf
} }
log.Info("Writing watched addresses to file", "file", watchedAddressesFilePath) log.Info("Writing watched addresses to file", "file", watchedAddressesFilePath)
writer = NewSQLWriter(file, watchedAddressesFilePath) writer = NewSQLWriter(file, watchedAddressesFilePath, isDiff)
default: default:
return nil, fmt.Errorf("unrecognized file mode: %s", config.Mode) return nil, fmt.Errorf("unrecognized file mode: %s", config.Mode)
} }

View File

@ -83,7 +83,7 @@ func setupMainnetIndexer(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
} }
ind, err = file.NewStateDiffIndexer(chainConf, file.CSVTestConfig, test.LegacyNodeInfo) ind, err = file.NewStateDiffIndexer(chainConf, file.CSVTestConfig, test.LegacyNodeInfo, true)
require.NoError(t, err) require.NoError(t, err)
db, err = postgres.SetupSQLXDB() db, err = postgres.SetupSQLXDB()

View File

@ -44,7 +44,7 @@ func setupLegacySQLIndexer(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
} }
ind, err = file.NewStateDiffIndexer(test.LegacyConfig, file.SQLTestConfig, test.LegacyNodeInfo) ind, err = file.NewStateDiffIndexer(test.LegacyConfig, file.SQLTestConfig, test.LegacyNodeInfo, true)
require.NoError(t, err) require.NoError(t, err)
db, err = postgres.SetupSQLXDB() db, err = postgres.SetupSQLXDB()

View File

@ -41,7 +41,7 @@ func setupIndexer(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
} }
ind, err = file.NewStateDiffIndexer(mocks.TestChainConfig, file.SQLTestConfig, test.LegacyNodeInfo) ind, err = file.NewStateDiffIndexer(mocks.TestChainConfig, file.SQLTestConfig, test.LegacyNodeInfo, true)
require.NoError(t, err) require.NoError(t, err)
db, err = postgres.SetupSQLXDB() db, err = postgres.SetupSQLXDB()

View File

@ -44,6 +44,7 @@ var (
type SQLWriter struct { type SQLWriter struct {
wc io.WriteCloser wc io.WriteCloser
stmts chan []byte stmts chan []byte
isDiff bool
collatedStmt []byte collatedStmt []byte
collationIndex int collationIndex int
@ -55,11 +56,15 @@ type SQLWriter struct {
watchedAddressesFilePath string watchedAddressesFilePath string
} }
// NewSQLWriter creates a new pointer to a Writer // NewSQLWriter creates a new Writer.
func NewSQLWriter(wc io.WriteCloser, watchedAddressesFilePath string) *SQLWriter { // `wc` is the underlying io.WriteCloser to write to.
roysc marked this conversation as resolved Outdated

Same.
// `watchedAddressesFilePath` is the path to the file containing watched addresses.
// `isDiff` means to mark state nodes as belonging to an incremental diff, as opposed to a full snapshot.
func NewSQLWriter(wc io.WriteCloser, watchedAddressesFilePath string, isDiff bool) *SQLWriter {
return &SQLWriter{ return &SQLWriter{
wc: wc, wc: wc,
stmts: make(chan []byte), stmts: make(chan []byte),
isDiff: isDiff,
collatedStmt: make([]byte, writeBufferSize), collatedStmt: make([]byte, writeBufferSize),
flushChan: make(chan struct{}), flushChan: make(chan struct{}),
flushFinished: make(chan struct{}), flushFinished: make(chan struct{}),
@ -225,12 +230,12 @@ func (sqw *SQLWriter) upsertStateCID(stateNode models.StateNodeModel) {
balance = "0" balance = "0"
} }
sqw.stmts <- []byte(fmt.Sprintf(stateInsert, stateNode.BlockNumber, stateNode.HeaderID, stateNode.StateKey, stateNode.CID, sqw.stmts <- []byte(fmt.Sprintf(stateInsert, stateNode.BlockNumber, stateNode.HeaderID, stateNode.StateKey, stateNode.CID,
stateNode.Removed, true, balance, stateNode.Nonce, stateNode.CodeHash, stateNode.StorageRoot)) stateNode.Removed, sqw.isDiff, balance, stateNode.Nonce, stateNode.CodeHash, stateNode.StorageRoot))
} }
func (sqw *SQLWriter) upsertStorageCID(storageCID models.StorageNodeModel) { func (sqw *SQLWriter) upsertStorageCID(storageCID models.StorageNodeModel) {
sqw.stmts <- []byte(fmt.Sprintf(storageInsert, storageCID.BlockNumber, storageCID.HeaderID, storageCID.StateKey, storageCID.StorageKey, storageCID.CID, sqw.stmts <- []byte(fmt.Sprintf(storageInsert, storageCID.BlockNumber, storageCID.HeaderID, storageCID.StateKey, storageCID.StorageKey, storageCID.CID,
storageCID.Removed, true, storageCID.Value)) storageCID.Removed, sqw.isDiff, storageCID.Value))
} }
// LoadWatchedAddresses loads watched addresses from a file // LoadWatchedAddresses loads watched addresses from a file

View File

@ -43,7 +43,7 @@ import (
var _ interfaces.StateDiffIndexer = &StateDiffIndexer{} var _ interfaces.StateDiffIndexer = &StateDiffIndexer{}
// StateDiffIndexer satisfies the indexer.StateDiffIndexer interface for ethereum statediff objects on top of an SQL sql // StateDiffIndexer satisfies the indexer.StateDiffIndexer interface for ethereum statediff objects on top of an SQL DB.
type StateDiffIndexer struct { type StateDiffIndexer struct {
ctx context.Context ctx context.Context
chainConfig *params.ChainConfig chainConfig *params.ChainConfig
@ -51,11 +51,17 @@ type StateDiffIndexer struct {
} }
// NewStateDiffIndexer creates a sql implementation of interfaces.StateDiffIndexer // NewStateDiffIndexer creates a sql implementation of interfaces.StateDiffIndexer
func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, db Database) (*StateDiffIndexer, error) { // `ctx` is used to cancel the underlying DB connection.
// `chainConfig` is used when processing chain state.
roysc marked this conversation as resolved Outdated

Same.
// `db` is the backing database to use.
// `isDiff` means to mark state nodes as belonging to an incremental diff, as opposed to a full snapshot.
func NewStateDiffIndexer(
ctx context.Context, chainConfig *params.ChainConfig, db Database, isDiff bool,
) (*StateDiffIndexer, error) {
return &StateDiffIndexer{ return &StateDiffIndexer{
ctx: ctx, ctx: ctx,
chainConfig: chainConfig, chainConfig: chainConfig,
dbWriter: NewWriter(db), dbWriter: NewWriter(db, isDiff),
}, nil }, nil
} }

View File

@ -71,7 +71,7 @@ func setupMainnetIndexer(t *testing.T) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
ind, err = sql.NewStateDiffIndexer(context.Background(), chainConf, db) ind, err = sql.NewStateDiffIndexer(context.Background(), chainConf, db, true)
} }
func checkTxClosure(t *testing.T, idle, inUse, open int64) { func checkTxClosure(t *testing.T, idle, inUse, open int64) {

View File

@ -33,7 +33,7 @@ func setupLegacyPGXIndexer(t *testing.T) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
ind, err = sql.NewStateDiffIndexer(context.Background(), test.LegacyConfig, db) ind, err = sql.NewStateDiffIndexer(context.Background(), test.LegacyConfig, db, true)
require.NoError(t, err) require.NoError(t, err)
} }

View File

@ -44,7 +44,7 @@ func setupPGXIndexer(t *testing.T, config postgres.Config) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
ind, err = sql.NewStateDiffIndexer(context.Background(), mocks.TestChainConfig, db) ind, err = sql.NewStateDiffIndexer(context.Background(), mocks.TestChainConfig, db, true)
require.NoError(t, err) require.NoError(t, err)
} }

View File

@ -32,7 +32,7 @@ func setupLegacySQLXIndexer(t *testing.T) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
ind, err = sql.NewStateDiffIndexer(context.Background(), test.LegacyConfig, db) ind, err = sql.NewStateDiffIndexer(context.Background(), test.LegacyConfig, db, true)
require.NoError(t, err) require.NoError(t, err)
} }

View File

@ -34,7 +34,7 @@ func setupSQLXIndexer(t *testing.T) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
ind, err = sql.NewStateDiffIndexer(context.Background(), mocks.TestChainConfig, db) ind, err = sql.NewStateDiffIndexer(context.Background(), mocks.TestChainConfig, db, true)
require.NoError(t, err) require.NoError(t, err)
} }

View File

@ -34,13 +34,15 @@ import (
// Writer handles processing and writing of indexed IPLD objects to Postgres // Writer handles processing and writing of indexed IPLD objects to Postgres
type Writer struct { type Writer struct {
db Database db Database
isDiff bool
} }
// NewWriter creates a new pointer to a Writer // NewWriter creates a new pointer to a Writer. `diff` indicates whether this is part of an
func NewWriter(db Database) *Writer { // incremental diff (as opposed to a snapshot).
func NewWriter(db Database, diff bool) *Writer {
return &Writer{ return &Writer{
db: db, db: db, isDiff: diff,
} }
} }
@ -308,7 +310,8 @@ func (w *Writer) upsertStateCID(tx Tx, stateNode models.StateNodeModel) error {
_, err = tx.CopyFrom(w.db.Context(), _, err = tx.CopyFrom(w.db.Context(),
schema.TableStateNode.TableName(), schema.TableStateNode.ColumnNames(), schema.TableStateNode.TableName(), schema.TableStateNode.ColumnNames(),
toRows(toRow(blockNum, stateNode.HeaderID, stateNode.StateKey, stateNode.CID, toRows(toRow(blockNum, stateNode.HeaderID, stateNode.StateKey, stateNode.CID,
true, balance, stateNode.Nonce, stateNode.CodeHash, stateNode.StorageRoot, stateNode.Removed))) w.isDiff, balance, stateNode.Nonce, stateNode.CodeHash, stateNode.StorageRoot,
stateNode.Removed)))
if err != nil { if err != nil {
return insertError{"eth.state_cids", err, "COPY", stateNode} return insertError{"eth.state_cids", err, "COPY", stateNode}
} }
@ -318,7 +321,7 @@ func (w *Writer) upsertStateCID(tx Tx, stateNode models.StateNodeModel) error {
stateNode.HeaderID, stateNode.HeaderID,
stateNode.StateKey, stateNode.StateKey,
stateNode.CID, stateNode.CID,
true, w.isDiff,
bal, bal,
stateNode.Nonce, stateNode.Nonce,
stateNode.CodeHash, stateNode.CodeHash,
@ -346,7 +349,7 @@ func (w *Writer) upsertStorageCID(tx Tx, storageCID models.StorageNodeModel) err
_, err = tx.CopyFrom(w.db.Context(), _, err = tx.CopyFrom(w.db.Context(),
schema.TableStorageNode.TableName(), schema.TableStorageNode.ColumnNames(), schema.TableStorageNode.TableName(), schema.TableStorageNode.ColumnNames(),
toRows(toRow(blockNum, storageCID.HeaderID, storageCID.StateKey, storageCID.StorageKey, storageCID.CID, toRows(toRow(blockNum, storageCID.HeaderID, storageCID.StateKey, storageCID.StorageKey, storageCID.CID,
true, storageCID.Value, storageCID.Removed))) w.isDiff, storageCID.Value, storageCID.Removed)))
if err != nil { if err != nil {
return insertError{"eth.storage_cids", err, "COPY", storageCID} return insertError{"eth.storage_cids", err, "COPY", storageCID}
} }
@ -357,7 +360,7 @@ func (w *Writer) upsertStorageCID(tx Tx, storageCID models.StorageNodeModel) err
storageCID.StateKey, storageCID.StateKey,
storageCID.StorageKey, storageCID.StorageKey,
storageCID.CID, storageCID.CID,
true, w.isDiff,
storageCID.Value, storageCID.Value,
storageCID.Removed, storageCID.Removed,
) )

View File

@ -52,6 +52,7 @@ func InitializeNode(stack core.Node, b core.Backend) {
adapt.ChainConfig(backend.ChainConfig()), adapt.ChainConfig(backend.ChainConfig()),
info, info,
serviceConfig.IndexerConfig, serviceConfig.IndexerConfig,
true,
) )
if err != nil { if err != nil {
log.Error("failed to construct indexer", "error", err) log.Error("failed to construct indexer", "error", err)