Process SSE messages, and include a graceful shutdown
This commit is contained in:
parent
3e244e8281
commit
d496886f95
4
.github/workflows/on-pr.yml
vendored
4
.github/workflows/on-pr.yml
vendored
@ -34,8 +34,8 @@ jobs:
|
|||||||
runs-on: ubuntu-latest
|
runs-on: ubuntu-latest
|
||||||
## IF you want to update the default branch for `pull_request runs, do it after the ||`
|
## IF you want to update the default branch for `pull_request runs, do it after the ||`
|
||||||
env:
|
env:
|
||||||
foundry-test-ref: ${{ github.event.inputs.foundry-test-ref || 'feature/build-stack'}}
|
foundry-test-ref: ${{ github.event.inputs.foundry-test-ref || 'c17752de64f208f286f02379b80d2a935237c860'}}
|
||||||
ipld-eth-db-ref: ${{ github.event.inputs.ipld-eth-db-ref || 'main' }}
|
ipld-eth-db-ref: ${{ github.event.inputs.ipld-eth-db-ref || '05600e51d2163e1c5e2a872cb54606bc0a380d12' }}
|
||||||
GOPATH: /tmp/go
|
GOPATH: /tmp/go
|
||||||
steps:
|
steps:
|
||||||
- name: Create GOPATH
|
- name: Create GOPATH
|
||||||
|
20
Makefile
20
Makefile
@ -30,6 +30,26 @@ integration-test-ci:
|
|||||||
--cover --coverprofile=cover.profile \
|
--cover --coverprofile=cover.profile \
|
||||||
--race --trace --json-report=report.json
|
--race --trace --json-report=report.json
|
||||||
|
|
||||||
|
.PHONY: integration-test-local
|
||||||
|
integration-test-local:
|
||||||
|
go vet ./...
|
||||||
|
go fmt ./...
|
||||||
|
$(GINKGO) -r --label-filter integration \
|
||||||
|
--procs=4 --compilers=4 \
|
||||||
|
--randomize-all --randomize-suites \
|
||||||
|
--fail-on-pending --keep-going \
|
||||||
|
--race --trace
|
||||||
|
|
||||||
|
.PHONY: unit-test-local
|
||||||
|
unit-test-local:
|
||||||
|
go vet ./...
|
||||||
|
go fmt ./...
|
||||||
|
$(GINKGO) -r --label-filter unit \
|
||||||
|
--procs=4 --compilers=4 \
|
||||||
|
--randomize-all --randomize-suites \
|
||||||
|
--fail-on-pending --keep-going \
|
||||||
|
--race --trace
|
||||||
|
|
||||||
.PHONY: unit-test-ci
|
.PHONY: unit-test-ci
|
||||||
unit-test-ci:
|
unit-test-ci:
|
||||||
go vet ./...
|
go vet ./...
|
||||||
|
@ -40,9 +40,7 @@ func startHeadTracking() {
|
|||||||
go BC.CaptureHead()
|
go BC.CaptureHead()
|
||||||
|
|
||||||
// Shutdown when the time is right.
|
// Shutdown when the time is right.
|
||||||
wait := shutdown.ShutdownServices(ctx, time.Duration(maxWaitSecondsShutdown), DB, BC)
|
shutdown.ShutdownServices(ctx, time.Duration(maxWaitSecondsShutdown), DB, BC)
|
||||||
|
|
||||||
<-wait
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
|
@ -4,6 +4,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
"github.com/vulcanize/ipld-ethcl-indexer/pkg/beaconclient"
|
"github.com/vulcanize/ipld-ethcl-indexer/pkg/beaconclient"
|
||||||
"github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql"
|
"github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql"
|
||||||
"github.com/vulcanize/ipld-ethcl-indexer/pkg/gracefulshutdown"
|
"github.com/vulcanize/ipld-ethcl-indexer/pkg/gracefulshutdown"
|
||||||
@ -11,8 +12,8 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
// Shutdown all the internal services for the application.
|
// Shutdown all the internal services for the application.
|
||||||
func ShutdownServices(ctx context.Context, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) <-chan struct{} {
|
func ShutdownServices(ctx context.Context, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) {
|
||||||
return gracefulshutdown.Shutdown(ctx, waitTime, map[string]gracefulshutdown.Operation{
|
successCh, errCh := gracefulshutdown.Shutdown(ctx, waitTime, map[string]gracefulshutdown.Operation{
|
||||||
"database": func(ctx context.Context) error {
|
"database": func(ctx context.Context) error {
|
||||||
err := DB.Close()
|
err := DB.Close()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -28,4 +29,11 @@ func ShutdownServices(ctx context.Context, waitTime time.Duration, DB sql.Databa
|
|||||||
return err
|
return err
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
|
||||||
|
select {
|
||||||
|
case _ = <-successCh:
|
||||||
|
log.Info("Gracefully Shutdown ipld-ethcl-indexer!")
|
||||||
|
case err := <-errCh:
|
||||||
|
loghelper.LogError(err).Error("Ungracefully Shutdown ipld-ethcl-indexer!")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -7,7 +7,7 @@ import (
|
|||||||
. "github.com/onsi/gomega"
|
. "github.com/onsi/gomega"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestHealthcheck(t *testing.T) {
|
func TestBeaconClient(t *testing.T) {
|
||||||
RegisterFailHandler(Fail)
|
RegisterFailHandler(Fail)
|
||||||
RunSpecs(t, "Healthcheck Suite")
|
RunSpecs(t, "BeaconClient Suite", Label("beacon-client"))
|
||||||
}
|
}
|
||||||
|
@ -10,8 +10,10 @@ import (
|
|||||||
// This function will perform all the heavy lifting for tracking the head of the chain.
|
// This function will perform all the heavy lifting for tracking the head of the chain.
|
||||||
func (bc *BeaconClient) CaptureHead() {
|
func (bc *BeaconClient) CaptureHead() {
|
||||||
log.Info("We are tracking the head of the chain.")
|
log.Info("We are tracking the head of the chain.")
|
||||||
//go readProcessedEvents(bc.HeadTracking.ProcessCh)
|
go bc.handleHead()
|
||||||
bc.CaptureHeadTopic()
|
go bc.handleFinalizedCheckpoint()
|
||||||
|
go bc.handleReorgs()
|
||||||
|
bc.captureEventTopic()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stop the head tracking service.
|
// Stop the head tracking service.
|
||||||
@ -32,10 +34,11 @@ func (bc *BeaconClient) StopHeadTracking() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// This function closes the SSE subscription, but waits until the MessagesCh is empty
|
||||||
func (se *SseEvents[ProcessedEvents]) finishProcessingChannel(finish chan<- bool) {
|
func (se *SseEvents[ProcessedEvents]) finishProcessingChannel(finish chan<- bool) {
|
||||||
loghelper.LogUrl(se.Url).Info("Received a close event.")
|
loghelper.LogUrl(se.Url).Info("Received a close event.")
|
||||||
se.SseClient.Unsubscribe(se.MessagesCh)
|
se.SseClient.Unsubscribe(se.MessagesCh)
|
||||||
for len(se.MessagesCh) != 0 {
|
for len(se.MessagesCh) != 0 && len(se.ProcessCh) != 0 {
|
||||||
time.Sleep(time.Duration(shutdownWaitInterval) * time.Millisecond)
|
time.Sleep(time.Duration(shutdownWaitInterval) * time.Millisecond)
|
||||||
}
|
}
|
||||||
loghelper.LogUrl(se.Url).Info("Done processing all messages, ready for shutdown")
|
loghelper.LogUrl(se.Url).Info("Done processing all messages, ready for shutdown")
|
||||||
|
35
pkg/beaconclient/handleevents.go
Normal file
35
pkg/beaconclient/handleevents.go
Normal file
@ -0,0 +1,35 @@
|
|||||||
|
package beaconclient
|
||||||
|
|
||||||
|
import log "github.com/sirupsen/logrus"
|
||||||
|
|
||||||
|
// This function will perform the necessary steps to handle a reorg.
|
||||||
|
func (bc *BeaconClient) handleReorgs() {
|
||||||
|
log.Info("Starting to process reorgs.")
|
||||||
|
for {
|
||||||
|
// We will add real functionality later
|
||||||
|
reorg := <-bc.ReOrgTracking.ProcessCh
|
||||||
|
log.WithFields(log.Fields{"reorg": reorg}).Debug("Received a new reorg message.")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// This function will perform the necessary steps to handle a reorg.
|
||||||
|
func (bc *BeaconClient) handleFinalizedCheckpoint() {
|
||||||
|
log.Info("Starting to process finalized checkpoints.")
|
||||||
|
for {
|
||||||
|
// We will add real functionality later
|
||||||
|
finalized := <-bc.ReOrgTracking.ProcessCh
|
||||||
|
log.WithFields(log.Fields{"finalized": finalized}).Debug("Received a new finalized checkpoint.")
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// This function will handle the latest head event.
|
||||||
|
func (bc *BeaconClient) handleHead() {
|
||||||
|
log.Info("Starting to process head.")
|
||||||
|
for {
|
||||||
|
// We will add real functionality later
|
||||||
|
head := <-bc.ReOrgTracking.ProcessCh
|
||||||
|
log.WithFields(log.Fields{"head": head}).Debug("Received a new head event.")
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -17,7 +17,7 @@ var (
|
|||||||
// This function will capture all the SSE events for a given SseEvents object.
|
// This function will capture all the SSE events for a given SseEvents object.
|
||||||
// When new messages come in, it will ensure that they are decoded into JSON.
|
// When new messages come in, it will ensure that they are decoded into JSON.
|
||||||
// If any errors occur, it log the error information.
|
// If any errors occur, it log the error information.
|
||||||
func handleSseEvent[P ProcessedEvents](eventHandler *SseEvents[P]) {
|
func handleIncomingSseEvent[P ProcessedEvents](eventHandler *SseEvents[P]) {
|
||||||
loghelper.LogUrl(eventHandler.Url).Info("Subscribing to Messages")
|
loghelper.LogUrl(eventHandler.Url).Info("Subscribing to Messages")
|
||||||
go eventHandler.SseClient.SubscribeChanRaw(eventHandler.MessagesCh)
|
go eventHandler.SseClient.SubscribeChanRaw(eventHandler.MessagesCh)
|
||||||
for {
|
for {
|
||||||
@ -42,25 +42,27 @@ func handleSseEvent[P ProcessedEvents](eventHandler *SseEvents[P]) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Capture all of the head topics.
|
|
||||||
func (bc *BeaconClient) CaptureHeadTopic() {
|
|
||||||
log.Info("We are capturing all SSE events")
|
|
||||||
go handleSseEvent(bc.HeadTracking)
|
|
||||||
go handleSseEvent(bc.ReOrgTracking)
|
|
||||||
go handleSseEvent(bc.FinalizationTracking)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Turn the data object into a Struct.
|
// Turn the data object into a Struct.
|
||||||
func processMsg[P ProcessedEvents](msg []byte, processCh chan<- *P, errorCh chan<- *SseError) {
|
func processMsg[P ProcessedEvents](msg []byte, processCh chan<- *P, errorCh chan<- *SseError) {
|
||||||
log.WithFields(log.Fields{"msg": msg}).Debug("Processing a Message")
|
log.WithFields(log.Fields{"msg": msg}).Info("Processing a Message")
|
||||||
var msgMarshaled P
|
var msgMarshaled P
|
||||||
err := json.Unmarshal(msg, &msgMarshaled)
|
err := json.Unmarshal(msg, &msgMarshaled)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
loghelper.LogError(err).Error("Unable to parse message")
|
||||||
errorCh <- &SseError{
|
errorCh <- &SseError{
|
||||||
err: err,
|
err: err,
|
||||||
msg: msg,
|
msg: msg,
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
log.WithFields(log.Fields{"process": processCh}).Info("Processed")
|
||||||
processCh <- &msgMarshaled
|
processCh <- &msgMarshaled
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Capture all of the event topics.
|
||||||
|
func (bc *BeaconClient) captureEventTopic() {
|
||||||
|
log.Info("We are capturing all SSE events")
|
||||||
|
go handleIncomingSseEvent(bc.HeadTracking)
|
||||||
|
go handleIncomingSseEvent(bc.ReOrgTracking)
|
||||||
|
go handleIncomingSseEvent(bc.FinalizationTracking)
|
||||||
|
}
|
||||||
|
@ -2,6 +2,7 @@ package gracefulshutdown
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
"sync"
|
"sync"
|
||||||
@ -16,8 +17,9 @@ import (
|
|||||||
type Operation func(ctx context.Context) error
|
type Operation func(ctx context.Context) error
|
||||||
|
|
||||||
// gracefulShutdown waits for termination syscalls and doing clean up operations after received it
|
// gracefulShutdown waits for termination syscalls and doing clean up operations after received it
|
||||||
func Shutdown(ctx context.Context, timeout time.Duration, ops map[string]Operation) <-chan struct{} {
|
func Shutdown(ctx context.Context, timeout time.Duration, ops map[string]Operation) (<-chan struct{}, <-chan error) {
|
||||||
wait := make(chan struct{})
|
waitCh := make(chan struct{})
|
||||||
|
errCh := make(chan error)
|
||||||
go func() {
|
go func() {
|
||||||
s := make(chan os.Signal, 1)
|
s := make(chan os.Signal, 1)
|
||||||
|
|
||||||
@ -30,7 +32,8 @@ func Shutdown(ctx context.Context, timeout time.Duration, ops map[string]Operati
|
|||||||
// set timeout for the ops to be done to prevent system hang
|
// set timeout for the ops to be done to prevent system hang
|
||||||
timeoutFunc := time.AfterFunc(timeout, func() {
|
timeoutFunc := time.AfterFunc(timeout, func() {
|
||||||
log.Warnf("timeout %d ms has been elapsed, force exit", timeout.Milliseconds())
|
log.Warnf("timeout %d ms has been elapsed, force exit", timeout.Milliseconds())
|
||||||
os.Exit(0)
|
errCh <- fmt.Errorf("Application shutdown took too long.")
|
||||||
|
return
|
||||||
})
|
})
|
||||||
|
|
||||||
defer timeoutFunc.Stop()
|
defer timeoutFunc.Stop()
|
||||||
@ -57,8 +60,8 @@ func Shutdown(ctx context.Context, timeout time.Duration, ops map[string]Operati
|
|||||||
|
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
|
||||||
close(wait)
|
close(waitCh)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
return wait
|
return waitCh, errCh
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user