Add functions to interact with the beacon client #18

Merged
abdulrabbani00 merged 7 commits from feature/17-interact-with-the-beacon-client into develop 2022-04-27 18:07:46 +00:00
27 changed files with 714 additions and 66 deletions

View File

View File

@ -14,10 +14,11 @@ on:
pull_request:
paths:
- "!**.md"
- ".gitignore"
- "!.gitignore"
- "!LICENSE"
- "!.github/workflows/**"
- ".github/workflows/on-pr.yml"
- "**"
jobs:
build:
@ -33,8 +34,8 @@ jobs:
runs-on: ubuntu-latest
## IF you want to update the default branch for `pull_request runs, do it after the ||`
env:
foundry-test-ref: ${{ github.event.inputs.foundry-test-ref || 'feature/build-stack'}}
ipld-eth-db-ref: ${{ github.event.inputs.ipld-eth-db-ref || 'main' }}
foundry-test-ref: ${{ github.event.inputs.foundry-test-ref || 'c17752de64f208f286f02379b80d2a935237c860'}}
ipld-eth-db-ref: ${{ github.event.inputs.ipld-eth-db-ref || '05600e51d2163e1c5e2a872cb54606bc0a380d12' }}
GOPATH: /tmp/go
steps:
- name: Create GOPATH

2
.gitignore vendored
View File

@ -1,3 +1,5 @@
ipld-ethcl-indexer
ipld-ethcl-indexer.log
report.json
cover.profile

View File

@ -30,6 +30,26 @@ integration-test-ci:
--cover --coverprofile=cover.profile \
--race --trace --json-report=report.json
.PHONY: integration-test-local
integration-test-local:
go vet ./...
go fmt ./...
$(GINKGO) -r --label-filter integration \
--procs=4 --compilers=4 \
--randomize-all --randomize-suites \
--fail-on-pending --keep-going \
--race --trace
.PHONY: unit-test-local
unit-test-local:
go vet ./...
go fmt ./...
$(GINKGO) -r --label-filter unit \
--procs=4 --compilers=4 \
--randomize-all --randomize-suites \
--fail-on-pending --keep-going \
--race --trace
.PHONY: unit-test-ci
unit-test-ci:
go vet ./...

View File

@ -10,6 +10,18 @@ This document will go through various application components
# Components
## Boot
## `internal/boot`
The boot package in `internal` is utilized to start the application. Everything in the boot process must complete successfully for the application to start. If it does not, the application will not start.
## `pkg/database`
The `database` package allows us to interact with a postgres DB. We utilize the interface to ensure we can interact with any `sql` database as well. I copied most of the code here from `vulcanize/go-ethereum`. Down the road, internal teams should be able to reference the package instead of copy pasting it and re-implementing it.
## `pkg/beaconclient`
This package will contain code to interact with the beacon client.
## `pkg/version`
A generic package which can be utilized to easily version our applications.

View File

@ -6,6 +6,7 @@ package cmd
import (
"os"
"time"
"github.com/spf13/cobra"
"github.com/spf13/viper"
@ -20,6 +21,8 @@ var (
dbPort int
bcAddress string
bcPort int
bcConnectionProtocol string
maxWaitSecondsShutdown time.Duration = time.Duration(5) * time.Second
)
// captureCmd represents the capture command
@ -60,6 +63,7 @@ func init() {
//// Beacon Client Specific
captureCmd.PersistentFlags().StringVarP(&bcAddress, "bc.address", "l", "", "Address to connect to beacon node (required if username is set)")
captureCmd.PersistentFlags().IntVarP(&bcPort, "bc.port", "r", 0, "Port to connect to beacon node (required if username is set)")
captureCmd.PersistentFlags().StringVarP(&bcConnectionProtocol, "bc.connectionProtocol", "", "http", "protocol for connecting to the beacon node.")
err = captureCmd.MarkPersistentFlagRequired("bc.address")
exitErr(err)
err = captureCmd.MarkPersistentFlagRequired("bc.port")
@ -85,6 +89,8 @@ func init() {
exitErr(err)
err = viper.BindPFlag("bc.port", captureCmd.PersistentFlags().Lookup("bc.port"))
exitErr(err)
err = viper.BindPFlag("bc.connectionProtocol", captureCmd.PersistentFlags().Lookup("bc.connectionProtocol"))
exitErr(err)
// Here you will define your flags and configuration settings.
}

View File

@ -5,8 +5,13 @@ Copyright © 2022 NAME HERE <EMAIL ADDRESS>
package cmd
import (
"context"
"time"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/vulcanize/ipld-ethcl-indexer/internal/boot"
"github.com/vulcanize/ipld-ethcl-indexer/internal/shutdown"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
)
@ -22,10 +27,20 @@ var headCmd = &cobra.Command{
// Start the application to track at head.
func startHeadTracking() {
_, err := boot.BootApplicationWithRetry(dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort)
// Boot the application
log.Info("Starting the application in head tracking mode.")
ctx := context.Background()
BC, DB, err := boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol)
if err != nil {
loghelper.LogError(err).Error("Unable to Start application")
}
// Capture head blocks
go BC.CaptureHead()
// Shutdown when the time is right.
shutdown.ShutdownServices(ctx, time.Duration(maxWaitSecondsShutdown), DB, BC)
}
func init() {

4
go.mod
View File

@ -19,13 +19,16 @@ require (
github.com/jackc/puddle v1.2.1 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/lib/pq v1.10.4 // indirect
github.com/minio/sha256-simd v0.1.1 // indirect
github.com/pkg/errors v0.9.1 // indirect
golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4 // indirect
golang.org/x/net v0.0.0-20220412020605-290c469a71a5 // indirect
gopkg.in/cenkalti/backoff.v1 v1.1.0 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
)
require (
github.com/ferranbt/fastssz v0.0.0-20220303160658-88bb965b6747
github.com/fsnotify/fsnotify v1.5.1 // indirect
github.com/georgysavva/scany v0.3.0
github.com/hashicorp/hcl v1.0.0 // indirect
@ -35,6 +38,7 @@ require (
github.com/mitchellh/mapstructure v1.4.3 // indirect
github.com/pelletier/go-toml v1.9.4 // indirect
github.com/pelletier/go-toml/v2 v2.0.0-beta.8 // indirect
github.com/r3labs/sse v0.0.0-20210224172625-26fe804710bc
github.com/spf13/afero v1.8.2 // indirect
github.com/spf13/cast v1.4.1 // indirect
github.com/spf13/cobra v1.4.0

12
go.sum
View File

@ -65,6 +65,8 @@ github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1m
github.com/envoyproxy/go-control-plane v0.9.7/go.mod h1:cwu0lG7PUMfa9snN8LXBig5ynNVH9qI8YYLbd1fK2po=
github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/ferranbt/fastssz v0.0.0-20220303160658-88bb965b6747 h1:K2Bt7NSX8x/5MD2RiO7cPLy21dBgnQ84r9uR0QYoHrE=
github.com/ferranbt/fastssz v0.0.0-20220303160658-88bb965b6747/go.mod h1:S8yiDeAXy8f88W4Ul+0dBMPx49S05byYbmZD6Uv94K4=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/fsnotify/fsnotify v1.5.1 h1:mZcQUHVQUQWoPXXtuf9yuEXKudkV2sx1E06UadKWpgI=
@ -112,6 +114,8 @@ github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA=
github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
@ -251,6 +255,9 @@ github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hd
github.com/mattn/go-isatty v0.0.9/go.mod h1:YNRxwqDuOph6SZLI9vUUz6OYw3QyUt7WiY2yME+cCiQ=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
github.com/minio/sha256-simd v0.1.1 h1:5QHSlgo3nt5yKOJrC7W8w7X+NFl8cMPZm96iu8kKUJU=
github.com/minio/sha256-simd v0.1.1/go.mod h1:B5e1o+1/KgNmWrSQK08Y6Z1Vb5pwIktudl0J58iy0KM=
github.com/mitchellh/mapstructure v1.3.2/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/mitchellh/mapstructure v1.4.3 h1:OVowDSCllw/YjdLkam3/sm7wEtOy59d8ndGgCcyj8cs=
github.com/mitchellh/mapstructure v1.4.3/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
@ -276,6 +283,8 @@ github.com/pkg/sftp v1.13.1/go.mod h1:3HaPG6Dq1ILlpPZRO0HVMrsydcdLt6HRDccSgb87qR
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/r3labs/sse v0.0.0-20210224172625-26fe804710bc h1:zAsgcP8MhzAbhMnB1QQ2O7ZhWYVGYSR2iVcjzQuPV+o=
github.com/r3labs/sse v0.0.0-20210224172625-26fe804710bc/go.mod h1:S8xSOnV3CgpNrWd0GQ/OoQfMtlg2uPRSuTzcSGrzwK8=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ=
github.com/rs/zerolog v1.13.0/go.mod h1:YbFCdg8HfsridGWAh22vktObvhZbQsZXe4/zB0OKkWU=
@ -401,6 +410,7 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20190628185345-da137c7871d7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20191116160921-f9c825593386/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
@ -660,6 +670,8 @@ google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlba
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.28.0 h1:w43yiav+6bVFTBQFZX0r7ipe9JQ1QsbMgHwbBziscLw=
gopkg.in/cenkalti/backoff.v1 v1.1.0 h1:Arh75ttbsvlpVA7WtVpH4u9h6Zl46xuptxqLxPiSo4Y=
gopkg.in/cenkalti/backoff.v1 v1.1.0/go.mod h1:J6Vskwqd+OMVJl8C33mmtxTBs2gyzfv7UDAkHu8BrjI=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=

View File

@ -1,48 +1,23 @@
package boot
import (
"fmt"
"net/http"
"strconv"
"context"
"time"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/beaconclient"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql/postgres"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
)
var (
bcHealthEndpoint = "/eth/v1/node/health"
maxRetry = 5 // Max times to try to connect to the DB or BC at boot.
retryInterval = 30 // The time to wait between each try.
DB sql.Database = &postgres.DB{}
BC *beaconclient.BeaconClient = &beaconclient.BeaconClient{}
)
// This function will ensure that we can connect to the beacon client.
// Keep in mind, the beacon client will allow you to connect to it but it might
// Not allow you to make http requests. This is part of its built in logic, and you will have
// to follow their provided guidelines. https://lighthouse-book.sigmaprime.io/api-bn.html#security
func checkBeaconClient(bcAddress string, bcPort int) error {
log.Debug("Attempting to connect to the beacon client")
bcEndpoint := "http://" + bcAddress + ":" + strconv.Itoa(bcPort) + bcHealthEndpoint
resp, err := http.Get(bcEndpoint)
if err != nil {
loghelper.LogError(err).Error("Unable to get bc endpoint: ", bcEndpoint)
return err
}
if resp.StatusCode < 200 || resp.StatusCode > 299 {
log.Error("We recieved a non 2xx status code when checking the health of the beacon node.")
log.Error("Health Endpoint Status Code: ", resp.StatusCode)
return fmt.Errorf("beacon Node Provided a non 2xx status code, code provided: %d", resp.StatusCode)
}
log.Info("We can successfully reach the beacon client.")
return nil
}
// A simple wrapper to create a DB object to use.
func SetupPostgresDb(dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string) (sql.Database, error) {
log.Debug("Resolving Driver Type")
@ -81,35 +56,38 @@ func SetupPostgresDb(dbHostname string, dbPort int, dbName string, dbUsername st
//
// 2. Connect to the database.
//
func BootApplication(dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string, bcAddress string, bcPort int) (sql.Database, error) {
func BootApplication(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string, bcAddress string, bcPort int, bcConnectionProtocol string) (*beaconclient.BeaconClient, sql.Database, error) {
log.Info("Booting the Application")
log.Debug("Checking beacon Client")
err := checkBeaconClient(bcAddress, bcPort)
log.Debug("Creating the Beacon Client")
BC = beaconclient.CreateBeaconClient(ctx, bcConnectionProtocol, bcAddress, bcPort)
log.Debug("Checking Beacon Client")
err := BC.CheckBeaconClient()
if err != nil {
return nil, err
return nil, nil, err
}
log.Debug("Setting up DB connection")
DB, err := SetupPostgresDb(dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName)
if err != nil {
return nil, err
return nil, nil, err
}
return DB, nil
return BC, DB, nil
}
// Add retry logic to ensure that we are give the Beacon Client and the DB time to start.
func BootApplicationWithRetry(dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string, bcAddress string, bcPort int) (sql.Database, error) {
func BootApplicationWithRetry(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string, bcAddress string, bcPort int, bcConnectionProtocol string) (*beaconclient.BeaconClient, sql.Database, error) {
var err error
for i := 0; i < maxRetry; i++ {
DB, err = BootApplication(dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName, bcAddress, bcPort)
BC, DB, err = BootApplication(ctx, dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName, bcAddress, bcPort, bcConnectionProtocol)
if err != nil {
log.WithFields(log.Fields{
"retryNumber": i,
}).Warn("Unable to boot application. Going to try again")
time.Sleep(time.Duration(retryInterval) * time.Second)
continue
}
break
}
return DB, err
return BC, DB, err
}

View File

@ -1,6 +1,8 @@
package boot_test
import (
"context"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/vulcanize/ipld-ethcl-indexer/internal/boot"
@ -16,30 +18,31 @@ var _ = Describe("Boot", func() {
dbDriver string = "PGX"
bcAddress string = "localhost"
bcPort int = 5052
bcConnectionProtocol string = "http"
)
Describe("Booting the application", Label("integration"), func() {
Context("When the DB and BC are both up and running", func() {
It("Should connect successfully", func() {
db, err := boot.BootApplicationWithRetry(dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort)
_, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol)
defer db.Close()
Expect(err).To(BeNil())
})
})
Context("When the DB is running but not the BC", func() {
It("Should not connect successfully", func() {
_, err := boot.BootApplication(dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, "hi", 100)
_, _, err := boot.BootApplication(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol)
Expect(err).ToNot(BeNil())
})
})
Context("When the BC is running but not the DB", func() {
It("Should not connect successfully", func() {
_, err := boot.BootApplication("hi", 10, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort)
_, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol)
Expect(err).ToNot(BeNil())
})
})
Context("When neither the BC or DB are running", func() {
It("Should not connect successfully", func() {
_, err := boot.BootApplication("hi", 10, dbName, dbUsername, dbPassword, dbDriver, "hi", 100)
_, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol)
Expect(err).ToNot(BeNil())
})
})

View File

@ -0,0 +1,39 @@
package shutdown
import (
"context"
"time"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/beaconclient"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/gracefulshutdown"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
)
// Shutdown all the internal services for the application.
func ShutdownServices(ctx context.Context, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) {
successCh, errCh := gracefulshutdown.Shutdown(ctx, waitTime, map[string]gracefulshutdown.Operation{
"database": func(ctx context.Context) error {
err := DB.Close()
if err != nil {
loghelper.LogError(err).Error("Unable to close the DB")
}
return err
},
"beaconClient": func(ctx context.Context) error {
err := BC.StopHeadTracking()
if err != nil {
loghelper.LogError(err).Error("Unable to trigger shutdown of head tracking")
}
return err
},
})
select {
case _ = <-successCh:
log.Info("Gracefully Shutdown ipld-ethcl-indexer!")
case err := <-errCh:
loghelper.LogError(err).Error("Ungracefully Shutdown ipld-ethcl-indexer!")
}
}

View File

@ -0,0 +1,73 @@
package beaconclient
import (
"context"
"fmt"
"github.com/r3labs/sse"
log "github.com/sirupsen/logrus"
)
var (
bcHealthEndpoint = "/eth/v1/node/health" // Endpoint used for the healthcheck
bcHeadTopicEndpoint = "/eth/v1/events?topics=head" // Endpoint used to subscribe to the head of the chain
bcReorgTopicEndpoint = "/eth/v1/events?topics=chain_reorg" // Endpoint used to subscribe to the head of the chain
bcFinalizedTopicEndpoint = "/eth/v1/events?topics=finalized_checkpoint" // Endpoint used to subscribe to the head of the chain
bcBlockQueryEndpoint = "/eth/v2/beacon/blocks/" // Endpoint to query individual Blocks
bcStateQueryEndpoint = "/eth/v2/debug/beacon/states/" // Endpoint to query individual States
)
// A struct that capture the Beacon Server that the Beacon Client will be interacting with and querying.
type BeaconClient struct {
Context context.Context // A context generic context with multiple uses.
ServerEndpoint string // What is the endpoint of the beacon server.
PerformHeadTracking bool // Should we track head?
PerformHistoricalProcessing bool // Should we perform historical processing?
HeadTracking *SseEvents[Head] // Track the head block
ReOrgTracking *SseEvents[ChainReorg] // Track all Reorgs
FinalizationTracking *SseEvents[FinalizedCheckpoint] // Track all finalization checkpoints
}
// A struct to keep track of relevant the head event topic.
type SseEvents[P ProcessedEvents] struct {
Endpoint string // The endpoint for the subscription. Primarily used for logging
MessagesCh chan *sse.Event // Contains all the messages from the SSE Channel
ErrorCh chan *SseError // Contains any errors while SSE streaming occurred
ProcessCh chan *P // Used to capture processed data in its proper struct.
SseClient *sse.Client // sse.Client object that is used to interact with the SSE stream
}
// An object to capture any errors when turning an SSE message to JSON.
type SseError struct {
err error
msg []byte
}
// A Function to create the BeaconClient.
func CreateBeaconClient(ctx context.Context, connectionProtocol string, bcAddress string, bcPort int) *BeaconClient {
endpoint := fmt.Sprintf("%s://%s:%d", connectionProtocol, bcAddress, bcPort)
log.Info("Creating the BeaconClient")
return &BeaconClient{
Context: ctx,
ServerEndpoint: endpoint,
HeadTracking: createSseEvent[Head](endpoint, bcHeadTopicEndpoint),
ReOrgTracking: createSseEvent[ChainReorg](endpoint, bcReorgTopicEndpoint),
FinalizationTracking: createSseEvent[FinalizedCheckpoint](endpoint, bcFinalizedTopicEndpoint),
}
}
// Create all the channels to handle a SSE events
func createSseEvent[P ProcessedEvents](baseEndpoint string, path string) *SseEvents[P] {
endpoint := baseEndpoint + path
sseEvents := &SseEvents[P]{
Endpoint: endpoint,
MessagesCh: make(chan *sse.Event),
ErrorCh: make(chan *SseError),
ProcessCh: make(chan *P),
SseClient: func(endpoint string) *sse.Client {
log.WithFields(log.Fields{"endpoint": endpoint}).Info("Creating SSE client")
return sse.NewClient(endpoint)
}(endpoint),
}
return sseEvents
}

View File

@ -0,0 +1,13 @@
package beaconclient_test
import (
"testing"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
func TestBeaconClient(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "BeaconClient Suite", Label("beacon-client"))
}

View File

@ -0,0 +1,79 @@
// This file will call all the functions to start and stop capturing the head of the beacon chain.
package beaconclient
import (
"time"
"github.com/ferranbt/fastssz/spectests"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
)
// This function will perform all the heavy lifting for tracking the head of the chain.
func (bc *BeaconClient) CaptureHead() {
log.Info("We are tracking the head of the chain.")
bc.tempHelper()
// go bc.handleHead()
// go bc.handleFinalizedCheckpoint()
// go bc.handleReorgs()
// bc.captureEventTopic()
}
// A temporary helper function to see the output of beacon block and states.
func (bc *BeaconClient) tempHelper() {
slot := "3200"
blockEndpoint := bc.ServerEndpoint + bcBlockQueryEndpoint + slot
stateEndpoint := bc.ServerEndpoint + bcStateQueryEndpoint + slot
// Query
log.Info("Get")
blockSsz, _ := querySsz(blockEndpoint, slot)
stateSsz, _ := querySsz(stateEndpoint, slot)
// Transform
log.Info("Tranform")
stateObj := new(spectests.BeaconState)
err := stateObj.UnmarshalSSZ(stateSsz)
if err != nil {
loghelper.LogSlotError(slot, err).Error("Unable to unmarshal the SSZ response from the Beacon Node Successfully!")
}
blockObj := new(spectests.SignedBeaconBlock)
err = blockObj.UnmarshalSSZ(blockSsz)
if err != nil {
loghelper.LogSlotError(slot, err).Error("Unable to unmarshal the SSZ response from the Beacon Node Successfully!")
}
// Check
log.Info("Check")
log.Info("State Slot: ", stateObj.Slot)
log.Info("Block Slot: ", blockObj.Block.Slot)
}
// Stop the head tracking service.
func (bc *BeaconClient) StopHeadTracking() error {
log.Info("We are going to stop tracking the head of chain because of the shutdown signal.")
chHead := make(chan bool)
chReorg := make(chan bool)
chFinal := make(chan bool)
go bc.HeadTracking.finishProcessingChannel(chHead)
go bc.ReOrgTracking.finishProcessingChannel(chReorg)
go bc.FinalizationTracking.finishProcessingChannel(chFinal)
<-chHead
<-chFinal
<-chReorg
log.Info("Successfully stopped the head tracking service.")
return nil
}
// This function closes the SSE subscription, but waits until the MessagesCh is empty
func (se *SseEvents[ProcessedEvents]) finishProcessingChannel(finish chan<- bool) {
loghelper.LogEndpoint(se.Endpoint).Info("Received a close event.")
se.SseClient.Unsubscribe(se.MessagesCh)
for len(se.MessagesCh) != 0 && len(se.ProcessCh) != 0 {
time.Sleep(time.Duration(shutdownWaitInterval) * time.Millisecond)
}
loghelper.LogEndpoint(se.Endpoint).Info("Done processing all messages, ready for shutdown")
finish <- true
}

View File

@ -0,0 +1,33 @@
package beaconclient
import (
"fmt"
"net/http"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
)
// This function will ensure that we can connect to the beacon client.
// Keep in mind, the beacon client will allow you to connect to it but it might
// Not allow you to make http requests. This is part of its built in logic, and you will have
// to follow their provided guidelines. https://lighthouse-book.sigmaprime.io/api-bn.html#security
func (bc BeaconClient) CheckBeaconClient() error {
log.Debug("Attempting to connect to the beacon client")
bcEndpoint := bc.ServerEndpoint + bcHealthEndpoint
resp, err := http.Get(bcEndpoint)
if err != nil {
loghelper.LogError(err).Error("Unable to get bc endpoint: ", bcEndpoint)
return err
}
if resp.StatusCode < 200 || resp.StatusCode > 299 {
log.Error("We recieved a non 2xx status code when checking the health of the beacon node.")
log.Error("Health Endpoint Status Code: ", resp.StatusCode)
return fmt.Errorf("beacon Node Provided a non 2xx status code, code provided: %d", resp.StatusCode)
}
log.Info("We can successfully reach the beacon client.")
return nil
}

View File

@ -0,0 +1,30 @@
package beaconclient_test
import (
"context"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
beaconclient "github.com/vulcanize/ipld-ethcl-indexer/pkg/beaconclient"
)
var _ = Describe("Healthcheck", func() {
var (
BC = beaconclient.CreateBeaconClient(context.Background(), "http", "localhost", 5052)
errBc = beaconclient.CreateBeaconClient(context.Background(), "http", "blah-blah", 1010)
)
Describe("Connecting to the lighthouse client", Label("integration"), func() {
Context("When the client is running", func() {
It("We should connect successfully", func() {
err := BC.CheckBeaconClient()
Expect(err).To(BeNil())
})
})
Context("When the client is not running", func() {
It("We not should connect successfully", func() {
err := errBc.CheckBeaconClient()
Expect(err).ToNot(BeNil())
})
})
})
})

View File

@ -0,0 +1,66 @@
// This package will handle all event subscriptions that utilize SSE.
package beaconclient
import (
"encoding/json"
"time"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
)
var (
shutdownWaitInterval = time.Duration(5) * time.Second
)
// This function will capture all the SSE events for a given SseEvents object.
// When new messages come in, it will ensure that they are decoded into JSON.
// If any errors occur, it log the error information.
func handleIncomingSseEvent[P ProcessedEvents](eventHandler *SseEvents[P]) {
loghelper.LogEndpoint(eventHandler.Endpoint).Info("Subscribing to Messages")
go eventHandler.SseClient.SubscribeChanRaw(eventHandler.MessagesCh)
for {
select {
case message := <-eventHandler.MessagesCh:
// Message can be nil if its a keep-alive message
if len(message.Data) != 0 {
go processMsg(message.Data, eventHandler.ProcessCh, eventHandler.ErrorCh)
}
case headErr := <-eventHandler.ErrorCh:
log.WithFields(log.Fields{
"endpoint": eventHandler.Endpoint,
"err": headErr.err,
"msg": headErr.msg,
},
).Error("Unable to handle event.")
case process := <-eventHandler.ProcessCh:
log.WithFields(log.Fields{"processed": process}).Debug("Processesing a Message")
}
}
}
// Turn the data object into a Struct.
func processMsg[P ProcessedEvents](msg []byte, processCh chan<- *P, errorCh chan<- *SseError) {
var msgMarshaled P
err := json.Unmarshal(msg, &msgMarshaled)
if err != nil {
loghelper.LogError(err).Error("Unable to parse message")
errorCh <- &SseError{
err: err,
msg: msg,
}
return
}
processCh <- &msgMarshaled
}
// Capture all of the event topics.
func (bc *BeaconClient) captureEventTopic() {
log.Info("We are capturing all SSE events")
go handleIncomingSseEvent(bc.HeadTracking)
go handleIncomingSseEvent(bc.ReOrgTracking)
go handleIncomingSseEvent(bc.FinalizationTracking)
}

View File

@ -0,0 +1,37 @@
package beaconclient
// This interface captured what the events can be for processed event streams.
type ProcessedEvents interface {
Head | FinalizedCheckpoint | ChainReorg
}
// This struct captures the JSON representation of the head topic
type Head struct {
Slot string `json:"slot"`
Block string `json:"block"`
State string `json:"state"`
CurrentDutyDependentRoot string `json:"current_duty_dependent_root"`
PreviousDutyDependentRoot string `json:"previous_duty_dependent_root"`
EpochTransition bool `json:"epoch_transition"`
ExecutionOptimistic bool `json:"execution_optimistic"`
}
// This struct captures the JSON representation of the finalized_checkpoint topic.
type FinalizedCheckpoint struct {
Block string `json:"block"`
State string `json:"state"`
Epoch string `json:"epoch"`
ExecutionOptimistic bool `json:"execution_optimistic"`
}
// This struct captures the JSON representation of the chain_reorg topic.
type ChainReorg struct {
Slot string `json:"slot"`
Depth string `json:"depth"`
OldHeadBlock string `json:"old_head_block"`
NewHeadBlock string `json:"new_head_block"`
OldHeadState string `json:"old_head_state"`
NewHeadState string `json:"new_head_state"`
Epoch string `json:"epoch"`
ExecutionOptimistic bool `json:"execution_optimistic"`
}

View File

@ -0,0 +1,38 @@
// This file contains all the functions to handle SSE events after they have been turned
// to the structs.
package beaconclient
import log "github.com/sirupsen/logrus"
// This function will perform the necessary steps to handle a reorg.
func (bc *BeaconClient) handleReorgs() {
log.Info("Starting to process reorgs.")
for {
// We will add real functionality later
reorg := <-bc.ReOrgTracking.ProcessCh
log.WithFields(log.Fields{"reorg": reorg}).Debug("Received a new reorg message.")
}
}
// This function will perform the necessary steps to handle a reorg.
func (bc *BeaconClient) handleFinalizedCheckpoint() {
log.Info("Starting to process finalized checkpoints.")
for {
// We will add real functionality later
finalized := <-bc.ReOrgTracking.ProcessCh
log.WithFields(log.Fields{"finalized": finalized}).Debug("Received a new finalized checkpoint.")
}
}
// This function will handle the latest head event.
func (bc *BeaconClient) handleHead() {
log.Info("Starting to process head.")
for {
// We will add real functionality later
head := <-bc.ReOrgTracking.ProcessCh
log.WithFields(log.Fields{"head": head}).Debug("Received a new head event.")
}
}

View File

@ -0,0 +1,101 @@
// This file will contain functions to query the Beacon Chain Server.
package beaconclient
import (
"fmt"
"io/ioutil"
"net/http"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
)
// Attempt to use generics..
// // These are types that append slot at the end of the URL to handle a request.
// type SlotBasedRequests interface {
// *specs.BeaconState | *specs.SignedBeaconBlock
// UnmarshalSSZ([]byte) error
// }
//
// func queryState[R SlotBasedRequests](endpoint string, slot string) (R, error) {
// obj := new(R)
// rawState, err := querySlot(endpoint, slot)
// if err != nil {
// return *obj, err
// }
//
// err = &obj.UnmarshalSSZ(rawState)
// err = (*obj).UnmarshalSSZ(rawState)
// if err != nil {
// loghelper.LogSlotError(slot, err).Error("Unable to unmarshal the SSZ response from the Beacon Node Successfully!")
// return *obj, fmt.Errorf("Unable to unmarshal the SSZ response from the Beacon Node Successfully!: %s", err.Error())
// }
// return *obj, nil
// }
// This function will query a state object based on the slot provided.
// The object is SSZ encoded.
//type BeaconBlockResponse struct {
// version string `json: `
//}
// func queryState(endpoint string, slot string) (spectests.BeaconState, error) {
// obj := new(spectests.BeaconState)
// fullEndpoint := endpoint + slot
// rawState, err := querySsz(fullEndpoint, slot)
// if err != nil {
// return *obj, err
// }
//
// err = obj.UnmarshalSSZ(rawState)
// if err != nil {
// loghelper.LogSlotError(slot, err).Error("Unable to unmarshal the SSZ response from the Beacon Node")
// return *obj, fmt.Errorf("Unable to unmarshal the SSZ response from the Beacon Node: %s", err.Error())
// }
// return *obj, nil
// }
//
// // This function will query a state object based on the slot provided.
// // The object is SSZ encoded.
// func queryBlock(endpoint string, slot string) (spectests.SignedBeaconBlock, error) {
// obj := new(spectests.SignedBeaconBlock)
// fullEndpoint := endpoint + slot
// rawBlock, err := querySsz(fullEndpoint, slot)
// if err != nil {
// return *obj, err
// }
//
// err = obj.UnmarshalSSZ(rawBlock)
// if err != nil {
// loghelper.LogSlotError(slot, err).Error("Unable to unmarshal the SSZ response from the Beacon Node Successfully!")
// return *obj, fmt.Errorf("Unable to unmarshal the SSZ response from the Beacon Node Successfully!: %s", err.Error())
// }
// return *obj, nil
// }
// A helper function to query endpoints that utilize slots.
func querySsz(endpoint string, slot string) ([]byte, error) {
log.WithFields(log.Fields{"endpoint": endpoint}).Info("Querying endpoint")
client := &http.Client{}
req, err := http.NewRequest("GET", endpoint, nil)
if err != nil {
loghelper.LogSlotError(slot, err).Error("Unable to create a request!")
return nil, fmt.Errorf("Unable to create a request!: %s", err.Error())
}
// Not set correctly
req.Header.Set("Accept", "application/octet-stream")
response, err := client.Do(req)
if err != nil {
loghelper.LogSlotError(slot, err).Error("Unable to query Beacon Node!")
return nil, fmt.Errorf("Unable to query Beacon Node: %s", err.Error())
}
defer response.Body.Close()
body, err := ioutil.ReadAll(response.Body)
if err != nil {
loghelper.LogSlotError(slot, err).Error("Unable to turn response into a []bytes array!")
return nil, fmt.Errorf("Unable to turn response into a []bytes array!: %s", err.Error())
}
return body, nil
}

View File

@ -0,0 +1,67 @@
package gracefulshutdown
import (
"context"
"fmt"
"os"
"os/signal"
"sync"
"syscall"
"time"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
)
// operation is a clean up function on shutting down
type Operation func(ctx context.Context) error
// gracefulShutdown waits for termination syscalls and doing clean up operations after received it
func Shutdown(ctx context.Context, timeout time.Duration, ops map[string]Operation) (<-chan struct{}, <-chan error) {
waitCh := make(chan struct{})
errCh := make(chan error)
go func() {
s := make(chan os.Signal, 1)
// add any other syscalls that you want to be notified with
signal.Notify(s, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP)
<-s
log.Info("Shutting Down your application")
// set timeout for the ops to be done to prevent system hang
timeoutFunc := time.AfterFunc(timeout, func() {
log.Warnf("timeout %d ms has been elapsed, force exit", timeout.Milliseconds())
errCh <- fmt.Errorf("Application shutdown took too long.")
return
})
defer timeoutFunc.Stop()
var wg sync.WaitGroup
// Do the operations asynchronously to save time
for key, op := range ops {
wg.Add(1)
innerOp := op
innerKey := key
go func() {
defer wg.Done()
log.Infof("cleaning up: %s", innerKey)
if err := innerOp(ctx); err != nil {
loghelper.LogError(err).Errorf("%s: clean up failed: %s", innerKey, err.Error())
return
}
log.Infof("%s was shutdown gracefully", innerKey)
}()
}
wg.Wait()
close(waitCh)
}()
return waitCh, errCh
}

View File

@ -0,0 +1,12 @@
package loghelper
import (
log "github.com/sirupsen/logrus"
)
// A simple helper function that will help wrap the error message.
func LogEndpoint(endpoint string) *log.Entry {
return log.WithFields(log.Fields{
"endpoint": endpoint,
})
}

View File

@ -11,3 +11,10 @@ func LogError(err error) *log.Entry {
"err": err,
})
}
func LogSlotError(slot string, err error) *log.Entry {
return log.WithFields(log.Fields{
"err": err,
"slot": slot,
})
}

BIN
tmp/ci/state Normal file

Binary file not shown.

BIN
tmp/code/block Normal file

Binary file not shown.

BIN
tmp/code/state Normal file

Binary file not shown.