diff --git a/.github/workflows/.env b/.github/workflows/.env deleted file mode 100644 index e69de29..0000000 diff --git a/.github/workflows/on-pr.yml b/.github/workflows/on-pr.yml index a033463..4e1f8ad 100644 --- a/.github/workflows/on-pr.yml +++ b/.github/workflows/on-pr.yml @@ -14,10 +14,11 @@ on: pull_request: paths: - "!**.md" - - ".gitignore" + - "!.gitignore" - "!LICENSE" - "!.github/workflows/**" - ".github/workflows/on-pr.yml" + - "**" jobs: build: @@ -33,8 +34,8 @@ jobs: runs-on: ubuntu-latest ## IF you want to update the default branch for `pull_request runs, do it after the ||` env: - foundry-test-ref: ${{ github.event.inputs.foundry-test-ref || 'feature/build-stack'}} - ipld-eth-db-ref: ${{ github.event.inputs.ipld-eth-db-ref || 'main' }} + foundry-test-ref: ${{ github.event.inputs.foundry-test-ref || 'c17752de64f208f286f02379b80d2a935237c860'}} + ipld-eth-db-ref: ${{ github.event.inputs.ipld-eth-db-ref || '05600e51d2163e1c5e2a872cb54606bc0a380d12' }} GOPATH: /tmp/go steps: - name: Create GOPATH diff --git a/.gitignore b/.gitignore index a0fdb30..d18d04f 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ ipld-ethcl-indexer ipld-ethcl-indexer.log +report.json +cover.profile diff --git a/Makefile b/Makefile index 7b97424..bf2e654 100644 --- a/Makefile +++ b/Makefile @@ -30,6 +30,26 @@ integration-test-ci: --cover --coverprofile=cover.profile \ --race --trace --json-report=report.json +.PHONY: integration-test-local +integration-test-local: + go vet ./... + go fmt ./... + $(GINKGO) -r --label-filter integration \ + --procs=4 --compilers=4 \ + --randomize-all --randomize-suites \ + --fail-on-pending --keep-going \ + --race --trace + +.PHONY: unit-test-local +unit-test-local: + go vet ./... + go fmt ./... + $(GINKGO) -r --label-filter unit \ + --procs=4 --compilers=4 \ + --randomize-all --randomize-suites \ + --fail-on-pending --keep-going \ + --race --trace + .PHONY: unit-test-ci unit-test-ci: go vet ./... diff --git a/application_component.md b/application_component.md index 0dbc67d..cfd9e4b 100644 --- a/application_component.md +++ b/application_component.md @@ -10,6 +10,18 @@ This document will go through various application components # Components -## Boot +## `internal/boot` The boot package in `internal` is utilized to start the application. Everything in the boot process must complete successfully for the application to start. If it does not, the application will not start. + +## `pkg/database` + +The `database` package allows us to interact with a postgres DB. We utilize the interface to ensure we can interact with any `sql` database as well. I copied most of the code here from `vulcanize/go-ethereum`. Down the road, internal teams should be able to reference the package instead of copy pasting it and re-implementing it. + +## `pkg/beaconclient` + +This package will contain code to interact with the beacon client. + +## `pkg/version` + +A generic package which can be utilized to easily version our applications. diff --git a/cmd/capture.go b/cmd/capture.go index 54d3fb9..32113a8 100644 --- a/cmd/capture.go +++ b/cmd/capture.go @@ -6,20 +6,23 @@ package cmd import ( "os" + "time" "github.com/spf13/cobra" "github.com/spf13/viper" ) var ( - dbUsername string - dbPassword string - dbName string - dbAddress string - dbDriver string - dbPort int - bcAddress string - bcPort int + dbUsername string + dbPassword string + dbName string + dbAddress string + dbDriver string + dbPort int + bcAddress string + bcPort int + bcConnectionProtocol string + maxWaitSecondsShutdown time.Duration = time.Duration(5) * time.Second ) // captureCmd represents the capture command @@ -60,6 +63,7 @@ func init() { //// Beacon Client Specific captureCmd.PersistentFlags().StringVarP(&bcAddress, "bc.address", "l", "", "Address to connect to beacon node (required if username is set)") captureCmd.PersistentFlags().IntVarP(&bcPort, "bc.port", "r", 0, "Port to connect to beacon node (required if username is set)") + captureCmd.PersistentFlags().StringVarP(&bcConnectionProtocol, "bc.connectionProtocol", "", "http", "protocol for connecting to the beacon node.") err = captureCmd.MarkPersistentFlagRequired("bc.address") exitErr(err) err = captureCmd.MarkPersistentFlagRequired("bc.port") @@ -85,6 +89,8 @@ func init() { exitErr(err) err = viper.BindPFlag("bc.port", captureCmd.PersistentFlags().Lookup("bc.port")) exitErr(err) + err = viper.BindPFlag("bc.connectionProtocol", captureCmd.PersistentFlags().Lookup("bc.connectionProtocol")) + exitErr(err) // Here you will define your flags and configuration settings. } diff --git a/cmd/head.go b/cmd/head.go index 12b432a..ca7e7e3 100644 --- a/cmd/head.go +++ b/cmd/head.go @@ -5,8 +5,13 @@ Copyright © 2022 NAME HERE package cmd import ( + "context" + "time" + + log "github.com/sirupsen/logrus" "github.com/spf13/cobra" "github.com/vulcanize/ipld-ethcl-indexer/internal/boot" + "github.com/vulcanize/ipld-ethcl-indexer/internal/shutdown" "github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper" ) @@ -22,10 +27,20 @@ var headCmd = &cobra.Command{ // Start the application to track at head. func startHeadTracking() { - _, err := boot.BootApplicationWithRetry(dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort) + // Boot the application + log.Info("Starting the application in head tracking mode.") + ctx := context.Background() + + BC, DB, err := boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol) if err != nil { loghelper.LogError(err).Error("Unable to Start application") } + + // Capture head blocks + go BC.CaptureHead() + + // Shutdown when the time is right. + shutdown.ShutdownServices(ctx, time.Duration(maxWaitSecondsShutdown), DB, BC) } func init() { diff --git a/go.mod b/go.mod index de583a9..37e51b7 100644 --- a/go.mod +++ b/go.mod @@ -19,13 +19,16 @@ require ( github.com/jackc/puddle v1.2.1 // indirect github.com/kr/text v0.2.0 // indirect github.com/lib/pq v1.10.4 // indirect + github.com/minio/sha256-simd v0.1.1 // indirect github.com/pkg/errors v0.9.1 // indirect golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4 // indirect golang.org/x/net v0.0.0-20220412020605-290c469a71a5 // indirect + gopkg.in/cenkalti/backoff.v1 v1.1.0 // indirect gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect ) require ( + github.com/ferranbt/fastssz v0.0.0-20220303160658-88bb965b6747 github.com/fsnotify/fsnotify v1.5.1 // indirect github.com/georgysavva/scany v0.3.0 github.com/hashicorp/hcl v1.0.0 // indirect @@ -35,6 +38,7 @@ require ( github.com/mitchellh/mapstructure v1.4.3 // indirect github.com/pelletier/go-toml v1.9.4 // indirect github.com/pelletier/go-toml/v2 v2.0.0-beta.8 // indirect + github.com/r3labs/sse v0.0.0-20210224172625-26fe804710bc github.com/spf13/afero v1.8.2 // indirect github.com/spf13/cast v1.4.1 // indirect github.com/spf13/cobra v1.4.0 diff --git a/go.sum b/go.sum index fa80980..0ffd6c1 100644 --- a/go.sum +++ b/go.sum @@ -65,6 +65,8 @@ github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1m github.com/envoyproxy/go-control-plane v0.9.7/go.mod h1:cwu0lG7PUMfa9snN8LXBig5ynNVH9qI8YYLbd1fK2po= github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/ferranbt/fastssz v0.0.0-20220303160658-88bb965b6747 h1:K2Bt7NSX8x/5MD2RiO7cPLy21dBgnQ84r9uR0QYoHrE= +github.com/ferranbt/fastssz v0.0.0-20220303160658-88bb965b6747/go.mod h1:S8yiDeAXy8f88W4Ul+0dBMPx49S05byYbmZD6Uv94K4= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/fsnotify/fsnotify v1.5.1 h1:mZcQUHVQUQWoPXXtuf9yuEXKudkV2sx1E06UadKWpgI= @@ -112,6 +114,8 @@ github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA= +github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= @@ -251,6 +255,9 @@ github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hd github.com/mattn/go-isatty v0.0.9/go.mod h1:YNRxwqDuOph6SZLI9vUUz6OYw3QyUt7WiY2yME+cCiQ= github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= +github.com/minio/sha256-simd v0.1.1 h1:5QHSlgo3nt5yKOJrC7W8w7X+NFl8cMPZm96iu8kKUJU= +github.com/minio/sha256-simd v0.1.1/go.mod h1:B5e1o+1/KgNmWrSQK08Y6Z1Vb5pwIktudl0J58iy0KM= +github.com/mitchellh/mapstructure v1.3.2/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/mitchellh/mapstructure v1.4.3 h1:OVowDSCllw/YjdLkam3/sm7wEtOy59d8ndGgCcyj8cs= github.com/mitchellh/mapstructure v1.4.3/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= @@ -276,6 +283,8 @@ github.com/pkg/sftp v1.13.1/go.mod h1:3HaPG6Dq1ILlpPZRO0HVMrsydcdLt6HRDccSgb87qR github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/r3labs/sse v0.0.0-20210224172625-26fe804710bc h1:zAsgcP8MhzAbhMnB1QQ2O7ZhWYVGYSR2iVcjzQuPV+o= +github.com/r3labs/sse v0.0.0-20210224172625-26fe804710bc/go.mod h1:S8xSOnV3CgpNrWd0GQ/OoQfMtlg2uPRSuTzcSGrzwK8= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ= github.com/rs/zerolog v1.13.0/go.mod h1:YbFCdg8HfsridGWAh22vktObvhZbQsZXe4/zB0OKkWU= @@ -401,6 +410,7 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20190628185345-da137c7871d7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20191116160921-f9c825593386/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= @@ -660,6 +670,8 @@ google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlba google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.28.0 h1:w43yiav+6bVFTBQFZX0r7ipe9JQ1QsbMgHwbBziscLw= +gopkg.in/cenkalti/backoff.v1 v1.1.0 h1:Arh75ttbsvlpVA7WtVpH4u9h6Zl46xuptxqLxPiSo4Y= +gopkg.in/cenkalti/backoff.v1 v1.1.0/go.mod h1:J6Vskwqd+OMVJl8C33mmtxTBs2gyzfv7UDAkHu8BrjI= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= diff --git a/internal/boot/boot.go b/internal/boot/boot.go index 46e3b02..ce9d61a 100644 --- a/internal/boot/boot.go +++ b/internal/boot/boot.go @@ -1,48 +1,23 @@ package boot import ( - "fmt" - "net/http" - "strconv" + "context" "time" log "github.com/sirupsen/logrus" + "github.com/vulcanize/ipld-ethcl-indexer/pkg/beaconclient" "github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql" "github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql/postgres" "github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper" ) var ( - bcHealthEndpoint = "/eth/v1/node/health" - maxRetry = 5 // Max times to try to connect to the DB or BC at boot. - retryInterval = 30 // The time to wait between each try. - DB sql.Database = &postgres.DB{} + maxRetry = 5 // Max times to try to connect to the DB or BC at boot. + retryInterval = 30 // The time to wait between each try. + DB sql.Database = &postgres.DB{} + BC *beaconclient.BeaconClient = &beaconclient.BeaconClient{} ) -// This function will ensure that we can connect to the beacon client. -// Keep in mind, the beacon client will allow you to connect to it but it might -// Not allow you to make http requests. This is part of its built in logic, and you will have -// to follow their provided guidelines. https://lighthouse-book.sigmaprime.io/api-bn.html#security -func checkBeaconClient(bcAddress string, bcPort int) error { - log.Debug("Attempting to connect to the beacon client") - bcEndpoint := "http://" + bcAddress + ":" + strconv.Itoa(bcPort) + bcHealthEndpoint - resp, err := http.Get(bcEndpoint) - if err != nil { - loghelper.LogError(err).Error("Unable to get bc endpoint: ", bcEndpoint) - return err - } - - if resp.StatusCode < 200 || resp.StatusCode > 299 { - log.Error("We recieved a non 2xx status code when checking the health of the beacon node.") - log.Error("Health Endpoint Status Code: ", resp.StatusCode) - return fmt.Errorf("beacon Node Provided a non 2xx status code, code provided: %d", resp.StatusCode) - } - - log.Info("We can successfully reach the beacon client.") - return nil - -} - // A simple wrapper to create a DB object to use. func SetupPostgresDb(dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string) (sql.Database, error) { log.Debug("Resolving Driver Type") @@ -81,35 +56,38 @@ func SetupPostgresDb(dbHostname string, dbPort int, dbName string, dbUsername st // // 2. Connect to the database. // -func BootApplication(dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string, bcAddress string, bcPort int) (sql.Database, error) { +func BootApplication(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string, bcAddress string, bcPort int, bcConnectionProtocol string) (*beaconclient.BeaconClient, sql.Database, error) { log.Info("Booting the Application") - log.Debug("Checking beacon Client") - err := checkBeaconClient(bcAddress, bcPort) + log.Debug("Creating the Beacon Client") + BC = beaconclient.CreateBeaconClient(ctx, bcConnectionProtocol, bcAddress, bcPort) + + log.Debug("Checking Beacon Client") + err := BC.CheckBeaconClient() if err != nil { - return nil, err + return nil, nil, err } log.Debug("Setting up DB connection") DB, err := SetupPostgresDb(dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName) if err != nil { - return nil, err + return nil, nil, err } - return DB, nil + return BC, DB, nil } // Add retry logic to ensure that we are give the Beacon Client and the DB time to start. -func BootApplicationWithRetry(dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string, bcAddress string, bcPort int) (sql.Database, error) { +func BootApplicationWithRetry(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string, bcAddress string, bcPort int, bcConnectionProtocol string) (*beaconclient.BeaconClient, sql.Database, error) { var err error for i := 0; i < maxRetry; i++ { - DB, err = BootApplication(dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName, bcAddress, bcPort) + BC, DB, err = BootApplication(ctx, dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName, bcAddress, bcPort, bcConnectionProtocol) if err != nil { log.WithFields(log.Fields{ "retryNumber": i, }).Warn("Unable to boot application. Going to try again") time.Sleep(time.Duration(retryInterval) * time.Second) - continue } + break } - return DB, err + return BC, DB, err } diff --git a/internal/boot/boot_test.go b/internal/boot/boot_test.go index 729f30b..3cbf71c 100644 --- a/internal/boot/boot_test.go +++ b/internal/boot/boot_test.go @@ -1,6 +1,8 @@ package boot_test import ( + "context" + . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" "github.com/vulcanize/ipld-ethcl-indexer/internal/boot" @@ -8,38 +10,39 @@ import ( var _ = Describe("Boot", func() { var ( - dbAddress string = "localhost" - dbPort int = 8077 - dbName string = "vulcanize_testing" - dbUsername string = "vdbm" - dbPassword string = "password" - dbDriver string = "PGX" - bcAddress string = "localhost" - bcPort int = 5052 + dbAddress string = "localhost" + dbPort int = 8077 + dbName string = "vulcanize_testing" + dbUsername string = "vdbm" + dbPassword string = "password" + dbDriver string = "PGX" + bcAddress string = "localhost" + bcPort int = 5052 + bcConnectionProtocol string = "http" ) Describe("Booting the application", Label("integration"), func() { Context("When the DB and BC are both up and running", func() { It("Should connect successfully", func() { - db, err := boot.BootApplicationWithRetry(dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort) + _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol) defer db.Close() Expect(err).To(BeNil()) }) }) Context("When the DB is running but not the BC", func() { It("Should not connect successfully", func() { - _, err := boot.BootApplication(dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, "hi", 100) + _, _, err := boot.BootApplication(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol) Expect(err).ToNot(BeNil()) }) }) Context("When the BC is running but not the DB", func() { It("Should not connect successfully", func() { - _, err := boot.BootApplication("hi", 10, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort) + _, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol) Expect(err).ToNot(BeNil()) }) }) Context("When neither the BC or DB are running", func() { It("Should not connect successfully", func() { - _, err := boot.BootApplication("hi", 10, dbName, dbUsername, dbPassword, dbDriver, "hi", 100) + _, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol) Expect(err).ToNot(BeNil()) }) }) diff --git a/internal/shutdown/shutdown.go b/internal/shutdown/shutdown.go new file mode 100644 index 0000000..087adb4 --- /dev/null +++ b/internal/shutdown/shutdown.go @@ -0,0 +1,39 @@ +package shutdown + +import ( + "context" + "time" + + log "github.com/sirupsen/logrus" + "github.com/vulcanize/ipld-ethcl-indexer/pkg/beaconclient" + "github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql" + "github.com/vulcanize/ipld-ethcl-indexer/pkg/gracefulshutdown" + "github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper" +) + +// Shutdown all the internal services for the application. +func ShutdownServices(ctx context.Context, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) { + successCh, errCh := gracefulshutdown.Shutdown(ctx, waitTime, map[string]gracefulshutdown.Operation{ + "database": func(ctx context.Context) error { + err := DB.Close() + if err != nil { + loghelper.LogError(err).Error("Unable to close the DB") + } + return err + }, + "beaconClient": func(ctx context.Context) error { + err := BC.StopHeadTracking() + if err != nil { + loghelper.LogError(err).Error("Unable to trigger shutdown of head tracking") + } + return err + }, + }) + + select { + case _ = <-successCh: + log.Info("Gracefully Shutdown ipld-ethcl-indexer!") + case err := <-errCh: + loghelper.LogError(err).Error("Ungracefully Shutdown ipld-ethcl-indexer!") + } +} diff --git a/pkg/beaconclient/beaconclient.go b/pkg/beaconclient/beaconclient.go new file mode 100644 index 0000000..44fce6c --- /dev/null +++ b/pkg/beaconclient/beaconclient.go @@ -0,0 +1,73 @@ +package beaconclient + +import ( + "context" + "fmt" + + "github.com/r3labs/sse" + log "github.com/sirupsen/logrus" +) + +var ( + bcHealthEndpoint = "/eth/v1/node/health" // Endpoint used for the healthcheck + bcHeadTopicEndpoint = "/eth/v1/events?topics=head" // Endpoint used to subscribe to the head of the chain + bcReorgTopicEndpoint = "/eth/v1/events?topics=chain_reorg" // Endpoint used to subscribe to the head of the chain + bcFinalizedTopicEndpoint = "/eth/v1/events?topics=finalized_checkpoint" // Endpoint used to subscribe to the head of the chain + bcBlockQueryEndpoint = "/eth/v2/beacon/blocks/" // Endpoint to query individual Blocks + bcStateQueryEndpoint = "/eth/v2/debug/beacon/states/" // Endpoint to query individual States +) + +// A struct that capture the Beacon Server that the Beacon Client will be interacting with and querying. +type BeaconClient struct { + Context context.Context // A context generic context with multiple uses. + ServerEndpoint string // What is the endpoint of the beacon server. + PerformHeadTracking bool // Should we track head? + PerformHistoricalProcessing bool // Should we perform historical processing? + HeadTracking *SseEvents[Head] // Track the head block + ReOrgTracking *SseEvents[ChainReorg] // Track all Reorgs + FinalizationTracking *SseEvents[FinalizedCheckpoint] // Track all finalization checkpoints +} + +// A struct to keep track of relevant the head event topic. +type SseEvents[P ProcessedEvents] struct { + Endpoint string // The endpoint for the subscription. Primarily used for logging + MessagesCh chan *sse.Event // Contains all the messages from the SSE Channel + ErrorCh chan *SseError // Contains any errors while SSE streaming occurred + ProcessCh chan *P // Used to capture processed data in its proper struct. + SseClient *sse.Client // sse.Client object that is used to interact with the SSE stream +} + +// An object to capture any errors when turning an SSE message to JSON. +type SseError struct { + err error + msg []byte +} + +// A Function to create the BeaconClient. +func CreateBeaconClient(ctx context.Context, connectionProtocol string, bcAddress string, bcPort int) *BeaconClient { + endpoint := fmt.Sprintf("%s://%s:%d", connectionProtocol, bcAddress, bcPort) + log.Info("Creating the BeaconClient") + return &BeaconClient{ + Context: ctx, + ServerEndpoint: endpoint, + HeadTracking: createSseEvent[Head](endpoint, bcHeadTopicEndpoint), + ReOrgTracking: createSseEvent[ChainReorg](endpoint, bcReorgTopicEndpoint), + FinalizationTracking: createSseEvent[FinalizedCheckpoint](endpoint, bcFinalizedTopicEndpoint), + } +} + +// Create all the channels to handle a SSE events +func createSseEvent[P ProcessedEvents](baseEndpoint string, path string) *SseEvents[P] { + endpoint := baseEndpoint + path + sseEvents := &SseEvents[P]{ + Endpoint: endpoint, + MessagesCh: make(chan *sse.Event), + ErrorCh: make(chan *SseError), + ProcessCh: make(chan *P), + SseClient: func(endpoint string) *sse.Client { + log.WithFields(log.Fields{"endpoint": endpoint}).Info("Creating SSE client") + return sse.NewClient(endpoint) + }(endpoint), + } + return sseEvents +} diff --git a/pkg/beaconclient/beaconclient_suite_test.go b/pkg/beaconclient/beaconclient_suite_test.go new file mode 100644 index 0000000..67e98de --- /dev/null +++ b/pkg/beaconclient/beaconclient_suite_test.go @@ -0,0 +1,13 @@ +package beaconclient_test + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestBeaconClient(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "BeaconClient Suite", Label("beacon-client")) +} diff --git a/pkg/beaconclient/capturehead.go b/pkg/beaconclient/capturehead.go new file mode 100644 index 0000000..a300175 --- /dev/null +++ b/pkg/beaconclient/capturehead.go @@ -0,0 +1,79 @@ +// This file will call all the functions to start and stop capturing the head of the beacon chain. + +package beaconclient + +import ( + "time" + + "github.com/ferranbt/fastssz/spectests" + log "github.com/sirupsen/logrus" + "github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper" +) + +// This function will perform all the heavy lifting for tracking the head of the chain. +func (bc *BeaconClient) CaptureHead() { + log.Info("We are tracking the head of the chain.") + bc.tempHelper() + // go bc.handleHead() + // go bc.handleFinalizedCheckpoint() + // go bc.handleReorgs() + // bc.captureEventTopic() +} + +// A temporary helper function to see the output of beacon block and states. +func (bc *BeaconClient) tempHelper() { + slot := "3200" + blockEndpoint := bc.ServerEndpoint + bcBlockQueryEndpoint + slot + stateEndpoint := bc.ServerEndpoint + bcStateQueryEndpoint + slot + // Query + log.Info("Get") + blockSsz, _ := querySsz(blockEndpoint, slot) + stateSsz, _ := querySsz(stateEndpoint, slot) + // Transform + log.Info("Tranform") + stateObj := new(spectests.BeaconState) + err := stateObj.UnmarshalSSZ(stateSsz) + if err != nil { + loghelper.LogSlotError(slot, err).Error("Unable to unmarshal the SSZ response from the Beacon Node Successfully!") + } + + blockObj := new(spectests.SignedBeaconBlock) + err = blockObj.UnmarshalSSZ(blockSsz) + if err != nil { + loghelper.LogSlotError(slot, err).Error("Unable to unmarshal the SSZ response from the Beacon Node Successfully!") + } + + // Check + log.Info("Check") + log.Info("State Slot: ", stateObj.Slot) + log.Info("Block Slot: ", blockObj.Block.Slot) +} + +// Stop the head tracking service. +func (bc *BeaconClient) StopHeadTracking() error { + log.Info("We are going to stop tracking the head of chain because of the shutdown signal.") + chHead := make(chan bool) + chReorg := make(chan bool) + chFinal := make(chan bool) + + go bc.HeadTracking.finishProcessingChannel(chHead) + go bc.ReOrgTracking.finishProcessingChannel(chReorg) + go bc.FinalizationTracking.finishProcessingChannel(chFinal) + + <-chHead + <-chFinal + <-chReorg + log.Info("Successfully stopped the head tracking service.") + return nil +} + +// This function closes the SSE subscription, but waits until the MessagesCh is empty +func (se *SseEvents[ProcessedEvents]) finishProcessingChannel(finish chan<- bool) { + loghelper.LogEndpoint(se.Endpoint).Info("Received a close event.") + se.SseClient.Unsubscribe(se.MessagesCh) + for len(se.MessagesCh) != 0 && len(se.ProcessCh) != 0 { + time.Sleep(time.Duration(shutdownWaitInterval) * time.Millisecond) + } + loghelper.LogEndpoint(se.Endpoint).Info("Done processing all messages, ready for shutdown") + finish <- true +} diff --git a/pkg/beaconclient/healthcheck.go b/pkg/beaconclient/healthcheck.go new file mode 100644 index 0000000..c7f3fcd --- /dev/null +++ b/pkg/beaconclient/healthcheck.go @@ -0,0 +1,33 @@ +package beaconclient + +import ( + "fmt" + "net/http" + + log "github.com/sirupsen/logrus" + "github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper" +) + +// This function will ensure that we can connect to the beacon client. +// Keep in mind, the beacon client will allow you to connect to it but it might +// Not allow you to make http requests. This is part of its built in logic, and you will have +// to follow their provided guidelines. https://lighthouse-book.sigmaprime.io/api-bn.html#security +func (bc BeaconClient) CheckBeaconClient() error { + log.Debug("Attempting to connect to the beacon client") + bcEndpoint := bc.ServerEndpoint + bcHealthEndpoint + resp, err := http.Get(bcEndpoint) + if err != nil { + loghelper.LogError(err).Error("Unable to get bc endpoint: ", bcEndpoint) + return err + } + + if resp.StatusCode < 200 || resp.StatusCode > 299 { + log.Error("We recieved a non 2xx status code when checking the health of the beacon node.") + log.Error("Health Endpoint Status Code: ", resp.StatusCode) + return fmt.Errorf("beacon Node Provided a non 2xx status code, code provided: %d", resp.StatusCode) + } + + log.Info("We can successfully reach the beacon client.") + return nil + +} diff --git a/pkg/beaconclient/healthcheck_test.go b/pkg/beaconclient/healthcheck_test.go new file mode 100644 index 0000000..88aab7e --- /dev/null +++ b/pkg/beaconclient/healthcheck_test.go @@ -0,0 +1,30 @@ +package beaconclient_test + +import ( + "context" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + beaconclient "github.com/vulcanize/ipld-ethcl-indexer/pkg/beaconclient" +) + +var _ = Describe("Healthcheck", func() { + var ( + BC = beaconclient.CreateBeaconClient(context.Background(), "http", "localhost", 5052) + errBc = beaconclient.CreateBeaconClient(context.Background(), "http", "blah-blah", 1010) + ) + Describe("Connecting to the lighthouse client", Label("integration"), func() { + Context("When the client is running", func() { + It("We should connect successfully", func() { + err := BC.CheckBeaconClient() + Expect(err).To(BeNil()) + }) + }) + Context("When the client is not running", func() { + It("We not should connect successfully", func() { + err := errBc.CheckBeaconClient() + Expect(err).ToNot(BeNil()) + }) + }) + }) +}) diff --git a/pkg/beaconclient/incomingsse.go b/pkg/beaconclient/incomingsse.go new file mode 100644 index 0000000..73f3382 --- /dev/null +++ b/pkg/beaconclient/incomingsse.go @@ -0,0 +1,66 @@ +// This package will handle all event subscriptions that utilize SSE. + +package beaconclient + +import ( + "encoding/json" + "time" + + log "github.com/sirupsen/logrus" + "github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper" +) + +var ( + shutdownWaitInterval = time.Duration(5) * time.Second +) + +// This function will capture all the SSE events for a given SseEvents object. +// When new messages come in, it will ensure that they are decoded into JSON. +// If any errors occur, it log the error information. +func handleIncomingSseEvent[P ProcessedEvents](eventHandler *SseEvents[P]) { + loghelper.LogEndpoint(eventHandler.Endpoint).Info("Subscribing to Messages") + go eventHandler.SseClient.SubscribeChanRaw(eventHandler.MessagesCh) + for { + select { + case message := <-eventHandler.MessagesCh: + // Message can be nil if its a keep-alive message + if len(message.Data) != 0 { + go processMsg(message.Data, eventHandler.ProcessCh, eventHandler.ErrorCh) + } + + case headErr := <-eventHandler.ErrorCh: + log.WithFields(log.Fields{ + "endpoint": eventHandler.Endpoint, + "err": headErr.err, + "msg": headErr.msg, + }, + ).Error("Unable to handle event.") + + case process := <-eventHandler.ProcessCh: + log.WithFields(log.Fields{"processed": process}).Debug("Processesing a Message") + } + } +} + +// Turn the data object into a Struct. +func processMsg[P ProcessedEvents](msg []byte, processCh chan<- *P, errorCh chan<- *SseError) { + var msgMarshaled P + err := json.Unmarshal(msg, &msgMarshaled) + if err != nil { + loghelper.LogError(err).Error("Unable to parse message") + errorCh <- &SseError{ + err: err, + msg: msg, + } + return + } + processCh <- &msgMarshaled +} + +// Capture all of the event topics. +func (bc *BeaconClient) captureEventTopic() { + log.Info("We are capturing all SSE events") + go handleIncomingSseEvent(bc.HeadTracking) + go handleIncomingSseEvent(bc.ReOrgTracking) + go handleIncomingSseEvent(bc.FinalizationTracking) +} diff --git a/pkg/beaconclient/models.go b/pkg/beaconclient/models.go new file mode 100644 index 0000000..e047845 --- /dev/null +++ b/pkg/beaconclient/models.go @@ -0,0 +1,37 @@ +package beaconclient + +// This interface captured what the events can be for processed event streams. +type ProcessedEvents interface { + Head | FinalizedCheckpoint | ChainReorg +} + +// This struct captures the JSON representation of the head topic +type Head struct { + Slot string `json:"slot"` + Block string `json:"block"` + State string `json:"state"` + CurrentDutyDependentRoot string `json:"current_duty_dependent_root"` + PreviousDutyDependentRoot string `json:"previous_duty_dependent_root"` + EpochTransition bool `json:"epoch_transition"` + ExecutionOptimistic bool `json:"execution_optimistic"` +} + +// This struct captures the JSON representation of the finalized_checkpoint topic. +type FinalizedCheckpoint struct { + Block string `json:"block"` + State string `json:"state"` + Epoch string `json:"epoch"` + ExecutionOptimistic bool `json:"execution_optimistic"` +} + +// This struct captures the JSON representation of the chain_reorg topic. +type ChainReorg struct { + Slot string `json:"slot"` + Depth string `json:"depth"` + OldHeadBlock string `json:"old_head_block"` + NewHeadBlock string `json:"new_head_block"` + OldHeadState string `json:"old_head_state"` + NewHeadState string `json:"new_head_state"` + Epoch string `json:"epoch"` + ExecutionOptimistic bool `json:"execution_optimistic"` +} diff --git a/pkg/beaconclient/processevents.go b/pkg/beaconclient/processevents.go new file mode 100644 index 0000000..0b5b821 --- /dev/null +++ b/pkg/beaconclient/processevents.go @@ -0,0 +1,38 @@ +// This file contains all the functions to handle SSE events after they have been turned +// to the structs. + +package beaconclient + +import log "github.com/sirupsen/logrus" + +// This function will perform the necessary steps to handle a reorg. +func (bc *BeaconClient) handleReorgs() { + log.Info("Starting to process reorgs.") + for { + // We will add real functionality later + reorg := <-bc.ReOrgTracking.ProcessCh + log.WithFields(log.Fields{"reorg": reorg}).Debug("Received a new reorg message.") + } +} + +// This function will perform the necessary steps to handle a reorg. +func (bc *BeaconClient) handleFinalizedCheckpoint() { + log.Info("Starting to process finalized checkpoints.") + for { + // We will add real functionality later + finalized := <-bc.ReOrgTracking.ProcessCh + log.WithFields(log.Fields{"finalized": finalized}).Debug("Received a new finalized checkpoint.") + } + +} + +// This function will handle the latest head event. +func (bc *BeaconClient) handleHead() { + log.Info("Starting to process head.") + for { + // We will add real functionality later + head := <-bc.ReOrgTracking.ProcessCh + log.WithFields(log.Fields{"head": head}).Debug("Received a new head event.") + } + +} diff --git a/pkg/beaconclient/queryserver.go b/pkg/beaconclient/queryserver.go new file mode 100644 index 0000000..14dd0a1 --- /dev/null +++ b/pkg/beaconclient/queryserver.go @@ -0,0 +1,101 @@ +// This file will contain functions to query the Beacon Chain Server. + +package beaconclient + +import ( + "fmt" + "io/ioutil" + "net/http" + + log "github.com/sirupsen/logrus" + "github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper" +) + +// Attempt to use generics.. +// // These are types that append slot at the end of the URL to handle a request. +// type SlotBasedRequests interface { +// *specs.BeaconState | *specs.SignedBeaconBlock +// UnmarshalSSZ([]byte) error +// } +// +// func queryState[R SlotBasedRequests](endpoint string, slot string) (R, error) { +// obj := new(R) +// rawState, err := querySlot(endpoint, slot) +// if err != nil { +// return *obj, err +// } +// +// err = &obj.UnmarshalSSZ(rawState) +// err = (*obj).UnmarshalSSZ(rawState) +// if err != nil { +// loghelper.LogSlotError(slot, err).Error("Unable to unmarshal the SSZ response from the Beacon Node Successfully!") +// return *obj, fmt.Errorf("Unable to unmarshal the SSZ response from the Beacon Node Successfully!: %s", err.Error()) +// } +// return *obj, nil +// } + +// This function will query a state object based on the slot provided. +// The object is SSZ encoded. + +//type BeaconBlockResponse struct { +// version string `json: ` +//} + +// func queryState(endpoint string, slot string) (spectests.BeaconState, error) { +// obj := new(spectests.BeaconState) +// fullEndpoint := endpoint + slot +// rawState, err := querySsz(fullEndpoint, slot) +// if err != nil { +// return *obj, err +// } +// +// err = obj.UnmarshalSSZ(rawState) +// if err != nil { +// loghelper.LogSlotError(slot, err).Error("Unable to unmarshal the SSZ response from the Beacon Node") +// return *obj, fmt.Errorf("Unable to unmarshal the SSZ response from the Beacon Node: %s", err.Error()) +// } +// return *obj, nil +// } +// +// // This function will query a state object based on the slot provided. +// // The object is SSZ encoded. +// func queryBlock(endpoint string, slot string) (spectests.SignedBeaconBlock, error) { +// obj := new(spectests.SignedBeaconBlock) +// fullEndpoint := endpoint + slot +// rawBlock, err := querySsz(fullEndpoint, slot) +// if err != nil { +// return *obj, err +// } +// +// err = obj.UnmarshalSSZ(rawBlock) +// if err != nil { +// loghelper.LogSlotError(slot, err).Error("Unable to unmarshal the SSZ response from the Beacon Node Successfully!") +// return *obj, fmt.Errorf("Unable to unmarshal the SSZ response from the Beacon Node Successfully!: %s", err.Error()) +// } +// return *obj, nil +// } + +// A helper function to query endpoints that utilize slots. +func querySsz(endpoint string, slot string) ([]byte, error) { + log.WithFields(log.Fields{"endpoint": endpoint}).Info("Querying endpoint") + client := &http.Client{} + req, err := http.NewRequest("GET", endpoint, nil) + if err != nil { + loghelper.LogSlotError(slot, err).Error("Unable to create a request!") + return nil, fmt.Errorf("Unable to create a request!: %s", err.Error()) + } + // Not set correctly + req.Header.Set("Accept", "application/octet-stream") + response, err := client.Do(req) + if err != nil { + loghelper.LogSlotError(slot, err).Error("Unable to query Beacon Node!") + return nil, fmt.Errorf("Unable to query Beacon Node: %s", err.Error()) + } + defer response.Body.Close() + body, err := ioutil.ReadAll(response.Body) + if err != nil { + loghelper.LogSlotError(slot, err).Error("Unable to turn response into a []bytes array!") + return nil, fmt.Errorf("Unable to turn response into a []bytes array!: %s", err.Error()) + } + return body, nil +} diff --git a/pkg/gracefulshutdown/gracefulshutdown.go b/pkg/gracefulshutdown/gracefulshutdown.go new file mode 100644 index 0000000..9f01d85 --- /dev/null +++ b/pkg/gracefulshutdown/gracefulshutdown.go @@ -0,0 +1,67 @@ +package gracefulshutdown + +import ( + "context" + "fmt" + "os" + "os/signal" + "sync" + "syscall" + "time" + + log "github.com/sirupsen/logrus" + "github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper" +) + +// operation is a clean up function on shutting down +type Operation func(ctx context.Context) error + +// gracefulShutdown waits for termination syscalls and doing clean up operations after received it +func Shutdown(ctx context.Context, timeout time.Duration, ops map[string]Operation) (<-chan struct{}, <-chan error) { + waitCh := make(chan struct{}) + errCh := make(chan error) + go func() { + s := make(chan os.Signal, 1) + + // add any other syscalls that you want to be notified with + signal.Notify(s, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP) + <-s + + log.Info("Shutting Down your application") + + // set timeout for the ops to be done to prevent system hang + timeoutFunc := time.AfterFunc(timeout, func() { + log.Warnf("timeout %d ms has been elapsed, force exit", timeout.Milliseconds()) + errCh <- fmt.Errorf("Application shutdown took too long.") + return + }) + + defer timeoutFunc.Stop() + + var wg sync.WaitGroup + + // Do the operations asynchronously to save time + for key, op := range ops { + wg.Add(1) + innerOp := op + innerKey := key + go func() { + defer wg.Done() + + log.Infof("cleaning up: %s", innerKey) + if err := innerOp(ctx); err != nil { + loghelper.LogError(err).Errorf("%s: clean up failed: %s", innerKey, err.Error()) + return + } + + log.Infof("%s was shutdown gracefully", innerKey) + }() + } + + wg.Wait() + + close(waitCh) + }() + + return waitCh, errCh +} diff --git a/pkg/loghelper/log_endpoint.go b/pkg/loghelper/log_endpoint.go new file mode 100644 index 0000000..809bf62 --- /dev/null +++ b/pkg/loghelper/log_endpoint.go @@ -0,0 +1,12 @@ +package loghelper + +import ( + log "github.com/sirupsen/logrus" +) + +// A simple helper function that will help wrap the error message. +func LogEndpoint(endpoint string) *log.Entry { + return log.WithFields(log.Fields{ + "endpoint": endpoint, + }) +} diff --git a/pkg/loghelper/log_error.go b/pkg/loghelper/log_error.go index 41d0149..66305dd 100644 --- a/pkg/loghelper/log_error.go +++ b/pkg/loghelper/log_error.go @@ -11,3 +11,10 @@ func LogError(err error) *log.Entry { "err": err, }) } + +func LogSlotError(slot string, err error) *log.Entry { + return log.WithFields(log.Fields{ + "err": err, + "slot": slot, + }) +}