Feature/study application memory #70

Open
abdulrabbani00 wants to merge 18 commits from feature/study-application-memory into develop
15 changed files with 172 additions and 120 deletions
Showing only changes of commit 8253201f95 - Show all commits

View File

@ -39,6 +39,7 @@ var (
bcConnectionProtocol string bcConnectionProtocol string
bcType string bcType string
bcMaxHistoricProcessWorker int bcMaxHistoricProcessWorker int
bcMaxHeadProcessWorker int
bcUniqueNodeIdentifier int bcUniqueNodeIdentifier int
bcCheckDb bool bcCheckDb bool
kgMaxWorker int kgMaxWorker int
@ -96,7 +97,8 @@ func init() {
captureCmd.PersistentFlags().StringVarP(&bcConnectionProtocol, "bc.connectionProtocol", "", "http", "protocol for connecting to the beacon node.") captureCmd.PersistentFlags().StringVarP(&bcConnectionProtocol, "bc.connectionProtocol", "", "http", "protocol for connecting to the beacon node.")
captureCmd.PersistentFlags().IntVarP(&bcBootRetryInterval, "bc.bootRetryInterval", "", 30, "The amount of time to wait between retries while booting the application") captureCmd.PersistentFlags().IntVarP(&bcBootRetryInterval, "bc.bootRetryInterval", "", 30, "The amount of time to wait between retries while booting the application")
captureCmd.PersistentFlags().IntVarP(&bcBootMaxRetry, "bc.bootMaxRetry", "", 5, "The amount of time to wait between retries while booting the application") captureCmd.PersistentFlags().IntVarP(&bcBootMaxRetry, "bc.bootMaxRetry", "", 5, "The amount of time to wait between retries while booting the application")
captureCmd.PersistentFlags().IntVarP(&bcMaxHistoricProcessWorker, "bc.maxHistoricProcessWorker", "", 30, "The number of workers that should be actively processing slots from the eth-beacon.historic_process table. Be careful of system memory.") captureCmd.PersistentFlags().IntVarP(&bcMaxHistoricProcessWorker, "bc.maxHistoricProcessWorker", "", 3, "The number of workers that should be actively processing slots from the eth-beacon.historic_process table. Be careful of system memory.")
captureCmd.PersistentFlags().IntVarP(&bcMaxHeadProcessWorker, "bc.maxHeadProcessWorker", "", 3, "The number of workers that should be actively processing slots head slots. Be careful of system memory.")
captureCmd.PersistentFlags().IntVarP(&bcUniqueNodeIdentifier, "bc.uniqueNodeIdentifier", "", 0, "The unique identifier of this application. Each application connecting to the DB should have a unique identifier.") captureCmd.PersistentFlags().IntVarP(&bcUniqueNodeIdentifier, "bc.uniqueNodeIdentifier", "", 0, "The unique identifier of this application. Each application connecting to the DB should have a unique identifier.")
captureCmd.PersistentFlags().BoolVarP(&bcCheckDb, "bc.checkDb", "", true, "Should we check to see if the slot exists in the DB before writing it?") captureCmd.PersistentFlags().BoolVarP(&bcCheckDb, "bc.checkDb", "", true, "Should we check to see if the slot exists in the DB before writing it?")
// err = captureCmd.MarkPersistentFlagRequired("bc.address") // err = captureCmd.MarkPersistentFlagRequired("bc.address")
@ -107,7 +109,7 @@ func init() {
//// Known Gaps specific //// Known Gaps specific
captureCmd.PersistentFlags().BoolVarP(&kgProcessGaps, "kg.processKnownGaps", "", true, "Should we process the slots within the eth-beacon.known_gaps table.") captureCmd.PersistentFlags().BoolVarP(&kgProcessGaps, "kg.processKnownGaps", "", true, "Should we process the slots within the eth-beacon.known_gaps table.")
captureCmd.PersistentFlags().IntVarP(&kgTableIncrement, "kg.increment", "", 10000, "The max slots within a single entry to the known_gaps table.") captureCmd.PersistentFlags().IntVarP(&kgTableIncrement, "kg.increment", "", 10000, "The max slots within a single entry to the known_gaps table.")
captureCmd.PersistentFlags().IntVarP(&kgMaxWorker, "kg.maxKnownGapsWorker", "", 30, "The number of workers that should be actively processing slots from the eth-beacon.known_gaps table. Be careful of system memory.") captureCmd.PersistentFlags().IntVarP(&kgMaxWorker, "kg.maxKnownGapsWorker", "", 3, "The number of workers that should be actively processing slots from the eth-beacon.known_gaps table. Be careful of system memory.")
// Prometheus Specific // Prometheus Specific
captureCmd.PersistentFlags().BoolVarP(&pmMetrics, "pm.metrics", "", true, "Should we capture prometheus metrics.") captureCmd.PersistentFlags().BoolVarP(&pmMetrics, "pm.metrics", "", true, "Should we capture prometheus metrics.")
@ -157,6 +159,8 @@ func init() {
exitErr(err) exitErr(err)
err = viper.BindPFlag("bc.maxHistoricProcessWorker", captureCmd.PersistentFlags().Lookup("bc.maxHistoricProcessWorker")) err = viper.BindPFlag("bc.maxHistoricProcessWorker", captureCmd.PersistentFlags().Lookup("bc.maxHistoricProcessWorker"))
exitErr(err) exitErr(err)
err = viper.BindPFlag("bc.maxHeadProcessWorker", captureCmd.PersistentFlags().Lookup("bc.maxHeadProcessWorker"))
exitErr(err)
err = viper.BindPFlag("bc.uniqueNodeIdentifier", captureCmd.PersistentFlags().Lookup("bc.uniqueNodeIdentifier")) err = viper.BindPFlag("bc.uniqueNodeIdentifier", captureCmd.PersistentFlags().Lookup("bc.uniqueNodeIdentifier"))
exitErr(err) exitErr(err)
err = viper.BindPFlag("bc.checkDb", captureCmd.PersistentFlags().Lookup("bc.checkDb")) err = viper.BindPFlag("bc.checkDb", captureCmd.PersistentFlags().Lookup("bc.checkDb"))

View File

@ -59,7 +59,7 @@ func init() {
func startFullProcessing() { func startFullProcessing() {
// Boot the application // Boot the application
log.Info("Starting the application in head tracking mode.") log.Info("Starting the application in head tracking mode.")
ctx := context.Background() ctx, cancel := context.WithCancel(context.Background())
Bc, Db, err := boot.BootApplicationWithRetry(ctx, viper.GetString("db.address"), viper.GetInt("db.port"), viper.GetString("db.name"), viper.GetString("db.username"), viper.GetString("db.password"), viper.GetString("db.driver"), Bc, Db, err := boot.BootApplicationWithRetry(ctx, viper.GetString("db.address"), viper.GetInt("db.port"), viper.GetString("db.name"), viper.GetString("db.username"), viper.GetString("db.password"), viper.GetString("db.driver"),
viper.GetString("bc.address"), viper.GetInt("bc.port"), viper.GetString("bc.connectionProtocol"), viper.GetString("bc.type"), viper.GetInt("bc.bootRetryInterval"), viper.GetInt("bc.bootMaxRetry"), viper.GetString("bc.address"), viper.GetInt("bc.port"), viper.GetString("bc.connectionProtocol"), viper.GetString("bc.type"), viper.GetInt("bc.bootRetryInterval"), viper.GetInt("bc.bootMaxRetry"),
@ -75,14 +75,11 @@ func startFullProcessing() {
log.Info("The Beacon Client has booted successfully!") log.Info("The Beacon Client has booted successfully!")
// Capture head blocks // Capture head blocks
hdCtx, hdCancel := context.WithCancel(context.Background()) go Bc.CaptureHead(ctx, viper.GetInt("bc.maxHeadProcessWorker"), false)
go Bc.CaptureHead(hdCtx, false)
hpContext, hpCancel := context.WithCancel(context.Background())
errG, _ := errgroup.WithContext(context.Background()) errG, _ := errgroup.WithContext(context.Background())
errG.Go(func() error { errG.Go(func() error {
errs := Bc.CaptureHistoric(hpContext, viper.GetInt("bc.maxHistoricProcessWorker")) errs := Bc.CaptureHistoric(ctx, viper.GetInt("bc.maxHistoricProcessWorker"))
if len(errs) != 0 { if len(errs) != 0 {
if len(errs) != 0 { if len(errs) != 0 {
log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing historic events") log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing historic events")
@ -91,12 +88,11 @@ func startFullProcessing() {
} }
return nil return nil
}) })
kgCtx, kgCancel := context.WithCancel(context.Background())
if viper.GetBool("kg.processKnownGaps") { if viper.GetBool("kg.processKnownGaps") {
go func() { go func() {
errG := new(errgroup.Group) errG := new(errgroup.Group)
errG.Go(func() error { errG.Go(func() error {
errs := Bc.ProcessKnownGaps(kgCtx, viper.GetInt("kg.maxKnownGapsWorker")) errs := Bc.ProcessKnownGaps(ctx, viper.GetInt("kg.maxKnownGapsWorker"))
if len(errs) != 0 { if len(errs) != 0 {
log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing knownGaps") log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing knownGaps")
return fmt.Errorf("Application ended because there were too many error when attempting to process knownGaps") return fmt.Errorf("Application ended because there were too many error when attempting to process knownGaps")
@ -116,7 +112,7 @@ func startFullProcessing() {
} }
// Shutdown when the time is right. // Shutdown when the time is right.
err = shutdown.ShutdownFull(ctx, hdCancel, kgCancel, hpCancel, notifierCh, maxWaitSecondsShutdown, Db, Bc) err = shutdown.ShutdownFull(ctx, cancel, notifierCh, maxWaitSecondsShutdown, Db, Bc)
if err != nil { if err != nil {
loghelper.LogError(err).Error("Ungracefully Shutdown ipld-eth-beacon-indexer!") loghelper.LogError(err).Error("Ungracefully Shutdown ipld-eth-beacon-indexer!")
} else { } else {

View File

@ -20,7 +20,6 @@ import (
"context" "context"
"fmt" "fmt"
"net/http" "net/http"
"os"
"strconv" "strconv"
"github.com/prometheus/client_golang/prometheus/promhttp" "github.com/prometheus/client_golang/prometheus/promhttp"
@ -47,7 +46,7 @@ var headCmd = &cobra.Command{
func startHeadTracking() { func startHeadTracking() {
// Boot the application // Boot the application
log.Info("Starting the application in head tracking mode.") log.Info("Starting the application in head tracking mode.")
ctx := context.Background() ctx, cancel := context.WithCancel(context.Background())
Bc, Db, err := boot.BootApplicationWithRetry(ctx, viper.GetString("db.address"), viper.GetInt("db.port"), viper.GetString("db.name"), viper.GetString("db.username"), viper.GetString("db.password"), viper.GetString("db.driver"), Bc, Db, err := boot.BootApplicationWithRetry(ctx, viper.GetString("db.address"), viper.GetInt("db.port"), viper.GetString("db.name"), viper.GetString("db.username"), viper.GetString("db.password"), viper.GetString("db.driver"),
viper.GetString("bc.address"), viper.GetInt("bc.port"), viper.GetString("bc.connectionProtocol"), viper.GetString("bc.type"), viper.GetInt("bc.bootRetryInterval"), viper.GetInt("bc.bootMaxRetry"), viper.GetString("bc.address"), viper.GetInt("bc.port"), viper.GetString("bc.connectionProtocol"), viper.GetString("bc.type"), viper.GetInt("bc.bootRetryInterval"), viper.GetInt("bc.bootMaxRetry"),
@ -63,15 +62,13 @@ func startHeadTracking() {
log.Info("The Beacon Client has booted successfully!") log.Info("The Beacon Client has booted successfully!")
// Capture head blocks // Capture head blocks
hdCtx, hdCancel := context.WithCancel(context.Background()) go Bc.CaptureHead(ctx, viper.GetInt("bc.maxHeadProcessWorker"), false)
go Bc.CaptureHead(hdCtx, false)
kgCtx, kgCancel := context.WithCancel(context.Background())
if viper.GetBool("kg.processKnownGaps") { if viper.GetBool("kg.processKnownGaps") {
go func() { go func() {
errG := new(errgroup.Group) errG := new(errgroup.Group)
errG.Go(func() error { errG.Go(func() error {
errs := Bc.ProcessKnownGaps(kgCtx, viper.GetInt("kg.maxKnownGapsWorker")) errs := Bc.ProcessKnownGaps(ctx, viper.GetInt("kg.maxKnownGapsWorker"))
if len(errs) != 0 { if len(errs) != 0 {
log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing knownGaps") log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing knownGaps")
return fmt.Errorf("Application ended because there were too many error when attempting to process knownGaps") return fmt.Errorf("Application ended because there were too many error when attempting to process knownGaps")
@ -91,14 +88,12 @@ func startHeadTracking() {
} }
// Shutdown when the time is right. // Shutdown when the time is right.
err = shutdown.ShutdownHeadTracking(ctx, hdCancel, kgCancel, notifierCh, maxWaitSecondsShutdown, Db, Bc) err = shutdown.ShutdownHeadTracking(ctx, cancel, notifierCh, maxWaitSecondsShutdown, Db, Bc)
if err != nil { if err != nil {
loghelper.LogError(err).Error("Ungracefully Shutdown ipld-eth-beacon-indexer!") loghelper.LogError(err).Error("Ungracefully Shutdown ipld-eth-beacon-indexer!")
} else { } else {
log.Info("Gracefully shutdown ipld-eth-beacon-indexer") log.Info("Gracefully shutdown ipld-eth-beacon-indexer")
} }
log.Debug("WTF")
os.Exit(0)
} }
func init() { func init() {

View File

@ -49,7 +49,7 @@ var historicCmd = &cobra.Command{
func startHistoricProcessing() { func startHistoricProcessing() {
// Boot the application // Boot the application
log.Info("Starting the application in head tracking mode.") log.Info("Starting the application in head tracking mode.")
ctx := context.Background() ctx, cancel := context.WithCancel(context.Background())
Bc, Db, err := boot.BootApplicationWithRetry(ctx, viper.GetString("db.address"), viper.GetInt("db.port"), viper.GetString("db.name"), viper.GetString("db.username"), viper.GetString("db.password"), viper.GetString("db.driver"), Bc, Db, err := boot.BootApplicationWithRetry(ctx, viper.GetString("db.address"), viper.GetInt("db.port"), viper.GetString("db.name"), viper.GetString("db.username"), viper.GetString("db.password"), viper.GetString("db.driver"),
viper.GetString("bc.address"), viper.GetInt("bc.port"), viper.GetString("bc.connectionProtocol"), viper.GetString("bc.type"), viper.GetInt("bc.bootRetryInterval"), viper.GetInt("bc.bootMaxRetry"), viper.GetString("bc.address"), viper.GetInt("bc.port"), viper.GetString("bc.connectionProtocol"), viper.GetString("bc.type"), viper.GetInt("bc.bootRetryInterval"), viper.GetInt("bc.bootMaxRetry"),
@ -63,11 +63,9 @@ func startHistoricProcessing() {
serveProm(addr) serveProm(addr)
} }
hpContext, hpCancel := context.WithCancel(context.Background())
errG, _ := errgroup.WithContext(context.Background()) errG, _ := errgroup.WithContext(context.Background())
errG.Go(func() error { errG.Go(func() error {
errs := Bc.CaptureHistoric(hpContext, viper.GetInt("bc.maxHistoricProcessWorker")) errs := Bc.CaptureHistoric(ctx, viper.GetInt("bc.maxHistoricProcessWorker"))
if len(errs) != 0 { if len(errs) != 0 {
if len(errs) != 0 { if len(errs) != 0 {
log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing historic events") log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing historic events")
@ -77,12 +75,11 @@ func startHistoricProcessing() {
return nil return nil
}) })
kgContext, kgCancel := context.WithCancel(context.Background())
if viper.GetBool("kg.processKnownGaps") { if viper.GetBool("kg.processKnownGaps") {
go func() { go func() {
errG := new(errgroup.Group) errG := new(errgroup.Group)
errG.Go(func() error { errG.Go(func() error {
errs := Bc.ProcessKnownGaps(kgContext, viper.GetInt("kg.maxKnownGapsWorker")) errs := Bc.ProcessKnownGaps(ctx, viper.GetInt("kg.maxKnownGapsWorker"))
if len(errs) != 0 { if len(errs) != 0 {
log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing knownGaps") log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing knownGaps")
return fmt.Errorf("Application ended because there were too many error when attempting to process knownGaps") return fmt.Errorf("Application ended because there were too many error when attempting to process knownGaps")
@ -102,7 +99,7 @@ func startHistoricProcessing() {
} }
// Shutdown when the time is right. // Shutdown when the time is right.
err = shutdown.ShutdownHistoricProcessing(ctx, kgCancel, hpCancel, notifierCh, maxWaitSecondsShutdown, Db, Bc) err = shutdown.ShutdownHistoricProcessing(ctx, cancel, notifierCh, maxWaitSecondsShutdown, Db, Bc)
if err != nil { if err != nil {
loghelper.LogError(err).Error("Ungracefully Shutdown ipld-eth-beacon-indexer!") loghelper.LogError(err).Error("Ungracefully Shutdown ipld-eth-beacon-indexer!")
} else { } else {

View File

@ -40,68 +40,66 @@ func ShutdownServices(ctx context.Context, notifierCh chan os.Signal, waitTime t
} }
// Wrapper function for shutting down the head tracking process. // Wrapper function for shutting down the head tracking process.
func ShutdownHeadTracking(ctx context.Context, hdCancel context.CancelFunc, kgCancel context.CancelFunc, notifierCh chan os.Signal, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) error { func ShutdownHeadTracking(ctx context.Context, cancel context.CancelFunc, notifierCh chan os.Signal, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) error {
return ShutdownServices(ctx, notifierCh, waitTime, DB, BC, map[string]gracefulshutdown.Operation{ return ShutdownServices(ctx, notifierCh, waitTime, DB, BC, map[string]gracefulshutdown.Operation{
// Combining DB shutdown with BC because BC needs DB open to cleanly shutdown. // Combining DB shutdown with BC because BC needs DB open to cleanly shutdown.
"beaconClient": func(ctx context.Context) error { "beaconClient": func(ctx context.Context) error {
defer DB.Close() defer DB.Close()
err := BC.StopHeadTracking(hdCancel) cancel()
if err != nil { BC.StopHeadTracking(ctx, false)
loghelper.LogError(err).Error("Unable to trigger shutdown of head tracking")
}
if BC.KnownGapsProcess != (beaconclient.KnownGapsProcessing{}) { if BC.KnownGapsProcess != (beaconclient.KnownGapsProcessing{}) {
err = BC.StopKnownGapsProcessing(kgCancel) err := BC.StopKnownGapsProcessing(ctx)
if err != nil { if err != nil {
loghelper.LogError(err).Error("Unable to stop processing known gaps") loghelper.LogError(err).Error("Unable to stop processing known gaps")
}
}
return err return err
}
}
return nil
}, },
}) })
} }
// Wrapper function for shutting down the head tracking process. // Wrapper function for shutting down the head tracking process.
func ShutdownHistoricProcessing(ctx context.Context, kgCancel, hpCancel context.CancelFunc, notifierCh chan os.Signal, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) error { func ShutdownHistoricProcessing(ctx context.Context, cancel context.CancelFunc, notifierCh chan os.Signal, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) error {
return ShutdownServices(ctx, notifierCh, waitTime, DB, BC, map[string]gracefulshutdown.Operation{ return ShutdownServices(ctx, notifierCh, waitTime, DB, BC, map[string]gracefulshutdown.Operation{
// Combining DB shutdown with BC because BC needs DB open to cleanly shutdown. // Combining DB shutdown with BC because BC needs DB open to cleanly shutdown.
"beaconClient": func(ctx context.Context) error { "beaconClient": func(ctx context.Context) error {
defer DB.Close() defer DB.Close()
err := BC.StopHistoric(hpCancel) cancel()
err := BC.StopHistoric(ctx)
if err != nil { if err != nil {
loghelper.LogError(err).Error("Unable to stop processing historic") loghelper.LogError(err).Error("Unable to stop processing historic")
} }
if BC.KnownGapsProcess != (beaconclient.KnownGapsProcessing{}) { if BC.KnownGapsProcess != (beaconclient.KnownGapsProcessing{}) {
err = BC.StopKnownGapsProcessing(kgCancel) err = BC.StopKnownGapsProcessing(ctx)
if err != nil { if err != nil {
loghelper.LogError(err).Error("Unable to stop processing known gaps") loghelper.LogError(err).Error("Unable to stop processing known gaps")
}
}
return err return err
}
}
return nil
}, },
}) })
} }
// Shutdown the head and historical processing // Shutdown the head and historical processing
func ShutdownFull(ctx context.Context, hdCancel context.CancelFunc, kgCancel, hpCancel context.CancelFunc, notifierCh chan os.Signal, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) error { func ShutdownFull(ctx context.Context, cancel context.CancelFunc, notifierCh chan os.Signal, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) error {
return ShutdownServices(ctx, notifierCh, waitTime, DB, BC, map[string]gracefulshutdown.Operation{ return ShutdownServices(ctx, notifierCh, waitTime, DB, BC, map[string]gracefulshutdown.Operation{
// Combining DB shutdown with BC because BC needs DB open to cleanly shutdown. // Combining DB shutdown with BC because BC needs DB open to cleanly shutdown.
"beaconClient": func(ctx context.Context) error { "beaconClient": func(ctx context.Context) error {
defer DB.Close() defer DB.Close()
err := BC.StopHistoric(hpCancel) cancel()
err := BC.StopHistoric(ctx)
if err != nil { if err != nil {
loghelper.LogError(err).Error("Unable to stop processing historic") loghelper.LogError(err).Error("Unable to stop processing historic")
} }
if BC.KnownGapsProcess != (beaconclient.KnownGapsProcessing{}) { if BC.KnownGapsProcess != (beaconclient.KnownGapsProcessing{}) {
err = BC.StopKnownGapsProcessing(kgCancel) err = BC.StopKnownGapsProcessing(ctx)
if err != nil { if err != nil {
loghelper.LogError(err).Error("Unable to stop processing known gaps") loghelper.LogError(err).Error("Unable to stop processing known gaps")
} }
} }
err = BC.StopHeadTracking(hdCancel) BC.StopHeadTracking(ctx, false)
if err != nil {
loghelper.LogError(err).Error("Unable to trigger shutdown of head tracking")
}
return err return err
}, },
}) })

View File

@ -56,12 +56,13 @@ var (
BC *beaconclient.BeaconClient BC *beaconclient.BeaconClient
err error err error
ctx context.Context ctx context.Context
cancel context.CancelFunc
notifierCh chan os.Signal notifierCh chan os.Signal
) )
var _ = Describe("Shutdown", func() { var _ = Describe("Shutdown", func() {
BeforeEach(func() { BeforeEach(func() {
ctx = context.Background() ctx, cancel = context.WithCancel(context.Background())
BC, DB, err = boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, BC, DB, err = boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress,
bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", true, bcUniqueIdentifier, bcCheckDb) bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", true, bcUniqueIdentifier, bcCheckDb)
notifierCh = make(chan os.Signal, 1) notifierCh = make(chan os.Signal, 1)
@ -72,10 +73,8 @@ var _ = Describe("Shutdown", func() {
Context("When Channels are empty,", func() { Context("When Channels are empty,", func() {
It("Should Shutdown Successfully.", func() { It("Should Shutdown Successfully.", func() {
go func() { go func() {
_, kgCancel := context.WithCancel(context.Background())
_, hdCancel := context.WithCancel(context.Background())
log.Debug("Starting shutdown chan") log.Debug("Starting shutdown chan")
err = shutdown.ShutdownHeadTracking(ctx, hdCancel, kgCancel, notifierCh, maxWaitSecondsShutdown, DB, BC) err = shutdown.ShutdownHeadTracking(ctx, cancel, notifierCh, maxWaitSecondsShutdown, DB, BC)
log.Debug("We have completed the shutdown...") log.Debug("We have completed the shutdown...")
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
}() }()
@ -86,10 +85,8 @@ var _ = Describe("Shutdown", func() {
shutdownCh := make(chan bool) shutdownCh := make(chan bool)
//log.SetLevel(log.DebugLevel) //log.SetLevel(log.DebugLevel)
go func() { go func() {
_, kgCancel := context.WithCancel(context.Background())
_, hdCancel := context.WithCancel(context.Background())
log.Debug("Starting shutdown chan") log.Debug("Starting shutdown chan")
err = shutdown.ShutdownHeadTracking(ctx, hdCancel, kgCancel, notifierCh, maxWaitSecondsShutdown, DB, BC) err = shutdown.ShutdownHeadTracking(ctx, cancel, notifierCh, maxWaitSecondsShutdown, DB, BC)
log.Debug("We have completed the shutdown...") log.Debug("We have completed the shutdown...")
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
shutdownCh <- true shutdownCh <- true
@ -122,9 +119,7 @@ var _ = Describe("Shutdown", func() {
//log.SetLevel(log.DebugLevel) //log.SetLevel(log.DebugLevel)
go func() { go func() {
log.Debug("Starting shutdown chan") log.Debug("Starting shutdown chan")
_, kgCancel := context.WithCancel(context.Background()) err = shutdown.ShutdownHeadTracking(ctx, cancel, notifierCh, maxWaitSecondsShutdown, DB, BC)
_, hdCancel := context.WithCancel(context.Background())
err = shutdown.ShutdownHeadTracking(ctx, hdCancel, kgCancel, notifierCh, maxWaitSecondsShutdown, DB, BC)
log.Debug("We have completed the shutdown...") log.Debug("We have completed the shutdown...")
Expect(err).To(MatchError(gracefulshutdown.TimeoutErr(maxWaitSecondsShutdown.String()))) Expect(err).To(MatchError(gracefulshutdown.TimeoutErr(maxWaitSecondsShutdown.String())))
shutdownCh <- true shutdownCh <- true

View File

@ -24,17 +24,23 @@ import (
) )
// This function will perform all the heavy lifting for tracking the head of the chain. // This function will perform all the heavy lifting for tracking the head of the chain.
func (bc *BeaconClient) CaptureHead(ctx context.Context, skipSee bool) { func (bc *BeaconClient) CaptureHead(ctx context.Context, maxHeadWorkers int, skipSee bool) {
log.Info("We are tracking the head of the chain.") log.Info("We are tracking the head of the chain.")
go bc.handleHead(ctx) go bc.handleHead(ctx, maxHeadWorkers)
go bc.handleReorg(ctx) go bc.handleReorg(ctx)
bc.captureEventTopic(ctx, skipSee) bc.captureEventTopic(ctx, skipSee)
} }
// Stop the head tracking service. // Stop the head tracking service.
func (bc *BeaconClient) StopHeadTracking(cancel context.CancelFunc) error { func (bc *BeaconClient) StopHeadTracking(ctx context.Context, skipSee bool) {
log.Info("We are going to stop tracking the head of chain because of the shutdown signal.") select {
cancel() case <-ctx.Done():
log.Info("Successfully stopped the head tracking service.") if !skipSee {
return nil bc.HeadTracking.SseClient.Unsubscribe(bc.HeadTracking.MessagesCh)
bc.ReOrgTracking.SseClient.Unsubscribe(bc.ReOrgTracking.MessagesCh)
}
log.Info("Successfully stopped the head tracking service.")
default:
log.Error("The context has not completed....")
}
} }

View File

@ -267,8 +267,6 @@ var _ = Describe("Capturehead", Label("head"), func() {
Describe("Receiving New Head SSE messages", Label("unit", "behavioral"), func() { Describe("Receiving New Head SSE messages", Label("unit", "behavioral"), func() {
Context("Correctly formatted Phase0 Block", Label("leak-head"), func() { Context("Correctly formatted Phase0 Block", Label("leak-head"), func() {
It("Should turn it into a struct successfully.", func() { It("Should turn it into a struct successfully.", func() {
log.SetLevel(log.DebugLevel)
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset() defer httpmock.DeactivateAndReset()
startGoRoutines := runtime.NumGoroutine() startGoRoutines := runtime.NumGoroutine()
@ -280,7 +278,7 @@ var _ = Describe("Capturehead", Label("head"), func() {
validateSignedBeaconBlock(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, BeaconNodeTester.TestEvents["100"].CorrectParentRoot, BeaconNodeTester.TestEvents["100"].CorrectEth1BlockHash, BeaconNodeTester.TestEvents["100"].CorrectSignedBeaconBlockMhKey) validateSignedBeaconBlock(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, BeaconNodeTester.TestEvents["100"].CorrectParentRoot, BeaconNodeTester.TestEvents["100"].CorrectEth1BlockHash, BeaconNodeTester.TestEvents["100"].CorrectSignedBeaconBlockMhKey)
validateBeaconState(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, BeaconNodeTester.TestEvents["100"].CorrectBeaconStateMhKey) validateBeaconState(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, BeaconNodeTester.TestEvents["100"].CorrectBeaconStateMhKey)
testStopHeadTracking(cancel, bc, startGoRoutines) testStopHeadTracking(ctx, cancel, bc, startGoRoutines)
}) })
}) })
@ -295,7 +293,7 @@ var _ = Describe("Capturehead", Label("head"), func() {
BeaconNodeTester.testProcessBlock(ctx, bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, 74240, maxRetry, 1, 0, 0) BeaconNodeTester.testProcessBlock(ctx, bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, 74240, maxRetry, 1, 0, 0)
validateSignedBeaconBlock(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, BeaconNodeTester.TestEvents["2375703"].CorrectParentRoot, BeaconNodeTester.TestEvents["2375703"].CorrectEth1BlockHash, BeaconNodeTester.TestEvents["2375703"].CorrectSignedBeaconBlockMhKey) validateSignedBeaconBlock(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, BeaconNodeTester.TestEvents["2375703"].CorrectParentRoot, BeaconNodeTester.TestEvents["2375703"].CorrectEth1BlockHash, BeaconNodeTester.TestEvents["2375703"].CorrectSignedBeaconBlockMhKey)
validateBeaconState(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, BeaconNodeTester.TestEvents["2375703"].CorrectBeaconStateMhKey) validateBeaconState(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, BeaconNodeTester.TestEvents["2375703"].CorrectBeaconStateMhKey)
testStopHeadTracking(cancel, bc, startGoRoutines) testStopHeadTracking(ctx, cancel, bc, startGoRoutines)
}) })
}) })
Context("Correctly formatted Altair Test Blocks", func() { Context("Correctly formatted Altair Test Blocks", func() {
@ -313,7 +311,7 @@ var _ = Describe("Capturehead", Label("head"), func() {
defer httpmock.DeactivateAndReset() defer httpmock.DeactivateAndReset()
BeaconNodeTester.testProcessBlock(ctx, bc, BeaconNodeTester.TestEvents["2375703-dummy-2"].HeadMessage, 74240, maxRetry, 1, 0, 0) BeaconNodeTester.testProcessBlock(ctx, bc, BeaconNodeTester.TestEvents["2375703-dummy-2"].HeadMessage, 74240, maxRetry, 1, 0, 0)
testStopHeadTracking(cancel, bc, startGoRoutines) testStopHeadTracking(ctx, cancel, bc, startGoRoutines)
}) })
}) })
@ -332,7 +330,7 @@ var _ = Describe("Capturehead", Label("head"), func() {
defer httpmock.DeactivateAndReset() defer httpmock.DeactivateAndReset()
BeaconNodeTester.testProcessBlock(ctx, bc, BeaconNodeTester.TestEvents["100-dummy-2"].HeadMessage, 3, maxRetry, 1, 0, 0) BeaconNodeTester.testProcessBlock(ctx, bc, BeaconNodeTester.TestEvents["100-dummy-2"].HeadMessage, 3, maxRetry, 1, 0, 0)
testStopHeadTracking(cancel, bc, startGoRoutines) testStopHeadTracking(ctx, cancel, bc, startGoRoutines)
}) })
}) })
@ -347,7 +345,7 @@ var _ = Describe("Capturehead", Label("head"), func() {
BeaconNodeTester.testProcessBlock(ctx, bc, BeaconNodeTester.TestEvents["100"].HeadMessage, 3, maxRetry, 1, 0, 0) BeaconNodeTester.testProcessBlock(ctx, bc, BeaconNodeTester.TestEvents["100"].HeadMessage, 3, maxRetry, 1, 0, 0)
BeaconNodeTester.testProcessBlock(ctx, bc, BeaconNodeTester.TestEvents["101"].HeadMessage, 3, maxRetry, 1, 0, 0) BeaconNodeTester.testProcessBlock(ctx, bc, BeaconNodeTester.TestEvents["101"].HeadMessage, 3, maxRetry, 1, 0, 0)
testStopHeadTracking(cancel, bc, startGoRoutines) testStopHeadTracking(ctx, cancel, bc, startGoRoutines)
}) })
}) })
Context("Two consecutive blocks with a bad parent", func() { Context("Two consecutive blocks with a bad parent", func() {
@ -361,7 +359,7 @@ var _ = Describe("Capturehead", Label("head"), func() {
BeaconNodeTester.testProcessBlock(ctx, bc, BeaconNodeTester.TestEvents["100-dummy"].HeadMessage, 3, maxRetry, 1, 0, 0) BeaconNodeTester.testProcessBlock(ctx, bc, BeaconNodeTester.TestEvents["100-dummy"].HeadMessage, 3, maxRetry, 1, 0, 0)
BeaconNodeTester.testProcessBlock(ctx, bc, BeaconNodeTester.TestEvents["101"].HeadMessage, 3, maxRetry, 1, 1, 1) BeaconNodeTester.testProcessBlock(ctx, bc, BeaconNodeTester.TestEvents["101"].HeadMessage, 3, maxRetry, 1, 1, 1)
testStopHeadTracking(cancel, bc, startGoRoutines) testStopHeadTracking(ctx, cancel, bc, startGoRoutines)
}) })
}) })
Context("Phase 0: We have a correctly formated SSZ SignedBeaconBlock and BeaconState", func() { Context("Phase 0: We have a correctly formated SSZ SignedBeaconBlock and BeaconState", func() {
@ -396,7 +394,7 @@ var _ = Describe("Capturehead", Label("head"), func() {
Expect(start).To(Equal(102)) Expect(start).To(Equal(102))
Expect(end).To(Equal(102)) Expect(end).To(Equal(102))
testStopHeadTracking(cancel, bc, startGoRoutines) testStopHeadTracking(ctx, cancel, bc, startGoRoutines)
}) })
}) })
}) })
@ -414,7 +412,7 @@ var _ = Describe("Capturehead", Label("head"), func() {
start, end := queryKnownGaps(bc.Db, "11", "99") start, end := queryKnownGaps(bc.Db, "11", "99")
Expect(start).To(Equal(11)) Expect(start).To(Equal(11))
Expect(end).To(Equal(99)) Expect(end).To(Equal(99))
testStopHeadTracking(cancel, bc, startGoRoutines) testStopHeadTracking(ctx, cancel, bc, startGoRoutines)
}) })
}) })
Context("There is a gap at start up spanning multiple incrementing range.", func() { Context("There is a gap at start up spanning multiple incrementing range.", func() {
@ -435,7 +433,7 @@ var _ = Describe("Capturehead", Label("head"), func() {
Expect(start).To(Equal(96)) Expect(start).To(Equal(96))
Expect(end).To(Equal(99)) Expect(end).To(Equal(99))
testStopHeadTracking(cancel, bc, startGoRoutines) testStopHeadTracking(ctx, cancel, bc, startGoRoutines)
}) })
}) })
Context("Gaps between two head messages", func() { Context("Gaps between two head messages", func() {
@ -455,7 +453,7 @@ var _ = Describe("Capturehead", Label("head"), func() {
start, end = queryKnownGaps(bc.Db, "2000101", "2375702") start, end = queryKnownGaps(bc.Db, "2000101", "2375702")
Expect(start).To(Equal(2000101)) Expect(start).To(Equal(2000101))
Expect(end).To(Equal(2375702)) Expect(end).To(Equal(2375702))
testStopHeadTracking(cancel, bc, startGoRoutines) testStopHeadTracking(ctx, cancel, bc, startGoRoutines)
}) })
}) })
}) })
@ -470,7 +468,7 @@ var _ = Describe("Capturehead", Label("head"), func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "2375702") bc := setUpTest(BeaconNodeTester.TestConfig, "2375702")
BeaconNodeTester.testMultipleHead(ctx, bc, TestEvents["2375703"].HeadMessage, TestEvents["2375703-dummy"].HeadMessage, 74240, maxRetry) BeaconNodeTester.testMultipleHead(ctx, bc, TestEvents["2375703"].HeadMessage, TestEvents["2375703-dummy"].HeadMessage, 74240, maxRetry)
testStopHeadTracking(cancel, bc, startGoRoutines) testStopHeadTracking(ctx, cancel, bc, startGoRoutines)
}) })
}) })
Context("Phase0: Multiple head messages for the same slot.", func() { Context("Phase0: Multiple head messages for the same slot.", func() {
@ -482,7 +480,7 @@ var _ = Describe("Capturehead", Label("head"), func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "99") bc := setUpTest(BeaconNodeTester.TestConfig, "99")
BeaconNodeTester.testMultipleHead(ctx, bc, TestEvents["100-dummy"].HeadMessage, TestEvents["100"].HeadMessage, 3, maxRetry) BeaconNodeTester.testMultipleHead(ctx, bc, TestEvents["100-dummy"].HeadMessage, TestEvents["100"].HeadMessage, 3, maxRetry)
testStopHeadTracking(cancel, bc, startGoRoutines) testStopHeadTracking(ctx, cancel, bc, startGoRoutines)
}) })
}) })
Context("Phase 0: Multiple reorgs have occurred on this slot", func() { Context("Phase 0: Multiple reorgs have occurred on this slot", func() {
@ -494,7 +492,7 @@ var _ = Describe("Capturehead", Label("head"), func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "99") bc := setUpTest(BeaconNodeTester.TestConfig, "99")
BeaconNodeTester.testMultipleReorgs(ctx, bc, TestEvents["100-dummy"].HeadMessage, TestEvents["100-dummy-2"].HeadMessage, TestEvents["100"].HeadMessage, 3, maxRetry) BeaconNodeTester.testMultipleReorgs(ctx, bc, TestEvents["100-dummy"].HeadMessage, TestEvents["100-dummy-2"].HeadMessage, TestEvents["100"].HeadMessage, 3, maxRetry)
testStopHeadTracking(cancel, bc, startGoRoutines) testStopHeadTracking(ctx, cancel, bc, startGoRoutines)
}) })
}) })
Context("Altair: Multiple reorgs have occurred on this slot", func() { Context("Altair: Multiple reorgs have occurred on this slot", func() {
@ -506,7 +504,7 @@ var _ = Describe("Capturehead", Label("head"), func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "2375702") bc := setUpTest(BeaconNodeTester.TestConfig, "2375702")
BeaconNodeTester.testMultipleReorgs(ctx, bc, TestEvents["2375703-dummy"].HeadMessage, TestEvents["2375703-dummy-2"].HeadMessage, TestEvents["2375703"].HeadMessage, 74240, maxRetry) BeaconNodeTester.testMultipleReorgs(ctx, bc, TestEvents["2375703-dummy"].HeadMessage, TestEvents["2375703-dummy-2"].HeadMessage, TestEvents["2375703"].HeadMessage, 74240, maxRetry)
testStopHeadTracking(cancel, bc, startGoRoutines) testStopHeadTracking(ctx, cancel, bc, startGoRoutines)
}) })
}) })
}) })
@ -899,7 +897,7 @@ func (tbc TestBeaconNode) provideSsz(slotIdentifier string, sszIdentifier string
// Helper function to test three reorg messages. There are going to be many functions like this, // Helper function to test three reorg messages. There are going to be many functions like this,
// Because we need to test the same logic for multiple phases. // Because we need to test the same logic for multiple phases.
func (tbc TestBeaconNode) testMultipleReorgs(ctx context.Context, bc *beaconclient.BeaconClient, firstHead beaconclient.Head, secondHead beaconclient.Head, thirdHead beaconclient.Head, epoch int, maxRetry int) { func (tbc TestBeaconNode) testMultipleReorgs(ctx context.Context, bc *beaconclient.BeaconClient, firstHead beaconclient.Head, secondHead beaconclient.Head, thirdHead beaconclient.Head, epoch int, maxRetry int) {
go bc.CaptureHead(ctx, true) go bc.CaptureHead(ctx, 2, true)
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
log.Info("Sending Messages to BeaconClient") log.Info("Sending Messages to BeaconClient")
@ -961,7 +959,7 @@ func (tbc TestBeaconNode) testMultipleReorgs(ctx context.Context, bc *beaconclie
// A test to validate a single block was processed correctly // A test to validate a single block was processed correctly
func (tbc TestBeaconNode) testProcessBlock(ctx context.Context, bc *beaconclient.BeaconClient, head beaconclient.Head, epoch int, maxRetry int, expectedSuccessInsert uint64, expectedKnownGaps uint64, expectedReorgs uint64) { func (tbc TestBeaconNode) testProcessBlock(ctx context.Context, bc *beaconclient.BeaconClient, head beaconclient.Head, epoch int, maxRetry int, expectedSuccessInsert uint64, expectedKnownGaps uint64, expectedReorgs uint64) {
go bc.CaptureHead(ctx, true) go bc.CaptureHead(ctx, 2, true)
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
sendHeadMessage(bc, head, maxRetry, expectedSuccessInsert) sendHeadMessage(bc, head, maxRetry, expectedSuccessInsert)
@ -991,7 +989,7 @@ func (tbc TestBeaconNode) testProcessBlock(ctx context.Context, bc *beaconclient
// A test that ensures that if two HeadMessages occur for a single slot they are marked // A test that ensures that if two HeadMessages occur for a single slot they are marked
// as proposed and forked correctly. // as proposed and forked correctly.
func (tbc TestBeaconNode) testMultipleHead(ctx context.Context, bc *beaconclient.BeaconClient, firstHead beaconclient.Head, secondHead beaconclient.Head, epoch int, maxRetry int) { func (tbc TestBeaconNode) testMultipleHead(ctx context.Context, bc *beaconclient.BeaconClient, firstHead beaconclient.Head, secondHead beaconclient.Head, epoch int, maxRetry int) {
go bc.CaptureHead(ctx, true) go bc.CaptureHead(ctx, 2, true)
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
sendHeadMessage(bc, firstHead, maxRetry, 1) sendHeadMessage(bc, firstHead, maxRetry, 1)
@ -1019,7 +1017,7 @@ func (tbc TestBeaconNode) testMultipleHead(ctx context.Context, bc *beaconclient
// as proposed and forked correctly. // as proposed and forked correctly.
func (tbc TestBeaconNode) testKnownGapsMessages(ctx context.Context, bc *beaconclient.BeaconClient, tableIncrement int, expectedEntries uint64, maxRetry int, msg ...beaconclient.Head) { func (tbc TestBeaconNode) testKnownGapsMessages(ctx context.Context, bc *beaconclient.BeaconClient, tableIncrement int, expectedEntries uint64, maxRetry int, msg ...beaconclient.Head) {
bc.KnownGapTableIncrement = tableIncrement bc.KnownGapTableIncrement = tableIncrement
go bc.CaptureHead(ctx, true) go bc.CaptureHead(ctx, 2, true)
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
for _, headMsg := range msg { for _, headMsg := range msg {
@ -1060,10 +1058,10 @@ func testSszRoot(msg Message) {
} }
// A make shift function to stop head tracking and insure we dont have any goroutine leaks // A make shift function to stop head tracking and insure we dont have any goroutine leaks
func testStopHeadTracking(cancel context.CancelFunc, bc *beaconclient.BeaconClient, startGoRoutines int) { func testStopHeadTracking(ctx context.Context, cancel context.CancelFunc, bc *beaconclient.BeaconClient, startGoRoutines int) {
bc.Db.Close() bc.Db.Close()
err := bc.StopHeadTracking(cancel) cancel()
Expect(err).ToNot(HaveOccurred()) bc.StopHeadTracking(ctx, true)
time.Sleep(3 * time.Second) time.Sleep(3 * time.Second)
endNum := runtime.NumGoroutine() endNum := runtime.NumGoroutine()

View File

@ -37,14 +37,18 @@ func (bc *BeaconClient) CaptureHistoric(ctx context.Context, maxWorkers int) []e
} }
// This function will perform all the necessary clean up tasks for stopping historical processing. // This function will perform all the necessary clean up tasks for stopping historical processing.
func (bc *BeaconClient) StopHistoric(cancel context.CancelFunc) error { func (bc *BeaconClient) StopHistoric(ctx context.Context) error {
select {
case <-ctx.Done():
log.Info("We are stopping the historical processing service.") log.Info("We are stopping the historical processing service.")
cancel()
err := bc.HistoricalProcess.releaseDbLocks() err := bc.HistoricalProcess.releaseDbLocks()
if err != nil { if err != nil {
loghelper.LogError(err).WithField("uniqueIdentifier", bc.UniqueNodeIdentifier).Error("We were unable to remove the locks from the eth_beacon.historic_processing table. Manual Intervention is needed!") loghelper.LogError(err).WithField("uniqueIdentifier", bc.UniqueNodeIdentifier).Error("We were unable to remove the locks from the eth_beacon.historic_processing table. Manual Intervention is needed!")
} }
return nil return nil
default:
return fmt.Errorf("Tried to stop historic before the context ended...")
}
} }
// An interface to enforce any batch processing. Currently there are two use cases for this. // An interface to enforce any batch processing. Currently there are two use cases for this.

View File

@ -143,7 +143,7 @@ var _ = Describe("Capturehistoric", func() {
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)
validatePopularBatchBlocks(bc) validatePopularBatchBlocks(bc)
testStopHeadTracking(cancel, bc, startGoRoutines) testStopHeadTracking(ctx, cancel, bc, startGoRoutines)
}) })
}) })
@ -168,7 +168,7 @@ var _ = Describe("Capturehistoric", func() {
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)
validatePopularBatchBlocks(bc) validatePopularBatchBlocks(bc)
testStopHeadTracking(cancel, bc, startGoRoutines) testStopHeadTracking(ctx, cancel, bc, startGoRoutines)
}) })
}) })
Context("When it recieves a known Gaps, historic and head message (in order)", func() { Context("When it recieves a known Gaps, historic and head message (in order)", func() {
@ -192,7 +192,7 @@ var _ = Describe("Capturehistoric", func() {
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)
validatePopularBatchBlocks(bc) validatePopularBatchBlocks(bc)
testStopHeadTracking(cancel, bc, startGoRoutines) testStopHeadTracking(ctx, cancel, bc, startGoRoutines)
}) })
}) })
}) })
@ -228,25 +228,20 @@ func (tbc TestBeaconNode) writeEventToHistoricProcess(bc *beaconclient.BeaconCli
// Start the CaptureHistoric function, and check for the correct inserted slots. // Start the CaptureHistoric function, and check for the correct inserted slots.
func (tbc TestBeaconNode) runHistoricalProcess(bc *beaconclient.BeaconClient, maxWorkers int, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError uint64) { func (tbc TestBeaconNode) runHistoricalProcess(bc *beaconclient.BeaconClient, maxWorkers int, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError uint64) {
startGoRoutines := runtime.NumGoroutine()
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
go bc.CaptureHistoric(ctx, maxWorkers) go bc.CaptureHistoric(ctx, maxWorkers)
validateMetrics(bc, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError) validateMetrics(bc, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError)
log.Debug("Calling the stop function for historical processing..") testStopHistoricTracking(ctx, cancel, bc, startGoRoutines)
err := bc.StopHistoric(cancel)
time.Sleep(5 * time.Second)
Expect(err).ToNot(HaveOccurred())
validateAllRowsCheckedOut(bc.Db, hpCheckCheckedOutStmt)
} }
// Wrapper function that processes knownGaps // Wrapper function that processes knownGaps
func (tbc TestBeaconNode) runKnownGapsProcess(bc *beaconclient.BeaconClient, maxWorkers int, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError uint64) { func (tbc TestBeaconNode) runKnownGapsProcess(bc *beaconclient.BeaconClient, maxWorkers int, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError uint64) {
startGoRoutines := runtime.NumGoroutine()
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
go bc.ProcessKnownGaps(ctx, maxWorkers) go bc.ProcessKnownGaps(ctx, maxWorkers)
validateMetrics(bc, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError) validateMetrics(bc, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError)
err := bc.StopKnownGapsProcessing(cancel) testStopHistoricTracking(ctx, cancel, bc, startGoRoutines)
time.Sleep(5 * time.Second)
Expect(err).ToNot(HaveOccurred())
validateAllRowsCheckedOut(bc.Db, kgCheckCheckedOutStmt)
} }
func validateMetrics(bc *beaconclient.BeaconClient, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError uint64) { func validateMetrics(bc *beaconclient.BeaconClient, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError uint64) {
@ -316,3 +311,21 @@ func validateAllRowsCheckedOut(db sql.Database, checkStmt string) {
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(rows).To(Equal(int64(0))) Expect(rows).To(Equal(int64(0)))
} }
// A make shift function to stop head tracking and insure we dont have any goroutine leaks
func testStopHistoricTracking(ctx context.Context, cancel context.CancelFunc, bc *beaconclient.BeaconClient, startGoRoutines int) {
log.Debug("Calling the stop function for historical processing..")
cancel()
err := bc.StopKnownGapsProcessing(ctx)
Expect(err).ToNot(HaveOccurred())
time.Sleep(5 * time.Second)
validateAllRowsCheckedOut(bc.Db, kgCheckCheckedOutStmt)
err = bc.Db.Close()
Expect(err).ToNot(HaveOccurred())
time.Sleep(3 * time.Second)
endNum := runtime.NumGoroutine()
//pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
Expect(startGoRoutines).To(Equal(endNum))
}

View File

@ -24,6 +24,7 @@ import (
"strconv" "strconv"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-eth-beacon-indexer/pkg/database/sql"
) )
// This function will perform the necessary steps to handle a reorg. // This function will perform the necessary steps to handle a reorg.
@ -42,8 +43,14 @@ func (bc *BeaconClient) handleReorg(ctx context.Context) {
} }
// This function will handle the latest head event. // This function will handle the latest head event.
func (bc *BeaconClient) handleHead(ctx context.Context) { func (bc *BeaconClient) handleHead(ctx context.Context, maxWorkers int) {
log.Info("Starting to process head.") log.Info("Starting to process head.")
workCh := make(chan workParams)
log.WithField("workerNumber", maxWorkers).Info("Creating Workers")
for i := 1; i < maxWorkers; i++ {
go bc.headBlockProcessor(ctx, workCh)
}
errorSlots := 0 errorSlots := 0
for { for {
select { select {
@ -77,9 +84,8 @@ func (bc *BeaconClient) handleHead(ctx context.Context) {
bc.StartingSlot = slot bc.StartingSlot = slot
} }
go processHeadSlot(ctx, bc.Db, bc.ServerEndpoint, slot, head.Block, head.State, bc.PreviousSlot, bc.PreviousBlockRoot, bc.Metrics, bc.KnownGapTableIncrement, bc.CheckDb) workCh <- workParams{db: bc.Db, serverEndpoint: bc.ServerEndpoint, slot: slot, blockRoot: head.Block, stateRoot: head.State, previousSlot: bc.PreviousSlot, previousBlockRoot: bc.PreviousBlockRoot, metrics: bc.Metrics, knownGapsTableIncrement: bc.KnownGapTableIncrement, checkDb: bc.CheckDb}
log.WithFields(log.Fields{"head": head.Slot}).Debug("We finished sending this slot to the workCh")
log.WithFields(log.Fields{"head": head.Slot}).Debug("We finished calling processHeadSlot.")
// Update the previous block // Update the previous block
bc.PreviousSlot = slot bc.PreviousSlot = slot
@ -87,3 +93,29 @@ func (bc *BeaconClient) handleHead(ctx context.Context) {
} }
} }
} }
// A worker that will process head slots.
func (bc *BeaconClient) headBlockProcessor(ctx context.Context, workCh <-chan workParams) {
for {
select {
case <-ctx.Done():
return
case wp := <-workCh:
processHeadSlot(ctx, wp.db, wp.serverEndpoint, wp.slot, wp.blockRoot, wp.stateRoot, wp.previousSlot, wp.previousBlockRoot, wp.metrics, wp.knownGapsTableIncrement, wp.checkDb)
}
}
}
// A struct used to pass parameters to the worker.
type workParams struct {
db sql.Database
serverEndpoint string
slot int
blockRoot string
stateRoot string
previousSlot int
previousBlockRoot string
metrics *BeaconClientMetrics
knownGapsTableIncrement int
checkDb bool
}

View File

@ -20,6 +20,7 @@ package beaconclient
import ( import (
"context" "context"
"fmt"
"strconv" "strconv"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@ -67,14 +68,18 @@ func (bc *BeaconClient) ProcessKnownGaps(ctx context.Context, maxWorkers int) []
} }
// This function will perform all the necessary clean up tasks for stopping historical processing. // This function will perform all the necessary clean up tasks for stopping historical processing.
func (bc *BeaconClient) StopKnownGapsProcessing(cancel context.CancelFunc) error { func (bc *BeaconClient) StopKnownGapsProcessing(ctx context.Context) error {
select {
case <-ctx.Done():
log.Info("We are stopping the known gaps processing service.") log.Info("We are stopping the known gaps processing service.")
cancel()
err := bc.KnownGapsProcess.releaseDbLocks() err := bc.KnownGapsProcess.releaseDbLocks()
if err != nil { if err != nil {
loghelper.LogError(err).WithField("uniqueIdentifier", bc.UniqueNodeIdentifier).Error("We were unable to remove the locks from the eth_beacon.known_gaps table. Manual Intervention is needed!") loghelper.LogError(err).WithField("uniqueIdentifier", bc.UniqueNodeIdentifier).Error("We were unable to remove the locks from the eth_beacon.known_gaps table. Manual Intervention is needed!")
} }
return nil return nil
default:
return fmt.Errorf("Tried to stop knownGaps Processing without closing the context..")
}
} }
// Get a single row of historical slots from the table. // Get a single row of historical slots from the table.

View File

@ -44,7 +44,12 @@ func querySsz(endpoint string, slot string) (*[]byte, int, error) {
} }
defer response.Body.Close() defer response.Body.Close()
rc := response.StatusCode rc := response.StatusCode
body, err := ioutil.ReadAll(response.Body)
var body []byte
//io.Copy(body, response.Body)
//bytes.buffer...
_, err = response.Body.Read(body)
//body, err := ioutil.ReadAll(response.Body)
if err != nil { if err != nil {
loghelper.LogSlotError(slot, err).Error("Unable to turn response into a []bytes array!") loghelper.LogSlotError(slot, err).Error("Unable to turn response into a []bytes array!")
return nil, rc, fmt.Errorf("Unable to turn response into a []bytes array!: %s", err.Error()) return nil, rc, fmt.Errorf("Unable to turn response into a []bytes array!: %s", err.Error())

View File

@ -67,9 +67,9 @@ func getEnvInt(envVar string) int {
func processProdHeadBlocks(bc *beaconclient.BeaconClient, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError uint64) { func processProdHeadBlocks(bc *beaconclient.BeaconClient, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError uint64) {
startGoRoutines := runtime.NumGoroutine() startGoRoutines := runtime.NumGoroutine()
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
go bc.CaptureHead(ctx, false) go bc.CaptureHead(ctx, 2, false)
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
validateMetrics(bc, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError) validateMetrics(bc, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError)
testStopHeadTracking(cancel, bc, startGoRoutines) testStopHeadTracking(ctx, cancel, bc, startGoRoutines)
} }

View File

@ -45,7 +45,11 @@ func Shutdown(ctx context.Context, notifierCh chan os.Signal, timeout time.Durat
// add any other syscalls that you want to be notified with // add any other syscalls that you want to be notified with
signal.Notify(notifierCh, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP) signal.Notify(notifierCh, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP)
<-notifierCh // Wait for one or the other...
select {
case <-notifierCh:
case <-ctx.Done():
}
log.Info("Shutting Down your application") log.Info("Shutting Down your application")