From 5be205ffa6684262b3a1494af9e6f745ca7573b1 Mon Sep 17 00:00:00 2001 From: Ian Norden Date: Fri, 1 Nov 2019 14:03:28 -0500 Subject: [PATCH] super node backfill breaks batch call into smaller bins; retrieve gap test --- cmd/syncAndPublish.go | 2 +- pkg/super_node/backfiller.go | 49 ++++---- pkg/super_node/backfiller_test.go | 15 ++- pkg/super_node/mocks/retriever.go | 8 +- pkg/super_node/repository_test.go | 11 +- pkg/super_node/retriever.go | 12 +- pkg/super_node/retriever_test.go | 179 ++++++++++++++++++++++++++---- pkg/super_node/test_helpers.go | 10 ++ 8 files changed, 214 insertions(+), 72 deletions(-) diff --git a/cmd/syncAndPublish.go b/cmd/syncAndPublish.go index d2bf6851..e2126f96 100644 --- a/cmd/syncAndPublish.go +++ b/cmd/syncAndPublish.go @@ -120,5 +120,5 @@ func newBackFiller() (super_node.BackFillInterface, error) { } else { frequency = time.Duration(freq) } - return super_node.NewBackFillService(ipfsPath, &db, archivalRPCClient, time.Minute*frequency) + return super_node.NewBackFillService(ipfsPath, &db, archivalRPCClient, time.Minute*frequency, super_node.DefaultMaxBatchSize) } diff --git a/pkg/super_node/backfiller.go b/pkg/super_node/backfiller.go index 6a119efe..cd016477 100644 --- a/pkg/super_node/backfiller.go +++ b/pkg/super_node/backfiller.go @@ -23,8 +23,6 @@ import ( "time" "github.com/ethereum/go-ethereum/params" - "github.com/ethereum/go-ethereum/rlp" - "github.com/ethereum/go-ethereum/statediff" log "github.com/sirupsen/logrus" "github.com/vulcanize/vulcanizedb/libraries/shared/fetcher" @@ -34,9 +32,10 @@ import ( ) const ( - DefaultMaxBatchSize uint64 = 5000 - defaultMaxBatchNumber int64 = 100 + DefaultMaxBatchSize uint64 = 1000 + defaultMaxBatchNumber int64 = 10 ) + // BackFillInterface for filling in gaps in the super node type BackFillInterface interface { // Method for the super node to periodically check for and fill in gaps in its data using an archival node @@ -58,11 +57,11 @@ type BackFillService struct { // Check frequency GapCheckFrequency time.Duration // size of batch fetches - batchSize uint64 + BatchSize uint64 } // NewBackFillService returns a new BackFillInterface -func NewBackFillService(ipfsPath string, db *postgres.DB, archivalNodeRPCClient core.RpcClient, freq time.Duration) (BackFillInterface, error) { +func NewBackFillService(ipfsPath string, db *postgres.DB, archivalNodeRPCClient core.RpcClient, freq time.Duration, batchSize uint64) (BackFillInterface, error) { publisher, err := ipfs.NewIPLDPublisher(ipfsPath) if err != nil { return nil, err @@ -72,9 +71,9 @@ func NewBackFillService(ipfsPath string, db *postgres.DB, archivalNodeRPCClient Converter: ipfs.NewPayloadConverter(params.MainnetChainConfig), Publisher: publisher, Retriever: NewCIDRetriever(db), - Fetcher: fetcher.NewStateDiffFetcher(archivalNodeRPCClient), + Fetcher: fetcher.NewStateDiffFetcher(archivalNodeRPCClient), GapCheckFrequency: freq, - batchSize: DefaultMaxBatchSize, + BatchSize: batchSize, }, nil } @@ -118,6 +117,7 @@ func (bfs *BackFillService) FillGaps(wg *sync.WaitGroup, quitChan <-chan bool) { } func (bfs *BackFillService) fillGaps(startingBlock, endingBlock uint64) { + log.Infof("going to fill in gap from %d to %d", startingBlock, endingBlock) errChan := make(chan error) done := make(chan bool) backFillInitErr := bfs.BackFill(startingBlock, endingBlock, errChan, done) @@ -127,15 +127,15 @@ func (bfs *BackFillService) fillGaps(startingBlock, endingBlock uint64) { } for { select { - case err := <- errChan: + case err := <-errChan: log.Error(err) - case <- done: + case <-done: + log.Infof("finished filling in gap from %d to %d", startingBlock, endingBlock) return } } } - // BackFill fetches, processes, and returns utils.StorageDiffs over a range of blocks // It splits a large range up into smaller chunks, batch fetching and processing those chunks concurrently func (bfs *BackFillService) BackFill(startingBlock, endingBlock uint64, errChan chan error, done chan bool) error { @@ -144,14 +144,14 @@ func (bfs *BackFillService) BackFill(startingBlock, endingBlock uint64, errChan } // break the range up into bins of smaller ranges length := endingBlock - startingBlock + 1 - numberOfBins := length / bfs.batchSize - remainder := length % bfs.batchSize + numberOfBins := length / bfs.BatchSize + remainder := length % bfs.BatchSize if remainder != 0 { numberOfBins++ } blockRangeBins := make([][]uint64, numberOfBins) for i := range blockRangeBins { - nextBinStart := startingBlock + uint64(bfs.batchSize) + nextBinStart := startingBlock + uint64(bfs.BatchSize) if nextBinStart > endingBlock { nextBinStart = endingBlock + 1 } @@ -166,7 +166,7 @@ func (bfs *BackFillService) BackFill(startingBlock, endingBlock uint64, errChan // int64 for atomic incrementing and decrementing to track the number of active processing goroutines we have var activeCount int64 // channel for processing goroutines to signal when they are done - processingDone := make(chan bool) + processingDone := make(chan [2]uint64) forwardDone := make(chan bool) // for each block range bin spin up a goroutine to batch fetch and process state diffs for that range @@ -184,29 +184,23 @@ func (bfs *BackFillService) BackFill(startingBlock, endingBlock uint64, errChan errChan <- fetchErr } for _, payload := range payloads { - stateDiff := new(statediff.StateDiff) - stateDiffDecodeErr := rlp.DecodeBytes(payload.StateDiffRlp, stateDiff) - if stateDiffDecodeErr != nil { - errChan <- stateDiffDecodeErr - continue - } ipldPayload, convertErr := bfs.Converter.Convert(payload) if convertErr != nil { - log.Error(convertErr) + errChan <- convertErr continue } cidPayload, publishErr := bfs.Publisher.Publish(ipldPayload) if publishErr != nil { - log.Error(publishErr) + errChan <- publishErr continue } indexErr := bfs.Repository.Index(cidPayload) if indexErr != nil { - log.Error(indexErr) + errChan <- indexErr } } // when this goroutine is done, send out a signal - processingDone <- true + processingDone <- [2]uint64{blockHeights[0], blockHeights[len(blockHeights)-1]} }(blockHeights) } }() @@ -218,13 +212,14 @@ func (bfs *BackFillService) BackFill(startingBlock, endingBlock uint64, errChan goroutinesFinished := 0 for { select { - case <-processingDone: + case doneWithHeights := <-processingDone: atomic.AddInt64(&activeCount, -1) select { // if we are waiting for a process to finish, signal that one has case forwardDone <- true: default: } + log.Infof("finished filling in gap sub-bin from %d to %d", doneWithHeights[0], doneWithHeights[1]) goroutinesFinished++ if goroutinesFinished == int(numberOfBins) { done <- true @@ -235,4 +230,4 @@ func (bfs *BackFillService) BackFill(startingBlock, endingBlock uint64, errChan }() return nil -} \ No newline at end of file +} diff --git a/pkg/super_node/backfiller_test.go b/pkg/super_node/backfiller_test.go index 5916cd27..cfa4e571 100644 --- a/pkg/super_node/backfiller_test.go +++ b/pkg/super_node/backfiller_test.go @@ -47,7 +47,7 @@ var _ = Describe("BackFiller", func() { } mockRetriever := &mocks3.MockCIDRetriever{ FirstBlockNumberToReturn: 1, - GapsToRetrieve: [][2]int64{ + GapsToRetrieve: [][2]uint64{ { 100, 101, }, @@ -63,9 +63,10 @@ var _ = Describe("BackFiller", func() { Repository: mockCidRepo, Publisher: mockPublisher, Converter: mockConverter, - StateDiffFetcher: mockFetcher, + Fetcher: mockFetcher, Retriever: mockRetriever, GapCheckFrequency: time.Second * 2, + BatchSize: super_node.DefaultMaxBatchSize, } wg := &sync.WaitGroup{} quitChan := make(chan bool, 1) @@ -100,7 +101,7 @@ var _ = Describe("BackFiller", func() { } mockRetriever := &mocks3.MockCIDRetriever{ FirstBlockNumberToReturn: 1, - GapsToRetrieve: [][2]int64{ + GapsToRetrieve: [][2]uint64{ { 100, 100, }, @@ -115,9 +116,10 @@ var _ = Describe("BackFiller", func() { Repository: mockCidRepo, Publisher: mockPublisher, Converter: mockConverter, - StateDiffFetcher: mockFetcher, + Fetcher: mockFetcher, Retriever: mockRetriever, GapCheckFrequency: time.Second * 2, + BatchSize: super_node.DefaultMaxBatchSize, } wg := &sync.WaitGroup{} quitChan := make(chan bool, 1) @@ -149,7 +151,7 @@ var _ = Describe("BackFiller", func() { } mockRetriever := &mocks3.MockCIDRetriever{ FirstBlockNumberToReturn: 3, - GapsToRetrieve: [][2]int64{}, + GapsToRetrieve: [][2]uint64{}, } mockFetcher := &mocks2.StateDiffFetcher{ PayloadsToReturn: map[uint64]statediff.Payload{ @@ -161,9 +163,10 @@ var _ = Describe("BackFiller", func() { Repository: mockCidRepo, Publisher: mockPublisher, Converter: mockConverter, - StateDiffFetcher: mockFetcher, + Fetcher: mockFetcher, Retriever: mockRetriever, GapCheckFrequency: time.Second * 2, + BatchSize: super_node.DefaultMaxBatchSize, } wg := &sync.WaitGroup{} quitChan := make(chan bool, 1) diff --git a/pkg/super_node/mocks/retriever.go b/pkg/super_node/mocks/retriever.go index 2fafe933..f2ca3de9 100644 --- a/pkg/super_node/mocks/retriever.go +++ b/pkg/super_node/mocks/retriever.go @@ -7,7 +7,7 @@ import ( // MockCIDRetriever is a mock CID retriever for use in tests type MockCIDRetriever struct { - GapsToRetrieve [][2]int64 + GapsToRetrieve [][2]uint64 GapsToRetrieveErr error CalledTimes int FirstBlockNumberToReturn int64 @@ -30,15 +30,15 @@ func (mcr *MockCIDRetriever) RetrieveFirstBlockNumber() (int64, error) { } // RetrieveGapsInData mock method -func (mcr *MockCIDRetriever) RetrieveGapsInData() ([][2]int64, error) { +func (mcr *MockCIDRetriever) RetrieveGapsInData() ([][2]uint64, error) { mcr.CalledTimes++ return mcr.GapsToRetrieve, mcr.GapsToRetrieveErr } // SetGapsToRetrieve mock method -func (mcr *MockCIDRetriever) SetGapsToRetrieve(gaps [][2]int64) { +func (mcr *MockCIDRetriever) SetGapsToRetrieve(gaps [][2]uint64) { if mcr.GapsToRetrieve == nil { - mcr.GapsToRetrieve = make([][2]int64, 0) + mcr.GapsToRetrieve = make([][2]uint64, 0) } mcr.GapsToRetrieve = append(mcr.GapsToRetrieve, gaps...) } diff --git a/pkg/super_node/repository_test.go b/pkg/super_node/repository_test.go index 61a8d34c..77cc5ed1 100644 --- a/pkg/super_node/repository_test.go +++ b/pkg/super_node/repository_test.go @@ -26,13 +26,12 @@ import ( "github.com/vulcanize/vulcanizedb/pkg/super_node" ) -var ( - db *postgres.DB - err error - repo super_node.CIDRepository -) - var _ = Describe("Repository", func() { + var ( + db *postgres.DB + err error + repo super_node.CIDRepository + ) BeforeEach(func() { db, err = super_node.SetupDB() Expect(err).ToNot(HaveOccurred()) diff --git a/pkg/super_node/retriever.go b/pkg/super_node/retriever.go index 4d3be107..3a9e495c 100644 --- a/pkg/super_node/retriever.go +++ b/pkg/super_node/retriever.go @@ -33,7 +33,7 @@ type CIDRetriever interface { RetrieveCIDs(streamFilters config.Subscription, blockNumber int64) (*ipfs.CIDWrapper, error) RetrieveLastBlockNumber() (int64, error) RetrieveFirstBlockNumber() (int64, error) - RetrieveGapsInData() ([][2]int64, error) + RetrieveGapsInData() ([][2]uint64, error) } // EthCIDRetriever is the underlying struct supporting the CIDRetriever interface @@ -312,12 +312,12 @@ func (ecr *EthCIDRetriever) retrieveStorageCIDs(tx *sqlx.Tx, streamFilters confi } type gap struct { - Start int64 `db:"start"` - Stop int64 `db:"stop"` + Start uint64 `db:"start"` + Stop uint64 `db:"stop"` } // RetrieveGapsInData is used to find the the block numbers at which we are missing data in the db -func (ecr *EthCIDRetriever) RetrieveGapsInData() ([][2]int64, error) { +func (ecr *EthCIDRetriever) RetrieveGapsInData() ([][2]uint64, error) { pgStr := `SELECT header_cids.block_number + 1 AS start, min(fr.block_number) - 1 AS stop FROM header_cids LEFT JOIN header_cids r on header_cids.block_number = r.block_number - 1 LEFT JOIN header_cids fr on header_cids.block_number < fr.block_number @@ -328,9 +328,9 @@ func (ecr *EthCIDRetriever) RetrieveGapsInData() ([][2]int64, error) { if err != nil { return nil, err } - gapRanges := make([][2]int64, 0) + gapRanges := make([][2]uint64, 0) for _, gap := range gaps { - gapRanges = append(gapRanges, [2]int64{gap.Start, gap.Stop}) + gapRanges = append(gapRanges, [2]uint64{gap.Start, gap.Stop}) } return gapRanges, nil } diff --git a/pkg/super_node/retriever_test.go b/pkg/super_node/retriever_test.go index de38ed65..3baef35d 100644 --- a/pkg/super_node/retriever_test.go +++ b/pkg/super_node/retriever_test.go @@ -23,6 +23,7 @@ import ( . "github.com/onsi/gomega" "github.com/vulcanize/vulcanizedb/pkg/config" + "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/ipfs" "github.com/vulcanize/vulcanizedb/pkg/ipfs/mocks" "github.com/vulcanize/vulcanizedb/pkg/super_node" @@ -177,12 +178,15 @@ var ( ) var _ = Describe("Retriever", func() { + var ( + db *postgres.DB + repo super_node.CIDRepository + ) BeforeEach(func() { + var err error db, err = super_node.SetupDB() Expect(err).ToNot(HaveOccurred()) repo = super_node.NewCIDRepository(db) - err = repo.Index(mocks.MockCIDPayload) - Expect(err).ToNot(HaveOccurred()) retriever = super_node.NewCIDRetriever(db) }) AfterEach(func() { @@ -190,6 +194,10 @@ var _ = Describe("Retriever", func() { }) Describe("RetrieveCIDs", func() { + BeforeEach(func() { + indexErr := repo.Index(mocks.MockCIDPayload) + Expect(indexErr).ToNot(HaveOccurred()) + }) It("Retrieves all CIDs for the given blocknumber when provided an open filter", func() { cidWrapper, err := retriever.RetrieveCIDs(openFilter, 1) Expect(err).ToNot(HaveOccurred()) @@ -216,12 +224,10 @@ var _ = Describe("Retriever", func() { Expect(len(cidWrapper.StorageNodes)).To(Equal(1)) Expect(cidWrapper.StorageNodes).To(Equal(mocks.MockCIDWrapper.StorageNodes)) }) - }) - Describe("RetrieveCIDs", func() { It("Applies filters from the provided config.Subscription", func() { - cidWrapper1, err := retriever.RetrieveCIDs(rctContractFilter, 1) - Expect(err).ToNot(HaveOccurred()) + cidWrapper1, err1 := retriever.RetrieveCIDs(rctContractFilter, 1) + Expect(err1).ToNot(HaveOccurred()) Expect(cidWrapper1.BlockNumber).To(Equal(mocks.MockCIDWrapper.BlockNumber)) Expect(len(cidWrapper1.Headers)).To(Equal(0)) Expect(len(cidWrapper1.Transactions)).To(Equal(0)) @@ -230,8 +236,8 @@ var _ = Describe("Retriever", func() { Expect(len(cidWrapper1.Receipts)).To(Equal(1)) Expect(cidWrapper1.Receipts[0]).To(Equal("mockRctCID2")) - cidWrapper2, err := retriever.RetrieveCIDs(rctTopicsFilter, 1) - Expect(err).ToNot(HaveOccurred()) + cidWrapper2, err2 := retriever.RetrieveCIDs(rctTopicsFilter, 1) + Expect(err2).ToNot(HaveOccurred()) Expect(cidWrapper2.BlockNumber).To(Equal(mocks.MockCIDWrapper.BlockNumber)) Expect(len(cidWrapper2.Headers)).To(Equal(0)) Expect(len(cidWrapper2.Transactions)).To(Equal(0)) @@ -240,8 +246,8 @@ var _ = Describe("Retriever", func() { Expect(len(cidWrapper2.Receipts)).To(Equal(1)) Expect(cidWrapper2.Receipts[0]).To(Equal("mockRctCID1")) - cidWrapper3, err := retriever.RetrieveCIDs(rctTopicsAndContractFilter, 1) - Expect(err).ToNot(HaveOccurred()) + cidWrapper3, err3 := retriever.RetrieveCIDs(rctTopicsAndContractFilter, 1) + Expect(err3).ToNot(HaveOccurred()) Expect(cidWrapper3.BlockNumber).To(Equal(mocks.MockCIDWrapper.BlockNumber)) Expect(len(cidWrapper3.Headers)).To(Equal(0)) Expect(len(cidWrapper3.Transactions)).To(Equal(0)) @@ -250,8 +256,8 @@ var _ = Describe("Retriever", func() { Expect(len(cidWrapper3.Receipts)).To(Equal(1)) Expect(cidWrapper3.Receipts[0]).To(Equal("mockRctCID1")) - cidWrapper4, err := retriever.RetrieveCIDs(rctContractsAndTopicFilter, 1) - Expect(err).ToNot(HaveOccurred()) + cidWrapper4, err4 := retriever.RetrieveCIDs(rctContractsAndTopicFilter, 1) + Expect(err4).ToNot(HaveOccurred()) Expect(cidWrapper4.BlockNumber).To(Equal(mocks.MockCIDWrapper.BlockNumber)) Expect(len(cidWrapper4.Headers)).To(Equal(0)) Expect(len(cidWrapper4.Transactions)).To(Equal(0)) @@ -260,8 +266,8 @@ var _ = Describe("Retriever", func() { Expect(len(cidWrapper4.Receipts)).To(Equal(1)) Expect(cidWrapper4.Receipts[0]).To(Equal("mockRctCID2")) - cidWrapper5, err := retriever.RetrieveCIDs(rctsForAllCollectedTrxs, 1) - Expect(err).ToNot(HaveOccurred()) + cidWrapper5, err5 := retriever.RetrieveCIDs(rctsForAllCollectedTrxs, 1) + Expect(err5).ToNot(HaveOccurred()) Expect(cidWrapper5.BlockNumber).To(Equal(mocks.MockCIDWrapper.BlockNumber)) Expect(len(cidWrapper5.Headers)).To(Equal(0)) Expect(len(cidWrapper5.Transactions)).To(Equal(2)) @@ -273,8 +279,8 @@ var _ = Describe("Retriever", func() { Expect(super_node.ListContainsString(cidWrapper5.Receipts, "mockRctCID1")).To(BeTrue()) Expect(super_node.ListContainsString(cidWrapper5.Receipts, "mockRctCID2")).To(BeTrue()) - cidWrapper6, err := retriever.RetrieveCIDs(rctsForSelectCollectedTrxs, 1) - Expect(err).ToNot(HaveOccurred()) + cidWrapper6, err6 := retriever.RetrieveCIDs(rctsForSelectCollectedTrxs, 1) + Expect(err6).ToNot(HaveOccurred()) Expect(cidWrapper6.BlockNumber).To(Equal(mocks.MockCIDWrapper.BlockNumber)) Expect(len(cidWrapper6.Headers)).To(Equal(0)) Expect(len(cidWrapper6.Transactions)).To(Equal(1)) @@ -284,8 +290,8 @@ var _ = Describe("Retriever", func() { Expect(len(cidWrapper6.Receipts)).To(Equal(1)) Expect(cidWrapper6.Receipts[0]).To(Equal("mockRctCID2")) - cidWrapper7, err := retriever.RetrieveCIDs(stateFilter, 1) - Expect(err).ToNot(HaveOccurred()) + cidWrapper7, err7 := retriever.RetrieveCIDs(stateFilter, 1) + Expect(err7).ToNot(HaveOccurred()) Expect(cidWrapper7.BlockNumber).To(Equal(mocks.MockCIDWrapper.BlockNumber)) Expect(len(cidWrapper7.Headers)).To(Equal(0)) Expect(len(cidWrapper7.Transactions)).To(Equal(0)) @@ -302,17 +308,146 @@ var _ = Describe("Retriever", func() { Describe("RetrieveFirstBlockNumber", func() { It("Gets the number of the first block that has data in the database", func() { - num, err := retriever.RetrieveFirstBlockNumber() - Expect(err).ToNot(HaveOccurred()) + indexErr := repo.Index(mocks.MockCIDPayload) + Expect(indexErr).ToNot(HaveOccurred()) + num, retrieveErr := retriever.RetrieveFirstBlockNumber() + Expect(retrieveErr).ToNot(HaveOccurred()) Expect(num).To(Equal(int64(1))) }) + + It("Gets the number of the first block that has data in the database", func() { + payload := *mocks.MockCIDPayload + payload.BlockNumber = "1010101" + indexErr := repo.Index(&payload) + Expect(indexErr).ToNot(HaveOccurred()) + num, retrieveErr := retriever.RetrieveFirstBlockNumber() + Expect(retrieveErr).ToNot(HaveOccurred()) + Expect(num).To(Equal(int64(1010101))) + }) + + It("Gets the number of the first block that has data in the database", func() { + payload1 := *mocks.MockCIDPayload + payload1.BlockNumber = "1010101" + payload2 := payload1 + payload2.BlockNumber = "5" + indexErr := repo.Index(&payload1) + Expect(indexErr).ToNot(HaveOccurred()) + indexErr2 := repo.Index(&payload2) + Expect(indexErr2).ToNot(HaveOccurred()) + num, retrieveErr := retriever.RetrieveFirstBlockNumber() + Expect(retrieveErr).ToNot(HaveOccurred()) + Expect(num).To(Equal(int64(5))) + }) }) Describe("RetrieveLastBlockNumber", func() { It("Gets the number of the latest block that has data in the database", func() { - num, err := retriever.RetrieveLastBlockNumber() - Expect(err).ToNot(HaveOccurred()) + indexErr := repo.Index(mocks.MockCIDPayload) + Expect(indexErr).ToNot(HaveOccurred()) + num, retrieveErr := retriever.RetrieveLastBlockNumber() + Expect(retrieveErr).ToNot(HaveOccurred()) Expect(num).To(Equal(int64(1))) }) + + It("Gets the number of the latest block that has data in the database", func() { + payload := *mocks.MockCIDPayload + payload.BlockNumber = "1010101" + indexErr := repo.Index(&payload) + Expect(indexErr).ToNot(HaveOccurred()) + num, retrieveErr := retriever.RetrieveLastBlockNumber() + Expect(retrieveErr).ToNot(HaveOccurred()) + Expect(num).To(Equal(int64(1010101))) + }) + + It("Gets the number of the latest block that has data in the database", func() { + payload1 := *mocks.MockCIDPayload + payload1.BlockNumber = "1010101" + payload2 := payload1 + payload2.BlockNumber = "5" + indexErr := repo.Index(&payload1) + Expect(indexErr).ToNot(HaveOccurred()) + indexErr2 := repo.Index(&payload2) + Expect(indexErr2).ToNot(HaveOccurred()) + num, retrieveErr := retriever.RetrieveLastBlockNumber() + Expect(retrieveErr).ToNot(HaveOccurred()) + Expect(num).To(Equal(int64(1010101))) + }) + }) + + Describe("RetrieveGapsInData", func() { + It("Doesn't return gaps if there are none", func() { + payload1 := *mocks.MockCIDPayload + payload1.BlockNumber = "2" + payload2 := payload1 + payload2.BlockNumber = "3" + indexErr1 := repo.Index(mocks.MockCIDPayload) + Expect(indexErr1).ToNot(HaveOccurred()) + indexErr2 := repo.Index(&payload1) + Expect(indexErr2).ToNot(HaveOccurred()) + indexErr3 := repo.Index(&payload2) + Expect(indexErr3).ToNot(HaveOccurred()) + gaps, retrieveErr := retriever.RetrieveGapsInData() + Expect(retrieveErr).ToNot(HaveOccurred()) + Expect(len(gaps)).To(Equal(0)) + }) + + It("Doesn't return the gap from 0 to the earliest block", func() { + payload := *mocks.MockCIDPayload + payload.BlockNumber = "5" + indexErr := repo.Index(&payload) + Expect(indexErr).ToNot(HaveOccurred()) + gaps, retrieveErr := retriever.RetrieveGapsInData() + Expect(retrieveErr).ToNot(HaveOccurred()) + Expect(len(gaps)).To(Equal(0)) + }) + + It("Finds gap between two entries", func() { + payload1 := *mocks.MockCIDPayload + payload1.BlockNumber = "1010101" + payload2 := payload1 + payload2.BlockNumber = "5" + indexErr := repo.Index(&payload1) + Expect(indexErr).ToNot(HaveOccurred()) + indexErr2 := repo.Index(&payload2) + Expect(indexErr2).ToNot(HaveOccurred()) + gaps, retrieveErr := retriever.RetrieveGapsInData() + Expect(retrieveErr).ToNot(HaveOccurred()) + Expect(len(gaps)).To(Equal(1)) + Expect(gaps[0][0]).To(Equal(uint64(6))) + Expect(gaps[0][1]).To(Equal(uint64(1010100))) + }) + + It("Finds gaps between multiple entries", func() { + payload1 := *mocks.MockCIDPayload + payload1.BlockNumber = "1010101" + payload2 := payload1 + payload2.BlockNumber = "5" + payload3 := payload2 + payload3.BlockNumber = "100" + payload4 := payload3 + payload4.BlockNumber = "101" + payload5 := payload4 + payload5.BlockNumber = "102" + payload6 := payload5 + payload6.BlockNumber = "1000" + indexErr := repo.Index(&payload1) + Expect(indexErr).ToNot(HaveOccurred()) + indexErr2 := repo.Index(&payload2) + Expect(indexErr2).ToNot(HaveOccurred()) + indexErr3 := repo.Index(&payload3) + Expect(indexErr3).ToNot(HaveOccurred()) + indexErr4 := repo.Index(&payload4) + Expect(indexErr4).ToNot(HaveOccurred()) + indexErr5 := repo.Index(&payload5) + Expect(indexErr5).ToNot(HaveOccurred()) + indexErr6 := repo.Index(&payload6) + Expect(indexErr6).ToNot(HaveOccurred()) + gaps, retrieveErr := retriever.RetrieveGapsInData() + Expect(retrieveErr).ToNot(HaveOccurred()) + Expect(len(gaps)).To(Equal(3)) + Expect(super_node.ListContainsRange(gaps, [2]uint64{6, 99})).To(BeTrue()) + Expect(super_node.ListContainsRange(gaps, [2]uint64{103, 999})).To(BeTrue()) + Expect(super_node.ListContainsRange(gaps, [2]uint64{1001, 1010100})).To(BeTrue()) + }) }) }) diff --git a/pkg/super_node/test_helpers.go b/pkg/super_node/test_helpers.go index 86b4f8ca..762f5f5e 100644 --- a/pkg/super_node/test_helpers.go +++ b/pkg/super_node/test_helpers.go @@ -76,3 +76,13 @@ func ListContainsBytes(bbb [][]byte, b []byte) bool { } return false } + +// ListContainsRange used to check if a list of [2]uint64 contains a particula [2]uint64 +func ListContainsRange(rangeList [][2]uint64, rng [2]uint64) bool { + for _, rangeInList := range rangeList { + if rangeInList == rng { + return true + } + } + return false +}