diff --git a/pkg/transformers/integration_tests/vat_flux.go b/pkg/transformers/integration_tests/vat_flux.go index f9b9197f..628c7dbb 100644 --- a/pkg/transformers/integration_tests/vat_flux.go +++ b/pkg/transformers/integration_tests/vat_flux.go @@ -18,6 +18,8 @@ import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + "github.com/vulcanize/vulcanizedb/pkg/transformers/factories" + "github.com/vulcanize/vulcanizedb/pkg/transformers/shared" "github.com/vulcanize/vulcanizedb/pkg/transformers/vat_flux" "github.com/vulcanize/vulcanizedb/test_config" ) @@ -40,8 +42,13 @@ var _ = Describe("VatFlux Transformer", func() { err = persistHeader(rpcClient, db, blockNumber) Expect(err).NotTo(HaveOccurred()) - initializer := vat_flux.VatFluxTransformerInitializer{Config: config} - transformer := initializer.NewVatFluxTransformer(db, blockchain) + initializer := factories.Transformer{ + Config: config, + Fetcher: &shared.Fetcher{}, + Converter: &vat_flux.VatFluxConverter{}, + Repository: &vat_flux.VatFluxRepository{}, + } + transformer := initializer.NewTransformer(db, blockchain) err = transformer.Execute() Expect(err).NotTo(HaveOccurred()) diff --git a/pkg/transformers/test_data/mocks/vat_flux/converter.go b/pkg/transformers/test_data/mocks/vat_flux/converter.go index c8cfd15a..759596e8 100644 --- a/pkg/transformers/test_data/mocks/vat_flux/converter.go +++ b/pkg/transformers/test_data/mocks/vat_flux/converter.go @@ -4,19 +4,18 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/vulcanize/vulcanizedb/pkg/transformers/test_data" - "github.com/vulcanize/vulcanizedb/pkg/transformers/vat_flux" ) -type MockVatFlux struct { +type MockVatFluxConverter struct { err error PassedLogs []types.Log } -func (converter *MockVatFlux) ToModels(ethLogs []types.Log) ([]vat_flux.VatFluxModel, error) { +func (converter *MockVatFluxConverter) ToModels(ethLogs []types.Log) ([]interface{}, error) { converter.PassedLogs = ethLogs - return []vat_flux.VatFluxModel{test_data.VatFluxModel}, converter.err + return []interface{}{test_data.VatFluxModel}, converter.err } -func (converter *MockVatFlux) SetConverterError(e error) { +func (converter *MockVatFluxConverter) SetConverterError(e error) { converter.err = e } diff --git a/pkg/transformers/test_data/mocks/vat_flux/repository.go b/pkg/transformers/test_data/mocks/vat_flux/repository.go index 332ba278..0cc7fab5 100644 --- a/pkg/transformers/test_data/mocks/vat_flux/repository.go +++ b/pkg/transformers/test_data/mocks/vat_flux/repository.go @@ -16,7 +16,7 @@ package vat_flux import ( "github.com/vulcanize/vulcanizedb/pkg/core" - "github.com/vulcanize/vulcanizedb/pkg/transformers/vat_flux" + "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" ) type MockVatFluxRepository struct { @@ -28,26 +28,31 @@ type MockVatFluxRepository struct { PassedStartingBlockNumber int64 PassedEndingBlockNumber int64 PassedHeaderID int64 - PassedModels []vat_flux.VatFluxModel + PassedModels []interface{} + SetDbCalled bool } -func (repository *MockVatFluxRepository) MarkCheckedHeader(headerId int64) error { - repository.MarkHeaderCheckedPassedHeaderID = headerId - return repository.markHeaderCheckedErr -} - -func (repository *MockVatFluxRepository) Create(headerID int64, models []vat_flux.VatFluxModel) error { +func (repository *MockVatFluxRepository) Create(headerID int64, models []interface{}) error { repository.PassedHeaderID = headerID repository.PassedModels = models return repository.createErr } +func (repository *MockVatFluxRepository) MarkHeaderChecked(headerID int64) error { + repository.MarkHeaderCheckedPassedHeaderID = headerID + return repository.markHeaderCheckedErr +} + func (repository *MockVatFluxRepository) MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) { repository.PassedStartingBlockNumber = startingBlockNumber repository.PassedEndingBlockNumber = endingBlockNumber return repository.missingHeaders, repository.missingHeadersErr } +func (repository *MockVatFluxRepository) SetDB(db *postgres.DB) { + repository.SetDbCalled = true +} + func (repository *MockVatFluxRepository) SetMarkHeaderCheckedErr(e error) { repository.markHeaderCheckedErr = e } diff --git a/pkg/transformers/transformers.go b/pkg/transformers/transformers.go index a31fb451..91fb7ad8 100644 --- a/pkg/transformers/transformers.go +++ b/pkg/transformers/transformers.go @@ -140,7 +140,12 @@ var ( Repository: &vat_tune.VatTuneRepository{}, Fetcher: &shared.Fetcher{}, }.NewTransformer - VatFluxTransformerInitializer = vat_flux.VatFluxTransformerInitializer{Config: vat_flux.VatFluxConfig}.NewVatFluxTransformer + VatFluxTransformerInitializer = factories.Transformer{ + Config: vat_flux.VatFluxConfig, + Converter: &vat_flux.VatFluxConverter{}, + Repository: &vat_flux.VatFluxRepository{}, + Fetcher: &shared.Fetcher{}, + }.NewTransformer ) func TransformerInitializers() []shared.TransformerInitializer { diff --git a/pkg/transformers/vat_flux/config.go b/pkg/transformers/vat_flux/config.go index c7f3449f..737266c3 100644 --- a/pkg/transformers/vat_flux/config.go +++ b/pkg/transformers/vat_flux/config.go @@ -2,10 +2,11 @@ package vat_flux import "github.com/vulcanize/vulcanizedb/pkg/transformers/shared" -var VatFluxConfig = shared.TransformerConfig{ +var VatFluxConfig = shared.SingleTransformerConfig{ + TransformerName: shared.VatFluxLabel, ContractAddresses: []string{shared.VatContractAddress}, ContractAbi: shared.VatABI, - Topics: []string{shared.VatFluxSignature}, + Topic: shared.VatFluxSignature, StartingBlockNumber: 0, EndingBlockNumber: 10000000, } diff --git a/pkg/transformers/vat_flux/converter.go b/pkg/transformers/vat_flux/converter.go index db5fa25b..81d4c885 100644 --- a/pkg/transformers/vat_flux/converter.go +++ b/pkg/transformers/vat_flux/converter.go @@ -24,14 +24,10 @@ import ( "math/big" ) -type Converter interface { - ToModels(ethLogs []types.Log) ([]VatFluxModel, error) -} - type VatFluxConverter struct{} -func (VatFluxConverter) ToModels(ethLogs []types.Log) ([]VatFluxModel, error) { - var models []VatFluxModel +func (VatFluxConverter) ToModels(ethLogs []types.Log) ([]interface{}, error) { + var models []interface{} for _, ethLog := range ethLogs { err := verifyLog(ethLog) if err != nil { diff --git a/pkg/transformers/vat_flux/repository.go b/pkg/transformers/vat_flux/repository.go index 3e36b4ea..5486bb56 100644 --- a/pkg/transformers/vat_flux/repository.go +++ b/pkg/transformers/vat_flux/repository.go @@ -15,34 +15,31 @@ package vat_flux import ( + "fmt" "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" ) -type Repository interface { - Create(headerId int64, models []VatFluxModel) error - MissingHeaders(startingBlock, endingBlock int64) ([]core.Header, error) - MarkCheckedHeader(headerId int64) error -} - type VatFluxRepository struct { - DB *postgres.DB + db *postgres.DB } -func NewVatFluxRepository(db *postgres.DB) VatFluxRepository { - return VatFluxRepository{DB: db} -} - -func (repository VatFluxRepository) Create(headerId int64, models []VatFluxModel) error { - tx, err := repository.DB.Begin() +func (repository VatFluxRepository) Create(headerId int64, models []interface{}) error { + tx, err := repository.db.Begin() if err != nil { return err } for _, model := range models { + vatFlux, ok := model.(VatFluxModel) + if !ok { + tx.Rollback() + return fmt.Errorf("model of type %T, not %T", model, VatFluxModel{}) + } + _, err := tx.Exec(`INSERT INTO maker.vat_flux (header_id, ilk, dst, src, rad, tx_idx, log_idx, raw_log) VALUES($1, $2, $3, $4, $5::NUMERIC, $6, $7, $8)`, - headerId, model.Ilk, model.Dst, model.Src, model.Rad, model.TransactionIndex, model.LogIndex, model.Raw) + headerId, vatFlux.Ilk, vatFlux.Dst, vatFlux.Src, vatFlux.Rad, vatFlux.TransactionIndex, vatFlux.LogIndex, vatFlux.Raw) if err != nil { tx.Rollback() return err @@ -57,28 +54,33 @@ func (repository VatFluxRepository) Create(headerId int64, models []VatFluxModel tx.Rollback() return err } + return tx.Commit() } -func (repository VatFluxRepository) MissingHeaders(startingBlock, endingBlock int64) ([]core.Header, error) { - var headers []core.Header - err := repository.DB.Select(&headers, - `SELECT headers.id, block_number from headers - LEFT JOIN checked_headers on headers.id = header_id - WHERE (header_id ISNULL OR vat_flux_checked IS FALSE) - AND headers.block_number >= $1 - AND headers.block_number <= $2 - AND headers.eth_node_fingerprint = $3`, - startingBlock, endingBlock, repository.DB.Node.ID) - - return headers, err -} - -func (repository VatFluxRepository) MarkCheckedHeader(headerId int64) error { - _, err := repository.DB.Exec(`INSERT INTO public.checked_headers (header_id, vat_flux_checked) +func (repository VatFluxRepository) MarkHeaderChecked(headerId int64) error { + _, err := repository.db.Exec(`INSERT INTO public.checked_headers (header_id, vat_flux_checked) VALUES($1, $2) ON CONFLICT (header_id) DO UPDATE SET vat_flux_checked = $2`, headerId, true) return err } + +func (repository VatFluxRepository) MissingHeaders(startingBlock, endingBlock int64) ([]core.Header, error) { + var headers []core.Header + err := repository.db.Select(&headers, + `SELECT headers.id, block_number from headers + LEFT JOIN checked_headers on headers.id = header_id + WHERE (header_id ISNULL OR vat_flux_checked IS FALSE) + AND headers.block_number >= $1 + AND headers.block_number <= $2 + AND headers.eth_node_fingerprint = $3`, + startingBlock, endingBlock, repository.db.Node.ID) + + return headers, err +} + +func (repository *VatFluxRepository) SetDB(db *postgres.DB) { + repository.db = db +} diff --git a/pkg/transformers/vat_flux/repository_test.go b/pkg/transformers/vat_flux/repository_test.go index 7a272a2b..92e4a452 100644 --- a/pkg/transformers/vat_flux/repository_test.go +++ b/pkg/transformers/vat_flux/repository_test.go @@ -24,20 +24,24 @@ import ( "github.com/vulcanize/vulcanizedb/pkg/transformers/test_data" "github.com/vulcanize/vulcanizedb/pkg/transformers/vat_flux" "github.com/vulcanize/vulcanizedb/test_config" + "math/rand" ) var _ = Describe("VatFlux Repository", func() { - var db *postgres.DB - var repository vat_flux.VatFluxRepository - var headerRepository repositories.HeaderRepository - var headerId int64 - var err error + var ( + db *postgres.DB + repository vat_flux.VatFluxRepository + headerRepository repositories.HeaderRepository + headerId int64 + err error + ) BeforeEach(func() { node := test_config.NewTestNode() db = test_config.NewTestDB(node) test_config.CleanTestDB(db) - repository = vat_flux.VatFluxRepository{DB: db} + repository = vat_flux.VatFluxRepository{} + repository.SetDB(db) headerRepository = repositories.NewHeaderRepository(db) headerId, err = headerRepository.CreateOrUpdateHeader(fakes.FakeHeader) Expect(err).NotTo(HaveOccurred()) @@ -57,7 +61,7 @@ var _ = Describe("VatFlux Repository", func() { It("persists vat flux records", func() { anotherVatFlux := test_data.VatFluxModel anotherVatFlux.TransactionIndex = test_data.VatFluxModel.TransactionIndex + 1 - err = repository.Create(headerId, []vat_flux.VatFluxModel{test_data.VatFluxModel, anotherVatFlux}) + err = repository.Create(headerId, []interface{}{test_data.VatFluxModel, anotherVatFlux}) var dbResult []VatFluxDBResult err = db.Select(&dbResult, `SELECT * from maker.vat_flux where header_id = $1`, headerId) @@ -75,28 +79,30 @@ var _ = Describe("VatFlux Repository", func() { }) It("returns an error if the insertion fails", func() { - err = repository.Create(headerId, []vat_flux.VatFluxModel{test_data.VatFluxModel}) + err = repository.Create(headerId, []interface{}{test_data.VatFluxModel}) Expect(err).NotTo(HaveOccurred()) - err = repository.Create(headerId, []vat_flux.VatFluxModel{test_data.VatFluxModel}) + + err = repository.Create(headerId, []interface{}{test_data.VatFluxModel}) + Expect(err).To(HaveOccurred()) Expect(err.Error()).To(ContainSubstring("pq: duplicate key value violates unique constraint")) }) It("allows for multiple vat flux events in one transaction if they have different log indexes", func() { - err = repository.Create(headerId, []vat_flux.VatFluxModel{test_data.VatFluxModel}) + err = repository.Create(headerId, []interface{}{test_data.VatFluxModel}) Expect(err).NotTo(HaveOccurred()) anotherVatFlux := test_data.VatFluxModel anotherVatFlux.LogIndex = anotherVatFlux.LogIndex + 1 - err = repository.Create(headerId, []vat_flux.VatFluxModel{anotherVatFlux}) + err = repository.Create(headerId, []interface{}{anotherVatFlux}) Expect(err).NotTo(HaveOccurred()) }) It("marks the header as checked for vat flux logs", func() { - err = repository.Create(headerId, []vat_flux.VatFluxModel{test_data.VatFluxModel}) - Expect(err).NotTo(HaveOccurred()) + err = repository.Create(headerId, []interface{}{test_data.VatFluxModel}) + Expect(err).NotTo(HaveOccurred()) var headerChecked bool err = db.Get(&headerChecked, `SELECT vat_flux_checked FROM public.checked_headers WHERE header_id = $1`, headerId) Expect(err).NotTo(HaveOccurred()) @@ -106,9 +112,10 @@ var _ = Describe("VatFlux Repository", func() { It("updates the header to checked if checked headers row already exists", func() { _, err := db.Exec(`INSERT INTO public.checked_headers (header_id) VALUES ($1)`, headerId) Expect(err).NotTo(HaveOccurred()) - err = repository.Create(headerId, []vat_flux.VatFluxModel{test_data.VatFluxModel}) - Expect(err).NotTo(HaveOccurred()) + err = repository.Create(headerId, []interface{}{test_data.VatFluxModel}) + + Expect(err).NotTo(HaveOccurred()) var headerChecked bool err = db.Get(&headerChecked, `SELECT vat_flux_checked FROM public.checked_headers WHERE header_id = $1`, headerId) Expect(err).NotTo(HaveOccurred()) @@ -116,7 +123,7 @@ var _ = Describe("VatFlux Repository", func() { }) It("removes vat flux if corresponding header is deleted", func() { - err = repository.Create(headerId, []vat_flux.VatFluxModel{test_data.VatFluxModel}) + err = repository.Create(headerId, []interface{}{test_data.VatFluxModel}) Expect(err).NotTo(HaveOccurred()) _, err = db.Exec(`DELETE FROM headers WHERE id = $1`, headerId) @@ -129,30 +136,45 @@ var _ = Describe("VatFlux Repository", func() { }) It("wraps create in a transaction", func() { - err = repository.Create(headerId, []vat_flux.VatFluxModel{test_data.VatFluxModel, test_data.VatFluxModel}) + err = repository.Create(headerId, []interface{}{test_data.VatFluxModel, test_data.VatFluxModel}) + Expect(err).To(HaveOccurred()) var count int - err = repository.DB.QueryRowx(`SELECT count(*) FROM maker.vat_flux`).Scan(&count) + err = db.QueryRowx(`SELECT count(*) FROM maker.vat_flux`).Scan(&count) Expect(count).To(Equal(0)) }) + + It("returns an error if model is of wrong type", func() { + err = repository.Create(headerId, []interface{}{test_data.WrongModel{}}) + + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("model of type")) + }) }) Describe("MissingHeaders", func() { - It("returns headers that haven't been checked", func() { - startingBlock := GinkgoRandomSeed() - vatFluxBlock := startingBlock + 1 - endingBlock := startingBlock + 2 - outsideRangeBlock := startingBlock + 3 + var ( + startingBlock, vatFluxBlock, endingBlock, outsideRangeBlock int64 + headerIds, blockNumbers []int64 + ) - var headerIds []int64 - blockNumbers := []int64{startingBlock, vatFluxBlock, endingBlock, outsideRangeBlock} + BeforeEach(func() { + startingBlock = rand.Int63() + vatFluxBlock = startingBlock + 1 + endingBlock = startingBlock + 2 + outsideRangeBlock = startingBlock + 3 + + blockNumbers = []int64{startingBlock, vatFluxBlock, endingBlock, outsideRangeBlock} + headerIds = []int64{} for _, n := range blockNumbers { headerId, err := headerRepository.CreateOrUpdateHeader(fakes.GetFakeHeader(n)) Expect(err).NotTo(HaveOccurred()) headerIds = append(headerIds, headerId) } + }) - err = repository.MarkCheckedHeader(headerIds[0]) + It("returns headers that haven't been checked", func() { + err = repository.MarkHeaderChecked(headerIds[0]) Expect(err).NotTo(HaveOccurred()) headers, err := repository.MissingHeaders(startingBlock, endingBlock) @@ -163,21 +185,8 @@ var _ = Describe("VatFlux Repository", func() { }) It("returns header ids when checked_headers.vat_flux is false", func() { - startingBlock := GinkgoRandomSeed() - vatFluxBlock := startingBlock + 1 - endingBlock := startingBlock + 2 - outsideRangeBlock := startingBlock + 3 - - var headerIds []int64 - blockNumbers := []int64{startingBlock, vatFluxBlock, endingBlock, outsideRangeBlock} - for _, n := range blockNumbers { - headerId, err := headerRepository.CreateOrUpdateHeader(fakes.GetFakeHeader(n)) - Expect(err).NotTo(HaveOccurred()) - headerIds = append(headerIds, headerId) - } - - err = repository.MarkCheckedHeader(headerIds[0]) - _, err = repository.DB.Exec(`INSERT INTO checked_headers (header_id) VALUES ($1)`, headerIds[1]) + err = repository.MarkHeaderChecked(headerIds[0]) + _, err = db.Exec(`INSERT INTO checked_headers (header_id) VALUES ($1)`, headerIds[1]) Expect(err).NotTo(HaveOccurred()) headers, err := repository.MissingHeaders(startingBlock, endingBlock) @@ -188,26 +197,17 @@ var _ = Describe("VatFlux Repository", func() { }) It("only returns header ids for the current node", func() { - startingBlock := GinkgoRandomSeed() - vatFluxBlock := startingBlock + 1 - endingBlock := startingBlock + 2 - outsideRangeBlock := startingBlock + 3 db2 := test_config.NewTestDB(core.Node{ID: "second node"}) headerRepository2 := repositories.NewHeaderRepository(db2) - repository2 := vat_flux.NewVatFluxRepository(db2) + repository2 := vat_flux.VatFluxRepository{} + repository2.SetDB(db2) - var headerIds []int64 - blockNumbers := []int64{startingBlock, vatFluxBlock, endingBlock, outsideRangeBlock} for _, n := range blockNumbers { - headerId, err := headerRepository.CreateOrUpdateHeader(fakes.GetFakeHeader(n)) - Expect(err).NotTo(HaveOccurred()) - headerIds = append(headerIds, headerId) - _, err = headerRepository2.CreateOrUpdateHeader(fakes.GetFakeHeader(n)) Expect(err).NotTo(HaveOccurred()) } - err = repository.MarkCheckedHeader(headerIds[0]) + err = repository.MarkHeaderChecked(headerIds[0]) Expect(err).NotTo(HaveOccurred()) nodeOneMissingHeaders, err := repository.MissingHeaders(startingBlock, endingBlock) @@ -220,9 +220,9 @@ var _ = Describe("VatFlux Repository", func() { }) }) - Describe("MarkCheckedHeader", func() { + Describe("MarkHeaderChecked", func() { It("creates a new checked_header record", func() { - err := repository.MarkCheckedHeader(headerId) + err := repository.MarkHeaderChecked(headerId) Expect(err).NotTo(HaveOccurred()) var checkedHeaderResult = CheckedHeaderResult{} @@ -232,7 +232,7 @@ var _ = Describe("VatFlux Repository", func() { }) It("updates an existing checked header", func() { - _, err := repository.DB.Exec(`INSERT INTO checked_headers (header_id) VALUES($1)`, headerId) + _, err := db.Exec(`INSERT INTO checked_headers (header_id) VALUES($1)`, headerId) Expect(err).NotTo(HaveOccurred()) var checkedHeaderResult CheckedHeaderResult @@ -240,7 +240,7 @@ var _ = Describe("VatFlux Repository", func() { Expect(err).NotTo(HaveOccurred()) Expect(checkedHeaderResult.VatFluxChecked).To(BeFalse()) - err = repository.MarkCheckedHeader(headerId) + err = repository.MarkHeaderChecked(headerId) Expect(err).NotTo(HaveOccurred()) err = db.Get(&checkedHeaderResult, `SELECT vat_flux_checked FROM checked_headers WHERE header_id = $1`, headerId) diff --git a/pkg/transformers/vat_flux/transformer.go b/pkg/transformers/vat_flux/transformer.go deleted file mode 100644 index 0d0d8bf8..00000000 --- a/pkg/transformers/vat_flux/transformer.go +++ /dev/null @@ -1,63 +0,0 @@ -package vat_flux - -import ( - "github.com/ethereum/go-ethereum/common" - "github.com/vulcanize/vulcanizedb/pkg/core" - "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" - "github.com/vulcanize/vulcanizedb/pkg/transformers/shared" - "log" -) - -type VatFluxTransformerInitializer struct { - Config shared.TransformerConfig -} - -func (initializer VatFluxTransformerInitializer) NewVatFluxTransformer(db *postgres.DB, blockChain core.BlockChain) shared.Transformer { - converter := VatFluxConverter{} - fetcher := shared.NewFetcher(blockChain) - repository := NewVatFluxRepository(db) - return VatFluxTransformer{ - Config: initializer.Config, - Converter: converter, - Fetcher: fetcher, - Repository: repository, - } -} - -type VatFluxTransformer struct { - Config shared.TransformerConfig - Converter Converter - Fetcher shared.LogFetcher - Repository Repository -} - -func (transformer VatFluxTransformer) Execute() error { - missingHeaders, err := transformer.Repository.MissingHeaders(transformer.Config.StartingBlockNumber, transformer.Config.EndingBlockNumber) - if err != nil { - return err - } - log.Printf("Fetching vat flux event logs for %d headers \n", len(missingHeaders)) - for _, header := range missingHeaders { - topics := [][]common.Hash{{common.HexToHash(shared.VatFluxSignature)}} - matchingLogs, err := transformer.Fetcher.FetchLogs(VatFluxConfig.ContractAddresses, topics, header.BlockNumber) - if err != nil { - return err - } - if len(matchingLogs) < 1 { - err = transformer.Repository.MarkCheckedHeader(header.Id) - if err != nil { - return err - } - continue - } - models, err := transformer.Converter.ToModels(matchingLogs) - if err != nil { - return err - } - err = transformer.Repository.Create(header.Id, models) - if err != nil { - return err - } - } - return nil -} diff --git a/pkg/transformers/vat_flux/transformer_test.go b/pkg/transformers/vat_flux/transformer_test.go index d621e34c..fae561dc 100644 --- a/pkg/transformers/vat_flux/transformer_test.go +++ b/pkg/transformers/vat_flux/transformer_test.go @@ -1,57 +1,58 @@ package vat_flux_test import ( - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + "github.com/ethereum/go-ethereum/common" "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/fakes" + "github.com/vulcanize/vulcanizedb/pkg/transformers/factories" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared" "github.com/vulcanize/vulcanizedb/pkg/transformers/test_data" "github.com/vulcanize/vulcanizedb/pkg/transformers/test_data/mocks" vat_flux_mocks "github.com/vulcanize/vulcanizedb/pkg/transformers/test_data/mocks/vat_flux" "github.com/vulcanize/vulcanizedb/pkg/transformers/vat_flux" + "math/rand" ) -type setupOptions struct { - setMissingHeadersError bool - setFetcherError bool - setConverterError bool - setCreateError bool - fetchedLogs []types.Log - missingHeaders []core.Header -} - var _ = Describe("Vat flux transformer", func() { var ( - config shared.TransformerConfig - converter *vat_flux_mocks.MockVatFlux - fetcher *mocks.MockLogFetcher - repository *vat_flux_mocks.MockVatFluxRepository - transformer vat_flux.VatFluxTransformer + config = vat_flux.VatFluxConfig + fetcher mocks.MockLogFetcher + converter vat_flux_mocks.MockVatFluxConverter + repository vat_flux_mocks.MockVatFluxRepository + transformer shared.Transformer + headerOne core.Header + headerTwo core.Header ) BeforeEach(func() { - config = vat_flux.VatFluxConfig - converter = &vat_flux_mocks.MockVatFlux{} - fetcher = &mocks.MockLogFetcher{} - repository = &vat_flux_mocks.MockVatFluxRepository{} - transformer = vat_flux.VatFluxTransformer{ + fetcher = mocks.MockLogFetcher{} + converter = vat_flux_mocks.MockVatFluxConverter{} + repository = vat_flux_mocks.MockVatFluxRepository{} + headerOne = core.Header{Id: rand.Int63(), BlockNumber: rand.Int63()} + headerTwo = core.Header{Id: rand.Int63(), BlockNumber: rand.Int63()} + transformer = factories.Transformer{ Config: config, - Converter: converter, - Fetcher: fetcher, - Repository: repository, - } + Converter: &converter, + Fetcher: &fetcher, + Repository: &repository, + }.NewTransformer(nil, nil) + }) + + It("sets the blockchain and database", func() { + Expect(fetcher.SetBcCalled).To(BeTrue()) + Expect(repository.SetDbCalled).To(BeTrue()) }) It("gets missing headers for block numbers specified in config", func() { err := transformer.Execute() Expect(err).NotTo(HaveOccurred()) - Expect(repository.PassedStartingBlockNumber).To(Equal(vat_flux.VatFluxConfig.StartingBlockNumber)) - Expect(repository.PassedEndingBlockNumber).To(Equal(vat_flux.VatFluxConfig.EndingBlockNumber)) + Expect(repository.PassedStartingBlockNumber).To(Equal(config.StartingBlockNumber)) + Expect(repository.PassedEndingBlockNumber).To(Equal(config.EndingBlockNumber)) }) It("returns error if repository returns error for missing headers", func() { @@ -62,29 +63,20 @@ var _ = Describe("Vat flux transformer", func() { Expect(err).To(MatchError(fakes.FakeError)) }) - It("marks the header as checked when there are no logs", func() { - header := core.Header{Id: GinkgoRandomSeed()} - repository.SetMissingHeaders([]core.Header{header}) - err := transformer.Execute() - - Expect(err).NotTo(HaveOccurred()) - Expect(repository.MarkHeaderCheckedPassedHeaderID).To(Equal(header.Id)) - }) - It("fetches logs for missing headers", func() { - repository.SetMissingHeaders([]core.Header{{BlockNumber: 1}, {BlockNumber: 2}}) + repository.SetMissingHeaders([]core.Header{headerOne, headerTwo}) err := transformer.Execute() Expect(err).NotTo(HaveOccurred()) - Expect(fetcher.FetchedBlocks).To(Equal([]int64{1, 2})) - Expect(fetcher.FetchedContractAddresses).To(Equal([][]string{vat_flux.VatFluxConfig.ContractAddresses, vat_flux.VatFluxConfig.ContractAddresses})) + Expect(fetcher.FetchedBlocks).To(Equal([]int64{headerOne.BlockNumber, headerTwo.BlockNumber})) + Expect(fetcher.FetchedContractAddresses).To(Equal([][]string{config.ContractAddresses, config.ContractAddresses})) Expect(fetcher.FetchedTopics).To(Equal([][]common.Hash{{common.HexToHash(shared.VatFluxSignature)}})) }) It("returns error if fetcher returns error", func() { fetcher.SetFetcherError(fakes.FakeError) - repository.SetMissingHeaders([]core.Header{{BlockNumber: 1}}) + repository.SetMissingHeaders([]core.Header{headerOne}) err := transformer.Execute() @@ -92,15 +84,17 @@ var _ = Describe("Vat flux transformer", func() { Expect(err).To(MatchError(fakes.FakeError)) }) + It("marks the header as checked when there are no logs", func() { + repository.SetMissingHeaders([]core.Header{headerOne}) + err := transformer.Execute() + + Expect(err).NotTo(HaveOccurred()) + Expect(repository.MarkHeaderCheckedPassedHeaderID).To(Equal(headerOne.Id)) + }) + It("returns error if marking header checked returns err", func() { - repository.SetMissingHeaders([]core.Header{{Id: int64(123)}}) + repository.SetMissingHeaders([]core.Header{headerOne}) repository.SetMarkHeaderCheckedErr(fakes.FakeError) - mockFetcher := &mocks.MockLogFetcher{} - transformer := vat_flux.VatFluxTransformer{ - Converter: converter, - Fetcher: mockFetcher, - Repository: repository, - } err := transformer.Execute() @@ -110,12 +104,7 @@ var _ = Describe("Vat flux transformer", func() { It("converts matching logs", func() { fetcher.SetFetchedLogs([]types.Log{test_data.VatFluxLog}) - repository.SetMissingHeaders([]core.Header{{BlockNumber: 1}}) - transformer := vat_flux.VatFluxTransformer{ - Fetcher: fetcher, - Converter: converter, - Repository: repository, - } + repository.SetMissingHeaders([]core.Header{headerOne}) err := transformer.Execute() @@ -126,7 +115,7 @@ var _ = Describe("Vat flux transformer", func() { It("returns error if converter returns error", func() { converter.SetConverterError(fakes.FakeError) fetcher.SetFetchedLogs([]types.Log{test_data.VatFluxLog}) - repository.SetMissingHeaders([]core.Header{{BlockNumber: 1}}) + repository.SetMissingHeaders([]core.Header{headerOne}) err := transformer.Execute() @@ -136,19 +125,18 @@ var _ = Describe("Vat flux transformer", func() { It("persists vat flux model", func() { fetcher.SetFetchedLogs([]types.Log{test_data.VatFluxLog}) - fakeHeader := core.Header{BlockNumber: 1, Id: 2} - repository.SetMissingHeaders([]core.Header{fakeHeader}) + repository.SetMissingHeaders([]core.Header{headerOne}) err := transformer.Execute() Expect(err).NotTo(HaveOccurred()) - Expect(repository.PassedHeaderID).To(Equal(fakeHeader.Id)) - Expect(repository.PassedModels).To(Equal([]vat_flux.VatFluxModel{test_data.VatFluxModel})) + Expect(repository.PassedHeaderID).To(Equal(headerOne.Id)) + Expect(repository.PassedModels).To(Equal([]interface{}{test_data.VatFluxModel})) }) It("returns error if repository returns error for create", func() { fetcher.SetFetchedLogs([]types.Log{test_data.VatFluxLog}) - repository.SetMissingHeaders([]core.Header{{BlockNumber: 1, Id: 2}}) + repository.SetMissingHeaders([]core.Header{headerOne}) repository.SetCreateError(fakes.FakeError) err := transformer.Execute()