diff --git a/pkg/ipfs/converter_test.go b/pkg/ipfs/converter_test.go index 016244ec..5fd35009 100644 --- a/pkg/ipfs/converter_test.go +++ b/pkg/ipfs/converter_test.go @@ -45,6 +45,7 @@ var _ = Describe("Converter", func() { Expect(converterPayload.TrxMetaData).To(Equal(mocks.MockTrxMeta)) Expect(converterPayload.ReceiptMetaData).To(Equal(mocks.MockRctMeta)) }) + It(" Throws an error if the wrong chain config is used", func() { converter := ipfs.NewPayloadConverter(params.TestnetChainConfig) _, err := converter.Convert(mocks.MockStateDiffPayload) diff --git a/pkg/ipfs/publisher_test.go b/pkg/ipfs/publisher_test.go index a9f03183..caad0e4d 100644 --- a/pkg/ipfs/publisher_test.go +++ b/pkg/ipfs/publisher_test.go @@ -40,6 +40,7 @@ var _ = Describe("Publisher", func() { mockStateDagPutter = new(mocks.IncrementingDagPutter) mockStorageDagPutter = new(mocks.IncrementingDagPutter) }) + Describe("Publish", func() { It("Publishes the passed IPLDPayload objects to IPFS and returns a CIDPayload for indexing", func() { mockHeaderDagPutter.CIDsToReturn = []string{"mockHeaderCID"} diff --git a/pkg/ipfs/resolver_test.go b/pkg/ipfs/resolver_test.go index 17e1a4fb..3f8a4163 100644 --- a/pkg/ipfs/resolver_test.go +++ b/pkg/ipfs/resolver_test.go @@ -19,10 +19,10 @@ package ipfs_test import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - "github.com/vulcanize/vulcanizedb/pkg/seed_node" "github.com/vulcanize/vulcanizedb/pkg/ipfs" "github.com/vulcanize/vulcanizedb/pkg/ipfs/mocks" + "github.com/vulcanize/vulcanizedb/pkg/seed_node" ) var ( @@ -31,11 +31,13 @@ var ( var _ = Describe("Resolver", func() { Describe("ResolveIPLDs", func() { - It("Resolves IPLD data to their correct geth data types and packages them to send to requesting transformers", func() { + BeforeEach(func() { resolver = ipfs.NewIPLDResolver() + }) + It("Resolves IPLD data to their correct geth data types and packages them to send to requesting transformers", func() { seedNodePayload, err := resolver.ResolveIPLDs(mocks.MockIPLDWrapper) Expect(err).ToNot(HaveOccurred()) - Expect(seedNodePayload.BlockNumber.Int64()).To(Equal(int64(1))) + Expect(seedNodePayload.BlockNumber.Int64()).To(Equal(mocks.MockSeeNodePayload.BlockNumber.Int64())) Expect(seedNodePayload.HeadersRlp).To(Equal(mocks.MockSeeNodePayload.HeadersRlp)) Expect(seedNodePayload.UnclesRlp).To(Equal(mocks.MockSeeNodePayload.UnclesRlp)) Expect(len(seedNodePayload.TransactionsRlp)).To(Equal(2)) diff --git a/pkg/seed_node/filterer.go b/pkg/seed_node/filterer.go index d31d6fce..0f93e97c 100644 --- a/pkg/seed_node/filterer.go +++ b/pkg/seed_node/filterer.go @@ -165,14 +165,17 @@ func checkReceipts(rct *types.Receipt, wantedTopics, actualTopics, wantedContrac } } } - } else { - // We only keep receipts with logs of interest if we are interested in that contract - for _, wantedTopic := range wantedTopics { - for _, actualTopic := range actualTopics { - if wantedTopic == actualTopic { - for _, wantedContract := range wantedContracts { - if wantedContract == actualContract { - return true + } else { // We keep receipts that belong to one of the specified contracts and have logs with topics if we aren't filtering on topics + for _, wantedContract := range wantedContracts { + if wantedContract == actualContract { + if len(wantedTopics) == 0 { + return true + } else { // Or if we have contracts and topics to filter on we only keep receipts that satisfy both conditions + for _, wantedTopic := range wantedTopics { + for _, actualTopic := range actualTopics { + if wantedTopic == actualTopic { + return true + } } } } diff --git a/pkg/seed_node/filterer_test.go b/pkg/seed_node/filterer_test.go new file mode 100644 index 00000000..cb99ebeb --- /dev/null +++ b/pkg/seed_node/filterer_test.go @@ -0,0 +1,153 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package seed_node_test + +import ( + "bytes" + + "github.com/ethereum/go-ethereum/core/types" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + "github.com/vulcanize/vulcanizedb/pkg/ipfs/mocks" + "github.com/vulcanize/vulcanizedb/pkg/seed_node" +) + +var ( + filterer seed_node.ResponseFilterer + expectedRctForStorageRLP1 []byte + expectedRctForStorageRLP2 []byte +) + +var _ = Describe("Filterer", func() { + Describe("FilterResponse", func() { + BeforeEach(func() { + filterer = seed_node.NewResponseFilterer() + expectedRctForStorageRLP1 = getReceiptForStorageRLP(mocks.MockReceipts, 0) + expectedRctForStorageRLP2 = getReceiptForStorageRLP(mocks.MockReceipts, 1) + }) + + It("Transcribes all the data from the IPLDPayload into the SeedNodePayload if given an open filter", func() { + seedNodePayload, err := filterer.FilterResponse(openFilter, *mocks.MockIPLDPayload) + Expect(err).ToNot(HaveOccurred()) + Expect(seedNodePayload.BlockNumber.Int64()).To(Equal(mocks.MockSeeNodePayload.BlockNumber.Int64())) + Expect(seedNodePayload.HeadersRlp).To(Equal(mocks.MockSeeNodePayload.HeadersRlp)) + Expect(seedNodePayload.UnclesRlp).To(Equal(mocks.MockSeeNodePayload.UnclesRlp)) + Expect(len(seedNodePayload.TransactionsRlp)).To(Equal(2)) + Expect(seed_node.ListContainsBytes(seedNodePayload.TransactionsRlp, mocks.MockTransactions.GetRlp(0))).To(BeTrue()) + Expect(seed_node.ListContainsBytes(seedNodePayload.TransactionsRlp, mocks.MockTransactions.GetRlp(1))).To(BeTrue()) + Expect(len(seedNodePayload.ReceiptsRlp)).To(Equal(2)) + Expect(seed_node.ListContainsBytes(seedNodePayload.ReceiptsRlp, expectedRctForStorageRLP1)).To(BeTrue()) + Expect(seed_node.ListContainsBytes(seedNodePayload.ReceiptsRlp, expectedRctForStorageRLP2)).To(BeTrue()) + Expect(len(seedNodePayload.StateNodesRlp)).To(Equal(2)) + Expect(seedNodePayload.StateNodesRlp[mocks.ContractLeafKey]).To(Equal(mocks.ValueBytes)) + Expect(seedNodePayload.StateNodesRlp[mocks.AnotherContractLeafKey]).To(Equal(mocks.AnotherValueBytes)) + Expect(seedNodePayload.StorageNodesRlp).To(Equal(mocks.MockSeeNodePayload.StorageNodesRlp)) + }) + + It("Applies filters from the provided config.Subscription", func() { + seedNodePayload1, err := filterer.FilterResponse(rctContractFilter, *mocks.MockIPLDPayload) + Expect(err).ToNot(HaveOccurred()) + Expect(seedNodePayload1.BlockNumber.Int64()).To(Equal(mocks.MockSeeNodePayload.BlockNumber.Int64())) + Expect(len(seedNodePayload1.HeadersRlp)).To(Equal(0)) + Expect(len(seedNodePayload1.UnclesRlp)).To(Equal(0)) + Expect(len(seedNodePayload1.TransactionsRlp)).To(Equal(0)) + Expect(len(seedNodePayload1.StorageNodesRlp)).To(Equal(0)) + Expect(len(seedNodePayload1.StateNodesRlp)).To(Equal(0)) + Expect(len(seedNodePayload1.ReceiptsRlp)).To(Equal(1)) + Expect(seedNodePayload1.ReceiptsRlp[0]).To(Equal(expectedRctForStorageRLP2)) + + seedNodePayload2, err := filterer.FilterResponse(rctTopicsFilter, *mocks.MockIPLDPayload) + Expect(err).ToNot(HaveOccurred()) + Expect(seedNodePayload2.BlockNumber.Int64()).To(Equal(mocks.MockSeeNodePayload.BlockNumber.Int64())) + Expect(len(seedNodePayload2.HeadersRlp)).To(Equal(0)) + Expect(len(seedNodePayload2.UnclesRlp)).To(Equal(0)) + Expect(len(seedNodePayload2.TransactionsRlp)).To(Equal(0)) + Expect(len(seedNodePayload2.StorageNodesRlp)).To(Equal(0)) + Expect(len(seedNodePayload2.StateNodesRlp)).To(Equal(0)) + Expect(len(seedNodePayload2.ReceiptsRlp)).To(Equal(1)) + Expect(seedNodePayload2.ReceiptsRlp[0]).To(Equal(expectedRctForStorageRLP1)) + + seedNodePayload3, err := filterer.FilterResponse(rctTopicsAndContractFilter, *mocks.MockIPLDPayload) + Expect(err).ToNot(HaveOccurred()) + Expect(seedNodePayload3.BlockNumber.Int64()).To(Equal(mocks.MockSeeNodePayload.BlockNumber.Int64())) + Expect(len(seedNodePayload3.HeadersRlp)).To(Equal(0)) + Expect(len(seedNodePayload3.UnclesRlp)).To(Equal(0)) + Expect(len(seedNodePayload3.TransactionsRlp)).To(Equal(0)) + Expect(len(seedNodePayload3.StorageNodesRlp)).To(Equal(0)) + Expect(len(seedNodePayload3.StateNodesRlp)).To(Equal(0)) + Expect(len(seedNodePayload3.ReceiptsRlp)).To(Equal(1)) + Expect(seedNodePayload3.ReceiptsRlp[0]).To(Equal(expectedRctForStorageRLP1)) + + seedNodePayload4, err := filterer.FilterResponse(rctContractsAndTopicFilter, *mocks.MockIPLDPayload) + Expect(err).ToNot(HaveOccurred()) + Expect(seedNodePayload4.BlockNumber.Int64()).To(Equal(mocks.MockSeeNodePayload.BlockNumber.Int64())) + Expect(len(seedNodePayload4.HeadersRlp)).To(Equal(0)) + Expect(len(seedNodePayload4.UnclesRlp)).To(Equal(0)) + Expect(len(seedNodePayload4.TransactionsRlp)).To(Equal(0)) + Expect(len(seedNodePayload4.StorageNodesRlp)).To(Equal(0)) + Expect(len(seedNodePayload4.StateNodesRlp)).To(Equal(0)) + Expect(len(seedNodePayload4.ReceiptsRlp)).To(Equal(1)) + Expect(seedNodePayload4.ReceiptsRlp[0]).To(Equal(expectedRctForStorageRLP2)) + + seedNodePayload5, err := filterer.FilterResponse(rctsForAllCollectedTrxs, *mocks.MockIPLDPayload) + Expect(err).ToNot(HaveOccurred()) + Expect(seedNodePayload5.BlockNumber.Int64()).To(Equal(mocks.MockSeeNodePayload.BlockNumber.Int64())) + Expect(len(seedNodePayload5.HeadersRlp)).To(Equal(0)) + Expect(len(seedNodePayload5.UnclesRlp)).To(Equal(0)) + Expect(len(seedNodePayload5.TransactionsRlp)).To(Equal(2)) + Expect(seed_node.ListContainsBytes(seedNodePayload5.TransactionsRlp, mocks.MockTransactions.GetRlp(0))).To(BeTrue()) + Expect(seed_node.ListContainsBytes(seedNodePayload5.TransactionsRlp, mocks.MockTransactions.GetRlp(1))).To(BeTrue()) + Expect(len(seedNodePayload5.StorageNodesRlp)).To(Equal(0)) + Expect(len(seedNodePayload5.StateNodesRlp)).To(Equal(0)) + Expect(len(seedNodePayload5.ReceiptsRlp)).To(Equal(2)) + Expect(seed_node.ListContainsBytes(seedNodePayload5.ReceiptsRlp, expectedRctForStorageRLP1)).To(BeTrue()) + Expect(seed_node.ListContainsBytes(seedNodePayload5.ReceiptsRlp, expectedRctForStorageRLP2)).To(BeTrue()) + + seedNodePayload6, err := filterer.FilterResponse(rctsForSelectCollectedTrxs, *mocks.MockIPLDPayload) + Expect(err).ToNot(HaveOccurred()) + Expect(seedNodePayload6.BlockNumber.Int64()).To(Equal(mocks.MockSeeNodePayload.BlockNumber.Int64())) + Expect(len(seedNodePayload6.HeadersRlp)).To(Equal(0)) + Expect(len(seedNodePayload6.UnclesRlp)).To(Equal(0)) + Expect(len(seedNodePayload6.TransactionsRlp)).To(Equal(1)) + Expect(seed_node.ListContainsBytes(seedNodePayload5.TransactionsRlp, mocks.MockTransactions.GetRlp(1))).To(BeTrue()) + Expect(len(seedNodePayload6.StorageNodesRlp)).To(Equal(0)) + Expect(len(seedNodePayload6.StateNodesRlp)).To(Equal(0)) + Expect(len(seedNodePayload6.ReceiptsRlp)).To(Equal(1)) + Expect(seedNodePayload4.ReceiptsRlp[0]).To(Equal(expectedRctForStorageRLP2)) + + seedNodePayload7, err := filterer.FilterResponse(stateFilter, *mocks.MockIPLDPayload) + Expect(err).ToNot(HaveOccurred()) + Expect(seedNodePayload7.BlockNumber.Int64()).To(Equal(mocks.MockSeeNodePayload.BlockNumber.Int64())) + Expect(len(seedNodePayload7.HeadersRlp)).To(Equal(0)) + Expect(len(seedNodePayload7.UnclesRlp)).To(Equal(0)) + Expect(len(seedNodePayload7.TransactionsRlp)).To(Equal(0)) + Expect(len(seedNodePayload7.StorageNodesRlp)).To(Equal(0)) + Expect(len(seedNodePayload7.ReceiptsRlp)).To(Equal(0)) + Expect(len(seedNodePayload7.StateNodesRlp)).To(Equal(1)) + Expect(seedNodePayload7.StateNodesRlp[mocks.ContractLeafKey]).To(Equal(mocks.ValueBytes)) + }) + }) +}) + +func getReceiptForStorageRLP(receipts types.Receipts, i int) []byte { + receiptForStorage := (*types.ReceiptForStorage)(receipts[i]) + receiptBuffer := new(bytes.Buffer) + err := receiptForStorage.EncodeRLP(receiptBuffer) + Expect(err).ToNot(HaveOccurred()) + return receiptBuffer.Bytes() +} diff --git a/pkg/seed_node/repository_test.go b/pkg/seed_node/repository_test.go index 0e85404e..9eb6bdbb 100644 --- a/pkg/seed_node/repository_test.go +++ b/pkg/seed_node/repository_test.go @@ -41,6 +41,7 @@ var _ = Describe("Repository", func() { AfterEach(func() { seed_node.TearDownDB(db) }) + Describe("Index", func() { It("Indexes CIDs and related metadata into vulcanizedb", func() { err = repo.Index(mocks.MockCIDPayload) diff --git a/pkg/seed_node/retriever_test.go b/pkg/seed_node/retriever_test.go index 7075d08b..5789fc5c 100644 --- a/pkg/seed_node/retriever_test.go +++ b/pkg/seed_node/retriever_test.go @@ -29,7 +29,151 @@ import ( ) var ( - retriever seed_node.CIDRetriever + retriever seed_node.CIDRetriever + openFilter = config.Subscription{ + StartingBlock: big.NewInt(0), + EndingBlock: big.NewInt(1), + HeaderFilter: config.HeaderFilter{}, + TrxFilter: config.TrxFilter{}, + ReceiptFilter: config.ReceiptFilter{}, + StateFilter: config.StateFilter{}, + StorageFilter: config.StorageFilter{}, + } + rctContractFilter = config.Subscription{ + StartingBlock: big.NewInt(0), + EndingBlock: big.NewInt(1), + HeaderFilter: config.HeaderFilter{ + Off: true, + }, + TrxFilter: config.TrxFilter{ + Off: true, + }, + ReceiptFilter: config.ReceiptFilter{ + Contracts: []string{"0x0000000000000000000000000000000000000001"}, + }, + StateFilter: config.StateFilter{ + Off: true, + }, + StorageFilter: config.StorageFilter{ + Off: true, + }, + } + rctTopicsFilter = config.Subscription{ + StartingBlock: big.NewInt(0), + EndingBlock: big.NewInt(1), + HeaderFilter: config.HeaderFilter{ + Off: true, + }, + TrxFilter: config.TrxFilter{ + Off: true, + }, + ReceiptFilter: config.ReceiptFilter{ + Topic0s: []string{"0x0000000000000000000000000000000000000000000000000000000000000004"}, + }, + StateFilter: config.StateFilter{ + Off: true, + }, + StorageFilter: config.StorageFilter{ + Off: true, + }, + } + rctTopicsAndContractFilter = config.Subscription{ + StartingBlock: big.NewInt(0), + EndingBlock: big.NewInt(1), + HeaderFilter: config.HeaderFilter{ + Off: true, + }, + TrxFilter: config.TrxFilter{ + Off: true, + }, + ReceiptFilter: config.ReceiptFilter{ + Topic0s: []string{"0x0000000000000000000000000000000000000000000000000000000000000004", "0x0000000000000000000000000000000000000000000000000000000000000005"}, + Contracts: []string{"0x0000000000000000000000000000000000000000"}, + }, + StateFilter: config.StateFilter{ + Off: true, + }, + StorageFilter: config.StorageFilter{ + Off: true, + }, + } + rctContractsAndTopicFilter = config.Subscription{ + StartingBlock: big.NewInt(0), + EndingBlock: big.NewInt(1), + HeaderFilter: config.HeaderFilter{ + Off: true, + }, + TrxFilter: config.TrxFilter{ + Off: true, + }, + ReceiptFilter: config.ReceiptFilter{ + Topic0s: []string{"0x0000000000000000000000000000000000000000000000000000000000000005"}, + Contracts: []string{"0x0000000000000000000000000000000000000000", "0x0000000000000000000000000000000000000001"}, + }, + StateFilter: config.StateFilter{ + Off: true, + }, + StorageFilter: config.StorageFilter{ + Off: true, + }, + } + rctsForAllCollectedTrxs = config.Subscription{ + StartingBlock: big.NewInt(0), + EndingBlock: big.NewInt(1), + HeaderFilter: config.HeaderFilter{ + Off: true, + }, + TrxFilter: config.TrxFilter{}, // Trx filter open so we will collect all trxs, therefore we will also collect all corresponding rcts despite rct filter + ReceiptFilter: config.ReceiptFilter{ + Topic0s: []string{"0x0000000000000000000000000000000000000000000000000000000000000006"}, // Topic isn't one of the topics we have + Contracts: []string{"0x0000000000000000000000000000000000000002"}, // Contract isn't one of the contracts we have + }, + StateFilter: config.StateFilter{ + Off: true, + }, + StorageFilter: config.StorageFilter{ + Off: true, + }, + } + rctsForSelectCollectedTrxs = config.Subscription{ + StartingBlock: big.NewInt(0), + EndingBlock: big.NewInt(1), + HeaderFilter: config.HeaderFilter{ + Off: true, + }, + TrxFilter: config.TrxFilter{ + Dst: []string{"0x0000000000000000000000000000000000000001"}, // We only filter for one of the trxs so we will only get the one corresponding receipt + }, + ReceiptFilter: config.ReceiptFilter{ + Topic0s: []string{"0x0000000000000000000000000000000000000000000000000000000000000006"}, // Topic isn't one of the topics we have + Contracts: []string{"0x0000000000000000000000000000000000000002"}, // Contract isn't one of the contracts we have + }, + StateFilter: config.StateFilter{ + Off: true, + }, + StorageFilter: config.StorageFilter{ + Off: true, + }, + } + stateFilter = config.Subscription{ + StartingBlock: big.NewInt(0), + EndingBlock: big.NewInt(1), + HeaderFilter: config.HeaderFilter{ + Off: true, + }, + TrxFilter: config.TrxFilter{ + Off: true, + }, + ReceiptFilter: config.ReceiptFilter{ + Off: true, + }, + StateFilter: config.StateFilter{ + Addresses: []string{mocks.Address.Hex()}, + }, + StorageFilter: config.StorageFilter{ + Off: true, + }, + } ) var _ = Describe("Retriever", func() { @@ -44,16 +188,9 @@ var _ = Describe("Retriever", func() { AfterEach(func() { seed_node.TearDownDB(db) }) + Describe("RetrieveCIDs", func() { - It("Retrieves the CIDs specified by the provided filtering metadata", func() { - openFilter := config.Subscription{ - StartingBlock: big.NewInt(0), - HeaderFilter: config.HeaderFilter{}, - TrxFilter: config.TrxFilter{}, - ReceiptFilter: config.ReceiptFilter{}, - StateFilter: config.StateFilter{}, - StorageFilter: config.StorageFilter{}, - } + It("Retrieves all CIDs for the given blocknumber when provided an open filter", func() { cidWrapper, err := retriever.RetrieveCIDs(openFilter, 1) Expect(err).ToNot(HaveOccurred()) Expect(cidWrapper.BlockNumber).To(Equal(mocks.MockCIDWrapper.BlockNumber)) @@ -83,24 +220,6 @@ var _ = Describe("Retriever", func() { Describe("RetrieveCIDs", func() { It("Applies filters from the provided config.Subscription", func() { - rctContractFilter := config.Subscription{ - StartingBlock: big.NewInt(0), - HeaderFilter: config.HeaderFilter{ - Off: true, - }, - TrxFilter: config.TrxFilter{ - Off: true, - }, - ReceiptFilter: config.ReceiptFilter{ - Contracts: []string{"0x0000000000000000000000000000000000000001"}, - }, - StateFilter: config.StateFilter{ - Off: true, - }, - StorageFilter: config.StorageFilter{ - Off: true, - }, - } cidWrapper1, err := retriever.RetrieveCIDs(rctContractFilter, 1) Expect(err).ToNot(HaveOccurred()) Expect(cidWrapper1.BlockNumber).To(Equal(mocks.MockCIDWrapper.BlockNumber)) @@ -111,24 +230,6 @@ var _ = Describe("Retriever", func() { Expect(len(cidWrapper1.Receipts)).To(Equal(1)) Expect(cidWrapper1.Receipts[0]).To(Equal("mockRctCID2")) - rctTopicsFilter := config.Subscription{ - StartingBlock: big.NewInt(0), - HeaderFilter: config.HeaderFilter{ - Off: true, - }, - TrxFilter: config.TrxFilter{ - Off: true, - }, - ReceiptFilter: config.ReceiptFilter{ - Topic0s: []string{"0x0000000000000000000000000000000000000000000000000000000000000004"}, - }, - StateFilter: config.StateFilter{ - Off: true, - }, - StorageFilter: config.StorageFilter{ - Off: true, - }, - } cidWrapper2, err := retriever.RetrieveCIDs(rctTopicsFilter, 1) Expect(err).ToNot(HaveOccurred()) Expect(cidWrapper2.BlockNumber).To(Equal(mocks.MockCIDWrapper.BlockNumber)) @@ -139,25 +240,6 @@ var _ = Describe("Retriever", func() { Expect(len(cidWrapper2.Receipts)).To(Equal(1)) Expect(cidWrapper2.Receipts[0]).To(Equal("mockRctCID1")) - rctTopicsAndContractFilter := config.Subscription{ - StartingBlock: big.NewInt(0), - HeaderFilter: config.HeaderFilter{ - Off: true, - }, - TrxFilter: config.TrxFilter{ - Off: true, - }, - ReceiptFilter: config.ReceiptFilter{ - Topic0s: []string{"0x0000000000000000000000000000000000000000000000000000000000000004", "0x0000000000000000000000000000000000000000000000000000000000000005"}, - Contracts: []string{"0x0000000000000000000000000000000000000000"}, - }, - StateFilter: config.StateFilter{ - Off: true, - }, - StorageFilter: config.StorageFilter{ - Off: true, - }, - } cidWrapper3, err := retriever.RetrieveCIDs(rctTopicsAndContractFilter, 1) Expect(err).ToNot(HaveOccurred()) Expect(cidWrapper3.BlockNumber).To(Equal(mocks.MockCIDWrapper.BlockNumber)) @@ -168,25 +250,6 @@ var _ = Describe("Retriever", func() { Expect(len(cidWrapper3.Receipts)).To(Equal(1)) Expect(cidWrapper3.Receipts[0]).To(Equal("mockRctCID1")) - rctContractsAndTopicFilter := config.Subscription{ - StartingBlock: big.NewInt(0), - HeaderFilter: config.HeaderFilter{ - Off: true, - }, - TrxFilter: config.TrxFilter{ - Off: true, - }, - ReceiptFilter: config.ReceiptFilter{ - Topic0s: []string{"0x0000000000000000000000000000000000000000000000000000000000000005"}, - Contracts: []string{"0x0000000000000000000000000000000000000000", "0x0000000000000000000000000000000000000001"}, - }, - StateFilter: config.StateFilter{ - Off: true, - }, - StorageFilter: config.StorageFilter{ - Off: true, - }, - } cidWrapper4, err := retriever.RetrieveCIDs(rctContractsAndTopicFilter, 1) Expect(err).ToNot(HaveOccurred()) Expect(cidWrapper4.BlockNumber).To(Equal(mocks.MockCIDWrapper.BlockNumber)) @@ -197,81 +260,30 @@ var _ = Describe("Retriever", func() { Expect(len(cidWrapper4.Receipts)).To(Equal(1)) Expect(cidWrapper4.Receipts[0]).To(Equal("mockRctCID2")) - rctsForAllCollectedTrxs := config.Subscription{ - StartingBlock: big.NewInt(0), - HeaderFilter: config.HeaderFilter{ - Off: true, - }, - TrxFilter: config.TrxFilter{}, // Trx filter open so we will collect all trxs, therefore we will also collect all corresponding rcts despite rct filter - ReceiptFilter: config.ReceiptFilter{ - Topic0s: []string{"0x0000000000000000000000000000000000000000000000000000000000000006"}, // Topic isn't one of the topics we have - Contracts: []string{"0x0000000000000000000000000000000000000002"}, // Contract isn't one of the contracts we have - }, - StateFilter: config.StateFilter{ - Off: true, - }, - StorageFilter: config.StorageFilter{ - Off: true, - }, - } cidWrapper5, err := retriever.RetrieveCIDs(rctsForAllCollectedTrxs, 1) Expect(err).ToNot(HaveOccurred()) Expect(cidWrapper5.BlockNumber).To(Equal(mocks.MockCIDWrapper.BlockNumber)) Expect(len(cidWrapper5.Headers)).To(Equal(0)) Expect(len(cidWrapper5.Transactions)).To(Equal(2)) + Expect(seed_node.ListContainsString(cidWrapper5.Transactions, "mockTrxCID1")).To(BeTrue()) + Expect(seed_node.ListContainsString(cidWrapper5.Transactions, "mockTrxCID2")).To(BeTrue()) Expect(len(cidWrapper5.StateNodes)).To(Equal(0)) Expect(len(cidWrapper5.StorageNodes)).To(Equal(0)) Expect(len(cidWrapper5.Receipts)).To(Equal(2)) Expect(seed_node.ListContainsString(cidWrapper5.Receipts, "mockRctCID1")).To(BeTrue()) Expect(seed_node.ListContainsString(cidWrapper5.Receipts, "mockRctCID2")).To(BeTrue()) - rctsForSelectCollectedTrxs := config.Subscription{ - StartingBlock: big.NewInt(0), - HeaderFilter: config.HeaderFilter{ - Off: true, - }, - TrxFilter: config.TrxFilter{ - Dst: []string{"0x0000000000000000000000000000000000000001"}, // We only filter for one of the trxs so we will only get the one corresponding receipt - }, - ReceiptFilter: config.ReceiptFilter{ - Topic0s: []string{"0x0000000000000000000000000000000000000000000000000000000000000006"}, // Topic isn't one of the topics we have - Contracts: []string{"0x0000000000000000000000000000000000000002"}, // Contract isn't one of the contracts we have - }, - StateFilter: config.StateFilter{ - Off: true, - }, - StorageFilter: config.StorageFilter{ - Off: true, - }, - } cidWrapper6, err := retriever.RetrieveCIDs(rctsForSelectCollectedTrxs, 1) Expect(err).ToNot(HaveOccurred()) Expect(cidWrapper6.BlockNumber).To(Equal(mocks.MockCIDWrapper.BlockNumber)) Expect(len(cidWrapper6.Headers)).To(Equal(0)) Expect(len(cidWrapper6.Transactions)).To(Equal(1)) + Expect(cidWrapper6.Transactions[0]).To(Equal("mockTrxCID2")) Expect(len(cidWrapper6.StateNodes)).To(Equal(0)) Expect(len(cidWrapper6.StorageNodes)).To(Equal(0)) Expect(len(cidWrapper6.Receipts)).To(Equal(1)) Expect(cidWrapper6.Receipts[0]).To(Equal("mockRctCID2")) - stateFilter := config.Subscription{ - StartingBlock: big.NewInt(0), - HeaderFilter: config.HeaderFilter{ - Off: true, - }, - TrxFilter: config.TrxFilter{ - Off: true, - }, - ReceiptFilter: config.ReceiptFilter{ - Off: true, - }, - StateFilter: config.StateFilter{ - Addresses: []string{mocks.Address.Hex()}, - }, - StorageFilter: config.StorageFilter{ - Off: true, - }, - } cidWrapper7, err := retriever.RetrieveCIDs(stateFilter, 1) Expect(err).ToNot(HaveOccurred()) Expect(cidWrapper7.BlockNumber).To(Equal(mocks.MockCIDWrapper.BlockNumber)) @@ -295,6 +307,7 @@ var _ = Describe("Retriever", func() { Expect(num).To(Equal(int64(1))) }) }) + Describe("RetrieveLastBlockNumber", func() { It("Gets the number of the latest block that has data in the database", func() { num, err := retriever.RetrieveLastBlockNumber() diff --git a/pkg/seed_node/service.go b/pkg/seed_node/service.go index 18d3b2e7..e8162ae3 100644 --- a/pkg/seed_node/service.go +++ b/pkg/seed_node/service.go @@ -301,20 +301,21 @@ func (sap *Service) backFill(sub Subscription, id rpc.ID, con config.Subscriptio startingBlock, err = sap.Retriever.RetrieveFirstBlockNumber() if err != nil { sub.PayloadChan <- streamer.SeedNodePayload{ - ErrMsg: "unable to set block range; error: " + err.Error(), + ErrMsg: "unable to set block range start; error: " + err.Error(), } } if startingBlock < con.StartingBlock.Int64() { startingBlock = con.StartingBlock.Int64() } - if con.EndingBlock.Int64() <= 0 || con.EndingBlock.Int64() <= startingBlock { - endingBlock, err = sap.Retriever.RetrieveLastBlockNumber() - if err != nil { - sub.PayloadChan <- streamer.SeedNodePayload{ - ErrMsg: "unable to set block range; error: " + err.Error(), - } + endingBlock, err = sap.Retriever.RetrieveLastBlockNumber() + if err != nil { + sub.PayloadChan <- streamer.SeedNodePayload{ + ErrMsg: "unable to set block range end; error: " + err.Error(), } } + if endingBlock > con.EndingBlock.Int64() && con.EndingBlock.Int64() > 0 && con.EndingBlock.Int64() > startingBlock { + endingBlock = con.EndingBlock.Int64() + } log.Debug("backfill starting block:", con.StartingBlock) log.Debug("backfill ending block:", endingBlock) // Backfilled payloads are sent concurrently to the streamed payloads, so the receiver needs to pay attention to diff --git a/pkg/seed_node/service_test.go b/pkg/seed_node/service_test.go index 2008df6c..f3a86345 100644 --- a/pkg/seed_node/service_test.go +++ b/pkg/seed_node/service_test.go @@ -32,7 +32,6 @@ import ( ) var _ = Describe("Service", func() { - Describe("SyncAndPublish", func() { It("Streams statediff.Payloads, converts them to IPLDPayloads, publishes IPLDPayloads, and indexes CIDPayloads", func() { wg := new(sync.WaitGroup)