Add raw log to price feeds

This commit is contained in:
Rob Mulholand 2018-09-10 16:57:17 -05:00
parent b2ba7ee1e3
commit d69063636e
13 changed files with 118 additions and 121 deletions

View File

@ -1,12 +1,10 @@
CREATE TABLE maker.price_feeds (
id SERIAL PRIMARY KEY,
block_number BIGINT NOT NULL,
header_id INTEGER NOT NULL,
header_id INTEGER NOT NULL REFERENCES headers (id) ON DELETE CASCADE,
medianizer_address bytea,
tx_idx INTEGER NOT NULL,
usd_value NUMERIC,
UNIQUE (header_id, medianizer_address, tx_idx),
CONSTRAINT headers_fk FOREIGN KEY (header_id)
REFERENCES headers (id)
ON DELETE CASCADE
tx_idx INTEGER NOT NULL,
raw_log JSONB,
UNIQUE (header_id, medianizer_address, tx_idx)
);

View File

@ -409,8 +409,9 @@ CREATE TABLE maker.price_feeds (
block_number bigint NOT NULL,
header_id integer NOT NULL,
medianizer_address bytea,
usd_value numeric,
tx_idx integer NOT NULL,
usd_value numeric
raw_log jsonb
);
@ -1425,14 +1426,6 @@ ALTER TABLE ONLY maker.frob
ADD CONSTRAINT frob_header_id_fkey FOREIGN KEY (header_id) REFERENCES public.headers(id) ON DELETE CASCADE;
--
-- Name: price_feeds headers_fk; Type: FK CONSTRAINT; Schema: maker; Owner: -
--
ALTER TABLE ONLY maker.price_feeds
ADD CONSTRAINT headers_fk FOREIGN KEY (header_id) REFERENCES public.headers(id) ON DELETE CASCADE;
--
-- Name: pit_file_debt_ceiling pit_file_debt_ceiling_header_id_fkey; Type: FK CONSTRAINT; Schema: maker; Owner: -
--
@ -1457,6 +1450,14 @@ ALTER TABLE ONLY maker.pit_file_stability_fee
ADD CONSTRAINT pit_file_stability_fee_header_id_fkey FOREIGN KEY (header_id) REFERENCES public.headers(id) ON DELETE CASCADE;
--
-- Name: price_feeds price_feeds_header_id_fkey; Type: FK CONSTRAINT; Schema: maker; Owner: -
--
ALTER TABLE ONLY maker.price_feeds
ADD CONSTRAINT price_feeds_header_id_fkey FOREIGN KEY (header_id) REFERENCES public.headers(id) ON DELETE CASCADE;
--
-- Name: tend tend_header_id_fkey; Type: FK CONSTRAINT; Schema: maker; Owner: -
--

View File

@ -15,18 +15,24 @@
package price_feeds
import (
"encoding/json"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
)
type Converter interface {
ToModel(log types.Log, headerID int64) (PriceFeedModel, error)
}
type PriceFeedConverter struct{}
func (converter PriceFeedConverter) ToModel(log types.Log, headerID int64) PriceFeedModel {
func (converter PriceFeedConverter) ToModel(log types.Log, headerID int64) (PriceFeedModel, error) {
raw, err := json.Marshal(log)
return PriceFeedModel{
BlockNumber: log.BlockNumber,
HeaderID: headerID,
MedianizerAddress: log.Address.Bytes(),
UsdValue: Convert("wad", hexutil.Encode(log.Data), 15),
TransactionIndex: log.TxIndex,
}
Raw: raw,
}, err
}

View File

@ -15,44 +15,21 @@
package price_feeds_test
import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/pkg/transformers/price_feeds"
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
"github.com/vulcanize/vulcanizedb/pkg/transformers/test_data"
)
var _ = Describe("Price feed converter", func() {
var _ = Describe("Price feed Converter", func() {
It("converts a log to a price feed model", func() {
medianizerAddress := common.HexToAddress("0x99041f808d598b782d5a3e498681c2452a31da08")
blockNumber := uint64(6147230)
txIndex := uint(119)
// https://etherscan.io/tx/0xa51a50a2adbfba4e2ab3d72dfd67a21c769f1bc8d2b180663a15500a56cde58f
log := types.Log{
Address: medianizerAddress,
Topics: []common.Hash{common.HexToHash(shared.LogValueSignature)},
Data: common.FromHex("00000000000000000000000000000000000000000000001486f658319fb0c100"),
BlockNumber: blockNumber,
TxHash: common.HexToHash("0xa51a50a2adbfba4e2ab3d72dfd67a21c769f1bc8d2b180663a15500a56cde58f"),
TxIndex: txIndex,
BlockHash: common.HexToHash("0x27ecebbf69eefa3bb3cf65f472322a80ff4946653a50a2171dc605f49829467d"),
Index: 0,
Removed: false,
}
converter := price_feeds.PriceFeedConverter{}
headerID := int64(123)
model := converter.ToModel(log, headerID)
model, err := converter.ToModel(test_data.EthPriceFeedLog, headerID)
expectedModel := price_feeds.PriceFeedModel{
BlockNumber: blockNumber,
HeaderID: headerID,
MedianizerAddress: medianizerAddress[:],
UsdValue: "378.6599388897",
TransactionIndex: txIndex,
}
Expect(model).To(Equal(expectedModel))
Expect(err).NotTo(HaveOccurred())
Expect(model).To(Equal(test_data.PriceFeedModel))
})
})

View File

@ -26,10 +26,10 @@ type LogValueEntity struct {
type PriceFeedModel struct {
BlockNumber uint64 `db:"block_number"`
HeaderID int64 `db:"header_id"`
MedianizerAddress []byte `db:"medianizer_address"`
UsdValue string `db:"usd_value"`
TransactionIndex uint `db:"tx_idx"`
Raw []byte `db:"raw_log"`
}
func Convert(conversion string, value string, prec int) string {

View File

@ -20,7 +20,7 @@ import (
)
type IPriceFeedRepository interface {
Create(model PriceFeedModel) error
Create(headerID int64, model PriceFeedModel) error
MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error)
}
@ -32,9 +32,9 @@ func NewPriceFeedRepository(db *postgres.DB) PriceFeedRepository {
return PriceFeedRepository{db: db}
}
func (repository PriceFeedRepository) Create(model PriceFeedModel) error {
_, err := repository.db.Exec(`INSERT INTO maker.price_feeds (block_number, header_id, medianizer_address, usd_value, tx_idx)
VALUES ($1, $2, $3, $4::NUMERIC, $5)`, model.BlockNumber, model.HeaderID, model.MedianizerAddress, model.UsdValue, model.TransactionIndex)
func (repository PriceFeedRepository) Create(headerID int64, model PriceFeedModel) error {
_, err := repository.db.Exec(`INSERT INTO maker.price_feeds (block_number, header_id, medianizer_address, usd_value, tx_idx, raw_log)
VALUES ($1, $2, $3, $4::NUMERIC, $5, $6)`, model.BlockNumber, headerID, model.MedianizerAddress, model.UsdValue, model.TransactionIndex, model.Raw)
return err
}

View File

@ -21,6 +21,7 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories"
"github.com/vulcanize/vulcanizedb/pkg/transformers/price_feeds"
"github.com/vulcanize/vulcanizedb/pkg/transformers/test_data"
"github.com/vulcanize/vulcanizedb/test_config"
)
@ -30,26 +31,21 @@ var _ = Describe("Price feeds repository", func() {
db := test_config.NewTestDB(core.Node{})
test_config.CleanTestDB(db)
headerRepository := repositories.NewHeaderRepository(db)
blockNumber := uint64(12345)
header := core.Header{BlockNumber: int64(blockNumber)}
headerID, err := headerRepository.CreateOrUpdateHeader(header)
headerID, err := headerRepository.CreateOrUpdateHeader(core.Header{})
Expect(err).NotTo(HaveOccurred())
priceFeedUpdate := price_feeds.PriceFeedModel{
BlockNumber: blockNumber,
HeaderID: headerID,
MedianizerAddress: []byte{1, 2, 3, 4, 5},
UsdValue: "123.45",
TransactionIndex: 1,
}
priceFeedRepository := price_feeds.NewPriceFeedRepository(db)
err = priceFeedRepository.Create(priceFeedUpdate)
err = priceFeedRepository.Create(headerID, test_data.PriceFeedModel)
Expect(err).NotTo(HaveOccurred())
var dbPriceFeedUpdate price_feeds.PriceFeedModel
err = db.Get(&dbPriceFeedUpdate, `SELECT block_number, header_id, medianizer_address, usd_value, tx_idx FROM maker.price_feeds WHERE header_id = $1`, headerID)
err = db.Get(&dbPriceFeedUpdate, `SELECT block_number, medianizer_address, usd_value, tx_idx, raw_log FROM maker.price_feeds WHERE header_id = $1`, headerID)
Expect(err).NotTo(HaveOccurred())
Expect(dbPriceFeedUpdate).To(Equal(priceFeedUpdate))
Expect(dbPriceFeedUpdate.BlockNumber).To(Equal(test_data.PriceFeedModel.BlockNumber))
Expect(dbPriceFeedUpdate.MedianizerAddress).To(Equal(test_data.PriceFeedModel.MedianizerAddress))
Expect(dbPriceFeedUpdate.UsdValue).To(Equal(test_data.PriceFeedModel.UsdValue))
Expect(dbPriceFeedUpdate.TransactionIndex).To(Equal(test_data.PriceFeedModel.TransactionIndex))
Expect(dbPriceFeedUpdate.Raw).To(MatchJSON(test_data.PriceFeedModel.Raw))
})
It("does not duplicate price feed updates", func() {
@ -60,18 +56,11 @@ var _ = Describe("Price feeds repository", func() {
header := core.Header{BlockNumber: int64(blockNumber)}
headerID, err := headerRepository.CreateOrUpdateHeader(header)
Expect(err).NotTo(HaveOccurred())
priceFeedUpdate := price_feeds.PriceFeedModel{
BlockNumber: blockNumber,
HeaderID: headerID,
MedianizerAddress: []byte{1, 2, 3, 4, 5},
UsdValue: "123.45",
TransactionIndex: 1,
}
priceFeedRepository := price_feeds.NewPriceFeedRepository(db)
err = priceFeedRepository.Create(priceFeedUpdate)
err = priceFeedRepository.Create(headerID, test_data.PriceFeedModel)
Expect(err).NotTo(HaveOccurred())
err = priceFeedRepository.Create(priceFeedUpdate)
err = priceFeedRepository.Create(headerID, test_data.PriceFeedModel)
Expect(err).To(HaveOccurred())
})
@ -94,12 +83,7 @@ var _ = Describe("Price feeds repository", func() {
Expect(err).NotTo(HaveOccurred())
}
priceFeedRepository := price_feeds.NewPriceFeedRepository(db)
priceFeedUpdate := price_feeds.PriceFeedModel{
BlockNumber: uint64(blockNumbers[1]),
HeaderID: headerIDs[1],
UsdValue: "123.45",
}
err := priceFeedRepository.Create(priceFeedUpdate)
err := priceFeedRepository.Create(headerIDs[1], test_data.PriceFeedModel)
Expect(err).NotTo(HaveOccurred())
headers, err := priceFeedRepository.MissingHeaders(startingBlockNumber, endingBlockNumber)
@ -129,10 +113,7 @@ var _ = Describe("Price feeds repository", func() {
}
priceFeedRepository := price_feeds.NewPriceFeedRepository(db)
priceFeedRepositoryTwo := price_feeds.NewPriceFeedRepository(dbTwo)
err := priceFeedRepository.Create(price_feeds.PriceFeedModel{
HeaderID: headerIDs[0],
UsdValue: "123.45",
})
err := priceFeedRepository.Create(headerIDs[0], test_data.PriceFeedModel)
Expect(err).NotTo(HaveOccurred())
nodeOneMissingHeaders, err := priceFeedRepository.MissingHeaders(blockNumbers[0], blockNumbers[len(blockNumbers)-1])

View File

@ -30,7 +30,7 @@ func (initializer PriceFeedTransformerInitializer) NewPriceFeedTransformer(db *p
repository := NewPriceFeedRepository(db)
return PriceFeedTransformer{
Config: initializer.Config,
converter: converter,
Converter: converter,
Fetcher: fetcher,
Repository: repository,
}
@ -38,7 +38,7 @@ func (initializer PriceFeedTransformerInitializer) NewPriceFeedTransformer(db *p
type PriceFeedTransformer struct {
Config IPriceFeedConfig
converter PriceFeedConverter
Converter Converter
Fetcher IPriceFeedFetcher
Repository IPriceFeedRepository
}
@ -54,8 +54,11 @@ func (transformer PriceFeedTransformer) Execute() error {
return err
}
for _, log := range logs {
model := transformer.converter.ToModel(log, header.Id)
err = transformer.Repository.Create(model)
model, err := transformer.Converter.ToModel(log, header.Id)
if err != nil {
return err
}
err = transformer.Repository.Create(header.Id, model)
if err != nil {
return err
}

View File

@ -15,10 +15,6 @@
package price_feeds_test
import (
"fmt"
"math/big"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
@ -26,14 +22,17 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/fakes"
"github.com/vulcanize/vulcanizedb/pkg/transformers/price_feeds"
"github.com/vulcanize/vulcanizedb/pkg/transformers/test_data"
price_feeds_mocks "github.com/vulcanize/vulcanizedb/pkg/transformers/test_data/mocks/price_feeds"
)
var _ = Describe("Price feed transformer", func() {
It("gets missing headers for price feeds", func() {
mockConverter := &price_feeds_mocks.MockPriceFeedConverter{}
mockRepository := &price_feeds_mocks.MockPriceFeedRepository{}
transformer := price_feeds.PriceFeedTransformer{
Config: price_feeds.PriceFeedConfig,
Converter: mockConverter,
Fetcher: &price_feeds_mocks.MockPriceFeedFetcher{},
Repository: mockRepository,
}
@ -45,9 +44,11 @@ var _ = Describe("Price feed transformer", func() {
})
It("returns error is missing headers call returns err", func() {
mockConverter := &price_feeds_mocks.MockPriceFeedConverter{}
mockRepository := &price_feeds_mocks.MockPriceFeedRepository{}
mockRepository.SetMissingHeadersErr(fakes.FakeError)
transformer := price_feeds.PriceFeedTransformer{
Converter: mockConverter,
Fetcher: &price_feeds_mocks.MockPriceFeedFetcher{},
Repository: mockRepository,
}
@ -59,12 +60,14 @@ var _ = Describe("Price feed transformer", func() {
})
It("fetches logs for missing headers", func() {
mockConverter := &price_feeds_mocks.MockPriceFeedConverter{}
mockRepository := &price_feeds_mocks.MockPriceFeedRepository{}
blockNumberOne := int64(1)
blockNumberTwo := int64(2)
mockRepository.SetMissingHeaders([]core.Header{{BlockNumber: blockNumberOne}, {BlockNumber: blockNumberTwo}})
mockFetcher := &price_feeds_mocks.MockPriceFeedFetcher{}
transformer := price_feeds.PriceFeedTransformer{
Converter: mockConverter,
Fetcher: mockFetcher,
Repository: mockRepository,
}
@ -76,11 +79,13 @@ var _ = Describe("Price feed transformer", func() {
})
It("returns err if fetcher returns err", func() {
mockConverter := &price_feeds_mocks.MockPriceFeedConverter{}
mockRepository := &price_feeds_mocks.MockPriceFeedRepository{}
mockRepository.SetMissingHeaders([]core.Header{{BlockNumber: 1}})
mockFetcher := &price_feeds_mocks.MockPriceFeedFetcher{}
mockFetcher.SetReturnErr(fakes.FakeError)
transformer := price_feeds.PriceFeedTransformer{
Converter: mockConverter,
Fetcher: mockFetcher,
Repository: mockRepository,
}
@ -91,31 +96,55 @@ var _ = Describe("Price feed transformer", func() {
Expect(err).To(MatchError(fakes.FakeError))
})
It("converts log to a model", func() {
mockConverter := &price_feeds_mocks.MockPriceFeedConverter{}
mockFetcher := &price_feeds_mocks.MockPriceFeedFetcher{}
mockFetcher.SetReturnLogs([]types.Log{test_data.EthPriceFeedLog})
mockRepository := &price_feeds_mocks.MockPriceFeedRepository{}
headerID := int64(11111)
mockRepository.SetMissingHeaders([]core.Header{{BlockNumber: 1, Id: headerID}})
transformer := price_feeds.PriceFeedTransformer{
Fetcher: mockFetcher,
Converter: mockConverter,
Repository: mockRepository,
}
err := transformer.Execute()
Expect(err).NotTo(HaveOccurred())
Expect(mockConverter.PassedHeaderID).To(Equal(headerID))
Expect(mockConverter.PassedLog).To(Equal(test_data.EthPriceFeedLog))
})
It("returns err if converter returns err", func() {
mockConverter := &price_feeds_mocks.MockPriceFeedConverter{}
mockConverter.SetConverterErr(fakes.FakeError)
mockFetcher := &price_feeds_mocks.MockPriceFeedFetcher{}
mockFetcher.SetReturnLogs([]types.Log{test_data.EthPriceFeedLog})
mockRepository := &price_feeds_mocks.MockPriceFeedRepository{}
headerID := int64(11111)
mockRepository.SetMissingHeaders([]core.Header{{BlockNumber: 1, Id: headerID}})
transformer := price_feeds.PriceFeedTransformer{
Fetcher: mockFetcher,
Converter: mockConverter,
Repository: mockRepository,
}
err := transformer.Execute()
Expect(err).To(HaveOccurred())
Expect(err).To(MatchError(fakes.FakeError))
})
It("persists model converted from log", func() {
mockConverter := &price_feeds_mocks.MockPriceFeedConverter{}
mockRepository := &price_feeds_mocks.MockPriceFeedRepository{}
headerID := int64(11111)
mockRepository.SetMissingHeaders([]core.Header{{BlockNumber: 1, Id: headerID}})
mockFetcher := &price_feeds_mocks.MockPriceFeedFetcher{}
blockNumber := uint64(22222)
txIndex := uint(33333)
usdValue := int64(44444)
etherMultiplier, _ := price_feeds.Ether.Int64()
rawUsdValue := big.NewInt(0)
rawUsdValue = rawUsdValue.Mul(big.NewInt(usdValue), big.NewInt(etherMultiplier))
address := common.BytesToAddress([]byte{1, 2, 3, 4, 5})
fakeLog := types.Log{
Address: address,
Topics: nil,
Data: rawUsdValue.Bytes(),
BlockNumber: blockNumber,
TxHash: common.Hash{},
TxIndex: txIndex,
BlockHash: common.Hash{},
Index: 0,
Removed: false,
}
mockFetcher.SetReturnLogs([]types.Log{fakeLog})
mockFetcher.SetReturnLogs([]types.Log{test_data.EthPriceFeedLog})
transformer := price_feeds.PriceFeedTransformer{
Converter: mockConverter,
Fetcher: mockFetcher,
Repository: mockRepository,
}
@ -123,23 +152,18 @@ var _ = Describe("Price feed transformer", func() {
err := transformer.Execute()
Expect(err).NotTo(HaveOccurred())
expectedModel := price_feeds.PriceFeedModel{
BlockNumber: blockNumber,
HeaderID: headerID,
MedianizerAddress: address.Bytes(),
UsdValue: fmt.Sprintf("%d", usdValue),
TransactionIndex: txIndex,
}
mockRepository.AssertCreateCalledWith(expectedModel)
mockRepository.AssertCreateCalledWith(headerID, test_data.PriceFeedModel)
})
It("returns error if creating price feed update returns error", func() {
mockConverter := &price_feeds_mocks.MockPriceFeedConverter{}
mockRepository := &price_feeds_mocks.MockPriceFeedRepository{}
mockRepository.SetMissingHeaders([]core.Header{{BlockNumber: 1, Id: 2}})
mockRepository.SetCreateErr(fakes.FakeError)
mockFetcher := &price_feeds_mocks.MockPriceFeedFetcher{}
mockFetcher.SetReturnLogs([]types.Log{{}})
transformer := price_feeds.PriceFeedTransformer{
Converter: mockConverter,
Fetcher: mockFetcher,
Repository: mockRepository,
}

View File

@ -26,6 +26,7 @@ type MockPriceFeedRepository struct {
missingHeaders []core.Header
missingHeadersErr error
passedEndingBlockNumber int64
passedHeaderID int64
passedModel price_feeds.PriceFeedModel
passedStartingBlockNumber int64
}
@ -42,7 +43,8 @@ func (repository *MockPriceFeedRepository) SetMissingHeaders(headers []core.Head
repository.missingHeaders = headers
}
func (repository *MockPriceFeedRepository) Create(model price_feeds.PriceFeedModel) error {
func (repository *MockPriceFeedRepository) Create(headerID int64, model price_feeds.PriceFeedModel) error {
repository.passedHeaderID = headerID
repository.passedModel = model
return repository.createErr
}
@ -53,7 +55,8 @@ func (repository *MockPriceFeedRepository) MissingHeaders(startingBlockNumber, e
return repository.missingHeaders, repository.missingHeadersErr
}
func (repository *MockPriceFeedRepository) AssertCreateCalledWith(model price_feeds.PriceFeedModel) {
func (repository *MockPriceFeedRepository) AssertCreateCalledWith(headerID int64, model price_feeds.PriceFeedModel) {
Expect(repository.passedHeaderID).To(Equal(headerID))
Expect(repository.passedModel).To(Equal(model))
}

View File

@ -47,4 +47,5 @@ var PriceFeedModel = price_feeds.PriceFeedModel{
MedianizerAddress: EthPriceFeedLog.Address[:],
UsdValue: "378.6599388897",
TransactionIndex: EthPriceFeedLog.TxIndex,
Raw: rawPriceFeedLog,
}

View File

@ -84,6 +84,9 @@ func CleanTestDB(db *postgres.DB) {
db.MustExec("DELETE FROM maker.pit_file_debt_ceiling")
db.MustExec("DELETE FROM maker.pit_file_ilk")
db.MustExec("DELETE FROM maker.pit_file_stability_fee")
db.MustExec("DELETE FROM maker.price_feeds")
db.MustExec("DELETE FROM maker.tend")
db.MustExec("DELETE FROM maker.vat_init")
db.MustExec("DELETE FROM receipts")
db.MustExec("DELETE FROM transactions")
db.MustExec("DELETE FROM watched_contracts")