diff --git a/.github/workflows/on-pr.yml b/.github/workflows/on-pr.yml index faaf4a5..4e1f8ad 100644 --- a/.github/workflows/on-pr.yml +++ b/.github/workflows/on-pr.yml @@ -34,8 +34,8 @@ jobs: runs-on: ubuntu-latest ## IF you want to update the default branch for `pull_request runs, do it after the ||` env: - foundry-test-ref: ${{ github.event.inputs.foundry-test-ref || 'feature/build-stack'}} - ipld-eth-db-ref: ${{ github.event.inputs.ipld-eth-db-ref || 'main' }} + foundry-test-ref: ${{ github.event.inputs.foundry-test-ref || 'c17752de64f208f286f02379b80d2a935237c860'}} + ipld-eth-db-ref: ${{ github.event.inputs.ipld-eth-db-ref || '05600e51d2163e1c5e2a872cb54606bc0a380d12' }} GOPATH: /tmp/go steps: - name: Create GOPATH diff --git a/Makefile b/Makefile index 7b97424..bf2e654 100644 --- a/Makefile +++ b/Makefile @@ -30,6 +30,26 @@ integration-test-ci: --cover --coverprofile=cover.profile \ --race --trace --json-report=report.json +.PHONY: integration-test-local +integration-test-local: + go vet ./... + go fmt ./... + $(GINKGO) -r --label-filter integration \ + --procs=4 --compilers=4 \ + --randomize-all --randomize-suites \ + --fail-on-pending --keep-going \ + --race --trace + +.PHONY: unit-test-local +unit-test-local: + go vet ./... + go fmt ./... + $(GINKGO) -r --label-filter unit \ + --procs=4 --compilers=4 \ + --randomize-all --randomize-suites \ + --fail-on-pending --keep-going \ + --race --trace + .PHONY: unit-test-ci unit-test-ci: go vet ./... diff --git a/cmd/head.go b/cmd/head.go index 10ee3b8..db98242 100644 --- a/cmd/head.go +++ b/cmd/head.go @@ -40,9 +40,7 @@ func startHeadTracking() { go BC.CaptureHead() // Shutdown when the time is right. - wait := shutdown.ShutdownServices(ctx, time.Duration(maxWaitSecondsShutdown), DB, BC) - - <-wait + shutdown.ShutdownServices(ctx, time.Duration(maxWaitSecondsShutdown), DB, BC) } func init() { diff --git a/internal/shutdown/shutdown.go b/internal/shutdown/shutdown.go index 9b1742f..087adb4 100644 --- a/internal/shutdown/shutdown.go +++ b/internal/shutdown/shutdown.go @@ -4,6 +4,7 @@ import ( "context" "time" + log "github.com/sirupsen/logrus" "github.com/vulcanize/ipld-ethcl-indexer/pkg/beaconclient" "github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql" "github.com/vulcanize/ipld-ethcl-indexer/pkg/gracefulshutdown" @@ -11,8 +12,8 @@ import ( ) // Shutdown all the internal services for the application. -func ShutdownServices(ctx context.Context, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) <-chan struct{} { - return gracefulshutdown.Shutdown(ctx, waitTime, map[string]gracefulshutdown.Operation{ +func ShutdownServices(ctx context.Context, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) { + successCh, errCh := gracefulshutdown.Shutdown(ctx, waitTime, map[string]gracefulshutdown.Operation{ "database": func(ctx context.Context) error { err := DB.Close() if err != nil { @@ -28,4 +29,11 @@ func ShutdownServices(ctx context.Context, waitTime time.Duration, DB sql.Databa return err }, }) + + select { + case _ = <-successCh: + log.Info("Gracefully Shutdown ipld-ethcl-indexer!") + case err := <-errCh: + loghelper.LogError(err).Error("Ungracefully Shutdown ipld-ethcl-indexer!") + } } diff --git a/pkg/beaconclient/beaconclient_suite_test.go b/pkg/beaconclient/beaconclient_suite_test.go index 4c9c1d3..67e98de 100644 --- a/pkg/beaconclient/beaconclient_suite_test.go +++ b/pkg/beaconclient/beaconclient_suite_test.go @@ -7,7 +7,7 @@ import ( . "github.com/onsi/gomega" ) -func TestHealthcheck(t *testing.T) { +func TestBeaconClient(t *testing.T) { RegisterFailHandler(Fail) - RunSpecs(t, "Healthcheck Suite") + RunSpecs(t, "BeaconClient Suite", Label("beacon-client")) } diff --git a/pkg/beaconclient/capturehead.go b/pkg/beaconclient/capturehead.go index 6a20c8f..d1ca6a1 100644 --- a/pkg/beaconclient/capturehead.go +++ b/pkg/beaconclient/capturehead.go @@ -10,8 +10,10 @@ import ( // This function will perform all the heavy lifting for tracking the head of the chain. func (bc *BeaconClient) CaptureHead() { log.Info("We are tracking the head of the chain.") - //go readProcessedEvents(bc.HeadTracking.ProcessCh) - bc.CaptureHeadTopic() + go bc.handleHead() + go bc.handleFinalizedCheckpoint() + go bc.handleReorgs() + bc.captureEventTopic() } // Stop the head tracking service. @@ -32,10 +34,11 @@ func (bc *BeaconClient) StopHeadTracking() error { return nil } +// This function closes the SSE subscription, but waits until the MessagesCh is empty func (se *SseEvents[ProcessedEvents]) finishProcessingChannel(finish chan<- bool) { loghelper.LogUrl(se.Url).Info("Received a close event.") se.SseClient.Unsubscribe(se.MessagesCh) - for len(se.MessagesCh) != 0 { + for len(se.MessagesCh) != 0 && len(se.ProcessCh) != 0 { time.Sleep(time.Duration(shutdownWaitInterval) * time.Millisecond) } loghelper.LogUrl(se.Url).Info("Done processing all messages, ready for shutdown") diff --git a/pkg/beaconclient/handleevents.go b/pkg/beaconclient/handleevents.go new file mode 100644 index 0000000..08f715e --- /dev/null +++ b/pkg/beaconclient/handleevents.go @@ -0,0 +1,35 @@ +package beaconclient + +import log "github.com/sirupsen/logrus" + +// This function will perform the necessary steps to handle a reorg. +func (bc *BeaconClient) handleReorgs() { + log.Info("Starting to process reorgs.") + for { + // We will add real functionality later + reorg := <-bc.ReOrgTracking.ProcessCh + log.WithFields(log.Fields{"reorg": reorg}).Debug("Received a new reorg message.") + } +} + +// This function will perform the necessary steps to handle a reorg. +func (bc *BeaconClient) handleFinalizedCheckpoint() { + log.Info("Starting to process finalized checkpoints.") + for { + // We will add real functionality later + finalized := <-bc.ReOrgTracking.ProcessCh + log.WithFields(log.Fields{"finalized": finalized}).Debug("Received a new finalized checkpoint.") + } + +} + +// This function will handle the latest head event. +func (bc *BeaconClient) handleHead() { + log.Info("Starting to process head.") + for { + // We will add real functionality later + head := <-bc.ReOrgTracking.ProcessCh + log.WithFields(log.Fields{"head": head}).Debug("Received a new head event.") + } + +} diff --git a/pkg/beaconclient/ssehandler.go b/pkg/beaconclient/ssehandler.go index 6e3c003..ac9d01f 100644 --- a/pkg/beaconclient/ssehandler.go +++ b/pkg/beaconclient/ssehandler.go @@ -17,7 +17,7 @@ var ( // This function will capture all the SSE events for a given SseEvents object. // When new messages come in, it will ensure that they are decoded into JSON. // If any errors occur, it log the error information. -func handleSseEvent[P ProcessedEvents](eventHandler *SseEvents[P]) { +func handleIncomingSseEvent[P ProcessedEvents](eventHandler *SseEvents[P]) { loghelper.LogUrl(eventHandler.Url).Info("Subscribing to Messages") go eventHandler.SseClient.SubscribeChanRaw(eventHandler.MessagesCh) for { @@ -42,25 +42,27 @@ func handleSseEvent[P ProcessedEvents](eventHandler *SseEvents[P]) { } } -// Capture all of the head topics. -func (bc *BeaconClient) CaptureHeadTopic() { - log.Info("We are capturing all SSE events") - go handleSseEvent(bc.HeadTracking) - go handleSseEvent(bc.ReOrgTracking) - go handleSseEvent(bc.FinalizationTracking) -} - // Turn the data object into a Struct. func processMsg[P ProcessedEvents](msg []byte, processCh chan<- *P, errorCh chan<- *SseError) { - log.WithFields(log.Fields{"msg": msg}).Debug("Processing a Message") + log.WithFields(log.Fields{"msg": msg}).Info("Processing a Message") var msgMarshaled P err := json.Unmarshal(msg, &msgMarshaled) if err != nil { + loghelper.LogError(err).Error("Unable to parse message") errorCh <- &SseError{ err: err, msg: msg, } return } + log.WithFields(log.Fields{"process": processCh}).Info("Processed") processCh <- &msgMarshaled } + +// Capture all of the event topics. +func (bc *BeaconClient) captureEventTopic() { + log.Info("We are capturing all SSE events") + go handleIncomingSseEvent(bc.HeadTracking) + go handleIncomingSseEvent(bc.ReOrgTracking) + go handleIncomingSseEvent(bc.FinalizationTracking) +} diff --git a/pkg/gracefulshutdown/gracefulshutdown.go b/pkg/gracefulshutdown/gracefulshutdown.go index 36510db..9f01d85 100644 --- a/pkg/gracefulshutdown/gracefulshutdown.go +++ b/pkg/gracefulshutdown/gracefulshutdown.go @@ -2,6 +2,7 @@ package gracefulshutdown import ( "context" + "fmt" "os" "os/signal" "sync" @@ -16,8 +17,9 @@ import ( type Operation func(ctx context.Context) error // gracefulShutdown waits for termination syscalls and doing clean up operations after received it -func Shutdown(ctx context.Context, timeout time.Duration, ops map[string]Operation) <-chan struct{} { - wait := make(chan struct{}) +func Shutdown(ctx context.Context, timeout time.Duration, ops map[string]Operation) (<-chan struct{}, <-chan error) { + waitCh := make(chan struct{}) + errCh := make(chan error) go func() { s := make(chan os.Signal, 1) @@ -30,7 +32,8 @@ func Shutdown(ctx context.Context, timeout time.Duration, ops map[string]Operati // set timeout for the ops to be done to prevent system hang timeoutFunc := time.AfterFunc(timeout, func() { log.Warnf("timeout %d ms has been elapsed, force exit", timeout.Milliseconds()) - os.Exit(0) + errCh <- fmt.Errorf("Application shutdown took too long.") + return }) defer timeoutFunc.Stop() @@ -57,8 +60,8 @@ func Shutdown(ctx context.Context, timeout time.Duration, ops map[string]Operati wg.Wait() - close(wait) + close(waitCh) }() - return wait + return waitCh, errCh }