refactor- focus on simplifying concurrent fetching; graceful shutdown for superNode command

This commit is contained in:
Ian Norden 2020-05-12 14:53:50 -05:00
parent b7d6152238
commit 5fb1cc0696
12 changed files with 304 additions and 262 deletions

View File

@ -16,6 +16,8 @@
package cmd package cmd
import ( import (
"os"
"os/signal"
"sync" "sync"
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
@ -85,15 +87,21 @@ func superNode() {
logWithCommand.Fatal(err) logWithCommand.Fatal(err)
} }
} }
var backFiller super_node.BackFillInterface
if superNodeConfig.BackFill { if superNodeConfig.BackFill {
logWithCommand.Debug("initializing new super node backfill service") logWithCommand.Debug("initializing new super node backfill service")
backFiller, err := super_node.NewBackFillService(superNodeConfig, forwardPayloadChan) backFiller, err = super_node.NewBackFillService(superNodeConfig, forwardPayloadChan)
if err != nil { if err != nil {
logWithCommand.Fatal(err) logWithCommand.Fatal(err)
} }
logWithCommand.Info("starting up super node backfill process") logWithCommand.Info("starting up super node backfill process")
backFiller.BackFill(wg) backFiller.BackFill(wg)
} }
shutdown := make(chan os.Signal)
signal.Notify(shutdown, os.Interrupt)
<-shutdown
backFiller.Stop()
superNode.Stop()
wg.Wait() wg.Wait()
} }

View File

@ -0,0 +1,17 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package trie_validator

View File

@ -17,9 +17,7 @@
package super_node package super_node
import ( import (
"fmt"
"sync" "sync"
"sync/atomic"
"time" "time"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@ -37,6 +35,7 @@ const (
type BackFillInterface interface { type BackFillInterface interface {
// Method for the super node to periodically check for and fill in gaps in its data using an archival node // Method for the super node to periodically check for and fill in gaps in its data using an archival node
BackFill(wg *sync.WaitGroup) BackFill(wg *sync.WaitGroup)
Stop() error
} }
// BackFillService for filling in gaps in the super node // BackFillService for filling in gaps in the super node
@ -62,7 +61,7 @@ type BackFillService struct {
// Channel for receiving quit signal // Channel for receiving quit signal
QuitChan chan bool QuitChan chan bool
// Chain type // Chain type
Chain shared.ChainType chain shared.ChainType
// Headers with times_validated lower than this will be resynced // Headers with times_validated lower than this will be resynced
validationLevel int validationLevel int
} }
@ -107,8 +106,8 @@ func NewBackFillService(settings *Config, screenAndServeChan chan shared.Convert
BatchSize: batchSize, BatchSize: batchSize,
BatchNumber: int64(batchNumber), BatchNumber: int64(batchNumber),
ScreenAndServeChan: screenAndServeChan, ScreenAndServeChan: screenAndServeChan,
QuitChan: settings.Quit, QuitChan: make(chan bool),
Chain: settings.Chain, chain: settings.Chain,
validationLevel: settings.ValidationLevel, validationLevel: settings.ValidationLevel,
}, nil }, nil
} }
@ -116,119 +115,97 @@ func NewBackFillService(settings *Config, screenAndServeChan chan shared.Convert
// BackFill periodically checks for and fills in gaps in the super node db // BackFill periodically checks for and fills in gaps in the super node db
func (bfs *BackFillService) BackFill(wg *sync.WaitGroup) { func (bfs *BackFillService) BackFill(wg *sync.WaitGroup) {
ticker := time.NewTicker(bfs.GapCheckFrequency) ticker := time.NewTicker(bfs.GapCheckFrequency)
wg.Add(1)
go func() { go func() {
wg.Add(1)
defer wg.Done()
for { for {
select { select {
case <-bfs.QuitChan: case <-bfs.QuitChan:
log.Infof("quiting %s FillGapsInSuperNode process", bfs.Chain.String()) log.Infof("quiting %s FillGapsInSuperNode process", bfs.chain.String())
wg.Done()
return return
case <-ticker.C: case <-ticker.C:
log.Infof("searching for gaps in the %s super node database", bfs.Chain.String()) // spin up worker goroutines for this search pass
startingBlock, err := bfs.Retriever.RetrieveFirstBlockNumber() // we start and kill a new batch of workers for each pass
if err != nil { // so that we know each of the previous workers is done before we search for new gaps
log.Errorf("super node db backfill RetrieveFirstBlockNumber error for chain %s: %v", bfs.Chain.String(), err) heightsChan := make(chan []uint64)
continue for i := 1; i <= int(bfs.BatchNumber); i++ {
} go bfs.backFill(wg, i, heightsChan)
if startingBlock != 0 {
log.Infof("found gap at the beginning of the %s sync", bfs.Chain.String())
if err := bfs.backFill(0, uint64(startingBlock-1)); err != nil {
log.Error(err)
}
} }
gaps, err := bfs.Retriever.RetrieveGapsInData(bfs.validationLevel) gaps, err := bfs.Retriever.RetrieveGapsInData(bfs.validationLevel)
if err != nil { if err != nil {
log.Errorf("super node db backfill RetrieveGapsInData error for chain %s: %v", bfs.Chain.String(), err) log.Errorf("%s super node db backFill RetrieveGapsInData error: %v", bfs.chain.String(), err)
continue continue
} }
for _, gap := range gaps { for _, gap := range gaps {
if err := bfs.backFill(gap.Start, gap.Stop); err != nil { log.Infof("backFilling %s data from %d to %d", bfs.chain.String(), gap.Start, gap.Stop)
log.Error(err) blockRangeBins, err := utils.GetBlockHeightBins(gap.Start, gap.Stop, bfs.BatchSize)
if err != nil {
log.Errorf("%s super node db backFill GetBlockHeightBins error: %v", bfs.chain.String(), err)
continue
} }
for _, heights := range blockRangeBins {
select {
case <-bfs.QuitChan:
log.Infof("quiting %s BackFill process", bfs.chain.String())
return
default:
heightsChan <- heights
}
}
}
// send a quit signal to each worker
// this blocks until each worker has finished its current task and is free to receive from the quit channel
for i := 1; i <= int(bfs.BatchNumber); i++ {
bfs.QuitChan <- true
} }
} }
} }
}() }()
log.Infof("%s BackFill goroutine successfully spun up", bfs.Chain.String()) log.Infof("%s BackFill goroutine successfully spun up", bfs.chain.String())
} }
// backFill fetches, processes, and returns utils.StorageDiffs over a range of blocks func (bfs *BackFillService) backFill(wg *sync.WaitGroup, id int, heightChan chan []uint64) {
// It splits a large range up into smaller chunks, batch fetching and processing those chunks concurrently wg.Add(1)
func (bfs *BackFillService) backFill(startingBlock, endingBlock uint64) error { defer wg.Done()
log.Infof("filling in %s gap from %d to %d", bfs.Chain.String(), startingBlock, endingBlock) for {
if endingBlock < startingBlock { select {
return fmt.Errorf("super node %s db backfill: ending block number needs to be greater than starting block number", bfs.Chain.String()) case heights := <-heightChan:
} log.Debugf("%s backFill worker %d processing section from %d to %d", bfs.chain.String(), id, heights[0], heights[len(heights)-1])
// break the range up into bins of smaller ranges payloads, err := bfs.Fetcher.FetchAt(heights)
blockRangeBins, err := utils.GetBlockHeightBins(startingBlock, endingBlock, bfs.BatchSize)
if err != nil { if err != nil {
return err log.Errorf("%s backFill worker %d fetcher error: %s", bfs.chain.String(), id, err.Error())
}
// int64 for atomic incrementing and decrementing to track the number of active processing goroutines we have
var activeCount int64
// channel for processing goroutines to signal when they are done
processingDone := make(chan bool)
forwardDone := make(chan bool)
// for each block range bin spin up a goroutine to batch fetch and process data for that range
go func() {
for _, blockHeights := range blockRangeBins {
// if we have reached our limit of active goroutines
// wait for one to finish before starting the next
if atomic.AddInt64(&activeCount, 1) > bfs.BatchNumber {
// this blocks until a process signals it has finished
<-forwardDone
}
go func(blockHeights []uint64) {
payloads, err := bfs.Fetcher.FetchAt(blockHeights)
if err != nil {
log.Errorf("%s super node historical data fetcher error: %s", bfs.Chain.String(), err.Error())
} }
for _, payload := range payloads { for _, payload := range payloads {
ipldPayload, err := bfs.Converter.Convert(payload) ipldPayload, err := bfs.Converter.Convert(payload)
if err != nil { if err != nil {
log.Errorf("%s super node historical data converter error: %s", bfs.Chain.String(), err.Error()) log.Errorf("%s backFill worker %d converter error: %s", bfs.chain.String(), id, err.Error())
} }
// If there is a ScreenAndServe process listening, forward payload to it // If there is a ScreenAndServe process listening, forward converted payload to it
select { select {
case bfs.ScreenAndServeChan <- ipldPayload: case bfs.ScreenAndServeChan <- ipldPayload:
log.Debugf("%s backFill worker %d forwarded converted payload to server", bfs.chain.String(), id)
default: default:
log.Debugf("%s backFill worker %d unable to forward converted payload to server; no channel ready to receive", bfs.chain.String(), id)
} }
cidPayload, err := bfs.Publisher.Publish(ipldPayload) cidPayload, err := bfs.Publisher.Publish(ipldPayload)
if err != nil { if err != nil {
log.Errorf("%s super node historical data publisher error: %s", bfs.Chain.String(), err.Error()) log.Errorf("%s backFill worker %d publisher error: %s", bfs.chain.String(), id, err.Error())
continue continue
} }
if err := bfs.Indexer.Index(cidPayload); err != nil { if err := bfs.Indexer.Index(cidPayload); err != nil {
log.Errorf("%s super node historical data indexer error: %s", bfs.Chain.String(), err.Error()) log.Errorf("%s backFill worker %d indexer error: %s", bfs.chain.String(), id, err.Error())
}
}
log.Infof("%s backFill worker %d finished section from %d to %d", bfs.chain.String(), id, heights[0], heights[len(heights)-1])
case <-bfs.QuitChan:
log.Infof("%s backFill worker %d shutting down", bfs.chain.String(), id)
return
} }
} }
// when this goroutine is done, send out a signal
log.Infof("finished filling in %s gap from %d to %d", bfs.Chain.String(), blockHeights[0], blockHeights[len(blockHeights)-1])
processingDone <- true
}(blockHeights)
} }
}()
// listen on the processingDone chan func (bfs *BackFillService) Stop() error {
// keeps track of the number of processing goroutines that have finished log.Infof("Stopping %s backFill service", bfs.chain.String())
// when they have all finished, return close(bfs.QuitChan)
goroutinesFinished := 0
for {
select {
case <-processingDone:
atomic.AddInt64(&activeCount, -1)
select {
// if we are waiting for a process to finish, signal that one has
case forwardDone <- true:
default:
}
goroutinesFinished++
if goroutinesFinished >= len(blockRangeBins) {
return nil return nil
} }
}
}
}

View File

@ -69,7 +69,6 @@ var _ = Describe("BackFiller", func() {
BatchSize: super_node.DefaultMaxBatchSize, BatchSize: super_node.DefaultMaxBatchSize,
BatchNumber: super_node.DefaultMaxBatchNumber, BatchNumber: super_node.DefaultMaxBatchNumber,
QuitChan: quitChan, QuitChan: quitChan,
Chain: shared.Ethereum,
} }
wg := &sync.WaitGroup{} wg := &sync.WaitGroup{}
backfiller.BackFill(wg) backfiller.BackFill(wg)
@ -125,7 +124,6 @@ var _ = Describe("BackFiller", func() {
BatchSize: super_node.DefaultMaxBatchSize, BatchSize: super_node.DefaultMaxBatchSize,
BatchNumber: super_node.DefaultMaxBatchNumber, BatchNumber: super_node.DefaultMaxBatchNumber,
QuitChan: quitChan, QuitChan: quitChan,
Chain: shared.Ethereum,
} }
wg := &sync.WaitGroup{} wg := &sync.WaitGroup{}
backfiller.BackFill(wg) backfiller.BackFill(wg)
@ -156,7 +154,12 @@ var _ = Describe("BackFiller", func() {
} }
mockRetriever := &mocks2.CIDRetriever{ mockRetriever := &mocks2.CIDRetriever{
FirstBlockNumberToReturn: 3, FirstBlockNumberToReturn: 3,
GapsToRetrieve: []shared.Gap{}, GapsToRetrieve: []shared.Gap{
{
Start: 0,
Stop: 2,
},
},
} }
mockFetcher := &mocks2.PayloadFetcher{ mockFetcher := &mocks2.PayloadFetcher{
PayloadsToReturn: map[uint64]shared.RawChainData{ PayloadsToReturn: map[uint64]shared.RawChainData{
@ -175,7 +178,6 @@ var _ = Describe("BackFiller", func() {
BatchSize: super_node.DefaultMaxBatchSize, BatchSize: super_node.DefaultMaxBatchSize,
BatchNumber: super_node.DefaultMaxBatchNumber, BatchNumber: super_node.DefaultMaxBatchNumber,
QuitChan: quitChan, QuitChan: quitChan,
Chain: shared.Ethereum,
} }
wg := &sync.WaitGroup{} wg := &sync.WaitGroup{}
backfiller.BackFill(wg) backfiller.BackFill(wg)

View File

@ -44,21 +44,21 @@ func NewCIDRetriever(db *postgres.DB) *CIDRetriever {
} }
// RetrieveFirstBlockNumber is used to retrieve the first block number in the db // RetrieveFirstBlockNumber is used to retrieve the first block number in the db
func (ecr *CIDRetriever) RetrieveFirstBlockNumber() (int64, error) { func (bcr *CIDRetriever) RetrieveFirstBlockNumber() (int64, error) {
var blockNumber int64 var blockNumber int64
err := ecr.db.Get(&blockNumber, "SELECT block_number FROM btc.header_cids ORDER BY block_number ASC LIMIT 1") err := bcr.db.Get(&blockNumber, "SELECT block_number FROM btc.header_cids ORDER BY block_number ASC LIMIT 1")
return blockNumber, err return blockNumber, err
} }
// RetrieveLastBlockNumber is used to retrieve the latest block number in the db // RetrieveLastBlockNumber is used to retrieve the latest block number in the db
func (ecr *CIDRetriever) RetrieveLastBlockNumber() (int64, error) { func (bcr *CIDRetriever) RetrieveLastBlockNumber() (int64, error) {
var blockNumber int64 var blockNumber int64
err := ecr.db.Get(&blockNumber, "SELECT block_number FROM btc.header_cids ORDER BY block_number DESC LIMIT 1 ") err := bcr.db.Get(&blockNumber, "SELECT block_number FROM btc.header_cids ORDER BY block_number DESC LIMIT 1 ")
return blockNumber, err return blockNumber, err
} }
// Retrieve is used to retrieve all of the CIDs which conform to the passed StreamFilters // Retrieve is used to retrieve all of the CIDs which conform to the passed StreamFilters
func (ecr *CIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumber int64) ([]shared.CIDsForFetching, bool, error) { func (bcr *CIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumber int64) ([]shared.CIDsForFetching, bool, error) {
streamFilter, ok := filter.(*SubscriptionSettings) streamFilter, ok := filter.(*SubscriptionSettings)
if !ok { if !ok {
return nil, true, fmt.Errorf("btc retriever expected filter type %T got %T", &SubscriptionSettings{}, filter) return nil, true, fmt.Errorf("btc retriever expected filter type %T got %T", &SubscriptionSettings{}, filter)
@ -66,7 +66,7 @@ func (ecr *CIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumbe
log.Debug("retrieving cids") log.Debug("retrieving cids")
// Begin new db tx // Begin new db tx
tx, err := ecr.db.Beginx() tx, err := bcr.db.Beginx()
if err != nil { if err != nil {
return nil, true, err return nil, true, err
} }
@ -82,7 +82,7 @@ func (ecr *CIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumbe
}() }()
// Retrieve cached header CIDs // Retrieve cached header CIDs
headers, err := ecr.RetrieveHeaderCIDs(tx, blockNumber) headers, err := bcr.RetrieveHeaderCIDs(tx, blockNumber)
if err != nil { if err != nil {
log.Error("header cid retrieval error") log.Error("header cid retrieval error")
return nil, true, err return nil, true, err
@ -98,7 +98,7 @@ func (ecr *CIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumbe
} }
// Retrieve cached trx CIDs // Retrieve cached trx CIDs
if !streamFilter.TxFilter.Off { if !streamFilter.TxFilter.Off {
cw.Transactions, err = ecr.RetrieveTxCIDs(tx, streamFilter.TxFilter, header.ID) cw.Transactions, err = bcr.RetrieveTxCIDs(tx, streamFilter.TxFilter, header.ID)
if err != nil { if err != nil {
log.Error("transaction cid retrieval error") log.Error("transaction cid retrieval error")
return nil, true, err return nil, true, err
@ -114,7 +114,7 @@ func (ecr *CIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumbe
} }
// RetrieveHeaderCIDs retrieves and returns all of the header cids at the provided blockheight // RetrieveHeaderCIDs retrieves and returns all of the header cids at the provided blockheight
func (ecr *CIDRetriever) RetrieveHeaderCIDs(tx *sqlx.Tx, blockNumber int64) ([]HeaderModel, error) { func (bcr *CIDRetriever) RetrieveHeaderCIDs(tx *sqlx.Tx, blockNumber int64) ([]HeaderModel, error) {
log.Debug("retrieving header cids for block ", blockNumber) log.Debug("retrieving header cids for block ", blockNumber)
headers := make([]HeaderModel, 0) headers := make([]HeaderModel, 0)
pgStr := `SELECT * FROM btc.header_cids pgStr := `SELECT * FROM btc.header_cids
@ -124,7 +124,7 @@ func (ecr *CIDRetriever) RetrieveHeaderCIDs(tx *sqlx.Tx, blockNumber int64) ([]H
// RetrieveTxCIDs retrieves and returns all of the trx cids at the provided blockheight that conform to the provided filter parameters // RetrieveTxCIDs retrieves and returns all of the trx cids at the provided blockheight that conform to the provided filter parameters
// also returns the ids for the returned transaction cids // also returns the ids for the returned transaction cids
func (ecr *CIDRetriever) RetrieveTxCIDs(tx *sqlx.Tx, txFilter TxFilter, headerID int64) ([]TxModel, error) { func (bcr *CIDRetriever) RetrieveTxCIDs(tx *sqlx.Tx, txFilter TxFilter, headerID int64) ([]TxModel, error) {
log.Debug("retrieving transaction cids for header id ", headerID) log.Debug("retrieving transaction cids for header id ", headerID)
args := make([]interface{}, 0, 3) args := make([]interface{}, 0, 3)
results := make([]TxModel, 0) results := make([]TxModel, 0)
@ -168,7 +168,22 @@ func (ecr *CIDRetriever) RetrieveTxCIDs(tx *sqlx.Tx, txFilter TxFilter, headerID
} }
// RetrieveGapsInData is used to find the block numbers at which we are missing data in the db // RetrieveGapsInData is used to find the block numbers at which we are missing data in the db
func (ecr *CIDRetriever) RetrieveGapsInData(validationLevel int) ([]shared.Gap, error) { func (bcr *CIDRetriever) RetrieveGapsInData(validationLevel int) ([]shared.Gap, error) {
log.Info("searching for gaps in the btc super node database")
startingBlock, err := bcr.RetrieveFirstBlockNumber()
if err != nil {
return nil, fmt.Errorf("btc CIDRetriever RetrieveFirstBlockNumber error: %v", err)
}
var initialGap []shared.Gap
if startingBlock != 0 {
stop := uint64(startingBlock - 1)
log.Infof("found gap at the beginning of the btc sync from 0 to %d", stop)
initialGap = []shared.Gap{{
Start: 0,
Stop: stop,
}}
}
pgStr := `SELECT header_cids.block_number + 1 AS start, min(fr.block_number) - 1 AS stop FROM btc.header_cids pgStr := `SELECT header_cids.block_number + 1 AS start, min(fr.block_number) - 1 AS stop FROM btc.header_cids
LEFT JOIN btc.header_cids r on btc.header_cids.block_number = r.block_number - 1 LEFT JOIN btc.header_cids r on btc.header_cids.block_number = r.block_number - 1
LEFT JOIN btc.header_cids fr on btc.header_cids.block_number < fr.block_number LEFT JOIN btc.header_cids fr on btc.header_cids.block_number < fr.block_number
@ -178,7 +193,7 @@ func (ecr *CIDRetriever) RetrieveGapsInData(validationLevel int) ([]shared.Gap,
Start uint64 `db:"start"` Start uint64 `db:"start"`
Stop uint64 `db:"stop"` Stop uint64 `db:"stop"`
}, 0) }, 0)
if err := ecr.db.Select(&results, pgStr); err != nil && err != sql.ErrNoRows { if err := bcr.db.Select(&results, pgStr); err != nil && err != sql.ErrNoRows {
return nil, err return nil, err
} }
emptyGaps := make([]shared.Gap, len(results)) emptyGaps := make([]shared.Gap, len(results))
@ -195,18 +210,18 @@ func (ecr *CIDRetriever) RetrieveGapsInData(validationLevel int) ([]shared.Gap,
WHERE times_validated < $1 WHERE times_validated < $1
ORDER BY block_number` ORDER BY block_number`
var heights []uint64 var heights []uint64
if err := ecr.db.Select(&heights, pgStr, validationLevel); err != nil && err != sql.ErrNoRows { if err := bcr.db.Select(&heights, pgStr, validationLevel); err != nil && err != sql.ErrNoRows {
return nil, err return nil, err
} }
return append(emptyGaps, utils.MissingHeightsToGaps(heights)...), nil return append(append(initialGap, emptyGaps...), utils.MissingHeightsToGaps(heights)...), nil
} }
// RetrieveBlockByHash returns all of the CIDs needed to compose an entire block, for a given block hash // RetrieveBlockByHash returns all of the CIDs needed to compose an entire block, for a given block hash
func (ecr *CIDRetriever) RetrieveBlockByHash(blockHash common.Hash) (HeaderModel, []TxModel, error) { func (bcr *CIDRetriever) RetrieveBlockByHash(blockHash common.Hash) (HeaderModel, []TxModel, error) {
log.Debug("retrieving block cids for block hash ", blockHash.String()) log.Debug("retrieving block cids for block hash ", blockHash.String())
// Begin new db tx // Begin new db tx
tx, err := ecr.db.Beginx() tx, err := bcr.db.Beginx()
if err != nil { if err != nil {
return HeaderModel{}, nil, err return HeaderModel{}, nil, err
} }
@ -221,12 +236,12 @@ func (ecr *CIDRetriever) RetrieveBlockByHash(blockHash common.Hash) (HeaderModel
} }
}() }()
headerCID, err := ecr.RetrieveHeaderCIDByHash(tx, blockHash) headerCID, err := bcr.RetrieveHeaderCIDByHash(tx, blockHash)
if err != nil { if err != nil {
log.Error("header cid retrieval error") log.Error("header cid retrieval error")
return HeaderModel{}, nil, err return HeaderModel{}, nil, err
} }
txCIDs, err := ecr.RetrieveTxCIDsByHeaderID(tx, headerCID.ID) txCIDs, err := bcr.RetrieveTxCIDsByHeaderID(tx, headerCID.ID)
if err != nil { if err != nil {
log.Error("tx cid retrieval error") log.Error("tx cid retrieval error")
} }
@ -234,11 +249,11 @@ func (ecr *CIDRetriever) RetrieveBlockByHash(blockHash common.Hash) (HeaderModel
} }
// RetrieveBlockByNumber returns all of the CIDs needed to compose an entire block, for a given block number // RetrieveBlockByNumber returns all of the CIDs needed to compose an entire block, for a given block number
func (ecr *CIDRetriever) RetrieveBlockByNumber(blockNumber int64) (HeaderModel, []TxModel, error) { func (bcr *CIDRetriever) RetrieveBlockByNumber(blockNumber int64) (HeaderModel, []TxModel, error) {
log.Debug("retrieving block cids for block number ", blockNumber) log.Debug("retrieving block cids for block number ", blockNumber)
// Begin new db tx // Begin new db tx
tx, err := ecr.db.Beginx() tx, err := bcr.db.Beginx()
if err != nil { if err != nil {
return HeaderModel{}, nil, err return HeaderModel{}, nil, err
} }
@ -253,7 +268,7 @@ func (ecr *CIDRetriever) RetrieveBlockByNumber(blockNumber int64) (HeaderModel,
} }
}() }()
headerCID, err := ecr.RetrieveHeaderCIDs(tx, blockNumber) headerCID, err := bcr.RetrieveHeaderCIDs(tx, blockNumber)
if err != nil { if err != nil {
log.Error("header cid retrieval error") log.Error("header cid retrieval error")
return HeaderModel{}, nil, err return HeaderModel{}, nil, err
@ -261,7 +276,7 @@ func (ecr *CIDRetriever) RetrieveBlockByNumber(blockNumber int64) (HeaderModel,
if len(headerCID) < 1 { if len(headerCID) < 1 {
return HeaderModel{}, nil, fmt.Errorf("header cid retrieval error, no header CIDs found at block %d", blockNumber) return HeaderModel{}, nil, fmt.Errorf("header cid retrieval error, no header CIDs found at block %d", blockNumber)
} }
txCIDs, err := ecr.RetrieveTxCIDsByHeaderID(tx, headerCID[0].ID) txCIDs, err := bcr.RetrieveTxCIDsByHeaderID(tx, headerCID[0].ID)
if err != nil { if err != nil {
log.Error("tx cid retrieval error") log.Error("tx cid retrieval error")
} }
@ -269,7 +284,7 @@ func (ecr *CIDRetriever) RetrieveBlockByNumber(blockNumber int64) (HeaderModel,
} }
// RetrieveHeaderCIDByHash returns the header for the given block hash // RetrieveHeaderCIDByHash returns the header for the given block hash
func (ecr *CIDRetriever) RetrieveHeaderCIDByHash(tx *sqlx.Tx, blockHash common.Hash) (HeaderModel, error) { func (bcr *CIDRetriever) RetrieveHeaderCIDByHash(tx *sqlx.Tx, blockHash common.Hash) (HeaderModel, error) {
log.Debug("retrieving header cids for block hash ", blockHash.String()) log.Debug("retrieving header cids for block hash ", blockHash.String())
pgStr := `SELECT * FROM btc.header_cids pgStr := `SELECT * FROM btc.header_cids
WHERE block_hash = $1` WHERE block_hash = $1`
@ -278,7 +293,7 @@ func (ecr *CIDRetriever) RetrieveHeaderCIDByHash(tx *sqlx.Tx, blockHash common.H
} }
// RetrieveTxCIDsByHeaderID retrieves all tx CIDs for the given header id // RetrieveTxCIDsByHeaderID retrieves all tx CIDs for the given header id
func (ecr *CIDRetriever) RetrieveTxCIDsByHeaderID(tx *sqlx.Tx, headerID int64) ([]TxModel, error) { func (bcr *CIDRetriever) RetrieveTxCIDsByHeaderID(tx *sqlx.Tx, headerID int64) ([]TxModel, error) {
log.Debug("retrieving tx cids for block id ", headerID) log.Debug("retrieving tx cids for block id ", headerID)
pgStr := `SELECT * FROM btc.transaction_cids pgStr := `SELECT * FROM btc.transaction_cids
WHERE header_id = $1` WHERE header_id = $1`

View File

@ -66,7 +66,6 @@ type Config struct {
IPFSPath string IPFSPath string
IPFSMode shared.IPFSMode IPFSMode shared.IPFSMode
DBConfig config.Database DBConfig config.Database
Quit chan bool
// Server fields // Server fields
Serve bool Serve bool
ServeDBConn *postgres.DB ServeDBConn *postgres.DB
@ -182,8 +181,6 @@ func NewSuperNodeConfig() (*Config, error) {
} }
} }
c.Quit = make(chan bool)
return c, nil return c, nil
} }

View File

@ -443,6 +443,21 @@ func (ecr *CIDRetriever) RetrieveStorageCIDs(tx *sqlx.Tx, storageFilter StorageF
// RetrieveGapsInData is used to find the block numbers at which we are missing data in the db // RetrieveGapsInData is used to find the block numbers at which we are missing data in the db
// it finds the union of heights where no data exists and where the times_validated is lower than the validation level // it finds the union of heights where no data exists and where the times_validated is lower than the validation level
func (ecr *CIDRetriever) RetrieveGapsInData(validationLevel int) ([]shared.Gap, error) { func (ecr *CIDRetriever) RetrieveGapsInData(validationLevel int) ([]shared.Gap, error) {
log.Info("searching for gaps in the eth super node database")
startingBlock, err := ecr.RetrieveFirstBlockNumber()
if err != nil {
return nil, fmt.Errorf("eth CIDRetriever RetrieveFirstBlockNumber error: %v", err)
}
var initialGap []shared.Gap
if startingBlock != 0 {
stop := uint64(startingBlock - 1)
log.Infof("found gap at the beginning of the eth sync from 0 to %d", stop)
initialGap = []shared.Gap{{
Start: 0,
Stop: stop,
}}
}
pgStr := `SELECT header_cids.block_number + 1 AS start, min(fr.block_number) - 1 AS stop FROM eth.header_cids pgStr := `SELECT header_cids.block_number + 1 AS start, min(fr.block_number) - 1 AS stop FROM eth.header_cids
LEFT JOIN eth.header_cids r on eth.header_cids.block_number = r.block_number - 1 LEFT JOIN eth.header_cids r on eth.header_cids.block_number = r.block_number - 1
LEFT JOIN eth.header_cids fr on eth.header_cids.block_number < fr.block_number LEFT JOIN eth.header_cids fr on eth.header_cids.block_number < fr.block_number
@ -472,7 +487,7 @@ func (ecr *CIDRetriever) RetrieveGapsInData(validationLevel int) ([]shared.Gap,
if err := ecr.db.Select(&heights, pgStr, validationLevel); err != nil && err != sql.ErrNoRows { if err := ecr.db.Select(&heights, pgStr, validationLevel); err != nil && err != sql.ErrNoRows {
return nil, err return nil, err
} }
return append(emptyGaps, utils.MissingHeightsToGaps(heights)...), nil return append(append(initialGap, emptyGaps...), utils.MissingHeightsToGaps(heights)...), nil
} }
// RetrieveBlockByHash returns all of the CIDs needed to compose an entire block, for a given block hash // RetrieveBlockByHash returns all of the CIDs needed to compose an entire block, for a given block hash

View File

@ -474,54 +474,64 @@ var _ = Describe("Retriever", func() {
Describe("RetrieveGapsInData", func() { Describe("RetrieveGapsInData", func() {
It("Doesn't return gaps if there are none", func() { It("Doesn't return gaps if there are none", func() {
payload0 := *mocks.MockCIDPayload
payload0.HeaderCID.BlockNumber = "0"
payload1 := *mocks.MockCIDPayload payload1 := *mocks.MockCIDPayload
payload1.HeaderCID.BlockNumber = "2" payload1.HeaderCID.BlockNumber = "1"
payload2 := payload1 payload2 := payload1
payload2.HeaderCID.BlockNumber = "3" payload2.HeaderCID.BlockNumber = "2"
err := repo.Index(mocks.MockCIDPayload) payload3 := payload2
payload3.HeaderCID.BlockNumber = "3"
err := repo.Index(&payload0)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
err = repo.Index(&payload1) err = repo.Index(&payload1)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
err = repo.Index(&payload2) err = repo.Index(&payload2)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
err = repo.Index(&payload3)
Expect(err).ToNot(HaveOccurred())
gaps, err := retriever.RetrieveGapsInData(1) gaps, err := retriever.RetrieveGapsInData(1)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(len(gaps)).To(Equal(0)) Expect(len(gaps)).To(Equal(0))
}) })
It("Doesn't return the gap from 0 to the earliest block", func() { It("Returns the gap from 0 to the earliest block", func() {
payload := *mocks.MockCIDPayload payload := *mocks.MockCIDPayload
payload.HeaderCID.BlockNumber = "5" payload.HeaderCID.BlockNumber = "5"
err := repo.Index(&payload) err := repo.Index(&payload)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
gaps, err := retriever.RetrieveGapsInData(1) gaps, err := retriever.RetrieveGapsInData(1)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(len(gaps)).To(Equal(0)) Expect(len(gaps)).To(Equal(1))
Expect(gaps[0].Start).To(Equal(uint64(0)))
Expect(gaps[0].Stop).To(Equal(uint64(4)))
}) })
It("Can handle single block gaps", func() { It("Can handle single block gaps", func() {
payload0 := *mocks.MockCIDPayload
payload0.HeaderCID.BlockNumber = "0"
payload1 := *mocks.MockCIDPayload payload1 := *mocks.MockCIDPayload
payload1.HeaderCID.BlockNumber = "2" payload1.HeaderCID.BlockNumber = "1"
payload2 := payload1 payload3 := payload1
payload2.HeaderCID.BlockNumber = "4" payload3.HeaderCID.BlockNumber = "3"
err := repo.Index(mocks.MockCIDPayload) err := repo.Index(&payload0)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
err = repo.Index(&payload1) err = repo.Index(&payload1)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
err = repo.Index(&payload2) err = repo.Index(&payload3)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
gaps, err := retriever.RetrieveGapsInData(1) gaps, err := retriever.RetrieveGapsInData(1)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(len(gaps)).To(Equal(1)) Expect(len(gaps)).To(Equal(1))
Expect(gaps[0].Start).To(Equal(uint64(3))) Expect(gaps[0].Start).To(Equal(uint64(2)))
Expect(gaps[0].Stop).To(Equal(uint64(3))) Expect(gaps[0].Stop).To(Equal(uint64(2)))
}) })
It("Finds gap between two entries", func() { It("Finds gap between two entries", func() {
payload1 := *mocks.MockCIDPayload payload1 := *mocks.MockCIDPayload
payload1.HeaderCID.BlockNumber = "1010101" payload1.HeaderCID.BlockNumber = "1010101"
payload2 := payload1 payload2 := payload1
payload2.HeaderCID.BlockNumber = "5" payload2.HeaderCID.BlockNumber = "0"
err := repo.Index(&payload1) err := repo.Index(&payload1)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
err = repo.Index(&payload2) err = repo.Index(&payload2)
@ -529,13 +539,15 @@ var _ = Describe("Retriever", func() {
gaps, err := retriever.RetrieveGapsInData(1) gaps, err := retriever.RetrieveGapsInData(1)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(len(gaps)).To(Equal(1)) Expect(len(gaps)).To(Equal(1))
Expect(gaps[0].Start).To(Equal(uint64(6))) Expect(gaps[0].Start).To(Equal(uint64(1)))
Expect(gaps[0].Stop).To(Equal(uint64(1010100))) Expect(gaps[0].Stop).To(Equal(uint64(1010100)))
}) })
It("Finds gaps between multiple entries", func() { It("Finds gaps between multiple entries", func() {
payload1 := *mocks.MockCIDPayload payload := *mocks.MockCIDPayload
payload1.HeaderCID.BlockNumber = "1010101" payload.HeaderCID.BlockNumber = "1010101"
payload1 := payload
payload1.HeaderCID.BlockNumber = "1"
payload2 := payload1 payload2 := payload1
payload2.HeaderCID.BlockNumber = "5" payload2.HeaderCID.BlockNumber = "5"
payload3 := payload2 payload3 := payload2
@ -554,7 +566,9 @@ var _ = Describe("Retriever", func() {
payload9.HeaderCID.BlockNumber = "106" payload9.HeaderCID.BlockNumber = "106"
payload10 := payload5 payload10 := payload5
payload10.HeaderCID.BlockNumber = "1000" payload10.HeaderCID.BlockNumber = "1000"
err := repo.Index(&payload1) err := repo.Index(&payload)
Expect(err).ToNot(HaveOccurred())
err = repo.Index(&payload1)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
err = repo.Index(&payload2) err = repo.Index(&payload2)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
@ -577,15 +591,19 @@ var _ = Describe("Retriever", func() {
gaps, err := retriever.RetrieveGapsInData(1) gaps, err := retriever.RetrieveGapsInData(1)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(len(gaps)).To(Equal(3)) Expect(len(gaps)).To(Equal(5))
Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 0, Stop: 0})).To(BeTrue())
Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 2, Stop: 4})).To(BeTrue())
Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 6, Stop: 99})).To(BeTrue()) Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 6, Stop: 99})).To(BeTrue())
Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 107, Stop: 999})).To(BeTrue()) Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 107, Stop: 999})).To(BeTrue())
Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 1001, Stop: 1010100})).To(BeTrue()) Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 1001, Stop: 1010100})).To(BeTrue())
}) })
It("Finds validation level gaps", func() { It("Finds validation level gaps", func() {
payload1 := *mocks.MockCIDPayload payload := *mocks.MockCIDPayload
payload1.HeaderCID.BlockNumber = "1010101" payload.HeaderCID.BlockNumber = "1010101"
payload1 := payload
payload1.HeaderCID.BlockNumber = "1"
payload2 := payload1 payload2 := payload1
payload2.HeaderCID.BlockNumber = "5" payload2.HeaderCID.BlockNumber = "5"
payload3 := payload2 payload3 := payload2
@ -610,7 +628,9 @@ var _ = Describe("Retriever", func() {
payload12.HeaderCID.BlockNumber = "109" payload12.HeaderCID.BlockNumber = "109"
payload13 := payload5 payload13 := payload5
payload13.HeaderCID.BlockNumber = "1000" payload13.HeaderCID.BlockNumber = "1000"
err := repo.Index(&payload1) err := repo.Index(&payload)
Expect(err).ToNot(HaveOccurred())
err = repo.Index(&payload1)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
err = repo.Index(&payload2) err = repo.Index(&payload2)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
@ -643,7 +663,9 @@ var _ = Describe("Retriever", func() {
gaps, err := retriever.RetrieveGapsInData(1) gaps, err := retriever.RetrieveGapsInData(1)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(len(gaps)).To(Equal(6)) Expect(len(gaps)).To(Equal(8))
Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 0, Stop: 0})).To(BeTrue())
Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 2, Stop: 4})).To(BeTrue())
Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 6, Stop: 99})).To(BeTrue()) Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 6, Stop: 99})).To(BeTrue())
Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 101, Stop: 102})).To(BeTrue()) Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 101, Stop: 102})).To(BeTrue())
Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 104, Stop: 104})).To(BeTrue()) Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 104, Stop: 104})).To(BeTrue())

View File

@ -60,8 +60,6 @@ type Config struct {
BatchSize uint64 // BatchSize for the resync http calls (client has to support batch sizing) BatchSize uint64 // BatchSize for the resync http calls (client has to support batch sizing)
Timeout time.Duration // HTTP connection timeout in seconds Timeout time.Duration // HTTP connection timeout in seconds
BatchNumber uint64 BatchNumber uint64
Quit chan bool // Channel for shutting down
} }
// NewReSyncConfig fills and returns a resync config from toml parameters // NewReSyncConfig fills and returns a resync config from toml parameters
@ -136,7 +134,6 @@ func NewReSyncConfig() (*Config, error) {
db := utils.LoadPostgres(c.DBConfig, c.NodeInfo) db := utils.LoadPostgres(c.DBConfig, c.NodeInfo)
c.DB = &db c.DB = &db
c.Quit = make(chan bool)
c.BatchSize = uint64(viper.GetInt64("resync.batchSize")) c.BatchSize = uint64(viper.GetInt64("resync.batchSize"))
c.BatchNumber = uint64(viper.GetInt64("resync.batchNumber")) c.BatchNumber = uint64(viper.GetInt64("resync.batchNumber"))
return c, nil return c, nil

View File

@ -18,8 +18,6 @@ package resync
import ( import (
"fmt" "fmt"
"sync/atomic"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
utils "github.com/vulcanize/vulcanizedb/libraries/shared/utilities" utils "github.com/vulcanize/vulcanizedb/libraries/shared/utilities"
@ -49,7 +47,7 @@ type Service struct {
// Number of goroutines // Number of goroutines
BatchNumber int64 BatchNumber int64
// Channel for receiving quit signal // Channel for receiving quit signal
QuitChan chan bool quitChan chan bool
// Chain type // Chain type
chain shared.ChainType chain shared.ChainType
// Resync data type // Resync data type
@ -105,7 +103,7 @@ func NewResyncService(settings *Config) (Resync, error) {
Cleaner: cleaner, Cleaner: cleaner,
BatchSize: batchSize, BatchSize: batchSize,
BatchNumber: int64(batchNumber), BatchNumber: int64(batchNumber),
QuitChan: settings.Quit, quitChan: make(chan bool),
chain: settings.Chain, chain: settings.Chain,
ranges: settings.Ranges, ranges: settings.Ranges,
data: settings.ResyncType, data: settings.ResyncType,
@ -127,81 +125,60 @@ func (rs *Service) Resync() error {
return fmt.Errorf("%s %s data resync cleaning error: %v", rs.chain.String(), rs.data.String(), err) return fmt.Errorf("%s %s data resync cleaning error: %v", rs.chain.String(), rs.data.String(), err)
} }
} }
for _, rng := range rs.ranges { // spin up worker goroutines
if err := rs.resync(rng[0], rng[1]); err != nil { heightsChan := make(chan []uint64)
return fmt.Errorf("%s %s data resync initialization error: %v", rs.chain.String(), rs.data.String(), err) for i := 1; i <= int(rs.BatchNumber); i++ {
go rs.resync(i, heightsChan)
} }
for _, rng := range rs.ranges {
if rng[1] < rng[0] {
logrus.Errorf("%s resync range ending block number needs to be greater than the starting block number", rs.chain.String())
continue
}
logrus.Infof("resyncing %s data from %d to %d", rs.chain.String(), rng[0], rng[1])
// break the range up into bins of smaller ranges
blockRangeBins, err := utils.GetBlockHeightBins(rng[0], rng[1], rs.BatchSize)
if err != nil {
return err
}
for _, heights := range blockRangeBins {
heightsChan <- heights
}
}
// send a quit signal to each worker
// this blocks until each worker has finished its current task and can receive from the quit channel
for i := 1; i <= int(rs.BatchNumber); i++ {
rs.quitChan <- true
} }
return nil return nil
} }
func (rs *Service) resync(startingBlock, endingBlock uint64) error { func (rs *Service) resync(id int, heightChan chan []uint64) {
logrus.Infof("resyncing %s data from %d to %d", rs.chain.String(), startingBlock, endingBlock) for {
if endingBlock < startingBlock { select {
return fmt.Errorf("%s resync range ending block number needs to be greater than the starting block number", rs.chain.String()) case heights := <-heightChan:
} logrus.Debugf("%s resync worker %d processing section from %d to %d", rs.chain.String(), id, heights[0], heights[len(heights)-1])
// break the range up into bins of smaller ranges payloads, err := rs.Fetcher.FetchAt(heights)
blockRangeBins, err := utils.GetBlockHeightBins(startingBlock, endingBlock, rs.BatchSize)
if err != nil { if err != nil {
return err logrus.Errorf("%s resync worker %d fetcher error: %s", rs.chain.String(), id, err.Error())
}
// int64 for atomic incrementing and decrementing to track the number of active processing goroutines we have
var activeCount int64
// channel for processing goroutines to signal when they are done
processingDone := make(chan bool)
forwardDone := make(chan bool)
// for each block range bin spin up a goroutine to batch fetch and process state diffs for that range
go func() {
for _, blockHeights := range blockRangeBins {
// if we have reached our limit of active goroutines
// wait for one to finish before starting the next
if atomic.AddInt64(&activeCount, 1) > rs.BatchNumber {
// this blocks until a process signals it has finished
<-forwardDone
}
go func(blockHeights []uint64) {
payloads, err := rs.Fetcher.FetchAt(blockHeights)
if err != nil {
logrus.Errorf("%s resync fetcher error: %s", rs.chain.String(), err.Error())
} }
for _, payload := range payloads { for _, payload := range payloads {
ipldPayload, err := rs.Converter.Convert(payload) ipldPayload, err := rs.Converter.Convert(payload)
if err != nil { if err != nil {
logrus.Errorf("%s resync converter error: %s", rs.chain.String(), err.Error()) logrus.Errorf("%s resync worker %d converter error: %s", rs.chain.String(), id, err.Error())
} }
cidPayload, err := rs.Publisher.Publish(ipldPayload) cidPayload, err := rs.Publisher.Publish(ipldPayload)
if err != nil { if err != nil {
logrus.Errorf("%s resync publisher error: %s", rs.chain.String(), err.Error()) logrus.Errorf("%s resync worker %d publisher error: %s", rs.chain.String(), id, err.Error())
} }
if err := rs.Indexer.Index(cidPayload); err != nil { if err := rs.Indexer.Index(cidPayload); err != nil {
logrus.Errorf("%s resync indexer error: %s", rs.chain.String(), err.Error()) logrus.Errorf("%s resync worker %d indexer error: %s", rs.chain.String(), id, err.Error())
} }
} }
// when this goroutine is done, send out a signal logrus.Infof("%s resync worker %d finished section from %d to %d", rs.chain.String(), id, heights[0], heights[len(heights)-1])
logrus.Infof("finished %s resync section from %d to %d", rs.chain.String(), blockHeights[0], blockHeights[len(blockHeights)-1]) case <-rs.quitChan:
processingDone <- true logrus.Infof("%s resync worker %d goroutine shutting down", rs.chain.String(), id)
}(blockHeights) return
}
}()
// listen on the processingDone chan and
// keep track of the number of processing goroutines that have finished
// when they have all finished, sends the final signal out
goroutinesFinished := 0
for {
select {
case <-processingDone:
atomic.AddInt64(&activeCount, -1)
select {
// if we are waiting for a process to finish, signal that one has
case forwardDone <- true:
default:
}
goroutinesFinished++
if goroutinesFinished >= len(blockRangeBins) {
return nil
}
} }
} }
} }

View File

@ -93,6 +93,8 @@ type Service struct {
ipfsPath string ipfsPath string
// Underlying db // Underlying db
db *postgres.DB db *postgres.DB
// wg for syncing serve processes
serveWg *sync.WaitGroup
} }
// NewSuperNode creates a new super_node.Interface using an underlying super_node.Service struct // NewSuperNode creates a new super_node.Interface using an underlying super_node.Service struct
@ -134,7 +136,7 @@ func NewSuperNode(settings *Config) (SuperNode, error) {
} }
sn.db = settings.ServeDBConn sn.db = settings.ServeDBConn
} }
sn.QuitChan = settings.Quit sn.QuitChan = make(chan bool)
sn.Subscriptions = make(map[common.Hash]map[rpc.ID]Subscription) sn.Subscriptions = make(map[common.Hash]map[rpc.ID]Subscription)
sn.SubscriptionTypes = make(map[common.Hash]shared.SubscriptionSettings) sn.SubscriptionTypes = make(map[common.Hash]shared.SubscriptionSettings)
sn.WorkerPoolSize = settings.Workers sn.WorkerPoolSize = settings.Workers
@ -195,16 +197,15 @@ func (sap *Service) Sync(wg *sync.WaitGroup, screenAndServePayload chan<- shared
if err != nil { if err != nil {
return err return err
} }
wg.Add(1) // spin up publishAndIndex worker goroutines
// Channels for forwarding data to the publishAndIndex workers
publishAndIndexPayload := make(chan shared.ConvertedData, PayloadChanBufferSize) publishAndIndexPayload := make(chan shared.ConvertedData, PayloadChanBufferSize)
// publishAndIndex worker pool to handle publishing and indexing concurrently, while for i := 1; i <= sap.WorkerPoolSize; i++ {
// limiting the number of Postgres connections we can possibly open so as to prevent error go sap.publishAndIndex(wg, i, publishAndIndexPayload)
for i := 0; i < sap.WorkerPoolSize; i++ { log.Debugf("%s publishAndIndex worker %d successfully spun up", sap.chain.String(), i)
sap.publishAndIndex(i, publishAndIndexPayload)
} }
go func() { go func() {
wg.Add(1)
defer wg.Done()
for { for {
select { select {
case payload := <-sap.PayloadChan: case payload := <-sap.PayloadChan:
@ -230,8 +231,7 @@ func (sap *Service) Sync(wg *sync.WaitGroup, screenAndServePayload chan<- shared
case err := <-sub.Err(): case err := <-sub.Err():
log.Errorf("super node subscription error for chain %s: %v", sap.chain.String(), err) log.Errorf("super node subscription error for chain %s: %v", sap.chain.String(), err)
case <-sap.QuitChan: case <-sap.QuitChan:
log.Infof("quiting %s SyncAndPublish process", sap.chain.String()) log.Infof("quiting %s Sync process", sap.chain.String())
wg.Done()
return return
} }
} }
@ -242,41 +242,44 @@ func (sap *Service) Sync(wg *sync.WaitGroup, screenAndServePayload chan<- shared
// publishAndIndex is spun up by SyncAndConvert and receives converted chain data from that process // publishAndIndex is spun up by SyncAndConvert and receives converted chain data from that process
// it publishes this data to IPFS and indexes their CIDs with useful metadata in Postgres // it publishes this data to IPFS and indexes their CIDs with useful metadata in Postgres
func (sap *Service) publishAndIndex(id int, publishAndIndexPayload <-chan shared.ConvertedData) { func (sap *Service) publishAndIndex(wg *sync.WaitGroup, id int, publishAndIndexPayload <-chan shared.ConvertedData) {
go func() { wg.Add(1)
defer wg.Done()
for { for {
select { select {
case payload := <-publishAndIndexPayload: case payload := <-publishAndIndexPayload:
log.Debugf("publishing %s data streamed at head height %d", sap.chain.String(), payload.Height()) log.Debugf("%s super node publishAndIndex worker %d publishing data streamed at head height %d", sap.chain.String(), id, payload.Height())
cidPayload, err := sap.Publisher.Publish(payload) cidPayload, err := sap.Publisher.Publish(payload)
if err != nil { if err != nil {
log.Errorf("super node publishAndIndex worker %d publishing error for chain %s: %v", id, sap.chain.String(), err) log.Errorf("%s super node publishAndIndex worker %d publishing error: %v", sap.chain.String(), id, err)
continue continue
} }
log.Debugf("indexing %s data streamed at head height %d", sap.chain.String(), payload.Height()) log.Debugf("%s super node publishAndIndex worker %d indexing data streamed at head height %d", sap.chain.String(), id, payload.Height())
if err := sap.Indexer.Index(cidPayload); err != nil { if err := sap.Indexer.Index(cidPayload); err != nil {
log.Errorf("super node publishAndIndex worker %d indexing error for chain %s: %v", id, sap.chain.String(), err) log.Errorf("%s super node publishAndIndex worker %d indexing error: %v", sap.chain.String(), id, err)
}
case <-sap.QuitChan:
log.Infof("%s super node publishAndIndex worker %d shutting down", sap.chain.String(), id)
return
} }
} }
} }
}()
log.Debugf("%s publishAndIndex goroutine successfully spun up", sap.chain.String())
}
// Serve listens for incoming converter data off the screenAndServePayload from the SyncAndConvert process // Serve listens for incoming converter data off the screenAndServePayload from the Sync process
// It filters and sends this data to any subscribers to the service // It filters and sends this data to any subscribers to the service
// This process can be stood up alone, without an screenAndServePayload attached to a SyncAndConvert process // This process can also be stood up alone, without an screenAndServePayload attached to a Sync process
// and it will hang on the WaitGroup indefinitely, allowing the Service to serve historical data requests only // and it will hang on the WaitGroup indefinitely, allowing the Service to serve historical data requests only
func (sap *Service) Serve(wg *sync.WaitGroup, screenAndServePayload <-chan shared.ConvertedData) { func (sap *Service) Serve(wg *sync.WaitGroup, screenAndServePayload <-chan shared.ConvertedData) {
wg.Add(1) sap.serveWg = wg
go func() { go func() {
wg.Add(1)
defer wg.Done()
for { for {
select { select {
case payload := <-screenAndServePayload: case payload := <-screenAndServePayload:
sap.filterAndServe(payload) sap.filterAndServe(payload)
case <-sap.QuitChan: case <-sap.QuitChan:
log.Infof("quiting %s ScreenAndServe process", sap.chain.String()) log.Infof("quiting %s Serve process", sap.chain.String())
wg.Done()
return return
} }
} }
@ -286,8 +289,11 @@ func (sap *Service) Serve(wg *sync.WaitGroup, screenAndServePayload <-chan share
// filterAndServe filters the payload according to each subscription type and sends to the subscriptions // filterAndServe filters the payload according to each subscription type and sends to the subscriptions
func (sap *Service) filterAndServe(payload shared.ConvertedData) { func (sap *Service) filterAndServe(payload shared.ConvertedData) {
log.Debugf("Sending %s payload to subscriptions", sap.chain.String()) log.Debugf("sending %s payload to subscriptions", sap.chain.String())
sap.Lock() sap.Lock()
sap.serveWg.Add(1)
defer sap.Unlock()
defer sap.serveWg.Done()
for ty, subs := range sap.Subscriptions { for ty, subs := range sap.Subscriptions {
// Retrieve the subscription parameters for this subscription type // Retrieve the subscription parameters for this subscription type
subConfig, ok := sap.SubscriptionTypes[ty] subConfig, ok := sap.SubscriptionTypes[ty]
@ -322,12 +328,13 @@ func (sap *Service) filterAndServe(payload shared.ConvertedData) {
} }
} }
} }
sap.Unlock()
} }
// Subscribe is used by the API to remotely subscribe to the service loop // Subscribe is used by the API to remotely subscribe to the service loop
// The params must be rlp serializable and satisfy the SubscriptionSettings() interface // The params must be rlp serializable and satisfy the SubscriptionSettings() interface
func (sap *Service) Subscribe(id rpc.ID, sub chan<- SubscriptionPayload, quitChan chan<- bool, params shared.SubscriptionSettings) { func (sap *Service) Subscribe(id rpc.ID, sub chan<- SubscriptionPayload, quitChan chan<- bool, params shared.SubscriptionSettings) {
sap.serveWg.Add(1)
defer sap.serveWg.Done()
log.Infof("New %s subscription %s", sap.chain.String(), id) log.Infof("New %s subscription %s", sap.chain.String(), id)
subscription := Subscription{ subscription := Subscription{
ID: id, ID: id,
@ -361,7 +368,7 @@ func (sap *Service) Subscribe(id rpc.ID, sub chan<- SubscriptionPayload, quitCha
// Otherwise we only filter new data as it is streamed in from the state diffing geth node // Otherwise we only filter new data as it is streamed in from the state diffing geth node
if params.HistoricalData() || params.HistoricalDataOnly() { if params.HistoricalData() || params.HistoricalDataOnly() {
if err := sap.sendHistoricalData(subscription, id, params); err != nil { if err := sap.sendHistoricalData(subscription, id, params); err != nil {
sendNonBlockingErr(subscription, fmt.Errorf("super node subscriber backfill error for chain %s: %v", sap.chain.String(), err)) sendNonBlockingErr(subscription, fmt.Errorf("%s super node subscriber backfill error: %v", sap.chain.String(), err))
sendNonBlockingQuit(subscription) sendNonBlockingQuit(subscription)
return return
} }
@ -392,10 +399,18 @@ func (sap *Service) sendHistoricalData(sub Subscription, id rpc.ID, params share
log.Debugf("%s historical data starting block: %d", sap.chain.String(), params.StartingBlock().Int64()) log.Debugf("%s historical data starting block: %d", sap.chain.String(), params.StartingBlock().Int64())
log.Debugf("%s historical data ending block: %d", sap.chain.String(), endingBlock) log.Debugf("%s historical data ending block: %d", sap.chain.String(), endingBlock)
go func() { go func() {
sap.serveWg.Add(1)
defer sap.serveWg.Done()
for i := startingBlock; i <= endingBlock; i++ { for i := startingBlock; i <= endingBlock; i++ {
select {
case <-sap.QuitChan:
log.Infof("%s super node historical data feed to subscription %s closed", sap.chain.String(), id)
return
default:
}
cidWrappers, empty, err := sap.Retriever.Retrieve(params, i) cidWrappers, empty, err := sap.Retriever.Retrieve(params, i)
if err != nil { if err != nil {
sendNonBlockingErr(sub, fmt.Errorf("super node %s CID Retrieval error at block %d\r%s", sap.chain.String(), i, err.Error())) sendNonBlockingErr(sub, fmt.Errorf(" %s super node CID Retrieval error at block %d\r%s", sap.chain.String(), i, err.Error()))
continue continue
} }
if empty { if empty {
@ -404,7 +419,7 @@ func (sap *Service) sendHistoricalData(sub Subscription, id rpc.ID, params share
for _, cids := range cidWrappers { for _, cids := range cidWrappers {
response, err := sap.IPLDFetcher.Fetch(cids) response, err := sap.IPLDFetcher.Fetch(cids)
if err != nil { if err != nil {
sendNonBlockingErr(sub, fmt.Errorf("super node %s IPLD Fetching error at block %d\r%s", sap.chain.String(), i, err.Error())) sendNonBlockingErr(sub, fmt.Errorf("%s super node IPLD Fetching error at block %d\r%s", sap.chain.String(), i, err.Error()))
continue continue
} }
responseRLP, err := rlp.EncodeToBytes(response) responseRLP, err := rlp.EncodeToBytes(response)
@ -416,16 +431,16 @@ func (sap *Service) sendHistoricalData(sub Subscription, id rpc.ID, params share
case sub.PayloadChan <- SubscriptionPayload{Data: responseRLP, Err: "", Flag: EmptyFlag, Height: response.Height()}: case sub.PayloadChan <- SubscriptionPayload{Data: responseRLP, Err: "", Flag: EmptyFlag, Height: response.Height()}:
log.Debugf("sending super node historical data payload to %s subscription %s", sap.chain.String(), id) log.Debugf("sending super node historical data payload to %s subscription %s", sap.chain.String(), id)
default: default:
log.Infof("unable to send back-fill payload to %s subscription %s; channel has no receiver", sap.chain.String(), id) log.Infof("unable to send backFill payload to %s subscription %s; channel has no receiver", sap.chain.String(), id)
} }
} }
} }
// when we are done backfilling send an empty payload signifying so in the msg // when we are done backfilling send an empty payload signifying so in the msg
select { select {
case sub.PayloadChan <- SubscriptionPayload{Data: nil, Err: "", Flag: BackFillCompleteFlag}: case sub.PayloadChan <- SubscriptionPayload{Data: nil, Err: "", Flag: BackFillCompleteFlag}:
log.Debugf("sending backfill completion notice to %s subscription %s", sap.chain.String(), id) log.Debugf("sending backFill completion notice to %s subscription %s", sap.chain.String(), id)
default: default:
log.Infof("unable to send backfill completion notice to %s subscription %s", sap.chain.String(), id) log.Infof("unable to send backFill completion notice to %s subscription %s", sap.chain.String(), id)
} }
}() }()
return nil return nil

View File

@ -66,7 +66,7 @@ var _ = Describe("Service", func() {
err := processor.Sync(wg, nil) err := processor.Sync(wg, nil)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)
quitChan <- true close(quitChan)
wg.Wait() wg.Wait()
Expect(mockConverter.PassedStatediffPayload).To(Equal(mocks.MockStateDiffPayload)) Expect(mockConverter.PassedStatediffPayload).To(Equal(mocks.MockStateDiffPayload))
Expect(len(mockCidIndexer.PassedCIDPayload)).To(Equal(1)) Expect(len(mockCidIndexer.PassedCIDPayload)).To(Equal(1))