diff --git a/statediff/indexer/database/sql/interfaces.go b/statediff/indexer/database/sql/interfaces.go index 7ad295c98..890342902 100644 --- a/statediff/indexer/database/sql/interfaces.go +++ b/statediff/indexer/database/sql/interfaces.go @@ -50,10 +50,14 @@ type Statements interface { InsertTxStm() string InsertAccessListElementStm() string InsertRctStm() string + LogsTableName() []string + LogsColumnNames() []string InsertLogStm() string StateTableName() []string StateColumnNames() []string InsertStateStm() string + AccountTableName() []string + AccountColumnNames() []string InsertAccountStm() string StorageTableName() []string StorageColumnNames() []string diff --git a/statediff/indexer/database/sql/pgx_indexer_test.go b/statediff/indexer/database/sql/pgx_indexer_test.go index 1dbf2dfa0..c3a758b01 100644 --- a/statediff/indexer/database/sql/pgx_indexer_test.go +++ b/statediff/indexer/database/sql/pgx_indexer_test.go @@ -30,7 +30,7 @@ import ( ) func setupPGXIndexer(t *testing.T) { - db, err = postgres.SetupPGXDB() + db, err = postgres.SetupPGXDB(postgres.DefaultConfig) if err != nil { t.Fatal(err) } diff --git a/statediff/indexer/database/sql/postgres/database.go b/statediff/indexer/database/sql/postgres/database.go index 4a0b7d9e4..06aa9c03e 100644 --- a/statediff/indexer/database/sql/postgres/database.go +++ b/statediff/indexer/database/sql/postgres/database.go @@ -137,3 +137,18 @@ func (db *DB) StateColumnNames() []string { func (db *DB) StorageColumnNames() []string { return []string{"block_number", "header_id", "state_path", "storage_leaf_key", "cid", "storage_path", "node_type", "diff", "mh_key"} } + +func (db *DB) LogsTableName() []string { + return []string{"eth", "log_cids"} +} + +func (db *DB) LogsColumnNames() []string { + return []string{"block_number", "header_id", "leaf_cid", "leaf_mh_key", "rct_id", "address", "index", "topic0", "topic1", "topic2", "topic3", "log_data"} +} + +func (db *DB) AccountTableName() []string { + return []string{"eth", "state_accounts"} +} +func (db *DB) AccountColumnNames() []string { + return []string{"block_number", "header_id", "state_path", "balance", "nonce", "code_hash", "storage_root"} +} diff --git a/statediff/indexer/database/sql/postgres/test_helpers.go b/statediff/indexer/database/sql/postgres/test_helpers.go index f8311b413..cb5255429 100644 --- a/statediff/indexer/database/sql/postgres/test_helpers.go +++ b/statediff/indexer/database/sql/postgres/test_helpers.go @@ -35,8 +35,8 @@ func SetupSQLXDB() (sql.Database, error) { } // SetupPGXDB is used to setup a pgx db for tests -func SetupPGXDB() (sql.Database, error) { - driver, err := NewPGXDriver(context.Background(), DefaultConfig, node.Info{}) +func SetupPGXDB(config Config) (sql.Database, error) { + driver, err := NewPGXDriver(context.Background(), config, node.Info{}) if err != nil { return nil, err } diff --git a/statediff/indexer/database/sql/writer.go b/statediff/indexer/database/sql/writer.go index 6384ccfed..80436b44e 100644 --- a/statediff/indexer/database/sql/writer.go +++ b/statediff/indexer/database/sql/writer.go @@ -127,14 +127,37 @@ INSERT INTO eth.log_cids (block_number, header_id, leaf_cid, leaf_mh_key, rct_id ON CONFLICT (rct_id, index, header_id, block_number) DO NOTHING */ func (w *Writer) upsertLogCID(tx Tx, logs []*models.LogsModel) error { - for _, log := range logs { - _, err := tx.Exec(w.db.Context(), w.db.InsertLogStm(), - log.BlockNumber, log.HeaderID, log.LeafCID, log.LeafMhKey, log.ReceiptID, log.Address, log.Index, log.Topic0, log.Topic1, - log.Topic2, log.Topic3, log.Data) - if err != nil { - return insertError{"eth.log_cids", err, w.db.InsertLogStm(), *log} + if w.db.UseCopyFrom() { + var rows [][]interface{} + for _, log := range logs { + var row []interface{} + blockNum, err := strconv.ParseInt(log.BlockNumber, 10, 64) + if err != nil { + return insertError{"eth.log_cids", err, "COPY", log} + } + + row = append(row, blockNum, log.HeaderID, log.LeafCID, log.LeafMhKey, log.ReceiptID, log.Address, + log.Index, log.Topic0, log.Topic1, log.Topic2, log.Topic3, log.Data) + + rows = append(rows, row) + } + if nil != rows && len(rows) >= 0 { + _, err := tx.CopyFrom(w.db.Context(), w.db.LogsTableName(), w.db.LogsColumnNames(), rows) + if err != nil { + return insertError{"eth.log_cids", err, "COPY", rows} + } + metrics.IndexerMetrics.LogsCounter.Inc(int64(len(rows))) + } + } else { + for _, log := range logs { + _, err := tx.Exec(w.db.Context(), w.db.InsertLogStm(), + log.BlockNumber, log.HeaderID, log.LeafCID, log.LeafMhKey, log.ReceiptID, log.Address, log.Index, log.Topic0, log.Topic1, + log.Topic2, log.Topic3, log.Data) + if err != nil { + return insertError{"eth.log_cids", err, w.db.InsertLogStm(), *log} + } + metrics.IndexerMetrics.LogsCounter.Inc(1) } - metrics.IndexerMetrics.LogsCounter.Inc(1) } return nil } @@ -181,11 +204,34 @@ INSERT INTO eth.state_accounts (block_number, header_id, state_path, balance, no ON CONFLICT (header_id, state_path, block_number) DO NOTHING */ func (w *Writer) upsertStateAccount(tx Tx, stateAccount models.StateAccountModel) error { - _, err := tx.Exec(w.db.Context(), w.db.InsertAccountStm(), - stateAccount.BlockNumber, stateAccount.HeaderID, stateAccount.StatePath, stateAccount.Balance, - stateAccount.Nonce, stateAccount.CodeHash, stateAccount.StorageRoot) - if err != nil { - return insertError{"eth.state_accounts", err, w.db.InsertAccountStm(), stateAccount} + if w.db.UseCopyFrom() { + var row []interface{} + blockNum, err := strconv.ParseInt(stateAccount.BlockNumber, 10, 64) + if err != nil { + return insertError{"eth.state_accounts", err, "COPY", stateAccount} + } + balance, err := strconv.ParseFloat(stateAccount.Balance, 64) + if err != nil { + return insertError{"eth.state_accounts", err, "COPY", stateAccount} + } + + row = append(row, blockNum, stateAccount.HeaderID, stateAccount.StatePath, balance, + stateAccount.Nonce, stateAccount.CodeHash, stateAccount.StorageRoot) + + var rows [][]interface{} + rows = append(rows, row) + + _, err = tx.CopyFrom(w.db.Context(), w.db.AccountTableName(), w.db.AccountColumnNames(), rows) + if err != nil { + return insertError{"eth.state_accounts", err, "COPY", stateAccount} + } + } else { + _, err := tx.Exec(w.db.Context(), w.db.InsertAccountStm(), + stateAccount.BlockNumber, stateAccount.HeaderID, stateAccount.StatePath, stateAccount.Balance, + stateAccount.Nonce, stateAccount.CodeHash, stateAccount.StorageRoot) + if err != nil { + return insertError{"eth.state_accounts", err, w.db.InsertAccountStm(), stateAccount} + } } return nil }