Merge branch 'staging' into batch-historical-headers
This commit is contained in:
commit
f1ba21b505
@ -81,9 +81,15 @@ func lightSync() {
|
|||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
window := validator.ValidateHeaders()
|
window, err := validator.ValidateHeaders()
|
||||||
|
if err != nil {
|
||||||
|
log.Error("ValidateHeaders failed in lightSync: ", err)
|
||||||
|
}
|
||||||
window.Log(os.Stdout)
|
window.Log(os.Stdout)
|
||||||
case <-missingBlocksPopulated:
|
case n := <-missingBlocksPopulated:
|
||||||
|
if n == 0 {
|
||||||
|
time.Sleep(3 * time.Second)
|
||||||
|
}
|
||||||
go backFillAllHeaders(blockChain, headerRepository, missingBlocksPopulated, startingBlockNumber)
|
go backFillAllHeaders(blockChain, headerRepository, missingBlocksPopulated, startingBlockNumber)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -3,6 +3,7 @@ package repositories
|
|||||||
import (
|
import (
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"errors"
|
"errors"
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/core"
|
"github.com/vulcanize/vulcanizedb/pkg/core"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
||||||
)
|
)
|
||||||
@ -38,16 +39,21 @@ func (repository HeaderRepository) GetHeader(blockNumber int64) (core.Header, er
|
|||||||
return header, err
|
return header, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (repository HeaderRepository) MissingBlockNumbers(startingBlockNumber, endingBlockNumber int64, nodeID string) []int64 {
|
func (repository HeaderRepository) MissingBlockNumbers(startingBlockNumber, endingBlockNumber int64, nodeID string) ([]int64, error) {
|
||||||
numbers := make([]int64, 0)
|
numbers := make([]int64, 0)
|
||||||
repository.database.Select(&numbers, `SELECT all_block_numbers
|
err := repository.database.Select(&numbers, `SELECT all_block_numbers
|
||||||
FROM (
|
FROM (
|
||||||
SELECT generate_series($1::INT, $2::INT) AS all_block_numbers) series
|
SELECT generate_series($1::INT, $2::INT) AS all_block_numbers) series
|
||||||
WHERE all_block_numbers NOT IN (
|
WHERE all_block_numbers NOT IN (
|
||||||
SELECT block_number FROM headers WHERE eth_node_fingerprint = $3
|
SELECT block_number FROM headers WHERE eth_node_fingerprint = $3
|
||||||
) `,
|
) `,
|
||||||
startingBlockNumber, endingBlockNumber, nodeID)
|
startingBlockNumber, endingBlockNumber, nodeID)
|
||||||
return numbers
|
if err != nil {
|
||||||
|
log.Errorf("MissingBlockNumbers failed to get blocks between %v - %v for node %v",
|
||||||
|
startingBlockNumber, endingBlockNumber, nodeID)
|
||||||
|
return []int64{}, err
|
||||||
|
}
|
||||||
|
return numbers, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (repository HeaderRepository) HeaderExists(blockNumber int64) (bool, error) {
|
func (repository HeaderRepository) HeaderExists(blockNumber int64) (bool, error) {
|
||||||
|
@ -19,7 +19,6 @@ var _ = Describe("Block header repository", func() {
|
|||||||
rawHeader []byte
|
rawHeader []byte
|
||||||
err error
|
err error
|
||||||
timestamp string
|
timestamp string
|
||||||
node core.Node
|
|
||||||
db *postgres.DB
|
db *postgres.DB
|
||||||
repo repositories.HeaderRepository
|
repo repositories.HeaderRepository
|
||||||
header core.Header
|
header core.Header
|
||||||
@ -30,8 +29,7 @@ var _ = Describe("Block header repository", func() {
|
|||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
timestamp = big.NewInt(123456789).String()
|
timestamp = big.NewInt(123456789).String()
|
||||||
|
|
||||||
node = core.Node{ID: "Fingerprint"}
|
db = test_config.NewTestDB(test_config.NewTestNode())
|
||||||
db = test_config.NewTestDB(node)
|
|
||||||
test_config.CleanTestDB(db)
|
test_config.CleanTestDB(db)
|
||||||
repo = repositories.NewHeaderRepository(db)
|
repo = repositories.NewHeaderRepository(db)
|
||||||
header = core.Header{
|
header = core.Header{
|
||||||
@ -44,7 +42,7 @@ var _ = Describe("Block header repository", func() {
|
|||||||
|
|
||||||
Describe("creating or updating a header", func() {
|
Describe("creating or updating a header", func() {
|
||||||
It("adds a header", func() {
|
It("adds a header", func() {
|
||||||
_, err := repo.CreateOrUpdateHeader(header)
|
_, err = repo.CreateOrUpdateHeader(header)
|
||||||
|
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
var dbHeader core.Header
|
var dbHeader core.Header
|
||||||
@ -57,7 +55,7 @@ var _ = Describe("Block header repository", func() {
|
|||||||
})
|
})
|
||||||
|
|
||||||
It("adds node data to header", func() {
|
It("adds node data to header", func() {
|
||||||
_, err := repo.CreateOrUpdateHeader(header)
|
_, err = repo.CreateOrUpdateHeader(header)
|
||||||
|
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
var ethNodeId int64
|
var ethNodeId int64
|
||||||
@ -71,7 +69,7 @@ var _ = Describe("Block header repository", func() {
|
|||||||
})
|
})
|
||||||
|
|
||||||
It("returns valid header exists error if attempting duplicate headers", func() {
|
It("returns valid header exists error if attempting duplicate headers", func() {
|
||||||
_, err := repo.CreateOrUpdateHeader(header)
|
_, err = repo.CreateOrUpdateHeader(header)
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
_, err = repo.CreateOrUpdateHeader(header)
|
_, err = repo.CreateOrUpdateHeader(header)
|
||||||
@ -85,8 +83,9 @@ var _ = Describe("Block header repository", func() {
|
|||||||
})
|
})
|
||||||
|
|
||||||
It("replaces header if hash is different", func() {
|
It("replaces header if hash is different", func() {
|
||||||
_, err := repo.CreateOrUpdateHeader(header)
|
_, err = repo.CreateOrUpdateHeader(header)
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
headerTwo := core.Header{
|
headerTwo := core.Header{
|
||||||
BlockNumber: header.BlockNumber,
|
BlockNumber: header.BlockNumber,
|
||||||
Hash: common.BytesToHash([]byte{5, 4, 3, 2, 1}).Hex(),
|
Hash: common.BytesToHash([]byte{5, 4, 3, 2, 1}).Hex(),
|
||||||
@ -105,10 +104,12 @@ var _ = Describe("Block header repository", func() {
|
|||||||
})
|
})
|
||||||
|
|
||||||
It("does not replace header if node fingerprint is different", func() {
|
It("does not replace header if node fingerprint is different", func() {
|
||||||
_, err := repo.CreateOrUpdateHeader(header)
|
_, err = repo.CreateOrUpdateHeader(header)
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
nodeTwo := core.Node{ID: "FingerprintTwo"}
|
nodeTwo := core.Node{ID: "FingerprintTwo"}
|
||||||
dbTwo, err := postgres.NewDB(test_config.DBConfig, nodeTwo)
|
dbTwo, err := postgres.NewDB(test_config.DBConfig, nodeTwo)
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
repoTwo := repositories.NewHeaderRepository(dbTwo)
|
repoTwo := repositories.NewHeaderRepository(dbTwo)
|
||||||
headerTwo := core.Header{
|
headerTwo := core.Header{
|
||||||
BlockNumber: header.BlockNumber,
|
BlockNumber: header.BlockNumber,
|
||||||
@ -127,10 +128,13 @@ var _ = Describe("Block header repository", func() {
|
|||||||
})
|
})
|
||||||
|
|
||||||
It("only replaces header with matching node fingerprint", func() {
|
It("only replaces header with matching node fingerprint", func() {
|
||||||
_, err := repo.CreateOrUpdateHeader(header)
|
_, err = repo.CreateOrUpdateHeader(header)
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
nodeTwo := core.Node{ID: "FingerprintTwo"}
|
nodeTwo := core.Node{ID: "FingerprintTwo"}
|
||||||
dbTwo, err := postgres.NewDB(test_config.DBConfig, nodeTwo)
|
dbTwo, err := postgres.NewDB(test_config.DBConfig, nodeTwo)
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
repoTwo := repositories.NewHeaderRepository(dbTwo)
|
repoTwo := repositories.NewHeaderRepository(dbTwo)
|
||||||
headerTwo := core.Header{
|
headerTwo := core.Header{
|
||||||
BlockNumber: header.BlockNumber,
|
BlockNumber: header.BlockNumber,
|
||||||
@ -162,7 +166,7 @@ var _ = Describe("Block header repository", func() {
|
|||||||
|
|
||||||
Describe("Getting a header", func() {
|
Describe("Getting a header", func() {
|
||||||
It("returns header if it exists", func() {
|
It("returns header if it exists", func() {
|
||||||
_, err := repo.CreateOrUpdateHeader(header)
|
_, err = repo.CreateOrUpdateHeader(header)
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
dbHeader, err := repo.GetHeader(header.BlockNumber)
|
dbHeader, err := repo.GetHeader(header.BlockNumber)
|
||||||
@ -174,9 +178,10 @@ var _ = Describe("Block header repository", func() {
|
|||||||
})
|
})
|
||||||
|
|
||||||
It("does not return header for a different node fingerprint", func() {
|
It("does not return header for a different node fingerprint", func() {
|
||||||
_, err := repo.CreateOrUpdateHeader(header)
|
_, err = repo.CreateOrUpdateHeader(header)
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
nodeTwo := core.Node{ID: "NodeFingerprintTwo"}
|
|
||||||
|
nodeTwo := core.Node{ID: "FingerprintTwo"}
|
||||||
dbTwo, err := postgres.NewDB(test_config.DBConfig, nodeTwo)
|
dbTwo, err := postgres.NewDB(test_config.DBConfig, nodeTwo)
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
repoTwo := repositories.NewHeaderRepository(dbTwo)
|
repoTwo := repositories.NewHeaderRepository(dbTwo)
|
||||||
@ -211,7 +216,8 @@ var _ = Describe("Block header repository", func() {
|
|||||||
})
|
})
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
missingBlockNumbers := repo.MissingBlockNumbers(1, 5, node.ID)
|
missingBlockNumbers, err := repo.MissingBlockNumbers(1, 5, db.Node.ID)
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
Expect(missingBlockNumbers).To(ConsistOf([]int64{2, 4}))
|
Expect(missingBlockNumbers).To(ConsistOf([]int64{2, 4}))
|
||||||
})
|
})
|
||||||
@ -238,12 +244,13 @@ var _ = Describe("Block header repository", func() {
|
|||||||
})
|
})
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
nodeTwo := core.Node{ID: "NodeFingerprintTwo"}
|
nodeTwo := core.Node{ID: "FingerprintTwo"}
|
||||||
dbTwo, err := postgres.NewDB(test_config.DBConfig, nodeTwo)
|
dbTwo, err := postgres.NewDB(test_config.DBConfig, nodeTwo)
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
repoTwo := repositories.NewHeaderRepository(dbTwo)
|
repoTwo := repositories.NewHeaderRepository(dbTwo)
|
||||||
|
|
||||||
missingBlockNumbers := repoTwo.MissingBlockNumbers(1, 5, nodeTwo.ID)
|
missingBlockNumbers, err := repoTwo.MissingBlockNumbers(1, 5, nodeTwo.ID)
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
Expect(missingBlockNumbers).To(ConsistOf([]int64{1, 2, 3, 4, 5}))
|
Expect(missingBlockNumbers).To(ConsistOf([]int64{1, 2, 3, 4, 5}))
|
||||||
})
|
})
|
||||||
|
@ -40,7 +40,7 @@ type FilterRepository interface {
|
|||||||
type HeaderRepository interface {
|
type HeaderRepository interface {
|
||||||
CreateOrUpdateHeader(header core.Header) (int64, error)
|
CreateOrUpdateHeader(header core.Header) (int64, error)
|
||||||
GetHeader(blockNumber int64) (core.Header, error)
|
GetHeader(blockNumber int64) (core.Header, error)
|
||||||
MissingBlockNumbers(startingBlockNumber, endingBlockNumber int64, nodeID string) []int64
|
MissingBlockNumbers(startingBlockNumber, endingBlockNumber int64, nodeID string) ([]int64, error)
|
||||||
HeaderExists(blockNumber int64) (bool, error)
|
HeaderExists(blockNumber int64) (bool, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -41,8 +41,8 @@ func (*MockHeaderRepository) GetHeader(blockNumber int64) (core.Header, error) {
|
|||||||
return core.Header{BlockNumber: blockNumber}, nil
|
return core.Header{BlockNumber: blockNumber}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (repository *MockHeaderRepository) MissingBlockNumbers(startingBlockNumber, endingBlockNumber int64, nodeID string) []int64 {
|
func (repository *MockHeaderRepository) MissingBlockNumbers(startingBlockNumber, endingBlockNumber int64, nodeID string) ([]int64, error) {
|
||||||
return repository.missingBlockNumbers
|
return repository.missingBlockNumbers, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (repository *MockHeaderRepository) HeaderExists(blockNumber int64) (bool, error) {
|
func (repository *MockHeaderRepository) HeaderExists(blockNumber int64) (bool, error) {
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
package history
|
package history
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/core"
|
"github.com/vulcanize/vulcanizedb/pkg/core"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/datastore"
|
"github.com/vulcanize/vulcanizedb/pkg/datastore"
|
||||||
)
|
)
|
||||||
@ -19,9 +20,13 @@ func NewHeaderValidator(blockChain core.BlockChain, repository datastore.HeaderR
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (validator HeaderValidator) ValidateHeaders() ValidationWindow {
|
func (validator HeaderValidator) ValidateHeaders() (ValidationWindow, error) {
|
||||||
window := MakeValidationWindow(validator.blockChain, validator.windowSize)
|
window := MakeValidationWindow(validator.blockChain, validator.windowSize)
|
||||||
blockNumbers := MakeRange(window.LowerBound, window.UpperBound)
|
blockNumbers := MakeRange(window.LowerBound, window.UpperBound)
|
||||||
RetrieveAndUpdateHeaders(validator.blockChain, validator.headerRepository, blockNumbers)
|
_, err := RetrieveAndUpdateHeaders(validator.blockChain, validator.headerRepository, blockNumbers)
|
||||||
return window
|
if err != nil {
|
||||||
|
log.Error("Error in ValidateHeaders: ", err)
|
||||||
|
return ValidationWindow{}, err
|
||||||
|
}
|
||||||
|
return window, nil
|
||||||
}
|
}
|
||||||
|
@ -1,22 +1,43 @@
|
|||||||
package history_test
|
package history_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
. "github.com/onsi/ginkgo"
|
. "github.com/onsi/ginkgo"
|
||||||
|
. "github.com/onsi/gomega"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/fakes"
|
"github.com/vulcanize/vulcanizedb/pkg/fakes"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/history"
|
"github.com/vulcanize/vulcanizedb/pkg/history"
|
||||||
"math/big"
|
"math/big"
|
||||||
)
|
)
|
||||||
|
|
||||||
var _ = Describe("Header validator", func() {
|
var _ = Describe("Header validator", func() {
|
||||||
|
var (
|
||||||
|
headerRepository *fakes.MockHeaderRepository
|
||||||
|
blockChain *fakes.MockBlockChain
|
||||||
|
)
|
||||||
|
|
||||||
|
BeforeEach(func() {
|
||||||
|
headerRepository = fakes.NewMockHeaderRepository()
|
||||||
|
blockChain = fakes.NewMockBlockChain()
|
||||||
|
})
|
||||||
|
|
||||||
It("attempts to create every header in the validation window", func() {
|
It("attempts to create every header in the validation window", func() {
|
||||||
headerRepository := fakes.NewMockHeaderRepository()
|
|
||||||
headerRepository.SetMissingBlockNumbers([]int64{})
|
headerRepository.SetMissingBlockNumbers([]int64{})
|
||||||
blockChain := fakes.NewMockBlockChain()
|
|
||||||
blockChain.SetLastBlock(big.NewInt(3))
|
blockChain.SetLastBlock(big.NewInt(3))
|
||||||
validator := history.NewHeaderValidator(blockChain, headerRepository, 2)
|
validator := history.NewHeaderValidator(blockChain, headerRepository, 2)
|
||||||
|
|
||||||
validator.ValidateHeaders()
|
_, err := validator.ValidateHeaders()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
headerRepository.AssertCreateOrUpdateHeaderCallCountAndPassedBlockNumbers(3, []int64{1, 2, 3})
|
headerRepository.AssertCreateOrUpdateHeaderCallCountAndPassedBlockNumbers(3, []int64{1, 2, 3})
|
||||||
})
|
})
|
||||||
|
|
||||||
|
It("propagates header repository errors", func() {
|
||||||
|
blockChain.SetLastBlock(big.NewInt(3))
|
||||||
|
headerRepositoryError := errors.New("CreateOrUpdate")
|
||||||
|
headerRepository.SetCreateOrUpdateHeaderReturnErr(headerRepositoryError)
|
||||||
|
validator := history.NewHeaderValidator(blockChain, headerRepository, 2)
|
||||||
|
|
||||||
|
_, err := validator.ValidateHeaders()
|
||||||
|
Expect(err).To(MatchError(headerRepositoryError))
|
||||||
|
})
|
||||||
})
|
})
|
||||||
|
@ -11,13 +11,20 @@ import (
|
|||||||
func PopulateMissingHeaders(blockchain core.BlockChain, headerRepository datastore.HeaderRepository, startingBlockNumber int64) (int, error) {
|
func PopulateMissingHeaders(blockchain core.BlockChain, headerRepository datastore.HeaderRepository, startingBlockNumber int64) (int, error) {
|
||||||
lastBlock := blockchain.LastBlock().Int64()
|
lastBlock := blockchain.LastBlock().Int64()
|
||||||
headerAlreadyExists, err := headerRepository.HeaderExists(lastBlock)
|
headerAlreadyExists, err := headerRepository.HeaderExists(lastBlock)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
log.Error("Error in checking header in PopulateMissingHeaders: ", err)
|
||||||
return 0, err
|
return 0, err
|
||||||
} else if headerAlreadyExists {
|
} else if headerAlreadyExists {
|
||||||
return 0, nil
|
return 0, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
blockNumbers := headerRepository.MissingBlockNumbers(startingBlockNumber, lastBlock, blockchain.Node().ID)
|
blockNumbers, err := headerRepository.MissingBlockNumbers(startingBlockNumber, lastBlock, blockchain.Node().ID)
|
||||||
|
if err != nil {
|
||||||
|
log.Error("Error getting missing block numbers in PopulateMissingHeaders: ", err)
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
log.Printf("Backfilling %d blocks\n\n", len(blockNumbers))
|
log.Printf("Backfilling %d blocks\n\n", len(blockNumbers))
|
||||||
_, err = RetrieveAndUpdateHeaders(blockchain, headerRepository, blockNumbers)
|
_, err = RetrieveAndUpdateHeaders(blockchain, headerRepository, blockNumbers)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -16,6 +16,7 @@ package bite
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
|
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
|
||||||
@ -30,32 +31,41 @@ func (repository *BiteRepository) SetDB(db *postgres.DB) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (repository BiteRepository) Create(headerID int64, models []interface{}) error {
|
func (repository BiteRepository) Create(headerID int64, models []interface{}) error {
|
||||||
tx, err := repository.db.Begin()
|
tx, dBaseErr := repository.db.Begin()
|
||||||
if err != nil {
|
if dBaseErr != nil {
|
||||||
return err
|
return dBaseErr
|
||||||
}
|
}
|
||||||
for _, model := range models {
|
for _, model := range models {
|
||||||
biteModel, ok := model.(BiteModel)
|
biteModel, ok := model.(BiteModel)
|
||||||
if !ok {
|
if !ok {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
return fmt.Errorf("model of type %T, not %T", model, BiteModel{})
|
return fmt.Errorf("model of type %T, not %T", model, BiteModel{})
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = tx.Exec(
|
_, execErr := tx.Exec(
|
||||||
`INSERT into maker.bite (header_id, ilk, urn, ink, art, iart, tab, nflip, log_idx, tx_idx, raw_log)
|
`INSERT into maker.bite (header_id, ilk, urn, ink, art, iart, tab, nflip, log_idx, tx_idx, raw_log)
|
||||||
VALUES($1, $2, $3, $4::NUMERIC, $5::NUMERIC, $6::NUMERIC, $7::NUMERIC, $8::NUMERIC, $9, $10, $11)`,
|
VALUES($1, $2, $3, $4::NUMERIC, $5::NUMERIC, $6::NUMERIC, $7::NUMERIC, $8::NUMERIC, $9, $10, $11)`,
|
||||||
headerID, biteModel.Ilk, biteModel.Urn, biteModel.Ink, biteModel.Art, biteModel.IArt, biteModel.Tab, biteModel.NFlip, biteModel.LogIndex, biteModel.TransactionIndex, biteModel.Raw,
|
headerID, biteModel.Ilk, biteModel.Urn, biteModel.Ink, biteModel.Art, biteModel.IArt, biteModel.Tab, biteModel.NFlip, biteModel.LogIndex, biteModel.TransactionIndex, biteModel.Raw,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if execErr != nil {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
return err
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
|
return execErr
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.BiteChecked)
|
checkHeaderErr := shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.BiteChecked)
|
||||||
if err != nil {
|
if checkHeaderErr != nil {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
return err
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
|
return checkHeaderErr
|
||||||
}
|
}
|
||||||
|
|
||||||
return tx.Commit()
|
return tx.Commit()
|
||||||
|
@ -16,6 +16,7 @@ package chop_lump
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
|
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
|
||||||
@ -26,33 +27,42 @@ type CatFileChopLumpRepository struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (repository CatFileChopLumpRepository) Create(headerID int64, models []interface{}) error {
|
func (repository CatFileChopLumpRepository) Create(headerID int64, models []interface{}) error {
|
||||||
tx, err := repository.db.Begin()
|
tx, dBaseErr := repository.db.Begin()
|
||||||
if err != nil {
|
if dBaseErr != nil {
|
||||||
return err
|
return dBaseErr
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, model := range models {
|
for _, model := range models {
|
||||||
chopLump, ok := model.(CatFileChopLumpModel)
|
chopLump, ok := model.(CatFileChopLumpModel)
|
||||||
if !ok {
|
if !ok {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
return fmt.Errorf("model of type %T, not %T", model, CatFileChopLumpModel{})
|
return fmt.Errorf("model of type %T, not %T", model, CatFileChopLumpModel{})
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = tx.Exec(
|
_, execErr := tx.Exec(
|
||||||
`INSERT into maker.cat_file_chop_lump (header_id, ilk, what, data, tx_idx, log_idx, raw_log)
|
`INSERT into maker.cat_file_chop_lump (header_id, ilk, what, data, tx_idx, log_idx, raw_log)
|
||||||
VALUES($1, $2, $3, $4::NUMERIC, $5, $6, $7)`,
|
VALUES($1, $2, $3, $4::NUMERIC, $5, $6, $7)`,
|
||||||
headerID, chopLump.Ilk, chopLump.What, chopLump.Data, chopLump.TransactionIndex, chopLump.LogIndex, chopLump.Raw,
|
headerID, chopLump.Ilk, chopLump.What, chopLump.Data, chopLump.TransactionIndex, chopLump.LogIndex, chopLump.Raw,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if execErr != nil {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
return err
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
|
return execErr
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.CatFileChopLumpChecked)
|
checkHeaderErr := shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.CatFileChopLumpChecked)
|
||||||
if err != nil {
|
if checkHeaderErr != nil {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
return err
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
|
return checkHeaderErr
|
||||||
}
|
}
|
||||||
return tx.Commit()
|
return tx.Commit()
|
||||||
}
|
}
|
||||||
|
@ -16,6 +16,7 @@ package flip
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
|
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
|
||||||
@ -26,32 +27,41 @@ type CatFileFlipRepository struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (repository CatFileFlipRepository) Create(headerID int64, models []interface{}) error {
|
func (repository CatFileFlipRepository) Create(headerID int64, models []interface{}) error {
|
||||||
tx, err := repository.db.Begin()
|
tx, dBaseErr := repository.db.Begin()
|
||||||
if err != nil {
|
if dBaseErr != nil {
|
||||||
return err
|
return dBaseErr
|
||||||
}
|
}
|
||||||
for _, model := range models {
|
for _, model := range models {
|
||||||
flip, ok := model.(CatFileFlipModel)
|
flip, ok := model.(CatFileFlipModel)
|
||||||
if !ok {
|
if !ok {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
return fmt.Errorf("model of type %T, not %T", model, CatFileFlipModel{})
|
return fmt.Errorf("model of type %T, not %T", model, CatFileFlipModel{})
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = repository.db.Exec(
|
_, execErr := repository.db.Exec(
|
||||||
`INSERT into maker.cat_file_flip (header_id, ilk, what, flip, tx_idx, log_idx, raw_log)
|
`INSERT into maker.cat_file_flip (header_id, ilk, what, flip, tx_idx, log_idx, raw_log)
|
||||||
VALUES($1, $2, $3, $4, $5, $6, $7)`,
|
VALUES($1, $2, $3, $4, $5, $6, $7)`,
|
||||||
headerID, flip.Ilk, flip.What, flip.Flip, flip.TransactionIndex, flip.LogIndex, flip.Raw,
|
headerID, flip.Ilk, flip.What, flip.Flip, flip.TransactionIndex, flip.LogIndex, flip.Raw,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if execErr != nil {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
return err
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
|
return execErr
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.CatFileFlipChecked)
|
checkHeaderErr := shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.CatFileFlipChecked)
|
||||||
if err != nil {
|
if checkHeaderErr != nil {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
return err
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
|
return checkHeaderErr
|
||||||
}
|
}
|
||||||
return tx.Commit()
|
return tx.Commit()
|
||||||
}
|
}
|
||||||
|
@ -16,6 +16,7 @@ package pit_vow
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
|
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
|
||||||
@ -26,32 +27,41 @@ type CatFilePitVowRepository struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (repository CatFilePitVowRepository) Create(headerID int64, models []interface{}) error {
|
func (repository CatFilePitVowRepository) Create(headerID int64, models []interface{}) error {
|
||||||
tx, err := repository.db.Begin()
|
tx, dBaseErr := repository.db.Begin()
|
||||||
if err != nil {
|
if dBaseErr != nil {
|
||||||
return err
|
return dBaseErr
|
||||||
}
|
}
|
||||||
for _, model := range models {
|
for _, model := range models {
|
||||||
vow, ok := model.(CatFilePitVowModel)
|
vow, ok := model.(CatFilePitVowModel)
|
||||||
if !ok {
|
if !ok {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
return fmt.Errorf("model of type %T, not %T", model, CatFilePitVowModel{})
|
return fmt.Errorf("model of type %T, not %T", model, CatFilePitVowModel{})
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = repository.db.Exec(
|
_, execErr := repository.db.Exec(
|
||||||
`INSERT into maker.cat_file_pit_vow (header_id, what, data, tx_idx, log_idx, raw_log)
|
`INSERT into maker.cat_file_pit_vow (header_id, what, data, tx_idx, log_idx, raw_log)
|
||||||
VALUES($1, $2, $3, $4, $5, $6)`,
|
VALUES($1, $2, $3, $4, $5, $6)`,
|
||||||
headerID, vow.What, vow.Data, vow.TransactionIndex, vow.LogIndex, vow.Raw,
|
headerID, vow.What, vow.Data, vow.TransactionIndex, vow.LogIndex, vow.Raw,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if execErr != nil {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
return err
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
|
return execErr
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.CatFilePitVowChecked)
|
checkHeaderErr := shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.CatFilePitVowChecked)
|
||||||
if err != nil {
|
if checkHeaderErr != nil {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
return err
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
|
return checkHeaderErr
|
||||||
}
|
}
|
||||||
return tx.Commit()
|
return tx.Commit()
|
||||||
}
|
}
|
||||||
|
@ -16,6 +16,7 @@ package deal
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
|
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
|
||||||
@ -26,33 +27,42 @@ type DealRepository struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (repository DealRepository) Create(headerID int64, models []interface{}) error {
|
func (repository DealRepository) Create(headerID int64, models []interface{}) error {
|
||||||
tx, err := repository.db.Begin()
|
tx, dBaseErr := repository.db.Begin()
|
||||||
if err != nil {
|
if dBaseErr != nil {
|
||||||
return err
|
return dBaseErr
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, model := range models {
|
for _, model := range models {
|
||||||
dealModel, ok := model.(DealModel)
|
dealModel, ok := model.(DealModel)
|
||||||
if !ok {
|
if !ok {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
return fmt.Errorf("model of type %T, not %T", model, DealModel{})
|
return fmt.Errorf("model of type %T, not %T", model, DealModel{})
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = tx.Exec(
|
_, execErr := tx.Exec(
|
||||||
`INSERT into maker.deal (header_id, bid_id, contract_address, log_idx, tx_idx, raw_log)
|
`INSERT into maker.deal (header_id, bid_id, contract_address, log_idx, tx_idx, raw_log)
|
||||||
VALUES($1, $2, $3, $4, $5, $6)`,
|
VALUES($1, $2, $3, $4, $5, $6)`,
|
||||||
headerID, dealModel.BidId, dealModel.ContractAddress, dealModel.LogIndex, dealModel.TransactionIndex, dealModel.Raw,
|
headerID, dealModel.BidId, dealModel.ContractAddress, dealModel.LogIndex, dealModel.TransactionIndex, dealModel.Raw,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if execErr != nil {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
return err
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
|
return execErr
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.DealChecked)
|
checkHeaderErr := shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.DealChecked)
|
||||||
if err != nil {
|
if checkHeaderErr != nil {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
return err
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
|
return checkHeaderErr
|
||||||
}
|
}
|
||||||
return tx.Commit()
|
return tx.Commit()
|
||||||
}
|
}
|
||||||
|
@ -16,6 +16,7 @@ package dent
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
|
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
|
||||||
@ -26,38 +27,51 @@ type DentRepository struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (repository DentRepository) Create(headerID int64, models []interface{}) error {
|
func (repository DentRepository) Create(headerID int64, models []interface{}) error {
|
||||||
tx, err := repository.db.Begin()
|
tx, dBaseErr := repository.db.Begin()
|
||||||
if err != nil {
|
if dBaseErr != nil {
|
||||||
return err
|
return dBaseErr
|
||||||
}
|
}
|
||||||
|
|
||||||
tic, err := shared.GetTicInTx(headerID, tx)
|
tic, getTicErr := shared.GetTicInTx(headerID, tx)
|
||||||
if err != nil {
|
if getTicErr != nil {
|
||||||
return err
|
rollbackErr := tx.Rollback()
|
||||||
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
|
return getTicErr
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, model := range models {
|
for _, model := range models {
|
||||||
dent, ok := model.(DentModel)
|
dent, ok := model.(DentModel)
|
||||||
if !ok {
|
if !ok {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
return fmt.Errorf("model of type %T, not %T", model, DentModel{})
|
return fmt.Errorf("model of type %T, not %T", model, DentModel{})
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = tx.Exec(
|
_, execErr := tx.Exec(
|
||||||
`INSERT into maker.dent (header_id, bid_id, lot, bid, guy, tic, log_idx, tx_idx, raw_log)
|
`INSERT into maker.dent (header_id, bid_id, lot, bid, guy, tic, log_idx, tx_idx, raw_log)
|
||||||
VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9)`,
|
VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9)`,
|
||||||
headerID, dent.BidId, dent.Lot, dent.Bid, dent.Guy, tic, dent.LogIndex, dent.TransactionIndex, dent.Raw,
|
headerID, dent.BidId, dent.Lot, dent.Bid, dent.Guy, tic, dent.LogIndex, dent.TransactionIndex, dent.Raw,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if execErr != nil {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
return err
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
|
return execErr
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.DentChecked)
|
checkHeaderErr := shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.DentChecked)
|
||||||
if err != nil {
|
if checkHeaderErr != nil {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
return err
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
|
return checkHeaderErr
|
||||||
}
|
}
|
||||||
return tx.Commit()
|
return tx.Commit()
|
||||||
}
|
}
|
||||||
|
@ -16,6 +16,7 @@ package drip_drip
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
|
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
|
||||||
@ -26,32 +27,41 @@ type DripDripRepository struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (repository DripDripRepository) Create(headerID int64, models []interface{}) error {
|
func (repository DripDripRepository) Create(headerID int64, models []interface{}) error {
|
||||||
tx, err := repository.db.Begin()
|
tx, dBaseErr := repository.db.Begin()
|
||||||
if err != nil {
|
if dBaseErr != nil {
|
||||||
return err
|
return dBaseErr
|
||||||
}
|
}
|
||||||
for _, model := range models {
|
for _, model := range models {
|
||||||
dripDrip, ok := model.(DripDripModel)
|
dripDrip, ok := model.(DripDripModel)
|
||||||
if !ok {
|
if !ok {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
return fmt.Errorf("model of type %T, not %T", model, DripDripModel{})
|
return fmt.Errorf("model of type %T, not %T", model, DripDripModel{})
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = tx.Exec(
|
_, execErr := tx.Exec(
|
||||||
`INSERT into maker.drip_drip (header_id, ilk, log_idx, tx_idx, raw_log)
|
`INSERT into maker.drip_drip (header_id, ilk, log_idx, tx_idx, raw_log)
|
||||||
VALUES($1, $2, $3, $4, $5)`,
|
VALUES($1, $2, $3, $4, $5)`,
|
||||||
headerID, dripDrip.Ilk, dripDrip.LogIndex, dripDrip.TransactionIndex, dripDrip.Raw,
|
headerID, dripDrip.Ilk, dripDrip.LogIndex, dripDrip.TransactionIndex, dripDrip.Raw,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if execErr != nil {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
return err
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
|
return execErr
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.DripDripChecked)
|
checkHeaderErr := shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.DripDripChecked)
|
||||||
if err != nil {
|
if checkHeaderErr != nil {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
return err
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
|
return checkHeaderErr
|
||||||
}
|
}
|
||||||
return tx.Commit()
|
return tx.Commit()
|
||||||
}
|
}
|
||||||
|
@ -16,6 +16,7 @@ package ilk
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
|
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
|
||||||
@ -26,34 +27,43 @@ type DripFileIlkRepository struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (repository DripFileIlkRepository) Create(headerID int64, models []interface{}) error {
|
func (repository DripFileIlkRepository) Create(headerID int64, models []interface{}) error {
|
||||||
tx, err := repository.db.Begin()
|
tx, dBaseErr := repository.db.Begin()
|
||||||
if err != nil {
|
if dBaseErr != nil {
|
||||||
return err
|
return dBaseErr
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, model := range models {
|
for _, model := range models {
|
||||||
ilk, ok := model.(DripFileIlkModel)
|
ilk, ok := model.(DripFileIlkModel)
|
||||||
if !ok {
|
if !ok {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
return fmt.Errorf("model of type %T, not %T", model, DripFileIlkModel{})
|
return fmt.Errorf("model of type %T, not %T", model, DripFileIlkModel{})
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = tx.Exec(
|
_, execErr := tx.Exec(
|
||||||
`INSERT into maker.drip_file_ilk (header_id, ilk, vow, tax, log_idx, tx_idx, raw_log)
|
`INSERT into maker.drip_file_ilk (header_id, ilk, vow, tax, log_idx, tx_idx, raw_log)
|
||||||
VALUES($1, $2, $3, $4::NUMERIC, $5, $6, $7)`,
|
VALUES($1, $2, $3, $4::NUMERIC, $5, $6, $7)`,
|
||||||
headerID, ilk.Ilk, ilk.Vow, ilk.Tax, ilk.LogIndex, ilk.TransactionIndex, ilk.Raw,
|
headerID, ilk.Ilk, ilk.Vow, ilk.Tax, ilk.LogIndex, ilk.TransactionIndex, ilk.Raw,
|
||||||
)
|
)
|
||||||
|
|
||||||
if err != nil {
|
if execErr != nil {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
return err
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
|
return execErr
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.DripFileIlkChecked)
|
checkHeaderErr := shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.DripFileIlkChecked)
|
||||||
if err != nil {
|
if checkHeaderErr != nil {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
return err
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
|
return checkHeaderErr
|
||||||
}
|
}
|
||||||
|
|
||||||
return tx.Commit()
|
return tx.Commit()
|
||||||
|
@ -16,6 +16,7 @@ package repo
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
|
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
|
||||||
@ -26,34 +27,43 @@ type DripFileRepoRepository struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (repository DripFileRepoRepository) Create(headerID int64, models []interface{}) error {
|
func (repository DripFileRepoRepository) Create(headerID int64, models []interface{}) error {
|
||||||
tx, err := repository.db.Begin()
|
tx, dBaseErr := repository.db.Begin()
|
||||||
if err != nil {
|
if dBaseErr != nil {
|
||||||
return err
|
return dBaseErr
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, model := range models {
|
for _, model := range models {
|
||||||
repo, ok := model.(DripFileRepoModel)
|
repo, ok := model.(DripFileRepoModel)
|
||||||
if !ok {
|
if !ok {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
return fmt.Errorf("model of type %T, not %T", model, DripFileRepoModel{})
|
return fmt.Errorf("model of type %T, not %T", model, DripFileRepoModel{})
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = tx.Exec(
|
_, execErr := tx.Exec(
|
||||||
`INSERT into maker.drip_file_repo (header_id, what, data, log_idx, tx_idx, raw_log)
|
`INSERT into maker.drip_file_repo (header_id, what, data, log_idx, tx_idx, raw_log)
|
||||||
VALUES($1, $2, $3::NUMERIC, $4, $5, $6)`,
|
VALUES($1, $2, $3::NUMERIC, $4, $5, $6)`,
|
||||||
headerID, repo.What, repo.Data, repo.LogIndex, repo.TransactionIndex, repo.Raw,
|
headerID, repo.What, repo.Data, repo.LogIndex, repo.TransactionIndex, repo.Raw,
|
||||||
)
|
)
|
||||||
|
|
||||||
if err != nil {
|
if execErr != nil {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
return err
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
|
return execErr
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.DripFileRepoChecked)
|
checkHeaderErr := shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.DripFileRepoChecked)
|
||||||
if err != nil {
|
if checkHeaderErr != nil {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
return err
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
|
return checkHeaderErr
|
||||||
}
|
}
|
||||||
|
|
||||||
return tx.Commit()
|
return tx.Commit()
|
||||||
|
@ -16,6 +16,7 @@ package vow
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
|
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
|
||||||
@ -26,33 +27,42 @@ type DripFileVowRepository struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (repository DripFileVowRepository) Create(headerID int64, models []interface{}) error {
|
func (repository DripFileVowRepository) Create(headerID int64, models []interface{}) error {
|
||||||
tx, err := repository.db.Begin()
|
tx, dBaseErr := repository.db.Begin()
|
||||||
if err != nil {
|
if dBaseErr != nil {
|
||||||
return err
|
return dBaseErr
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, model := range models {
|
for _, model := range models {
|
||||||
vow, ok := model.(DripFileVowModel)
|
vow, ok := model.(DripFileVowModel)
|
||||||
if !ok {
|
if !ok {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
return fmt.Errorf("model of type %T, not %T", model, DripFileVowModel{})
|
return fmt.Errorf("model of type %T, not %T", model, DripFileVowModel{})
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = tx.Exec(
|
_, execErr := tx.Exec(
|
||||||
`INSERT into maker.drip_file_vow (header_id, what, data, log_idx, tx_idx, raw_log)
|
`INSERT into maker.drip_file_vow (header_id, what, data, log_idx, tx_idx, raw_log)
|
||||||
VALUES($1, $2, $3, $4, $5, $6)`,
|
VALUES($1, $2, $3, $4, $5, $6)`,
|
||||||
headerID, vow.What, vow.Data, vow.LogIndex, vow.TransactionIndex, vow.Raw,
|
headerID, vow.What, vow.Data, vow.LogIndex, vow.TransactionIndex, vow.Raw,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if execErr != nil {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
return err
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
|
return execErr
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.DripFileVowChecked)
|
checkHeaderErr := shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.DripFileVowChecked)
|
||||||
if err != nil {
|
if checkHeaderErr != nil {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
return err
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
|
return checkHeaderErr
|
||||||
}
|
}
|
||||||
|
|
||||||
return tx.Commit()
|
return tx.Commit()
|
||||||
|
@ -16,6 +16,7 @@ package flap_kick
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
|
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
|
||||||
@ -26,9 +27,9 @@ type FlapKickRepository struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (repository *FlapKickRepository) Create(headerID int64, models []interface{}) error {
|
func (repository *FlapKickRepository) Create(headerID int64, models []interface{}) error {
|
||||||
tx, err := repository.db.Begin()
|
tx, dBaseErr := repository.db.Begin()
|
||||||
if err != nil {
|
if dBaseErr != nil {
|
||||||
return err
|
return dBaseErr
|
||||||
}
|
}
|
||||||
for _, model := range models {
|
for _, model := range models {
|
||||||
flapKickModel, ok := model.(FlapKickModel)
|
flapKickModel, ok := model.(FlapKickModel)
|
||||||
@ -36,21 +37,27 @@ func (repository *FlapKickRepository) Create(headerID int64, models []interface{
|
|||||||
return fmt.Errorf("model of type %T, not %T", model, FlapKickModel{})
|
return fmt.Errorf("model of type %T, not %T", model, FlapKickModel{})
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = tx.Exec(
|
_, execErr := tx.Exec(
|
||||||
`INSERT into maker.flap_kick (header_id, bid_id, lot, bid, gal, "end", tx_idx, log_idx, raw_log)
|
`INSERT into maker.flap_kick (header_id, bid_id, lot, bid, gal, "end", tx_idx, log_idx, raw_log)
|
||||||
VALUES($1, $2::NUMERIC, $3::NUMERIC, $4::NUMERIC, $5, $6, $7, $8, $9)`,
|
VALUES($1, $2::NUMERIC, $3::NUMERIC, $4::NUMERIC, $5, $6, $7, $8, $9)`,
|
||||||
headerID, flapKickModel.BidId, flapKickModel.Lot, flapKickModel.Bid, flapKickModel.Gal, flapKickModel.End, flapKickModel.TransactionIndex, flapKickModel.LogIndex, flapKickModel.Raw,
|
headerID, flapKickModel.BidId, flapKickModel.Lot, flapKickModel.Bid, flapKickModel.Gal, flapKickModel.End, flapKickModel.TransactionIndex, flapKickModel.LogIndex, flapKickModel.Raw,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if execErr != nil {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
return err
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
|
return execErr
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.FlapKickChecked)
|
checkHeaderErr := shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.FlapKickChecked)
|
||||||
if err != nil {
|
if checkHeaderErr != nil {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
return err
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
|
return checkHeaderErr
|
||||||
}
|
}
|
||||||
return tx.Commit()
|
return tx.Commit()
|
||||||
}
|
}
|
||||||
|
@ -16,6 +16,7 @@ package flip_kick
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
||||||
@ -27,9 +28,9 @@ type FlipKickRepository struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (repository FlipKickRepository) Create(headerID int64, models []interface{}) error {
|
func (repository FlipKickRepository) Create(headerID int64, models []interface{}) error {
|
||||||
tx, err := repository.db.Begin()
|
tx, dBaseErr := repository.db.Begin()
|
||||||
if err != nil {
|
if dBaseErr != nil {
|
||||||
return err
|
return dBaseErr
|
||||||
}
|
}
|
||||||
for _, model := range models {
|
for _, model := range models {
|
||||||
flipKickModel, ok := model.(FlipKickModel)
|
flipKickModel, ok := model.(FlipKickModel)
|
||||||
@ -37,20 +38,26 @@ func (repository FlipKickRepository) Create(headerID int64, models []interface{}
|
|||||||
return fmt.Errorf("model of type %T, not %T", model, FlipKickModel{})
|
return fmt.Errorf("model of type %T, not %T", model, FlipKickModel{})
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = tx.Exec(
|
_, execErr := tx.Exec(
|
||||||
`INSERT into maker.flip_kick (header_id, bid_id, lot, bid, gal, "end", urn, tab, tx_idx, log_idx, raw_log)
|
`INSERT into maker.flip_kick (header_id, bid_id, lot, bid, gal, "end", urn, tab, tx_idx, log_idx, raw_log)
|
||||||
VALUES($1, $2::NUMERIC, $3::NUMERIC, $4::NUMERIC, $5, $6, $7, $8::NUMERIC, $9, $10, $11)`,
|
VALUES($1, $2::NUMERIC, $3::NUMERIC, $4::NUMERIC, $5, $6, $7, $8::NUMERIC, $9, $10, $11)`,
|
||||||
headerID, flipKickModel.BidId, flipKickModel.Lot, flipKickModel.Bid, flipKickModel.Gal, flipKickModel.End, flipKickModel.Urn, flipKickModel.Tab, flipKickModel.TransactionIndex, flipKickModel.LogIndex, flipKickModel.Raw,
|
headerID, flipKickModel.BidId, flipKickModel.Lot, flipKickModel.Bid, flipKickModel.Gal, flipKickModel.End, flipKickModel.Urn, flipKickModel.Tab, flipKickModel.TransactionIndex, flipKickModel.LogIndex, flipKickModel.Raw,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if execErr != nil {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
return err
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
|
return execErr
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.FlipKickChecked)
|
checkHeaderErr := shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.FlipKickChecked)
|
||||||
if err != nil {
|
if checkHeaderErr != nil {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
return err
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
|
return checkHeaderErr
|
||||||
}
|
}
|
||||||
return tx.Commit()
|
return tx.Commit()
|
||||||
}
|
}
|
||||||
|
@ -16,6 +16,7 @@ package flop_kick
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
|
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
|
||||||
@ -26,9 +27,9 @@ type FlopKickRepository struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (repository FlopKickRepository) Create(headerID int64, models []interface{}) error {
|
func (repository FlopKickRepository) Create(headerID int64, models []interface{}) error {
|
||||||
tx, err := repository.db.Begin()
|
tx, dBaseErr := repository.db.Begin()
|
||||||
if err != nil {
|
if dBaseErr != nil {
|
||||||
return err
|
return dBaseErr
|
||||||
}
|
}
|
||||||
for _, flopKick := range models {
|
for _, flopKick := range models {
|
||||||
flopKickModel, ok := flopKick.(Model)
|
flopKickModel, ok := flopKick.(Model)
|
||||||
@ -37,21 +38,27 @@ func (repository FlopKickRepository) Create(headerID int64, models []interface{}
|
|||||||
return fmt.Errorf("model of type %T, not %T", flopKick, Model{})
|
return fmt.Errorf("model of type %T, not %T", flopKick, Model{})
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = tx.Exec(
|
_, execErr := tx.Exec(
|
||||||
`INSERT into maker.flop_kick (header_id, bid_id, lot, bid, gal, "end", tx_idx, log_idx, raw_log)
|
`INSERT into maker.flop_kick (header_id, bid_id, lot, bid, gal, "end", tx_idx, log_idx, raw_log)
|
||||||
VALUES($1, $2::NUMERIC, $3::NUMERIC, $4::NUMERIC, $5, $6, $7, $8, $9)`,
|
VALUES($1, $2::NUMERIC, $3::NUMERIC, $4::NUMERIC, $5, $6, $7, $8, $9)`,
|
||||||
headerID, flopKickModel.BidId, flopKickModel.Lot, flopKickModel.Bid, flopKickModel.Gal, flopKickModel.End, flopKickModel.TransactionIndex, flopKickModel.LogIndex, flopKickModel.Raw,
|
headerID, flopKickModel.BidId, flopKickModel.Lot, flopKickModel.Bid, flopKickModel.Gal, flopKickModel.End, flopKickModel.TransactionIndex, flopKickModel.LogIndex, flopKickModel.Raw,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if execErr != nil {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
return err
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
|
return execErr
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.FlopKickChecked)
|
checkHeaderErr := shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.FlopKickChecked)
|
||||||
if err != nil {
|
if checkHeaderErr != nil {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
return err
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
|
return checkHeaderErr
|
||||||
}
|
}
|
||||||
|
|
||||||
return tx.Commit()
|
return tx.Commit()
|
||||||
|
@ -16,6 +16,7 @@ package frob
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
||||||
@ -27,29 +28,38 @@ type FrobRepository struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (repository FrobRepository) Create(headerID int64, models []interface{}) error {
|
func (repository FrobRepository) Create(headerID int64, models []interface{}) error {
|
||||||
tx, err := repository.db.Begin()
|
tx, dBaseErr := repository.db.Begin()
|
||||||
if err != nil {
|
if dBaseErr != nil {
|
||||||
return err
|
return dBaseErr
|
||||||
}
|
}
|
||||||
for _, model := range models {
|
for _, model := range models {
|
||||||
frobModel, ok := model.(FrobModel)
|
frobModel, ok := model.(FrobModel)
|
||||||
if !ok {
|
if !ok {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
return fmt.Errorf("model of type %T, not %T", model, FrobModel{})
|
return fmt.Errorf("model of type %T, not %T", model, FrobModel{})
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = tx.Exec(`INSERT INTO maker.frob (header_id, art, dart, dink, iart, ilk, ink, urn, raw_log, log_idx, tx_idx)
|
_, execErr := tx.Exec(`INSERT INTO maker.frob (header_id, art, dart, dink, iart, ilk, ink, urn, raw_log, log_idx, tx_idx)
|
||||||
VALUES($1, $2::NUMERIC, $3::NUMERIC, $4::NUMERIC, $5::NUMERIC, $6, $7::NUMERIC, $8, $9, $10, $11)`,
|
VALUES($1, $2::NUMERIC, $3::NUMERIC, $4::NUMERIC, $5::NUMERIC, $6, $7::NUMERIC, $8, $9, $10, $11)`,
|
||||||
headerID, frobModel.Art, frobModel.Dart, frobModel.Dink, frobModel.IArt, frobModel.Ilk, frobModel.Ink, frobModel.Urn, frobModel.Raw, frobModel.LogIndex, frobModel.TransactionIndex)
|
headerID, frobModel.Art, frobModel.Dart, frobModel.Dink, frobModel.IArt, frobModel.Ilk, frobModel.Ink, frobModel.Urn, frobModel.Raw, frobModel.LogIndex, frobModel.TransactionIndex)
|
||||||
if err != nil {
|
if execErr != nil {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
return err
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
|
return execErr
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.FrobChecked)
|
checkHeaderErr := shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.FrobChecked)
|
||||||
if err != nil {
|
if checkHeaderErr != nil {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
return err
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
|
return checkHeaderErr
|
||||||
}
|
}
|
||||||
return tx.Commit()
|
return tx.Commit()
|
||||||
}
|
}
|
||||||
|
@ -16,6 +16,7 @@ package debt_ceiling
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
|
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
|
||||||
@ -26,34 +27,43 @@ type PitFileDebtCeilingRepository struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (repository PitFileDebtCeilingRepository) Create(headerID int64, models []interface{}) error {
|
func (repository PitFileDebtCeilingRepository) Create(headerID int64, models []interface{}) error {
|
||||||
tx, err := repository.db.Begin()
|
tx, dBaseErr := repository.db.Begin()
|
||||||
if err != nil {
|
if dBaseErr != nil {
|
||||||
return err
|
return dBaseErr
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, model := range models {
|
for _, model := range models {
|
||||||
pitFileDC, ok := model.(PitFileDebtCeilingModel)
|
pitFileDC, ok := model.(PitFileDebtCeilingModel)
|
||||||
if !ok {
|
if !ok {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
return fmt.Errorf("model of type %T, not %T", model, PitFileDebtCeilingModel{})
|
return fmt.Errorf("model of type %T, not %T", model, PitFileDebtCeilingModel{})
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = tx.Exec(
|
_, execErr := tx.Exec(
|
||||||
`INSERT into maker.pit_file_debt_ceiling (header_id, what, data, log_idx, tx_idx, raw_log)
|
`INSERT into maker.pit_file_debt_ceiling (header_id, what, data, log_idx, tx_idx, raw_log)
|
||||||
VALUES($1, $2, $3::NUMERIC, $4, $5, $6)`,
|
VALUES($1, $2, $3::NUMERIC, $4, $5, $6)`,
|
||||||
headerID, pitFileDC.What, pitFileDC.Data, pitFileDC.LogIndex, pitFileDC.TransactionIndex, pitFileDC.Raw,
|
headerID, pitFileDC.What, pitFileDC.Data, pitFileDC.LogIndex, pitFileDC.TransactionIndex, pitFileDC.Raw,
|
||||||
)
|
)
|
||||||
|
|
||||||
if err != nil {
|
if execErr != nil {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
return err
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
|
return execErr
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.PitFileDebtCeilingChecked)
|
checkHeaderErr := shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.PitFileDebtCeilingChecked)
|
||||||
if err != nil {
|
if checkHeaderErr != nil {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
return err
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
|
return checkHeaderErr
|
||||||
}
|
}
|
||||||
|
|
||||||
return tx.Commit()
|
return tx.Commit()
|
||||||
|
@ -16,6 +16,7 @@ package ilk
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
|
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
|
||||||
@ -26,33 +27,42 @@ type PitFileIlkRepository struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (repository PitFileIlkRepository) Create(headerID int64, models []interface{}) error {
|
func (repository PitFileIlkRepository) Create(headerID int64, models []interface{}) error {
|
||||||
tx, err := repository.db.Begin()
|
tx, dBaseErr := repository.db.Begin()
|
||||||
if err != nil {
|
if dBaseErr != nil {
|
||||||
return err
|
return dBaseErr
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, model := range models {
|
for _, model := range models {
|
||||||
pitFileIlk, ok := model.(PitFileIlkModel)
|
pitFileIlk, ok := model.(PitFileIlkModel)
|
||||||
if !ok {
|
if !ok {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
return fmt.Errorf("model of type %T, not %T", model, PitFileIlkModel{})
|
return fmt.Errorf("model of type %T, not %T", model, PitFileIlkModel{})
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = tx.Exec(
|
_, execErr := tx.Exec(
|
||||||
`INSERT into maker.pit_file_ilk (header_id, ilk, what, data, log_idx, tx_idx, raw_log)
|
`INSERT into maker.pit_file_ilk (header_id, ilk, what, data, log_idx, tx_idx, raw_log)
|
||||||
VALUES($1, $2, $3, $4::NUMERIC, $5, $6, $7)`,
|
VALUES($1, $2, $3, $4::NUMERIC, $5, $6, $7)`,
|
||||||
headerID, pitFileIlk.Ilk, pitFileIlk.What, pitFileIlk.Data, pitFileIlk.LogIndex, pitFileIlk.TransactionIndex, pitFileIlk.Raw,
|
headerID, pitFileIlk.Ilk, pitFileIlk.What, pitFileIlk.Data, pitFileIlk.LogIndex, pitFileIlk.TransactionIndex, pitFileIlk.Raw,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if execErr != nil {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
return err
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
|
return execErr
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.PitFileIlkChecked)
|
checkHeaderErr := shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.PitFileIlkChecked)
|
||||||
if err != nil {
|
if checkHeaderErr != nil {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
return err
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
|
return checkHeaderErr
|
||||||
}
|
}
|
||||||
return tx.Commit()
|
return tx.Commit()
|
||||||
}
|
}
|
||||||
|
@ -16,6 +16,7 @@ package price_feeds
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
|
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
|
||||||
@ -26,29 +27,38 @@ type PriceFeedRepository struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (repository PriceFeedRepository) Create(headerID int64, models []interface{}) error {
|
func (repository PriceFeedRepository) Create(headerID int64, models []interface{}) error {
|
||||||
tx, err := repository.db.Begin()
|
tx, dBaseErr := repository.db.Begin()
|
||||||
if err != nil {
|
if dBaseErr != nil {
|
||||||
return err
|
return dBaseErr
|
||||||
}
|
}
|
||||||
for _, model := range models {
|
for _, model := range models {
|
||||||
priceUpdate, ok := model.(PriceFeedModel)
|
priceUpdate, ok := model.(PriceFeedModel)
|
||||||
if !ok {
|
if !ok {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
return fmt.Errorf("model of type %T, not %T", model, PriceFeedModel{})
|
return fmt.Errorf("model of type %T, not %T", model, PriceFeedModel{})
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = tx.Exec(`INSERT INTO maker.price_feeds (block_number, header_id, medianizer_address, usd_value, log_idx, tx_idx, raw_log)
|
_, execErr := tx.Exec(`INSERT INTO maker.price_feeds (block_number, header_id, medianizer_address, usd_value, log_idx, tx_idx, raw_log)
|
||||||
VALUES ($1, $2, $3, $4::NUMERIC, $5, $6, $7)`, priceUpdate.BlockNumber, headerID, priceUpdate.MedianizerAddress, priceUpdate.UsdValue, priceUpdate.LogIndex, priceUpdate.TransactionIndex, priceUpdate.Raw)
|
VALUES ($1, $2, $3, $4::NUMERIC, $5, $6, $7)`, priceUpdate.BlockNumber, headerID, priceUpdate.MedianizerAddress, priceUpdate.UsdValue, priceUpdate.LogIndex, priceUpdate.TransactionIndex, priceUpdate.Raw)
|
||||||
if err != nil {
|
if execErr != nil {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
return err
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
|
return execErr
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.PriceFeedsChecked)
|
checkHeaderErr := shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.PriceFeedsChecked)
|
||||||
if err != nil {
|
if checkHeaderErr != nil {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
return err
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
|
return checkHeaderErr
|
||||||
}
|
}
|
||||||
return tx.Commit()
|
return tx.Commit()
|
||||||
}
|
}
|
||||||
|
@ -7,6 +7,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/core"
|
"github.com/vulcanize/vulcanizedb/pkg/core"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
||||||
|
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
|
||||||
)
|
)
|
||||||
|
|
||||||
func MarkHeaderChecked(headerID int64, db *postgres.DB, checkedHeadersColumn string) error {
|
func MarkHeaderChecked(headerID int64, db *postgres.DB, checkedHeadersColumn string) error {
|
||||||
@ -100,3 +101,14 @@ func CreateNotCheckedSQL(boolColumns []string) string {
|
|||||||
|
|
||||||
return result.String()
|
return result.String()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func GetTicInTx(headerID int64, tx *sql.Tx) (int64, error) {
|
||||||
|
var blockTimestamp int64
|
||||||
|
err := tx.QueryRow(`SELECT block_timestamp FROM public.headers WHERE id = $1;`, headerID).Scan(&blockTimestamp)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
tic := blockTimestamp + constants.TTL
|
||||||
|
return tic, nil
|
||||||
|
}
|
||||||
|
@ -15,7 +15,6 @@
|
|||||||
package shared
|
package shared
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"database/sql"
|
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
|
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
|
||||||
"math/big"
|
"math/big"
|
||||||
)
|
)
|
||||||
@ -79,15 +78,3 @@ func convert(conversion string, value string, precision int) string {
|
|||||||
}
|
}
|
||||||
return result.Text('f', precision)
|
return result.Text('f', precision)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Grabs the block timestamp for an headerID, and adds the TTL constant
|
|
||||||
func GetTicInTx(headerID int64, tx *sql.Tx) (int64, error) {
|
|
||||||
var blockTimestamp int64
|
|
||||||
err := tx.QueryRow(`SELECT block_timestamp FROM public.headers WHERE id = $1;`, headerID).Scan(&blockTimestamp)
|
|
||||||
if err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
|
|
||||||
tic := blockTimestamp + constants.TTL
|
|
||||||
return tic, nil
|
|
||||||
}
|
|
||||||
|
@ -16,6 +16,7 @@ package tend
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
|
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
|
||||||
@ -26,39 +27,52 @@ type TendRepository struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (repository TendRepository) Create(headerID int64, models []interface{}) error {
|
func (repository TendRepository) Create(headerID int64, models []interface{}) error {
|
||||||
tx, err := repository.db.Begin()
|
tx, dBaseErr := repository.db.Begin()
|
||||||
if err != nil {
|
if dBaseErr != nil {
|
||||||
return err
|
return dBaseErr
|
||||||
}
|
}
|
||||||
|
|
||||||
tic, err := shared.GetTicInTx(headerID, tx)
|
tic, getTicErr := shared.GetTicInTx(headerID, tx)
|
||||||
if err != nil {
|
if getTicErr != nil {
|
||||||
return err
|
rollbackErr := tx.Rollback()
|
||||||
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
|
return getTicErr
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, model := range models {
|
for _, model := range models {
|
||||||
tend, ok := model.(TendModel)
|
tend, ok := model.(TendModel)
|
||||||
if !ok {
|
if !ok {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
return fmt.Errorf("model of type %T, not %T", model, TendModel{})
|
return fmt.Errorf("model of type %T, not %T", model, TendModel{})
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = tx.Exec(
|
_, execErr := tx.Exec(
|
||||||
`INSERT into maker.tend (header_id, bid_id, lot, bid, guy, tic, log_idx, tx_idx, raw_log)
|
`INSERT into maker.tend (header_id, bid_id, lot, bid, guy, tic, log_idx, tx_idx, raw_log)
|
||||||
VALUES($1, $2, $3::NUMERIC, $4::NUMERIC, $5, $6, $7, $8, $9)`,
|
VALUES($1, $2, $3::NUMERIC, $4::NUMERIC, $5, $6, $7, $8, $9)`,
|
||||||
headerID, tend.BidId, tend.Lot, tend.Bid, tend.Guy, tic, tend.LogIndex, tend.TransactionIndex, tend.Raw,
|
headerID, tend.BidId, tend.Lot, tend.Bid, tend.Guy, tic, tend.LogIndex, tend.TransactionIndex, tend.Raw,
|
||||||
)
|
)
|
||||||
|
|
||||||
if err != nil {
|
if execErr != nil {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
return err
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
|
return execErr
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.TendChecked)
|
checkHeaderErr := shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.TendChecked)
|
||||||
if err != nil {
|
if checkHeaderErr != nil {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
return err
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
|
return checkHeaderErr
|
||||||
}
|
}
|
||||||
return tx.Commit()
|
return tx.Commit()
|
||||||
}
|
}
|
||||||
|
@ -16,6 +16,7 @@ package vat_flux
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
|
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
|
||||||
@ -26,31 +27,40 @@ type VatFluxRepository struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (repository VatFluxRepository) Create(headerID int64, models []interface{}) error {
|
func (repository VatFluxRepository) Create(headerID int64, models []interface{}) error {
|
||||||
tx, err := repository.db.Begin()
|
tx, dBaseErr := repository.db.Begin()
|
||||||
if err != nil {
|
if dBaseErr != nil {
|
||||||
return err
|
return dBaseErr
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, model := range models {
|
for _, model := range models {
|
||||||
vatFlux, ok := model.(VatFluxModel)
|
vatFlux, ok := model.(VatFluxModel)
|
||||||
if !ok {
|
if !ok {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
return fmt.Errorf("model of type %T, not %T", model, VatFluxModel{})
|
return fmt.Errorf("model of type %T, not %T", model, VatFluxModel{})
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = tx.Exec(`INSERT INTO maker.vat_flux (header_id, ilk, dst, src, rad, tx_idx, log_idx, raw_log)
|
_, execErr := tx.Exec(`INSERT INTO maker.vat_flux (header_id, ilk, dst, src, rad, tx_idx, log_idx, raw_log)
|
||||||
VALUES($1, $2, $3, $4, $5::NUMERIC, $6, $7, $8)`,
|
VALUES($1, $2, $3, $4, $5::NUMERIC, $6, $7, $8)`,
|
||||||
headerID, vatFlux.Ilk, vatFlux.Dst, vatFlux.Src, vatFlux.Rad, vatFlux.TransactionIndex, vatFlux.LogIndex, vatFlux.Raw)
|
headerID, vatFlux.Ilk, vatFlux.Dst, vatFlux.Src, vatFlux.Rad, vatFlux.TransactionIndex, vatFlux.LogIndex, vatFlux.Raw)
|
||||||
if err != nil {
|
if execErr != nil {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
return err
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
|
return execErr
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.VatFluxChecked)
|
checkHeaderErr := shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.VatFluxChecked)
|
||||||
if err != nil {
|
if checkHeaderErr != nil {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
return err
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
|
return checkHeaderErr
|
||||||
}
|
}
|
||||||
|
|
||||||
return tx.Commit()
|
return tx.Commit()
|
||||||
|
@ -16,6 +16,7 @@ package vat_fold
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
||||||
@ -27,32 +28,41 @@ type VatFoldRepository struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (repository VatFoldRepository) Create(headerID int64, models []interface{}) error {
|
func (repository VatFoldRepository) Create(headerID int64, models []interface{}) error {
|
||||||
tx, err := repository.db.Begin()
|
tx, dBaseErr := repository.db.Begin()
|
||||||
if err != nil {
|
if dBaseErr != nil {
|
||||||
return err
|
return dBaseErr
|
||||||
}
|
}
|
||||||
for _, model := range models {
|
for _, model := range models {
|
||||||
vatFold, ok := model.(VatFoldModel)
|
vatFold, ok := model.(VatFoldModel)
|
||||||
if !ok {
|
if !ok {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
return fmt.Errorf("model of type %T, not %T", model, VatFoldModel{})
|
return fmt.Errorf("model of type %T, not %T", model, VatFoldModel{})
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = tx.Exec(
|
_, execErr := tx.Exec(
|
||||||
`INSERT into maker.vat_fold (header_id, ilk, urn, rate, log_idx, tx_idx, raw_log)
|
`INSERT into maker.vat_fold (header_id, ilk, urn, rate, log_idx, tx_idx, raw_log)
|
||||||
VALUES($1, $2, $3, $4::NUMERIC, $5, $6, $7)`,
|
VALUES($1, $2, $3, $4::NUMERIC, $5, $6, $7)`,
|
||||||
headerID, vatFold.Ilk, vatFold.Urn, vatFold.Rate, vatFold.LogIndex, vatFold.TransactionIndex, vatFold.Raw,
|
headerID, vatFold.Ilk, vatFold.Urn, vatFold.Rate, vatFold.LogIndex, vatFold.TransactionIndex, vatFold.Raw,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if execErr != nil {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
return err
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
|
return execErr
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.VatFoldChecked)
|
checkHeaderErr := shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.VatFoldChecked)
|
||||||
if err != nil {
|
if checkHeaderErr != nil {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
return err
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
|
return checkHeaderErr
|
||||||
}
|
}
|
||||||
|
|
||||||
return tx.Commit()
|
return tx.Commit()
|
||||||
|
@ -2,6 +2,7 @@ package vat_grab
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
||||||
@ -13,31 +14,40 @@ type VatGrabRepository struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (repository VatGrabRepository) Create(headerID int64, models []interface{}) error {
|
func (repository VatGrabRepository) Create(headerID int64, models []interface{}) error {
|
||||||
tx, err := repository.db.Begin()
|
tx, dBaseErr := repository.db.Begin()
|
||||||
if err != nil {
|
if dBaseErr != nil {
|
||||||
return err
|
return dBaseErr
|
||||||
}
|
}
|
||||||
for _, model := range models {
|
for _, model := range models {
|
||||||
vatGrab, ok := model.(VatGrabModel)
|
vatGrab, ok := model.(VatGrabModel)
|
||||||
if !ok {
|
if !ok {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
return fmt.Errorf("model of type %T, not %T", model, VatGrabModel{})
|
return fmt.Errorf("model of type %T, not %T", model, VatGrabModel{})
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = tx.Exec(
|
_, execErr := tx.Exec(
|
||||||
`INSERT into maker.vat_grab (header_id, ilk, urn, v, w, dink, dart, log_idx, tx_idx, raw_log)
|
`INSERT into maker.vat_grab (header_id, ilk, urn, v, w, dink, dart, log_idx, tx_idx, raw_log)
|
||||||
VALUES($1, $2, $3, $4, $5, $6::NUMERIC, $7::NUMERIC, $8, $9, $10)`,
|
VALUES($1, $2, $3, $4, $5, $6::NUMERIC, $7::NUMERIC, $8, $9, $10)`,
|
||||||
headerID, vatGrab.Ilk, vatGrab.Urn, vatGrab.V, vatGrab.W, vatGrab.Dink, vatGrab.Dart, vatGrab.LogIndex, vatGrab.TransactionIndex, vatGrab.Raw,
|
headerID, vatGrab.Ilk, vatGrab.Urn, vatGrab.V, vatGrab.W, vatGrab.Dink, vatGrab.Dart, vatGrab.LogIndex, vatGrab.TransactionIndex, vatGrab.Raw,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if execErr != nil {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
return err
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
|
return execErr
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.VatGrabChecked)
|
checkHeaderErr := shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.VatGrabChecked)
|
||||||
if err != nil {
|
if checkHeaderErr != nil {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
return err
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
|
return checkHeaderErr
|
||||||
}
|
}
|
||||||
return tx.Commit()
|
return tx.Commit()
|
||||||
}
|
}
|
||||||
|
@ -16,6 +16,7 @@ package vat_heal
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
||||||
@ -31,31 +32,40 @@ func (repository *VatHealRepository) SetDB(db *postgres.DB) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (repository VatHealRepository) Create(headerID int64, models []interface{}) error {
|
func (repository VatHealRepository) Create(headerID int64, models []interface{}) error {
|
||||||
tx, err := repository.db.Begin()
|
tx, dBaseErr := repository.db.Begin()
|
||||||
if err != nil {
|
if dBaseErr != nil {
|
||||||
return err
|
return dBaseErr
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, model := range models {
|
for _, model := range models {
|
||||||
vatHeal, ok := model.(VatHealModel)
|
vatHeal, ok := model.(VatHealModel)
|
||||||
if !ok {
|
if !ok {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
return fmt.Errorf("model of type %T, not %T", model, VatHealModel{})
|
return fmt.Errorf("model of type %T, not %T", model, VatHealModel{})
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err := tx.Exec(`INSERT INTO maker.vat_heal (header_id, urn, v, rad, log_idx, tx_idx, raw_log)
|
_, execErr := tx.Exec(`INSERT INTO maker.vat_heal (header_id, urn, v, rad, log_idx, tx_idx, raw_log)
|
||||||
VALUES($1, $2, $3, $4::NUMERIC, $5, $6, $7)`,
|
VALUES($1, $2, $3, $4::NUMERIC, $5, $6, $7)`,
|
||||||
headerID, vatHeal.Urn, vatHeal.V, vatHeal.Rad, vatHeal.LogIndex, vatHeal.TransactionIndex, vatHeal.Raw)
|
headerID, vatHeal.Urn, vatHeal.V, vatHeal.Rad, vatHeal.LogIndex, vatHeal.TransactionIndex, vatHeal.Raw)
|
||||||
if err != nil {
|
if execErr != nil {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
return err
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
|
return execErr
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.VatHealChecked)
|
checkHeaderErr := shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.VatHealChecked)
|
||||||
if err != nil {
|
if checkHeaderErr != nil {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
return err
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
|
return checkHeaderErr
|
||||||
}
|
}
|
||||||
return tx.Commit()
|
return tx.Commit()
|
||||||
}
|
}
|
||||||
|
@ -16,6 +16,7 @@ package vat_init
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
|
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
|
||||||
@ -26,33 +27,42 @@ type VatInitRepository struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (repository VatInitRepository) Create(headerID int64, models []interface{}) error {
|
func (repository VatInitRepository) Create(headerID int64, models []interface{}) error {
|
||||||
tx, err := repository.db.Begin()
|
tx, dBaseErr := repository.db.Begin()
|
||||||
if err != nil {
|
if dBaseErr != nil {
|
||||||
return err
|
return dBaseErr
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, model := range models {
|
for _, model := range models {
|
||||||
vatInit, ok := model.(VatInitModel)
|
vatInit, ok := model.(VatInitModel)
|
||||||
if !ok {
|
if !ok {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
return fmt.Errorf("model of type %T, not %T", model, VatInitModel{})
|
return fmt.Errorf("model of type %T, not %T", model, VatInitModel{})
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = tx.Exec(
|
_, execErr := tx.Exec(
|
||||||
`INSERT INTO maker.vat_init (header_id, ilk, log_idx, tx_idx, raw_log)
|
`INSERT INTO maker.vat_init (header_id, ilk, log_idx, tx_idx, raw_log)
|
||||||
VALUES($1, $2, $3, $4, $5)`,
|
VALUES($1, $2, $3, $4, $5)`,
|
||||||
headerID, vatInit.Ilk, vatInit.LogIndex, vatInit.TransactionIndex, vatInit.Raw,
|
headerID, vatInit.Ilk, vatInit.LogIndex, vatInit.TransactionIndex, vatInit.Raw,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if execErr != nil {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
return err
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
|
return execErr
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.VatInitChecked)
|
checkHeaderErr := shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.VatInitChecked)
|
||||||
if err != nil {
|
if checkHeaderErr != nil {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
return err
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
|
return checkHeaderErr
|
||||||
}
|
}
|
||||||
|
|
||||||
return tx.Commit()
|
return tx.Commit()
|
||||||
|
@ -16,6 +16,7 @@ package vat_move
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
|
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
|
||||||
@ -26,34 +27,43 @@ type VatMoveRepository struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (repository VatMoveRepository) Create(headerID int64, models []interface{}) error {
|
func (repository VatMoveRepository) Create(headerID int64, models []interface{}) error {
|
||||||
tx, err := repository.db.Begin()
|
tx, dBaseErr := repository.db.Begin()
|
||||||
if err != nil {
|
if dBaseErr != nil {
|
||||||
return err
|
return dBaseErr
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, model := range models {
|
for _, model := range models {
|
||||||
vatMove, ok := model.(VatMoveModel)
|
vatMove, ok := model.(VatMoveModel)
|
||||||
if !ok {
|
if !ok {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
return fmt.Errorf("model of type %T, not %T", model, VatMoveModel{})
|
return fmt.Errorf("model of type %T, not %T", model, VatMoveModel{})
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = tx.Exec(
|
_, execErr := tx.Exec(
|
||||||
`INSERT INTO maker.vat_move (header_id, src, dst, rad, log_idx, tx_idx, raw_log)
|
`INSERT INTO maker.vat_move (header_id, src, dst, rad, log_idx, tx_idx, raw_log)
|
||||||
VALUES ($1, $2, $3, $4::NUMERIC, $5, $6, $7)`,
|
VALUES ($1, $2, $3, $4::NUMERIC, $5, $6, $7)`,
|
||||||
headerID, vatMove.Src, vatMove.Dst, vatMove.Rad, vatMove.LogIndex, vatMove.TransactionIndex, vatMove.Raw,
|
headerID, vatMove.Src, vatMove.Dst, vatMove.Rad, vatMove.LogIndex, vatMove.TransactionIndex, vatMove.Raw,
|
||||||
)
|
)
|
||||||
|
|
||||||
if err != nil {
|
if execErr != nil {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
return err
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
|
return execErr
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.VatMoveChecked)
|
checkHeaderErr := shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.VatMoveChecked)
|
||||||
if err != nil {
|
if checkHeaderErr != nil {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
return err
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
|
return checkHeaderErr
|
||||||
}
|
}
|
||||||
|
|
||||||
return tx.Commit()
|
return tx.Commit()
|
||||||
|
@ -2,6 +2,7 @@ package vat_slip
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
|
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
|
||||||
@ -12,32 +13,41 @@ type VatSlipRepository struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (repository VatSlipRepository) Create(headerID int64, models []interface{}) error {
|
func (repository VatSlipRepository) Create(headerID int64, models []interface{}) error {
|
||||||
tx, err := repository.db.Begin()
|
tx, dBaseErr := repository.db.Begin()
|
||||||
if err != nil {
|
if dBaseErr != nil {
|
||||||
return err
|
return dBaseErr
|
||||||
}
|
}
|
||||||
for _, model := range models {
|
for _, model := range models {
|
||||||
vatSlip, ok := model.(VatSlipModel)
|
vatSlip, ok := model.(VatSlipModel)
|
||||||
if !ok {
|
if !ok {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
return fmt.Errorf("model of type %T, not %T", model, VatSlipModel{})
|
return fmt.Errorf("model of type %T, not %T", model, VatSlipModel{})
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = tx.Exec(
|
_, execErr := tx.Exec(
|
||||||
`INSERT into maker.vat_slip (header_id, ilk, guy, rad, tx_idx, log_idx, raw_log)
|
`INSERT into maker.vat_slip (header_id, ilk, guy, rad, tx_idx, log_idx, raw_log)
|
||||||
VALUES($1, $2, $3, $4::NUMERIC, $5, $6, $7)`,
|
VALUES($1, $2, $3, $4::NUMERIC, $5, $6, $7)`,
|
||||||
headerID, vatSlip.Ilk, vatSlip.Guy, vatSlip.Rad, vatSlip.TransactionIndex, vatSlip.LogIndex, vatSlip.Raw,
|
headerID, vatSlip.Ilk, vatSlip.Guy, vatSlip.Rad, vatSlip.TransactionIndex, vatSlip.LogIndex, vatSlip.Raw,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if execErr != nil {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
return err
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
|
return execErr
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.VatSlipChecked)
|
checkHeaderErr := shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.VatSlipChecked)
|
||||||
if err != nil {
|
if checkHeaderErr != nil {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
return err
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
|
return checkHeaderErr
|
||||||
}
|
}
|
||||||
|
|
||||||
return tx.Commit()
|
return tx.Commit()
|
||||||
|
@ -2,6 +2,7 @@ package vat_toll
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
|
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
|
||||||
@ -12,32 +13,41 @@ type VatTollRepository struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (repository VatTollRepository) Create(headerID int64, models []interface{}) error {
|
func (repository VatTollRepository) Create(headerID int64, models []interface{}) error {
|
||||||
tx, err := repository.db.Begin()
|
tx, dBaseErr := repository.db.Begin()
|
||||||
if err != nil {
|
if dBaseErr != nil {
|
||||||
return err
|
return dBaseErr
|
||||||
}
|
}
|
||||||
for _, model := range models {
|
for _, model := range models {
|
||||||
vatToll, ok := model.(VatTollModel)
|
vatToll, ok := model.(VatTollModel)
|
||||||
if !ok {
|
if !ok {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
return fmt.Errorf("model of type %T, not %T", model, VatTollModel{})
|
return fmt.Errorf("model of type %T, not %T", model, VatTollModel{})
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = tx.Exec(
|
_, execErr := tx.Exec(
|
||||||
`INSERT into maker.vat_toll (header_id, ilk, urn, take, tx_idx, log_idx, raw_log)
|
`INSERT into maker.vat_toll (header_id, ilk, urn, take, tx_idx, log_idx, raw_log)
|
||||||
VALUES($1, $2, $3, $4::NUMERIC, $5, $6, $7)`,
|
VALUES($1, $2, $3, $4::NUMERIC, $5, $6, $7)`,
|
||||||
headerID, vatToll.Ilk, vatToll.Urn, vatToll.Take, vatToll.TransactionIndex, vatToll.LogIndex, vatToll.Raw,
|
headerID, vatToll.Ilk, vatToll.Urn, vatToll.Take, vatToll.TransactionIndex, vatToll.LogIndex, vatToll.Raw,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if execErr != nil {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
return err
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
|
return execErr
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.VatTollChecked)
|
checkHeaderErr := shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.VatTollChecked)
|
||||||
if err != nil {
|
if checkHeaderErr != nil {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
return err
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
|
return checkHeaderErr
|
||||||
}
|
}
|
||||||
return tx.Commit()
|
return tx.Commit()
|
||||||
}
|
}
|
||||||
|
@ -2,6 +2,7 @@ package vat_tune
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
|
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
|
||||||
@ -12,32 +13,41 @@ type VatTuneRepository struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (repository VatTuneRepository) Create(headerID int64, models []interface{}) error {
|
func (repository VatTuneRepository) Create(headerID int64, models []interface{}) error {
|
||||||
tx, err := repository.db.Begin()
|
tx, dBaseErr := repository.db.Begin()
|
||||||
if err != nil {
|
if dBaseErr != nil {
|
||||||
return err
|
return dBaseErr
|
||||||
}
|
}
|
||||||
for _, model := range models {
|
for _, model := range models {
|
||||||
vatTune, ok := model.(VatTuneModel)
|
vatTune, ok := model.(VatTuneModel)
|
||||||
if !ok {
|
if !ok {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
return fmt.Errorf("model of type %T, not %T", model, VatTuneModel{})
|
return fmt.Errorf("model of type %T, not %T", model, VatTuneModel{})
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = tx.Exec(
|
_, execErr := tx.Exec(
|
||||||
`INSERT into maker.vat_tune (header_id, ilk, urn, v, w, dink, dart, tx_idx, log_idx, raw_log)
|
`INSERT into maker.vat_tune (header_id, ilk, urn, v, w, dink, dart, tx_idx, log_idx, raw_log)
|
||||||
VALUES($1, $2, $3, $4, $5, $6::NUMERIC, $7::NUMERIC, $8, $9, $10)`,
|
VALUES($1, $2, $3, $4, $5, $6::NUMERIC, $7::NUMERIC, $8, $9, $10)`,
|
||||||
headerID, vatTune.Ilk, vatTune.Urn, vatTune.V, vatTune.W, vatTune.Dink, vatTune.Dart, vatTune.TransactionIndex, vatTune.LogIndex, vatTune.Raw,
|
headerID, vatTune.Ilk, vatTune.Urn, vatTune.V, vatTune.W, vatTune.Dink, vatTune.Dart, vatTune.TransactionIndex, vatTune.LogIndex, vatTune.Raw,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if execErr != nil {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
return err
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
|
return execErr
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.VatTuneChecked)
|
checkHeaderErr := shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.VatTuneChecked)
|
||||||
if err != nil {
|
if checkHeaderErr != nil {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
return err
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
|
return checkHeaderErr
|
||||||
}
|
}
|
||||||
return tx.Commit()
|
return tx.Commit()
|
||||||
}
|
}
|
||||||
|
@ -16,6 +16,7 @@ package vow_flog
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
|
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/constants"
|
||||||
@ -26,34 +27,43 @@ type VowFlogRepository struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (repository VowFlogRepository) Create(headerID int64, models []interface{}) error {
|
func (repository VowFlogRepository) Create(headerID int64, models []interface{}) error {
|
||||||
tx, err := repository.db.Begin()
|
tx, dBaseErr := repository.db.Begin()
|
||||||
if err != nil {
|
if dBaseErr != nil {
|
||||||
return err
|
return dBaseErr
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, model := range models {
|
for _, model := range models {
|
||||||
flog, ok := model.(VowFlogModel)
|
flog, ok := model.(VowFlogModel)
|
||||||
if !ok {
|
if !ok {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
return fmt.Errorf("model of type %T, not %T", model, VowFlogModel{})
|
return fmt.Errorf("model of type %T, not %T", model, VowFlogModel{})
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = tx.Exec(
|
_, execErr := tx.Exec(
|
||||||
`INSERT into maker.vow_flog (header_id, era, log_idx, tx_idx, raw_log)
|
`INSERT into maker.vow_flog (header_id, era, log_idx, tx_idx, raw_log)
|
||||||
VALUES($1, $2::NUMERIC, $3, $4, $5)`,
|
VALUES($1, $2::NUMERIC, $3, $4, $5)`,
|
||||||
headerID, flog.Era, flog.LogIndex, flog.TransactionIndex, flog.Raw,
|
headerID, flog.Era, flog.LogIndex, flog.TransactionIndex, flog.Raw,
|
||||||
)
|
)
|
||||||
|
|
||||||
if err != nil {
|
if execErr != nil {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
return err
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
|
return execErr
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.VowFlogChecked)
|
checkHeaderErr := shared.MarkHeaderCheckedInTransaction(headerID, tx, constants.VowFlogChecked)
|
||||||
if err != nil {
|
if checkHeaderErr != nil {
|
||||||
tx.Rollback()
|
rollbackErr := tx.Rollback()
|
||||||
return err
|
if rollbackErr != nil {
|
||||||
|
log.Error("failed to rollback ", rollbackErr)
|
||||||
|
}
|
||||||
|
return checkHeaderErr
|
||||||
}
|
}
|
||||||
|
|
||||||
return tx.Commit()
|
return tx.Commit()
|
||||||
|
@ -111,6 +111,7 @@ func CleanTestDB(db *postgres.DB) {
|
|||||||
db.MustExec("DELETE FROM watched_contracts")
|
db.MustExec("DELETE FROM watched_contracts")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Returns a new test node, with the same ID
|
||||||
func NewTestNode() core.Node {
|
func NewTestNode() core.Node {
|
||||||
return core.Node{
|
return core.Node{
|
||||||
GenesisBlock: "GENESIS",
|
GenesisBlock: "GENESIS",
|
||||||
|
Loading…
Reference in New Issue
Block a user