Refactor Tend transformer using factory
This commit is contained in:
parent
4d36f6200d
commit
a4df8f348d
@ -17,6 +17,7 @@ package tend
|
||||
import "github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
||||
|
||||
var TendConfig = shared.TransformerConfig{
|
||||
TransformerName: "tend",
|
||||
ContractAddresses: []string{shared.FlipperContractAddress},
|
||||
ContractAbi: shared.FlipperABI,
|
||||
Topics: []string{shared.TendFunctionSignature},
|
||||
|
@ -23,17 +23,9 @@ import (
|
||||
"github.com/ethereum/go-ethereum/core/types"
|
||||
)
|
||||
|
||||
type Converter interface {
|
||||
ToModels(ethLogs []types.Log) ([]TendModel, error)
|
||||
}
|
||||
|
||||
type TendConverter struct{}
|
||||
|
||||
func NewTendConverter() TendConverter {
|
||||
return TendConverter{}
|
||||
}
|
||||
|
||||
func (c TendConverter) ToModels(ethLogs []types.Log) (results []TendModel, err error) {
|
||||
func (TendConverter) ToModels(ethLogs []types.Log) (results []interface{}, err error) {
|
||||
for _, ethLog := range ethLogs {
|
||||
err := validateLog(ethLog)
|
||||
if err != nil {
|
||||
|
@ -28,7 +28,7 @@ var _ = Describe("Tend TendConverter", func() {
|
||||
var converter tend.TendConverter
|
||||
|
||||
BeforeEach(func() {
|
||||
converter = tend.NewTendConverter()
|
||||
converter = tend.TendConverter{}
|
||||
})
|
||||
|
||||
Describe("ToModels", func() {
|
||||
|
@ -19,31 +19,24 @@ import (
|
||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
||||
)
|
||||
|
||||
type Repository interface {
|
||||
Create(headerId int64, tends []TendModel) error
|
||||
MarkHeaderChecked(headerId int64) error
|
||||
MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error)
|
||||
}
|
||||
|
||||
type TendRepository struct {
|
||||
DB *postgres.DB
|
||||
}
|
||||
|
||||
func NewTendRepository(db *postgres.DB) TendRepository {
|
||||
return TendRepository{DB: db}
|
||||
}
|
||||
|
||||
func (r TendRepository) Create(headerId int64, tends []TendModel) error {
|
||||
tx, err := r.DB.Begin()
|
||||
func (repository TendRepository) Create(headerId int64, models []interface{}) error {
|
||||
tx, err := repository.DB.Begin()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, tend := range tends {
|
||||
var tend TendModel
|
||||
for _, model := range models {
|
||||
tend = model.(TendModel)
|
||||
_, err = tx.Exec(
|
||||
`INSERT into maker.tend (header_id, bid_id, lot, bid, guy, tic, tx_idx, raw_log)
|
||||
VALUES($1, $2, $3, $4, $5, $6, $7, $8)`,
|
||||
VALUES($1, $2, $3, $4, $5, $6, $7, $8)`,
|
||||
headerId, tend.BidId, tend.Lot, tend.Bid, tend.Guy, tend.Tic, tend.TransactionIndex, tend.Raw,
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
tx.Rollback()
|
||||
return err
|
||||
@ -53,6 +46,7 @@ func (r TendRepository) Create(headerId int64, tends []TendModel) error {
|
||||
VALUES ($1, $2)
|
||||
ON CONFLICT (header_id) DO
|
||||
UPDATE SET tend_checked = $2`, headerId, true)
|
||||
|
||||
if err != nil {
|
||||
tx.Rollback()
|
||||
return err
|
||||
@ -60,28 +54,32 @@ func (r TendRepository) Create(headerId int64, tends []TendModel) error {
|
||||
return tx.Commit()
|
||||
}
|
||||
|
||||
func (r TendRepository) MarkHeaderChecked(headerId int64) error {
|
||||
_, err := r.DB.Exec(`INSERT INTO public.checked_headers (header_id, tend_checked)
|
||||
func (repository TendRepository) MarkHeaderChecked(headerId int64) error {
|
||||
_, err := repository.DB.Exec(`INSERT INTO public.checked_headers (header_id, tend_checked)
|
||||
VALUES ($1, $2)
|
||||
ON CONFLICT (header_id) DO
|
||||
UPDATE SET tend_checked = $2`, headerId, true)
|
||||
return err
|
||||
}
|
||||
|
||||
func (r TendRepository) MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) {
|
||||
func (repository TendRepository) MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) {
|
||||
var result []core.Header
|
||||
err := r.DB.Select(
|
||||
err := repository.DB.Select(
|
||||
&result,
|
||||
`SELECT headers.id, headers.block_number FROM headers
|
||||
LEFT JOIN checked_headers on headers.id = header_id
|
||||
LEFT JOIN checked_headers on headers.id = header_id
|
||||
WHERE (header_id ISNULL OR tend_checked IS FALSE)
|
||||
AND headers.block_number >= $1
|
||||
AND headers.block_number <= $2
|
||||
AND headers.eth_node_fingerprint = $3`,
|
||||
AND headers.block_number >= $1
|
||||
AND headers.block_number <= $2
|
||||
AND headers.eth_node_fingerprint = $3`,
|
||||
startingBlockNumber,
|
||||
endingBlockNumber,
|
||||
r.DB.Node.ID,
|
||||
repository.DB.Node.ID,
|
||||
)
|
||||
|
||||
return result, err
|
||||
}
|
||||
|
||||
func (repository *TendRepository) SetDB(db *postgres.DB) {
|
||||
repository.DB = db
|
||||
}
|
||||
|
@ -32,6 +32,7 @@ var _ = Describe("TendRepository", func() {
|
||||
db *postgres.DB
|
||||
tendRepository tend.TendRepository
|
||||
headerRepository repositories.HeaderRepository
|
||||
headerId int64
|
||||
err error
|
||||
)
|
||||
|
||||
@ -40,17 +41,15 @@ var _ = Describe("TendRepository", func() {
|
||||
db = test_config.NewTestDB(node)
|
||||
test_config.CleanTestDB(db)
|
||||
headerRepository = repositories.NewHeaderRepository(db)
|
||||
tendRepository = tend.NewTendRepository(db)
|
||||
headerId, err = headerRepository.CreateOrUpdateHeader(core.Header{})
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
tendRepository = tend.TendRepository{DB: db}
|
||||
})
|
||||
|
||||
Describe("Create", func() {
|
||||
var headerId int64
|
||||
|
||||
BeforeEach(func() {
|
||||
headerId, err = headerRepository.CreateOrUpdateHeader(fakes.FakeHeader)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
err := tendRepository.Create(headerId, []tend.TendModel{test_data.TendModel})
|
||||
It("persists a tend record", func() {
|
||||
err := tendRepository.Create(headerId, []interface{}{test_data.TendModel})
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
})
|
||||
|
||||
@ -81,13 +80,18 @@ var _ = Describe("TendRepository", func() {
|
||||
})
|
||||
|
||||
It("returns an error if inserting a tend record fails", func() {
|
||||
err = tendRepository.Create(headerId, []tend.TendModel{test_data.TendModel})
|
||||
err := tendRepository.Create(headerId, []interface{}{test_data.TendModel})
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
err = tendRepository.Create(headerId, []interface{}{test_data.TendModel})
|
||||
Expect(err).To(HaveOccurred())
|
||||
Expect(err.Error()).To(ContainSubstring("pq: duplicate key value violates unique constraint"))
|
||||
})
|
||||
|
||||
It("deletes the tend record if its corresponding header record is deleted", func() {
|
||||
err := tendRepository.Create(headerId, []interface{}{test_data.TendModel})
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
var count int
|
||||
err = db.QueryRow(`SELECT count(*) from maker.tend`).Scan(&count)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
@ -182,7 +186,7 @@ var _ = Describe("TendRepository", func() {
|
||||
node2 := core.Node{}
|
||||
db2 := test_config.NewTestDB(node2)
|
||||
headerRepository2 := repositories.NewHeaderRepository(db2)
|
||||
tendRepository2 := tend.NewTendRepository(db2)
|
||||
tendRepository2 := tend.TendRepository{DB: db2}
|
||||
|
||||
for _, number := range []int64{startingBlock, tendBlock, endingBlock} {
|
||||
headerRepository2.CreateOrUpdateHeader(fakes.GetFakeHeader(number))
|
||||
|
@ -1,88 +0,0 @@
|
||||
// Copyright 2018 Vulcanize
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package tend
|
||||
|
||||
import (
|
||||
"log"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
|
||||
"github.com/vulcanize/vulcanizedb/pkg/core"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
|
||||
)
|
||||
|
||||
type TendTransformer struct {
|
||||
Repository Repository
|
||||
Fetcher shared.LogFetcher
|
||||
Converter Converter
|
||||
Config shared.TransformerConfig
|
||||
}
|
||||
|
||||
type TendTransformerInitializer struct {
|
||||
Config shared.TransformerConfig
|
||||
}
|
||||
|
||||
func (i TendTransformerInitializer) NewTendTransformer(db *postgres.DB, blockChain core.BlockChain) shared.Transformer {
|
||||
converter := NewTendConverter()
|
||||
fetcher := shared.NewFetcher(blockChain)
|
||||
repository := NewTendRepository(db)
|
||||
return TendTransformer{
|
||||
Fetcher: fetcher,
|
||||
Repository: repository,
|
||||
Converter: converter,
|
||||
Config: i.Config,
|
||||
}
|
||||
}
|
||||
|
||||
func (t TendTransformer) Execute() error {
|
||||
config := t.Config
|
||||
topics := [][]common.Hash{{common.HexToHash(shared.TendFunctionSignature)}}
|
||||
|
||||
missingHeaders, err := t.Repository.MissingHeaders(config.StartingBlockNumber, config.EndingBlockNumber)
|
||||
if err != nil {
|
||||
log.Println("Error fetching missing headers:", err)
|
||||
return err
|
||||
}
|
||||
|
||||
log.Printf("Fetching tend event logs for %d headers \n", len(missingHeaders))
|
||||
for _, header := range missingHeaders {
|
||||
ethLogs, err := t.Fetcher.FetchLogs(config.ContractAddresses, topics, header.BlockNumber)
|
||||
if err != nil {
|
||||
log.Println("Error fetching matching logs:", err)
|
||||
return err
|
||||
}
|
||||
if len(ethLogs) < 1 {
|
||||
err := t.Repository.MarkHeaderChecked(header.Id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
models, err := t.Converter.ToModels(ethLogs)
|
||||
if err != nil {
|
||||
log.Println("Error converting logs:", err)
|
||||
return err
|
||||
}
|
||||
|
||||
err = t.Repository.Create(header.Id, models)
|
||||
if err != nil {
|
||||
log.Println("Error persisting tend record:", err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
@ -15,7 +15,7 @@
|
||||
package tend_test
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/transformers/factories"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
. "github.com/onsi/ginkgo"
|
||||
@ -32,24 +32,28 @@ import (
|
||||
)
|
||||
|
||||
var _ = Describe("Tend Transformer", func() {
|
||||
var repository tend_mocks.MockTendRepository
|
||||
var fetcher mocks.MockLogFetcher
|
||||
var converter tend_mocks.MockTendConverter
|
||||
var transformer tend.TendTransformer
|
||||
var blockNumber1 = rand.Int63()
|
||||
var blockNumber2 = rand.Int63()
|
||||
var (
|
||||
config = tend.TendConfig
|
||||
converter tend_mocks.MockTendConverter
|
||||
repository tend_mocks.MockTendRepository
|
||||
fetcher mocks.MockLogFetcher
|
||||
transformer shared.Transformer
|
||||
headerOne core.Header
|
||||
headerTwo core.Header
|
||||
)
|
||||
|
||||
BeforeEach(func() {
|
||||
converter = tend_mocks.MockTendConverter{}
|
||||
repository = tend_mocks.MockTendRepository{}
|
||||
fetcher = mocks.MockLogFetcher{}
|
||||
converter = tend_mocks.MockTendConverter{}
|
||||
|
||||
transformer = tend.TendTransformer{
|
||||
Repository: &repository,
|
||||
headerOne = core.Header{Id: GinkgoRandomSeed(), BlockNumber: GinkgoRandomSeed()}
|
||||
headerTwo = core.Header{Id: GinkgoRandomSeed(), BlockNumber: GinkgoRandomSeed()}
|
||||
transformer = factories.Transformer{
|
||||
Config: config,
|
||||
Fetcher: &fetcher,
|
||||
Converter: &converter,
|
||||
Config: tend.TendConfig,
|
||||
}
|
||||
Repository: &repository,
|
||||
}.NewTransformer(nil, nil)
|
||||
})
|
||||
|
||||
It("gets missing headers for blocks in the configured range", func() {
|
||||
@ -68,12 +72,12 @@ var _ = Describe("Tend Transformer", func() {
|
||||
})
|
||||
|
||||
It("fetches eth logs for each missing header", func() {
|
||||
repository.SetMissingHeaders([]core.Header{{BlockNumber: blockNumber1}, {BlockNumber: blockNumber2}})
|
||||
repository.SetMissingHeaders([]core.Header{headerOne, headerTwo})
|
||||
expectedTopics := [][]common.Hash{{common.HexToHash(shared.TendFunctionSignature)}}
|
||||
err := transformer.Execute()
|
||||
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(fetcher.FetchedBlocks).To(Equal([]int64{blockNumber1, blockNumber2}))
|
||||
Expect(fetcher.FetchedBlocks).To(Equal([]int64{headerOne.BlockNumber, headerTwo.BlockNumber}))
|
||||
Expect(fetcher.FetchedTopics).To(Equal(expectedTopics))
|
||||
Expect(fetcher.FetchedContractAddresses).To(Equal([][]string{tend.TendConfig.ContractAddresses, tend.TendConfig.ContractAddresses}))
|
||||
})
|
||||
@ -88,34 +92,17 @@ var _ = Describe("Tend Transformer", func() {
|
||||
})
|
||||
|
||||
It("marks header checked if no logs returned", func() {
|
||||
mockConverter := &tend_mocks.MockTendConverter{}
|
||||
mockRepository := &tend_mocks.MockTendRepository{}
|
||||
headerID := int64(123)
|
||||
mockRepository.SetMissingHeaders([]core.Header{{Id: headerID}})
|
||||
mockFetcher := &mocks.MockLogFetcher{}
|
||||
transformer := tend.TendTransformer{
|
||||
Converter: mockConverter,
|
||||
Fetcher: mockFetcher,
|
||||
Repository: mockRepository,
|
||||
}
|
||||
repository.SetMissingHeaders([]core.Header{headerOne})
|
||||
|
||||
err := transformer.Execute()
|
||||
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
mockRepository.AssertMarkHeaderCheckedCalledWith(headerID)
|
||||
repository.AssertMarkHeaderCheckedCalledWith(headerOne.Id)
|
||||
})
|
||||
|
||||
It("returns error if marking header checked returns err", func() {
|
||||
mockConverter := &tend_mocks.MockTendConverter{}
|
||||
mockRepository := &tend_mocks.MockTendRepository{}
|
||||
mockRepository.SetMissingHeaders([]core.Header{{Id: int64(123)}})
|
||||
mockRepository.SetMarkHeaderCheckedErr(fakes.FakeError)
|
||||
mockFetcher := &mocks.MockLogFetcher{}
|
||||
transformer := tend.TendTransformer{
|
||||
Converter: mockConverter,
|
||||
Fetcher: mockFetcher,
|
||||
Repository: mockRepository,
|
||||
}
|
||||
repository.SetMissingHeaders([]core.Header{headerOne})
|
||||
repository.SetMarkHeaderCheckedErr(fakes.FakeError)
|
||||
|
||||
err := transformer.Execute()
|
||||
|
||||
@ -124,37 +111,37 @@ var _ = Describe("Tend Transformer", func() {
|
||||
})
|
||||
|
||||
It("converts an eth log to a Model", func() {
|
||||
repository.SetMissingHeaders([]core.Header{{BlockNumber: 1}})
|
||||
repository.SetMissingHeaders([]core.Header{headerOne})
|
||||
fetcher.SetFetchedLogs([]types.Log{test_data.TendLogNote})
|
||||
err := transformer.Execute()
|
||||
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(converter.LogsToConvert).To(Equal([]types.Log{test_data.TendLogNote}))
|
||||
Expect(converter.PassedLogs).To(Equal([]types.Log{test_data.TendLogNote}))
|
||||
})
|
||||
|
||||
It("returns an error if converter fails", func() {
|
||||
repository.SetMissingHeaders([]core.Header{{BlockNumber: 1}})
|
||||
repository.SetMissingHeaders([]core.Header{headerOne})
|
||||
fetcher.SetFetchedLogs([]types.Log{test_data.TendLogNote})
|
||||
converter.SetConverterError(fakes.FakeError)
|
||||
err := transformer.Execute()
|
||||
|
||||
Expect(err).To(HaveOccurred())
|
||||
Expect(err).To(MatchError(fakes.FakeError))
|
||||
})
|
||||
|
||||
It("persists the tend record", func() {
|
||||
headerId := int64(1)
|
||||
repository.SetMissingHeaders([]core.Header{{BlockNumber: blockNumber1, Id: headerId}})
|
||||
repository.SetMissingHeaders([]core.Header{headerOne})
|
||||
fetcher.SetFetchedLogs([]types.Log{test_data.TendLogNote})
|
||||
|
||||
err := transformer.Execute()
|
||||
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(repository.PassedHeaderID).To(Equal(headerId))
|
||||
Expect(repository.PassedHeaderID).To(Equal(headerOne.Id))
|
||||
Expect(repository.PassedTendModel).To(Equal(test_data.TendModel))
|
||||
})
|
||||
|
||||
It("returns error if persisting tend record fails", func() {
|
||||
repository.SetMissingHeaders([]core.Header{{BlockNumber: blockNumber1}})
|
||||
repository.SetMissingHeaders([]core.Header{headerOne})
|
||||
fetcher.SetFetchedLogs([]types.Log{test_data.TendLogNote})
|
||||
repository.SetCreateError(fakes.FakeError)
|
||||
err := transformer.Execute()
|
||||
|
@ -70,9 +70,15 @@ var (
|
||||
PitFileIlkTransformerInitializer = ilk.PitFileIlkTransformerInitializer{Config: pitFileConfig}.NewPitFileIlkTransformer
|
||||
PitFileStabilityFeeTransformerInitializer = stability_fee.PitFileStabilityFeeTransformerInitializer{Config: pitFileConfig}.NewPitFileStabilityFeeTransformer
|
||||
PriceFeedTransformerInitializer = price_feeds.PriceFeedTransformerInitializer{Config: price_feeds.PriceFeedConfig}.NewPriceFeedTransformer
|
||||
TendTransformerInitializer = tend.TendTransformerInitializer{Config: tend.TendConfig}.NewTendTransformer
|
||||
VatGrabTransformerInitializer = vat_grab.VatGrabTransformerInitializer{Config: vat_grab.VatGrabConfig}.NewVatGrabTransformer
|
||||
VatInitTransformerInitializer = factories.Transformer{
|
||||
TendTransformerInitializer = factories.Transformer{
|
||||
Config: tend.TendConfig,
|
||||
Converter: &tend.TendConverter{},
|
||||
Repository: &tend.TendRepository{},
|
||||
Fetcher: &shared.Fetcher{},
|
||||
}.NewTransformer
|
||||
|
||||
VatInitTransformerInitializer = factories.Transformer{
|
||||
Config: vat_init.VatInitConfig,
|
||||
Converter: &vat_init.VatInitConverter{},
|
||||
Repository: &vat_init.VatInitRepository{},
|
||||
|
Loading…
Reference in New Issue
Block a user