Set state node diff
field to false for snapshots
#18
@ -33,11 +33,19 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
// NewStateDiffIndexer creates and returns an implementation of the StateDiffIndexer interface.
|
// NewStateDiffIndexer creates and returns an implementation of the StateDiffIndexer interface.
|
||||||
|
// The returned indexer is used to process state diffs and write them to a database.
|
||||||
|
// The returned SQL database, if non-nil, is the indexer's backing database.
|
||||||
|
// `ctx` is used to cancel an underlying DB connection.
|
||||||
|
// `chainConfig` is used when processing chain state.
|
||||||
|
// `nodeInfo` contains metadata on the Ethereum node, which is inserted with the indexed state.
|
||||||
|
// `config` contains configuration specific to the indexer.
|
||||||
roysc marked this conversation as resolved
Outdated
|
|||||||
|
// `isDiff` means to mark state nodes as belonging to an incremental diff, as opposed to a full snapshot.
|
||||||
func NewStateDiffIndexer(
|
func NewStateDiffIndexer(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
chainConfig *params.ChainConfig,
|
chainConfig *params.ChainConfig,
|
||||||
nodeInfo node.Info,
|
nodeInfo node.Info,
|
||||||
config interfaces.Config,
|
config interfaces.Config,
|
||||||
|
isDiff bool,
|
||||||
) (
|
) (
|
||||||
sql.Database,
|
sql.Database,
|
||||||
interfaces.StateDiffIndexer,
|
interfaces.StateDiffIndexer,
|
||||||
@ -50,7 +58,7 @@ func NewStateDiffIndexer(
|
|||||||
if !ok {
|
if !ok {
|
||||||
return nil, nil, fmt.Errorf("file config is not the correct type: got %T, expected %T", config, file.Config{})
|
return nil, nil, fmt.Errorf("file config is not the correct type: got %T, expected %T", config, file.Config{})
|
||||||
}
|
}
|
||||||
ind, err := file.NewStateDiffIndexer(chainConfig, fc, nodeInfo)
|
ind, err := file.NewStateDiffIndexer(chainConfig, fc, nodeInfo, isDiff)
|
||||||
return nil, ind, err
|
return nil, ind, err
|
||||||
case shared.POSTGRES:
|
case shared.POSTGRES:
|
||||||
log.Info("Starting statediff service in Postgres writing mode")
|
log.Info("Starting statediff service in Postgres writing mode")
|
||||||
@ -75,7 +83,7 @@ func NewStateDiffIndexer(
|
|||||||
return nil, nil, fmt.Errorf("unrecognized Postgres driver type: %s", pgc.Driver)
|
return nil, nil, fmt.Errorf("unrecognized Postgres driver type: %s", pgc.Driver)
|
||||||
}
|
}
|
||||||
db := postgres.NewPostgresDB(driver, pgc.Upsert)
|
db := postgres.NewPostgresDB(driver, pgc.Upsert)
|
||||||
ind, err := sql.NewStateDiffIndexer(ctx, chainConfig, db)
|
ind, err := sql.NewStateDiffIndexer(ctx, chainConfig, db, isDiff)
|
||||||
return db, ind, err
|
return db, ind, err
|
||||||
case shared.DUMP:
|
case shared.DUMP:
|
||||||
log.Info("Starting statediff service in data dump mode")
|
log.Info("Starting statediff service in data dump mode")
|
||||||
|
@ -43,7 +43,7 @@ func setupLegacyCSVIndexer(t *testing.T) {
|
|||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
ind, err = file.NewStateDiffIndexer(test.LegacyConfig, file.CSVTestConfig, test.LegacyNodeInfo)
|
ind, err = file.NewStateDiffIndexer(test.LegacyConfig, file.CSVTestConfig, test.LegacyNodeInfo, true)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
db, err = postgres.SetupSQLXDB()
|
db, err = postgres.SetupSQLXDB()
|
||||||
|
@ -41,7 +41,7 @@ func setupCSVIndexer(t *testing.T) {
|
|||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
ind, err = file.NewStateDiffIndexer(mocks.TestChainConfig, file.CSVTestConfig, test.LegacyNodeInfo)
|
ind, err = file.NewStateDiffIndexer(mocks.TestChainConfig, file.CSVTestConfig, test.LegacyNodeInfo, true)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
db, err = postgres.SetupSQLXDB()
|
db, err = postgres.SetupSQLXDB()
|
||||||
|
@ -57,7 +57,8 @@ type tableRow struct {
|
|||||||
|
|
||||||
type CSVWriter struct {
|
type CSVWriter struct {
|
||||||
// dir containing output files
|
// dir containing output files
|
||||||
dir string
|
dir string
|
||||||
|
isDiff bool
|
||||||
|
|
||||||
writers fileWriters
|
writers fileWriters
|
||||||
watchedAddressesWriter fileWriter
|
watchedAddressesWriter fileWriter
|
||||||
@ -128,7 +129,7 @@ func (tx fileWriters) flush() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewCSVWriter(path string, watchedAddressesFilePath string) (*CSVWriter, error) {
|
func NewCSVWriter(path string, watchedAddressesFilePath string, diff bool) (*CSVWriter, error) {
|
||||||
if err := os.MkdirAll(path, 0777); err != nil {
|
if err := os.MkdirAll(path, 0777); err != nil {
|
||||||
return nil, fmt.Errorf("unable to create directory '%s': %w", path, err)
|
return nil, fmt.Errorf("unable to create directory '%s': %w", path, err)
|
||||||
}
|
}
|
||||||
@ -147,6 +148,7 @@ func NewCSVWriter(path string, watchedAddressesFilePath string) (*CSVWriter, err
|
|||||||
writers: writers,
|
writers: writers,
|
||||||
watchedAddressesWriter: watchedAddressesWriter,
|
watchedAddressesWriter: watchedAddressesWriter,
|
||||||
dir: path,
|
dir: path,
|
||||||
|
isDiff: diff,
|
||||||
rows: make(chan tableRow),
|
rows: make(chan tableRow),
|
||||||
flushChan: make(chan struct{}),
|
flushChan: make(chan struct{}),
|
||||||
flushFinished: make(chan struct{}),
|
flushFinished: make(chan struct{}),
|
||||||
@ -278,14 +280,14 @@ func (csw *CSVWriter) upsertStateCID(stateNode models.StateNodeModel) {
|
|||||||
|
|
||||||
var values []interface{}
|
var values []interface{}
|
||||||
values = append(values, stateNode.BlockNumber, stateNode.HeaderID, stateNode.StateKey, stateNode.CID,
|
values = append(values, stateNode.BlockNumber, stateNode.HeaderID, stateNode.StateKey, stateNode.CID,
|
||||||
true, balance, strconv.FormatUint(stateNode.Nonce, 10), stateNode.CodeHash, stateNode.StorageRoot, stateNode.Removed)
|
csw.isDiff, balance, strconv.FormatUint(stateNode.Nonce, 10), stateNode.CodeHash, stateNode.StorageRoot, stateNode.Removed)
|
||||||
csw.rows <- tableRow{schema.TableStateNode, values}
|
csw.rows <- tableRow{schema.TableStateNode, values}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (csw *CSVWriter) upsertStorageCID(storageCID models.StorageNodeModel) {
|
func (csw *CSVWriter) upsertStorageCID(storageCID models.StorageNodeModel) {
|
||||||
var values []interface{}
|
var values []interface{}
|
||||||
values = append(values, storageCID.BlockNumber, storageCID.HeaderID, storageCID.StateKey, storageCID.StorageKey, storageCID.CID,
|
values = append(values, storageCID.BlockNumber, storageCID.HeaderID, storageCID.StateKey, storageCID.StorageKey, storageCID.CID,
|
||||||
true, storageCID.Value, storageCID.Removed)
|
csw.isDiff, storageCID.Value, storageCID.Removed)
|
||||||
csw.rows <- tableRow{schema.TableStorageNode, values}
|
csw.rows <- tableRow{schema.TableStorageNode, values}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -63,7 +63,11 @@ type StateDiffIndexer struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// NewStateDiffIndexer creates a void implementation of interfaces.StateDiffIndexer
|
// NewStateDiffIndexer creates a void implementation of interfaces.StateDiffIndexer
|
||||||
func NewStateDiffIndexer(chainConfig *params.ChainConfig, config Config, nodeInfo node.Info) (*StateDiffIndexer, error) {
|
// `chainConfig` is used when processing chain state.
|
||||||
roysc marked this conversation as resolved
Outdated
telackey
commented
Same as above. Same as above.
|
|||||||
|
// `config` contains configuration specific to the indexer.
|
||||||
|
// `nodeInfo` contains metadata on the Ethereum node, which is inserted with the indexed state.
|
||||||
|
// `isDiff` means to mark state nodes as belonging to an incremental diff, as opposed to a full snapshot.
|
||||||
|
func NewStateDiffIndexer(chainConfig *params.ChainConfig, config Config, nodeInfo node.Info, isDiff bool) (*StateDiffIndexer, error) {
|
||||||
var err error
|
var err error
|
||||||
var writer FileWriter
|
var writer FileWriter
|
||||||
|
|
||||||
@ -86,7 +90,7 @@ func NewStateDiffIndexer(chainConfig *params.ChainConfig, config Config, nodeInf
|
|||||||
}
|
}
|
||||||
log.Info("Writing watched addresses to file", "file", watchedAddressesFilePath)
|
log.Info("Writing watched addresses to file", "file", watchedAddressesFilePath)
|
||||||
|
|
||||||
writer, err = NewCSVWriter(outputDir, watchedAddressesFilePath)
|
writer, err = NewCSVWriter(outputDir, watchedAddressesFilePath, isDiff)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -109,7 +113,7 @@ func NewStateDiffIndexer(chainConfig *params.ChainConfig, config Config, nodeInf
|
|||||||
}
|
}
|
||||||
log.Info("Writing watched addresses to file", "file", watchedAddressesFilePath)
|
log.Info("Writing watched addresses to file", "file", watchedAddressesFilePath)
|
||||||
|
|
||||||
writer = NewSQLWriter(file, watchedAddressesFilePath)
|
writer = NewSQLWriter(file, watchedAddressesFilePath, isDiff)
|
||||||
default:
|
default:
|
||||||
return nil, fmt.Errorf("unrecognized file mode: %s", config.Mode)
|
return nil, fmt.Errorf("unrecognized file mode: %s", config.Mode)
|
||||||
}
|
}
|
||||||
|
@ -83,7 +83,7 @@ func setupMainnetIndexer(t *testing.T) {
|
|||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
ind, err = file.NewStateDiffIndexer(chainConf, file.CSVTestConfig, test.LegacyNodeInfo)
|
ind, err = file.NewStateDiffIndexer(chainConf, file.CSVTestConfig, test.LegacyNodeInfo, true)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
db, err = postgres.SetupSQLXDB()
|
db, err = postgres.SetupSQLXDB()
|
||||||
|
@ -44,7 +44,7 @@ func setupLegacySQLIndexer(t *testing.T) {
|
|||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
ind, err = file.NewStateDiffIndexer(test.LegacyConfig, file.SQLTestConfig, test.LegacyNodeInfo)
|
ind, err = file.NewStateDiffIndexer(test.LegacyConfig, file.SQLTestConfig, test.LegacyNodeInfo, true)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
db, err = postgres.SetupSQLXDB()
|
db, err = postgres.SetupSQLXDB()
|
||||||
|
@ -41,7 +41,7 @@ func setupIndexer(t *testing.T) {
|
|||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
ind, err = file.NewStateDiffIndexer(mocks.TestChainConfig, file.SQLTestConfig, test.LegacyNodeInfo)
|
ind, err = file.NewStateDiffIndexer(mocks.TestChainConfig, file.SQLTestConfig, test.LegacyNodeInfo, true)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
db, err = postgres.SetupSQLXDB()
|
db, err = postgres.SetupSQLXDB()
|
||||||
|
@ -44,6 +44,7 @@ var (
|
|||||||
type SQLWriter struct {
|
type SQLWriter struct {
|
||||||
wc io.WriteCloser
|
wc io.WriteCloser
|
||||||
stmts chan []byte
|
stmts chan []byte
|
||||||
|
isDiff bool
|
||||||
collatedStmt []byte
|
collatedStmt []byte
|
||||||
collationIndex int
|
collationIndex int
|
||||||
|
|
||||||
@ -55,11 +56,15 @@ type SQLWriter struct {
|
|||||||
watchedAddressesFilePath string
|
watchedAddressesFilePath string
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewSQLWriter creates a new pointer to a Writer
|
// NewSQLWriter creates a new Writer.
|
||||||
func NewSQLWriter(wc io.WriteCloser, watchedAddressesFilePath string) *SQLWriter {
|
// `wc` is the underlying io.WriteCloser to write to.
|
||||||
roysc marked this conversation as resolved
Outdated
telackey
commented
Same. Same.
|
|||||||
|
// `watchedAddressesFilePath` is the path to the file containing watched addresses.
|
||||||
|
// `isDiff` means to mark state nodes as belonging to an incremental diff, as opposed to a full snapshot.
|
||||||
|
func NewSQLWriter(wc io.WriteCloser, watchedAddressesFilePath string, isDiff bool) *SQLWriter {
|
||||||
return &SQLWriter{
|
return &SQLWriter{
|
||||||
wc: wc,
|
wc: wc,
|
||||||
stmts: make(chan []byte),
|
stmts: make(chan []byte),
|
||||||
|
isDiff: isDiff,
|
||||||
collatedStmt: make([]byte, writeBufferSize),
|
collatedStmt: make([]byte, writeBufferSize),
|
||||||
flushChan: make(chan struct{}),
|
flushChan: make(chan struct{}),
|
||||||
flushFinished: make(chan struct{}),
|
flushFinished: make(chan struct{}),
|
||||||
@ -225,12 +230,12 @@ func (sqw *SQLWriter) upsertStateCID(stateNode models.StateNodeModel) {
|
|||||||
balance = "0"
|
balance = "0"
|
||||||
}
|
}
|
||||||
sqw.stmts <- []byte(fmt.Sprintf(stateInsert, stateNode.BlockNumber, stateNode.HeaderID, stateNode.StateKey, stateNode.CID,
|
sqw.stmts <- []byte(fmt.Sprintf(stateInsert, stateNode.BlockNumber, stateNode.HeaderID, stateNode.StateKey, stateNode.CID,
|
||||||
stateNode.Removed, true, balance, stateNode.Nonce, stateNode.CodeHash, stateNode.StorageRoot))
|
stateNode.Removed, sqw.isDiff, balance, stateNode.Nonce, stateNode.CodeHash, stateNode.StorageRoot))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sqw *SQLWriter) upsertStorageCID(storageCID models.StorageNodeModel) {
|
func (sqw *SQLWriter) upsertStorageCID(storageCID models.StorageNodeModel) {
|
||||||
sqw.stmts <- []byte(fmt.Sprintf(storageInsert, storageCID.BlockNumber, storageCID.HeaderID, storageCID.StateKey, storageCID.StorageKey, storageCID.CID,
|
sqw.stmts <- []byte(fmt.Sprintf(storageInsert, storageCID.BlockNumber, storageCID.HeaderID, storageCID.StateKey, storageCID.StorageKey, storageCID.CID,
|
||||||
storageCID.Removed, true, storageCID.Value))
|
storageCID.Removed, sqw.isDiff, storageCID.Value))
|
||||||
}
|
}
|
||||||
|
|
||||||
// LoadWatchedAddresses loads watched addresses from a file
|
// LoadWatchedAddresses loads watched addresses from a file
|
||||||
|
@ -43,7 +43,7 @@ import (
|
|||||||
|
|
||||||
var _ interfaces.StateDiffIndexer = &StateDiffIndexer{}
|
var _ interfaces.StateDiffIndexer = &StateDiffIndexer{}
|
||||||
|
|
||||||
// StateDiffIndexer satisfies the indexer.StateDiffIndexer interface for ethereum statediff objects on top of an SQL sql
|
// StateDiffIndexer satisfies the indexer.StateDiffIndexer interface for ethereum statediff objects on top of an SQL DB.
|
||||||
type StateDiffIndexer struct {
|
type StateDiffIndexer struct {
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
chainConfig *params.ChainConfig
|
chainConfig *params.ChainConfig
|
||||||
@ -51,11 +51,17 @@ type StateDiffIndexer struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// NewStateDiffIndexer creates a sql implementation of interfaces.StateDiffIndexer
|
// NewStateDiffIndexer creates a sql implementation of interfaces.StateDiffIndexer
|
||||||
func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, db Database) (*StateDiffIndexer, error) {
|
// `ctx` is used to cancel the underlying DB connection.
|
||||||
|
// `chainConfig` is used when processing chain state.
|
||||||
roysc marked this conversation as resolved
Outdated
telackey
commented
Same. Same.
|
|||||||
|
// `db` is the backing database to use.
|
||||||
|
// `isDiff` means to mark state nodes as belonging to an incremental diff, as opposed to a full snapshot.
|
||||||
|
func NewStateDiffIndexer(
|
||||||
|
ctx context.Context, chainConfig *params.ChainConfig, db Database, isDiff bool,
|
||||||
|
) (*StateDiffIndexer, error) {
|
||||||
return &StateDiffIndexer{
|
return &StateDiffIndexer{
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
chainConfig: chainConfig,
|
chainConfig: chainConfig,
|
||||||
dbWriter: NewWriter(db),
|
dbWriter: NewWriter(db, isDiff),
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -71,7 +71,7 @@ func setupMainnetIndexer(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
ind, err = sql.NewStateDiffIndexer(context.Background(), chainConf, db)
|
ind, err = sql.NewStateDiffIndexer(context.Background(), chainConf, db, true)
|
||||||
}
|
}
|
||||||
|
|
||||||
func checkTxClosure(t *testing.T, idle, inUse, open int64) {
|
func checkTxClosure(t *testing.T, idle, inUse, open int64) {
|
||||||
|
@ -33,7 +33,7 @@ func setupLegacyPGXIndexer(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
ind, err = sql.NewStateDiffIndexer(context.Background(), test.LegacyConfig, db)
|
ind, err = sql.NewStateDiffIndexer(context.Background(), test.LegacyConfig, db, true)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -44,7 +44,7 @@ func setupPGXIndexer(t *testing.T, config postgres.Config) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
ind, err = sql.NewStateDiffIndexer(context.Background(), mocks.TestChainConfig, db)
|
ind, err = sql.NewStateDiffIndexer(context.Background(), mocks.TestChainConfig, db, true)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -32,7 +32,7 @@ func setupLegacySQLXIndexer(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
ind, err = sql.NewStateDiffIndexer(context.Background(), test.LegacyConfig, db)
|
ind, err = sql.NewStateDiffIndexer(context.Background(), test.LegacyConfig, db, true)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -34,7 +34,7 @@ func setupSQLXIndexer(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
ind, err = sql.NewStateDiffIndexer(context.Background(), mocks.TestChainConfig, db)
|
ind, err = sql.NewStateDiffIndexer(context.Background(), mocks.TestChainConfig, db, true)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -34,13 +34,15 @@ import (
|
|||||||
|
|
||||||
// Writer handles processing and writing of indexed IPLD objects to Postgres
|
// Writer handles processing and writing of indexed IPLD objects to Postgres
|
||||||
type Writer struct {
|
type Writer struct {
|
||||||
db Database
|
db Database
|
||||||
|
isDiff bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewWriter creates a new pointer to a Writer
|
// NewWriter creates a new pointer to a Writer. `diff` indicates whether this is part of an
|
||||||
func NewWriter(db Database) *Writer {
|
// incremental diff (as opposed to a snapshot).
|
||||||
|
func NewWriter(db Database, diff bool) *Writer {
|
||||||
return &Writer{
|
return &Writer{
|
||||||
db: db,
|
db: db, isDiff: diff,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -308,7 +310,8 @@ func (w *Writer) upsertStateCID(tx Tx, stateNode models.StateNodeModel) error {
|
|||||||
_, err = tx.CopyFrom(w.db.Context(),
|
_, err = tx.CopyFrom(w.db.Context(),
|
||||||
schema.TableStateNode.TableName(), schema.TableStateNode.ColumnNames(),
|
schema.TableStateNode.TableName(), schema.TableStateNode.ColumnNames(),
|
||||||
toRows(toRow(blockNum, stateNode.HeaderID, stateNode.StateKey, stateNode.CID,
|
toRows(toRow(blockNum, stateNode.HeaderID, stateNode.StateKey, stateNode.CID,
|
||||||
true, balance, stateNode.Nonce, stateNode.CodeHash, stateNode.StorageRoot, stateNode.Removed)))
|
w.isDiff, balance, stateNode.Nonce, stateNode.CodeHash, stateNode.StorageRoot,
|
||||||
|
stateNode.Removed)))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return insertError{"eth.state_cids", err, "COPY", stateNode}
|
return insertError{"eth.state_cids", err, "COPY", stateNode}
|
||||||
}
|
}
|
||||||
@ -318,7 +321,7 @@ func (w *Writer) upsertStateCID(tx Tx, stateNode models.StateNodeModel) error {
|
|||||||
stateNode.HeaderID,
|
stateNode.HeaderID,
|
||||||
stateNode.StateKey,
|
stateNode.StateKey,
|
||||||
stateNode.CID,
|
stateNode.CID,
|
||||||
true,
|
w.isDiff,
|
||||||
bal,
|
bal,
|
||||||
stateNode.Nonce,
|
stateNode.Nonce,
|
||||||
stateNode.CodeHash,
|
stateNode.CodeHash,
|
||||||
@ -346,7 +349,7 @@ func (w *Writer) upsertStorageCID(tx Tx, storageCID models.StorageNodeModel) err
|
|||||||
_, err = tx.CopyFrom(w.db.Context(),
|
_, err = tx.CopyFrom(w.db.Context(),
|
||||||
schema.TableStorageNode.TableName(), schema.TableStorageNode.ColumnNames(),
|
schema.TableStorageNode.TableName(), schema.TableStorageNode.ColumnNames(),
|
||||||
toRows(toRow(blockNum, storageCID.HeaderID, storageCID.StateKey, storageCID.StorageKey, storageCID.CID,
|
toRows(toRow(blockNum, storageCID.HeaderID, storageCID.StateKey, storageCID.StorageKey, storageCID.CID,
|
||||||
true, storageCID.Value, storageCID.Removed)))
|
w.isDiff, storageCID.Value, storageCID.Removed)))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return insertError{"eth.storage_cids", err, "COPY", storageCID}
|
return insertError{"eth.storage_cids", err, "COPY", storageCID}
|
||||||
}
|
}
|
||||||
@ -357,7 +360,7 @@ func (w *Writer) upsertStorageCID(tx Tx, storageCID models.StorageNodeModel) err
|
|||||||
storageCID.StateKey,
|
storageCID.StateKey,
|
||||||
storageCID.StorageKey,
|
storageCID.StorageKey,
|
||||||
storageCID.CID,
|
storageCID.CID,
|
||||||
true,
|
w.isDiff,
|
||||||
storageCID.Value,
|
storageCID.Value,
|
||||||
storageCID.Removed,
|
storageCID.Removed,
|
||||||
)
|
)
|
||||||
|
@ -52,6 +52,7 @@ func InitializeNode(stack core.Node, b core.Backend) {
|
|||||||
adapt.ChainConfig(backend.ChainConfig()),
|
adapt.ChainConfig(backend.ChainConfig()),
|
||||||
info,
|
info,
|
||||||
serviceConfig.IndexerConfig,
|
serviceConfig.IndexerConfig,
|
||||||
|
true,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("failed to construct indexer", "error", err)
|
log.Error("failed to construct indexer", "error", err)
|
||||||
|
Loading…
Reference in New Issue
Block a user
Could you add a comment as to the purpose of the flag?