Log checked headers for flip kick events

This commit is contained in:
Rob Mulholand 2018-09-20 15:13:37 -05:00
parent d8b1a61ff5
commit 009cbb8aa0
11 changed files with 235 additions and 98 deletions

View File

@ -0,0 +1,2 @@
ALTER TABLE public.checked_headers
DROP COLUMN flip_kick_checked;

View File

@ -0,0 +1,2 @@
ALTER TABLE public.checked_headers
ADD COLUMN flip_kick_checked BOOLEAN NOT NULL DEFAULT FALSE;

View File

@ -671,7 +671,8 @@ CREATE TABLE public.checked_headers (
header_id integer NOT NULL,
price_feeds_checked boolean DEFAULT false NOT NULL,
deal_checked boolean DEFAULT false NOT NULL,
dent_checked boolean DEFAULT false NOT NULL
dent_checked boolean DEFAULT false NOT NULL,
flip_kick_checked boolean DEFAULT false NOT NULL
);

View File

@ -28,59 +28,67 @@ import (
)
type Converter interface {
ToEntity(contractAddress string, contractAbi string, ethLog types.Log) (*FlipKickEntity, error)
ToModel(flipKick FlipKickEntity) (FlipKickModel, error)
ToEntities(contractAddress string, contractAbi string, ethLogs []types.Log) ([]FlipKickEntity, error)
ToModels(flipKicks []FlipKickEntity) ([]FlipKickModel, error)
}
type FlipKickConverter struct{}
func (FlipKickConverter) ToEntity(contractAddress string, contractAbi string, ethLog types.Log) (*FlipKickEntity, error) {
entity := &FlipKickEntity{}
address := common.HexToAddress(contractAddress)
abi, err := geth.ParseAbi(contractAbi)
if err != nil {
return entity, err
func (FlipKickConverter) ToEntities(contractAddress string, contractAbi string, ethLogs []types.Log) (results []FlipKickEntity, err error) {
for _, ethLog := range ethLogs {
entity := &FlipKickEntity{}
address := common.HexToAddress(contractAddress)
abi, err := geth.ParseAbi(contractAbi)
if err != nil {
return nil, err
}
contract := bind.NewBoundContract(address, abi, nil, nil, nil)
err = contract.UnpackLog(entity, "Kick", ethLog)
if err != nil {
return nil, err
}
entity.Raw = ethLog
entity.TransactionIndex = ethLog.TxIndex
results = append(results, *entity)
}
contract := bind.NewBoundContract(address, abi, nil, nil, nil)
err = contract.UnpackLog(entity, "Kick", ethLog)
if err != nil {
return entity, err
}
entity.Raw = ethLog
entity.TransactionIndex = ethLog.TxIndex
return entity, nil
return results, nil
}
func (FlipKickConverter) ToModel(flipKick FlipKickEntity) (FlipKickModel, error) {
if flipKick.Id == nil {
return FlipKickModel{}, errors.New("FlipKick log ID cannot be nil.")
}
func (FlipKickConverter) ToModels(flipKicks []FlipKickEntity) (results []FlipKickModel, err error) {
for _, flipKick := range flipKicks {
if flipKick.Id == nil {
return nil, errors.New("FlipKick log ID cannot be nil.")
}
id := flipKick.Id.String()
lot := shared.ConvertNilToEmptyString(flipKick.Lot.String())
bid := shared.ConvertNilToEmptyString(flipKick.Bid.String())
gal := flipKick.Gal.String()
endValue := shared.ConvertNilToZeroTimeValue(flipKick.End)
end := time.Unix(endValue, 0)
urn := common.BytesToAddress(flipKick.Urn[:common.AddressLength]).String()
tab := shared.ConvertNilToEmptyString(flipKick.Tab.String())
rawLogJson, err := json.Marshal(flipKick.Raw)
if err != nil {
return FlipKickModel{}, err
}
rawLogString := string(rawLogJson)
id := flipKick.Id.String()
lot := shared.ConvertNilToEmptyString(flipKick.Lot.String())
bid := shared.ConvertNilToEmptyString(flipKick.Bid.String())
gal := flipKick.Gal.String()
endValue := shared.ConvertNilToZeroTimeValue(flipKick.End)
end := time.Unix(endValue, 0)
urn := common.BytesToAddress(flipKick.Urn[:common.AddressLength]).String()
tab := shared.ConvertNilToEmptyString(flipKick.Tab.String())
rawLogJson, err := json.Marshal(flipKick.Raw)
if err != nil {
return nil, err
}
rawLogString := string(rawLogJson)
return FlipKickModel{
BidId: id,
Lot: lot,
Bid: bid,
Gal: gal,
End: end,
Urn: urn,
Tab: tab,
TransactionIndex: flipKick.TransactionIndex,
Raw: rawLogString,
}, nil
model := FlipKickModel{
BidId: id,
Lot: lot,
Bid: bid,
Gal: gal,
End: end,
Urn: urn,
Tab: tab,
TransactionIndex: flipKick.TransactionIndex,
Raw: rawLogString,
}
results = append(results, model)
}
return results, err
}

View File

@ -33,9 +33,11 @@ var _ = Describe("FlipKick Converter", func() {
Describe("ToEntity", func() {
It("converts an Eth Log to a FlipKickEntity", func() {
entity, err := converter.ToEntity(shared.FlipperContractAddress, shared.FlipperABI, test_data.EthFlipKickLog)
entities, err := converter.ToEntities(shared.FlipperContractAddress, shared.FlipperABI, []types.Log{test_data.EthFlipKickLog})
Expect(err).NotTo(HaveOccurred())
Expect(len(entities)).To(Equal(1))
entity := entities[0]
Expect(entity.Id).To(Equal(test_data.FlipKickEntity.Id))
Expect(entity.Lot).To(Equal(test_data.FlipKickEntity.Lot))
Expect(entity.Bid).To(Equal(test_data.FlipKickEntity.Bid))
@ -47,7 +49,7 @@ var _ = Describe("FlipKick Converter", func() {
})
It("returns an error if converting log to entity fails", func() {
_, err := converter.ToEntity(shared.FlipperContractAddress, "error abi", test_data.EthFlipKickLog)
_, err := converter.ToEntities(shared.FlipperContractAddress, "error abi", []types.Log{test_data.EthFlipKickLog})
Expect(err).To(HaveOccurred())
})
@ -71,16 +73,19 @@ var _ = Describe("FlipKick Converter", func() {
})
It("converts an Entity to a Model", func() {
model, err := converter.ToModel(test_data.FlipKickEntity)
models, err := converter.ToModels([]flip_kick.FlipKickEntity{test_data.FlipKickEntity})
Expect(err).NotTo(HaveOccurred())
Expect(model).To(Equal(test_data.FlipKickModel))
Expect(len(models)).To(Equal(1))
Expect(models[0]).To(Equal(test_data.FlipKickModel))
})
It("handles nil values", func() {
model, err := converter.ToModel(emptyEntity)
models, err := converter.ToModels([]flip_kick.FlipKickEntity{emptyEntity})
Expect(err).NotTo(HaveOccurred())
Expect(len(models)).To(Equal(1))
model := models[0]
Expect(model.BidId).To(Equal("1"))
Expect(model.Lot).To(Equal(emptyString))
Expect(model.Bid).To(Equal(emptyString))
@ -93,10 +98,9 @@ var _ = Describe("FlipKick Converter", func() {
It("returns an error if the flip kick event id is nil", func() {
emptyEntity.Id = nil
entity, err := converter.ToModel(emptyEntity)
_, err := converter.ToModels([]flip_kick.FlipKickEntity{emptyEntity})
Expect(err).To(HaveOccurred())
Expect(entity).To(Equal(flip_kick.FlipKickModel{}))
})
})
})

View File

@ -22,7 +22,8 @@ import (
)
type Repository interface {
Create(headerId int64, flipKick FlipKickModel) error
Create(headerId int64, flipKicks []FlipKickModel) error
MarkHeaderChecked(headerId int64) error
MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error)
}
@ -33,12 +34,38 @@ type FlipKickRepository struct {
func NewFlipKickRepository(db *postgres.DB) FlipKickRepository {
return FlipKickRepository{DB: db}
}
func (fkr FlipKickRepository) Create(headerId int64, flipKick FlipKickModel) error {
_, err := fkr.DB.Exec(
`INSERT into maker.flip_kick (header_id, bid_id, lot, bid, gal, "end", urn, tab, tx_idx, raw_log)
func (fkr FlipKickRepository) Create(headerId int64, flipKicks []FlipKickModel) error {
tx, err := fkr.DB.Begin()
if err != nil {
return err
}
for _, flipKick := range flipKicks {
_, err := tx.Exec(
`INSERT into maker.flip_kick (header_id, bid_id, lot, bid, gal, "end", urn, tab, tx_idx, raw_log)
VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`,
headerId, flipKick.BidId, flipKick.Lot, flipKick.Bid, flipKick.Gal, flipKick.End, flipKick.Urn, flipKick.Tab, flipKick.TransactionIndex, flipKick.Raw,
)
headerId, flipKick.BidId, flipKick.Lot, flipKick.Bid, flipKick.Gal, flipKick.End, flipKick.Urn, flipKick.Tab, flipKick.TransactionIndex, flipKick.Raw,
)
if err != nil {
tx.Rollback()
return err
}
}
_, err = tx.Exec(`INSERT INTO public.checked_headers (header_id, flip_kick_checked)
VALUES ($1, $2)
ON CONFLICT (header_id) DO
UPDATE SET flip_kick_checked = $2`, headerId, true)
if err != nil {
tx.Rollback()
return err
}
return tx.Commit()
}
func (fkr FlipKickRepository) MarkHeaderChecked(headerId int64) error {
_, err := fkr.DB.Exec(`INSERT INTO public.checked_headers (header_id, flip_kick_checked)
VALUES ($1, $2)
ON CONFLICT (header_id) DO
UPDATE SET flip_kick_checked = $2`, headerId, true)
return err
}
@ -47,8 +74,8 @@ func (fkr FlipKickRepository) MissingHeaders(startingBlockNumber, endingBlockNum
err := fkr.DB.Select(
&result,
`SELECT headers.id, headers.block_number FROM headers
LEFT JOIN maker.flip_kick on headers.id = header_id
WHERE header_id ISNULL
LEFT JOIN checked_headers on headers.id = header_id
WHERE (header_id ISNULL OR flip_kick_checked IS FALSE)
AND headers.block_number >= $1
AND headers.block_number <= $2
AND headers.eth_node_fingerprint = $3`,

View File

@ -54,8 +54,8 @@ var _ = Describe("FlipKick Repository", func() {
Expect(err).NotTo(HaveOccurred())
})
It("persists a flip_kick record", func() {
err := flipKickRepository.Create(headerId, flipKick)
It("persists flip_kick records", func() {
err := flipKickRepository.Create(headerId, []flip_kick.FlipKickModel{flipKick})
Expect(err).NotTo(HaveOccurred())
assertDBRecordCount(db, "maker.flip_kick", 1)
@ -75,17 +75,27 @@ var _ = Describe("FlipKick Repository", func() {
Expect(dbResult.Raw).To(MatchJSON(flipKick.Raw))
})
It("returns an error if inserting the flip_kick record fails", func() {
err := flipKickRepository.Create(headerId, test_data.FlipKickModel)
It("marks header checked", func() {
err := flipKickRepository.Create(headerId, []flip_kick.FlipKickModel{flipKick})
Expect(err).NotTo(HaveOccurred())
err = flipKickRepository.Create(headerId, test_data.FlipKickModel)
var headerChecked bool
err = db.Get(&headerChecked, `SELECT flip_kick_checked FROM public.checked_headers WHERE header_id = $1`, headerId)
Expect(err).NotTo(HaveOccurred())
Expect(headerChecked).To(BeTrue())
})
It("returns an error if inserting the flip_kick record fails", func() {
err := flipKickRepository.Create(headerId, []flip_kick.FlipKickModel{flipKick})
Expect(err).NotTo(HaveOccurred())
err = flipKickRepository.Create(headerId, []flip_kick.FlipKickModel{flipKick})
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("pq: duplicate key value violates unique constraint"))
})
It("deletes the flip_kick records if its corresponding header record is deleted", func() {
err := flipKickRepository.Create(headerId, test_data.FlipKickModel)
err := flipKickRepository.Create(headerId, []flip_kick.FlipKickModel{flipKick})
Expect(err).NotTo(HaveOccurred())
assertDBRecordCount(db, "maker.flip_kick", 1)
assertDBRecordCount(db, "headers", 1)
@ -98,6 +108,31 @@ var _ = Describe("FlipKick Repository", func() {
})
})
Describe("MarkHeaderChecked", func() {
It("creates a row for a new headerID", func() {
err := flipKickRepository.MarkHeaderChecked(headerId)
Expect(err).NotTo(HaveOccurred())
var headerChecked bool
err = db.Get(&headerChecked, `SELECT flip_kick_checked FROM public.checked_headers WHERE header_id = $1`, headerId)
Expect(err).NotTo(HaveOccurred())
Expect(headerChecked).To(BeTrue())
})
It("updates row when headerID already exists", func() {
_, err := db.Exec(`INSERT INTO public.checked_headers (header_id) VALUES ($1)`, headerId)
Expect(err).NotTo(HaveOccurred())
err = flipKickRepository.MarkHeaderChecked(headerId)
Expect(err).NotTo(HaveOccurred())
var headerChecked bool
err = db.Get(&headerChecked, `SELECT flip_kick_checked FROM public.checked_headers WHERE header_id = $1`, headerId)
Expect(err).NotTo(HaveOccurred())
Expect(headerChecked).To(BeTrue())
})
})
Describe("When there are multiple nodes", func() {
var db2 *postgres.DB
var flipKickRepository2 flip_kick.FlipKickRepository
@ -130,11 +165,10 @@ var _ = Describe("FlipKick Repository", func() {
})
Describe("MissingHeaders", func() {
It("returns headers for which there isn't an associated flip_kick record", func() {
It("returns headers that haven't been marked as checked", func() {
startingBlock := blockNumber - 3
endingBlock := blockNumber + 3
err := flipKickRepository.Create(headerId, test_data.FlipKickModel)
err := flipKickRepository.MarkHeaderChecked(headerId)
Expect(err).NotTo(HaveOccurred())
newBlockNumber := blockNumber + 3

View File

@ -95,21 +95,25 @@ func (fkt FlipKickTransformer) Execute() error {
if err != nil {
resultingErrors = append(resultingErrors, newTransformerError(err, header.BlockNumber, FetcherError))
}
if len(ethLogs) < 1 {
err := fkt.Repository.MarkHeaderChecked(header.Id)
if err != nil {
return err
}
}
for _, ethLog := range ethLogs {
entity, err := fkt.Converter.ToEntity(config.ContractAddress, config.ContractAbi, ethLog)
if err != nil {
resultingErrors = append(resultingErrors, newTransformerError(err, header.BlockNumber, LogToEntityError))
}
model, err := fkt.Converter.ToModel(*entity)
if err != nil {
resultingErrors = append(resultingErrors, newTransformerError(err, header.BlockNumber, EntityToModelError))
}
entities, err := fkt.Converter.ToEntities(config.ContractAddress, config.ContractAbi, ethLogs)
if err != nil {
resultingErrors = append(resultingErrors, newTransformerError(err, header.BlockNumber, LogToEntityError))
}
models, err := fkt.Converter.ToModels(entities)
if err != nil {
resultingErrors = append(resultingErrors, newTransformerError(err, header.BlockNumber, EntityToModelError))
}
err = fkt.Repository.Create(header.Id, model)
if err != nil {
resultingErrors = append(resultingErrors, newTransformerError(err, header.BlockNumber, RepositoryError))
}
err = fkt.Repository.Create(header.Id, models)
if err != nil {
resultingErrors = append(resultingErrors, newTransformerError(err, header.BlockNumber, RepositoryError))
}
}

View File

@ -96,6 +96,42 @@ var _ = Describe("FlipKick Transformer", func() {
Expect(err.Error()).To(ContainSubstring("error(s) transforming FlipKick event logs"))
})
It("marks header checked if no logs returned", func() {
mockConverter := &flip_kick_mocks.MockFlipKickConverter{}
mockRepository := &flip_kick_mocks.MockFlipKickRepository{}
headerID := int64(123)
mockRepository.SetHeadersToReturn([]core.Header{{Id: headerID}})
mockFetcher := &mocks.MockLogFetcher{}
transformer := flip_kick.FlipKickTransformer{
Converter: mockConverter,
Fetcher: mockFetcher,
Repository: mockRepository,
}
err := transformer.Execute()
Expect(err).NotTo(HaveOccurred())
mockRepository.AssertMarkHeaderCheckedCalledWith(headerID)
})
It("returns error if marking header checked returns err", func() {
mockConverter := &flip_kick_mocks.MockFlipKickConverter{}
mockRepository := &flip_kick_mocks.MockFlipKickRepository{}
mockRepository.SetHeadersToReturn([]core.Header{{Id: int64(123)}})
mockRepository.SetMarkHeaderCheckedErr(fakes.FakeError)
mockFetcher := &mocks.MockLogFetcher{}
transformer := flip_kick.FlipKickTransformer{
Converter: mockConverter,
Fetcher: mockFetcher,
Repository: mockRepository,
}
err := transformer.Execute()
Expect(err).To(HaveOccurred())
Expect(err).To(MatchError(fakes.FakeError))
})
It("converts the logs", func() {
err := transformer.Execute()
Expect(err).NotTo(HaveOccurred())

View File

@ -29,17 +29,18 @@ type MockFlipKickConverter struct {
ConverterError error
}
func (mfkc *MockFlipKickConverter) ToEntity(contractAddress string, contractAbi string, ethLog types.Log) (*flip_kick.FlipKickEntity, error) {
func (mfkc *MockFlipKickConverter) ToEntities(contractAddress string, contractAbi string, ethLogs []types.Log) ([]flip_kick.FlipKickEntity, error) {
mfkc.ConverterContract = contractAddress
mfkc.ConverterAbi = contractAbi
mfkc.LogsToConvert = append(mfkc.LogsToConvert, ethLog)
return &test_data.FlipKickEntity, mfkc.ConverterError
mfkc.LogsToConvert = append(mfkc.LogsToConvert, ethLogs...)
return []flip_kick.FlipKickEntity{test_data.FlipKickEntity}, mfkc.ConverterError
}
func (mfkc *MockFlipKickConverter) ToModel(flipKickEntity flip_kick.FlipKickEntity) (flip_kick.FlipKickModel, error) {
mfkc.EntitiesToConvert = append(mfkc.EntitiesToConvert, flipKickEntity)
return test_data.FlipKickModel, nil
func (mfkc *MockFlipKickConverter) ToModels(flipKickEntities []flip_kick.FlipKickEntity) ([]flip_kick.FlipKickModel, error) {
mfkc.EntitiesToConvert = append(mfkc.EntitiesToConvert, flipKickEntities...)
return []flip_kick.FlipKickModel{test_data.FlipKickModel}, nil
}
func (mfkc *MockFlipKickConverter) SetConverterError(err error) {
mfkc.ConverterError = err
}

View File

@ -15,27 +15,36 @@
package flip_kick
import (
. "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/transformers/flip_kick"
)
type MockFlipKickRepository struct {
HeaderIds []int64
HeadersToReturn []core.Header
StartingBlockNumber int64
EndingBlockNumber int64
FlipKicksCreated []flip_kick.FlipKickModel
CreateRecordError error
MissingHeadersError error
CreateRecordError error
EndingBlockNumber int64
FlipKicksCreated []flip_kick.FlipKickModel
HeaderIds []int64
HeadersToReturn []core.Header
MissingHeadersError error
StartingBlockNumber int64
markHeaderCheckedErr error
markHeaderCheckedPassedHeaderId int64
}
func (mfkr *MockFlipKickRepository) Create(headerId int64, flipKick flip_kick.FlipKickModel) error {
func (mfkr *MockFlipKickRepository) Create(headerId int64, flipKick []flip_kick.FlipKickModel) error {
mfkr.HeaderIds = append(mfkr.HeaderIds, headerId)
mfkr.FlipKicksCreated = append(mfkr.FlipKicksCreated, flipKick)
mfkr.FlipKicksCreated = append(mfkr.FlipKicksCreated, flipKick...)
return mfkr.CreateRecordError
}
func (mfkr *MockFlipKickRepository) MarkHeaderChecked(headerId int64) error {
mfkr.markHeaderCheckedPassedHeaderId = headerId
return mfkr.markHeaderCheckedErr
}
func (mfkr *MockFlipKickRepository) MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) {
mfkr.StartingBlockNumber = startingBlockNumber
mfkr.EndingBlockNumber = endingBlockNumber
@ -50,6 +59,15 @@ func (mfkr *MockFlipKickRepository) SetHeadersToReturn(headers []core.Header) {
func (mfkr *MockFlipKickRepository) SetCreateRecordError(err error) {
mfkr.CreateRecordError = err
}
func (mfkr *MockFlipKickRepository) SetMarkHeaderCheckedErr(err error) {
mfkr.markHeaderCheckedErr = err
}
func (mfkr *MockFlipKickRepository) SetMissingHeadersError(err error) {
mfkr.MissingHeadersError = err
}
func (mfkr *MockFlipKickRepository) AssertMarkHeaderCheckedCalledWith(headerId int64) {
Expect(mfkr.markHeaderCheckedPassedHeaderId).To(Equal(headerId))
}