Set state node diff field to false for snapshots (#18)
All checks were successful
Test / Run unit tests (push) Successful in 16m22s
Test / Run compliance tests (push) Successful in 4m26s
Test / Run integration tests (push) Successful in 31m18s

Needed for cerc-io/ipld-eth-state-snapshot#1

Reviewed-on: #18
This commit is contained in:
Roy Crihfield 2023-09-29 18:06:21 +00:00
parent b8fec4b571
commit e70c55bf86
17 changed files with 63 additions and 34 deletions

View File

@ -33,11 +33,19 @@ import (
)
// NewStateDiffIndexer creates and returns an implementation of the StateDiffIndexer interface.
// The returned indexer is used to process state diffs and write them to a database.
// The returned SQL database, if non-nil, is the indexer's backing database.
// `ctx` is used to cancel an underlying DB connection.
// `chainConfig` is used when processing chain state.
// `nodeInfo` contains metadata on the Ethereum node, which is inserted with the indexed state.
// `config` contains configuration specific to the indexer.
// `isDiff` means to mark state nodes as belonging to an incremental diff, as opposed to a full snapshot.
func NewStateDiffIndexer(
ctx context.Context,
chainConfig *params.ChainConfig,
nodeInfo node.Info,
config interfaces.Config,
isDiff bool,
) (
sql.Database,
interfaces.StateDiffIndexer,
@ -50,7 +58,7 @@ func NewStateDiffIndexer(
if !ok {
return nil, nil, fmt.Errorf("file config is not the correct type: got %T, expected %T", config, file.Config{})
}
ind, err := file.NewStateDiffIndexer(chainConfig, fc, nodeInfo)
ind, err := file.NewStateDiffIndexer(chainConfig, fc, nodeInfo, isDiff)
return nil, ind, err
case shared.POSTGRES:
log.Info("Starting statediff service in Postgres writing mode")
@ -75,7 +83,7 @@ func NewStateDiffIndexer(
return nil, nil, fmt.Errorf("unrecognized Postgres driver type: %s", pgc.Driver)
}
db := postgres.NewPostgresDB(driver, pgc.Upsert)
ind, err := sql.NewStateDiffIndexer(ctx, chainConfig, db)
ind, err := sql.NewStateDiffIndexer(ctx, chainConfig, db, isDiff)
return db, ind, err
case shared.DUMP:
log.Info("Starting statediff service in data dump mode")

View File

@ -43,7 +43,7 @@ func setupLegacyCSVIndexer(t *testing.T) {
require.NoError(t, err)
}
ind, err = file.NewStateDiffIndexer(test.LegacyConfig, file.CSVTestConfig, test.LegacyNodeInfo)
ind, err = file.NewStateDiffIndexer(test.LegacyConfig, file.CSVTestConfig, test.LegacyNodeInfo, true)
require.NoError(t, err)
db, err = postgres.SetupSQLXDB()

View File

@ -41,7 +41,7 @@ func setupCSVIndexer(t *testing.T) {
require.NoError(t, err)
}
ind, err = file.NewStateDiffIndexer(mocks.TestChainConfig, file.CSVTestConfig, test.LegacyNodeInfo)
ind, err = file.NewStateDiffIndexer(mocks.TestChainConfig, file.CSVTestConfig, test.LegacyNodeInfo, true)
require.NoError(t, err)
db, err = postgres.SetupSQLXDB()

View File

@ -58,6 +58,7 @@ type tableRow struct {
type CSVWriter struct {
// dir containing output files
dir string
isDiff bool
writers fileWriters
watchedAddressesWriter fileWriter
@ -128,7 +129,7 @@ func (tx fileWriters) flush() error {
return nil
}
func NewCSVWriter(path string, watchedAddressesFilePath string) (*CSVWriter, error) {
func NewCSVWriter(path string, watchedAddressesFilePath string, diff bool) (*CSVWriter, error) {
if err := os.MkdirAll(path, 0777); err != nil {
return nil, fmt.Errorf("unable to create directory '%s': %w", path, err)
}
@ -147,6 +148,7 @@ func NewCSVWriter(path string, watchedAddressesFilePath string) (*CSVWriter, err
writers: writers,
watchedAddressesWriter: watchedAddressesWriter,
dir: path,
isDiff: diff,
rows: make(chan tableRow),
flushChan: make(chan struct{}),
flushFinished: make(chan struct{}),
@ -278,14 +280,14 @@ func (csw *CSVWriter) upsertStateCID(stateNode models.StateNodeModel) {
var values []interface{}
values = append(values, stateNode.BlockNumber, stateNode.HeaderID, stateNode.StateKey, stateNode.CID,
true, balance, strconv.FormatUint(stateNode.Nonce, 10), stateNode.CodeHash, stateNode.StorageRoot, stateNode.Removed)
csw.isDiff, balance, strconv.FormatUint(stateNode.Nonce, 10), stateNode.CodeHash, stateNode.StorageRoot, stateNode.Removed)
csw.rows <- tableRow{schema.TableStateNode, values}
}
func (csw *CSVWriter) upsertStorageCID(storageCID models.StorageNodeModel) {
var values []interface{}
values = append(values, storageCID.BlockNumber, storageCID.HeaderID, storageCID.StateKey, storageCID.StorageKey, storageCID.CID,
true, storageCID.Value, storageCID.Removed)
csw.isDiff, storageCID.Value, storageCID.Removed)
csw.rows <- tableRow{schema.TableStorageNode, values}
}

View File

@ -63,7 +63,11 @@ type StateDiffIndexer struct {
}
// NewStateDiffIndexer creates a void implementation of interfaces.StateDiffIndexer
func NewStateDiffIndexer(chainConfig *params.ChainConfig, config Config, nodeInfo node.Info) (*StateDiffIndexer, error) {
// `chainConfig` is used when processing chain state.
// `config` contains configuration specific to the indexer.
// `nodeInfo` contains metadata on the Ethereum node, which is inserted with the indexed state.
// `isDiff` means to mark state nodes as belonging to an incremental diff, as opposed to a full snapshot.
func NewStateDiffIndexer(chainConfig *params.ChainConfig, config Config, nodeInfo node.Info, isDiff bool) (*StateDiffIndexer, error) {
var err error
var writer FileWriter
@ -86,7 +90,7 @@ func NewStateDiffIndexer(chainConfig *params.ChainConfig, config Config, nodeInf
}
log.Info("Writing watched addresses to file", "file", watchedAddressesFilePath)
writer, err = NewCSVWriter(outputDir, watchedAddressesFilePath)
writer, err = NewCSVWriter(outputDir, watchedAddressesFilePath, isDiff)
if err != nil {
return nil, err
}
@ -109,7 +113,7 @@ func NewStateDiffIndexer(chainConfig *params.ChainConfig, config Config, nodeInf
}
log.Info("Writing watched addresses to file", "file", watchedAddressesFilePath)
writer = NewSQLWriter(file, watchedAddressesFilePath)
writer = NewSQLWriter(file, watchedAddressesFilePath, isDiff)
default:
return nil, fmt.Errorf("unrecognized file mode: %s", config.Mode)
}

View File

@ -83,7 +83,7 @@ func setupMainnetIndexer(t *testing.T) {
require.NoError(t, err)
}
ind, err = file.NewStateDiffIndexer(chainConf, file.CSVTestConfig, test.LegacyNodeInfo)
ind, err = file.NewStateDiffIndexer(chainConf, file.CSVTestConfig, test.LegacyNodeInfo, true)
require.NoError(t, err)
db, err = postgres.SetupSQLXDB()

View File

@ -44,7 +44,7 @@ func setupLegacySQLIndexer(t *testing.T) {
require.NoError(t, err)
}
ind, err = file.NewStateDiffIndexer(test.LegacyConfig, file.SQLTestConfig, test.LegacyNodeInfo)
ind, err = file.NewStateDiffIndexer(test.LegacyConfig, file.SQLTestConfig, test.LegacyNodeInfo, true)
require.NoError(t, err)
db, err = postgres.SetupSQLXDB()

View File

@ -41,7 +41,7 @@ func setupIndexer(t *testing.T) {
require.NoError(t, err)
}
ind, err = file.NewStateDiffIndexer(mocks.TestChainConfig, file.SQLTestConfig, test.LegacyNodeInfo)
ind, err = file.NewStateDiffIndexer(mocks.TestChainConfig, file.SQLTestConfig, test.LegacyNodeInfo, true)
require.NoError(t, err)
db, err = postgres.SetupSQLXDB()

View File

@ -44,6 +44,7 @@ var (
type SQLWriter struct {
wc io.WriteCloser
stmts chan []byte
isDiff bool
collatedStmt []byte
collationIndex int
@ -55,11 +56,15 @@ type SQLWriter struct {
watchedAddressesFilePath string
}
// NewSQLWriter creates a new pointer to a Writer
func NewSQLWriter(wc io.WriteCloser, watchedAddressesFilePath string) *SQLWriter {
// NewSQLWriter creates a new Writer.
// `wc` is the underlying io.WriteCloser to write to.
// `watchedAddressesFilePath` is the path to the file containing watched addresses.
// `isDiff` means to mark state nodes as belonging to an incremental diff, as opposed to a full snapshot.
func NewSQLWriter(wc io.WriteCloser, watchedAddressesFilePath string, isDiff bool) *SQLWriter {
return &SQLWriter{
wc: wc,
stmts: make(chan []byte),
isDiff: isDiff,
collatedStmt: make([]byte, writeBufferSize),
flushChan: make(chan struct{}),
flushFinished: make(chan struct{}),
@ -225,12 +230,12 @@ func (sqw *SQLWriter) upsertStateCID(stateNode models.StateNodeModel) {
balance = "0"
}
sqw.stmts <- []byte(fmt.Sprintf(stateInsert, stateNode.BlockNumber, stateNode.HeaderID, stateNode.StateKey, stateNode.CID,
stateNode.Removed, true, balance, stateNode.Nonce, stateNode.CodeHash, stateNode.StorageRoot))
stateNode.Removed, sqw.isDiff, balance, stateNode.Nonce, stateNode.CodeHash, stateNode.StorageRoot))
}
func (sqw *SQLWriter) upsertStorageCID(storageCID models.StorageNodeModel) {
sqw.stmts <- []byte(fmt.Sprintf(storageInsert, storageCID.BlockNumber, storageCID.HeaderID, storageCID.StateKey, storageCID.StorageKey, storageCID.CID,
storageCID.Removed, true, storageCID.Value))
storageCID.Removed, sqw.isDiff, storageCID.Value))
}
// LoadWatchedAddresses loads watched addresses from a file

View File

@ -43,7 +43,7 @@ import (
var _ interfaces.StateDiffIndexer = &StateDiffIndexer{}
// StateDiffIndexer satisfies the indexer.StateDiffIndexer interface for ethereum statediff objects on top of an SQL sql
// StateDiffIndexer satisfies the indexer.StateDiffIndexer interface for ethereum statediff objects on top of an SQL DB.
type StateDiffIndexer struct {
ctx context.Context
chainConfig *params.ChainConfig
@ -51,11 +51,17 @@ type StateDiffIndexer struct {
}
// NewStateDiffIndexer creates a sql implementation of interfaces.StateDiffIndexer
func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, db Database) (*StateDiffIndexer, error) {
// `ctx` is used to cancel the underlying DB connection.
// `chainConfig` is used when processing chain state.
// `db` is the backing database to use.
// `isDiff` means to mark state nodes as belonging to an incremental diff, as opposed to a full snapshot.
func NewStateDiffIndexer(
ctx context.Context, chainConfig *params.ChainConfig, db Database, isDiff bool,
) (*StateDiffIndexer, error) {
return &StateDiffIndexer{
ctx: ctx,
chainConfig: chainConfig,
dbWriter: NewWriter(db),
dbWriter: NewWriter(db, isDiff),
}, nil
}

View File

@ -71,7 +71,7 @@ func setupMainnetIndexer(t *testing.T) {
if err != nil {
t.Fatal(err)
}
ind, err = sql.NewStateDiffIndexer(context.Background(), chainConf, db)
ind, err = sql.NewStateDiffIndexer(context.Background(), chainConf, db, true)
}
func checkTxClosure(t *testing.T, idle, inUse, open int64) {

View File

@ -33,7 +33,7 @@ func setupLegacyPGXIndexer(t *testing.T) {
if err != nil {
t.Fatal(err)
}
ind, err = sql.NewStateDiffIndexer(context.Background(), test.LegacyConfig, db)
ind, err = sql.NewStateDiffIndexer(context.Background(), test.LegacyConfig, db, true)
require.NoError(t, err)
}

View File

@ -44,7 +44,7 @@ func setupPGXIndexer(t *testing.T, config postgres.Config) {
if err != nil {
t.Fatal(err)
}
ind, err = sql.NewStateDiffIndexer(context.Background(), mocks.TestChainConfig, db)
ind, err = sql.NewStateDiffIndexer(context.Background(), mocks.TestChainConfig, db, true)
require.NoError(t, err)
}

View File

@ -32,7 +32,7 @@ func setupLegacySQLXIndexer(t *testing.T) {
if err != nil {
t.Fatal(err)
}
ind, err = sql.NewStateDiffIndexer(context.Background(), test.LegacyConfig, db)
ind, err = sql.NewStateDiffIndexer(context.Background(), test.LegacyConfig, db, true)
require.NoError(t, err)
}

View File

@ -34,7 +34,7 @@ func setupSQLXIndexer(t *testing.T) {
if err != nil {
t.Fatal(err)
}
ind, err = sql.NewStateDiffIndexer(context.Background(), mocks.TestChainConfig, db)
ind, err = sql.NewStateDiffIndexer(context.Background(), mocks.TestChainConfig, db, true)
require.NoError(t, err)
}

View File

@ -35,12 +35,14 @@ import (
// Writer handles processing and writing of indexed IPLD objects to Postgres
type Writer struct {
db Database
isDiff bool
}
// NewWriter creates a new pointer to a Writer
func NewWriter(db Database) *Writer {
// NewWriter creates a new pointer to a Writer. `diff` indicates whether this is part of an
// incremental diff (as opposed to a snapshot).
func NewWriter(db Database, diff bool) *Writer {
return &Writer{
db: db,
db: db, isDiff: diff,
}
}
@ -308,7 +310,8 @@ func (w *Writer) upsertStateCID(tx Tx, stateNode models.StateNodeModel) error {
_, err = tx.CopyFrom(w.db.Context(),
schema.TableStateNode.TableName(), schema.TableStateNode.ColumnNames(),
toRows(toRow(blockNum, stateNode.HeaderID, stateNode.StateKey, stateNode.CID,
true, balance, stateNode.Nonce, stateNode.CodeHash, stateNode.StorageRoot, stateNode.Removed)))
w.isDiff, balance, stateNode.Nonce, stateNode.CodeHash, stateNode.StorageRoot,
stateNode.Removed)))
if err != nil {
return insertError{"eth.state_cids", err, "COPY", stateNode}
}
@ -318,7 +321,7 @@ func (w *Writer) upsertStateCID(tx Tx, stateNode models.StateNodeModel) error {
stateNode.HeaderID,
stateNode.StateKey,
stateNode.CID,
true,
w.isDiff,
bal,
stateNode.Nonce,
stateNode.CodeHash,
@ -346,7 +349,7 @@ func (w *Writer) upsertStorageCID(tx Tx, storageCID models.StorageNodeModel) err
_, err = tx.CopyFrom(w.db.Context(),
schema.TableStorageNode.TableName(), schema.TableStorageNode.ColumnNames(),
toRows(toRow(blockNum, storageCID.HeaderID, storageCID.StateKey, storageCID.StorageKey, storageCID.CID,
true, storageCID.Value, storageCID.Removed)))
w.isDiff, storageCID.Value, storageCID.Removed)))
if err != nil {
return insertError{"eth.storage_cids", err, "COPY", storageCID}
}
@ -357,7 +360,7 @@ func (w *Writer) upsertStorageCID(tx Tx, storageCID models.StorageNodeModel) err
storageCID.StateKey,
storageCID.StorageKey,
storageCID.CID,
true,
w.isDiff,
storageCID.Value,
storageCID.Removed,
)

View File

@ -52,6 +52,7 @@ func InitializeNode(stack core.Node, b core.Backend) {
adapt.ChainConfig(backend.ChainConfig()),
info,
serviceConfig.IndexerConfig,
true,
)
if err != nil {
log.Error("failed to construct indexer", "error", err)