Compare commits

...

15 Commits

Author         SHA1        Message                                               Date
Abdul Rabbani  1ea88d05ba  Allow custom epoch and status for multiple messages   2022-06-23 14:44:32 -04:00
Abdul Rabbani  58d2104392  Correct condition                                     2022-06-23 14:10:14 -04:00
Abdul Rabbani  4825a54e4e  Fix test conditions                                   2022-06-23 13:47:26 -04:00
Abdul Rabbani  0954dc6159  Update head testing                                   2022-06-23 11:35:18 -04:00
Abdul Rabbani  16d034c844  Close idle connections and chan's properly.           2022-06-23 09:17:46 -04:00
Abdul Rabbani  32ec784e1e  Close channels in the correct location                2022-06-23 08:50:09 -04:00
Abdul Rabbani  9664783eef  Remove pprof                                          2022-06-23 08:26:23 -04:00
Abdul Rabbani  ac1e3075e2  Update system testing                                 2022-06-23 08:07:19 -04:00
Abdul Rabbani  041f862c11  Make channels buffered                                2022-06-23 07:50:36 -04:00
Abdul Rabbani  2299e54197  Use context to end goroutine                          2022-06-22 14:44:26 -04:00
Abdul Rabbani  6cecc69520  Don't close DB                                        2022-06-22 14:09:39 -04:00
Abdul Rabbani  43e0c71639  Improve test stop                                     2022-06-22 12:36:49 -04:00
Abdul Rabbani  8253201f95  Limit head workers + use single context + skipSse     2022-06-22 12:06:52 -04:00
Abdul Rabbani  67305e07c3  Add condition for SSE subscription                    2022-06-21 16:42:51 -04:00
Abdul Rabbani  a2f2603d38  Use context to stop head tracking                     2022-06-21 14:34:10 -04:00
23 changed files with 511 additions and 276 deletions

View File

@ -66,7 +66,7 @@ jobs:
run: |
until $(docker compose -f "$GITHUB_WORKSPACE/stack-orchestrator/docker/local/docker-compose-ipld-eth-beacon-indexer.yml" --env-file ./config.sh cp ipld-eth-beacon-indexer:/root/HEALTH ./HEALTH) ; do sleep 10; done
cat ./HEALTH
if [[ "$(cat ./HEALTH)" -eq "0" ]]; then echo "Application boot successful" && (exit 0); else docker compose -f "$GITHUB_WORKSPACE/stack-orchestrator/docker/local/docker-compose-ipld-eth-beacon-indexer.yml" cp ipld-eth-beacon-indexer:/root/ipld-eth-beacon-indexer.log . && cat ipld-eth-beacon-indexer.log && (exit 1); fi
if [[ "$(cat ./HEALTH)" -eq "0" ]]; then echo "Application boot successful" && (exit 0); else docker compose -f "$GITHUB_WORKSPACE/stack-orchestrator/docker/local/docker-compose-ipld-eth-beacon-indexer.yml" --env-file ./config.sh cp ipld-eth-beacon-indexer:/root/ipld-eth-beacon-indexer.log . && cat ipld-eth-beacon-indexer.log && (exit 1); fi
unit-test:
name: Run Unit Tests

View File

@ -25,6 +25,7 @@ integration-test-ci:
go fmt ./...
$(GINKGO) -r --label-filter integration \
--procs=4 --compilers=4 \
--flake-attempts=3 \
--randomize-all --randomize-suites \
--fail-on-pending --keep-going \
--cover --coverprofile=cover.profile \
@ -76,7 +77,7 @@ unit-test-ci:
go vet ./...
go fmt ./...
$(GINKGO) -r --label-filter unit \
--randomize-all --randomize-suites
--randomize-all --randomize-suites \
--flake-attempts=3 \
--fail-on-pending --keep-going \
--cover --coverprofile=cover.profile \
@ -88,6 +89,7 @@ system-test-ci:
go fmt ./...
$(GINKGO) -r --label-filter system \
--randomize-all --randomize-suites \
--flake-attempts=3 \
--fail-on-pending --keep-going \
--cover --coverprofile=cover.profile \
--trace --json-report=report.json

View File

@ -39,6 +39,7 @@ var (
bcConnectionProtocol string
bcType string
bcMaxHistoricProcessWorker int
bcMaxHeadProcessWorker int
bcUniqueNodeIdentifier int
bcCheckDb bool
kgMaxWorker int
@ -96,7 +97,8 @@ func init() {
captureCmd.PersistentFlags().StringVarP(&bcConnectionProtocol, "bc.connectionProtocol", "", "http", "protocol for connecting to the beacon node.")
captureCmd.PersistentFlags().IntVarP(&bcBootRetryInterval, "bc.bootRetryInterval", "", 30, "The amount of time to wait between retries while booting the application")
captureCmd.PersistentFlags().IntVarP(&bcBootMaxRetry, "bc.bootMaxRetry", "", 5, "The amount of time to wait between retries while booting the application")
captureCmd.PersistentFlags().IntVarP(&bcMaxHistoricProcessWorker, "bc.maxHistoricProcessWorker", "", 30, "The number of workers that should be actively processing slots from the eth-beacon.historic_process table. Be careful of system memory.")
captureCmd.PersistentFlags().IntVarP(&bcMaxHistoricProcessWorker, "bc.maxHistoricProcessWorker", "", 3, "The number of workers that should be actively processing slots from the eth-beacon.historic_process table. Be careful of system memory.")
captureCmd.PersistentFlags().IntVarP(&bcMaxHeadProcessWorker, "bc.maxHeadProcessWorker", "", 3, "The number of workers that should be actively processing slots head slots. Be careful of system memory.")
captureCmd.PersistentFlags().IntVarP(&bcUniqueNodeIdentifier, "bc.uniqueNodeIdentifier", "", 0, "The unique identifier of this application. Each application connecting to the DB should have a unique identifier.")
captureCmd.PersistentFlags().BoolVarP(&bcCheckDb, "bc.checkDb", "", true, "Should we check to see if the slot exists in the DB before writing it?")
// err = captureCmd.MarkPersistentFlagRequired("bc.address")
@ -107,7 +109,7 @@ func init() {
//// Known Gaps specific
captureCmd.PersistentFlags().BoolVarP(&kgProcessGaps, "kg.processKnownGaps", "", true, "Should we process the slots within the eth-beacon.known_gaps table.")
captureCmd.PersistentFlags().IntVarP(&kgTableIncrement, "kg.increment", "", 10000, "The max slots within a single entry to the known_gaps table.")
captureCmd.PersistentFlags().IntVarP(&kgMaxWorker, "kg.maxKnownGapsWorker", "", 30, "The number of workers that should be actively processing slots from the eth-beacon.known_gaps table. Be careful of system memory.")
captureCmd.PersistentFlags().IntVarP(&kgMaxWorker, "kg.maxKnownGapsWorker", "", 3, "The number of workers that should be actively processing slots from the eth-beacon.known_gaps table. Be careful of system memory.")
// Prometheus Specific
captureCmd.PersistentFlags().BoolVarP(&pmMetrics, "pm.metrics", "", true, "Should we capture prometheus metrics.")
@ -157,6 +159,8 @@ func init() {
exitErr(err)
err = viper.BindPFlag("bc.maxHistoricProcessWorker", captureCmd.PersistentFlags().Lookup("bc.maxHistoricProcessWorker"))
exitErr(err)
err = viper.BindPFlag("bc.maxHeadProcessWorker", captureCmd.PersistentFlags().Lookup("bc.maxHeadProcessWorker"))
exitErr(err)
err = viper.BindPFlag("bc.uniqueNodeIdentifier", captureCmd.PersistentFlags().Lookup("bc.uniqueNodeIdentifier"))
exitErr(err)
err = viper.BindPFlag("bc.checkDb", captureCmd.PersistentFlags().Lookup("bc.checkDb"))
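The new bc.maxHeadProcessWorker flag (and the lowered worker defaults) follow the repository's existing register-then-bind pattern: define the flag with cobra, then bind it into viper so config files and environment variables can override it. A minimal, self-contained sketch of that pattern; the command and variable names here are illustrative, not copied from the repository:

```go
package main

import (
	log "github.com/sirupsen/logrus"
	"github.com/spf13/cobra"
	"github.com/spf13/viper"
)

var maxHeadProcessWorker int

var captureCmd = &cobra.Command{
	Use: "capture",
	Run: func(cmd *cobra.Command, args []string) {
		// The rest of the code reads the value through viper, which resolves
		// the flag, config file, or environment variable, whichever was set.
		log.WithField("workers", viper.GetInt("bc.maxHeadProcessWorker")).Info("head workers")
	},
}

func main() {
	// 1. Register the flag on the command.
	captureCmd.PersistentFlags().IntVarP(&maxHeadProcessWorker,
		"bc.maxHeadProcessWorker", "", 3,
		"The number of workers that should be actively processing head slots.")
	// 2. Bind it into viper under the same key.
	if err := viper.BindPFlag("bc.maxHeadProcessWorker",
		captureCmd.PersistentFlags().Lookup("bc.maxHeadProcessWorker")); err != nil {
		log.Fatal(err)
	}
	if err := captureCmd.Execute(); err != nil {
		log.Fatal(err)
	}
}
```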

View File

@ -59,7 +59,7 @@ func init() {
func startFullProcessing() {
// Boot the application
log.Info("Starting the application in head tracking mode.")
ctx := context.Background()
ctx, cancel := context.WithCancel(context.Background())
Bc, Db, err := boot.BootApplicationWithRetry(ctx, viper.GetString("db.address"), viper.GetInt("db.port"), viper.GetString("db.name"), viper.GetString("db.username"), viper.GetString("db.password"), viper.GetString("db.driver"),
viper.GetString("bc.address"), viper.GetInt("bc.port"), viper.GetString("bc.connectionProtocol"), viper.GetString("bc.type"), viper.GetInt("bc.bootRetryInterval"), viper.GetInt("bc.bootMaxRetry"),
@ -75,13 +75,11 @@ func startFullProcessing() {
log.Info("The Beacon Client has booted successfully!")
// Capture head blocks
go Bc.CaptureHead()
hpContext, hpCancel := context.WithCancel(context.Background())
go Bc.CaptureHead(ctx, viper.GetInt("bc.maxHeadProcessWorker"), false)
errG, _ := errgroup.WithContext(context.Background())
errG.Go(func() error {
errs := Bc.CaptureHistoric(hpContext, viper.GetInt("bc.maxHistoricProcessWorker"))
errs := Bc.CaptureHistoric(ctx, viper.GetInt("bc.maxHistoricProcessWorker"))
if len(errs) != 0 {
if len(errs) != 0 {
log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing historic events")
@ -90,12 +88,11 @@ func startFullProcessing() {
}
return nil
})
kgCtx, KgCancel := context.WithCancel(context.Background())
if viper.GetBool("kg.processKnownGaps") {
go func() {
errG := new(errgroup.Group)
errG.Go(func() error {
errs := Bc.ProcessKnownGaps(kgCtx, viper.GetInt("kg.maxKnownGapsWorker"))
errs := Bc.ProcessKnownGaps(ctx, viper.GetInt("kg.maxKnownGapsWorker"))
if len(errs) != 0 {
log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing knownGaps")
return fmt.Errorf("Application ended because there were too many error when attempting to process knownGaps")
@ -115,7 +112,7 @@ func startFullProcessing() {
}
// Shutdown when the time is right.
err = shutdown.ShutdownFull(ctx, KgCancel, hpCancel, notifierCh, maxWaitSecondsShutdown, Db, Bc)
err = shutdown.ShutdownFull(ctx, cancel, notifierCh, maxWaitSecondsShutdown, Db, Bc)
if err != nil {
loghelper.LogError(err).Error("Ungracefully Shutdown ipld-eth-beacon-indexer!")
} else {
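The substantive change in this file is that head tracking, historic processing, and known-gaps processing all receive the same cancellable context, and the shutdown helper receives that single cancel func. A hedged sketch of the shape, with plain function values standing in for the BeaconClient methods:

```go
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// runAll wires every long-running worker to one cancellable context, so a
// single cancel() from the shutdown path winds all of them down together.
func runAll(captureHead func(context.Context), captureHistoric, processKnownGaps func(context.Context) error) error {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	go captureHead(ctx)

	errG, ctx := errgroup.WithContext(ctx)
	errG.Go(func() error { return captureHistoric(ctx) })
	errG.Go(func() error { return processKnownGaps(ctx) })

	// Wait returns the first non-nil error; the group's internal cancellation
	// propagates through ctx so the remaining workers stop as well.
	if err := errG.Wait(); err != nil {
		return fmt.Errorf("worker failed: %w", err)
	}
	return nil
}

func main() {}
```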

View File

@ -46,7 +46,7 @@ var headCmd = &cobra.Command{
func startHeadTracking() {
// Boot the application
log.Info("Starting the application in head tracking mode.")
ctx := context.Background()
ctx, cancel := context.WithCancel(context.Background())
Bc, Db, err := boot.BootApplicationWithRetry(ctx, viper.GetString("db.address"), viper.GetInt("db.port"), viper.GetString("db.name"), viper.GetString("db.username"), viper.GetString("db.password"), viper.GetString("db.driver"),
viper.GetString("bc.address"), viper.GetInt("bc.port"), viper.GetString("bc.connectionProtocol"), viper.GetString("bc.type"), viper.GetInt("bc.bootRetryInterval"), viper.GetInt("bc.bootMaxRetry"),
@ -62,13 +62,13 @@ func startHeadTracking() {
log.Info("The Beacon Client has booted successfully!")
// Capture head blocks
go Bc.CaptureHead()
kgCtx, KgCancel := context.WithCancel(context.Background())
go Bc.CaptureHead(ctx, viper.GetInt("bc.maxHeadProcessWorker"), false)
if viper.GetBool("kg.processKnownGaps") {
go func() {
errG := new(errgroup.Group)
errG.Go(func() error {
errs := Bc.ProcessKnownGaps(kgCtx, viper.GetInt("kg.maxKnownGapsWorker"))
errs := Bc.ProcessKnownGaps(ctx, viper.GetInt("kg.maxKnownGapsWorker"))
if len(errs) != 0 {
log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing knownGaps")
return fmt.Errorf("Application ended because there were too many error when attempting to process knownGaps")
@ -88,13 +88,12 @@ func startHeadTracking() {
}
// Shutdown when the time is right.
err = shutdown.ShutdownHeadTracking(ctx, KgCancel, notifierCh, maxWaitSecondsShutdown, Db, Bc)
err = shutdown.ShutdownHeadTracking(ctx, cancel, notifierCh, maxWaitSecondsShutdown, Db, Bc)
if err != nil {
loghelper.LogError(err).Error("Ungracefully Shutdown ipld-eth-beacon-indexer!")
} else {
log.Info("Gracefully shutdown ipld-eth-beacon-indexer")
}
}
func init() {
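startHeadTracking makes the same move: one context.WithCancel at boot, with the cancel func handed to shutdown.ShutdownHeadTracking in place of the old per-process kgCancel. The sketch below shows how a signal channel and that single cancel typically fit together; the notifierCh wiring and the wait timeout live in code not included in this compare, so the signal.Notify call here is an assumption:

```go
package main

import (
	"context"
	"log"
	"os"
	"os/signal"
	"syscall"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())

	// Assumed wiring: the repository creates notifierCh elsewhere; feeding it
	// from SIGINT/SIGTERM is the conventional setup.
	notifierCh := make(chan os.Signal, 1)
	signal.Notify(notifierCh, syscall.SIGINT, syscall.SIGTERM)

	sig := <-notifierCh // a real shutdown helper would also enforce a wait timeout
	cancel()            // one cancel now stops head tracking and known-gaps workers
	<-ctx.Done()        // everything selecting on ctx observes the cancellation
	log.Printf("shut down after signal %v", sig)
}
```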

View File

@ -49,7 +49,7 @@ var historicCmd = &cobra.Command{
func startHistoricProcessing() {
// Boot the application
log.Info("Starting the application in head tracking mode.")
ctx := context.Background()
ctx, cancel := context.WithCancel(context.Background())
Bc, Db, err := boot.BootApplicationWithRetry(ctx, viper.GetString("db.address"), viper.GetInt("db.port"), viper.GetString("db.name"), viper.GetString("db.username"), viper.GetString("db.password"), viper.GetString("db.driver"),
viper.GetString("bc.address"), viper.GetInt("bc.port"), viper.GetString("bc.connectionProtocol"), viper.GetString("bc.type"), viper.GetInt("bc.bootRetryInterval"), viper.GetInt("bc.bootMaxRetry"),
@ -63,11 +63,9 @@ func startHistoricProcessing() {
serveProm(addr)
}
hpContext, hpCancel := context.WithCancel(context.Background())
errG, _ := errgroup.WithContext(context.Background())
errG.Go(func() error {
errs := Bc.CaptureHistoric(hpContext, viper.GetInt("bc.maxHistoricProcessWorker"))
errs := Bc.CaptureHistoric(ctx, viper.GetInt("bc.maxHistoricProcessWorker"))
if len(errs) != 0 {
if len(errs) != 0 {
log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing historic events")
@ -77,12 +75,11 @@ func startHistoricProcessing() {
return nil
})
kgContext, kgCancel := context.WithCancel(context.Background())
if viper.GetBool("kg.processKnownGaps") {
go func() {
errG := new(errgroup.Group)
errG.Go(func() error {
errs := Bc.ProcessKnownGaps(kgContext, viper.GetInt("kg.maxKnownGapsWorker"))
errs := Bc.ProcessKnownGaps(ctx, viper.GetInt("kg.maxKnownGapsWorker"))
if len(errs) != 0 {
log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing knownGaps")
return fmt.Errorf("Application ended because there were too many error when attempting to process knownGaps")
@ -102,7 +99,7 @@ func startHistoricProcessing() {
}
// Shutdown when the time is right.
err = shutdown.ShutdownHistoricProcessing(ctx, kgCancel, hpCancel, notifierCh, maxWaitSecondsShutdown, Db, Bc)
err = shutdown.ShutdownHistoricProcessing(ctx, cancel, notifierCh, maxWaitSecondsShutdown, Db, Bc)
if err != nil {
loghelper.LogError(err).Error("Ungracefully Shutdown ipld-eth-beacon-indexer!")
} else {

1
go.mod
View File

@ -70,6 +70,7 @@ require (
github.com/urfave/cli/v2 v2.3.0 // indirect
go.opencensus.io v0.23.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/goleak v1.1.12 // indirect
go.uber.org/multierr v1.8.0 // indirect
go.uber.org/zap v1.21.0 // indirect
golang.org/x/crypto v0.0.0-20220427172511-eb4f295cb31f // indirect
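The only go.mod change is promoting go.uber.org/goleak to v1.1.12 (still an indirect dependency), which lines up with the goroutine-count assertions added to the tests. For reference, a hedged sketch of goleak's documented test usage; the compare itself only shows the tests counting goroutines with runtime.NumGoroutine, so this is how the library is typically used rather than how this repository uses it, and the package name is illustrative:

```go
package mypkg_test

import (
	"testing"

	"go.uber.org/goleak"
)

// TestMain runs once per package; VerifyTestMain fails the test binary if any
// unexpected goroutines are still running after all tests complete.
func TestMain(m *testing.M) {
	goleak.VerifyTestMain(m)
}

// Individual tests can also check for leaks on their own.
func TestNoGoroutineLeak(t *testing.T) {
	defer goleak.VerifyNone(t)
	// start and stop the code under test here
}
```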

3
go.sum
View File

@ -833,8 +833,9 @@ go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE=
go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/goleak v1.1.11-0.20210813005559-691160354723/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI=
go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA=
go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU=

View File

@ -40,68 +40,66 @@ func ShutdownServices(ctx context.Context, notifierCh chan os.Signal, waitTime t
}
// Wrapper function for shutting down the head tracking process.
func ShutdownHeadTracking(ctx context.Context, kgCancel context.CancelFunc, notifierCh chan os.Signal, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) error {
func ShutdownHeadTracking(ctx context.Context, cancel context.CancelFunc, notifierCh chan os.Signal, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) error {
return ShutdownServices(ctx, notifierCh, waitTime, DB, BC, map[string]gracefulshutdown.Operation{
// Combining DB shutdown with BC because BC needs DB open to cleanly shutdown.
"beaconClient": func(ctx context.Context) error {
defer DB.Close()
err := BC.StopHeadTracking()
if err != nil {
loghelper.LogError(err).Error("Unable to trigger shutdown of head tracking")
}
cancel()
BC.StopHeadTracking(ctx, false)
if BC.KnownGapsProcess != (beaconclient.KnownGapsProcessing{}) {
err = BC.StopKnownGapsProcessing(kgCancel)
err := BC.StopKnownGapsProcessing(ctx)
if err != nil {
loghelper.LogError(err).Error("Unable to stop processing known gaps")
}
}
return err
}
}
return nil
},
})
}
// Wrapper function for shutting down the head tracking process.
func ShutdownHistoricProcessing(ctx context.Context, kgCancel, hpCancel context.CancelFunc, notifierCh chan os.Signal, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) error {
func ShutdownHistoricProcessing(ctx context.Context, cancel context.CancelFunc, notifierCh chan os.Signal, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) error {
return ShutdownServices(ctx, notifierCh, waitTime, DB, BC, map[string]gracefulshutdown.Operation{
// Combining DB shutdown with BC because BC needs DB open to cleanly shutdown.
"beaconClient": func(ctx context.Context) error {
defer DB.Close()
err := BC.StopHistoric(hpCancel)
cancel()
err := BC.StopHistoricProcess(ctx)
if err != nil {
loghelper.LogError(err).Error("Unable to stop processing historic")
}
if BC.KnownGapsProcess != (beaconclient.KnownGapsProcessing{}) {
err = BC.StopKnownGapsProcessing(kgCancel)
err = BC.StopKnownGapsProcessing(ctx)
if err != nil {
loghelper.LogError(err).Error("Unable to stop processing known gaps")
}
}
return err
}
}
return nil
},
})
}
// Shutdown the head and historical processing
func ShutdownFull(ctx context.Context, kgCancel, hpCancel context.CancelFunc, notifierCh chan os.Signal, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) error {
func ShutdownFull(ctx context.Context, cancel context.CancelFunc, notifierCh chan os.Signal, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) error {
return ShutdownServices(ctx, notifierCh, waitTime, DB, BC, map[string]gracefulshutdown.Operation{
// Combining DB shutdown with BC because BC needs DB open to cleanly shutdown.
"beaconClient": func(ctx context.Context) error {
defer DB.Close()
err := BC.StopHistoric(hpCancel)
cancel()
err := BC.StopHistoricProcess(ctx)
if err != nil {
loghelper.LogError(err).Error("Unable to stop processing historic")
}
if BC.KnownGapsProcess != (beaconclient.KnownGapsProcessing{}) {
err = BC.StopKnownGapsProcessing(kgCancel)
err = BC.StopKnownGapsProcessing(ctx)
if err != nil {
loghelper.LogError(err).Error("Unable to stop processing known gaps")
}
}
err = BC.StopHeadTracking()
if err != nil {
loghelper.LogError(err).Error("Unable to trigger shutdown of head tracking")
}
BC.StopHeadTracking(ctx, false)
return err
},
})
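Each shutdown wrapper now passes the shared cancel into its "beaconClient" operation and calls it before asking the sub-processes to stop. The gracefulshutdown package itself is not part of this compare, so the sketch below is only an assumption about how an Operation map plus a wait timeout might be executed; the Operation type matches the call sites above, but the body is guessed:

```go
package gracefulshutdown

import (
	"context"
	"sync"
	"time"
)

// Operation mirrors the function type used in the shutdown maps above.
type Operation func(ctx context.Context) error

// runOperations (hypothetical) runs every registered operation concurrently
// and gives the whole group waitTime to finish before giving up.
func runOperations(ctx context.Context, waitTime time.Duration, ops map[string]Operation) error {
	ctx, cancel := context.WithTimeout(ctx, waitTime)
	defer cancel()

	var wg sync.WaitGroup
	errCh := make(chan error, len(ops))
	for _, op := range ops {
		wg.Add(1)
		go func(op Operation) {
			defer wg.Done()
			errCh <- op(ctx)
		}(op)
	}

	done := make(chan struct{})
	go func() { wg.Wait(); close(done) }()

	select {
	case <-done:
		close(errCh)
		for err := range errCh {
			if err != nil {
				return err
			}
		}
		return nil
	case <-ctx.Done():
		return ctx.Err() // operations did not finish within waitTime
	}
}
```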

View File

@ -56,23 +56,23 @@ var (
BC *beaconclient.BeaconClient
err error
ctx context.Context
cancel context.CancelFunc
notifierCh chan os.Signal
)
var _ = Describe("Shutdown", func() {
BeforeEach(func() {
ctx = context.Background()
ctx, cancel = context.WithCancel(context.Background())
BC, DB, err = boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress,
bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", true, bcUniqueIdentifier, bcCheckDb)
notifierCh = make(chan os.Signal, 1)
Expect(err).To(BeNil())
})
Describe("Run Shutdown Function for head tracking,", Label("integration"), func() {
Describe("Run Shutdown Function for head tracking,", Label("integration", "shutdown"), func() {
Context("When Channels are empty,", func() {
It("Should Shutdown Successfully.", func() {
go func() {
_, cancel := context.WithCancel(context.Background())
log.Debug("Starting shutdown chan")
err = shutdown.ShutdownHeadTracking(ctx, cancel, notifierCh, maxWaitSecondsShutdown, DB, BC)
log.Debug("We have completed the shutdown...")
@ -85,7 +85,6 @@ var _ = Describe("Shutdown", func() {
shutdownCh := make(chan bool)
//log.SetLevel(log.DebugLevel)
go func() {
_, cancel := context.WithCancel(context.Background())
log.Debug("Starting shutdown chan")
err = shutdown.ShutdownHeadTracking(ctx, cancel, notifierCh, maxWaitSecondsShutdown, DB, BC)
log.Debug("We have completed the shutdown...")
@ -120,7 +119,6 @@ var _ = Describe("Shutdown", func() {
//log.SetLevel(log.DebugLevel)
go func() {
log.Debug("Starting shutdown chan")
_, cancel := context.WithCancel(context.Background())
err = shutdown.ShutdownHeadTracking(ctx, cancel, notifierCh, maxWaitSecondsShutdown, DB, BC)
log.Debug("We have completed the shutdown...")
Expect(err).To(MatchError(gracefulshutdown.TimeoutErr(maxWaitSecondsShutdown.String())))

View File

@ -76,8 +76,8 @@ type BeaconClient struct {
type SseEvents[P ProcessedEvents] struct {
Endpoint string // The endpoint for the subscription. Primarily used for logging
MessagesCh chan *sse.Event // Contains all the messages from the SSE Channel
ErrorCh chan *SseError // Contains any errors while SSE streaming occurred
ProcessCh chan *P // Used to capture processed data in its proper struct.
ErrorCh chan SseError // Contains any errors while SSE streaming occurred
ProcessCh chan P // Used to capture processed data in its proper struct.
SseClient *sse.Client // sse.Client object that is used to interact with the SSE stream
}
@ -119,9 +119,9 @@ func createSseEvent[P ProcessedEvents](baseEndpoint string, path string) *SseEve
endpoint := baseEndpoint + path
sseEvents := &SseEvents[P]{
Endpoint: endpoint,
MessagesCh: make(chan *sse.Event, 1),
ErrorCh: make(chan *SseError),
ProcessCh: make(chan *P),
MessagesCh: make(chan *sse.Event),
ErrorCh: make(chan SseError),
ProcessCh: make(chan P, 10),
SseClient: func(endpoint string) *sse.Client {
log.WithFields(log.Fields{"endpoint": endpoint}).Info("Creating SSE client")
return sse.NewClient(endpoint)
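The channel shapes change here: ErrorCh and ProcessCh now carry values instead of pointers, MessagesCh loses its buffer, and ProcessCh gains a buffer of 10 (matching the "Make channels buffered" commit). A small standalone illustration of the practical difference between unbuffered and buffered channels, independent of the beacon types:

```go
package main

import "fmt"

func main() {
	// Unbuffered: a send blocks until another goroutine is ready to receive.
	unbuffered := make(chan int)
	go func() { unbuffered <- 1 }() // sending inline here would deadlock
	fmt.Println(<-unbuffered)

	// Buffered: up to 10 sends complete immediately, so a producer (such as
	// the SSE decoder) can run ahead of a slower consumer without blocking.
	buffered := make(chan int, 10)
	for i := 0; i < 10; i++ {
		buffered <- i // no receiver needed yet
	}
	close(buffered)
	for v := range buffered {
		fmt.Println(v)
	}
}
```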

View File

@ -18,42 +18,32 @@
package beaconclient
import (
"time"
"context"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-eth-beacon-indexer/pkg/loghelper"
)
// This function will perform all the heavy lifting for tracking the head of the chain.
func (bc *BeaconClient) CaptureHead() {
func (bc *BeaconClient) CaptureHead(ctx context.Context, maxHeadWorkers int, skipSee bool) {
log.Info("We are tracking the head of the chain.")
go bc.handleHead()
go bc.handleReorg()
bc.captureEventTopic()
go bc.handleHead(ctx, maxHeadWorkers)
go bc.handleReorg(ctx)
bc.captureEventTopic(ctx, skipSee)
}
// Stop the head tracking service.
func (bc *BeaconClient) StopHeadTracking() error {
log.Info("We are going to stop tracking the head of chain because of the shutdown signal.")
chHead := make(chan bool)
chReorg := make(chan bool)
go bc.HeadTracking.finishProcessingChannel(chHead)
go bc.ReOrgTracking.finishProcessingChannel(chReorg)
<-chHead
<-chReorg
log.Info("Successfully stopped the head tracking service.")
return nil
}
// This function closes the SSE subscription, but waits until the MessagesCh is empty
func (se *SseEvents[ProcessedEvents]) finishProcessingChannel(finish chan<- bool) {
loghelper.LogEndpoint(se.Endpoint).Info("Received a close event.")
se.SseClient.Unsubscribe(se.MessagesCh)
for len(se.MessagesCh) != 0 || len(se.ProcessCh) != 0 {
time.Sleep(time.Duration(shutdownWaitInterval) * time.Millisecond)
func (bc *BeaconClient) StopHeadTracking(ctx context.Context, skipSee bool) {
select {
case <-ctx.Done():
if !skipSee {
bc.HeadTracking.SseClient.Unsubscribe(bc.HeadTracking.MessagesCh)
bc.ReOrgTracking.SseClient.Unsubscribe(bc.ReOrgTracking.MessagesCh)
log.Info("Successfully unsubscribed to SSE client")
close(bc.ReOrgTracking.MessagesCh)
close(bc.HeadTracking.MessagesCh)
}
log.Info("Successfully stopped the head tracking service.")
default:
log.Error("The context has not completed....")
}
loghelper.LogEndpoint(se.Endpoint).Info("Done processing all messages, ready for shutdown")
finish <- true
}
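StopHeadTracking now only unsubscribes and closes the message channels when the shared context has already been cancelled; otherwise the default branch logs an error. A minimal sketch of that select-with-default idiom, using a plain cleanup function rather than the BeaconClient type:

```go
package main

import (
	"context"
	"log"
)

// stop performs cleanup only if cancel() has already been called on ctx;
// calling it beforehand is treated as a caller mistake rather than blocking.
func stop(ctx context.Context, cleanup func()) {
	select {
	case <-ctx.Done():
		cleanup()
		log.Println("successfully stopped")
	default:
		log.Println("stop called before the context was cancelled")
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // the shutdown path cancels first...
	stop(ctx, func() { log.Println("unsubscribing and closing channels") })
}
```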

View File

@ -23,6 +23,7 @@ import (
"net/http"
"os"
"path/filepath"
"runtime"
"strconv"
"sync/atomic"
"time"
@ -265,70 +266,121 @@ var _ = Describe("Capturehead", Label("head"), func() {
Describe("Receiving New Head SSE messages", Label("unit", "behavioral"), func() {
Context("Correctly formatted Phase0 Block", func() {
It("Should turn it into a struct successfully.", func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
It("Should process it successfully.", func() {
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset()
BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, 3, maxRetry, 1, 0, 0)
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
BeaconNodeTester.testProcessBlock(bc, maxRetry, 1, 0, 0, headBlocksSent{
head: BeaconNodeTester.TestEvents["100"].HeadMessage,
expectedEpoch: 3,
expectStatus: "proposed",
})
validateSignedBeaconBlock(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, BeaconNodeTester.TestEvents["100"].CorrectParentRoot, BeaconNodeTester.TestEvents["100"].CorrectEth1BlockHash, BeaconNodeTester.TestEvents["100"].CorrectSignedBeaconBlockMhKey)
validateBeaconState(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, BeaconNodeTester.TestEvents["100"].CorrectBeaconStateMhKey)
})
})
Context("Correctly formatted Altair Block", func() {
It("Should turn it into a struct successfully.", func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "2375702")
Context("Correctly formatted Altair Block", Label("leak-head"), func() {
It("Should process it successfully.", func() {
log.SetLevel(log.DebugLevel)
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset()
BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, 74240, maxRetry, 1, 0, 0)
bc := setUpTest(BeaconNodeTester.TestConfig, "2375702")
BeaconNodeTester.testProcessBlock(bc, maxRetry, 1, 0, 0, headBlocksSent{
head: BeaconNodeTester.TestEvents["2375703"].HeadMessage,
expectedEpoch: 74240,
expectStatus: "proposed",
})
validateSignedBeaconBlock(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, BeaconNodeTester.TestEvents["2375703"].CorrectParentRoot, BeaconNodeTester.TestEvents["2375703"].CorrectEth1BlockHash, BeaconNodeTester.TestEvents["2375703"].CorrectSignedBeaconBlockMhKey)
validateBeaconState(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, BeaconNodeTester.TestEvents["2375703"].CorrectBeaconStateMhKey)
})
})
Context("Correctly formatted Altair Test Blocks", func() {
Context("Correctly formatted Altair Test Blocks", Label("correct-test-altairs"), func() {
It("Should turn it into a struct successfully.", func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "2375702")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset()
BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["2375703-dummy"].HeadMessage, 74240, maxRetry, 1, 0, 0)
bc := setUpTest(BeaconNodeTester.TestConfig, "2375702")
BeaconNodeTester.testProcessBlock(bc, maxRetry, 1, 0, 0, headBlocksSent{
head: BeaconNodeTester.TestEvents["2375703-dummy"].HeadMessage,
expectedEpoch: 74240,
expectStatus: "proposed",
})
bc = setUpTest(BeaconNodeTester.TestConfig, "2375702")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset()
BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["2375703-dummy-2"].HeadMessage, 74240, maxRetry, 1, 0, 0)
BeaconNodeTester.testProcessBlock(bc, maxRetry, 1, 0, 0, headBlocksSent{
head: BeaconNodeTester.TestEvents["2375703-dummy-2"].HeadMessage,
expectedEpoch: 74240,
expectStatus: "proposed",
})
})
})
Context("Correctly formatted Phase0 Test Blocks", func() {
Context("Correctly formatted Phase0 Test Blocks", Label("correct-test-phase0"), func() {
It("Should turn it into a struct successfully.", func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset()
BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["100-dummy"].HeadMessage, 3, maxRetry, 1, 0, 0)
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
BeaconNodeTester.testProcessBlock(bc, maxRetry, 1, 0, 0, headBlocksSent{
head: BeaconNodeTester.TestEvents["100-dummy"].HeadMessage,
expectedEpoch: 3,
expectStatus: "proposed",
})
bc = setUpTest(BeaconNodeTester.TestConfig, "99")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset()
BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["100-dummy-2"].HeadMessage, 3, maxRetry, 1, 0, 0)
BeaconNodeTester.testProcessBlock(bc, maxRetry, 1, 0, 0, headBlocksSent{
head: BeaconNodeTester.TestEvents["100-dummy-2"].HeadMessage,
expectedEpoch: 3,
expectStatus: "proposed",
})
//bc = setUpTest(BeaconNodeTester.TestConfig, "99")
//BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
//defer httpmock.DeactivateAndReset()
//BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["100-dummy-2"].HeadMessage, 3, maxRetry, 1, 0, 0)
})
})
Context("Two consecutive correct blocks", func() {
Context("Two consecutive correct blocks", Label("bug"), func() {
It("Should handle both blocks correctly, without any reorgs or known_gaps", func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset()
BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, 3, maxRetry, 1, 0, 0)
BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["101"].HeadMessage, 3, maxRetry, 1, 0, 0)
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
BeaconNodeTester.testProcessBlock(bc, maxRetry, 2, 0, 0, headBlocksSent{
head: BeaconNodeTester.TestEvents["100"].HeadMessage,
expectedEpoch: 3,
expectStatus: "proposed",
}, headBlocksSent{
head: BeaconNodeTester.TestEvents["101"].HeadMessage,
expectedEpoch: 3,
expectStatus: "proposed",
})
})
})
Context("Two consecutive blocks with a bad parent", func() {
Context("Two consecutive blocks with a bad parent", Label("bad-parent"), func() {
It("Should add the previous block to the knownGaps table.", func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset()
BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["100-dummy"].HeadMessage, 3, maxRetry, 1, 0, 0)
BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["101"].HeadMessage, 3, maxRetry, 1, 1, 1)
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
BeaconNodeTester.testProcessBlock(bc, maxRetry, 2, 1, 1,
headBlocksSent{
head: BeaconNodeTester.TestEvents["100-dummy"].HeadMessage,
expectedEpoch: 3,
expectStatus: "forked",
},
headBlocksSent{
head: BeaconNodeTester.TestEvents["101"].HeadMessage,
expectedEpoch: 3,
expectStatus: "proposed",
})
})
})
Context("Phase 0: We have a correctly formated SSZ SignedBeaconBlock and BeaconState", func() {
@ -348,10 +400,15 @@ var _ = Describe("Capturehead", Label("head"), func() {
//})
Context("When the proper SSZ objects are not served", func() {
It("Should return an error, and add the slot to the knownGaps table.", func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "101")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset()
BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["102-wrong-ssz-1"].HeadMessage, 3, maxRetry, 0, 1, 0)
bc := setUpTest(BeaconNodeTester.TestConfig, "101")
BeaconNodeTester.testProcessBlock(bc, maxRetry, 0, 1, 0, headBlocksSent{
head: BeaconNodeTester.TestEvents["102-wrong-ssz-1"].HeadMessage,
expectedEpoch: 3,
expectStatus: "proposed",
})
knownGapCount := countKnownGapsTable(bc.Db)
Expect(knownGapCount).To(Equal(1))
@ -359,6 +416,7 @@ var _ = Describe("Capturehead", Label("head"), func() {
start, end := queryKnownGaps(bc.Db, "102", "102")
Expect(start).To(Equal(102))
Expect(end).To(Equal(102))
})
})
})
@ -366,9 +424,10 @@ var _ = Describe("Capturehead", Label("head"), func() {
Describe("Known Gaps Scenario", Label("unit", "behavioral"), func() {
Context("There is a gap at start up within one incrementing range.", func() {
It("Should add only a single entry to the knownGaps table.", func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "10")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset()
bc := setUpTest(BeaconNodeTester.TestConfig, "10")
BeaconNodeTester.testKnownGapsMessages(bc, 100, 1, maxRetry, BeaconNodeTester.TestEvents["100"].HeadMessage)
start, end := queryKnownGaps(bc.Db, "11", "99")
Expect(start).To(Equal(11))
@ -377,9 +436,10 @@ var _ = Describe("Capturehead", Label("head"), func() {
})
Context("There is a gap at start up spanning multiple incrementing range.", func() {
It("Should add multiple entries to the knownGaps table.", func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "5")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset()
bc := setUpTest(BeaconNodeTester.TestConfig, "5")
BeaconNodeTester.testKnownGapsMessages(bc, 10, 10, maxRetry, BeaconNodeTester.TestEvents["100"].HeadMessage)
start, end := queryKnownGaps(bc.Db, "6", "16")
@ -391,11 +451,12 @@ var _ = Describe("Capturehead", Label("head"), func() {
Expect(end).To(Equal(99))
})
})
Context("Gaps between two head messages", func() {
Context("Gaps between two head messages", Label("gap-head"), func() {
It("Should add the slots in-between", func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset()
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
BeaconNodeTester.testKnownGapsMessages(bc, 1000000, 3, maxRetry, BeaconNodeTester.TestEvents["100"].HeadMessage, BeaconNodeTester.TestEvents["2375703"].HeadMessage)
start, end := queryKnownGaps(bc.Db, "101", "1000101")
@ -412,33 +473,37 @@ var _ = Describe("Capturehead", Label("head"), func() {
Describe("ReOrg Scenario", Label("unit", "behavioral"), func() {
Context("Altair: Multiple head messages for the same slot.", func() {
It("The previous block should be marked as 'forked', the new block should be the only one marked as 'proposed'.", func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "2375702")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset()
bc := setUpTest(BeaconNodeTester.TestConfig, "2375702")
BeaconNodeTester.testMultipleHead(bc, TestEvents["2375703"].HeadMessage, TestEvents["2375703-dummy"].HeadMessage, 74240, maxRetry)
})
})
Context("Phase0: Multiple head messages for the same slot.", func() {
It("The previous block should be marked as 'forked', the new block should be the only one marked as 'proposed'.", func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset()
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
BeaconNodeTester.testMultipleHead(bc, TestEvents["100-dummy"].HeadMessage, TestEvents["100"].HeadMessage, 3, maxRetry)
})
})
Context("Phase 0: Multiple reorgs have occurred on this slot", func() {
It("The previous blocks should be marked as 'forked', the new block should be the only one marked as 'proposed'.", func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset()
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
BeaconNodeTester.testMultipleReorgs(bc, TestEvents["100-dummy"].HeadMessage, TestEvents["100-dummy-2"].HeadMessage, TestEvents["100"].HeadMessage, 3, maxRetry)
})
})
Context("Altair: Multiple reorgs have occurred on this slot", func() {
It("The previous blocks should be marked as 'forked', the new block should be the only one marked as 'proposed'.", func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "2375702")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset()
bc := setUpTest(BeaconNodeTester.TestConfig, "2375702")
BeaconNodeTester.testMultipleReorgs(bc, TestEvents["2375703-dummy"].HeadMessage, TestEvents["2375703-dummy-2"].HeadMessage, TestEvents["2375703"].HeadMessage, 74240, maxRetry)
})
})
@ -523,18 +588,25 @@ func validateBeaconState(bc *beaconclient.BeaconClient, headMessage beaconclient
}
// Wrapper function to send a head message to the beaconclient
func sendHeadMessage(bc *beaconclient.BeaconClient, head beaconclient.Head, maxRetry int, expectedSuccessfulInserts uint64) {
data, err := json.Marshal(head)
Expect(err).ToNot(HaveOccurred())
func sendHeadMessage(bc *beaconclient.BeaconClient, maxRetry int, expectedSuccessfulInserts uint64, head ...beaconclient.Head) {
var (
data []byte
err error
)
startInserts := atomic.LoadUint64(&bc.Metrics.SlotInserts)
for _, ms := range head {
data, err = json.Marshal(ms)
Expect(err).ToNot(HaveOccurred())
time.Sleep(1 * time.Second)
bc.HeadTracking.MessagesCh <- &sse.Event{
ID: []byte{},
Data: data,
Event: []byte{},
Retry: []byte{},
}
}
curRetry := 0
for atomic.LoadUint64(&bc.Metrics.SlotInserts) != startInserts+expectedSuccessfulInserts {
time.Sleep(1 * time.Second)
@ -832,13 +904,13 @@ func (tbc TestBeaconNode) provideSsz(slotIdentifier string, sszIdentifier string
// Helper function to test three reorg messages. There are going to be many functions like this,
// Because we need to test the same logic for multiple phases.
func (tbc TestBeaconNode) testMultipleReorgs(bc *beaconclient.BeaconClient, firstHead beaconclient.Head, secondHead beaconclient.Head, thirdHead beaconclient.Head, epoch int, maxRetry int) {
go bc.CaptureHead()
startGoRoutines := runtime.NumGoroutine()
ctx, cancel := context.WithCancel(context.Background())
go bc.CaptureHead(ctx, 2, true)
time.Sleep(1 * time.Second)
log.Info("Sending Messages to BeaconClient")
sendHeadMessage(bc, firstHead, maxRetry, 1)
sendHeadMessage(bc, secondHead, maxRetry, 1)
sendHeadMessage(bc, thirdHead, maxRetry, 1)
sendHeadMessage(bc, maxRetry, 3, firstHead, secondHead, thirdHead)
curRetry := 0
for atomic.LoadUint64(&bc.Metrics.ReorgInserts) != 2 {
@ -890,13 +962,22 @@ func (tbc TestBeaconNode) testMultipleReorgs(bc *beaconclient.BeaconClient, firs
validateSlot(bc, secondHead, epoch, "proposed")
validateSlot(bc, thirdHead, epoch, "forked")
cancel()
testStopHeadTracking(ctx, bc, startGoRoutines, true)
}
// A test to validate a single block was processed correctly
func (tbc TestBeaconNode) testProcessBlock(bc *beaconclient.BeaconClient, head beaconclient.Head, epoch int, maxRetry int, expectedSuccessInsert uint64, expectedKnownGaps uint64, expectedReorgs uint64) {
go bc.CaptureHead()
func (tbc TestBeaconNode) testProcessBlock(bc *beaconclient.BeaconClient, maxRetry int, expectedSuccessInsert uint64, expectedKnownGaps uint64, expectedReorgs uint64, head ...headBlocksSent) {
//pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
startGoRoutines := runtime.NumGoroutine()
ctx, cancel := context.WithCancel(context.Background())
go bc.CaptureHead(ctx, 2, true)
time.Sleep(1 * time.Second)
sendHeadMessage(bc, head, maxRetry, expectedSuccessInsert)
heads := make([]beaconclient.Head, 0)
for _, msgs := range head {
heads = append(heads, msgs.head)
}
sendHeadMessage(bc, maxRetry, expectedSuccessInsert, heads...)
curRetry := 0
for atomic.LoadUint64(&bc.Metrics.KnownGapsInserts) != expectedKnownGaps {
@ -912,23 +993,28 @@ func (tbc TestBeaconNode) testProcessBlock(bc *beaconclient.BeaconClient, head b
time.Sleep(1 * time.Second)
curRetry = curRetry + 1
if curRetry == maxRetry {
Fail(fmt.Sprintf("Wrong reorg metrics, got: %d, wanted %d", bc.Metrics.KnownGapsInserts, expectedKnownGaps))
Fail(fmt.Sprintf("Wrong reorg metrics, got: %d, wanted %d", bc.Metrics.ReorgInserts, expectedReorgs))
}
}
if expectedSuccessInsert > 0 {
validateSlot(bc, head, epoch, "proposed")
for _, msg := range head {
validateSlot(bc, msg.head, msg.expectedEpoch, msg.expectStatus)
}
}
cancel()
testStopHeadTracking(ctx, bc, startGoRoutines, true)
}
// A test that ensures that if two HeadMessages occur for a single slot they are marked
// as proposed and forked correctly.
func (tbc TestBeaconNode) testMultipleHead(bc *beaconclient.BeaconClient, firstHead beaconclient.Head, secondHead beaconclient.Head, epoch int, maxRetry int) {
go bc.CaptureHead()
startGoRoutines := runtime.NumGoroutine()
ctx, cancel := context.WithCancel(context.Background())
go bc.CaptureHead(ctx, 2, true)
time.Sleep(1 * time.Second)
sendHeadMessage(bc, firstHead, maxRetry, 1)
sendHeadMessage(bc, secondHead, maxRetry, 1)
sendHeadMessage(bc, maxRetry, 2, firstHead, secondHead)
curRetry := 0
for atomic.LoadUint64(&bc.Metrics.ReorgInserts) != 1 {
@ -946,18 +1032,22 @@ func (tbc TestBeaconNode) testMultipleHead(bc *beaconclient.BeaconClient, firstH
log.Info("Checking Altair to make sure the fork was marked properly.")
validateSlot(bc, firstHead, epoch, "forked")
validateSlot(bc, secondHead, epoch, "proposed")
cancel()
testStopHeadTracking(ctx, bc, startGoRoutines, true)
}
// A test that ensures that if two HeadMessages occur for a single slot they are marked
// as proposed and forked correctly.
func (tbc TestBeaconNode) testKnownGapsMessages(bc *beaconclient.BeaconClient, tableIncrement int, expectedEntries uint64, maxRetry int, msg ...beaconclient.Head) {
bc.KnownGapTableIncrement = tableIncrement
go bc.CaptureHead()
//pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
startGoRoutines := runtime.NumGoroutine()
ctx, cancel := context.WithCancel(context.Background())
go bc.CaptureHead(ctx, 2, true)
time.Sleep(1 * time.Second)
for _, headMsg := range msg {
sendHeadMessage(bc, headMsg, maxRetry, 1)
}
sendHeadMessage(bc, maxRetry, 1, msg...)
curRetry := 0
for atomic.LoadUint64(&bc.Metrics.KnownGapsInserts) != expectedEntries {
@ -975,6 +1065,8 @@ func (tbc TestBeaconNode) testKnownGapsMessages(bc *beaconclient.BeaconClient, t
if atomic.LoadUint64(&bc.Metrics.ReorgInserts) != 0 {
Fail("We found reorgs when we didn't expect it")
}
cancel()
testStopHeadTracking(ctx, bc, startGoRoutines, true)
}
// This function will make sure we are properly able to get the SszRoot of the SignedBeaconBlock and the BeaconState.
@ -991,3 +1083,22 @@ func testSszRoot(msg Message) {
Expect(err).ToNot(HaveOccurred())
Expect(msg.HeadMessage.Block).To(Equal("0x" + hex.EncodeToString(blockRoot[:])))
}
// A make shift function to stop head tracking and insure we dont have any goroutine leaks
func testStopHeadTracking(ctx context.Context, bc *beaconclient.BeaconClient, startGoRoutines int, skipSse bool) {
bc.StopHeadTracking(ctx, skipSse)
time.Sleep(3 * time.Second)
endNum := runtime.NumGoroutine()
//pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
log.WithField("startNum", startGoRoutines).Info("Start Go routine number")
log.WithField("endNum", endNum).Info("End Go routine number")
//Expect(endNum <= startGoRoutines).To(BeTrue())
Expect(endNum).To(Equal(startGoRoutines))
}
type headBlocksSent struct {
head beaconclient.Head
expectedEpoch int
expectStatus string
}

View File

@ -37,14 +37,18 @@ func (bc *BeaconClient) CaptureHistoric(ctx context.Context, maxWorkers int) []e
}
// This function will perform all the necessary clean up tasks for stopping historical processing.
func (bc *BeaconClient) StopHistoric(cancel context.CancelFunc) error {
func (bc *BeaconClient) StopHistoricProcess(ctx context.Context) error {
select {
case <-ctx.Done():
log.Info("We are stopping the historical processing service.")
cancel()
err := bc.HistoricalProcess.releaseDbLocks()
if err != nil {
loghelper.LogError(err).WithField("uniqueIdentifier", bc.UniqueNodeIdentifier).Error("We were unable to remove the locks from the eth_beacon.historic_processing table. Manual Intervention is needed!")
}
return nil
default:
return fmt.Errorf("Tried to stop historic before the context ended...")
}
}
// An interface to enforce any batch processing. Currently there are two use cases for this.
@ -93,9 +97,9 @@ type batchHistoricError struct {
// 5. Handle any errors.
func handleBatchProcess(ctx context.Context, maxWorkers int, bp BatchProcessing, db sql.Database, serverEndpoint string, metrics *BeaconClientMetrics, checkDb bool, incrementTracker func(uint64)) []error {
slotsCh := make(chan slotsToProcess)
workCh := make(chan int)
processedCh := make(chan slotsToProcess)
errCh := make(chan batchHistoricError)
workCh := make(chan int, 5)
processedCh := make(chan slotsToProcess, 5)
errCh := make(chan batchHistoricError, 5)
finalErrCh := make(chan []error, 1)
// Checkout Rows with same node Identifier.
@ -116,6 +120,8 @@ func handleBatchProcess(ctx context.Context, maxWorkers int, bp BatchProcessing,
for {
select {
case <-ctx.Done():
close(workCh)
close(processedCh)
return
case slots := <-slotsCh:
if slots.startSlot > slots.endSlot {
@ -164,8 +170,9 @@ func handleBatchProcess(ctx context.Context, maxWorkers int, bp BatchProcessing,
errs := bp.getSlotRange(ctx, slotsCh) // Periodically adds new entries....
if errs != nil {
finalErrCh <- errs
}
} else {
finalErrCh <- nil
}
log.Debug("We are stopping the processing of adding new entries")
}()
log.Debug("Waiting for shutdown signal from channel")

View File

@ -3,6 +3,7 @@ package beaconclient_test
import (
"context"
"fmt"
"runtime"
"sync/atomic"
"time"
@ -24,17 +25,18 @@ var _ = Describe("Capturehistoric", func() {
Describe("Run the application in historic mode", Label("unit", "behavioral", "historical"), func() {
Context("Phase0 + Altairs: When we need to process a multiple blocks in a multiple entries in the eth_beacon.historic_process table.", Label("deb"), func() {
It("Successfully Process the Blocks", func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset()
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
BeaconNodeTester.writeEventToHistoricProcess(bc, 100, 101, 10)
BeaconNodeTester.runHistoricalProcess(bc, 2, 2, 0, 0, 0)
// Run Two seperate processes
BeaconNodeTester.writeEventToHistoricProcess(bc, 2375703, 2375703, 10)
BeaconNodeTester.runHistoricalProcess(bc, 2, 3, 0, 0, 0)
time.Sleep(2 * time.Second)
validatePopularBatchBlocks(bc)
})
})
Context("When the start block is greater than the endBlock", func() {
@ -70,11 +72,12 @@ var _ = Describe("Capturehistoric", func() {
})
})
Describe("Running the Application to process Known Gaps", Label("unit", "behavioral", "knownGaps"), func() {
Context("Phase0 + Altairs: When we need to process a multiple blocks in a multiple entries in the eth_beacon.known_gaps table.", func() {
Context("Phase0 + Altairs: When we need to process a multiple blocks in a multiple entries in the eth_beacon.known_gaps table.", Label("leak"), func() {
It("Successfully Process the Blocks", func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset()
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
BeaconNodeTester.writeEventToKnownGaps(bc, 100, 101)
BeaconNodeTester.runKnownGapsProcess(bc, 2, 2, 0, 0, 0)
// Run Two seperate processes
@ -83,6 +86,7 @@ var _ = Describe("Capturehistoric", func() {
time.Sleep(2 * time.Second)
validatePopularBatchBlocks(bc)
})
})
Context("When the start block is greater than the endBlock", func() {
@ -107,13 +111,18 @@ var _ = Describe("Capturehistoric", func() {
})
})
Describe("Running the application in Historic, Head, and KnownGaps mode", Label("unit", "historical", "full"), func() {
Context("When it recieves a head, historic and known Gaps message (in order)", func() {
Context("When it recieves a head, historic and known Gaps message (in order)", Label("deb"), func() {
It("Should process them all successfully.", func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "2375702")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset()
bc := setUpTest(BeaconNodeTester.TestConfig, "2375702")
// Head
BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, 74240, maxRetry, 1, 0, 0)
BeaconNodeTester.testProcessBlock(bc, maxRetry, 1, 0, 0, headBlocksSent{
head: BeaconNodeTester.TestEvents["2375703"].HeadMessage,
expectedEpoch: 74240,
expectStatus: "proposed",
})
// Historical
BeaconNodeTester.writeEventToHistoricProcess(bc, 100, 100, 10)
@ -125,19 +134,25 @@ var _ = Describe("Capturehistoric", func() {
time.Sleep(2 * time.Second)
validatePopularBatchBlocks(bc)
})
})
Context("When it recieves a historic, head and known Gaps message (in order)", func() {
It("Should process them all successfully.", func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "2375702")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset()
bc := setUpTest(BeaconNodeTester.TestConfig, "2375702")
// Historical
BeaconNodeTester.writeEventToHistoricProcess(bc, 100, 100, 10)
BeaconNodeTester.runHistoricalProcess(bc, 2, 1, 0, 0, 0)
// Head
BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, 74240, maxRetry, 1, 0, 0)
BeaconNodeTester.testProcessBlock(bc, maxRetry, 1, 0, 0, headBlocksSent{
head: BeaconNodeTester.TestEvents["2375703"].HeadMessage,
expectedEpoch: 74240,
expectStatus: "proposed",
})
// Known Gaps
BeaconNodeTester.writeEventToKnownGaps(bc, 101, 101)
@ -149,9 +164,10 @@ var _ = Describe("Capturehistoric", func() {
})
Context("When it recieves a known Gaps, historic and head message (in order)", func() {
It("Should process them all successfully.", func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "2375702")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset()
bc := setUpTest(BeaconNodeTester.TestConfig, "2375702")
// Known Gaps
BeaconNodeTester.writeEventToKnownGaps(bc, 101, 101)
BeaconNodeTester.runKnownGapsProcess(bc, 2, 1, 0, 0, 0)
@ -161,7 +177,11 @@ var _ = Describe("Capturehistoric", func() {
BeaconNodeTester.runHistoricalProcess(bc, 2, 2, 0, 0, 0)
// Head
BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, 74240, maxRetry, 1, 0, 0)
BeaconNodeTester.testProcessBlock(bc, maxRetry, 1, 0, 0, headBlocksSent{
head: BeaconNodeTester.TestEvents["2375703"].HeadMessage,
expectedEpoch: 74240,
expectStatus: "proposed",
})
time.Sleep(2 * time.Second)
validatePopularBatchBlocks(bc)
@ -200,25 +220,23 @@ func (tbc TestBeaconNode) writeEventToHistoricProcess(bc *beaconclient.BeaconCli
// Start the CaptureHistoric function, and check for the correct inserted slots.
func (tbc TestBeaconNode) runHistoricalProcess(bc *beaconclient.BeaconClient, maxWorkers int, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError uint64) {
//pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
startGoRoutines := runtime.NumGoroutine()
ctx, cancel := context.WithCancel(context.Background())
go bc.CaptureHistoric(ctx, maxWorkers)
validateMetrics(bc, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError)
log.Debug("Calling the stop function for historical processing..")
err := bc.StopHistoric(cancel)
time.Sleep(5 * time.Second)
Expect(err).ToNot(HaveOccurred())
validateAllRowsCheckedOut(bc.Db, hpCheckCheckedOutStmt)
cancel()
testStopHistoricProcessing(ctx, bc, startGoRoutines)
}
// Wrapper function that processes knownGaps
func (tbc TestBeaconNode) runKnownGapsProcess(bc *beaconclient.BeaconClient, maxWorkers int, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError uint64) {
startGoRoutines := runtime.NumGoroutine()
ctx, cancel := context.WithCancel(context.Background())
go bc.ProcessKnownGaps(ctx, maxWorkers)
validateMetrics(bc, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError)
err := bc.StopKnownGapsProcessing(cancel)
time.Sleep(5 * time.Second)
Expect(err).ToNot(HaveOccurred())
validateAllRowsCheckedOut(bc.Db, kgCheckCheckedOutStmt)
cancel()
testStopKnownGapProcessing(ctx, bc, startGoRoutines)
}
func validateMetrics(bc *beaconclient.BeaconClient, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError uint64) {
@ -288,3 +306,37 @@ func validateAllRowsCheckedOut(db sql.Database, checkStmt string) {
Expect(err).ToNot(HaveOccurred())
Expect(rows).To(Equal(int64(0)))
}
// A make shift function to stop head tracking and insure we dont have any goroutine leaks
func testStopHistoricProcessing(ctx context.Context, bc *beaconclient.BeaconClient, startGoRoutines int) {
log.Debug("Calling the stop function for historical processing..")
err := bc.StopHistoricProcess(ctx)
Expect(err).ToNot(HaveOccurred())
time.Sleep(5 * time.Second)
validateAllRowsCheckedOut(bc.Db, hpCheckCheckedOutStmt)
time.Sleep(5 * time.Second)
endNum := runtime.NumGoroutine()
//pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
//Expect(endNum <= startGoRoutines).To(BeTrue())
log.WithField("startNum", startGoRoutines).Info("Start Go routine number")
log.WithField("endNum", endNum).Info("End Go routine number")
Expect(endNum).To(Equal(startGoRoutines))
}
// A make shift function to stop head tracking and insure we dont have any goroutine leaks
func testStopKnownGapProcessing(ctx context.Context, bc *beaconclient.BeaconClient, startGoRoutines int) {
log.Debug("Calling the stop function for knownGaps processing..")
err := bc.StopKnownGapsProcessing(ctx)
Expect(err).ToNot(HaveOccurred())
time.Sleep(5 * time.Second)
validateAllRowsCheckedOut(bc.Db, kgCheckCheckedOutStmt)
time.Sleep(3 * time.Second)
endNum := runtime.NumGoroutine()
//pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
//Expect(endNum <= startGoRoutines).To(BeTrue())
Expect(endNum).To(Equal(startGoRoutines))
}

View File

@ -18,44 +18,37 @@
package beaconclient
import (
"context"
"encoding/json"
"time"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-eth-beacon-indexer/pkg/loghelper"
"golang.org/x/sync/errgroup"
)
var (
shutdownWaitInterval = time.Duration(5) * time.Second
)
// This function will capture all the SSE events for a given SseEvents object.
// When new messages come in, it will ensure that they are decoded into JSON.
// If any errors occur, it log the error information.
func handleIncomingSseEvent[P ProcessedEvents](eventHandler *SseEvents[P], errMetricInc func(uint64)) {
go func() {
errG := new(errgroup.Group)
errG.Go(func() error {
err := eventHandler.SseClient.SubscribeChanRaw(eventHandler.MessagesCh)
func handleIncomingSseEvent[P ProcessedEvents](ctx context.Context, eventHandler *SseEvents[P], errMetricInc func(uint64), skipSse bool) {
if !skipSse {
for {
err := eventHandler.SseClient.SubscribeChanRawWithContext(ctx, eventHandler.MessagesCh)
if err != nil {
return err
loghelper.LogEndpoint(eventHandler.Endpoint).WithFields(log.Fields{
"err": err}).Error("We are unable to subscribe to the SSE endpoint")
time.Sleep(3 * time.Second)
continue
}
return nil
})
if err := errG.Wait(); err != nil {
log.WithFields(log.Fields{
"err": err,
"endpoint": eventHandler.Endpoint,
}).Error("Unable to subscribe to the SSE endpoint.")
return
} else {
loghelper.LogEndpoint(eventHandler.Endpoint).Info("Successfully subscribed to the event stream.")
break
}
}
}()
for {
select {
case <-ctx.Done():
close(eventHandler.ProcessCh)
return
case message := <-eventHandler.MessagesCh:
// The message can be nil if it's a keep-alive message.
if len(message.Data) != 0 {
@ -76,23 +69,24 @@ func handleIncomingSseEvent[P ProcessedEvents](eventHandler *SseEvents[P], errMe
}
// Turn the data object into a Struct.
func processMsg[P ProcessedEvents](msg []byte, processCh chan<- *P, errorCh chan<- *SseError) {
func processMsg[P ProcessedEvents](msg []byte, processCh chan<- P, errorCh chan<- SseError) {
var msgMarshaled P
err := json.Unmarshal(msg, &msgMarshaled)
if err != nil {
loghelper.LogError(err).Error("Unable to parse message")
errorCh <- &SseError{
errorCh <- SseError{
err: err,
msg: msg,
}
return
}
processCh <- &msgMarshaled
processCh <- msgMarshaled
log.Debug("Done sending")
}
// Capture all of the event topics.
func (bc *BeaconClient) captureEventTopic() {
func (bc *BeaconClient) captureEventTopic(ctx context.Context, skipSse bool) {
log.Info("We are capturing all SSE events")
go handleIncomingSseEvent(bc.HeadTracking, bc.Metrics.IncrementHeadError)
go handleIncomingSseEvent(bc.ReOrgTracking, bc.Metrics.IncrementReorgError)
go handleIncomingSseEvent(ctx, bc.HeadTracking, bc.Metrics.IncrementHeadError, skipSse)
go handleIncomingSseEvent(ctx, bc.ReOrgTracking, bc.Metrics.IncrementReorgError, skipSse)
}
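For orientation, a minimal caller-side sketch of how these context-aware handlers are driven; the signatures are taken from the tests further down in this change, and the surrounding boot code is not shown here.
ctx, cancel := context.WithCancel(context.Background())
go bc.CaptureHead(ctx, 2, false) // 2 head workers, SSE subscription enabled
// ... run until shutdown is requested ...
cancel()                         // stops the SSE handlers and head workers
bc.StopHeadTracking(ctx, false)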

View File

@ -19,32 +19,49 @@
package beaconclient
import (
"context"
"fmt"
"strconv"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-eth-beacon-indexer/pkg/database/sql"
)
// This function will perform the necessary steps to handle a reorg.
func (bc *BeaconClient) handleReorg() {
func (bc *BeaconClient) handleReorg(ctx context.Context) {
log.Info("Starting to process reorgs.")
for {
reorg := <-bc.ReOrgTracking.ProcessCh
select {
case <-ctx.Done():
return
case reorg := <-bc.ReOrgTracking.ProcessCh:
log.WithFields(log.Fields{"reorg": reorg}).Debug("Received a new reorg message.")
writeReorgs(bc.Db, reorg.Slot, reorg.NewHeadBlock, bc.Metrics)
}
}
}
// This function will handle the latest head event.
func (bc *BeaconClient) handleHead() {
func (bc *BeaconClient) handleHead(ctx context.Context, maxWorkers int) {
log.Info("Starting to process head.")
workCh := make(chan workParams, 5)
log.WithField("workerNumber", maxWorkers).Info("Creating Workers")
for i := 1; i < maxWorkers; i++ {
go bc.headBlockProcessor(ctx, workCh)
}
errorSlots := 0
for {
head := <-bc.HeadTracking.ProcessCh
select {
case <-ctx.Done():
close(workCh)
return
case head := <-bc.HeadTracking.ProcessCh:
// Process all the work here.
slot, err := strconv.Atoi(head.Slot)
if err != nil {
bc.HeadTracking.ErrorCh <- &SseError{
bc.HeadTracking.ErrorCh <- SseError{
err: fmt.Errorf("Unable to turn the slot from string to int: %s", head.Slot),
}
errorSlots = errorSlots + 1
@ -66,12 +83,38 @@ func (bc *BeaconClient) handleHead() {
bc.StartingSlot = slot
}
go processHeadSlot(bc.Db, bc.ServerEndpoint, slot, head.Block, head.State, bc.PreviousSlot, bc.PreviousBlockRoot, bc.Metrics, bc.KnownGapTableIncrement, bc.CheckDb)
log.WithFields(log.Fields{"head": head.Slot}).Debug("We finished calling processHeadSlot.")
workCh <- workParams{db: bc.Db, serverEndpoint: bc.ServerEndpoint, slot: slot, blockRoot: head.Block, stateRoot: head.State, previousSlot: bc.PreviousSlot, previousBlockRoot: bc.PreviousBlockRoot, metrics: bc.Metrics, knownGapsTableIncrement: bc.KnownGapTableIncrement, checkDb: bc.CheckDb}
log.WithFields(log.Fields{"head": head.Slot}).Debug("We finished sending this slot to the workCh")
// Update the previous block
bc.PreviousSlot = slot
bc.PreviousBlockRoot = head.Block
}
}
}
// A worker that will process head slots.
func (bc *BeaconClient) headBlockProcessor(ctx context.Context, workCh <-chan workParams) {
for {
select {
case <-ctx.Done():
return
case wp := <-workCh:
processHeadSlot(ctx, wp.db, wp.serverEndpoint, wp.slot, wp.blockRoot, wp.stateRoot, wp.previousSlot, wp.previousBlockRoot, wp.metrics, wp.knownGapsTableIncrement, wp.checkDb)
}
}
}
// A struct used to pass parameters to the worker.
type workParams struct {
db sql.Database
serverEndpoint string
slot int
blockRoot string
stateRoot string
previousSlot int
previousBlockRoot string
metrics *BeaconClientMetrics
knownGapsTableIncrement int
checkDb bool
}
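The headBlockProcessor/workParams pair above is a standard bounded worker pool: a buffered channel of work items fanned out to a fixed number of context-aware workers. A self-contained sketch of the same idea with placeholder types:
package main

import (
	"context"
	"fmt"
	"time"
)

type job struct{ slot int }

func worker(ctx context.Context, id int, jobs <-chan job) {
	for {
		select {
		case <-ctx.Done():
			return
		case j := <-jobs:
			fmt.Printf("worker %d processing slot %d\n", id, j.slot)
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	jobs := make(chan job, 5) // buffered, like workCh above
	for i := 0; i < 3; i++ {  // spawn exactly 3 workers
		go worker(ctx, i, jobs)
	}
	for s := 0; s < 10; s++ {
		jobs <- job{slot: s}
	}
	time.Sleep(100 * time.Millisecond) // let workers drain (illustrative only)
	cancel()
}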

View File

@ -132,6 +132,7 @@ func getBatchProcessRow(ctx context.Context, db sql.Database, getStartEndSlotStm
for len(errCount) < 5 {
select {
case <-ctx.Done():
close(slotCh)
return errCount
default:
if len(errCount) != prevErrCount {
@ -228,7 +229,7 @@ func getBatchProcessRow(ctx context.Context, db sql.Database, getStartEndSlotStm
// After a row has been processed, it should be removed from its appropriate table.
func removeRowPostProcess(ctx context.Context, db sql.Database, processCh <-chan slotsToProcess, checkProcessedStmt, removeStmt string) error {
errCh := make(chan error)
errCh := make(chan error, 1)
for {
select {
case <-ctx.Done():
@ -241,6 +242,10 @@ func removeRowPostProcess(ctx context.Context, db sql.Database, processCh <-chan
"endSlot": slots.endSlot,
}).Debug("Starting to check to see if the following slots have been processed")
for {
select {
case <-ctx.Done():
return
default:
isStartProcess, err := isSlotProcessed(db, checkProcessedStmt, strconv.Itoa(slots.startSlot))
if err != nil {
errCh <- err
@ -250,16 +255,15 @@ func removeRowPostProcess(ctx context.Context, db sql.Database, processCh <-chan
errCh <- err
}
if isStartProcess && isEndProcess {
break
}
time.Sleep(3 * time.Second)
}
_, err := db.Exec(context.Background(), removeStmt, strconv.Itoa(slots.startSlot), strconv.Itoa(slots.endSlot))
if err != nil {
errCh <- err
}
return
}
time.Sleep(3 * time.Second)
}
}
}()
if len(errCh) != 0 {
return <-errCh
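The switch to make(chan error, 1) above follows the usual pattern for letting a goroutine report a single error without blocking forever when the receiver has already moved on. A tiny self-contained illustration:
package main

import (
	"errors"
	"fmt"
)

func main() {
	errCh := make(chan error, 1) // capacity 1: the single send below can never block
	go func() {
		errCh <- errors.New("worker failed")
	}()
	select {
	case err := <-errCh:
		fmt.Println("got:", err)
	default:
		fmt.Println("no error yet") // with an unbuffered channel the sender could block here forever
	}
}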

View File

@ -20,6 +20,7 @@ package beaconclient
import (
"context"
"fmt"
"strconv"
log "github.com/sirupsen/logrus"
@ -67,14 +68,18 @@ func (bc *BeaconClient) ProcessKnownGaps(ctx context.Context, maxWorkers int) []
}
// This function will perform all the necessary clean up tasks for stopping known gaps processing.
func (bc *BeaconClient) StopKnownGapsProcessing(cancel context.CancelFunc) error {
func (bc *BeaconClient) StopKnownGapsProcessing(ctx context.Context) error {
select {
case <-ctx.Done():
log.Info("We are stopping the known gaps processing service.")
cancel()
err := bc.KnownGapsProcess.releaseDbLocks()
if err != nil {
loghelper.LogError(err).WithField("uniqueIdentifier", bc.UniqueNodeIdentifier).Error("We were unable to remove the locks from the eth_beacon.known_gaps table. Manual Intervention is needed!")
}
return nil
default:
return fmt.Errorf("Tried to stop knownGaps Processing without closing the context..")
}
}
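A minimal caller-side sketch of the contract this enforces (cancel the context first, then call stop), mirroring how the tests above drive it:
ctx, cancel := context.WithCancel(context.Background())
go bc.ProcessKnownGaps(ctx, maxWorkers)
// ...
cancel() // must happen before Stop, otherwise an error is returned
if err := bc.StopKnownGapsProcessing(ctx); err != nil {
	log.Error(err)
}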
// Get a single row of historical slots from the table.

View File

@ -156,10 +156,6 @@ func processFullSlot(ctx context.Context, db sql.Database, serverAddress string,
if err := g.Wait(); err != nil {
// Make sure channel is empty.
select {
case <-vUnmarshalerCh:
default:
}
return err, "processSlot"
}
@ -239,12 +235,12 @@ func processFullSlot(ctx context.Context, db sql.Database, serverAddress string,
}
// Handle a slot that is at head. A wrapper function for calling `processFullSlot`.
func processHeadSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, metrics *BeaconClientMetrics, knownGapsTableIncrement int, checkDb bool) {
func processHeadSlot(ctx context.Context, db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, metrics *BeaconClientMetrics, knownGapsTableIncrement int, checkDb bool) {
// Get the knownGaps at startUp.
if previousSlot == 0 && previousBlockRoot == "" {
writeStartUpGaps(db, knownGapsTableIncrement, slot, metrics)
}
err, errReason := processFullSlot(context.Background(), db, serverAddress, slot, blockRoot, stateRoot, previousSlot, previousBlockRoot, "head", metrics, knownGapsTableIncrement, checkDb)
err, errReason := processFullSlot(ctx, db, serverAddress, slot, blockRoot, stateRoot, previousSlot, previousBlockRoot, "head", metrics, knownGapsTableIncrement, checkDb)
if err != nil {
writeKnownGaps(db, knownGapsTableIncrement, slot, slot, err, errReason, metrics)
}
@ -296,14 +292,20 @@ func (ps *ProcessSlot) getSignedBeaconBlock(serverAddress string, vmCh <-chan *d
// Update the SszBeaconState and FullBeaconState object with their respective values.
func (ps *ProcessSlot) getBeaconState(serverEndpoint string, vmCh chan<- *dt.VersionedUnmarshaler) error {
var stateIdentifier string // Used to query the state
var (
stateIdentifier string // Used to query the state
err error
)
if ps.StateRoot != "" {
stateIdentifier = ps.StateRoot
} else {
stateIdentifier = strconv.Itoa(ps.Slot)
}
stateEndpoint := serverEndpoint + BcStateQueryEndpoint + stateIdentifier
ps.SszBeaconState, _, _ = querySsz(stateEndpoint, strconv.Itoa(ps.Slot))
ps.SszBeaconState, _, err = querySsz(stateEndpoint, strconv.Itoa(ps.Slot))
if err != nil {
return fmt.Errorf("Unable to querrySSZ")
}
versionedUnmarshaler, err := dt.FromState(*ps.SszBeaconState)
if err != nil {

View File

@ -43,12 +43,20 @@ func querySsz(endpoint string, slot string) (*[]byte, int, error) {
return nil, 0, fmt.Errorf("Unable to query Beacon Node: %s", err.Error())
}
defer response.Body.Close()
// Close idle connections; needed for testing, but potentially useful outside of tests as well.
defer client.CloseIdleConnections()
rc := response.StatusCode
//var body []byte
//io.Copy(body, response.Body)
//bytes.buffer...
//_, err = response.Body.Read(body)
body, err := ioutil.ReadAll(response.Body)
if err != nil {
loghelper.LogSlotError(slot, err).Error("Unable to turn response into a []bytes array!")
return nil, rc, fmt.Errorf("Unable to turn response into a []bytes array!: %s", err.Error())
}
//log.WithField("body", unsafe.Sizeof(body)).Debug("Size of the raw SSZ object")
return &body, rc, nil
}
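With the error now propagated, a hypothetical call site can distinguish transport failures from non-200 responses; the status-code check below is illustrative and not necessarily how existing callers behave.
body, rc, err := querySsz(stateEndpoint, strconv.Itoa(slot))
if err != nil {
	return fmt.Errorf("Unable to query the state SSZ: %s", err.Error())
}
if rc != 200 {
	return fmt.Errorf("Unexpected status code from the Beacon Node: %d", rc)
}
_ = body // use the raw SSZ bytes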

View File

@ -1,11 +1,13 @@
package beaconclient_test
import (
"context"
"os"
"strconv"
"time"
. "github.com/onsi/ginkgo/v2"
//. "github.com/onsi/gomega"
"github.com/vulcanize/ipld-eth-beacon-indexer/pkg/beaconclient"
)
@ -28,7 +30,7 @@ var (
)
var _ = Describe("Systemvalidation", Label("system"), func() {
Describe("Run the application against a running lighthouse node", func() {
Context("When we receive head messages", func() {
Context("When we receive head messages", Label("system-head"), func() {
It("We should process the messages successfully", func() {
bc := setUpTest(prodConfig, "10000000000")
processProdHeadBlocks(bc, 3, 0, 0, 0)
@ -63,7 +65,23 @@ func getEnvInt(envVar string) int {
// Start head tracking and wait for the expected results.
func processProdHeadBlocks(bc *beaconclient.BeaconClient, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError uint64) {
go bc.CaptureHead()
time.Sleep(1 * time.Second)
//startGoRoutines := runtime.NumGoroutine()
//pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
ctx, cancel := context.WithCancel(context.Background())
go bc.CaptureHead(ctx, 2, false)
time.Sleep(5 * time.Second)
validateMetrics(bc, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError)
cancel()
time.Sleep(4 * time.Second)
testStopSystemHeadTracking(ctx, bc)
}
// Custom stop for system testing
func testStopSystemHeadTracking(ctx context.Context, bc *beaconclient.BeaconClient) {
bc.StopHeadTracking(ctx, false)
time.Sleep(3 * time.Second)
//pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
//Expect(endNum <= startGoRoutines).To(BeTrue())
}

View File

@ -45,7 +45,11 @@ func Shutdown(ctx context.Context, notifierCh chan os.Signal, timeout time.Durat
// add any other syscalls that you want to be notified with
signal.Notify(notifierCh, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP)
<-notifierCh
// Wait for either a shutdown signal or context cancellation.
select {
case <-notifierCh:
case <-ctx.Done():
}
log.Info("Shutting Down your application")