From fb360d8562d9c323a807a46e18121ed5448d996d Mon Sep 17 00:00:00 2001 From: Ian Norden Date: Thu, 20 Feb 2020 16:12:52 -0600 Subject: [PATCH] changes to super node to improve compatibility with watcher --- pkg/ipfs/ipld/btc_header.go | 2 +- pkg/ipfs/ipld/btc_tx.go | 2 +- pkg/ipfs/ipld/eth_header.go | 2 +- pkg/ipfs/ipld/eth_receipt.go | 2 +- pkg/ipfs/ipld/eth_state.go | 2 +- pkg/ipfs/ipld/eth_storage.go | 2 +- pkg/ipfs/ipld/eth_tx.go | 2 +- pkg/ipfs/ipld/shared.go | 4 +- pkg/super_node/backfiller.go | 4 +- pkg/super_node/backfiller_test.go | 28 +-- pkg/super_node/btc/converter.go | 4 +- pkg/super_node/btc/converter_test.go | 4 +- pkg/super_node/btc/filterer.go | 25 +-- pkg/super_node/btc/filterer_test.go | 17 -- pkg/super_node/btc/ipld_fetcher.go | 44 ++-- pkg/super_node/btc/ipld_fetcher_test.go | 17 -- pkg/super_node/btc/mocks/converter.go | 8 +- pkg/super_node/btc/mocks/publisher.go | 16 +- pkg/super_node/btc/mocks/test_data.go | 2 +- pkg/super_node/btc/payload_fetcher_test.go | 17 -- pkg/super_node/btc/publisher.go | 6 +- pkg/super_node/btc/publisher_test.go | 2 +- pkg/super_node/btc/resolver.go | 54 ----- pkg/super_node/btc/resolver_test.go | 17 -- pkg/super_node/btc/retriever_test.go | 17 -- pkg/super_node/btc/streamer_test.go | 17 -- pkg/super_node/btc/types.go | 55 +---- pkg/super_node/constructors.go | 12 -- pkg/super_node/eth/api.go | 21 +- pkg/super_node/eth/backend.go | 76 +++---- pkg/super_node/eth/converter.go | 4 +- pkg/super_node/eth/converter_test.go | 2 +- pkg/super_node/eth/filterer.go | 68 +++--- pkg/super_node/eth/filterer_test.go | 204 +++++++++--------- pkg/super_node/eth/ipld_fetcher.go | 113 ++++++---- pkg/super_node/eth/ipld_fetcher_test.go | 37 ++-- pkg/super_node/eth/mocks/converter.go | 8 +- pkg/super_node/eth/mocks/publisher.go | 16 +- pkg/super_node/eth/mocks/test_data.go | 78 ++++--- pkg/super_node/eth/publisher.go | 6 +- pkg/super_node/eth/publisher_test.go | 8 +- pkg/super_node/eth/resolver.go | 78 ------- pkg/super_node/eth/resolver_test.go | 55 ----- pkg/super_node/eth/types.go | 72 +++---- pkg/super_node/service.go | 37 ++-- pkg/super_node/service_test.go | 4 +- pkg/super_node/shared/intefaces.go | 13 +- .../mocks/{fetcher.go => payload_fetcher.go} | 6 +- pkg/super_node/shared/mocks/retriever.go | 16 +- pkg/super_node/shared/types.go | 7 +- pkg/super_node/subscription.go | 8 +- pkg/{super_node => }/watcher/config.go | 0 pkg/{super_node => }/watcher/constructors.go | 2 +- .../watcher/eth/repository.go | 28 ++- pkg/{super_node => }/watcher/service.go | 16 +- .../watcher/shared/interfaces.go | 0 56 files changed, 546 insertions(+), 821 deletions(-) delete mode 100644 pkg/super_node/btc/filterer_test.go delete mode 100644 pkg/super_node/btc/ipld_fetcher_test.go delete mode 100644 pkg/super_node/btc/payload_fetcher_test.go delete mode 100644 pkg/super_node/btc/resolver.go delete mode 100644 pkg/super_node/btc/resolver_test.go delete mode 100644 pkg/super_node/btc/retriever_test.go delete mode 100644 pkg/super_node/btc/streamer_test.go delete mode 100644 pkg/super_node/eth/resolver.go delete mode 100644 pkg/super_node/eth/resolver_test.go rename pkg/super_node/shared/mocks/{fetcher.go => payload_fetcher.go} (90%) rename pkg/{super_node => }/watcher/config.go (100%) rename pkg/{super_node => }/watcher/constructors.go (96%) rename pkg/{super_node => }/watcher/eth/repository.go (73%) rename pkg/{super_node => }/watcher/service.go (91%) rename pkg/{super_node => }/watcher/shared/interfaces.go (100%) diff --git a/pkg/ipfs/ipld/btc_header.go b/pkg/ipfs/ipld/btc_header.go index 8e630057..5d171de0 100644 --- a/pkg/ipfs/ipld/btc_header.go +++ b/pkg/ipfs/ipld/btc_header.go @@ -48,7 +48,7 @@ func NewBtcHeader(header *wire.BlockHeader) (*BtcHeader, error) { return nil, err } rawdata := w.Bytes() - c, err := rawdataToCid(MBitcoinHeader, rawdata, mh.DBL_SHA2_256) + c, err := RawdataToCid(MBitcoinHeader, rawdata, mh.DBL_SHA2_256) if err != nil { return nil, err } diff --git a/pkg/ipfs/ipld/btc_tx.go b/pkg/ipfs/ipld/btc_tx.go index b535a233..02cd5bc4 100644 --- a/pkg/ipfs/ipld/btc_tx.go +++ b/pkg/ipfs/ipld/btc_tx.go @@ -33,7 +33,7 @@ func NewBtcTx(tx *wire.MsgTx) (*BtcTx, error) { return nil, err } rawdata := w.Bytes() - c, err := rawdataToCid(MBitcoinTx, rawdata, mh.DBL_SHA2_256) + c, err := RawdataToCid(MBitcoinTx, rawdata, mh.DBL_SHA2_256) if err != nil { return nil, err } diff --git a/pkg/ipfs/ipld/eth_header.go b/pkg/ipfs/ipld/eth_header.go index 3f0ae730..55b8bc34 100644 --- a/pkg/ipfs/ipld/eth_header.go +++ b/pkg/ipfs/ipld/eth_header.go @@ -49,7 +49,7 @@ func NewEthHeader(header *types.Header) (*EthHeader, error) { if err != nil { return nil, err } - c, err := rawdataToCid(MEthHeader, headerRLP, mh.KECCAK_256) + c, err := RawdataToCid(MEthHeader, headerRLP, mh.KECCAK_256) if err != nil { return nil, err } diff --git a/pkg/ipfs/ipld/eth_receipt.go b/pkg/ipfs/ipld/eth_receipt.go index 99915f1f..1a39cf9c 100644 --- a/pkg/ipfs/ipld/eth_receipt.go +++ b/pkg/ipfs/ipld/eth_receipt.go @@ -48,7 +48,7 @@ func NewReceipt(receipt *types.ReceiptForStorage) (*EthReceipt, error) { if err != nil { return nil, err } - c, err := rawdataToCid(MEthTxReceipt, receiptRLP, mh.KECCAK_256) + c, err := RawdataToCid(MEthTxReceipt, receiptRLP, mh.KECCAK_256) if err != nil { return nil, err } diff --git a/pkg/ipfs/ipld/eth_state.go b/pkg/ipfs/ipld/eth_state.go index c01765bb..cf8f9a6c 100644 --- a/pkg/ipfs/ipld/eth_state.go +++ b/pkg/ipfs/ipld/eth_state.go @@ -41,7 +41,7 @@ var _ node.Node = (*EthStateTrie)(nil) // FromStateTrieRLP takes the RLP bytes of an ethereum // state trie node to return it as an IPLD node for further processing. func FromStateTrieRLP(stateNodeRLP []byte) (*EthStateTrie, error) { - c, err := rawdataToCid(MEthStateTrie, stateNodeRLP, mh.KECCAK_256) + c, err := RawdataToCid(MEthStateTrie, stateNodeRLP, mh.KECCAK_256) if err != nil { return nil, err } diff --git a/pkg/ipfs/ipld/eth_storage.go b/pkg/ipfs/ipld/eth_storage.go index 6d9e1cbe..b0d3af79 100644 --- a/pkg/ipfs/ipld/eth_storage.go +++ b/pkg/ipfs/ipld/eth_storage.go @@ -41,7 +41,7 @@ var _ node.Node = (*EthStorageTrie)(nil) // FromStorageTrieRLP takes the RLP bytes of an ethereum // storage trie node to return it as an IPLD node for further processing. func FromStorageTrieRLP(storageNodeRLP []byte) (*EthStorageTrie, error) { - c, err := rawdataToCid(MEthStorageTrie, storageNodeRLP, mh.KECCAK_256) + c, err := RawdataToCid(MEthStorageTrie, storageNodeRLP, mh.KECCAK_256) if err != nil { return nil, err } diff --git a/pkg/ipfs/ipld/eth_tx.go b/pkg/ipfs/ipld/eth_tx.go index a18bfa39..4fc4d20a 100644 --- a/pkg/ipfs/ipld/eth_tx.go +++ b/pkg/ipfs/ipld/eth_tx.go @@ -50,7 +50,7 @@ func NewEthTx(tx *types.Transaction) (*EthTx, error) { if err != nil { return nil, err } - c, err := rawdataToCid(MEthTx, txRLP, mh.KECCAK_256) + c, err := RawdataToCid(MEthTx, txRLP, mh.KECCAK_256) if err != nil { return nil, err } diff --git a/pkg/ipfs/ipld/shared.go b/pkg/ipfs/ipld/shared.go index a47debe7..802d944c 100644 --- a/pkg/ipfs/ipld/shared.go +++ b/pkg/ipfs/ipld/shared.go @@ -40,9 +40,9 @@ const ( MBitcoinTx = 0xb1 ) -// rawdataToCid takes the desired codec and a slice of bytes +// RawdataToCid takes the desired codec and a slice of bytes // and returns the proper cid of the object. -func rawdataToCid(codec uint64, rawdata []byte, multiHash uint64) (cid.Cid, error) { +func RawdataToCid(codec uint64, rawdata []byte, multiHash uint64) (cid.Cid, error) { c, err := cid.Prefix{ Codec: codec, Version: 1, diff --git a/pkg/super_node/backfiller.go b/pkg/super_node/backfiller.go index 1ff8dd7d..1144b899 100644 --- a/pkg/super_node/backfiller.go +++ b/pkg/super_node/backfiller.go @@ -52,7 +52,7 @@ type BackFillService struct { // Interface for fetching payloads over at historical blocks; over http Fetcher shared.PayloadFetcher // Channel for forwarding backfill payloads to the ScreenAndServe process - ScreenAndServeChan chan shared.StreamedIPLDs + ScreenAndServeChan chan shared.ConvertedData // Check frequency GapCheckFrequency time.Duration // Size of batch fetches @@ -62,7 +62,7 @@ type BackFillService struct { } // NewBackFillService returns a new BackFillInterface -func NewBackFillService(settings *shared.SuperNodeConfig, screenAndServeChan chan shared.StreamedIPLDs) (BackFillInterface, error) { +func NewBackFillService(settings *shared.SuperNodeConfig, screenAndServeChan chan shared.ConvertedData) (BackFillInterface, error) { publisher, err := NewIPLDPublisher(settings.Chain, settings.IPFSPath) if err != nil { return nil, err diff --git a/pkg/super_node/backfiller_test.go b/pkg/super_node/backfiller_test.go index 7facc74c..7bcc58a2 100644 --- a/pkg/super_node/backfiller_test.go +++ b/pkg/super_node/backfiller_test.go @@ -41,10 +41,10 @@ var _ = Describe("BackFiller", func() { ReturnErr: nil, } mockConverter := &mocks.IterativePayloadConverter{ - ReturnIPLDPayload: []eth.IPLDPayload{mocks.MockIPLDPayload, mocks.MockIPLDPayload}, + ReturnIPLDPayload: []eth.ConvertedPayload{mocks.MockConvertedPayload, mocks.MockConvertedPayload}, ReturnErr: nil, } - mockRetriever := &mocks2.MockCIDRetriever{ + mockRetriever := &mocks2.CIDRetriever{ FirstBlockNumberToReturn: 0, GapsToRetrieve: []shared.Gap{ { @@ -52,7 +52,7 @@ var _ = Describe("BackFiller", func() { }, }, } - mockFetcher := &mocks2.IPLDFetcher{ + mockFetcher := &mocks2.PayloadFetcher{ PayloadsToReturn: map[uint64]shared.RawChainData{ 100: mocks.MockStateDiffPayload, 101: mocks.MockStateDiffPayload, @@ -77,8 +77,8 @@ var _ = Describe("BackFiller", func() { Expect(mockCidRepo.PassedCIDPayload[0]).To(Equal(mocks.MockCIDPayload)) Expect(mockCidRepo.PassedCIDPayload[1]).To(Equal(mocks.MockCIDPayload)) Expect(len(mockPublisher.PassedIPLDPayload)).To(Equal(2)) - Expect(mockPublisher.PassedIPLDPayload[0]).To(Equal(mocks.MockIPLDPayload)) - Expect(mockPublisher.PassedIPLDPayload[1]).To(Equal(mocks.MockIPLDPayload)) + Expect(mockPublisher.PassedIPLDPayload[0]).To(Equal(mocks.MockConvertedPayload)) + Expect(mockPublisher.PassedIPLDPayload[1]).To(Equal(mocks.MockConvertedPayload)) Expect(len(mockConverter.PassedStatediffPayload)).To(Equal(2)) Expect(mockConverter.PassedStatediffPayload[0]).To(Equal(mocks.MockStateDiffPayload)) Expect(mockConverter.PassedStatediffPayload[1]).To(Equal(mocks.MockStateDiffPayload)) @@ -96,10 +96,10 @@ var _ = Describe("BackFiller", func() { ReturnErr: nil, } mockConverter := &mocks.IterativePayloadConverter{ - ReturnIPLDPayload: []eth.IPLDPayload{mocks.MockIPLDPayload}, + ReturnIPLDPayload: []eth.ConvertedPayload{mocks.MockConvertedPayload}, ReturnErr: nil, } - mockRetriever := &mocks2.MockCIDRetriever{ + mockRetriever := &mocks2.CIDRetriever{ FirstBlockNumberToReturn: 0, GapsToRetrieve: []shared.Gap{ { @@ -107,7 +107,7 @@ var _ = Describe("BackFiller", func() { }, }, } - mockFetcher := &mocks2.IPLDFetcher{ + mockFetcher := &mocks2.PayloadFetcher{ PayloadsToReturn: map[uint64]shared.RawChainData{ 100: mocks.MockStateDiffPayload, }, @@ -130,7 +130,7 @@ var _ = Describe("BackFiller", func() { Expect(len(mockCidRepo.PassedCIDPayload)).To(Equal(1)) Expect(mockCidRepo.PassedCIDPayload[0]).To(Equal(mocks.MockCIDPayload)) Expect(len(mockPublisher.PassedIPLDPayload)).To(Equal(1)) - Expect(mockPublisher.PassedIPLDPayload[0]).To(Equal(mocks.MockIPLDPayload)) + Expect(mockPublisher.PassedIPLDPayload[0]).To(Equal(mocks.MockConvertedPayload)) Expect(len(mockConverter.PassedStatediffPayload)).To(Equal(1)) Expect(mockConverter.PassedStatediffPayload[0]).To(Equal(mocks.MockStateDiffPayload)) Expect(mockRetriever.CalledTimes).To(Equal(1)) @@ -147,14 +147,14 @@ var _ = Describe("BackFiller", func() { ReturnErr: nil, } mockConverter := &mocks.IterativePayloadConverter{ - ReturnIPLDPayload: []eth.IPLDPayload{mocks.MockIPLDPayload, mocks.MockIPLDPayload}, + ReturnIPLDPayload: []eth.ConvertedPayload{mocks.MockConvertedPayload, mocks.MockConvertedPayload}, ReturnErr: nil, } - mockRetriever := &mocks2.MockCIDRetriever{ + mockRetriever := &mocks2.CIDRetriever{ FirstBlockNumberToReturn: 3, GapsToRetrieve: []shared.Gap{}, } - mockFetcher := &mocks2.IPLDFetcher{ + mockFetcher := &mocks2.PayloadFetcher{ PayloadsToReturn: map[uint64]shared.RawChainData{ 1: mocks.MockStateDiffPayload, 2: mocks.MockStateDiffPayload, @@ -179,8 +179,8 @@ var _ = Describe("BackFiller", func() { Expect(mockCidRepo.PassedCIDPayload[0]).To(Equal(mocks.MockCIDPayload)) Expect(mockCidRepo.PassedCIDPayload[1]).To(Equal(mocks.MockCIDPayload)) Expect(len(mockPublisher.PassedIPLDPayload)).To(Equal(2)) - Expect(mockPublisher.PassedIPLDPayload[0]).To(Equal(mocks.MockIPLDPayload)) - Expect(mockPublisher.PassedIPLDPayload[1]).To(Equal(mocks.MockIPLDPayload)) + Expect(mockPublisher.PassedIPLDPayload[0]).To(Equal(mocks.MockConvertedPayload)) + Expect(mockPublisher.PassedIPLDPayload[1]).To(Equal(mocks.MockConvertedPayload)) Expect(len(mockConverter.PassedStatediffPayload)).To(Equal(2)) Expect(mockConverter.PassedStatediffPayload[0]).To(Equal(mocks.MockStateDiffPayload)) Expect(mockConverter.PassedStatediffPayload[1]).To(Equal(mocks.MockStateDiffPayload)) diff --git a/pkg/super_node/btc/converter.go b/pkg/super_node/btc/converter.go index cc10b305..7559fa17 100644 --- a/pkg/super_node/btc/converter.go +++ b/pkg/super_node/btc/converter.go @@ -40,7 +40,7 @@ func NewPayloadConverter(chainConfig *chaincfg.Params) *PayloadConverter { // Convert method is used to convert a bitcoin BlockPayload to an IPLDPayload // Satisfies the shared.PayloadConverter interface -func (pc *PayloadConverter) Convert(payload shared.RawChainData) (shared.StreamedIPLDs, error) { +func (pc *PayloadConverter) Convert(payload shared.RawChainData) (shared.ConvertedData, error) { btcBlockPayload, ok := payload.(BlockPayload) if !ok { return nil, fmt.Errorf("btc converter: expected payload type %T got %T", BlockPayload{}, payload) @@ -87,7 +87,7 @@ func (pc *PayloadConverter) Convert(payload shared.RawChainData) (shared.Streame } txMeta[i] = txModel } - return IPLDPayload{ + return ConvertedPayload{ BlockPayload: btcBlockPayload, TxMetaData: txMeta, }, nil diff --git a/pkg/super_node/btc/converter_test.go b/pkg/super_node/btc/converter_test.go index 142f511f..c76ad6c6 100644 --- a/pkg/super_node/btc/converter_test.go +++ b/pkg/super_node/btc/converter_test.go @@ -31,9 +31,9 @@ var _ = Describe("Converter", func() { converter := btc.NewPayloadConverter(&chaincfg.MainNetParams) payload, err := converter.Convert(mocks.MockBlockPayload) Expect(err).ToNot(HaveOccurred()) - convertedPayload, ok := payload.(btc.IPLDPayload) + convertedPayload, ok := payload.(btc.ConvertedPayload) Expect(ok).To(BeTrue()) - Expect(convertedPayload).To(Equal(mocks.MockIPLDPayload)) + Expect(convertedPayload).To(Equal(mocks.MockConvertedPayload)) Expect(convertedPayload.BlockHeight).To(Equal(mocks.MockBlockHeight)) Expect(convertedPayload.Header).To(Equal(&mocks.MockBlock.Header)) Expect(convertedPayload.Txs).To(Equal(mocks.MockTransactions)) diff --git a/pkg/super_node/btc/filterer.go b/pkg/super_node/btc/filterer.go index e37d1d80..3299ca18 100644 --- a/pkg/super_node/btc/filterer.go +++ b/pkg/super_node/btc/filterer.go @@ -33,37 +33,37 @@ func NewResponseFilterer() *ResponseFilterer { } // Filter is used to filter through btc data to extract and package requested data into a Payload -func (s *ResponseFilterer) Filter(filter shared.SubscriptionSettings, payload shared.StreamedIPLDs) (shared.ServerResponse, error) { +func (s *ResponseFilterer) Filter(filter shared.SubscriptionSettings, payload shared.ConvertedData) (shared.IPLDs, error) { btcFilters, ok := filter.(*SubscriptionSettings) if !ok { - return StreamResponse{}, fmt.Errorf("btc filterer expected filter type %T got %T", &SubscriptionSettings{}, filter) + return IPLDs{}, fmt.Errorf("btc filterer expected filter type %T got %T", &SubscriptionSettings{}, filter) } - btcPayload, ok := payload.(IPLDPayload) + btcPayload, ok := payload.(ConvertedPayload) if !ok { - return StreamResponse{}, fmt.Errorf("btc filterer expected payload type %T got %T", IPLDPayload{}, payload) + return IPLDs{}, fmt.Errorf("btc filterer expected payload type %T got %T", ConvertedPayload{}, payload) } height := int64(btcPayload.BlockPayload.BlockHeight) if checkRange(btcFilters.Start.Int64(), btcFilters.End.Int64(), height) { - response := new(StreamResponse) + response := new(IPLDs) if err := s.filterHeaders(btcFilters.HeaderFilter, response, btcPayload); err != nil { - return StreamResponse{}, err + return IPLDs{}, err } if err := s.filterTransactions(btcFilters.TxFilter, response, btcPayload); err != nil { - return StreamResponse{}, err + return IPLDs{}, err } response.BlockNumber = big.NewInt(height) return *response, nil } - return StreamResponse{}, nil + return IPLDs{}, nil } -func (s *ResponseFilterer) filterHeaders(headerFilter HeaderFilter, response *StreamResponse, payload IPLDPayload) error { +func (s *ResponseFilterer) filterHeaders(headerFilter HeaderFilter, response *IPLDs, payload ConvertedPayload) error { if !headerFilter.Off { headerBuffer := new(bytes.Buffer) if err := payload.Header.Serialize(headerBuffer); err != nil { return err } - response.SerializedHeaders = append(response.SerializedHeaders, headerBuffer.Bytes()) + response.Headers = append(response.Headers, headerBuffer.Bytes()) } return nil } @@ -75,15 +75,16 @@ func checkRange(start, end, actual int64) bool { return false } -func (s *ResponseFilterer) filterTransactions(trxFilter TxFilter, response *StreamResponse, payload IPLDPayload) error { +func (s *ResponseFilterer) filterTransactions(trxFilter TxFilter, response *IPLDs, payload ConvertedPayload) error { if !trxFilter.Off { + response.Transactions = make([][]byte, 0, len(payload.TxMetaData)) for i, txMeta := range payload.TxMetaData { if checkTransaction(txMeta, trxFilter) { trxBuffer := new(bytes.Buffer) if err := payload.Txs[i].MsgTx().Serialize(trxBuffer); err != nil { return err } - response.SerializedTxs = append(response.SerializedTxs, trxBuffer.Bytes()) + response.Transactions = append(response.Transactions, trxBuffer.Bytes()) } } } diff --git a/pkg/super_node/btc/filterer_test.go b/pkg/super_node/btc/filterer_test.go deleted file mode 100644 index 8dd3c1ae..00000000 --- a/pkg/super_node/btc/filterer_test.go +++ /dev/null @@ -1,17 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package btc diff --git a/pkg/super_node/btc/ipld_fetcher.go b/pkg/super_node/btc/ipld_fetcher.go index 633cbf75..7aabd2ab 100644 --- a/pkg/super_node/btc/ipld_fetcher.go +++ b/pkg/super_node/btc/ipld_fetcher.go @@ -52,13 +52,13 @@ func NewIPLDFetcher(ipfsPath string) (*IPLDFetcher, error) { } // Fetch is the exported method for fetching and returning all the IPLDS specified in the CIDWrapper -func (f *IPLDFetcher) Fetch(cids shared.CIDsForFetching) (shared.FetchedIPLDs, error) { +func (f *IPLDFetcher) Fetch(cids shared.CIDsForFetching) (shared.IPLDs, error) { cidWrapper, ok := cids.(*CIDWrapper) if !ok { return nil, fmt.Errorf("btc fetcher: expected cids type %T got %T", &CIDWrapper{}, cids) } log.Debug("fetching iplds") - iplds := new(IPLDWrapper) + iplds := IPLDs{} iplds.BlockNumber = cidWrapper.BlockNumber var err error iplds.Headers, err = f.FetchHeaders(cidWrapper.Headers) @@ -74,42 +74,50 @@ func (f *IPLDFetcher) Fetch(cids shared.CIDsForFetching) (shared.FetchedIPLDs, e // FetchHeaders fetches headers // It uses the f.fetchBatch method -func (f *IPLDFetcher) FetchHeaders(cids []HeaderModel) ([]blocks.Block, error) { +func (f *IPLDFetcher) FetchHeaders(cids []HeaderModel) ([][]byte, error) { log.Debug("fetching header iplds") - headerCids := make([]cid.Cid, 0, len(cids)) - for _, c := range cids { + headerCids := make([]cid.Cid, len(cids)) + for i, c := range cids { dc, err := cid.Decode(c.CID) if err != nil { return nil, err } - headerCids = append(headerCids, dc) + headerCids[i] = dc } headers := f.fetchBatch(headerCids) - if len(headers) != len(headerCids) { - log.Errorf("ipfs fetcher: number of header blocks returned (%d) does not match number expected (%d)", len(headers), len(headerCids)) - return headers, errUnexpectedNumberOfIPLDs + headersBytes := make([][]byte, len(headers)) + for i, header := range headers { + headersBytes[i] = header.RawData() } - return headers, nil + if len(headersBytes) != len(headerCids) { + log.Errorf("ipfs fetcher: number of header blocks returned (%d) does not match number expected (%d)", len(headers), len(headerCids)) + return headersBytes, errUnexpectedNumberOfIPLDs + } + return headersBytes, nil } // FetchTrxs fetches transactions // It uses the f.fetchBatch method -func (f *IPLDFetcher) FetchTrxs(cids []TxModel) ([]blocks.Block, error) { +func (f *IPLDFetcher) FetchTrxs(cids []TxModel) ([][]byte, error) { log.Debug("fetching transaction iplds") - trxCids := make([]cid.Cid, 0, len(cids)) - for _, c := range cids { + trxCids := make([]cid.Cid, len(cids)) + for i, c := range cids { dc, err := cid.Decode(c.CID) if err != nil { return nil, err } - trxCids = append(trxCids, dc) + trxCids[i] = dc } trxs := f.fetchBatch(trxCids) - if len(trxs) != len(trxCids) { - log.Errorf("ipfs fetcher: number of transaction blocks returned (%d) does not match number expected (%d)", len(trxs), len(trxCids)) - return trxs, errUnexpectedNumberOfIPLDs + trxBytes := make([][]byte, len(trxs)) + for i, trx := range trxs { + trxBytes[i] = trx.RawData() } - return trxs, nil + if len(trxBytes) != len(trxCids) { + log.Errorf("ipfs fetcher: number of transaction blocks returned (%d) does not match number expected (%d)", len(trxs), len(trxCids)) + return trxBytes, errUnexpectedNumberOfIPLDs + } + return trxBytes, nil } // fetch is used to fetch a single cid diff --git a/pkg/super_node/btc/ipld_fetcher_test.go b/pkg/super_node/btc/ipld_fetcher_test.go deleted file mode 100644 index 8dd3c1ae..00000000 --- a/pkg/super_node/btc/ipld_fetcher_test.go +++ /dev/null @@ -1,17 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package btc diff --git a/pkg/super_node/btc/mocks/converter.go b/pkg/super_node/btc/mocks/converter.go index 8f7f2ccd..5ba7a096 100644 --- a/pkg/super_node/btc/mocks/converter.go +++ b/pkg/super_node/btc/mocks/converter.go @@ -26,12 +26,12 @@ import ( // PayloadConverter is the underlying struct for the Converter interface type PayloadConverter struct { PassedStatediffPayload btc.BlockPayload - ReturnIPLDPayload btc.IPLDPayload + ReturnIPLDPayload btc.ConvertedPayload ReturnErr error } // Convert method is used to convert a geth statediff.Payload to a IPLDPayload -func (pc *PayloadConverter) Convert(payload shared.RawChainData) (shared.StreamedIPLDs, error) { +func (pc *PayloadConverter) Convert(payload shared.RawChainData) (shared.ConvertedData, error) { stateDiffPayload, ok := payload.(btc.BlockPayload) if !ok { return nil, fmt.Errorf("convert expected payload type %T got %T", btc.BlockPayload{}, payload) @@ -43,13 +43,13 @@ func (pc *PayloadConverter) Convert(payload shared.RawChainData) (shared.Streame // IterativePayloadConverter is the underlying struct for the Converter interface type IterativePayloadConverter struct { PassedStatediffPayload []btc.BlockPayload - ReturnIPLDPayload []btc.IPLDPayload + ReturnIPLDPayload []btc.ConvertedPayload ReturnErr error iteration int } // Convert method is used to convert a geth statediff.Payload to a IPLDPayload -func (pc *IterativePayloadConverter) Convert(payload shared.RawChainData) (shared.StreamedIPLDs, error) { +func (pc *IterativePayloadConverter) Convert(payload shared.RawChainData) (shared.ConvertedData, error) { stateDiffPayload, ok := payload.(btc.BlockPayload) if !ok { return nil, fmt.Errorf("convert expected payload type %T got %T", btc.BlockPayload{}, payload) diff --git a/pkg/super_node/btc/mocks/publisher.go b/pkg/super_node/btc/mocks/publisher.go index dc15fe52..c9a7cc59 100644 --- a/pkg/super_node/btc/mocks/publisher.go +++ b/pkg/super_node/btc/mocks/publisher.go @@ -26,16 +26,16 @@ import ( // IPLDPublisher is the underlying struct for the Publisher interface type IPLDPublisher struct { - PassedIPLDPayload btc.IPLDPayload + PassedIPLDPayload btc.ConvertedPayload ReturnCIDPayload *btc.CIDPayload ReturnErr error } // Publish publishes an IPLDPayload to IPFS and returns the corresponding CIDPayload -func (pub *IPLDPublisher) Publish(payload shared.StreamedIPLDs) (shared.CIDsForIndexing, error) { - ipldPayload, ok := payload.(btc.IPLDPayload) +func (pub *IPLDPublisher) Publish(payload shared.ConvertedData) (shared.CIDsForIndexing, error) { + ipldPayload, ok := payload.(btc.ConvertedPayload) if !ok { - return nil, fmt.Errorf("publish expected payload type %T got %T", &btc.IPLDPayload{}, payload) + return nil, fmt.Errorf("publish expected payload type %T got %T", &btc.ConvertedPayload{}, payload) } pub.PassedIPLDPayload = ipldPayload return pub.ReturnCIDPayload, pub.ReturnErr @@ -43,17 +43,17 @@ func (pub *IPLDPublisher) Publish(payload shared.StreamedIPLDs) (shared.CIDsForI // IterativeIPLDPublisher is the underlying struct for the Publisher interface; used in testing type IterativeIPLDPublisher struct { - PassedIPLDPayload []btc.IPLDPayload + PassedIPLDPayload []btc.ConvertedPayload ReturnCIDPayload []*btc.CIDPayload ReturnErr error iteration int } // Publish publishes an IPLDPayload to IPFS and returns the corresponding CIDPayload -func (pub *IterativeIPLDPublisher) Publish(payload shared.StreamedIPLDs) (shared.CIDsForIndexing, error) { - ipldPayload, ok := payload.(btc.IPLDPayload) +func (pub *IterativeIPLDPublisher) Publish(payload shared.ConvertedData) (shared.CIDsForIndexing, error) { + ipldPayload, ok := payload.(btc.ConvertedPayload) if !ok { - return nil, fmt.Errorf("publish expected payload type %T got %T", &btc.IPLDPayload{}, payload) + return nil, fmt.Errorf("publish expected payload type %T got %T", &btc.ConvertedPayload{}, payload) } pub.PassedIPLDPayload = append(pub.PassedIPLDPayload, ipldPayload) if len(pub.ReturnCIDPayload) < pub.iteration+1 { diff --git a/pkg/super_node/btc/mocks/test_data.go b/pkg/super_node/btc/mocks/test_data.go index 72193b11..0d5645bf 100644 --- a/pkg/super_node/btc/mocks/test_data.go +++ b/pkg/super_node/btc/mocks/test_data.go @@ -677,7 +677,7 @@ var ( Timestamp: MockBlock.Header.Timestamp.UnixNano(), Bits: MockBlock.Header.Bits, } - MockIPLDPayload = btc.IPLDPayload{ + MockConvertedPayload = btc.ConvertedPayload{ BlockPayload: MockBlockPayload, TxMetaData: MockTxsMetaData, } diff --git a/pkg/super_node/btc/payload_fetcher_test.go b/pkg/super_node/btc/payload_fetcher_test.go deleted file mode 100644 index 8dd3c1ae..00000000 --- a/pkg/super_node/btc/payload_fetcher_test.go +++ /dev/null @@ -1,17 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package btc diff --git a/pkg/super_node/btc/publisher.go b/pkg/super_node/btc/publisher.go index 90159bd6..6805dbc6 100644 --- a/pkg/super_node/btc/publisher.go +++ b/pkg/super_node/btc/publisher.go @@ -48,10 +48,10 @@ func NewIPLDPublisher(ipfsPath string) (*IPLDPublisher, error) { } // Publish publishes an IPLDPayload to IPFS and returns the corresponding CIDPayload -func (pub *IPLDPublisher) Publish(payload shared.StreamedIPLDs) (shared.CIDsForIndexing, error) { - ipldPayload, ok := payload.(IPLDPayload) +func (pub *IPLDPublisher) Publish(payload shared.ConvertedData) (shared.CIDsForIndexing, error) { + ipldPayload, ok := payload.(ConvertedPayload) if !ok { - return nil, fmt.Errorf("eth publisher expected payload type %T got %T", &IPLDPayload{}, payload) + return nil, fmt.Errorf("eth publisher expected payload type %T got %T", &ConvertedPayload{}, payload) } // Process and publish headers headerCid, err := pub.publishHeader(ipldPayload.Header) diff --git a/pkg/super_node/btc/publisher_test.go b/pkg/super_node/btc/publisher_test.go index fee15d57..5726c44e 100644 --- a/pkg/super_node/btc/publisher_test.go +++ b/pkg/super_node/btc/publisher_test.go @@ -44,7 +44,7 @@ var _ = Describe("Publisher", func() { HeaderPutter: mockHeaderDagPutter, TransactionPutter: mockTrxDagPutter, } - payload, err := publisher.Publish(mocks.MockIPLDPayload) + payload, err := publisher.Publish(mocks.MockConvertedPayload) Expect(err).ToNot(HaveOccurred()) cidPayload, ok := payload.(*btc.CIDPayload) Expect(ok).To(BeTrue()) diff --git a/pkg/super_node/btc/resolver.go b/pkg/super_node/btc/resolver.go deleted file mode 100644 index e7788362..00000000 --- a/pkg/super_node/btc/resolver.go +++ /dev/null @@ -1,54 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package btc - -import ( - "fmt" - - "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" - - "github.com/ipfs/go-block-format" -) - -// IPLDResolver satisfies the IPLDResolver interface for bitcoin -type IPLDResolver struct{} - -// NewIPLDResolver returns a pointer to an IPLDResolver which satisfies the IPLDResolver interface -func NewIPLDResolver() *IPLDResolver { - return &IPLDResolver{} -} - -// Resolve is the exported method for resolving all of the BTC IPLDs packaged in an IpfsBlockWrapper -func (eir *IPLDResolver) Resolve(iplds shared.FetchedIPLDs) (shared.ServerResponse, error) { - ipfsBlocks, ok := iplds.(*IPLDWrapper) - if !ok { - return StreamResponse{}, fmt.Errorf("eth resolver expected iplds type %T got %T", &IPLDWrapper{}, iplds) - } - return StreamResponse{ - BlockNumber: ipfsBlocks.BlockNumber, - SerializedHeaders: eir.resolve(ipfsBlocks.Headers), - SerializedTxs: eir.resolve(ipfsBlocks.Transactions), - }, nil -} - -func (eir *IPLDResolver) resolve(iplds []blocks.Block) [][]byte { - rlps := make([][]byte, 0, len(iplds)) - for _, ipld := range iplds { - rlps = append(rlps, ipld.RawData()) - } - return rlps -} diff --git a/pkg/super_node/btc/resolver_test.go b/pkg/super_node/btc/resolver_test.go deleted file mode 100644 index 8dd3c1ae..00000000 --- a/pkg/super_node/btc/resolver_test.go +++ /dev/null @@ -1,17 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package btc diff --git a/pkg/super_node/btc/retriever_test.go b/pkg/super_node/btc/retriever_test.go deleted file mode 100644 index 8dd3c1ae..00000000 --- a/pkg/super_node/btc/retriever_test.go +++ /dev/null @@ -1,17 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package btc diff --git a/pkg/super_node/btc/streamer_test.go b/pkg/super_node/btc/streamer_test.go deleted file mode 100644 index 8dd3c1ae..00000000 --- a/pkg/super_node/btc/streamer_test.go +++ /dev/null @@ -1,17 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package btc diff --git a/pkg/super_node/btc/types.go b/pkg/super_node/btc/types.go index 966a954d..06b54b11 100644 --- a/pkg/super_node/btc/types.go +++ b/pkg/super_node/btc/types.go @@ -17,12 +17,10 @@ package btc import ( - "encoding/json" "math/big" "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcutil" - "github.com/ipfs/go-block-format" ) // BlockPayload packages the block and tx data received from block connection notifications @@ -32,16 +30,16 @@ type BlockPayload struct { Txs []*btcutil.Tx } -// IPLDPayload is a custom type which packages raw BTC data for publishing to IPFS and filtering to subscribers +// ConvertedPayload is a custom type which packages raw BTC data for publishing to IPFS and filtering to subscribers // Returned by PayloadConverter // Passed to IPLDPublisher and ResponseFilterer -type IPLDPayload struct { +type ConvertedPayload struct { BlockPayload TxMetaData []TxModelWithInsAndOuts } // Height satisfies the StreamedIPLDs interface -func (i IPLDPayload) Height() int64 { +func (i ConvertedPayload) Height() int64 { return i.BlockPayload.BlockHeight } @@ -62,46 +60,15 @@ type CIDWrapper struct { Transactions []TxModel } -// IPLDWrapper is used to package raw IPLD block data fetched from IPFS -// Returned by IPLDFetcher -// Passed to IPLDResolver -type IPLDWrapper struct { +// IPLDs is used to package raw IPLD block data fetched from IPFS and returned by the server +// Returned by IPLDFetcher and ResponseFilterer +type IPLDs struct { BlockNumber *big.Int - Headers []blocks.Block - Transactions []blocks.Block + Headers [][]byte + Transactions [][]byte } -// StreamResponse holds the data streamed from the super node eth service to the requesting clients -// Returned by IPLDResolver and ResponseFilterer -// Passed to client subscriptions -type StreamResponse struct { - BlockNumber *big.Int `json:"blockNumber"` - SerializedHeaders [][]byte `json:"headerBytes"` - SerializedTxs [][]byte `json:"transactionBytes"` - - encoded []byte - err error -} - -// Height satisfies the ServerResponse interface -func (sr StreamResponse) Height() int64 { - return sr.BlockNumber.Int64() -} - -func (sr *StreamResponse) ensureEncoded() { - if sr.encoded == nil && sr.err == nil { - sr.encoded, sr.err = json.Marshal(sr) - } -} - -// Length to implement Encoder interface for StateDiff -func (sr *StreamResponse) Length() int { - sr.ensureEncoded() - return len(sr.encoded) -} - -// Encode to implement Encoder interface for StateDiff -func (sr *StreamResponse) Encode() ([]byte, error) { - sr.ensureEncoded() - return sr.encoded, sr.err +// Height satisfies the StreamedIPLDs interface +func (i IPLDs) Height() int64 { + return i.BlockNumber.Int64() } diff --git a/pkg/super_node/constructors.go b/pkg/super_node/constructors.go index d737adb0..5a2888f9 100644 --- a/pkg/super_node/constructors.go +++ b/pkg/super_node/constructors.go @@ -148,18 +148,6 @@ func NewIPLDPublisher(chain shared.ChainType, ipfsPath string) (shared.IPLDPubli } } -// NewIPLDResolver constructs an IPLDResolver for the provided chain type -func NewIPLDResolver(chain shared.ChainType) (shared.IPLDResolver, error) { - switch chain { - case shared.Ethereum: - return eth.NewIPLDResolver(), nil - case shared.Bitcoin: - return btc.NewIPLDResolver(), nil - default: - return nil, fmt.Errorf("invalid chain %s for resolver constructor", chain.String()) - } -} - // NewPublicAPI constructs a PublicAPI for the provided chain type func NewPublicAPI(chain shared.ChainType, db *postgres.DB, ipfsPath string) (rpc.API, error) { switch chain { diff --git a/pkg/super_node/eth/api.go b/pkg/super_node/eth/api.go index 7418c4d8..81f5ca4d 100644 --- a/pkg/super_node/eth/api.go +++ b/pkg/super_node/eth/api.go @@ -26,7 +26,6 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rpc" - "github.com/ipfs/go-block-format" ) // APIName is the namespace for the super node's eth api @@ -48,7 +47,7 @@ func NewPublicEthAPI(b *Backend) *PublicEthAPI { // BlockNumber returns the block number of the chain head. func (pea *PublicEthAPI) BlockNumber() hexutil.Uint64 { - number, _ := pea.b.retriever.RetrieveLastBlockNumber() + number, _ := pea.b.Retriever.RetrieveLastBlockNumber() return hexutil.Uint64(number) } @@ -74,20 +73,20 @@ func (pea *PublicEthAPI) GetLogs(ctx context.Context, crit ethereum.FilterQuery) Contracts: addrStrs, Topics: topicStrSets, } - tx, err := pea.b.db.Beginx() + tx, err := pea.b.DB.Beginx() if err != nil { return nil, err } // If we have a blockhash to filter on, fire off single retrieval query if crit.BlockHash != nil { - rctCIDs, err := pea.b.retriever.RetrieveRctCIDs(tx, filter, 0, crit.BlockHash, nil) + rctCIDs, err := pea.b.Retriever.RetrieveRctCIDs(tx, filter, 0, crit.BlockHash, nil) if err != nil { return nil, err } if err := tx.Commit(); err != nil { return nil, err } - rctIPLDs, err := pea.b.fetcher.FetchRcts(rctCIDs) + rctIPLDs, err := pea.b.Fetcher.FetchRcts(rctCIDs) if err != nil { return nil, err } @@ -98,14 +97,14 @@ func (pea *PublicEthAPI) GetLogs(ctx context.Context, crit ethereum.FilterQuery) startingBlock := crit.FromBlock endingBlock := crit.ToBlock if startingBlock == nil { - startingBlockInt, err := pea.b.retriever.RetrieveFirstBlockNumber() + startingBlockInt, err := pea.b.Retriever.RetrieveFirstBlockNumber() if err != nil { return nil, err } startingBlock = big.NewInt(startingBlockInt) } if endingBlock == nil { - endingBlockInt, err := pea.b.retriever.RetrieveLastBlockNumber() + endingBlockInt, err := pea.b.Retriever.RetrieveLastBlockNumber() if err != nil { return nil, err } @@ -115,7 +114,7 @@ func (pea *PublicEthAPI) GetLogs(ctx context.Context, crit ethereum.FilterQuery) end := endingBlock.Int64() allRctCIDs := make([]ReceiptModel, 0) for i := start; i <= end; i++ { - rctCIDs, err := pea.b.retriever.RetrieveRctCIDs(tx, filter, i, nil, nil) + rctCIDs, err := pea.b.Retriever.RetrieveRctCIDs(tx, filter, i, nil, nil) if err != nil { return nil, err } @@ -124,7 +123,7 @@ func (pea *PublicEthAPI) GetLogs(ctx context.Context, crit ethereum.FilterQuery) if err := tx.Commit(); err != nil { return nil, err } - rctIPLDs, err := pea.b.fetcher.FetchRcts(allRctCIDs) + rctIPLDs, err := pea.b.Fetcher.FetchRcts(allRctCIDs) if err != nil { return nil, err } @@ -181,10 +180,10 @@ func (pea *PublicEthAPI) GetTransactionByHash(ctx context.Context, hash common.H } // extractLogsOfInterest returns logs from the receipt IPLD -func extractLogsOfInterest(rctIPLDs []blocks.Block, wantedTopics [][]string) ([]*types.Log, error) { +func extractLogsOfInterest(rctIPLDs [][]byte, wantedTopics [][]string) ([]*types.Log, error) { var logs []*types.Log for _, rctIPLD := range rctIPLDs { - rctRLP := rctIPLD.RawData() + rctRLP := rctIPLD var rct types.Receipt if err := rlp.DecodeBytes(rctRLP, &rct); err != nil { return nil, err diff --git a/pkg/super_node/eth/backend.go b/pkg/super_node/eth/backend.go index 635d5f1e..d9e37772 100644 --- a/pkg/super_node/eth/backend.go +++ b/pkg/super_node/eth/backend.go @@ -36,9 +36,9 @@ var ( ) type Backend struct { - retriever *CIDRetriever - fetcher *IPLDFetcher - db *postgres.DB + Retriever *CIDRetriever + Fetcher *IPLDFetcher + DB *postgres.DB } func NewEthBackend(db *postgres.DB, ipfsPath string) (*Backend, error) { @@ -48,9 +48,9 @@ func NewEthBackend(db *postgres.DB, ipfsPath string) (*Backend, error) { return nil, err } return &Backend{ - retriever: r, - fetcher: f, - db: db, + Retriever: r, + Fetcher: f, + DB: db, }, nil } @@ -58,7 +58,7 @@ func (b *Backend) HeaderByNumber(ctx context.Context, blockNumber rpc.BlockNumbe number := blockNumber.Int64() var err error if blockNumber == rpc.LatestBlockNumber { - number, err = b.retriever.RetrieveLastBlockNumber() + number, err = b.Retriever.RetrieveLastBlockNumber() if err != nil { return nil, err } @@ -67,11 +67,11 @@ func (b *Backend) HeaderByNumber(ctx context.Context, blockNumber rpc.BlockNumbe return nil, errPendingBlockNumber } // Retrieve the CIDs for headers at this height - tx, err := b.db.Beginx() + tx, err := b.DB.Beginx() if err != nil { return nil, err } - headerCids, err := b.retriever.RetrieveHeaderCIDs(tx, number) + headerCids, err := b.Retriever.RetrieveHeaderCIDs(tx, number) if err != nil { if err := tx.Rollback(); err != nil { logrus.Error(err) @@ -86,7 +86,7 @@ func (b *Backend) HeaderByNumber(ctx context.Context, blockNumber rpc.BlockNumbe return nil, fmt.Errorf("header at block %d is not available", number) } // Fetch the header IPLDs for those CIDs - headerIPLDs, err := b.fetcher.FetchHeaders([]HeaderModel{headerCids[0]}) + headerIPLDs, err := b.Fetcher.FetchHeaders([]HeaderModel{headerCids[0]}) if err != nil { return nil, err } @@ -94,7 +94,7 @@ func (b *Backend) HeaderByNumber(ctx context.Context, blockNumber rpc.BlockNumbe // We throw an error in FetchHeaders() if the number of headers does not match the number of CIDs and we already // confirmed the number of CIDs is greater than 0 so there is no need to bound check the slice before accessing header := new(types.Header) - if err := rlp.DecodeBytes(headerIPLDs[0].RawData(), header); err != nil { + if err := rlp.DecodeBytes(headerIPLDs[0], header); err != nil { return nil, err } return header, nil @@ -105,7 +105,7 @@ func (b *Backend) GetTd(blockHash common.Hash) (*big.Int, error) { pgStr := `SELECT header_cids.td FROM header_cids WHERE header_cids.block_hash = $1` var tdStr string - err := b.db.Select(&tdStr, pgStr, blockHash.String()) + err := b.DB.Select(&tdStr, pgStr, blockHash.String()) if err != nil { return nil, err } @@ -118,11 +118,11 @@ func (b *Backend) GetTd(blockHash common.Hash) (*big.Int, error) { // GetLogs returns all the logs for the given block hash func (b *Backend) GetLogs(ctx context.Context, hash common.Hash) ([][]*types.Log, error) { - tx, err := b.db.Beginx() + tx, err := b.DB.Beginx() if err != nil { return nil, err } - receiptCIDs, err := b.retriever.RetrieveRctCIDs(tx, ReceiptFilter{}, 0, &hash, nil) + receiptCIDs, err := b.Retriever.RetrieveRctCIDs(tx, ReceiptFilter{}, 0, &hash, nil) if err != nil { if err := tx.Rollback(); err != nil { logrus.Error(err) @@ -135,14 +135,14 @@ func (b *Backend) GetLogs(ctx context.Context, hash common.Hash) ([][]*types.Log if len(receiptCIDs) == 0 { return nil, nil } - receiptIPLDs, err := b.fetcher.FetchRcts(receiptCIDs) + receiptIPLDs, err := b.Fetcher.FetchRcts(receiptCIDs) if err != nil { return nil, err } logs := make([][]*types.Log, len(receiptIPLDs)) for i, rctIPLD := range receiptIPLDs { var rct types.Receipt - if err := rlp.DecodeBytes(rctIPLD.RawData(), &rct); err != nil { + if err := rlp.DecodeBytes(rctIPLD, &rct); err != nil { return nil, err } logs[i] = rct.Logs @@ -157,7 +157,7 @@ func (b *Backend) BlockByNumber(ctx context.Context, blockNumber rpc.BlockNumber number := blockNumber.Int64() var err error if blockNumber == rpc.LatestBlockNumber { - number, err = b.retriever.RetrieveLastBlockNumber() + number, err = b.Retriever.RetrieveLastBlockNumber() if err != nil { return nil, err } @@ -166,54 +166,54 @@ func (b *Backend) BlockByNumber(ctx context.Context, blockNumber rpc.BlockNumber return nil, errPendingBlockNumber } // Retrieve all the CIDs for the block - headerCID, uncleCIDs, txCIDs, rctCIDs, err := b.retriever.RetrieveBlockByNumber(number) + headerCID, uncleCIDs, txCIDs, rctCIDs, err := b.Retriever.RetrieveBlockByNumber(number) if err != nil { return nil, err } // Fetch and decode the header IPLD - headerIPLDs, err := b.fetcher.FetchHeaders([]HeaderModel{headerCID}) + headerIPLDs, err := b.Fetcher.FetchHeaders([]HeaderModel{headerCID}) if err != nil { return nil, err } var header *types.Header - if err := rlp.DecodeBytes(headerIPLDs[0].RawData(), header); err != nil { + if err := rlp.DecodeBytes(headerIPLDs[0], header); err != nil { return nil, err } // Fetch and decode the uncle IPLDs - uncleIPLDs, err := b.fetcher.FetchUncles(uncleCIDs) + uncleIPLDs, err := b.Fetcher.FetchUncles(uncleCIDs) if err != nil { return nil, err } var uncles []*types.Header for _, uncleIPLD := range uncleIPLDs { var uncle *types.Header - if err := rlp.DecodeBytes(uncleIPLD.RawData(), uncle); err != nil { + if err := rlp.DecodeBytes(uncleIPLD, uncle); err != nil { return nil, err } uncles = append(uncles, uncle) } // Fetch and decode the transaction IPLDs - txIPLDs, err := b.fetcher.FetchTrxs(txCIDs) + txIPLDs, err := b.Fetcher.FetchTrxs(txCIDs) if err != nil { return nil, err } var transactions []*types.Transaction for _, txIPLD := range txIPLDs { var tx *types.Transaction - if err := rlp.DecodeBytes(txIPLD.RawData(), tx); err != nil { + if err := rlp.DecodeBytes(txIPLD, tx); err != nil { return nil, err } transactions = append(transactions, tx) } // Fetch and decode the receipt IPLDs - rctIPLDs, err := b.fetcher.FetchRcts(rctCIDs) + rctIPLDs, err := b.Fetcher.FetchRcts(rctCIDs) if err != nil { return nil, err } var receipts []*types.Receipt for _, rctIPLD := range rctIPLDs { var receipt *types.Receipt - if err := rlp.DecodeBytes(rctIPLD.RawData(), receipt); err != nil { + if err := rlp.DecodeBytes(rctIPLD, receipt); err != nil { return nil, err } receipts = append(receipts, receipt) @@ -226,54 +226,54 @@ func (b *Backend) BlockByNumber(ctx context.Context, blockNumber rpc.BlockNumber // detail, otherwise only the transaction hash is returned. func (b *Backend) BlockByHash(ctx context.Context, hash common.Hash) (*types.Block, error) { // Retrieve all the CIDs for the block - headerCID, uncleCIDs, txCIDs, rctCIDs, err := b.retriever.RetrieveBlockByHash(hash) + headerCID, uncleCIDs, txCIDs, rctCIDs, err := b.Retriever.RetrieveBlockByHash(hash) if err != nil { return nil, err } // Fetch and decode the header IPLD - headerIPLDs, err := b.fetcher.FetchHeaders([]HeaderModel{headerCID}) + headerIPLDs, err := b.Fetcher.FetchHeaders([]HeaderModel{headerCID}) if err != nil { return nil, err } var header *types.Header - if err := rlp.DecodeBytes(headerIPLDs[0].RawData(), header); err != nil { + if err := rlp.DecodeBytes(headerIPLDs[0], header); err != nil { return nil, err } // Fetch and decode the uncle IPLDs - uncleIPLDs, err := b.fetcher.FetchUncles(uncleCIDs) + uncleIPLDs, err := b.Fetcher.FetchUncles(uncleCIDs) if err != nil { return nil, err } var uncles []*types.Header for _, uncleIPLD := range uncleIPLDs { var uncle *types.Header - if err := rlp.DecodeBytes(uncleIPLD.RawData(), uncle); err != nil { + if err := rlp.DecodeBytes(uncleIPLD, uncle); err != nil { return nil, err } uncles = append(uncles, uncle) } // Fetch and decode the transaction IPLDs - txIPLDs, err := b.fetcher.FetchTrxs(txCIDs) + txIPLDs, err := b.Fetcher.FetchTrxs(txCIDs) if err != nil { return nil, err } var transactions []*types.Transaction for _, txIPLD := range txIPLDs { var tx *types.Transaction - if err := rlp.DecodeBytes(txIPLD.RawData(), tx); err != nil { + if err := rlp.DecodeBytes(txIPLD, tx); err != nil { return nil, err } transactions = append(transactions, tx) } // Fetch and decode the receipt IPLDs - rctIPLDs, err := b.fetcher.FetchRcts(rctCIDs) + rctIPLDs, err := b.Fetcher.FetchRcts(rctCIDs) if err != nil { return nil, err } var receipts []*types.Receipt for _, rctIPLD := range rctIPLDs { var receipt *types.Receipt - if err := rlp.DecodeBytes(rctIPLD.RawData(), receipt); err != nil { + if err := rlp.DecodeBytes(rctIPLD, receipt); err != nil { return nil, err } receipts = append(receipts, receipt) @@ -295,15 +295,15 @@ func (b *Backend) GetTransaction(ctx context.Context, txHash common.Hash) (*type BlockHash string `db:"block_hash"` BlockNumber int64 `db:"block_number"` } - if err := b.db.Get(&txCIDWithHeaderInfo, pgStr, txHash.String()); err != nil { + if err := b.DB.Get(&txCIDWithHeaderInfo, pgStr, txHash.String()); err != nil { return nil, common.Hash{}, 0, 0, err } - txIPLD, err := b.fetcher.FetchTrxs([]TxModel{{CID: txCIDWithHeaderInfo.CID}}) + txIPLD, err := b.Fetcher.FetchTrxs([]TxModel{{CID: txCIDWithHeaderInfo.CID}}) if err != nil { return nil, common.Hash{}, 0, 0, err } var transaction *types.Transaction - if err := rlp.DecodeBytes(txIPLD[0].RawData(), transaction); err != nil { + if err := rlp.DecodeBytes(txIPLD[0], transaction); err != nil { return nil, common.Hash{}, 0, 0, err } return transaction, common.HexToHash(txCIDWithHeaderInfo.BlockHash), uint64(txCIDWithHeaderInfo.BlockNumber), uint64(txCIDWithHeaderInfo.Index), nil diff --git a/pkg/super_node/eth/converter.go b/pkg/super_node/eth/converter.go index 3dc22ac2..2a7979e8 100644 --- a/pkg/super_node/eth/converter.go +++ b/pkg/super_node/eth/converter.go @@ -42,7 +42,7 @@ func NewPayloadConverter(chainConfig *params.ChainConfig) *PayloadConverter { // Convert method is used to convert a eth statediff.Payload to an IPLDPayload // Satisfies the shared.PayloadConverter interface -func (pc *PayloadConverter) Convert(payload shared.RawChainData) (shared.StreamedIPLDs, error) { +func (pc *PayloadConverter) Convert(payload shared.RawChainData) (shared.ConvertedData, error) { stateDiffPayload, ok := payload.(statediff.Payload) if !ok { return nil, fmt.Errorf("eth converter: expected payload type %T got %T", statediff.Payload{}, payload) @@ -53,7 +53,7 @@ func (pc *PayloadConverter) Convert(payload shared.RawChainData) (shared.Streame return nil, err } trxLen := len(block.Transactions()) - convertedPayload := IPLDPayload{ + convertedPayload := ConvertedPayload{ TotalDifficulty: stateDiffPayload.TotalDifficulty, Block: block, TxMetaData: make([]TxModel, 0, trxLen), diff --git a/pkg/super_node/eth/converter_test.go b/pkg/super_node/eth/converter_test.go index a75b4586..6c4f94bf 100644 --- a/pkg/super_node/eth/converter_test.go +++ b/pkg/super_node/eth/converter_test.go @@ -32,7 +32,7 @@ var _ = Describe("Converter", func() { converter := eth.NewPayloadConverter(params.MainnetChainConfig) payload, err := converter.Convert(mocks.MockStateDiffPayload) Expect(err).ToNot(HaveOccurred()) - convertedPayload, ok := payload.(eth.IPLDPayload) + convertedPayload, ok := payload.(eth.ConvertedPayload) Expect(ok).To(BeTrue()) Expect(convertedPayload.Block.Number().String()).To(Equal(mocks.BlockNumber.String())) Expect(convertedPayload.Block.Hash().String()).To(Equal(mocks.MockBlock.Hash().String())) diff --git a/pkg/super_node/eth/filterer.go b/pkg/super_node/eth/filterer.go index 639cde50..074d0a40 100644 --- a/pkg/super_node/eth/filterer.go +++ b/pkg/super_node/eth/filterer.go @@ -37,58 +37,58 @@ func NewResponseFilterer() *ResponseFilterer { } // Filter is used to filter through eth data to extract and package requested data into a Payload -func (s *ResponseFilterer) Filter(filter shared.SubscriptionSettings, payload shared.StreamedIPLDs) (shared.ServerResponse, error) { +func (s *ResponseFilterer) Filter(filter shared.SubscriptionSettings, payload shared.ConvertedData) (shared.IPLDs, error) { ethFilters, ok := filter.(*SubscriptionSettings) if !ok { - return StreamResponse{}, fmt.Errorf("eth filterer expected filter type %T got %T", &SubscriptionSettings{}, filter) + return IPLDs{}, fmt.Errorf("eth filterer expected filter type %T got %T", &SubscriptionSettings{}, filter) } - ethPayload, ok := payload.(IPLDPayload) + ethPayload, ok := payload.(ConvertedPayload) if !ok { - return StreamResponse{}, fmt.Errorf("eth filterer expected payload type %T got %T", IPLDPayload{}, payload) + return IPLDs{}, fmt.Errorf("eth filterer expected payload type %T got %T", ConvertedPayload{}, payload) } if checkRange(ethFilters.Start.Int64(), ethFilters.End.Int64(), ethPayload.Block.Number().Int64()) { - response := new(StreamResponse) + response := new(IPLDs) if err := s.filterHeaders(ethFilters.HeaderFilter, response, ethPayload); err != nil { - return StreamResponse{}, err + return IPLDs{}, err } txHashes, err := s.filterTransactions(ethFilters.TxFilter, response, ethPayload) if err != nil { - return StreamResponse{}, err + return IPLDs{}, err } var filterTxs []common.Hash if ethFilters.ReceiptFilter.MatchTxs { filterTxs = txHashes } if err := s.filerReceipts(ethFilters.ReceiptFilter, response, ethPayload, filterTxs); err != nil { - return StreamResponse{}, err + return IPLDs{}, err } if err := s.filterState(ethFilters.StateFilter, response, ethPayload); err != nil { - return StreamResponse{}, err + return IPLDs{}, err } if err := s.filterStorage(ethFilters.StorageFilter, response, ethPayload); err != nil { - return StreamResponse{}, err + return IPLDs{}, err } response.BlockNumber = ethPayload.Block.Number() return *response, nil } - return StreamResponse{}, nil + return IPLDs{}, nil } -func (s *ResponseFilterer) filterHeaders(headerFilter HeaderFilter, response *StreamResponse, payload IPLDPayload) error { +func (s *ResponseFilterer) filterHeaders(headerFilter HeaderFilter, response *IPLDs, payload ConvertedPayload) error { if !headerFilter.Off { headerRLP, err := rlp.EncodeToBytes(payload.Block.Header()) if err != nil { return err } - response.HeadersRlp = append(response.HeadersRlp, headerRLP) + response.Headers = append(response.Headers, headerRLP) if headerFilter.Uncles { - response.UnclesRlp = make([][]byte, 0, len(payload.Block.Body().Uncles)) - for _, uncle := range payload.Block.Body().Uncles { + response.Uncles = make([][]byte, len(payload.Block.Body().Uncles)) + for i, uncle := range payload.Block.Body().Uncles { uncleRlp, err := rlp.EncodeToBytes(uncle) if err != nil { return err } - response.UnclesRlp = append(response.UnclesRlp, uncleRlp) + response.Uncles[i] = uncleRlp } } } @@ -102,17 +102,20 @@ func checkRange(start, end, actual int64) bool { return false } -func (s *ResponseFilterer) filterTransactions(trxFilter TxFilter, response *StreamResponse, payload IPLDPayload) ([]common.Hash, error) { - trxHashes := make([]common.Hash, 0, len(payload.Block.Body().Transactions)) +func (s *ResponseFilterer) filterTransactions(trxFilter TxFilter, response *IPLDs, payload ConvertedPayload) ([]common.Hash, error) { + var trxHashes []common.Hash if !trxFilter.Off { + trxLen := len(payload.Block.Body().Transactions) + trxHashes = make([]common.Hash, 0, trxLen) + response.Transactions = make([][]byte, 0, trxLen) for i, trx := range payload.Block.Body().Transactions { if checkTransactionAddrs(trxFilter.Src, trxFilter.Dst, payload.TxMetaData[i].Src, payload.TxMetaData[i].Dst) { trxBuffer := new(bytes.Buffer) if err := trx.EncodeRLP(trxBuffer); err != nil { return nil, err } + response.Transactions = append(response.Transactions, trxBuffer.Bytes()) trxHashes = append(trxHashes, trx.Hash()) - response.TransactionsRlp = append(response.TransactionsRlp, trxBuffer.Bytes()) } } } @@ -138,8 +141,9 @@ func checkTransactionAddrs(wantedSrc, wantedDst []string, actualSrc, actualDst s return false } -func (s *ResponseFilterer) filerReceipts(receiptFilter ReceiptFilter, response *StreamResponse, payload IPLDPayload, trxHashes []common.Hash) error { +func (s *ResponseFilterer) filerReceipts(receiptFilter ReceiptFilter, response *IPLDs, payload ConvertedPayload, trxHashes []common.Hash) error { if !receiptFilter.Off { + response.Receipts = make([][]byte, 0, len(payload.Receipts)) for i, receipt := range payload.Receipts { // topics is always length 4 topics := [][]string{payload.ReceiptMetaData[i].Topic0s, payload.ReceiptMetaData[i].Topic1s, payload.ReceiptMetaData[i].Topic2s, payload.ReceiptMetaData[i].Topic3s} @@ -149,7 +153,7 @@ func (s *ResponseFilterer) filerReceipts(receiptFilter ReceiptFilter, response * if err := receiptForStorage.EncodeRLP(receiptBuffer); err != nil { return err } - response.ReceiptsRlp = append(response.ReceiptsRlp, receiptBuffer.Bytes()) + response.Receipts = append(response.Receipts, receiptBuffer.Bytes()) } } } @@ -217,9 +221,9 @@ func slicesShareString(slice1, slice2 []string) int { return 0 } -func (s *ResponseFilterer) filterState(stateFilter StateFilter, response *StreamResponse, payload IPLDPayload) error { +func (s *ResponseFilterer) filterState(stateFilter StateFilter, response *IPLDs, payload ConvertedPayload) error { if !stateFilter.Off { - response.StateNodesRlp = make(map[common.Hash][]byte) + response.StateNodes = make([]StateNode, 0, len(payload.StateNodes)) keyFilters := make([]common.Hash, len(stateFilter.Addresses)) for i, addr := range stateFilter.Addresses { keyFilters[i] = crypto.Keccak256Hash(common.HexToAddress(addr).Bytes()) @@ -227,7 +231,11 @@ func (s *ResponseFilterer) filterState(stateFilter StateFilter, response *Stream for _, stateNode := range payload.StateNodes { if checkNodeKeys(keyFilters, stateNode.Key) { if stateNode.Leaf || stateFilter.IntermediateNodes { - response.StateNodesRlp[stateNode.Key] = stateNode.Value + response.StateNodes = append(response.StateNodes, StateNode{ + StateTrieKey: stateNode.Key, + IPLD: stateNode.Value, + Leaf: stateNode.Leaf, + }) } } } @@ -248,9 +256,9 @@ func checkNodeKeys(wantedKeys []common.Hash, actualKey common.Hash) bool { return false } -func (s *ResponseFilterer) filterStorage(storageFilter StorageFilter, response *StreamResponse, payload IPLDPayload) error { +func (s *ResponseFilterer) filterStorage(storageFilter StorageFilter, response *IPLDs, payload ConvertedPayload) error { if !storageFilter.Off { - response.StorageNodesRlp = make(map[common.Hash]map[common.Hash][]byte) + response.StorageNodes = make([]StorageNode, 0) stateKeyFilters := make([]common.Hash, len(storageFilter.Addresses)) for i, addr := range storageFilter.Addresses { stateKeyFilters[i] = crypto.Keccak256Hash(common.HexToAddress(addr).Bytes()) @@ -261,10 +269,14 @@ func (s *ResponseFilterer) filterStorage(storageFilter StorageFilter, response * } for stateKey, storageNodes := range payload.StorageNodes { if checkNodeKeys(stateKeyFilters, stateKey) { - response.StorageNodesRlp[stateKey] = make(map[common.Hash][]byte) for _, storageNode := range storageNodes { if checkNodeKeys(storageKeyFilters, storageNode.Key) { - response.StorageNodesRlp[stateKey][storageNode.Key] = storageNode.Value + response.StorageNodes = append(response.StorageNodes, StorageNode{ + StateTrieKey: stateKey, + StorageTrieKey: storageNode.Key, + IPLD: storageNode.Value, + Leaf: storageNode.Leaf, + }) } } } diff --git a/pkg/super_node/eth/filterer_test.go b/pkg/super_node/eth/filterer_test.go index 745f1c0a..e3004a08 100644 --- a/pkg/super_node/eth/filterer_test.go +++ b/pkg/super_node/eth/filterer_test.go @@ -43,133 +43,141 @@ var _ = Describe("Filterer", func() { }) It("Transcribes all the data from the IPLDPayload into the StreamPayload if given an open filter", func() { - payload, err := filterer.Filter(openFilter, mocks.MockIPLDPayload) + payload, err := filterer.Filter(openFilter, mocks.MockConvertedPayload) Expect(err).ToNot(HaveOccurred()) - superNodePayload, ok := payload.(eth.StreamResponse) + iplds, ok := payload.(eth.IPLDs) Expect(ok).To(BeTrue()) - Expect(superNodePayload.BlockNumber.Int64()).To(Equal(mocks.MockSeedNodePayload.BlockNumber.Int64())) - Expect(superNodePayload.HeadersRlp).To(Equal(mocks.MockSeedNodePayload.HeadersRlp)) + Expect(iplds.BlockNumber.Int64()).To(Equal(mocks.MockIPLDs.BlockNumber.Int64())) + Expect(iplds.Headers).To(Equal(mocks.MockIPLDs.Headers)) var unclesRlp [][]byte - Expect(superNodePayload.UnclesRlp).To(Equal(unclesRlp)) - Expect(len(superNodePayload.TransactionsRlp)).To(Equal(2)) - Expect(shared.ListContainsBytes(superNodePayload.TransactionsRlp, mocks.MockTransactions.GetRlp(0))).To(BeTrue()) - Expect(shared.ListContainsBytes(superNodePayload.TransactionsRlp, mocks.MockTransactions.GetRlp(1))).To(BeTrue()) - Expect(len(superNodePayload.ReceiptsRlp)).To(Equal(2)) - Expect(shared.ListContainsBytes(superNodePayload.ReceiptsRlp, expectedRctForStorageRLP1)).To(BeTrue()) - Expect(shared.ListContainsBytes(superNodePayload.ReceiptsRlp, expectedRctForStorageRLP2)).To(BeTrue()) - Expect(len(superNodePayload.StateNodesRlp)).To(Equal(2)) - Expect(superNodePayload.StateNodesRlp[mocks.ContractLeafKey]).To(Equal(mocks.ValueBytes)) - Expect(superNodePayload.StateNodesRlp[mocks.AnotherContractLeafKey]).To(Equal(mocks.AnotherValueBytes)) - Expect(superNodePayload.StorageNodesRlp).To(Equal(mocks.MockSeedNodePayload.StorageNodesRlp)) + Expect(iplds.Uncles).To(Equal(unclesRlp)) + Expect(len(iplds.Transactions)).To(Equal(2)) + Expect(shared.ListContainsBytes(iplds.Transactions, mocks.MockTransactions.GetRlp(0))).To(BeTrue()) + Expect(shared.ListContainsBytes(iplds.Transactions, mocks.MockTransactions.GetRlp(1))).To(BeTrue()) + Expect(len(iplds.Receipts)).To(Equal(2)) + Expect(shared.ListContainsBytes(iplds.Receipts, expectedRctForStorageRLP1)).To(BeTrue()) + Expect(shared.ListContainsBytes(iplds.Receipts, expectedRctForStorageRLP2)).To(BeTrue()) + Expect(len(iplds.StateNodes)).To(Equal(2)) + for _, stateNode := range iplds.StateNodes { + Expect(stateNode.Leaf).To(BeTrue()) + if stateNode.StateTrieKey == mocks.ContractLeafKey { + Expect(stateNode.IPLD).To(Equal(mocks.ValueBytes)) + } + if stateNode.StateTrieKey == mocks.AnotherContractLeafKey { + Expect(stateNode.IPLD).To(Equal(mocks.AnotherValueBytes)) + } + } + Expect(iplds.StorageNodes).To(Equal(mocks.MockIPLDs.StorageNodes)) }) It("Applies filters from the provided config.Subscription", func() { - payload1, err := filterer.Filter(rctContractFilter, mocks.MockIPLDPayload) + payload1, err := filterer.Filter(rctContractFilter, mocks.MockConvertedPayload) Expect(err).ToNot(HaveOccurred()) - superNodePayload1, ok := payload1.(eth.StreamResponse) + iplds1, ok := payload1.(eth.IPLDs) Expect(ok).To(BeTrue()) - Expect(superNodePayload1.BlockNumber.Int64()).To(Equal(mocks.MockSeedNodePayload.BlockNumber.Int64())) - Expect(len(superNodePayload1.HeadersRlp)).To(Equal(0)) - Expect(len(superNodePayload1.UnclesRlp)).To(Equal(0)) - Expect(len(superNodePayload1.TransactionsRlp)).To(Equal(0)) - Expect(len(superNodePayload1.StorageNodesRlp)).To(Equal(0)) - Expect(len(superNodePayload1.StateNodesRlp)).To(Equal(0)) - Expect(len(superNodePayload1.ReceiptsRlp)).To(Equal(1)) - Expect(superNodePayload1.ReceiptsRlp[0]).To(Equal(expectedRctForStorageRLP2)) + Expect(iplds1.BlockNumber.Int64()).To(Equal(mocks.MockIPLDs.BlockNumber.Int64())) + Expect(len(iplds1.Headers)).To(Equal(0)) + Expect(len(iplds1.Uncles)).To(Equal(0)) + Expect(len(iplds1.Transactions)).To(Equal(0)) + Expect(len(iplds1.StorageNodes)).To(Equal(0)) + Expect(len(iplds1.StateNodes)).To(Equal(0)) + Expect(len(iplds1.Receipts)).To(Equal(1)) + Expect(iplds1.Receipts[0]).To(Equal(expectedRctForStorageRLP2)) - payload2, err := filterer.Filter(rctTopicsFilter, mocks.MockIPLDPayload) + payload2, err := filterer.Filter(rctTopicsFilter, mocks.MockConvertedPayload) Expect(err).ToNot(HaveOccurred()) - superNodePayload2, ok := payload2.(eth.StreamResponse) + iplds2, ok := payload2.(eth.IPLDs) Expect(ok).To(BeTrue()) - Expect(superNodePayload2.BlockNumber.Int64()).To(Equal(mocks.MockSeedNodePayload.BlockNumber.Int64())) - Expect(len(superNodePayload2.HeadersRlp)).To(Equal(0)) - Expect(len(superNodePayload2.UnclesRlp)).To(Equal(0)) - Expect(len(superNodePayload2.TransactionsRlp)).To(Equal(0)) - Expect(len(superNodePayload2.StorageNodesRlp)).To(Equal(0)) - Expect(len(superNodePayload2.StateNodesRlp)).To(Equal(0)) - Expect(len(superNodePayload2.ReceiptsRlp)).To(Equal(1)) - Expect(superNodePayload2.ReceiptsRlp[0]).To(Equal(expectedRctForStorageRLP1)) + Expect(iplds2.BlockNumber.Int64()).To(Equal(mocks.MockIPLDs.BlockNumber.Int64())) + Expect(len(iplds2.Headers)).To(Equal(0)) + Expect(len(iplds2.Uncles)).To(Equal(0)) + Expect(len(iplds2.Transactions)).To(Equal(0)) + Expect(len(iplds2.StorageNodes)).To(Equal(0)) + Expect(len(iplds2.StateNodes)).To(Equal(0)) + Expect(len(iplds2.Receipts)).To(Equal(1)) + Expect(iplds2.Receipts[0]).To(Equal(expectedRctForStorageRLP1)) - payload3, err := filterer.Filter(rctTopicsAndContractFilter, mocks.MockIPLDPayload) + payload3, err := filterer.Filter(rctTopicsAndContractFilter, mocks.MockConvertedPayload) Expect(err).ToNot(HaveOccurred()) - superNodePayload3, ok := payload3.(eth.StreamResponse) + iplds3, ok := payload3.(eth.IPLDs) Expect(ok).To(BeTrue()) - Expect(superNodePayload3.BlockNumber.Int64()).To(Equal(mocks.MockSeedNodePayload.BlockNumber.Int64())) - Expect(len(superNodePayload3.HeadersRlp)).To(Equal(0)) - Expect(len(superNodePayload3.UnclesRlp)).To(Equal(0)) - Expect(len(superNodePayload3.TransactionsRlp)).To(Equal(0)) - Expect(len(superNodePayload3.StorageNodesRlp)).To(Equal(0)) - Expect(len(superNodePayload3.StateNodesRlp)).To(Equal(0)) - Expect(len(superNodePayload3.ReceiptsRlp)).To(Equal(1)) - Expect(superNodePayload3.ReceiptsRlp[0]).To(Equal(expectedRctForStorageRLP1)) + Expect(iplds3.BlockNumber.Int64()).To(Equal(mocks.MockIPLDs.BlockNumber.Int64())) + Expect(len(iplds3.Headers)).To(Equal(0)) + Expect(len(iplds3.Uncles)).To(Equal(0)) + Expect(len(iplds3.Transactions)).To(Equal(0)) + Expect(len(iplds3.StorageNodes)).To(Equal(0)) + Expect(len(iplds3.StateNodes)).To(Equal(0)) + Expect(len(iplds3.Receipts)).To(Equal(1)) + Expect(iplds3.Receipts[0]).To(Equal(expectedRctForStorageRLP1)) - payload4, err := filterer.Filter(rctContractsAndTopicFilter, mocks.MockIPLDPayload) + payload4, err := filterer.Filter(rctContractsAndTopicFilter, mocks.MockConvertedPayload) Expect(err).ToNot(HaveOccurred()) - superNodePayload4, ok := payload4.(eth.StreamResponse) + iplds4, ok := payload4.(eth.IPLDs) Expect(ok).To(BeTrue()) - Expect(superNodePayload4.BlockNumber.Int64()).To(Equal(mocks.MockSeedNodePayload.BlockNumber.Int64())) - Expect(len(superNodePayload4.HeadersRlp)).To(Equal(0)) - Expect(len(superNodePayload4.UnclesRlp)).To(Equal(0)) - Expect(len(superNodePayload4.TransactionsRlp)).To(Equal(0)) - Expect(len(superNodePayload4.StorageNodesRlp)).To(Equal(0)) - Expect(len(superNodePayload4.StateNodesRlp)).To(Equal(0)) - Expect(len(superNodePayload4.ReceiptsRlp)).To(Equal(1)) - Expect(superNodePayload4.ReceiptsRlp[0]).To(Equal(expectedRctForStorageRLP2)) + Expect(iplds4.BlockNumber.Int64()).To(Equal(mocks.MockIPLDs.BlockNumber.Int64())) + Expect(len(iplds4.Headers)).To(Equal(0)) + Expect(len(iplds4.Uncles)).To(Equal(0)) + Expect(len(iplds4.Transactions)).To(Equal(0)) + Expect(len(iplds4.StorageNodes)).To(Equal(0)) + Expect(len(iplds4.StateNodes)).To(Equal(0)) + Expect(len(iplds4.Receipts)).To(Equal(1)) + Expect(iplds4.Receipts[0]).To(Equal(expectedRctForStorageRLP2)) - payload5, err := filterer.Filter(rctsForAllCollectedTrxs, mocks.MockIPLDPayload) + payload5, err := filterer.Filter(rctsForAllCollectedTrxs, mocks.MockConvertedPayload) Expect(err).ToNot(HaveOccurred()) - superNodePayload5, ok := payload5.(eth.StreamResponse) + iplds5, ok := payload5.(eth.IPLDs) Expect(ok).To(BeTrue()) - Expect(superNodePayload5.BlockNumber.Int64()).To(Equal(mocks.MockSeedNodePayload.BlockNumber.Int64())) - Expect(len(superNodePayload5.HeadersRlp)).To(Equal(0)) - Expect(len(superNodePayload5.UnclesRlp)).To(Equal(0)) - Expect(len(superNodePayload5.TransactionsRlp)).To(Equal(2)) - Expect(shared.ListContainsBytes(superNodePayload5.TransactionsRlp, mocks.MockTransactions.GetRlp(0))).To(BeTrue()) - Expect(shared.ListContainsBytes(superNodePayload5.TransactionsRlp, mocks.MockTransactions.GetRlp(1))).To(BeTrue()) - Expect(len(superNodePayload5.StorageNodesRlp)).To(Equal(0)) - Expect(len(superNodePayload5.StateNodesRlp)).To(Equal(0)) - Expect(len(superNodePayload5.ReceiptsRlp)).To(Equal(2)) - Expect(shared.ListContainsBytes(superNodePayload5.ReceiptsRlp, expectedRctForStorageRLP1)).To(BeTrue()) - Expect(shared.ListContainsBytes(superNodePayload5.ReceiptsRlp, expectedRctForStorageRLP2)).To(BeTrue()) + Expect(iplds5.BlockNumber.Int64()).To(Equal(mocks.MockIPLDs.BlockNumber.Int64())) + Expect(len(iplds5.Headers)).To(Equal(0)) + Expect(len(iplds5.Uncles)).To(Equal(0)) + Expect(len(iplds5.Transactions)).To(Equal(2)) + Expect(shared.ListContainsBytes(iplds5.Transactions, mocks.MockTransactions.GetRlp(0))).To(BeTrue()) + Expect(shared.ListContainsBytes(iplds5.Transactions, mocks.MockTransactions.GetRlp(1))).To(BeTrue()) + Expect(len(iplds5.StorageNodes)).To(Equal(0)) + Expect(len(iplds5.StateNodes)).To(Equal(0)) + Expect(len(iplds5.Receipts)).To(Equal(2)) + Expect(shared.ListContainsBytes(iplds5.Receipts, expectedRctForStorageRLP1)).To(BeTrue()) + Expect(shared.ListContainsBytes(iplds5.Receipts, expectedRctForStorageRLP2)).To(BeTrue()) - payload6, err := filterer.Filter(rctsForSelectCollectedTrxs, mocks.MockIPLDPayload) + payload6, err := filterer.Filter(rctsForSelectCollectedTrxs, mocks.MockConvertedPayload) Expect(err).ToNot(HaveOccurred()) - superNodePayload6, ok := payload6.(eth.StreamResponse) + iplds6, ok := payload6.(eth.IPLDs) Expect(ok).To(BeTrue()) - Expect(superNodePayload6.BlockNumber.Int64()).To(Equal(mocks.MockSeedNodePayload.BlockNumber.Int64())) - Expect(len(superNodePayload6.HeadersRlp)).To(Equal(0)) - Expect(len(superNodePayload6.UnclesRlp)).To(Equal(0)) - Expect(len(superNodePayload6.TransactionsRlp)).To(Equal(1)) - Expect(shared.ListContainsBytes(superNodePayload5.TransactionsRlp, mocks.MockTransactions.GetRlp(1))).To(BeTrue()) - Expect(len(superNodePayload6.StorageNodesRlp)).To(Equal(0)) - Expect(len(superNodePayload6.StateNodesRlp)).To(Equal(0)) - Expect(len(superNodePayload6.ReceiptsRlp)).To(Equal(1)) - Expect(superNodePayload4.ReceiptsRlp[0]).To(Equal(expectedRctForStorageRLP2)) + Expect(iplds6.BlockNumber.Int64()).To(Equal(mocks.MockIPLDs.BlockNumber.Int64())) + Expect(len(iplds6.Headers)).To(Equal(0)) + Expect(len(iplds6.Uncles)).To(Equal(0)) + Expect(len(iplds6.Transactions)).To(Equal(1)) + Expect(shared.ListContainsBytes(iplds5.Transactions, mocks.MockTransactions.GetRlp(1))).To(BeTrue()) + Expect(len(iplds6.StorageNodes)).To(Equal(0)) + Expect(len(iplds6.StateNodes)).To(Equal(0)) + Expect(len(iplds6.Receipts)).To(Equal(1)) + Expect(iplds4.Receipts[0]).To(Equal(expectedRctForStorageRLP2)) - payload7, err := filterer.Filter(stateFilter, mocks.MockIPLDPayload) + payload7, err := filterer.Filter(stateFilter, mocks.MockConvertedPayload) Expect(err).ToNot(HaveOccurred()) - superNodePayload7, ok := payload7.(eth.StreamResponse) + iplds7, ok := payload7.(eth.IPLDs) Expect(ok).To(BeTrue()) - Expect(superNodePayload7.BlockNumber.Int64()).To(Equal(mocks.MockSeedNodePayload.BlockNumber.Int64())) - Expect(len(superNodePayload7.HeadersRlp)).To(Equal(0)) - Expect(len(superNodePayload7.UnclesRlp)).To(Equal(0)) - Expect(len(superNodePayload7.TransactionsRlp)).To(Equal(0)) - Expect(len(superNodePayload7.StorageNodesRlp)).To(Equal(0)) - Expect(len(superNodePayload7.ReceiptsRlp)).To(Equal(0)) - Expect(len(superNodePayload7.StateNodesRlp)).To(Equal(1)) - Expect(superNodePayload7.StateNodesRlp[mocks.ContractLeafKey]).To(Equal(mocks.ValueBytes)) + Expect(iplds7.BlockNumber.Int64()).To(Equal(mocks.MockIPLDs.BlockNumber.Int64())) + Expect(len(iplds7.Headers)).To(Equal(0)) + Expect(len(iplds7.Uncles)).To(Equal(0)) + Expect(len(iplds7.Transactions)).To(Equal(0)) + Expect(len(iplds7.StorageNodes)).To(Equal(0)) + Expect(len(iplds7.Receipts)).To(Equal(0)) + Expect(len(iplds7.StateNodes)).To(Equal(1)) + Expect(iplds7.StateNodes[0].StateTrieKey).To(Equal(mocks.ContractLeafKey)) + Expect(iplds7.StateNodes[0].IPLD).To(Equal(mocks.ValueBytes)) - payload8, err := filterer.Filter(rctTopicsAndContractFilterFail, mocks.MockIPLDPayload) + payload8, err := filterer.Filter(rctTopicsAndContractFilterFail, mocks.MockConvertedPayload) Expect(err).ToNot(HaveOccurred()) - superNodePayload8, ok := payload8.(eth.StreamResponse) + iplds8, ok := payload8.(eth.IPLDs) Expect(ok).To(BeTrue()) - Expect(superNodePayload8.BlockNumber.Int64()).To(Equal(mocks.MockSeedNodePayload.BlockNumber.Int64())) - Expect(len(superNodePayload8.HeadersRlp)).To(Equal(0)) - Expect(len(superNodePayload8.UnclesRlp)).To(Equal(0)) - Expect(len(superNodePayload8.TransactionsRlp)).To(Equal(0)) - Expect(len(superNodePayload8.StorageNodesRlp)).To(Equal(0)) - Expect(len(superNodePayload8.StateNodesRlp)).To(Equal(0)) - Expect(len(superNodePayload8.ReceiptsRlp)).To(Equal(0)) + Expect(iplds8.BlockNumber.Int64()).To(Equal(mocks.MockIPLDs.BlockNumber.Int64())) + Expect(len(iplds8.Headers)).To(Equal(0)) + Expect(len(iplds8.Uncles)).To(Equal(0)) + Expect(len(iplds8.Transactions)).To(Equal(0)) + Expect(len(iplds8.StorageNodes)).To(Equal(0)) + Expect(len(iplds8.StateNodes)).To(Equal(0)) + Expect(len(iplds8.Receipts)).To(Equal(0)) }) }) }) diff --git a/pkg/super_node/eth/ipld_fetcher.go b/pkg/super_node/eth/ipld_fetcher.go index c9220f5e..e1935136 100644 --- a/pkg/super_node/eth/ipld_fetcher.go +++ b/pkg/super_node/eth/ipld_fetcher.go @@ -21,8 +21,6 @@ import ( "errors" "fmt" - "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" - "github.com/ethereum/go-ethereum/common" "github.com/ipfs/go-block-format" "github.com/ipfs/go-blockservice" @@ -30,6 +28,7 @@ import ( log "github.com/sirupsen/logrus" "github.com/vulcanize/vulcanizedb/pkg/ipfs" + "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" ) var ( @@ -53,13 +52,13 @@ func NewIPLDFetcher(ipfsPath string) (*IPLDFetcher, error) { } // Fetch is the exported method for fetching and returning all the IPLDS specified in the CIDWrapper -func (f *IPLDFetcher) Fetch(cids shared.CIDsForFetching) (shared.FetchedIPLDs, error) { +func (f *IPLDFetcher) Fetch(cids shared.CIDsForFetching) (shared.IPLDs, error) { cidWrapper, ok := cids.(*CIDWrapper) if !ok { return nil, fmt.Errorf("eth fetcher: expected cids type %T got %T", &CIDWrapper{}, cids) } log.Debug("fetching iplds") - iplds := new(IPLDWrapper) + iplds := IPLDs{} iplds.BlockNumber = cidWrapper.BlockNumber var err error iplds.Headers, err = f.FetchHeaders(cidWrapper.Headers) @@ -91,91 +90,107 @@ func (f *IPLDFetcher) Fetch(cids shared.CIDsForFetching) (shared.FetchedIPLDs, e // FetchHeaders fetches headers // It uses the f.fetchBatch method -func (f *IPLDFetcher) FetchHeaders(cids []HeaderModel) ([]blocks.Block, error) { +func (f *IPLDFetcher) FetchHeaders(cids []HeaderModel) ([][]byte, error) { log.Debug("fetching header iplds") - headerCids := make([]cid.Cid, 0, len(cids)) - for _, c := range cids { + headerCids := make([]cid.Cid, len(cids)) + for i, c := range cids { dc, err := cid.Decode(c.CID) if err != nil { return nil, err } - headerCids = append(headerCids, dc) + headerCids[i] = dc } headers := f.fetchBatch(headerCids) - if len(headers) != len(headerCids) { - log.Errorf("ipfs fetcher: number of header blocks returned (%d) does not match number expected (%d)", len(headers), len(headerCids)) - return headers, errUnexpectedNumberOfIPLDs + headersRLP := make([][]byte, len(headers)) + for i, header := range headers { + headersRLP[i] = header.RawData() } - return headers, nil + if len(headersRLP) != len(headerCids) { + log.Errorf("ipfs fetcher: number of header blocks returned (%d) does not match number expected (%d)", len(headers), len(headerCids)) + return headersRLP, errUnexpectedNumberOfIPLDs + } + return headersRLP, nil } // FetchUncles fetches uncles // It uses the f.fetchBatch method -func (f *IPLDFetcher) FetchUncles(cids []UncleModel) ([]blocks.Block, error) { +func (f *IPLDFetcher) FetchUncles(cids []UncleModel) ([][]byte, error) { log.Debug("fetching uncle iplds") - uncleCids := make([]cid.Cid, 0, len(cids)) - for _, c := range cids { + uncleCids := make([]cid.Cid, len(cids)) + for i, c := range cids { dc, err := cid.Decode(c.CID) if err != nil { return nil, err } - uncleCids = append(uncleCids, dc) + uncleCids[i] = dc } uncles := f.fetchBatch(uncleCids) - if len(uncles) != len(uncleCids) { - log.Errorf("ipfs fetcher: number of uncle blocks returned (%d) does not match number expected (%d)", len(uncles), len(uncleCids)) - return uncles, errUnexpectedNumberOfIPLDs + unclesRLP := make([][]byte, len(uncles)) + for i, uncle := range uncles { + unclesRLP[i] = uncle.RawData() } - return uncles, nil + if len(unclesRLP) != len(uncleCids) { + log.Errorf("ipfs fetcher: number of uncle blocks returned (%d) does not match number expected (%d)", len(uncles), len(uncleCids)) + return unclesRLP, errUnexpectedNumberOfIPLDs + } + return unclesRLP, nil } // FetchTrxs fetches transactions // It uses the f.fetchBatch method -func (f *IPLDFetcher) FetchTrxs(cids []TxModel) ([]blocks.Block, error) { +func (f *IPLDFetcher) FetchTrxs(cids []TxModel) ([][]byte, error) { log.Debug("fetching transaction iplds") - trxCids := make([]cid.Cid, 0, len(cids)) - for _, c := range cids { + trxCids := make([]cid.Cid, len(cids)) + for i, c := range cids { dc, err := cid.Decode(c.CID) if err != nil { return nil, err } - trxCids = append(trxCids, dc) + trxCids[i] = dc } trxs := f.fetchBatch(trxCids) - if len(trxs) != len(trxCids) { - log.Errorf("ipfs fetcher: number of transaction blocks returned (%d) does not match number expected (%d)", len(trxs), len(trxCids)) - return trxs, errUnexpectedNumberOfIPLDs + trxsRLP := make([][]byte, len(trxs)) + for i, trx := range trxs { + trxsRLP[i] = trx.RawData() } - return trxs, nil + if len(trxsRLP) != len(trxCids) { + log.Errorf("ipfs fetcher: number of transaction blocks returned (%d) does not match number expected (%d)", len(trxs), len(trxCids)) + return trxsRLP, errUnexpectedNumberOfIPLDs + } + return trxsRLP, nil } // FetchRcts fetches receipts // It uses the f.fetchBatch method -func (f *IPLDFetcher) FetchRcts(cids []ReceiptModel) ([]blocks.Block, error) { +func (f *IPLDFetcher) FetchRcts(cids []ReceiptModel) ([][]byte, error) { log.Debug("fetching receipt iplds") - rctCids := make([]cid.Cid, 0, len(cids)) - for _, c := range cids { + rctCids := make([]cid.Cid, len(cids)) + for i, c := range cids { dc, err := cid.Decode(c.CID) if err != nil { return nil, err } - rctCids = append(rctCids, dc) + rctCids[i] = dc } rcts := f.fetchBatch(rctCids) - if len(rcts) != len(rctCids) { - log.Errorf("ipfs fetcher: number of receipt blocks returned (%d) does not match number expected (%d)", len(rcts), len(rctCids)) - return rcts, errUnexpectedNumberOfIPLDs + rctsRLP := make([][]byte, len(rcts)) + for i, rct := range rcts { + rctsRLP[i] = rct.RawData() } - return rcts, nil + if len(rctsRLP) != len(rctCids) { + log.Errorf("ipfs fetcher: number of receipt blocks returned (%d) does not match number expected (%d)", len(rcts), len(rctCids)) + return rctsRLP, errUnexpectedNumberOfIPLDs + } + return rctsRLP, nil } // FetchState fetches state nodes // It uses the single f.fetch method instead of the batch fetch, because it // needs to maintain the data's relation to state keys -func (f *IPLDFetcher) FetchState(cids []StateNodeModel) (map[common.Hash]blocks.Block, error) { +func (f *IPLDFetcher) FetchState(cids []StateNodeModel) ([]StateNode, error) { log.Debug("fetching state iplds") - stateNodes := make(map[common.Hash]blocks.Block) - for _, stateNode := range cids { + stateNodes := make([]StateNode, len(cids)) + for i, stateNode := range cids { if stateNode.CID == "" || stateNode.StateKey == "" { continue } @@ -187,7 +202,11 @@ func (f *IPLDFetcher) FetchState(cids []StateNodeModel) (map[common.Hash]blocks. if err != nil { return nil, err } - stateNodes[common.HexToHash(stateNode.StateKey)] = state + stateNodes[i] = StateNode{ + IPLD: state.RawData(), + StateTrieKey: common.HexToHash(stateNode.StateKey), + Leaf: stateNode.Leaf, + } } return stateNodes, nil } @@ -195,10 +214,10 @@ func (f *IPLDFetcher) FetchState(cids []StateNodeModel) (map[common.Hash]blocks. // FetchStorage fetches storage nodes // It uses the single f.fetch method instead of the batch fetch, because it // needs to maintain the data's relation to state and storage keys -func (f *IPLDFetcher) FetchStorage(cids []StorageNodeWithStateKeyModel) (map[common.Hash]map[common.Hash]blocks.Block, error) { +func (f *IPLDFetcher) FetchStorage(cids []StorageNodeWithStateKeyModel) ([]StorageNode, error) { log.Debug("fetching storage iplds") - storageNodes := make(map[common.Hash]map[common.Hash]blocks.Block) - for _, storageNode := range cids { + storageNodes := make([]StorageNode, len(cids)) + for i, storageNode := range cids { if storageNode.CID == "" || storageNode.StorageKey == "" || storageNode.StateKey == "" { continue } @@ -210,10 +229,12 @@ func (f *IPLDFetcher) FetchStorage(cids []StorageNodeWithStateKeyModel) (map[com if err != nil { return nil, err } - if storageNodes[common.HexToHash(storageNode.StateKey)] == nil { - storageNodes[common.HexToHash(storageNode.StateKey)] = make(map[common.Hash]blocks.Block) + storageNodes[i] = StorageNode{ + IPLD: storage.RawData(), + StateTrieKey: common.HexToHash(storageNode.StateKey), + StorageTrieKey: common.HexToHash(storageNode.StorageKey), + Leaf: storageNode.Leaf, } - storageNodes[common.HexToHash(storageNode.StateKey)][common.HexToHash(storageNode.StorageKey)] = storage } return storageNodes, nil } diff --git a/pkg/super_node/eth/ipld_fetcher_test.go b/pkg/super_node/eth/ipld_fetcher_test.go index ee0d954c..074107f2 100644 --- a/pkg/super_node/eth/ipld_fetcher_test.go +++ b/pkg/super_node/eth/ipld_fetcher_test.go @@ -17,6 +17,7 @@ package eth_test import ( + "bytes" "math/big" "github.com/ethereum/go-ethereum/common" @@ -101,30 +102,32 @@ var _ = Describe("Fetcher", func() { fetcher.BlockService = mockBlockService i, err := fetcher.Fetch(mockCIDWrapper) Expect(err).ToNot(HaveOccurred()) - iplds, ok := i.(*eth.IPLDWrapper) + iplds, ok := i.(eth.IPLDs) Expect(ok).To(BeTrue()) Expect(iplds.BlockNumber).To(Equal(mockCIDWrapper.BlockNumber)) Expect(len(iplds.Headers)).To(Equal(1)) - Expect(iplds.Headers[0]).To(Equal(mockHeaderBlock)) + Expect(iplds.Headers[0]).To(Equal(mockHeaderBlock.RawData())) Expect(len(iplds.Uncles)).To(Equal(1)) - Expect(iplds.Uncles[0]).To(Equal(mockUncleBlock)) + Expect(iplds.Uncles[0]).To(Equal(mockUncleBlock.RawData())) Expect(len(iplds.Transactions)).To(Equal(1)) - Expect(iplds.Transactions[0]).To(Equal(mockTrxBlock)) + Expect(iplds.Transactions[0]).To(Equal(mockTrxBlock.RawData())) Expect(len(iplds.Receipts)).To(Equal(1)) - Expect(iplds.Receipts[0]).To(Equal(mockReceiptBlock)) + Expect(iplds.Receipts[0]).To(Equal(mockReceiptBlock.RawData())) Expect(len(iplds.StateNodes)).To(Equal(1)) - stateNode, ok := iplds.StateNodes[common.HexToHash("0xc5d2460186f7233c927e7db2dcc703c0e500b653ca82273b7bfad8045d85a470")] - Expect(ok).To(BeTrue()) - Expect(stateNode).To(Equal(mockStateBlock)) - Expect(len(iplds.StorageNodes)).To(Equal(1)) - storageNodes := iplds.StorageNodes[common.HexToHash("0xc5d2460186f7233c927e7db2dcc703c0e500b653ca82273b7bfad8045d85a470")] - Expect(len(storageNodes)).To(Equal(2)) - storageNode1, ok := storageNodes[common.HexToHash("0000000000000000000000000000000000000000000000000000000000000001")] - Expect(ok).To(BeTrue()) - Expect(storageNode1).To(Equal(mockStorageBlock1)) - storageNode2, ok := storageNodes[common.HexToHash("0000000000000000000000000000000000000000000000000000000000000002")] - Expect(storageNode2).To(Equal(mockStorageBlock2)) - Expect(ok).To(BeTrue()) + Expect(iplds.StateNodes[0].StateTrieKey).To(Equal(common.HexToHash("0xc5d2460186f7233c927e7db2dcc703c0e500b653ca82273b7bfad8045d85a470"))) + Expect(iplds.StateNodes[0].Leaf).To(BeTrue()) + Expect(iplds.StateNodes[0].IPLD).To(Equal(mockStateBlock.RawData())) + Expect(len(iplds.StorageNodes)).To(Equal(2)) + for _, storage := range iplds.StorageNodes { + Expect(storage.Leaf).To(BeTrue()) + Expect(storage.StateTrieKey).To(Equal(common.HexToHash("0xc5d2460186f7233c927e7db2dcc703c0e500b653ca82273b7bfad8045d85a470"))) + if bytes.Equal(storage.StorageTrieKey.Bytes(), common.HexToHash("0000000000000000000000000000000000000000000000000000000000000001").Bytes()) { + Expect(storage.IPLD).To(Equal(mockStorageBlock1.RawData())) + } + if bytes.Equal(storage.StorageTrieKey.Bytes(), common.HexToHash("0000000000000000000000000000000000000000000000000000000000000002").Bytes()) { + Expect(storage.IPLD).To(Equal(mockStorageBlock2.RawData())) + } + } }) }) }) diff --git a/pkg/super_node/eth/mocks/converter.go b/pkg/super_node/eth/mocks/converter.go index 09326998..eea84595 100644 --- a/pkg/super_node/eth/mocks/converter.go +++ b/pkg/super_node/eth/mocks/converter.go @@ -29,12 +29,12 @@ import ( // PayloadConverter is the underlying struct for the Converter interface type PayloadConverter struct { PassedStatediffPayload statediff.Payload - ReturnIPLDPayload eth.IPLDPayload + ReturnIPLDPayload eth.ConvertedPayload ReturnErr error } // Convert method is used to convert a geth statediff.Payload to a IPLDPayload -func (pc *PayloadConverter) Convert(payload shared.RawChainData) (shared.StreamedIPLDs, error) { +func (pc *PayloadConverter) Convert(payload shared.RawChainData) (shared.ConvertedData, error) { stateDiffPayload, ok := payload.(statediff.Payload) if !ok { return nil, fmt.Errorf("convert expected payload type %T got %T", statediff.Payload{}, payload) @@ -46,13 +46,13 @@ func (pc *PayloadConverter) Convert(payload shared.RawChainData) (shared.Streame // IterativePayloadConverter is the underlying struct for the Converter interface type IterativePayloadConverter struct { PassedStatediffPayload []statediff.Payload - ReturnIPLDPayload []eth.IPLDPayload + ReturnIPLDPayload []eth.ConvertedPayload ReturnErr error iteration int } // Convert method is used to convert a geth statediff.Payload to a IPLDPayload -func (pc *IterativePayloadConverter) Convert(payload shared.RawChainData) (shared.StreamedIPLDs, error) { +func (pc *IterativePayloadConverter) Convert(payload shared.RawChainData) (shared.ConvertedData, error) { stateDiffPayload, ok := payload.(statediff.Payload) if !ok { return nil, fmt.Errorf("convert expected payload type %T got %T", statediff.Payload{}, payload) diff --git a/pkg/super_node/eth/mocks/publisher.go b/pkg/super_node/eth/mocks/publisher.go index 14887938..a33e1211 100644 --- a/pkg/super_node/eth/mocks/publisher.go +++ b/pkg/super_node/eth/mocks/publisher.go @@ -26,16 +26,16 @@ import ( // IPLDPublisher is the underlying struct for the Publisher interface type IPLDPublisher struct { - PassedIPLDPayload eth.IPLDPayload + PassedIPLDPayload eth.ConvertedPayload ReturnCIDPayload *eth.CIDPayload ReturnErr error } // Publish publishes an IPLDPayload to IPFS and returns the corresponding CIDPayload -func (pub *IPLDPublisher) Publish(payload shared.StreamedIPLDs) (shared.CIDsForIndexing, error) { - ipldPayload, ok := payload.(eth.IPLDPayload) +func (pub *IPLDPublisher) Publish(payload shared.ConvertedData) (shared.CIDsForIndexing, error) { + ipldPayload, ok := payload.(eth.ConvertedPayload) if !ok { - return nil, fmt.Errorf("publish expected payload type %T got %T", ð.IPLDPayload{}, payload) + return nil, fmt.Errorf("publish expected payload type %T got %T", ð.ConvertedPayload{}, payload) } pub.PassedIPLDPayload = ipldPayload return pub.ReturnCIDPayload, pub.ReturnErr @@ -43,17 +43,17 @@ func (pub *IPLDPublisher) Publish(payload shared.StreamedIPLDs) (shared.CIDsForI // IterativeIPLDPublisher is the underlying struct for the Publisher interface; used in testing type IterativeIPLDPublisher struct { - PassedIPLDPayload []eth.IPLDPayload + PassedIPLDPayload []eth.ConvertedPayload ReturnCIDPayload []*eth.CIDPayload ReturnErr error iteration int } // Publish publishes an IPLDPayload to IPFS and returns the corresponding CIDPayload -func (pub *IterativeIPLDPublisher) Publish(payload shared.StreamedIPLDs) (shared.CIDsForIndexing, error) { - ipldPayload, ok := payload.(eth.IPLDPayload) +func (pub *IterativeIPLDPublisher) Publish(payload shared.ConvertedData) (shared.CIDsForIndexing, error) { + ipldPayload, ok := payload.(eth.ConvertedPayload) if !ok { - return nil, fmt.Errorf("publish expected payload type %T got %T", ð.IPLDPayload{}, payload) + return nil, fmt.Errorf("publish expected payload type %T got %T", ð.ConvertedPayload{}, payload) } pub.PassedIPLDPayload = append(pub.PassedIPLDPayload, ipldPayload) if len(pub.ReturnCIDPayload) < pub.iteration+1 { diff --git a/pkg/super_node/eth/mocks/test_data.go b/pkg/super_node/eth/mocks/test_data.go index f9398cf2..e45a4949 100644 --- a/pkg/super_node/eth/mocks/test_data.go +++ b/pkg/super_node/eth/mocks/test_data.go @@ -23,6 +23,9 @@ import ( "math/big" rand2 "math/rand" + "github.com/multiformats/go-multihash" + "github.com/vulcanize/vulcanizedb/pkg/ipfs/ipld" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" @@ -233,7 +236,7 @@ var ( TotalDifficulty: big.NewInt(1337), } - MockIPLDPayload = eth.IPLDPayload{ + MockConvertedPayload = eth.ConvertedPayload{ TotalDifficulty: big.NewInt(1337), Block: MockBlock, Receipts: MockReceipts, @@ -293,44 +296,55 @@ var ( }, }, } + headerCID, _ = ipld.RawdataToCid(ipld.MEthHeader, MockHeaderRlp, multihash.KECCAK_256) + trx1CID, _ = ipld.RawdataToCid(ipld.MEthTx, MockTransactions.GetRlp(0), multihash.KECCAK_256) + trx2CID, _ = ipld.RawdataToCid(ipld.MEthTx, MockTransactions.GetRlp(1), multihash.KECCAK_256) + rct1CID, _ = ipld.RawdataToCid(ipld.MEthTxReceipt, MockReceipts.GetRlp(0), multihash.KECCAK_256) + rct2CID, _ = ipld.RawdataToCid(ipld.MEthTxReceipt, MockReceipts.GetRlp(1), multihash.KECCAK_256) + state1CID, _ = ipld.RawdataToCid(ipld.MEthStateTrie, ValueBytes, multihash.KECCAK_256) + state2CID, _ = ipld.RawdataToCid(ipld.MEthStateTrie, AnotherValueBytes, multihash.KECCAK_256) + storageCID, _ = ipld.RawdataToCid(ipld.MEthStorageTrie, StorageValue, multihash.KECCAK_256) - MockIPLDWrapper = ð.IPLDWrapper{ + HeaderIPLD, _ = blocks.NewBlockWithCid(MockHeaderRlp, headerCID) + Trx1IPLD, _ = blocks.NewBlockWithCid(MockTransactions.GetRlp(0), trx1CID) + Trx2IPLD, _ = blocks.NewBlockWithCid(MockTransactions.GetRlp(1), trx2CID) + Rct1IPLD, _ = blocks.NewBlockWithCid(MockReceipts.GetRlp(0), rct1CID) + Rct2IPLD, _ = blocks.NewBlockWithCid(MockReceipts.GetRlp(1), rct2CID) + State1IPLD, _ = blocks.NewBlockWithCid(ValueBytes, state1CID) + State2IPLD, _ = blocks.NewBlockWithCid(AnotherValueBytes, state2CID) + StorageIPLD, _ = blocks.NewBlockWithCid(StorageValue, storageCID) + + MockIPLDs = eth.IPLDs{ BlockNumber: big.NewInt(1), - Headers: []blocks.Block{ - blocks.NewBlock(MockHeaderRlp), + Headers: [][]byte{ + HeaderIPLD.RawData(), }, - Transactions: []blocks.Block{ - blocks.NewBlock(MockTransactions.GetRlp(0)), - blocks.NewBlock(MockTransactions.GetRlp(1)), + Transactions: [][]byte{ + Trx1IPLD.RawData(), + Trx2IPLD.RawData(), }, - Receipts: []blocks.Block{ - blocks.NewBlock(MockReceipts.GetRlp(0)), - blocks.NewBlock(MockReceipts.GetRlp(1)), + Receipts: [][]byte{ + Rct1IPLD.RawData(), + Rct2IPLD.RawData(), }, - StateNodes: map[common.Hash]blocks.Block{ - ContractLeafKey: blocks.NewBlock(ValueBytes), - AnotherContractLeafKey: blocks.NewBlock(AnotherValueBytes), - }, - StorageNodes: map[common.Hash]map[common.Hash]blocks.Block{ - ContractLeafKey: { - common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000001"): blocks.NewBlock(StorageValue), + StateNodes: []eth2.StateNode{ + { + StateTrieKey: ContractLeafKey, + Leaf: true, + IPLD: State1IPLD.RawData(), + }, + { + StateTrieKey: AnotherContractLeafKey, + Leaf: true, + IPLD: State2IPLD.RawData(), }, }, - } - - MockSeedNodePayload = eth2.StreamResponse{ - BlockNumber: big.NewInt(1), - HeadersRlp: [][]byte{MockHeaderRlp}, - UnclesRlp: [][]byte{}, - TransactionsRlp: [][]byte{MockTransactions.GetRlp(0), MockTransactions.GetRlp(1)}, - ReceiptsRlp: [][]byte{MockTransactions.GetRlp(0), MockTransactions.GetRlp(1)}, - StateNodesRlp: map[common.Hash][]byte{ - ContractLeafKey: ValueBytes, - AnotherContractLeafKey: AnotherValueBytes, - }, - StorageNodesRlp: map[common.Hash]map[common.Hash][]byte{ - ContractLeafKey: { - common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000001"): StorageValue, + StorageNodes: []eth2.StorageNode{ + { + StateTrieKey: ContractLeafKey, + StorageTrieKey: common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000001"), + Leaf: true, + IPLD: StorageIPLD.RawData(), }, }, } diff --git a/pkg/super_node/eth/publisher.go b/pkg/super_node/eth/publisher.go index f584893d..5d2b142f 100644 --- a/pkg/super_node/eth/publisher.go +++ b/pkg/super_node/eth/publisher.go @@ -53,10 +53,10 @@ func NewIPLDPublisher(ipfsPath string) (*IPLDPublisher, error) { } // Publish publishes an IPLDPayload to IPFS and returns the corresponding CIDPayload -func (pub *IPLDPublisher) Publish(payload shared.StreamedIPLDs) (shared.CIDsForIndexing, error) { - ipldPayload, ok := payload.(IPLDPayload) +func (pub *IPLDPublisher) Publish(payload shared.ConvertedData) (shared.CIDsForIndexing, error) { + ipldPayload, ok := payload.(ConvertedPayload) if !ok { - return nil, fmt.Errorf("eth publisher expected payload type %T got %T", IPLDPayload{}, payload) + return nil, fmt.Errorf("eth publisher expected payload type %T got %T", ConvertedPayload{}, payload) } // Process and publish headers headerCid, err := pub.publishHeader(ipldPayload.Block.Header()) diff --git a/pkg/super_node/eth/publisher_test.go b/pkg/super_node/eth/publisher_test.go index e7292f49..542f2040 100644 --- a/pkg/super_node/eth/publisher_test.go +++ b/pkg/super_node/eth/publisher_test.go @@ -48,8 +48,8 @@ var _ = Describe("Publisher", func() { mockHeaderDagPutter.CIDsToReturn = []string{"mockHeaderCID"} mockTrxDagPutter.CIDsToReturn = []string{"mockTrxCID1", "mockTrxCID2"} mockRctDagPutter.CIDsToReturn = []string{"mockRctCID1", "mockRctCID2"} - val1 := common.BytesToHash(mocks.MockIPLDPayload.StateNodes[0].Value) - val2 := common.BytesToHash(mocks.MockIPLDPayload.StateNodes[1].Value) + val1 := common.BytesToHash(mocks.MockConvertedPayload.StateNodes[0].Value) + val2 := common.BytesToHash(mocks.MockConvertedPayload.StateNodes[1].Value) mockStateDagPutter.CIDsToReturn = map[common.Hash][]string{ val1: {"mockStateCID1"}, val2: {"mockStateCID2"}, @@ -62,11 +62,11 @@ var _ = Describe("Publisher", func() { StatePutter: mockStateDagPutter, StoragePutter: mockStorageDagPutter, } - payload, err := publisher.Publish(mocks.MockIPLDPayload) + payload, err := publisher.Publish(mocks.MockConvertedPayload) Expect(err).ToNot(HaveOccurred()) cidPayload, ok := payload.(*eth.CIDPayload) Expect(ok).To(BeTrue()) - Expect(cidPayload.HeaderCID.TotalDifficulty).To(Equal(mocks.MockIPLDPayload.TotalDifficulty.String())) + Expect(cidPayload.HeaderCID.TotalDifficulty).To(Equal(mocks.MockConvertedPayload.TotalDifficulty.String())) Expect(cidPayload.HeaderCID.BlockNumber).To(Equal(mocks.MockCIDPayload.HeaderCID.BlockNumber)) Expect(cidPayload.HeaderCID.BlockHash).To(Equal(mocks.MockCIDPayload.HeaderCID.BlockHash)) Expect(cidPayload.UncleCIDs).To(Equal(mocks.MockCIDPayload.UncleCIDs)) diff --git a/pkg/super_node/eth/resolver.go b/pkg/super_node/eth/resolver.go deleted file mode 100644 index acc038ff..00000000 --- a/pkg/super_node/eth/resolver.go +++ /dev/null @@ -1,78 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package eth - -import ( - "fmt" - - "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" - - "github.com/ethereum/go-ethereum/common" - "github.com/ipfs/go-block-format" -) - -// IPLDResolver satisfies the IPLDResolver interface for ethereum -type IPLDResolver struct{} - -// NewIPLDResolver returns a pointer to an IPLDResolver which satisfies the IPLDResolver interface -func NewIPLDResolver() *IPLDResolver { - return &IPLDResolver{} -} - -// Resolve is the exported method for resolving all of the ETH IPLDs packaged in an IpfsBlockWrapper -func (eir *IPLDResolver) Resolve(iplds shared.FetchedIPLDs) (shared.ServerResponse, error) { - ipfsBlocks, ok := iplds.(*IPLDWrapper) - if !ok { - return StreamResponse{}, fmt.Errorf("eth resolver expected iplds type %T got %T", &IPLDWrapper{}, iplds) - } - return StreamResponse{ - BlockNumber: ipfsBlocks.BlockNumber, - HeadersRlp: eir.resolve(ipfsBlocks.Headers), - UnclesRlp: eir.resolve(ipfsBlocks.Uncles), - TransactionsRlp: eir.resolve(ipfsBlocks.Transactions), - ReceiptsRlp: eir.resolve(ipfsBlocks.Receipts), - StateNodesRlp: eir.resolveState(ipfsBlocks.StateNodes), - StorageNodesRlp: eir.resolveStorage(ipfsBlocks.StorageNodes), - }, nil -} - -func (eir *IPLDResolver) resolve(iplds []blocks.Block) [][]byte { - rlps := make([][]byte, 0, len(iplds)) - for _, ipld := range iplds { - rlps = append(rlps, ipld.RawData()) - } - return rlps -} - -func (eir *IPLDResolver) resolveState(iplds map[common.Hash]blocks.Block) map[common.Hash][]byte { - stateNodes := make(map[common.Hash][]byte, len(iplds)) - for key, ipld := range iplds { - stateNodes[key] = ipld.RawData() - } - return stateNodes -} - -func (eir *IPLDResolver) resolveStorage(iplds map[common.Hash]map[common.Hash]blocks.Block) map[common.Hash]map[common.Hash][]byte { - storageNodes := make(map[common.Hash]map[common.Hash][]byte) - for stateKey, storageIPLDs := range iplds { - storageNodes[stateKey] = make(map[common.Hash][]byte) - for storageKey, storageVal := range storageIPLDs { - storageNodes[stateKey][storageKey] = storageVal.RawData() - } - } - return storageNodes -} diff --git a/pkg/super_node/eth/resolver_test.go b/pkg/super_node/eth/resolver_test.go deleted file mode 100644 index 66115aca..00000000 --- a/pkg/super_node/eth/resolver_test.go +++ /dev/null @@ -1,55 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package eth_test - -import ( - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" - "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" - - "github.com/vulcanize/vulcanizedb/pkg/super_node/eth" - "github.com/vulcanize/vulcanizedb/pkg/super_node/eth/mocks" -) - -var ( - resolver *eth.IPLDResolver -) - -var _ = Describe("Resolver", func() { - Describe("ResolveIPLDs", func() { - BeforeEach(func() { - resolver = eth.NewIPLDResolver() - }) - It("Resolves IPLD data to their correct geth data types and packages them to send to requesting transformers", func() { - payload, err := resolver.Resolve(mocks.MockIPLDWrapper) - Expect(err).ToNot(HaveOccurred()) - superNodePayload, ok := payload.(eth.StreamResponse) - Expect(ok).To(BeTrue()) - Expect(superNodePayload.BlockNumber.Int64()).To(Equal(mocks.MockSeedNodePayload.BlockNumber.Int64())) - Expect(superNodePayload.HeadersRlp).To(Equal(mocks.MockSeedNodePayload.HeadersRlp)) - Expect(superNodePayload.UnclesRlp).To(Equal(mocks.MockSeedNodePayload.UnclesRlp)) - Expect(len(superNodePayload.TransactionsRlp)).To(Equal(2)) - Expect(shared.ListContainsBytes(superNodePayload.TransactionsRlp, mocks.MockTransactions.GetRlp(0))).To(BeTrue()) - Expect(shared.ListContainsBytes(superNodePayload.TransactionsRlp, mocks.MockTransactions.GetRlp(1))).To(BeTrue()) - Expect(len(superNodePayload.ReceiptsRlp)).To(Equal(2)) - Expect(shared.ListContainsBytes(superNodePayload.ReceiptsRlp, mocks.MockReceipts.GetRlp(0))).To(BeTrue()) - Expect(shared.ListContainsBytes(superNodePayload.ReceiptsRlp, mocks.MockReceipts.GetRlp(1))).To(BeTrue()) - Expect(len(superNodePayload.StateNodesRlp)).To(Equal(2)) - Expect(superNodePayload.StorageNodesRlp).To(Equal(mocks.MockSeedNodePayload.StorageNodesRlp)) - }) - }) -}) diff --git a/pkg/super_node/eth/types.go b/pkg/super_node/eth/types.go index 9b4c8314..551c77dc 100644 --- a/pkg/super_node/eth/types.go +++ b/pkg/super_node/eth/types.go @@ -17,18 +17,16 @@ package eth import ( - "encoding/json" "math/big" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" - "github.com/ipfs/go-block-format" ) -// IPLDPayload is a custom type which packages raw ETH data for publishing to IPFS and filtering to subscribers +// ConvertedPayload is a custom type which packages raw ETH data for publishing to IPFS and filtering to subscribers // Returned by PayloadConverter // Passed to IPLDPublisher and ResponseFilterer -type IPLDPayload struct { +type ConvertedPayload struct { TotalDifficulty *big.Int Block *types.Block TxMetaData []TxModel @@ -39,7 +37,7 @@ type IPLDPayload struct { } // Height satisfies the StreamedIPLDs interface -func (i IPLDPayload) Height() int64 { +func (i ConvertedPayload) Height() int64 { return i.Block.Number().Int64() } @@ -75,54 +73,32 @@ type CIDWrapper struct { StorageNodes []StorageNodeWithStateKeyModel } -// IPLDWrapper is used to package raw IPLD block data fetched from IPFS -// Returned by IPLDFetcher -// Passed to IPLDResolver -type IPLDWrapper struct { +// IPLDs is used to package raw IPLD block data fetched from IPFS and returned by the server +// Returned by IPLDFetcher and ResponseFilterer +type IPLDs struct { BlockNumber *big.Int - Headers []blocks.Block - Uncles []blocks.Block - Transactions []blocks.Block - Receipts []blocks.Block - StateNodes map[common.Hash]blocks.Block - StorageNodes map[common.Hash]map[common.Hash]blocks.Block + Headers [][]byte + Uncles [][]byte + Transactions [][]byte + Receipts [][]byte + StateNodes []StateNode + StorageNodes []StorageNode } -// StreamResponse holds the data streamed from the super node eth service to the requesting clients -// Returned by IPLDResolver and ResponseFilterer -// Passed to client subscriptions -type StreamResponse struct { - BlockNumber *big.Int `json:"blockNumber"` - HeadersRlp [][]byte `json:"headersRlp"` - UnclesRlp [][]byte `json:"unclesRlp"` - TransactionsRlp [][]byte `json:"transactionsRlp"` - ReceiptsRlp [][]byte `json:"receiptsRlp"` - StateNodesRlp map[common.Hash][]byte `json:"stateNodesRlp"` - StorageNodesRlp map[common.Hash]map[common.Hash][]byte `json:"storageNodesRlp"` - - encoded []byte - err error +// Height satisfies the StreamedIPLDs interface +func (i IPLDs) Height() int64 { + return i.BlockNumber.Int64() } -// Height satisfies the ServerResponse interface -func (sr StreamResponse) Height() int64 { - return sr.BlockNumber.Int64() +type StateNode struct { + StateTrieKey common.Hash + IPLD []byte + Leaf bool } -func (sr *StreamResponse) ensureEncoded() { - if sr.encoded == nil && sr.err == nil { - sr.encoded, sr.err = json.Marshal(sr) - } -} - -// Length to implement Encoder interface for StateDiff -func (sr *StreamResponse) Length() int { - sr.ensureEncoded() - return len(sr.encoded) -} - -// Encode to implement Encoder interface for StateDiff -func (sr *StreamResponse) Encode() ([]byte, error) { - sr.ensureEncoded() - return sr.encoded, sr.err +type StorageNode struct { + StateTrieKey common.Hash + StorageTrieKey common.Hash + IPLD []byte + Leaf bool } diff --git a/pkg/super_node/service.go b/pkg/super_node/service.go index 3ef2002c..9d78c2b6 100644 --- a/pkg/super_node/service.go +++ b/pkg/super_node/service.go @@ -44,9 +44,9 @@ type SuperNode interface { // APIs(), Protocols(), Start() and Stop() node.Service // Data processing event loop - ProcessData(wg *sync.WaitGroup, forwardPayloadChan chan<- shared.StreamedIPLDs) error + ProcessData(wg *sync.WaitGroup, forwardPayloadChan chan<- shared.ConvertedData) error // Pub-Sub handling event loop - FilterAndServe(wg *sync.WaitGroup, screenAndServePayload <-chan shared.StreamedIPLDs) + FilterAndServe(wg *sync.WaitGroup, screenAndServePayload <-chan shared.ConvertedData) // Method to subscribe to the service Subscribe(id rpc.ID, sub chan<- SubscriptionPayload, quitChan chan<- bool, params shared.SubscriptionSettings) // Method to unsubscribe from the service @@ -73,8 +73,6 @@ type Service struct { IPLDFetcher shared.IPLDFetcher // Interface for searching and retrieving CIDs from Postgres index Retriever shared.CIDRetriever - // Interface for resolving IPLDs to their data types - Resolver shared.IPLDResolver // Chan the processor uses to subscribe to payloads from the Streamer PayloadChan chan shared.RawChainData // Used to signal shutdown of the service @@ -132,10 +130,6 @@ func NewSuperNode(settings *shared.SuperNodeConfig) (SuperNode, error) { if err != nil { return nil, err } - sn.Resolver, err = NewIPLDResolver(settings.Chain) - if err != nil { - return nil, err - } } sn.QuitChan = settings.Quit sn.Subscriptions = make(map[common.Hash]map[rpc.ID]Subscription) @@ -175,7 +169,7 @@ func (sap *Service) APIs() []rpc.API { // It forwards the converted data to the publishAndIndex process(es) it spins up // If forwards the converted data to a ScreenAndServe process if it there is one listening on the passed screenAndServePayload channel // This continues on no matter if or how many subscribers there are -func (sap *Service) ProcessData(wg *sync.WaitGroup, screenAndServePayload chan<- shared.StreamedIPLDs) error { +func (sap *Service) ProcessData(wg *sync.WaitGroup, screenAndServePayload chan<- shared.ConvertedData) error { sub, err := sap.Streamer.Stream(sap.PayloadChan) if err != nil { return err @@ -183,7 +177,7 @@ func (sap *Service) ProcessData(wg *sync.WaitGroup, screenAndServePayload chan<- wg.Add(1) // Channels for forwarding data to the publishAndIndex workers - publishAndIndexPayload := make(chan shared.StreamedIPLDs, PayloadChanBufferSize) + publishAndIndexPayload := make(chan shared.ConvertedData, PayloadChanBufferSize) // publishAndIndex worker pool to handle publishing and indexing concurrently, while // limiting the number of Postgres connections we can possibly open so as to prevent error for i := 0; i < sap.WorkerPoolSize; i++ { @@ -220,7 +214,7 @@ func (sap *Service) ProcessData(wg *sync.WaitGroup, screenAndServePayload chan<- // publishAndIndex is spun up by SyncAndConvert and receives converted chain data from that process // it publishes this data to IPFS and indexes their CIDs with useful metadata in Postgres -func (sap *Service) publishAndIndex(id int, publishAndIndexPayload <-chan shared.StreamedIPLDs) { +func (sap *Service) publishAndIndex(id int, publishAndIndexPayload <-chan shared.ConvertedData) { go func() { for { select { @@ -243,7 +237,7 @@ func (sap *Service) publishAndIndex(id int, publishAndIndexPayload <-chan shared // It filters and sends this data to any subscribers to the service // This process can be stood up alone, without an screenAndServePayload attached to a SyncAndConvert process // and it will hang on the WaitGroup indefinitely, allowing the Service to serve historical data requests only -func (sap *Service) FilterAndServe(wg *sync.WaitGroup, screenAndServePayload <-chan shared.StreamedIPLDs) { +func (sap *Service) FilterAndServe(wg *sync.WaitGroup, screenAndServePayload <-chan shared.ConvertedData) { wg.Add(1) go func() { for { @@ -261,7 +255,7 @@ func (sap *Service) FilterAndServe(wg *sync.WaitGroup, screenAndServePayload <-c } // filterAndServe filters the payload according to each subscription type and sends to the subscriptions -func (sap *Service) filterAndServe(payload shared.StreamedIPLDs) { +func (sap *Service) filterAndServe(payload shared.ConvertedData) { log.Debugf("Sending payload to subscriptions") sap.Lock() for ty, subs := range sap.Subscriptions { @@ -284,9 +278,14 @@ func (sap *Service) filterAndServe(payload shared.StreamedIPLDs) { sap.closeType(ty) continue } + responseRLP, err := rlp.EncodeToBytes(response) + if err != nil { + log.Error(err) + continue + } for id, sub := range subs { select { - case sub.PayloadChan <- SubscriptionPayload{Data: response, Err: "", Flag: EmptyFlag}: + case sub.PayloadChan <- SubscriptionPayload{Data: responseRLP, Err: "", Flag: EmptyFlag, Height: response.Height()}: log.Debugf("sending super node payload to subscription %s", id) default: log.Infof("unable to send payload to subscription %s; channel has no receiver", id) @@ -372,18 +371,18 @@ func (sap *Service) sendHistoricalData(sub Subscription, id rpc.ID, params share if empty { continue } - blocksWrapper, err := sap.IPLDFetcher.Fetch(cidWrapper) + response, err := sap.IPLDFetcher.Fetch(cidWrapper) if err != nil { sendNonBlockingErr(sub, fmt.Errorf("IPLD Fetching error at block %d\r%s", i, err.Error())) continue } - backFillIplds, err := sap.Resolver.Resolve(blocksWrapper) + responseRLP, err := rlp.EncodeToBytes(response) if err != nil { - sendNonBlockingErr(sub, fmt.Errorf("IPLD Resolving error at block %d\r%s", i, err.Error())) + log.Error(err) continue } select { - case sub.PayloadChan <- SubscriptionPayload{Data: backFillIplds, Err: "", Flag: EmptyFlag}: + case sub.PayloadChan <- SubscriptionPayload{Data: responseRLP, Err: "", Flag: EmptyFlag, Height: response.Height()}: log.Debugf("sending super node historical data payload to subscription %s", id) default: log.Infof("unable to send back-fill payload to subscription %s; channel has no receiver", id) @@ -420,7 +419,7 @@ func (sap *Service) Unsubscribe(id rpc.ID) { func (sap *Service) Start(*p2p.Server) error { log.Info("Starting super node service") wg := new(sync.WaitGroup) - payloadChan := make(chan shared.StreamedIPLDs, PayloadChanBufferSize) + payloadChan := make(chan shared.ConvertedData, PayloadChanBufferSize) if err := sap.ProcessData(wg, payloadChan); err != nil { return err } diff --git a/pkg/super_node/service_test.go b/pkg/super_node/service_test.go index 517a4df0..eb1b5c5e 100644 --- a/pkg/super_node/service_test.go +++ b/pkg/super_node/service_test.go @@ -51,7 +51,7 @@ var _ = Describe("Service", func() { ReturnErr: nil, } mockConverter := &mocks.PayloadConverter{ - ReturnIPLDPayload: mocks.MockIPLDPayload, + ReturnIPLDPayload: mocks.MockConvertedPayload, ReturnErr: nil, } processor := &super_node.Service{ @@ -71,7 +71,7 @@ var _ = Describe("Service", func() { Expect(mockConverter.PassedStatediffPayload).To(Equal(mocks.MockStateDiffPayload)) Expect(len(mockCidIndexer.PassedCIDPayload)).To(Equal(1)) Expect(mockCidIndexer.PassedCIDPayload[0]).To(Equal(mocks.MockCIDPayload)) - Expect(mockPublisher.PassedIPLDPayload).To(Equal(mocks.MockIPLDPayload)) + Expect(mockPublisher.PassedIPLDPayload).To(Equal(mocks.MockConvertedPayload)) Expect(mockStreamer.PassedPayloadChan).To(Equal(payloadChan)) }) }) diff --git a/pkg/super_node/shared/intefaces.go b/pkg/super_node/shared/intefaces.go index 2974b7b1..c6d42134 100644 --- a/pkg/super_node/shared/intefaces.go +++ b/pkg/super_node/shared/intefaces.go @@ -32,12 +32,12 @@ type PayloadFetcher interface { // PayloadConverter converts chain-specific payloads into IPLD payloads for publishing type PayloadConverter interface { - Convert(payload RawChainData) (StreamedIPLDs, error) + Convert(payload RawChainData) (ConvertedData, error) } // IPLDPublisher publishes IPLD payloads and returns a CID payload for indexing type IPLDPublisher interface { - Publish(payload StreamedIPLDs) (CIDsForIndexing, error) + Publish(payload ConvertedData) (CIDsForIndexing, error) } // CIDIndexer indexes a CID payload in Postgres @@ -47,7 +47,7 @@ type CIDIndexer interface { // ResponseFilterer applies a filter to an IPLD payload to return a subscription response packet type ResponseFilterer interface { - Filter(filter SubscriptionSettings, payload StreamedIPLDs) (response ServerResponse, err error) + Filter(filter SubscriptionSettings, payload ConvertedData) (response IPLDs, err error) } // CIDRetriever retrieves cids according to a provided filter and returns a CID wrapper @@ -60,12 +60,7 @@ type CIDRetriever interface { // IPLDFetcher uses a CID wrapper to fetch an IPLD wrapper type IPLDFetcher interface { - Fetch(cids CIDsForFetching) (FetchedIPLDs, error) -} - -// IPLDResolver resolves an IPLD wrapper into chain-specific payloads -type IPLDResolver interface { - Resolve(iplds FetchedIPLDs) (ServerResponse, error) + Fetch(cids CIDsForFetching) (IPLDs, error) } // ClientSubscription is a general interface for chain data subscriptions diff --git a/pkg/super_node/shared/mocks/fetcher.go b/pkg/super_node/shared/mocks/payload_fetcher.go similarity index 90% rename from pkg/super_node/shared/mocks/fetcher.go rename to pkg/super_node/shared/mocks/payload_fetcher.go index e9d6cbfa..589a50ee 100644 --- a/pkg/super_node/shared/mocks/fetcher.go +++ b/pkg/super_node/shared/mocks/payload_fetcher.go @@ -23,8 +23,8 @@ import ( "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" ) -// IPLDFetcher mock for tests -type IPLDFetcher struct { +// PayloadFetcher mock for tests +type PayloadFetcher struct { PayloadsToReturn map[uint64]shared.RawChainData FetchErrs map[uint64]error CalledAtBlockHeights [][]uint64 @@ -32,7 +32,7 @@ type IPLDFetcher struct { } // FetchAt mock method -func (fetcher *IPLDFetcher) FetchAt(blockHeights []uint64) ([]shared.RawChainData, error) { +func (fetcher *PayloadFetcher) FetchAt(blockHeights []uint64) ([]shared.RawChainData, error) { if fetcher.PayloadsToReturn == nil { return nil, errors.New("mock StateDiffFetcher needs to be initialized with payloads to return") } diff --git a/pkg/super_node/shared/mocks/retriever.go b/pkg/super_node/shared/mocks/retriever.go index c98a1c32..02af5049 100644 --- a/pkg/super_node/shared/mocks/retriever.go +++ b/pkg/super_node/shared/mocks/retriever.go @@ -21,8 +21,8 @@ import ( "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" ) -// MockCIDRetriever is a mock CID retriever for use in tests -type MockCIDRetriever struct { +// CIDRetriever is a mock CID retriever for use in tests +type CIDRetriever struct { GapsToRetrieve []shared.Gap GapsToRetrieveErr error CalledTimes int @@ -31,34 +31,34 @@ type MockCIDRetriever struct { } // RetrieveCIDs mock method -func (*MockCIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumber int64) (shared.CIDsForFetching, bool, error) { +func (*CIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumber int64) (shared.CIDsForFetching, bool, error) { panic("implement me") } // RetrieveLastBlockNumber mock method -func (*MockCIDRetriever) RetrieveLastBlockNumber() (int64, error) { +func (*CIDRetriever) RetrieveLastBlockNumber() (int64, error) { panic("implement me") } // RetrieveFirstBlockNumber mock method -func (mcr *MockCIDRetriever) RetrieveFirstBlockNumber() (int64, error) { +func (mcr *CIDRetriever) RetrieveFirstBlockNumber() (int64, error) { return mcr.FirstBlockNumberToReturn, mcr.RetrieveFirstBlockNumberErr } // RetrieveGapsInData mock method -func (mcr *MockCIDRetriever) RetrieveGapsInData() ([]shared.Gap, error) { +func (mcr *CIDRetriever) RetrieveGapsInData() ([]shared.Gap, error) { mcr.CalledTimes++ return mcr.GapsToRetrieve, mcr.GapsToRetrieveErr } // SetGapsToRetrieve mock method -func (mcr *MockCIDRetriever) SetGapsToRetrieve(gaps []shared.Gap) { +func (mcr *CIDRetriever) SetGapsToRetrieve(gaps []shared.Gap) { if mcr.GapsToRetrieve == nil { mcr.GapsToRetrieve = make([]shared.Gap, 0) } mcr.GapsToRetrieve = append(mcr.GapsToRetrieve, gaps...) } -func (mcr *MockCIDRetriever) Database() *postgres.DB { +func (mcr *CIDRetriever) Database() *postgres.DB { panic("implement me") } diff --git a/pkg/super_node/shared/types.go b/pkg/super_node/shared/types.go index f89a7a74..e213f7f4 100644 --- a/pkg/super_node/shared/types.go +++ b/pkg/super_node/shared/types.go @@ -20,7 +20,7 @@ package shared type RawChainData interface{} // The concrete type underneath StreamedIPLDs should not be a pointer -type StreamedIPLDs interface { +type ConvertedData interface { Height() int64 } @@ -28,10 +28,7 @@ type CIDsForIndexing interface{} type CIDsForFetching interface{} -type FetchedIPLDs interface{} - -// The concrete type underneath StreamedIPLDs should not be a pointer -type ServerResponse interface { +type IPLDs interface { Height() int64 } diff --git a/pkg/super_node/subscription.go b/pkg/super_node/subscription.go index fc2cb0ec..a1cdb045 100644 --- a/pkg/super_node/subscription.go +++ b/pkg/super_node/subscription.go @@ -20,7 +20,6 @@ import ( "errors" "github.com/ethereum/go-ethereum/rpc" - "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" ) type Flag int32 @@ -40,9 +39,10 @@ type Subscription struct { // SubscriptionPayload is the struct for a super node stream payload // It carries data of a type specific to the chain being supported/queried and an error message type SubscriptionPayload struct { - Data shared.ServerResponse `json:"data"` // e.g. for Ethereum eth.StreamPayload - Err string `json:"err"` // field for error - Flag Flag `json:"flag"` // field for message + Data []byte `json:"data"` // e.g. for Ethereum rlp serialized eth.StreamPayload + Height int64 `json:"height"` + Err string `json:"err"` // field for error + Flag Flag `json:"flag"` // field for message } func (sp SubscriptionPayload) Error() error { diff --git a/pkg/super_node/watcher/config.go b/pkg/watcher/config.go similarity index 100% rename from pkg/super_node/watcher/config.go rename to pkg/watcher/config.go diff --git a/pkg/super_node/watcher/constructors.go b/pkg/watcher/constructors.go similarity index 96% rename from pkg/super_node/watcher/constructors.go rename to pkg/watcher/constructors.go index d8b14588..3e22bec9 100644 --- a/pkg/super_node/watcher/constructors.go +++ b/pkg/watcher/constructors.go @@ -24,7 +24,7 @@ import ( "github.com/vulcanize/vulcanizedb/pkg/postgres" shared2 "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" "github.com/vulcanize/vulcanizedb/pkg/super_node/watcher/eth" - "github.com/vulcanize/vulcanizedb/pkg/super_node/watcher/shared" + "github.com/vulcanize/vulcanizedb/pkg/watcher/shared" ) // NewSuperNodeStreamer returns a new shared.SuperNodeStreamer diff --git a/pkg/super_node/watcher/eth/repository.go b/pkg/watcher/eth/repository.go similarity index 73% rename from pkg/super_node/watcher/eth/repository.go rename to pkg/watcher/eth/repository.go index 7ee0199c..0c37f9c0 100644 --- a/pkg/super_node/watcher/eth/repository.go +++ b/pkg/watcher/eth/repository.go @@ -17,9 +17,11 @@ package eth import ( + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" "github.com/vulcanize/vulcanizedb/pkg/postgres" "github.com/vulcanize/vulcanizedb/pkg/super_node" - "github.com/vulcanize/vulcanizedb/pkg/super_node/watcher/shared" + "github.com/vulcanize/vulcanizedb/pkg/watcher/shared" ) // Repository is the underlying struct for satisfying the shared.Repository interface for eth @@ -56,3 +58,27 @@ func (r *Repository) GetQueueData(height int64, hash string) (super_node.Subscri func (r *Repository) ReadyData(payload super_node.SubscriptionPayload) error { panic("implement me") } + +func (r *Repository) readyHeader(header *types.Header) error { + panic("implement me") +} + +func (r *Repository) readyUncle(uncle *types.Header) error { + panic("implement me") +} + +func (r *Repository) readyTxs(transactions types.Transactions) error { + panic("implement me") +} + +func (r *Repository) readyRcts(receipts types.Receipts) error { + panic("implement me") +} + +func (r *Repository) readyState(stateNodes map[common.Address][]byte) error { + panic("implement me") +} + +func (r *Repository) readyStorage(storageNodes map[common.Address]map[common.Address][]byte) error { + panic("implement me") +} diff --git a/pkg/super_node/watcher/service.go b/pkg/watcher/service.go similarity index 91% rename from pkg/super_node/watcher/service.go rename to pkg/watcher/service.go index 2cbbcbc2..a15b6831 100644 --- a/pkg/super_node/watcher/service.go +++ b/pkg/watcher/service.go @@ -24,12 +24,12 @@ import ( "github.com/sirupsen/logrus" "github.com/vulcanize/vulcanizedb/pkg/super_node" - "github.com/vulcanize/vulcanizedb/pkg/super_node/watcher/shared" "github.com/vulcanize/vulcanizedb/pkg/wasm" + "github.com/vulcanize/vulcanizedb/pkg/watcher/shared" ) -// SuperNodeWatcher is the top level interface for watching data from super node -type SuperNodeWatcher interface { +// Watcher is the top level interface for watching data from super node +type Watcher interface { Init() error Watch(wg *sync.WaitGroup) error } @@ -56,8 +56,8 @@ type Service struct { backFilling *int32 // 0 => not backfilling; 1 => backfilling } -// NewSuperNodeWatcher returns a new Service which satisfies the SuperNodeWatcher interface -func NewSuperNodeWatcher(c Config, quitChan chan bool) (SuperNodeWatcher, error) { +// NewWatcher returns a new Service which satisfies the Watcher interface +func NewWatcher(c Config, quitChan chan bool) (Watcher, error) { repo, err := NewRepository(c.SubscriptionConfig.ChainType(), c.DB, c.TriggerFunctions) if err != nil { return nil, err @@ -119,7 +119,7 @@ func (s *Service) combinedQueuing(wg *sync.WaitGroup, sub *rpc.ClientSubscriptio logrus.Error(payload.Error()) continue } - if payload.Data.Height() == atomic.LoadInt64(s.payloadIndex) { + if payload.Height == atomic.LoadInt64(s.payloadIndex) { // If the data is at our current index it is ready to be processed; add it to the ready data queue if err := s.Repository.ReadyData(payload); err != nil { logrus.Error(err) @@ -133,7 +133,7 @@ func (s *Service) combinedQueuing(wg *sync.WaitGroup, sub *rpc.ClientSubscriptio case err := <-sub.Err(): logrus.Error(err) case <-s.QuitChan: - logrus.Info("WatchContract shutting down") + logrus.Info("Watcher shutting down") wg.Done() return } @@ -168,7 +168,7 @@ func (s *Service) backFillOnlyQueuing(wg *sync.WaitGroup, sub *rpc.ClientSubscri case err := <-sub.Err(): logrus.Error(err) case <-s.QuitChan: - logrus.Info("WatchContract shutting down") + logrus.Info("Watcher shutting down") wg.Done() return } diff --git a/pkg/super_node/watcher/shared/interfaces.go b/pkg/watcher/shared/interfaces.go similarity index 100% rename from pkg/super_node/watcher/shared/interfaces.go rename to pkg/watcher/shared/interfaces.go