vat.fold: switch to batched interface

This commit is contained in:
David Terry 2018-10-09 13:34:22 +03:00
parent b3e2218eb5
commit 3e9901864b
8 changed files with 104 additions and 53 deletions

View File

@ -23,12 +23,12 @@ import (
type MockVatFoldConverter struct { type MockVatFoldConverter struct {
converterErr error converterErr error
PassedLog types.Log PassedLogs []types.Log
} }
func (converter *MockVatFoldConverter) ToModel(ethLog types.Log) (vat_fold.VatFoldModel, error) { func (converter *MockVatFoldConverter) ToModels(ethLogs []types.Log) ([]vat_fold.VatFoldModel, error) {
converter.PassedLog = ethLog converter.PassedLogs = ethLogs
return test_data.VatFoldModel, converter.converterErr return []vat_fold.VatFoldModel{test_data.VatFoldModel}, converter.converterErr
} }
func (converter *MockVatFoldConverter) SetConverterError(e error) { func (converter *MockVatFoldConverter) SetConverterError(e error) {

View File

@ -15,32 +15,45 @@
package vat_fold package vat_fold
import ( import (
. "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/transformers/vat_fold" "github.com/vulcanize/vulcanizedb/pkg/transformers/vat_fold"
) )
type MockVatFoldRepository struct { type MockVatFoldRepository struct {
createErr error createErr error
markHeaderCheckedErr error
markHeaderCheckedPassedHeaderID int64
missingHeaders []core.Header missingHeaders []core.Header
missingHeadersErr error missingHeadersErr error
PassedStartingBlockNumber int64 PassedStartingBlockNumber int64
PassedEndingBlockNumber int64 PassedEndingBlockNumber int64
PassedHeaderID int64 PassedHeaderID int64
PassedModel vat_fold.VatFoldModel PassedModels []vat_fold.VatFoldModel
} }
func (repository *MockVatFoldRepository) Create(headerID int64, model vat_fold.VatFoldModel) error { func (repository *MockVatFoldRepository) Create(headerID int64, models []vat_fold.VatFoldModel) error {
repository.PassedHeaderID = headerID repository.PassedHeaderID = headerID
repository.PassedModel = model repository.PassedModels = models
return repository.createErr return repository.createErr
} }
func (repository *MockVatFoldRepository) MarkHeaderChecked(headerID int64) error {
repository.markHeaderCheckedPassedHeaderID = headerID
return repository.markHeaderCheckedErr
}
func (repository *MockVatFoldRepository) MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) { func (repository *MockVatFoldRepository) MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) {
repository.PassedStartingBlockNumber = startingBlockNumber repository.PassedStartingBlockNumber = startingBlockNumber
repository.PassedEndingBlockNumber = endingBlockNumber repository.PassedEndingBlockNumber = endingBlockNumber
return repository.missingHeaders, repository.missingHeadersErr return repository.missingHeaders, repository.missingHeadersErr
} }
func (repository *MockVatFoldRepository) SetMarkHeaderCheckedErr(e error) {
repository.markHeaderCheckedErr = e
}
func (repository *MockVatFoldRepository) SetMissingHeadersErr(e error) { func (repository *MockVatFoldRepository) SetMissingHeadersErr(e error) {
repository.missingHeadersErr = e repository.missingHeadersErr = e
} }
@ -52,3 +65,7 @@ func (repository *MockVatFoldRepository) SetMissingHeaders(headers []core.Header
func (repository *MockVatFoldRepository) SetCreateError(e error) { func (repository *MockVatFoldRepository) SetCreateError(e error) {
repository.createErr = e repository.createErr = e
} }
func (repository *MockVatFoldRepository) AssertMarkHeaderCheckedCalledWith(i int64) {
Expect(repository.markHeaderCheckedPassedHeaderID).To(Equal(i))
}

View File

@ -26,29 +26,35 @@ import (
) )
type Converter interface { type Converter interface {
ToModel(ethLog types.Log) (VatFoldModel, error) ToModels(ethLogs []types.Log) ([]VatFoldModel, error)
} }
type VatFoldConverter struct{} type VatFoldConverter struct{}
func (VatFoldConverter) ToModel(ethLog types.Log) (VatFoldModel, error) { func (VatFoldConverter) ToModels(ethLogs []types.Log) ([]VatFoldModel, error) {
var models []VatFoldModel
for _, ethLog := range ethLogs {
err := verifyLog(ethLog) err := verifyLog(ethLog)
if err != nil { if err != nil {
return VatFoldModel{}, err return nil, err
} }
ilk := string(bytes.Trim(ethLog.Topics[1].Bytes(), "\x00")) ilk := string(bytes.Trim(ethLog.Topics[1].Bytes(), "\x00"))
urn := common.HexToAddress(ethLog.Topics[2].String()).String() urn := common.BytesToAddress(ethLog.Topics[2].Bytes()).String()
rate := big.NewInt(0).SetBytes(ethLog.Topics[3].Bytes()).String() rate := big.NewInt(0).SetBytes(ethLog.Topics[3].Bytes()).String()
raw, err := json.Marshal(ethLog) raw, err := json.Marshal(ethLog)
return VatFoldModel{ model := VatFoldModel{
Ilk: ilk, Ilk: ilk,
Urn: urn, Urn: urn,
Rate: rate, Rate: rate,
TransactionIndex: ethLog.TxIndex, TransactionIndex: ethLog.TxIndex,
Raw: raw, Raw: raw,
}, err }
models = append(models, model)
}
return models, nil
} }
func verifyLog(log types.Log) error { func verifyLog(log types.Log) error {

View File

@ -28,7 +28,7 @@ var _ = Describe("Vat fold converter", func() {
converter := vat_fold.VatFoldConverter{} converter := vat_fold.VatFoldConverter{}
badLog := types.Log{} badLog := types.Log{}
_, err := converter.ToModel(badLog) _, err := converter.ToModels([]types.Log{badLog})
Expect(err).To(HaveOccurred()) Expect(err).To(HaveOccurred())
}) })
@ -36,9 +36,9 @@ var _ = Describe("Vat fold converter", func() {
It("converts a log to an model", func() { It("converts a log to an model", func() {
converter := vat_fold.VatFoldConverter{} converter := vat_fold.VatFoldConverter{}
model, err := converter.ToModel(test_data.EthVatFoldLog) model, err := converter.ToModels([]types.Log{test_data.EthVatFoldLog})
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(model).To(Equal(test_data.VatFoldModel)) Expect(model).To(Equal([]vat_fold.VatFoldModel{test_data.VatFoldModel}))
}) })
}) })

View File

@ -20,7 +20,8 @@ import (
) )
type Repository interface { type Repository interface {
Create(headerID int64, model VatFoldModel) error Create(headerID int64, models []VatFoldModel) error
MarkHeaderChecked(headerID int64) error
MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error)
} }
@ -34,10 +35,31 @@ func NewVatFoldRepository(db *postgres.DB) VatFoldRepository {
} }
} }
func (repository VatFoldRepository) Create(headerID int64, model VatFoldModel) error { func (repository VatFoldRepository) Create(headerID int64, models []VatFoldModel) error {
_, err := repository.db.Exec(`INSERT INTO maker.vat_fold (header_id, ilk, urn, rate, raw_log, tx_idx) tx, err := repository.db.Begin()
if err != nil {
return err
}
for _, model := range models {
_, err = tx.Exec(
`INSERT into maker.vat_fold (header_id, ilk, urn, rate, tx_idx, raw_log)
VALUES($1, $2, $3, $4::NUMERIC, $5, $6)`, VALUES($1, $2, $3, $4::NUMERIC, $5, $6)`,
headerID, model.Ilk, model.Urn, model.Rate, model.Raw, model.TransactionIndex) headerID, model.Ilk, model.Urn, model.Rate, model.TransactionIndex, model.Raw,
)
if err != nil {
tx.Rollback()
return err
}
}
return tx.Commit()
}
func (repository VatFoldRepository) MarkHeaderChecked(headerID int64) error {
_, err := repository.db.Exec(`INSERT INTO public.checked_headers (header_id, vat_fold_checked)
VALUES ($1, $2)
ON CONFLICT (header_id) DO
UPDATE SET vat_fold_checked = $2`, headerID, true)
return err return err
} }
@ -46,8 +68,8 @@ func (repository VatFoldRepository) MissingHeaders(startingBlockNumber, endingBl
err := repository.db.Select( err := repository.db.Select(
&result, &result,
`SELECT headers.id, headers.block_number FROM headers `SELECT headers.id, headers.block_number FROM headers
LEFT JOIN maker.vat_fold on headers.id = header_id LEFT JOIN checked_headers on headers.id = header_id
WHERE header_id ISNULL WHERE (header_id ISNULL OR vat_fold_checked IS FALSE)
AND headers.block_number >= $1 AND headers.block_number >= $1
AND headers.block_number <= $2 AND headers.block_number <= $2
AND headers.eth_node_fingerprint = $3`, AND headers.eth_node_fingerprint = $3`,
@ -55,6 +77,5 @@ func (repository VatFoldRepository) MissingHeaders(startingBlockNumber, endingBl
endingBlockNumber, endingBlockNumber,
repository.db.Node.ID, repository.db.Node.ID,
) )
return result, err return result, err
} }

View File

@ -46,7 +46,7 @@ var _ = Describe("", func() {
headerID = id headerID = id
vatFoldRepository = vat_fold.NewVatFoldRepository(db) vatFoldRepository = vat_fold.NewVatFoldRepository(db)
err = vatFoldRepository.Create(headerID, test_data.VatFoldModel) err = vatFoldRepository.Create(headerID, []vat_fold.VatFoldModel{test_data.VatFoldModel})
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
}) })
@ -63,7 +63,7 @@ var _ = Describe("", func() {
}) })
It("does not duplicate vat events", func() { It("does not duplicate vat events", func() {
err := vatFoldRepository.Create(headerID, test_data.VatFoldModel) err := vatFoldRepository.Create(headerID, []vat_fold.VatFoldModel{test_data.VatFoldModel})
Expect(err).To(HaveOccurred()) Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("pq: duplicate key value violates unique constraint")) Expect(err.Error()).To(ContainSubstring("pq: duplicate key value violates unique constraint"))
@ -164,7 +164,10 @@ func initRepository(options repositoryOptions) vat_fold.VatFoldRepository {
} }
if options.storeEvent { if options.storeEvent {
err := vatfoldRepository.Create(headerIDs[options.storedEventBlockNumber], test_data.VatFoldModel) err := vatfoldRepository.Create(
headerIDs[options.storedEventBlockNumber],
[]vat_fold.VatFoldModel{test_data.VatFoldModel},
)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
} }

View File

@ -59,15 +59,19 @@ func (transformer VatFoldTransformer) Execute() error {
if err != nil { if err != nil {
return err return err
} }
for _, log := range matchingLogs { if len(matchingLogs) < 1 {
model, err := transformer.Converter.ToModel(log) err = transformer.Repository.MarkHeaderChecked(header.Id)
if err != nil { if err != nil {
return err return err
} }
err = transformer.Repository.Create(header.Id, model) }
models, err := transformer.Converter.ToModels(matchingLogs)
if err != nil { if err != nil {
return err return err
} }
err = transformer.Repository.Create(header.Id, models)
if err != nil {
return err
} }
} }
return nil return nil

View File

@ -134,7 +134,7 @@ var _ = Describe("Vat fold transformer", func() {
err := transformer.Execute() err := transformer.Execute()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(converter.PassedLog).To(Equal(test_data.EthVatFoldLog)) Expect(converter.PassedLogs).To(Equal([]types.Log{test_data.EthVatFoldLog}))
}) })
It("returns error if converter returns error", func() { It("returns error if converter returns error", func() {
@ -161,7 +161,7 @@ var _ = Describe("Vat fold transformer", func() {
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(repository.PassedHeaderID).To(Equal(fakeHeader.Id)) Expect(repository.PassedHeaderID).To(Equal(fakeHeader.Id))
Expect(repository.PassedModel).To(Equal(test_data.VatFoldModel)) Expect(repository.PassedModels).To(Equal([]vat_fold.VatFoldModel{test_data.VatFoldModel}))
}) })
It("returns error if repository returns error for create", func() { It("returns error if repository returns error for create", func() {