Set state node diff
field to false for snapshots
#18
@ -33,11 +33,19 @@ import (
|
||||
)
|
||||
|
||||
// NewStateDiffIndexer creates and returns an implementation of the StateDiffIndexer interface.
|
||||
// The returned indexer is used to process state diffs and write them to a database.
|
||||
// The returned SQL database, if non-nil, is the indexer's backing database.
|
||||
// `ctx` is used to cancel an underlying DB connection.
|
||||
// `chainConfig` is used when processing chain state.
|
||||
// `nodeInfo` contains metadata on the Ethereum node, which is inserted with the indexed state.
|
||||
// `config` contains configuration specific to the indexer.
|
||||
// `isDiff` means to mark state nodes as belonging to an incremental diff, as opposed to a full snapshot.
|
||||
func NewStateDiffIndexer(
|
||||
ctx context.Context,
|
||||
chainConfig *params.ChainConfig,
|
||||
nodeInfo node.Info,
|
||||
config interfaces.Config,
|
||||
isDiff bool,
|
||||
) (
|
||||
sql.Database,
|
||||
interfaces.StateDiffIndexer,
|
||||
@ -50,7 +58,7 @@ func NewStateDiffIndexer(
|
||||
if !ok {
|
||||
return nil, nil, fmt.Errorf("file config is not the correct type: got %T, expected %T", config, file.Config{})
|
||||
}
|
||||
ind, err := file.NewStateDiffIndexer(chainConfig, fc, nodeInfo)
|
||||
ind, err := file.NewStateDiffIndexer(chainConfig, fc, nodeInfo, isDiff)
|
||||
return nil, ind, err
|
||||
case shared.POSTGRES:
|
||||
log.Info("Starting statediff service in Postgres writing mode")
|
||||
@ -75,7 +83,7 @@ func NewStateDiffIndexer(
|
||||
return nil, nil, fmt.Errorf("unrecognized Postgres driver type: %s", pgc.Driver)
|
||||
}
|
||||
db := postgres.NewPostgresDB(driver, pgc.Upsert)
|
||||
ind, err := sql.NewStateDiffIndexer(ctx, chainConfig, db)
|
||||
ind, err := sql.NewStateDiffIndexer(ctx, chainConfig, db, isDiff)
|
||||
return db, ind, err
|
||||
case shared.DUMP:
|
||||
log.Info("Starting statediff service in data dump mode")
|
||||
|
@ -43,7 +43,7 @@ func setupLegacyCSVIndexer(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
ind, err = file.NewStateDiffIndexer(test.LegacyConfig, file.CSVTestConfig, test.LegacyNodeInfo)
|
||||
ind, err = file.NewStateDiffIndexer(test.LegacyConfig, file.CSVTestConfig, test.LegacyNodeInfo, true)
|
||||
require.NoError(t, err)
|
||||
|
||||
db, err = postgres.SetupSQLXDB()
|
||||
|
@ -41,7 +41,7 @@ func setupCSVIndexer(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
ind, err = file.NewStateDiffIndexer(mocks.TestChainConfig, file.CSVTestConfig, test.LegacyNodeInfo)
|
||||
ind, err = file.NewStateDiffIndexer(mocks.TestChainConfig, file.CSVTestConfig, test.LegacyNodeInfo, true)
|
||||
require.NoError(t, err)
|
||||
|
||||
db, err = postgres.SetupSQLXDB()
|
||||
|
@ -57,7 +57,8 @@ type tableRow struct {
|
||||
|
||||
type CSVWriter struct {
|
||||
// dir containing output files
|
||||
dir string
|
||||
dir string
|
||||
isDiff bool
|
||||
|
||||
writers fileWriters
|
||||
watchedAddressesWriter fileWriter
|
||||
@ -128,7 +129,7 @@ func (tx fileWriters) flush() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func NewCSVWriter(path string, watchedAddressesFilePath string) (*CSVWriter, error) {
|
||||
func NewCSVWriter(path string, watchedAddressesFilePath string, diff bool) (*CSVWriter, error) {
|
||||
if err := os.MkdirAll(path, 0777); err != nil {
|
||||
return nil, fmt.Errorf("unable to create directory '%s': %w", path, err)
|
||||
}
|
||||
@ -147,6 +148,7 @@ func NewCSVWriter(path string, watchedAddressesFilePath string) (*CSVWriter, err
|
||||
writers: writers,
|
||||
watchedAddressesWriter: watchedAddressesWriter,
|
||||
dir: path,
|
||||
isDiff: diff,
|
||||
rows: make(chan tableRow),
|
||||
flushChan: make(chan struct{}),
|
||||
flushFinished: make(chan struct{}),
|
||||
@ -278,14 +280,14 @@ func (csw *CSVWriter) upsertStateCID(stateNode models.StateNodeModel) {
|
||||
|
||||
var values []interface{}
|
||||
values = append(values, stateNode.BlockNumber, stateNode.HeaderID, stateNode.StateKey, stateNode.CID,
|
||||
true, balance, strconv.FormatUint(stateNode.Nonce, 10), stateNode.CodeHash, stateNode.StorageRoot, stateNode.Removed)
|
||||
csw.isDiff, balance, strconv.FormatUint(stateNode.Nonce, 10), stateNode.CodeHash, stateNode.StorageRoot, stateNode.Removed)
|
||||
csw.rows <- tableRow{schema.TableStateNode, values}
|
||||
}
|
||||
|
||||
func (csw *CSVWriter) upsertStorageCID(storageCID models.StorageNodeModel) {
|
||||
var values []interface{}
|
||||
values = append(values, storageCID.BlockNumber, storageCID.HeaderID, storageCID.StateKey, storageCID.StorageKey, storageCID.CID,
|
||||
true, storageCID.Value, storageCID.Removed)
|
||||
csw.isDiff, storageCID.Value, storageCID.Removed)
|
||||
csw.rows <- tableRow{schema.TableStorageNode, values}
|
||||
}
|
||||
|
||||
|
@ -63,7 +63,11 @@ type StateDiffIndexer struct {
|
||||
}
|
||||
|
||||
// NewStateDiffIndexer creates a void implementation of interfaces.StateDiffIndexer
|
||||
func NewStateDiffIndexer(chainConfig *params.ChainConfig, config Config, nodeInfo node.Info) (*StateDiffIndexer, error) {
|
||||
// `chainConfig` is used when processing chain state.
|
||||
// `config` contains configuration specific to the indexer.
|
||||
// `nodeInfo` contains metadata on the Ethereum node, which is inserted with the indexed state.
|
||||
// `isDiff` means to mark state nodes as belonging to an incremental diff, as opposed to a full snapshot.
|
||||
func NewStateDiffIndexer(chainConfig *params.ChainConfig, config Config, nodeInfo node.Info, isDiff bool) (*StateDiffIndexer, error) {
|
||||
var err error
|
||||
var writer FileWriter
|
||||
|
||||
@ -86,7 +90,7 @@ func NewStateDiffIndexer(chainConfig *params.ChainConfig, config Config, nodeInf
|
||||
}
|
||||
log.Info("Writing watched addresses to file", "file", watchedAddressesFilePath)
|
||||
|
||||
writer, err = NewCSVWriter(outputDir, watchedAddressesFilePath)
|
||||
writer, err = NewCSVWriter(outputDir, watchedAddressesFilePath, isDiff)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -109,7 +113,7 @@ func NewStateDiffIndexer(chainConfig *params.ChainConfig, config Config, nodeInf
|
||||
}
|
||||
log.Info("Writing watched addresses to file", "file", watchedAddressesFilePath)
|
||||
|
||||
writer = NewSQLWriter(file, watchedAddressesFilePath)
|
||||
writer = NewSQLWriter(file, watchedAddressesFilePath, isDiff)
|
||||
default:
|
||||
return nil, fmt.Errorf("unrecognized file mode: %s", config.Mode)
|
||||
}
|
||||
|
@ -83,7 +83,7 @@ func setupMainnetIndexer(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
ind, err = file.NewStateDiffIndexer(chainConf, file.CSVTestConfig, test.LegacyNodeInfo)
|
||||
ind, err = file.NewStateDiffIndexer(chainConf, file.CSVTestConfig, test.LegacyNodeInfo, true)
|
||||
require.NoError(t, err)
|
||||
|
||||
db, err = postgres.SetupSQLXDB()
|
||||
|
@ -44,7 +44,7 @@ func setupLegacySQLIndexer(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
ind, err = file.NewStateDiffIndexer(test.LegacyConfig, file.SQLTestConfig, test.LegacyNodeInfo)
|
||||
ind, err = file.NewStateDiffIndexer(test.LegacyConfig, file.SQLTestConfig, test.LegacyNodeInfo, true)
|
||||
require.NoError(t, err)
|
||||
|
||||
db, err = postgres.SetupSQLXDB()
|
||||
|
@ -41,7 +41,7 @@ func setupIndexer(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
ind, err = file.NewStateDiffIndexer(mocks.TestChainConfig, file.SQLTestConfig, test.LegacyNodeInfo)
|
||||
ind, err = file.NewStateDiffIndexer(mocks.TestChainConfig, file.SQLTestConfig, test.LegacyNodeInfo, true)
|
||||
require.NoError(t, err)
|
||||
|
||||
db, err = postgres.SetupSQLXDB()
|
||||
|
@ -44,6 +44,7 @@ var (
|
||||
type SQLWriter struct {
|
||||
wc io.WriteCloser
|
||||
stmts chan []byte
|
||||
isDiff bool
|
||||
collatedStmt []byte
|
||||
collationIndex int
|
||||
|
||||
@ -55,11 +56,15 @@ type SQLWriter struct {
|
||||
watchedAddressesFilePath string
|
||||
}
|
||||
|
||||
// NewSQLWriter creates a new pointer to a Writer
|
||||
func NewSQLWriter(wc io.WriteCloser, watchedAddressesFilePath string) *SQLWriter {
|
||||
// NewSQLWriter creates a new Writer.
|
||||
// `wc` is the underlying io.WriteCloser to write to.
|
||||
// `watchedAddressesFilePath` is the path to the file containing watched addresses.
|
||||
// `isDiff` means to mark state nodes as belonging to an incremental diff, as opposed to a full snapshot.
|
||||
func NewSQLWriter(wc io.WriteCloser, watchedAddressesFilePath string, isDiff bool) *SQLWriter {
|
||||
return &SQLWriter{
|
||||
wc: wc,
|
||||
stmts: make(chan []byte),
|
||||
isDiff: isDiff,
|
||||
collatedStmt: make([]byte, writeBufferSize),
|
||||
flushChan: make(chan struct{}),
|
||||
flushFinished: make(chan struct{}),
|
||||
@ -225,12 +230,12 @@ func (sqw *SQLWriter) upsertStateCID(stateNode models.StateNodeModel) {
|
||||
balance = "0"
|
||||
}
|
||||
sqw.stmts <- []byte(fmt.Sprintf(stateInsert, stateNode.BlockNumber, stateNode.HeaderID, stateNode.StateKey, stateNode.CID,
|
||||
stateNode.Removed, true, balance, stateNode.Nonce, stateNode.CodeHash, stateNode.StorageRoot))
|
||||
stateNode.Removed, sqw.isDiff, balance, stateNode.Nonce, stateNode.CodeHash, stateNode.StorageRoot))
|
||||
}
|
||||
|
||||
func (sqw *SQLWriter) upsertStorageCID(storageCID models.StorageNodeModel) {
|
||||
sqw.stmts <- []byte(fmt.Sprintf(storageInsert, storageCID.BlockNumber, storageCID.HeaderID, storageCID.StateKey, storageCID.StorageKey, storageCID.CID,
|
||||
storageCID.Removed, true, storageCID.Value))
|
||||
storageCID.Removed, sqw.isDiff, storageCID.Value))
|
||||
}
|
||||
|
||||
// LoadWatchedAddresses loads watched addresses from a file
|
||||
|
@ -43,7 +43,7 @@ import (
|
||||
|
||||
var _ interfaces.StateDiffIndexer = &StateDiffIndexer{}
|
||||
|
||||
// StateDiffIndexer satisfies the indexer.StateDiffIndexer interface for ethereum statediff objects on top of an SQL sql
|
||||
// StateDiffIndexer satisfies the indexer.StateDiffIndexer interface for ethereum statediff objects on top of an SQL DB.
|
||||
type StateDiffIndexer struct {
|
||||
ctx context.Context
|
||||
chainConfig *params.ChainConfig
|
||||
@ -51,11 +51,17 @@ type StateDiffIndexer struct {
|
||||
}
|
||||
|
||||
// NewStateDiffIndexer creates a sql implementation of interfaces.StateDiffIndexer
|
||||
func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, db Database) (*StateDiffIndexer, error) {
|
||||
// `ctx` is used to cancel the underlying DB connection.
|
||||
// `chainConfig` is used when processing chain state.
|
||||
// `db` is the backing database to use.
|
||||
// `isDiff` means to mark state nodes as belonging to an incremental diff, as opposed to a full snapshot.
|
||||
func NewStateDiffIndexer(
|
||||
ctx context.Context, chainConfig *params.ChainConfig, db Database, isDiff bool,
|
||||
) (*StateDiffIndexer, error) {
|
||||
return &StateDiffIndexer{
|
||||
ctx: ctx,
|
||||
chainConfig: chainConfig,
|
||||
dbWriter: NewWriter(db),
|
||||
dbWriter: NewWriter(db, isDiff),
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -71,7 +71,7 @@ func setupMainnetIndexer(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
ind, err = sql.NewStateDiffIndexer(context.Background(), chainConf, db)
|
||||
ind, err = sql.NewStateDiffIndexer(context.Background(), chainConf, db, true)
|
||||
}
|
||||
|
||||
func checkTxClosure(t *testing.T, idle, inUse, open int64) {
|
||||
|
@ -33,7 +33,7 @@ func setupLegacyPGXIndexer(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
ind, err = sql.NewStateDiffIndexer(context.Background(), test.LegacyConfig, db)
|
||||
ind, err = sql.NewStateDiffIndexer(context.Background(), test.LegacyConfig, db, true)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
|
@ -44,7 +44,7 @@ func setupPGXIndexer(t *testing.T, config postgres.Config) {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
ind, err = sql.NewStateDiffIndexer(context.Background(), mocks.TestChainConfig, db)
|
||||
ind, err = sql.NewStateDiffIndexer(context.Background(), mocks.TestChainConfig, db, true)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
|
@ -32,7 +32,7 @@ func setupLegacySQLXIndexer(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
ind, err = sql.NewStateDiffIndexer(context.Background(), test.LegacyConfig, db)
|
||||
ind, err = sql.NewStateDiffIndexer(context.Background(), test.LegacyConfig, db, true)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
|
@ -34,7 +34,7 @@ func setupSQLXIndexer(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
ind, err = sql.NewStateDiffIndexer(context.Background(), mocks.TestChainConfig, db)
|
||||
ind, err = sql.NewStateDiffIndexer(context.Background(), mocks.TestChainConfig, db, true)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
|
@ -34,13 +34,15 @@ import (
|
||||
|
||||
// Writer handles processing and writing of indexed IPLD objects to Postgres
|
||||
type Writer struct {
|
||||
db Database
|
||||
db Database
|
||||
isDiff bool
|
||||
}
|
||||
|
||||
// NewWriter creates a new pointer to a Writer
|
||||
func NewWriter(db Database) *Writer {
|
||||
// NewWriter creates a new pointer to a Writer. `diff` indicates whether this is part of an
|
||||
// incremental diff (as opposed to a snapshot).
|
||||
func NewWriter(db Database, diff bool) *Writer {
|
||||
return &Writer{
|
||||
db: db,
|
||||
db: db, isDiff: diff,
|
||||
}
|
||||
}
|
||||
|
||||
@ -308,7 +310,8 @@ func (w *Writer) upsertStateCID(tx Tx, stateNode models.StateNodeModel) error {
|
||||
_, err = tx.CopyFrom(w.db.Context(),
|
||||
schema.TableStateNode.TableName(), schema.TableStateNode.ColumnNames(),
|
||||
toRows(toRow(blockNum, stateNode.HeaderID, stateNode.StateKey, stateNode.CID,
|
||||
true, balance, stateNode.Nonce, stateNode.CodeHash, stateNode.StorageRoot, stateNode.Removed)))
|
||||
w.isDiff, balance, stateNode.Nonce, stateNode.CodeHash, stateNode.StorageRoot,
|
||||
stateNode.Removed)))
|
||||
if err != nil {
|
||||
return insertError{"eth.state_cids", err, "COPY", stateNode}
|
||||
}
|
||||
@ -318,7 +321,7 @@ func (w *Writer) upsertStateCID(tx Tx, stateNode models.StateNodeModel) error {
|
||||
stateNode.HeaderID,
|
||||
stateNode.StateKey,
|
||||
stateNode.CID,
|
||||
true,
|
||||
w.isDiff,
|
||||
bal,
|
||||
stateNode.Nonce,
|
||||
stateNode.CodeHash,
|
||||
@ -346,7 +349,7 @@ func (w *Writer) upsertStorageCID(tx Tx, storageCID models.StorageNodeModel) err
|
||||
_, err = tx.CopyFrom(w.db.Context(),
|
||||
schema.TableStorageNode.TableName(), schema.TableStorageNode.ColumnNames(),
|
||||
toRows(toRow(blockNum, storageCID.HeaderID, storageCID.StateKey, storageCID.StorageKey, storageCID.CID,
|
||||
true, storageCID.Value, storageCID.Removed)))
|
||||
w.isDiff, storageCID.Value, storageCID.Removed)))
|
||||
if err != nil {
|
||||
return insertError{"eth.storage_cids", err, "COPY", storageCID}
|
||||
}
|
||||
@ -357,7 +360,7 @@ func (w *Writer) upsertStorageCID(tx Tx, storageCID models.StorageNodeModel) err
|
||||
storageCID.StateKey,
|
||||
storageCID.StorageKey,
|
||||
storageCID.CID,
|
||||
true,
|
||||
w.isDiff,
|
||||
storageCID.Value,
|
||||
storageCID.Removed,
|
||||
)
|
||||
|
@ -52,6 +52,7 @@ func InitializeNode(stack core.Node, b core.Backend) {
|
||||
adapt.ChainConfig(backend.ChainConfig()),
|
||||
info,
|
||||
serviceConfig.IndexerConfig,
|
||||
true,
|
||||
)
|
||||
if err != nil {
|
||||
log.Error("failed to construct indexer", "error", err)
|
||||
|
Loading…
Reference in New Issue
Block a user