Added test to ensure the application shuts down gracefully or within a timeframe.

This commit is contained in:
Abdul Rabbani 2022-04-28 14:50:29 -04:00
parent 1c919c5051
commit ec24895dca
7 changed files with 150 additions and 15 deletions

View File

@ -6,7 +6,6 @@ package cmd
import ( import (
"context" "context"
"time"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/spf13/cobra" "github.com/spf13/cobra"
@ -40,7 +39,13 @@ func startHeadTracking() {
go BC.CaptureHead() go BC.CaptureHead()
// Shutdown when the time is right. // Shutdown when the time is right.
shutdown.ShutdownServices(ctx, time.Duration(maxWaitSecondsShutdown), DB, BC) err = shutdown.ShutdownServices(ctx, maxWaitSecondsShutdown, DB, BC)
if err != nil {
loghelper.LogError(err).Error("Ungracefully Shutdown ipld-ethcl-indexer!")
} else {
log.Info("Gracefully shutdown ipld-ethcl-indexer")
}
} }
func init() { func init() {

View File

@ -4,7 +4,6 @@ import (
"context" "context"
"time" "time"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/beaconclient" "github.com/vulcanize/ipld-ethcl-indexer/pkg/beaconclient"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql" "github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/gracefulshutdown" "github.com/vulcanize/ipld-ethcl-indexer/pkg/gracefulshutdown"
@ -12,7 +11,7 @@ import (
) )
// Shutdown all the internal services for the application. // Shutdown all the internal services for the application.
func ShutdownServices(ctx context.Context, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) { func ShutdownServices(ctx context.Context, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) error {
successCh, errCh := gracefulshutdown.Shutdown(ctx, waitTime, map[string]gracefulshutdown.Operation{ successCh, errCh := gracefulshutdown.Shutdown(ctx, waitTime, map[string]gracefulshutdown.Operation{
"database": func(ctx context.Context) error { "database": func(ctx context.Context) error {
err := DB.Close() err := DB.Close()
@ -32,8 +31,8 @@ func ShutdownServices(ctx context.Context, waitTime time.Duration, DB sql.Databa
select { select {
case _ = <-successCh: case _ = <-successCh:
log.Info("Gracefully Shutdown ipld-ethcl-indexer!") return nil
case err := <-errCh: case err := <-errCh:
loghelper.LogError(err).Error("Ungracefully Shutdown ipld-ethcl-indexer!") return err
} }
} }

View File

@ -0,0 +1,13 @@
package shutdown_test
import (
"testing"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
func TestShutdown(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Shutdown Suite")
}

View File

@ -0,0 +1,112 @@
package shutdown_test
import (
"context"
"syscall"
"time"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/r3labs/sse"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-ethcl-indexer/internal/boot"
"github.com/vulcanize/ipld-ethcl-indexer/internal/shutdown"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/beaconclient"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/gracefulshutdown"
)
var _ = Describe("Shutdown", func() {
var (
dbAddress string = "localhost"
dbPort int = 8077
dbName string = "vulcanize_testing"
dbUsername string = "vdbm"
dbPassword string = "password"
dbDriver string = "PGX"
bcAddress string = "localhost"
bcPort int = 5052
bcConnectionProtocol string = "http"
maxWaitSecondsShutdown time.Duration = time.Duration(1) * time.Second
DB sql.Database
BC *beaconclient.BeaconClient
err error
ctx context.Context
)
BeforeEach(func() {
ctx = context.Background()
BC, DB, err = boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol)
Expect(err).To(BeNil())
})
Describe("Run Shutdown Function,", Label("integration"), func() {
Context("When Channels are empty,", func() {
It("Should Shutdown Successfully.", func() {
go func() {
log.Debug("Starting shutdown chan")
err = shutdown.ShutdownServices(ctx, maxWaitSecondsShutdown, DB, BC)
log.Debug("We have completed the shutdown...")
Expect(err).ToNot(HaveOccurred())
}()
})
})
Context("When the Channels are not empty,", func() {
It("Should try to clear them and shutdown gracefully.", func() {
shutdownCh := make(chan bool)
//log.SetLevel(log.DebugLevel)
go func() {
log.Debug("Starting shutdown chan")
err = shutdown.ShutdownServices(ctx, maxWaitSecondsShutdown, DB, BC)
log.Debug("We have completed the shutdown...")
Expect(err).ToNot(HaveOccurred())
shutdownCh <- true
}()
messageAddCh := make(chan bool)
go func() {
log.Debug("Adding messages to Channels")
BC.HeadTracking.MessagesCh <- &sse.Event{}
BC.FinalizationTracking.MessagesCh <- &sse.Event{}
BC.ReOrgTracking.MessagesCh <- &sse.Event{}
log.Debug("Message adding complete")
messageAddCh <- true
}()
go func() {
<-messageAddCh
log.Debug("Calling SIGHUP")
syscall.Kill(syscall.Getpid(), syscall.SIGHUP)
log.Debug("Reading messages from channel")
<-BC.HeadTracking.MessagesCh
<-BC.FinalizationTracking.MessagesCh
<-BC.ReOrgTracking.MessagesCh
}()
<-shutdownCh
})
It("Should try to clear them, if it can't, shutdown within a given time frame.", func() {
shutdownCh := make(chan bool)
//log.SetLevel(log.DebugLevel)
go func() {
log.Debug("Starting shutdown chan")
err = shutdown.ShutdownServices(ctx, maxWaitSecondsShutdown, DB, BC)
log.Debug("We have completed the shutdown...")
Expect(err).To(MatchError(gracefulshutdown.TimeoutErr(maxWaitSecondsShutdown.String())))
shutdownCh <- true
}()
go func() {
log.Debug("Adding messages to Channels")
BC.HeadTracking.MessagesCh <- &sse.Event{}
BC.FinalizationTracking.MessagesCh <- &sse.Event{}
BC.ReOrgTracking.MessagesCh <- &sse.Event{}
log.Debug("Message adding complete")
log.Debug("Calling SIGHUP")
syscall.Kill(syscall.Getpid(), syscall.SIGHUP)
}()
<-shutdownCh
})
})
})
})

View File

@ -61,7 +61,7 @@ func createSseEvent[P ProcessedEvents](baseEndpoint string, path string) *SseEve
endpoint := baseEndpoint + path endpoint := baseEndpoint + path
sseEvents := &SseEvents[P]{ sseEvents := &SseEvents[P]{
Endpoint: endpoint, Endpoint: endpoint,
MessagesCh: make(chan *sse.Event), MessagesCh: make(chan *sse.Event, 1),
ErrorCh: make(chan *SseError), ErrorCh: make(chan *SseError),
ProcessCh: make(chan *P), ProcessCh: make(chan *P),
SseClient: func(endpoint string) *sse.Client { SseClient: func(endpoint string) *sse.Client {

View File

@ -13,11 +13,11 @@ import (
// This function will perform all the heavy lifting for tracking the head of the chain. // This function will perform all the heavy lifting for tracking the head of the chain.
func (bc *BeaconClient) CaptureHead() { func (bc *BeaconClient) CaptureHead() {
log.Info("We are tracking the head of the chain.") log.Info("We are tracking the head of the chain.")
bc.tempHelper() //bc.tempHelper()
// go bc.handleHead() go bc.handleHead()
// go bc.handleFinalizedCheckpoint() go bc.handleFinalizedCheckpoint()
// go bc.handleReorgs() go bc.handleReorgs()
// bc.captureEventTopic() bc.captureEventTopic()
} }
// A temporary helper function to see the output of beacon block and states. // A temporary helper function to see the output of beacon block and states.
@ -71,7 +71,7 @@ func (bc *BeaconClient) StopHeadTracking() error {
func (se *SseEvents[ProcessedEvents]) finishProcessingChannel(finish chan<- bool) { func (se *SseEvents[ProcessedEvents]) finishProcessingChannel(finish chan<- bool) {
loghelper.LogEndpoint(se.Endpoint).Info("Received a close event.") loghelper.LogEndpoint(se.Endpoint).Info("Received a close event.")
se.SseClient.Unsubscribe(se.MessagesCh) se.SseClient.Unsubscribe(se.MessagesCh)
for len(se.MessagesCh) != 0 && len(se.ProcessCh) != 0 { for len(se.MessagesCh) != 0 || len(se.ProcessCh) != 0 {
time.Sleep(time.Duration(shutdownWaitInterval) * time.Millisecond) time.Sleep(time.Duration(shutdownWaitInterval) * time.Millisecond)
} }
loghelper.LogEndpoint(se.Endpoint).Info("Done processing all messages, ready for shutdown") loghelper.LogEndpoint(se.Endpoint).Info("Done processing all messages, ready for shutdown")

View File

@ -16,6 +16,12 @@ import (
// operation is a clean up function on shutting down // operation is a clean up function on shutting down
type Operation func(ctx context.Context) error type Operation func(ctx context.Context) error
var (
TimeoutErr = func(timeout string) error {
return fmt.Errorf("The Timeout %s, has been elapsed, the application will forcefully exit", timeout)
}
)
// gracefulShutdown waits for termination syscalls and doing clean up operations after received it // gracefulShutdown waits for termination syscalls and doing clean up operations after received it
func Shutdown(ctx context.Context, timeout time.Duration, ops map[string]Operation) (<-chan struct{}, <-chan error) { func Shutdown(ctx context.Context, timeout time.Duration, ops map[string]Operation) (<-chan struct{}, <-chan error) {
waitCh := make(chan struct{}) waitCh := make(chan struct{})
@ -31,8 +37,8 @@ func Shutdown(ctx context.Context, timeout time.Duration, ops map[string]Operati
// set timeout for the ops to be done to prevent system hang // set timeout for the ops to be done to prevent system hang
timeoutFunc := time.AfterFunc(timeout, func() { timeoutFunc := time.AfterFunc(timeout, func() {
log.Warnf("timeout %d ms has been elapsed, force exit", timeout.Milliseconds()) log.Warnf(TimeoutErr(timeout.String()).Error())
errCh <- fmt.Errorf("Application shutdown took too long.") errCh <- TimeoutErr(timeout.String())
return return
}) })