Track validation progress on a channel

This commit is contained in:
Prathamesh Musale 2022-06-01 16:36:46 +05:30
parent fc4ca89b41
commit 301bc8859d
3 changed files with 34 additions and 15 deletions

View File

@ -47,7 +47,7 @@ func stateValidator() {
logWithCommand.Fatal(err) logWithCommand.Fatal(err)
} }
service := validator.NewService(cfg.DB, height, trail, sleepInterval, chainCfg) service := validator.NewService(cfg.DB, height, trail, sleepInterval, chainCfg, nil)
wg := new(sync.WaitGroup) wg := new(sync.WaitGroup)
wg.Add(1) wg.Add(1)

View File

@ -37,9 +37,10 @@ type service struct {
logger *log.Logger logger *log.Logger
chainCfg *params.ChainConfig chainCfg *params.ChainConfig
quitChan chan bool quitChan chan bool
progressChan chan uint64
} }
func NewService(db *sqlx.DB, blockNum, trailNum uint64, sleepInterval uint, chainCfg *params.ChainConfig) *service { func NewService(db *sqlx.DB, blockNum, trailNum uint64, sleepInterval uint, chainCfg *params.ChainConfig, progressChan chan uint64) *service {
return &service{ return &service{
db: db, db: db,
blockNum: blockNum, blockNum: blockNum,
@ -48,6 +49,7 @@ func NewService(db *sqlx.DB, blockNum, trailNum uint64, sleepInterval uint, chai
logger: log.New(), logger: log.New(),
chainCfg: chainCfg, chainCfg: chainCfg,
quitChan: make(chan bool), quitChan: make(chan bool),
progressChan: progressChan,
} }
} }
@ -96,7 +98,6 @@ func (s *service) Start(ctx context.Context, wg *sync.WaitGroup) {
select { select {
case <-s.quitChan: case <-s.quitChan:
s.logger.Infof("last validated block %v", idxBlockNum-1) s.logger.Infof("last validated block %v", idxBlockNum-1)
s.logger.Info("stopping ipld-eth-db-validator process")
return return
default: default:
idxBlockNum, err = s.Validate(ctx, api, idxBlockNum) idxBlockNum, err = s.Validate(ctx, api, idxBlockNum)
@ -111,6 +112,10 @@ func (s *service) Start(ctx context.Context, wg *sync.WaitGroup) {
// Stop is used to gracefully stop the service // Stop is used to gracefully stop the service
func (s *service) Stop() { func (s *service) Stop() {
s.logger.Info("stopping ipld-eth-db-validator process")
if s.progressChan != nil {
close(s.progressChan)
}
close(s.quitChan) close(s.quitChan)
} }
@ -129,6 +134,10 @@ func (s *service) Validate(ctx context.Context, api *ipldEth.PublicEthAPI, idxBl
} }
s.logger.Infof("state root verified for block %d", idxBlockNum) s.logger.Infof("state root verified for block %d", idxBlockNum)
if s.progressChan != nil {
s.progressChan <- idxBlockNum
}
idxBlockNum++ idxBlockNum++
} else { } else {
// Sleep / wait for head to move ahead // Sleep / wait for head to move ahead

View File

@ -2,6 +2,7 @@ package integration_test
import ( import (
"context" "context"
"sync"
"time" "time"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
@ -25,6 +26,24 @@ var _ = Describe("Integration test", func() {
var contractErr error var contractErr error
sleepInterval := 5 * time.Second sleepInterval := 5 * time.Second
db := shared.SetupDB()
validationProgressChan := make(chan uint64)
service := validator.NewService(db, 1, trail, validatorSleepInterval, validator.IntegrationTestChainConfig, validationProgressChan)
wg := new(sync.WaitGroup)
It("test init", func() {
wg.Add(1)
go service.Start(ctx, wg)
})
defer It("test teardown", func() {
service.Stop()
wg.Wait()
Expect(validationProgressChan).To(BeClosed())
})
Describe("Validate state", func() { Describe("Validate state", func() {
BeforeEach(func() { BeforeEach(func() {
// Deploy a dummy contract as the first contract might get deployed at block number 0 // Deploy a dummy contract as the first contract might get deployed at block number 0
@ -35,20 +54,11 @@ var _ = Describe("Integration test", func() {
time.Sleep(sleepInterval) time.Sleep(sleepInterval)
}) })
It("Validate state root", func() { It("performs state root validation", func() {
Expect(contractErr).ToNot(HaveOccurred()) Expect(contractErr).ToNot(HaveOccurred())
db := shared.SetupDB() Expect(validationProgressChan).ToNot(BeClosed())
srvc := validator.NewService(db, uint64(contract.BlockNumber), trail, validatorSleepInterval, validator.IntegrationTestChainConfig) Eventually(validationProgressChan).Should(Receive(Equal(uint64(contract.BlockNumber))))
stopCh := make(chan int, 1)
go func() {
srvc.Start(ctx, nil)
stopCh <- 1
}()
go func() {
<-stopCh
srvc.Stop()
}()
}) })
}) })
}) })