Use transformer factory for Vat flux

This commit is contained in:
Rob Mulholand 2018-10-24 16:40:31 -05:00
parent be249437c2
commit a0cae99437
10 changed files with 172 additions and 232 deletions

View File

@ -18,6 +18,8 @@ import (
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/pkg/transformers/factories"
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
"github.com/vulcanize/vulcanizedb/pkg/transformers/vat_flux" "github.com/vulcanize/vulcanizedb/pkg/transformers/vat_flux"
"github.com/vulcanize/vulcanizedb/test_config" "github.com/vulcanize/vulcanizedb/test_config"
) )
@ -40,8 +42,13 @@ var _ = Describe("VatFlux Transformer", func() {
err = persistHeader(rpcClient, db, blockNumber) err = persistHeader(rpcClient, db, blockNumber)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
initializer := vat_flux.VatFluxTransformerInitializer{Config: config} initializer := factories.Transformer{
transformer := initializer.NewVatFluxTransformer(db, blockchain) Config: config,
Fetcher: &shared.Fetcher{},
Converter: &vat_flux.VatFluxConverter{},
Repository: &vat_flux.VatFluxRepository{},
}
transformer := initializer.NewTransformer(db, blockchain)
err = transformer.Execute() err = transformer.Execute()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())

View File

@ -4,19 +4,18 @@ import (
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/vulcanize/vulcanizedb/pkg/transformers/test_data" "github.com/vulcanize/vulcanizedb/pkg/transformers/test_data"
"github.com/vulcanize/vulcanizedb/pkg/transformers/vat_flux"
) )
type MockVatFlux struct { type MockVatFluxConverter struct {
err error err error
PassedLogs []types.Log PassedLogs []types.Log
} }
func (converter *MockVatFlux) ToModels(ethLogs []types.Log) ([]vat_flux.VatFluxModel, error) { func (converter *MockVatFluxConverter) ToModels(ethLogs []types.Log) ([]interface{}, error) {
converter.PassedLogs = ethLogs converter.PassedLogs = ethLogs
return []vat_flux.VatFluxModel{test_data.VatFluxModel}, converter.err return []interface{}{test_data.VatFluxModel}, converter.err
} }
func (converter *MockVatFlux) SetConverterError(e error) { func (converter *MockVatFluxConverter) SetConverterError(e error) {
converter.err = e converter.err = e
} }

View File

@ -16,7 +16,7 @@ package vat_flux
import ( import (
"github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/transformers/vat_flux" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
) )
type MockVatFluxRepository struct { type MockVatFluxRepository struct {
@ -28,26 +28,31 @@ type MockVatFluxRepository struct {
PassedStartingBlockNumber int64 PassedStartingBlockNumber int64
PassedEndingBlockNumber int64 PassedEndingBlockNumber int64
PassedHeaderID int64 PassedHeaderID int64
PassedModels []vat_flux.VatFluxModel PassedModels []interface{}
SetDbCalled bool
} }
func (repository *MockVatFluxRepository) MarkCheckedHeader(headerId int64) error { func (repository *MockVatFluxRepository) Create(headerID int64, models []interface{}) error {
repository.MarkHeaderCheckedPassedHeaderID = headerId
return repository.markHeaderCheckedErr
}
func (repository *MockVatFluxRepository) Create(headerID int64, models []vat_flux.VatFluxModel) error {
repository.PassedHeaderID = headerID repository.PassedHeaderID = headerID
repository.PassedModels = models repository.PassedModels = models
return repository.createErr return repository.createErr
} }
func (repository *MockVatFluxRepository) MarkHeaderChecked(headerID int64) error {
repository.MarkHeaderCheckedPassedHeaderID = headerID
return repository.markHeaderCheckedErr
}
func (repository *MockVatFluxRepository) MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) { func (repository *MockVatFluxRepository) MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) {
repository.PassedStartingBlockNumber = startingBlockNumber repository.PassedStartingBlockNumber = startingBlockNumber
repository.PassedEndingBlockNumber = endingBlockNumber repository.PassedEndingBlockNumber = endingBlockNumber
return repository.missingHeaders, repository.missingHeadersErr return repository.missingHeaders, repository.missingHeadersErr
} }
func (repository *MockVatFluxRepository) SetDB(db *postgres.DB) {
repository.SetDbCalled = true
}
func (repository *MockVatFluxRepository) SetMarkHeaderCheckedErr(e error) { func (repository *MockVatFluxRepository) SetMarkHeaderCheckedErr(e error) {
repository.markHeaderCheckedErr = e repository.markHeaderCheckedErr = e
} }

View File

@ -140,7 +140,12 @@ var (
Repository: &vat_tune.VatTuneRepository{}, Repository: &vat_tune.VatTuneRepository{},
Fetcher: &shared.Fetcher{}, Fetcher: &shared.Fetcher{},
}.NewTransformer }.NewTransformer
VatFluxTransformerInitializer = vat_flux.VatFluxTransformerInitializer{Config: vat_flux.VatFluxConfig}.NewVatFluxTransformer VatFluxTransformerInitializer = factories.Transformer{
Config: vat_flux.VatFluxConfig,
Converter: &vat_flux.VatFluxConverter{},
Repository: &vat_flux.VatFluxRepository{},
Fetcher: &shared.Fetcher{},
}.NewTransformer
) )
func TransformerInitializers() []shared.TransformerInitializer { func TransformerInitializers() []shared.TransformerInitializer {

View File

@ -2,10 +2,11 @@ package vat_flux
import "github.com/vulcanize/vulcanizedb/pkg/transformers/shared" import "github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
var VatFluxConfig = shared.TransformerConfig{ var VatFluxConfig = shared.SingleTransformerConfig{
TransformerName: shared.VatFluxLabel,
ContractAddresses: []string{shared.VatContractAddress}, ContractAddresses: []string{shared.VatContractAddress},
ContractAbi: shared.VatABI, ContractAbi: shared.VatABI,
Topics: []string{shared.VatFluxSignature}, Topic: shared.VatFluxSignature,
StartingBlockNumber: 0, StartingBlockNumber: 0,
EndingBlockNumber: 10000000, EndingBlockNumber: 10000000,
} }

View File

@ -24,14 +24,10 @@ import (
"math/big" "math/big"
) )
type Converter interface {
ToModels(ethLogs []types.Log) ([]VatFluxModel, error)
}
type VatFluxConverter struct{} type VatFluxConverter struct{}
func (VatFluxConverter) ToModels(ethLogs []types.Log) ([]VatFluxModel, error) { func (VatFluxConverter) ToModels(ethLogs []types.Log) ([]interface{}, error) {
var models []VatFluxModel var models []interface{}
for _, ethLog := range ethLogs { for _, ethLog := range ethLogs {
err := verifyLog(ethLog) err := verifyLog(ethLog)
if err != nil { if err != nil {

View File

@ -15,34 +15,31 @@
package vat_flux package vat_flux
import ( import (
"fmt"
"github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
) )
type Repository interface {
Create(headerId int64, models []VatFluxModel) error
MissingHeaders(startingBlock, endingBlock int64) ([]core.Header, error)
MarkCheckedHeader(headerId int64) error
}
type VatFluxRepository struct { type VatFluxRepository struct {
DB *postgres.DB db *postgres.DB
} }
func NewVatFluxRepository(db *postgres.DB) VatFluxRepository { func (repository VatFluxRepository) Create(headerId int64, models []interface{}) error {
return VatFluxRepository{DB: db} tx, err := repository.db.Begin()
}
func (repository VatFluxRepository) Create(headerId int64, models []VatFluxModel) error {
tx, err := repository.DB.Begin()
if err != nil { if err != nil {
return err return err
} }
for _, model := range models { for _, model := range models {
vatFlux, ok := model.(VatFluxModel)
if !ok {
tx.Rollback()
return fmt.Errorf("model of type %T, not %T", model, VatFluxModel{})
}
_, err := tx.Exec(`INSERT INTO maker.vat_flux (header_id, ilk, dst, src, rad, tx_idx, log_idx, raw_log) _, err := tx.Exec(`INSERT INTO maker.vat_flux (header_id, ilk, dst, src, rad, tx_idx, log_idx, raw_log)
VALUES($1, $2, $3, $4, $5::NUMERIC, $6, $7, $8)`, VALUES($1, $2, $3, $4, $5::NUMERIC, $6, $7, $8)`,
headerId, model.Ilk, model.Dst, model.Src, model.Rad, model.TransactionIndex, model.LogIndex, model.Raw) headerId, vatFlux.Ilk, vatFlux.Dst, vatFlux.Src, vatFlux.Rad, vatFlux.TransactionIndex, vatFlux.LogIndex, vatFlux.Raw)
if err != nil { if err != nil {
tx.Rollback() tx.Rollback()
return err return err
@ -57,28 +54,33 @@ func (repository VatFluxRepository) Create(headerId int64, models []VatFluxModel
tx.Rollback() tx.Rollback()
return err return err
} }
return tx.Commit() return tx.Commit()
} }
func (repository VatFluxRepository) MissingHeaders(startingBlock, endingBlock int64) ([]core.Header, error) { func (repository VatFluxRepository) MarkHeaderChecked(headerId int64) error {
var headers []core.Header _, err := repository.db.Exec(`INSERT INTO public.checked_headers (header_id, vat_flux_checked)
err := repository.DB.Select(&headers,
`SELECT headers.id, block_number from headers
LEFT JOIN checked_headers on headers.id = header_id
WHERE (header_id ISNULL OR vat_flux_checked IS FALSE)
AND headers.block_number >= $1
AND headers.block_number <= $2
AND headers.eth_node_fingerprint = $3`,
startingBlock, endingBlock, repository.DB.Node.ID)
return headers, err
}
func (repository VatFluxRepository) MarkCheckedHeader(headerId int64) error {
_, err := repository.DB.Exec(`INSERT INTO public.checked_headers (header_id, vat_flux_checked)
VALUES($1, $2) VALUES($1, $2)
ON CONFLICT (header_id) DO ON CONFLICT (header_id) DO
UPDATE SET vat_flux_checked = $2`, headerId, true) UPDATE SET vat_flux_checked = $2`, headerId, true)
return err return err
} }
func (repository VatFluxRepository) MissingHeaders(startingBlock, endingBlock int64) ([]core.Header, error) {
var headers []core.Header
err := repository.db.Select(&headers,
`SELECT headers.id, block_number from headers
LEFT JOIN checked_headers on headers.id = header_id
WHERE (header_id ISNULL OR vat_flux_checked IS FALSE)
AND headers.block_number >= $1
AND headers.block_number <= $2
AND headers.eth_node_fingerprint = $3`,
startingBlock, endingBlock, repository.db.Node.ID)
return headers, err
}
func (repository *VatFluxRepository) SetDB(db *postgres.DB) {
repository.db = db
}

View File

@ -24,20 +24,24 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/transformers/test_data" "github.com/vulcanize/vulcanizedb/pkg/transformers/test_data"
"github.com/vulcanize/vulcanizedb/pkg/transformers/vat_flux" "github.com/vulcanize/vulcanizedb/pkg/transformers/vat_flux"
"github.com/vulcanize/vulcanizedb/test_config" "github.com/vulcanize/vulcanizedb/test_config"
"math/rand"
) )
var _ = Describe("VatFlux Repository", func() { var _ = Describe("VatFlux Repository", func() {
var db *postgres.DB var (
var repository vat_flux.VatFluxRepository db *postgres.DB
var headerRepository repositories.HeaderRepository repository vat_flux.VatFluxRepository
var headerId int64 headerRepository repositories.HeaderRepository
var err error headerId int64
err error
)
BeforeEach(func() { BeforeEach(func() {
node := test_config.NewTestNode() node := test_config.NewTestNode()
db = test_config.NewTestDB(node) db = test_config.NewTestDB(node)
test_config.CleanTestDB(db) test_config.CleanTestDB(db)
repository = vat_flux.VatFluxRepository{DB: db} repository = vat_flux.VatFluxRepository{}
repository.SetDB(db)
headerRepository = repositories.NewHeaderRepository(db) headerRepository = repositories.NewHeaderRepository(db)
headerId, err = headerRepository.CreateOrUpdateHeader(fakes.FakeHeader) headerId, err = headerRepository.CreateOrUpdateHeader(fakes.FakeHeader)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
@ -57,7 +61,7 @@ var _ = Describe("VatFlux Repository", func() {
It("persists vat flux records", func() { It("persists vat flux records", func() {
anotherVatFlux := test_data.VatFluxModel anotherVatFlux := test_data.VatFluxModel
anotherVatFlux.TransactionIndex = test_data.VatFluxModel.TransactionIndex + 1 anotherVatFlux.TransactionIndex = test_data.VatFluxModel.TransactionIndex + 1
err = repository.Create(headerId, []vat_flux.VatFluxModel{test_data.VatFluxModel, anotherVatFlux}) err = repository.Create(headerId, []interface{}{test_data.VatFluxModel, anotherVatFlux})
var dbResult []VatFluxDBResult var dbResult []VatFluxDBResult
err = db.Select(&dbResult, `SELECT * from maker.vat_flux where header_id = $1`, headerId) err = db.Select(&dbResult, `SELECT * from maker.vat_flux where header_id = $1`, headerId)
@ -75,28 +79,30 @@ var _ = Describe("VatFlux Repository", func() {
}) })
It("returns an error if the insertion fails", func() { It("returns an error if the insertion fails", func() {
err = repository.Create(headerId, []vat_flux.VatFluxModel{test_data.VatFluxModel}) err = repository.Create(headerId, []interface{}{test_data.VatFluxModel})
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
err = repository.Create(headerId, []vat_flux.VatFluxModel{test_data.VatFluxModel})
err = repository.Create(headerId, []interface{}{test_data.VatFluxModel})
Expect(err).To(HaveOccurred()) Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("pq: duplicate key value violates unique constraint")) Expect(err.Error()).To(ContainSubstring("pq: duplicate key value violates unique constraint"))
}) })
It("allows for multiple vat flux events in one transaction if they have different log indexes", func() { It("allows for multiple vat flux events in one transaction if they have different log indexes", func() {
err = repository.Create(headerId, []vat_flux.VatFluxModel{test_data.VatFluxModel}) err = repository.Create(headerId, []interface{}{test_data.VatFluxModel})
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
anotherVatFlux := test_data.VatFluxModel anotherVatFlux := test_data.VatFluxModel
anotherVatFlux.LogIndex = anotherVatFlux.LogIndex + 1 anotherVatFlux.LogIndex = anotherVatFlux.LogIndex + 1
err = repository.Create(headerId, []vat_flux.VatFluxModel{anotherVatFlux}) err = repository.Create(headerId, []interface{}{anotherVatFlux})
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
}) })
It("marks the header as checked for vat flux logs", func() { It("marks the header as checked for vat flux logs", func() {
err = repository.Create(headerId, []vat_flux.VatFluxModel{test_data.VatFluxModel}) err = repository.Create(headerId, []interface{}{test_data.VatFluxModel})
Expect(err).NotTo(HaveOccurred())
Expect(err).NotTo(HaveOccurred())
var headerChecked bool var headerChecked bool
err = db.Get(&headerChecked, `SELECT vat_flux_checked FROM public.checked_headers WHERE header_id = $1`, headerId) err = db.Get(&headerChecked, `SELECT vat_flux_checked FROM public.checked_headers WHERE header_id = $1`, headerId)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
@ -106,9 +112,10 @@ var _ = Describe("VatFlux Repository", func() {
It("updates the header to checked if checked headers row already exists", func() { It("updates the header to checked if checked headers row already exists", func() {
_, err := db.Exec(`INSERT INTO public.checked_headers (header_id) VALUES ($1)`, headerId) _, err := db.Exec(`INSERT INTO public.checked_headers (header_id) VALUES ($1)`, headerId)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
err = repository.Create(headerId, []vat_flux.VatFluxModel{test_data.VatFluxModel})
Expect(err).NotTo(HaveOccurred())
err = repository.Create(headerId, []interface{}{test_data.VatFluxModel})
Expect(err).NotTo(HaveOccurred())
var headerChecked bool var headerChecked bool
err = db.Get(&headerChecked, `SELECT vat_flux_checked FROM public.checked_headers WHERE header_id = $1`, headerId) err = db.Get(&headerChecked, `SELECT vat_flux_checked FROM public.checked_headers WHERE header_id = $1`, headerId)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
@ -116,7 +123,7 @@ var _ = Describe("VatFlux Repository", func() {
}) })
It("removes vat flux if corresponding header is deleted", func() { It("removes vat flux if corresponding header is deleted", func() {
err = repository.Create(headerId, []vat_flux.VatFluxModel{test_data.VatFluxModel}) err = repository.Create(headerId, []interface{}{test_data.VatFluxModel})
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
_, err = db.Exec(`DELETE FROM headers WHERE id = $1`, headerId) _, err = db.Exec(`DELETE FROM headers WHERE id = $1`, headerId)
@ -129,30 +136,45 @@ var _ = Describe("VatFlux Repository", func() {
}) })
It("wraps create in a transaction", func() { It("wraps create in a transaction", func() {
err = repository.Create(headerId, []vat_flux.VatFluxModel{test_data.VatFluxModel, test_data.VatFluxModel}) err = repository.Create(headerId, []interface{}{test_data.VatFluxModel, test_data.VatFluxModel})
Expect(err).To(HaveOccurred()) Expect(err).To(HaveOccurred())
var count int var count int
err = repository.DB.QueryRowx(`SELECT count(*) FROM maker.vat_flux`).Scan(&count) err = db.QueryRowx(`SELECT count(*) FROM maker.vat_flux`).Scan(&count)
Expect(count).To(Equal(0)) Expect(count).To(Equal(0))
}) })
It("returns an error if model is of wrong type", func() {
err = repository.Create(headerId, []interface{}{test_data.WrongModel{}})
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("model of type"))
})
}) })
Describe("MissingHeaders", func() { Describe("MissingHeaders", func() {
It("returns headers that haven't been checked", func() { var (
startingBlock := GinkgoRandomSeed() startingBlock, vatFluxBlock, endingBlock, outsideRangeBlock int64
vatFluxBlock := startingBlock + 1 headerIds, blockNumbers []int64
endingBlock := startingBlock + 2 )
outsideRangeBlock := startingBlock + 3
var headerIds []int64 BeforeEach(func() {
blockNumbers := []int64{startingBlock, vatFluxBlock, endingBlock, outsideRangeBlock} startingBlock = rand.Int63()
vatFluxBlock = startingBlock + 1
endingBlock = startingBlock + 2
outsideRangeBlock = startingBlock + 3
blockNumbers = []int64{startingBlock, vatFluxBlock, endingBlock, outsideRangeBlock}
headerIds = []int64{}
for _, n := range blockNumbers { for _, n := range blockNumbers {
headerId, err := headerRepository.CreateOrUpdateHeader(fakes.GetFakeHeader(n)) headerId, err := headerRepository.CreateOrUpdateHeader(fakes.GetFakeHeader(n))
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
headerIds = append(headerIds, headerId) headerIds = append(headerIds, headerId)
} }
})
err = repository.MarkCheckedHeader(headerIds[0]) It("returns headers that haven't been checked", func() {
err = repository.MarkHeaderChecked(headerIds[0])
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
headers, err := repository.MissingHeaders(startingBlock, endingBlock) headers, err := repository.MissingHeaders(startingBlock, endingBlock)
@ -163,21 +185,8 @@ var _ = Describe("VatFlux Repository", func() {
}) })
It("returns header ids when checked_headers.vat_flux is false", func() { It("returns header ids when checked_headers.vat_flux is false", func() {
startingBlock := GinkgoRandomSeed() err = repository.MarkHeaderChecked(headerIds[0])
vatFluxBlock := startingBlock + 1 _, err = db.Exec(`INSERT INTO checked_headers (header_id) VALUES ($1)`, headerIds[1])
endingBlock := startingBlock + 2
outsideRangeBlock := startingBlock + 3
var headerIds []int64
blockNumbers := []int64{startingBlock, vatFluxBlock, endingBlock, outsideRangeBlock}
for _, n := range blockNumbers {
headerId, err := headerRepository.CreateOrUpdateHeader(fakes.GetFakeHeader(n))
Expect(err).NotTo(HaveOccurred())
headerIds = append(headerIds, headerId)
}
err = repository.MarkCheckedHeader(headerIds[0])
_, err = repository.DB.Exec(`INSERT INTO checked_headers (header_id) VALUES ($1)`, headerIds[1])
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
headers, err := repository.MissingHeaders(startingBlock, endingBlock) headers, err := repository.MissingHeaders(startingBlock, endingBlock)
@ -188,26 +197,17 @@ var _ = Describe("VatFlux Repository", func() {
}) })
It("only returns header ids for the current node", func() { It("only returns header ids for the current node", func() {
startingBlock := GinkgoRandomSeed()
vatFluxBlock := startingBlock + 1
endingBlock := startingBlock + 2
outsideRangeBlock := startingBlock + 3
db2 := test_config.NewTestDB(core.Node{ID: "second node"}) db2 := test_config.NewTestDB(core.Node{ID: "second node"})
headerRepository2 := repositories.NewHeaderRepository(db2) headerRepository2 := repositories.NewHeaderRepository(db2)
repository2 := vat_flux.NewVatFluxRepository(db2) repository2 := vat_flux.VatFluxRepository{}
repository2.SetDB(db2)
var headerIds []int64
blockNumbers := []int64{startingBlock, vatFluxBlock, endingBlock, outsideRangeBlock}
for _, n := range blockNumbers { for _, n := range blockNumbers {
headerId, err := headerRepository.CreateOrUpdateHeader(fakes.GetFakeHeader(n))
Expect(err).NotTo(HaveOccurred())
headerIds = append(headerIds, headerId)
_, err = headerRepository2.CreateOrUpdateHeader(fakes.GetFakeHeader(n)) _, err = headerRepository2.CreateOrUpdateHeader(fakes.GetFakeHeader(n))
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
} }
err = repository.MarkCheckedHeader(headerIds[0]) err = repository.MarkHeaderChecked(headerIds[0])
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
nodeOneMissingHeaders, err := repository.MissingHeaders(startingBlock, endingBlock) nodeOneMissingHeaders, err := repository.MissingHeaders(startingBlock, endingBlock)
@ -220,9 +220,9 @@ var _ = Describe("VatFlux Repository", func() {
}) })
}) })
Describe("MarkCheckedHeader", func() { Describe("MarkHeaderChecked", func() {
It("creates a new checked_header record", func() { It("creates a new checked_header record", func() {
err := repository.MarkCheckedHeader(headerId) err := repository.MarkHeaderChecked(headerId)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
var checkedHeaderResult = CheckedHeaderResult{} var checkedHeaderResult = CheckedHeaderResult{}
@ -232,7 +232,7 @@ var _ = Describe("VatFlux Repository", func() {
}) })
It("updates an existing checked header", func() { It("updates an existing checked header", func() {
_, err := repository.DB.Exec(`INSERT INTO checked_headers (header_id) VALUES($1)`, headerId) _, err := db.Exec(`INSERT INTO checked_headers (header_id) VALUES($1)`, headerId)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
var checkedHeaderResult CheckedHeaderResult var checkedHeaderResult CheckedHeaderResult
@ -240,7 +240,7 @@ var _ = Describe("VatFlux Repository", func() {
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(checkedHeaderResult.VatFluxChecked).To(BeFalse()) Expect(checkedHeaderResult.VatFluxChecked).To(BeFalse())
err = repository.MarkCheckedHeader(headerId) err = repository.MarkHeaderChecked(headerId)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
err = db.Get(&checkedHeaderResult, `SELECT vat_flux_checked FROM checked_headers WHERE header_id = $1`, headerId) err = db.Get(&checkedHeaderResult, `SELECT vat_flux_checked FROM checked_headers WHERE header_id = $1`, headerId)

View File

@ -1,63 +0,0 @@
package vat_flux
import (
"github.com/ethereum/go-ethereum/common"
"github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
"log"
)
type VatFluxTransformerInitializer struct {
Config shared.TransformerConfig
}
func (initializer VatFluxTransformerInitializer) NewVatFluxTransformer(db *postgres.DB, blockChain core.BlockChain) shared.Transformer {
converter := VatFluxConverter{}
fetcher := shared.NewFetcher(blockChain)
repository := NewVatFluxRepository(db)
return VatFluxTransformer{
Config: initializer.Config,
Converter: converter,
Fetcher: fetcher,
Repository: repository,
}
}
type VatFluxTransformer struct {
Config shared.TransformerConfig
Converter Converter
Fetcher shared.LogFetcher
Repository Repository
}
func (transformer VatFluxTransformer) Execute() error {
missingHeaders, err := transformer.Repository.MissingHeaders(transformer.Config.StartingBlockNumber, transformer.Config.EndingBlockNumber)
if err != nil {
return err
}
log.Printf("Fetching vat flux event logs for %d headers \n", len(missingHeaders))
for _, header := range missingHeaders {
topics := [][]common.Hash{{common.HexToHash(shared.VatFluxSignature)}}
matchingLogs, err := transformer.Fetcher.FetchLogs(VatFluxConfig.ContractAddresses, topics, header.BlockNumber)
if err != nil {
return err
}
if len(matchingLogs) < 1 {
err = transformer.Repository.MarkCheckedHeader(header.Id)
if err != nil {
return err
}
continue
}
models, err := transformer.Converter.ToModels(matchingLogs)
if err != nil {
return err
}
err = transformer.Repository.Create(header.Id, models)
if err != nil {
return err
}
}
return nil
}

View File

@ -1,57 +1,58 @@
package vat_flux_test package vat_flux_test
import ( import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
"github.com/ethereum/go-ethereum/common"
"github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/fakes" "github.com/vulcanize/vulcanizedb/pkg/fakes"
"github.com/vulcanize/vulcanizedb/pkg/transformers/factories"
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
"github.com/vulcanize/vulcanizedb/pkg/transformers/test_data" "github.com/vulcanize/vulcanizedb/pkg/transformers/test_data"
"github.com/vulcanize/vulcanizedb/pkg/transformers/test_data/mocks" "github.com/vulcanize/vulcanizedb/pkg/transformers/test_data/mocks"
vat_flux_mocks "github.com/vulcanize/vulcanizedb/pkg/transformers/test_data/mocks/vat_flux" vat_flux_mocks "github.com/vulcanize/vulcanizedb/pkg/transformers/test_data/mocks/vat_flux"
"github.com/vulcanize/vulcanizedb/pkg/transformers/vat_flux" "github.com/vulcanize/vulcanizedb/pkg/transformers/vat_flux"
"math/rand"
) )
type setupOptions struct {
setMissingHeadersError bool
setFetcherError bool
setConverterError bool
setCreateError bool
fetchedLogs []types.Log
missingHeaders []core.Header
}
var _ = Describe("Vat flux transformer", func() { var _ = Describe("Vat flux transformer", func() {
var ( var (
config shared.TransformerConfig config = vat_flux.VatFluxConfig
converter *vat_flux_mocks.MockVatFlux fetcher mocks.MockLogFetcher
fetcher *mocks.MockLogFetcher converter vat_flux_mocks.MockVatFluxConverter
repository *vat_flux_mocks.MockVatFluxRepository repository vat_flux_mocks.MockVatFluxRepository
transformer vat_flux.VatFluxTransformer transformer shared.Transformer
headerOne core.Header
headerTwo core.Header
) )
BeforeEach(func() { BeforeEach(func() {
config = vat_flux.VatFluxConfig fetcher = mocks.MockLogFetcher{}
converter = &vat_flux_mocks.MockVatFlux{} converter = vat_flux_mocks.MockVatFluxConverter{}
fetcher = &mocks.MockLogFetcher{} repository = vat_flux_mocks.MockVatFluxRepository{}
repository = &vat_flux_mocks.MockVatFluxRepository{} headerOne = core.Header{Id: rand.Int63(), BlockNumber: rand.Int63()}
transformer = vat_flux.VatFluxTransformer{ headerTwo = core.Header{Id: rand.Int63(), BlockNumber: rand.Int63()}
transformer = factories.Transformer{
Config: config, Config: config,
Converter: converter, Converter: &converter,
Fetcher: fetcher, Fetcher: &fetcher,
Repository: repository, Repository: &repository,
} }.NewTransformer(nil, nil)
})
It("sets the blockchain and database", func() {
Expect(fetcher.SetBcCalled).To(BeTrue())
Expect(repository.SetDbCalled).To(BeTrue())
}) })
It("gets missing headers for block numbers specified in config", func() { It("gets missing headers for block numbers specified in config", func() {
err := transformer.Execute() err := transformer.Execute()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(repository.PassedStartingBlockNumber).To(Equal(vat_flux.VatFluxConfig.StartingBlockNumber)) Expect(repository.PassedStartingBlockNumber).To(Equal(config.StartingBlockNumber))
Expect(repository.PassedEndingBlockNumber).To(Equal(vat_flux.VatFluxConfig.EndingBlockNumber)) Expect(repository.PassedEndingBlockNumber).To(Equal(config.EndingBlockNumber))
}) })
It("returns error if repository returns error for missing headers", func() { It("returns error if repository returns error for missing headers", func() {
@ -62,29 +63,20 @@ var _ = Describe("Vat flux transformer", func() {
Expect(err).To(MatchError(fakes.FakeError)) Expect(err).To(MatchError(fakes.FakeError))
}) })
It("marks the header as checked when there are no logs", func() {
header := core.Header{Id: GinkgoRandomSeed()}
repository.SetMissingHeaders([]core.Header{header})
err := transformer.Execute()
Expect(err).NotTo(HaveOccurred())
Expect(repository.MarkHeaderCheckedPassedHeaderID).To(Equal(header.Id))
})
It("fetches logs for missing headers", func() { It("fetches logs for missing headers", func() {
repository.SetMissingHeaders([]core.Header{{BlockNumber: 1}, {BlockNumber: 2}}) repository.SetMissingHeaders([]core.Header{headerOne, headerTwo})
err := transformer.Execute() err := transformer.Execute()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(fetcher.FetchedBlocks).To(Equal([]int64{1, 2})) Expect(fetcher.FetchedBlocks).To(Equal([]int64{headerOne.BlockNumber, headerTwo.BlockNumber}))
Expect(fetcher.FetchedContractAddresses).To(Equal([][]string{vat_flux.VatFluxConfig.ContractAddresses, vat_flux.VatFluxConfig.ContractAddresses})) Expect(fetcher.FetchedContractAddresses).To(Equal([][]string{config.ContractAddresses, config.ContractAddresses}))
Expect(fetcher.FetchedTopics).To(Equal([][]common.Hash{{common.HexToHash(shared.VatFluxSignature)}})) Expect(fetcher.FetchedTopics).To(Equal([][]common.Hash{{common.HexToHash(shared.VatFluxSignature)}}))
}) })
It("returns error if fetcher returns error", func() { It("returns error if fetcher returns error", func() {
fetcher.SetFetcherError(fakes.FakeError) fetcher.SetFetcherError(fakes.FakeError)
repository.SetMissingHeaders([]core.Header{{BlockNumber: 1}}) repository.SetMissingHeaders([]core.Header{headerOne})
err := transformer.Execute() err := transformer.Execute()
@ -92,15 +84,17 @@ var _ = Describe("Vat flux transformer", func() {
Expect(err).To(MatchError(fakes.FakeError)) Expect(err).To(MatchError(fakes.FakeError))
}) })
It("marks the header as checked when there are no logs", func() {
repository.SetMissingHeaders([]core.Header{headerOne})
err := transformer.Execute()
Expect(err).NotTo(HaveOccurred())
Expect(repository.MarkHeaderCheckedPassedHeaderID).To(Equal(headerOne.Id))
})
It("returns error if marking header checked returns err", func() { It("returns error if marking header checked returns err", func() {
repository.SetMissingHeaders([]core.Header{{Id: int64(123)}}) repository.SetMissingHeaders([]core.Header{headerOne})
repository.SetMarkHeaderCheckedErr(fakes.FakeError) repository.SetMarkHeaderCheckedErr(fakes.FakeError)
mockFetcher := &mocks.MockLogFetcher{}
transformer := vat_flux.VatFluxTransformer{
Converter: converter,
Fetcher: mockFetcher,
Repository: repository,
}
err := transformer.Execute() err := transformer.Execute()
@ -110,12 +104,7 @@ var _ = Describe("Vat flux transformer", func() {
It("converts matching logs", func() { It("converts matching logs", func() {
fetcher.SetFetchedLogs([]types.Log{test_data.VatFluxLog}) fetcher.SetFetchedLogs([]types.Log{test_data.VatFluxLog})
repository.SetMissingHeaders([]core.Header{{BlockNumber: 1}}) repository.SetMissingHeaders([]core.Header{headerOne})
transformer := vat_flux.VatFluxTransformer{
Fetcher: fetcher,
Converter: converter,
Repository: repository,
}
err := transformer.Execute() err := transformer.Execute()
@ -126,7 +115,7 @@ var _ = Describe("Vat flux transformer", func() {
It("returns error if converter returns error", func() { It("returns error if converter returns error", func() {
converter.SetConverterError(fakes.FakeError) converter.SetConverterError(fakes.FakeError)
fetcher.SetFetchedLogs([]types.Log{test_data.VatFluxLog}) fetcher.SetFetchedLogs([]types.Log{test_data.VatFluxLog})
repository.SetMissingHeaders([]core.Header{{BlockNumber: 1}}) repository.SetMissingHeaders([]core.Header{headerOne})
err := transformer.Execute() err := transformer.Execute()
@ -136,19 +125,18 @@ var _ = Describe("Vat flux transformer", func() {
It("persists vat flux model", func() { It("persists vat flux model", func() {
fetcher.SetFetchedLogs([]types.Log{test_data.VatFluxLog}) fetcher.SetFetchedLogs([]types.Log{test_data.VatFluxLog})
fakeHeader := core.Header{BlockNumber: 1, Id: 2} repository.SetMissingHeaders([]core.Header{headerOne})
repository.SetMissingHeaders([]core.Header{fakeHeader})
err := transformer.Execute() err := transformer.Execute()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(repository.PassedHeaderID).To(Equal(fakeHeader.Id)) Expect(repository.PassedHeaderID).To(Equal(headerOne.Id))
Expect(repository.PassedModels).To(Equal([]vat_flux.VatFluxModel{test_data.VatFluxModel})) Expect(repository.PassedModels).To(Equal([]interface{}{test_data.VatFluxModel}))
}) })
It("returns error if repository returns error for create", func() { It("returns error if repository returns error for create", func() {
fetcher.SetFetchedLogs([]types.Log{test_data.VatFluxLog}) fetcher.SetFetchedLogs([]types.Log{test_data.VatFluxLog})
repository.SetMissingHeaders([]core.Header{{BlockNumber: 1, Id: 2}}) repository.SetMissingHeaders([]core.Header{headerOne})
repository.SetCreateError(fakes.FakeError) repository.SetCreateError(fakes.FakeError)
err := transformer.Execute() err := transformer.Execute()