Merge pull request #18 from vulcanize/feature/17-interact-with-the-beacon-client
Add functions to interact with the beacon client
This commit is contained in:
commit
0b712935a3
0
.github/workflows/.env
vendored
0
.github/workflows/.env
vendored
7
.github/workflows/on-pr.yml
vendored
7
.github/workflows/on-pr.yml
vendored
@ -14,10 +14,11 @@ on:
|
|||||||
pull_request:
|
pull_request:
|
||||||
paths:
|
paths:
|
||||||
- "!**.md"
|
- "!**.md"
|
||||||
- ".gitignore"
|
- "!.gitignore"
|
||||||
- "!LICENSE"
|
- "!LICENSE"
|
||||||
- "!.github/workflows/**"
|
- "!.github/workflows/**"
|
||||||
- ".github/workflows/on-pr.yml"
|
- ".github/workflows/on-pr.yml"
|
||||||
|
- "**"
|
||||||
|
|
||||||
jobs:
|
jobs:
|
||||||
build:
|
build:
|
||||||
@ -33,8 +34,8 @@ jobs:
|
|||||||
runs-on: ubuntu-latest
|
runs-on: ubuntu-latest
|
||||||
## IF you want to update the default branch for `pull_request runs, do it after the ||`
|
## IF you want to update the default branch for `pull_request runs, do it after the ||`
|
||||||
env:
|
env:
|
||||||
foundry-test-ref: ${{ github.event.inputs.foundry-test-ref || 'feature/build-stack'}}
|
foundry-test-ref: ${{ github.event.inputs.foundry-test-ref || 'c17752de64f208f286f02379b80d2a935237c860'}}
|
||||||
ipld-eth-db-ref: ${{ github.event.inputs.ipld-eth-db-ref || 'main' }}
|
ipld-eth-db-ref: ${{ github.event.inputs.ipld-eth-db-ref || '05600e51d2163e1c5e2a872cb54606bc0a380d12' }}
|
||||||
GOPATH: /tmp/go
|
GOPATH: /tmp/go
|
||||||
steps:
|
steps:
|
||||||
- name: Create GOPATH
|
- name: Create GOPATH
|
||||||
|
2
.gitignore
vendored
2
.gitignore
vendored
@ -1,3 +1,5 @@
|
|||||||
|
|
||||||
ipld-ethcl-indexer
|
ipld-ethcl-indexer
|
||||||
ipld-ethcl-indexer.log
|
ipld-ethcl-indexer.log
|
||||||
|
report.json
|
||||||
|
cover.profile
|
||||||
|
20
Makefile
20
Makefile
@ -30,6 +30,26 @@ integration-test-ci:
|
|||||||
--cover --coverprofile=cover.profile \
|
--cover --coverprofile=cover.profile \
|
||||||
--race --trace --json-report=report.json
|
--race --trace --json-report=report.json
|
||||||
|
|
||||||
|
.PHONY: integration-test-local
|
||||||
|
integration-test-local:
|
||||||
|
go vet ./...
|
||||||
|
go fmt ./...
|
||||||
|
$(GINKGO) -r --label-filter integration \
|
||||||
|
--procs=4 --compilers=4 \
|
||||||
|
--randomize-all --randomize-suites \
|
||||||
|
--fail-on-pending --keep-going \
|
||||||
|
--race --trace
|
||||||
|
|
||||||
|
.PHONY: unit-test-local
|
||||||
|
unit-test-local:
|
||||||
|
go vet ./...
|
||||||
|
go fmt ./...
|
||||||
|
$(GINKGO) -r --label-filter unit \
|
||||||
|
--procs=4 --compilers=4 \
|
||||||
|
--randomize-all --randomize-suites \
|
||||||
|
--fail-on-pending --keep-going \
|
||||||
|
--race --trace
|
||||||
|
|
||||||
.PHONY: unit-test-ci
|
.PHONY: unit-test-ci
|
||||||
unit-test-ci:
|
unit-test-ci:
|
||||||
go vet ./...
|
go vet ./...
|
||||||
|
@ -10,6 +10,18 @@ This document will go through various application components
|
|||||||
|
|
||||||
# Components
|
# Components
|
||||||
|
|
||||||
## Boot
|
## `internal/boot`
|
||||||
|
|
||||||
The boot package in `internal` is utilized to start the application. Everything in the boot process must complete successfully for the application to start. If it does not, the application will not start.
|
The boot package in `internal` is utilized to start the application. Everything in the boot process must complete successfully for the application to start. If it does not, the application will not start.
|
||||||
|
|
||||||
|
## `pkg/database`
|
||||||
|
|
||||||
|
The `database` package allows us to interact with a postgres DB. We utilize the interface to ensure we can interact with any `sql` database as well. I copied most of the code here from `vulcanize/go-ethereum`. Down the road, internal teams should be able to reference the package instead of copy pasting it and re-implementing it.
|
||||||
|
|
||||||
|
## `pkg/beaconclient`
|
||||||
|
|
||||||
|
This package will contain code to interact with the beacon client.
|
||||||
|
|
||||||
|
## `pkg/version`
|
||||||
|
|
||||||
|
A generic package which can be utilized to easily version our applications.
|
||||||
|
@ -6,20 +6,23 @@ package cmd
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"os"
|
"os"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
"github.com/spf13/viper"
|
"github.com/spf13/viper"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
dbUsername string
|
dbUsername string
|
||||||
dbPassword string
|
dbPassword string
|
||||||
dbName string
|
dbName string
|
||||||
dbAddress string
|
dbAddress string
|
||||||
dbDriver string
|
dbDriver string
|
||||||
dbPort int
|
dbPort int
|
||||||
bcAddress string
|
bcAddress string
|
||||||
bcPort int
|
bcPort int
|
||||||
|
bcConnectionProtocol string
|
||||||
|
maxWaitSecondsShutdown time.Duration = time.Duration(5) * time.Second
|
||||||
)
|
)
|
||||||
|
|
||||||
// captureCmd represents the capture command
|
// captureCmd represents the capture command
|
||||||
@ -60,6 +63,7 @@ func init() {
|
|||||||
//// Beacon Client Specific
|
//// Beacon Client Specific
|
||||||
captureCmd.PersistentFlags().StringVarP(&bcAddress, "bc.address", "l", "", "Address to connect to beacon node (required if username is set)")
|
captureCmd.PersistentFlags().StringVarP(&bcAddress, "bc.address", "l", "", "Address to connect to beacon node (required if username is set)")
|
||||||
captureCmd.PersistentFlags().IntVarP(&bcPort, "bc.port", "r", 0, "Port to connect to beacon node (required if username is set)")
|
captureCmd.PersistentFlags().IntVarP(&bcPort, "bc.port", "r", 0, "Port to connect to beacon node (required if username is set)")
|
||||||
|
captureCmd.PersistentFlags().StringVarP(&bcConnectionProtocol, "bc.connectionProtocol", "", "http", "protocol for connecting to the beacon node.")
|
||||||
err = captureCmd.MarkPersistentFlagRequired("bc.address")
|
err = captureCmd.MarkPersistentFlagRequired("bc.address")
|
||||||
exitErr(err)
|
exitErr(err)
|
||||||
err = captureCmd.MarkPersistentFlagRequired("bc.port")
|
err = captureCmd.MarkPersistentFlagRequired("bc.port")
|
||||||
@ -85,6 +89,8 @@ func init() {
|
|||||||
exitErr(err)
|
exitErr(err)
|
||||||
err = viper.BindPFlag("bc.port", captureCmd.PersistentFlags().Lookup("bc.port"))
|
err = viper.BindPFlag("bc.port", captureCmd.PersistentFlags().Lookup("bc.port"))
|
||||||
exitErr(err)
|
exitErr(err)
|
||||||
|
err = viper.BindPFlag("bc.connectionProtocol", captureCmd.PersistentFlags().Lookup("bc.connectionProtocol"))
|
||||||
|
exitErr(err)
|
||||||
// Here you will define your flags and configuration settings.
|
// Here you will define your flags and configuration settings.
|
||||||
|
|
||||||
}
|
}
|
||||||
|
17
cmd/head.go
17
cmd/head.go
@ -5,8 +5,13 @@ Copyright © 2022 NAME HERE <EMAIL ADDRESS>
|
|||||||
package cmd
|
package cmd
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
"github.com/vulcanize/ipld-ethcl-indexer/internal/boot"
|
"github.com/vulcanize/ipld-ethcl-indexer/internal/boot"
|
||||||
|
"github.com/vulcanize/ipld-ethcl-indexer/internal/shutdown"
|
||||||
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
|
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -22,10 +27,20 @@ var headCmd = &cobra.Command{
|
|||||||
|
|
||||||
// Start the application to track at head.
|
// Start the application to track at head.
|
||||||
func startHeadTracking() {
|
func startHeadTracking() {
|
||||||
_, err := boot.BootApplicationWithRetry(dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort)
|
// Boot the application
|
||||||
|
log.Info("Starting the application in head tracking mode.")
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
BC, DB, err := boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
loghelper.LogError(err).Error("Unable to Start application")
|
loghelper.LogError(err).Error("Unable to Start application")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Capture head blocks
|
||||||
|
go BC.CaptureHead()
|
||||||
|
|
||||||
|
// Shutdown when the time is right.
|
||||||
|
shutdown.ShutdownServices(ctx, time.Duration(maxWaitSecondsShutdown), DB, BC)
|
||||||
}
|
}
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
|
4
go.mod
4
go.mod
@ -19,13 +19,16 @@ require (
|
|||||||
github.com/jackc/puddle v1.2.1 // indirect
|
github.com/jackc/puddle v1.2.1 // indirect
|
||||||
github.com/kr/text v0.2.0 // indirect
|
github.com/kr/text v0.2.0 // indirect
|
||||||
github.com/lib/pq v1.10.4 // indirect
|
github.com/lib/pq v1.10.4 // indirect
|
||||||
|
github.com/minio/sha256-simd v0.1.1 // indirect
|
||||||
github.com/pkg/errors v0.9.1 // indirect
|
github.com/pkg/errors v0.9.1 // indirect
|
||||||
golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4 // indirect
|
golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4 // indirect
|
||||||
golang.org/x/net v0.0.0-20220412020605-290c469a71a5 // indirect
|
golang.org/x/net v0.0.0-20220412020605-290c469a71a5 // indirect
|
||||||
|
gopkg.in/cenkalti/backoff.v1 v1.1.0 // indirect
|
||||||
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
|
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
|
||||||
)
|
)
|
||||||
|
|
||||||
require (
|
require (
|
||||||
|
github.com/ferranbt/fastssz v0.0.0-20220303160658-88bb965b6747
|
||||||
github.com/fsnotify/fsnotify v1.5.1 // indirect
|
github.com/fsnotify/fsnotify v1.5.1 // indirect
|
||||||
github.com/georgysavva/scany v0.3.0
|
github.com/georgysavva/scany v0.3.0
|
||||||
github.com/hashicorp/hcl v1.0.0 // indirect
|
github.com/hashicorp/hcl v1.0.0 // indirect
|
||||||
@ -35,6 +38,7 @@ require (
|
|||||||
github.com/mitchellh/mapstructure v1.4.3 // indirect
|
github.com/mitchellh/mapstructure v1.4.3 // indirect
|
||||||
github.com/pelletier/go-toml v1.9.4 // indirect
|
github.com/pelletier/go-toml v1.9.4 // indirect
|
||||||
github.com/pelletier/go-toml/v2 v2.0.0-beta.8 // indirect
|
github.com/pelletier/go-toml/v2 v2.0.0-beta.8 // indirect
|
||||||
|
github.com/r3labs/sse v0.0.0-20210224172625-26fe804710bc
|
||||||
github.com/spf13/afero v1.8.2 // indirect
|
github.com/spf13/afero v1.8.2 // indirect
|
||||||
github.com/spf13/cast v1.4.1 // indirect
|
github.com/spf13/cast v1.4.1 // indirect
|
||||||
github.com/spf13/cobra v1.4.0
|
github.com/spf13/cobra v1.4.0
|
||||||
|
12
go.sum
12
go.sum
@ -65,6 +65,8 @@ github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1m
|
|||||||
github.com/envoyproxy/go-control-plane v0.9.7/go.mod h1:cwu0lG7PUMfa9snN8LXBig5ynNVH9qI8YYLbd1fK2po=
|
github.com/envoyproxy/go-control-plane v0.9.7/go.mod h1:cwu0lG7PUMfa9snN8LXBig5ynNVH9qI8YYLbd1fK2po=
|
||||||
github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
|
github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
|
||||||
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
|
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
|
||||||
|
github.com/ferranbt/fastssz v0.0.0-20220303160658-88bb965b6747 h1:K2Bt7NSX8x/5MD2RiO7cPLy21dBgnQ84r9uR0QYoHrE=
|
||||||
|
github.com/ferranbt/fastssz v0.0.0-20220303160658-88bb965b6747/go.mod h1:S8yiDeAXy8f88W4Ul+0dBMPx49S05byYbmZD6Uv94K4=
|
||||||
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
|
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
|
||||||
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
|
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
|
||||||
github.com/fsnotify/fsnotify v1.5.1 h1:mZcQUHVQUQWoPXXtuf9yuEXKudkV2sx1E06UadKWpgI=
|
github.com/fsnotify/fsnotify v1.5.1 h1:mZcQUHVQUQWoPXXtuf9yuEXKudkV2sx1E06UadKWpgI=
|
||||||
@ -112,6 +114,8 @@ github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw
|
|||||||
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
|
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
|
||||||
github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
|
github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
|
||||||
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
|
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
|
||||||
|
github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA=
|
||||||
|
github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
|
||||||
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
|
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
|
||||||
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
|
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
|
||||||
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
|
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
|
||||||
@ -251,6 +255,9 @@ github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hd
|
|||||||
github.com/mattn/go-isatty v0.0.9/go.mod h1:YNRxwqDuOph6SZLI9vUUz6OYw3QyUt7WiY2yME+cCiQ=
|
github.com/mattn/go-isatty v0.0.9/go.mod h1:YNRxwqDuOph6SZLI9vUUz6OYw3QyUt7WiY2yME+cCiQ=
|
||||||
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
|
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
|
||||||
github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
|
github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
|
||||||
|
github.com/minio/sha256-simd v0.1.1 h1:5QHSlgo3nt5yKOJrC7W8w7X+NFl8cMPZm96iu8kKUJU=
|
||||||
|
github.com/minio/sha256-simd v0.1.1/go.mod h1:B5e1o+1/KgNmWrSQK08Y6Z1Vb5pwIktudl0J58iy0KM=
|
||||||
|
github.com/mitchellh/mapstructure v1.3.2/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
|
||||||
github.com/mitchellh/mapstructure v1.4.3 h1:OVowDSCllw/YjdLkam3/sm7wEtOy59d8ndGgCcyj8cs=
|
github.com/mitchellh/mapstructure v1.4.3 h1:OVowDSCllw/YjdLkam3/sm7wEtOy59d8ndGgCcyj8cs=
|
||||||
github.com/mitchellh/mapstructure v1.4.3/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
|
github.com/mitchellh/mapstructure v1.4.3/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
|
||||||
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
|
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
|
||||||
@ -276,6 +283,8 @@ github.com/pkg/sftp v1.13.1/go.mod h1:3HaPG6Dq1ILlpPZRO0HVMrsydcdLt6HRDccSgb87qR
|
|||||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||||
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
|
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
|
||||||
|
github.com/r3labs/sse v0.0.0-20210224172625-26fe804710bc h1:zAsgcP8MhzAbhMnB1QQ2O7ZhWYVGYSR2iVcjzQuPV+o=
|
||||||
|
github.com/r3labs/sse v0.0.0-20210224172625-26fe804710bc/go.mod h1:S8xSOnV3CgpNrWd0GQ/OoQfMtlg2uPRSuTzcSGrzwK8=
|
||||||
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
|
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
|
||||||
github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ=
|
github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ=
|
||||||
github.com/rs/zerolog v1.13.0/go.mod h1:YbFCdg8HfsridGWAh22vktObvhZbQsZXe4/zB0OKkWU=
|
github.com/rs/zerolog v1.13.0/go.mod h1:YbFCdg8HfsridGWAh22vktObvhZbQsZXe4/zB0OKkWU=
|
||||||
@ -401,6 +410,7 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL
|
|||||||
golang.org/x/net v0.0.0-20190628185345-da137c7871d7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
golang.org/x/net v0.0.0-20190628185345-da137c7871d7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||||
golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||||
golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||||
|
golang.org/x/net v0.0.0-20191116160921-f9c825593386/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||||
golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||||
golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||||
golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||||
@ -660,6 +670,8 @@ google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlba
|
|||||||
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
|
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
|
||||||
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
|
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
|
||||||
google.golang.org/protobuf v1.28.0 h1:w43yiav+6bVFTBQFZX0r7ipe9JQ1QsbMgHwbBziscLw=
|
google.golang.org/protobuf v1.28.0 h1:w43yiav+6bVFTBQFZX0r7ipe9JQ1QsbMgHwbBziscLw=
|
||||||
|
gopkg.in/cenkalti/backoff.v1 v1.1.0 h1:Arh75ttbsvlpVA7WtVpH4u9h6Zl46xuptxqLxPiSo4Y=
|
||||||
|
gopkg.in/cenkalti/backoff.v1 v1.1.0/go.mod h1:J6Vskwqd+OMVJl8C33mmtxTBs2gyzfv7UDAkHu8BrjI=
|
||||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||||
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||||
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
|
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
|
||||||
|
@ -1,48 +1,23 @@
|
|||||||
package boot
|
package boot
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"context"
|
||||||
"net/http"
|
|
||||||
"strconv"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
|
"github.com/vulcanize/ipld-ethcl-indexer/pkg/beaconclient"
|
||||||
"github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql"
|
"github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql"
|
||||||
"github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql/postgres"
|
"github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql/postgres"
|
||||||
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
|
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
bcHealthEndpoint = "/eth/v1/node/health"
|
maxRetry = 5 // Max times to try to connect to the DB or BC at boot.
|
||||||
maxRetry = 5 // Max times to try to connect to the DB or BC at boot.
|
retryInterval = 30 // The time to wait between each try.
|
||||||
retryInterval = 30 // The time to wait between each try.
|
DB sql.Database = &postgres.DB{}
|
||||||
DB sql.Database = &postgres.DB{}
|
BC *beaconclient.BeaconClient = &beaconclient.BeaconClient{}
|
||||||
)
|
)
|
||||||
|
|
||||||
// This function will ensure that we can connect to the beacon client.
|
|
||||||
// Keep in mind, the beacon client will allow you to connect to it but it might
|
|
||||||
// Not allow you to make http requests. This is part of its built in logic, and you will have
|
|
||||||
// to follow their provided guidelines. https://lighthouse-book.sigmaprime.io/api-bn.html#security
|
|
||||||
func checkBeaconClient(bcAddress string, bcPort int) error {
|
|
||||||
log.Debug("Attempting to connect to the beacon client")
|
|
||||||
bcEndpoint := "http://" + bcAddress + ":" + strconv.Itoa(bcPort) + bcHealthEndpoint
|
|
||||||
resp, err := http.Get(bcEndpoint)
|
|
||||||
if err != nil {
|
|
||||||
loghelper.LogError(err).Error("Unable to get bc endpoint: ", bcEndpoint)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if resp.StatusCode < 200 || resp.StatusCode > 299 {
|
|
||||||
log.Error("We recieved a non 2xx status code when checking the health of the beacon node.")
|
|
||||||
log.Error("Health Endpoint Status Code: ", resp.StatusCode)
|
|
||||||
return fmt.Errorf("beacon Node Provided a non 2xx status code, code provided: %d", resp.StatusCode)
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Info("We can successfully reach the beacon client.")
|
|
||||||
return nil
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
// A simple wrapper to create a DB object to use.
|
// A simple wrapper to create a DB object to use.
|
||||||
func SetupPostgresDb(dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string) (sql.Database, error) {
|
func SetupPostgresDb(dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string) (sql.Database, error) {
|
||||||
log.Debug("Resolving Driver Type")
|
log.Debug("Resolving Driver Type")
|
||||||
@ -81,35 +56,38 @@ func SetupPostgresDb(dbHostname string, dbPort int, dbName string, dbUsername st
|
|||||||
//
|
//
|
||||||
// 2. Connect to the database.
|
// 2. Connect to the database.
|
||||||
//
|
//
|
||||||
func BootApplication(dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string, bcAddress string, bcPort int) (sql.Database, error) {
|
func BootApplication(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string, bcAddress string, bcPort int, bcConnectionProtocol string) (*beaconclient.BeaconClient, sql.Database, error) {
|
||||||
log.Info("Booting the Application")
|
log.Info("Booting the Application")
|
||||||
|
|
||||||
log.Debug("Checking beacon Client")
|
log.Debug("Creating the Beacon Client")
|
||||||
err := checkBeaconClient(bcAddress, bcPort)
|
BC = beaconclient.CreateBeaconClient(ctx, bcConnectionProtocol, bcAddress, bcPort)
|
||||||
|
|
||||||
|
log.Debug("Checking Beacon Client")
|
||||||
|
err := BC.CheckBeaconClient()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Debug("Setting up DB connection")
|
log.Debug("Setting up DB connection")
|
||||||
DB, err := SetupPostgresDb(dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName)
|
DB, err := SetupPostgresDb(dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
return DB, nil
|
return BC, DB, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add retry logic to ensure that we are give the Beacon Client and the DB time to start.
|
// Add retry logic to ensure that we are give the Beacon Client and the DB time to start.
|
||||||
func BootApplicationWithRetry(dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string, bcAddress string, bcPort int) (sql.Database, error) {
|
func BootApplicationWithRetry(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string, bcAddress string, bcPort int, bcConnectionProtocol string) (*beaconclient.BeaconClient, sql.Database, error) {
|
||||||
var err error
|
var err error
|
||||||
for i := 0; i < maxRetry; i++ {
|
for i := 0; i < maxRetry; i++ {
|
||||||
DB, err = BootApplication(dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName, bcAddress, bcPort)
|
BC, DB, err = BootApplication(ctx, dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName, bcAddress, bcPort, bcConnectionProtocol)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithFields(log.Fields{
|
log.WithFields(log.Fields{
|
||||||
"retryNumber": i,
|
"retryNumber": i,
|
||||||
}).Warn("Unable to boot application. Going to try again")
|
}).Warn("Unable to boot application. Going to try again")
|
||||||
time.Sleep(time.Duration(retryInterval) * time.Second)
|
time.Sleep(time.Duration(retryInterval) * time.Second)
|
||||||
continue
|
|
||||||
}
|
}
|
||||||
|
break
|
||||||
}
|
}
|
||||||
return DB, err
|
return BC, DB, err
|
||||||
}
|
}
|
||||||
|
@ -1,6 +1,8 @@
|
|||||||
package boot_test
|
package boot_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
. "github.com/onsi/ginkgo/v2"
|
. "github.com/onsi/ginkgo/v2"
|
||||||
. "github.com/onsi/gomega"
|
. "github.com/onsi/gomega"
|
||||||
"github.com/vulcanize/ipld-ethcl-indexer/internal/boot"
|
"github.com/vulcanize/ipld-ethcl-indexer/internal/boot"
|
||||||
@ -8,38 +10,39 @@ import (
|
|||||||
|
|
||||||
var _ = Describe("Boot", func() {
|
var _ = Describe("Boot", func() {
|
||||||
var (
|
var (
|
||||||
dbAddress string = "localhost"
|
dbAddress string = "localhost"
|
||||||
dbPort int = 8077
|
dbPort int = 8077
|
||||||
dbName string = "vulcanize_testing"
|
dbName string = "vulcanize_testing"
|
||||||
dbUsername string = "vdbm"
|
dbUsername string = "vdbm"
|
||||||
dbPassword string = "password"
|
dbPassword string = "password"
|
||||||
dbDriver string = "PGX"
|
dbDriver string = "PGX"
|
||||||
bcAddress string = "localhost"
|
bcAddress string = "localhost"
|
||||||
bcPort int = 5052
|
bcPort int = 5052
|
||||||
|
bcConnectionProtocol string = "http"
|
||||||
)
|
)
|
||||||
Describe("Booting the application", Label("integration"), func() {
|
Describe("Booting the application", Label("integration"), func() {
|
||||||
Context("When the DB and BC are both up and running", func() {
|
Context("When the DB and BC are both up and running", func() {
|
||||||
It("Should connect successfully", func() {
|
It("Should connect successfully", func() {
|
||||||
db, err := boot.BootApplicationWithRetry(dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort)
|
_, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol)
|
||||||
defer db.Close()
|
defer db.Close()
|
||||||
Expect(err).To(BeNil())
|
Expect(err).To(BeNil())
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
Context("When the DB is running but not the BC", func() {
|
Context("When the DB is running but not the BC", func() {
|
||||||
It("Should not connect successfully", func() {
|
It("Should not connect successfully", func() {
|
||||||
_, err := boot.BootApplication(dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, "hi", 100)
|
_, _, err := boot.BootApplication(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol)
|
||||||
Expect(err).ToNot(BeNil())
|
Expect(err).ToNot(BeNil())
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
Context("When the BC is running but not the DB", func() {
|
Context("When the BC is running but not the DB", func() {
|
||||||
It("Should not connect successfully", func() {
|
It("Should not connect successfully", func() {
|
||||||
_, err := boot.BootApplication("hi", 10, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort)
|
_, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol)
|
||||||
Expect(err).ToNot(BeNil())
|
Expect(err).ToNot(BeNil())
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
Context("When neither the BC or DB are running", func() {
|
Context("When neither the BC or DB are running", func() {
|
||||||
It("Should not connect successfully", func() {
|
It("Should not connect successfully", func() {
|
||||||
_, err := boot.BootApplication("hi", 10, dbName, dbUsername, dbPassword, dbDriver, "hi", 100)
|
_, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol)
|
||||||
Expect(err).ToNot(BeNil())
|
Expect(err).ToNot(BeNil())
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
39
internal/shutdown/shutdown.go
Normal file
39
internal/shutdown/shutdown.go
Normal file
@ -0,0 +1,39 @@
|
|||||||
|
package shutdown
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
"github.com/vulcanize/ipld-ethcl-indexer/pkg/beaconclient"
|
||||||
|
"github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql"
|
||||||
|
"github.com/vulcanize/ipld-ethcl-indexer/pkg/gracefulshutdown"
|
||||||
|
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Shutdown all the internal services for the application.
|
||||||
|
func ShutdownServices(ctx context.Context, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) {
|
||||||
|
successCh, errCh := gracefulshutdown.Shutdown(ctx, waitTime, map[string]gracefulshutdown.Operation{
|
||||||
|
"database": func(ctx context.Context) error {
|
||||||
|
err := DB.Close()
|
||||||
|
if err != nil {
|
||||||
|
loghelper.LogError(err).Error("Unable to close the DB")
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
},
|
||||||
|
"beaconClient": func(ctx context.Context) error {
|
||||||
|
err := BC.StopHeadTracking()
|
||||||
|
if err != nil {
|
||||||
|
loghelper.LogError(err).Error("Unable to trigger shutdown of head tracking")
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
select {
|
||||||
|
case _ = <-successCh:
|
||||||
|
log.Info("Gracefully Shutdown ipld-ethcl-indexer!")
|
||||||
|
case err := <-errCh:
|
||||||
|
loghelper.LogError(err).Error("Ungracefully Shutdown ipld-ethcl-indexer!")
|
||||||
|
}
|
||||||
|
}
|
73
pkg/beaconclient/beaconclient.go
Normal file
73
pkg/beaconclient/beaconclient.go
Normal file
@ -0,0 +1,73 @@
|
|||||||
|
package beaconclient
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/r3labs/sse"
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
bcHealthEndpoint = "/eth/v1/node/health" // Endpoint used for the healthcheck
|
||||||
|
bcHeadTopicEndpoint = "/eth/v1/events?topics=head" // Endpoint used to subscribe to the head of the chain
|
||||||
|
bcReorgTopicEndpoint = "/eth/v1/events?topics=chain_reorg" // Endpoint used to subscribe to the head of the chain
|
||||||
|
bcFinalizedTopicEndpoint = "/eth/v1/events?topics=finalized_checkpoint" // Endpoint used to subscribe to the head of the chain
|
||||||
|
bcBlockQueryEndpoint = "/eth/v2/beacon/blocks/" // Endpoint to query individual Blocks
|
||||||
|
bcStateQueryEndpoint = "/eth/v2/debug/beacon/states/" // Endpoint to query individual States
|
||||||
|
)
|
||||||
|
|
||||||
|
// A struct that capture the Beacon Server that the Beacon Client will be interacting with and querying.
|
||||||
|
type BeaconClient struct {
|
||||||
|
Context context.Context // A context generic context with multiple uses.
|
||||||
|
ServerEndpoint string // What is the endpoint of the beacon server.
|
||||||
|
PerformHeadTracking bool // Should we track head?
|
||||||
|
PerformHistoricalProcessing bool // Should we perform historical processing?
|
||||||
|
HeadTracking *SseEvents[Head] // Track the head block
|
||||||
|
ReOrgTracking *SseEvents[ChainReorg] // Track all Reorgs
|
||||||
|
FinalizationTracking *SseEvents[FinalizedCheckpoint] // Track all finalization checkpoints
|
||||||
|
}
|
||||||
|
|
||||||
|
// A struct to keep track of relevant the head event topic.
|
||||||
|
type SseEvents[P ProcessedEvents] struct {
|
||||||
|
Endpoint string // The endpoint for the subscription. Primarily used for logging
|
||||||
|
MessagesCh chan *sse.Event // Contains all the messages from the SSE Channel
|
||||||
|
ErrorCh chan *SseError // Contains any errors while SSE streaming occurred
|
||||||
|
ProcessCh chan *P // Used to capture processed data in its proper struct.
|
||||||
|
SseClient *sse.Client // sse.Client object that is used to interact with the SSE stream
|
||||||
|
}
|
||||||
|
|
||||||
|
// An object to capture any errors when turning an SSE message to JSON.
|
||||||
|
type SseError struct {
|
||||||
|
err error
|
||||||
|
msg []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
// A Function to create the BeaconClient.
|
||||||
|
func CreateBeaconClient(ctx context.Context, connectionProtocol string, bcAddress string, bcPort int) *BeaconClient {
|
||||||
|
endpoint := fmt.Sprintf("%s://%s:%d", connectionProtocol, bcAddress, bcPort)
|
||||||
|
log.Info("Creating the BeaconClient")
|
||||||
|
return &BeaconClient{
|
||||||
|
Context: ctx,
|
||||||
|
ServerEndpoint: endpoint,
|
||||||
|
HeadTracking: createSseEvent[Head](endpoint, bcHeadTopicEndpoint),
|
||||||
|
ReOrgTracking: createSseEvent[ChainReorg](endpoint, bcReorgTopicEndpoint),
|
||||||
|
FinalizationTracking: createSseEvent[FinalizedCheckpoint](endpoint, bcFinalizedTopicEndpoint),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create all the channels to handle a SSE events
|
||||||
|
func createSseEvent[P ProcessedEvents](baseEndpoint string, path string) *SseEvents[P] {
|
||||||
|
endpoint := baseEndpoint + path
|
||||||
|
sseEvents := &SseEvents[P]{
|
||||||
|
Endpoint: endpoint,
|
||||||
|
MessagesCh: make(chan *sse.Event),
|
||||||
|
ErrorCh: make(chan *SseError),
|
||||||
|
ProcessCh: make(chan *P),
|
||||||
|
SseClient: func(endpoint string) *sse.Client {
|
||||||
|
log.WithFields(log.Fields{"endpoint": endpoint}).Info("Creating SSE client")
|
||||||
|
return sse.NewClient(endpoint)
|
||||||
|
}(endpoint),
|
||||||
|
}
|
||||||
|
return sseEvents
|
||||||
|
}
|
13
pkg/beaconclient/beaconclient_suite_test.go
Normal file
13
pkg/beaconclient/beaconclient_suite_test.go
Normal file
@ -0,0 +1,13 @@
|
|||||||
|
package beaconclient_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
. "github.com/onsi/ginkgo/v2"
|
||||||
|
. "github.com/onsi/gomega"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestBeaconClient(t *testing.T) {
|
||||||
|
RegisterFailHandler(Fail)
|
||||||
|
RunSpecs(t, "BeaconClient Suite", Label("beacon-client"))
|
||||||
|
}
|
79
pkg/beaconclient/capturehead.go
Normal file
79
pkg/beaconclient/capturehead.go
Normal file
@ -0,0 +1,79 @@
|
|||||||
|
// This file will call all the functions to start and stop capturing the head of the beacon chain.
|
||||||
|
|
||||||
|
package beaconclient
|
||||||
|
|
||||||
|
import (
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/ferranbt/fastssz/spectests"
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
|
||||||
|
)
|
||||||
|
|
||||||
|
// This function will perform all the heavy lifting for tracking the head of the chain.
|
||||||
|
func (bc *BeaconClient) CaptureHead() {
|
||||||
|
log.Info("We are tracking the head of the chain.")
|
||||||
|
bc.tempHelper()
|
||||||
|
// go bc.handleHead()
|
||||||
|
// go bc.handleFinalizedCheckpoint()
|
||||||
|
// go bc.handleReorgs()
|
||||||
|
// bc.captureEventTopic()
|
||||||
|
}
|
||||||
|
|
||||||
|
// A temporary helper function to see the output of beacon block and states.
|
||||||
|
func (bc *BeaconClient) tempHelper() {
|
||||||
|
slot := "3200"
|
||||||
|
blockEndpoint := bc.ServerEndpoint + bcBlockQueryEndpoint + slot
|
||||||
|
stateEndpoint := bc.ServerEndpoint + bcStateQueryEndpoint + slot
|
||||||
|
// Query
|
||||||
|
log.Info("Get")
|
||||||
|
blockSsz, _ := querySsz(blockEndpoint, slot)
|
||||||
|
stateSsz, _ := querySsz(stateEndpoint, slot)
|
||||||
|
// Transform
|
||||||
|
log.Info("Tranform")
|
||||||
|
stateObj := new(spectests.BeaconState)
|
||||||
|
err := stateObj.UnmarshalSSZ(stateSsz)
|
||||||
|
if err != nil {
|
||||||
|
loghelper.LogSlotError(slot, err).Error("Unable to unmarshal the SSZ response from the Beacon Node Successfully!")
|
||||||
|
}
|
||||||
|
|
||||||
|
blockObj := new(spectests.SignedBeaconBlock)
|
||||||
|
err = blockObj.UnmarshalSSZ(blockSsz)
|
||||||
|
if err != nil {
|
||||||
|
loghelper.LogSlotError(slot, err).Error("Unable to unmarshal the SSZ response from the Beacon Node Successfully!")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check
|
||||||
|
log.Info("Check")
|
||||||
|
log.Info("State Slot: ", stateObj.Slot)
|
||||||
|
log.Info("Block Slot: ", blockObj.Block.Slot)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stop the head tracking service.
|
||||||
|
func (bc *BeaconClient) StopHeadTracking() error {
|
||||||
|
log.Info("We are going to stop tracking the head of chain because of the shutdown signal.")
|
||||||
|
chHead := make(chan bool)
|
||||||
|
chReorg := make(chan bool)
|
||||||
|
chFinal := make(chan bool)
|
||||||
|
|
||||||
|
go bc.HeadTracking.finishProcessingChannel(chHead)
|
||||||
|
go bc.ReOrgTracking.finishProcessingChannel(chReorg)
|
||||||
|
go bc.FinalizationTracking.finishProcessingChannel(chFinal)
|
||||||
|
|
||||||
|
<-chHead
|
||||||
|
<-chFinal
|
||||||
|
<-chReorg
|
||||||
|
log.Info("Successfully stopped the head tracking service.")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// This function closes the SSE subscription, but waits until the MessagesCh is empty
|
||||||
|
func (se *SseEvents[ProcessedEvents]) finishProcessingChannel(finish chan<- bool) {
|
||||||
|
loghelper.LogEndpoint(se.Endpoint).Info("Received a close event.")
|
||||||
|
se.SseClient.Unsubscribe(se.MessagesCh)
|
||||||
|
for len(se.MessagesCh) != 0 && len(se.ProcessCh) != 0 {
|
||||||
|
time.Sleep(time.Duration(shutdownWaitInterval) * time.Millisecond)
|
||||||
|
}
|
||||||
|
loghelper.LogEndpoint(se.Endpoint).Info("Done processing all messages, ready for shutdown")
|
||||||
|
finish <- true
|
||||||
|
}
|
33
pkg/beaconclient/healthcheck.go
Normal file
33
pkg/beaconclient/healthcheck.go
Normal file
@ -0,0 +1,33 @@
|
|||||||
|
package beaconclient
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"net/http"
|
||||||
|
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
|
||||||
|
)
|
||||||
|
|
||||||
|
// This function will ensure that we can connect to the beacon client.
|
||||||
|
// Keep in mind, the beacon client will allow you to connect to it but it might
|
||||||
|
// Not allow you to make http requests. This is part of its built in logic, and you will have
|
||||||
|
// to follow their provided guidelines. https://lighthouse-book.sigmaprime.io/api-bn.html#security
|
||||||
|
func (bc BeaconClient) CheckBeaconClient() error {
|
||||||
|
log.Debug("Attempting to connect to the beacon client")
|
||||||
|
bcEndpoint := bc.ServerEndpoint + bcHealthEndpoint
|
||||||
|
resp, err := http.Get(bcEndpoint)
|
||||||
|
if err != nil {
|
||||||
|
loghelper.LogError(err).Error("Unable to get bc endpoint: ", bcEndpoint)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if resp.StatusCode < 200 || resp.StatusCode > 299 {
|
||||||
|
log.Error("We recieved a non 2xx status code when checking the health of the beacon node.")
|
||||||
|
log.Error("Health Endpoint Status Code: ", resp.StatusCode)
|
||||||
|
return fmt.Errorf("beacon Node Provided a non 2xx status code, code provided: %d", resp.StatusCode)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Info("We can successfully reach the beacon client.")
|
||||||
|
return nil
|
||||||
|
|
||||||
|
}
|
30
pkg/beaconclient/healthcheck_test.go
Normal file
30
pkg/beaconclient/healthcheck_test.go
Normal file
@ -0,0 +1,30 @@
|
|||||||
|
package beaconclient_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
. "github.com/onsi/ginkgo/v2"
|
||||||
|
. "github.com/onsi/gomega"
|
||||||
|
beaconclient "github.com/vulcanize/ipld-ethcl-indexer/pkg/beaconclient"
|
||||||
|
)
|
||||||
|
|
||||||
|
var _ = Describe("Healthcheck", func() {
|
||||||
|
var (
|
||||||
|
BC = beaconclient.CreateBeaconClient(context.Background(), "http", "localhost", 5052)
|
||||||
|
errBc = beaconclient.CreateBeaconClient(context.Background(), "http", "blah-blah", 1010)
|
||||||
|
)
|
||||||
|
Describe("Connecting to the lighthouse client", Label("integration"), func() {
|
||||||
|
Context("When the client is running", func() {
|
||||||
|
It("We should connect successfully", func() {
|
||||||
|
err := BC.CheckBeaconClient()
|
||||||
|
Expect(err).To(BeNil())
|
||||||
|
})
|
||||||
|
})
|
||||||
|
Context("When the client is not running", func() {
|
||||||
|
It("We not should connect successfully", func() {
|
||||||
|
err := errBc.CheckBeaconClient()
|
||||||
|
Expect(err).ToNot(BeNil())
|
||||||
|
})
|
||||||
|
})
|
||||||
|
})
|
||||||
|
})
|
66
pkg/beaconclient/incomingsse.go
Normal file
66
pkg/beaconclient/incomingsse.go
Normal file
@ -0,0 +1,66 @@
|
|||||||
|
// This package will handle all event subscriptions that utilize SSE.
|
||||||
|
|
||||||
|
package beaconclient
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
shutdownWaitInterval = time.Duration(5) * time.Second
|
||||||
|
)
|
||||||
|
|
||||||
|
// This function will capture all the SSE events for a given SseEvents object.
|
||||||
|
// When new messages come in, it will ensure that they are decoded into JSON.
|
||||||
|
// If any errors occur, it log the error information.
|
||||||
|
func handleIncomingSseEvent[P ProcessedEvents](eventHandler *SseEvents[P]) {
|
||||||
|
loghelper.LogEndpoint(eventHandler.Endpoint).Info("Subscribing to Messages")
|
||||||
|
go eventHandler.SseClient.SubscribeChanRaw(eventHandler.MessagesCh)
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case message := <-eventHandler.MessagesCh:
|
||||||
|
// Message can be nil if its a keep-alive message
|
||||||
|
if len(message.Data) != 0 {
|
||||||
|
go processMsg(message.Data, eventHandler.ProcessCh, eventHandler.ErrorCh)
|
||||||
|
}
|
||||||
|
|
||||||
|
case headErr := <-eventHandler.ErrorCh:
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"endpoint": eventHandler.Endpoint,
|
||||||
|
"err": headErr.err,
|
||||||
|
"msg": headErr.msg,
|
||||||
|
},
|
||||||
|
).Error("Unable to handle event.")
|
||||||
|
|
||||||
|
case process := <-eventHandler.ProcessCh:
|
||||||
|
log.WithFields(log.Fields{"processed": process}).Debug("Processesing a Message")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Turn the data object into a Struct.
|
||||||
|
func processMsg[P ProcessedEvents](msg []byte, processCh chan<- *P, errorCh chan<- *SseError) {
|
||||||
|
var msgMarshaled P
|
||||||
|
err := json.Unmarshal(msg, &msgMarshaled)
|
||||||
|
if err != nil {
|
||||||
|
loghelper.LogError(err).Error("Unable to parse message")
|
||||||
|
errorCh <- &SseError{
|
||||||
|
err: err,
|
||||||
|
msg: msg,
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
processCh <- &msgMarshaled
|
||||||
|
}
|
||||||
|
|
||||||
|
// Capture all of the event topics.
|
||||||
|
func (bc *BeaconClient) captureEventTopic() {
|
||||||
|
log.Info("We are capturing all SSE events")
|
||||||
|
go handleIncomingSseEvent(bc.HeadTracking)
|
||||||
|
go handleIncomingSseEvent(bc.ReOrgTracking)
|
||||||
|
go handleIncomingSseEvent(bc.FinalizationTracking)
|
||||||
|
}
|
37
pkg/beaconclient/models.go
Normal file
37
pkg/beaconclient/models.go
Normal file
@ -0,0 +1,37 @@
|
|||||||
|
package beaconclient
|
||||||
|
|
||||||
|
// This interface captured what the events can be for processed event streams.
|
||||||
|
type ProcessedEvents interface {
|
||||||
|
Head | FinalizedCheckpoint | ChainReorg
|
||||||
|
}
|
||||||
|
|
||||||
|
// This struct captures the JSON representation of the head topic
|
||||||
|
type Head struct {
|
||||||
|
Slot string `json:"slot"`
|
||||||
|
Block string `json:"block"`
|
||||||
|
State string `json:"state"`
|
||||||
|
CurrentDutyDependentRoot string `json:"current_duty_dependent_root"`
|
||||||
|
PreviousDutyDependentRoot string `json:"previous_duty_dependent_root"`
|
||||||
|
EpochTransition bool `json:"epoch_transition"`
|
||||||
|
ExecutionOptimistic bool `json:"execution_optimistic"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// This struct captures the JSON representation of the finalized_checkpoint topic.
|
||||||
|
type FinalizedCheckpoint struct {
|
||||||
|
Block string `json:"block"`
|
||||||
|
State string `json:"state"`
|
||||||
|
Epoch string `json:"epoch"`
|
||||||
|
ExecutionOptimistic bool `json:"execution_optimistic"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// This struct captures the JSON representation of the chain_reorg topic.
|
||||||
|
type ChainReorg struct {
|
||||||
|
Slot string `json:"slot"`
|
||||||
|
Depth string `json:"depth"`
|
||||||
|
OldHeadBlock string `json:"old_head_block"`
|
||||||
|
NewHeadBlock string `json:"new_head_block"`
|
||||||
|
OldHeadState string `json:"old_head_state"`
|
||||||
|
NewHeadState string `json:"new_head_state"`
|
||||||
|
Epoch string `json:"epoch"`
|
||||||
|
ExecutionOptimistic bool `json:"execution_optimistic"`
|
||||||
|
}
|
38
pkg/beaconclient/processevents.go
Normal file
38
pkg/beaconclient/processevents.go
Normal file
@ -0,0 +1,38 @@
|
|||||||
|
// This file contains all the functions to handle SSE events after they have been turned
|
||||||
|
// to the structs.
|
||||||
|
|
||||||
|
package beaconclient
|
||||||
|
|
||||||
|
import log "github.com/sirupsen/logrus"
|
||||||
|
|
||||||
|
// This function will perform the necessary steps to handle a reorg.
|
||||||
|
func (bc *BeaconClient) handleReorgs() {
|
||||||
|
log.Info("Starting to process reorgs.")
|
||||||
|
for {
|
||||||
|
// We will add real functionality later
|
||||||
|
reorg := <-bc.ReOrgTracking.ProcessCh
|
||||||
|
log.WithFields(log.Fields{"reorg": reorg}).Debug("Received a new reorg message.")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// This function will perform the necessary steps to handle a reorg.
|
||||||
|
func (bc *BeaconClient) handleFinalizedCheckpoint() {
|
||||||
|
log.Info("Starting to process finalized checkpoints.")
|
||||||
|
for {
|
||||||
|
// We will add real functionality later
|
||||||
|
finalized := <-bc.ReOrgTracking.ProcessCh
|
||||||
|
log.WithFields(log.Fields{"finalized": finalized}).Debug("Received a new finalized checkpoint.")
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// This function will handle the latest head event.
|
||||||
|
func (bc *BeaconClient) handleHead() {
|
||||||
|
log.Info("Starting to process head.")
|
||||||
|
for {
|
||||||
|
// We will add real functionality later
|
||||||
|
head := <-bc.ReOrgTracking.ProcessCh
|
||||||
|
log.WithFields(log.Fields{"head": head}).Debug("Received a new head event.")
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
101
pkg/beaconclient/queryserver.go
Normal file
101
pkg/beaconclient/queryserver.go
Normal file
@ -0,0 +1,101 @@
|
|||||||
|
// This file will contain functions to query the Beacon Chain Server.
|
||||||
|
|
||||||
|
package beaconclient
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"io/ioutil"
|
||||||
|
"net/http"
|
||||||
|
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Attempt to use generics..
|
||||||
|
// // These are types that append slot at the end of the URL to handle a request.
|
||||||
|
// type SlotBasedRequests interface {
|
||||||
|
// *specs.BeaconState | *specs.SignedBeaconBlock
|
||||||
|
// UnmarshalSSZ([]byte) error
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// func queryState[R SlotBasedRequests](endpoint string, slot string) (R, error) {
|
||||||
|
// obj := new(R)
|
||||||
|
// rawState, err := querySlot(endpoint, slot)
|
||||||
|
// if err != nil {
|
||||||
|
// return *obj, err
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// err = &obj.UnmarshalSSZ(rawState)
|
||||||
|
// err = (*obj).UnmarshalSSZ(rawState)
|
||||||
|
// if err != nil {
|
||||||
|
// loghelper.LogSlotError(slot, err).Error("Unable to unmarshal the SSZ response from the Beacon Node Successfully!")
|
||||||
|
// return *obj, fmt.Errorf("Unable to unmarshal the SSZ response from the Beacon Node Successfully!: %s", err.Error())
|
||||||
|
// }
|
||||||
|
// return *obj, nil
|
||||||
|
// }
|
||||||
|
|
||||||
|
// This function will query a state object based on the slot provided.
|
||||||
|
// The object is SSZ encoded.
|
||||||
|
|
||||||
|
//type BeaconBlockResponse struct {
|
||||||
|
// version string `json: `
|
||||||
|
//}
|
||||||
|
|
||||||
|
// func queryState(endpoint string, slot string) (spectests.BeaconState, error) {
|
||||||
|
// obj := new(spectests.BeaconState)
|
||||||
|
// fullEndpoint := endpoint + slot
|
||||||
|
// rawState, err := querySsz(fullEndpoint, slot)
|
||||||
|
// if err != nil {
|
||||||
|
// return *obj, err
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// err = obj.UnmarshalSSZ(rawState)
|
||||||
|
// if err != nil {
|
||||||
|
// loghelper.LogSlotError(slot, err).Error("Unable to unmarshal the SSZ response from the Beacon Node")
|
||||||
|
// return *obj, fmt.Errorf("Unable to unmarshal the SSZ response from the Beacon Node: %s", err.Error())
|
||||||
|
// }
|
||||||
|
// return *obj, nil
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// // This function will query a state object based on the slot provided.
|
||||||
|
// // The object is SSZ encoded.
|
||||||
|
// func queryBlock(endpoint string, slot string) (spectests.SignedBeaconBlock, error) {
|
||||||
|
// obj := new(spectests.SignedBeaconBlock)
|
||||||
|
// fullEndpoint := endpoint + slot
|
||||||
|
// rawBlock, err := querySsz(fullEndpoint, slot)
|
||||||
|
// if err != nil {
|
||||||
|
// return *obj, err
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// err = obj.UnmarshalSSZ(rawBlock)
|
||||||
|
// if err != nil {
|
||||||
|
// loghelper.LogSlotError(slot, err).Error("Unable to unmarshal the SSZ response from the Beacon Node Successfully!")
|
||||||
|
// return *obj, fmt.Errorf("Unable to unmarshal the SSZ response from the Beacon Node Successfully!: %s", err.Error())
|
||||||
|
// }
|
||||||
|
// return *obj, nil
|
||||||
|
// }
|
||||||
|
|
||||||
|
// A helper function to query endpoints that utilize slots.
|
||||||
|
func querySsz(endpoint string, slot string) ([]byte, error) {
|
||||||
|
log.WithFields(log.Fields{"endpoint": endpoint}).Info("Querying endpoint")
|
||||||
|
client := &http.Client{}
|
||||||
|
req, err := http.NewRequest("GET", endpoint, nil)
|
||||||
|
if err != nil {
|
||||||
|
loghelper.LogSlotError(slot, err).Error("Unable to create a request!")
|
||||||
|
return nil, fmt.Errorf("Unable to create a request!: %s", err.Error())
|
||||||
|
}
|
||||||
|
// Not set correctly
|
||||||
|
req.Header.Set("Accept", "application/octet-stream")
|
||||||
|
response, err := client.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
loghelper.LogSlotError(slot, err).Error("Unable to query Beacon Node!")
|
||||||
|
return nil, fmt.Errorf("Unable to query Beacon Node: %s", err.Error())
|
||||||
|
}
|
||||||
|
defer response.Body.Close()
|
||||||
|
body, err := ioutil.ReadAll(response.Body)
|
||||||
|
if err != nil {
|
||||||
|
loghelper.LogSlotError(slot, err).Error("Unable to turn response into a []bytes array!")
|
||||||
|
return nil, fmt.Errorf("Unable to turn response into a []bytes array!: %s", err.Error())
|
||||||
|
}
|
||||||
|
return body, nil
|
||||||
|
}
|
67
pkg/gracefulshutdown/gracefulshutdown.go
Normal file
67
pkg/gracefulshutdown/gracefulshutdown.go
Normal file
@ -0,0 +1,67 @@
|
|||||||
|
package gracefulshutdown
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"os/signal"
|
||||||
|
"sync"
|
||||||
|
"syscall"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
|
||||||
|
)
|
||||||
|
|
||||||
|
// operation is a clean up function on shutting down
|
||||||
|
type Operation func(ctx context.Context) error
|
||||||
|
|
||||||
|
// gracefulShutdown waits for termination syscalls and doing clean up operations after received it
|
||||||
|
func Shutdown(ctx context.Context, timeout time.Duration, ops map[string]Operation) (<-chan struct{}, <-chan error) {
|
||||||
|
waitCh := make(chan struct{})
|
||||||
|
errCh := make(chan error)
|
||||||
|
go func() {
|
||||||
|
s := make(chan os.Signal, 1)
|
||||||
|
|
||||||
|
// add any other syscalls that you want to be notified with
|
||||||
|
signal.Notify(s, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP)
|
||||||
|
<-s
|
||||||
|
|
||||||
|
log.Info("Shutting Down your application")
|
||||||
|
|
||||||
|
// set timeout for the ops to be done to prevent system hang
|
||||||
|
timeoutFunc := time.AfterFunc(timeout, func() {
|
||||||
|
log.Warnf("timeout %d ms has been elapsed, force exit", timeout.Milliseconds())
|
||||||
|
errCh <- fmt.Errorf("Application shutdown took too long.")
|
||||||
|
return
|
||||||
|
})
|
||||||
|
|
||||||
|
defer timeoutFunc.Stop()
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
|
// Do the operations asynchronously to save time
|
||||||
|
for key, op := range ops {
|
||||||
|
wg.Add(1)
|
||||||
|
innerOp := op
|
||||||
|
innerKey := key
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
|
||||||
|
log.Infof("cleaning up: %s", innerKey)
|
||||||
|
if err := innerOp(ctx); err != nil {
|
||||||
|
loghelper.LogError(err).Errorf("%s: clean up failed: %s", innerKey, err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Infof("%s was shutdown gracefully", innerKey)
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
close(waitCh)
|
||||||
|
}()
|
||||||
|
|
||||||
|
return waitCh, errCh
|
||||||
|
}
|
12
pkg/loghelper/log_endpoint.go
Normal file
12
pkg/loghelper/log_endpoint.go
Normal file
@ -0,0 +1,12 @@
|
|||||||
|
package loghelper
|
||||||
|
|
||||||
|
import (
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
)
|
||||||
|
|
||||||
|
// A simple helper function that will help wrap the error message.
|
||||||
|
func LogEndpoint(endpoint string) *log.Entry {
|
||||||
|
return log.WithFields(log.Fields{
|
||||||
|
"endpoint": endpoint,
|
||||||
|
})
|
||||||
|
}
|
@ -11,3 +11,10 @@ func LogError(err error) *log.Entry {
|
|||||||
"err": err,
|
"err": err,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func LogSlotError(slot string, err error) *log.Entry {
|
||||||
|
return log.WithFields(log.Fields{
|
||||||
|
"err": err,
|
||||||
|
"slot": slot,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
BIN
tmp/ci/state
Normal file
BIN
tmp/ci/state
Normal file
Binary file not shown.
BIN
tmp/code/block
Normal file
BIN
tmp/code/block
Normal file
Binary file not shown.
BIN
tmp/code/state
Normal file
BIN
tmp/code/state
Normal file
Binary file not shown.
Loading…
Reference in New Issue
Block a user