Updating full_sync_receipts to have FK reference to addresses

This commit is contained in:
Elizabeth Engelman 2019-08-02 10:09:59 -05:00
parent 258035833b
commit bcd6d14fcd
13 changed files with 75 additions and 62 deletions

View File

@ -1,14 +1,14 @@
-- +goose Up -- +goose Up
CREATE TABLE full_sync_receipts CREATE TABLE full_sync_receipts
( (
id SERIAL PRIMARY KEY, id SERIAL PRIMARY KEY,
transaction_id INTEGER NOT NULL REFERENCES full_sync_transactions (id) ON DELETE CASCADE, transaction_id INTEGER NOT NULL REFERENCES full_sync_transactions (id) ON DELETE CASCADE,
contract_address VARCHAR(42), contract_address_id INTEGER NOT NULL REFERENCES addresses (id) ON DELETE CASCADE,
cumulative_gas_used NUMERIC, cumulative_gas_used NUMERIC,
gas_used NUMERIC, gas_used NUMERIC,
state_root VARCHAR(66), state_root VARCHAR(66),
status INTEGER, status INTEGER,
tx_hash VARCHAR(66) tx_hash VARCHAR(66)
); );

View File

@ -174,7 +174,7 @@ CREATE TABLE public.eth_nodes (
CREATE TABLE public.full_sync_receipts ( CREATE TABLE public.full_sync_receipts (
id integer NOT NULL, id integer NOT NULL,
contract_address character varying(42), contract_address_id integer NOT NULL,
cumulative_gas_used numeric, cumulative_gas_used numeric,
gas_used numeric, gas_used numeric,
state_root character varying(66), state_root character varying(66),
@ -949,6 +949,14 @@ ALTER TABLE ONLY public.checked_headers
ADD CONSTRAINT checked_headers_header_id_fkey FOREIGN KEY (header_id) REFERENCES public.headers(id) ON DELETE CASCADE; ADD CONSTRAINT checked_headers_header_id_fkey FOREIGN KEY (header_id) REFERENCES public.headers(id) ON DELETE CASCADE;
--
-- Name: full_sync_receipts full_sync_receipts_contract_address_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: -
--
ALTER TABLE ONLY public.full_sync_receipts
ADD CONSTRAINT full_sync_receipts_contract_address_id_fkey FOREIGN KEY (contract_address_id) REFERENCES public.addresses(id) ON DELETE CASCADE;
-- --
-- Name: full_sync_transactions full_sync_transactions_block_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: - -- Name: full_sync_transactions full_sync_transactions_block_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: -
-- --

View File

@ -17,7 +17,9 @@
package retriever package retriever
import ( import (
"database/sql"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories"
) )
// Block retriever is used to retrieve the first block for a given contract and the most recent block // Block retriever is used to retrieve the first block for a given contract and the most recent block
@ -41,7 +43,10 @@ func NewBlockRetriever(db *postgres.DB) (r *blockRetriever) {
func (r *blockRetriever) RetrieveFirstBlock(contractAddr string) (int64, error) { func (r *blockRetriever) RetrieveFirstBlock(contractAddr string) (int64, error) {
i, err := r.retrieveFirstBlockFromReceipts(contractAddr) i, err := r.retrieveFirstBlockFromReceipts(contractAddr)
if err != nil { if err != nil {
i, err = r.retrieveFirstBlockFromLogs(contractAddr) if err == sql.ErrNoRows {
i, err = r.retrieveFirstBlockFromLogs(contractAddr)
}
return i, err
} }
return i, err return i, err
@ -49,18 +54,26 @@ func (r *blockRetriever) RetrieveFirstBlock(contractAddr string) (int64, error)
// For some contracts the contract creation transaction receipt doesn't have the contract address so this doesn't work (e.g. Sai) // For some contracts the contract creation transaction receipt doesn't have the contract address so this doesn't work (e.g. Sai)
func (r *blockRetriever) retrieveFirstBlockFromReceipts(contractAddr string) (int64, error) { func (r *blockRetriever) retrieveFirstBlockFromReceipts(contractAddr string) (int64, error) {
var firstBlock int var firstBlock int64
addressId, getAddressErr := addressRepository().GetOrCreateAddress(r.db, contractAddr)
if getAddressErr != nil {
return firstBlock, getAddressErr
}
err := r.db.Get( err := r.db.Get(
&firstBlock, &firstBlock,
`SELECT number FROM blocks `SELECT number FROM blocks
WHERE id = (SELECT block_id FROM full_sync_receipts WHERE id = (SELECT block_id FROM full_sync_receipts
WHERE lower(contract_address) = $1 WHERE contract_address_id = $1
ORDER BY block_id ASC ORDER BY block_id ASC
LIMIT 1)`, LIMIT 1)`,
contractAddr, addressId,
) )
return int64(firstBlock), err return firstBlock, err
}
func addressRepository() repositories.AddressRepository {
return repositories.AddressRepository{}
} }
// In which case this servers as a heuristic to find the first block by finding the first contract event log // In which case this servers as a heuristic to find the first block by finding the first contract event log

View File

@ -225,6 +225,9 @@ func TearDown(db *postgres.DB) {
tx, err := db.Beginx() tx, err := db.Beginx()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
_, err = tx.Exec(`DELETE FROM addresses`)
Expect(err).NotTo(HaveOccurred())
_, err = tx.Exec(`DELETE FROM blocks`) _, err = tx.Exec(`DELETE FROM blocks`)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())

View File

@ -51,7 +51,7 @@ func (repo AddressRepository) GetOrCreateAddressInTransaction(tx *sqlx.Tx, addre
return addressId, getErr return addressId, getErr
} }
func (repo AddressRepository) GetAddressById(db *postgres.DB, id int) (string, error){ func (repo AddressRepository) GetAddressById(db *postgres.DB, id int) (string, error) {
var address string var address string
getErr := db.Get(&address, `SELECT address FROM public.addresses WHERE id = $1`, id) getErr := db.Get(&address, `SELECT address FROM public.addresses WHERE id = $1`, id)
if getErr != nil { if getErr != nil {

View File

@ -164,4 +164,3 @@ var _ = Describe("address lookup", func() {
}) })
}) })
}) })

View File

@ -234,7 +234,8 @@ func (blockRepository BlockRepository) createTransaction(tx *sqlx.Tx, blockId in
return err return err
} }
if hasReceipt(transaction) { if hasReceipt(transaction) {
receiptId, err := blockRepository.createReceipt(tx, blockId, transaction.Receipt)
receiptId, err := receiptRepository().CreateReceipt(blockId, transaction.Receipt, tx)
if err != nil { if err != nil {
return err return err
} }
@ -248,6 +249,11 @@ func (blockRepository BlockRepository) createTransaction(tx *sqlx.Tx, blockId in
return nil return nil
} }
func receiptRepository() datastore.ReceiptRepository {
//TODO: set db?
return ReceiptRepository{}
}
func hasLogs(transaction core.TransactionModel) bool { func hasLogs(transaction core.TransactionModel) bool {
return len(transaction.Receipt.Logs) > 0 return len(transaction.Receipt.Logs) > 0
} }
@ -256,22 +262,6 @@ func hasReceipt(transaction core.TransactionModel) bool {
return transaction.Receipt.TxHash != "" return transaction.Receipt.TxHash != ""
} }
func (blockRepository BlockRepository) createReceipt(tx *sqlx.Tx, blockId int64, receipt core.Receipt) (int, error) {
//Not currently persisting log bloom filters
var receiptId int
err := tx.QueryRow(
`INSERT INTO full_sync_receipts
(contract_address, tx_hash, cumulative_gas_used, gas_used, state_root, status, block_id)
VALUES ($1, $2, $3, $4, $5, $6, $7)
RETURNING id`,
receipt.ContractAddress, receipt.TxHash, receipt.CumulativeGasUsed, receipt.GasUsed, receipt.StateRoot, receipt.Status, blockId).Scan(&receiptId)
if err != nil {
log.Error("createReceipt: error inserting receipt: ", err)
return receiptId, err
}
return receiptId, nil
}
func (blockRepository BlockRepository) getBlockHash(block core.Block) (string, bool) { func (blockRepository BlockRepository) getBlockHash(block core.Block) (string, bool) {
var retrievedBlockHash string var retrievedBlockHash string
// TODO: handle possible error // TODO: handle possible error
@ -283,7 +273,7 @@ func (blockRepository BlockRepository) getBlockHash(block core.Block) (string, b
return retrievedBlockHash, blockExists(retrievedBlockHash) return retrievedBlockHash, blockExists(retrievedBlockHash)
} }
func (blockRepository BlockRepository) createLogs(tx *sqlx.Tx, logs []core.Log, receiptId int) error { func (blockRepository BlockRepository) createLogs(tx *sqlx.Tx, logs []core.Log, receiptId int64) error {
for _, tlog := range logs { for _, tlog := range logs {
_, err := tx.Exec( _, err := tx.Exec(
`INSERT INTO logs (block_number, address, tx_hash, index, topic0, topic1, topic2, topic3, data, receipt_id) `INSERT INTO logs (block_number, address, tx_hash, index, topic0, topic1, topic2, topic3, data, receipt_id)

View File

@ -55,7 +55,9 @@ var _ = Describe("Logs Repository", func() {
blockNumber := int64(12345) blockNumber := int64(12345)
blockId, err := blockRepository.CreateOrUpdateBlock(core.Block{Number: blockNumber}) blockId, err := blockRepository.CreateOrUpdateBlock(core.Block{Number: blockNumber})
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
receiptId, err := receiptRepository.CreateReceipt(blockId, core.Receipt{}) tx, _ := db.Beginx()
receiptId, err := receiptRepository.CreateReceipt(blockId, core.Receipt{}, tx)
tx.Commit()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
err = logsRepository.CreateLogs([]core.Log{{ err = logsRepository.CreateLogs([]core.Log{{
BlockNumber: blockNumber, BlockNumber: blockNumber,
@ -91,7 +93,9 @@ var _ = Describe("Logs Repository", func() {
blockNumber := int64(12345) blockNumber := int64(12345)
blockId, err := blockRepository.CreateOrUpdateBlock(core.Block{Number: blockNumber}) blockId, err := blockRepository.CreateOrUpdateBlock(core.Block{Number: blockNumber})
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
receiptId, err := receiptRepository.CreateReceipt(blockId, core.Receipt{}) tx, _ := db.Beginx()
receiptId, err := receiptRepository.CreateReceipt(blockId, core.Receipt{}, tx)
tx.Commit()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
err = logsRepository.CreateLogs([]core.Log{{ err = logsRepository.CreateLogs([]core.Log{{

View File

@ -36,7 +36,7 @@ func (receiptRepository ReceiptRepository) CreateReceiptsAndLogs(blockId int64,
return err return err
} }
for _, receipt := range receipts { for _, receipt := range receipts {
receiptId, err := createReceipt(receipt, blockId, tx) receiptId, err := receiptRepository.CreateReceipt(blockId, receipt, tx)
if err != nil { if err != nil {
tx.Rollback() tx.Rollback()
return err return err
@ -53,21 +53,6 @@ func (receiptRepository ReceiptRepository) CreateReceiptsAndLogs(blockId int64,
return nil return nil
} }
func createReceipt(receipt core.Receipt, blockId int64, tx *sqlx.Tx) (int64, error) {
var receiptId int64
err := tx.QueryRow(
`INSERT INTO full_sync_receipts
(contract_address, tx_hash, cumulative_gas_used, gas_used, state_root, status, block_id)
VALUES ($1, $2, $3, $4, $5, $6, $7)
RETURNING id`,
receipt.ContractAddress, receipt.TxHash, receipt.CumulativeGasUsed, receipt.GasUsed, receipt.StateRoot, receipt.Status, blockId,
).Scan(&receiptId)
if err != nil {
logrus.Error("createReceipt: Error inserting: ", err)
}
return receiptId, err
}
func createLogs(logs []core.Log, receiptId int64, tx *sqlx.Tx) error { func createLogs(logs []core.Log, receiptId int64, tx *sqlx.Tx) error {
for _, log := range logs { for _, log := range logs {
_, err := tx.Exec( _, err := tx.Exec(
@ -83,27 +68,31 @@ func createLogs(logs []core.Log, receiptId int64, tx *sqlx.Tx) error {
return nil return nil
} }
func (receiptRepository ReceiptRepository) CreateReceipt(blockId int64, receipt core.Receipt) (int64, error) { //TODO: test that creating the address should be in the transaction
tx, _ := receiptRepository.DB.Beginx() func (ReceiptRepository) CreateReceipt(blockId int64, receipt core.Receipt, tx *sqlx.Tx) (int64, error) {
var receiptId int64 var receiptId int64
addressId, getAddressErr := AddressRepository{}.GetOrCreateAddressInTransaction(tx, receipt.ContractAddress)
if getAddressErr != nil {
logrus.Error("createReceipt: Error getting address id: ", getAddressErr)
return receiptId, getAddressErr
}
err := tx.QueryRow( err := tx.QueryRow(
`INSERT INTO full_sync_receipts `INSERT INTO full_sync_receipts
(contract_address, tx_hash, cumulative_gas_used, gas_used, state_root, status, block_id) (contract_address_id, tx_hash, cumulative_gas_used, gas_used, state_root, status, block_id)
VALUES ($1, $2, $3, $4, $5, $6, $7) VALUES ($1, $2, $3, $4, $5, $6, $7)
RETURNING id`, RETURNING id`,
receipt.ContractAddress, receipt.TxHash, receipt.CumulativeGasUsed, receipt.GasUsed, receipt.StateRoot, receipt.Status, blockId).Scan(&receiptId) addressId, receipt.TxHash, receipt.CumulativeGasUsed, receipt.GasUsed, receipt.StateRoot, receipt.Status, blockId).Scan(&receiptId)
if err != nil { if err != nil {
tx.Rollback() tx.Rollback()
logrus.Warning("CreateReceipt: error inserting receipt: ", err) logrus.Warning("CreateReceipt: error inserting receipt: ", err)
return receiptId, err return receiptId, err
} }
tx.Commit()
return receiptId, nil return receiptId, nil
} }
func (receiptRepository ReceiptRepository) GetReceipt(txHash string) (core.Receipt, error) { func (receiptRepository ReceiptRepository) GetReceipt(txHash string) (core.Receipt, error) {
row := receiptRepository.DB.QueryRow( row := receiptRepository.DB.QueryRow(
`SELECT contract_address, `SELECT contract_address_id,
tx_hash, tx_hash,
cumulative_gas_used, cumulative_gas_used,
gas_used, gas_used,

View File

@ -79,7 +79,9 @@ var _ = Describe("Watched Events Repository", func() {
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
blockId, err := blocksRepository.CreateOrUpdateBlock(core.Block{}) blockId, err := blocksRepository.CreateOrUpdateBlock(core.Block{})
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
receiptId, err := receiptRepository.CreateReceipt(blockId, core.Receipt{}) tx, _ := db.Beginx()
receiptId, err := receiptRepository.CreateReceipt(blockId, core.Receipt{}, tx)
tx.Commit()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
err = logRepository.CreateLogs(logs, receiptId) err = logRepository.CreateLogs(logs, receiptId)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
@ -136,7 +138,9 @@ var _ = Describe("Watched Events Repository", func() {
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
blockId, err := blocksRepository.CreateOrUpdateBlock(core.Block{Hash: "Ox123"}) blockId, err := blocksRepository.CreateOrUpdateBlock(core.Block{Hash: "Ox123"})
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
receiptId, err := receiptRepository.CreateReceipt(blockId, core.Receipt{TxHash: "0x123"}) tx, _ := db.Beginx()
receiptId, err := receiptRepository.CreateReceipt(blockId, core.Receipt{}, tx)
tx.Commit()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
err = logRepository.CreateLogs(logs, receiptId) err = logRepository.CreateLogs(logs, receiptId)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())

View File

@ -17,6 +17,7 @@
package datastore package datastore
import ( import (
"github.com/jmoiron/sqlx"
"github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/filters" "github.com/vulcanize/vulcanizedb/pkg/filters"
) )
@ -57,7 +58,7 @@ type LogRepository interface {
type ReceiptRepository interface { type ReceiptRepository interface {
CreateReceiptsAndLogs(blockId int64, receipts []core.Receipt) error CreateReceiptsAndLogs(blockId int64, receipts []core.Receipt) error
CreateReceipt(blockId int64, receipt core.Receipt) (int64, error) CreateReceipt(blockId int64, receipt core.Receipt, tx *sqlx.Tx) (int64, error) //TODO: change the name to CreateReceiptInTransaction
GetReceipt(txHash string) (core.Receipt, error) GetReceipt(txHash string) (core.Receipt, error)
} }

View File

@ -17,6 +17,7 @@
package fakes package fakes
import ( import (
"github.com/jmoiron/sqlx"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/core"
@ -49,7 +50,7 @@ func (mrr *MockReceiptRepository) CreateReceiptsAndLogs(blockId int64, receipts
return mrr.createReceiptsAndLogsReturnErr return mrr.createReceiptsAndLogsReturnErr
} }
func (mrr *MockReceiptRepository) CreateReceipt(blockId int64, receipt core.Receipt) (int64, error) { func (mrr *MockReceiptRepository) CreateReceipt(blockId int64, receipt core.Receipt, tx *sqlx.Tx) (int64, error) {
panic("implement me") panic("implement me")
} }

View File

@ -105,6 +105,7 @@ func NewTestDB(node core.Node) *postgres.DB {
} }
func CleanTestDB(db *postgres.DB) { func CleanTestDB(db *postgres.DB) {
db.MustExec("DELETE FROM addresses")
db.MustExec("DELETE FROM blocks") db.MustExec("DELETE FROM blocks")
db.MustExec("DELETE FROM checked_headers") db.MustExec("DELETE FROM checked_headers")
// can't delete from eth_nodes since this function is called after the required eth_node is persisted // can't delete from eth_nodes since this function is called after the required eth_node is persisted