Gracefully shutdown watched address fill service on interrupt

This commit is contained in:
Prathamesh Musale 2022-05-18 11:08:15 +05:30
parent a141d154b7
commit c8bdaefe97
2 changed files with 59 additions and 33 deletions

View File

@ -101,9 +101,11 @@ func serve() {
logWithCommand.Info("state validator disabled") logWithCommand.Info("state validator disabled")
} }
var watchedAddressFillService *fill.Service
if serverConfig.WatchedAddressGapFillerEnabled { if serverConfig.WatchedAddressGapFillerEnabled {
service := fill.New(serverConfig) watchedAddressFillService = fill.New(serverConfig)
go service.Start() wg.Add(1)
go watchedAddressFillService.Start(wg)
logWithCommand.Info("watched address gap filler enabled") logWithCommand.Info("watched address gap filler enabled")
} else { } else {
logWithCommand.Info("watched address gap filler disabled") logWithCommand.Info("watched address gap filler disabled")
@ -115,6 +117,9 @@ func serve() {
if graphQL != nil { if graphQL != nil {
graphQL.Stop() graphQL.Stop()
} }
if watchedAddressFillService != nil {
watchedAddressFillService.Stop()
}
server.Stop() server.Stop()
wg.Wait() wg.Wait()
} }

View File

@ -19,6 +19,7 @@ package fill
import ( import (
"math" "math"
"strings" "strings"
"sync"
"time" "time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
@ -46,6 +47,7 @@ type Service struct {
db *sqlx.DB db *sqlx.DB
client *rpc.Client client *rpc.Client
interval int interval int
quitChan chan bool
} }
// NewServer creates a new Service // NewServer creates a new Service
@ -54,49 +56,68 @@ func New(config *serve.Config) *Service {
db: config.DB, db: config.DB,
client: config.Client, client: config.Client,
interval: config.WatchedAddressGapFillInterval, interval: config.WatchedAddressGapFillInterval,
quitChan: make(chan bool),
} }
} }
// Start is used to begin the service // Start is used to begin the service
func (s *Service) Start() { func (s *Service) Start(wg *sync.WaitGroup) {
defer wg.Done()
for { for {
// Wait for specified interval duration select {
time.Sleep(time.Duration(s.interval) * time.Second) case <-s.quitChan:
log.Info("quiting eth ipld server process")
return
default:
s.fill()
}
}
}
// Get watched addresses from the db // Stop is used to gracefully stop the service
rows := s.fetchWatchedAddresses() func (s *Service) Stop() {
log.Info("stopping watched address gap filler")
close(s.quitChan)
}
// Get the block number to start fill at // fill performs the filling of indexing gap for watched addresses
// Get the block number to end fill at func (s *Service) fill() {
fillWatchedAddresses, minStartBlock, maxEndBlock := s.GetFillAddresses(rows) // Wait for specified interval duration
time.Sleep(time.Duration(s.interval) * time.Second)
if len(fillWatchedAddresses) > 0 { // Get watched addresses from the db
log.Infof("running watched address gap filler for block range: (%d, %d)", minStartBlock, maxEndBlock) rows := s.fetchWatchedAddresses()
// Get the block number to start fill at
// Get the block number to end fill at
fillWatchedAddresses, minStartBlock, maxEndBlock := s.GetFillAddresses(rows)
if len(fillWatchedAddresses) > 0 {
log.Infof("running watched address gap filler for block range: (%d, %d)", minStartBlock, maxEndBlock)
}
// Fill the missing diffs
for blockNumber := minStartBlock; blockNumber <= maxEndBlock; blockNumber++ {
params := statediff.Params{
IntermediateStateNodes: true,
IntermediateStorageNodes: true,
IncludeBlock: true,
IncludeReceipts: true,
IncludeTD: true,
IncludeCode: true,
} }
// Fill the missing diffs fillAddresses := []interface{}{}
for blockNumber := minStartBlock; blockNumber <= maxEndBlock; blockNumber++ { for _, fillWatchedAddress := range fillWatchedAddresses {
params := statediff.Params{ if blockNumber >= fillWatchedAddress.StartBlock && blockNumber <= fillWatchedAddress.EndBlock {
IntermediateStateNodes: true, params.WatchedAddresses = append(params.WatchedAddresses, common.HexToAddress(fillWatchedAddress.Address))
IntermediateStorageNodes: true, fillAddresses = append(fillAddresses, fillWatchedAddress.Address)
IncludeBlock: true,
IncludeReceipts: true,
IncludeTD: true,
IncludeCode: true,
} }
}
fillAddresses := []interface{}{} if len(fillAddresses) > 0 {
for _, fillWatchedAddress := range fillWatchedAddresses { s.writeStateDiffAt(blockNumber, params)
if blockNumber >= fillWatchedAddress.StartBlock && blockNumber <= fillWatchedAddress.EndBlock { s.UpdateLastFilledAt(blockNumber, fillAddresses)
params.WatchedAddresses = append(params.WatchedAddresses, common.HexToAddress(fillWatchedAddress.Address))
fillAddresses = append(fillAddresses, fillWatchedAddress.Address)
}
}
if len(fillAddresses) > 0 {
s.writeStateDiffAt(blockNumber, params)
s.UpdateLastFilledAt(blockNumber, fillAddresses)
}
} }
} }
} }