From 0551147453cdbaa1436821da2e64fee01b8eab84 Mon Sep 17 00:00:00 2001 From: Rob Mulholand Date: Tue, 24 Jul 2018 11:38:49 -0500 Subject: [PATCH] Add Pep price feed --- cmd/lightSync.go | 8 +- cmd/syncPriceFeeds.go | 106 ++++++++++++++++++ .../1532533671_create_peps_table.down.sql | 2 + .../1532533671_create_peps_table.up.sql | 9 ++ db/schema.sql | 55 +++++++++ integration_test/contract_test.go | 3 +- pkg/fakes/mock_blockchain.go | 103 ++++++++++------- pkg/fakes/mock_header_repository.go | 21 ++-- pkg/fakes/mock_transformer.go | 36 ++++++ pkg/geth/blockchain.go | 1 + pkg/geth/blockchain_test.go | 9 +- pkg/history/header_validator.go | 3 +- pkg/history/populate_headers.go | 34 ++++-- pkg/history/populate_headers_test.go | 52 +++++++-- pkg/transformers/pep/constants.go | 6 + pkg/transformers/pep/fetcher.go | 54 +++++++++ pkg/transformers/pep/fetcher_test.go | 72 ++++++++++++ pkg/transformers/pep/pep.go | 7 ++ pkg/transformers/pep/pep_suite_test.go | 13 +++ pkg/transformers/pep/repository.go | 27 +++++ pkg/transformers/pep/repository_test.go | 36 ++++++ pkg/transformers/pep/transformer.go | 59 ++++++++++ pkg/transformers/pep/transformer_test.go | 44 ++++++++ pkg/transformers/shared/fetcher_test.go | 2 +- pkg/transformers/transformer.go | 9 ++ test_config/test_config.go | 1 + 26 files changed, 697 insertions(+), 75 deletions(-) create mode 100644 cmd/syncPriceFeeds.go create mode 100644 db/migrations/1532533671_create_peps_table.down.sql create mode 100644 db/migrations/1532533671_create_peps_table.up.sql create mode 100644 pkg/fakes/mock_transformer.go create mode 100644 pkg/transformers/pep/constants.go create mode 100644 pkg/transformers/pep/fetcher.go create mode 100644 pkg/transformers/pep/fetcher_test.go create mode 100644 pkg/transformers/pep/pep.go create mode 100644 pkg/transformers/pep/pep_suite_test.go create mode 100644 pkg/transformers/pep/repository.go create mode 100644 pkg/transformers/pep/repository_test.go create mode 100644 pkg/transformers/pep/transformer.go create mode 100644 pkg/transformers/pep/transformer_test.go create mode 100644 pkg/transformers/transformer.go diff --git a/cmd/lightSync.go b/cmd/lightSync.go index 242c1e53..bb191df9 100644 --- a/cmd/lightSync.go +++ b/cmd/lightSync.go @@ -31,6 +31,7 @@ import ( vRpc "github.com/vulcanize/vulcanizedb/pkg/geth/converters/rpc" "github.com/vulcanize/vulcanizedb/pkg/geth/node" "github.com/vulcanize/vulcanizedb/pkg/history" + "github.com/vulcanize/vulcanizedb/pkg/transformers" "github.com/vulcanize/vulcanizedb/utils" ) @@ -64,7 +65,12 @@ func init() { } func backFillAllHeaders(blockchain core.BlockChain, headerRepository datastore.HeaderRepository, missingBlocksPopulated chan int, startingBlockNumber int64) { - missingBlocksPopulated <- history.PopulateMissingHeaders(blockchain, headerRepository, startingBlockNumber) + emptyTransformers := []transformers.Transformer{} + populated, err := history.PopulateMissingHeaders(blockchain, headerRepository, startingBlockNumber, emptyTransformers) + if err != nil { + log.Fatal("Error populating headers: ", err) + } + missingBlocksPopulated <- populated } func lightSync() { diff --git a/cmd/syncPriceFeeds.go b/cmd/syncPriceFeeds.go new file mode 100644 index 00000000..f70482c0 --- /dev/null +++ b/cmd/syncPriceFeeds.go @@ -0,0 +1,106 @@ +// Copyright © 2018 NAME HERE +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cmd + +import ( + "log" + "os" + "time" + + "github.com/ethereum/go-ethereum/ethclient" + "github.com/ethereum/go-ethereum/rpc" + "github.com/spf13/cobra" + + "github.com/vulcanize/vulcanizedb/pkg/core" + "github.com/vulcanize/vulcanizedb/pkg/datastore" + "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories" + "github.com/vulcanize/vulcanizedb/pkg/geth" + "github.com/vulcanize/vulcanizedb/pkg/geth/client" + vRpc "github.com/vulcanize/vulcanizedb/pkg/geth/converters/rpc" + "github.com/vulcanize/vulcanizedb/pkg/geth/node" + "github.com/vulcanize/vulcanizedb/pkg/history" + "github.com/vulcanize/vulcanizedb/pkg/transformers" + "github.com/vulcanize/vulcanizedb/pkg/transformers/pep" + "github.com/vulcanize/vulcanizedb/utils" +) + +// syncPriceFeedsCmd represents the syncPriceFeeds command +var syncPriceFeedsCmd = &cobra.Command{ + Use: "syncPriceFeeds", + Short: "Sync block headers with price feed data", + Long: `Sync Ethereum block headers and price feed data. For example: + +./vulcanizedb syncPriceFeeds --config --starting-block-number + +Price feed data will be updated when price feed contracts log value events.`, + Run: func(cmd *cobra.Command, args []string) { + syncPriceFeeds() + }, +} + +func init() { + rootCmd.AddCommand(syncPriceFeedsCmd) + syncPriceFeedsCmd.Flags().Int64VarP(&startingBlockNumber, "starting-block-number", "s", 0, "block number at which to start tracking price feeds") +} + +func backFillPriceFeeds(blockchain core.BlockChain, headerRepository datastore.HeaderRepository, missingBlocksPopulated chan int, startingBlockNumber int64, transformers []transformers.Transformer) { + populated, err := history.PopulateMissingHeaders(blockchain, headerRepository, startingBlockNumber, transformers) + if err != nil { + log.Fatal("Error populating headers: ", err) + } + missingBlocksPopulated <- populated +} + +func syncPriceFeeds() { + ticker := time.NewTicker(pollingInterval) + defer ticker.Stop() + rawRpcClient, err := rpc.Dial(ipc) + if err != nil { + log.Fatal(err) + } + rpcClient := client.NewRpcClient(rawRpcClient, ipc) + ethClient := ethclient.NewClient(rawRpcClient) + client := client.NewEthClient(ethClient) + node := node.MakeNode(rpcClient) + transactionConverter := vRpc.NewRpcTransactionConverter(client) + blockChain := geth.NewBlockChain(client, node, transactionConverter) + + lastBlock := blockChain.LastBlock().Int64() + if lastBlock == 0 { + log.Fatal("geth initial: state sync not finished") + } + if startingBlockNumber > lastBlock { + log.Fatal("starting block number > current block number") + } + + db := utils.LoadPostgres(databaseConfig, blockChain.Node()) + headerRepository := repositories.NewHeaderRepository(&db) + validator := history.NewHeaderValidator(blockChain, headerRepository, validationWindow) + missingBlocksPopulated := make(chan int) + transformers := []transformers.Transformer{ + pep.NewPepTransformer(blockChain, &db), + } + go backFillPriceFeeds(blockChain, headerRepository, missingBlocksPopulated, startingBlockNumber, transformers) + + for { + select { + case <-ticker.C: + window := validator.ValidateHeaders() + window.Log(os.Stdout) + case <-missingBlocksPopulated: + go backFillPriceFeeds(blockChain, headerRepository, missingBlocksPopulated, startingBlockNumber, transformers) + } + } +} diff --git a/db/migrations/1532533671_create_peps_table.down.sql b/db/migrations/1532533671_create_peps_table.down.sql new file mode 100644 index 00000000..2e0b3fd4 --- /dev/null +++ b/db/migrations/1532533671_create_peps_table.down.sql @@ -0,0 +1,2 @@ +DROP TABLE maker.peps; +DROP SCHEMA maker; diff --git a/db/migrations/1532533671_create_peps_table.up.sql b/db/migrations/1532533671_create_peps_table.up.sql new file mode 100644 index 00000000..7dbf11c7 --- /dev/null +++ b/db/migrations/1532533671_create_peps_table.up.sql @@ -0,0 +1,9 @@ +CREATE TABLE maker.peps ( + id SERIAL PRIMARY KEY, + block_number BIGINT NOT NULL, + header_id INTEGER NOT NULL, + usd_value NUMERIC, + CONSTRAINT headers_fk FOREIGN KEY (header_id) + REFERENCES headers (id) + ON DELETE CASCADE +); diff --git a/db/schema.sql b/db/schema.sql index 2f77595c..9b89e19b 100644 --- a/db/schema.sql +++ b/db/schema.sql @@ -119,6 +119,38 @@ CREATE SEQUENCE maker.frob_id_seq ALTER SEQUENCE maker.frob_id_seq OWNED BY maker.frob.id; +-- +-- Name: peps; Type: TABLE; Schema: maker; Owner: - +-- + +CREATE TABLE maker.peps ( + id integer NOT NULL, + block_number bigint NOT NULL, + header_id integer NOT NULL, + usd_value numeric +); + + +-- +-- Name: peps_id_seq; Type: SEQUENCE; Schema: maker; Owner: - +-- + +CREATE SEQUENCE maker.peps_id_seq + AS integer + START WITH 1 + INCREMENT BY 1 + NO MINVALUE + NO MAXVALUE + CACHE 1; + + +-- +-- Name: peps_id_seq; Type: SEQUENCE OWNED BY; Schema: maker; Owner: - +-- + +ALTER SEQUENCE maker.peps_id_seq OWNED BY maker.peps.id; + + -- -- Name: logs; Type: TABLE; Schema: public; Owner: - -- @@ -505,6 +537,13 @@ ALTER TABLE ONLY maker.flip_kick ALTER COLUMN db_id SET DEFAULT nextval('maker.f ALTER TABLE ONLY maker.frob ALTER COLUMN id SET DEFAULT nextval('maker.frob_id_seq'::regclass); +-- +-- Name: peps id; Type: DEFAULT; Schema: maker; Owner: - +-- + +ALTER TABLE ONLY maker.peps ALTER COLUMN id SET DEFAULT nextval('maker.peps_id_seq'::regclass); + + -- -- Name: blocks id; Type: DEFAULT; Schema: public; Owner: - -- @@ -600,6 +639,14 @@ ALTER TABLE ONLY maker.frob ADD CONSTRAINT frob_pkey PRIMARY KEY (id); +-- +-- Name: peps peps_pkey; Type: CONSTRAINT; Schema: maker; Owner: - +-- + +ALTER TABLE ONLY maker.peps + ADD CONSTRAINT peps_pkey PRIMARY KEY (id); + + -- -- Name: blocks blocks_pkey; Type: CONSTRAINT; Schema: public; Owner: - -- @@ -747,6 +794,14 @@ ALTER TABLE ONLY maker.frob ADD CONSTRAINT frob_header_id_fkey FOREIGN KEY (header_id) REFERENCES public.headers(id) ON DELETE CASCADE; +-- +-- Name: peps headers_fk; Type: FK CONSTRAINT; Schema: maker; Owner: - +-- + +ALTER TABLE ONLY maker.peps + ADD CONSTRAINT headers_fk FOREIGN KEY (header_id) REFERENCES public.headers(id) ON DELETE CASCADE; + + -- -- Name: transactions blocks_fk; Type: FK CONSTRAINT; Schema: public; Owner: - -- diff --git a/integration_test/contract_test.go b/integration_test/contract_test.go index ad204edb..010c3448 100644 --- a/integration_test/contract_test.go +++ b/integration_test/contract_test.go @@ -59,11 +59,10 @@ var _ = Describe("Reading contracts", func() { transactionConverter := rpc2.NewRpcTransactionConverter(ethClient) blockChain := geth.NewBlockChain(blockChainClient, node, transactionConverter) - logs, err := blockChain.GetLogs(core.Contract{Hash: "x123"}, big.NewInt(4703824), nil) + logs, err := blockChain.GetLogs(core.Contract{Hash: "0x123"}, big.NewInt(4703824), nil) Expect(err).To(BeNil()) Expect(len(logs)).To(Equal(0)) - }) }) diff --git a/pkg/fakes/mock_blockchain.go b/pkg/fakes/mock_blockchain.go index 1c12bdf9..1523a39f 100644 --- a/pkg/fakes/mock_blockchain.go +++ b/pkg/fakes/mock_blockchain.go @@ -3,10 +3,10 @@ package fakes import ( "math/big" - . "github.com/onsi/gomega" - "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/core/types" + . "github.com/onsi/gomega" + "github.com/vulcanize/vulcanizedb/pkg/core" ) @@ -18,9 +18,15 @@ type MockBlockChain struct { fetchContractDataPassedMethodArg interface{} fetchContractDataPassedResult interface{} fetchContractDataPassedBlockNumber int64 + getLogsPassedContract core.Contract + getLogsPassedStartingBlockNumber *big.Int + getLogsPassedEndingBlockNumber *big.Int + getLogsReturnLogs []core.Log + getLogsReturnErr error getBlockByNumberErr error logQuery ethereum.FilterQuery logQueryErr error + logQueryReturnLogs []types.Log lastBlock *big.Int node core.Node } @@ -31,68 +37,85 @@ func NewMockBlockChain() *MockBlockChain { } } -func (blockChain *MockBlockChain) SetFetchContractDataErr(err error) { - blockChain.fetchContractDataErr = err +func (chain *MockBlockChain) SetFetchContractDataErr(err error) { + chain.fetchContractDataErr = err } -func (blockChain *MockBlockChain) SetLastBlock(blockNumber *big.Int) { - blockChain.lastBlock = blockNumber +func (chain *MockBlockChain) SetLastBlock(blockNumber *big.Int) { + chain.lastBlock = blockNumber } -func (blockChain *MockBlockChain) SetGetBlockByNumberErr(err error) { - blockChain.getBlockByNumberErr = err +func (chain *MockBlockChain) SetGetBlockByNumberErr(err error) { + chain.getBlockByNumberErr = err } -func (blockChain *MockBlockChain) SetGetLogsErr(err error) { - blockChain.logQueryErr = err +func (chain *MockBlockChain) SetGetEthLogsWithCustomQueryErr(err error) { + chain.logQueryErr = err +} + +func (chain *MockBlockChain) SetGetEthLogsWithCustomQueryReturnLogs(logs []types.Log) { + chain.logQueryReturnLogs = logs +} + +func (chain *MockBlockChain) SetGetLogsReturnErr(err error) { + chain.getLogsReturnErr = err +} + +func (chain *MockBlockChain) SetGetLogsReturnLogs(logs []core.Log) { + chain.getLogsReturnLogs = logs +} + +func (chain *MockBlockChain) FetchContractData(abiJSON string, address string, method string, methodArg interface{}, result interface{}, blockNumber int64) error { + chain.fetchContractDataPassedAbi = abiJSON + chain.fetchContractDataPassedAddress = address + chain.fetchContractDataPassedMethod = method + chain.fetchContractDataPassedMethodArg = methodArg + chain.fetchContractDataPassedResult = result + chain.fetchContractDataPassedBlockNumber = blockNumber + return chain.fetchContractDataErr +} + +func (chain *MockBlockChain) GetBlockByNumber(blockNumber int64) (core.Block, error) { + return core.Block{Number: blockNumber}, chain.getBlockByNumberErr } func (blockChain *MockBlockChain) GetEthLogsWithCustomQuery(query ethereum.FilterQuery) ([]types.Log, error) { blockChain.logQuery = query - return []types.Log{}, blockChain.logQueryErr + return blockChain.logQueryReturnLogs, blockChain.logQueryErr } -func (blockChain *MockBlockChain) GetHeaderByNumber(blockNumber int64) (core.Header, error) { +func (chain *MockBlockChain) GetHeaderByNumber(blockNumber int64) (core.Header, error) { return core.Header{BlockNumber: blockNumber}, nil } -func (blockChain *MockBlockChain) FetchContractData(abiJSON string, address string, method string, methodArg interface{}, result interface{}, blockNumber int64) error { - blockChain.fetchContractDataPassedAbi = abiJSON - blockChain.fetchContractDataPassedAddress = address - blockChain.fetchContractDataPassedMethod = method - blockChain.fetchContractDataPassedMethodArg = methodArg - blockChain.fetchContractDataPassedResult = result - blockChain.fetchContractDataPassedBlockNumber = blockNumber - return blockChain.fetchContractDataErr +func (chain *MockBlockChain) GetLogs(contract core.Contract, startingBlockNumber, endingBlockNumber *big.Int) ([]core.Log, error) { + chain.getLogsPassedContract = contract + chain.getLogsPassedStartingBlockNumber = startingBlockNumber + chain.getLogsPassedEndingBlockNumber = endingBlockNumber + return chain.getLogsReturnLogs, chain.getLogsReturnErr } -func (blockChain *MockBlockChain) CallContract(contractHash string, input []byte, blockNumber *big.Int) ([]byte, error) { +func (chain *MockBlockChain) CallContract(contractHash string, input []byte, blockNumber *big.Int) ([]byte, error) { return []byte{}, nil } -func (blockChain *MockBlockChain) LastBlock() *big.Int { - return blockChain.lastBlock +func (chain *MockBlockChain) LastBlock() *big.Int { + return chain.lastBlock } -func (blockChain *MockBlockChain) GetLogs(contract core.Contract, startingBlock *big.Int, endingBlock *big.Int) ([]core.Log, error) { - return []core.Log{}, nil +func (chain *MockBlockChain) Node() core.Node { + return chain.node } -func (blockChain *MockBlockChain) Node() core.Node { - return blockChain.node -} - -func (blockChain *MockBlockChain) GetBlockByNumber(blockNumber int64) (core.Block, error) { - return core.Block{Number: blockNumber}, blockChain.getBlockByNumberErr -} - -// TODO: handle methodArg being nil (can't match nil to nil in Gomega) -func (blockChain *MockBlockChain) AssertFetchContractDataCalledWith(abiJSON string, address string, method string, methodArg interface{}, result interface{}, blockNumber int64) { - Expect(blockChain.fetchContractDataPassedAbi).To(Equal(abiJSON)) - Expect(blockChain.fetchContractDataPassedAddress).To(Equal(address)) - Expect(blockChain.fetchContractDataPassedMethod).To(Equal(method)) - Expect(blockChain.fetchContractDataPassedResult).To(Equal(result)) - Expect(blockChain.fetchContractDataPassedBlockNumber).To(Equal(blockNumber)) +func (chain *MockBlockChain) AssertFetchContractDataCalledWith(abiJSON string, address string, method string, methodArg interface{}, result interface{}, blockNumber int64) { + Expect(chain.fetchContractDataPassedAbi).To(Equal(abiJSON)) + Expect(chain.fetchContractDataPassedAddress).To(Equal(address)) + Expect(chain.fetchContractDataPassedMethod).To(Equal(method)) + if methodArg != nil { + Expect(chain.fetchContractDataPassedMethodArg).To(Equal(methodArg)) + } + Expect(chain.fetchContractDataPassedResult).To(Equal(result)) + Expect(chain.fetchContractDataPassedBlockNumber).To(Equal(blockNumber)) } func (blockChain *MockBlockChain) AssertGetEthLogsWithCustomQueryCalledWith(query ethereum.FilterQuery) { diff --git a/pkg/fakes/mock_header_repository.go b/pkg/fakes/mock_header_repository.go index e5254e9c..9f188937 100644 --- a/pkg/fakes/mock_header_repository.go +++ b/pkg/fakes/mock_header_repository.go @@ -7,23 +7,28 @@ import ( ) type MockHeaderRepository struct { - createOrUpdateBlockNumbersCallCount int - createOrUpdateBlockNumbersPassedBlockNumbers []int64 - missingBlockNumbers []int64 + createOrUpdateHeaderCallCount int + createOrUpdateHeaderPassedBlockNumbers []int64 + createOrUpdateHeaderReturnID int64 + missingBlockNumbers []int64 } func NewMockHeaderRepository() *MockHeaderRepository { return &MockHeaderRepository{} } +func (repository *MockHeaderRepository) SetCreateOrUpdateHeaderReturnID(id int64) { + repository.createOrUpdateHeaderReturnID = id +} + func (repository *MockHeaderRepository) SetMissingBlockNumbers(blockNumbers []int64) { repository.missingBlockNumbers = blockNumbers } func (repository *MockHeaderRepository) CreateOrUpdateHeader(header core.Header) (int64, error) { - repository.createOrUpdateBlockNumbersCallCount++ - repository.createOrUpdateBlockNumbersPassedBlockNumbers = append(repository.createOrUpdateBlockNumbersPassedBlockNumbers, header.BlockNumber) - return 0, nil + repository.createOrUpdateHeaderCallCount++ + repository.createOrUpdateHeaderPassedBlockNumbers = append(repository.createOrUpdateHeaderPassedBlockNumbers, header.BlockNumber) + return repository.createOrUpdateHeaderReturnID, nil } func (*MockHeaderRepository) GetHeader(blockNumber int64) (core.Header, error) { @@ -35,6 +40,6 @@ func (repository *MockHeaderRepository) MissingBlockNumbers(startingBlockNumber, } func (repository *MockHeaderRepository) AssertCreateOrUpdateHeaderCallCountAndPassedBlockNumbers(times int, blockNumbers []int64) { - Expect(repository.createOrUpdateBlockNumbersCallCount).To(Equal(times)) - Expect(repository.createOrUpdateBlockNumbersPassedBlockNumbers).To(Equal(blockNumbers)) + Expect(repository.createOrUpdateHeaderCallCount).To(Equal(times)) + Expect(repository.createOrUpdateHeaderPassedBlockNumbers).To(Equal(blockNumbers)) } diff --git a/pkg/fakes/mock_transformer.go b/pkg/fakes/mock_transformer.go new file mode 100644 index 00000000..9c47df76 --- /dev/null +++ b/pkg/fakes/mock_transformer.go @@ -0,0 +1,36 @@ +package fakes + +import ( + . "github.com/onsi/gomega" + + "github.com/vulcanize/vulcanizedb/pkg/core" +) + +type MockTransformer struct { + passedHeader core.Header + passedHeaderID int64 + executeErr error +} + +func NewMockTransformer() *MockTransformer { + return &MockTransformer{ + passedHeader: core.Header{}, + passedHeaderID: 0, + executeErr: nil, + } +} + +func (transformer *MockTransformer) SetExecuteErr(err error) { + transformer.executeErr = err +} + +func (transformer *MockTransformer) Execute(header core.Header, headerID int64) error { + transformer.passedHeader = header + transformer.passedHeaderID = headerID + return transformer.executeErr +} + +func (transformer *MockTransformer) AssertExecuteCalledWith(header core.Header, headerID int64) { + Expect(header).To(Equal(transformer.passedHeader)) + Expect(headerID).To(Equal(transformer.passedHeaderID)) +} diff --git a/pkg/geth/blockchain.go b/pkg/geth/blockchain.go index a772cc32..fac84971 100644 --- a/pkg/geth/blockchain.go +++ b/pkg/geth/blockchain.go @@ -53,6 +53,7 @@ func (blockChain *BlockChain) GetLogs(contract core.Contract, startingBlockNumbe FromBlock: startingBlockNumber, ToBlock: endingBlockNumber, Addresses: []common.Address{contractAddress}, + Topics: nil, } gethLogs, err := blockChain.GetEthLogsWithCustomQuery(fc) if err != nil { diff --git a/pkg/geth/blockchain_test.go b/pkg/geth/blockchain_test.go index c48e8c9a..f300d8a6 100644 --- a/pkg/geth/blockchain_test.go +++ b/pkg/geth/blockchain_test.go @@ -112,6 +112,7 @@ var _ = Describe("Geth blockchain", func() { Addresses: []common.Address{address}, Topics: [][]common.Hash{{topic}}, } + _, err := blockChain.GetEthLogsWithCustomQuery(query) Expect(err).NotTo(HaveOccurred()) @@ -123,8 +124,14 @@ var _ = Describe("Geth blockchain", func() { contract := vulcCore.Contract{Hash: common.BytesToHash([]byte{1, 2, 3, 4, 5}).Hex()} startingBlockNumber := big.NewInt(1) endingBlockNumber := big.NewInt(2) + query := ethereum.FilterQuery{ + FromBlock: startingBlockNumber, + ToBlock: endingBlockNumber, + Addresses: []common.Address{common.HexToAddress(contract.Hash)}, + Topics: nil, + } - _, err := blockChain.GetLogs(contract, startingBlockNumber, endingBlockNumber) + _, err := blockChain.GetEthLogsWithCustomQuery(query) Expect(err).To(HaveOccurred()) Expect(err).To(MatchError(fakes.FakeError)) diff --git a/pkg/history/header_validator.go b/pkg/history/header_validator.go index b733aaee..2db28603 100644 --- a/pkg/history/header_validator.go +++ b/pkg/history/header_validator.go @@ -3,6 +3,7 @@ package history import ( "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/datastore" + "github.com/vulcanize/vulcanizedb/pkg/transformers" ) type HeaderValidator struct { @@ -22,6 +23,6 @@ func NewHeaderValidator(blockChain core.BlockChain, repository datastore.HeaderR func (validator HeaderValidator) ValidateHeaders() ValidationWindow { window := MakeValidationWindow(validator.blockChain, validator.windowSize) blockNumbers := MakeRange(window.LowerBound, window.UpperBound) - RetrieveAndUpdateHeaders(validator.blockChain, validator.headerRepository, blockNumbers) + RetrieveAndUpdateHeaders(validator.blockChain, validator.headerRepository, blockNumbers, []transformers.Transformer{}) return window } diff --git a/pkg/history/populate_headers.go b/pkg/history/populate_headers.go index 1fc1a80d..2e2fa9c8 100644 --- a/pkg/history/populate_headers.go +++ b/pkg/history/populate_headers.go @@ -1,29 +1,41 @@ package history import ( + "log" + "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/datastore" - "log" + "github.com/vulcanize/vulcanizedb/pkg/transformers" ) -func PopulateMissingHeaders(blockchain core.BlockChain, headerRepository datastore.HeaderRepository, startingBlockNumber int64) int { +func PopulateMissingHeaders(blockchain core.BlockChain, headerRepository datastore.HeaderRepository, startingBlockNumber int64, transformers []transformers.Transformer) (int, error) { lastBlock := blockchain.LastBlock().Int64() blockRange := headerRepository.MissingBlockNumbers(startingBlockNumber, lastBlock, blockchain.Node().ID) log.SetPrefix("") log.Printf("Backfilling %d blocks\n\n", len(blockRange)) - RetrieveAndUpdateHeaders(blockchain, headerRepository, blockRange) - return len(blockRange) + _, err := RetrieveAndUpdateHeaders(blockchain, headerRepository, blockRange, transformers) + if err != nil { + return 0, err + } + return len(blockRange), nil } -func RetrieveAndUpdateHeaders(blockchain core.BlockChain, headerRepository datastore.HeaderRepository, blockNumbers []int64) int { +func RetrieveAndUpdateHeaders(chain core.BlockChain, headerRepository datastore.HeaderRepository, blockNumbers []int64, transformers []transformers.Transformer) (int, error) { for _, blockNumber := range blockNumbers { - header, err := blockchain.GetHeaderByNumber(blockNumber) + header, err := chain.GetHeaderByNumber(blockNumber) if err != nil { - log.Printf("failed to retrieve block number: %d\n", blockNumber) - return 0 + return 0, err + } + id, err := headerRepository.CreateOrUpdateHeader(header) + if err != nil { + return 0, err + } + for _, transformer := range transformers { + err := transformer.Execute(header, id) + if err != nil { + return 0, err + } } - // TODO: handle possible error here - headerRepository.CreateOrUpdateHeader(header) } - return len(blockNumbers) + return len(blockNumbers), nil } diff --git a/pkg/history/populate_headers_test.go b/pkg/history/populate_headers_test.go index 5d3bf8b5..3a5466e7 100644 --- a/pkg/history/populate_headers_test.go +++ b/pkg/history/populate_headers_test.go @@ -6,8 +6,10 @@ import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/fakes" "github.com/vulcanize/vulcanizedb/pkg/history" + "github.com/vulcanize/vulcanizedb/pkg/transformers" ) var _ = Describe("Populating headers", func() { @@ -18,17 +20,15 @@ var _ = Describe("Populating headers", func() { headerRepository = fakes.NewMockHeaderRepository() }) - Describe("When 1 missing header", func() { + It("returns number of headers added", func() { + blockChain := fakes.NewMockBlockChain() + blockChain.SetLastBlock(big.NewInt(2)) + headerRepository.SetMissingBlockNumbers([]int64{2}) - It("returns number of headers added", func() { - blockChain := fakes.NewMockBlockChain() - blockChain.SetLastBlock(big.NewInt(2)) - headerRepository.SetMissingBlockNumbers([]int64{2}) + headersAdded, err := history.PopulateMissingHeaders(blockChain, headerRepository, 1, []transformers.Transformer{}) - headersAdded := history.PopulateMissingHeaders(blockChain, headerRepository, 1) - - Expect(headersAdded).To(Equal(1)) - }) + Expect(err).NotTo(HaveOccurred()) + Expect(headersAdded).To(Equal(1)) }) It("adds missing headers to the db", func() { @@ -36,8 +36,40 @@ var _ = Describe("Populating headers", func() { blockChain.SetLastBlock(big.NewInt(2)) headerRepository.SetMissingBlockNumbers([]int64{2}) - history.PopulateMissingHeaders(blockChain, headerRepository, 1) + _, err := history.PopulateMissingHeaders(blockChain, headerRepository, 1, []transformers.Transformer{}) + Expect(err).NotTo(HaveOccurred()) headerRepository.AssertCreateOrUpdateHeaderCallCountAndPassedBlockNumbers(1, []int64{2}) }) + + It("executes passed transformers with created headers", func() { + blockNumber := int64(54321) + blockChain := fakes.NewMockBlockChain() + blockChain.SetLastBlock(big.NewInt(blockNumber)) + headerRepository.SetMissingBlockNumbers([]int64{blockNumber}) + headerID := int64(12345) + headerRepository.SetCreateOrUpdateHeaderReturnID(headerID) + transformer := fakes.NewMockTransformer() + + _, err := history.PopulateMissingHeaders(blockChain, headerRepository, 1, []transformers.Transformer{transformer}) + + Expect(err).NotTo(HaveOccurred()) + transformer.AssertExecuteCalledWith(core.Header{BlockNumber: blockNumber}, headerID) + }) + + It("returns error if executing transformer fails", func() { + blockNumber := int64(54321) + blockChain := fakes.NewMockBlockChain() + blockChain.SetLastBlock(big.NewInt(blockNumber)) + headerRepository.SetMissingBlockNumbers([]int64{blockNumber}) + headerID := int64(12345) + headerRepository.SetCreateOrUpdateHeaderReturnID(headerID) + transformer := fakes.NewMockTransformer() + transformer.SetExecuteErr(fakes.FakeError) + + _, err := history.PopulateMissingHeaders(blockChain, headerRepository, 1, []transformers.Transformer{transformer}) + + Expect(err).To(HaveOccurred()) + Expect(err).To(MatchError(fakes.FakeError)) + }) }) diff --git a/pkg/transformers/pep/constants.go b/pkg/transformers/pep/constants.go new file mode 100644 index 00000000..ecd0160c --- /dev/null +++ b/pkg/transformers/pep/constants.go @@ -0,0 +1,6 @@ +package pep + +var ( + LogValueTopic0 = "0x296ba4ca62c6c21c95e828080cb8aec7481b71390585605300a8a76f9e95b527" + PepAddress = "0x99041f808d598b782d5a3e498681c2452a31da08" +) diff --git a/pkg/transformers/pep/fetcher.go b/pkg/transformers/pep/fetcher.go new file mode 100644 index 00000000..0a014744 --- /dev/null +++ b/pkg/transformers/pep/fetcher.go @@ -0,0 +1,54 @@ +package pep + +import ( + "errors" + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/vulcanize/vulcanizedb/pkg/core" + "math/big" +) + +var ( + ErrMultipleLogs = errors.New("multiple matching logs found in block") + ErrNoMatchingLog = errors.New("no matching log") +) + +type IPepFetcher interface { + FetchPepValue(header core.Header) (string, error) +} + +type PepFetcher struct { + blockChain core.BlockChain +} + +func NewPepFetcher(chain core.BlockChain) PepFetcher { + return PepFetcher{ + blockChain: chain, + } +} + +func (fetcher PepFetcher) FetchPepValue(header core.Header) (string, error) { + blockNumber := big.NewInt(header.BlockNumber) + query := ethereum.FilterQuery{ + FromBlock: blockNumber, + ToBlock: blockNumber, + Addresses: []common.Address{common.HexToAddress(PepAddress)}, + Topics: [][]common.Hash{{common.HexToHash(LogValueTopic0)}}, + } + logs, err := fetcher.blockChain.GetEthLogsWithCustomQuery(query) + return fetcher.getLogValue(logs, err) +} + +func (fetcher PepFetcher) getLogValue(logs []types.Log, err error) (string, error) { + if err != nil { + return "", err + } + if len(logs) < 1 { + return "", ErrNoMatchingLog + } + if len(logs) > 1 { + return "", ErrMultipleLogs + } + return string(logs[0].Data), nil +} diff --git a/pkg/transformers/pep/fetcher_test.go b/pkg/transformers/pep/fetcher_test.go new file mode 100644 index 00000000..b1dd498f --- /dev/null +++ b/pkg/transformers/pep/fetcher_test.go @@ -0,0 +1,72 @@ +package pep_test + +import ( + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/vulcanize/vulcanizedb/pkg/core" + "github.com/vulcanize/vulcanizedb/pkg/fakes" + "github.com/vulcanize/vulcanizedb/pkg/transformers/pep" + "math/big" +) + +var _ = Describe("Pep fetcher", func() { + It("calls contract to peek mkr/usd value", func() { + mockBlockChain := fakes.NewMockBlockChain() + mockBlockChain.SetGetEthLogsWithCustomQueryReturnLogs([]types.Log{{}}) + fetcher := pep.NewPepFetcher(mockBlockChain) + blockNumber := int64(100) + header := core.Header{ + BlockNumber: blockNumber, + Hash: "", + Raw: nil, + } + + _, err := fetcher.FetchPepValue(header) + + Expect(err).NotTo(HaveOccurred()) + expectedQuery := ethereum.FilterQuery{ + FromBlock: big.NewInt(blockNumber), + ToBlock: big.NewInt(blockNumber), + Addresses: []common.Address{common.HexToAddress(pep.PepAddress)}, + Topics: [][]common.Hash{{common.HexToHash(pep.LogValueTopic0)}}, + } + mockBlockChain.AssertGetEthLogsWithCustomQueryCalledWith(expectedQuery) + }) + + It("returns error if contract call fails", func() { + mockBlockChain := fakes.NewMockBlockChain() + mockBlockChain.SetGetEthLogsWithCustomQueryReturnLogs([]types.Log{{}}) + mockBlockChain.SetGetEthLogsWithCustomQueryErr(fakes.FakeError) + fetcher := pep.NewPepFetcher(mockBlockChain) + + _, err := fetcher.FetchPepValue(core.Header{}) + + Expect(err).To(HaveOccurred()) + Expect(err).To(MatchError(fakes.FakeError)) + }) + + It("returns no matching logs error if no logs returned", func() { + mockBlockChain := fakes.NewMockBlockChain() + fetcher := pep.NewPepFetcher(mockBlockChain) + + _, err := fetcher.FetchPepValue(core.Header{}) + + Expect(err).To(HaveOccurred()) + Expect(err).To(MatchError(pep.ErrNoMatchingLog)) + }) + + It("returns error if more than one matching logs returned", func() { + mockBlockChain := fakes.NewMockBlockChain() + mockBlockChain.SetGetEthLogsWithCustomQueryReturnLogs([]types.Log{{}, {}}) + fetcher := pep.NewPepFetcher(mockBlockChain) + + _, err := fetcher.FetchPepValue(core.Header{}) + + Expect(err).To(HaveOccurred()) + Expect(err).To(MatchError(pep.ErrMultipleLogs)) + }) +}) diff --git a/pkg/transformers/pep/pep.go b/pkg/transformers/pep/pep.go new file mode 100644 index 00000000..f652b420 --- /dev/null +++ b/pkg/transformers/pep/pep.go @@ -0,0 +1,7 @@ +package pep + +type Pep struct { + BlockNumber int64 `db:"block_number"` + HeaderID int64 `db:"header_id"` + UsdValue string `db:"usd_value"` +} diff --git a/pkg/transformers/pep/pep_suite_test.go b/pkg/transformers/pep/pep_suite_test.go new file mode 100644 index 00000000..1a885590 --- /dev/null +++ b/pkg/transformers/pep/pep_suite_test.go @@ -0,0 +1,13 @@ +package pep_test + +import ( + "testing" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +func TestPep(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Pep Suite") +} diff --git a/pkg/transformers/pep/repository.go b/pkg/transformers/pep/repository.go new file mode 100644 index 00000000..65afba49 --- /dev/null +++ b/pkg/transformers/pep/repository.go @@ -0,0 +1,27 @@ +package pep + +import ( + "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" +) + +type IPepRepository interface { + CreatePep(pep Pep) error +} + +type PepRepository struct { + db *postgres.DB +} + +func NewPepRepository(db *postgres.DB) PepRepository { + return PepRepository{ + db: db, + } +} + +func (repository PepRepository) CreatePep(pep Pep) error { + _, err := repository.db.Exec(`INSERT INTO maker.peps (block_number, header_id, usd_value) VALUES ($1, $2, $3::NUMERIC)`, pep.BlockNumber, pep.HeaderID, pep.UsdValue) + if err != nil { + return err + } + return nil +} diff --git a/pkg/transformers/pep/repository_test.go b/pkg/transformers/pep/repository_test.go new file mode 100644 index 00000000..85393b3d --- /dev/null +++ b/pkg/transformers/pep/repository_test.go @@ -0,0 +1,36 @@ +package pep_test + +import ( + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + "github.com/vulcanize/vulcanizedb/pkg/core" + "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories" + "github.com/vulcanize/vulcanizedb/pkg/transformers/pep" + "github.com/vulcanize/vulcanizedb/test_config" +) + +var _ = Describe("Pep repository", func() { + It("creates a pep", func() { + db := test_config.NewTestDB(core.Node{}) + test_config.CleanTestDB(db) + repository := pep.NewPepRepository(db) + header := core.Header{BlockNumber: 12345} + headerRepository := repositories.NewHeaderRepository(db) + headerID, err := headerRepository.CreateOrUpdateHeader(header) + Expect(err).NotTo(HaveOccurred()) + pepToAdd := pep.Pep{ + BlockNumber: header.BlockNumber, + HeaderID: headerID, + UsdValue: "123.456", + } + + err = repository.CreatePep(pepToAdd) + + Expect(err).NotTo(HaveOccurred()) + var dbPep pep.Pep + err = db.Get(&dbPep, `SELECT block_number, header_id, usd_value FROM maker.peps WHERE header_id = $1`, pepToAdd.HeaderID) + Expect(err).NotTo(HaveOccurred()) + Expect(dbPep).To(Equal(pepToAdd)) + }) +}) diff --git a/pkg/transformers/pep/transformer.go b/pkg/transformers/pep/transformer.go new file mode 100644 index 00000000..bffd8814 --- /dev/null +++ b/pkg/transformers/pep/transformer.go @@ -0,0 +1,59 @@ +package pep + +import ( + "math/big" + + "github.com/vulcanize/vulcanizedb/pkg/core" + "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" +) + +var Ether = big.NewFloat(1e18) +var Ray = big.NewFloat(1e27) + +type PepTransformer struct { + fetcher IPepFetcher + repository IPepRepository +} + +func NewPepTransformer(chain core.BlockChain, db *postgres.DB) PepTransformer { + fetcher := NewPepFetcher(chain) + repository := NewPepRepository(db) + return PepTransformer{ + fetcher: fetcher, + repository: repository, + } +} + +func (transformer PepTransformer) Execute(header core.Header, headerID int64) error { + logValue, err := transformer.fetcher.FetchPepValue(header) + if err != nil { + if err == ErrNoMatchingLog { + return nil + } + return err + } + pep := getPep(logValue, header, headerID) + return transformer.repository.CreatePep(pep) +} + +func getPep(logValue string, header core.Header, headerID int64) Pep { + valueInUSD := convert("wad", logValue, 15) + pep := Pep{ + BlockNumber: header.BlockNumber, + HeaderID: headerID, + UsdValue: valueInUSD, + } + return pep +} + +func convert(conversion string, value string, prec int) string { + var bgflt = big.NewFloat(0.0) + bgflt.SetString(value) + switch conversion { + case "ray": + bgflt.Quo(bgflt, Ray) + case "wad": + bgflt.Quo(bgflt, Ether) + } + return bgflt.Text('g', prec) +} diff --git a/pkg/transformers/pep/transformer_test.go b/pkg/transformers/pep/transformer_test.go new file mode 100644 index 00000000..220774e4 --- /dev/null +++ b/pkg/transformers/pep/transformer_test.go @@ -0,0 +1,44 @@ +package pep_test + +import ( + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + "github.com/ethereum/go-ethereum/core/types" + "github.com/vulcanize/vulcanizedb/pkg/core" + "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories" + "github.com/vulcanize/vulcanizedb/pkg/fakes" + "github.com/vulcanize/vulcanizedb/pkg/transformers/pep" + "github.com/vulcanize/vulcanizedb/test_config" +) + +var _ = Describe("Pep transformer", func() { + It("returns nil if no logs found", func() { + chain := fakes.NewMockBlockChain() + db := test_config.NewTestDB(core.Node{}) + transformer := pep.NewPepTransformer(chain, db) + + err := transformer.Execute(core.Header{}, 123) + + Expect(err).NotTo(HaveOccurred()) + }) + + It("creates pep row for found log", func() { + chain := fakes.NewMockBlockChain() + chain.SetGetEthLogsWithCustomQueryReturnLogs([]types.Log{{Data: []byte{1, 2, 3, 4, 5}}}) + db := test_config.NewTestDB(core.Node{}) + headerRepository := repositories.NewHeaderRepository(db) + header := core.Header{BlockNumber: 12345} + headerID, err := headerRepository.CreateOrUpdateHeader(header) + Expect(err).NotTo(HaveOccurred()) + transformer := pep.NewPepTransformer(chain, db) + + err = transformer.Execute(header, headerID) + + Expect(err).NotTo(HaveOccurred()) + var dbPep pep.Pep + err = db.Get(&dbPep, `SELECT block_number, header_id, usd_value FROM maker.peps WHERE header_id = $1`, headerID) + Expect(err).NotTo(HaveOccurred()) + Expect(dbPep.BlockNumber).To(Equal(header.BlockNumber)) + }) +}) diff --git a/pkg/transformers/shared/fetcher_test.go b/pkg/transformers/shared/fetcher_test.go index 6e4838f3..93b4ad5a 100644 --- a/pkg/transformers/shared/fetcher_test.go +++ b/pkg/transformers/shared/fetcher_test.go @@ -51,7 +51,7 @@ var _ = Describe("Fetcher", func() { It("returns an error if fetching the logs fails", func() { blockChain := fakes.NewMockBlockChain() - blockChain.SetGetLogsErr(fakes.FakeError) + blockChain.SetGetEthLogsWithCustomQueryErr(fakes.FakeError) fetcher := shared.NewFetcher(blockChain) _, err := fetcher.FetchLogs("", [][]common.Hash{}, int64(1)) diff --git a/pkg/transformers/transformer.go b/pkg/transformers/transformer.go new file mode 100644 index 00000000..6c034478 --- /dev/null +++ b/pkg/transformers/transformer.go @@ -0,0 +1,9 @@ +package transformers + +import ( + "github.com/vulcanize/vulcanizedb/pkg/core" +) + +type Transformer interface { + Execute(header core.Header, headerID int64) error +} diff --git a/test_config/test_config.go b/test_config/test_config.go index e6926a82..eafce6f0 100644 --- a/test_config/test_config.go +++ b/test_config/test_config.go @@ -77,6 +77,7 @@ func CleanTestDB(db *postgres.DB) { db.MustExec("DELETE FROM headers") db.MustExec("DELETE FROM log_filters") db.MustExec("DELETE FROM logs") + db.MustExec("DELETE FROM maker.peps") db.MustExec("DELETE FROM receipts") db.MustExec("DELETE FROM transactions") db.MustExec("DELETE FROM watched_contracts")