diff --git a/cmd/resync.go b/cmd/resync.go index dd4bb9aa..121000d2 100644 --- a/cmd/resync.go +++ b/cmd/resync.go @@ -18,7 +18,10 @@ package cmd import ( "fmt" + log "github.com/sirupsen/logrus" "github.com/spf13/cobra" + + "github.com/vulcanize/vulcanizedb/pkg/ipfs" "github.com/vulcanize/vulcanizedb/pkg/super_node/resync" ) @@ -28,15 +31,24 @@ var resyncCmd = &cobra.Command{ Short: "Resync historical data", Long: `Use this command to fill in sections of missing data in the super node`, Run: func(cmd *cobra.Command, args []string) { + subCommand = cmd.CalledAs() + logWithCommand = *log.WithField("SubCommand", subCommand) rsyncCmdCommand() }, } +func init() { + rootCmd.AddCommand(resyncCmd) +} + func rsyncCmdCommand() { rConfig, err := resync.NewReSyncConfig() if err != nil { logWithCommand.Fatal(err) } + if err := ipfs.InitIPFSPlugins(); err != nil { + logWithCommand.Fatal(err) + } rService, err := resync.NewResyncService(rConfig) if err != nil { logWithCommand.Fatal(err) @@ -46,7 +58,3 @@ func rsyncCmdCommand() { } fmt.Printf("%s %s resync finished", rConfig.Chain.String(), rConfig.ResyncType.String()) } - -func init() { - rootCmd.AddCommand(resyncCmd) -} diff --git a/environments/resyncBTC.toml b/environments/resyncBTC.toml new file mode 100644 index 00000000..b35bd61e --- /dev/null +++ b/environments/resyncBTC.toml @@ -0,0 +1,24 @@ +[database] + name = "vulcanize_public" + hostname = "localhost" + port = 5432 + user = "vdbm" + +[resync] + chain = "bitcoin" + type = "full" + clearOldCache = true + ipfsPath = "/root/.ipfs" + batchSize = 1 + batchNumber = 50 + start = 0 + stop = 0 + +[bitcoin] + httpPath = "127.0.0.1:8332" + pass = "password" + user = "username" + nodeID = "ocd0" + clientName = "Omnicore" + genesisBlock = "000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f" + networkID = "0xD9B4BEF9" \ No newline at end of file diff --git a/environments/resyncETH.toml b/environments/resyncETH.toml new file mode 100644 index 00000000..98253e67 --- /dev/null +++ b/environments/resyncETH.toml @@ -0,0 +1,18 @@ +[database] + name = "vulcanize_public" + hostname = "localhost" + port = 5432 + user = "vdbm" + +[resync] + chain = "ethereum" + type = "state" + clearOldCache = true + ipfsPath = "/root/.ipfs" + batchSize = 5 + batchNumber = 50 + start = 0 + stop = 0 + +[ethereum] + httpPath = "127.0.0.1:8545" \ No newline at end of file diff --git a/environments/superNodeBTC.toml b/environments/superNodeBTC.toml index ce2f6b1f..52b0d633 100644 --- a/environments/superNodeBTC.toml +++ b/environments/superNodeBTC.toml @@ -14,8 +14,9 @@ sync = true workers = 1 backFill = true - frequency = 15 - batchSize = 50 + frequency = 45 + batchSize = 1 + batchNumber = 50 [bitcoin] wsPath = "127.0.0.1:8332" diff --git a/environments/superNodeETH.toml b/environments/superNodeETH.toml index c2abc1b0..cc2b7274 100644 --- a/environments/superNodeETH.toml +++ b/environments/superNodeETH.toml @@ -9,14 +9,15 @@ ipfsPath = "/root/.ipfs" server = true ipcPath = "/root/.vulcanize/eth/vulcanize.ipc" - wsPath = "127.0.0.1:8080" - httpPath = "127.0.0.1:8081" + wsPath = "127.0.0.1:8081" + httpPath = "127.0.0.1:8082" sync = true workers = 1 backFill = true frequency = 15 - batchSize = 50 + batchSize = 5 + batchNumber = 50 [ethereum] - wsPath = "ws://127.0.0.1:8546" - httpPath = "http://127.0.0.1:8545" \ No newline at end of file + wsPath = "127.0.0.1:8546" + httpPath = "127.0.0.1:8545" \ No newline at end of file diff --git a/pkg/super_node/backfiller.go b/pkg/super_node/backfiller.go index ace79700..247a30f4 100644 --- a/pkg/super_node/backfiller.go +++ b/pkg/super_node/backfiller.go @@ -30,7 +30,7 @@ import ( const ( DefaultMaxBatchSize uint64 = 100 - DefaultMaxBatchNumber int64 = 10 + DefaultMaxBatchNumber int64 = 50 ) // BackFillInterface for filling in gaps in the super node @@ -57,6 +57,8 @@ type BackFillService struct { GapCheckFrequency time.Duration // Size of batch fetches BatchSize uint64 + // Number of goroutines + BatchNumber int64 // Channel for receiving quit signal QuitChan chan bool // Chain type @@ -89,6 +91,10 @@ func NewBackFillService(settings *Config, screenAndServeChan chan shared.Convert if batchSize == 0 { batchSize = DefaultMaxBatchSize } + batchNumber := int64(settings.BatchNumber) + if batchNumber == 0 { + batchNumber = DefaultMaxBatchNumber + } return &BackFillService{ Indexer: indexer, Converter: converter, @@ -97,6 +103,7 @@ func NewBackFillService(settings *Config, screenAndServeChan chan shared.Convert Fetcher: fetcher, GapCheckFrequency: settings.Frequency, BatchSize: batchSize, + BatchNumber: int64(batchNumber), ScreenAndServeChan: screenAndServeChan, QuitChan: settings.Quit, chain: settings.Chain, @@ -112,7 +119,7 @@ func (bfs *BackFillService) FillGapsInSuperNode(wg *sync.WaitGroup) { for { select { case <-bfs.QuitChan: - log.Infof("quiting %s FillGaps process", bfs.chain.String()) + log.Infof("quiting %s FillGapsInSuperNode process", bfs.chain.String()) wg.Done() return case <-ticker.C: @@ -124,7 +131,9 @@ func (bfs *BackFillService) FillGapsInSuperNode(wg *sync.WaitGroup) { } if startingBlock != 0 { log.Infof("found gap at the beginning of the %s sync", bfs.chain.String()) - bfs.fillGaps(0, uint64(startingBlock-1)) + if err := bfs.backFill(0, uint64(startingBlock-1)); err != nil { + log.Error(err) + } } gaps, err := bfs.Retriever.RetrieveGapsInData() if err != nil { @@ -132,7 +141,7 @@ func (bfs *BackFillService) FillGapsInSuperNode(wg *sync.WaitGroup) { continue } for _, gap := range gaps { - if err := bfs.fillGaps(gap.Start, gap.Stop); err != nil { + if err := bfs.backFill(gap.Start, gap.Stop); err != nil { log.Error(err) } } @@ -142,27 +151,10 @@ func (bfs *BackFillService) FillGapsInSuperNode(wg *sync.WaitGroup) { log.Infof("%s fillGaps goroutine successfully spun up", bfs.chain.String()) } -func (bfs *BackFillService) fillGaps(startingBlock, endingBlock uint64) error { - log.Infof("going to fill in %s gap from %d to %d", bfs.chain.String(), startingBlock, endingBlock) - errChan := make(chan error) - done := make(chan bool) - if err := bfs.backFill(startingBlock, endingBlock, errChan, done); err != nil { - return err - } - for { - select { - case err := <-errChan: - log.Errorf("super node db backfill error for chain %s: %v", bfs.chain.String(), err) - case <-done: - log.Infof("finished filling in %s gap from %d to %d", bfs.chain.String(), startingBlock, endingBlock) - return nil - } - } -} - // backFill fetches, processes, and returns utils.StorageDiffs over a range of blocks // It splits a large range up into smaller chunks, batch fetching and processing those chunks concurrently -func (bfs *BackFillService) backFill(startingBlock, endingBlock uint64, errChan chan error, done chan bool) error { +func (bfs *BackFillService) backFill(startingBlock, endingBlock uint64) error { + log.Infof("filling in %s gap from %d to %d", bfs.chain.String(), startingBlock, endingBlock) if endingBlock < startingBlock { return fmt.Errorf("super node %s db backfill: ending block number needs to be greater than starting block number", bfs.chain.String()) } @@ -175,28 +167,27 @@ func (bfs *BackFillService) backFill(startingBlock, endingBlock uint64, errChan // int64 for atomic incrementing and decrementing to track the number of active processing goroutines we have var activeCount int64 // channel for processing goroutines to signal when they are done - processingDone := make(chan [2]uint64) + processingDone := make(chan bool) forwardDone := make(chan bool) - // for each block range bin spin up a goroutine to batch fetch and process state diffs for that range + // for each block range bin spin up a goroutine to batch fetch and process data for that range go func() { for _, blockHeights := range blockRangeBins { // if we have reached our limit of active goroutines // wait for one to finish before starting the next - if atomic.AddInt64(&activeCount, 1) > DefaultMaxBatchNumber { + if atomic.AddInt64(&activeCount, 1) > bfs.BatchNumber { // this blocks until a process signals it has finished <-forwardDone } go func(blockHeights []uint64) { payloads, err := bfs.Fetcher.FetchAt(blockHeights) if err != nil { - errChan <- err + log.Errorf("%s super node historical data fetcher error: %s", bfs.chain.String(), err.Error()) } for _, payload := range payloads { ipldPayload, err := bfs.Converter.Convert(payload) if err != nil { - errChan <- err - continue + log.Errorf("%s super node historical data converter error: %s", bfs.chain.String(), err.Error()) } // If there is a ScreenAndServe process listening, forward payload to it select { @@ -205,42 +196,36 @@ func (bfs *BackFillService) backFill(startingBlock, endingBlock uint64, errChan } cidPayload, err := bfs.Publisher.Publish(ipldPayload) if err != nil { - errChan <- err - continue + log.Errorf("%s super node historical data publisher error: %s", bfs.chain.String(), err.Error()) } if err := bfs.Indexer.Index(cidPayload); err != nil { - errChan <- err + log.Errorf("%s super node historical data indexer error: %s", bfs.chain.String(), err.Error()) } } // when this goroutine is done, send out a signal - processingDone <- [2]uint64{blockHeights[0], blockHeights[len(blockHeights)-1]} + log.Infof("finished filling in %s gap from %d to %d", bfs.chain.String(), blockHeights[0], blockHeights[len(blockHeights)-1]) + processingDone <- true }(blockHeights) } }() - // goroutine that listens on the processingDone chan + // listen on the processingDone chan // keeps track of the number of processing goroutines that have finished - // when they have all finished, sends the final signal out - go func() { - goroutinesFinished := 0 - for { + // when they have all finished, return + goroutinesFinished := 0 + for { + select { + case <-processingDone: + atomic.AddInt64(&activeCount, -1) select { - case doneWithHeights := <-processingDone: - atomic.AddInt64(&activeCount, -1) - select { - // if we are waiting for a process to finish, signal that one has - case forwardDone <- true: - default: - } - log.Infof("finished filling in %s gap sub-bin from %d to %d", bfs.chain.String(), doneWithHeights[0], doneWithHeights[1]) - goroutinesFinished++ - if goroutinesFinished >= len(blockRangeBins) { - done <- true - return - } + // if we are waiting for a process to finish, signal that one has + case forwardDone <- true: + default: + } + goroutinesFinished++ + if goroutinesFinished >= len(blockRangeBins) { + return nil } } - }() - - return nil + } } diff --git a/pkg/super_node/backfiller_test.go b/pkg/super_node/backfiller_test.go index 7bcc58a2..236357e0 100644 --- a/pkg/super_node/backfiller_test.go +++ b/pkg/super_node/backfiller_test.go @@ -67,6 +67,7 @@ var _ = Describe("BackFiller", func() { Retriever: mockRetriever, GapCheckFrequency: time.Second * 2, BatchSize: super_node.DefaultMaxBatchSize, + BatchNumber: super_node.DefaultMaxBatchNumber, QuitChan: quitChan, } wg := &sync.WaitGroup{} @@ -121,6 +122,7 @@ var _ = Describe("BackFiller", func() { Retriever: mockRetriever, GapCheckFrequency: time.Second * 2, BatchSize: super_node.DefaultMaxBatchSize, + BatchNumber: super_node.DefaultMaxBatchNumber, QuitChan: quitChan, } wg := &sync.WaitGroup{} @@ -169,6 +171,7 @@ var _ = Describe("BackFiller", func() { Retriever: mockRetriever, GapCheckFrequency: time.Second * 2, BatchSize: super_node.DefaultMaxBatchSize, + BatchNumber: super_node.DefaultMaxBatchNumber, QuitChan: quitChan, } wg := &sync.WaitGroup{} diff --git a/pkg/super_node/btc/http_streamer.go b/pkg/super_node/btc/http_streamer.go index a75caf91..3cee388a 100644 --- a/pkg/super_node/btc/http_streamer.go +++ b/pkg/super_node/btc/http_streamer.go @@ -68,12 +68,12 @@ func (ps *HTTPPayloadStreamer) Stream(payloadChan chan shared.RawChainData) (sha if bytes.Equal(blockHashBytes, ps.lastHash) { continue } - ps.lastHash = blockHashBytes block, err := client.GetBlock(blockHash) if err != nil { errChan <- err continue } + ps.lastHash = blockHashBytes payloadChan <- BlockPayload{ Header: &block.Header, BlockHeight: height, diff --git a/pkg/super_node/config.go b/pkg/super_node/config.go index 2e2f76de..dfa29543 100644 --- a/pkg/super_node/config.go +++ b/pkg/super_node/config.go @@ -17,6 +17,7 @@ package super_node import ( + "fmt" "os" "path/filepath" "time" @@ -53,10 +54,11 @@ type Config struct { WSClient interface{} NodeInfo core.Node // Backfiller params - BackFill bool - HTTPClient interface{} - Frequency time.Duration - BatchSize uint64 + BackFill bool + HTTPClient interface{} + Frequency time.Duration + BatchSize uint64 + BatchNumber uint64 } // NewSuperNodeConfig is used to initialize a SuperNode config from a .toml file @@ -90,7 +92,7 @@ func NewSuperNodeConfig() (*Config, error) { c.Workers = workers switch c.Chain { case shared.Ethereum: - c.NodeInfo, c.WSClient, err = getEthNodeAndClient(viper.GetString("ethereum.wsPath")) + c.NodeInfo, c.WSClient, err = getEthNodeAndClient(fmt.Sprintf("ws://%s", viper.GetString("ethereum.wsPath"))) if err != nil { return nil, err } @@ -116,7 +118,7 @@ func NewSuperNodeConfig() (*Config, error) { if c.Serve { wsPath := viper.GetString("superNode.wsPath") if wsPath == "" { - wsPath = "ws://127.0.0.1:8546" + wsPath = "127.0.0.1:8080" } c.WSEndpoint = wsPath ipcPath := viper.GetString("superNode.ipcPath") @@ -130,7 +132,7 @@ func NewSuperNodeConfig() (*Config, error) { c.IPCEndpoint = ipcPath httpPath := viper.GetString("superNode.httpPath") if httpPath == "" { - httpPath = "http://127.0.0.1:8545" + httpPath = "127.0.0.1:8081" } c.HTTPEndpoint = httpPath } @@ -162,7 +164,7 @@ func (sn *Config) BackFillFields(chain string) error { var err error switch sn.Chain { case shared.Ethereum: - _, httpClient, err = getEthNodeAndClient(viper.GetString("ethereum.httpPath")) + _, httpClient, err = getEthNodeAndClient(fmt.Sprintf("http://%s", viper.GetString("ethereum.httpPath"))) if err != nil { return err } @@ -185,6 +187,7 @@ func (sn *Config) BackFillFields(chain string) error { } sn.Frequency = frequency sn.BatchSize = uint64(viper.GetInt64("superNode.batchSize")) + sn.BatchNumber = uint64(viper.GetInt64("superNode.batchNumber")) return nil } diff --git a/pkg/super_node/constructors.go b/pkg/super_node/constructors.go index 8f0ccf6e..34ddb93d 100644 --- a/pkg/super_node/constructors.go +++ b/pkg/super_node/constructors.go @@ -172,8 +172,9 @@ func NewCleaner(chain shared.ChainType, db *postgres.DB) (shared.Cleaner, error) switch chain { case shared.Ethereum: return eth.NewCleaner(db), nil - // TODO: support BTC + case shared.Bitcoin: + return btc.NewCleaner(db), nil default: - return nil, fmt.Errorf("invalid chain %s for publisher constructor", chain.String()) + return nil, fmt.Errorf("invalid chain %s for cleaner constructor", chain.String()) } } diff --git a/pkg/super_node/resync/config.go b/pkg/super_node/resync/config.go index 7dcb49ab..ab8b1cdc 100644 --- a/pkg/super_node/resync/config.go +++ b/pkg/super_node/resync/config.go @@ -45,10 +45,11 @@ type Config struct { DBConfig config.Database IPFSPath string - HTTPClient interface{} // Note this client is expected to support the retrieval of the specified data type(s) - NodeInfo core.Node // Info for the associated node - Ranges [][2]uint64 // The block height ranges to resync - BatchSize uint64 // BatchSize for the resync http calls (client has to support batch sizing) + HTTPClient interface{} // Note this client is expected to support the retrieval of the specified data type(s) + NodeInfo core.Node // Info for the associated node + Ranges [][2]uint64 // The block height ranges to resync + BatchSize uint64 // BatchSize for the resync http calls (client has to support batch sizing) + BatchNumber uint64 Quit chan bool // Channel for shutting down } @@ -57,6 +58,9 @@ type Config struct { func NewReSyncConfig() (*Config, error) { c := new(Config) var err error + start := uint64(viper.GetInt64("resync.start")) + stop := uint64(viper.GetInt64("resync.stop")) + c.Ranges = [][2]uint64{{start, stop}} ipfsPath := viper.GetString("resync.ipfsPath") if ipfsPath == "" { home, err := os.UserHomeDir() @@ -90,11 +94,10 @@ func NewReSyncConfig() (*Config, error) { } return nil, fmt.Errorf("chain type %s does not support data type %s", c.Chain.String(), c.ResyncType.String()) } - c.BatchSize = uint64(viper.GetInt64("resync.batchSize")) switch c.Chain { case shared.Ethereum: - c.NodeInfo, c.HTTPClient, err = getEthNodeAndClient(viper.GetString("ethereum.httpPath")) + c.NodeInfo, c.HTTPClient, err = getEthNodeAndClient(fmt.Sprintf("http://%s", viper.GetString("ethereum.httpPath"))) if err != nil { return nil, err } @@ -117,6 +120,8 @@ func NewReSyncConfig() (*Config, error) { db := utils.LoadPostgres(c.DBConfig, c.NodeInfo) c.DB = &db c.Quit = make(chan bool) + c.BatchSize = uint64(viper.GetInt64("resync.batchSize")) + c.BatchNumber = uint64(viper.GetInt64("resync.batchNumber")) return c, nil } diff --git a/pkg/super_node/resync/service.go b/pkg/super_node/resync/service.go index 13300c6f..74c4cd2d 100644 --- a/pkg/super_node/resync/service.go +++ b/pkg/super_node/resync/service.go @@ -46,6 +46,8 @@ type Service struct { Cleaner shared.Cleaner // Size of batch fetches BatchSize uint64 + // Number of goroutines + BatchNumber int64 // Channel for receiving quit signal QuitChan chan bool // Chain type @@ -88,6 +90,10 @@ func NewResyncService(settings *Config) (Resync, error) { if batchSize == 0 { batchSize = super_node.DefaultMaxBatchSize } + batchNumber := int64(settings.BatchNumber) + if batchNumber == 0 { + batchNumber = super_node.DefaultMaxBatchNumber + } return &Service{ Indexer: indexer, Converter: converter, @@ -96,6 +102,7 @@ func NewResyncService(settings *Config) (Resync, error) { Fetcher: fetcher, Cleaner: cleaner, BatchSize: batchSize, + BatchNumber: int64(batchNumber), QuitChan: settings.Quit, chain: settings.Chain, ranges: settings.Ranges, @@ -107,36 +114,19 @@ func NewResyncService(settings *Config) (Resync, error) { func (rs *Service) Resync() error { if rs.clearOldCache { if err := rs.Cleaner.Clean(rs.ranges, rs.data); err != nil { - return fmt.Errorf("%s resync %s data cleaning error: %v", rs.chain.String(), rs.data.String(), err) + return fmt.Errorf("%s %s data resync cleaning error: %v", rs.chain.String(), rs.data.String(), err) } } for _, rng := range rs.ranges { if err := rs.resync(rng[0], rng[1]); err != nil { - return fmt.Errorf("%s resync %s data sync initialization error: %v", rs.chain.String(), rs.data.String(), err) + return fmt.Errorf("%s %s data resync initialization error: %v", rs.chain.String(), rs.data.String(), err) } } return nil } func (rs *Service) resync(startingBlock, endingBlock uint64) error { - logrus.Infof("resyncing %s %s data from %d to %d", rs.chain.String(), rs.data.String(), startingBlock, endingBlock) - errChan := make(chan error) - done := make(chan bool) - if err := rs.refill(startingBlock, endingBlock, errChan, done); err != nil { - return err - } - for { - select { - case err := <-errChan: - logrus.Errorf("%s resync %s data sync error: %v", rs.chain.String(), rs.data.String(), err) - case <-done: - logrus.Infof("finished %s %s resync from %d to %d", rs.chain.String(), rs.data.String(), startingBlock, endingBlock) - return nil - } - } -} - -func (rs *Service) refill(startingBlock, endingBlock uint64, errChan chan error, done chan bool) error { + logrus.Infof("resyncing %s data from %d to %d", rs.chain.String(), startingBlock, endingBlock) if endingBlock < startingBlock { return fmt.Errorf("%s resync range ending block number needs to be greater than the starting block number", rs.chain.String()) } @@ -148,7 +138,7 @@ func (rs *Service) refill(startingBlock, endingBlock uint64, errChan chan error, // int64 for atomic incrementing and decrementing to track the number of active processing goroutines we have var activeCount int64 // channel for processing goroutines to signal when they are done - processingDone := make(chan [2]uint64) + processingDone := make(chan bool) forwardDone := make(chan bool) // for each block range bin spin up a goroutine to batch fetch and process state diffs for that range @@ -156,59 +146,52 @@ func (rs *Service) refill(startingBlock, endingBlock uint64, errChan chan error, for _, blockHeights := range blockRangeBins { // if we have reached our limit of active goroutines // wait for one to finish before starting the next - if atomic.AddInt64(&activeCount, 1) > super_node.DefaultMaxBatchNumber { + if atomic.AddInt64(&activeCount, 1) > rs.BatchNumber { // this blocks until a process signals it has finished <-forwardDone } go func(blockHeights []uint64) { payloads, err := rs.Fetcher.FetchAt(blockHeights) if err != nil { - errChan <- err + logrus.Errorf("%s resync fetcher error: %s", rs.chain.String(), err.Error()) } for _, payload := range payloads { ipldPayload, err := rs.Converter.Convert(payload) if err != nil { - errChan <- err - continue + logrus.Errorf("%s resync converter error: %s", rs.chain.String(), err.Error()) } cidPayload, err := rs.Publisher.Publish(ipldPayload) if err != nil { - errChan <- err - continue + logrus.Errorf("%s resync publisher error: %s", rs.chain.String(), err.Error()) } if err := rs.Indexer.Index(cidPayload); err != nil { - errChan <- err + logrus.Errorf("%s resync indexer error: %s", rs.chain.String(), err.Error()) } } // when this goroutine is done, send out a signal - processingDone <- [2]uint64{blockHeights[0], blockHeights[len(blockHeights)-1]} + logrus.Infof("finished %s resync section from %d to %d", rs.chain.String(), blockHeights[0], blockHeights[len(blockHeights)-1]) + processingDone <- true }(blockHeights) } }() - // goroutine that listens on the processingDone chan - // keeps track of the number of processing goroutines that have finished + // listen on the processingDone chan and + // keep track of the number of processing goroutines that have finished // when they have all finished, sends the final signal out - go func() { - goroutinesFinished := 0 - for { + goroutinesFinished := 0 + for { + select { + case <-processingDone: + atomic.AddInt64(&activeCount, -1) select { - case doneWithHeights := <-processingDone: - atomic.AddInt64(&activeCount, -1) - select { - // if we are waiting for a process to finish, signal that one has - case forwardDone <- true: - default: - } - logrus.Infof("finished %s resync sub-bin from %d to %d", rs.chain.String(), doneWithHeights[0], doneWithHeights[1]) - goroutinesFinished++ - if goroutinesFinished >= len(blockRangeBins) { - done <- true - return - } + // if we are waiting for a process to finish, signal that one has + case forwardDone <- true: + default: + } + goroutinesFinished++ + if goroutinesFinished >= len(blockRangeBins) { + return nil } } - }() - - return nil + } }