From e0253cce552ed182015dc042dbbfee11ccad3be8 Mon Sep 17 00:00:00 2001 From: Rob Mulholand Date: Mon, 22 Oct 2018 14:54:09 -0500 Subject: [PATCH] Add log index to Price Feed events --- ...1534275305_create_price_feeds_table.up.sql | 5 +- db/schema.sql | 7 +- .../integration_tests/price_feeds.go | 78 +++++++++---------- pkg/transformers/price_feeds/converter.go | 3 +- pkg/transformers/price_feeds/model.go | 3 +- pkg/transformers/price_feeds/repository.go | 4 +- .../price_feeds/repository_test.go | 33 ++++++-- pkg/transformers/test_data/price_feed.go | 5 +- 8 files changed, 82 insertions(+), 56 deletions(-) diff --git a/db/migrations/1534275305_create_price_feeds_table.up.sql b/db/migrations/1534275305_create_price_feeds_table.up.sql index 408417fe..9ea8c12a 100644 --- a/db/migrations/1534275305_create_price_feeds_table.up.sql +++ b/db/migrations/1534275305_create_price_feeds_table.up.sql @@ -2,9 +2,10 @@ CREATE TABLE maker.price_feeds ( id SERIAL PRIMARY KEY, block_number BIGINT NOT NULL, header_id INTEGER NOT NULL REFERENCES headers (id) ON DELETE CASCADE, - medianizer_address bytea, + medianizer_address TEXT, usd_value NUMERIC, + log_idx INTEGER NOT NULL, tx_idx INTEGER NOT NULL, raw_log JSONB, - UNIQUE (header_id, medianizer_address, tx_idx) + UNIQUE (header_id, medianizer_address, tx_idx, log_idx) ); \ No newline at end of file diff --git a/db/schema.sql b/db/schema.sql index 90f1de40..dd14f1c0 100644 --- a/db/schema.sql +++ b/db/schema.sql @@ -647,8 +647,9 @@ CREATE TABLE maker.price_feeds ( id integer NOT NULL, block_number bigint NOT NULL, header_id integer NOT NULL, - medianizer_address bytea, + medianizer_address text, usd_value numeric, + log_idx integer NOT NULL, tx_idx integer NOT NULL, raw_log jsonb ); @@ -1977,11 +1978,11 @@ ALTER TABLE ONLY maker.pit_file_stability_fee -- --- Name: price_feeds price_feeds_header_id_medianizer_address_tx_idx_key; Type: CONSTRAINT; Schema: maker; Owner: - +-- Name: price_feeds price_feeds_header_id_medianizer_address_tx_idx_log_idx_key; Type: CONSTRAINT; Schema: maker; Owner: - -- ALTER TABLE ONLY maker.price_feeds - ADD CONSTRAINT price_feeds_header_id_medianizer_address_tx_idx_key UNIQUE (header_id, medianizer_address, tx_idx); + ADD CONSTRAINT price_feeds_header_id_medianizer_address_tx_idx_log_idx_key UNIQUE (header_id, medianizer_address, tx_idx, log_idx); -- diff --git a/pkg/transformers/integration_tests/price_feeds.go b/pkg/transformers/integration_tests/price_feeds.go index 1d77e097..3fe2e58e 100644 --- a/pkg/transformers/integration_tests/price_feeds.go +++ b/pkg/transformers/integration_tests/price_feeds.go @@ -15,13 +15,12 @@ package integration_tests import ( - "time" - . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" + "github.com/vulcanize/vulcanizedb/pkg/geth/client" "github.com/vulcanize/vulcanizedb/pkg/transformers/price_feeds" "github.com/vulcanize/vulcanizedb/test_config" ) @@ -30,6 +29,7 @@ var _ = Describe("Price feeds transformer", func() { var ( db *postgres.DB blockChain core.BlockChain + rpcClient client.RpcClient ) BeforeEach(func() { @@ -39,73 +39,71 @@ var _ = Describe("Price feeds transformer", func() { Expect(err).NotTo(HaveOccurred()) db = test_config.NewTestDB(blockChain.Node()) test_config.CleanTestDB(db) - - for i := 8763054; i < 8763063; i++ { - err = persistHeader(rpcClient, db, int64(i)) - Expect(err).NotTo(HaveOccurred()) - } }) It("persists a ETH/USD price feed event", func() { + blockNumber := int64(8763054) + err := persistHeader(rpcClient, db, blockNumber) + Expect(err).NotTo(HaveOccurred()) config := price_feeds.IPriceFeedConfig{ ContractAddresses: []string{"0x9FfFE440258B79c5d6604001674A4722FfC0f7Bc"}, - StartingBlockNumber: 8763054, - EndingBlockNumber: 8763054, + StartingBlockNumber: blockNumber, + EndingBlockNumber: blockNumber, } transformerInitializer := price_feeds.PriceFeedTransformerInitializer{Config: config} transformer := transformerInitializer.NewPriceFeedTransformer(db, blockChain) - err := transformer.Execute() + err = transformer.Execute() - time.AfterFunc(5*time.Second, func() { - defer GinkgoRecover() - Expect(err).NotTo(HaveOccurred()) - var model price_feeds.PriceFeedModel - err = db.Get(&model, `SELECT block_number, medianizer_address, usd_value, tx_idx, raw_log FROM maker.price_feeds WHERE block_number = $1`, config.StartingBlockNumber) - Expect(err).NotTo(HaveOccurred()) - Expect(model.UsdValue).To(Equal("207.314891143")) - }) + Expect(err).NotTo(HaveOccurred()) + var model price_feeds.PriceFeedModel + err = db.Get(&model, `SELECT block_number, medianizer_address, usd_value, tx_idx, raw_log FROM maker.price_feeds WHERE block_number = $1`, config.StartingBlockNumber) + Expect(err).NotTo(HaveOccurred()) + Expect(model.UsdValue).To(Equal("207.314891143")) + Expect(model.MedianizerAddress).To(Equal(config.ContractAddresses[0])) }) It("persists a MKR/USD price feed event", func() { + blockNumber := int64(8763059) + err := persistHeader(rpcClient, db, blockNumber) + Expect(err).NotTo(HaveOccurred()) config := price_feeds.IPriceFeedConfig{ ContractAddresses: []string{"0xB1997239Cfc3d15578A3a09730f7f84A90BB4975"}, - StartingBlockNumber: 8763059, - EndingBlockNumber: 8763059, + StartingBlockNumber: blockNumber, + EndingBlockNumber: blockNumber, } transformerInitializer := price_feeds.PriceFeedTransformerInitializer{Config: config} transformer := transformerInitializer.NewPriceFeedTransformer(db, blockChain) - err := transformer.Execute() + err = transformer.Execute() - time.AfterFunc(5*time.Second, func() { - defer GinkgoRecover() - Expect(err).NotTo(HaveOccurred()) - var model price_feeds.PriceFeedModel - err = db.Get(&model, `SELECT block_number, medianizer_address, usd_value, tx_idx, raw_log FROM maker.price_feeds WHERE block_number = $1`, config.StartingBlockNumber) - Expect(err).NotTo(HaveOccurred()) - Expect(model.UsdValue).To(Equal("391.803979212")) - }) + Expect(err).NotTo(HaveOccurred()) + var model price_feeds.PriceFeedModel + err = db.Get(&model, `SELECT block_number, medianizer_address, usd_value, tx_idx, raw_log FROM maker.price_feeds WHERE block_number = $1`, config.StartingBlockNumber) + Expect(err).NotTo(HaveOccurred()) + Expect(model.UsdValue).To(Equal("391.803979212")) + Expect(model.MedianizerAddress).To(Equal(config.ContractAddresses[0])) }) It("persists a REP/USD price feed event", func() { + blockNumber := int64(8763062) + err := persistHeader(rpcClient, db, blockNumber) + Expect(err).NotTo(HaveOccurred()) config := price_feeds.IPriceFeedConfig{ ContractAddresses: []string{"0xf88bBDc1E2718F8857F30A180076ec38d53cf296"}, - StartingBlockNumber: 8763062, - EndingBlockNumber: 8763062, + StartingBlockNumber: blockNumber, + EndingBlockNumber: blockNumber, } transformerInitializer := price_feeds.PriceFeedTransformerInitializer{Config: config} transformer := transformerInitializer.NewPriceFeedTransformer(db, blockChain) - err := transformer.Execute() + err = transformer.Execute() - time.AfterFunc(5*time.Second, func() { - defer GinkgoRecover() - Expect(err).NotTo(HaveOccurred()) - var model price_feeds.PriceFeedModel - err = db.Get(&model, `SELECT block_number, medianizer_address, usd_value, tx_idx, raw_log FROM maker.price_feeds WHERE block_number = $1`, config.StartingBlockNumber) - Expect(err).NotTo(HaveOccurred()) - Expect(model.UsdValue).To(Equal("12.8169284827")) - }) + Expect(err).NotTo(HaveOccurred()) + var model price_feeds.PriceFeedModel + err = db.Get(&model, `SELECT block_number, medianizer_address, usd_value, tx_idx, raw_log FROM maker.price_feeds WHERE block_number = $1`, config.StartingBlockNumber) + Expect(err).NotTo(HaveOccurred()) + Expect(model.UsdValue).To(Equal("12.8169284827")) + Expect(model.MedianizerAddress).To(Equal(config.ContractAddresses[0])) }) }) diff --git a/pkg/transformers/price_feeds/converter.go b/pkg/transformers/price_feeds/converter.go index 83fc5f7a..7a966883 100644 --- a/pkg/transformers/price_feeds/converter.go +++ b/pkg/transformers/price_feeds/converter.go @@ -34,8 +34,9 @@ func (converter PriceFeedConverter) ToModels(logs []types.Log, headerID int64) ( } model := PriceFeedModel{ BlockNumber: log.BlockNumber, - MedianizerAddress: log.Address.Bytes(), + MedianizerAddress: log.Address.String(), UsdValue: Convert("wad", hexutil.Encode(log.Data), 15), + LogIndex: log.Index, TransactionIndex: log.TxIndex, Raw: raw, } diff --git a/pkg/transformers/price_feeds/model.go b/pkg/transformers/price_feeds/model.go index d61a2b64..6fad860b 100644 --- a/pkg/transformers/price_feeds/model.go +++ b/pkg/transformers/price_feeds/model.go @@ -26,8 +26,9 @@ type LogValueEntity struct { type PriceFeedModel struct { BlockNumber uint64 `db:"block_number"` - MedianizerAddress []byte `db:"medianizer_address"` + MedianizerAddress string `db:"medianizer_address"` UsdValue string `db:"usd_value"` + LogIndex uint `db:"log_idx"` TransactionIndex uint `db:"tx_idx"` Raw []byte `db:"raw_log"` } diff --git a/pkg/transformers/price_feeds/repository.go b/pkg/transformers/price_feeds/repository.go index 383411c2..8f55fd4e 100644 --- a/pkg/transformers/price_feeds/repository.go +++ b/pkg/transformers/price_feeds/repository.go @@ -39,8 +39,8 @@ func (repository PriceFeedRepository) Create(headerID int64, models []PriceFeedM return err } for _, model := range models { - _, err = tx.Exec(`INSERT INTO maker.price_feeds (block_number, header_id, medianizer_address, usd_value, tx_idx, raw_log) - VALUES ($1, $2, $3, $4::NUMERIC, $5, $6)`, model.BlockNumber, headerID, model.MedianizerAddress, model.UsdValue, model.TransactionIndex, model.Raw) + _, err = tx.Exec(`INSERT INTO maker.price_feeds (block_number, header_id, medianizer_address, usd_value, log_idx, tx_idx, raw_log) + VALUES ($1, $2, $3, $4::NUMERIC, $5, $6, $7)`, model.BlockNumber, headerID, model.MedianizerAddress, model.UsdValue, model.LogIndex, model.TransactionIndex, model.Raw) if err != nil { tx.Rollback() return err diff --git a/pkg/transformers/price_feeds/repository_test.go b/pkg/transformers/price_feeds/repository_test.go index d419bf3f..9ff3e9e7 100644 --- a/pkg/transformers/price_feeds/repository_test.go +++ b/pkg/transformers/price_feeds/repository_test.go @@ -51,23 +51,40 @@ var _ = Describe("Price feeds repository", func() { BeforeEach(func() { headerID, err = headerRepository.CreateOrUpdateHeader(fakes.FakeHeader) Expect(err).NotTo(HaveOccurred()) - - err = priceFeedRepository.Create(headerID, []price_feeds.PriceFeedModel{test_data.PriceFeedModel}) - Expect(err).NotTo(HaveOccurred()) }) It("persists a price feed update", func() { + err = priceFeedRepository.Create(headerID, []price_feeds.PriceFeedModel{test_data.PriceFeedModel}) + + Expect(err).NotTo(HaveOccurred()) var dbPriceFeedUpdate price_feeds.PriceFeedModel - err = db.Get(&dbPriceFeedUpdate, `SELECT block_number, medianizer_address, usd_value, tx_idx, raw_log FROM maker.price_feeds WHERE header_id = $1`, headerID) + err = db.Get(&dbPriceFeedUpdate, `SELECT block_number, medianizer_address, usd_value, log_idx, tx_idx, raw_log FROM maker.price_feeds WHERE header_id = $1`, headerID) Expect(err).NotTo(HaveOccurred()) Expect(dbPriceFeedUpdate.BlockNumber).To(Equal(test_data.PriceFeedModel.BlockNumber)) Expect(dbPriceFeedUpdate.MedianizerAddress).To(Equal(test_data.PriceFeedModel.MedianizerAddress)) Expect(dbPriceFeedUpdate.UsdValue).To(Equal(test_data.PriceFeedModel.UsdValue)) + Expect(dbPriceFeedUpdate.LogIndex).To(Equal(test_data.PriceFeedModel.LogIndex)) Expect(dbPriceFeedUpdate.TransactionIndex).To(Equal(test_data.PriceFeedModel.TransactionIndex)) Expect(dbPriceFeedUpdate.Raw).To(MatchJSON(test_data.PriceFeedModel.Raw)) }) It("marks headerID as checked for price feed logs", func() { + err = priceFeedRepository.Create(headerID, []price_feeds.PriceFeedModel{test_data.PriceFeedModel}) + + Expect(err).NotTo(HaveOccurred()) + var headerChecked bool + err = db.Get(&headerChecked, `SELECT price_feeds_checked FROM public.checked_headers WHERE header_id = $1`, headerID) + Expect(err).NotTo(HaveOccurred()) + Expect(headerChecked).To(BeTrue()) + }) + + It("updates the header to checked if checked headers row already exists", func() { + _, err = db.Exec(`INSERT INTO public.checked_headers (header_id) VALUES ($1)`, headerID) + Expect(err).NotTo(HaveOccurred()) + + err = priceFeedRepository.Create(headerID, []price_feeds.PriceFeedModel{test_data.PriceFeedModel}) + + Expect(err).NotTo(HaveOccurred()) var headerChecked bool err = db.Get(&headerChecked, `SELECT price_feeds_checked FROM public.checked_headers WHERE header_id = $1`, headerID) Expect(err).NotTo(HaveOccurred()) @@ -75,6 +92,9 @@ var _ = Describe("Price feeds repository", func() { }) It("does not duplicate price feed updates", func() { + err = priceFeedRepository.Create(headerID, []price_feeds.PriceFeedModel{test_data.PriceFeedModel}) + Expect(err).NotTo(HaveOccurred()) + err = priceFeedRepository.Create(headerID, []price_feeds.PriceFeedModel{test_data.PriceFeedModel}) Expect(err).To(HaveOccurred()) @@ -82,11 +102,14 @@ var _ = Describe("Price feeds repository", func() { }) It("removes price feed if corresponding header is deleted", func() { + err = priceFeedRepository.Create(headerID, []price_feeds.PriceFeedModel{test_data.PriceFeedModel}) + Expect(err).NotTo(HaveOccurred()) + _, err = db.Exec(`DELETE FROM headers WHERE id = $1`, headerID) Expect(err).NotTo(HaveOccurred()) var dbResult price_feeds.PriceFeedModel - err = db.Get(&dbResult, `SELECT block_number, medianizer_address, usd_value, tx_idx, raw_log FROM maker.price_feeds WHERE header_id = $1`, headerID) + err = db.Get(&dbResult, `SELECT block_number, medianizer_address, usd_value, log_idx, tx_idx, raw_log FROM maker.price_feeds WHERE header_id = $1`, headerID) Expect(err).To(HaveOccurred()) Expect(err).To(MatchError(sql.ErrNoRows)) }) diff --git a/pkg/transformers/test_data/price_feed.go b/pkg/transformers/test_data/price_feed.go index 31eac642..e95fccee 100644 --- a/pkg/transformers/test_data/price_feed.go +++ b/pkg/transformers/test_data/price_feed.go @@ -37,15 +37,16 @@ var EthPriceFeedLog = types.Log{ TxHash: common.HexToHash("0xa51a50a2adbfba4e2ab3d72dfd67a21c769f1bc8d2b180663a15500a56cde58f"), TxIndex: txIndex, BlockHash: common.HexToHash("0x27ecebbf69eefa3bb3cf65f472322a80ff4946653a50a2171dc605f49829467d"), - Index: 0, + Index: 8, Removed: false, } var rawPriceFeedLog, _ = json.Marshal(EthPriceFeedLog) var PriceFeedModel = price_feeds.PriceFeedModel{ BlockNumber: blockNumber, - MedianizerAddress: EthPriceFeedLog.Address[:], + MedianizerAddress: EthPriceFeedLog.Address.String(), UsdValue: "378.6599388897", + LogIndex: EthPriceFeedLog.Index, TransactionIndex: EthPriceFeedLog.TxIndex, Raw: rawPriceFeedLog, }