full_sync and light_sync receipt tables and light receipt repository methods and test

This commit is contained in:
Ian Norden 2019-03-21 22:43:06 -05:00
parent 4d0f7ee1bb
commit c1940c3e58
15 changed files with 304 additions and 30 deletions

View File

@ -1,5 +1,5 @@
-- +goose Up -- +goose Up
CREATE TABLE receipts CREATE TABLE full_sync_receipts
( (
id SERIAL PRIMARY KEY, id SERIAL PRIMARY KEY,
transaction_id INTEGER NOT NULL REFERENCES full_sync_transactions (id) ON DELETE CASCADE, transaction_id INTEGER NOT NULL REFERENCES full_sync_transactions (id) ON DELETE CASCADE,
@ -13,4 +13,4 @@ CREATE TABLE receipts
-- +goose Down -- +goose Down
DROP TABLE receipts; DROP TABLE full_sync_receipts;

View File

@ -1,5 +1,5 @@
-- +goose Up -- +goose Up
CREATE INDEX transaction_id_index ON receipts (transaction_id); CREATE INDEX transaction_id_index ON full_sync_receipts (transaction_id);
-- +goose Down -- +goose Down
DROP INDEX transaction_id_index; DROP INDEX transaction_id_index;

View File

@ -8,7 +8,7 @@ ALTER TABLE logs
ALTER TABLE logs ALTER TABLE logs
ADD CONSTRAINT receipts_fk ADD CONSTRAINT receipts_fk
FOREIGN KEY (receipt_id) FOREIGN KEY (receipt_id)
REFERENCES receipts (id) REFERENCES full_sync_receipts (id)
ON DELETE CASCADE; ON DELETE CASCADE;

View File

@ -1,44 +1,44 @@
-- +goose Up -- +goose Up
ALTER TABLE receipts ALTER TABLE full_sync_receipts
ADD COLUMN block_id INT; ADD COLUMN block_id INT;
UPDATE receipts UPDATE full_sync_receipts
SET block_id = ( SET block_id = (
SELECT block_id FROM full_sync_transactions WHERE full_sync_transactions.id = receipts.transaction_id SELECT block_id FROM full_sync_transactions WHERE full_sync_transactions.id = full_sync_receipts.transaction_id
); );
ALTER TABLE receipts ALTER TABLE full_sync_receipts
ALTER COLUMN block_id SET NOT NULL; ALTER COLUMN block_id SET NOT NULL;
ALTER TABLE receipts ALTER TABLE full_sync_receipts
ADD CONSTRAINT blocks_fk ADD CONSTRAINT blocks_fk
FOREIGN KEY (block_id) FOREIGN KEY (block_id)
REFERENCES blocks (id) REFERENCES blocks (id)
ON DELETE CASCADE; ON DELETE CASCADE;
ALTER TABLE receipts ALTER TABLE full_sync_receipts
DROP COLUMN transaction_id; DROP COLUMN transaction_id;
-- +goose Down -- +goose Down
ALTER TABLE receipts ALTER TABLE full_sync_receipts
ADD COLUMN transaction_id INT; ADD COLUMN transaction_id INT;
CREATE INDEX transaction_id_index ON receipts (transaction_id); CREATE INDEX transaction_id_index ON full_sync_receipts (transaction_id);
UPDATE receipts UPDATE full_sync_receipts
SET transaction_id = ( SET transaction_id = (
SELECT id FROM full_sync_transactions WHERE full_sync_transactions.hash = receipts.tx_hash SELECT id FROM full_sync_transactions WHERE full_sync_transactions.hash = full_sync_receipts.tx_hash
); );
ALTER TABLE receipts ALTER TABLE full_sync_receipts
ALTER COLUMN transaction_id SET NOT NULL; ALTER COLUMN transaction_id SET NOT NULL;
ALTER TABLE receipts ALTER TABLE full_sync_receipts
ADD CONSTRAINT transaction_fk ADD CONSTRAINT transaction_fk
FOREIGN KEY (transaction_id) FOREIGN KEY (transaction_id)
REFERENCES full_sync_transactions (id) REFERENCES full_sync_transactions (id)
ON DELETE CASCADE; ON DELETE CASCADE;
ALTER TABLE receipts ALTER TABLE full_sync_receipts
DROP COLUMN block_id; DROP COLUMN block_id;

View File

@ -2,15 +2,15 @@
CREATE TABLE light_sync_transactions ( CREATE TABLE light_sync_transactions (
id SERIAL PRIMARY KEY, id SERIAL PRIMARY KEY,
header_id INTEGER NOT NULL REFERENCES headers(id) ON DELETE CASCADE, header_id INTEGER NOT NULL REFERENCES headers(id) ON DELETE CASCADE,
hash TEXT, hash VARCHAR(66),
gas_limit NUMERIC, gas_limit NUMERIC,
gas_price NUMERIC, gas_price NUMERIC,
input_data BYTEA, input_data BYTEA,
nonce NUMERIC, nonce NUMERIC,
raw BYTEA, raw BYTEA,
tx_from TEXT, tx_from VARCHAR(44),
tx_index INTEGER, tx_index INTEGER,
tx_to TEXT, tx_to VARCHAR(44),
"value" NUMERIC, "value" NUMERIC,
UNIQUE (header_id, hash) UNIQUE (header_id, hash)
); );

View File

@ -0,0 +1,17 @@
-- +goose Up
CREATE TABLE light_sync_receipts(
id SERIAL PRIMARY KEY,
transaction_id INTEGER NOT NULL REFERENCES light_sync_transactions(id) ON DELETE CASCADE,
header_id INTEGER NOT NULL REFERENCES headers(id) ON DELETE CASCADE,
contract_address VARCHAR(42),
cumulative_gas_used NUMERIC,
gas_used NUMERIC,
state_root VARCHAR(66),
status INTEGER,
tx_hash VARCHAR(66),
UNIQUE(header_id, transaction_id)
);
-- +goose Down
DROP TABLE light_sync_receipts;

View File

@ -53,7 +53,7 @@ func (r *blockRetriever) retrieveFirstBlockFromReceipts(contractAddr string) (in
err := r.db.Get( err := r.db.Get(
&firstBlock, &firstBlock,
`SELECT number FROM blocks `SELECT number FROM blocks
WHERE id = (SELECT block_id FROM receipts WHERE id = (SELECT block_id FROM full_sync_receipts
WHERE lower(contract_address) = $1 WHERE lower(contract_address) = $1
ORDER BY block_id ASC ORDER BY block_id ASC
LIMIT 1)`, LIMIT 1)`,

View File

@ -243,7 +243,10 @@ func TearDown(db *postgres.DB) {
_, err = tx.Exec("DELETE FROM light_sync_transactions") _, err = tx.Exec("DELETE FROM light_sync_transactions")
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
_, err = tx.Exec(`DELETE FROM receipts`) _, err = tx.Exec(`DELETE FROM full_sync_receipts`)
Expect(err).NotTo(HaveOccurred())
_, err = tx.Exec(`DELETE FROM light_sync_receipts`)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
_, err = tx.Exec(`DROP TABLE checked_headers`) _, err = tx.Exec(`DROP TABLE checked_headers`)

View File

@ -26,3 +26,12 @@ type Receipt struct {
Status int Status int
TxHash string TxHash string
} }
type ReceiptModel struct {
ContractAddress string `db:"contract_address"`
CumulativeGasUsed string `db:"cumulative_gas_used"`
GasUsed string `db:"gas_used"`
StateRoot string `db:"state_root"`
Status int
TxHash string `db:"tx_hash"`
}

View File

@ -19,10 +19,11 @@ package repositories
import ( import (
"database/sql" "database/sql"
"errors" "errors"
"github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/libraries/shared/utilities"
"github.com/vulcanize/vulcanizedb/libraries/shared/utilities"
"github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/datastore" "github.com/vulcanize/vulcanizedb/pkg/datastore"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
@ -260,7 +261,7 @@ func (blockRepository BlockRepository) createReceipt(tx *sqlx.Tx, blockId int64,
//Not currently persisting log bloom filters //Not currently persisting log bloom filters
var receiptId int var receiptId int
err := tx.QueryRow( err := tx.QueryRow(
`INSERT INTO receipts `INSERT INTO full_sync_receipts
(contract_address, tx_hash, cumulative_gas_used, gas_used, state_root, status, block_id) (contract_address, tx_hash, cumulative_gas_used, gas_used, state_root, status, block_id)
VALUES ($1, $2, $3, $4, $5, $6, $7) VALUES ($1, $2, $3, $4, $5, $6, $7)
RETURNING id`, RETURNING id`,

View File

@ -19,6 +19,7 @@ package repositories
import ( import (
"database/sql" "database/sql"
"errors" "errors"
"github.com/jmoiron/sqlx"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
@ -64,6 +65,49 @@ func (repository HeaderRepository) CreateTransactions(headerID int64, transactio
return nil return nil
} }
func (repository HeaderRepository) CreateTransactionInTx(tx *sqlx.Tx, headerID int64, transaction core.TransactionModel) (int64, error) {
var txId int64
err := tx.QueryRowx(`INSERT INTO public.light_sync_transactions
(header_id, hash, gaslimit, gasprice, input_data, nonce, raw, tx_from, tx_index, tx_to, "value")
VALUES ($1, $2, $3::NUMERIC, $4::NUMERIC, $5, $6::NUMERIC, $7, $8, $9::NUMERIC, $10, $11::NUMERIC)
ON CONFLICT (header_id, hash) DO UPDATE
SET (gaslimit, gasprice, input_data, nonce, raw, tx_from, tx_index, tx_to, "value") = ($3::NUMERIC, $4::NUMERIC, $5, $6::NUMERIC, $7, $8, $9::NUMERIC, $10, $11::NUMERIC)
RETURNING id`,
headerID, transaction.Hash, transaction.GasLimit, transaction.GasPrice,
transaction.Data, transaction.Nonce, transaction.Raw, transaction.From,
transaction.TxIndex, transaction.To, transaction.Value).Scan(&txId)
if err != nil {
log.Error("header_repository: error inserting transaction: ", err)
return txId, err
}
return txId, err
}
func (repository HeaderRepository) CreateReceipt(headerID, transactionID int64, receipt core.Receipt) error {
_, err := repository.database.Exec(`INSERT INTO public.light_sync_receipts
(header_id, transaction_id, contract_address, cumulative_gas_used, gas_used, state_root, status, tx_hash)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT DO NOTHING`,
headerID, transactionID, receipt.ContractAddress, receipt.CumulativeGasUsed, receipt.GasUsed, receipt.StateRoot, receipt.Status, receipt.TxHash)
return err
}
func (repository HeaderRepository) CreateReceiptInTx(tx *sqlx.Tx, headerID, transactionID int64, receipt core.Receipt) (int64, error) {
var receiptId int64
err := tx.QueryRowx(`INSERT INTO public.light_sync_receipts
(header_id, transaction_id, contract_address, cumulative_gas_used, gas_used, state_root, status, tx_hash)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT (header_id, transaction_id) DO UPDATE
SET (contract_address, cumulative_gas_used, gas_used, state_root, status, tx_hash) = ($3, $4::NUMERIC, $5::NUMERIC, $6, $7, $8)
RETURNING id`,
headerID, transactionID, receipt.ContractAddress, receipt.CumulativeGasUsed, receipt.GasUsed, receipt.StateRoot, receipt.Status, receipt.TxHash).Scan(&receiptId)
if err != nil {
log.Error("header_repository: error inserting receipt: ", err)
return receiptId, err
}
return receiptId, err
}
func (repository HeaderRepository) GetHeader(blockNumber int64) (core.Header, error) { func (repository HeaderRepository) GetHeader(blockNumber int64) (core.Header, error) {
var header core.Header var header core.Header
err := repository.database.Get(&header, `SELECT id, block_number, hash, raw, block_timestamp FROM headers WHERE block_number = $1 AND eth_node_fingerprint = $2`, err := repository.database.Get(&header, `SELECT id, block_number, hash, raw, block_timestamp FROM headers WHERE block_number = $1 AND eth_node_fingerprint = $2`,

View File

@ -180,6 +180,124 @@ var _ = Describe("Block header repository", func() {
}) })
}) })
Describe("creating a receipt", func() {
It("adds a receipt in a tx", func() {
headerID, err := repo.CreateOrUpdateHeader(header)
Expect(err).NotTo(HaveOccurred())
fromAddress := common.HexToAddress("0x1234")
toAddress := common.HexToAddress("0x5678")
txHash := common.HexToHash("0x9876")
txIndex := big.NewInt(123)
transaction := core.TransactionModel{
Data: []byte{},
From: fromAddress.Hex(),
GasLimit: 0,
GasPrice: 0,
Hash: txHash.Hex(),
Nonce: 0,
Raw: []byte{},
To: toAddress.Hex(),
TxIndex: txIndex.Int64(),
Value: "0",
}
tx, err := db.Beginx()
Expect(err).ToNot(HaveOccurred())
txId, txErr := repo.CreateTransactionInTx(tx, headerID, transaction)
Expect(txErr).ToNot(HaveOccurred())
contractAddr := common.HexToAddress("0x1234")
stateRoot := common.HexToHash("0x5678")
var gasUsed uint64 = 10
var cumulativeGasUsed uint64 = 100
receipt := core.Receipt{
ContractAddress: contractAddr.Hex(),
TxHash: txHash.Hex(),
GasUsed: gasUsed,
CumulativeGasUsed: cumulativeGasUsed,
StateRoot: stateRoot.Hex(),
}
_, receiptErr := repo.CreateReceiptInTx(tx, headerID, txId, receipt)
commitErr := tx.Commit()
Expect(commitErr).ToNot(HaveOccurred())
Expect(receiptErr).ToNot(HaveOccurred())
type idModel struct {
TransactionId int64 `db:"transaction_id"`
core.ReceiptModel
}
var dbReceipt idModel
err = db.Get(&dbReceipt,
`SELECT transaction_id, contract_address, cumulative_gas_used, gas_used, state_root, status, tx_hash
FROM public.light_sync_receipts WHERE header_id = $1`, headerID)
Expect(err).NotTo(HaveOccurred())
Expect(dbReceipt.TransactionId).To(Equal(txId))
Expect(dbReceipt.TxHash).To(Equal(txHash.Hex()))
Expect(dbReceipt.ContractAddress).To(Equal(contractAddr.Hex()))
Expect(dbReceipt.CumulativeGasUsed).To(Equal("100"))
Expect(dbReceipt.GasUsed).To(Equal("10"))
Expect(dbReceipt.StateRoot).To(Equal(stateRoot.Hex()))
})
It("adds a receipt in standalone query", func() {
headerID, err := repo.CreateOrUpdateHeader(header)
Expect(err).NotTo(HaveOccurred())
fromAddress := common.HexToAddress("0x1234")
toAddress := common.HexToAddress("0x5678")
txHash := common.HexToHash("0x9876")
txIndex := big.NewInt(123)
transaction := core.TransactionModel{
Data: []byte{},
From: fromAddress.Hex(),
GasLimit: 0,
GasPrice: 0,
Hash: txHash.Hex(),
Nonce: 0,
Raw: []byte{},
To: toAddress.Hex(),
TxIndex: txIndex.Int64(),
Value: "0",
}
tx, err := db.Beginx()
Expect(err).ToNot(HaveOccurred())
txId, txErr := repo.CreateTransactionInTx(tx, headerID, transaction)
commitErr := tx.Commit()
Expect(commitErr).ToNot(HaveOccurred())
Expect(txErr).ToNot(HaveOccurred())
contractAddr := common.HexToAddress("0x1234")
stateRoot := common.HexToHash("0x5678")
var gasUsed uint64 = 10
var cumulativeGasUsed uint64 = 100
receipt := core.Receipt{
ContractAddress: contractAddr.Hex(),
TxHash: txHash.Hex(),
GasUsed: gasUsed,
CumulativeGasUsed: cumulativeGasUsed,
StateRoot: stateRoot.Hex(),
}
receiptErr := repo.CreateReceipt(headerID, txId, receipt)
Expect(receiptErr).ToNot(HaveOccurred())
type idModel struct {
TransactionId int64 `db:"transaction_id"`
core.ReceiptModel
}
var dbReceipt idModel
err = db.Get(&dbReceipt,
`SELECT transaction_id, contract_address, cumulative_gas_used, gas_used, state_root, status, tx_hash
FROM public.light_sync_receipts WHERE header_id = $1`, headerID)
Expect(err).NotTo(HaveOccurred())
Expect(dbReceipt.TransactionId).To(Equal(txId))
Expect(dbReceipt.TxHash).To(Equal(txHash.Hex()))
Expect(dbReceipt.ContractAddress).To(Equal(contractAddr.Hex()))
Expect(dbReceipt.CumulativeGasUsed).To(Equal("100"))
Expect(dbReceipt.GasUsed).To(Equal("10"))
Expect(dbReceipt.StateRoot).To(Equal(stateRoot.Hex()))
})
})
Describe("creating a transaction", func() { Describe("creating a transaction", func() {
var ( var (
headerID int64 headerID int64
@ -245,6 +363,87 @@ var _ = Describe("Block header repository", func() {
}) })
}) })
Describe("creating a transaction in a sqlx tx", func() {
It("adds a transaction", func() {
headerID, err := repo.CreateOrUpdateHeader(header)
Expect(err).NotTo(HaveOccurred())
fromAddress := common.HexToAddress("0x1234")
toAddress := common.HexToAddress("0x5678")
txHash := common.HexToHash("0x9876")
txIndex := big.NewInt(123)
transaction := core.TransactionModel{
Data: []byte{},
From: fromAddress.Hex(),
GasLimit: 0,
GasPrice: 0,
Hash: txHash.Hex(),
Nonce: 0,
Raw: []byte{},
To: toAddress.Hex(),
TxIndex: txIndex.Int64(),
Value: "0",
}
tx, err := db.Beginx()
Expect(err).ToNot(HaveOccurred())
_, insertErr := repo.CreateTransactionInTx(tx, headerID, transaction)
commitErr := tx.Commit()
Expect(commitErr).ToNot(HaveOccurred())
Expect(insertErr).NotTo(HaveOccurred())
var dbTransaction core.TransactionModel
err = db.Get(&dbTransaction,
`SELECT hash, gaslimit, gasprice, input_data, nonce, raw, tx_from, tx_index, tx_to, "value"
FROM public.light_sync_transactions WHERE header_id = $1`, headerID)
Expect(err).NotTo(HaveOccurred())
Expect(dbTransaction).To(Equal(transaction))
})
It("silently upserts", func() {
headerID, err := repo.CreateOrUpdateHeader(header)
Expect(err).NotTo(HaveOccurred())
fromAddress := common.HexToAddress("0x1234")
toAddress := common.HexToAddress("0x5678")
txHash := common.HexToHash("0x9876")
txIndex := big.NewInt(123)
transaction := core.TransactionModel{
Data: []byte{},
From: fromAddress.Hex(),
GasLimit: 0,
GasPrice: 0,
Hash: txHash.Hex(),
Nonce: 0,
Raw: []byte{},
Receipt: core.Receipt{},
To: toAddress.Hex(),
TxIndex: txIndex.Int64(),
Value: "0",
}
tx1, err := db.Beginx()
Expect(err).ToNot(HaveOccurred())
txId1, insertErr := repo.CreateTransactionInTx(tx1, headerID, transaction)
commit1Err := tx1.Commit()
Expect(commit1Err).ToNot(HaveOccurred())
Expect(insertErr).NotTo(HaveOccurred())
tx2, err := db.Beginx()
Expect(err).ToNot(HaveOccurred())
txId2, insertErr := repo.CreateTransactionInTx(tx2, headerID, transaction)
commit2Err := tx2.Commit()
Expect(commit2Err).ToNot(HaveOccurred())
Expect(insertErr).NotTo(HaveOccurred())
Expect(txId1).To(Equal(txId2))
var dbTransactions []core.TransactionModel
err = db.Select(&dbTransactions,
`SELECT hash, gaslimit, gasprice, input_data, nonce, raw, tx_from, tx_index, tx_to, "value"
FROM public.light_sync_transactions WHERE header_id = $1`, headerID)
Expect(err).NotTo(HaveOccurred())
Expect(len(dbTransactions)).To(Equal(1))
})
})
Describe("Getting a header", func() { Describe("Getting a header", func() {
It("returns header if it exists", func() { It("returns header if it exists", func() {
_, err = repo.CreateOrUpdateHeader(header) _, err = repo.CreateOrUpdateHeader(header)

View File

@ -56,7 +56,7 @@ func (receiptRepository ReceiptRepository) CreateReceiptsAndLogs(blockId int64,
func createReceipt(receipt core.Receipt, blockId int64, tx *sqlx.Tx) (int64, error) { func createReceipt(receipt core.Receipt, blockId int64, tx *sqlx.Tx) (int64, error) {
var receiptId int64 var receiptId int64
err := tx.QueryRow( err := tx.QueryRow(
`INSERT INTO receipts `INSERT INTO full_sync_receipts
(contract_address, tx_hash, cumulative_gas_used, gas_used, state_root, status, block_id) (contract_address, tx_hash, cumulative_gas_used, gas_used, state_root, status, block_id)
VALUES ($1, $2, $3, $4, $5, $6, $7) VALUES ($1, $2, $3, $4, $5, $6, $7)
RETURNING id`, RETURNING id`,
@ -87,7 +87,7 @@ func (receiptRepository ReceiptRepository) CreateReceipt(blockId int64, receipt
tx, _ := receiptRepository.DB.Beginx() tx, _ := receiptRepository.DB.Beginx()
var receiptId int64 var receiptId int64
err := tx.QueryRow( err := tx.QueryRow(
`INSERT INTO receipts `INSERT INTO full_sync_receipts
(contract_address, tx_hash, cumulative_gas_used, gas_used, state_root, status, block_id) (contract_address, tx_hash, cumulative_gas_used, gas_used, state_root, status, block_id)
VALUES ($1, $2, $3, $4, $5, $6, $7) VALUES ($1, $2, $3, $4, $5, $6, $7)
RETURNING id`, RETURNING id`,
@ -109,7 +109,7 @@ func (receiptRepository ReceiptRepository) GetReceipt(txHash string) (core.Recei
gas_used, gas_used,
state_root, state_root,
status status
FROM receipts FROM full_sync_receipts
WHERE tx_hash = $1`, txHash) WHERE tx_hash = $1`, txHash)
receipt, err := loadReceipt(row) receipt, err := loadReceipt(row)
if err != nil { if err != nil {

View File

@ -115,7 +115,8 @@ func CleanTestDB(db *postgres.DB) {
db.MustExec("DELETE FROM log_filters") db.MustExec("DELETE FROM log_filters")
db.MustExec("DELETE FROM logs") db.MustExec("DELETE FROM logs")
db.MustExec("DELETE FROM queued_storage") db.MustExec("DELETE FROM queued_storage")
db.MustExec("DELETE FROM receipts") db.MustExec("DELETE FROM full_sync_receipts")
db.MustExec("DELETE FROM light_sync_receipts")
db.MustExec("DELETE FROM watched_contracts") db.MustExec("DELETE FROM watched_contracts")
} }