From 43c254b5f6667b37a775e2538e2486fa45329d05 Mon Sep 17 00:00:00 2001 From: Ian Norden Date: Mon, 20 Apr 2020 08:14:02 -0500 Subject: [PATCH] fixes after tests --- environments/superNodeETH.toml | 4 +- libraries/shared/mocks/batch_client.go | 4 +- libraries/shared/mocks/stream_client.go | 44 ++++++++++++++ libraries/shared/storage/utils/bins.go | 29 ++++++++- pkg/super_node/backfiller.go | 32 +++++----- pkg/super_node/backfiller_test.go | 7 ++- pkg/super_node/btc/cleaner.go | 3 + pkg/super_node/btc/retriever.go | 19 +----- pkg/super_node/config.go | 14 ++--- pkg/super_node/constructors.go | 12 ++-- pkg/super_node/eth/cleaner.go | 3 + pkg/super_node/eth/payload_fetcher.go | 8 +-- pkg/super_node/eth/retriever.go | 19 +----- pkg/super_node/eth/retriever_test.go | 79 ++++++++++++++++++++++++- pkg/super_node/eth/streamer.go | 15 +++-- pkg/super_node/eth/streamer_test.go | 4 +- 16 files changed, 214 insertions(+), 82 deletions(-) create mode 100644 libraries/shared/mocks/stream_client.go diff --git a/environments/superNodeETH.toml b/environments/superNodeETH.toml index 406dfa54..837c6afa 100644 --- a/environments/superNodeETH.toml +++ b/environments/superNodeETH.toml @@ -13,8 +13,8 @@ type = "state" # $RESYNC_TYPE start = 0 # $RESYNC_START stop = 0 # $RESYNC_STOP - batchSize = 10 # $RESYNC_BATCH_SIZE - batchNumber = 100 # $RESYNC_BATCH_NUMBER + batchSize = 5 # $RESYNC_BATCH_SIZE + batchNumber = 50 # $RESYNC_BATCH_NUMBER timeout = 300 # $HTTP_TIMEOUT clearOldCache = false # $RESYNC_CLEAR_OLD_CACHE resetValidation = true # $RESYNC_RESET_VALIDATION diff --git a/libraries/shared/mocks/batch_client.go b/libraries/shared/mocks/batch_client.go index e6a1b6b6..e81555af 100644 --- a/libraries/shared/mocks/batch_client.go +++ b/libraries/shared/mocks/batch_client.go @@ -21,6 +21,8 @@ import ( "encoding/json" "errors" + "github.com/ethereum/go-ethereum/rpc" + "github.com/ethereum/go-ethereum/statediff" "github.com/vulcanize/vulcanizedb/pkg/eth/client" ) @@ -65,7 +67,7 @@ func (mc *BackFillerClient) BatchCall(batch []client.BatchElem) error { } // BatchCallContext mockClient method to simulate batch call to geth -func (mc *BackFillerClient) BatchCallContext(ctx context.Context, batch []client.BatchElem) error { +func (mc *BackFillerClient) BatchCallContext(ctx context.Context, batch []rpc.BatchElem) error { if mc.MappedStateDiffAt == nil { return errors.New("mockclient needs to be initialized with statediff payloads and errors") } diff --git a/libraries/shared/mocks/stream_client.go b/libraries/shared/mocks/stream_client.go new file mode 100644 index 00000000..6ae821b4 --- /dev/null +++ b/libraries/shared/mocks/stream_client.go @@ -0,0 +1,44 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package mocks + +import ( + "context" + + "github.com/ethereum/go-ethereum/rpc" +) + +type StreamClient struct { + passedContext context.Context + passedResult interface{} + passedNamespace string + passedPayloadChan interface{} + passedSubscribeArgs []interface{} +} + +func (client *StreamClient) Subscribe(ctx context.Context, namespace string, payloadChan interface{}, args ...interface{}) (*rpc.ClientSubscription, error) { + client.passedNamespace = namespace + client.passedPayloadChan = payloadChan + client.passedContext = ctx + + for _, arg := range args { + client.passedSubscribeArgs = append(client.passedSubscribeArgs, arg) + } + + subscription := rpc.ClientSubscription{} + return &subscription, nil +} diff --git a/libraries/shared/storage/utils/bins.go b/libraries/shared/storage/utils/bins.go index 18f28c32..42ea0a84 100644 --- a/libraries/shared/storage/utils/bins.go +++ b/libraries/shared/storage/utils/bins.go @@ -16,7 +16,11 @@ package utils -import "errors" +import ( + "errors" + + "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" +) func GetBlockHeightBins(startingBlock, endingBlock, batchSize uint64) ([][]uint64, error) { if endingBlock < startingBlock { @@ -43,3 +47,26 @@ func GetBlockHeightBins(startingBlock, endingBlock, batchSize uint64) ([][]uint6 } return blockRangeBins, nil } + +func MissingHeightsToGaps(heights []uint64) []shared.Gap { + validationGaps := make([]shared.Gap, 0) + start := heights[0] + lastHeight := start + for i, height := range heights[1:] { + if height != lastHeight+1 { + validationGaps = append(validationGaps, shared.Gap{ + Start: start, + Stop: lastHeight, + }) + start = height + } + if i+2 == len(heights) { + validationGaps = append(validationGaps, shared.Gap{ + Start: start, + Stop: height, + }) + } + lastHeight = height + } + return validationGaps +} diff --git a/pkg/super_node/backfiller.go b/pkg/super_node/backfiller.go index 61d9efba..ff7514bb 100644 --- a/pkg/super_node/backfiller.go +++ b/pkg/super_node/backfiller.go @@ -62,7 +62,7 @@ type BackFillService struct { // Channel for receiving quit signal QuitChan chan bool // Chain type - chain shared.ChainType + Chain shared.ChainType // Headers with times_validated lower than this will be resynced validationLevel int } @@ -108,7 +108,7 @@ func NewBackFillService(settings *Config, screenAndServeChan chan shared.Convert BatchNumber: int64(batchNumber), ScreenAndServeChan: screenAndServeChan, QuitChan: settings.Quit, - chain: settings.Chain, + Chain: settings.Chain, validationLevel: settings.ValidationLevel, }, nil } @@ -122,25 +122,25 @@ func (bfs *BackFillService) FillGapsInSuperNode(wg *sync.WaitGroup) { for { select { case <-bfs.QuitChan: - log.Infof("quiting %s FillGapsInSuperNode process", bfs.chain.String()) + log.Infof("quiting %s FillGapsInSuperNode process", bfs.Chain.String()) wg.Done() return case <-ticker.C: - log.Infof("searching for gaps in the %s super node database", bfs.chain.String()) + log.Infof("searching for gaps in the %s super node database", bfs.Chain.String()) startingBlock, err := bfs.Retriever.RetrieveFirstBlockNumber() if err != nil { - log.Errorf("super node db backfill RetrieveFirstBlockNumber error for chain %s: %v", bfs.chain.String(), err) + log.Errorf("super node db backfill RetrieveFirstBlockNumber error for chain %s: %v", bfs.Chain.String(), err) continue } - if startingBlock != 0 { - log.Infof("found gap at the beginning of the %s sync", bfs.chain.String()) + if startingBlock != 0 && bfs.Chain == shared.Bitcoin || startingBlock != 1 && bfs.Chain == shared.Ethereum { + log.Infof("found gap at the beginning of the %s sync", bfs.Chain.String()) if err := bfs.backFill(0, uint64(startingBlock-1)); err != nil { log.Error(err) } } gaps, err := bfs.Retriever.RetrieveGapsInData(bfs.validationLevel) if err != nil { - log.Errorf("super node db backfill RetrieveGapsInData error for chain %s: %v", bfs.chain.String(), err) + log.Errorf("super node db backfill RetrieveGapsInData error for chain %s: %v", bfs.Chain.String(), err) continue } for _, gap := range gaps { @@ -151,15 +151,15 @@ func (bfs *BackFillService) FillGapsInSuperNode(wg *sync.WaitGroup) { } } }() - log.Infof("%s fillGaps goroutine successfully spun up", bfs.chain.String()) + log.Infof("%s fillGaps goroutine successfully spun up", bfs.Chain.String()) } // backFill fetches, processes, and returns utils.StorageDiffs over a range of blocks // It splits a large range up into smaller chunks, batch fetching and processing those chunks concurrently func (bfs *BackFillService) backFill(startingBlock, endingBlock uint64) error { - log.Infof("filling in %s gap from %d to %d", bfs.chain.String(), startingBlock, endingBlock) + log.Infof("filling in %s gap from %d to %d", bfs.Chain.String(), startingBlock, endingBlock) if endingBlock < startingBlock { - return fmt.Errorf("super node %s db backfill: ending block number needs to be greater than starting block number", bfs.chain.String()) + return fmt.Errorf("super node %s db backfill: ending block number needs to be greater than starting block number", bfs.Chain.String()) } // break the range up into bins of smaller ranges blockRangeBins, err := utils.GetBlockHeightBins(startingBlock, endingBlock, bfs.BatchSize) @@ -184,12 +184,12 @@ func (bfs *BackFillService) backFill(startingBlock, endingBlock uint64) error { go func(blockHeights []uint64) { payloads, err := bfs.Fetcher.FetchAt(blockHeights) if err != nil { - log.Errorf("%s super node historical data fetcher error: %s", bfs.chain.String(), err.Error()) + log.Errorf("%s super node historical data fetcher error: %s", bfs.Chain.String(), err.Error()) } for _, payload := range payloads { ipldPayload, err := bfs.Converter.Convert(payload) if err != nil { - log.Errorf("%s super node historical data converter error: %s", bfs.chain.String(), err.Error()) + log.Errorf("%s super node historical data converter error: %s", bfs.Chain.String(), err.Error()) } // If there is a ScreenAndServe process listening, forward payload to it select { @@ -198,14 +198,14 @@ func (bfs *BackFillService) backFill(startingBlock, endingBlock uint64) error { } cidPayload, err := bfs.Publisher.Publish(ipldPayload) if err != nil { - log.Errorf("%s super node historical data publisher error: %s", bfs.chain.String(), err.Error()) + log.Errorf("%s super node historical data publisher error: %s", bfs.Chain.String(), err.Error()) } if err := bfs.Indexer.Index(cidPayload); err != nil { - log.Errorf("%s super node historical data indexer error: %s", bfs.chain.String(), err.Error()) + log.Errorf("%s super node historical data indexer error: %s", bfs.Chain.String(), err.Error()) } } // when this goroutine is done, send out a signal - log.Infof("finished filling in %s gap from %d to %d", bfs.chain.String(), blockHeights[0], blockHeights[len(blockHeights)-1]) + log.Infof("finished filling in %s gap from %d to %d", bfs.Chain.String(), blockHeights[0], blockHeights[len(blockHeights)-1]) processingDone <- true }(blockHeights) } diff --git a/pkg/super_node/backfiller_test.go b/pkg/super_node/backfiller_test.go index 236357e0..56adea4e 100644 --- a/pkg/super_node/backfiller_test.go +++ b/pkg/super_node/backfiller_test.go @@ -45,7 +45,7 @@ var _ = Describe("BackFiller", func() { ReturnErr: nil, } mockRetriever := &mocks2.CIDRetriever{ - FirstBlockNumberToReturn: 0, + FirstBlockNumberToReturn: 1, GapsToRetrieve: []shared.Gap{ { Start: 100, Stop: 101, @@ -69,6 +69,7 @@ var _ = Describe("BackFiller", func() { BatchSize: super_node.DefaultMaxBatchSize, BatchNumber: super_node.DefaultMaxBatchNumber, QuitChan: quitChan, + Chain: shared.Ethereum, } wg := &sync.WaitGroup{} backfiller.FillGapsInSuperNode(wg) @@ -101,7 +102,7 @@ var _ = Describe("BackFiller", func() { ReturnErr: nil, } mockRetriever := &mocks2.CIDRetriever{ - FirstBlockNumberToReturn: 0, + FirstBlockNumberToReturn: 1, GapsToRetrieve: []shared.Gap{ { Start: 100, Stop: 100, @@ -124,6 +125,7 @@ var _ = Describe("BackFiller", func() { BatchSize: super_node.DefaultMaxBatchSize, BatchNumber: super_node.DefaultMaxBatchNumber, QuitChan: quitChan, + Chain: shared.Ethereum, } wg := &sync.WaitGroup{} backfiller.FillGapsInSuperNode(wg) @@ -173,6 +175,7 @@ var _ = Describe("BackFiller", func() { BatchSize: super_node.DefaultMaxBatchSize, BatchNumber: super_node.DefaultMaxBatchNumber, QuitChan: quitChan, + Chain: shared.Ethereum, } wg := &sync.WaitGroup{} backfiller.FillGapsInSuperNode(wg) diff --git a/pkg/super_node/btc/cleaner.go b/pkg/super_node/btc/cleaner.go index 95a00b27..a057dddf 100644 --- a/pkg/super_node/btc/cleaner.go +++ b/pkg/super_node/btc/cleaner.go @@ -50,6 +50,9 @@ func (c *Cleaner) ResetValidation(rngs [][2]uint64) error { SET times_validated = 0 WHERE block_number BETWEEN $1 AND $2` if _, err := tx.Exec(pgStr, rng[0], rng[1]); err != nil { + if err := tx.Rollback(); err != nil { + logrus.Error(err) + } return err } } diff --git a/pkg/super_node/btc/retriever.go b/pkg/super_node/btc/retriever.go index 4a08cbfd..350917bb 100644 --- a/pkg/super_node/btc/retriever.go +++ b/pkg/super_node/btc/retriever.go @@ -20,6 +20,8 @@ import ( "fmt" "math/big" + "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" + "github.com/lib/pq" "github.com/ethereum/go-ethereum/common" @@ -194,22 +196,7 @@ func (ecr *CIDRetriever) RetrieveGapsInData(validationLevel int) ([]shared.Gap, if len(heights) == 0 { return emptyGaps, nil } - validationGaps := make([]shared.Gap, 0) - start := heights[0] - lastHeight := start - for _, height := range heights[1:] { - if height == lastHeight+1 { - lastHeight = height - continue - } - validationGaps = append(validationGaps, shared.Gap{ - Start: start, - Stop: lastHeight, - }) - start = height - lastHeight = start - } - return append(emptyGaps, validationGaps...), nil + return append(emptyGaps, utils.MissingHeightsToGaps(heights)...), nil } // RetrieveBlockByHash returns all of the CIDs needed to compose an entire block, for a given block hash diff --git a/pkg/super_node/config.go b/pkg/super_node/config.go index f0bda9ad..be3d09c3 100644 --- a/pkg/super_node/config.go +++ b/pkg/super_node/config.go @@ -91,13 +91,6 @@ func NewSuperNodeConfig() (*Config, error) { viper.BindEnv("superNode.ipcPath", SUPERNODE_IPC_PATH) viper.BindEnv("superNode.httpPath", SUPERNODE_HTTP_PATH) viper.BindEnv("superNode.backFill", SUPERNODE_BACKFILL) - viper.BindEnv("superNode.timeout", shared.HTTP_TIMEOUT) - - timeout := viper.GetInt("superNode.timeout") - if timeout < 15 { - timeout = 15 - } - c.Timeout = time.Second * time.Duration(timeout) chain := viper.GetString("superNode.chain") c.Chain, err = shared.NewChainType(chain) @@ -178,6 +171,13 @@ func (c *Config) BackFillFields() error { viper.BindEnv("superNode.batchSize", SUPERNODE_BATCH_SIZE) viper.BindEnv("superNode.batchNumber", SUPERNODE_BATCH_NUMBER) viper.BindEnv("superNode.validationLevel", SUPERNODE_VALIDATION_LEVEL) + viper.BindEnv("superNode.timeout", shared.HTTP_TIMEOUT) + + timeout := viper.GetInt("superNode.timeout") + if timeout < 15 { + timeout = 15 + } + c.Timeout = time.Second * time.Duration(timeout) switch c.Chain { case shared.Ethereum: diff --git a/pkg/super_node/constructors.go b/pkg/super_node/constructors.go index 3c4a2532..fa62fb7d 100644 --- a/pkg/super_node/constructors.go +++ b/pkg/super_node/constructors.go @@ -21,12 +21,10 @@ import ( "time" "github.com/btcsuite/btcd/chaincfg" - "github.com/btcsuite/btcd/rpcclient" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rpc" - "github.com/vulcanize/vulcanizedb/pkg/eth/core" "github.com/vulcanize/vulcanizedb/pkg/postgres" "github.com/vulcanize/vulcanizedb/pkg/super_node/btc" "github.com/vulcanize/vulcanizedb/pkg/super_node/eth" @@ -73,10 +71,9 @@ func NewCIDRetriever(chain shared.ChainType, db *postgres.DB) (shared.CIDRetriev func NewPayloadStreamer(chain shared.ChainType, clientOrConfig interface{}) (shared.PayloadStreamer, chan shared.RawChainData, error) { switch chain { case shared.Ethereum: - ethClient, ok := clientOrConfig.(core.RPCClient) + ethClient, ok := clientOrConfig.(*rpc.Client) if !ok { - var expectedClientType core.RPCClient - return nil, nil, fmt.Errorf("ethereum payload streamer constructor expected client type %T got %T", expectedClientType, clientOrConfig) + return nil, nil, fmt.Errorf("ethereum payload streamer constructor expected client type %T got %T", &rpc.Client{}, clientOrConfig) } streamChan := make(chan shared.RawChainData, eth.PayloadChanBufferSize) return eth.NewPayloadStreamer(ethClient), streamChan, nil @@ -96,10 +93,9 @@ func NewPayloadStreamer(chain shared.ChainType, clientOrConfig interface{}) (sha func NewPaylaodFetcher(chain shared.ChainType, client interface{}, timeout time.Duration) (shared.PayloadFetcher, error) { switch chain { case shared.Ethereum: - batchClient, ok := client.(eth.BatchClient) + batchClient, ok := client.(*rpc.Client) if !ok { - var expectedClient eth.BatchClient - return nil, fmt.Errorf("ethereum payload fetcher constructor expected client type %T got %T", expectedClient, client) + return nil, fmt.Errorf("ethereum payload fetcher constructor expected client type %T got %T", &rpc.Client{}, client) } return eth.NewPayloadFetcher(batchClient, timeout), nil case shared.Bitcoin: diff --git a/pkg/super_node/eth/cleaner.go b/pkg/super_node/eth/cleaner.go index c07dd063..37ebcba6 100644 --- a/pkg/super_node/eth/cleaner.go +++ b/pkg/super_node/eth/cleaner.go @@ -50,6 +50,9 @@ func (c *Cleaner) ResetValidation(rngs [][2]uint64) error { SET times_validated = 0 WHERE block_number BETWEEN $1 AND $2` if _, err := tx.Exec(pgStr, rng[0], rng[1]); err != nil { + if err := tx.Rollback(); err != nil { + logrus.Error(err) + } return err } } diff --git a/pkg/super_node/eth/payload_fetcher.go b/pkg/super_node/eth/payload_fetcher.go index fece7c10..b1f33649 100644 --- a/pkg/super_node/eth/payload_fetcher.go +++ b/pkg/super_node/eth/payload_fetcher.go @@ -21,15 +21,15 @@ import ( "fmt" "time" + "github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/statediff" - "github.com/vulcanize/vulcanizedb/pkg/eth/client" "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" ) // BatchClient is an interface to a batch-fetching geth rpc client; created to allow mock insertion type BatchClient interface { - BatchCallContext(ctx context.Context, batch []client.BatchElem) error + BatchCallContext(ctx context.Context, batch []rpc.BatchElem) error } // PayloadFetcher satisfies the PayloadFetcher interface for ethereum @@ -53,9 +53,9 @@ func NewPayloadFetcher(bc BatchClient, timeout time.Duration) *PayloadFetcher { // FetchAt fetches the statediff payloads at the given block heights // Calls StateDiffAt(ctx context.Context, blockNumber uint64) (*Payload, error) func (fetcher *PayloadFetcher) FetchAt(blockHeights []uint64) ([]shared.RawChainData, error) { - batch := make([]client.BatchElem, 0) + batch := make([]rpc.BatchElem, 0) for _, height := range blockHeights { - batch = append(batch, client.BatchElem{ + batch = append(batch, rpc.BatchElem{ Method: method, Args: []interface{}{height}, Result: new(statediff.Payload), diff --git a/pkg/super_node/eth/retriever.go b/pkg/super_node/eth/retriever.go index 5baa2c72..539052db 100644 --- a/pkg/super_node/eth/retriever.go +++ b/pkg/super_node/eth/retriever.go @@ -20,6 +20,8 @@ import ( "fmt" "math/big" + "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" "github.com/jmoiron/sqlx" @@ -479,22 +481,7 @@ func (ecr *CIDRetriever) RetrieveGapsInData(validationLevel int) ([]shared.Gap, if len(heights) == 0 { return emptyGaps, nil } - validationGaps := make([]shared.Gap, 0) - start := heights[0] - lastHeight := start - for _, height := range heights[1:] { - if height == lastHeight+1 { - lastHeight = height - continue - } - validationGaps = append(validationGaps, shared.Gap{ - Start: start, - Stop: lastHeight, - }) - start = height - lastHeight = start - } - return append(emptyGaps, validationGaps...), nil + return append(emptyGaps, utils.MissingHeightsToGaps(heights)...), nil } // RetrieveBlockByHash returns all of the CIDs needed to compose an entire block, for a given block hash diff --git a/pkg/super_node/eth/retriever_test.go b/pkg/super_node/eth/retriever_test.go index 71f4b1d8..444a5d24 100644 --- a/pkg/super_node/eth/retriever_test.go +++ b/pkg/super_node/eth/retriever_test.go @@ -544,8 +544,16 @@ var _ = Describe("Retriever", func() { payload4.HeaderCID.BlockNumber = "101" payload5 := payload4 payload5.HeaderCID.BlockNumber = "102" - payload6 := payload5 - payload6.HeaderCID.BlockNumber = "1000" + payload6 := payload4 + payload6.HeaderCID.BlockNumber = "103" + payload7 := payload4 + payload7.HeaderCID.BlockNumber = "104" + payload8 := payload4 + payload8.HeaderCID.BlockNumber = "105" + payload9 := payload4 + payload9.HeaderCID.BlockNumber = "106" + payload10 := payload5 + payload10.HeaderCID.BlockNumber = "1000" err := repo.Index(&payload1) Expect(err).ToNot(HaveOccurred()) err = repo.Index(&payload2) @@ -558,11 +566,76 @@ var _ = Describe("Retriever", func() { Expect(err).ToNot(HaveOccurred()) err = repo.Index(&payload6) Expect(err).ToNot(HaveOccurred()) + err = repo.Index(&payload7) + Expect(err).ToNot(HaveOccurred()) + err = repo.Index(&payload8) + Expect(err).ToNot(HaveOccurred()) + err = repo.Index(&payload9) + Expect(err).ToNot(HaveOccurred()) + err = repo.Index(&payload10) + Expect(err).ToNot(HaveOccurred()) + gaps, err := retriever.RetrieveGapsInData(1) Expect(err).ToNot(HaveOccurred()) Expect(len(gaps)).To(Equal(3)) Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 6, Stop: 99})).To(BeTrue()) - Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 103, Stop: 999})).To(BeTrue()) + Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 107, Stop: 999})).To(BeTrue()) + Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 1001, Stop: 1010100})).To(BeTrue()) + }) + + It("Finds validation level gaps", func() { + payload1 := *mocks.MockCIDPayload + payload1.HeaderCID.BlockNumber = "1010101" + payload2 := payload1 + payload2.HeaderCID.BlockNumber = "5" + payload3 := payload2 + payload3.HeaderCID.BlockNumber = "100" + payload4 := payload3 + payload4.HeaderCID.BlockNumber = "101" + payload5 := payload4 + payload5.HeaderCID.BlockNumber = "102" + payload6 := payload4 + payload6.HeaderCID.BlockNumber = "103" + payload7 := payload4 + payload7.HeaderCID.BlockNumber = "104" + payload8 := payload4 + payload8.HeaderCID.BlockNumber = "105" + payload9 := payload4 + payload9.HeaderCID.BlockNumber = "106" + payload10 := payload5 + payload10.HeaderCID.BlockNumber = "1000" + err := repo.Index(&payload1) + Expect(err).ToNot(HaveOccurred()) + err = repo.Index(&payload2) + Expect(err).ToNot(HaveOccurred()) + err = repo.Index(&payload3) + Expect(err).ToNot(HaveOccurred()) + err = repo.Index(&payload4) + Expect(err).ToNot(HaveOccurred()) + err = repo.Index(&payload5) + Expect(err).ToNot(HaveOccurred()) + err = repo.Index(&payload6) + Expect(err).ToNot(HaveOccurred()) + err = repo.Index(&payload7) + Expect(err).ToNot(HaveOccurred()) + err = repo.Index(&payload8) + Expect(err).ToNot(HaveOccurred()) + err = repo.Index(&payload9) + Expect(err).ToNot(HaveOccurred()) + err = repo.Index(&payload10) + Expect(err).ToNot(HaveOccurred()) + + cleaner := eth.NewCleaner(db) + err = cleaner.ResetValidation([][2]uint64{{101, 102}, {104, 104}}) + Expect(err).ToNot(HaveOccurred()) + + gaps, err := retriever.RetrieveGapsInData(1) + Expect(err).ToNot(HaveOccurred()) + Expect(len(gaps)).To(Equal(5)) + Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 6, Stop: 99})).To(BeTrue()) + Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 101, Stop: 102})).To(BeTrue()) + Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 104, Stop: 104})).To(BeTrue()) + Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 107, Stop: 999})).To(BeTrue()) Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 1001, Stop: 1010100})).To(BeTrue()) }) }) diff --git a/pkg/super_node/eth/streamer.go b/pkg/super_node/eth/streamer.go index 123bc911..c16ad6fe 100644 --- a/pkg/super_node/eth/streamer.go +++ b/pkg/super_node/eth/streamer.go @@ -17,10 +17,12 @@ package eth import ( + "context" + + "github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/statediff" "github.com/sirupsen/logrus" - "github.com/vulcanize/vulcanizedb/pkg/eth/core" "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" ) @@ -28,13 +30,18 @@ const ( PayloadChanBufferSize = 20000 // the max eth sub buffer size ) +// StreamClient is an interface for subscribing and streaming from geth +type StreamClient interface { + Subscribe(ctx context.Context, namespace string, payloadChan interface{}, args ...interface{}) (*rpc.ClientSubscription, error) +} + // PayloadStreamer satisfies the PayloadStreamer interface for ethereum type PayloadStreamer struct { - Client core.RPCClient + Client StreamClient } // NewPayloadStreamer creates a pointer to a new PayloadStreamer which satisfies the PayloadStreamer interface for ethereum -func NewPayloadStreamer(client core.RPCClient) *PayloadStreamer { +func NewPayloadStreamer(client StreamClient) *PayloadStreamer { return &PayloadStreamer{ Client: client, } @@ -53,5 +60,5 @@ func (ps *PayloadStreamer) Stream(payloadChan chan shared.RawChainData) (shared. } } }() - return ps.Client.Subscribe("statediff", stateDiffChan, "stream") + return ps.Client.Subscribe(context.Background(), "statediff", stateDiffChan, "stream") } diff --git a/pkg/super_node/eth/streamer_test.go b/pkg/super_node/eth/streamer_test.go index 0e47041c..d6c014f6 100644 --- a/pkg/super_node/eth/streamer_test.go +++ b/pkg/super_node/eth/streamer_test.go @@ -18,14 +18,14 @@ import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - "github.com/vulcanize/vulcanizedb/pkg/eth/fakes" + "github.com/vulcanize/vulcanizedb/libraries/shared/mocks" "github.com/vulcanize/vulcanizedb/pkg/super_node/eth" "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" ) var _ = Describe("StateDiff Streamer", func() { It("subscribes to the geth statediff service", func() { - client := &fakes.MockRPCClient{} + client := &mocks.StreamClient{} streamer := eth.NewPayloadStreamer(client) payloadChan := make(chan shared.RawChainData) _, err := streamer.Stream(payloadChan)