From c8bdaefe9728e24ec749e404c7ba9ad4f7750c93 Mon Sep 17 00:00:00 2001 From: prathamesh0 Date: Wed, 18 May 2022 11:08:15 +0530 Subject: [PATCH] Gracefully shutdown watched address fill service on interrupt --- cmd/serve.go | 9 +++-- pkg/fill/service.go | 83 ++++++++++++++++++++++++++++----------------- 2 files changed, 59 insertions(+), 33 deletions(-) diff --git a/cmd/serve.go b/cmd/serve.go index 913f7703..04f5ec35 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -101,9 +101,11 @@ func serve() { logWithCommand.Info("state validator disabled") } + var watchedAddressFillService *fill.Service if serverConfig.WatchedAddressGapFillerEnabled { - service := fill.New(serverConfig) - go service.Start() + watchedAddressFillService = fill.New(serverConfig) + wg.Add(1) + go watchedAddressFillService.Start(wg) logWithCommand.Info("watched address gap filler enabled") } else { logWithCommand.Info("watched address gap filler disabled") @@ -115,6 +117,9 @@ func serve() { if graphQL != nil { graphQL.Stop() } + if watchedAddressFillService != nil { + watchedAddressFillService.Stop() + } server.Stop() wg.Wait() } diff --git a/pkg/fill/service.go b/pkg/fill/service.go index 807b2acb..def311e9 100644 --- a/pkg/fill/service.go +++ b/pkg/fill/service.go @@ -19,6 +19,7 @@ package fill import ( "math" "strings" + "sync" "time" "github.com/ethereum/go-ethereum/common" @@ -46,6 +47,7 @@ type Service struct { db *sqlx.DB client *rpc.Client interval int + quitChan chan bool } // NewServer creates a new Service @@ -54,49 +56,68 @@ func New(config *serve.Config) *Service { db: config.DB, client: config.Client, interval: config.WatchedAddressGapFillInterval, + quitChan: make(chan bool), } } // Start is used to begin the service -func (s *Service) Start() { +func (s *Service) Start(wg *sync.WaitGroup) { + defer wg.Done() for { - // Wait for specified interval duration - time.Sleep(time.Duration(s.interval) * time.Second) + select { + case <-s.quitChan: + log.Info("quiting eth ipld server process") + return + default: + s.fill() + } + } +} - // Get watched addresses from the db - rows := s.fetchWatchedAddresses() +// Stop is used to gracefully stop the service +func (s *Service) Stop() { + log.Info("stopping watched address gap filler") + close(s.quitChan) +} - // Get the block number to start fill at - // Get the block number to end fill at - fillWatchedAddresses, minStartBlock, maxEndBlock := s.GetFillAddresses(rows) +// fill performs the filling of indexing gap for watched addresses +func (s *Service) fill() { + // Wait for specified interval duration + time.Sleep(time.Duration(s.interval) * time.Second) - if len(fillWatchedAddresses) > 0 { - log.Infof("running watched address gap filler for block range: (%d, %d)", minStartBlock, maxEndBlock) + // Get watched addresses from the db + rows := s.fetchWatchedAddresses() + + // Get the block number to start fill at + // Get the block number to end fill at + fillWatchedAddresses, minStartBlock, maxEndBlock := s.GetFillAddresses(rows) + + if len(fillWatchedAddresses) > 0 { + log.Infof("running watched address gap filler for block range: (%d, %d)", minStartBlock, maxEndBlock) + } + + // Fill the missing diffs + for blockNumber := minStartBlock; blockNumber <= maxEndBlock; blockNumber++ { + params := statediff.Params{ + IntermediateStateNodes: true, + IntermediateStorageNodes: true, + IncludeBlock: true, + IncludeReceipts: true, + IncludeTD: true, + IncludeCode: true, } - // Fill the missing diffs - for blockNumber := minStartBlock; blockNumber <= maxEndBlock; blockNumber++ { - params := statediff.Params{ - IntermediateStateNodes: true, - IntermediateStorageNodes: true, - IncludeBlock: true, - IncludeReceipts: true, - IncludeTD: true, - IncludeCode: true, + fillAddresses := []interface{}{} + for _, fillWatchedAddress := range fillWatchedAddresses { + if blockNumber >= fillWatchedAddress.StartBlock && blockNumber <= fillWatchedAddress.EndBlock { + params.WatchedAddresses = append(params.WatchedAddresses, common.HexToAddress(fillWatchedAddress.Address)) + fillAddresses = append(fillAddresses, fillWatchedAddress.Address) } + } - fillAddresses := []interface{}{} - for _, fillWatchedAddress := range fillWatchedAddresses { - if blockNumber >= fillWatchedAddress.StartBlock && blockNumber <= fillWatchedAddress.EndBlock { - params.WatchedAddresses = append(params.WatchedAddresses, common.HexToAddress(fillWatchedAddress.Address)) - fillAddresses = append(fillAddresses, fillWatchedAddress.Address) - } - } - - if len(fillAddresses) > 0 { - s.writeStateDiffAt(blockNumber, params) - s.UpdateLastFilledAt(blockNumber, fillAddresses) - } + if len(fillAddresses) > 0 { + s.writeStateDiffAt(blockNumber, params) + s.UpdateLastFilledAt(blockNumber, fillAddresses) } } }