Shutdown Testing #25
@ -6,7 +6,6 @@ package cmd
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"time"
|
|
||||||
|
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
@ -40,7 +39,13 @@ func startHeadTracking() {
|
|||||||
go BC.CaptureHead()
|
go BC.CaptureHead()
|
||||||
|
|
||||||
// Shutdown when the time is right.
|
// Shutdown when the time is right.
|
||||||
shutdown.ShutdownServices(ctx, time.Duration(maxWaitSecondsShutdown), DB, BC)
|
err = shutdown.ShutdownServices(ctx, maxWaitSecondsShutdown, DB, BC)
|
||||||
|
if err != nil {
|
||||||
|
loghelper.LogError(err).Error("Ungracefully Shutdown ipld-ethcl-indexer!")
|
||||||
|
} else {
|
||||||
|
log.Info("Gracefully shutdown ipld-ethcl-indexer")
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
|
@ -4,7 +4,6 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
log "github.com/sirupsen/logrus"
|
|
||||||
"github.com/vulcanize/ipld-ethcl-indexer/pkg/beaconclient"
|
"github.com/vulcanize/ipld-ethcl-indexer/pkg/beaconclient"
|
||||||
"github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql"
|
"github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql"
|
||||||
"github.com/vulcanize/ipld-ethcl-indexer/pkg/gracefulshutdown"
|
"github.com/vulcanize/ipld-ethcl-indexer/pkg/gracefulshutdown"
|
||||||
@ -12,7 +11,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
// Shutdown all the internal services for the application.
|
// Shutdown all the internal services for the application.
|
||||||
func ShutdownServices(ctx context.Context, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) {
|
func ShutdownServices(ctx context.Context, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) error {
|
||||||
successCh, errCh := gracefulshutdown.Shutdown(ctx, waitTime, map[string]gracefulshutdown.Operation{
|
successCh, errCh := gracefulshutdown.Shutdown(ctx, waitTime, map[string]gracefulshutdown.Operation{
|
||||||
"database": func(ctx context.Context) error {
|
"database": func(ctx context.Context) error {
|
||||||
err := DB.Close()
|
err := DB.Close()
|
||||||
@ -32,8 +31,8 @@ func ShutdownServices(ctx context.Context, waitTime time.Duration, DB sql.Databa
|
|||||||
|
|
||||||
select {
|
select {
|
||||||
case _ = <-successCh:
|
case _ = <-successCh:
|
||||||
log.Info("Gracefully Shutdown ipld-ethcl-indexer!")
|
return nil
|
||||||
case err := <-errCh:
|
case err := <-errCh:
|
||||||
loghelper.LogError(err).Error("Ungracefully Shutdown ipld-ethcl-indexer!")
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
13
internal/shutdown/shutdown_suite_test.go
Normal file
13
internal/shutdown/shutdown_suite_test.go
Normal file
@ -0,0 +1,13 @@
|
|||||||
|
package shutdown_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
. "github.com/onsi/ginkgo/v2"
|
||||||
|
. "github.com/onsi/gomega"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestShutdown(t *testing.T) {
|
||||||
|
RegisterFailHandler(Fail)
|
||||||
|
RunSpecs(t, "Shutdown Suite")
|
||||||
|
}
|
112
internal/shutdown/shutdown_test.go
Normal file
112
internal/shutdown/shutdown_test.go
Normal file
@ -0,0 +1,112 @@
|
|||||||
|
package shutdown_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"syscall"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
. "github.com/onsi/ginkgo/v2"
|
||||||
|
. "github.com/onsi/gomega"
|
||||||
|
"github.com/r3labs/sse"
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
"github.com/vulcanize/ipld-ethcl-indexer/internal/boot"
|
||||||
|
"github.com/vulcanize/ipld-ethcl-indexer/internal/shutdown"
|
||||||
|
"github.com/vulcanize/ipld-ethcl-indexer/pkg/beaconclient"
|
||||||
|
"github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql"
|
||||||
|
"github.com/vulcanize/ipld-ethcl-indexer/pkg/gracefulshutdown"
|
||||||
|
)
|
||||||
|
|
||||||
|
var _ = Describe("Shutdown", func() {
|
||||||
|
var (
|
||||||
|
dbAddress string = "localhost"
|
||||||
|
dbPort int = 8077
|
||||||
|
dbName string = "vulcanize_testing"
|
||||||
|
dbUsername string = "vdbm"
|
||||||
|
dbPassword string = "password"
|
||||||
|
dbDriver string = "PGX"
|
||||||
|
bcAddress string = "localhost"
|
||||||
|
bcPort int = 5052
|
||||||
|
bcConnectionProtocol string = "http"
|
||||||
|
maxWaitSecondsShutdown time.Duration = time.Duration(1) * time.Second
|
||||||
|
DB sql.Database
|
||||||
|
BC *beaconclient.BeaconClient
|
||||||
|
err error
|
||||||
|
ctx context.Context
|
||||||
|
)
|
||||||
|
BeforeEach(func() {
|
||||||
|
ctx = context.Background()
|
||||||
|
BC, DB, err = boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol)
|
||||||
|
Expect(err).To(BeNil())
|
||||||
|
})
|
||||||
|
|
||||||
|
Describe("Run Shutdown Function,", Label("integration"), func() {
|
||||||
|
Context("When Channels are empty,", func() {
|
||||||
|
It("Should Shutdown Successfully.", func() {
|
||||||
|
go func() {
|
||||||
|
log.Debug("Starting shutdown chan")
|
||||||
|
err = shutdown.ShutdownServices(ctx, maxWaitSecondsShutdown, DB, BC)
|
||||||
|
log.Debug("We have completed the shutdown...")
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
}()
|
||||||
|
})
|
||||||
|
})
|
||||||
|
Context("When the Channels are not empty,", func() {
|
||||||
|
It("Should try to clear them and shutdown gracefully.", func() {
|
||||||
|
shutdownCh := make(chan bool)
|
||||||
|
//log.SetLevel(log.DebugLevel)
|
||||||
|
go func() {
|
||||||
|
log.Debug("Starting shutdown chan")
|
||||||
|
err = shutdown.ShutdownServices(ctx, maxWaitSecondsShutdown, DB, BC)
|
||||||
|
log.Debug("We have completed the shutdown...")
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
shutdownCh <- true
|
||||||
|
}()
|
||||||
|
|
||||||
|
messageAddCh := make(chan bool)
|
||||||
|
go func() {
|
||||||
|
log.Debug("Adding messages to Channels")
|
||||||
|
BC.HeadTracking.MessagesCh <- &sse.Event{}
|
||||||
|
BC.FinalizationTracking.MessagesCh <- &sse.Event{}
|
||||||
|
BC.ReOrgTracking.MessagesCh <- &sse.Event{}
|
||||||
|
log.Debug("Message adding complete")
|
||||||
|
messageAddCh <- true
|
||||||
|
}()
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
<-messageAddCh
|
||||||
|
log.Debug("Calling SIGHUP")
|
||||||
|
syscall.Kill(syscall.Getpid(), syscall.SIGHUP)
|
||||||
|
log.Debug("Reading messages from channel")
|
||||||
|
<-BC.HeadTracking.MessagesCh
|
||||||
|
<-BC.FinalizationTracking.MessagesCh
|
||||||
|
<-BC.ReOrgTracking.MessagesCh
|
||||||
|
}()
|
||||||
|
<-shutdownCh
|
||||||
|
|
||||||
|
})
|
||||||
|
It("Should try to clear them, if it can't, shutdown within a given time frame.", func() {
|
||||||
|
shutdownCh := make(chan bool)
|
||||||
|
//log.SetLevel(log.DebugLevel)
|
||||||
|
go func() {
|
||||||
|
log.Debug("Starting shutdown chan")
|
||||||
|
err = shutdown.ShutdownServices(ctx, maxWaitSecondsShutdown, DB, BC)
|
||||||
|
log.Debug("We have completed the shutdown...")
|
||||||
|
Expect(err).To(MatchError(gracefulshutdown.TimeoutErr(maxWaitSecondsShutdown.String())))
|
||||||
|
shutdownCh <- true
|
||||||
|
}()
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
log.Debug("Adding messages to Channels")
|
||||||
|
BC.HeadTracking.MessagesCh <- &sse.Event{}
|
||||||
|
BC.FinalizationTracking.MessagesCh <- &sse.Event{}
|
||||||
|
BC.ReOrgTracking.MessagesCh <- &sse.Event{}
|
||||||
|
log.Debug("Message adding complete")
|
||||||
|
log.Debug("Calling SIGHUP")
|
||||||
|
syscall.Kill(syscall.Getpid(), syscall.SIGHUP)
|
||||||
|
}()
|
||||||
|
|
||||||
|
<-shutdownCh
|
||||||
|
})
|
||||||
|
})
|
||||||
|
})
|
||||||
|
})
|
@ -61,7 +61,7 @@ func createSseEvent[P ProcessedEvents](baseEndpoint string, path string) *SseEve
|
|||||||
endpoint := baseEndpoint + path
|
endpoint := baseEndpoint + path
|
||||||
sseEvents := &SseEvents[P]{
|
sseEvents := &SseEvents[P]{
|
||||||
Endpoint: endpoint,
|
Endpoint: endpoint,
|
||||||
MessagesCh: make(chan *sse.Event),
|
MessagesCh: make(chan *sse.Event, 1),
|
||||||
ErrorCh: make(chan *SseError),
|
ErrorCh: make(chan *SseError),
|
||||||
ProcessCh: make(chan *P),
|
ProcessCh: make(chan *P),
|
||||||
SseClient: func(endpoint string) *sse.Client {
|
SseClient: func(endpoint string) *sse.Client {
|
||||||
|
@ -13,11 +13,11 @@ import (
|
|||||||
// This function will perform all the heavy lifting for tracking the head of the chain.
|
// This function will perform all the heavy lifting for tracking the head of the chain.
|
||||||
func (bc *BeaconClient) CaptureHead() {
|
func (bc *BeaconClient) CaptureHead() {
|
||||||
log.Info("We are tracking the head of the chain.")
|
log.Info("We are tracking the head of the chain.")
|
||||||
bc.tempHelper()
|
//bc.tempHelper()
|
||||||
// go bc.handleHead()
|
go bc.handleHead()
|
||||||
// go bc.handleFinalizedCheckpoint()
|
go bc.handleFinalizedCheckpoint()
|
||||||
// go bc.handleReorgs()
|
go bc.handleReorgs()
|
||||||
// bc.captureEventTopic()
|
bc.captureEventTopic()
|
||||||
}
|
}
|
||||||
|
|
||||||
// A temporary helper function to see the output of beacon block and states.
|
// A temporary helper function to see the output of beacon block and states.
|
||||||
@ -71,7 +71,7 @@ func (bc *BeaconClient) StopHeadTracking() error {
|
|||||||
func (se *SseEvents[ProcessedEvents]) finishProcessingChannel(finish chan<- bool) {
|
func (se *SseEvents[ProcessedEvents]) finishProcessingChannel(finish chan<- bool) {
|
||||||
loghelper.LogEndpoint(se.Endpoint).Info("Received a close event.")
|
loghelper.LogEndpoint(se.Endpoint).Info("Received a close event.")
|
||||||
se.SseClient.Unsubscribe(se.MessagesCh)
|
se.SseClient.Unsubscribe(se.MessagesCh)
|
||||||
for len(se.MessagesCh) != 0 && len(se.ProcessCh) != 0 {
|
for len(se.MessagesCh) != 0 || len(se.ProcessCh) != 0 {
|
||||||
time.Sleep(time.Duration(shutdownWaitInterval) * time.Millisecond)
|
time.Sleep(time.Duration(shutdownWaitInterval) * time.Millisecond)
|
||||||
}
|
}
|
||||||
loghelper.LogEndpoint(se.Endpoint).Info("Done processing all messages, ready for shutdown")
|
loghelper.LogEndpoint(se.Endpoint).Info("Done processing all messages, ready for shutdown")
|
||||||
|
@ -16,6 +16,12 @@ import (
|
|||||||
// operation is a clean up function on shutting down
|
// operation is a clean up function on shutting down
|
||||||
type Operation func(ctx context.Context) error
|
type Operation func(ctx context.Context) error
|
||||||
|
|
||||||
|
var (
|
||||||
|
TimeoutErr = func(timeout string) error {
|
||||||
|
return fmt.Errorf("The Timeout %s, has been elapsed, the application will forcefully exit", timeout)
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
// gracefulShutdown waits for termination syscalls and doing clean up operations after received it
|
// gracefulShutdown waits for termination syscalls and doing clean up operations after received it
|
||||||
func Shutdown(ctx context.Context, timeout time.Duration, ops map[string]Operation) (<-chan struct{}, <-chan error) {
|
func Shutdown(ctx context.Context, timeout time.Duration, ops map[string]Operation) (<-chan struct{}, <-chan error) {
|
||||||
waitCh := make(chan struct{})
|
waitCh := make(chan struct{})
|
||||||
@ -31,8 +37,8 @@ func Shutdown(ctx context.Context, timeout time.Duration, ops map[string]Operati
|
|||||||
|
|
||||||
// set timeout for the ops to be done to prevent system hang
|
// set timeout for the ops to be done to prevent system hang
|
||||||
timeoutFunc := time.AfterFunc(timeout, func() {
|
timeoutFunc := time.AfterFunc(timeout, func() {
|
||||||
log.Warnf("timeout %d ms has been elapsed, force exit", timeout.Milliseconds())
|
log.Warnf(TimeoutErr(timeout.String()).Error())
|
||||||
errCh <- fmt.Errorf("Application shutdown took too long.")
|
errCh <- TimeoutErr(timeout.String())
|
||||||
return
|
return
|
||||||
})
|
})
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user