More tables

This commit is contained in:
Thomas E Lackey 2023-03-07 23:08:51 -06:00
parent 9b535e2901
commit 5b9fdc1616
5 changed files with 80 additions and 15 deletions

View File

@ -50,10 +50,14 @@ type Statements interface {
InsertTxStm() string
InsertAccessListElementStm() string
InsertRctStm() string
LogsTableName() []string
LogsColumnNames() []string
InsertLogStm() string
StateTableName() []string
StateColumnNames() []string
InsertStateStm() string
AccountTableName() []string
AccountColumnNames() []string
InsertAccountStm() string
StorageTableName() []string
StorageColumnNames() []string

View File

@ -30,7 +30,7 @@ import (
)
func setupPGXIndexer(t *testing.T) {
db, err = postgres.SetupPGXDB()
db, err = postgres.SetupPGXDB(postgres.DefaultConfig)
if err != nil {
t.Fatal(err)
}

View File

@ -137,3 +137,18 @@ func (db *DB) StateColumnNames() []string {
func (db *DB) StorageColumnNames() []string {
return []string{"block_number", "header_id", "state_path", "storage_leaf_key", "cid", "storage_path", "node_type", "diff", "mh_key"}
}
func (db *DB) LogsTableName() []string {
return []string{"eth", "log_cids"}
}
func (db *DB) LogsColumnNames() []string {
return []string{"block_number", "header_id", "leaf_cid", "leaf_mh_key", "rct_id", "address", "index", "topic0", "topic1", "topic2", "topic3", "log_data"}
}
func (db *DB) AccountTableName() []string {
return []string{"eth", "state_accounts"}
}
func (db *DB) AccountColumnNames() []string {
return []string{"block_number", "header_id", "state_path", "balance", "nonce", "code_hash", "storage_root"}
}

View File

@ -35,8 +35,8 @@ func SetupSQLXDB() (sql.Database, error) {
}
// SetupPGXDB is used to setup a pgx db for tests
func SetupPGXDB() (sql.Database, error) {
driver, err := NewPGXDriver(context.Background(), DefaultConfig, node.Info{})
func SetupPGXDB(config Config) (sql.Database, error) {
driver, err := NewPGXDriver(context.Background(), config, node.Info{})
if err != nil {
return nil, err
}

View File

@ -127,14 +127,37 @@ INSERT INTO eth.log_cids (block_number, header_id, leaf_cid, leaf_mh_key, rct_id
ON CONFLICT (rct_id, index, header_id, block_number) DO NOTHING
*/
func (w *Writer) upsertLogCID(tx Tx, logs []*models.LogsModel) error {
for _, log := range logs {
_, err := tx.Exec(w.db.Context(), w.db.InsertLogStm(),
log.BlockNumber, log.HeaderID, log.LeafCID, log.LeafMhKey, log.ReceiptID, log.Address, log.Index, log.Topic0, log.Topic1,
log.Topic2, log.Topic3, log.Data)
if err != nil {
return insertError{"eth.log_cids", err, w.db.InsertLogStm(), *log}
if w.db.UseCopyFrom() {
var rows [][]interface{}
for _, log := range logs {
var row []interface{}
blockNum, err := strconv.ParseInt(log.BlockNumber, 10, 64)
if err != nil {
return insertError{"eth.log_cids", err, "COPY", log}
}
row = append(row, blockNum, log.HeaderID, log.LeafCID, log.LeafMhKey, log.ReceiptID, log.Address,
log.Index, log.Topic0, log.Topic1, log.Topic2, log.Topic3, log.Data)
rows = append(rows, row)
}
if nil != rows && len(rows) >= 0 {
_, err := tx.CopyFrom(w.db.Context(), w.db.LogsTableName(), w.db.LogsColumnNames(), rows)
if err != nil {
return insertError{"eth.log_cids", err, "COPY", rows}
}
metrics.IndexerMetrics.LogsCounter.Inc(int64(len(rows)))
}
} else {
for _, log := range logs {
_, err := tx.Exec(w.db.Context(), w.db.InsertLogStm(),
log.BlockNumber, log.HeaderID, log.LeafCID, log.LeafMhKey, log.ReceiptID, log.Address, log.Index, log.Topic0, log.Topic1,
log.Topic2, log.Topic3, log.Data)
if err != nil {
return insertError{"eth.log_cids", err, w.db.InsertLogStm(), *log}
}
metrics.IndexerMetrics.LogsCounter.Inc(1)
}
metrics.IndexerMetrics.LogsCounter.Inc(1)
}
return nil
}
@ -181,11 +204,34 @@ INSERT INTO eth.state_accounts (block_number, header_id, state_path, balance, no
ON CONFLICT (header_id, state_path, block_number) DO NOTHING
*/
func (w *Writer) upsertStateAccount(tx Tx, stateAccount models.StateAccountModel) error {
_, err := tx.Exec(w.db.Context(), w.db.InsertAccountStm(),
stateAccount.BlockNumber, stateAccount.HeaderID, stateAccount.StatePath, stateAccount.Balance,
stateAccount.Nonce, stateAccount.CodeHash, stateAccount.StorageRoot)
if err != nil {
return insertError{"eth.state_accounts", err, w.db.InsertAccountStm(), stateAccount}
if w.db.UseCopyFrom() {
var row []interface{}
blockNum, err := strconv.ParseInt(stateAccount.BlockNumber, 10, 64)
if err != nil {
return insertError{"eth.state_accounts", err, "COPY", stateAccount}
}
balance, err := strconv.ParseFloat(stateAccount.Balance, 64)
if err != nil {
return insertError{"eth.state_accounts", err, "COPY", stateAccount}
}
row = append(row, blockNum, stateAccount.HeaderID, stateAccount.StatePath, balance,
stateAccount.Nonce, stateAccount.CodeHash, stateAccount.StorageRoot)
var rows [][]interface{}
rows = append(rows, row)
_, err = tx.CopyFrom(w.db.Context(), w.db.AccountTableName(), w.db.AccountColumnNames(), rows)
if err != nil {
return insertError{"eth.state_accounts", err, "COPY", stateAccount}
}
} else {
_, err := tx.Exec(w.db.Context(), w.db.InsertAccountStm(),
stateAccount.BlockNumber, stateAccount.HeaderID, stateAccount.StatePath, stateAccount.Balance,
stateAccount.Nonce, stateAccount.CodeHash, stateAccount.StorageRoot)
if err != nil {
return insertError{"eth.state_accounts", err, w.db.InsertAccountStm(), stateAccount}
}
}
return nil
}