diff --git a/cmd/state_validator.go b/cmd/state_validator.go index 5c734c7..ded52df 100644 --- a/cmd/state_validator.go +++ b/cmd/state_validator.go @@ -47,7 +47,7 @@ func stateValidator() { logWithCommand.Fatal(err) } - service := validator.NewService(cfg.DB, height, trail, sleepInterval, chainCfg) + service := validator.NewService(cfg.DB, height, trail, sleepInterval, chainCfg, nil) wg := new(sync.WaitGroup) wg.Add(1) diff --git a/pkg/validator/validator.go b/pkg/validator/validator.go index 6c9d159..b31d88f 100644 --- a/pkg/validator/validator.go +++ b/pkg/validator/validator.go @@ -37,9 +37,10 @@ type service struct { logger *log.Logger chainCfg *params.ChainConfig quitChan chan bool + progressChan chan uint64 } -func NewService(db *sqlx.DB, blockNum, trailNum uint64, sleepInterval uint, chainCfg *params.ChainConfig) *service { +func NewService(db *sqlx.DB, blockNum, trailNum uint64, sleepInterval uint, chainCfg *params.ChainConfig, progressChan chan uint64) *service { return &service{ db: db, blockNum: blockNum, @@ -48,6 +49,7 @@ func NewService(db *sqlx.DB, blockNum, trailNum uint64, sleepInterval uint, chai logger: log.New(), chainCfg: chainCfg, quitChan: make(chan bool), + progressChan: progressChan, } } @@ -96,7 +98,6 @@ func (s *service) Start(ctx context.Context, wg *sync.WaitGroup) { select { case <-s.quitChan: s.logger.Infof("last validated block %v", idxBlockNum-1) - s.logger.Info("stopping ipld-eth-db-validator process") return default: idxBlockNum, err = s.Validate(ctx, api, idxBlockNum) @@ -111,6 +112,10 @@ func (s *service) Start(ctx context.Context, wg *sync.WaitGroup) { // Stop is used to gracefully stop the service func (s *service) Stop() { + s.logger.Info("stopping ipld-eth-db-validator process") + if s.progressChan != nil { + close(s.progressChan) + } close(s.quitChan) } @@ -129,6 +134,10 @@ func (s *service) Validate(ctx context.Context, api *ipldEth.PublicEthAPI, idxBl } s.logger.Infof("state root verified for block %d", idxBlockNum) + if s.progressChan != nil { + s.progressChan <- idxBlockNum + } + idxBlockNum++ } else { // Sleep / wait for head to move ahead diff --git a/test/integration_test.go b/test/integration_test.go index ed76a77..ec0c1ba 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -2,6 +2,7 @@ package integration_test import ( "context" + "sync" "time" . "github.com/onsi/ginkgo" @@ -25,6 +26,24 @@ var _ = Describe("Integration test", func() { var contractErr error sleepInterval := 5 * time.Second + db := shared.SetupDB() + validationProgressChan := make(chan uint64) + service := validator.NewService(db, 1, trail, validatorSleepInterval, validator.IntegrationTestChainConfig, validationProgressChan) + + wg := new(sync.WaitGroup) + + It("test init", func() { + wg.Add(1) + go service.Start(ctx, wg) + }) + + defer It("test teardown", func() { + service.Stop() + wg.Wait() + + Expect(validationProgressChan).To(BeClosed()) + }) + Describe("Validate state", func() { BeforeEach(func() { // Deploy a dummy contract as the first contract might get deployed at block number 0 @@ -35,20 +54,11 @@ var _ = Describe("Integration test", func() { time.Sleep(sleepInterval) }) - It("Validate state root", func() { + It("performs state root validation", func() { Expect(contractErr).ToNot(HaveOccurred()) - db := shared.SetupDB() - srvc := validator.NewService(db, uint64(contract.BlockNumber), trail, validatorSleepInterval, validator.IntegrationTestChainConfig) - stopCh := make(chan int, 1) - go func() { - srvc.Start(ctx, nil) - stopCh <- 1 - }() - go func() { - <-stopCh - srvc.Stop() - }() + Expect(validationProgressChan).ToNot(BeClosed()) + Eventually(validationProgressChan).Should(Receive(Equal(uint64(contract.BlockNumber)))) }) }) })