diff --git a/indexer/constructor.go b/indexer/constructor.go index 13f6d65..46dc52c 100644 --- a/indexer/constructor.go +++ b/indexer/constructor.go @@ -33,11 +33,19 @@ import ( ) // NewStateDiffIndexer creates and returns an implementation of the StateDiffIndexer interface. +// The returned indexer is used to process state diffs and write them to a database. +// The returned SQL database, if non-nil, is the indexer's backing database. +// `ctx` is used to cancel an underlying DB connection. +// `chainConfig` is used when processing chain state. +// `nodeInfo` contains metadata on the Ethereum node, which is inserted with the indexed state. +// `config` contains configuration specific to the indexer. +// `isDiff` means to mark state nodes as belonging to an incremental diff, as opposed to a full snapshot. func NewStateDiffIndexer( ctx context.Context, chainConfig *params.ChainConfig, nodeInfo node.Info, config interfaces.Config, + isDiff bool, ) ( sql.Database, interfaces.StateDiffIndexer, @@ -50,7 +58,7 @@ func NewStateDiffIndexer( if !ok { return nil, nil, fmt.Errorf("file config is not the correct type: got %T, expected %T", config, file.Config{}) } - ind, err := file.NewStateDiffIndexer(chainConfig, fc, nodeInfo) + ind, err := file.NewStateDiffIndexer(chainConfig, fc, nodeInfo, isDiff) return nil, ind, err case shared.POSTGRES: log.Info("Starting statediff service in Postgres writing mode") @@ -75,7 +83,7 @@ func NewStateDiffIndexer( return nil, nil, fmt.Errorf("unrecognized Postgres driver type: %s", pgc.Driver) } db := postgres.NewPostgresDB(driver, pgc.Upsert) - ind, err := sql.NewStateDiffIndexer(ctx, chainConfig, db) + ind, err := sql.NewStateDiffIndexer(ctx, chainConfig, db, isDiff) return db, ind, err case shared.DUMP: log.Info("Starting statediff service in data dump mode") diff --git a/indexer/database/file/csv_indexer_legacy_test.go b/indexer/database/file/csv_indexer_legacy_test.go index 238423b..411b90b 100644 --- a/indexer/database/file/csv_indexer_legacy_test.go +++ b/indexer/database/file/csv_indexer_legacy_test.go @@ -43,7 +43,7 @@ func setupLegacyCSVIndexer(t *testing.T) { require.NoError(t, err) } - ind, err = file.NewStateDiffIndexer(test.LegacyConfig, file.CSVTestConfig, test.LegacyNodeInfo) + ind, err = file.NewStateDiffIndexer(test.LegacyConfig, file.CSVTestConfig, test.LegacyNodeInfo, true) require.NoError(t, err) db, err = postgres.SetupSQLXDB() diff --git a/indexer/database/file/csv_indexer_test.go b/indexer/database/file/csv_indexer_test.go index 06aa366..98095f2 100644 --- a/indexer/database/file/csv_indexer_test.go +++ b/indexer/database/file/csv_indexer_test.go @@ -41,7 +41,7 @@ func setupCSVIndexer(t *testing.T) { require.NoError(t, err) } - ind, err = file.NewStateDiffIndexer(mocks.TestChainConfig, file.CSVTestConfig, test.LegacyNodeInfo) + ind, err = file.NewStateDiffIndexer(mocks.TestChainConfig, file.CSVTestConfig, test.LegacyNodeInfo, true) require.NoError(t, err) db, err = postgres.SetupSQLXDB() diff --git a/indexer/database/file/csv_writer.go b/indexer/database/file/csv_writer.go index 5463e64..a634906 100644 --- a/indexer/database/file/csv_writer.go +++ b/indexer/database/file/csv_writer.go @@ -57,7 +57,8 @@ type tableRow struct { type CSVWriter struct { // dir containing output files - dir string + dir string + isDiff bool writers fileWriters watchedAddressesWriter fileWriter @@ -128,7 +129,7 @@ func (tx fileWriters) flush() error { return nil } -func NewCSVWriter(path string, watchedAddressesFilePath string) (*CSVWriter, error) { +func NewCSVWriter(path string, watchedAddressesFilePath string, diff bool) (*CSVWriter, error) { if err := os.MkdirAll(path, 0777); err != nil { return nil, fmt.Errorf("unable to create directory '%s': %w", path, err) } @@ -147,6 +148,7 @@ func NewCSVWriter(path string, watchedAddressesFilePath string) (*CSVWriter, err writers: writers, watchedAddressesWriter: watchedAddressesWriter, dir: path, + isDiff: diff, rows: make(chan tableRow), flushChan: make(chan struct{}), flushFinished: make(chan struct{}), @@ -278,14 +280,14 @@ func (csw *CSVWriter) upsertStateCID(stateNode models.StateNodeModel) { var values []interface{} values = append(values, stateNode.BlockNumber, stateNode.HeaderID, stateNode.StateKey, stateNode.CID, - true, balance, strconv.FormatUint(stateNode.Nonce, 10), stateNode.CodeHash, stateNode.StorageRoot, stateNode.Removed) + csw.isDiff, balance, strconv.FormatUint(stateNode.Nonce, 10), stateNode.CodeHash, stateNode.StorageRoot, stateNode.Removed) csw.rows <- tableRow{schema.TableStateNode, values} } func (csw *CSVWriter) upsertStorageCID(storageCID models.StorageNodeModel) { var values []interface{} values = append(values, storageCID.BlockNumber, storageCID.HeaderID, storageCID.StateKey, storageCID.StorageKey, storageCID.CID, - true, storageCID.Value, storageCID.Removed) + csw.isDiff, storageCID.Value, storageCID.Removed) csw.rows <- tableRow{schema.TableStorageNode, values} } diff --git a/indexer/database/file/indexer.go b/indexer/database/file/indexer.go index 56719b8..d47042d 100644 --- a/indexer/database/file/indexer.go +++ b/indexer/database/file/indexer.go @@ -63,7 +63,11 @@ type StateDiffIndexer struct { } // NewStateDiffIndexer creates a void implementation of interfaces.StateDiffIndexer -func NewStateDiffIndexer(chainConfig *params.ChainConfig, config Config, nodeInfo node.Info) (*StateDiffIndexer, error) { +// `chainConfig` is used when processing chain state. +// `config` contains configuration specific to the indexer. +// `nodeInfo` contains metadata on the Ethereum node, which is inserted with the indexed state. +// `isDiff` means to mark state nodes as belonging to an incremental diff, as opposed to a full snapshot. +func NewStateDiffIndexer(chainConfig *params.ChainConfig, config Config, nodeInfo node.Info, isDiff bool) (*StateDiffIndexer, error) { var err error var writer FileWriter @@ -86,7 +90,7 @@ func NewStateDiffIndexer(chainConfig *params.ChainConfig, config Config, nodeInf } log.Info("Writing watched addresses to file", "file", watchedAddressesFilePath) - writer, err = NewCSVWriter(outputDir, watchedAddressesFilePath) + writer, err = NewCSVWriter(outputDir, watchedAddressesFilePath, isDiff) if err != nil { return nil, err } @@ -109,7 +113,7 @@ func NewStateDiffIndexer(chainConfig *params.ChainConfig, config Config, nodeInf } log.Info("Writing watched addresses to file", "file", watchedAddressesFilePath) - writer = NewSQLWriter(file, watchedAddressesFilePath) + writer = NewSQLWriter(file, watchedAddressesFilePath, isDiff) default: return nil, fmt.Errorf("unrecognized file mode: %s", config.Mode) } diff --git a/indexer/database/file/mainnet_tests/indexer_test.go b/indexer/database/file/mainnet_tests/indexer_test.go index cdb8042..df425d6 100644 --- a/indexer/database/file/mainnet_tests/indexer_test.go +++ b/indexer/database/file/mainnet_tests/indexer_test.go @@ -83,7 +83,7 @@ func setupMainnetIndexer(t *testing.T) { require.NoError(t, err) } - ind, err = file.NewStateDiffIndexer(chainConf, file.CSVTestConfig, test.LegacyNodeInfo) + ind, err = file.NewStateDiffIndexer(chainConf, file.CSVTestConfig, test.LegacyNodeInfo, true) require.NoError(t, err) db, err = postgres.SetupSQLXDB() diff --git a/indexer/database/file/sql_indexer_legacy_test.go b/indexer/database/file/sql_indexer_legacy_test.go index 6d721d5..26da928 100644 --- a/indexer/database/file/sql_indexer_legacy_test.go +++ b/indexer/database/file/sql_indexer_legacy_test.go @@ -44,7 +44,7 @@ func setupLegacySQLIndexer(t *testing.T) { require.NoError(t, err) } - ind, err = file.NewStateDiffIndexer(test.LegacyConfig, file.SQLTestConfig, test.LegacyNodeInfo) + ind, err = file.NewStateDiffIndexer(test.LegacyConfig, file.SQLTestConfig, test.LegacyNodeInfo, true) require.NoError(t, err) db, err = postgres.SetupSQLXDB() diff --git a/indexer/database/file/sql_indexer_test.go b/indexer/database/file/sql_indexer_test.go index 3aab7b8..688ae70 100644 --- a/indexer/database/file/sql_indexer_test.go +++ b/indexer/database/file/sql_indexer_test.go @@ -41,7 +41,7 @@ func setupIndexer(t *testing.T) { require.NoError(t, err) } - ind, err = file.NewStateDiffIndexer(mocks.TestChainConfig, file.SQLTestConfig, test.LegacyNodeInfo) + ind, err = file.NewStateDiffIndexer(mocks.TestChainConfig, file.SQLTestConfig, test.LegacyNodeInfo, true) require.NoError(t, err) db, err = postgres.SetupSQLXDB() diff --git a/indexer/database/file/sql_writer.go b/indexer/database/file/sql_writer.go index b6b5c4f..5326b6f 100644 --- a/indexer/database/file/sql_writer.go +++ b/indexer/database/file/sql_writer.go @@ -44,6 +44,7 @@ var ( type SQLWriter struct { wc io.WriteCloser stmts chan []byte + isDiff bool collatedStmt []byte collationIndex int @@ -55,11 +56,15 @@ type SQLWriter struct { watchedAddressesFilePath string } -// NewSQLWriter creates a new pointer to a Writer -func NewSQLWriter(wc io.WriteCloser, watchedAddressesFilePath string) *SQLWriter { +// NewSQLWriter creates a new Writer. +// `wc` is the underlying io.WriteCloser to write to. +// `watchedAddressesFilePath` is the path to the file containing watched addresses. +// `isDiff` means to mark state nodes as belonging to an incremental diff, as opposed to a full snapshot. +func NewSQLWriter(wc io.WriteCloser, watchedAddressesFilePath string, isDiff bool) *SQLWriter { return &SQLWriter{ wc: wc, stmts: make(chan []byte), + isDiff: isDiff, collatedStmt: make([]byte, writeBufferSize), flushChan: make(chan struct{}), flushFinished: make(chan struct{}), @@ -225,12 +230,12 @@ func (sqw *SQLWriter) upsertStateCID(stateNode models.StateNodeModel) { balance = "0" } sqw.stmts <- []byte(fmt.Sprintf(stateInsert, stateNode.BlockNumber, stateNode.HeaderID, stateNode.StateKey, stateNode.CID, - stateNode.Removed, true, balance, stateNode.Nonce, stateNode.CodeHash, stateNode.StorageRoot)) + stateNode.Removed, sqw.isDiff, balance, stateNode.Nonce, stateNode.CodeHash, stateNode.StorageRoot)) } func (sqw *SQLWriter) upsertStorageCID(storageCID models.StorageNodeModel) { sqw.stmts <- []byte(fmt.Sprintf(storageInsert, storageCID.BlockNumber, storageCID.HeaderID, storageCID.StateKey, storageCID.StorageKey, storageCID.CID, - storageCID.Removed, true, storageCID.Value)) + storageCID.Removed, sqw.isDiff, storageCID.Value)) } // LoadWatchedAddresses loads watched addresses from a file diff --git a/indexer/database/sql/indexer.go b/indexer/database/sql/indexer.go index 6dab963..8db9d73 100644 --- a/indexer/database/sql/indexer.go +++ b/indexer/database/sql/indexer.go @@ -43,7 +43,7 @@ import ( var _ interfaces.StateDiffIndexer = &StateDiffIndexer{} -// StateDiffIndexer satisfies the indexer.StateDiffIndexer interface for ethereum statediff objects on top of an SQL sql +// StateDiffIndexer satisfies the indexer.StateDiffIndexer interface for ethereum statediff objects on top of an SQL DB. type StateDiffIndexer struct { ctx context.Context chainConfig *params.ChainConfig @@ -51,11 +51,17 @@ type StateDiffIndexer struct { } // NewStateDiffIndexer creates a sql implementation of interfaces.StateDiffIndexer -func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, db Database) (*StateDiffIndexer, error) { +// `ctx` is used to cancel the underlying DB connection. +// `chainConfig` is used when processing chain state. +// `db` is the backing database to use. +// `isDiff` means to mark state nodes as belonging to an incremental diff, as opposed to a full snapshot. +func NewStateDiffIndexer( + ctx context.Context, chainConfig *params.ChainConfig, db Database, isDiff bool, +) (*StateDiffIndexer, error) { return &StateDiffIndexer{ ctx: ctx, chainConfig: chainConfig, - dbWriter: NewWriter(db), + dbWriter: NewWriter(db, isDiff), }, nil } diff --git a/indexer/database/sql/mainnet_tests/indexer_test.go b/indexer/database/sql/mainnet_tests/indexer_test.go index 22bc6e6..0d2f4ab 100644 --- a/indexer/database/sql/mainnet_tests/indexer_test.go +++ b/indexer/database/sql/mainnet_tests/indexer_test.go @@ -71,7 +71,7 @@ func setupMainnetIndexer(t *testing.T) { if err != nil { t.Fatal(err) } - ind, err = sql.NewStateDiffIndexer(context.Background(), chainConf, db) + ind, err = sql.NewStateDiffIndexer(context.Background(), chainConf, db, true) } func checkTxClosure(t *testing.T, idle, inUse, open int64) { diff --git a/indexer/database/sql/pgx_indexer_legacy_test.go b/indexer/database/sql/pgx_indexer_legacy_test.go index 689b11e..d68305c 100644 --- a/indexer/database/sql/pgx_indexer_legacy_test.go +++ b/indexer/database/sql/pgx_indexer_legacy_test.go @@ -33,7 +33,7 @@ func setupLegacyPGXIndexer(t *testing.T) { if err != nil { t.Fatal(err) } - ind, err = sql.NewStateDiffIndexer(context.Background(), test.LegacyConfig, db) + ind, err = sql.NewStateDiffIndexer(context.Background(), test.LegacyConfig, db, true) require.NoError(t, err) } diff --git a/indexer/database/sql/pgx_indexer_test.go b/indexer/database/sql/pgx_indexer_test.go index 2adee95..944d682 100644 --- a/indexer/database/sql/pgx_indexer_test.go +++ b/indexer/database/sql/pgx_indexer_test.go @@ -44,7 +44,7 @@ func setupPGXIndexer(t *testing.T, config postgres.Config) { if err != nil { t.Fatal(err) } - ind, err = sql.NewStateDiffIndexer(context.Background(), mocks.TestChainConfig, db) + ind, err = sql.NewStateDiffIndexer(context.Background(), mocks.TestChainConfig, db, true) require.NoError(t, err) } diff --git a/indexer/database/sql/sqlx_indexer_legacy_test.go b/indexer/database/sql/sqlx_indexer_legacy_test.go index af96940..05ea7d5 100644 --- a/indexer/database/sql/sqlx_indexer_legacy_test.go +++ b/indexer/database/sql/sqlx_indexer_legacy_test.go @@ -32,7 +32,7 @@ func setupLegacySQLXIndexer(t *testing.T) { if err != nil { t.Fatal(err) } - ind, err = sql.NewStateDiffIndexer(context.Background(), test.LegacyConfig, db) + ind, err = sql.NewStateDiffIndexer(context.Background(), test.LegacyConfig, db, true) require.NoError(t, err) } diff --git a/indexer/database/sql/sqlx_indexer_test.go b/indexer/database/sql/sqlx_indexer_test.go index 3aa35c4..fb099ef 100644 --- a/indexer/database/sql/sqlx_indexer_test.go +++ b/indexer/database/sql/sqlx_indexer_test.go @@ -34,7 +34,7 @@ func setupSQLXIndexer(t *testing.T) { if err != nil { t.Fatal(err) } - ind, err = sql.NewStateDiffIndexer(context.Background(), mocks.TestChainConfig, db) + ind, err = sql.NewStateDiffIndexer(context.Background(), mocks.TestChainConfig, db, true) require.NoError(t, err) } diff --git a/indexer/database/sql/writer.go b/indexer/database/sql/writer.go index e5e0c81..752761b 100644 --- a/indexer/database/sql/writer.go +++ b/indexer/database/sql/writer.go @@ -34,13 +34,15 @@ import ( // Writer handles processing and writing of indexed IPLD objects to Postgres type Writer struct { - db Database + db Database + isDiff bool } -// NewWriter creates a new pointer to a Writer -func NewWriter(db Database) *Writer { +// NewWriter creates a new pointer to a Writer. `diff` indicates whether this is part of an +// incremental diff (as opposed to a snapshot). +func NewWriter(db Database, diff bool) *Writer { return &Writer{ - db: db, + db: db, isDiff: diff, } } @@ -308,7 +310,8 @@ func (w *Writer) upsertStateCID(tx Tx, stateNode models.StateNodeModel) error { _, err = tx.CopyFrom(w.db.Context(), schema.TableStateNode.TableName(), schema.TableStateNode.ColumnNames(), toRows(toRow(blockNum, stateNode.HeaderID, stateNode.StateKey, stateNode.CID, - true, balance, stateNode.Nonce, stateNode.CodeHash, stateNode.StorageRoot, stateNode.Removed))) + w.isDiff, balance, stateNode.Nonce, stateNode.CodeHash, stateNode.StorageRoot, + stateNode.Removed))) if err != nil { return insertError{"eth.state_cids", err, "COPY", stateNode} } @@ -318,7 +321,7 @@ func (w *Writer) upsertStateCID(tx Tx, stateNode models.StateNodeModel) error { stateNode.HeaderID, stateNode.StateKey, stateNode.CID, - true, + w.isDiff, bal, stateNode.Nonce, stateNode.CodeHash, @@ -346,7 +349,7 @@ func (w *Writer) upsertStorageCID(tx Tx, storageCID models.StorageNodeModel) err _, err = tx.CopyFrom(w.db.Context(), schema.TableStorageNode.TableName(), schema.TableStorageNode.ColumnNames(), toRows(toRow(blockNum, storageCID.HeaderID, storageCID.StateKey, storageCID.StorageKey, storageCID.CID, - true, storageCID.Value, storageCID.Removed))) + w.isDiff, storageCID.Value, storageCID.Removed))) if err != nil { return insertError{"eth.storage_cids", err, "COPY", storageCID} } @@ -357,7 +360,7 @@ func (w *Writer) upsertStorageCID(tx Tx, storageCID models.StorageNodeModel) err storageCID.StateKey, storageCID.StorageKey, storageCID.CID, - true, + w.isDiff, storageCID.Value, storageCID.Removed, ) diff --git a/main/main.go b/main/main.go index 20f9aca..61cce90 100644 --- a/main/main.go +++ b/main/main.go @@ -52,6 +52,7 @@ func InitializeNode(stack core.Node, b core.Backend) { adapt.ChainConfig(backend.ChainConfig()), info, serviceConfig.IndexerConfig, + true, ) if err != nil { log.Error("failed to construct indexer", "error", err)