diff --git a/cmd/boot.go b/cmd/boot.go
index bb543e6..e5e7b3b 100644
--- a/cmd/boot.go
+++ b/cmd/boot.go
@@ -60,7 +60,7 @@ func bootApp() {
 		notifierCh <- syscall.SIGTERM
 	}()
 
-	err = shutdown.ShutdownServices(ctx, notifierCh, maxWaitSecondsShutdown, Db, Bc)
+	err = shutdown.ShutdownBoot(ctx, notifierCh, maxWaitSecondsShutdown, Db, Bc)
 	if err != nil {
 		loghelper.LogError(err).Error("Ungracefully Shutdown ipld-ethcl-indexer!")
 	} else {
diff --git a/cmd/head.go b/cmd/head.go
index 3af009f..9ef4d63 100644
--- a/cmd/head.go
+++ b/cmd/head.go
@@ -81,7 +81,7 @@ func startHeadTracking() {
 	}
 
 	// Shutdown when the time is right.
-	err = shutdown.ShutdownServices(ctx, notifierCh, maxWaitSecondsShutdown, Db, Bc)
+	err = shutdown.ShutdownHeadTracking(ctx, notifierCh, maxWaitSecondsShutdown, Db, Bc)
 	if err != nil {
 		loghelper.LogError(err).Error("Ungracefully Shutdown ipld-ethcl-indexer!")
 	} else {
diff --git a/cmd/historic.go b/cmd/historic.go
index 2d89deb..e9d8d95 100644
--- a/cmd/historic.go
+++ b/cmd/historic.go
@@ -91,7 +91,7 @@ func startHistoricProcessing() {
 	}
 
 	// Shutdown when the time is right.
-	err = shutdown.ShutdownServices(ctx, notifierCh, maxWaitSecondsShutdown, Db, Bc)
+	err = shutdown.ShutdownHistoricProcessing(ctx, notifierCh, maxWaitSecondsShutdown, Db, Bc)
 	if err != nil {
 		loghelper.LogError(err).Error("Ungracefully Shutdown ipld-ethcl-indexer!")
 	} else {
diff --git a/internal/shutdown/shutdown.go b/internal/shutdown/shutdown.go
index 22c7310..9c8d71e 100644
--- a/internal/shutdown/shutdown.go
+++ b/internal/shutdown/shutdown.go
@@ -27,8 +27,21 @@ import (
 )
 
 // Shutdown all the internal services for the application.
-func ShutdownServices(ctx context.Context, notifierCh chan os.Signal, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) error {
-	successCh, errCh := gracefulshutdown.Shutdown(ctx, notifierCh, waitTime, map[string]gracefulshutdown.Operation{
+func ShutdownServices(ctx context.Context, notifierCh chan os.Signal, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient, shutdownOperations map[string]gracefulshutdown.Operation) error {
+	//successCh, errCh := gracefulshutdown.Shutdown(ctx, notifierCh, waitTime, )
+	successCh, errCh := gracefulshutdown.Shutdown(ctx, notifierCh, waitTime, shutdownOperations)
+
+	select {
+	case <-successCh:
+		return nil
+	case err := <-errCh:
+		return err
+	}
+}
+
+// Wrapper function for shutting down the head tracking process.
+func ShutdownHeadTracking(ctx context.Context, notifierCh chan os.Signal, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) error {
+	return ShutdownServices(ctx, notifierCh, waitTime, DB, BC, map[string]gracefulshutdown.Operation{
 		// Combining DB shutdown with BC because BC needs DB open to cleanly shutdown.
 		"beaconClient": func(ctx context.Context) error {
 			defer DB.Close()
@@ -39,11 +52,33 @@ func ShutdownServices(ctx context.Context, notifierCh chan os.Signal, waitTime t
 			return err
 		},
 	})
-
-	select {
-	case <-successCh:
-		return nil
-	case err := <-errCh:
-		return err
-	}
+}
+
+// Wrapper function for shutting down the historic processing service.
+func ShutdownHistoricProcessing(ctx context.Context, notifierCh chan os.Signal, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) error {
+	return ShutdownServices(ctx, notifierCh, waitTime, DB, BC, map[string]gracefulshutdown.Operation{
+		// Combining DB shutdown with BC because BC needs DB open to cleanly shutdown.
+		"beaconClient": func(ctx context.Context) error {
+			defer DB.Close()
+			err := BC.StopHistoric()
+			if err != nil {
+				loghelper.LogError(err).Error("Unable to stop historic processing")
+			}
+			err = BC.StopKnownGapsProcessing()
+			if err != nil {
+				loghelper.LogError(err).Error("Unable to stop known gaps processing")
+			}
+			return err
+		},
+	})
+}
+
+// Wrapper function for shutting down the application in boot mode.
+func ShutdownBoot(ctx context.Context, notifierCh chan os.Signal, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) error {
+	return ShutdownServices(ctx, notifierCh, waitTime, DB, BC, map[string]gracefulshutdown.Operation{
+		// Boot mode only opens the database, so it is the only service that needs to be closed.
+		"Database": func(ctx context.Context) error {
+			return DB.Close()
+		},
+	})
 }
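With the operation map now supplied by the caller, each command only needs a thin wrapper plus a signal channel. Below is a minimal sketch of such a caller, not part of this change: the function name is made up, the import paths are assumed from the module layout, and the call is expected to block until a signal arrives on notifierCh, which is how cmd/boot.go drives it.

package example

import (
	"context"
	"os"
	"os/signal"
	"syscall"
	"time"

	log "github.com/sirupsen/logrus"

	"github.com/vulcanize/ipld-ethcl-indexer/internal/shutdown"
	"github.com/vulcanize/ipld-ethcl-indexer/pkg/beaconclient"
	"github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql"
)

// exampleShutdownHook is a hypothetical helper showing how a command would
// hand its services to one of the new shutdown wrappers.
func exampleShutdownHook(ctx context.Context, Db sql.Database, Bc *beaconclient.BeaconClient) {
	notifierCh := make(chan os.Signal, 1)
	signal.Notify(notifierCh, syscall.SIGTERM, syscall.SIGINT)

	// Wait for a signal, then give the registered operations up to 20 seconds to finish.
	err := shutdown.ShutdownHeadTracking(ctx, notifierCh, 20*time.Second, Db, Bc)
	if err != nil {
		log.WithError(err).Error("Ungracefully Shutdown ipld-ethcl-indexer!")
	}
}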
diff --git a/internal/shutdown/shutdown_test.go b/internal/shutdown/shutdown_test.go
index 8c75506..c6fa00e 100644
--- a/internal/shutdown/shutdown_test.go
+++ b/internal/shutdown/shutdown_test.go
@@ -67,12 +67,12 @@ var _ = Describe("Shutdown", func() {
 		Expect(err).To(BeNil())
 	})
 
-	Describe("Run Shutdown Function,", Label("integration"), func() {
+	Describe("Run Shutdown Function for head tracking,", Label("integration"), func() {
 		Context("When Channels are empty,", func() {
 			It("Should Shutdown Successfully.", func() {
 				go func() {
 					log.Debug("Starting shutdown chan")
-					err = shutdown.ShutdownServices(ctx, notifierCh, maxWaitSecondsShutdown, DB, BC)
+					err = shutdown.ShutdownHeadTracking(ctx, notifierCh, maxWaitSecondsShutdown, DB, BC)
 					log.Debug("We have completed the shutdown...")
 					Expect(err).ToNot(HaveOccurred())
 				}()
@@ -84,7 +84,7 @@ var _ = Describe("Shutdown", func() {
 				//log.SetLevel(log.DebugLevel)
 				go func() {
 					log.Debug("Starting shutdown chan")
-					err = shutdown.ShutdownServices(ctx, notifierCh, maxWaitSecondsShutdown, DB, BC)
+					err = shutdown.ShutdownHeadTracking(ctx, notifierCh, maxWaitSecondsShutdown, DB, BC)
 					log.Debug("We have completed the shutdown...")
 					Expect(err).ToNot(HaveOccurred())
 					shutdownCh <- true
@@ -117,7 +117,7 @@ var _ = Describe("Shutdown", func() {
 				//log.SetLevel(log.DebugLevel)
 				go func() {
 					log.Debug("Starting shutdown chan")
-					err = shutdown.ShutdownServices(ctx, notifierCh, maxWaitSecondsShutdown, DB, BC)
+					err = shutdown.ShutdownHeadTracking(ctx, notifierCh, maxWaitSecondsShutdown, DB, BC)
 					log.Debug("We have completed the shutdown...")
 					Expect(err).To(MatchError(gracefulshutdown.TimeoutErr(maxWaitSecondsShutdown.String())))
 					shutdownCh <- true
diff --git a/pkg/beaconclient/beaconclient.go b/pkg/beaconclient/beaconclient.go
index 33d3cac..caa3a20 100644
--- a/pkg/beaconclient/beaconclient.go
+++ b/pkg/beaconclient/beaconclient.go
@@ -18,6 +18,7 @@ package beaconclient
 import (
 	"context"
 	"fmt"
+	"math/rand"
 
 	"github.com/r3labs/sse"
 	log "github.com/sirupsen/logrus"
@@ -67,6 +68,8 @@ type BeaconClient struct {
 	// This value is lazily updated. Therefore at times it will be outdated.
 	LatestSlotInBeaconServer    int64
 	PerformHistoricalProcessing bool // Should we perform historical processing?
+	HistoricalProcess           historicProcessing
+	KnownGapsProcess            knownGapsProcessing
 }
 
 // A struct to keep track of relevant the head event topic.
@@ -87,7 +90,8 @@ type SseError struct {
 // A Function to create the BeaconClient.
 func CreateBeaconClient(ctx context.Context, connectionProtocol string, bcAddress string, bcPort int, bcKgTableIncrement int, uniqueNodeIdentifier int) (*BeaconClient, error) {
 	if uniqueNodeIdentifier == 0 {
-		return nil, fmt.Errorf("The unique node identifier provided is 0, it must be a non-zero value!!!!")
+		uniqueNodeIdentifier = rand.Int()
+		log.WithField("randomUniqueNodeIdentifier", uniqueNodeIdentifier).Warn("No uniqueNodeIdentifier provided, we are going to use a randomly generated one.")
 	}
 
 	metrics, err := CreateBeaconClientMetrics()
@@ -104,6 +108,7 @@ func CreateBeaconClient(ctx context.Context, connectionProtocol string, bcAddres
 		HeadTracking:  createSseEvent[Head](endpoint, BcHeadTopicEndpoint),
 		ReOrgTracking: createSseEvent[ChainReorg](endpoint, bcReorgTopicEndpoint),
 		Metrics:       metrics,
+		UniqueNodeIdentifier: uniqueNodeIdentifier,
 		//FinalizationTracking: createSseEvent[FinalizedCheckpoint](endpoint, bcFinalizedTopicEndpoint),
 	}, nil
 }
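One caveat on the random fallback above: on Go toolchains before 1.20 the global math/rand source is deterministically seeded, so two indexer nodes started without an identifier can draw the same value and later release each other's checked_out_by rows. A hedged sketch of one way a caller could guard against that (an illustration, not something this diff does):

package example

import (
	"math/rand"
	"time"
)

// randomNodeIdentifier returns a non-zero node identifier. Explicit seeding is
// only needed on Go versions before 1.20; newer toolchains randomly seed the
// global source on their own.
func randomNodeIdentifier() int {
	rand.Seed(time.Now().UnixNano())
	id := rand.Int()
	for id == 0 {
		// 0 means "unset" to the indexer, so redraw in the unlikely case.
		id = rand.Int()
	}
	return id
}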
diff --git a/pkg/beaconclient/capturehistoric.go b/pkg/beaconclient/capturehistoric.go
index 3c4d29b..1b0631a 100644
--- a/pkg/beaconclient/capturehistoric.go
+++ b/pkg/beaconclient/capturehistoric.go
@@ -22,27 +22,39 @@ import (
 	log "github.com/sirupsen/logrus"
 	"github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql"
+	"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
 	"golang.org/x/sync/errgroup"
 )
 
 // This function will perform all the heavy lifting for tracking the head of the chain.
 func (bc *BeaconClient) CaptureHistoric(maxWorkers int) []error {
 	log.Info("We are starting the historical processing service.")
-	hp := historicProcessing{db: bc.Db, metrics: bc.Metrics}
-	errs := handleBatchProcess(maxWorkers, hp, hp.db, bc.ServerEndpoint, bc.Metrics)
+	bc.HistoricalProcess = historicProcessing{db: bc.Db, metrics: bc.Metrics, uniqueNodeIdentifier: bc.UniqueNodeIdentifier, finishProcessing: make(chan int)}
+	errs := handleBatchProcess(maxWorkers, bc.HistoricalProcess, bc.HistoricalProcess.finishProcessing, bc.HistoricalProcess.db, bc.ServerEndpoint, bc.Metrics)
 	log.Debug("Exiting Historical")
 	return errs
 }
 
+// This function will perform all the necessary clean up tasks for stopping historical processing.
+func (bc *BeaconClient) StopHistoric() error {
+	log.Info("We are stopping the historical processing service.")
+	err := bc.HistoricalProcess.releaseDbLocks()
+	if err != nil {
+		loghelper.LogError(err).WithField("uniqueIdentifier", bc.UniqueNodeIdentifier).Error("We were unable to remove the locks from the ethcl.historic_process table. Manual Intervention is needed!")
+	}
+	return err
+}
+
 // An interface to enforce any batch processing. Currently there are two use cases for this.
 //
 // 1. Historic Processing
 //
 // 2. Known Gaps Processing
 type BatchProcessing interface {
-	getSlotRange(chan<- slotsToProcess) []error // Write the slots to process in a channel, return an error if you cant get the next slots to write.
-	handleProcessingErrors(<-chan batchHistoricError)
-	removeTableEntry(<-chan slotsToProcess) error // With the provided start and end slot, remove the entry from the database.
+	getSlotRange(chan<- slotsToProcess) []error       // Write the slots to process in a channel, return an error if you can't get the next slots to write.
+	handleProcessingErrors(<-chan batchHistoricError) // Custom logic to handle errors.
+	removeTableEntry(<-chan slotsToProcess) error     // With the provided start and end slot, remove the entry from the database.
+	releaseDbLocks() error                            // Update the checked_out column to false for whatever table is being updated.
 }
 
 // A struct to pass around indicating a table entry for slots to process.
@@ -71,12 +83,12 @@ type batchHistoricError struct {
 // 4. Remove the slot entry from the DB.
 //
 // 5. Handle any errors.
-func handleBatchProcess(maxWorkers int, bp BatchProcessing, db sql.Database, serverEndpoint string, metrics *BeaconClientMetrics) []error {
+func handleBatchProcess(maxWorkers int, bp BatchProcessing, finishCh chan int, db sql.Database, serverEndpoint string, metrics *BeaconClientMetrics) []error {
 	slotsCh := make(chan slotsToProcess)
 	workCh := make(chan int)
 	processedCh := make(chan slotsToProcess)
 	errCh := make(chan batchHistoricError)
-	finishCh := make(chan []error, 1)
+	finalErrCh := make(chan []error, 1)
 
 	// Start workers
 	for w := 1; w <= maxWorkers; w++ {
@@ -115,7 +127,7 @@ func handleBatchProcess(maxWorkers int, bp BatchProcessing, db sql.Database, ser
 			return bp.removeTableEntry(processedCh)
 		})
 		if err := errG.Wait(); err != nil {
-			finishCh <- []error{err}
+			finalErrCh <- []error{err}
 		}
 	}()
 	// Process errors from slot processing.
@@ -125,12 +137,17 @@ func handleBatchProcess(maxWorkers int, bp BatchProcessing, db sql.Database, ser
 	go func() {
 		errs := bp.getSlotRange(slotsCh) // Periodically adds new entries....
 		if errs != nil {
-			finishCh <- errs
+			finalErrCh <- errs
 		}
-		finishCh <- nil
+		finalErrCh <- nil
 	}()
-
-	errs := <-finishCh
-	log.Debug("Finishing the batchProcess")
-	return errs
+	log.Debug("Waiting for shutdown signal from channel")
+	select {
+	case <-finishCh:
+		log.Debug("Received shutdown signal from channel")
+		return nil
+	case errs := <-finalErrCh:
+		log.Debug("Finishing the batchProcess")
+		return errs
+	}
 }
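The finishCh plumbing added to handleBatchProcess boils down to a standard Go pattern: block on a select until either a shutdown signal or a terminal error arrives. Below is a stripped-down, self-contained sketch of that pattern; it is illustrative only, uses made-up names, and is not the indexer's actual code.

package main

import (
	"fmt"
	"time"
)

// runBatch blocks until it is told to finish or until the work loop reports a
// terminal error, mirroring the select added to handleBatchProcess.
func runBatch(finishCh chan int, work func() error) error {
	finalErrCh := make(chan error, 1)

	go func() {
		// Stand-in for the long-running producer/worker pipeline.
		finalErrCh <- work()
	}()

	select {
	case <-finishCh:
		// A shutdown request, as releaseDbLocks sends via finishProcessing.
		return nil
	case err := <-finalErrCh:
		// The pipeline ended on its own, possibly with an error.
		return err
	}
}

func main() {
	finish := make(chan int)
	go func() {
		time.Sleep(100 * time.Millisecond)
		finish <- 1 // ask the batch to stop, as StopHistoric does via releaseDbLocks
	}()
	fmt.Println(runBatch(finish, func() error {
		time.Sleep(time.Hour) // pretend the work never finishes on its own
		return nil
	}))
}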
diff --git a/pkg/beaconclient/processhistoric.go b/pkg/beaconclient/processhistoric.go
index 2ba3bd4..006544c 100644
--- a/pkg/beaconclient/processhistoric.go
+++ b/pkg/beaconclient/processhistoric.go
@@ -42,19 +42,25 @@ var (
 	lockHpEntryStmt string = `UPDATE ethcl.historic_process
-	SET checked_out=true
+	SET checked_out=true, checked_out_by=$3
 	WHERE start_slot=$1 AND end_slot=$2;`
-	// Used to delete an entry from the knownGaps table
+	// Used to delete an entry from the ethcl.historic_process table
 	deleteHpEntryStmt string = `DELETE FROM ethcl.historic_process
 	WHERE start_slot=$1 AND end_slot=$2;`
+	// Used to update every single row that this node has checked out.
+	releaseHpLockStmt string = `UPDATE ethcl.historic_process
+	SET checked_out=false
+	WHERE checked_out_by=$1`
 )
 
 type historicProcessing struct {
-	db      sql.Database
-	metrics *BeaconClientMetrics
+	db                   sql.Database         // db connection
+	metrics              *BeaconClientMetrics // metrics for beaconclient
+	uniqueNodeIdentifier int                  // node unique identifier.
+	finishProcessing     chan int             // A channel which signals to the handleBatchProcess function that it is time to end.
 }
 
 // Get a single row of historical slots from the table.
 func (hp historicProcessing) getSlotRange(slotCh chan<- slotsToProcess) []error {
-	return getBatchProcessRow(hp.db, getHpEntryStmt, checkHpEntryStmt, lockHpEntryStmt, slotCh)
+	return getBatchProcessRow(hp.db, getHpEntryStmt, checkHpEntryStmt, lockHpEntryStmt, slotCh, strconv.Itoa(hp.uniqueNodeIdentifier))
 }
 
 // Remove the table entry.
@@ -71,6 +77,22 @@ func (hp historicProcessing) handleProcessingErrors(errMessages <-chan batchHist
 	}
 }
 
+func (hp historicProcessing) releaseDbLocks() error {
+	go func() { hp.finishProcessing <- 1 }()
+	log.Debug("Releasing all locks held by this node in the ethcl.historic_process table.")
+	res, err := hp.db.Exec(context.Background(), releaseHpLockStmt, hp.uniqueNodeIdentifier)
+	if err != nil {
+		return fmt.Errorf("Unable to remove locks from the ethcl.historic_process table for node %d, error is %v", hp.uniqueNodeIdentifier, err)
+	}
+	log.Debug("Released the locks held by this node in the ethcl.historic_process table.")
+	rows, err := res.RowsAffected()
+	if err != nil {
+		return fmt.Errorf("Unable to calculate the number of rows affected when releasing locks from the ethcl.historic_process table for node %d, error is %v", hp.uniqueNodeIdentifier, err)
+	}
+	log.WithField("rowCount", rows).Info("Released historicProcessing locks for the specified rows.")
+	return nil
+}
+
 // Process the slot range.
 func processSlotRangeWorker(workCh <-chan int, errCh chan<- batchHistoricError, db sql.Database, serverAddress string, metrics *BeaconClientMetrics) {
 	for slot := range workCh {
@@ -91,7 +113,7 @@ func processSlotRangeWorker(workCh <-chan int, errCh chan<- batchHistoricError,
 // It also locks the row by updating the checked_out column.
 // The statement for getting the start_slot and end_slot must be provided.
 // The statement for "locking" the row must also be provided.
-func getBatchProcessRow(db sql.Database, getStartEndSlotStmt string, checkNewRowsStmt string, checkOutRowStmt string, slotCh chan<- slotsToProcess) []error {
+func getBatchProcessRow(db sql.Database, getStartEndSlotStmt string, checkNewRowsStmt string, checkOutRowStmt string, slotCh chan<- slotsToProcess, uniqueNodeIdentifier string) []error {
 	errCount := make([]error, 0)
 
 	// 5 is an arbitrary number. It allows us to retry a few times before
@@ -148,7 +170,7 @@ func getBatchProcessRow(db sql.Database, getStartEndSlotStmt string, checkNewRow
 		}
 
 		// Checkout the Row
-		res, err := tx.Exec(ctx, checkOutRowStmt, sp.startSlot, sp.endSlot)
+		res, err := tx.Exec(ctx, checkOutRowStmt, sp.startSlot, sp.endSlot, uniqueNodeIdentifier)
 		if err != nil {
 			loghelper.LogSlotRangeStatementError(strconv.Itoa(sp.startSlot), strconv.Itoa(sp.endSlot), checkOutRowStmt, err).Error("Unable to checkout the row")
 			errCount = append(errCount, err)
diff --git a/pkg/beaconclient/processknowngaps.go b/pkg/beaconclient/processknowngaps.go
index 6c07c02..5f3603e 100644
--- a/pkg/beaconclient/processknowngaps.go
+++ b/pkg/beaconclient/processknowngaps.go
@@ -36,7 +36,7 @@ var (
 	checkKgEntryStmt string = `SELECT * FROM ethcl.known_gaps WHERE checked_out=false;`
 	// Used to checkout a row from the ethcl.known_gaps table
 	lockKgEntryStmt string = `UPDATE ethcl.known_gaps
-	SET checked_out=true
+	SET checked_out=true, checked_out_by=$3
 	WHERE start_slot=$1 AND end_slot=$2;`
 	// Used to delete an entry from the knownGaps table
 	deleteKgEntryStmt string = `DELETE FROM ethcl.known_gaps
@@ -44,25 +44,41 @@ var (
 	// Used to check to see if a single slot exists in the known_gaps table.
 	checkKgSingleSlotStmt string = `SELECT start_slot, end_slot FROM ethcl.known_gaps
 	WHERE start_slot=$1 AND end_slot=$2;`
+	// Used to update every single row that this node has checked out.
+	releaseKgLockStmt string = `UPDATE ethcl.known_gaps
+	SET checked_out=false
+	WHERE checked_out_by=$1`
 )
 
 type knownGapsProcessing struct {
-	db      sql.Database
-	metrics *BeaconClientMetrics
+	db                   sql.Database         // db connection
+	metrics              *BeaconClientMetrics // metrics for beaconclient
+	uniqueNodeIdentifier int                  // node unique identifier.
+	finishProcessing     chan int             // A channel which signals to the handleBatchProcess function that it is time to end.
 }
 
 // This function will perform all the heavy lifting for tracking the head of the chain.
 func (bc *BeaconClient) ProcessKnownGaps(maxWorkers int) []error {
 	log.Info("We are starting the known gaps processing service.")
-	hp := knownGapsProcessing{db: bc.Db, metrics: bc.Metrics}
-	errs := handleBatchProcess(maxWorkers, hp, hp.db, bc.ServerEndpoint, bc.Metrics)
+	bc.KnownGapsProcess = knownGapsProcessing{db: bc.Db, uniqueNodeIdentifier: bc.UniqueNodeIdentifier, metrics: bc.Metrics, finishProcessing: make(chan int)}
+	errs := handleBatchProcess(maxWorkers, bc.KnownGapsProcess, bc.KnownGapsProcess.finishProcessing, bc.KnownGapsProcess.db, bc.ServerEndpoint, bc.Metrics)
 	log.Debug("Exiting known gaps processing service")
 	return errs
 }
 
+// This function will perform all the necessary clean up tasks for stopping known gaps processing.
+func (bc *BeaconClient) StopKnownGapsProcessing() error {
+	log.Info("We are stopping the known gaps processing service.")
+	err := bc.KnownGapsProcess.releaseDbLocks()
+	if err != nil {
+		loghelper.LogError(err).WithField("uniqueIdentifier", bc.UniqueNodeIdentifier).Error("We were unable to remove the locks from the ethcl.known_gaps table. Manual Intervention is needed!")
+	}
+	return err
+}
+
 // Get a single row of historical slots from the table.
 func (kgp knownGapsProcessing) getSlotRange(slotCh chan<- slotsToProcess) []error {
-	return getBatchProcessRow(kgp.db, getKgEntryStmt, checkKgEntryStmt, lockKgEntryStmt, slotCh)
+	return getBatchProcessRow(kgp.db, getKgEntryStmt, checkKgEntryStmt, lockKgEntryStmt, slotCh, strconv.Itoa(kgp.uniqueNodeIdentifier))
 }
 
 // Remove the table entry.
@@ -99,3 +115,18 @@ func (kgp knownGapsProcessing) handleProcessingErrors(errMessages <-chan batchHi
 		}
 	}
 }
+
+// Update the checked_out column for every row checked out by this uniqueNodeIdentifier.
+func (kgp knownGapsProcessing) releaseDbLocks() error {
+	go func() { kgp.finishProcessing <- 1 }()
+	res, err := kgp.db.Exec(context.Background(), releaseKgLockStmt, kgp.uniqueNodeIdentifier)
+	if err != nil {
+		return err
+	}
+	rows, err := res.RowsAffected()
+	if err != nil {
+		return err
+	}
+	log.WithField("rowCount", rows).Info("Released knownGaps locks for specified rows.")
+	return nil
+}
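The two releaseDbLocks implementations differ only in the statement they execute, so the per-node release pattern can be summarized in one place. The sketch below is a hypothetical shared helper, not part of this change: it assumes only what the diff shows, namely that sql.Database exposes Exec and that its result supports RowsAffected, and the table argument is meant to be one of the two known table names rather than user input.

package example

import (
	"context"
	"fmt"

	log "github.com/sirupsen/logrus"

	"github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql"
)

// releaseNodeLocks flips checked_out back to false for every row that the
// given node previously checked out (checked_out_by matches its identifier).
func releaseNodeLocks(db sql.Database, table string, uniqueNodeIdentifier int) error {
	stmt := fmt.Sprintf(`UPDATE %s
	SET checked_out=false
	WHERE checked_out_by=$1`, table)
	res, err := db.Exec(context.Background(), stmt, uniqueNodeIdentifier)
	if err != nil {
		return err
	}
	rows, err := res.RowsAffected()
	if err != nil {
		return err
	}
	log.WithFields(log.Fields{"table": table, "rowCount": rows}).Info("Released locks held by this node.")
	return nil
}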