From 5fb1cc06969b5020319b5dfbb4c5e67703f57705 Mon Sep 17 00:00:00 2001
From: Ian Norden
Date: Tue, 12 May 2020 14:53:50 -0500
Subject: [PATCH] refactor- focus on simplifying concurrent fetching; graceful
 shutdown for superNode command

---
 cmd/superNode.go                         |  10 +-
 pkg/eth/trie_validator/service.go        |  17 +++
 pkg/super_node/backfiller.go             | 165 ++++++++++-------------
 pkg/super_node/backfiller_test.go        |  10 +-
 pkg/super_node/btc/cid_retriever.go      |  63 +++++----
 pkg/super_node/config.go                 |   3 -
 pkg/super_node/eth/cid_retriever.go      |  17 ++-
 pkg/super_node/eth/cid_retriever_test.go |  66 ++++++---
 pkg/super_node/resync/config.go          |   3 -
 pkg/super_node/resync/service.go         | 115 +++++++---------
 pkg/super_node/service.go                |  95 +++++++------
 pkg/super_node/service_test.go           |   2 +-
 12 files changed, 304 insertions(+), 262 deletions(-)
 create mode 100644 pkg/eth/trie_validator/service.go

diff --git a/cmd/superNode.go b/cmd/superNode.go
index 02f35420..849cfe6f 100644
--- a/cmd/superNode.go
+++ b/cmd/superNode.go
@@ -16,6 +16,8 @@
 package cmd
 
 import (
+    "os"
+    "os/signal"
     "sync"
 
     "github.com/ethereum/go-ethereum/rpc"
@@ -85,15 +87,21 @@ func superNode() {
             logWithCommand.Fatal(err)
         }
     }
+    var backFiller super_node.BackFillInterface
     if superNodeConfig.BackFill {
         logWithCommand.Debug("initializing new super node backfill service")
-        backFiller, err := super_node.NewBackFillService(superNodeConfig, forwardPayloadChan)
+        backFiller, err = super_node.NewBackFillService(superNodeConfig, forwardPayloadChan)
         if err != nil {
             logWithCommand.Fatal(err)
         }
         logWithCommand.Info("starting up super node backfill process")
         backFiller.BackFill(wg)
     }
+    shutdown := make(chan os.Signal)
+    signal.Notify(shutdown, os.Interrupt)
+    <-shutdown
+    backFiller.Stop()
+    superNode.Stop()
     wg.Wait()
 }
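The shutdown wiring added above follows Go's standard signal-notification pattern. A minimal, self-contained sketch of that pattern is below; the Stopper interface and waitForShutdown function are hypothetical stand-ins for the super node and backfill services, not code from this repository. Two details are worth noting: the signal package documentation asks for a buffered channel so an early signal is not dropped, and in the hunk above backFiller is only assigned when BackFill is enabled, so a nil guard like the one in the sketch avoids a panic on shutdown when backfilling is turned off.

    package main

    import (
        "os"
        "os/signal"
        "sync"
    )

    // Stopper is a hypothetical stand-in for the super node and backfill services.
    type Stopper interface {
        Stop() error
    }

    func waitForShutdown(wg *sync.WaitGroup, services ...Stopper) {
        // the signal package never blocks when delivering; the docs ask for a
        // buffered channel so a signal arriving early is not dropped
        shutdown := make(chan os.Signal, 1)
        signal.Notify(shutdown, os.Interrupt)
        <-shutdown
        for _, s := range services {
            if s != nil { // guards the case where a service was never configured
                s.Stop()
            }
        }
        wg.Wait()
    }

    func main() {
        var wg sync.WaitGroup
        waitForShutdown(&wg) // blocks until an interrupt (Ctrl-C) arrives
    }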
diff --git a/pkg/eth/trie_validator/service.go b/pkg/eth/trie_validator/service.go
new file mode 100644
index 00000000..25f93839
--- /dev/null
+++ b/pkg/eth/trie_validator/service.go
@@ -0,0 +1,17 @@
+// VulcanizeDB
+// Copyright © 2019 Vulcanize
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package trie_validator
diff --git a/pkg/super_node/backfiller.go b/pkg/super_node/backfiller.go
index 6355065e..ec227f9a 100644
--- a/pkg/super_node/backfiller.go
+++ b/pkg/super_node/backfiller.go
@@ -17,9 +17,7 @@
 package super_node
 
 import (
-    "fmt"
     "sync"
-    "sync/atomic"
     "time"
 
     log "github.com/sirupsen/logrus"
@@ -37,6 +35,7 @@ const (
 type BackFillInterface interface {
     // Method for the super node to periodically check for and fill in gaps in its data using an archival node
     BackFill(wg *sync.WaitGroup)
+    Stop() error
 }
 
 // BackFillService for filling in gaps in the super node
@@ -62,7 +61,7 @@ type BackFillService struct {
     // Channel for receiving quit signal
     QuitChan chan bool
     // Chain type
-    Chain shared.ChainType
+    chain shared.ChainType
     // Headers with times_validated lower than this will be resynced
     validationLevel int
 }
@@ -107,8 +106,8 @@ func NewBackFillService(settings *Config, screenAndServeChan chan shared.Convert
         BatchSize:          batchSize,
         BatchNumber:        int64(batchNumber),
         ScreenAndServeChan: screenAndServeChan,
-        QuitChan:           settings.Quit,
-        Chain:              settings.Chain,
+        QuitChan:           make(chan bool),
+        chain:              settings.Chain,
         validationLevel:    settings.ValidationLevel,
     }, nil
 }
@@ -116,119 +115,97 @@ func NewBackFillService(settings *Config, screenAndServeChan chan shared.Convert
 // BackFill periodically checks for and fills in gaps in the super node db
 func (bfs *BackFillService) BackFill(wg *sync.WaitGroup) {
     ticker := time.NewTicker(bfs.GapCheckFrequency)
-    wg.Add(1)
     go func() {
+        wg.Add(1)
+        defer wg.Done()
         for {
             select {
             case <-bfs.QuitChan:
-                log.Infof("quiting %s FillGapsInSuperNode process", bfs.Chain.String())
-                wg.Done()
+                log.Infof("quitting %s FillGapsInSuperNode process", bfs.chain.String())
                 return
             case <-ticker.C:
-                log.Infof("searching for gaps in the %s super node database", bfs.Chain.String())
-                startingBlock, err := bfs.Retriever.RetrieveFirstBlockNumber()
-                if err != nil {
-                    log.Errorf("super node db backfill RetrieveFirstBlockNumber error for chain %s: %v", bfs.Chain.String(), err)
-                    continue
-                }
-                if startingBlock != 0 {
-                    log.Infof("found gap at the beginning of the %s sync", bfs.Chain.String())
-                    if err := bfs.backFill(0, uint64(startingBlock-1)); err != nil {
-                        log.Error(err)
-                    }
-                }
+                // spin up worker goroutines for this search pass
+                // we start and kill a new batch of workers for each pass
+                // so that we know each of the previous workers is done before we search for new gaps
+                heightsChan := make(chan []uint64)
+                for i := 1; i <= int(bfs.BatchNumber); i++ {
+                    go bfs.backFill(wg, i, heightsChan)
+                }
                 gaps, err := bfs.Retriever.RetrieveGapsInData(bfs.validationLevel)
                 if err != nil {
-                    log.Errorf("super node db backfill RetrieveGapsInData error for chain %s: %v", bfs.Chain.String(), err)
+                    log.Errorf("%s super node db backFill RetrieveGapsInData error: %v", bfs.chain.String(), err)
                     continue
                 }
                 for _, gap := range gaps {
-                    if err := bfs.backFill(gap.Start, gap.Stop); err != nil {
-                        log.Error(err)
-                    }
+                    log.Infof("backFilling %s data from %d to %d", bfs.chain.String(), gap.Start, gap.Stop)
+                    blockRangeBins, err := utils.GetBlockHeightBins(gap.Start, gap.Stop, bfs.BatchSize)
+                    if err != nil {
+                        log.Errorf("%s super node db backFill GetBlockHeightBins error: %v", bfs.chain.String(), err)
+                        continue
+                    }
+                    for _, heights := range blockRangeBins {
+                        select {
+                        case <-bfs.QuitChan:
+                            log.Infof("quitting %s BackFill process", bfs.chain.String())
+                            return
+                        default:
+                            heightsChan <- heights
+                        }
+                    }
                 }
+                // send a quit signal to each worker
+                // this blocks until each worker has finished its current task and is free to receive from the quit channel
+                for i := 1; i <= int(bfs.BatchNumber); i++ {
+                    bfs.QuitChan <- true
+                }
             }
         }
     }()
-    log.Infof("%s BackFill goroutine successfully spun up", bfs.Chain.String())
+    log.Infof("%s BackFill goroutine successfully spun up", bfs.chain.String())
 }
-
-// backFill fetches, processes, and returns utils.StorageDiffs over a range of blocks
-// It splits a large range up into smaller chunks, batch fetching and processing those chunks concurrently
-func (bfs *BackFillService) backFill(startingBlock, endingBlock uint64) error {
-    log.Infof("filling in %s gap from %d to %d", bfs.Chain.String(), startingBlock, endingBlock)
-    if endingBlock < startingBlock {
-        return fmt.Errorf("super node %s db backfill: ending block number needs to be greater than starting block number", bfs.Chain.String())
-    }
-    // break the range up into bins of smaller ranges
-    blockRangeBins, err := utils.GetBlockHeightBins(startingBlock, endingBlock, bfs.BatchSize)
-    if err != nil {
-        return err
-    }
-    // int64 for atomic incrementing and decrementing to track the number of active processing goroutines we have
-    var activeCount int64
-    // channel for processing goroutines to signal when they are done
-    processingDone := make(chan bool)
-    forwardDone := make(chan bool)
-
-    // for each block range bin spin up a goroutine to batch fetch and process data for that range
-    go func() {
-        for _, blockHeights := range blockRangeBins {
-            // if we have reached our limit of active goroutines
-            // wait for one to finish before starting the next
-            if atomic.AddInt64(&activeCount, 1) > bfs.BatchNumber {
-                // this blocks until a process signals it has finished
-                <-forwardDone
-            }
-            go func(blockHeights []uint64) {
-                payloads, err := bfs.Fetcher.FetchAt(blockHeights)
-                if err != nil {
-                    log.Errorf("%s super node historical data fetcher error: %s", bfs.Chain.String(), err.Error())
-                }
-                for _, payload := range payloads {
-                    ipldPayload, err := bfs.Converter.Convert(payload)
-                    if err != nil {
-                        log.Errorf("%s super node historical data converter error: %s", bfs.Chain.String(), err.Error())
-                    }
-                    // If there is a ScreenAndServe process listening, forward payload to it
-                    select {
-                    case bfs.ScreenAndServeChan <- ipldPayload:
-                    default:
-                    }
-                    cidPayload, err := bfs.Publisher.Publish(ipldPayload)
-                    if err != nil {
-                        log.Errorf("%s super node historical data publisher error: %s", bfs.Chain.String(), err.Error())
-                        continue
-                    }
-                    if err := bfs.Indexer.Index(cidPayload); err != nil {
-                        log.Errorf("%s super node historical data indexer error: %s", bfs.Chain.String(), err.Error())
-                    }
-                }
-                // when this goroutine is done, send out a signal
-                log.Infof("finished filling in %s gap from %d to %d", bfs.Chain.String(), blockHeights[0], blockHeights[len(blockHeights)-1])
-                processingDone <- true
-            }(blockHeights)
-        }
-    }()
-
-    // listen on the processingDone chan
-    // keeps track of the number of processing goroutines that have finished
-    // when they have all finished, return
-    goroutinesFinished := 0
-    for {
-        select {
-        case <-processingDone:
-            atomic.AddInt64(&activeCount, -1)
-            select {
-            // if we are waiting for a process to finish, signal that one has
-            case forwardDone <- true:
-            default:
-            }
-            goroutinesFinished++
-            if goroutinesFinished >= len(blockRangeBins) {
-                return nil
-            }
-        }
-    }
-}
+ log.Errorf("%s backFill worker %d fetcher error: %s", bfs.chain.String(), id, err.Error()) } - goroutinesFinished++ - if goroutinesFinished >= len(blockRangeBins) { - return nil + for _, payload := range payloads { + ipldPayload, err := bfs.Converter.Convert(payload) + if err != nil { + log.Errorf("%s backFill worker %d converter error: %s", bfs.chain.String(), id, err.Error()) + } + // If there is a ScreenAndServe process listening, forward converted payload to it + select { + case bfs.ScreenAndServeChan <- ipldPayload: + log.Debugf("%s backFill worker %d forwarded converted payload to server", bfs.chain.String(), id) + default: + log.Debugf("%s backFill worker %d unable to forward converted payload to server; no channel ready to receive", bfs.chain.String(), id) + } + cidPayload, err := bfs.Publisher.Publish(ipldPayload) + if err != nil { + log.Errorf("%s backFill worker %d publisher error: %s", bfs.chain.String(), id, err.Error()) + continue + } + if err := bfs.Indexer.Index(cidPayload); err != nil { + log.Errorf("%s backFill worker %d indexer error: %s", bfs.chain.String(), id, err.Error()) + } } + log.Infof("%s backFill worker %d finished section from %d to %d", bfs.chain.String(), id, heights[0], heights[len(heights)-1]) + case <-bfs.QuitChan: + log.Infof("%s backFill worker %d shutting down", bfs.chain.String(), id) + return } } } + +func (bfs *BackFillService) Stop() error { + log.Infof("Stopping %s backFill service", bfs.chain.String()) + close(bfs.QuitChan) + return nil +} diff --git a/pkg/super_node/backfiller_test.go b/pkg/super_node/backfiller_test.go index 1df2db68..e0e804ff 100644 --- a/pkg/super_node/backfiller_test.go +++ b/pkg/super_node/backfiller_test.go @@ -69,7 +69,6 @@ var _ = Describe("BackFiller", func() { BatchSize: super_node.DefaultMaxBatchSize, BatchNumber: super_node.DefaultMaxBatchNumber, QuitChan: quitChan, - Chain: shared.Ethereum, } wg := &sync.WaitGroup{} backfiller.BackFill(wg) @@ -125,7 +124,6 @@ var _ = Describe("BackFiller", func() { BatchSize: super_node.DefaultMaxBatchSize, BatchNumber: super_node.DefaultMaxBatchNumber, QuitChan: quitChan, - Chain: shared.Ethereum, } wg := &sync.WaitGroup{} backfiller.BackFill(wg) @@ -156,7 +154,12 @@ var _ = Describe("BackFiller", func() { } mockRetriever := &mocks2.CIDRetriever{ FirstBlockNumberToReturn: 3, - GapsToRetrieve: []shared.Gap{}, + GapsToRetrieve: []shared.Gap{ + { + Start: 0, + Stop: 2, + }, + }, } mockFetcher := &mocks2.PayloadFetcher{ PayloadsToReturn: map[uint64]shared.RawChainData{ @@ -175,7 +178,6 @@ var _ = Describe("BackFiller", func() { BatchSize: super_node.DefaultMaxBatchSize, BatchNumber: super_node.DefaultMaxBatchNumber, QuitChan: quitChan, - Chain: shared.Ethereum, } wg := &sync.WaitGroup{} backfiller.BackFill(wg) diff --git a/pkg/super_node/btc/cid_retriever.go b/pkg/super_node/btc/cid_retriever.go index ce6f7630..f097d642 100644 --- a/pkg/super_node/btc/cid_retriever.go +++ b/pkg/super_node/btc/cid_retriever.go @@ -44,21 +44,21 @@ func NewCIDRetriever(db *postgres.DB) *CIDRetriever { } // RetrieveFirstBlockNumber is used to retrieve the first block number in the db -func (ecr *CIDRetriever) RetrieveFirstBlockNumber() (int64, error) { +func (bcr *CIDRetriever) RetrieveFirstBlockNumber() (int64, error) { var blockNumber int64 - err := ecr.db.Get(&blockNumber, "SELECT block_number FROM btc.header_cids ORDER BY block_number ASC LIMIT 1") + err := bcr.db.Get(&blockNumber, "SELECT block_number FROM btc.header_cids ORDER BY block_number ASC LIMIT 1") return blockNumber, err } // 
diff --git a/pkg/super_node/backfiller_test.go b/pkg/super_node/backfiller_test.go
index 1df2db68..e0e804ff 100644
--- a/pkg/super_node/backfiller_test.go
+++ b/pkg/super_node/backfiller_test.go
@@ -69,7 +69,6 @@ var _ = Describe("BackFiller", func() {
                 BatchSize:   super_node.DefaultMaxBatchSize,
                 BatchNumber: super_node.DefaultMaxBatchNumber,
                 QuitChan:    quitChan,
-                Chain:       shared.Ethereum,
             }
             wg := &sync.WaitGroup{}
             backfiller.BackFill(wg)
@@ -125,7 +124,6 @@ var _ = Describe("BackFiller", func() {
                 BatchSize:   super_node.DefaultMaxBatchSize,
                 BatchNumber: super_node.DefaultMaxBatchNumber,
                 QuitChan:    quitChan,
-                Chain:       shared.Ethereum,
             }
             wg := &sync.WaitGroup{}
             backfiller.BackFill(wg)
@@ -156,7 +154,12 @@ var _ = Describe("BackFiller", func() {
             }
             mockRetriever := &mocks2.CIDRetriever{
                 FirstBlockNumberToReturn: 3,
-                GapsToRetrieve:           []shared.Gap{},
+                GapsToRetrieve: []shared.Gap{
+                    {
+                        Start: 0,
+                        Stop:  2,
+                    },
+                },
             }
             mockFetcher := &mocks2.PayloadFetcher{
                 PayloadsToReturn: map[uint64]shared.RawChainData{
@@ -175,7 +178,6 @@ var _ = Describe("BackFiller", func() {
                 BatchSize:   super_node.DefaultMaxBatchSize,
                 BatchNumber: super_node.DefaultMaxBatchNumber,
                 QuitChan:    quitChan,
-                Chain:       shared.Ethereum,
             }
             wg := &sync.WaitGroup{}
             backfiller.BackFill(wg)
diff --git a/pkg/super_node/btc/cid_retriever.go b/pkg/super_node/btc/cid_retriever.go
index ce6f7630..f097d642 100644
--- a/pkg/super_node/btc/cid_retriever.go
+++ b/pkg/super_node/btc/cid_retriever.go
@@ -44,21 +44,21 @@ func NewCIDRetriever(db *postgres.DB) *CIDRetriever {
 }
 
 // RetrieveFirstBlockNumber is used to retrieve the first block number in the db
-func (ecr *CIDRetriever) RetrieveFirstBlockNumber() (int64, error) {
+func (bcr *CIDRetriever) RetrieveFirstBlockNumber() (int64, error) {
     var blockNumber int64
-    err := ecr.db.Get(&blockNumber, "SELECT block_number FROM btc.header_cids ORDER BY block_number ASC LIMIT 1")
+    err := bcr.db.Get(&blockNumber, "SELECT block_number FROM btc.header_cids ORDER BY block_number ASC LIMIT 1")
     return blockNumber, err
 }
 
 // RetrieveLastBlockNumber is used to retrieve the latest block number in the db
-func (ecr *CIDRetriever) RetrieveLastBlockNumber() (int64, error) {
+func (bcr *CIDRetriever) RetrieveLastBlockNumber() (int64, error) {
     var blockNumber int64
-    err := ecr.db.Get(&blockNumber, "SELECT block_number FROM btc.header_cids ORDER BY block_number DESC LIMIT 1 ")
+    err := bcr.db.Get(&blockNumber, "SELECT block_number FROM btc.header_cids ORDER BY block_number DESC LIMIT 1 ")
     return blockNumber, err
 }
 
 // Retrieve is used to retrieve all of the CIDs which conform to the passed StreamFilters
-func (ecr *CIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumber int64) ([]shared.CIDsForFetching, bool, error) {
+func (bcr *CIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumber int64) ([]shared.CIDsForFetching, bool, error) {
     streamFilter, ok := filter.(*SubscriptionSettings)
     if !ok {
         return nil, true, fmt.Errorf("btc retriever expected filter type %T got %T", &SubscriptionSettings{}, filter)
@@ -66,7 +66,7 @@ func (ecr *CIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumbe
     log.Debug("retrieving cids")
 
     // Begin new db tx
-    tx, err := ecr.db.Beginx()
+    tx, err := bcr.db.Beginx()
     if err != nil {
         return nil, true, err
     }
@@ -82,7 +82,7 @@ func (ecr *CIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumbe
     }()
 
     // Retrieve cached header CIDs
-    headers, err := ecr.RetrieveHeaderCIDs(tx, blockNumber)
+    headers, err := bcr.RetrieveHeaderCIDs(tx, blockNumber)
     if err != nil {
         log.Error("header cid retrieval error")
         return nil, true, err
@@ -98,7 +98,7 @@ func (ecr *CIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumbe
     }
     // Retrieve cached trx CIDs
     if !streamFilter.TxFilter.Off {
-        cw.Transactions, err = ecr.RetrieveTxCIDs(tx, streamFilter.TxFilter, header.ID)
+        cw.Transactions, err = bcr.RetrieveTxCIDs(tx, streamFilter.TxFilter, header.ID)
         if err != nil {
             log.Error("transaction cid retrieval error")
             return nil, true, err
@@ -114,7 +114,7 @@ func (ecr *CIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumbe
 }
 
 // RetrieveHeaderCIDs retrieves and returns all of the header cids at the provided blockheight
-func (ecr *CIDRetriever) RetrieveHeaderCIDs(tx *sqlx.Tx, blockNumber int64) ([]HeaderModel, error) {
+func (bcr *CIDRetriever) RetrieveHeaderCIDs(tx *sqlx.Tx, blockNumber int64) ([]HeaderModel, error) {
     log.Debug("retrieving header cids for block ", blockNumber)
     headers := make([]HeaderModel, 0)
     pgStr := `SELECT * FROM btc.header_cids
@@ -124,7 +124,7 @@ func (ecr *CIDRetriever) RetrieveHeaderCIDs(tx *sqlx.Tx, blockNumber int64) ([]H
 
 // RetrieveTxCIDs retrieves and returns all of the trx cids at the provided blockheight that conform to the provided filter parameters
 // also returns the ids for the returned transaction cids
-func (ecr *CIDRetriever) RetrieveTxCIDs(tx *sqlx.Tx, txFilter TxFilter, headerID int64) ([]TxModel, error) {
+func (bcr *CIDRetriever) RetrieveTxCIDs(tx *sqlx.Tx, txFilter TxFilter, headerID int64) ([]TxModel, error) {
     log.Debug("retrieving transaction cids for header id ", headerID)
     args := make([]interface{}, 0, 3)
     results := make([]TxModel, 0)
error) { + log.Info("searching for gaps in the btc super node database") + startingBlock, err := bcr.RetrieveFirstBlockNumber() + if err != nil { + return nil, fmt.Errorf("btc CIDRetriever RetrieveFirstBlockNumber error: %v", err) + } + var initialGap []shared.Gap + if startingBlock != 0 { + stop := uint64(startingBlock - 1) + log.Infof("found gap at the beginning of the btc sync from 0 to %d", stop) + initialGap = []shared.Gap{{ + Start: 0, + Stop: stop, + }} + } + pgStr := `SELECT header_cids.block_number + 1 AS start, min(fr.block_number) - 1 AS stop FROM btc.header_cids LEFT JOIN btc.header_cids r on btc.header_cids.block_number = r.block_number - 1 LEFT JOIN btc.header_cids fr on btc.header_cids.block_number < fr.block_number @@ -178,7 +193,7 @@ func (ecr *CIDRetriever) RetrieveGapsInData(validationLevel int) ([]shared.Gap, Start uint64 `db:"start"` Stop uint64 `db:"stop"` }, 0) - if err := ecr.db.Select(&results, pgStr); err != nil && err != sql.ErrNoRows { + if err := bcr.db.Select(&results, pgStr); err != nil && err != sql.ErrNoRows { return nil, err } emptyGaps := make([]shared.Gap, len(results)) @@ -195,18 +210,18 @@ func (ecr *CIDRetriever) RetrieveGapsInData(validationLevel int) ([]shared.Gap, WHERE times_validated < $1 ORDER BY block_number` var heights []uint64 - if err := ecr.db.Select(&heights, pgStr, validationLevel); err != nil && err != sql.ErrNoRows { + if err := bcr.db.Select(&heights, pgStr, validationLevel); err != nil && err != sql.ErrNoRows { return nil, err } - return append(emptyGaps, utils.MissingHeightsToGaps(heights)...), nil + return append(append(initialGap, emptyGaps...), utils.MissingHeightsToGaps(heights)...), nil } // RetrieveBlockByHash returns all of the CIDs needed to compose an entire block, for a given block hash -func (ecr *CIDRetriever) RetrieveBlockByHash(blockHash common.Hash) (HeaderModel, []TxModel, error) { +func (bcr *CIDRetriever) RetrieveBlockByHash(blockHash common.Hash) (HeaderModel, []TxModel, error) { log.Debug("retrieving block cids for block hash ", blockHash.String()) // Begin new db tx - tx, err := ecr.db.Beginx() + tx, err := bcr.db.Beginx() if err != nil { return HeaderModel{}, nil, err } @@ -221,12 +236,12 @@ func (ecr *CIDRetriever) RetrieveBlockByHash(blockHash common.Hash) (HeaderModel } }() - headerCID, err := ecr.RetrieveHeaderCIDByHash(tx, blockHash) + headerCID, err := bcr.RetrieveHeaderCIDByHash(tx, blockHash) if err != nil { log.Error("header cid retrieval error") return HeaderModel{}, nil, err } - txCIDs, err := ecr.RetrieveTxCIDsByHeaderID(tx, headerCID.ID) + txCIDs, err := bcr.RetrieveTxCIDsByHeaderID(tx, headerCID.ID) if err != nil { log.Error("tx cid retrieval error") } @@ -234,11 +249,11 @@ func (ecr *CIDRetriever) RetrieveBlockByHash(blockHash common.Hash) (HeaderModel } // RetrieveBlockByNumber returns all of the CIDs needed to compose an entire block, for a given block number -func (ecr *CIDRetriever) RetrieveBlockByNumber(blockNumber int64) (HeaderModel, []TxModel, error) { +func (bcr *CIDRetriever) RetrieveBlockByNumber(blockNumber int64) (HeaderModel, []TxModel, error) { log.Debug("retrieving block cids for block number ", blockNumber) // Begin new db tx - tx, err := ecr.db.Beginx() + tx, err := bcr.db.Beginx() if err != nil { return HeaderModel{}, nil, err } @@ -253,7 +268,7 @@ func (ecr *CIDRetriever) RetrieveBlockByNumber(blockNumber int64) (HeaderModel, } }() - headerCID, err := ecr.RetrieveHeaderCIDs(tx, blockNumber) + headerCID, err := bcr.RetrieveHeaderCIDs(tx, blockNumber) if err != nil { 
log.Error("header cid retrieval error") return HeaderModel{}, nil, err @@ -261,7 +276,7 @@ func (ecr *CIDRetriever) RetrieveBlockByNumber(blockNumber int64) (HeaderModel, if len(headerCID) < 1 { return HeaderModel{}, nil, fmt.Errorf("header cid retrieval error, no header CIDs found at block %d", blockNumber) } - txCIDs, err := ecr.RetrieveTxCIDsByHeaderID(tx, headerCID[0].ID) + txCIDs, err := bcr.RetrieveTxCIDsByHeaderID(tx, headerCID[0].ID) if err != nil { log.Error("tx cid retrieval error") } @@ -269,7 +284,7 @@ func (ecr *CIDRetriever) RetrieveBlockByNumber(blockNumber int64) (HeaderModel, } // RetrieveHeaderCIDByHash returns the header for the given block hash -func (ecr *CIDRetriever) RetrieveHeaderCIDByHash(tx *sqlx.Tx, blockHash common.Hash) (HeaderModel, error) { +func (bcr *CIDRetriever) RetrieveHeaderCIDByHash(tx *sqlx.Tx, blockHash common.Hash) (HeaderModel, error) { log.Debug("retrieving header cids for block hash ", blockHash.String()) pgStr := `SELECT * FROM btc.header_cids WHERE block_hash = $1` @@ -278,7 +293,7 @@ func (ecr *CIDRetriever) RetrieveHeaderCIDByHash(tx *sqlx.Tx, blockHash common.H } // RetrieveTxCIDsByHeaderID retrieves all tx CIDs for the given header id -func (ecr *CIDRetriever) RetrieveTxCIDsByHeaderID(tx *sqlx.Tx, headerID int64) ([]TxModel, error) { +func (bcr *CIDRetriever) RetrieveTxCIDsByHeaderID(tx *sqlx.Tx, headerID int64) ([]TxModel, error) { log.Debug("retrieving tx cids for block id ", headerID) pgStr := `SELECT * FROM btc.transaction_cids WHERE header_id = $1` diff --git a/pkg/super_node/config.go b/pkg/super_node/config.go index ba1547b7..ba582160 100644 --- a/pkg/super_node/config.go +++ b/pkg/super_node/config.go @@ -66,7 +66,6 @@ type Config struct { IPFSPath string IPFSMode shared.IPFSMode DBConfig config.Database - Quit chan bool // Server fields Serve bool ServeDBConn *postgres.DB @@ -182,8 +181,6 @@ func NewSuperNodeConfig() (*Config, error) { } } - c.Quit = make(chan bool) - return c, nil } diff --git a/pkg/super_node/eth/cid_retriever.go b/pkg/super_node/eth/cid_retriever.go index e7acf571..d87541c7 100644 --- a/pkg/super_node/eth/cid_retriever.go +++ b/pkg/super_node/eth/cid_retriever.go @@ -443,6 +443,21 @@ func (ecr *CIDRetriever) RetrieveStorageCIDs(tx *sqlx.Tx, storageFilter StorageF // RetrieveGapsInData is used to find the the block numbers at which we are missing data in the db // it finds the union of heights where no data exists and where the times_validated is lower than the validation level func (ecr *CIDRetriever) RetrieveGapsInData(validationLevel int) ([]shared.Gap, error) { + log.Info("searching for gaps in the eth super node database") + startingBlock, err := ecr.RetrieveFirstBlockNumber() + if err != nil { + return nil, fmt.Errorf("eth CIDRetriever RetrieveFirstBlockNumber error: %v", err) + } + var initialGap []shared.Gap + if startingBlock != 0 { + stop := uint64(startingBlock - 1) + log.Infof("found gap at the beginning of the eth sync from 0 to %d", stop) + initialGap = []shared.Gap{{ + Start: 0, + Stop: stop, + }} + } + pgStr := `SELECT header_cids.block_number + 1 AS start, min(fr.block_number) - 1 AS stop FROM eth.header_cids LEFT JOIN eth.header_cids r on eth.header_cids.block_number = r.block_number - 1 LEFT JOIN eth.header_cids fr on eth.header_cids.block_number < fr.block_number @@ -472,7 +487,7 @@ func (ecr *CIDRetriever) RetrieveGapsInData(validationLevel int) ([]shared.Gap, if err := ecr.db.Select(&heights, pgStr, validationLevel); err != nil && err != sql.ErrNoRows { return nil, err } - return 
diff --git a/pkg/super_node/eth/cid_retriever_test.go b/pkg/super_node/eth/cid_retriever_test.go
index 7a2a4a7c..ecbf7311 100644
--- a/pkg/super_node/eth/cid_retriever_test.go
+++ b/pkg/super_node/eth/cid_retriever_test.go
@@ -474,54 +474,64 @@ var _ = Describe("Retriever", func() {
 
     Describe("RetrieveGapsInData", func() {
         It("Doesn't return gaps if there are none", func() {
+            payload0 := *mocks.MockCIDPayload
+            payload0.HeaderCID.BlockNumber = "0"
             payload1 := *mocks.MockCIDPayload
-            payload1.HeaderCID.BlockNumber = "2"
+            payload1.HeaderCID.BlockNumber = "1"
             payload2 := payload1
-            payload2.HeaderCID.BlockNumber = "3"
-            err := repo.Index(mocks.MockCIDPayload)
+            payload2.HeaderCID.BlockNumber = "2"
+            payload3 := payload2
+            payload3.HeaderCID.BlockNumber = "3"
+            err := repo.Index(&payload0)
             Expect(err).ToNot(HaveOccurred())
             err = repo.Index(&payload1)
             Expect(err).ToNot(HaveOccurred())
             err = repo.Index(&payload2)
             Expect(err).ToNot(HaveOccurred())
+            err = repo.Index(&payload3)
+            Expect(err).ToNot(HaveOccurred())
             gaps, err := retriever.RetrieveGapsInData(1)
             Expect(err).ToNot(HaveOccurred())
             Expect(len(gaps)).To(Equal(0))
         })
 
-        It("Doesn't return the gap from 0 to the earliest block", func() {
+        It("Returns the gap from 0 to the earliest block", func() {
             payload := *mocks.MockCIDPayload
             payload.HeaderCID.BlockNumber = "5"
             err := repo.Index(&payload)
             Expect(err).ToNot(HaveOccurred())
             gaps, err := retriever.RetrieveGapsInData(1)
             Expect(err).ToNot(HaveOccurred())
-            Expect(len(gaps)).To(Equal(0))
+            Expect(len(gaps)).To(Equal(1))
+            Expect(gaps[0].Start).To(Equal(uint64(0)))
+            Expect(gaps[0].Stop).To(Equal(uint64(4)))
         })
 
         It("Can handle single block gaps", func() {
+            payload0 := *mocks.MockCIDPayload
+            payload0.HeaderCID.BlockNumber = "0"
             payload1 := *mocks.MockCIDPayload
-            payload1.HeaderCID.BlockNumber = "2"
-            payload2 := payload1
-            payload2.HeaderCID.BlockNumber = "4"
-            err := repo.Index(mocks.MockCIDPayload)
+            payload1.HeaderCID.BlockNumber = "1"
+            payload3 := payload1
+            payload3.HeaderCID.BlockNumber = "3"
+            err := repo.Index(&payload0)
             Expect(err).ToNot(HaveOccurred())
             err = repo.Index(&payload1)
             Expect(err).ToNot(HaveOccurred())
-            err = repo.Index(&payload2)
+            err = repo.Index(&payload3)
             Expect(err).ToNot(HaveOccurred())
             gaps, err := retriever.RetrieveGapsInData(1)
             Expect(err).ToNot(HaveOccurred())
             Expect(len(gaps)).To(Equal(1))
-            Expect(gaps[0].Start).To(Equal(uint64(3)))
-            Expect(gaps[0].Stop).To(Equal(uint64(3)))
+            Expect(gaps[0].Start).To(Equal(uint64(2)))
+            Expect(gaps[0].Stop).To(Equal(uint64(2)))
         })
 
         It("Finds gap between two entries", func() {
             payload1 := *mocks.MockCIDPayload
             payload1.HeaderCID.BlockNumber = "1010101"
             payload2 := payload1
-            payload2.HeaderCID.BlockNumber = "5"
+            payload2.HeaderCID.BlockNumber = "0"
             err := repo.Index(&payload1)
             Expect(err).ToNot(HaveOccurred())
             err = repo.Index(&payload2)
             Expect(err).ToNot(HaveOccurred())
@@ -529,13 +539,15 @@ var _ = Describe("Retriever", func() {
             gaps, err := retriever.RetrieveGapsInData(1)
             Expect(err).ToNot(HaveOccurred())
             Expect(len(gaps)).To(Equal(1))
-            Expect(gaps[0].Start).To(Equal(uint64(6)))
+            Expect(gaps[0].Start).To(Equal(uint64(1)))
             Expect(gaps[0].Stop).To(Equal(uint64(1010100)))
         })
 
         It("Finds gaps between multiple entries", func() {
-            payload1 := *mocks.MockCIDPayload
-            payload1.HeaderCID.BlockNumber = "1010101"
+            payload := *mocks.MockCIDPayload
+            payload.HeaderCID.BlockNumber = "1010101"
+            payload1 := payload
+            payload1.HeaderCID.BlockNumber = "1"
             payload2 := payload1
             payload2.HeaderCID.BlockNumber = "5"
             payload3 := payload2
"1010101" + payload := *mocks.MockCIDPayload + payload.HeaderCID.BlockNumber = "1010101" + payload1 := payload + payload1.HeaderCID.BlockNumber = "1" payload2 := payload1 payload2.HeaderCID.BlockNumber = "5" payload3 := payload2 @@ -554,7 +566,9 @@ var _ = Describe("Retriever", func() { payload9.HeaderCID.BlockNumber = "106" payload10 := payload5 payload10.HeaderCID.BlockNumber = "1000" - err := repo.Index(&payload1) + err := repo.Index(&payload) + Expect(err).ToNot(HaveOccurred()) + err = repo.Index(&payload1) Expect(err).ToNot(HaveOccurred()) err = repo.Index(&payload2) Expect(err).ToNot(HaveOccurred()) @@ -577,15 +591,19 @@ var _ = Describe("Retriever", func() { gaps, err := retriever.RetrieveGapsInData(1) Expect(err).ToNot(HaveOccurred()) - Expect(len(gaps)).To(Equal(3)) + Expect(len(gaps)).To(Equal(5)) + Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 0, Stop: 0})).To(BeTrue()) + Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 2, Stop: 4})).To(BeTrue()) Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 6, Stop: 99})).To(BeTrue()) Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 107, Stop: 999})).To(BeTrue()) Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 1001, Stop: 1010100})).To(BeTrue()) }) It("Finds validation level gaps", func() { - payload1 := *mocks.MockCIDPayload - payload1.HeaderCID.BlockNumber = "1010101" + payload := *mocks.MockCIDPayload + payload.HeaderCID.BlockNumber = "1010101" + payload1 := payload + payload1.HeaderCID.BlockNumber = "1" payload2 := payload1 payload2.HeaderCID.BlockNumber = "5" payload3 := payload2 @@ -610,7 +628,9 @@ var _ = Describe("Retriever", func() { payload12.HeaderCID.BlockNumber = "109" payload13 := payload5 payload13.HeaderCID.BlockNumber = "1000" - err := repo.Index(&payload1) + err := repo.Index(&payload) + Expect(err).ToNot(HaveOccurred()) + err = repo.Index(&payload1) Expect(err).ToNot(HaveOccurred()) err = repo.Index(&payload2) Expect(err).ToNot(HaveOccurred()) @@ -643,7 +663,9 @@ var _ = Describe("Retriever", func() { gaps, err := retriever.RetrieveGapsInData(1) Expect(err).ToNot(HaveOccurred()) - Expect(len(gaps)).To(Equal(6)) + Expect(len(gaps)).To(Equal(8)) + Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 0, Stop: 0})).To(BeTrue()) + Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 2, Stop: 4})).To(BeTrue()) Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 6, Stop: 99})).To(BeTrue()) Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 101, Stop: 102})).To(BeTrue()) Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 104, Stop: 104})).To(BeTrue()) diff --git a/pkg/super_node/resync/config.go b/pkg/super_node/resync/config.go index e43460a6..22b77b83 100644 --- a/pkg/super_node/resync/config.go +++ b/pkg/super_node/resync/config.go @@ -60,8 +60,6 @@ type Config struct { BatchSize uint64 // BatchSize for the resync http calls (client has to support batch sizing) Timeout time.Duration // HTTP connection timeout in seconds BatchNumber uint64 - - Quit chan bool // Channel for shutting down } // NewReSyncConfig fills and returns a resync config from toml parameters @@ -136,7 +134,6 @@ func NewReSyncConfig() (*Config, error) { db := utils.LoadPostgres(c.DBConfig, c.NodeInfo) c.DB = &db - c.Quit = make(chan bool) c.BatchSize = uint64(viper.GetInt64("resync.batchSize")) c.BatchNumber = uint64(viper.GetInt64("resync.batchNumber")) return c, nil diff --git a/pkg/super_node/resync/service.go b/pkg/super_node/resync/service.go index b5c43f0f..4a613a20 100644 --- a/pkg/super_node/resync/service.go +++ 
diff --git a/pkg/super_node/resync/service.go b/pkg/super_node/resync/service.go
index b5c43f0f..4a613a20 100644
--- a/pkg/super_node/resync/service.go
+++ b/pkg/super_node/resync/service.go
@@ -18,8 +18,6 @@ package resync
 
 import (
     "fmt"
-    "sync/atomic"
-
     "github.com/sirupsen/logrus"
 
     utils "github.com/vulcanize/vulcanizedb/libraries/shared/utilities"
@@ -49,7 +47,7 @@ type Service struct {
     // Number of goroutines
     BatchNumber int64
     // Channel for receiving quit signal
-    QuitChan chan bool
+    quitChan chan bool
     // Chain type
     chain shared.ChainType
     // Resync data type
@@ -105,7 +103,7 @@ func NewResyncService(settings *Config) (Resync, error) {
         Cleaner:     cleaner,
         BatchSize:   batchSize,
         BatchNumber: int64(batchNumber),
-        QuitChan:    settings.Quit,
+        quitChan:    make(chan bool),
         chain:       settings.Chain,
         ranges:      settings.Ranges,
         data:        settings.ResyncType,
@@ -127,81 +125,60 @@ func (rs *Service) Resync() error {
             return fmt.Errorf("%s %s data resync cleaning error: %v", rs.chain.String(), rs.data.String(), err)
         }
     }
+    // spin up worker goroutines
+    heightsChan := make(chan []uint64)
+    for i := 1; i <= int(rs.BatchNumber); i++ {
+        go rs.resync(i, heightsChan)
+    }
     for _, rng := range rs.ranges {
-        if err := rs.resync(rng[0], rng[1]); err != nil {
-            return fmt.Errorf("%s %s data resync initialization error: %v", rs.chain.String(), rs.data.String(), err)
+        if rng[1] < rng[0] {
+            logrus.Errorf("%s resync range ending block number needs to be greater than the starting block number", rs.chain.String())
+            continue
         }
+        logrus.Infof("resyncing %s data from %d to %d", rs.chain.String(), rng[0], rng[1])
+        // break the range up into bins of smaller ranges
+        blockRangeBins, err := utils.GetBlockHeightBins(rng[0], rng[1], rs.BatchSize)
+        if err != nil {
+            return err
+        }
+        for _, heights := range blockRangeBins {
+            heightsChan <- heights
+        }
+    }
+    // send a quit signal to each worker
+    // this blocks until each worker has finished its current task and can receive from the quit channel
+    for i := 1; i <= int(rs.BatchNumber); i++ {
+        rs.quitChan <- true
     }
     return nil
 }
 
-func (rs *Service) resync(startingBlock, endingBlock uint64) error {
-    logrus.Infof("resyncing %s data from %d to %d", rs.chain.String(), startingBlock, endingBlock)
-    if endingBlock < startingBlock {
-        return fmt.Errorf("%s resync range ending block number needs to be greater than the starting block number", rs.chain.String())
-    }
-    // break the range up into bins of smaller ranges
-    blockRangeBins, err := utils.GetBlockHeightBins(startingBlock, endingBlock, rs.BatchSize)
-    if err != nil {
-        return err
-    }
-    // int64 for atomic incrementing and decrementing to track the number of active processing goroutines we have
-    var activeCount int64
-    // channel for processing goroutines to signal when they are done
-    processingDone := make(chan bool)
-    forwardDone := make(chan bool)
-
-    // for each block range bin spin up a goroutine to batch fetch and process state diffs for that range
-    go func() {
-        for _, blockHeights := range blockRangeBins {
-            // if we have reached our limit of active goroutines
-            // wait for one to finish before starting the next
-            if atomic.AddInt64(&activeCount, 1) > rs.BatchNumber {
-                // this blocks until a process signals it has finished
-                <-forwardDone
-            }
-            go func(blockHeights []uint64) {
-                payloads, err := rs.Fetcher.FetchAt(blockHeights)
-                if err != nil {
-                    logrus.Errorf("%s resync fetcher error: %s", rs.chain.String(), err.Error())
-                }
-                for _, payload := range payloads {
-                    ipldPayload, err := rs.Converter.Convert(payload)
-                    if err != nil {
-                        logrus.Errorf("%s resync converter error: %s", rs.chain.String(), err.Error())
-                    }
-                    cidPayload, err := rs.Publisher.Publish(ipldPayload)
-                    if err != nil {
-                        logrus.Errorf("%s resync publisher error: %s", rs.chain.String(), err.Error())
-                    }
-                    if err := rs.Indexer.Index(cidPayload); err != nil {
-                        logrus.Errorf("%s resync indexer error: %s", rs.chain.String(), err.Error())
-                    }
-                }
-                // when this goroutine is done, send out a signal
-                logrus.Infof("finished %s resync section from %d to %d", rs.chain.String(), blockHeights[0], blockHeights[len(blockHeights)-1])
-                processingDone <- true
-            }(blockHeights)
-        }
-    }()
-
-    // listen on the processingDone chan and
-    // keep track of the number of processing goroutines that have finished
-    // when they have all finished, sends the final signal out
-    goroutinesFinished := 0
-    for {
-        select {
-        case <-processingDone:
-            atomic.AddInt64(&activeCount, -1)
-            select {
-            // if we are waiting for a process to finish, signal that one has
-            case forwardDone <- true:
-            default:
-            }
-            goroutinesFinished++
-            if goroutinesFinished >= len(blockRangeBins) {
-                return nil
-            }
-        }
-    }
-}
err != nil { - logrus.Errorf("%s resync publisher error: %s", rs.chain.String(), err.Error()) - } - if err := rs.Indexer.Index(cidPayload); err != nil { - logrus.Errorf("%s resync indexer error: %s", rs.chain.String(), err.Error()) - } - } - // when this goroutine is done, send out a signal - logrus.Infof("finished %s resync section from %d to %d", rs.chain.String(), blockHeights[0], blockHeights[len(blockHeights)-1]) - processingDone <- true - }(blockHeights) - } - }() - - // listen on the processingDone chan and - // keep track of the number of processing goroutines that have finished - // when they have all finished, sends the final signal out - goroutinesFinished := 0 +func (rs *Service) resync(id int, heightChan chan []uint64) { for { select { - case <-processingDone: - atomic.AddInt64(&activeCount, -1) - select { - // if we are waiting for a process to finish, signal that one has - case forwardDone <- true: - default: + case heights := <-heightChan: + logrus.Debugf("%s resync worker %d processing section from %d to %d", rs.chain.String(), id, heights[0], heights[len(heights)-1]) + payloads, err := rs.Fetcher.FetchAt(heights) + if err != nil { + logrus.Errorf("%s resync worker %d fetcher error: %s", rs.chain.String(), id, err.Error()) } - goroutinesFinished++ - if goroutinesFinished >= len(blockRangeBins) { - return nil + for _, payload := range payloads { + ipldPayload, err := rs.Converter.Convert(payload) + if err != nil { + logrus.Errorf("%s resync worker %d converter error: %s", rs.chain.String(), id, err.Error()) + } + cidPayload, err := rs.Publisher.Publish(ipldPayload) + if err != nil { + logrus.Errorf("%s resync worker %d publisher error: %s", rs.chain.String(), id, err.Error()) + } + if err := rs.Indexer.Index(cidPayload); err != nil { + logrus.Errorf("%s resync worker %d indexer error: %s", rs.chain.String(), id, err.Error()) + } } + logrus.Infof("%s resync worker %d finished section from %d to %d", rs.chain.String(), id, heights[0], heights[len(heights)-1]) + case <-rs.quitChan: + logrus.Infof("%s resync worker %d goroutine shutting down", rs.chain.String(), id) + return } } } diff --git a/pkg/super_node/service.go b/pkg/super_node/service.go index 1f09ea50..611eb85a 100644 --- a/pkg/super_node/service.go +++ b/pkg/super_node/service.go @@ -93,6 +93,8 @@ type Service struct { ipfsPath string // Underlying db db *postgres.DB + // wg for syncing serve processes + serveWg *sync.WaitGroup } // NewSuperNode creates a new super_node.Interface using an underlying super_node.Service struct @@ -134,7 +136,7 @@ func NewSuperNode(settings *Config) (SuperNode, error) { } sn.db = settings.ServeDBConn } - sn.QuitChan = settings.Quit + sn.QuitChan = make(chan bool) sn.Subscriptions = make(map[common.Hash]map[rpc.ID]Subscription) sn.SubscriptionTypes = make(map[common.Hash]shared.SubscriptionSettings) sn.WorkerPoolSize = settings.Workers @@ -195,16 +197,15 @@ func (sap *Service) Sync(wg *sync.WaitGroup, screenAndServePayload chan<- shared if err != nil { return err } - wg.Add(1) - - // Channels for forwarding data to the publishAndIndex workers + // spin up publishAndIndex worker goroutines publishAndIndexPayload := make(chan shared.ConvertedData, PayloadChanBufferSize) - // publishAndIndex worker pool to handle publishing and indexing concurrently, while - // limiting the number of Postgres connections we can possibly open so as to prevent error - for i := 0; i < sap.WorkerPoolSize; i++ { - sap.publishAndIndex(i, publishAndIndexPayload) + for i := 1; i <= sap.WorkerPoolSize; i++ { + go 
diff --git a/pkg/super_node/service.go b/pkg/super_node/service.go
index 1f09ea50..611eb85a 100644
--- a/pkg/super_node/service.go
+++ b/pkg/super_node/service.go
@@ -93,6 +93,8 @@ type Service struct {
     ipfsPath string
     // Underlying db
     db *postgres.DB
+    // wg for syncing serve processes
+    serveWg *sync.WaitGroup
 }
 
 // NewSuperNode creates a new super_node.Interface using an underlying super_node.Service struct
@@ -134,7 +136,7 @@ func NewSuperNode(settings *Config) (SuperNode, error) {
         }
         sn.db = settings.ServeDBConn
     }
-    sn.QuitChan = settings.Quit
+    sn.QuitChan = make(chan bool)
     sn.Subscriptions = make(map[common.Hash]map[rpc.ID]Subscription)
     sn.SubscriptionTypes = make(map[common.Hash]shared.SubscriptionSettings)
     sn.WorkerPoolSize = settings.Workers
@@ -195,16 +197,15 @@ func (sap *Service) Sync(wg *sync.WaitGroup, screenAndServePayload chan<- shared
     if err != nil {
         return err
     }
-    wg.Add(1)
-
-    // Channels for forwarding data to the publishAndIndex workers
+    // spin up publishAndIndex worker goroutines
     publishAndIndexPayload := make(chan shared.ConvertedData, PayloadChanBufferSize)
-    // publishAndIndex worker pool to handle publishing and indexing concurrently, while
-    // limiting the number of Postgres connections we can possibly open so as to prevent error
-    for i := 0; i < sap.WorkerPoolSize; i++ {
-        sap.publishAndIndex(i, publishAndIndexPayload)
+    for i := 1; i <= sap.WorkerPoolSize; i++ {
+        go sap.publishAndIndex(wg, i, publishAndIndexPayload)
+        log.Debugf("%s publishAndIndex worker %d successfully spun up", sap.chain.String(), i)
     }
     go func() {
+        wg.Add(1)
+        defer wg.Done()
         for {
             select {
             case payload := <-sap.PayloadChan:
@@ -230,8 +231,7 @@ func (sap *Service) Sync(wg *sync.WaitGroup, screenAndServePayload chan<- shared
             case err := <-sub.Err():
                 log.Errorf("super node subscription error for chain %s: %v", sap.chain.String(), err)
             case <-sap.QuitChan:
-                log.Infof("quiting %s SyncAndPublish process", sap.chain.String())
-                wg.Done()
+                log.Infof("quitting %s Sync process", sap.chain.String())
                 return
             }
         }
@@ -242,41 +242,44 @@ func (sap *Service) Sync(wg *sync.WaitGroup, screenAndServePayload chan<- shared
 
 // publishAndIndex is spun up by SyncAndConvert and receives converted chain data from that process
 // it publishes this data to IPFS and indexes their CIDs with useful metadata in Postgres
-func (sap *Service) publishAndIndex(id int, publishAndIndexPayload <-chan shared.ConvertedData) {
-    go func() {
-        for {
-            select {
-            case payload := <-publishAndIndexPayload:
-                log.Debugf("publishing %s data streamed at head height %d", sap.chain.String(), payload.Height())
-                cidPayload, err := sap.Publisher.Publish(payload)
-                if err != nil {
-                    log.Errorf("super node publishAndIndex worker %d publishing error for chain %s: %v", id, sap.chain.String(), err)
-                    continue
-                }
-                log.Debugf("indexing %s data streamed at head height %d", sap.chain.String(), payload.Height())
-                if err := sap.Indexer.Index(cidPayload); err != nil {
-                    log.Errorf("super node publishAndIndex worker %d indexing error for chain %s: %v", id, sap.chain.String(), err)
-                }
+func (sap *Service) publishAndIndex(wg *sync.WaitGroup, id int, publishAndIndexPayload <-chan shared.ConvertedData) {
+    wg.Add(1)
+    defer wg.Done()
+    for {
+        select {
+        case payload := <-publishAndIndexPayload:
+            log.Debugf("%s super node publishAndIndex worker %d publishing data streamed at head height %d", sap.chain.String(), id, payload.Height())
+            cidPayload, err := sap.Publisher.Publish(payload)
+            if err != nil {
+                log.Errorf("%s super node publishAndIndex worker %d publishing error: %v", sap.chain.String(), id, err)
+                continue
             }
+            log.Debugf("%s super node publishAndIndex worker %d indexing data streamed at head height %d", sap.chain.String(), id, payload.Height())
+            if err := sap.Indexer.Index(cidPayload); err != nil {
+                log.Errorf("%s super node publishAndIndex worker %d indexing error: %v", sap.chain.String(), id, err)
+            }
+        case <-sap.QuitChan:
+            log.Infof("%s super node publishAndIndex worker %d shutting down", sap.chain.String(), id)
+            return
         }
-    }()
-    log.Debugf("%s publishAndIndex goroutine successfully spun up", sap.chain.String())
+    }
 }
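One caveat with moving wg.Add(1) into the goroutines themselves, as Sync and publishAndIndex now do: the sync package documents that Add calls with a positive delta must happen before the corresponding Wait, otherwise Wait can return while workers are still registering. A minimal sketch of the conventional ordering, not this repository's code:

    package main

    import (
        "fmt"
        "sync"
    )

    func main() {
        var wg sync.WaitGroup
        for i := 1; i <= 3; i++ {
            // increment before starting the goroutine, not inside it, so a
            // concurrent wg.Wait() cannot observe a zero counter while
            // workers are still being launched
            wg.Add(1)
            go func(id int) {
                defer wg.Done()
                fmt.Println("worker", id, "done")
            }(i)
        }
        wg.Wait()
    }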
-// Serve listens for incoming converter data off the screenAndServePayload from the SyncAndConvert process
+// Serve listens for incoming converter data off the screenAndServePayload from the Sync process
 // It filters and sends this data to any subscribers to the service
-// This process can be stood up alone, without an screenAndServePayload attached to a SyncAndConvert process
+// This process can also be stood up alone, without a screenAndServePayload attached to a Sync process
 // and it will hang on the WaitGroup indefinitely, allowing the Service to serve historical data requests only
 func (sap *Service) Serve(wg *sync.WaitGroup, screenAndServePayload <-chan shared.ConvertedData) {
-    wg.Add(1)
+    sap.serveWg = wg
     go func() {
+        wg.Add(1)
+        defer wg.Done()
         for {
             select {
             case payload := <-screenAndServePayload:
                 sap.filterAndServe(payload)
             case <-sap.QuitChan:
-                log.Infof("quiting %s ScreenAndServe process", sap.chain.String())
-                wg.Done()
+                log.Infof("quitting %s Serve process", sap.chain.String())
                 return
             }
         }
@@ -286,8 +289,11 @@ func (sap *Service) Serve(wg *sync.WaitGroup, screenAndServePayload <-chan share
 
 // filterAndServe filters the payload according to each subscription type and sends to the subscriptions
 func (sap *Service) filterAndServe(payload shared.ConvertedData) {
-    log.Debugf("Sending %s payload to subscriptions", sap.chain.String())
+    log.Debugf("sending %s payload to subscriptions", sap.chain.String())
     sap.Lock()
+    sap.serveWg.Add(1)
+    defer sap.Unlock()
+    defer sap.serveWg.Done()
     for ty, subs := range sap.Subscriptions {
         // Retrieve the subscription parameters for this subscription type
         subConfig, ok := sap.SubscriptionTypes[ty]
@@ -322,12 +328,13 @@ func (sap *Service) filterAndServe(payload shared.ConvertedData) {
             }
         }
     }
-    sap.Unlock()
 }
 
 // Subscribe is used by the API to remotely subscribe to the service loop
 // The params must be rlp serializable and satisfy the SubscriptionSettings() interface
 func (sap *Service) Subscribe(id rpc.ID, sub chan<- SubscriptionPayload, quitChan chan<- bool, params shared.SubscriptionSettings) {
+    sap.serveWg.Add(1)
+    defer sap.serveWg.Done()
     log.Infof("New %s subscription %s", sap.chain.String(), id)
     subscription := Subscription{
         ID:        id,
@@ -361,7 +368,7 @@ func (sap *Service) Subscribe(id rpc.ID, sub chan<- SubscriptionPayload, quitCha
     // Otherwise we only filter new data as it is streamed in from the state diffing geth node
     if params.HistoricalData() || params.HistoricalDataOnly() {
         if err := sap.sendHistoricalData(subscription, id, params); err != nil {
-            sendNonBlockingErr(subscription, fmt.Errorf("super node subscriber backfill error for chain %s: %v", sap.chain.String(), err))
+            sendNonBlockingErr(subscription, fmt.Errorf("%s super node subscriber backfill error: %v", sap.chain.String(), err))
             sendNonBlockingQuit(subscription)
             return
         }
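The sendNonBlockingErr helper and the payload sends in sendHistoricalData below all rest on Go's select/default idiom for a non-blocking send. Distilled into a tiny standalone example (hypothetical names, string payloads instead of subscription payloads):

    package main

    import "fmt"

    // trySend delivers a payload without blocking: if the channel has no
    // ready receiver and no buffer space, the payload is dropped rather
    // than stalling the serving loop.
    func trySend(ch chan<- string, payload string) bool {
        select {
        case ch <- payload:
            return true
        default:
            return false
        }
    }

    func main() {
        ch := make(chan string, 1)
        fmt.Println(trySend(ch, "a")) // true: buffered slot available
        fmt.Println(trySend(ch, "b")) // false: buffer full, dropped
    }

The trade-off is intentional: a slow or absent subscriber loses payloads instead of backpressuring the whole service.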
@@ -392,10 +399,18 @@ func (sap *Service) sendHistoricalData(sub Subscription, id rpc.ID, params share
     log.Debugf("%s historical data starting block: %d", sap.chain.String(), params.StartingBlock().Int64())
     log.Debugf("%s historical data ending block: %d", sap.chain.String(), endingBlock)
     go func() {
+        sap.serveWg.Add(1)
+        defer sap.serveWg.Done()
         for i := startingBlock; i <= endingBlock; i++ {
+            select {
+            case <-sap.QuitChan:
+                log.Infof("%s super node historical data feed to subscription %s closed", sap.chain.String(), id)
+                return
+            default:
+            }
             cidWrappers, empty, err := sap.Retriever.Retrieve(params, i)
             if err != nil {
-                sendNonBlockingErr(sub, fmt.Errorf("super node %s CID Retrieval error at block %d\r%s", sap.chain.String(), i, err.Error()))
+                sendNonBlockingErr(sub, fmt.Errorf("%s super node CID Retrieval error at block %d\r%s", sap.chain.String(), i, err.Error()))
                 continue
             }
             if empty {
@@ -404,7 +419,7 @@ func (sap *Service) sendHistoricalData(sub Subscription, id rpc.ID, params share
             for _, cids := range cidWrappers {
                 response, err := sap.IPLDFetcher.Fetch(cids)
                 if err != nil {
-                    sendNonBlockingErr(sub, fmt.Errorf("super node %s IPLD Fetching error at block %d\r%s", sap.chain.String(), i, err.Error()))
+                    sendNonBlockingErr(sub, fmt.Errorf("%s super node IPLD Fetching error at block %d\r%s", sap.chain.String(), i, err.Error()))
                     continue
                 }
                 responseRLP, err := rlp.EncodeToBytes(response)
@@ -416,16 +431,16 @@ func (sap *Service) sendHistoricalData(sub Subscription, id rpc.ID, params share
                 case sub.PayloadChan <- SubscriptionPayload{Data: responseRLP, Err: "", Flag: EmptyFlag, Height: response.Height()}:
                     log.Debugf("sending super node historical data payload to %s subscription %s", sap.chain.String(), id)
                 default:
-                    log.Infof("unable to send back-fill payload to %s subscription %s; channel has no receiver", sap.chain.String(), id)
+                    log.Infof("unable to send backFill payload to %s subscription %s; channel has no receiver", sap.chain.String(), id)
                 }
             }
         }
         // when we are done backfilling send an empty payload signifying so in the msg
         select {
         case sub.PayloadChan <- SubscriptionPayload{Data: nil, Err: "", Flag: BackFillCompleteFlag}:
-            log.Debugf("sending backfill completion notice to %s subscription %s", sap.chain.String(), id)
+            log.Debugf("sending backFill completion notice to %s subscription %s", sap.chain.String(), id)
         default:
-            log.Infof("unable to send backfill completion notice to %s subscription %s", sap.chain.String(), id)
+            log.Infof("unable to send backFill completion notice to %s subscription %s", sap.chain.String(), id)
         }
     }()
     return nil
diff --git a/pkg/super_node/service_test.go b/pkg/super_node/service_test.go
index face19e7..bc32849a 100644
--- a/pkg/super_node/service_test.go
+++ b/pkg/super_node/service_test.go
@@ -66,7 +66,7 @@ var _ = Describe("Service", func() {
             err := processor.Sync(wg, nil)
             Expect(err).ToNot(HaveOccurred())
             time.Sleep(2 * time.Second)
-            quitChan <- true
+            close(quitChan)
             wg.Wait()
             Expect(mockConverter.PassedStatediffPayload).To(Equal(mocks.MockStateDiffPayload))
             Expect(len(mockCidIndexer.PassedCIDPayload)).To(Equal(1))
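The service_test.go change from a single send (quitChan <- true) to close(quitChan) matters now that several goroutines select on the same quit channel: a send wakes exactly one receiver, while closing the channel releases every receiver at once, which is the standard broadcast-shutdown idiom and what Stop() relies on. A small demonstration:

    package main

    import (
        "fmt"
        "sync"
    )

    func main() {
        quit := make(chan bool)
        var wg sync.WaitGroup
        for i := 1; i <= 3; i++ {
            wg.Add(1)
            go func(id int) {
                defer wg.Done()
                <-quit // a receive on a closed channel returns immediately
                fmt.Println("worker", id, "shutting down")
            }(i)
        }
        // close broadcasts to every waiting receiver; a single `quit <- true`
        // would release only one of the three workers
        close(quit)
        wg.Wait()
    }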