diff --git a/cmd/head.go b/cmd/head.go index ca7e7e3..800131b 100644 --- a/cmd/head.go +++ b/cmd/head.go @@ -6,7 +6,6 @@ package cmd import ( "context" - "time" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" @@ -40,7 +39,13 @@ func startHeadTracking() { go BC.CaptureHead() // Shutdown when the time is right. - shutdown.ShutdownServices(ctx, time.Duration(maxWaitSecondsShutdown), DB, BC) + err = shutdown.ShutdownServices(ctx, maxWaitSecondsShutdown, DB, BC) + if err != nil { + loghelper.LogError(err).Error("Ungracefully Shutdown ipld-ethcl-indexer!") + } else { + log.Info("Gracefully shutdown ipld-ethcl-indexer") + } + } func init() { diff --git a/internal/shutdown/shutdown.go b/internal/shutdown/shutdown.go index 087adb4..49b7f56 100644 --- a/internal/shutdown/shutdown.go +++ b/internal/shutdown/shutdown.go @@ -4,7 +4,6 @@ import ( "context" "time" - log "github.com/sirupsen/logrus" "github.com/vulcanize/ipld-ethcl-indexer/pkg/beaconclient" "github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql" "github.com/vulcanize/ipld-ethcl-indexer/pkg/gracefulshutdown" @@ -12,7 +11,7 @@ import ( ) // Shutdown all the internal services for the application. -func ShutdownServices(ctx context.Context, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) { +func ShutdownServices(ctx context.Context, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) error { successCh, errCh := gracefulshutdown.Shutdown(ctx, waitTime, map[string]gracefulshutdown.Operation{ "database": func(ctx context.Context) error { err := DB.Close() @@ -32,8 +31,8 @@ func ShutdownServices(ctx context.Context, waitTime time.Duration, DB sql.Databa select { case _ = <-successCh: - log.Info("Gracefully Shutdown ipld-ethcl-indexer!") + return nil case err := <-errCh: - loghelper.LogError(err).Error("Ungracefully Shutdown ipld-ethcl-indexer!") + return err } } diff --git a/internal/shutdown/shutdown_suite_test.go b/internal/shutdown/shutdown_suite_test.go new file mode 100644 index 0000000..4b8d411 --- /dev/null +++ b/internal/shutdown/shutdown_suite_test.go @@ -0,0 +1,13 @@ +package shutdown_test + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestShutdown(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Shutdown Suite") +} diff --git a/internal/shutdown/shutdown_test.go b/internal/shutdown/shutdown_test.go new file mode 100644 index 0000000..268f2ae --- /dev/null +++ b/internal/shutdown/shutdown_test.go @@ -0,0 +1,115 @@ +//go:build !race +// +build !race + +package shutdown_test + +import ( + "context" + "syscall" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "github.com/r3labs/sse" + log "github.com/sirupsen/logrus" + "github.com/vulcanize/ipld-ethcl-indexer/internal/boot" + "github.com/vulcanize/ipld-ethcl-indexer/internal/shutdown" + "github.com/vulcanize/ipld-ethcl-indexer/pkg/beaconclient" + "github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql" + "github.com/vulcanize/ipld-ethcl-indexer/pkg/gracefulshutdown" +) + +var _ = Describe("Shutdown", func() { + var ( + dbAddress string = "localhost" + dbPort int = 8077 + dbName string = "vulcanize_testing" + dbUsername string = "vdbm" + dbPassword string = "password" + dbDriver string = "PGX" + bcAddress string = "localhost" + bcPort int = 5052 + bcConnectionProtocol string = "http" + maxWaitSecondsShutdown time.Duration = time.Duration(1) * time.Second + DB sql.Database + BC *beaconclient.BeaconClient + err error + ctx context.Context + ) + BeforeEach(func() { + ctx = context.Background() + BC, DB, err = boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol) + Expect(err).To(BeNil()) + }) + + Describe("Run Shutdown Function,", Label("integration"), func() { + Context("When Channels are empty,", func() { + It("Should Shutdown Successfully.", func() { + go func() { + log.Debug("Starting shutdown chan") + err = shutdown.ShutdownServices(ctx, maxWaitSecondsShutdown, DB, BC) + log.Debug("We have completed the shutdown...") + Expect(err).ToNot(HaveOccurred()) + }() + }) + }) + Context("When the Channels are not empty,", func() { + It("Should try to clear them and shutdown gracefully.", func() { + shutdownCh := make(chan bool) + //log.SetLevel(log.DebugLevel) + go func() { + log.Debug("Starting shutdown chan") + err = shutdown.ShutdownServices(ctx, maxWaitSecondsShutdown, DB, BC) + log.Debug("We have completed the shutdown...") + Expect(err).ToNot(HaveOccurred()) + shutdownCh <- true + }() + + messageAddCh := make(chan bool) + go func() { + log.Debug("Adding messages to Channels") + BC.HeadTracking.MessagesCh <- &sse.Event{} + BC.FinalizationTracking.MessagesCh <- &sse.Event{} + BC.ReOrgTracking.MessagesCh <- &sse.Event{} + log.Debug("Message adding complete") + messageAddCh <- true + }() + + go func() { + <-messageAddCh + log.Debug("Calling SIGHUP") + syscall.Kill(syscall.Getpid(), syscall.SIGHUP) + log.Debug("Reading messages from channel") + <-BC.HeadTracking.MessagesCh + <-BC.FinalizationTracking.MessagesCh + <-BC.ReOrgTracking.MessagesCh + }() + <-shutdownCh + + }) + It("Should try to clear them, if it can't, shutdown within a given time frame.", func() { + shutdownCh := make(chan bool) + //log.SetLevel(log.DebugLevel) + go func() { + log.Debug("Starting shutdown chan") + err = shutdown.ShutdownServices(ctx, maxWaitSecondsShutdown, DB, BC) + log.Debug("We have completed the shutdown...") + Expect(err).To(MatchError(gracefulshutdown.TimeoutErr(maxWaitSecondsShutdown.String()))) + shutdownCh <- true + }() + + go func() { + log.Debug("Adding messages to Channels") + BC.HeadTracking.MessagesCh <- &sse.Event{} + BC.FinalizationTracking.MessagesCh <- &sse.Event{} + BC.ReOrgTracking.MessagesCh <- &sse.Event{} + log.Debug("Message adding complete") + log.Debug("Calling SIGHUP") + syscall.Kill(syscall.Getpid(), syscall.SIGHUP) + }() + + <-shutdownCh + }) + }) + }) +}) diff --git a/pkg/beaconclient/beaconclient.go b/pkg/beaconclient/beaconclient.go index 44fce6c..494c532 100644 --- a/pkg/beaconclient/beaconclient.go +++ b/pkg/beaconclient/beaconclient.go @@ -61,7 +61,7 @@ func createSseEvent[P ProcessedEvents](baseEndpoint string, path string) *SseEve endpoint := baseEndpoint + path sseEvents := &SseEvents[P]{ Endpoint: endpoint, - MessagesCh: make(chan *sse.Event), + MessagesCh: make(chan *sse.Event, 1), ErrorCh: make(chan *SseError), ProcessCh: make(chan *P), SseClient: func(endpoint string) *sse.Client { diff --git a/pkg/beaconclient/capturehead.go b/pkg/beaconclient/capturehead.go index a300175..7a4a6bd 100644 --- a/pkg/beaconclient/capturehead.go +++ b/pkg/beaconclient/capturehead.go @@ -13,11 +13,11 @@ import ( // This function will perform all the heavy lifting for tracking the head of the chain. func (bc *BeaconClient) CaptureHead() { log.Info("We are tracking the head of the chain.") - bc.tempHelper() - // go bc.handleHead() - // go bc.handleFinalizedCheckpoint() - // go bc.handleReorgs() - // bc.captureEventTopic() + //bc.tempHelper() + go bc.handleHead() + go bc.handleFinalizedCheckpoint() + go bc.handleReorgs() + bc.captureEventTopic() } // A temporary helper function to see the output of beacon block and states. @@ -71,7 +71,7 @@ func (bc *BeaconClient) StopHeadTracking() error { func (se *SseEvents[ProcessedEvents]) finishProcessingChannel(finish chan<- bool) { loghelper.LogEndpoint(se.Endpoint).Info("Received a close event.") se.SseClient.Unsubscribe(se.MessagesCh) - for len(se.MessagesCh) != 0 && len(se.ProcessCh) != 0 { + for len(se.MessagesCh) != 0 || len(se.ProcessCh) != 0 { time.Sleep(time.Duration(shutdownWaitInterval) * time.Millisecond) } loghelper.LogEndpoint(se.Endpoint).Info("Done processing all messages, ready for shutdown") diff --git a/pkg/gracefulshutdown/gracefulshutdown.go b/pkg/gracefulshutdown/gracefulshutdown.go index 9f01d85..1b8b52a 100644 --- a/pkg/gracefulshutdown/gracefulshutdown.go +++ b/pkg/gracefulshutdown/gracefulshutdown.go @@ -16,6 +16,12 @@ import ( // operation is a clean up function on shutting down type Operation func(ctx context.Context) error +var ( + TimeoutErr = func(timeout string) error { + return fmt.Errorf("The Timeout %s, has been elapsed, the application will forcefully exit", timeout) + } +) + // gracefulShutdown waits for termination syscalls and doing clean up operations after received it func Shutdown(ctx context.Context, timeout time.Duration, ops map[string]Operation) (<-chan struct{}, <-chan error) { waitCh := make(chan struct{}) @@ -31,8 +37,8 @@ func Shutdown(ctx context.Context, timeout time.Duration, ops map[string]Operati // set timeout for the ops to be done to prevent system hang timeoutFunc := time.AfterFunc(timeout, func() { - log.Warnf("timeout %d ms has been elapsed, force exit", timeout.Milliseconds()) - errCh <- fmt.Errorf("Application shutdown took too long.") + log.Warnf(TimeoutErr(timeout.String()).Error()) + errCh <- TimeoutErr(timeout.String()) return })