From 1414779d52271912f990129506a195415489f401 Mon Sep 17 00:00:00 2001 From: Rob Mulholand Date: Wed, 6 Mar 2019 10:15:32 -0600 Subject: [PATCH] Use *sqlx.Tx instead of *sql.Tx - requires using db.Beginx() instead of db.Begin() - enables calling tx.Get() --- libraries/shared/repository/repository.go | 4 ++-- libraries/shared/repository/repository_test.go | 2 +- .../postgres/repositories/block_repository.go | 11 +++++------ .../postgres/repositories/logs_repository.go | 3 +-- .../postgres/repositories/receipt_repository.go | 10 +++++----- pkg/omni/light/repository/header_repository.go | 6 +++--- pkg/omni/shared/helpers/test_helpers/database.go | 2 +- pkg/omni/shared/repository/event_repository.go | 4 ++-- pkg/omni/shared/repository/method_repository.go | 2 +- 9 files changed, 21 insertions(+), 23 deletions(-) diff --git a/libraries/shared/repository/repository.go b/libraries/shared/repository/repository.go index 00cedddb..e80a8ce7 100644 --- a/libraries/shared/repository/repository.go +++ b/libraries/shared/repository/repository.go @@ -18,9 +18,9 @@ package repository import ( "bytes" - "database/sql" "database/sql/driver" "fmt" + "github.com/jmoiron/sqlx" "github.com/vulcanize/vulcanizedb/libraries/shared/constants" "github.com/vulcanize/vulcanizedb/pkg/core" @@ -35,7 +35,7 @@ func MarkHeaderChecked(headerID int64, db *postgres.DB, checkedHeadersColumn str return err } -func MarkHeaderCheckedInTransaction(headerID int64, tx *sql.Tx, checkedHeadersColumn string) error { +func MarkHeaderCheckedInTransaction(headerID int64, tx *sqlx.Tx, checkedHeadersColumn string) error { _, err := tx.Exec(`INSERT INTO public.checked_headers (header_id, `+checkedHeadersColumn+`) VALUES ($1, $2) ON CONFLICT (header_id) DO diff --git a/libraries/shared/repository/repository_test.go b/libraries/shared/repository/repository_test.go index a3e0dbbf..dfd595ff 100644 --- a/libraries/shared/repository/repository_test.go +++ b/libraries/shared/repository/repository_test.go @@ -97,7 +97,7 @@ var _ = Describe("Repository", func() { headerRepository := repositories.NewHeaderRepository(db) headerID, headerErr := headerRepository.CreateOrUpdateHeader(fakes.FakeHeader) Expect(headerErr).NotTo(HaveOccurred()) - tx, txErr := db.Begin() + tx, txErr := db.Beginx() Expect(txErr).NotTo(HaveOccurred()) err := shared.MarkHeaderCheckedInTransaction(headerID, tx, checkedHeadersColumn) diff --git a/pkg/datastore/postgres/repositories/block_repository.go b/pkg/datastore/postgres/repositories/block_repository.go index 4a5d19ff..2cbc1373 100644 --- a/pkg/datastore/postgres/repositories/block_repository.go +++ b/pkg/datastore/postgres/repositories/block_repository.go @@ -17,7 +17,6 @@ package repositories import ( - "context" "database/sql" "errors" log "github.com/sirupsen/logrus" @@ -122,7 +121,7 @@ func (blockRepository BlockRepository) GetBlock(blockNumber int64) (core.Block, func (blockRepository BlockRepository) insertBlock(block core.Block) (int64, error) { var blockId int64 - tx, _ := blockRepository.database.BeginTx(context.Background(), nil) + tx, _ := blockRepository.database.Beginx() err := tx.QueryRow( `INSERT INTO blocks (eth_node_id, number, gaslimit, gasused, time, difficulty, hash, nonce, parenthash, size, uncle_hash, is_final, miner, extra_data, reward, uncles_reward, eth_node_fingerprint) @@ -145,7 +144,7 @@ func (blockRepository BlockRepository) insertBlock(block core.Block) (int64, err return blockId, nil } -func (blockRepository BlockRepository) createTransactions(tx *sql.Tx, blockId int64, transactions []core.Transaction) error { +func (blockRepository BlockRepository) createTransactions(tx *sqlx.Tx, blockId int64, transactions []core.Transaction) error { for _, transaction := range transactions { err := blockRepository.createTransaction(tx, blockId, transaction) if err != nil { @@ -165,7 +164,7 @@ func nullStringToZero(s string) string { return s } -func (blockRepository BlockRepository) createTransaction(tx *sql.Tx, blockId int64, transaction core.Transaction) error { +func (blockRepository BlockRepository) createTransaction(tx *sqlx.Tx, blockId int64, transaction core.Transaction) error { _, err := tx.Exec( `INSERT INTO transactions (block_id, hash, nonce, tx_to, tx_from, gaslimit, gasprice, value, input_data) @@ -198,7 +197,7 @@ func hasReceipt(transaction core.Transaction) bool { return transaction.Receipt.TxHash != "" } -func (blockRepository BlockRepository) createReceipt(tx *sql.Tx, blockId int64, receipt core.Receipt) (int, error) { +func (blockRepository BlockRepository) createReceipt(tx *sqlx.Tx, blockId int64, receipt core.Receipt) (int, error) { //Not currently persisting log bloom filters var receiptId int err := tx.QueryRow( @@ -224,7 +223,7 @@ func (blockRepository BlockRepository) getBlockHash(block core.Block) (string, b return retrievedBlockHash, blockExists(retrievedBlockHash) } -func (blockRepository BlockRepository) createLogs(tx *sql.Tx, logs []core.Log, receiptId int) error { +func (blockRepository BlockRepository) createLogs(tx *sqlx.Tx, logs []core.Log, receiptId int) error { for _, tlog := range logs { _, err := tx.Exec( `INSERT INTO logs (block_number, address, tx_hash, index, topic0, topic1, topic2, topic3, data, receipt_id) diff --git a/pkg/datastore/postgres/repositories/logs_repository.go b/pkg/datastore/postgres/repositories/logs_repository.go index 704ac350..63a8c99f 100644 --- a/pkg/datastore/postgres/repositories/logs_repository.go +++ b/pkg/datastore/postgres/repositories/logs_repository.go @@ -17,7 +17,6 @@ package repositories import ( - "context" "github.com/sirupsen/logrus" "database/sql" @@ -31,7 +30,7 @@ type LogRepository struct { } func (logRepository LogRepository) CreateLogs(lgs []core.Log, receiptId int64) error { - tx, _ := logRepository.DB.BeginTx(context.Background(), nil) + tx, _ := logRepository.DB.Beginx() for _, tlog := range lgs { _, err := tx.Exec( `INSERT INTO logs (block_number, address, tx_hash, index, topic0, topic1, topic2, topic3, data, receipt_id) diff --git a/pkg/datastore/postgres/repositories/receipt_repository.go b/pkg/datastore/postgres/repositories/receipt_repository.go index 6554775b..29de83e9 100644 --- a/pkg/datastore/postgres/repositories/receipt_repository.go +++ b/pkg/datastore/postgres/repositories/receipt_repository.go @@ -17,8 +17,8 @@ package repositories import ( - "context" "database/sql" + "github.com/jmoiron/sqlx" "github.com/sirupsen/logrus" "github.com/vulcanize/vulcanizedb/pkg/core" @@ -31,7 +31,7 @@ type ReceiptRepository struct { } func (receiptRepository ReceiptRepository) CreateReceiptsAndLogs(blockId int64, receipts []core.Receipt) error { - tx, err := receiptRepository.DB.BeginTx(context.Background(), nil) + tx, err := receiptRepository.DB.Beginx() if err != nil { return err } @@ -53,7 +53,7 @@ func (receiptRepository ReceiptRepository) CreateReceiptsAndLogs(blockId int64, return nil } -func createReceipt(receipt core.Receipt, blockId int64, tx *sql.Tx) (int64, error) { +func createReceipt(receipt core.Receipt, blockId int64, tx *sqlx.Tx) (int64, error) { var receiptId int64 err := tx.QueryRow( `INSERT INTO receipts @@ -68,7 +68,7 @@ func createReceipt(receipt core.Receipt, blockId int64, tx *sql.Tx) (int64, erro return receiptId, err } -func createLogs(logs []core.Log, receiptId int64, tx *sql.Tx) error { +func createLogs(logs []core.Log, receiptId int64, tx *sqlx.Tx) error { for _, log := range logs { _, err := tx.Exec( `INSERT INTO logs (block_number, address, tx_hash, index, topic0, topic1, topic2, topic3, data, receipt_id) @@ -84,7 +84,7 @@ func createLogs(logs []core.Log, receiptId int64, tx *sql.Tx) error { } func (receiptRepository ReceiptRepository) CreateReceipt(blockId int64, receipt core.Receipt) (int64, error) { - tx, _ := receiptRepository.DB.BeginTx(context.Background(), nil) + tx, _ := receiptRepository.DB.Beginx() var receiptId int64 err := tx.QueryRow( `INSERT INTO receipts diff --git a/pkg/omni/light/repository/header_repository.go b/pkg/omni/light/repository/header_repository.go index 8281bc23..e0d8ba74 100644 --- a/pkg/omni/light/repository/header_repository.go +++ b/pkg/omni/light/repository/header_repository.go @@ -17,8 +17,8 @@ package repository import ( - "database/sql" "fmt" + "github.com/jmoiron/sqlx" "github.com/hashicorp/golang-lru" @@ -125,7 +125,7 @@ func (r *headerRepository) MarkHeaderCheckedForAll(headerID int64, ids []string) } func (r *headerRepository) MarkHeadersCheckedForAll(headers []core.Header, ids []string) error { - tx, err := r.db.Begin() + tx, err := r.db.Beginx() if err != nil { return err } @@ -250,7 +250,7 @@ func (r *headerRepository) CheckCache(key string) (interface{}, bool) { return r.columns.Get(key) } -func MarkHeaderCheckedInTransaction(headerID int64, tx *sql.Tx, eventID string) error { +func MarkHeaderCheckedInTransaction(headerID int64, tx *sqlx.Tx, eventID string) error { _, err := tx.Exec(`INSERT INTO public.checked_headers (header_id, `+eventID+`) VALUES ($1, $2) ON CONFLICT (header_id) DO diff --git a/pkg/omni/shared/helpers/test_helpers/database.go b/pkg/omni/shared/helpers/test_helpers/database.go index 5b81dc83..576b569c 100644 --- a/pkg/omni/shared/helpers/test_helpers/database.go +++ b/pkg/omni/shared/helpers/test_helpers/database.go @@ -238,7 +238,7 @@ func SetupENSContract(wantedEvents, wantedMethods []string) *contract.Contract { } func TearDown(db *postgres.DB) { - tx, err := db.Begin() + tx, err := db.Beginx() Expect(err).NotTo(HaveOccurred()) _, err = tx.Exec(`DELETE FROM blocks`) diff --git a/pkg/omni/shared/repository/event_repository.go b/pkg/omni/shared/repository/event_repository.go index 4b599447..a499638d 100644 --- a/pkg/omni/shared/repository/event_repository.go +++ b/pkg/omni/shared/repository/event_repository.go @@ -97,7 +97,7 @@ func (r *eventRepository) persistLogs(logs []types.Log, eventInfo types.Event, c // Creates a custom postgres command to persist logs for the given event (compatible with light synced vDB) func (r *eventRepository) persistLightSyncLogs(logs []types.Log, eventInfo types.Event, contractAddr, contractName string) error { - tx, err := r.db.Begin() + tx, err := r.db.Beginx() if err != nil { return err } @@ -151,7 +151,7 @@ func (r *eventRepository) persistLightSyncLogs(logs []types.Log, eventInfo types // Creates a custom postgres command to persist logs for the given event (compatible with fully synced vDB) func (r *eventRepository) persistFullSyncLogs(logs []types.Log, eventInfo types.Event, contractAddr, contractName string) error { - tx, err := r.db.Begin() + tx, err := r.db.Beginx() if err != nil { return err } diff --git a/pkg/omni/shared/repository/method_repository.go b/pkg/omni/shared/repository/method_repository.go index 4231404b..3449d876 100644 --- a/pkg/omni/shared/repository/method_repository.go +++ b/pkg/omni/shared/repository/method_repository.go @@ -77,7 +77,7 @@ func (r *methodRepository) PersistResults(results []types.Result, methodInfo typ // Creates a custom postgres command to persist logs for the given event func (r *methodRepository) persistResults(results []types.Result, methodInfo types.Method, contractAddr, contractName string) error { - tx, err := r.DB.Begin() + tx, err := r.DB.Beginx() if err != nil { return err }