Graceful Shutdown | Handle SSE | Requires Testing

This Commit contains the following:
* Graceful shutdowns.
* Handling all incoming SSE events for reorgs, finalizations, and head.

The structure of the `BeaconClient` has drastically changed and generics are used.
This commit is contained in:
Abdul Rabbani 2022-04-26 13:57:01 -04:00
parent 5b75f5a257
commit 87313887a4
18 changed files with 388 additions and 53 deletions

2
.gitignore vendored
View File

@ -1,3 +1,5 @@
ipld-ethcl-indexer ipld-ethcl-indexer
ipld-ethcl-indexer.log ipld-ethcl-indexer.log
report.json
cover.profile

View File

@ -6,20 +6,22 @@ package cmd
import ( import (
"os" "os"
"time"
"github.com/spf13/cobra" "github.com/spf13/cobra"
"github.com/spf13/viper" "github.com/spf13/viper"
) )
var ( var (
dbUsername string dbUsername string
dbPassword string dbPassword string
dbName string dbName string
dbAddress string dbAddress string
dbDriver string dbDriver string
dbPort int dbPort int
bcAddress string bcAddress string
bcPort int bcPort int
maxWaitSecondsShutdown time.Duration = time.Duration(5) * time.Second
) )
// captureCmd represents the capture command // captureCmd represents the capture command

View File

@ -5,8 +5,13 @@ Copyright © 2022 NAME HERE <EMAIL ADDRESS>
package cmd package cmd
import ( import (
"context"
"time"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra" "github.com/spf13/cobra"
"github.com/vulcanize/ipld-ethcl-indexer/internal/boot" "github.com/vulcanize/ipld-ethcl-indexer/internal/boot"
"github.com/vulcanize/ipld-ethcl-indexer/internal/shutdown"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper" "github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
) )
@ -22,10 +27,22 @@ var headCmd = &cobra.Command{
// Start the application to track at head. // Start the application to track at head.
func startHeadTracking() { func startHeadTracking() {
_, err := boot.BootApplicationWithRetry(dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort) // Boot the application
log.Info("Starting the application in head tracking mode.")
ctx := context.Background()
BC, DB, err := boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort)
if err != nil { if err != nil {
loghelper.LogError(err).Error("Unable to Start application") loghelper.LogError(err).Error("Unable to Start application")
} }
// Capture head blocks
go BC.CaptureHead()
// Shutdown when the time is right.
wait := shutdown.ShutdownServices(ctx, time.Duration(maxWaitSecondsShutdown), DB, BC)
<-wait
} }
func init() { func init() {

2
go.mod
View File

@ -22,6 +22,7 @@ require (
github.com/pkg/errors v0.9.1 // indirect github.com/pkg/errors v0.9.1 // indirect
golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4 // indirect golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4 // indirect
golang.org/x/net v0.0.0-20220412020605-290c469a71a5 // indirect golang.org/x/net v0.0.0-20220412020605-290c469a71a5 // indirect
gopkg.in/cenkalti/backoff.v1 v1.1.0 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
) )
@ -35,6 +36,7 @@ require (
github.com/mitchellh/mapstructure v1.4.3 // indirect github.com/mitchellh/mapstructure v1.4.3 // indirect
github.com/pelletier/go-toml v1.9.4 // indirect github.com/pelletier/go-toml v1.9.4 // indirect
github.com/pelletier/go-toml/v2 v2.0.0-beta.8 // indirect github.com/pelletier/go-toml/v2 v2.0.0-beta.8 // indirect
github.com/r3labs/sse v0.0.0-20210224172625-26fe804710bc
github.com/spf13/afero v1.8.2 // indirect github.com/spf13/afero v1.8.2 // indirect
github.com/spf13/cast v1.4.1 // indirect github.com/spf13/cast v1.4.1 // indirect
github.com/spf13/cobra v1.4.0 github.com/spf13/cobra v1.4.0

5
go.sum
View File

@ -276,6 +276,8 @@ github.com/pkg/sftp v1.13.1/go.mod h1:3HaPG6Dq1ILlpPZRO0HVMrsydcdLt6HRDccSgb87qR
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/r3labs/sse v0.0.0-20210224172625-26fe804710bc h1:zAsgcP8MhzAbhMnB1QQ2O7ZhWYVGYSR2iVcjzQuPV+o=
github.com/r3labs/sse v0.0.0-20210224172625-26fe804710bc/go.mod h1:S8xSOnV3CgpNrWd0GQ/OoQfMtlg2uPRSuTzcSGrzwK8=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ= github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ=
github.com/rs/zerolog v1.13.0/go.mod h1:YbFCdg8HfsridGWAh22vktObvhZbQsZXe4/zB0OKkWU= github.com/rs/zerolog v1.13.0/go.mod h1:YbFCdg8HfsridGWAh22vktObvhZbQsZXe4/zB0OKkWU=
@ -401,6 +403,7 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20190628185345-da137c7871d7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190628185345-da137c7871d7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20191116160921-f9c825593386/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
@ -660,6 +663,8 @@ google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlba
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.28.0 h1:w43yiav+6bVFTBQFZX0r7ipe9JQ1QsbMgHwbBziscLw= google.golang.org/protobuf v1.28.0 h1:w43yiav+6bVFTBQFZX0r7ipe9JQ1QsbMgHwbBziscLw=
gopkg.in/cenkalti/backoff.v1 v1.1.0 h1:Arh75ttbsvlpVA7WtVpH4u9h6Zl46xuptxqLxPiSo4Y=
gopkg.in/cenkalti/backoff.v1 v1.1.0/go.mod h1:J6Vskwqd+OMVJl8C33mmtxTBs2gyzfv7UDAkHu8BrjI=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=

View File

@ -1,6 +1,7 @@
package boot package boot
import ( import (
"context"
"time" "time"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@ -55,39 +56,38 @@ func SetupPostgresDb(dbHostname string, dbPort int, dbName string, dbUsername st
// //
// 2. Connect to the database. // 2. Connect to the database.
// //
func BootApplication(dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string, bcAddress string, bcPort int) (sql.Database, error) { func BootApplication(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string, bcAddress string, bcPort int) (*beaconclient.BeaconClient, sql.Database, error) {
log.Info("Booting the Application") log.Info("Booting the Application")
log.Debug("Creating the Beacon Client") log.Debug("Creating the Beacon Client")
BC.Address = bcAddress BC = beaconclient.CreateBeaconClient(ctx, bcAddress, bcPort)
BC.Port = bcPort
log.Debug("Checking Beacon Client")
log.Debug("Checking Beacon Client")
err := BC.CheckBeaconClient() err := BC.CheckBeaconClient()
if err != nil { if err != nil {
return nil, err return nil, nil, err
} }
log.Debug("Setting up DB connection") log.Debug("Setting up DB connection")
DB, err := SetupPostgresDb(dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName) DB, err := SetupPostgresDb(dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName)
if err != nil { if err != nil {
return nil, err return nil, nil, err
} }
return DB, nil return BC, DB, nil
} }
// Add retry logic to ensure that we are give the Beacon Client and the DB time to start. // Add retry logic to ensure that we are give the Beacon Client and the DB time to start.
func BootApplicationWithRetry(dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string, bcAddress string, bcPort int) (sql.Database, error) { func BootApplicationWithRetry(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string, bcAddress string, bcPort int) (*beaconclient.BeaconClient, sql.Database, error) {
var err error var err error
for i := 0; i < maxRetry; i++ { for i := 0; i < maxRetry; i++ {
DB, err = BootApplication(dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName, bcAddress, bcPort) BC, DB, err = BootApplication(ctx, dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName, bcAddress, bcPort)
if err != nil { if err != nil {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"retryNumber": i, "retryNumber": i,
}).Warn("Unable to boot application. Going to try again") }).Warn("Unable to boot application. Going to try again")
time.Sleep(time.Duration(retryInterval) * time.Second) time.Sleep(time.Duration(retryInterval) * time.Second)
continue
} }
break
} }
return DB, err return BC, DB, err
} }

View File

@ -1,6 +1,8 @@
package boot_test package boot_test
import ( import (
"context"
. "github.com/onsi/ginkgo/v2" . "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
"github.com/vulcanize/ipld-ethcl-indexer/internal/boot" "github.com/vulcanize/ipld-ethcl-indexer/internal/boot"
@ -20,26 +22,26 @@ var _ = Describe("Boot", func() {
Describe("Booting the application", Label("integration"), func() { Describe("Booting the application", Label("integration"), func() {
Context("When the DB and BC are both up and running", func() { Context("When the DB and BC are both up and running", func() {
It("Should connect successfully", func() { It("Should connect successfully", func() {
db, err := boot.BootApplicationWithRetry(dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort) _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort)
defer db.Close() defer db.Close()
Expect(err).To(BeNil()) Expect(err).To(BeNil())
}) })
}) })
Context("When the DB is running but not the BC", func() { Context("When the DB is running but not the BC", func() {
It("Should not connect successfully", func() { It("Should not connect successfully", func() {
_, err := boot.BootApplication(dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, "hi", 100) _, _, err := boot.BootApplication(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, "hi", 100)
Expect(err).ToNot(BeNil()) Expect(err).ToNot(BeNil())
}) })
}) })
Context("When the BC is running but not the DB", func() { Context("When the BC is running but not the DB", func() {
It("Should not connect successfully", func() { It("Should not connect successfully", func() {
_, err := boot.BootApplication("hi", 10, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort) _, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort)
Expect(err).ToNot(BeNil()) Expect(err).ToNot(BeNil())
}) })
}) })
Context("When neither the BC or DB are running", func() { Context("When neither the BC or DB are running", func() {
It("Should not connect successfully", func() { It("Should not connect successfully", func() {
_, err := boot.BootApplication("hi", 10, dbName, dbUsername, dbPassword, dbDriver, "hi", 100) _, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, "hi", 100)
Expect(err).ToNot(BeNil()) Expect(err).ToNot(BeNil())
}) })
}) })

View File

@ -0,0 +1,31 @@
package shutdown
import (
"context"
"time"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/beaconclient"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/gracefulshutdown"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
)
// Shutdown all the internal services for the application.
func ShutdownServices(ctx context.Context, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) <-chan struct{} {
return gracefulshutdown.Shutdown(ctx, waitTime, map[string]gracefulshutdown.Operation{
"database": func(ctx context.Context) error {
err := DB.Close()
if err != nil {
loghelper.LogError(err).Error("Unable to close the DB")
}
return err
},
"beaconClient": func(ctx context.Context) error {
err := BC.StopHeadTracking()
if err != nil {
loghelper.LogError(err).Error("Unable to trigger shutdown of head tracking")
}
return err
},
})
}

View File

@ -0,0 +1,73 @@
package beaconclient
import (
"context"
"fmt"
"github.com/r3labs/sse"
log "github.com/sirupsen/logrus"
)
var (
bcHealthEndpoint = "/eth/v1/node/health" // Endpoint used for the healthcheck
bcHeadTopicEndpoint = "/eth/v1/events?topics=head" // Endpoint used to subscribe to the head of the chain
bcReorgTopicEndpoint = "/eth/v1/events?topics=chain_reorg" // Endpoint used to subscribe to the head of the chain
bcFinalizedTopicEndpoint = "/eth/v1/events?topics=finalized_checkpoint" // Endpoint used to subscribe to the head of the chain
connectionProtocol = "http"
)
// A struct that capture the Beacon Server that the Beacon Client will be interacting with and querying.
type BeaconClient struct {
Context context.Context // A context generic context with multiple uses.
ServerAddress string // Address of the Beacon Server
ServerPort int // Port of the Beacon Server
PerformHeadTracking bool // Should we track head?
PerformHistoricalProcessing bool // Should we perform historical processing?
HeadTracking *SseEvents[Head] // Track the head block
ReOrgTracking *SseEvents[ChainReorg] // Track all Reorgs
FinalizationTracking *SseEvents[FinalizedCheckpoint] // Track all finalization checkpoints
}
// A struct to keep track of relevant the head event topic.
type SseEvents[P ProcessedEvents] struct {
Url string // The url for the subscription. Primarily used for logging
MessagesCh chan *sse.Event // Contains all the messages from the SSE Channel
ErrorCh chan *SseError // Contains any errors while SSE streaming occurred
ProcessCh chan *P // Used to capture processed data in its proper struct.
SseClient *sse.Client // sse.Client object that is used to interact with the SSE stream
}
// An object to capture any errors when turning an SSE message to JSON.
type SseError struct {
err error
msg []byte
}
// A Function to create the BeaconClient.
func CreateBeaconClient(ctx context.Context, bcAddress string, bcPort int) *BeaconClient {
log.Info("Creating the BeaconClient")
return &BeaconClient{
Context: ctx,
ServerAddress: bcAddress,
ServerPort: bcPort,
HeadTracking: createSseEvent[Head](connectionProtocol, bcAddress, bcPort, bcHeadTopicEndpoint),
ReOrgTracking: createSseEvent[ChainReorg](connectionProtocol, bcAddress, bcPort, bcReorgTopicEndpoint),
FinalizationTracking: createSseEvent[FinalizedCheckpoint](connectionProtocol, bcAddress, bcPort, bcFinalizedTopicEndpoint),
}
}
// Create all the channels to handle a SSE events
func createSseEvent[P ProcessedEvents](connectionProtocol, bcAddress string, bcPort int, endpoint string) *SseEvents[P] {
url := fmt.Sprintf("%s://%s:%d%s", connectionProtocol, bcAddress, bcPort, endpoint)
sseEvents := &SseEvents[P]{
Url: url,
MessagesCh: make(chan *sse.Event),
ErrorCh: make(chan *SseError),
ProcessCh: make(chan *P),
SseClient: func(url string) *sse.Client {
log.WithFields(log.Fields{"url": url}).Info("Creating SSE client")
return sse.NewClient(url)
}(url),
}
return sseEvents
}

View File

@ -1,6 +0,0 @@
package beaconclient
type BeaconClient struct {
Address string
Port int
}

View File

@ -0,0 +1,43 @@
package beaconclient
import (
"time"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
)
// This function will perform all the heavy lifting for tracking the head of the chain.
func (bc *BeaconClient) CaptureHead() {
log.Info("We are tracking the head of the chain.")
//go readProcessedEvents(bc.HeadTracking.ProcessCh)
bc.CaptureHeadTopic()
}
// Stop the head tracking service.
func (bc *BeaconClient) StopHeadTracking() error {
log.Info("We are going to stop tracking the head of chain because of the shutdown signal.")
chHead := make(chan bool)
chReorg := make(chan bool)
chFinal := make(chan bool)
go bc.HeadTracking.finishProcessingChannel(chHead)
go bc.ReOrgTracking.finishProcessingChannel(chReorg)
go bc.FinalizationTracking.finishProcessingChannel(chFinal)
<-chHead
<-chFinal
<-chReorg
log.Info("Successfully stopped the head tracking service.")
return nil
}
func (se *SseEvents[ProcessedEvents]) finishProcessingChannel(finish chan<- bool) {
loghelper.LogUrl(se.Url).Info("Received a close event.")
se.SseClient.Unsubscribe(se.MessagesCh)
for len(se.MessagesCh) != 0 {
time.Sleep(time.Duration(shutdownWaitInterval) * time.Millisecond)
}
loghelper.LogUrl(se.Url).Info("Done processing all messages, ready for shutdown")
finish <- true
}

View File

@ -1,7 +0,0 @@
// This package will handle all event subscriptions that utilize SSE.
package beaconclient
func ListenToHead() {
}

View File

@ -9,17 +9,13 @@ import (
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper" "github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
) )
var (
bcHealthEndpoint = "/eth/v1/node/health"
)
// This function will ensure that we can connect to the beacon client. // This function will ensure that we can connect to the beacon client.
// Keep in mind, the beacon client will allow you to connect to it but it might // Keep in mind, the beacon client will allow you to connect to it but it might
// Not allow you to make http requests. This is part of its built in logic, and you will have // Not allow you to make http requests. This is part of its built in logic, and you will have
// to follow their provided guidelines. https://lighthouse-book.sigmaprime.io/api-bn.html#security // to follow their provided guidelines. https://lighthouse-book.sigmaprime.io/api-bn.html#security
func (bc *BeaconClient) CheckBeaconClient() error { func (bc BeaconClient) CheckBeaconClient() error {
log.Debug("Attempting to connect to the beacon client") log.Debug("Attempting to connect to the beacon client")
bcEndpoint := "http://" + bc.Address + ":" + strconv.Itoa(bc.Port) + bcHealthEndpoint bcEndpoint := "http://" + bc.ServerAddress + ":" + strconv.Itoa(bc.ServerPort) + bcHealthEndpoint
resp, err := http.Get(bcEndpoint) resp, err := http.Get(bcEndpoint)
if err != nil { if err != nil {
loghelper.LogError(err).Error("Unable to get bc endpoint: ", bcEndpoint) loghelper.LogError(err).Error("Unable to get bc endpoint: ", bcEndpoint)

View File

@ -1,6 +1,8 @@
package beaconclient_test package beaconclient_test
import ( import (
"context"
. "github.com/onsi/ginkgo/v2" . "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
beaconclient "github.com/vulcanize/ipld-ethcl-indexer/pkg/beaconclient" beaconclient "github.com/vulcanize/ipld-ethcl-indexer/pkg/beaconclient"
@ -8,14 +10,8 @@ import (
var _ = Describe("Healthcheck", func() { var _ = Describe("Healthcheck", func() {
var ( var (
BC = beaconclient.BeaconClient{ BC = beaconclient.CreateBeaconClient(context.Background(), "localhost", 5052)
Address: "localhost", errBc = beaconclient.CreateBeaconClient(context.Background(), "blah-blah", 1010)
Port: 5052,
}
errBc = beaconclient.BeaconClient{
Address: "blah",
Port: 10,
}
) )
Describe("Connecting to the lighthouse client", Label("integration"), func() { Describe("Connecting to the lighthouse client", Label("integration"), func() {
Context("When the client is running", func() { Context("When the client is running", func() {
@ -24,8 +20,8 @@ var _ = Describe("Healthcheck", func() {
Expect(err).To(BeNil()) Expect(err).To(BeNil())
}) })
}) })
Context("When the client is running", func() { Context("When the client is not running", func() {
It("We should connect successfully", func() { It("We not should connect successfully", func() {
err := errBc.CheckBeaconClient() err := errBc.CheckBeaconClient()
Expect(err).ToNot(BeNil()) Expect(err).ToNot(BeNil())
}) })

View File

@ -0,0 +1,37 @@
package beaconclient
// This interface captured what the events can be for processed event streams.
type ProcessedEvents interface {
Head | FinalizedCheckpoint | ChainReorg
}
// This struct captures the JSON representation of the head topic
type Head struct {
Slot string `json:"slot"`
Block string `json:"block"`
State string `json:"state"`
CurrentDutyDependentRoot string `json:"current_duty_dependent_root"`
PreviousDutyDependentRoot string `json:"previous_duty_dependent_root"`
EpochTransition bool `json:"epoch_transition"`
ExecutionOptimistic bool `json:"execution_optimistic"`
}
// This struct captures the JSON representation of the finalized_checkpoint topic.
type FinalizedCheckpoint struct {
Block string `json:"block"`
State string `json:"state"`
Epoch string `json:"epoch"`
ExecutionOptimistic bool `json:"execution_optimistic"`
}
// This struct captures the JSON representation of the chain_reorg topic.
type ChainReorg struct {
Slot string `json:"slot"`
Depth string `json:"depth"`
OldHeadBlock string `json:"old_head_block"`
NewHeadBlock string `json:"new_head_block"`
OldHeadState string `json:"old_head_state"`
NewHeadState string `json:"new_head_state"`
Epoch string `json:"epoch"`
ExecutionOptimistic bool `json:"execution_optimistic"`
}

View File

@ -0,0 +1,66 @@
// This package will handle all event subscriptions that utilize SSE.
package beaconclient
import (
"encoding/json"
"time"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
)
var (
shutdownWaitInterval = time.Duration(5) * time.Second
)
// This function will capture all the SSE events for a given SseEvents object.
// When new messages come in, it will ensure that they are decoded into JSON.
// If any errors occur, it log the error information.
func handleSseEvent[P ProcessedEvents](eventHandler *SseEvents[P]) {
loghelper.LogUrl(eventHandler.Url).Info("Subscribing to Messages")
go eventHandler.SseClient.SubscribeChanRaw(eventHandler.MessagesCh)
for {
select {
case message := <-eventHandler.MessagesCh:
// Message can be nil if its a keep-alive message
if len(message.Data) != 0 {
go processMsg(message.Data, eventHandler.ProcessCh, eventHandler.ErrorCh)
}
case headErr := <-eventHandler.ErrorCh:
log.WithFields(log.Fields{
"url": eventHandler.Url,
"err": headErr.err,
"msg": headErr.msg,
},
).Error("Unable to handle event.")
case process := <-eventHandler.ProcessCh:
log.WithFields(log.Fields{"processed": process}).Debug("Processesing a Message")
}
}
}
// Capture all of the head topics.
func (bc *BeaconClient) CaptureHeadTopic() {
log.Info("We are capturing all SSE events")
go handleSseEvent(bc.HeadTracking)
go handleSseEvent(bc.ReOrgTracking)
go handleSseEvent(bc.FinalizationTracking)
}
// Turn the data object into a Struct.
func processMsg[P ProcessedEvents](msg []byte, processCh chan<- *P, errorCh chan<- *SseError) {
log.WithFields(log.Fields{"msg": msg}).Debug("Processing a Message")
var msgMarshaled P
err := json.Unmarshal(msg, &msgMarshaled)
if err != nil {
errorCh <- &SseError{
err: err,
msg: msg,
}
return
}
processCh <- &msgMarshaled
}

View File

@ -0,0 +1,64 @@
package gracefulshutdown
import (
"context"
"os"
"os/signal"
"sync"
"syscall"
"time"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
)
// operation is a clean up function on shutting down
type Operation func(ctx context.Context) error
// gracefulShutdown waits for termination syscalls and doing clean up operations after received it
func Shutdown(ctx context.Context, timeout time.Duration, ops map[string]Operation) <-chan struct{} {
wait := make(chan struct{})
go func() {
s := make(chan os.Signal, 1)
// add any other syscalls that you want to be notified with
signal.Notify(s, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP)
<-s
log.Info("Shutting Down your application")
// set timeout for the ops to be done to prevent system hang
timeoutFunc := time.AfterFunc(timeout, func() {
log.Warnf("timeout %d ms has been elapsed, force exit", timeout.Milliseconds())
os.Exit(0)
})
defer timeoutFunc.Stop()
var wg sync.WaitGroup
// Do the operations asynchronously to save time
for key, op := range ops {
wg.Add(1)
innerOp := op
innerKey := key
go func() {
defer wg.Done()
log.Infof("cleaning up: %s", innerKey)
if err := innerOp(ctx); err != nil {
loghelper.LogError(err).Errorf("%s: clean up failed: %s", innerKey, err.Error())
return
}
log.Infof("%s was shutdown gracefully", innerKey)
}()
}
wg.Wait()
close(wait)
}()
return wait
}

12
pkg/loghelper/log_url.go Normal file
View File

@ -0,0 +1,12 @@
package loghelper
import (
log "github.com/sirupsen/logrus"
)
// A simple helper function that will help wrap the error message.
func LogUrl(url string) *log.Entry {
return log.WithFields(log.Fields{
"url": url,
})
}