From 60855d282364db07665ac5d5e8824f2124ddde20 Mon Sep 17 00:00:00 2001 From: Abdul Rabbani Date: Wed, 8 Jun 2022 10:26:27 -0400 Subject: [PATCH] Tests + Significant Refactor The historical processing code has been significantly refactored to use a context to signal shutdown. Many tests have also been added for historical and knownGaps processing. --- cmd/head.go | 5 +- cmd/historic.go | 10 +- internal/shutdown/shutdown.go | 10 +- internal/shutdown/shutdown_test.go | 9 +- pkg/beaconclient/beaconclient.go | 2 +- pkg/beaconclient/capturehead_test.go | 72 ++++-- pkg/beaconclient/capturehistoric.go | 79 ++++--- pkg/beaconclient/capturehistoric_test.go | 187 ++++++++++++++-- pkg/beaconclient/databasewrite.go | 6 +- pkg/beaconclient/metrics.go | 51 ++--- pkg/beaconclient/processhistoric.go | 267 ++++++++++++----------- pkg/beaconclient/processknowngaps.go | 76 ++++--- pkg/beaconclient/processslot.go | 24 +- 13 files changed, 499 insertions(+), 299 deletions(-) diff --git a/cmd/head.go b/cmd/head.go index fa7aa6a..7165851 100644 --- a/cmd/head.go +++ b/cmd/head.go @@ -63,11 +63,12 @@ func startHeadTracking() { log.Info("The Beacon Client has booted successfully!") // Capture head blocks go Bc.CaptureHead() + kgCtx, kgCancel := context.WithCancel(context.Background()) if viper.GetBool("kg.processKnownGaps") { go func() { errG := new(errgroup.Group) errG.Go(func() error { - errs := Bc.ProcessKnownGaps(viper.GetInt("kg.maxKnownGapsWorker")) + errs := Bc.ProcessKnownGaps(kgCtx, viper.GetInt("kg.maxKnownGapsWorker")) if len(errs) != 0 { log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing knownGaps") return fmt.Errorf("Application ended because there were too many error when attempting to process knownGaps") @@ -81,7 +82,7 @@ } // Shutdown when the time is right. - err = shutdown.ShutdownHeadTracking(ctx, notifierCh, maxWaitSecondsShutdown, Db, Bc) + err = shutdown.ShutdownHeadTracking(ctx, kgCancel, notifierCh, maxWaitSecondsShutdown, Db, Bc) if err != nil { loghelper.LogError(err).Error("Ungracefully Shutdown ipld-ethcl-indexer!") } else { diff --git a/cmd/historic.go b/cmd/historic.go index 34ad1b5..3e08026 100644 --- a/cmd/historic.go +++ b/cmd/historic.go @@ -60,10 +60,11 @@ func startHistoricProcessing() { serveProm(addr) } - errG, _ := errgroup.WithContext(context.Background()) + hpContext, hpCancel := context.WithCancel(context.Background()) + errG, _ := errgroup.WithContext(context.Background()) errG.Go(func() error { - errs := Bc.CaptureHistoric(viper.GetInt("bc.maxHistoricProcessWorker")) + errs := Bc.CaptureHistoric(hpContext, viper.GetInt("bc.maxHistoricProcessWorker")) if len(errs) != 0 { if len(errs) != 0 { log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing historic events") @@ -73,11 +74,12 @@ return nil }) + kgContext, kgCancel := context.WithCancel(context.Background()) if viper.GetBool("kg.processKnownGaps") { go func() { errG := new(errgroup.Group) errG.Go(func() error { - errs := Bc.ProcessKnownGaps(viper.GetInt("kg.maxKnownGapsWorker")) + errs := Bc.ProcessKnownGaps(kgContext, viper.GetInt("kg.maxKnownGapsWorker")) if len(errs) != 0 { log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing knownGaps") return fmt.Errorf("Application ended because there were too many error when attempting to process knownGaps") @@ -91,7 +93,7 @@ } // Shutdown when the time is right.
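The pattern above is standard Go cancellation wiring: the command owns a context.WithCancel pair, hands the Context to the long-running service, and hands the CancelFunc to the shutdown path (as in the ShutdownHistoricProcessing call below). A minimal, self-contained sketch of that wiring — runService and the timings are illustrative stand-ins, not code from this repo:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runService stands in for Bc.ProcessKnownGaps / Bc.CaptureHistoric: it
// works until the context it was handed is cancelled.
func runService(ctx context.Context) error {
	for {
		select {
		case <-ctx.Done():
			fmt.Println("service: context cancelled, exiting")
			return nil
		case <-time.After(250 * time.Millisecond):
			fmt.Println("service: doing work")
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	done := make(chan error, 1)
	go func() { done <- runService(ctx) }()

	// The shutdown path owns the CancelFunc, mirroring how kgCancel and
	// hpCancel are handed to the Shutdown* wrappers in this patch.
	time.Sleep(time.Second)
	cancel()
	fmt.Println("shutdown:", <-done)
}
```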
- err = shutdown.ShutdownHistoricProcessing(ctx, notifierCh, maxWaitSecondsShutdown, Db, Bc) + err = shutdown.ShutdownHistoricProcessing(ctx, kgCancel, hpCancel, notifierCh, maxWaitSecondsShutdown, Db, Bc) if err != nil { loghelper.LogError(err).Error("Ungracefully Shutdown ipld-ethcl-indexer!") } else { diff --git a/internal/shutdown/shutdown.go b/internal/shutdown/shutdown.go index 228b760..742a100 100644 --- a/internal/shutdown/shutdown.go +++ b/internal/shutdown/shutdown.go @@ -40,7 +40,7 @@ func ShutdownServices(ctx context.Context, notifierCh chan os.Signal, waitTime t } // Wrapper function for shutting down the head tracking process. -func ShutdownHeadTracking(ctx context.Context, notifierCh chan os.Signal, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) error { +func ShutdownHeadTracking(ctx context.Context, kgCancel context.CancelFunc, notifierCh chan os.Signal, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) error { return ShutdownServices(ctx, notifierCh, waitTime, DB, BC, map[string]gracefulshutdown.Operation{ // Combining DB shutdown with BC because BC needs DB open to cleanly shutdown. "beaconClient": func(ctx context.Context) error { @@ -50,7 +50,7 @@ func ShutdownHeadTracking(ctx context.Context, notifierCh chan os.Signal, waitTi loghelper.LogError(err).Error("Unable to trigger shutdown of head tracking") } if BC.KnownGapsProcess != (beaconclient.KnownGapsProcessing{}) { - err = BC.StopKnownGapsProcessing() + err = BC.StopKnownGapsProcessing(kgCancel) if err != nil { loghelper.LogError(err).Error("Unable to stop processing known gaps") } @@ -61,17 +61,17 @@ func ShutdownHeadTracking(ctx context.Context, notifierCh chan os.Signal, waitTi } // Wrapper function for shutting down the head tracking process. -func ShutdownHistoricProcessing(ctx context.Context, notifierCh chan os.Signal, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) error { +func ShutdownHistoricProcessing(ctx context.Context, kgCancel, hpCancel context.CancelFunc, notifierCh chan os.Signal, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) error { return ShutdownServices(ctx, notifierCh, waitTime, DB, BC, map[string]gracefulshutdown.Operation{ // Combining DB shutdown with BC because BC needs DB open to cleanly shutdown. 
"beaconClient": func(ctx context.Context) error { defer DB.Close() - err := BC.StopHistoric() + err := BC.StopHistoric(hpCancel) if err != nil { loghelper.LogError(err).Error("Unable to stop processing historic") } if BC.KnownGapsProcess != (beaconclient.KnownGapsProcessing{}) { - err = BC.StopKnownGapsProcessing() + err = BC.StopKnownGapsProcessing(kgCancel) if err != nil { loghelper.LogError(err).Error("Unable to stop processing known gaps") } diff --git a/internal/shutdown/shutdown_test.go b/internal/shutdown/shutdown_test.go index 9dbb94d..7e0220c 100644 --- a/internal/shutdown/shutdown_test.go +++ b/internal/shutdown/shutdown_test.go @@ -72,8 +72,9 @@ var _ = Describe("Shutdown", func() { Context("When Channels are empty,", func() { It("Should Shutdown Successfully.", func() { go func() { + _, cancel := context.WithCancel(context.Background()) log.Debug("Starting shutdown chan") - err = shutdown.ShutdownHeadTracking(ctx, notifierCh, maxWaitSecondsShutdown, DB, BC) + err = shutdown.ShutdownHeadTracking(ctx, cancel, notifierCh, maxWaitSecondsShutdown, DB, BC) log.Debug("We have completed the shutdown...") Expect(err).ToNot(HaveOccurred()) }() @@ -84,8 +85,9 @@ var _ = Describe("Shutdown", func() { shutdownCh := make(chan bool) //log.SetLevel(log.DebugLevel) go func() { + _, cancel := context.WithCancel(context.Background()) log.Debug("Starting shutdown chan") - err = shutdown.ShutdownHeadTracking(ctx, notifierCh, maxWaitSecondsShutdown, DB, BC) + err = shutdown.ShutdownHeadTracking(ctx, cancel, notifierCh, maxWaitSecondsShutdown, DB, BC) log.Debug("We have completed the shutdown...") Expect(err).ToNot(HaveOccurred()) shutdownCh <- true @@ -118,7 +120,8 @@ var _ = Describe("Shutdown", func() { //log.SetLevel(log.DebugLevel) go func() { log.Debug("Starting shutdown chan") - err = shutdown.ShutdownHeadTracking(ctx, notifierCh, maxWaitSecondsShutdown, DB, BC) + _, cancel := context.WithCancel(context.Background()) + err = shutdown.ShutdownHeadTracking(ctx, cancel, notifierCh, maxWaitSecondsShutdown, DB, BC) log.Debug("We have completed the shutdown...") Expect(err).To(MatchError(gracefulshutdown.TimeoutErr(maxWaitSecondsShutdown.String()))) shutdownCh <- true diff --git a/pkg/beaconclient/beaconclient.go b/pkg/beaconclient/beaconclient.go index ee56b39..c612776 100644 --- a/pkg/beaconclient/beaconclient.go +++ b/pkg/beaconclient/beaconclient.go @@ -69,7 +69,7 @@ type BeaconClient struct { // This value is lazily updated. Therefore at times it will be outdated. LatestSlotInBeaconServer int64 PerformHistoricalProcessing bool // Should we perform historical processing? - HistoricalProcess historicProcessing // object keeping track of historical processing + HistoricalProcess HistoricProcessing // object keeping track of historical processing } // A struct to keep track of relevant the head event topic. 
diff --git a/pkg/beaconclient/capturehead_test.go b/pkg/beaconclient/capturehead_test.go index 0561913..3002975 100644 --- a/pkg/beaconclient/capturehead_test.go +++ b/pkg/beaconclient/capturehead_test.go @@ -60,6 +60,22 @@ var ( maxRetry int = 120 TestEvents = map[string]Message{ + "0": { + HeadMessage: beaconclient.Head{ + Slot: "0", + Block: "0x4d611d5b93fdab69013a7f0a2f961caca0c853f87cfe9595fe50038163079360", + State: "0x7e76880eb67bbdc86250aa578958e9d0675e64e714337855204fb5abaaf82c2b", + CurrentDutyDependentRoot: "", + PreviousDutyDependentRoot: "", + EpochTransition: false, + ExecutionOptimistic: false, + }, + SignedBeaconBlock: filepath.Join("ssz-data", "0", "signed-beacon-block.ssz"), + BeaconState: filepath.Join("ssz-data", "0", "beacon-state.ssz"), + CorrectMhKey: "/blocks/QHVAEQRQPA2GINRRGFSDKYRZGNTGIYLCGY4TAMJTME3WMMDBGJTDSNRRMNQWGYJQMM4DKM3GHA3WGZTFHE2TSNLGMU2TAMBTHAYTMMZQG44TGNRQ", + CorrectParentRoot: "0x0000000000000000000000000000000000000000000000000000000000000000", + CorrectEth1BlockHash: "0x0000000000000000000000000000000000000000000000000000000000000000", + }, "100-dummy": { HeadMessage: beaconclient.Head{ Slot: "100", @@ -118,9 +134,12 @@ var ( EpochTransition: false, ExecutionOptimistic: false, }, - TestNotes: "An easy to process Phase 0 block", - SignedBeaconBlock: filepath.Join("ssz-data", "100", "signed-beacon-block.ssz"), - BeaconState: filepath.Join("ssz-data", "100", "beacon-state.ssz"), + TestNotes: "An easy to process Phase 0 block", + SignedBeaconBlock: filepath.Join("ssz-data", "100", "signed-beacon-block.ssz"), + BeaconState: filepath.Join("ssz-data", "100", "beacon-state.ssz"), + CorrectMhKey: "/blocks/QHVAEQBQGQ4TKNJUGAYDGNZRGM2DOZJSGZTDMMLEG5QTIYTCMRQTKYRSGNTGCMDCGI2WINLGMM2DMNJRGYYGMMTBHEZGINJSME3DGYRZGE4WE", + CorrectParentRoot: "0x8d3f027beef5cbd4f8b29fc831aba67a5d74768edca529f5596f07fd207865e1", + CorrectEth1BlockHash: "0x8d3f027beef5cbd4f8b29fc831aba67a5d74768edca529f5596f07fd207865e1", }, "101": { HeadMessage: beaconclient.Head{ @@ -132,9 +151,11 @@ var ( EpochTransition: false, ExecutionOptimistic: false, }, - TestNotes: "An easy to process Phase 0 block", - SignedBeaconBlock: filepath.Join("ssz-data", "101", "signed-beacon-block.ssz"), - BeaconState: filepath.Join("ssz-data", "101", "beacon-state.ssz"), + TestNotes: "An easy to process Phase 0 block", + SignedBeaconBlock: filepath.Join("ssz-data", "101", "signed-beacon-block.ssz"), + BeaconState: filepath.Join("ssz-data", "101", "beacon-state.ssz"), + CorrectEth1BlockHash: "0x8d3f027beef5cbd4f8b29fc831aba67a5d74768edca529f5596f07fd207865e1", + CorrectMhKey: "/blocks/QHVAEQRQPBQWEZJRME4TOMTFGUYTEMJYGJSDANDGGBSDIYJVMM4WGMRVMY4WKZJVG5RTEZJZMQYGMZRTMY2GGNDDHAZGMZBUGJSDCM3EGMYTAOBT", }, "2375703-dummy": { HeadMessage: beaconclient.Head{ @@ -176,19 +197,23 @@ var ( Block: "0x4392372c5f6e39499e31bf924388b5815639103149f0f54f8a453773b1802301", State: "0xb6215b560273af63ec7e011572b60ec1ca0b0232f8ff44fcd4ed55c7526e964e", CurrentDutyDependentRoot: "", PreviousDutyDependentRoot: "", EpochTransition: false, ExecutionOptimistic: false}, - TestNotes: "An easy to process Altair Block", - SignedBeaconBlock: filepath.Join("ssz-data", "2375703", "signed-beacon-block.ssz"), - BeaconState: filepath.Join("ssz-data", "2375703", "beacon-state.ssz"), + TestNotes: "An easy to process Altair Block", + SignedBeaconBlock: filepath.Join("ssz-data", "2375703", "signed-beacon-block.ssz"), + BeaconState: filepath.Join("ssz-data", "2375703", "beacon-state.ssz"), + CorrectEth1BlockHash: 
"0xd74b1c60423651624de6bb301ac25808951c167ba6ecdd9b2e79b4315aee8202", + CorrectParentRoot: "0x08736ddc20b77f65d1aa6301f7e6e856a820ff3ce6430ed2c3694ae35580e740", + CorrectMhKey: "", }, "3797056": { HeadMessage: beaconclient.Head{ Slot: "3797056", Block: "", - State: "0xb6215b560273af63ec7e011572b60ec1ca0b0232f8ff44fcd4ed55c7526e964e", + State: "", CurrentDutyDependentRoot: "", PreviousDutyDependentRoot: "", EpochTransition: false, ExecutionOptimistic: false}, - TestNotes: "An easy to process Altair Block", - SignedBeaconBlock: filepath.Join("ssz-data", "2375703", "signed-beacon-block.ssz"), - BeaconState: filepath.Join("ssz-data", "2375703", "beacon-state.ssz"), + TestNotes: "An easy to process Altair Block", + // The file below should not exist, this will trigger an error message and 404 response from the mock. + SignedBeaconBlock: filepath.Join("ssz-data", "3797056", "should-not-exist.txt"), + BeaconState: filepath.Join("ssz-data", "3797056", "beacon-state.ssz"), }, } TestConfig = Config{ @@ -214,11 +239,14 @@ var ( ) type Message struct { - HeadMessage beaconclient.Head // The head messsage that will be streamed to the BeaconClient - TestNotes string // A small explanation of the purpose this structure plays in the testing landscape. - MimicConfig *MimicConfig // A configuration of parameters that you are trying to - SignedBeaconBlock string // The file path output of an SSZ encoded SignedBeaconBlock. - BeaconState string // The file path output of an SSZ encoded BeaconState. + HeadMessage beaconclient.Head // The head messsage that will be streamed to the BeaconClient + TestNotes string // A small explanation of the purpose this structure plays in the testing landscape. + MimicConfig *MimicConfig // A configuration of parameters that you are trying to + SignedBeaconBlock string // The file path output of an SSZ encoded SignedBeaconBlock. + BeaconState string // The file path output of an SSZ encoded BeaconState. + CorrectMhKey string // The correct MhKey + CorrectParentRoot string // The correct parent root + CorrectEth1BlockHash string // The correct eth1blockHash } // A structure that can be utilized to mimic and existing SSZ object but change it ever so slightly. @@ -228,7 +256,7 @@ type MimicConfig struct { ForkVersion string // Specify the fork version. This is needed as a workaround to create dummy SignedBeaconBlocks. 
} -var _ = Describe("Capturehead", func() { +var _ = Describe("Capturehead", Label("head"), func() { Describe("Receiving New Head SSE messages", Label("unit", "behavioral"), func() { Context("Correctly formatted Phase0 Block", func() { @@ -653,8 +681,7 @@ func (tbc TestBeaconNode) SetupBeaconNodeMock(TestEvents map[string]Message, pro id := httpmock.MustGetSubmatch(req, 1) dat, err := tbc.provideSsz(id, "state", dummyParentRoot) if err != nil { - Expect(err).NotTo(HaveOccurred()) - return httpmock.NewStringResponse(404, fmt.Sprintf("Unable to find file for %s", id)), err + return httpmock.NewStringResponse(404, fmt.Sprintf("Unable to find file for %s", id)), nil } return httpmock.NewBytesResponse(200, dat), nil }, @@ -667,8 +694,7 @@ func (tbc TestBeaconNode) SetupBeaconNodeMock(TestEvents map[string]Message, pro id := httpmock.MustGetSubmatch(req, 1) dat, err := tbc.provideSsz(id, "block", dummyParentRoot) if err != nil { - Expect(err).NotTo(HaveOccurred()) - return httpmock.NewStringResponse(404, fmt.Sprintf("Unable to find file for %s", id)), err + return httpmock.NewStringResponse(404, fmt.Sprintf("Unable to find file for %s", id)), nil } return httpmock.NewBytesResponse(200, dat), nil }, diff --git a/pkg/beaconclient/capturehistoric.go b/pkg/beaconclient/capturehistoric.go index a569568..6b7ab40 100644 --- a/pkg/beaconclient/capturehistoric.go +++ b/pkg/beaconclient/capturehistoric.go @@ -18,6 +18,7 @@ package beaconclient import ( + "context" "fmt" log "github.com/sirupsen/logrus" @@ -27,18 +28,18 @@ import ( ) // This function will perform all the heavy lifting for tracking the head of the chain. -func (bc *BeaconClient) CaptureHistoric(maxWorkers int) []error { +func (bc *BeaconClient) CaptureHistoric(ctx context.Context, maxWorkers int) []error { log.Info("We are starting the historical processing service.") - bc.HistoricalProcess = historicProcessing{db: bc.Db, metrics: bc.Metrics} - errs := handleBatchProcess(maxWorkers, bc.HistoricalProcess, bc.HistoricalProcess.finishProcessing, bc.HistoricalProcess.db, bc.ServerEndpoint, bc.Metrics, bc.CheckDb) + bc.HistoricalProcess = HistoricProcessing{db: bc.Db, metrics: bc.Metrics, uniqueNodeIdentifier: bc.UniqueNodeIdentifier} + errs := handleBatchProcess(ctx, maxWorkers, bc.HistoricalProcess, bc.HistoricalProcess.db, bc.ServerEndpoint, bc.Metrics, bc.CheckDb) log.Debug("Exiting Historical") return errs } // This function will perform all the necessary clean up tasks for stopping historical processing. -func (bc *BeaconClient) StopHistoric() error { +func (bc *BeaconClient) StopHistoric(cancel context.CancelFunc) error { log.Info("We are stopping the historical processing service.") - err := bc.HistoricalProcess.releaseDbLocks() + err := bc.HistoricalProcess.releaseDbLocks(cancel) if err != nil { loghelper.LogError(err).WithField("uniqueIdentifier", bc.UniqueNodeIdentifier).Error("We were unable to remove the locks from the ethcl.historic_processing table. Manual Intervention is needed!") } @@ -51,10 +52,10 @@ func (bc *BeaconClient) StopHistoric() error { // // 2. Known Gaps Processing type BatchProcessing interface { - getSlotRange(chan<- slotsToProcess) []error // Write the slots to process in a channel, return an error if you cant get the next slots to write. - handleProcessingErrors(<-chan batchHistoricError) // Custom logic to handle errors. - removeTableEntry(<-chan slotsToProcess) error // With the provided start and end slot, remove the entry from the database. 
- releaseDbLocks() error // Update the checked_out column to false for whatever table is being updated. + getSlotRange(context.Context, chan<- slotsToProcess) []error // Write the slots to process in a channel, return an error if you can't get the next slots to write. + handleProcessingErrors(context.Context, <-chan batchHistoricError) // Custom logic to handle errors. + removeTableEntry(context.Context, <-chan slotsToProcess) error // With the provided start and end slot, remove the entry from the database. + releaseDbLocks(context.CancelFunc) error // Update the checked_out column to false for whatever table is being updated. } /// ^^^ @@ -89,7 +90,7 @@ type batchHistoricError struct { // 4. Remove the slot entry from the DB. // // 5. Handle any errors. -func handleBatchProcess(maxWorkers int, bp BatchProcessing, finishCh chan int, db sql.Database, serverEndpoint string, metrics *BeaconClientMetrics, checkDb bool) []error { +func handleBatchProcess(ctx context.Context, maxWorkers int, bp BatchProcessing, db sql.Database, serverEndpoint string, metrics *BeaconClientMetrics, checkDb bool) []error { slotsCh := make(chan slotsToProcess) workCh := make(chan int) processedCh := make(chan slotsToProcess) @@ -99,30 +100,40 @@ func handleBatchProcess(maxWorkers int, bp BatchProcessing, finishCh chan int, d // Start workers for w := 1; w <= maxWorkers; w++ { log.WithFields(log.Fields{"maxWorkers": maxWorkers}).Debug("Starting batch processing workers") - go processSlotRangeWorker(workCh, errCh, db, serverEndpoint, metrics, checkDb) + go processSlotRangeWorker(ctx, workCh, errCh, db, serverEndpoint, metrics, checkDb) } // Process all ranges and send each individual slot to the worker. go func() { - for slots := range slotsCh { - if slots.startSlot > slots.endSlot { - log.Error("We received a batch process request where the startSlot is greater than the end slot.") - errCh <- batchHistoricError{ - err: fmt.Errorf("We received a startSlot where the start was greater than the end."), - errProcess: "RangeOrder", - slot: slots.startSlot, + for { + select { + case <-ctx.Done(): + return + case slots := <-slotsCh: + if slots.startSlot > slots.endSlot { + log.Error("We received a batch process request where the startSlot is greater than the end slot.") + errCh <- batchHistoricError{ + err: fmt.Errorf("We received a startSlot where the start was greater than the end."), + errProcess: "RangeOrder", + slot: slots.startSlot, + } + errCh <- batchHistoricError{ + err: fmt.Errorf("We received an endSlot where the start was greater than the end."), + errProcess: "RangeOrder", + slot: slots.endSlot, + } + } else if slots.startSlot == slots.endSlot { + log.WithField("slot", slots.startSlot).Debug("Added new slot to workCh") + workCh <- slots.startSlot + } else { + for i := slots.startSlot; i <= slots.endSlot; i++ { + workCh <- i + log.WithField("slot", i).Debug("Added new slot to workCh") + } + processedCh <- slots } - errCh <- batchHistoricError{ - err: fmt.Errorf("We received a endSlot where the start was greater than the end."), - errProcess: "RangeOrder", - slot: slots.endSlot, - } - } else { - for i := slots.startSlot; i <= slots.endSlot; i++ { - workCh <- i - } - processedCh <- slots } + } }() @@ -130,26 +141,30 @@ go func() { errG := new(errgroup.Group) errG.Go(func() error { - return bp.removeTableEntry(processedCh) + return bp.removeTableEntry(ctx, processedCh) }) if err := errG.Wait(); err != nil { finalErrCh <- []error{err} }
}() // Process errors from slot processing. - go bp.handleProcessingErrors(errCh) + go bp.handleProcessingErrors(ctx, errCh) // Get slots from the DB. go func() { - errs := bp.getSlotRange(slotsCh) // Periodically adds new entries.... + errs := bp.getSlotRange(ctx, slotsCh) // Periodically adds new entries.... if errs != nil { finalErrCh <- errs } finalErrCh <- nil + log.Debug("We are stopping the processing of adding new entries") }() log.Debug("Waiting for shutdown signal from channel") select { - case <-finishCh: + case <-ctx.Done(): + close(workCh) + close(processedCh) + close(errCh) log.Debug("Received shutdown signal from channel") return nil case errs := <-finalErrCh: diff --git a/pkg/beaconclient/capturehistoric_test.go b/pkg/beaconclient/capturehistoric_test.go index d69f306..77c3bc0 100644 --- a/pkg/beaconclient/capturehistoric_test.go +++ b/pkg/beaconclient/capturehistoric_test.go @@ -15,56 +15,199 @@ import ( var _ = Describe("Capturehistoric", func() { - Describe("Run the application in historic mode", Label("unit", "behavioral"), func() { - Context("Phase0: When we need to process a single block in the ethcl.historic_process table.", func() { - It("Successfully Process the Block", func() { + Describe("Run the application in historic mode", Label("unit", "behavioral", "historical"), func() { + Context("Phase0 + Altair: When we need to process multiple blocks from multiple entries in the ethcl.historic_process table.", func() { + It("Successfully Process the Blocks", func() { bc := setUpTest(BeaconNodeTester.TestConfig, "99") BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) defer httpmock.DeactivateAndReset() BeaconNodeTester.writeEventToHistoricProcess(bc, 100, 101, 10) - BeaconNodeTester.runBatchProcess(bc, 2, 100, 101, 0, 0) + BeaconNodeTester.runHistoricalProcess(bc, 2, 2, 0, 0, 0) + // Run two separate processes + BeaconNodeTester.writeEventToHistoricProcess(bc, 2375703, 2375703, 10) + BeaconNodeTester.runHistoricalProcess(bc, 2, 3, 0, 0, 0) time.Sleep(2 * time.Second) - validateSlot(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, 3, "proposed") - validateSignedBeaconBlock(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, "0x629ae1587895043076500f4f5dcb202a47c2fc95d5b5c548cb83bc97bd2dbfe1", "0x8d3f027beef5cbd4f8b29fc831aba67a5d74768edca529f5596f07fd207865e1", "/blocks/QHVAEQBQGQ4TKNJUGAYDGNZRGM2DOZJSGZTDMMLEG5QTIYTCMRQTKYRSGNTGCMDCGI2WINLGMM2DMNJRGYYGMMTBHEZGINJSME3DGYRZGE4WE") - validateBeaconState(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, "/blocks/QHVAEQRQPBTDEOBWMEYDGNZZMMYDGOBWMEZWGN3CMUZDQZBQGVSDQMRZMY4GKYRXMIZDQMDDMM4WKZDFGE2TINBZMFTDEMDFMJRWIMBWME3WCNJW") - - //validateSignedBeaconBlock(bc, BeaconNodeTester.TestEvents["101"].HeadMessage, "0x629ae1587895043076500f4f5dcb202a47c2fc95d5b5c548cb83bc97bd2dbfe1", "0x8d3f027beef5cbd4f8b29fc831aba67a5d74768edca529f5596f07fd207865e1", "/blocks/QHVAEQBQGQ4TKNJUGAYDGNZRGM2DOZJSGZTDMMLEG5QTIYTCMRQTKYRSGNTGCMDCGI2WINLGMM2DMNJRGYYGMMTBHEZGINJSME3DGYRZGE4WE") - //validateBeaconState(bc, BeaconNodeTester.TestEvents["101"].HeadMessage, "/blocks/QHVAEQRQPBTDEOBWMEYDGNZZMMYDGOBWMEZWGN3CMUZDQZBQGVSDQMRZMY4GKYRXMIZDQMDDMM4WKZDFGE2TINBZMFTDEMDFMJRWIMBWME3WCNJW") + validatePopularBatchBlocks(bc) + }) + }) + Context("When the start block is greater than the endBlock", func() { + It("Should Add two entries to the knownGaps table", func() { + bc :=
setUpTest(BeaconNodeTester.TestConfig, "99") + BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) + defer httpmock.DeactivateAndReset() + BeaconNodeTester.writeEventToHistoricProcess(bc, 101, 100, 10) + BeaconNodeTester.runHistoricalProcess(bc, 2, 0, 0, 2, 0) + }) + }) + Context("Processing the Genesis block", func() { + It("Should Process properly", func() { + bc := setUpTest(BeaconNodeTester.TestConfig, "100") + BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) + defer httpmock.DeactivateAndReset() + BeaconNodeTester.writeEventToHistoricProcess(bc, 0, 0, 10) + BeaconNodeTester.runHistoricalProcess(bc, 2, 1, 0, 0, 0) + validateSlot(bc, BeaconNodeTester.TestEvents["0"].HeadMessage, 0, "proposed") + validateSignedBeaconBlock(bc, BeaconNodeTester.TestEvents["0"].HeadMessage, BeaconNodeTester.TestEvents["0"].CorrectParentRoot, BeaconNodeTester.TestEvents["0"].CorrectEth1BlockHash, BeaconNodeTester.TestEvents["0"].CorrectMhKey) + validateBeaconState(bc, BeaconNodeTester.TestEvents["0"].HeadMessage, BeaconNodeTester.TestEvents["0"].CorrectMhKey) + }) + }) + Context("When there is a skipped slot", func() { + It("Should process the slot properly.", func() { + bc := setUpTest(BeaconNodeTester.TestConfig, "3797055") + BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) + defer httpmock.DeactivateAndReset() + BeaconNodeTester.writeEventToHistoricProcess(bc, 3797056, 3797056, 10) + BeaconNodeTester.runHistoricalProcess(bc, 2, 1, 0, 0, 0) + validateSlot(bc, BeaconNodeTester.TestEvents["3797056"].HeadMessage, 118658, "skipped") + }) + }) + }) + Describe("Running the Application to process Known Gaps", Label("unit", "behavioral", "knownGaps"), func() { + Context("Phase0 + Altair: When we need to process multiple blocks from multiple entries in the ethcl.known_gaps table.", func() { + It("Successfully Process the Blocks", func() { + bc := setUpTest(BeaconNodeTester.TestConfig, "99") + BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) + defer httpmock.DeactivateAndReset() + BeaconNodeTester.writeEventToKnownGaps(bc, 100, 101) + BeaconNodeTester.runKnownGapsProcess(bc, 2, 2, 0, 0, 0) + // Run two separate processes + BeaconNodeTester.writeEventToKnownGaps(bc, 2375703, 2375703) + BeaconNodeTester.runKnownGapsProcess(bc, 2, 3, 0, 0, 0) + time.Sleep(2 * time.Second) + validatePopularBatchBlocks(bc) + }) + }) + Context("When the start block is greater than the endBlock", Label("now", "now-1"), func() { + It("Should Add two entries to the knownGaps table", func() { + log.SetLevel(log.DebugLevel) + bc := setUpTest(BeaconNodeTester.TestConfig, "104") + BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) + defer httpmock.DeactivateAndReset() +
BeaconNodeTester.writeEventToKnownGaps(bc, 101, 100) + BeaconNodeTester.runKnownGapsProcess(bc, 2, 2, 0, 2, 0) + }) + }) + Context("When there's a reprocessing error", Label("now", "now-2"), func() { + It("Should update the reprocessing error.", func() { + log.SetLevel(log.DebugLevel) + bc := setUpTest(BeaconNodeTester.TestConfig, "99") + BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) + defer httpmock.DeactivateAndReset() + // We don't have an entry in the BeaconNodeTester for this slot + BeaconNodeTester.writeEventToHistoricProcess(bc, 105, 105, 10) + BeaconNodeTester.runHistoricalProcess(bc, 2, 0, 0, 1, 0) + BeaconNodeTester.runKnownGapsProcess(bc, 2, 0, 0, 1, 1) }) }) }) }) -// This function will write an even to the ethcl.historic_process table +// This function will write an event to the ethcl.known_gaps table +func (tbc TestBeaconNode) writeEventToKnownGaps(bc *beaconclient.BeaconClient, startSlot, endSlot int) { + log.Debug("We are writing the necessary events to batch process") + insertKnownGapsStmt := `INSERT INTO ethcl.known_gaps (start_slot, end_slot) + VALUES ($1, $2);` + res, err := bc.Db.Exec(context.Background(), insertKnownGapsStmt, startSlot, endSlot) + Expect(err).ToNot(HaveOccurred()) + rows, err := res.RowsAffected() + if rows != 1 { + Fail("We didn't write...") + } + Expect(err).ToNot(HaveOccurred()) +} + +// This function will write an event to the ethcl.historic_process table func (tbc TestBeaconNode) writeEventToHistoricProcess(bc *beaconclient.BeaconClient, startSlot, endSlot, priority int) { log.Debug("We are writing the necessary events to batch process") insertHistoricProcessingStmt := `INSERT INTO ethcl.historic_process (start_slot, end_slot, priority) VALUES ($1, $2, $3);` res, err := bc.Db.Exec(context.Background(), insertHistoricProcessingStmt, startSlot, endSlot, priority) - Expect(err) + Expect(err).ToNot(HaveOccurred()) rows, err := res.RowsAffected() if rows != 1 { Fail("We didnt write...") } - Expect(err) + Expect(err).ToNot(HaveOccurred()) } -// Start the batch processing function, and check for the correct inserted slots. +// Start the CaptureHistoric function, and check for the correct inserted slots.
+func (tbc TestBeaconNode) runHistoricalProcess(bc *beaconclient.BeaconClient, maxWorkers int, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError uint64) { + ctx, cancel := context.WithCancel(context.Background()) + go bc.CaptureHistoric(ctx, maxWorkers) + validateMetrics(bc, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError) + log.Debug("Calling the stop function for historical processing..") + err := bc.StopHistoric(cancel) + Expect(err).ToNot(HaveOccurred()) + time.Sleep(3 * time.Second) +} +// Wrapper function that processes knownGaps +func (tbc TestBeaconNode) runKnownGapsProcess(bc *beaconclient.BeaconClient, maxWorkers int, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError uint64) { + ctx, cancel := context.WithCancel(context.Background()) + go bc.ProcessKnownGaps(ctx, maxWorkers) + validateMetrics(bc, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError) + err := bc.StopKnownGapsProcessing(cancel) + Expect(err).ToNot(HaveOccurred()) + time.Sleep(3 * time.Second) +} + +func validateMetrics(bc *beaconclient.BeaconClient, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError uint64) { curRetry := 0 - for atomic.LoadUint64(&bc.Metrics.SlotInserts) != diff { + value := atomic.LoadUint64(&bc.Metrics.SlotInserts) + for value != expectedInserts { time.Sleep(1 * time.Second) curRetry = curRetry + 1 if curRetry == maxRetry { - Fail(fmt.Sprintf("Too many retries have occurred. The number of inserts expects %d, the number that actually occurred, %d", atomic.LoadUint64(&bc.Metrics.SlotInserts), diff)) + Fail(fmt.Sprintf("Too many retries have occurred. The number of inserts expected %d, the number that actually occurred, %d", expectedInserts, atomic.LoadUint64(&bc.Metrics.SlotInserts))) } + value = atomic.LoadUint64(&bc.Metrics.SlotInserts) + } + curRetry = 0 + value = atomic.LoadUint64(&bc.Metrics.KnownGapsInserts) + for value != expectedKnownGaps { + time.Sleep(1 * time.Second) + curRetry = curRetry + 1 + if curRetry == maxRetry { + Fail(fmt.Sprintf("Too many retries have occurred. The number of knownGaps expected %d, the number that actually occurred, %d", expectedKnownGaps, atomic.LoadUint64(&bc.Metrics.KnownGapsInserts))) + } + value = atomic.LoadUint64(&bc.Metrics.KnownGapsInserts) + } + curRetry = 0 + value = atomic.LoadUint64(&bc.Metrics.KnownGapsReprocessError) + for value != expectedKnownGapsReprocessError { + time.Sleep(1 * time.Second) + curRetry = curRetry + 1 + if curRetry == maxRetry { + Fail(fmt.Sprintf("Too many retries have occurred. The number of knownGapsReprocessingErrors expected %d, the number that actually occurred, %d", expectedKnownGapsReprocessError, value)) + } + log.Debug("&bc.Metrics.KnownGapsReprocessError: ", &bc.Metrics.KnownGapsReprocessError) + value = atomic.LoadUint64(&bc.Metrics.KnownGapsReprocessError) + } + curRetry = 0 + value = atomic.LoadUint64(&bc.Metrics.ReorgInserts) + for value != expectedReorgs { + time.Sleep(1 * time.Second) + curRetry = curRetry + 1 + if curRetry == maxRetry { + Fail(fmt.Sprintf("Too many retries have occurred. 
The number of Reorgs expected %d, the number that actually occurred, %d", expectedReorgs, atomic.LoadUint64(&bc.Metrics.ReorgInserts))) + } + value = atomic.LoadUint64(&bc.Metrics.ReorgInserts) } - - Expect(atomic.LoadUint64(&bc.Metrics.KnownGapsInserts)).To(Equal(expectedKnownGaps)) - Expect(atomic.LoadUint64(&bc.Metrics.ReorgInserts)).To(Equal(expectedKnownGaps)) +} + +// A wrapper function to validate a few popular blocks +func validatePopularBatchBlocks(bc *beaconclient.BeaconClient) { + validateSlot(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, 3, "proposed") + validateSignedBeaconBlock(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, BeaconNodeTester.TestEvents["100"].CorrectParentRoot, BeaconNodeTester.TestEvents["100"].CorrectEth1BlockHash, BeaconNodeTester.TestEvents["100"].CorrectMhKey) + validateBeaconState(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, BeaconNodeTester.TestEvents["100"].CorrectMhKey) + + validateSlot(bc, BeaconNodeTester.TestEvents["101"].HeadMessage, 3, "proposed") + validateSignedBeaconBlock(bc, BeaconNodeTester.TestEvents["101"].HeadMessage, BeaconNodeTester.TestEvents["100"].HeadMessage.Block, BeaconNodeTester.TestEvents["101"].CorrectEth1BlockHash, BeaconNodeTester.TestEvents["101"].CorrectMhKey) + validateBeaconState(bc, BeaconNodeTester.TestEvents["101"].HeadMessage, BeaconNodeTester.TestEvents["101"].CorrectMhKey) + + validateSlot(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, 74240, "proposed") + validateSignedBeaconBlock(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, BeaconNodeTester.TestEvents["2375703"].CorrectParentRoot, BeaconNodeTester.TestEvents["2375703"].CorrectEth1BlockHash, BeaconNodeTester.TestEvents["2375703"].CorrectMhKey) + validateBeaconState(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, BeaconNodeTester.TestEvents["2375703"].CorrectMhKey) } diff --git a/pkg/beaconclient/databasewrite.go b/pkg/beaconclient/databasewrite.go index cc53646..b30c3cc 100644 --- a/pkg/beaconclient/databasewrite.go +++ b/pkg/beaconclient/databasewrite.go @@ -488,23 +488,21 @@ func updateKnownGapErrors(db sql.Database, startSlot int, endSlot int, reprocess res, err := db.Exec(context.Background(), UpsertKnownGapsErrorStmt, startSlot, endSlot, reprocessingErr.Error()) if err != nil { loghelper.LogSlotRangeError(strconv.Itoa(startSlot), strconv.Itoa(endSlot), err).Error("Unable to update reprocessing_error") - metric.IncrementKnownGapsProcessingError(1) return err } row, err := res.RowsAffected() if err != nil { loghelper.LogSlotRangeError(strconv.Itoa(startSlot), strconv.Itoa(endSlot), err).Error("Unable to count rows affected when trying to update reprocessing_error.") - metric.IncrementKnownGapsProcessingError(1) return err } if row != 1 { loghelper.LogSlotRangeError(strconv.Itoa(startSlot), strconv.Itoa(endSlot), err).WithFields(log.Fields{ "rowCount": row, }).Error("The rows affected by the upsert for reprocessing_error is not 1.") - metric.IncrementKnownGapsProcessingError(1) + metric.IncrementKnownGapsReprocessError(1) return err } - metric.IncrementKnownGapsProcessed(1) + metric.IncrementKnownGapsReprocessError(1) return nil } diff --git a/pkg/beaconclient/metrics.go b/pkg/beaconclient/metrics.go index 086d7a7..c685af6 100644 --- a/pkg/beaconclient/metrics.go +++ b/pkg/beaconclient/metrics.go @@ -19,20 +19,20 @@ import ( "sync/atomic" "github.com/prometheus/client_golang/prometheus" - "github.com/sirupsen/logrus" + log "github.com/sirupsen/logrus" "github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper" ) //Create 
a metric struct and register each channel with prometheus func CreateBeaconClientMetrics() (*BeaconClientMetrics, error) { metrics := &BeaconClientMetrics{ - SlotInserts: 0, - ReorgInserts: 0, - KnownGapsInserts: 0, - knownGapsProcessed: 0, - KnownGapsProcessingError: 0, - HeadError: 0, - HeadReorgError: 0, + SlotInserts: 0, + ReorgInserts: 0, + KnownGapsInserts: 0, + KnownGapsProcessed: 0, + KnownGapsReprocessError: 0, + HeadError: 0, + HeadReorgError: 0, } err := prometheusRegisterHelper("slot_inserts", "Keeps track of the number of slots we have inserted.", &metrics.SlotInserts) if err != nil { @@ -46,11 +46,11 @@ func CreateBeaconClientMetrics() (*BeaconClientMetrics, error) { if err != nil { return nil, err } - err = prometheusRegisterHelper("known_gaps_processed", "Keeps track of the number of known gaps we processed.", &metrics.knownGapsProcessed) + err = prometheusRegisterHelper("known_gaps_reprocess_error", "Keeps track of the number of known gaps that had errors when reprocessing, but the error was updated successfully.", &metrics.KnownGapsReprocessError) if err != nil { return nil, err } - err = prometheusRegisterHelper("known_gaps_processing_error", "Keeps track of the number of known gaps we had errors processing.", &metrics.KnownGapsProcessingError) + err = prometheusRegisterHelper("known_gaps_processed", "Keeps track of the number of known gaps we successfully processed.", &metrics.KnownGapsProcessed) if err != nil { return nil, err } @@ -86,19 +86,19 @@ func prometheusRegisterHelper(name string, help string, varPointer *uint64) erro // A structure utilized for keeping track of various metrics. Currently, mostly used in testing. type BeaconClientMetrics struct { - SlotInserts uint64 // Number of head events we successfully wrote to the DB. - ReorgInserts uint64 // Number of reorg events we successfully wrote to the DB. - KnownGapsInserts uint64 // Number of known_gaps we successfully wrote to the DB. - knownGapsProcessed uint64 // Number of knownGaps processed. - KnownGapsProcessingError uint64 // Number of errors that occurred while processing a knownGap - HeadError uint64 // Number of errors that occurred when decoding the head message. - HeadReorgError uint64 // Number of errors that occurred when decoding the reorg message. + SlotInserts uint64 // Number of head events we successfully wrote to the DB. + ReorgInserts uint64 // Number of reorg events we successfully wrote to the DB. + KnownGapsInserts uint64 // Number of known_gaps we successfully wrote to the DB. + KnownGapsProcessed uint64 // Number of knownGaps processed. + KnownGapsReprocessError uint64 // Number of knownGaps that were updated with an error. + HeadError uint64 // Number of errors that occurred when decoding the head message. + HeadReorgError uint64 // Number of errors that occurred when decoding the reorg message. } // Wrapper function to increment inserts. If we want to use mutexes later we can easily update all // occurrences here. func (m *BeaconClientMetrics) IncrementSlotInserts(inc uint64) { - logrus.Debug("Incrementing Slot Insert") + log.Debug("Incrementing Slot Insert") atomic.AddUint64(&m.SlotInserts, inc) } @@ -117,13 +117,7 @@ func (m *BeaconClientMetrics) IncrementKnownGapsInserts(inc uint64) { // Wrapper function to increment known gaps processed. If we want to use mutexes later we can easily update all // occurrences here. 
func (m *BeaconClientMetrics) IncrementKnownGapsProcessed(inc uint64) { - atomic.AddUint64(&m.knownGapsProcessed, inc) -} - -// Wrapper function to increment known gaps processing error. If we want to use mutexes later we can easily update all -// occurrences here. -func (m *BeaconClientMetrics) IncrementKnownGapsProcessingError(inc uint64) { - atomic.AddUint64(&m.KnownGapsProcessingError, inc) + atomic.AddUint64(&m.KnownGapsProcessed, inc) } // Wrapper function to increment head errors. If we want to use mutexes later we can easily update all @@ -137,3 +131,10 @@ func (m *BeaconClientMetrics) IncrementReorgError(inc uint64) { atomic.AddUint64(&m.HeadReorgError, inc) } + +// Wrapper function to increment the number of knownGaps that were updated with reprocessing errors. +// If we want to use mutexes later we can easily update all occurrences here. +func (m *BeaconClientMetrics) IncrementKnownGapsReprocessError(inc uint64) { + log.Debug("Incrementing Known Gap Reprocessing: ", &m.KnownGapsReprocessError) + atomic.AddUint64(&m.KnownGapsReprocessError, inc) +} diff --git a/pkg/beaconclient/processhistoric.go b/pkg/beaconclient/processhistoric.go index af6e735..262126d 100644 --- a/pkg/beaconclient/processhistoric.go +++ b/pkg/beaconclient/processhistoric.go @@ -51,35 +51,41 @@ var ( WHERE checked_out_by=$1` ) -type historicProcessing struct { +type HistoricProcessing struct { db sql.Database //db connection metrics *BeaconClientMetrics // metrics for beaconclient uniqueNodeIdentifier int // node unique identifier. - finishProcessing chan int // A channel which indicates to the process handleBatchProcess function that its time to end. } // Get a single row of historical slots from the table. -func (hp historicProcessing) getSlotRange(slotCh chan<- slotsToProcess) []error { - return getBatchProcessRow(hp.db, getHpEntryStmt, checkHpEntryStmt, lockHpEntryStmt, slotCh, strconv.Itoa(hp.uniqueNodeIdentifier)) +func (hp HistoricProcessing) getSlotRange(ctx context.Context, slotCh chan<- slotsToProcess) []error { + return getBatchProcessRow(ctx, hp.db, getHpEntryStmt, checkHpEntryStmt, lockHpEntryStmt, slotCh, strconv.Itoa(hp.uniqueNodeIdentifier)) } // Remove the table entry. -func (hp historicProcessing) removeTableEntry(processCh <-chan slotsToProcess) error { - return removeRowPostProcess(hp.db, processCh, QueryBySlotStmt, deleteHpEntryStmt) +func (hp HistoricProcessing) removeTableEntry(ctx context.Context, processCh <-chan slotsToProcess) error { + return removeRowPostProcess(ctx, hp.db, processCh, QueryBySlotStmt, deleteHpEntryStmt) } // Remove the table entry. -func (hp historicProcessing) handleProcessingErrors(errMessages <-chan batchHistoricError) { +func (hp HistoricProcessing) handleProcessingErrors(ctx context.Context, errMessages <-chan batchHistoricError) { for { - errMs := <-errMessages - loghelper.LogSlotError(strconv.Itoa(errMs.slot), errMs.err) - writeKnownGaps(hp.db, 1, errMs.slot, errMs.slot, errMs.err, errMs.errProcess, hp.metrics) + select { + case <-ctx.Done(): + return + case errMs := <-errMessages: + loghelper.LogSlotError(strconv.Itoa(errMs.slot), errMs.err) + writeKnownGaps(hp.db, 1, errMs.slot, errMs.slot, errMs.err, errMs.errProcess, hp.metrics) + } } } -func (hp historicProcessing) releaseDbLocks() error { - go func() { hp.finishProcessing <- 1 }() +// "Un"-checkout the rows held by this node in the ethcl.historical_process table.
+func (hp HistoricProcessing) releaseDbLocks(cancel context.CancelFunc) error { + go func() { cancel() }() log.Debug("Updating all the entries to ethcl.historical processing") + log.Debug("Db: ", hp.db) + log.Debug("hp.uniqueNodeIdentifier ", hp.uniqueNodeIdentifier) res, err := hp.db.Exec(context.Background(), releaseHpLockStmt, hp.uniqueNodeIdentifier) if err != nil { return fmt.Errorf("Unable to remove lock from ethcl.historical_processing table for node %d, error is %e", hp.uniqueNodeIdentifier, err) @@ -94,17 +100,22 @@ func (hp historicProcessing) releaseDbLocks() error { } // Process the slot range. -func processSlotRangeWorker(workCh <-chan int, errCh chan<- batchHistoricError, db sql.Database, serverAddress string, metrics *BeaconClientMetrics, checkDb bool) { - for slot := range workCh { - log.Debug("Handling slot: ", slot) - err, errProcess := handleHistoricSlot(db, serverAddress, slot, metrics, checkDb) - if err != nil { - errMs := batchHistoricError{ - err: err, - errProcess: errProcess, - slot: slot, +func processSlotRangeWorker(ctx context.Context, workCh <-chan int, errCh chan<- batchHistoricError, db sql.Database, serverAddress string, metrics *BeaconClientMetrics, checkDb bool) { + for { + select { + case <-ctx.Done(): + return + case slot := <-workCh: + log.Debug("Handling slot: ", slot) + err, errProcess := handleHistoricSlot(db, serverAddress, slot, metrics, checkDb) + if err != nil { + errMs := batchHistoricError{ + err: err, + errProcess: errProcess, + slot: slot, + } + errCh <- errMs } - errCh <- errMs } } } @@ -113,96 +124,102 @@ func processSlotRangeWorker(workCh <-chan int, errCh chan<- batchHistoricError, // It also locks the row by updating the checked_out column. // The statement for getting the start_slot and end_slot must be provided. // The statement for "locking" the row must also be provided. -func getBatchProcessRow(db sql.Database, getStartEndSlotStmt string, checkNewRowsStmt string, checkOutRowStmt string, slotCh chan<- slotsToProcess, uniqueNodeIdentifier string) []error { +func getBatchProcessRow(ctx context.Context, db sql.Database, getStartEndSlotStmt string, checkNewRowsStmt string, checkOutRowStmt string, slotCh chan<- slotsToProcess, uniqueNodeIdentifier string) []error { errCount := make([]error, 0) // 5 is an arbitrary number. It allows us to retry a few times before // ending the application. 
prevErrCount := 0 for len(errCount) < 5 { - if len(errCount) != prevErrCount { - log.WithFields(log.Fields{ - "errCount": errCount, - }).Error("New error entry added") - } - processRow, err := db.Exec(context.Background(), checkNewRowsStmt) - if err != nil { - errCount = append(errCount, err) - } - row, err := processRow.RowsAffected() - if err != nil { - errCount = append(errCount, err) - } - if row < 1 { - time.Sleep(1000 * time.Millisecond) - log.Debug("We are checking rows, be patient") - continue - } - log.Debug("We found a new row") - ctx := context.Background() - - // Setup TX - tx, err := db.Begin(ctx) - if err != nil { - loghelper.LogError(err).Error("We are unable to Begin a SQL transaction") - errCount = append(errCount, err) - continue - } - defer func() { - err := tx.Rollback(ctx) - if err != nil && err != pgx.ErrTxClosed { - loghelper.LogError(err).Error("We were unable to Rollback a transaction") + select { + case <-ctx.Done(): + return errCount + default: + if len(errCount) != prevErrCount { + log.WithFields(log.Fields{ + "errCount": errCount, + }).Error("New error entry added") + } + processRow, err := db.Exec(context.Background(), checkNewRowsStmt) + if err != nil { errCount = append(errCount, err) } - }() - - // Query the DB for slots. - sp := slotsToProcess{} - err = tx.QueryRow(ctx, getStartEndSlotStmt).Scan(&sp.startSlot, &sp.endSlot) - if err != nil { - if err == pgx.ErrNoRows { - time.Sleep(100 * time.Millisecond) - continue + row, err := processRow.RowsAffected() + if err != nil { + errCount = append(errCount, err) } - loghelper.LogSlotRangeStatementError(strconv.Itoa(sp.startSlot), strconv.Itoa(sp.endSlot), getStartEndSlotStmt, err).Error("Unable to get a row") - errCount = append(errCount, err) - continue - } + if row < 1 { + time.Sleep(1000 * time.Millisecond) + log.Debug("We are checking rows, be patient") + break + } + log.Debug("We found a new row") + dbCtx := context.Background() - // Checkout the Row - res, err := tx.Exec(ctx, checkOutRowStmt, sp.startSlot, sp.endSlot, uniqueNodeIdentifier) - if err != nil { - loghelper.LogSlotRangeStatementError(strconv.Itoa(sp.startSlot), strconv.Itoa(sp.endSlot), checkOutRowStmt, err).Error("Unable to checkout the row") - errCount = append(errCount, err) - continue + // Setup TX + tx, err := db.Begin(dbCtx) + if err != nil { + loghelper.LogError(err).Error("We are unable to Begin a SQL transaction") + errCount = append(errCount, err) + break + } + defer func() { + err := tx.Rollback(dbCtx) + if err != nil && err != pgx.ErrTxClosed { + loghelper.LogError(err).Error("We were unable to Rollback a transaction") + errCount = append(errCount, err) + } + }() + + // Query the DB for slots. 
+ sp := slotsToProcess{} + err = tx.QueryRow(dbCtx, getStartEndSlotStmt).Scan(&sp.startSlot, &sp.endSlot) + if err != nil { + if err == pgx.ErrNoRows { + time.Sleep(100 * time.Millisecond) + break + } + loghelper.LogSlotRangeStatementError(strconv.Itoa(sp.startSlot), strconv.Itoa(sp.endSlot), getStartEndSlotStmt, err).Error("Unable to get a row") + errCount = append(errCount, err) + break + } + + // Checkout the Row + res, err := tx.Exec(dbCtx, checkOutRowStmt, sp.startSlot, sp.endSlot, uniqueNodeIdentifier) + if err != nil { + loghelper.LogSlotRangeStatementError(strconv.Itoa(sp.startSlot), strconv.Itoa(sp.endSlot), checkOutRowStmt, err).Error("Unable to checkout the row") + errCount = append(errCount, err) + break + } + rows, err := res.RowsAffected() + if err != nil { + loghelper.LogSlotRangeStatementError(strconv.Itoa(sp.startSlot), strconv.Itoa(sp.endSlot), checkOutRowStmt, fmt.Errorf("Unable to determine the rows affected when trying to checkout a row.")) + errCount = append(errCount, err) + break + } + if rows > 1 { + loghelper.LogSlotRangeStatementError(strconv.Itoa(sp.startSlot), strconv.Itoa(sp.endSlot), checkOutRowStmt, err).WithFields(log.Fields{ + "rowsReturn": rows, + }).Error("We locked too many rows.....") + errCount = append(errCount, err) + break + } + if rows == 0 { + loghelper.LogSlotRangeStatementError(strconv.Itoa(sp.startSlot), strconv.Itoa(sp.endSlot), checkOutRowStmt, err).WithFields(log.Fields{ + "rowsReturn": rows, + }).Error("We did not lock a single row.") + errCount = append(errCount, err) + break + } + err = tx.Commit(dbCtx) + if err != nil { + loghelper.LogSlotRangeError(strconv.Itoa(sp.startSlot), strconv.Itoa(sp.endSlot), err).Error("Unable to commit transactions.") + errCount = append(errCount, err) + break + } + log.WithField("slots", sp).Debug("Added new slots to be processed") + slotCh <- sp } - rows, err := res.RowsAffected() - if err != nil { - loghelper.LogSlotRangeStatementError(strconv.Itoa(sp.startSlot), strconv.Itoa(sp.endSlot), checkOutRowStmt, fmt.Errorf("Unable to determine the rows affected when trying to checkout a row.")) - errCount = append(errCount, err) - continue - } - if rows > 1 { - loghelper.LogSlotRangeStatementError(strconv.Itoa(sp.startSlot), strconv.Itoa(sp.endSlot), checkOutRowStmt, err).WithFields(log.Fields{ - "rowsReturn": rows, - }).Error("We locked too many rows.....") - errCount = append(errCount, err) - continue - } - if rows == 0 { - loghelper.LogSlotRangeStatementError(strconv.Itoa(sp.startSlot), strconv.Itoa(sp.endSlot), checkOutRowStmt, err).WithFields(log.Fields{ - "rowsReturn": rows, - }).Error("We did not lock a single row.") - errCount = append(errCount, err) - continue - } - err = tx.Commit(ctx) - if err != nil { - loghelper.LogSlotRangeError(strconv.Itoa(sp.startSlot), strconv.Itoa(sp.endSlot), err).Error("Unable commit transactions.") - errCount = append(errCount, err) - continue - } - slotCh <- sp } log.WithFields(log.Fields{ "ErrCount": errCount, @@ -211,35 +228,39 @@ } // After a row has been processed it should be removed from its appropriate table.
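Taken together, getBatchProcessRow above and removeRowPostProcess below implement a checkout → process → delete lifecycle so that competing nodes never claim the same slot range. A condensed sketch of that lifecycle; the flat table name, plain database/sql API, and helper names are illustrative (the real code uses pgx and the ethcl schema):

```go
package batch

import (
	"context"
	"database/sql"
)

type slotRange struct{ start, end int }

// checkoutRow claims one unclaimed row inside a transaction, marking it as
// checked out by this node before any work begins.
func checkoutRow(ctx context.Context, db *sql.DB, nodeID string) (slotRange, error) {
	var sr slotRange
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return sr, err
	}
	defer tx.Rollback() // no-op after a successful Commit

	err = tx.QueryRowContext(ctx,
		`SELECT start_slot, end_slot FROM historic_process
		 WHERE checked_out = FALSE LIMIT 1 FOR UPDATE`).Scan(&sr.start, &sr.end)
	if err != nil {
		return sr, err
	}
	if _, err = tx.ExecContext(ctx,
		`UPDATE historic_process SET checked_out = TRUE, checked_out_by = $1
		 WHERE start_slot = $2 AND end_slot = $3`, nodeID, sr.start, sr.end); err != nil {
		return sr, err
	}
	return sr, tx.Commit()
}

// removeRow deletes the entry once every slot in the range has been processed.
func removeRow(ctx context.Context, db *sql.DB, sr slotRange) error {
	_, err := db.ExecContext(ctx,
		`DELETE FROM historic_process WHERE start_slot = $1 AND end_slot = $2`,
		sr.start, sr.end)
	return err
}
```

Releasing the locks on shutdown (releaseDbLocks) is then a single UPDATE keyed on checked_out_by, which is why the unique node identifier is threaded through both processing structs.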
-func removeRowPostProcess(db sql.Database, processCh <-chan slotsToProcess, checkProcessedStmt, removeStmt string) error { +func removeRowPostProcess(ctx context.Context, db sql.Database, processCh <-chan slotsToProcess, checkProcessedStmt, removeStmt string) error { errCh := make(chan error) for { - slots := <-processCh - // Make sure the start and end slot exist in the slots table. - go func() { - finishedProcess := false - for !finishedProcess { - isStartProcess, err := isSlotProcessed(db, checkProcessedStmt, strconv.Itoa(slots.startSlot)) + select { + case <-ctx.Done(): + return nil + case slots := <-processCh: + // Make sure the start and end slot exist in the slots table. + go func() { + finishedProcess := false + for !finishedProcess { + isStartProcess, err := isSlotProcessed(db, checkProcessedStmt, strconv.Itoa(slots.startSlot)) + if err != nil { + errCh <- err + } + isEndProcess, err := isSlotProcessed(db, checkProcessedStmt, strconv.Itoa(slots.endSlot)) + if err != nil { + errCh <- err + } + if isStartProcess && isEndProcess { + finishedProcess = true + } + } + + _, err := db.Exec(context.Background(), removeStmt, strconv.Itoa(slots.startSlot), strconv.Itoa(slots.endSlot)) if err != nil { errCh <- err } - isEndProcess, err := isSlotProcessed(db, checkProcessedStmt, strconv.Itoa(slots.endSlot)) - if err != nil { - errCh <- err - } - if isStartProcess && isEndProcess { - finishedProcess = true - } - } - _, err := db.Exec(context.Background(), removeStmt, strconv.Itoa(slots.startSlot), strconv.Itoa(slots.endSlot)) - if err != nil { - errCh <- err + }() + if len(errCh) != 0 { + return <-errCh } - - }() - if len(errCh) != 0 { - return <-errCh } } } diff --git a/pkg/beaconclient/processknowngaps.go b/pkg/beaconclient/processknowngaps.go index 85f2834..0812daf 100644 --- a/pkg/beaconclient/processknowngaps.go +++ b/pkg/beaconclient/processknowngaps.go @@ -54,22 +54,21 @@ type KnownGapsProcessing struct { db sql.Database //db connection metrics *BeaconClientMetrics // metrics for beaconclient uniqueNodeIdentifier int // node unique identifier. - finishProcessing chan int // A channel which indicates to the process handleBatchProcess function that its time to end. } // This function will perform all the heavy lifting for tracking the head of the chain. -func (bc *BeaconClient) ProcessKnownGaps(maxWorkers int) []error { +func (bc *BeaconClient) ProcessKnownGaps(ctx context.Context, maxWorkers int) []error { log.Info("We are starting the known gaps processing service.") - bc.KnownGapsProcess = KnownGapsProcessing{db: bc.Db, uniqueNodeIdentifier: bc.UniqueNodeIdentifier, metrics: bc.Metrics, finishProcessing: make(chan int)} - errs := handleBatchProcess(maxWorkers, bc.KnownGapsProcess, bc.KnownGapsProcess.finishProcessing, bc.KnownGapsProcess.db, bc.ServerEndpoint, bc.Metrics, bc.CheckDb) + bc.KnownGapsProcess = KnownGapsProcessing{db: bc.Db, uniqueNodeIdentifier: bc.UniqueNodeIdentifier, metrics: bc.Metrics} + errs := handleBatchProcess(ctx, maxWorkers, bc.KnownGapsProcess, bc.KnownGapsProcess.db, bc.ServerEndpoint, bc.Metrics, bc.CheckDb) log.Debug("Exiting known gaps processing service") return errs } // This function will perform all the necessary clean up tasks for stopping historical processing. 
-func (bc *BeaconClient) StopKnownGapsProcessing() error { - log.Info("We are stopping the historical processing service.") - err := bc.KnownGapsProcess.releaseDbLocks() +func (bc *BeaconClient) StopKnownGapsProcessing(cancel context.CancelFunc) error { + log.Info("We are stopping the known gaps processing service.") + err := bc.KnownGapsProcess.releaseDbLocks(cancel) if err != nil { loghelper.LogError(err).WithField("uniqueIdentifier", bc.UniqueNodeIdentifier).Error("We were unable to remove the locks from the ethcl.known_gaps table. Manual Intervention is needed!") } @@ -77,48 +76,55 @@ } // Get a single row of historical slots from the table. -func (kgp KnownGapsProcessing) getSlotRange(slotCh chan<- slotsToProcess) []error { - return getBatchProcessRow(kgp.db, getKgEntryStmt, checkKgEntryStmt, lockKgEntryStmt, slotCh, strconv.Itoa(kgp.uniqueNodeIdentifier)) +func (kgp KnownGapsProcessing) getSlotRange(ctx context.Context, slotCh chan<- slotsToProcess) []error { + return getBatchProcessRow(ctx, kgp.db, getKgEntryStmt, checkKgEntryStmt, lockKgEntryStmt, slotCh, strconv.Itoa(kgp.uniqueNodeIdentifier)) } // Remove the table entry. -func (kgp KnownGapsProcessing) removeTableEntry(processCh <-chan slotsToProcess) error { - return removeRowPostProcess(kgp.db, processCh, QueryBySlotStmt, deleteKgEntryStmt) +func (kgp KnownGapsProcessing) removeTableEntry(ctx context.Context, processCh <-chan slotsToProcess) error { + return removeRowPostProcess(ctx, kgp.db, processCh, QueryBySlotStmt, deleteKgEntryStmt) } // Remove the table entry. -func (kgp KnownGapsProcessing) handleProcessingErrors(errMessages <-chan batchHistoricError) { +func (kgp KnownGapsProcessing) handleProcessingErrors(ctx context.Context, errMessages <-chan batchHistoricError) { for { - errMs := <-errMessages - - // Check to see if this if this entry already exists. - res, err := kgp.db.Exec(context.Background(), checkKgSingleSlotStmt, errMs.slot, errMs.slot) - if err != nil { - loghelper.LogSlotError(strconv.Itoa(errMs.slot), err).Error("Unable to see if this slot is in the ethcl.known_gaps table") - } - - rows, err := res.RowsAffected() - if err != nil { - loghelper.LogSlotError(strconv.Itoa(errMs.slot), err).WithFields(log.Fields{ - "queryStatement": checkKgSingleSlotStmt, - }).Error("Unable to get the number of rows affected by this statement.") - } - - if rows > 0 { - loghelper.LogSlotError(strconv.Itoa(errMs.slot), errMs.err).Error("We received an error when processing a knownGap") - err = updateKnownGapErrors(kgp.db, errMs.slot, errMs.slot, errMs.err, kgp.metrics) + select { + case <-ctx.Done(): + return + case errMs := <-errMessages: + // Check to see if this entry already exists.
+ res, err := kgp.db.Exec(context.Background(), checkKgSingleSlotStmt, errMs.slot, errMs.slot) if err != nil { - loghelper.LogSlotError(strconv.Itoa(errMs.slot), err).Error("Error processing known gap") + loghelper.LogSlotError(strconv.Itoa(errMs.slot), err).Error("Unable to see if this slot is in the ethcl.known_gaps table") + } + + rows, err := res.RowsAffected() + if err != nil { + loghelper.LogSlotError(strconv.Itoa(errMs.slot), err).WithFields(log.Fields{ + "queryStatement": checkKgSingleSlotStmt, + }).Error("Unable to get the number of rows affected by this statement.") + } + + if rows > 0 { + loghelper.LogSlotError(strconv.Itoa(errMs.slot), errMs.err).Error("We received an error when processing a knownGap") + err = updateKnownGapErrors(kgp.db, errMs.slot, errMs.slot, errMs.err, kgp.metrics) + if err != nil { + loghelper.LogSlotError(strconv.Itoa(errMs.slot), err).Error("Error processing known gap") + } + } else { + writeKnownGaps(kgp.db, 1, errMs.slot, errMs.slot, errMs.err, errMs.errProcess, kgp.metrics) } - } else { - writeKnownGaps(kgp.db, 1, errMs.slot, errMs.slot, errMs.err, errMs.errProcess, kgp.metrics) } } + } // Updated checked_out column for the uniqueNodeIdentifier. -func (kgp KnownGapsProcessing) releaseDbLocks() error { - go func() { kgp.finishProcessing <- 1 }() +func (kgp KnownGapsProcessing) releaseDbLocks(cancel context.CancelFunc) error { + go func() { cancel() }() + log.Debug("Updating all the entries to ethcl.known_gaps") + log.Debug("Db: ", kgp.db) + log.Debug("kgp.uniqueNodeIdentifier ", kgp.uniqueNodeIdentifier) res, err := kgp.db.Exec(context.Background(), releaseKgLockStmt, kgp.uniqueNodeIdentifier) if err != nil { return err diff --git a/pkg/beaconclient/processslot.go b/pkg/beaconclient/processslot.go index 80fb2d8..f40d7d1 100644 --- a/pkg/beaconclient/processslot.go +++ b/pkg/beaconclient/processslot.go @@ -42,9 +42,6 @@ import ( ) var ( - SlotUnmarshalError = func(obj string) string { - return fmt.Sprintf("Unable to properly unmarshal the Slot field in the %s.", obj) - } ParentRootUnmarshalError = "Unable to properly unmarshal the ParentRoot field in the SignedBeaconBlock." 
MissingEth1Data = "Can't get the Eth1 block_hash" VersionedUnmarshalerError = "Unable to create a versioned unmarshaler" @@ -213,15 +210,8 @@ func (ps *ProcessSlot) getSignedBeaconBlock(serverAddress string, vmCh <-chan *d ps.FullSignedBeaconBlock, err = vm.UnmarshalBeaconBlock(ps.SszSignedBeaconBlock) if err != nil { - loghelper.LogSlotError(strconv.Itoa(ps.Slot), err).Error("We are getting an error message when unmarshalling the SignedBeaconBlock.") - if ps.FullSignedBeaconBlock.Block().Slot() == 0 { - loghelper.LogSlotError(strconv.Itoa(ps.Slot), err).Error(SlotUnmarshalError("SignedBeaconBlock")) - return fmt.Errorf(SlotUnmarshalError("SignedBeaconBlock")) - } else if ps.FullSignedBeaconBlock.Block().ParentRoot() == nil { - loghelper.LogSlotError(strconv.Itoa(ps.Slot), err).Error(ParentRootUnmarshalError) - return fmt.Errorf(ParentRootUnmarshalError) - } - log.Warn("We received a processing error: ", err) + loghelper.LogSlotError(strconv.Itoa(ps.Slot), err).Warn("Unable to process the slot's SignedBeaconBlock") + return nil } ps.ParentBlockRoot = "0x" + hex.EncodeToString(ps.FullSignedBeaconBlock.Block().ParentRoot()) return nil @@ -247,14 +237,8 @@ func (ps *ProcessSlot) getBeaconState(serverEndpoint string, vmCh chan<- *dt.Ver vmCh <- versionedUnmarshaler ps.FullBeaconState, err = versionedUnmarshaler.UnmarshalBeaconState(ps.SszBeaconState) if err != nil { - loghelper.LogError(err).Error("We are getting an error message when unmarshalling the BeaconState") - if ps.FullBeaconState.Slot() == 0 { - loghelper.LogSlotError(strconv.Itoa(ps.Slot), err).Error(SlotUnmarshalError("BeaconState")) - return fmt.Errorf(SlotUnmarshalError("BeaconState")) - } else if hex.EncodeToString(ps.FullBeaconState.Eth1Data().BlockHash) == "" { - loghelper.LogSlotError(strconv.Itoa(ps.Slot), err).Error(MissingEth1Data) - return fmt.Errorf(MissingEth1Data) - } + loghelper.LogSlotError(strconv.Itoa(ps.Slot), err).Error("Unable to process the slot's BeaconState") + return err } return nil }
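Closing the loop on the metrics.go changes earlier in this patch: a helper like prometheusRegisterHelper can expose an atomically updated uint64 through prometheus.NewCounterFunc, so the hot path pays only for an atomic add and Prometheus reads the current value at scrape time. A sketch under that assumption — the ethcl namespace and helper name are guesses, not the repo's actual code:

```go
package metrics

import (
	"sync/atomic"

	"github.com/prometheus/client_golang/prometheus"
)

// registerCounter exposes an atomically-updated uint64 to Prometheus via a
// CounterFunc. Writers never touch Prometheus directly; they only call
// atomic.AddUint64 on the shared variable.
func registerCounter(name, help string, v *uint64) error {
	return prometheus.Register(prometheus.NewCounterFunc(
		prometheus.CounterOpts{
			Namespace: "ethcl", // illustrative namespace
			Name:      name,
			Help:      help,
		},
		func() float64 { return float64(atomic.LoadUint64(v)) },
	))
}

// Usage: increment with atomic.AddUint64(&m.KnownGapsReprocessError, 1);
// Prometheus reads the value on each scrape.
```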