Merge pull request #131 from 8thlight/VDB-31-improve-error-handling-in-transformers

VDB 31 improve error handling in transformers
This commit is contained in:
Andrew J Yao 2019-01-11 11:06:43 -08:00 committed by GitHub
commit 5efd683c54
30 changed files with 630 additions and 352 deletions

View File

@ -16,6 +16,7 @@ package bite
import ( import (
"fmt" "fmt"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
@ -30,32 +31,41 @@ func (repository *BiteRepository) SetDB(db *postgres.DB) {
} }
func (repository BiteRepository) Create(headerID int64, models []interface{}) error { func (repository BiteRepository) Create(headerID int64, models []interface{}) error {
tx, err := repository.db.Begin() tx, dBaseErr := repository.db.Begin()
if err != nil { if dBaseErr != nil {
return err return dBaseErr
} }
for _, model := range models { for _, model := range models {
biteModel, ok := model.(BiteModel) biteModel, ok := model.(BiteModel)
if !ok { if !ok {
tx.Rollback() rollbackErr := tx.Rollback()
if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return fmt.Errorf("model of type %T, not %T", model, BiteModel{}) return fmt.Errorf("model of type %T, not %T", model, BiteModel{})
} }
_, err = tx.Exec( _, execErr := tx.Exec(
`INSERT into maker.bite (header_id, ilk, urn, ink, art, iart, tab, nflip, log_idx, tx_idx, raw_log) `INSERT into maker.bite (header_id, ilk, urn, ink, art, iart, tab, nflip, log_idx, tx_idx, raw_log)
VALUES($1, $2, $3, $4::NUMERIC, $5::NUMERIC, $6::NUMERIC, $7::NUMERIC, $8::NUMERIC, $9, $10, $11)`, VALUES($1, $2, $3, $4::NUMERIC, $5::NUMERIC, $6::NUMERIC, $7::NUMERIC, $8::NUMERIC, $9, $10, $11)`,
headerID, biteModel.Ilk, biteModel.Urn, biteModel.Ink, biteModel.Art, biteModel.IArt, biteModel.Tab, biteModel.NFlip, biteModel.LogIndex, biteModel.TransactionIndex, biteModel.Raw, headerID, biteModel.Ilk, biteModel.Urn, biteModel.Ink, biteModel.Art, biteModel.IArt, biteModel.Tab, biteModel.NFlip, biteModel.LogIndex, biteModel.TransactionIndex, biteModel.Raw,
) )
if err != nil { if execErr != nil {
tx.Rollback() rollbackErr := tx.Rollback()
return err if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return execErr
} }
} }
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.BiteChecked) checkHeaderErr := shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.BiteChecked)
if err != nil { if checkHeaderErr != nil {
tx.Rollback() rollbackErr := tx.Rollback()
return err if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return checkHeaderErr
} }
return tx.Commit() return tx.Commit()

View File

@ -16,6 +16,7 @@ package chop_lump
import ( import (
"fmt" "fmt"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
@ -26,33 +27,42 @@ type CatFileChopLumpRepository struct {
} }
func (repository CatFileChopLumpRepository) Create(headerID int64, models []interface{}) error { func (repository CatFileChopLumpRepository) Create(headerID int64, models []interface{}) error {
tx, err := repository.db.Begin() tx, dBaseErr := repository.db.Begin()
if err != nil { if dBaseErr != nil {
return err return dBaseErr
} }
for _, model := range models { for _, model := range models {
chopLump, ok := model.(CatFileChopLumpModel) chopLump, ok := model.(CatFileChopLumpModel)
if !ok { if !ok {
tx.Rollback() rollbackErr := tx.Rollback()
if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return fmt.Errorf("model of type %T, not %T", model, CatFileChopLumpModel{}) return fmt.Errorf("model of type %T, not %T", model, CatFileChopLumpModel{})
} }
_, err = tx.Exec( _, execErr := tx.Exec(
`INSERT into maker.cat_file_chop_lump (header_id, ilk, what, data, tx_idx, log_idx, raw_log) `INSERT into maker.cat_file_chop_lump (header_id, ilk, what, data, tx_idx, log_idx, raw_log)
VALUES($1, $2, $3, $4::NUMERIC, $5, $6, $7)`, VALUES($1, $2, $3, $4::NUMERIC, $5, $6, $7)`,
headerID, chopLump.Ilk, chopLump.What, chopLump.Data, chopLump.TransactionIndex, chopLump.LogIndex, chopLump.Raw, headerID, chopLump.Ilk, chopLump.What, chopLump.Data, chopLump.TransactionIndex, chopLump.LogIndex, chopLump.Raw,
) )
if err != nil { if execErr != nil {
tx.Rollback() rollbackErr := tx.Rollback()
return err if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return execErr
} }
} }
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.CatFileChopLumpChecked) checkHeaderErr := shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.CatFileChopLumpChecked)
if err != nil { if checkHeaderErr != nil {
tx.Rollback() rollbackErr := tx.Rollback()
return err if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return checkHeaderErr
} }
return tx.Commit() return tx.Commit()
} }

View File

@ -16,6 +16,7 @@ package flip
import ( import (
"fmt" "fmt"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
@ -26,32 +27,41 @@ type CatFileFlipRepository struct {
} }
func (repository CatFileFlipRepository) Create(headerID int64, models []interface{}) error { func (repository CatFileFlipRepository) Create(headerID int64, models []interface{}) error {
tx, err := repository.db.Begin() tx, dBaseErr := repository.db.Begin()
if err != nil { if dBaseErr != nil {
return err return dBaseErr
} }
for _, model := range models { for _, model := range models {
flip, ok := model.(CatFileFlipModel) flip, ok := model.(CatFileFlipModel)
if !ok { if !ok {
tx.Rollback() rollbackErr := tx.Rollback()
if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return fmt.Errorf("model of type %T, not %T", model, CatFileFlipModel{}) return fmt.Errorf("model of type %T, not %T", model, CatFileFlipModel{})
} }
_, err = repository.db.Exec( _, execErr := repository.db.Exec(
`INSERT into maker.cat_file_flip (header_id, ilk, what, flip, tx_idx, log_idx, raw_log) `INSERT into maker.cat_file_flip (header_id, ilk, what, flip, tx_idx, log_idx, raw_log)
VALUES($1, $2, $3, $4, $5, $6, $7)`, VALUES($1, $2, $3, $4, $5, $6, $7)`,
headerID, flip.Ilk, flip.What, flip.Flip, flip.TransactionIndex, flip.LogIndex, flip.Raw, headerID, flip.Ilk, flip.What, flip.Flip, flip.TransactionIndex, flip.LogIndex, flip.Raw,
) )
if err != nil { if execErr != nil {
tx.Rollback() rollbackErr := tx.Rollback()
return err if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return execErr
} }
} }
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.CatFileFlipChecked) checkHeaderErr := shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.CatFileFlipChecked)
if err != nil { if checkHeaderErr != nil {
tx.Rollback() rollbackErr := tx.Rollback()
return err if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return checkHeaderErr
} }
return tx.Commit() return tx.Commit()
} }

View File

@ -16,6 +16,7 @@ package pit_vow
import ( import (
"fmt" "fmt"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
@ -26,32 +27,41 @@ type CatFilePitVowRepository struct {
} }
func (repository CatFilePitVowRepository) Create(headerID int64, models []interface{}) error { func (repository CatFilePitVowRepository) Create(headerID int64, models []interface{}) error {
tx, err := repository.db.Begin() tx, dBaseErr := repository.db.Begin()
if err != nil { if dBaseErr != nil {
return err return dBaseErr
} }
for _, model := range models { for _, model := range models {
vow, ok := model.(CatFilePitVowModel) vow, ok := model.(CatFilePitVowModel)
if !ok { if !ok {
tx.Rollback() rollbackErr := tx.Rollback()
if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return fmt.Errorf("model of type %T, not %T", model, CatFilePitVowModel{}) return fmt.Errorf("model of type %T, not %T", model, CatFilePitVowModel{})
} }
_, err = repository.db.Exec( _, execErr := repository.db.Exec(
`INSERT into maker.cat_file_pit_vow (header_id, what, data, tx_idx, log_idx, raw_log) `INSERT into maker.cat_file_pit_vow (header_id, what, data, tx_idx, log_idx, raw_log)
VALUES($1, $2, $3, $4, $5, $6)`, VALUES($1, $2, $3, $4, $5, $6)`,
headerID, vow.What, vow.Data, vow.TransactionIndex, vow.LogIndex, vow.Raw, headerID, vow.What, vow.Data, vow.TransactionIndex, vow.LogIndex, vow.Raw,
) )
if err != nil { if execErr != nil {
tx.Rollback() rollbackErr := tx.Rollback()
return err if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return execErr
} }
} }
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.CatFilePitVowChecked) checkHeaderErr := shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.CatFilePitVowChecked)
if err != nil { if checkHeaderErr != nil {
tx.Rollback() rollbackErr := tx.Rollback()
return err if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return checkHeaderErr
} }
return tx.Commit() return tx.Commit()
} }

View File

@ -16,6 +16,7 @@ package deal
import ( import (
"fmt" "fmt"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
@ -26,33 +27,42 @@ type DealRepository struct {
} }
func (repository DealRepository) Create(headerID int64, models []interface{}) error { func (repository DealRepository) Create(headerID int64, models []interface{}) error {
tx, err := repository.db.Begin() tx, dBaseErr := repository.db.Begin()
if err != nil { if dBaseErr != nil {
return err return dBaseErr
} }
for _, model := range models { for _, model := range models {
dealModel, ok := model.(DealModel) dealModel, ok := model.(DealModel)
if !ok { if !ok {
tx.Rollback() rollbackErr := tx.Rollback()
if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return fmt.Errorf("model of type %T, not %T", model, DealModel{}) return fmt.Errorf("model of type %T, not %T", model, DealModel{})
} }
_, err = tx.Exec( _, execErr := tx.Exec(
`INSERT into maker.deal (header_id, bid_id, contract_address, log_idx, tx_idx, raw_log) `INSERT into maker.deal (header_id, bid_id, contract_address, log_idx, tx_idx, raw_log)
VALUES($1, $2, $3, $4, $5, $6)`, VALUES($1, $2, $3, $4, $5, $6)`,
headerID, dealModel.BidId, dealModel.ContractAddress, dealModel.LogIndex, dealModel.TransactionIndex, dealModel.Raw, headerID, dealModel.BidId, dealModel.ContractAddress, dealModel.LogIndex, dealModel.TransactionIndex, dealModel.Raw,
) )
if err != nil { if execErr != nil {
tx.Rollback() rollbackErr := tx.Rollback()
return err if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return execErr
} }
} }
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.DealChecked) checkHeaderErr := shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.DealChecked)
if err != nil { if checkHeaderErr != nil {
tx.Rollback() rollbackErr := tx.Rollback()
return err if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return checkHeaderErr
} }
return tx.Commit() return tx.Commit()
} }

View File

@ -16,6 +16,7 @@ package dent
import ( import (
"fmt" "fmt"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
@ -26,38 +27,51 @@ type DentRepository struct {
} }
func (repository DentRepository) Create(headerID int64, models []interface{}) error { func (repository DentRepository) Create(headerID int64, models []interface{}) error {
tx, err := repository.db.Begin() tx, dBaseErr := repository.db.Begin()
if err != nil { if dBaseErr != nil {
return err return dBaseErr
} }
tic, err := shared.GetTicInTx(headerID, tx) tic, getTicErr := shared.GetTicInTx(headerID, tx)
if err != nil { if getTicErr != nil {
return err rollbackErr := tx.Rollback()
if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return getTicErr
} }
for _, model := range models { for _, model := range models {
dent, ok := model.(DentModel) dent, ok := model.(DentModel)
if !ok { if !ok {
tx.Rollback() rollbackErr := tx.Rollback()
if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return fmt.Errorf("model of type %T, not %T", model, DentModel{}) return fmt.Errorf("model of type %T, not %T", model, DentModel{})
} }
_, err = tx.Exec( _, execErr := tx.Exec(
`INSERT into maker.dent (header_id, bid_id, lot, bid, guy, tic, log_idx, tx_idx, raw_log) `INSERT into maker.dent (header_id, bid_id, lot, bid, guy, tic, log_idx, tx_idx, raw_log)
VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9)`, VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9)`,
headerID, dent.BidId, dent.Lot, dent.Bid, dent.Guy, tic, dent.LogIndex, dent.TransactionIndex, dent.Raw, headerID, dent.BidId, dent.Lot, dent.Bid, dent.Guy, tic, dent.LogIndex, dent.TransactionIndex, dent.Raw,
) )
if err != nil { if execErr != nil {
tx.Rollback() rollbackErr := tx.Rollback()
return err if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return execErr
} }
} }
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.DentChecked) checkHeaderErr := shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.DentChecked)
if err != nil { if checkHeaderErr != nil {
tx.Rollback() rollbackErr := tx.Rollback()
return err if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return checkHeaderErr
} }
return tx.Commit() return tx.Commit()
} }

View File

@ -16,6 +16,7 @@ package drip_drip
import ( import (
"fmt" "fmt"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
@ -26,32 +27,41 @@ type DripDripRepository struct {
} }
func (repository DripDripRepository) Create(headerID int64, models []interface{}) error { func (repository DripDripRepository) Create(headerID int64, models []interface{}) error {
tx, err := repository.db.Begin() tx, dBaseErr := repository.db.Begin()
if err != nil { if dBaseErr != nil {
return err return dBaseErr
} }
for _, model := range models { for _, model := range models {
dripDrip, ok := model.(DripDripModel) dripDrip, ok := model.(DripDripModel)
if !ok { if !ok {
tx.Rollback() rollbackErr := tx.Rollback()
if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return fmt.Errorf("model of type %T, not %T", model, DripDripModel{}) return fmt.Errorf("model of type %T, not %T", model, DripDripModel{})
} }
_, err = tx.Exec( _, execErr := tx.Exec(
`INSERT into maker.drip_drip (header_id, ilk, log_idx, tx_idx, raw_log) `INSERT into maker.drip_drip (header_id, ilk, log_idx, tx_idx, raw_log)
VALUES($1, $2, $3, $4, $5)`, VALUES($1, $2, $3, $4, $5)`,
headerID, dripDrip.Ilk, dripDrip.LogIndex, dripDrip.TransactionIndex, dripDrip.Raw, headerID, dripDrip.Ilk, dripDrip.LogIndex, dripDrip.TransactionIndex, dripDrip.Raw,
) )
if err != nil { if execErr != nil {
tx.Rollback() rollbackErr := tx.Rollback()
return err if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return execErr
} }
} }
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.DripDripChecked) checkHeaderErr := shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.DripDripChecked)
if err != nil { if checkHeaderErr != nil {
tx.Rollback() rollbackErr := tx.Rollback()
return err if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return checkHeaderErr
} }
return tx.Commit() return tx.Commit()
} }

View File

@ -16,6 +16,7 @@ package ilk
import ( import (
"fmt" "fmt"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
@ -26,34 +27,43 @@ type DripFileIlkRepository struct {
} }
func (repository DripFileIlkRepository) Create(headerID int64, models []interface{}) error { func (repository DripFileIlkRepository) Create(headerID int64, models []interface{}) error {
tx, err := repository.db.Begin() tx, dBaseErr := repository.db.Begin()
if err != nil { if dBaseErr != nil {
return err return dBaseErr
} }
for _, model := range models { for _, model := range models {
ilk, ok := model.(DripFileIlkModel) ilk, ok := model.(DripFileIlkModel)
if !ok { if !ok {
tx.Rollback() rollbackErr := tx.Rollback()
if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return fmt.Errorf("model of type %T, not %T", model, DripFileIlkModel{}) return fmt.Errorf("model of type %T, not %T", model, DripFileIlkModel{})
} }
_, err = tx.Exec( _, execErr := tx.Exec(
`INSERT into maker.drip_file_ilk (header_id, ilk, vow, tax, log_idx, tx_idx, raw_log) `INSERT into maker.drip_file_ilk (header_id, ilk, vow, tax, log_idx, tx_idx, raw_log)
VALUES($1, $2, $3, $4::NUMERIC, $5, $6, $7)`, VALUES($1, $2, $3, $4::NUMERIC, $5, $6, $7)`,
headerID, ilk.Ilk, ilk.Vow, ilk.Tax, ilk.LogIndex, ilk.TransactionIndex, ilk.Raw, headerID, ilk.Ilk, ilk.Vow, ilk.Tax, ilk.LogIndex, ilk.TransactionIndex, ilk.Raw,
) )
if err != nil { if execErr != nil {
tx.Rollback() rollbackErr := tx.Rollback()
return err if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return execErr
} }
} }
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.DripFileIlkChecked) checkHeaderErr := shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.DripFileIlkChecked)
if err != nil { if checkHeaderErr != nil {
tx.Rollback() rollbackErr := tx.Rollback()
return err if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return checkHeaderErr
} }
return tx.Commit() return tx.Commit()

View File

@ -16,6 +16,7 @@ package repo
import ( import (
"fmt" "fmt"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
@ -26,34 +27,43 @@ type DripFileRepoRepository struct {
} }
func (repository DripFileRepoRepository) Create(headerID int64, models []interface{}) error { func (repository DripFileRepoRepository) Create(headerID int64, models []interface{}) error {
tx, err := repository.db.Begin() tx, dBaseErr := repository.db.Begin()
if err != nil { if dBaseErr != nil {
return err return dBaseErr
} }
for _, model := range models { for _, model := range models {
repo, ok := model.(DripFileRepoModel) repo, ok := model.(DripFileRepoModel)
if !ok { if !ok {
tx.Rollback() rollbackErr := tx.Rollback()
if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return fmt.Errorf("model of type %T, not %T", model, DripFileRepoModel{}) return fmt.Errorf("model of type %T, not %T", model, DripFileRepoModel{})
} }
_, err = tx.Exec( _, execErr := tx.Exec(
`INSERT into maker.drip_file_repo (header_id, what, data, log_idx, tx_idx, raw_log) `INSERT into maker.drip_file_repo (header_id, what, data, log_idx, tx_idx, raw_log)
VALUES($1, $2, $3::NUMERIC, $4, $5, $6)`, VALUES($1, $2, $3::NUMERIC, $4, $5, $6)`,
headerID, repo.What, repo.Data, repo.LogIndex, repo.TransactionIndex, repo.Raw, headerID, repo.What, repo.Data, repo.LogIndex, repo.TransactionIndex, repo.Raw,
) )
if err != nil { if execErr != nil {
tx.Rollback() rollbackErr := tx.Rollback()
return err if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return execErr
} }
} }
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.DripFileRepoChecked) checkHeaderErr := shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.DripFileRepoChecked)
if err != nil { if checkHeaderErr != nil {
tx.Rollback() rollbackErr := tx.Rollback()
return err if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return checkHeaderErr
} }
return tx.Commit() return tx.Commit()

View File

@ -16,6 +16,7 @@ package vow
import ( import (
"fmt" "fmt"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
@ -26,33 +27,42 @@ type DripFileVowRepository struct {
} }
func (repository DripFileVowRepository) Create(headerID int64, models []interface{}) error { func (repository DripFileVowRepository) Create(headerID int64, models []interface{}) error {
tx, err := repository.db.Begin() tx, dBaseErr := repository.db.Begin()
if err != nil { if dBaseErr != nil {
return err return dBaseErr
} }
for _, model := range models { for _, model := range models {
vow, ok := model.(DripFileVowModel) vow, ok := model.(DripFileVowModel)
if !ok { if !ok {
tx.Rollback() rollbackErr := tx.Rollback()
if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return fmt.Errorf("model of type %T, not %T", model, DripFileVowModel{}) return fmt.Errorf("model of type %T, not %T", model, DripFileVowModel{})
} }
_, err = tx.Exec( _, execErr := tx.Exec(
`INSERT into maker.drip_file_vow (header_id, what, data, log_idx, tx_idx, raw_log) `INSERT into maker.drip_file_vow (header_id, what, data, log_idx, tx_idx, raw_log)
VALUES($1, $2, $3, $4, $5, $6)`, VALUES($1, $2, $3, $4, $5, $6)`,
headerID, vow.What, vow.Data, vow.LogIndex, vow.TransactionIndex, vow.Raw, headerID, vow.What, vow.Data, vow.LogIndex, vow.TransactionIndex, vow.Raw,
) )
if err != nil { if execErr != nil {
tx.Rollback() rollbackErr := tx.Rollback()
return err if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return execErr
} }
} }
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.DripFileVowChecked) checkHeaderErr := shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.DripFileVowChecked)
if err != nil { if checkHeaderErr != nil {
tx.Rollback() rollbackErr := tx.Rollback()
return err if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return checkHeaderErr
} }
return tx.Commit() return tx.Commit()

View File

@ -16,6 +16,7 @@ package flap_kick
import ( import (
"fmt" "fmt"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
@ -26,9 +27,9 @@ type FlapKickRepository struct {
} }
func (repository *FlapKickRepository) Create(headerID int64, models []interface{}) error { func (repository *FlapKickRepository) Create(headerID int64, models []interface{}) error {
tx, err := repository.db.Begin() tx, dBaseErr := repository.db.Begin()
if err != nil { if dBaseErr != nil {
return err return dBaseErr
} }
for _, model := range models { for _, model := range models {
flapKickModel, ok := model.(FlapKickModel) flapKickModel, ok := model.(FlapKickModel)
@ -36,21 +37,27 @@ func (repository *FlapKickRepository) Create(headerID int64, models []interface{
return fmt.Errorf("model of type %T, not %T", model, FlapKickModel{}) return fmt.Errorf("model of type %T, not %T", model, FlapKickModel{})
} }
_, err = tx.Exec( _, execErr := tx.Exec(
`INSERT into maker.flap_kick (header_id, bid_id, lot, bid, gal, "end", tx_idx, log_idx, raw_log) `INSERT into maker.flap_kick (header_id, bid_id, lot, bid, gal, "end", tx_idx, log_idx, raw_log)
VALUES($1, $2::NUMERIC, $3::NUMERIC, $4::NUMERIC, $5, $6, $7, $8, $9)`, VALUES($1, $2::NUMERIC, $3::NUMERIC, $4::NUMERIC, $5, $6, $7, $8, $9)`,
headerID, flapKickModel.BidId, flapKickModel.Lot, flapKickModel.Bid, flapKickModel.Gal, flapKickModel.End, flapKickModel.TransactionIndex, flapKickModel.LogIndex, flapKickModel.Raw, headerID, flapKickModel.BidId, flapKickModel.Lot, flapKickModel.Bid, flapKickModel.Gal, flapKickModel.End, flapKickModel.TransactionIndex, flapKickModel.LogIndex, flapKickModel.Raw,
) )
if err != nil { if execErr != nil {
tx.Rollback() rollbackErr := tx.Rollback()
return err if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return execErr
} }
} }
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.FlapKickChecked) checkHeaderErr := shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.FlapKickChecked)
if err != nil { if checkHeaderErr != nil {
tx.Rollback() rollbackErr := tx.Rollback()
return err if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return checkHeaderErr
} }
return tx.Commit() return tx.Commit()
} }

View File

@ -16,6 +16,7 @@ package flip_kick
import ( import (
"fmt" "fmt"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
@ -27,9 +28,9 @@ type FlipKickRepository struct {
} }
func (repository FlipKickRepository) Create(headerID int64, models []interface{}) error { func (repository FlipKickRepository) Create(headerID int64, models []interface{}) error {
tx, err := repository.db.Begin() tx, dBaseErr := repository.db.Begin()
if err != nil { if dBaseErr != nil {
return err return dBaseErr
} }
for _, model := range models { for _, model := range models {
flipKickModel, ok := model.(FlipKickModel) flipKickModel, ok := model.(FlipKickModel)
@ -37,20 +38,26 @@ func (repository FlipKickRepository) Create(headerID int64, models []interface{}
return fmt.Errorf("model of type %T, not %T", model, FlipKickModel{}) return fmt.Errorf("model of type %T, not %T", model, FlipKickModel{})
} }
_, err = tx.Exec( _, execErr := tx.Exec(
`INSERT into maker.flip_kick (header_id, bid_id, lot, bid, gal, "end", urn, tab, tx_idx, log_idx, raw_log) `INSERT into maker.flip_kick (header_id, bid_id, lot, bid, gal, "end", urn, tab, tx_idx, log_idx, raw_log)
VALUES($1, $2::NUMERIC, $3::NUMERIC, $4::NUMERIC, $5, $6, $7, $8::NUMERIC, $9, $10, $11)`, VALUES($1, $2::NUMERIC, $3::NUMERIC, $4::NUMERIC, $5, $6, $7, $8::NUMERIC, $9, $10, $11)`,
headerID, flipKickModel.BidId, flipKickModel.Lot, flipKickModel.Bid, flipKickModel.Gal, flipKickModel.End, flipKickModel.Urn, flipKickModel.Tab, flipKickModel.TransactionIndex, flipKickModel.LogIndex, flipKickModel.Raw, headerID, flipKickModel.BidId, flipKickModel.Lot, flipKickModel.Bid, flipKickModel.Gal, flipKickModel.End, flipKickModel.Urn, flipKickModel.Tab, flipKickModel.TransactionIndex, flipKickModel.LogIndex, flipKickModel.Raw,
) )
if err != nil { if execErr != nil {
tx.Rollback() rollbackErr := tx.Rollback()
return err if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return execErr
} }
} }
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.FlipKickChecked) checkHeaderErr := shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.FlipKickChecked)
if err != nil { if checkHeaderErr != nil {
tx.Rollback() rollbackErr := tx.Rollback()
return err if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return checkHeaderErr
} }
return tx.Commit() return tx.Commit()
} }

View File

@ -16,6 +16,7 @@ package flop_kick
import ( import (
"fmt" "fmt"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
@ -26,9 +27,9 @@ type FlopKickRepository struct {
} }
func (repository FlopKickRepository) Create(headerID int64, models []interface{}) error { func (repository FlopKickRepository) Create(headerID int64, models []interface{}) error {
tx, err := repository.db.Begin() tx, dBaseErr := repository.db.Begin()
if err != nil { if dBaseErr != nil {
return err return dBaseErr
} }
for _, flopKick := range models { for _, flopKick := range models {
flopKickModel, ok := flopKick.(Model) flopKickModel, ok := flopKick.(Model)
@ -37,21 +38,27 @@ func (repository FlopKickRepository) Create(headerID int64, models []interface{}
return fmt.Errorf("model of type %T, not %T", flopKick, Model{}) return fmt.Errorf("model of type %T, not %T", flopKick, Model{})
} }
_, err = tx.Exec( _, execErr := tx.Exec(
`INSERT into maker.flop_kick (header_id, bid_id, lot, bid, gal, "end", tx_idx, log_idx, raw_log) `INSERT into maker.flop_kick (header_id, bid_id, lot, bid, gal, "end", tx_idx, log_idx, raw_log)
VALUES($1, $2::NUMERIC, $3::NUMERIC, $4::NUMERIC, $5, $6, $7, $8, $9)`, VALUES($1, $2::NUMERIC, $3::NUMERIC, $4::NUMERIC, $5, $6, $7, $8, $9)`,
headerID, flopKickModel.BidId, flopKickModel.Lot, flopKickModel.Bid, flopKickModel.Gal, flopKickModel.End, flopKickModel.TransactionIndex, flopKickModel.LogIndex, flopKickModel.Raw, headerID, flopKickModel.BidId, flopKickModel.Lot, flopKickModel.Bid, flopKickModel.Gal, flopKickModel.End, flopKickModel.TransactionIndex, flopKickModel.LogIndex, flopKickModel.Raw,
) )
if err != nil { if execErr != nil {
tx.Rollback() rollbackErr := tx.Rollback()
return err if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return execErr
} }
} }
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.FlopKickChecked) checkHeaderErr := shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.FlopKickChecked)
if err != nil { if checkHeaderErr != nil {
tx.Rollback() rollbackErr := tx.Rollback()
return err if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return checkHeaderErr
} }
return tx.Commit() return tx.Commit()

View File

@ -16,6 +16,7 @@ package frob
import ( import (
"fmt" "fmt"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
@ -27,29 +28,38 @@ type FrobRepository struct {
} }
func (repository FrobRepository) Create(headerID int64, models []interface{}) error { func (repository FrobRepository) Create(headerID int64, models []interface{}) error {
tx, err := repository.db.Begin() tx, dBaseErr := repository.db.Begin()
if err != nil { if dBaseErr != nil {
return err return dBaseErr
} }
for _, model := range models { for _, model := range models {
frobModel, ok := model.(FrobModel) frobModel, ok := model.(FrobModel)
if !ok { if !ok {
tx.Rollback() rollbackErr := tx.Rollback()
if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return fmt.Errorf("model of type %T, not %T", model, FrobModel{}) return fmt.Errorf("model of type %T, not %T", model, FrobModel{})
} }
_, err = tx.Exec(`INSERT INTO maker.frob (header_id, art, dart, dink, iart, ilk, ink, urn, raw_log, log_idx, tx_idx) _, execErr := tx.Exec(`INSERT INTO maker.frob (header_id, art, dart, dink, iart, ilk, ink, urn, raw_log, log_idx, tx_idx)
VALUES($1, $2::NUMERIC, $3::NUMERIC, $4::NUMERIC, $5::NUMERIC, $6, $7::NUMERIC, $8, $9, $10, $11)`, VALUES($1, $2::NUMERIC, $3::NUMERIC, $4::NUMERIC, $5::NUMERIC, $6, $7::NUMERIC, $8, $9, $10, $11)`,
headerID, frobModel.Art, frobModel.Dart, frobModel.Dink, frobModel.IArt, frobModel.Ilk, frobModel.Ink, frobModel.Urn, frobModel.Raw, frobModel.LogIndex, frobModel.TransactionIndex) headerID, frobModel.Art, frobModel.Dart, frobModel.Dink, frobModel.IArt, frobModel.Ilk, frobModel.Ink, frobModel.Urn, frobModel.Raw, frobModel.LogIndex, frobModel.TransactionIndex)
if err != nil { if execErr != nil {
tx.Rollback() rollbackErr := tx.Rollback()
return err if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return execErr
} }
} }
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.FrobChecked) checkHeaderErr := shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.FrobChecked)
if err != nil { if checkHeaderErr != nil {
tx.Rollback() rollbackErr := tx.Rollback()
return err if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return checkHeaderErr
} }
return tx.Commit() return tx.Commit()
} }

View File

@ -16,6 +16,7 @@ package debt_ceiling
import ( import (
"fmt" "fmt"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
@ -26,34 +27,43 @@ type PitFileDebtCeilingRepository struct {
} }
func (repository PitFileDebtCeilingRepository) Create(headerID int64, models []interface{}) error { func (repository PitFileDebtCeilingRepository) Create(headerID int64, models []interface{}) error {
tx, err := repository.db.Begin() tx, dBaseErr := repository.db.Begin()
if err != nil { if dBaseErr != nil {
return err return dBaseErr
} }
for _, model := range models { for _, model := range models {
pitFileDC, ok := model.(PitFileDebtCeilingModel) pitFileDC, ok := model.(PitFileDebtCeilingModel)
if !ok { if !ok {
tx.Rollback() rollbackErr := tx.Rollback()
if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return fmt.Errorf("model of type %T, not %T", model, PitFileDebtCeilingModel{}) return fmt.Errorf("model of type %T, not %T", model, PitFileDebtCeilingModel{})
} }
_, err = tx.Exec( _, execErr := tx.Exec(
`INSERT into maker.pit_file_debt_ceiling (header_id, what, data, log_idx, tx_idx, raw_log) `INSERT into maker.pit_file_debt_ceiling (header_id, what, data, log_idx, tx_idx, raw_log)
VALUES($1, $2, $3::NUMERIC, $4, $5, $6)`, VALUES($1, $2, $3::NUMERIC, $4, $5, $6)`,
headerID, pitFileDC.What, pitFileDC.Data, pitFileDC.LogIndex, pitFileDC.TransactionIndex, pitFileDC.Raw, headerID, pitFileDC.What, pitFileDC.Data, pitFileDC.LogIndex, pitFileDC.TransactionIndex, pitFileDC.Raw,
) )
if err != nil { if execErr != nil {
tx.Rollback() rollbackErr := tx.Rollback()
return err if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return execErr
} }
} }
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.PitFileDebtCeilingChecked) checkHeaderErr := shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.PitFileDebtCeilingChecked)
if err != nil { if checkHeaderErr != nil {
tx.Rollback() rollbackErr := tx.Rollback()
return err if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return checkHeaderErr
} }
return tx.Commit() return tx.Commit()

View File

@ -16,6 +16,7 @@ package ilk
import ( import (
"fmt" "fmt"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
@ -26,33 +27,42 @@ type PitFileIlkRepository struct {
} }
func (repository PitFileIlkRepository) Create(headerID int64, models []interface{}) error { func (repository PitFileIlkRepository) Create(headerID int64, models []interface{}) error {
tx, err := repository.db.Begin() tx, dBaseErr := repository.db.Begin()
if err != nil { if dBaseErr != nil {
return err return dBaseErr
} }
for _, model := range models { for _, model := range models {
pitFileIlk, ok := model.(PitFileIlkModel) pitFileIlk, ok := model.(PitFileIlkModel)
if !ok { if !ok {
tx.Rollback() rollbackErr := tx.Rollback()
if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return fmt.Errorf("model of type %T, not %T", model, PitFileIlkModel{}) return fmt.Errorf("model of type %T, not %T", model, PitFileIlkModel{})
} }
_, err = tx.Exec( _, execErr := tx.Exec(
`INSERT into maker.pit_file_ilk (header_id, ilk, what, data, log_idx, tx_idx, raw_log) `INSERT into maker.pit_file_ilk (header_id, ilk, what, data, log_idx, tx_idx, raw_log)
VALUES($1, $2, $3, $4::NUMERIC, $5, $6, $7)`, VALUES($1, $2, $3, $4::NUMERIC, $5, $6, $7)`,
headerID, pitFileIlk.Ilk, pitFileIlk.What, pitFileIlk.Data, pitFileIlk.LogIndex, pitFileIlk.TransactionIndex, pitFileIlk.Raw, headerID, pitFileIlk.Ilk, pitFileIlk.What, pitFileIlk.Data, pitFileIlk.LogIndex, pitFileIlk.TransactionIndex, pitFileIlk.Raw,
) )
if err != nil { if execErr != nil {
tx.Rollback() rollbackErr := tx.Rollback()
return err if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return execErr
} }
} }
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.PitFileIlkChecked) checkHeaderErr := shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.PitFileIlkChecked)
if err != nil { if checkHeaderErr != nil {
tx.Rollback() rollbackErr := tx.Rollback()
return err if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return checkHeaderErr
} }
return tx.Commit() return tx.Commit()
} }

View File

@ -16,6 +16,7 @@ package price_feeds
import ( import (
"fmt" "fmt"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
@ -26,29 +27,38 @@ type PriceFeedRepository struct {
} }
func (repository PriceFeedRepository) Create(headerID int64, models []interface{}) error { func (repository PriceFeedRepository) Create(headerID int64, models []interface{}) error {
tx, err := repository.db.Begin() tx, dBaseErr := repository.db.Begin()
if err != nil { if dBaseErr != nil {
return err return dBaseErr
} }
for _, model := range models { for _, model := range models {
priceUpdate, ok := model.(PriceFeedModel) priceUpdate, ok := model.(PriceFeedModel)
if !ok { if !ok {
tx.Rollback() rollbackErr := tx.Rollback()
if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return fmt.Errorf("model of type %T, not %T", model, PriceFeedModel{}) return fmt.Errorf("model of type %T, not %T", model, PriceFeedModel{})
} }
_, err = tx.Exec(`INSERT INTO maker.price_feeds (block_number, header_id, medianizer_address, usd_value, log_idx, tx_idx, raw_log) _, execErr := tx.Exec(`INSERT INTO maker.price_feeds (block_number, header_id, medianizer_address, usd_value, log_idx, tx_idx, raw_log)
VALUES ($1, $2, $3, $4::NUMERIC, $5, $6, $7)`, priceUpdate.BlockNumber, headerID, priceUpdate.MedianizerAddress, priceUpdate.UsdValue, priceUpdate.LogIndex, priceUpdate.TransactionIndex, priceUpdate.Raw) VALUES ($1, $2, $3, $4::NUMERIC, $5, $6, $7)`, priceUpdate.BlockNumber, headerID, priceUpdate.MedianizerAddress, priceUpdate.UsdValue, priceUpdate.LogIndex, priceUpdate.TransactionIndex, priceUpdate.Raw)
if err != nil { if execErr != nil {
tx.Rollback() rollbackErr := tx.Rollback()
return err if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return execErr
} }
} }
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.PriceFeedsChecked) checkHeaderErr := shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.PriceFeedsChecked)
if err != nil { if checkHeaderErr != nil {
tx.Rollback() rollbackErr := tx.Rollback()
return err if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return checkHeaderErr
} }
return tx.Commit() return tx.Commit()
} }

View File

@ -7,6 +7,7 @@ import (
"fmt" "fmt"
"github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
) )
func MarkHeaderChecked(headerID int64, db *postgres.DB, checkedHeadersColumn string) error { func MarkHeaderChecked(headerID int64, db *postgres.DB, checkedHeadersColumn string) error {
@ -100,3 +101,14 @@ func CreateNotCheckedSQL(boolColumns []string) string {
return result.String() return result.String()
} }
func GetTicInTx(headerID int64, tx *sql.Tx) (int64, error) {
var blockTimestamp int64
err := tx.QueryRow(`SELECT block_timestamp FROM public.headers WHERE id = $1;`, headerID).Scan(&blockTimestamp)
if err != nil {
return 0, err
}
tic := blockTimestamp + constants.TTL
return tic, nil
}

View File

@ -15,7 +15,6 @@
package shared package shared
import ( import (
"database/sql"
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
"math/big" "math/big"
) )
@ -79,15 +78,3 @@ func convert(conversion string, value string, precision int) string {
} }
return result.Text('f', precision) return result.Text('f', precision)
} }
// Grabs the block timestamp for an headerID, and adds the TTL constant
func GetTicInTx(headerID int64, tx *sql.Tx) (int64, error) {
var blockTimestamp int64
err := tx.QueryRow(`SELECT block_timestamp FROM public.headers WHERE id = $1;`, headerID).Scan(&blockTimestamp)
if err != nil {
return 0, err
}
tic := blockTimestamp + constants.TTL
return tic, nil
}

View File

@ -16,6 +16,7 @@ package tend
import ( import (
"fmt" "fmt"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
@ -26,39 +27,52 @@ type TendRepository struct {
} }
func (repository TendRepository) Create(headerID int64, models []interface{}) error { func (repository TendRepository) Create(headerID int64, models []interface{}) error {
tx, err := repository.db.Begin() tx, dBaseErr := repository.db.Begin()
if err != nil { if dBaseErr != nil {
return err return dBaseErr
} }
tic, err := shared.GetTicInTx(headerID, tx) tic, getTicErr := shared.GetTicInTx(headerID, tx)
if err != nil { if getTicErr != nil {
return err rollbackErr := tx.Rollback()
if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return getTicErr
} }
for _, model := range models { for _, model := range models {
tend, ok := model.(TendModel) tend, ok := model.(TendModel)
if !ok { if !ok {
tx.Rollback() rollbackErr := tx.Rollback()
if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return fmt.Errorf("model of type %T, not %T", model, TendModel{}) return fmt.Errorf("model of type %T, not %T", model, TendModel{})
} }
_, err = tx.Exec( _, execErr := tx.Exec(
`INSERT into maker.tend (header_id, bid_id, lot, bid, guy, tic, log_idx, tx_idx, raw_log) `INSERT into maker.tend (header_id, bid_id, lot, bid, guy, tic, log_idx, tx_idx, raw_log)
VALUES($1, $2, $3::NUMERIC, $4::NUMERIC, $5, $6, $7, $8, $9)`, VALUES($1, $2, $3::NUMERIC, $4::NUMERIC, $5, $6, $7, $8, $9)`,
headerID, tend.BidId, tend.Lot, tend.Bid, tend.Guy, tic, tend.LogIndex, tend.TransactionIndex, tend.Raw, headerID, tend.BidId, tend.Lot, tend.Bid, tend.Guy, tic, tend.LogIndex, tend.TransactionIndex, tend.Raw,
) )
if err != nil { if execErr != nil {
tx.Rollback() rollbackErr := tx.Rollback()
return err if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return execErr
} }
} }
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.TendChecked) checkHeaderErr := shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.TendChecked)
if err != nil { if checkHeaderErr != nil {
tx.Rollback() rollbackErr := tx.Rollback()
return err if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return checkHeaderErr
} }
return tx.Commit() return tx.Commit()
} }

View File

@ -16,6 +16,7 @@ package vat_flux
import ( import (
"fmt" "fmt"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
@ -26,31 +27,40 @@ type VatFluxRepository struct {
} }
func (repository VatFluxRepository) Create(headerID int64, models []interface{}) error { func (repository VatFluxRepository) Create(headerID int64, models []interface{}) error {
tx, err := repository.db.Begin() tx, dBaseErr := repository.db.Begin()
if err != nil { if dBaseErr != nil {
return err return dBaseErr
} }
for _, model := range models { for _, model := range models {
vatFlux, ok := model.(VatFluxModel) vatFlux, ok := model.(VatFluxModel)
if !ok { if !ok {
tx.Rollback() rollbackErr := tx.Rollback()
if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return fmt.Errorf("model of type %T, not %T", model, VatFluxModel{}) return fmt.Errorf("model of type %T, not %T", model, VatFluxModel{})
} }
_, err = tx.Exec(`INSERT INTO maker.vat_flux (header_id, ilk, dst, src, rad, tx_idx, log_idx, raw_log) _, execErr := tx.Exec(`INSERT INTO maker.vat_flux (header_id, ilk, dst, src, rad, tx_idx, log_idx, raw_log)
VALUES($1, $2, $3, $4, $5::NUMERIC, $6, $7, $8)`, VALUES($1, $2, $3, $4, $5::NUMERIC, $6, $7, $8)`,
headerID, vatFlux.Ilk, vatFlux.Dst, vatFlux.Src, vatFlux.Rad, vatFlux.TransactionIndex, vatFlux.LogIndex, vatFlux.Raw) headerID, vatFlux.Ilk, vatFlux.Dst, vatFlux.Src, vatFlux.Rad, vatFlux.TransactionIndex, vatFlux.LogIndex, vatFlux.Raw)
if err != nil { if execErr != nil {
tx.Rollback() rollbackErr := tx.Rollback()
return err if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return execErr
} }
} }
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.VatFluxChecked) checkHeaderErr := shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.VatFluxChecked)
if err != nil { if checkHeaderErr != nil {
tx.Rollback() rollbackErr := tx.Rollback()
return err if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return checkHeaderErr
} }
return tx.Commit() return tx.Commit()

View File

@ -16,6 +16,7 @@ package vat_fold
import ( import (
"fmt" "fmt"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
@ -27,32 +28,41 @@ type VatFoldRepository struct {
} }
func (repository VatFoldRepository) Create(headerID int64, models []interface{}) error { func (repository VatFoldRepository) Create(headerID int64, models []interface{}) error {
tx, err := repository.db.Begin() tx, dBaseErr := repository.db.Begin()
if err != nil { if dBaseErr != nil {
return err return dBaseErr
} }
for _, model := range models { for _, model := range models {
vatFold, ok := model.(VatFoldModel) vatFold, ok := model.(VatFoldModel)
if !ok { if !ok {
tx.Rollback() rollbackErr := tx.Rollback()
if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return fmt.Errorf("model of type %T, not %T", model, VatFoldModel{}) return fmt.Errorf("model of type %T, not %T", model, VatFoldModel{})
} }
_, err = tx.Exec( _, execErr := tx.Exec(
`INSERT into maker.vat_fold (header_id, ilk, urn, rate, log_idx, tx_idx, raw_log) `INSERT into maker.vat_fold (header_id, ilk, urn, rate, log_idx, tx_idx, raw_log)
VALUES($1, $2, $3, $4::NUMERIC, $5, $6, $7)`, VALUES($1, $2, $3, $4::NUMERIC, $5, $6, $7)`,
headerID, vatFold.Ilk, vatFold.Urn, vatFold.Rate, vatFold.LogIndex, vatFold.TransactionIndex, vatFold.Raw, headerID, vatFold.Ilk, vatFold.Urn, vatFold.Rate, vatFold.LogIndex, vatFold.TransactionIndex, vatFold.Raw,
) )
if err != nil { if execErr != nil {
tx.Rollback() rollbackErr := tx.Rollback()
return err if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return execErr
} }
} }
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.VatFoldChecked) checkHeaderErr := shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.VatFoldChecked)
if err != nil { if checkHeaderErr != nil {
tx.Rollback() rollbackErr := tx.Rollback()
return err if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return checkHeaderErr
} }
return tx.Commit() return tx.Commit()

View File

@ -2,6 +2,7 @@ package vat_grab
import ( import (
"fmt" "fmt"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
@ -13,31 +14,40 @@ type VatGrabRepository struct {
} }
func (repository VatGrabRepository) Create(headerID int64, models []interface{}) error { func (repository VatGrabRepository) Create(headerID int64, models []interface{}) error {
tx, err := repository.db.Begin() tx, dBaseErr := repository.db.Begin()
if err != nil { if dBaseErr != nil {
return err return dBaseErr
} }
for _, model := range models { for _, model := range models {
vatGrab, ok := model.(VatGrabModel) vatGrab, ok := model.(VatGrabModel)
if !ok { if !ok {
tx.Rollback() rollbackErr := tx.Rollback()
if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return fmt.Errorf("model of type %T, not %T", model, VatGrabModel{}) return fmt.Errorf("model of type %T, not %T", model, VatGrabModel{})
} }
_, err = tx.Exec( _, execErr := tx.Exec(
`INSERT into maker.vat_grab (header_id, ilk, urn, v, w, dink, dart, log_idx, tx_idx, raw_log) `INSERT into maker.vat_grab (header_id, ilk, urn, v, w, dink, dart, log_idx, tx_idx, raw_log)
VALUES($1, $2, $3, $4, $5, $6::NUMERIC, $7::NUMERIC, $8, $9, $10)`, VALUES($1, $2, $3, $4, $5, $6::NUMERIC, $7::NUMERIC, $8, $9, $10)`,
headerID, vatGrab.Ilk, vatGrab.Urn, vatGrab.V, vatGrab.W, vatGrab.Dink, vatGrab.Dart, vatGrab.LogIndex, vatGrab.TransactionIndex, vatGrab.Raw, headerID, vatGrab.Ilk, vatGrab.Urn, vatGrab.V, vatGrab.W, vatGrab.Dink, vatGrab.Dart, vatGrab.LogIndex, vatGrab.TransactionIndex, vatGrab.Raw,
) )
if err != nil { if execErr != nil {
tx.Rollback() rollbackErr := tx.Rollback()
return err if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return execErr
} }
} }
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.VatGrabChecked) checkHeaderErr := shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.VatGrabChecked)
if err != nil { if checkHeaderErr != nil {
tx.Rollback() rollbackErr := tx.Rollback()
return err if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return checkHeaderErr
} }
return tx.Commit() return tx.Commit()
} }

View File

@ -16,6 +16,7 @@ package vat_heal
import ( import (
"fmt" "fmt"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
@ -31,31 +32,40 @@ func (repository *VatHealRepository) SetDB(db *postgres.DB) {
} }
func (repository VatHealRepository) Create(headerID int64, models []interface{}) error { func (repository VatHealRepository) Create(headerID int64, models []interface{}) error {
tx, err := repository.db.Begin() tx, dBaseErr := repository.db.Begin()
if err != nil { if dBaseErr != nil {
return err return dBaseErr
} }
for _, model := range models { for _, model := range models {
vatHeal, ok := model.(VatHealModel) vatHeal, ok := model.(VatHealModel)
if !ok { if !ok {
tx.Rollback() rollbackErr := tx.Rollback()
if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return fmt.Errorf("model of type %T, not %T", model, VatHealModel{}) return fmt.Errorf("model of type %T, not %T", model, VatHealModel{})
} }
_, err := tx.Exec(`INSERT INTO maker.vat_heal (header_id, urn, v, rad, log_idx, tx_idx, raw_log) _, execErr := tx.Exec(`INSERT INTO maker.vat_heal (header_id, urn, v, rad, log_idx, tx_idx, raw_log)
VALUES($1, $2, $3, $4::NUMERIC, $5, $6, $7)`, VALUES($1, $2, $3, $4::NUMERIC, $5, $6, $7)`,
headerID, vatHeal.Urn, vatHeal.V, vatHeal.Rad, vatHeal.LogIndex, vatHeal.TransactionIndex, vatHeal.Raw) headerID, vatHeal.Urn, vatHeal.V, vatHeal.Rad, vatHeal.LogIndex, vatHeal.TransactionIndex, vatHeal.Raw)
if err != nil { if execErr != nil {
tx.Rollback() rollbackErr := tx.Rollback()
return err if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return execErr
} }
} }
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.VatHealChecked) checkHeaderErr := shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.VatHealChecked)
if err != nil { if checkHeaderErr != nil {
tx.Rollback() rollbackErr := tx.Rollback()
return err if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return checkHeaderErr
} }
return tx.Commit() return tx.Commit()
} }

View File

@ -16,6 +16,7 @@ package vat_init
import ( import (
"fmt" "fmt"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
@ -26,33 +27,42 @@ type VatInitRepository struct {
} }
func (repository VatInitRepository) Create(headerID int64, models []interface{}) error { func (repository VatInitRepository) Create(headerID int64, models []interface{}) error {
tx, err := repository.db.Begin() tx, dBaseErr := repository.db.Begin()
if err != nil { if dBaseErr != nil {
return err return dBaseErr
} }
for _, model := range models { for _, model := range models {
vatInit, ok := model.(VatInitModel) vatInit, ok := model.(VatInitModel)
if !ok { if !ok {
tx.Rollback() rollbackErr := tx.Rollback()
if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return fmt.Errorf("model of type %T, not %T", model, VatInitModel{}) return fmt.Errorf("model of type %T, not %T", model, VatInitModel{})
} }
_, err = tx.Exec( _, execErr := tx.Exec(
`INSERT INTO maker.vat_init (header_id, ilk, log_idx, tx_idx, raw_log) `INSERT INTO maker.vat_init (header_id, ilk, log_idx, tx_idx, raw_log)
VALUES($1, $2, $3, $4, $5)`, VALUES($1, $2, $3, $4, $5)`,
headerID, vatInit.Ilk, vatInit.LogIndex, vatInit.TransactionIndex, vatInit.Raw, headerID, vatInit.Ilk, vatInit.LogIndex, vatInit.TransactionIndex, vatInit.Raw,
) )
if err != nil { if execErr != nil {
tx.Rollback() rollbackErr := tx.Rollback()
return err if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return execErr
} }
} }
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.VatInitChecked) checkHeaderErr := shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.VatInitChecked)
if err != nil { if checkHeaderErr != nil {
tx.Rollback() rollbackErr := tx.Rollback()
return err if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return checkHeaderErr
} }
return tx.Commit() return tx.Commit()

View File

@ -16,6 +16,7 @@ package vat_move
import ( import (
"fmt" "fmt"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
@ -26,34 +27,43 @@ type VatMoveRepository struct {
} }
func (repository VatMoveRepository) Create(headerID int64, models []interface{}) error { func (repository VatMoveRepository) Create(headerID int64, models []interface{}) error {
tx, err := repository.db.Begin() tx, dBaseErr := repository.db.Begin()
if err != nil { if dBaseErr != nil {
return err return dBaseErr
} }
for _, model := range models { for _, model := range models {
vatMove, ok := model.(VatMoveModel) vatMove, ok := model.(VatMoveModel)
if !ok { if !ok {
tx.Rollback() rollbackErr := tx.Rollback()
if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return fmt.Errorf("model of type %T, not %T", model, VatMoveModel{}) return fmt.Errorf("model of type %T, not %T", model, VatMoveModel{})
} }
_, err = tx.Exec( _, execErr := tx.Exec(
`INSERT INTO maker.vat_move (header_id, src, dst, rad, log_idx, tx_idx, raw_log) `INSERT INTO maker.vat_move (header_id, src, dst, rad, log_idx, tx_idx, raw_log)
VALUES ($1, $2, $3, $4::NUMERIC, $5, $6, $7)`, VALUES ($1, $2, $3, $4::NUMERIC, $5, $6, $7)`,
headerID, vatMove.Src, vatMove.Dst, vatMove.Rad, vatMove.LogIndex, vatMove.TransactionIndex, vatMove.Raw, headerID, vatMove.Src, vatMove.Dst, vatMove.Rad, vatMove.LogIndex, vatMove.TransactionIndex, vatMove.Raw,
) )
if err != nil { if execErr != nil {
tx.Rollback() rollbackErr := tx.Rollback()
return err if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return execErr
} }
} }
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.VatMoveChecked) checkHeaderErr := shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.VatMoveChecked)
if err != nil { if checkHeaderErr != nil {
tx.Rollback() rollbackErr := tx.Rollback()
return err if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return checkHeaderErr
} }
return tx.Commit() return tx.Commit()

View File

@ -2,6 +2,7 @@ package vat_slip
import ( import (
"fmt" "fmt"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
@ -12,32 +13,41 @@ type VatSlipRepository struct {
} }
func (repository VatSlipRepository) Create(headerID int64, models []interface{}) error { func (repository VatSlipRepository) Create(headerID int64, models []interface{}) error {
tx, err := repository.db.Begin() tx, dBaseErr := repository.db.Begin()
if err != nil { if dBaseErr != nil {
return err return dBaseErr
} }
for _, model := range models { for _, model := range models {
vatSlip, ok := model.(VatSlipModel) vatSlip, ok := model.(VatSlipModel)
if !ok { if !ok {
tx.Rollback() rollbackErr := tx.Rollback()
if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return fmt.Errorf("model of type %T, not %T", model, VatSlipModel{}) return fmt.Errorf("model of type %T, not %T", model, VatSlipModel{})
} }
_, err = tx.Exec( _, execErr := tx.Exec(
`INSERT into maker.vat_slip (header_id, ilk, guy, rad, tx_idx, log_idx, raw_log) `INSERT into maker.vat_slip (header_id, ilk, guy, rad, tx_idx, log_idx, raw_log)
VALUES($1, $2, $3, $4::NUMERIC, $5, $6, $7)`, VALUES($1, $2, $3, $4::NUMERIC, $5, $6, $7)`,
headerID, vatSlip.Ilk, vatSlip.Guy, vatSlip.Rad, vatSlip.TransactionIndex, vatSlip.LogIndex, vatSlip.Raw, headerID, vatSlip.Ilk, vatSlip.Guy, vatSlip.Rad, vatSlip.TransactionIndex, vatSlip.LogIndex, vatSlip.Raw,
) )
if err != nil { if execErr != nil {
tx.Rollback() rollbackErr := tx.Rollback()
return err if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return execErr
} }
} }
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.VatSlipChecked) checkHeaderErr := shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.VatSlipChecked)
if err != nil { if checkHeaderErr != nil {
tx.Rollback() rollbackErr := tx.Rollback()
return err if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return checkHeaderErr
} }
return tx.Commit() return tx.Commit()

View File

@ -2,6 +2,7 @@ package vat_toll
import ( import (
"fmt" "fmt"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
@ -12,32 +13,41 @@ type VatTollRepository struct {
} }
func (repository VatTollRepository) Create(headerID int64, models []interface{}) error { func (repository VatTollRepository) Create(headerID int64, models []interface{}) error {
tx, err := repository.db.Begin() tx, dBaseErr := repository.db.Begin()
if err != nil { if dBaseErr != nil {
return err return dBaseErr
} }
for _, model := range models { for _, model := range models {
vatToll, ok := model.(VatTollModel) vatToll, ok := model.(VatTollModel)
if !ok { if !ok {
tx.Rollback() rollbackErr := tx.Rollback()
if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return fmt.Errorf("model of type %T, not %T", model, VatTollModel{}) return fmt.Errorf("model of type %T, not %T", model, VatTollModel{})
} }
_, err = tx.Exec( _, execErr := tx.Exec(
`INSERT into maker.vat_toll (header_id, ilk, urn, take, tx_idx, log_idx, raw_log) `INSERT into maker.vat_toll (header_id, ilk, urn, take, tx_idx, log_idx, raw_log)
VALUES($1, $2, $3, $4::NUMERIC, $5, $6, $7)`, VALUES($1, $2, $3, $4::NUMERIC, $5, $6, $7)`,
headerID, vatToll.Ilk, vatToll.Urn, vatToll.Take, vatToll.TransactionIndex, vatToll.LogIndex, vatToll.Raw, headerID, vatToll.Ilk, vatToll.Urn, vatToll.Take, vatToll.TransactionIndex, vatToll.LogIndex, vatToll.Raw,
) )
if err != nil { if execErr != nil {
tx.Rollback() rollbackErr := tx.Rollback()
return err if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return execErr
} }
} }
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.VatTollChecked) checkHeaderErr := shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.VatTollChecked)
if err != nil { if checkHeaderErr != nil {
tx.Rollback() rollbackErr := tx.Rollback()
return err if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return checkHeaderErr
} }
return tx.Commit() return tx.Commit()
} }

View File

@ -2,6 +2,7 @@ package vat_tune
import ( import (
"fmt" "fmt"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
@ -12,32 +13,41 @@ type VatTuneRepository struct {
} }
func (repository VatTuneRepository) Create(headerID int64, models []interface{}) error { func (repository VatTuneRepository) Create(headerID int64, models []interface{}) error {
tx, err := repository.db.Begin() tx, dBaseErr := repository.db.Begin()
if err != nil { if dBaseErr != nil {
return err return dBaseErr
} }
for _, model := range models { for _, model := range models {
vatTune, ok := model.(VatTuneModel) vatTune, ok := model.(VatTuneModel)
if !ok { if !ok {
tx.Rollback() rollbackErr := tx.Rollback()
if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return fmt.Errorf("model of type %T, not %T", model, VatTuneModel{}) return fmt.Errorf("model of type %T, not %T", model, VatTuneModel{})
} }
_, err = tx.Exec( _, execErr := tx.Exec(
`INSERT into maker.vat_tune (header_id, ilk, urn, v, w, dink, dart, tx_idx, log_idx, raw_log) `INSERT into maker.vat_tune (header_id, ilk, urn, v, w, dink, dart, tx_idx, log_idx, raw_log)
VALUES($1, $2, $3, $4, $5, $6::NUMERIC, $7::NUMERIC, $8, $9, $10)`, VALUES($1, $2, $3, $4, $5, $6::NUMERIC, $7::NUMERIC, $8, $9, $10)`,
headerID, vatTune.Ilk, vatTune.Urn, vatTune.V, vatTune.W, vatTune.Dink, vatTune.Dart, vatTune.TransactionIndex, vatTune.LogIndex, vatTune.Raw, headerID, vatTune.Ilk, vatTune.Urn, vatTune.V, vatTune.W, vatTune.Dink, vatTune.Dart, vatTune.TransactionIndex, vatTune.LogIndex, vatTune.Raw,
) )
if err != nil { if execErr != nil {
tx.Rollback() rollbackErr := tx.Rollback()
return err if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return execErr
} }
} }
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.VatTuneChecked) checkHeaderErr := shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.VatTuneChecked)
if err != nil { if checkHeaderErr != nil {
tx.Rollback() rollbackErr := tx.Rollback()
return err if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return checkHeaderErr
} }
return tx.Commit() return tx.Commit()
} }

View File

@ -16,6 +16,7 @@ package vow_flog
import ( import (
"fmt" "fmt"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
@ -26,34 +27,43 @@ type VowFlogRepository struct {
} }
func (repository VowFlogRepository) Create(headerID int64, models []interface{}) error { func (repository VowFlogRepository) Create(headerID int64, models []interface{}) error {
tx, err := repository.db.Begin() tx, dBaseErr := repository.db.Begin()
if err != nil { if dBaseErr != nil {
return err return dBaseErr
} }
for _, model := range models { for _, model := range models {
flog, ok := model.(VowFlogModel) flog, ok := model.(VowFlogModel)
if !ok { if !ok {
tx.Rollback() rollbackErr := tx.Rollback()
if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return fmt.Errorf("model of type %T, not %T", model, VowFlogModel{}) return fmt.Errorf("model of type %T, not %T", model, VowFlogModel{})
} }
_, err = tx.Exec( _, execErr := tx.Exec(
`INSERT into maker.vow_flog (header_id, era, log_idx, tx_idx, raw_log) `INSERT into maker.vow_flog (header_id, era, log_idx, tx_idx, raw_log)
VALUES($1, $2::NUMERIC, $3, $4, $5)`, VALUES($1, $2::NUMERIC, $3, $4, $5)`,
headerID, flog.Era, flog.LogIndex, flog.TransactionIndex, flog.Raw, headerID, flog.Era, flog.LogIndex, flog.TransactionIndex, flog.Raw,
) )
if err != nil { if execErr != nil {
tx.Rollback() rollbackErr := tx.Rollback()
return err if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return execErr
} }
} }
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.VowFlogChecked) checkHeaderErr := shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.VowFlogChecked)
if err != nil { if checkHeaderErr != nil {
tx.Rollback() rollbackErr := tx.Rollback()
return err if rollbackErr != nil {
log.Error("failed to rollback ", rollbackErr)
}
return checkHeaderErr
} }
return tx.Commit() return tx.Commit()