Compare commits

...

11 Commits

Author SHA1 Message Date
Abdul Rabbani
ad70f6f61c Build and Push Image to Docker Registry on Publish or Edit 2022-05-13 10:47:27 -04:00
Ian Norden
23cbe8f919
multihash key gen func (#36)
* multihash key gen func

* go mod updates

* Added test to ensure the application shuts down gracefully or within a timeframe.

* Disregard race condition since its with the test not the application itself

* Capture the head block in the DB entirely. (#27)

* -- Intermediary Commit --

Just want to commit my code over the weekend, in case I spill coffee on my workstation.

* Create DB models ready for write.

* Handle SSE events

* Update ref for stack-orchestrator

* Use env in one place only.

* Boot Application on PR

* Update syntax

* Update syntax

* Correct command

* Use bash instead of sh

* Use until instead of while

* Make linter happy and check sse subscription err

* Handle Reorgs - Untested

* Feature/22 test handling incoming events - Intermediary Commit (#28)

* Checkpoint before the weekend

* Update location for SetupPostgresDB

* Feature/22 test handling incoming events (#30)

* Checkpoint before the weekend

* Update location for SetupPostgresDB

* Include first functioning tests for processing head

* Fix gitignore

* Test CaptureHead | Add Metrics | Handle Test Race Conditions

This Commit allows us to:

* Test the `CaptureHead` function.
* Test parsing a single Head message.
* Test a Reorg condition.
* Add Metrics. This is primarily used for testing but can have future use cases.
* Rearrange the test due to race conditions introduced by reusing a variable. `BeforeEach` can't be used to update `BC`.

* Update and finalize testing at this stage

* Update code and CI/CD

* Fix lint errors

* Update CICD and fail when file not found.

* Update test to have failed as expected.

* Remove Test file

* Add KnownGaps Errors (#33)

* Handle Skipped Slots (#34)

* Ensure that the node is synced at boot time

* Update test + add logic for checking skipped slots

* Update boot check

* Add skip_sync to config.

* Update a test so it fails

* go mod updates

* Integrate MHKey into existing code base.

* Update go.mod and go.sum

* Utilize the MHkey

* Stop tests from running forever on failure.

* Use sszRoot instead of sszObj for MhKey

* Update entrypoint script

* Update config parameter

Co-authored-by: Abdul Rabbani <abdulrabbani00@gmail.com>
Co-authored-by: Abdul Rabbani <58230246+abdulrabbani00@users.noreply.github.com>
2022-05-13 10:46:13 -04:00
Abdul Rabbani
7ff3efb380
Handle Skipped Slots (#34)
* Ensure that the node is synced at boot time

* Update test + add logic for checking skipped slots

* Update boot check

* Add skip_sync to config.

* Update a test so it fails
2022-05-13 08:48:31 -04:00
Abdul Rabbani
fbc5f2e55b
Add KnownGaps Errors (#33) 2022-05-12 15:44:05 -04:00
Abdul Rabbani
24db98b021 Remove Test file 2022-05-12 09:55:42 -04:00
Abdul Rabbani
7e8d63c667
Feature/22 test handling incoming events (#30)
* Checkpoint before the weekend

* Update location for SetupPostgresDB

* Include first functioning tests for processing head

* Fix gitignore

* Test CaptureHead | Add Metrics | Handle Test Race Conditions

This Commit allows us to:

* Test the `CaptureHead` function.
* Test parsing a single Head message.
* Test a Reorg condition.
* Add Metrics. This is primarily used for testing but can have future use cases.
* Rearrange the test due to race conditions introduced by reusing a variable. `BeforeEach` can't be used to update `BC`.

* Update and finalize testing at this stage

* Update code and CI/CD

* Fix lint errors

* Update CICD and fail when file not found.

* Update test to have failed as expected.
2022-05-12 09:52:13 -04:00
Abdul Rabbani
217ec700a8
Feature/22 test handling incoming events - Intermediary Commit (#28)
* Checkpoint before the weekend

* Update location for SetupPostgresDB
2022-05-09 14:44:27 -04:00
Abdul Rabbani
d1bc6a816f
Capture the head block in the DB entirely. (#27)
* -- Intermediary Commit --

Just want to commit my code over the weekend, in case I spill coffee on my workstation.

* Create DB models ready for write.

* Handle SSE events

* Update ref for stack-orchestrator

* Use env in one place only.

* Boot Application on PR

* Update syntax

* Update syntax

* Correct command

* Use bash instead of sh

* Use until instead of while

* Make linter happy and check sse subscription err

* Handle Reorgs - Untested
2022-05-06 11:03:15 -04:00
Abdul Rabbani
8064b1cb65
Merge pull request #25 from vulcanize/feature/21-test-shutdown
Shutdown Testing
2022-04-28 15:54:56 -04:00
Abdul Rabbani
26de93d9d2 Disregard race condition since its with the test not the application itself 2022-04-28 15:21:09 -04:00
Abdul Rabbani
08facb5f57 Added test to ensure the application shuts down gracefully or within a timeframe. 2022-04-28 14:50:29 -04:00
37 changed files with 2702 additions and 335 deletions

View File

@ -3,14 +3,18 @@ name: Test Application On PR
on:
workflow_dispatch:
inputs:
foundry-test-ref:
description: "The branch, commit or sha from foundry-test to checkout"
stack-orchestrator-ref:
description: "The branch, commit or sha from stack-orchestrator to checkout"
required: false
default: "main"
ipld-eth-db-ref:
description: "The branch, commit or sha from ipld-eth-db to checkout"
required: false
default: "main"
ssz-data-ref:
description: "The branch, commit or sha from ssz-data to checkout"
required: false
default: "main"
pull_request:
paths:
- "!**.md"
@ -20,36 +24,25 @@ on:
- ".github/workflows/on-pr.yml"
- "**"
env:
stack-orchestrator-ref: ${{ github.event.inputs.stack-orchestrator-ref || 'feature/client-build'}}
ipld-eth-db-ref: ${{ github.event.inputs.ipld-eth-db-ref || 'minimal-beacon-chain-schema' }}
ssz-data-ref: ${{ github.event.inputs.ssz-data-ref || 'main' }}
GOPATH: /tmp/go
jobs:
build:
name: Run Docker Build
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Run docker build
run: make docker-build
unit-test:
name: Run Unit Tests
runs-on: ubuntu-latest
## IF you want to update the default branch for `pull_request runs, do it after the ||`
env:
foundry-test-ref: ${{ github.event.inputs.foundry-test-ref || 'c17752de64f208f286f02379b80d2a935237c860'}}
ipld-eth-db-ref: ${{ github.event.inputs.ipld-eth-db-ref || '05600e51d2163e1c5e2a872cb54606bc0a380d12' }}
GOPATH: /tmp/go
steps:
- name: Create GOPATH
run: mkdir -p /tmp/go
- uses: actions/checkout@v2
with:
path: "./ipld-ethcl-indexer"
- uses: actions/checkout@v3
with:
ref: ${{ env.foundry-test-ref }}
path: "./foundry-test/"
repository: vulcanize/foundry-test
ref: ${{ env.stack-orchestrator-ref }}
path: "./stack-orchestrator/"
repository: vulcanize/stack-orchestrator
fetch-depth: 0
- uses: actions/checkout@v3
@ -63,8 +56,73 @@ jobs:
run: |
echo vulcanize_ipld_eth_db=$GITHUB_WORKSPACE/ipld-eth-db/ > ./config.sh
echo vulcanize_ipld_ethcl_indexer=$GITHUB_WORKSPACE/ipld-ethcl-indexer >> ./config.sh
echo ethcl_capture_mode=boot >> ./config.sh
echo ethcl_skip_sync=true >> ./config.sh
echo ethcl_known_gap_increment=1000000 >> ./config.sh
cat ./config.sh
- name: Run docker compose
run: |
docker-compose \
-f "$GITHUB_WORKSPACE/stack-orchestrator/docker/local/docker-compose-db.yml" \
-f "$GITHUB_WORKSPACE/stack-orchestrator/docker/latest/docker-compose-lighthouse.yml" \
-f "$GITHUB_WORKSPACE/stack-orchestrator/docker/local/docker-compose-ipld-ethcl-indexer.yml" \
--env-file ./config.sh \
up -d --build
- name: Check to make sure HEALTH file is present
shell: bash
run: |
until $(docker compose -f "$GITHUB_WORKSPACE/stack-orchestrator/docker/local/docker-compose-ipld-ethcl-indexer.yml" cp ipld-ethcl-indexer:/root/HEALTH ./HEALTH) ; do sleep 10; done
cat ./HEALTH
if [[ "$(cat ./HEALTH)" -eq "0" ]]; then echo "Application boot successful" && (exit 0); else (exit 1); fi
unit-test:
name: Run Unit Tests
runs-on: ubuntu-latest
## IF you want to update the default branch for `pull_request runs, do it after the ||`
steps:
- name: Create GOPATH
run: mkdir -p /tmp/go
- uses: actions/checkout@v2
with:
path: "./ipld-ethcl-indexer"
- uses: actions/checkout@v3
with:
ref: ${{ env.stack-orchestrator-ref }}
path: "./stack-orchestrator/"
repository: vulcanize/stack-orchestrator
fetch-depth: 0
- uses: actions/checkout@v3
with:
ref: ${{ env.ipld-eth-db-ref }}
repository: vulcanize/ipld-eth-db
path: "./ipld-eth-db/"
fetch-depth: 0
- uses: actions/checkout@v3
with:
ref: ${{ env.ssz-data-ref }}
repository: vulcanize/ssz-data
path: "./ipld-ethcl-indexer/pkg/beaconclient/ssz-data"
fetch-depth: 0
- name: Create config file
run: |
echo vulcanize_ipld_eth_db=$GITHUB_WORKSPACE/ipld-eth-db/ > ./config.sh
echo vulcanize_ipld_ethcl_indexer=$GITHUB_WORKSPACE/ipld-ethcl-indexer >> ./config.sh
cat ./config.sh
- name: Run docker compose
run: |
docker-compose \
-f "$GITHUB_WORKSPACE/stack-orchestrator/docker/local/docker-compose-db.yml" \
--env-file ./config.sh \
up -d --build
- uses: actions/setup-go@v3
with:
go-version: ">=1.17.0"
@ -83,10 +141,6 @@ jobs:
integration-test:
name: Run Integration Tests
runs-on: ubuntu-latest
env:
foundry-test-ref: feature/build-stack
ipld-eth-db-ref: main
GOPATH: /tmp/go
steps:
- name: Create GOPATH
run: mkdir -p /tmp/go
@ -97,9 +151,9 @@ jobs:
- uses: actions/checkout@v3
with:
ref: ${{ env.foundry-test-ref }}
path: "./foundry-test/"
repository: vulcanize/foundry-test
ref: ${{ env.stack-orchestrator-ref }}
path: "./stack-orchestrator/"
repository: vulcanize/stack-orchestrator
fetch-depth: 0
- uses: actions/checkout@v3
@ -113,13 +167,14 @@ jobs:
run: |
echo vulcanize_ipld_eth_db=$GITHUB_WORKSPACE/ipld-eth-db/ > ./config.sh
echo vulcanize_ipld_ethcl_indexer=$GITHUB_WORKSPACE/ipld-ethcl-indexer >> ./config.sh
echo ethcl_capture_mode=boot >> ./config.sh
cat ./config.sh
- name: Run docker compose
run: |
docker-compose \
-f "$GITHUB_WORKSPACE/foundry-test/docker/local/docker-compose-db.yml" \
-f "$GITHUB_WORKSPACE/foundry-test/docker/latest/docker-compose-lighthouse.yml" \
-f "$GITHUB_WORKSPACE/stack-orchestrator/docker/local/docker-compose-db.yml" \
-f "$GITHUB_WORKSPACE/stack-orchestrator/docker/latest/docker-compose-lighthouse.yml" \
--env-file ./config.sh \
up -d --build

41
.github/workflows/on-publish.yml vendored Normal file
View File

@ -0,0 +1,41 @@
name: Publish Docker image
on:
release:
types: [published, edited]
jobs:
build:
name: Run docker build
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Get the version
id: vars
run: echo ::set-output name=sha::$(echo ${GITHUB_SHA:0:7})
- name: Run docker build
run: make docker-build
- name: Tag docker image
run: docker tag vulcanize/ipld-ethcl-indexer docker.pkg.github.com/vulcanize/ipld-ethcl-indexer/ipld-ethcl-indexer:${{steps.vars.outputs.sha}}
- name: Docker Login
run: echo ${{ secrets.GITHUB_TOKEN }} | docker login https://docker.pkg.github.com -u vulcanize --password-stdin
- name: Docker Push
run: docker push docker.pkg.github.com/vulcanize/ipld-ethcl-indexer/ipld-ethcl-indexer:${{steps.vars.outputs.sha}}
push_to_registries:
name: Push Docker image to Docker Hub
runs-on: ubuntu-latest
needs: build
steps:
- name: Get the version
id: vars
run: |
echo ::set-output name=sha::$(echo ${GITHUB_SHA:0:7})
echo ::set-output name=tag::$(echo ${GITHUB_REF#refs/tags/})
- name: Docker Login to Github Registry
run: echo ${{ secrets.GITHUB_TOKEN }} | docker login https://docker.pkg.github.com -u vulcanize --password-stdin
- name: Docker Pull
run: docker pull docker.pkg.github.com/vulcanize/ipld-ethcl-indexer/ipld-ethcl-indexer:${{steps.vars.outputs.sha}}
- name: Docker Login to Docker Registry
run: echo ${{ secrets.VULCANIZEJENKINS_PAT }} | docker login -u vulcanizejenkins --password-stdin
- name: Tag docker image
run: docker tag docker.pkg.github.com/vulcanize/ipld-ethcl-indexer/ipld-ethcl-indexer:${{steps.vars.outputs.sha}} vulcanize/ipld-ethcl-indexer:${{steps.vars.outputs.tag}}
- name: Docker Push to Docker Hub
run: docker push vulcanize/ipld-ethcl-indexer:${{steps.vars.outputs.tag}}

3
.gitignore vendored
View File

@ -3,3 +3,6 @@ ipld-ethcl-indexer
ipld-ethcl-indexer.log
report.json
cover.profile
temp/*
pkg/beaconclient/ssz-data/
*.test

View File

@ -9,7 +9,7 @@ COPY go.sum .
RUN go mod tidy; go mod download
COPY . .
RUN GCO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -ldflags '-extldflags "-static"' -o ipld-ethcl-indexer .
RUN GCO_ENABLED=0 GOOS=linux go build -race -a -installsuffix cgo -ldflags '-extldflags "-static"' -o ipld-ethcl-indexer .
RUN chmod +x ipld-ethcl-indexer
FROM frolvlad/alpine-bash:latest

View File

@ -30,6 +30,17 @@ integration-test-ci:
--cover --coverprofile=cover.profile \
--race --trace --json-report=report.json
.PHONY: integration-test-ci-no-race
integration-test-ci-no-race:
go vet ./...
go fmt ./...
$(GINKGO) -r --label-filter integration \
--procs=4 --compilers=4 \
--randomize-all --randomize-suites \
--fail-on-pending --keep-going \
--cover --coverprofile=cover.profile \
--trace --json-report=report.json
.PHONY: integration-test-local
integration-test-local:
go vet ./...
@ -38,28 +49,36 @@ integration-test-local:
--procs=4 --compilers=4 \
--randomize-all --randomize-suites \
--fail-on-pending --keep-going \
--race --trace
--trace --race
.PHONY: integration-test-local-no-race
integration-test-local-no-race:
go vet ./...
go fmt ./...
$(GINKGO) -r --label-filter integration \
--procs=4 --compilers=4 \
--randomize-all --randomize-suites \
--fail-on-pending --keep-going \
--trace
.PHONY: unit-test-local
unit-test-local:
go vet ./...
go fmt ./...
$(GINKGO) -r --label-filter unit \
--procs=4 --compilers=4 \
--randomize-all --randomize-suites \
--fail-on-pending --keep-going \
--race --trace
--trace
.PHONY: unit-test-ci
unit-test-ci:
go vet ./...
go fmt ./...
$(GINKGO) -r --label-filter unit \
--procs=4 --compilers=4 \
--randomize-all --randomize-suites \
--fail-on-pending --keep-going \
--cover --coverprofile=cover.profile \
--race --trace --json-report=report.json
--trace --json-report=report.json
.PHONY: build

View File

@ -14,12 +14,30 @@ This application will capture all the `BeaconState`'s and `SignedBeaconBlock`'s
To learn more about the applications individual components, please read the [application components](/application_component.md).
# Running the Application
# Quick Start
To run the application, utilize the following command, and update the values as needed.
## Running the Application
To run the application, do as follows:
1. Setup the prerequisite applications.
a. Run a beacon client (such as lighthouse).
b. Run a postgres DB.
c. You can utilize the `stack-orchestrator` [repository](https://github.com/vulcanize/stack-orchestrato).
```
./wrapper.sh -e skip \
-d ../docker/local/docker-compose-db.yml \
-d ../docker/latest/docker-compose-lighthouse.yml \
-v remove \
-p ../local-config.sh
```
2. Run the start up command.
```
go run main.go capture head --db.address localhost \
go run -race main.go capture head --db.address localhost \
--db.password password \
--db.port 8077 \
--db.username vdbm \
@ -27,9 +45,21 @@ go run main.go capture head --db.address localhost \
--db.driver PGX \
--bc.address localhost \
--bc.port 5052 \
--log.level info
--bc.connectionProtocol http \
--t.skipSync=true \
--log.level info \
--log.output=true \
--kg.increment 100
```
## Running Tests
To run tests, you will need to clone another repository which contains all the ssz files.
1. `git clone git@github.com:vulcanize/ssz-data.git pkg/beaconclient/ssz-data`
2. To run unit tests, make sure you have a DB running: `make unit-test-local`
3. To run integration tests, make sure you have a lighthouse client and a DB running: `make integration-test-local-no-race` .
# Development Patterns
This section will cover some generic development patterns utilizes.
@ -58,9 +88,18 @@ This project utilizes `ginkgo` for testing. A few notes on testing:
- All test packages are named `{base_package}_test`. This ensures we only test the public methods.
- If there is a need to test a private method, please include why in the testing file.
- Unit tests must contain the `Label("unit")`.
- Unit tests should not rely on any running service. If a running service is needed. Utilize an integration test.
- Unit tests should not rely on any running service (except for a postgres DB). If a running service is needed. Utilize an integration test.
- Integration tests must contain the `Label("integration")`.
#### Understanding Testing Components
A few notes about the testing components.
- The `TestEvents` map contains several events for testers to leverage when testing.
- Any object ending in `-dummy` is not a real object. You will also notice it has a present field called `MimicConfig`. This object will use an existing SSZ object, and update the parameters from the `Head` and `MimicConfig`.
- This is done because creating an empty or minimal `SignedBeaconBlock` and `BeaconState` is fairly challenging.
- By slightly modifying an existing object, we can test re-org, malformed objects, and other negative conditions.
# Contribution
If you want to contribute please make sure you do the following:

View File

@ -22,6 +22,33 @@ The `database` package allows us to interact with a postgres DB. We utilize the
This package will contain code to interact with the beacon client.
### Known Gaps
Known Gaps tracking is handled within this package. The columns are as follows:
- StartSlot - The start slot for known_gaps, inclusive.
- EndSlot - The end slot for known_gaps, inclusive.
- CheckedOut - Indicates if any process is currently processing this entry.
- ErrorMessage - Captures any error message that might have occurred when previously processing this entry.
- EntryTime - The time this range was added to the DB. This can help us catch ranges that have not been processed for a long time due to some error.
- EntryProcess - The entry process that added this process. Potential options are StartUp, Error, HeadGap.
- This can help us understand how a specific entry was added. It can be useful for debugging the application.
- StartUp - Gaps found when we started the application.
- Error - Indicates that the entry was added due to an error with processing.
- HeadGap - Indicates that gaps where found when keeping up with Head.
## `pkg/version`
A generic package which can be utilized to easily version our applications.
## `pkg/gracefulshutdown`
A generic package that can be used to shutdown various services within an application.
## `pkg/loghelper`
This package contains useful functions for logging.
## `internal/shutdown`
This package is used to shutdown the `ipld-ethcl-indexer`. It calls the `pkg/gracefulshutdown` package.

80
cmd/boot.go Normal file
View File

@ -0,0 +1,80 @@
/*
Copyright © 2022 Abdul Rabbani <abdulrabbani00@gmail.com>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package cmd
import (
"context"
"os"
"syscall"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/vulcanize/ipld-ethcl-indexer/internal/boot"
"github.com/vulcanize/ipld-ethcl-indexer/internal/shutdown"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
)
// bootCmd represents the boot command
var bootCmd = &cobra.Command{
Use: "boot",
Short: "Run the boot command then exit",
Long: `Run the application to boot and exit. Primarily used for testing.`,
Run: func(cmd *cobra.Command, args []string) {
bootApp()
},
}
func bootApp() {
// Boot the application
log.Info("Starting the application in boot mode.")
ctx := context.Background()
BC, DB, err := boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, testDisregardSync)
if err != nil {
loghelper.LogError(err).Error("Unable to Start application")
}
log.Info("Boot complete, we are going to shutdown.")
notifierCh := make(chan os.Signal, 1)
go func() {
notifierCh <- syscall.SIGTERM
}()
err = shutdown.ShutdownServices(ctx, notifierCh, maxWaitSecondsShutdown, DB, BC)
if err != nil {
loghelper.LogError(err).Error("Ungracefully Shutdown ipld-ethcl-indexer!")
} else {
log.Info("Gracefully shutdown ipld-ethcl-indexer")
}
}
func init() {
captureCmd.AddCommand(bootCmd)
// Here you will define your flags and configuration settings.
// Cobra supports Persistent Flags which will work for this command
// and all subcommands, e.g.:
// bootCmd.PersistentFlags().String("foo", "", "A help for foo")
// Cobra supports local flags which will only run when this command
// is called directly, e.g.:
// bootCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle")
}

View File

@ -22,7 +22,10 @@ var (
bcAddress string
bcPort int
bcConnectionProtocol string
bcType string
maxWaitSecondsShutdown time.Duration = time.Duration(5) * time.Second
notifierCh chan os.Signal = make(chan os.Signal, 1)
testDisregardSync bool
)
// captureCmd represents the capture command
@ -61,14 +64,18 @@ func init() {
exitErr(err)
//// Beacon Client Specific
captureCmd.PersistentFlags().StringVarP(&bcAddress, "bc.address", "l", "", "Address to connect to beacon node (required if username is set)")
captureCmd.PersistentFlags().IntVarP(&bcPort, "bc.port", "r", 0, "Port to connect to beacon node (required if username is set)")
captureCmd.PersistentFlags().StringVarP(&bcAddress, "bc.address", "l", "", "Address to connect to beacon node (required)")
captureCmd.PersistentFlags().StringVarP(&bcType, "bc.type", "", "lighthouse", "The beacon client we are using, options are prysm and lighthouse.")
captureCmd.PersistentFlags().IntVarP(&bcPort, "bc.port", "r", 0, "Port to connect to beacon node (required )")
captureCmd.PersistentFlags().StringVarP(&bcConnectionProtocol, "bc.connectionProtocol", "", "http", "protocol for connecting to the beacon node.")
err = captureCmd.MarkPersistentFlagRequired("bc.address")
exitErr(err)
err = captureCmd.MarkPersistentFlagRequired("bc.port")
exitErr(err)
//// Testing Specific
captureCmd.PersistentFlags().BoolVar(&testDisregardSync, "t.skipSync", false, "Should we disregard the head sync?")
// Bind Flags with Viper
//// DB Flags
err = viper.BindPFlag("db.username", captureCmd.PersistentFlags().Lookup("db.username"))
@ -81,12 +88,18 @@ func init() {
exitErr(err)
err = viper.BindPFlag("db.name", captureCmd.PersistentFlags().Lookup("db.name"))
exitErr(err)
err = viper.BindPFlag("db.driver", captureCmd.PersistentFlags().Lookup("db.driver"))
err = viper.BindPFlag("t.skipSync", captureCmd.PersistentFlags().Lookup("t.skipSync"))
exitErr(err)
// Testing Specific
err = viper.BindPFlag("t.driver", captureCmd.PersistentFlags().Lookup("db.driver"))
exitErr(err)
// LH specific
err = viper.BindPFlag("bc.address", captureCmd.PersistentFlags().Lookup("bc.address"))
exitErr(err)
err = viper.BindPFlag("bc.type", captureCmd.PersistentFlags().Lookup("bc.type"))
exitErr(err)
err = viper.BindPFlag("bc.port", captureCmd.PersistentFlags().Lookup("bc.port"))
exitErr(err)
err = viper.BindPFlag("bc.connectionProtocol", captureCmd.PersistentFlags().Lookup("bc.connectionProtocol"))

View File

@ -6,15 +6,20 @@ package cmd
import (
"context"
"time"
"os"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/vulcanize/ipld-ethcl-indexer/internal/boot"
"github.com/vulcanize/ipld-ethcl-indexer/internal/shutdown"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
)
var (
kgTableIncrement int
)
// headCmd represents the head command
var headCmd = &cobra.Command{
Use: "head",
@ -31,28 +36,34 @@ func startHeadTracking() {
log.Info("Starting the application in head tracking mode.")
ctx := context.Background()
BC, DB, err := boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol)
BC, DB, err := boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, testDisregardSync)
if err != nil {
loghelper.LogError(err).Error("Unable to Start application")
if DB != nil {
DB.Close()
}
os.Exit(1)
}
log.Info("The Beacon Client has booted successfully!")
// Capture head blocks
go BC.CaptureHead()
go BC.CaptureHead(kgTableIncrement)
// Shutdown when the time is right.
shutdown.ShutdownServices(ctx, time.Duration(maxWaitSecondsShutdown), DB, BC)
err = shutdown.ShutdownServices(ctx, notifierCh, maxWaitSecondsShutdown, DB, BC)
if err != nil {
loghelper.LogError(err).Error("Ungracefully Shutdown ipld-ethcl-indexer!")
} else {
log.Info("Gracefully shutdown ipld-ethcl-indexer")
}
}
func init() {
captureCmd.AddCommand(headCmd)
// Here you will define your flags and configuration settings.
// Cobra supports Persistent Flags which will work for this command
// and all subcommands, e.g.:
// headCmd.PersistentFlags().String("foo", "", "A help for foo")
// Cobra supports local flags which will only run when this command
// is called directly, e.g.:
// headCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle")
// Known Gaps specific
captureCmd.PersistentFlags().IntVarP(&kgTableIncrement, "kg.increment", "", 10000, "The max slots within a single entry to the known_gaps table.")
err := viper.BindPFlag("kg.increment", captureCmd.PersistentFlags().Lookup("kg.increment"))
exitErr(err)
}

View File

@ -1,7 +1,9 @@
#!/bin/bash
sleep 10
echo "Starting ipld-ethcl-indexer"
echo /root/ipld-ethcl-indexer capture head --db.address $DB_ADDRESS \
echo /root/ipld-ethcl-indexer capture ${CAPTURE_MODE} --db.address $DB_ADDRESS \
--db.password $DB_PASSWORD \
--db.port $DB_PORT \
--db.username $DB_USER \
@ -9,9 +11,11 @@ echo /root/ipld-ethcl-indexer capture head --db.address $DB_ADDRESS \
--db.driver $DB_DRIVER \
--bc.address $BC_ADDRESS \
--bc.port $BC_PORT \
--log.level $LOG_LEVEL
--log.level $LOG_LEVEL\
--t.skipSync=$SKIP_SYNC \
--kg.increment $KNOWN_GAP_INCREMENT
/root/ipld-ethcl-indexer capture head --db.address $DB_ADDRESS \
/root/ipld-ethcl-indexer capture ${CAPTURE_MODE} --db.address $DB_ADDRESS \
--db.password $DB_PASSWORD \
--db.port $DB_PORT \
--db.username $DB_USER \
@ -19,13 +23,18 @@ echo /root/ipld-ethcl-indexer capture head --db.address $DB_ADDRESS \
--db.driver $DB_DRIVER \
--bc.address $BC_ADDRESS \
--bc.port $BC_PORT \
--log.level $LOG_LEVEL
--log.level $LOG_LEVEL \
--t.skipSync=$SKIP_SYNC \
--kg.increment $KNOWN_GAP_INCREMENT
rv=$?
if [ $rv != 0 ]; then
echo "ipld-ethcl-indexer startup failed"
exit 1
echo 1 > /root/HEALTH
else
echo "ipld-ethcl-indexer startup succeeded"
echo 0 > /root/HEALTH
fi
tail -f /dev/null

75
go.mod
View File

@ -3,42 +3,80 @@ module github.com/vulcanize/ipld-ethcl-indexer
go 1.18
require (
github.com/jackc/pgconn v1.11.0
github.com/onsi/ginkgo/v2 v2.1.3
github.com/ipfs/go-ipfs-blockstore v1.2.0
github.com/ipfs/go-ipfs-ds-help v1.1.0
github.com/jackc/pgconn v1.12.0
github.com/multiformats/go-multihash v0.1.0
github.com/onsi/ginkgo/v2 v2.1.4
github.com/onsi/gomega v1.19.0
github.com/r3labs/sse v0.0.0-20210224172625-26fe804710bc
github.com/sirupsen/logrus v1.8.1
)
require (
github.com/ethereum/go-ethereum v1.10.17 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.10.0 // indirect
github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d // indirect
github.com/ipfs/bbloom v0.0.4 // indirect
github.com/ipfs/go-block-format v0.0.3 // indirect
github.com/ipfs/go-cid v0.1.0 // indirect
github.com/ipfs/go-datastore v0.5.0 // indirect
github.com/ipfs/go-ipfs-util v0.0.2 // indirect
github.com/ipfs/go-ipld-format v0.3.0 // indirect
github.com/ipfs/go-log v1.0.5 // indirect
github.com/ipfs/go-log/v2 v2.5.0 // indirect
github.com/ipfs/go-metrics-interface v0.0.1 // indirect
github.com/jackc/chunkreader/v2 v2.0.1 // indirect
github.com/jackc/pgio v1.0.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgproto3/v2 v2.2.0 // indirect
github.com/jackc/pgproto3/v2 v2.3.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b // indirect
github.com/jackc/pgtype v1.10.0 // indirect
github.com/jackc/pgtype v1.11.0 // indirect
github.com/jackc/puddle v1.2.1 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/lib/pq v1.10.4 // indirect
github.com/minio/sha256-simd v0.1.1 // indirect
github.com/jbenet/goprocess v0.1.4 // indirect
github.com/klauspost/cpuid/v2 v2.0.12 // indirect
github.com/lib/pq v1.10.5 // indirect
github.com/mattn/go-isatty v0.0.14 // indirect
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 // indirect
github.com/minio/sha256-simd v1.0.0 // indirect
github.com/mr-tron/base58 v1.2.0 // indirect
github.com/multiformats/go-base32 v0.0.4 // indirect
github.com/multiformats/go-base36 v0.1.0 // indirect
github.com/multiformats/go-multibase v0.0.3 // indirect
github.com/multiformats/go-varint v0.0.6 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4 // indirect
golang.org/x/net v0.0.0-20220412020605-290c469a71a5 // indirect
github.com/prysmaticlabs/go-bitfield v0.0.0-20210809151128-385d8c5e3fb7 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/thomaso-mirodin/intmath v0.0.0-20160323211736-5dc6d854e46e // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
go.uber.org/zap v1.21.0 // indirect
golang.org/x/crypto v0.0.0-20220427172511-eb4f295cb31f // indirect
golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4 // indirect
google.golang.org/genproto v0.0.0-20220505152158-f39f71e6c8f3 // indirect
google.golang.org/grpc v1.46.0 // indirect
google.golang.org/protobuf v1.28.0 // indirect
gopkg.in/cenkalti/backoff.v1 v1.1.0 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
lukechampine.com/blake3 v1.1.7 // indirect
)
require (
github.com/ferranbt/fastssz v0.0.0-20220303160658-88bb965b6747
github.com/fsnotify/fsnotify v1.5.1 // indirect
github.com/ferranbt/fastssz v0.0.0-20220303160658-88bb965b6747 // indirect
github.com/fsnotify/fsnotify v1.5.4 // indirect
github.com/georgysavva/scany v0.3.0
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/jackc/pgx/v4 v4.15.0
github.com/jackc/pgx/v4 v4.16.0
github.com/jarcoal/httpmock v1.2.0
github.com/magiconair/properties v1.8.6 // indirect
github.com/mitchellh/mapstructure v1.4.3 // indirect
github.com/pelletier/go-toml v1.9.4 // indirect
github.com/pelletier/go-toml/v2 v2.0.0-beta.8 // indirect
github.com/r3labs/sse v0.0.0-20210224172625-26fe804710bc
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/pelletier/go-toml v1.9.5 // indirect
github.com/pelletier/go-toml/v2 v2.0.0 // indirect
github.com/prysmaticlabs/prysm v1.4.2-0.20220504145118-df695346a53c
github.com/spf13/afero v1.8.2 // indirect
github.com/spf13/cast v1.4.1 // indirect
github.com/spf13/cobra v1.4.0
@ -46,7 +84,8 @@ require (
github.com/spf13/pflag v1.0.5 // indirect
github.com/spf13/viper v1.11.0
github.com/subosito/gotenv v1.2.0 // indirect
golang.org/x/sys v0.0.0-20220412211240-33da011f77ad // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6 // indirect
golang.org/x/text v0.3.7 // indirect
gopkg.in/ini.v1 v1.66.4 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect

576
go.sum

File diff suppressed because it is too large Load Diff

View File

@ -2,13 +2,13 @@ package boot
import (
"context"
"fmt"
"time"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/beaconclient"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql/postgres"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
)
var (
@ -18,36 +18,6 @@ var (
BC *beaconclient.BeaconClient = &beaconclient.BeaconClient{}
)
// A simple wrapper to create a DB object to use.
func SetupPostgresDb(dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string) (sql.Database, error) {
log.Debug("Resolving Driver Type")
DbDriver, err := postgres.ResolveDriverType(driverName)
if err != nil {
log.WithFields(log.Fields{
"err": err,
"driver_name_provided": driverName,
}).Error("Can't resolve driver type")
}
log.Info("Using Driver:", DbDriver)
postgresConfig := postgres.Config{
Hostname: dbHostname,
Port: dbPort,
DatabaseName: dbName,
Username: dbUsername,
Password: dbPassword,
Driver: DbDriver,
}
DB, err = postgres.NewPostgresDB(postgresConfig)
if err != nil {
loghelper.LogError(err).Error("Unable to connect to the DB")
return nil, err
}
return DB, err
}
// This function will perform some boot operations. If any steps fail, the application will fail to start.
// Keep in mind that the DB connection can be lost later in the lifecycle of the application or
// it might not be able to connect to the beacon client.
@ -56,7 +26,8 @@ func SetupPostgresDb(dbHostname string, dbPort int, dbName string, dbUsername st
//
// 2. Connect to the database.
//
func BootApplication(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string, bcAddress string, bcPort int, bcConnectionProtocol string) (*beaconclient.BeaconClient, sql.Database, error) {
// 3. Make sure the node is synced, unless disregardSync is true.
func BootApplication(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string, bcAddress string, bcPort int, bcConnectionProtocol string, disregardSync bool) (*beaconclient.BeaconClient, sql.Database, error) {
log.Info("Booting the Application")
log.Debug("Creating the Beacon Client")
@ -69,23 +40,43 @@ func BootApplication(ctx context.Context, dbHostname string, dbPort int, dbName
}
log.Debug("Setting up DB connection")
DB, err := SetupPostgresDb(dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName)
DB, err = postgres.SetupPostgresDb(dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName)
if err != nil {
return nil, nil, err
}
BC.Db = DB
var status bool
if !disregardSync {
status, err = BC.CheckHeadSync()
if err != nil {
log.Error("Unable to get the nodes sync status")
return BC, DB, err
}
if status {
log.Error("The node is still syncing..")
err = fmt.Errorf("The node is still syncing.")
return BC, DB, err
}
} else {
log.Warn("We are not checking to see if the node has synced to head.")
}
return BC, DB, nil
}
// Add retry logic to ensure that we are give the Beacon Client and the DB time to start.
func BootApplicationWithRetry(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string, bcAddress string, bcPort int, bcConnectionProtocol string) (*beaconclient.BeaconClient, sql.Database, error) {
func BootApplicationWithRetry(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string, bcAddress string, bcPort int, bcConnectionProtocol string, disregardSync bool) (*beaconclient.BeaconClient, sql.Database, error) {
var err error
for i := 0; i < maxRetry; i++ {
BC, DB, err = BootApplication(ctx, dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName, bcAddress, bcPort, bcConnectionProtocol)
BC, DB, err = BootApplication(ctx, dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName, bcAddress, bcPort, bcConnectionProtocol, disregardSync)
if err != nil {
log.WithFields(log.Fields{
"retryNumber": i,
"err": err,
}).Warn("Unable to boot application. Going to try again")
time.Sleep(time.Duration(retryInterval) * time.Second)
continue
}
break
}

View File

@ -21,29 +21,36 @@ var _ = Describe("Boot", func() {
bcConnectionProtocol string = "http"
)
Describe("Booting the application", Label("integration"), func() {
Context("When the DB and BC are both up and running", func() {
Context("When the DB and BC are both up and running, and we skip checking for a synced head", func() {
It("Should connect successfully", func() {
_, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol)
_, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, true)
defer db.Close()
Expect(err).To(BeNil())
Expect(err).ToNot(HaveOccurred())
})
})
Context("When the DB and BC are both up and running, and we check for a synced head", func() {
It("Should not connect successfully", func() {
_, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, false)
defer db.Close()
Expect(err).To(HaveOccurred())
})
})
Context("When the DB is running but not the BC", func() {
It("Should not connect successfully", func() {
_, _, err := boot.BootApplication(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol)
Expect(err).ToNot(BeNil())
_, _, err := boot.BootApplication(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol, true)
Expect(err).To(HaveOccurred())
})
})
Context("When the BC is running but not the DB", func() {
It("Should not connect successfully", func() {
_, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol)
Expect(err).ToNot(BeNil())
_, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, true)
Expect(err).To(HaveOccurred())
})
})
Context("When neither the BC or DB are running", func() {
It("Should not connect successfully", func() {
_, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol)
Expect(err).ToNot(BeNil())
_, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol, true)
Expect(err).To(HaveOccurred())
})
})
})

View File

@ -2,9 +2,9 @@ package shutdown
import (
"context"
"os"
"time"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/beaconclient"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/gracefulshutdown"
@ -12,16 +12,11 @@ import (
)
// Shutdown all the internal services for the application.
func ShutdownServices(ctx context.Context, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) {
successCh, errCh := gracefulshutdown.Shutdown(ctx, waitTime, map[string]gracefulshutdown.Operation{
"database": func(ctx context.Context) error {
err := DB.Close()
if err != nil {
loghelper.LogError(err).Error("Unable to close the DB")
}
return err
},
func ShutdownServices(ctx context.Context, notifierCh chan os.Signal, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) error {
successCh, errCh := gracefulshutdown.Shutdown(ctx, notifierCh, waitTime, map[string]gracefulshutdown.Operation{
// Combining DB shutdown with BC because BC needs DB open to cleanly shutdown.
"beaconClient": func(ctx context.Context) error {
defer DB.Close()
err := BC.StopHeadTracking()
if err != nil {
loghelper.LogError(err).Error("Unable to trigger shutdown of head tracking")
@ -31,9 +26,9 @@ func ShutdownServices(ctx context.Context, waitTime time.Duration, DB sql.Databa
})
select {
case _ = <-successCh:
log.Info("Gracefully Shutdown ipld-ethcl-indexer!")
case <-successCh:
return nil
case err := <-errCh:
loghelper.LogError(err).Error("Ungracefully Shutdown ipld-ethcl-indexer!")
return err
}
}

View File

@ -0,0 +1,13 @@
package shutdown_test
import (
"testing"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
func TestShutdown(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Shutdown Suite")
}

View File

@ -0,0 +1,118 @@
//go:build !race
// +build !race
package shutdown_test
import (
"context"
"os"
"syscall"
"time"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/r3labs/sse"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-ethcl-indexer/internal/boot"
"github.com/vulcanize/ipld-ethcl-indexer/internal/shutdown"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/beaconclient"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/gracefulshutdown"
)
var _ = Describe("Shutdown", func() {
var (
dbAddress string = "localhost"
dbPort int = 8077
dbName string = "vulcanize_testing"
dbUsername string = "vdbm"
dbPassword string = "password"
dbDriver string = "PGX"
bcAddress string = "localhost"
bcPort int = 5052
bcConnectionProtocol string = "http"
maxWaitSecondsShutdown time.Duration = time.Duration(1) * time.Second
DB sql.Database
BC *beaconclient.BeaconClient
err error
ctx context.Context
notifierCh chan os.Signal
)
BeforeEach(func() {
ctx = context.Background()
BC, DB, err = boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, true)
notifierCh = make(chan os.Signal, 1)
Expect(err).To(BeNil())
})
Describe("Run Shutdown Function,", Label("integration"), func() {
Context("When Channels are empty,", func() {
It("Should Shutdown Successfully.", func() {
go func() {
log.Debug("Starting shutdown chan")
err = shutdown.ShutdownServices(ctx, notifierCh, maxWaitSecondsShutdown, DB, BC)
log.Debug("We have completed the shutdown...")
Expect(err).ToNot(HaveOccurred())
}()
})
})
Context("When the Channels are not empty,", func() {
It("Should try to clear them and shutdown gracefully.", func() {
shutdownCh := make(chan bool)
//log.SetLevel(log.DebugLevel)
go func() {
log.Debug("Starting shutdown chan")
err = shutdown.ShutdownServices(ctx, notifierCh, maxWaitSecondsShutdown, DB, BC)
log.Debug("We have completed the shutdown...")
Expect(err).ToNot(HaveOccurred())
shutdownCh <- true
}()
messageAddCh := make(chan bool)
go func() {
log.Debug("Adding messages to Channels")
BC.HeadTracking.MessagesCh <- &sse.Event{}
//BC.FinalizationTracking.MessagesCh <- &sse.Event{}
BC.ReOrgTracking.MessagesCh <- &sse.Event{}
log.Debug("Message adding complete")
messageAddCh <- true
}()
go func() {
<-messageAddCh
log.Debug("Calling SIGTERM")
notifierCh <- syscall.SIGTERM
log.Debug("Reading messages from channel")
<-BC.HeadTracking.MessagesCh
//<-BC.FinalizationTracking.MessagesCh
<-BC.ReOrgTracking.MessagesCh
}()
<-shutdownCh
})
It("Should try to clear them, if it can't, shutdown within a given time frame.", func() {
shutdownCh := make(chan bool)
//log.SetLevel(log.DebugLevel)
go func() {
log.Debug("Starting shutdown chan")
err = shutdown.ShutdownServices(ctx, notifierCh, maxWaitSecondsShutdown, DB, BC)
log.Debug("We have completed the shutdown...")
Expect(err).To(MatchError(gracefulshutdown.TimeoutErr(maxWaitSecondsShutdown.String())))
shutdownCh <- true
}()
go func() {
log.Debug("Adding messages to Channels")
BC.HeadTracking.MessagesCh <- &sse.Event{}
//BC.FinalizationTracking.MessagesCh <- &sse.Event{}
BC.ReOrgTracking.MessagesCh <- &sse.Event{}
log.Debug("Message adding complete")
log.Debug("Calling SIGHUP")
notifierCh <- syscall.SIGTERM
}()
<-shutdownCh
})
})
})
})

View File

@ -6,26 +6,49 @@ import (
"github.com/r3labs/sse"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql"
)
// TODO: Use prysms config values instead of hardcoding them here.
var (
bcHealthEndpoint = "/eth/v1/node/health" // Endpoint used for the healthcheck
bcHeadTopicEndpoint = "/eth/v1/events?topics=head" // Endpoint used to subscribe to the head of the chain
BcHeadTopicEndpoint = "/eth/v1/events?topics=head" // Endpoint used to subscribe to the head of the chain
bcReorgTopicEndpoint = "/eth/v1/events?topics=chain_reorg" // Endpoint used to subscribe to the head of the chain
bcFinalizedTopicEndpoint = "/eth/v1/events?topics=finalized_checkpoint" // Endpoint used to subscribe to the head of the chain
bcBlockQueryEndpoint = "/eth/v2/beacon/blocks/" // Endpoint to query individual Blocks
bcStateQueryEndpoint = "/eth/v2/debug/beacon/states/" // Endpoint to query individual States
BcBlockQueryEndpoint = "/eth/v2/beacon/blocks/" // Endpoint to query individual Blocks
BcStateQueryEndpoint = "/eth/v2/debug/beacon/states/" // Endpoint to query individual States
BcSyncStatusEndpoint = "/eth/v1/node/syncing"
BcBlockRootEndpoint = func(slot string) string {
return "/eth/v1/beacon/blocks/" + slot + "/root"
}
bcSlotsPerEpoch = 32 // Number of slots in a single Epoch
//bcSlotPerHistoricalVector = 8192 // The number of slots in a historic vector.
//bcFinalizedTopicEndpoint = "/eth/v1/events?topics=finalized_checkpoint" // Endpoint used to subscribe to the head of the chain
)
// A structure utilized for keeping track of various metrics. Currently, mostly used in testing.
type BeaconClientMetrics struct {
HeadTrackingInserts uint64 // Number of head events we wrote to the DB.
HeadTrackingReorgs uint64 // The number of reorg events written to the DB.
}
// A struct that capture the Beacon Server that the Beacon Client will be interacting with and querying.
type BeaconClient struct {
Context context.Context // A context generic context with multiple uses.
ServerEndpoint string // What is the endpoint of the beacon server.
PerformHeadTracking bool // Should we track head?
PerformHistoricalProcessing bool // Should we perform historical processing?
Db sql.Database // Database object used for reads and writes.
Metrics *BeaconClientMetrics // An object used to keep track of certain BeaconClient Metrics.
KnownGapTableIncrement int // The max number of slots within a single known_gaps table entry.
// Used for Head Tracking
PerformHeadTracking bool // Should we track head?
StartingSlot int // If we're performing head tracking. What is the first slot we processed.
PreviousSlot int // Whats the previous slot we processed
PreviousBlockRoot string // Whats the previous block root, used to check the next blocks parent.
CheckKnownGaps bool // Should we check for gaps at start up.
HeadTracking *SseEvents[Head] // Track the head block
ReOrgTracking *SseEvents[ChainReorg] // Track all Reorgs
FinalizationTracking *SseEvents[FinalizedCheckpoint] // Track all finalization checkpoints
//FinalizationTracking *SseEvents[FinalizedCheckpoint] // Track all finalization checkpoints
}
// A struct to keep track of relevant the head event topic.
@ -50,9 +73,13 @@ func CreateBeaconClient(ctx context.Context, connectionProtocol string, bcAddres
return &BeaconClient{
Context: ctx,
ServerEndpoint: endpoint,
HeadTracking: createSseEvent[Head](endpoint, bcHeadTopicEndpoint),
HeadTracking: createSseEvent[Head](endpoint, BcHeadTopicEndpoint),
ReOrgTracking: createSseEvent[ChainReorg](endpoint, bcReorgTopicEndpoint),
FinalizationTracking: createSseEvent[FinalizedCheckpoint](endpoint, bcFinalizedTopicEndpoint),
Metrics: &BeaconClientMetrics{
HeadTrackingInserts: 0,
HeadTrackingReorgs: 0,
},
//FinalizationTracking: createSseEvent[FinalizedCheckpoint](endpoint, bcFinalizedTopicEndpoint),
}
}
@ -61,7 +88,7 @@ func createSseEvent[P ProcessedEvents](baseEndpoint string, path string) *SseEve
endpoint := baseEndpoint + path
sseEvents := &SseEvents[P]{
Endpoint: endpoint,
MessagesCh: make(chan *sse.Event),
MessagesCh: make(chan *sse.Event, 1),
ErrorCh: make(chan *SseError),
ProcessCh: make(chan *P),
SseClient: func(endpoint string) *sse.Client {

View File

@ -5,48 +5,19 @@ package beaconclient
import (
"time"
"github.com/ferranbt/fastssz/spectests"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
)
// This function will perform all the heavy lifting for tracking the head of the chain.
func (bc *BeaconClient) CaptureHead() {
func (bc *BeaconClient) CaptureHead(knownGapsTableIncrement int) {
bc.KnownGapTableIncrement = knownGapsTableIncrement
log.Info("We are tracking the head of the chain.")
bc.tempHelper()
// go bc.handleHead()
// go bc.handleFinalizedCheckpoint()
// go bc.handleReorgs()
// bc.captureEventTopic()
}
// A temporary helper function to see the output of beacon block and states.
func (bc *BeaconClient) tempHelper() {
slot := "3200"
blockEndpoint := bc.ServerEndpoint + bcBlockQueryEndpoint + slot
stateEndpoint := bc.ServerEndpoint + bcStateQueryEndpoint + slot
// Query
log.Info("Get")
blockSsz, _ := querySsz(blockEndpoint, slot)
stateSsz, _ := querySsz(stateEndpoint, slot)
// Transform
log.Info("Tranform")
stateObj := new(spectests.BeaconState)
err := stateObj.UnmarshalSSZ(stateSsz)
if err != nil {
loghelper.LogSlotError(slot, err).Error("Unable to unmarshal the SSZ response from the Beacon Node Successfully!")
}
blockObj := new(spectests.SignedBeaconBlock)
err = blockObj.UnmarshalSSZ(blockSsz)
if err != nil {
loghelper.LogSlotError(slot, err).Error("Unable to unmarshal the SSZ response from the Beacon Node Successfully!")
}
// Check
log.Info("Check")
log.Info("State Slot: ", stateObj.Slot)
log.Info("Block Slot: ", blockObj.Block.Slot)
//bc.tempHelper()
go bc.handleHead()
//go bc.handleFinalizedCheckpoint()
go bc.handleReorg()
bc.captureEventTopic()
}
// Stop the head tracking service.
@ -54,14 +25,11 @@ func (bc *BeaconClient) StopHeadTracking() error {
log.Info("We are going to stop tracking the head of chain because of the shutdown signal.")
chHead := make(chan bool)
chReorg := make(chan bool)
chFinal := make(chan bool)
go bc.HeadTracking.finishProcessingChannel(chHead)
go bc.ReOrgTracking.finishProcessingChannel(chReorg)
go bc.FinalizationTracking.finishProcessingChannel(chFinal)
<-chHead
<-chFinal
<-chReorg
log.Info("Successfully stopped the head tracking service.")
return nil
@ -71,7 +39,7 @@ func (bc *BeaconClient) StopHeadTracking() error {
func (se *SseEvents[ProcessedEvents]) finishProcessingChannel(finish chan<- bool) {
loghelper.LogEndpoint(se.Endpoint).Info("Received a close event.")
se.SseClient.Unsubscribe(se.MessagesCh)
for len(se.MessagesCh) != 0 && len(se.ProcessCh) != 0 {
for len(se.MessagesCh) != 0 || len(se.ProcessCh) != 0 {
time.Sleep(time.Duration(shutdownWaitInterval) * time.Millisecond)
}
loghelper.LogEndpoint(se.Endpoint).Info("Done processing all messages, ready for shutdown")

View File

@ -0,0 +1,541 @@
package beaconclient_test
import (
"context"
"encoding/hex"
"encoding/json"
"fmt"
"net/http"
"os"
"path/filepath"
"strconv"
"sync/atomic"
"time"
"github.com/jarcoal/httpmock"
. "github.com/onsi/ginkgo/v2"
types "github.com/prysmaticlabs/prysm/consensus-types/primitives"
st "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
"github.com/r3labs/sse"
log "github.com/sirupsen/logrus"
. "github.com/onsi/gomega"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/beaconclient"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql/postgres"
)
type Message struct {
HeadMessage beaconclient.Head // The head messsage that will be streamed to the BeaconClient
TestNotes string // A small explanation of the purpose this structure plays in the testing landscape.
MimicConfig *MimicConfig // A configuration of parameters that you are trying to
SignedBeaconBlock string // The file path output of an SSZ encoded SignedBeaconBlock.
BeaconState string // The file path output of an SSZ encoded BeaconState.
}
// A structure that can be utilized to mimic and existing SSZ object but change it ever so slightly.
// This is used because creating your own SSZ object is a headache.
type MimicConfig struct {
ParentRoot string // The parent root, leave it empty if you want a to use the universal
}
var _ = Describe("Capturehead", func() {
var (
TestConfig Config
BeaconNodeTester TestBeaconNode
address string = "localhost"
port int = 8080
protocol string = "http"
TestEvents map[string]Message
dbHost string = "localhost"
dbPort int = 8077
dbName string = "vulcanize_testing"
dbUser string = "vdbm"
dbPassword string = "password"
dbDriver string = "pgx"
dummyParentRoot string = "46f98c08b54a71dfda4d56e29ec3952b8300cd8d6b67a9b6c562ae96a7a25a42"
knownGapsTableIncrement int = 100000
maxRetry int = 30
)
BeforeEach(func() {
TestEvents = map[string]Message{
"100-dummy": {
HeadMessage: beaconclient.Head{
Slot: "100",
Block: "04955400371347e26f61d7a4bbda5b23fa0b25d5fc465160f2a92d52a63b919b",
State: "36d5c9a129979b4502bd9a06e57a742810ecbc3fa55a0361c0723c92c1782bfa",
CurrentDutyDependentRoot: "",
PreviousDutyDependentRoot: "",
EpochTransition: false,
ExecutionOptimistic: false,
},
TestNotes: "A block that is supposed to replicate slot 100, but contains some dummy test information.",
MimicConfig: &MimicConfig{},
SignedBeaconBlock: filepath.Join("ssz-data", "100", "signed-beacon-block.ssz"),
BeaconState: filepath.Join("ssz-data", "100", "beacon-state.ssz"),
},
"100-dummy-2": {
HeadMessage: beaconclient.Head{
Slot: "100",
Block: "04955400371347e26f61d7a4bbda5b23fa0b25d5fc465160f2a9aaaaaaaaaaaa",
State: "36d5c9a129979b4502bd9a06e57a742810ecbc3fa55a0361c072bbbbbbbbbbbb",
CurrentDutyDependentRoot: "",
PreviousDutyDependentRoot: "",
EpochTransition: false,
ExecutionOptimistic: false,
},
TestNotes: "A block that is supposed to replicate slot 100, but contains some dummy test information.",
MimicConfig: &MimicConfig{},
SignedBeaconBlock: filepath.Join("ssz-data", "100", "signed-beacon-block.ssz"),
BeaconState: filepath.Join("ssz-data", "100", "beacon-state.ssz"),
},
"100": {
HeadMessage: beaconclient.Head{
Slot: "100",
Block: "0x582187e97f7520bb69eea014c3834c964c45259372a0eaaea3f032013797996b",
State: "0xf286a0379c0386a3c7be28d05d829f8eb7b280cc9ede15449af20ebcd06a7a56",
CurrentDutyDependentRoot: "",
PreviousDutyDependentRoot: "",
EpochTransition: false,
ExecutionOptimistic: false,
},
TestNotes: "An easy to process Phase 0 block",
SignedBeaconBlock: filepath.Join("ssz-data", "100", "signed-beacon-block.ssz"),
BeaconState: filepath.Join("ssz-data", "100", "beacon-state.ssz"),
},
"2375703-dummy": {
HeadMessage: beaconclient.Head{
Slot: "2375703",
Block: "c9fb337b62e2a0dae4f27ab49913132570f7f2cab3f23ad99f4d07508a8e648e",
State: "0299a145bcda2c8f5e7d2e068ee101861edbee2ec1db2d5e1d850b0d265aef5f",
CurrentDutyDependentRoot: "",
PreviousDutyDependentRoot: "",
EpochTransition: false,
ExecutionOptimistic: false,
},
TestNotes: "This is a dummy message that is used for reorgs",
MimicConfig: &MimicConfig{},
SignedBeaconBlock: filepath.Join("ssz-data", "2375703", "signed-beacon-block.ssz"),
BeaconState: filepath.Join("ssz-data", "2375703", "beacon-state.ssz"),
},
"2375703-dummy-2": {
HeadMessage: beaconclient.Head{
Slot: "2375703",
Block: "c9fb337b62e2a0dae4f27ab49913132570f7f2cab3f23ad99f4d07508aaaaaaa",
State: "0299a145bcda2c8f5e7d2e068ee101861edbee2ec1db2d5e1d850b0d2bbbbbbb",
CurrentDutyDependentRoot: "",
PreviousDutyDependentRoot: "",
EpochTransition: false,
ExecutionOptimistic: false,
},
TestNotes: "This is a dummy message that is used for reorgs",
MimicConfig: &MimicConfig{},
SignedBeaconBlock: filepath.Join("ssz-data", "2375703", "signed-beacon-block.ssz"),
BeaconState: filepath.Join("ssz-data", "2375703", "beacon-state.ssz"),
},
"2375703": {
HeadMessage: beaconclient.Head{
Slot: "2375703",
Block: "0x4392372c5f6e39499e31bf924388b5815639103149f0f54f8a453773b1802301",
State: "0xb6215b560273af63ec7e011572b60ec1ca0b0232f8ff44fcd4ed55c7526e964e",
CurrentDutyDependentRoot: "", PreviousDutyDependentRoot: "", EpochTransition: false, ExecutionOptimistic: false},
TestNotes: "An easy to process Altair Block",
SignedBeaconBlock: filepath.Join("ssz-data", "2375703", "signed-beacon-block.ssz"),
BeaconState: filepath.Join("ssz-data", "2375703", "beacon-state.ssz"),
},
}
TestConfig = Config{
protocol: protocol,
address: address,
port: port,
dummyParentRoot: dummyParentRoot,
dbHost: dbHost,
dbPort: dbPort,
dbName: dbName,
dbUser: dbUser,
dbPassword: dbPassword,
dbDriver: dbDriver,
knownGapsTableIncrement: knownGapsTableIncrement,
}
BeaconNodeTester = TestBeaconNode{
TestEvents: TestEvents,
TestConfig: TestConfig,
}
})
// We might also want to add an integration test that will actually process a single event, then end.
// This will help us know that our models match that actual data being served from the beacon node.
Describe("Receiving New Head SSE messages", Label("unit", "behavioral"), func() {
Context("Correctly formatted Phase0 Block", func() {
It("Should turn it into a struct successfully.", func() {
BeaconNodeTester.testProcessBlock(BeaconNodeTester.TestEvents["100"].HeadMessage, 3, maxRetry)
})
})
Context("Correctly formatted Altair Block", func() {
It("Should turn it into a struct successfully.", func() {
BeaconNodeTester.testProcessBlock(BeaconNodeTester.TestEvents["2375703"].HeadMessage, 74240, maxRetry)
})
})
//Context("A single incorrectly formatted head message", func() {
// It("Should create an error, maybe also add the projected slot to the knownGaps table......")
// If it can unmarshal the head add it to knownGaps
//})
//Context("An incorrectly formatted message sandwiched between correctly formatted messages", func() {
// It("Should create an error, maybe also add the projected slot to the knownGaps table......")
//})
// Context("When there is a skipped slot", func() {
// It("Should indicate that the slot was skipped")
// })
// Context("When the slot is not properly served", func() {
// It("Should return an error, and add the slot to the knownGaps table.")
// })
//})
// Context("With gaps in between head slots", func() {
// It("Should add the slots in between to the knownGaps table")
// })
// Context("With the previousBlockHash not matching the parentBlockHash", func() {
// It("Should recognize the reorg and add the previous slot to knownGaps table.")
// })
// Context("Out of order", func() {
// It("Not sure what it should do....")
// })
})
Describe("ReOrg Scenario", Label("unit", "behavioral"), func() {
Context("Altair: Multiple head messages for the same slot.", func() {
It("The previous block should be marked as 'forked', the new block should be the only one marked as 'proposed'.", func() {
BeaconNodeTester.testMultipleHead(TestEvents["2375703-dummy"].HeadMessage, TestEvents["2375703"].HeadMessage, 74240, maxRetry)
})
})
Context("Phase0: Multiple head messages for the same slot.", func() {
It("The previous block should be marked as 'forked', the new block should be the only one marked as 'proposed'.", func() {
BeaconNodeTester.testMultipleHead(TestEvents["100-dummy"].HeadMessage, TestEvents["100"].HeadMessage, 3, maxRetry)
})
})
Context("Phase 0: Multiple reorgs have occurred on this slot", Label("new"), func() {
It("The previous blocks should be marked as 'forked', the new block should be the only one marked as 'proposed'.", func() {
BeaconNodeTester.testMultipleReorgs(TestEvents["100-dummy"].HeadMessage, TestEvents["100-dummy-2"].HeadMessage, TestEvents["100"].HeadMessage, 3, maxRetry)
})
})
Context("Altair: Multiple reorgs have occurred on this slot", Label("new"), func() {
It("The previous blocks should be marked as 'forked', the new block should be the only one marked as 'proposed'.", func() {
BeaconNodeTester.testMultipleReorgs(TestEvents["2375703-dummy"].HeadMessage, TestEvents["2375703-dummy-2"].HeadMessage, TestEvents["2375703"].HeadMessage, 74240, maxRetry)
})
})
//Context("Reorg slot in not already in the DB", func() {
// It("Should simply have the correct slot in the DB.")
// Add to knowngaps
//})
})
})
type Config struct {
protocol string
address string
port int
dummyParentRoot string
dbHost string
dbPort int
dbName string
dbUser string
dbPassword string
dbDriver string
knownGapsTableIncrement int
}
//////////////////////////////////////////////////////
// Helper functions
//////////////////////////////////////////////////////
// Must run before each test. We can't use the beforeEach because of the way
// Gingko treats race conditions.
func setUpTest(config Config) *beaconclient.BeaconClient {
bc := *beaconclient.CreateBeaconClient(context.Background(), config.protocol, config.address, config.port)
db, err := postgres.SetupPostgresDb(config.dbHost, config.dbPort, config.dbName, config.dbUser, config.dbPassword, config.dbDriver)
Expect(err).ToNot(HaveOccurred())
// Drop all records from the DB.
clearEthclDbTables(db)
bc.Db = db
return &bc
}
// A helper function to validate the expected output.
func validateSlot(bc *beaconclient.BeaconClient, headMessage *beaconclient.Head, correctEpoch int, correctStatus string) {
epoch, dbSlot, blockRoot, stateRoot, status := queryDbSlotAndBlock(bc.Db, headMessage.Slot, headMessage.Block)
baseSlot, err := strconv.Atoi(headMessage.Slot)
Expect(err).ToNot(HaveOccurred())
Expect(dbSlot).To(Equal(baseSlot))
Expect(epoch).To(Equal(correctEpoch))
Expect(blockRoot).To(Equal(headMessage.Block))
Expect(stateRoot).To(Equal(headMessage.State))
Expect(status).To(Equal(correctStatus))
}
// Wrapper function to send a head message to the beaconclient
func sendHeadMessage(bc *beaconclient.BeaconClient, head beaconclient.Head, maxRetry int) {
data, err := json.Marshal(head)
Expect(err).ToNot(HaveOccurred())
startInserts := atomic.LoadUint64(&bc.Metrics.HeadTrackingInserts)
bc.HeadTracking.MessagesCh <- &sse.Event{
ID: []byte{},
Data: data,
Event: []byte{},
Retry: []byte{},
}
curRetry := 0
for atomic.LoadUint64(&bc.Metrics.HeadTrackingInserts) != startInserts+1 {
time.Sleep(1 * time.Second)
curRetry = curRetry + 1
if curRetry == maxRetry {
Fail(" Too many retries have occured.")
}
}
}
// A helper function to query the ethcl.slots table based on the slot and block_root
func queryDbSlotAndBlock(db sql.Database, querySlot string, queryBlockRoot string) (int, int, string, string, string) {
sqlStatement := `SELECT epoch, slot, block_root, state_root, status FROM ethcl.slots WHERE slot=$1 AND block_root=$2;`
var epoch, slot int
var blockRoot, stateRoot, status string
row := db.QueryRow(context.Background(), sqlStatement, querySlot, queryBlockRoot)
err := row.Scan(&epoch, &slot, &blockRoot, &stateRoot, &status)
Expect(err).ToNot(HaveOccurred())
return epoch, slot, blockRoot, stateRoot, status
}
// A function that will remove all entries from the ethcl tables for you.
func clearEthclDbTables(db sql.Database) {
deleteQueries := []string{"DELETE FROM ethcl.slots;", "DELETE FROM ethcl.signed_beacon_block;", "DELETE FROM ethcl.beacon_state;", "DELETE FROM ethcl.known_gaps;"}
for _, queries := range deleteQueries {
_, err := db.Exec(context.Background(), queries)
Expect(err).ToNot(HaveOccurred())
}
}
// An object that is used to aggregate test functions. Test functions are needed because we need to
// run the same tests on multiple blocks for multiple forks. So they save us time.
type TestBeaconNode struct {
TestEvents map[string]Message
TestConfig Config
}
// Create a new new mock for the beacon node.
func (tbc TestBeaconNode) SetupBeaconNodeMock(TestEvents map[string]Message, protocol string, address string, port int, dummyParentRoot string) {
httpmock.Activate()
stateUrl := `=~^` + protocol + "://" + address + ":" + strconv.Itoa(port) + beaconclient.BcStateQueryEndpoint + `([^/]+)\z`
httpmock.RegisterResponder("GET", stateUrl,
func(req *http.Request) (*http.Response, error) {
// Get ID from request
id := httpmock.MustGetSubmatch(req, 1)
dat, err := tbc.provideSsz(id, "state", dummyParentRoot)
if err != nil {
Expect(err).NotTo(HaveOccurred())
return httpmock.NewStringResponse(404, fmt.Sprintf("Unable to find file for %s", id)), err
}
return httpmock.NewBytesResponse(200, dat), nil
},
)
blockUrl := `=~^` + protocol + "://" + address + ":" + strconv.Itoa(port) + beaconclient.BcBlockQueryEndpoint + `([^/]+)\z`
httpmock.RegisterResponder("GET", blockUrl,
func(req *http.Request) (*http.Response, error) {
// Get ID from request
id := httpmock.MustGetSubmatch(req, 1)
dat, err := tbc.provideSsz(id, "block", dummyParentRoot)
if err != nil {
Expect(err).NotTo(HaveOccurred())
return httpmock.NewStringResponse(404, fmt.Sprintf("Unable to find file for %s", id)), err
}
return httpmock.NewBytesResponse(200, dat), nil
},
)
}
// A function to mimic querying the state from the beacon node. We simply get the SSZ file are return it.
func (tbc TestBeaconNode) provideSsz(slotIdentifier string, sszIdentifier string, dummyParentRoot string) ([]byte, error) {
var slotFile string
var Message Message
for _, val := range tbc.TestEvents {
if sszIdentifier == "state" {
if val.HeadMessage.Slot == slotIdentifier || val.HeadMessage.State == slotIdentifier {
slotFile = val.BeaconState
Message = val
}
} else if sszIdentifier == "block" {
if val.HeadMessage.Slot == slotIdentifier || val.HeadMessage.Block == slotIdentifier {
slotFile = val.SignedBeaconBlock
Message = val
}
}
}
if Message.MimicConfig != nil {
log.Info("We are going to create a custom SSZ object for testing purposes.")
if sszIdentifier == "block" {
dat, err := os.ReadFile(slotFile)
if err != nil {
return nil, fmt.Errorf("Can't find the slot file, %s", slotFile)
}
block := &st.SignedBeaconBlock{}
err = block.UnmarshalSSZ(dat)
if err != nil {
log.Error("Error unmarshalling: ", err)
}
slot, err := strconv.ParseUint(Message.HeadMessage.Slot, 10, 64)
Expect(err).ToNot(HaveOccurred())
block.Block.Slot = types.Slot(slot)
block.Block.StateRoot, err = hex.DecodeString(Message.HeadMessage.State)
Expect(err).ToNot(HaveOccurred())
if Message.MimicConfig.ParentRoot == "" {
block.Block.ParentRoot, err = hex.DecodeString(dummyParentRoot)
Expect(err).ToNot(HaveOccurred())
} else {
block.Block.ParentRoot, err = hex.DecodeString(Message.MimicConfig.ParentRoot)
Expect(err).ToNot(HaveOccurred())
}
return block.MarshalSSZ()
}
if sszIdentifier == "state" {
dat, err := os.ReadFile(slotFile)
if err != nil {
return nil, fmt.Errorf("Can't find the slot file, %s", slotFile)
}
state := st.BeaconState{}
err = state.UnmarshalSSZ(dat)
Expect(err)
slot, err := strconv.ParseUint(Message.HeadMessage.Slot, 10, 64)
Expect(err).ToNot(HaveOccurred())
state.Slot = types.Slot(slot)
return state.MarshalSSZ()
}
}
if slotFile == "" {
return nil, fmt.Errorf("We couldn't find the slot file for %s", slotIdentifier)
}
dat, err := os.ReadFile(slotFile)
if err != nil {
return nil, fmt.Errorf("Can't find the slot file, %s", slotFile)
}
return dat, nil
}
// Helper function to test three reorg messages. There are going to be many functions like this,
// Because we need to test the same logic for multiple phases.
func (tbc TestBeaconNode) testMultipleReorgs(firstHead beaconclient.Head, secondHead beaconclient.Head, thirdHead beaconclient.Head, epoch int, maxRetry int) {
bc := setUpTest(tbc.TestConfig)
tbc.SetupBeaconNodeMock(tbc.TestEvents, tbc.TestConfig.protocol, tbc.TestConfig.address, tbc.TestConfig.port, tbc.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset()
go bc.CaptureHead(tbc.TestConfig.knownGapsTableIncrement)
time.Sleep(1 * time.Second)
log.Info("Sending Phase0 Messages to BeaconClient")
sendHeadMessage(bc, firstHead, maxRetry)
sendHeadMessage(bc, secondHead, maxRetry)
sendHeadMessage(bc, thirdHead, maxRetry)
curRetry := 0
for atomic.LoadUint64(&bc.Metrics.HeadTrackingReorgs) != 2 {
time.Sleep(1 * time.Second)
curRetry = curRetry + 1
if curRetry == maxRetry {
Fail(" Too many retries have occured.")
}
}
log.Info("Checking Phase0 to make sure the fork was marked properly.")
validateSlot(bc, &firstHead, epoch, "forked")
validateSlot(bc, &secondHead, epoch, "forked")
validateSlot(bc, &thirdHead, epoch, "proposed")
log.Info("Send the reorg message.")
data, err := json.Marshal(&beaconclient.ChainReorg{
Slot: firstHead.Slot,
Depth: "1",
OldHeadBlock: thirdHead.Block,
NewHeadBlock: secondHead.Block,
OldHeadState: thirdHead.State,
NewHeadState: secondHead.State,
Epoch: strconv.Itoa(epoch),
ExecutionOptimistic: false,
})
Expect(err).ToNot(HaveOccurred())
bc.ReOrgTracking.MessagesCh <- &sse.Event{
Data: data,
}
curRetry = 0
for atomic.LoadUint64(&bc.Metrics.HeadTrackingReorgs) != 3 {
time.Sleep(1 * time.Second)
curRetry = curRetry + 1
if curRetry == maxRetry {
Fail(" Too many retries have occured.")
}
}
log.Info("Make sure the forks were properly updated!")
validateSlot(bc, &firstHead, epoch, "forked")
validateSlot(bc, &secondHead, epoch, "proposed")
validateSlot(bc, &thirdHead, epoch, "forked")
}
// A test to validate a single block was processed correctly
func (tbc TestBeaconNode) testProcessBlock(head beaconclient.Head, epoch int, maxRetry int) {
bc := setUpTest(tbc.TestConfig)
tbc.SetupBeaconNodeMock(tbc.TestEvents, tbc.TestConfig.protocol, tbc.TestConfig.address, tbc.TestConfig.port, tbc.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset()
go bc.CaptureHead(tbc.TestConfig.knownGapsTableIncrement)
time.Sleep(1 * time.Second)
sendHeadMessage(bc, head, maxRetry)
validateSlot(bc, &head, epoch, "proposed")
}
// A test that ensures that if two HeadMessages occur for a single slot they are marked
// as proposed and forked correctly.
func (tbc TestBeaconNode) testMultipleHead(firstHead beaconclient.Head, secondHead beaconclient.Head, epoch int, maxRetry int) {
bc := setUpTest(tbc.TestConfig)
tbc.SetupBeaconNodeMock(tbc.TestEvents, tbc.TestConfig.protocol, tbc.TestConfig.address, tbc.TestConfig.port, tbc.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset()
go bc.CaptureHead(tbc.TestConfig.knownGapsTableIncrement)
time.Sleep(1 * time.Second)
sendHeadMessage(bc, firstHead, maxRetry)
sendHeadMessage(bc, secondHead, maxRetry)
curRetry := 0
for atomic.LoadUint64(&bc.Metrics.HeadTrackingReorgs) != 1 {
time.Sleep(1 * time.Second)
curRetry = curRetry + 1
if curRetry == maxRetry {
Fail(" Too many retries have occured.")
}
}
log.Info("Checking Altair to make sure the fork was marked properly.")
validateSlot(bc, &firstHead, epoch, "forked")
validateSlot(bc, &secondHead, epoch, "proposed")
}

View File

@ -0,0 +1,57 @@
package beaconclient
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
)
// The sync response
type Sync struct {
Data SyncData `json:"data"`
}
// The sync data
type SyncData struct {
IsSync bool `json:"is_syncing"`
HeadSlot string `json:"head_slot"`
SyncDistance string `json:"sync_distance"`
}
// This function will check to see if we are synced up with the head of chain.
//{"data":{"is_syncing":true,"head_slot":"62528","sync_distance":"3734299"}}
func (bc BeaconClient) CheckHeadSync() (bool, error) {
bcSync := bc.ServerEndpoint + BcSyncStatusEndpoint
resp, err := http.Get(bcSync)
if err != nil {
loghelper.LogEndpoint(bcSync).Error("Unable to check the sync status")
return true, err
}
if resp.StatusCode < 200 || resp.StatusCode > 299 {
loghelper.LogEndpoint(bcSync).WithFields(log.Fields{"returnCode": resp.StatusCode}).Error("Error when getting the sync status")
return true, fmt.Errorf("Querying the sync status returned a non 2xx status code, code provided: %d", resp.StatusCode)
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return true, err
}
var syncStatus Sync
if err := json.Unmarshal(body, &syncStatus); err != nil {
loghelper.LogEndpoint(bcSync).WithFields(log.Fields{
"rawMessage": string(body),
"err": err,
}).Error("Unable to unmarshal sync status")
return true, err
}
return syncStatus.Data.IsSync, nil
}

View File

@ -0,0 +1,391 @@
package beaconclient
import (
"context"
"fmt"
"strconv"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
)
var (
// Statement to upsert to the ethcl.slots table.
UpsertSlotsStmt string = `
INSERT INTO ethcl.slots (epoch, slot, block_root, state_root, status)
VALUES ($1, $2, $3, $4, $5) ON CONFLICT (slot, block_root) DO NOTHING`
// Statement to upsert to the ethcl.signed_beacon_blocks table.
UpsertSignedBeaconBlockStmt string = `
INSERT INTO ethcl.signed_beacon_block (slot, block_root, parent_block_root, eth1_block_hash, mh_key)
VALUES ($1, $2, $3, $4, $5) ON CONFLICT (slot, block_root) DO NOTHING`
// Statement to upsert to the ethcl.beacon_state table.
UpsertBeaconState string = `
INSERT INTO ethcl.beacon_state (slot, state_root, mh_key)
VALUES ($1, $2, $3) ON CONFLICT (slot, state_root) DO NOTHING`
// Statement to upsert to the public.blocks table.
UpsertBlocksStmt string = `
INSERT INTO public.blocks (key, data)
VALUES ($1, $2) ON CONFLICT (key) DO NOTHING`
UpdateForkedStmt string = `UPDATE ethcl.slots
SET status='forked'
WHERE slot=$1 AND block_root<>$2
RETURNING block_root;`
UpdateProposedStmt string = `UPDATE ethcl.slots
SET status='proposed'
WHERE slot=$1 AND block_root=$2
RETURNING block_root;`
CheckProposedStmt string = `SELECT slot, block_root
FROM ethcl.slots
WHERE slot=$1 AND block_root=$2;`
// Statement to insert known_gaps. We don't pass in timestamp, we let the server take care of that one.
UpsertKnownGapsStmt string = `
INSERT INTO ethcl.known_gaps (start_slot, end_slot, checked_out, reprocessing_error, entry_error, entry_process)
VALUES ($1, $2, $3, $4, $5, $6) on CONFLICT (start_slot, end_slot) DO NOTHING`
QueryHighestSlotStmt string = "SELECT COALESCE(MAX(slot), 0) FROM ethcl.slots"
)
// Put all functionality to prepare the write object
// And write it in this file.
// Remove any of it from the processslot file.
type DatabaseWriter struct {
Db sql.Database
Metrics *BeaconClientMetrics
DbSlots *DbSlots
DbSignedBeaconBlock *DbSignedBeaconBlock
DbBeaconState *DbBeaconState
rawBeaconState []byte
rawSignedBeaconBlock []byte
}
func CreateDatabaseWrite(db sql.Database, slot int, stateRoot string, blockRoot string, parentBlockRoot string,
eth1BlockHash string, status string, rawSignedBeaconBlock []byte, rawBeaconState []byte, metrics *BeaconClientMetrics) (*DatabaseWriter, error) {
dw := &DatabaseWriter{
Db: db,
rawBeaconState: rawBeaconState,
rawSignedBeaconBlock: rawSignedBeaconBlock,
Metrics: metrics,
}
dw.prepareSlotsModel(slot, stateRoot, blockRoot, status)
err := dw.prepareSignedBeaconBlockModel(slot, blockRoot, parentBlockRoot, eth1BlockHash)
if err != nil {
return nil, err
}
err = dw.prepareBeaconStateModel(slot, stateRoot)
if err != nil {
return nil, err
}
return dw, err
}
// Write functions to write each all together...
// Should I do one atomic write?
// Create the model for the ethcl.slots table
func (dw *DatabaseWriter) prepareSlotsModel(slot int, stateRoot string, blockRoot string, status string) {
dw.DbSlots = &DbSlots{
Epoch: calculateEpoch(slot, bcSlotsPerEpoch),
Slot: strconv.Itoa(slot),
StateRoot: stateRoot,
BlockRoot: blockRoot,
Status: status,
}
log.Debug("dw.DbSlots: ", dw.DbSlots)
}
// Create the model for the ethcl.signed_beacon_block table.
func (dw *DatabaseWriter) prepareSignedBeaconBlockModel(slot int, blockRoot string, parentBlockRoot string, eth1BlockHash string) error {
mhKey, err := MultihashKeyFromSSZRoot([]byte(dw.DbSlots.BlockRoot))
if err != nil {
return err
}
dw.DbSignedBeaconBlock = &DbSignedBeaconBlock{
Slot: strconv.Itoa(slot),
BlockRoot: blockRoot,
ParentBlock: parentBlockRoot,
Eth1BlockHash: eth1BlockHash,
MhKey: mhKey,
}
log.Debug("dw.DbSignedBeaconBlock: ", dw.DbSignedBeaconBlock)
return nil
}
// Create the model for the ethcl.beacon_state table.
func (dw *DatabaseWriter) prepareBeaconStateModel(slot int, stateRoot string) error {
mhKey, err := MultihashKeyFromSSZRoot([]byte(dw.DbSlots.StateRoot))
if err != nil {
return err
}
dw.DbBeaconState = &DbBeaconState{
Slot: strconv.Itoa(slot),
StateRoot: stateRoot,
MhKey: mhKey,
}
log.Debug("dw.DbBeaconState: ", dw.DbBeaconState)
return nil
}
// Write all the data for a given slot.
func (dw *DatabaseWriter) writeFullSlot() error {
// Add errors for each function call
// If an error occurs, write to knownGaps table.
err := dw.writeSlots()
if err != nil {
return err
}
if dw.DbSlots.Status != "skipped" {
err = dw.writeSignedBeaconBlocks()
if err != nil {
return err
}
err = dw.writeBeaconState()
if err != nil {
return err
}
}
dw.Metrics.IncrementHeadTrackingInserts(1)
return nil
}
// Write the information for the generic slots table. For now this is only one function.
// But in the future if we need to incorporate any FK's or perform any actions to write to the
// slots table we can do it all here.
func (dw *DatabaseWriter) writeSlots() error {
return dw.upsertSlots()
}
// Upsert to the ethcl.slots table.
func (dw *DatabaseWriter) upsertSlots() error {
_, err := dw.Db.Exec(context.Background(), UpsertSlotsStmt, dw.DbSlots.Epoch, dw.DbSlots.Slot, dw.DbSlots.BlockRoot, dw.DbSlots.StateRoot, dw.DbSlots.Status)
if err != nil {
loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("Unable to write to the slot to the ethcl.slots table")
return err
}
return nil
}
// Write the information for the signed_beacon_block.
func (dw *DatabaseWriter) writeSignedBeaconBlocks() error {
err := dw.upsertPublicBlocks(dw.DbSignedBeaconBlock.MhKey, dw.rawSignedBeaconBlock)
if err != nil {
return err
}
err = dw.upsertSignedBeaconBlock()
if err != nil {
return err
}
return nil
}
// Upsert to public.blocks.
func (dw *DatabaseWriter) upsertPublicBlocks(key string, data []byte) error {
_, err := dw.Db.Exec(context.Background(), UpsertBlocksStmt, key, data)
if err != nil {
loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("Unable to write to the slot to the public.blocks table")
return err
}
return nil
}
// Upsert to the ethcl.signed_beacon_block table.
func (dw *DatabaseWriter) upsertSignedBeaconBlock() error {
_, err := dw.Db.Exec(context.Background(), UpsertSignedBeaconBlockStmt, dw.DbSignedBeaconBlock.Slot, dw.DbSignedBeaconBlock.BlockRoot, dw.DbSignedBeaconBlock.ParentBlock, dw.DbSignedBeaconBlock.Eth1BlockHash, dw.DbSignedBeaconBlock.MhKey)
if err != nil {
loghelper.LogSlotError(dw.DbSlots.Slot, err).WithFields(log.Fields{"block_root": dw.DbSignedBeaconBlock.BlockRoot}).Error("Unable to write to the slot to the ethcl.signed_beacon_block table")
return err
}
return nil
}
// Write the information for the beacon_state.
func (dw *DatabaseWriter) writeBeaconState() error {
err := dw.upsertPublicBlocks(dw.DbBeaconState.MhKey, dw.rawBeaconState)
if err != nil {
return err
}
err = dw.upsertBeaconState()
if err != nil {
return err
}
return nil
}
// Upsert to the ethcl.beacon_state table.
func (dw *DatabaseWriter) upsertBeaconState() error {
_, err := dw.Db.Exec(context.Background(), UpsertBeaconState, dw.DbBeaconState.Slot, dw.DbBeaconState.StateRoot, dw.DbBeaconState.MhKey)
if err != nil {
loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("Unable to write to the slot to the ethcl.beacon_state table")
return err
}
return nil
}
// Update a given slot to be marked as forked. Provide the slot and the latest latestBlockRoot.
// We will mark all entries for the given slot that don't match the provided latestBlockRoot as forked.
func writeReorgs(db sql.Database, slot string, latestBlockRoot string, metrics *BeaconClientMetrics) {
slotNum, strErr := strconv.Atoi(slot)
if strErr != nil {
loghelper.LogReorgError(slot, latestBlockRoot, strErr).Error("We can't convert the slot to an int...")
}
forkCount, err := updateForked(db, slot, latestBlockRoot)
if err != nil {
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We ran into some trouble while updating all forks.")
writeKnownGaps(db, 1, slotNum, slotNum, err, "reorg")
}
proposedCount, err := updateProposed(db, slot, latestBlockRoot)
if err != nil {
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We ran into some trouble while trying to update the proposed slot.")
writeKnownGaps(db, 1, slotNum, slotNum, err, "reorg")
}
if forkCount > 0 {
loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{
"forkCount": forkCount,
}).Info("Updated rows that were forked.")
} else {
loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{
"forkCount": forkCount,
}).Warn("There were no forked rows to update.")
}
if proposedCount == 1 {
loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{
"proposedCount": proposedCount,
}).Info("Updated the row that should have been marked as proposed.")
} else if proposedCount > 1 {
loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{
"proposedCount": proposedCount,
}).Error("Too many rows were marked as proposed!")
writeKnownGaps(db, 1, slotNum, slotNum, err, "reorg")
} else if proposedCount == 0 {
var count int
err := db.QueryRow(context.Background(), CheckProposedStmt, slot, latestBlockRoot).Scan(count)
if err != nil {
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("Unable to query proposed rows after reorg.")
writeKnownGaps(db, 1, slotNum, slotNum, err, "reorg")
}
if count != 1 {
loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{
"proposedCount": count,
}).Warn("The proposed block was not marked as proposed...")
writeKnownGaps(db, 1, slotNum, slotNum, err, "reorg")
} else {
loghelper.LogReorg(slot, latestBlockRoot).Info("Updated the row that should have been marked as proposed.")
}
}
metrics.IncrementHeadTrackingReorgs(1)
}
// Update the slots table by marking the old slot's as forked.
func updateForked(db sql.Database, slot string, latestBlockRoot string) (int64, error) {
res, err := db.Exec(context.Background(), UpdateForkedStmt, slot, latestBlockRoot)
if err != nil {
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We are unable to update the ethcl.slots table with the forked slots")
return 0, err
}
count, err := res.RowsAffected()
if err != nil {
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("Unable to figure out how many entries were marked as forked.")
return 0, err
}
return count, err
}
func updateProposed(db sql.Database, slot string, latestBlockRoot string) (int64, error) {
res, err := db.Exec(context.Background(), UpdateProposedStmt, slot, latestBlockRoot)
if err != nil {
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We are unable to update the ethcl.slots table with the proposed slot.")
return 0, err
}
count, err := res.RowsAffected()
if err != nil {
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("Unable to figure out how many entries were marked as proposed")
return 0, err
}
return count, err
}
// A wrapper function to call upsertKnownGaps. This function will break down the range of known_gaos into
// smaller chunks. For example, instead of having an entry of 1-101, if we increment the entries by 10 slots, we would
// have 10 entries as follows: 1-10, 11-20, etc...
func writeKnownGaps(db sql.Database, tableIncrement int, startSlot int, endSlot int, entryError error, entryProcess string) {
if endSlot-startSlot <= tableIncrement {
kgModel := DbKnownGaps{
StartSlot: strconv.Itoa(startSlot),
EndSlot: strconv.Itoa(endSlot),
CheckedOut: false,
ReprocessingError: "",
EntryError: entryError.Error(),
EntryProcess: entryProcess,
}
upsertKnownGaps(db, kgModel)
}
totalSlots := endSlot - startSlot
var chunks int
chunks = totalSlots / tableIncrement
if totalSlots%tableIncrement != 0 {
chunks = chunks + 1
}
for i := 0; i < chunks; i++ {
var tempStart, tempEnd int
tempStart = startSlot + (i * tableIncrement)
if i+1 == chunks {
tempEnd = endSlot
} else {
tempEnd = startSlot + ((i + 1) * tableIncrement)
}
kgModel := DbKnownGaps{
StartSlot: strconv.Itoa(tempStart),
EndSlot: strconv.Itoa(tempEnd),
CheckedOut: false,
ReprocessingError: "",
EntryError: entryError.Error(),
EntryProcess: entryProcess,
}
upsertKnownGaps(db, kgModel)
}
}
// A function to upsert a single entry to the ethcl.known_gaps table.
func upsertKnownGaps(db sql.Database, knModel DbKnownGaps) {
_, err := db.Exec(context.Background(), UpsertKnownGapsStmt, knModel.StartSlot, knModel.EndSlot,
knModel.CheckedOut, knModel.ReprocessingError, knModel.EntryError, knModel.EntryProcess)
if err != nil {
log.WithFields(log.Fields{
"err": err,
"startSlot": knModel.StartSlot,
"endSlot": knModel.EndSlot,
}).Fatal("We are unable to write to the ethcl.known_gaps table!!! We will stop the application because of that.")
}
log.WithFields(log.Fields{
"startSlot": knModel.StartSlot,
"endSlot": knModel.EndSlot,
}).Warn("A new gap has been added to the ethcl.known_gaps table.")
}
// A function to write the gap between the highest slot in the DB and the first processed slot.
func writeStartUpGaps(db sql.Database, tableIncrement int, firstSlot int) {
var maxSlot int
err := db.QueryRow(context.Background(), QueryHighestSlotStmt).Scan(&maxSlot)
if err != nil {
loghelper.LogError(err).Fatal("Unable to get the max block from the DB. We must close the application or we might have undetected gaps.")
}
if err != nil {
loghelper.LogError(err).WithFields(log.Fields{
"maxSlot": maxSlot,
}).Fatal("Unable to get convert max block from DB to int. We must close the application or we might have undetected gaps.")
}
writeKnownGaps(db, tableIncrement, maxSlot, firstSlot, fmt.Errorf(""), "startup")
}
// A quick helper function to calculate the epoch.
func calculateEpoch(slot int, slotPerEpoch int) string {
epoch := slot / slotPerEpoch
return strconv.Itoa(epoch)
}

View File

@ -22,8 +22,8 @@ func (bc BeaconClient) CheckBeaconClient() error {
}
if resp.StatusCode < 200 || resp.StatusCode > 299 {
log.Error("We recieved a non 2xx status code when checking the health of the beacon node.")
log.Error("Health Endpoint Status Code: ", resp.StatusCode)
loghelper.LogEndpoint(bcEndpoint).Error("We recieved a non 2xx status code when checking the health of the beacon node.")
loghelper.LogEndpoint(bcEndpoint).Error("Health Endpoint Status Code: ", resp.StatusCode)
return fmt.Errorf("beacon Node Provided a non 2xx status code, code provided: %d", resp.StatusCode)
}

View File

@ -8,6 +8,7 @@ import (
log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
"golang.org/x/sync/errgroup"
)
var (
@ -18,8 +19,26 @@ var (
// When new messages come in, it will ensure that they are decoded into JSON.
// If any errors occur, it log the error information.
func handleIncomingSseEvent[P ProcessedEvents](eventHandler *SseEvents[P]) {
loghelper.LogEndpoint(eventHandler.Endpoint).Info("Subscribing to Messages")
go eventHandler.SseClient.SubscribeChanRaw(eventHandler.MessagesCh)
go func() {
errG := new(errgroup.Group)
errG.Go(func() error {
err := eventHandler.SseClient.SubscribeChanRaw(eventHandler.MessagesCh)
if err != nil {
return err
}
return nil
})
if err := errG.Wait(); err != nil {
log.WithFields(log.Fields{
"err": err,
"endpoint": eventHandler.Endpoint,
}).Error("Unable to subscribe to the SSE endpoint.")
return
} else {
loghelper.LogEndpoint(eventHandler.Endpoint).Info("Successfully subscribed to the event stream.")
}
}()
for {
select {
case message := <-eventHandler.MessagesCh:
@ -62,5 +81,4 @@ func (bc *BeaconClient) captureEventTopic() {
log.Info("We are capturing all SSE events")
go handleIncomingSseEvent(bc.HeadTracking)
go handleIncomingSseEvent(bc.ReOrgTracking)
go handleIncomingSseEvent(bc.FinalizationTracking)
}

View File

@ -0,0 +1,15 @@
package beaconclient
import "sync/atomic"
// Wrapper function to increment inserts. If we want to use mutexes later we can easily update all
// occurrences here.
func (m *BeaconClientMetrics) IncrementHeadTrackingInserts(inc uint64) {
atomic.AddUint64(&m.HeadTrackingInserts, inc)
}
// Wrapper function to increment reorgs. If we want to use mutexes later we can easily update all
// occurrences here.
func (m *BeaconClientMetrics) IncrementHeadTrackingReorgs(inc uint64) {
atomic.AddUint64(&m.HeadTrackingReorgs, inc)
}

View File

@ -2,7 +2,7 @@ package beaconclient
// This interface captured what the events can be for processed event streams.
type ProcessedEvents interface {
Head | FinalizedCheckpoint | ChainReorg
Head | ChainReorg
}
// This struct captures the JSON representation of the head topic
@ -35,3 +35,40 @@ type ChainReorg struct {
Epoch string `json:"epoch"`
ExecutionOptimistic bool `json:"execution_optimistic"`
}
// A struct to capture whats being written to the ethcl.slots table.
type DbSlots struct {
Epoch string // The epoch.
Slot string // The slot.
BlockRoot string // The block root
StateRoot string // The state root
Status string // The status, it can be proposed | forked | skipped.
}
// A struct to capture whats being written to ethcl.signed_beacon_block table.
type DbSignedBeaconBlock struct {
Slot string // The slot.
BlockRoot string // The block root
ParentBlock string // The parent block root.
Eth1BlockHash string // The eth1 block_hash
MhKey string // The ipld multihash key.
}
// A struct to capture whats being written to ethcl.beacon_state table.
type DbBeaconState struct {
Slot string // The slot.
StateRoot string // The state root
MhKey string // The ipld multihash key.
}
// A structure to capture whats being written to the ethcl.known_gaps table.
type DbKnownGaps struct {
StartSlot string // The start slot for known_gaps, inclusive.
EndSlot string // The end slot for known_gaps, inclusive.
CheckedOut bool // Indicates if any process is currently processing this entry.
ReprocessingError string // The error that occurred when attempting to reprocess these entries.
EntryError string // The error that caused this entry to be added to the table. Could be null.
EntryTime string // The time this range was added to the DB. This can help us catch ranges that have not been processed for a long time due to some error.
EntryProcess string // The entry process that added this process. Potential options are StartUp, Error, ManualEntry, HeadGap.
}

View File

@ -0,0 +1,24 @@
package beaconclient
import (
blockstore "github.com/ipfs/go-ipfs-blockstore"
dshelp "github.com/ipfs/go-ipfs-ds-help"
"github.com/multiformats/go-multihash"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
)
const SSZ_SHA2_256_PREFIX uint64 = 0xb501
// MultihashKeyFromSSZRoot converts a SSZ-SHA2-256 root hash into a blockstore prefixed multihash key
func MultihashKeyFromSSZRoot(root []byte) (string, error) {
mh, err := multihash.Encode(root, SSZ_SHA2_256_PREFIX)
if err != nil {
loghelper.LogError(err).Error("Unable to create a multihash Key")
return "", err
}
dbKey := dshelp.MultihashToDsKey(mh)
mhKey := blockstore.BlockPrefix.String() + dbKey.String()
log.WithFields(log.Fields{"mhKey": mhKey, "len": len(root)}).Debug("The MHKEY")
return mhKey, nil
}

View File

@ -3,36 +3,56 @@
package beaconclient
import log "github.com/sirupsen/logrus"
import (
"fmt"
"strconv"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
)
// This function will perform the necessary steps to handle a reorg.
func (bc *BeaconClient) handleReorgs() {
func (bc *BeaconClient) handleReorg() {
log.Info("Starting to process reorgs.")
for {
// We will add real functionality later
reorg := <-bc.ReOrgTracking.ProcessCh
log.WithFields(log.Fields{"reorg": reorg}).Debug("Received a new reorg message.")
writeReorgs(bc.Db, reorg.Slot, reorg.NewHeadBlock, bc.Metrics)
}
}
// This function will perform the necessary steps to handle a reorg.
func (bc *BeaconClient) handleFinalizedCheckpoint() {
log.Info("Starting to process finalized checkpoints.")
for {
// We will add real functionality later
finalized := <-bc.ReOrgTracking.ProcessCh
log.WithFields(log.Fields{"finalized": finalized}).Debug("Received a new finalized checkpoint.")
}
}
// This function will handle the latest head event.
func (bc *BeaconClient) handleHead() {
log.Info("Starting to process head.")
errorSlots := 0
for {
// We will add real functionality later
head := <-bc.ReOrgTracking.ProcessCh
head := <-bc.HeadTracking.ProcessCh
// Process all the work here.
slot, err := strconv.Atoi(head.Slot)
if err != nil {
bc.HeadTracking.ErrorCh <- &SseError{
err: fmt.Errorf("Unable to turn the slot from string to int: %s", head.Slot),
}
errorSlots = errorSlots + 1
continue
}
if errorSlots != 0 && bc.PreviousSlot != 0 {
log.WithFields(log.Fields{
"lastProcessedSlot": bc.PreviousSlot,
"errorMessages": errorSlots,
}).Warn("We added slots to the knownGaps table because we got bad head messages.")
writeKnownGaps(bc.Db, bc.KnownGapTableIncrement, bc.PreviousSlot, bcSlotsPerEpoch+errorSlots, fmt.Errorf("Bad Head Messages"), "headProcessing")
}
err = processHeadSlot(bc.Db, bc.ServerEndpoint, slot, head.Block, head.State, bc.PreviousSlot, bc.PreviousBlockRoot, bc.Metrics, bc.KnownGapTableIncrement)
if err != nil {
loghelper.LogSlotError(head.Slot, err).Error("Unable to process a slot")
}
log.WithFields(log.Fields{"head": head}).Debug("Received a new head event.")
// Update the previous block
bc.PreviousSlot = slot
bc.PreviousBlockRoot = head.Block
}
}

View File

@ -0,0 +1,268 @@
// This file will keep track of all the code needed to process a slot.
// To process a slot, it should have all the necessary data needed to write it to the DB.
// But not actually write it.
package beaconclient
import (
"encoding/hex"
"fmt"
"strconv"
"strings"
// The below is temporary, once https://github.com/prysmaticlabs/prysm/issues/10006 has been resolved we wont need it.
// pb "github.com/prysmaticlabs/prysm/proto/prysm/v2"
st "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
)
var (
SlotUnmarshalError = func(obj string) string {
return fmt.Sprintf("Unable to properly unmarshal the Slot field in the %s.", obj)
}
ParentRootUnmarshalError = "Unable to properly unmarshal the ParentRoot field in the SignedBeaconBlock."
MissingIdentifiedError = "Can't query state without a set slot or block_root"
MissingEth1Data = "Can't get the Eth1 block_hash"
)
type ProcessSlot struct {
// Generic
Slot int // The slot number.
Epoch int // The epoch number.
BlockRoot string // The hex encoded string of the BlockRoot.
StateRoot string // The hex encoded string of the StateRoot.
ParentBlockRoot string // The hex encoded string of the parent block.
Status string // The status of the block
HeadOrHistoric string // Is this the head or a historic slot. This is critical when trying to analyze errors and skipped slots.
Db sql.Database // The DB object used to write to the DB.
Metrics *BeaconClientMetrics // An object to keep track of the beaconclient metrics
// BeaconBlock
SszSignedBeaconBlock []byte // The entire SSZ encoded SignedBeaconBlock
FullSignedBeaconBlock *st.SignedBeaconBlock // The unmarshaled BeaconState object, the unmarshalling could have errors.
// BeaconState
FullBeaconState *st.BeaconState // The unmarshaled BeaconState object, the unmarshalling could have errors.
SszBeaconState []byte // The entire SSZ encoded BeaconState
// DB Write objects
DbSlotsModel *DbSlots // The model being written to the slots table.
DbSignedBeaconBlockModel *DbSignedBeaconBlock // The model being written to the signed_beacon_block table.
DbBeaconState *DbBeaconState // The model being written to the beacon_state table.
}
// This function will do all the work to process the slot and write it to the DB.
func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, headOrHistoric string, metrics *BeaconClientMetrics, knownGapsTableIncrement int) error {
ps := &ProcessSlot{
Slot: slot,
BlockRoot: blockRoot,
StateRoot: stateRoot,
HeadOrHistoric: headOrHistoric,
Db: db,
Metrics: metrics,
}
// Get the BeaconState.
err := ps.getBeaconState(serverAddress)
if err != nil {
writeKnownGaps(ps.Db, 1, ps.Slot, ps.Slot, err, "processSlot")
return err
}
// Get the SignedBeaconBlock.
err = ps.getSignedBeaconBlock(serverAddress)
if err != nil {
writeKnownGaps(ps.Db, 1, ps.Slot, ps.Slot, err, "processSlot")
return err
}
if ps.HeadOrHistoric == "head" && previousSlot == 0 && previousBlockRoot == "" {
writeStartUpGaps(db, knownGapsTableIncrement, ps.Slot)
}
// Get this object ready to write
blockRootEndpoint := serverAddress + BcBlockRootEndpoint(strconv.Itoa(ps.Slot))
dw, err := ps.createWriteObjects(blockRootEndpoint)
if err != nil {
writeKnownGaps(ps.Db, 1, ps.Slot, ps.Slot, err, "blockRoot")
return err
}
// Write the object to the DB.
err = dw.writeFullSlot()
if err != nil {
writeKnownGaps(ps.Db, 1, ps.Slot, ps.Slot, err, "processSlot")
}
// Handle any reorgs or skipped slots.
headOrHistoric = strings.ToLower(headOrHistoric)
if headOrHistoric != "head" && headOrHistoric != "historic" {
return fmt.Errorf("headOrHistoric must be either historic or head!")
}
if ps.HeadOrHistoric == "head" && previousSlot != 0 && previousBlockRoot != "" && ps.Status != "skipped" {
ps.checkPreviousSlot(previousSlot, previousBlockRoot, knownGapsTableIncrement)
}
return nil
}
// Handle a slot that is at head. A wrapper function for calling `handleFullSlot`.
func processHeadSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, metrics *BeaconClientMetrics, knownGapsTableIncrement int) error {
return processFullSlot(db, serverAddress, slot, blockRoot, stateRoot, previousSlot, previousBlockRoot, "head", metrics, knownGapsTableIncrement)
}
// Handle a historic slot. A wrapper function for calling `handleFullSlot`.
// Commented because of the linter...... LOL
//func handleHistoricSlot(db sql.Database, serverAddress string, slot int) error {
// return handleFullSlot(db, serverAddress, slot, "", "", 0, "", "historic")
//}
// Update the SszSignedBeaconBlock and FullSignedBeaconBlock object with their respective values.
func (ps *ProcessSlot) getSignedBeaconBlock(serverAddress string) error {
var blockIdentifier string // Used to query the block
if ps.BlockRoot != "" {
blockIdentifier = ps.BlockRoot
} else if ps.Slot != 0 {
blockIdentifier = strconv.Itoa(ps.Slot)
} else {
log.Error(MissingIdentifiedError)
return fmt.Errorf(MissingIdentifiedError)
}
blockEndpoint := serverAddress + BcBlockQueryEndpoint + blockIdentifier
var err error
var rc int
ps.SszSignedBeaconBlock, rc, err = querySsz(blockEndpoint, strconv.Itoa(ps.Slot))
if err != nil {
loghelper.LogSlotError(strconv.Itoa(ps.Slot), err).Error("Unable to properly query the slot.")
return err
}
if rc != 200 {
ps.FullSignedBeaconBlock = &st.SignedBeaconBlock{}
ps.SszSignedBeaconBlock = []byte{}
ps.ParentBlockRoot = ""
ps.Status = "skipped"
return nil
}
ps.FullSignedBeaconBlock = &st.SignedBeaconBlock{}
err = ps.FullSignedBeaconBlock.UnmarshalSSZ(ps.SszSignedBeaconBlock)
if err != nil {
if ps.FullSignedBeaconBlock.Block.Slot == 0 {
loghelper.LogSlotError(strconv.Itoa(ps.Slot), err).Error(SlotUnmarshalError("SignedBeaconBlock"))
return fmt.Errorf(SlotUnmarshalError("SignedBeaconBlock"))
} else if ps.FullSignedBeaconBlock.Block.ParentRoot == nil {
loghelper.LogSlotError(strconv.Itoa(ps.Slot), err).Error(ParentRootUnmarshalError)
return fmt.Errorf(ParentRootUnmarshalError)
} else if hex.EncodeToString(ps.FullBeaconState.Eth1Data.BlockHash) == "" {
loghelper.LogSlotError(strconv.Itoa(ps.Slot), err).Error(MissingEth1Data)
return fmt.Errorf(MissingEth1Data)
}
log.Warn("We received a processing error: ", err)
}
ps.ParentBlockRoot = "0x" + hex.EncodeToString(ps.FullSignedBeaconBlock.Block.ParentRoot)
return nil
}
// Update the SszBeaconState and FullBeaconState object with their respective values.
func (ps *ProcessSlot) getBeaconState(serverEndpoint string) error {
var stateIdentifier string // Used to query the state
if ps.StateRoot != "" {
stateIdentifier = ps.StateRoot
} else if ps.Slot != 0 {
stateIdentifier = strconv.Itoa(ps.Slot)
} else {
log.Error(MissingIdentifiedError)
return fmt.Errorf(MissingIdentifiedError)
}
stateEndpoint := serverEndpoint + BcStateQueryEndpoint + stateIdentifier
ps.SszBeaconState, _, _ = querySsz(stateEndpoint, strconv.Itoa(ps.Slot))
ps.FullBeaconState = new(st.BeaconState)
err := ps.FullBeaconState.UnmarshalSSZ(ps.SszBeaconState)
if err != nil {
if ps.FullBeaconState.Slot == 0 {
loghelper.LogSlotError(strconv.Itoa(ps.Slot), err).Error(SlotUnmarshalError("BeaconState"))
return fmt.Errorf(SlotUnmarshalError("BeaconState"))
}
}
return nil
}
// Check to make sure that the previous block we processed is the parent of the current block.
func (ps *ProcessSlot) checkPreviousSlot(previousSlot int, previousBlockRoot string, knownGapsTableIncrement int) {
parentRoot := "0x" + hex.EncodeToString(ps.FullSignedBeaconBlock.Block.ParentRoot)
if previousSlot == int(ps.FullBeaconState.Slot) {
log.WithFields(log.Fields{
"slot": ps.FullBeaconState.Slot,
"fork": true,
}).Warn("A fork occurred! The previous slot and current slot match.")
writeReorgs(ps.Db, strconv.Itoa(ps.Slot), ps.BlockRoot, ps.Metrics)
} else if previousSlot+1 != int(ps.FullBeaconState.Slot) {
log.WithFields(log.Fields{
"previousSlot": previousSlot,
"currentSlot": ps.FullBeaconState.Slot,
}).Error("We skipped a few slots.")
writeKnownGaps(ps.Db, knownGapsTableIncrement, previousSlot+1, int(ps.FullBeaconState.Slot)-1, fmt.Errorf("Gaps during head processing"), "headGaps")
} else if previousBlockRoot != parentRoot {
log.WithFields(log.Fields{
"previousBlockRoot": previousBlockRoot,
"currentBlockParent": parentRoot,
}).Error("The previousBlockRoot does not match the current blocks parent, an unprocessed fork might have occurred.")
writeReorgs(ps.Db, strconv.Itoa(previousSlot), parentRoot, ps.Metrics)
writeKnownGaps(ps.Db, 1, ps.Slot-1, ps.Slot-1, fmt.Errorf("Incorrect Parent"), "processSlot")
} else {
log.Debug("Previous Slot and Current Slot are one distance from each other.")
}
}
// Transforms all the raw data into DB models that can be written to the DB.
func (ps *ProcessSlot) createWriteObjects(blockRootEndpoint string) (*DatabaseWriter, error) {
var (
stateRoot string
blockRoot string
status string
eth1BlockHash string
)
if ps.Status == "skipped" {
stateRoot = ""
blockRoot = ""
eth1BlockHash = ""
} else {
if ps.StateRoot != "" {
stateRoot = ps.StateRoot
} else {
stateRoot = "0x" + hex.EncodeToString(ps.FullSignedBeaconBlock.Block.StateRoot)
log.Debug("StateRoot: ", stateRoot)
}
if ps.BlockRoot != "" {
blockRoot = ps.BlockRoot
} else {
var err error
blockRoot, err = queryBlockRoot(blockRootEndpoint, strconv.Itoa(ps.Slot))
if err != nil {
return nil, err
}
}
eth1BlockHash = "0x" + hex.EncodeToString(ps.FullSignedBeaconBlock.Block.Body.Eth1Data.BlockHash)
}
if ps.Status != "" {
status = ps.Status
} else {
status = "proposed"
}
dw, err := CreateDatabaseWrite(ps.Db, ps.Slot, stateRoot, blockRoot, ps.ParentBlockRoot, eth1BlockHash, status, ps.SszSignedBeaconBlock, ps.SszBeaconState, ps.Metrics)
if err != nil {
return dw, err
}
return dw, nil
}

View File

@ -3,6 +3,7 @@
package beaconclient
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
@ -11,91 +12,70 @@ import (
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
)
// Attempt to use generics..
// // These are types that append slot at the end of the URL to handle a request.
// type SlotBasedRequests interface {
// *specs.BeaconState | *specs.SignedBeaconBlock
// UnmarshalSSZ([]byte) error
// }
//
// func queryState[R SlotBasedRequests](endpoint string, slot string) (R, error) {
// obj := new(R)
// rawState, err := querySlot(endpoint, slot)
// if err != nil {
// return *obj, err
// }
//
// err = &obj.UnmarshalSSZ(rawState)
// err = (*obj).UnmarshalSSZ(rawState)
// if err != nil {
// loghelper.LogSlotError(slot, err).Error("Unable to unmarshal the SSZ response from the Beacon Node Successfully!")
// return *obj, fmt.Errorf("Unable to unmarshal the SSZ response from the Beacon Node Successfully!: %s", err.Error())
// }
// return *obj, nil
// }
// This function will query a state object based on the slot provided.
// The object is SSZ encoded.
//type BeaconBlockResponse struct {
// version string `json: `
//}
// func queryState(endpoint string, slot string) (spectests.BeaconState, error) {
// obj := new(spectests.BeaconState)
// fullEndpoint := endpoint + slot
// rawState, err := querySsz(fullEndpoint, slot)
// if err != nil {
// return *obj, err
// }
//
// err = obj.UnmarshalSSZ(rawState)
// if err != nil {
// loghelper.LogSlotError(slot, err).Error("Unable to unmarshal the SSZ response from the Beacon Node")
// return *obj, fmt.Errorf("Unable to unmarshal the SSZ response from the Beacon Node: %s", err.Error())
// }
// return *obj, nil
// }
//
// // This function will query a state object based on the slot provided.
// // The object is SSZ encoded.
// func queryBlock(endpoint string, slot string) (spectests.SignedBeaconBlock, error) {
// obj := new(spectests.SignedBeaconBlock)
// fullEndpoint := endpoint + slot
// rawBlock, err := querySsz(fullEndpoint, slot)
// if err != nil {
// return *obj, err
// }
//
// err = obj.UnmarshalSSZ(rawBlock)
// if err != nil {
// loghelper.LogSlotError(slot, err).Error("Unable to unmarshal the SSZ response from the Beacon Node Successfully!")
// return *obj, fmt.Errorf("Unable to unmarshal the SSZ response from the Beacon Node Successfully!: %s", err.Error())
// }
// return *obj, nil
// }
// A helper function to query endpoints that utilize slots.
func querySsz(endpoint string, slot string) ([]byte, error) {
log.WithFields(log.Fields{"endpoint": endpoint}).Info("Querying endpoint")
func querySsz(endpoint string, slot string) ([]byte, int, error) {
log.WithFields(log.Fields{"endpoint": endpoint}).Debug("Querying endpoint")
client := &http.Client{}
req, err := http.NewRequest("GET", endpoint, nil)
if err != nil {
loghelper.LogSlotError(slot, err).Error("Unable to create a request!")
return nil, fmt.Errorf("Unable to create a request!: %s", err.Error())
return nil, 0, fmt.Errorf("Unable to create a request!: %s", err.Error())
}
// Not set correctly
req.Header.Set("Accept", "application/octet-stream")
response, err := client.Do(req)
if err != nil {
loghelper.LogSlotError(slot, err).Error("Unable to query Beacon Node!")
return nil, fmt.Errorf("Unable to query Beacon Node: %s", err.Error())
return nil, 0, fmt.Errorf("Unable to query Beacon Node: %s", err.Error())
}
defer response.Body.Close()
rc := response.StatusCode
body, err := ioutil.ReadAll(response.Body)
if err != nil {
loghelper.LogSlotError(slot, err).Error("Unable to turn response into a []bytes array!")
return nil, rc, fmt.Errorf("Unable to turn response into a []bytes array!: %s", err.Error())
}
return body, rc, nil
}
// Object to unmarshal the BlockRootResponse
type BlockRootResponse struct {
Data BlockRootMessage `json:"data"`
}
// Object to unmarshal the BlockRoot Message
type BlockRootMessage struct {
Root string `json:"root"`
}
// A function to query the blockroot for a given slot.
func queryBlockRoot(endpoint string, slot string) (string, error) {
log.WithFields(log.Fields{"endpoint": endpoint}).Debug("Querying endpoint")
client := &http.Client{}
req, err := http.NewRequest("GET", endpoint, nil)
if err != nil {
loghelper.LogSlotError(slot, err).Error("Unable to create a request!")
return "", fmt.Errorf("Unable to create a request!: %s", err.Error())
}
req.Header.Set("Accept", "application/json")
response, err := client.Do(req)
if err != nil {
loghelper.LogSlotError(slot, err).Error("Unable to query Beacon Node!")
return "", fmt.Errorf("Unable to query Beacon Node: %s", err.Error())
}
defer response.Body.Close()
body, err := ioutil.ReadAll(response.Body)
if err != nil {
loghelper.LogSlotError(slot, err).Error("Unable to turn response into a []bytes array!")
return nil, fmt.Errorf("Unable to turn response into a []bytes array!: %s", err.Error())
return "", fmt.Errorf("Unable to turn response into a []bytes array!: %s", err.Error())
}
return body, nil
resp := BlockRootResponse{}
if err := json.Unmarshal(body, &resp); err != nil {
loghelper.LogEndpoint(endpoint).WithFields(log.Fields{
"rawMessage": string(body),
"err": err,
}).Error("Unable to unmarshal the block root")
return "", err
}
return resp.Data.Root, nil
}

View File

@ -6,6 +6,7 @@ import (
log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
)
var _ sql.Database = &DB{}
@ -23,6 +24,35 @@ func NewPostgresDB(c Config) (*DB, error) {
return &DB{driver}, nil
}
// A simple wrapper to create a DB object to use.
func SetupPostgresDb(dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string) (sql.Database, error) {
log.Debug("Resolving Driver Type")
DbDriver, err := ResolveDriverType(driverName)
if err != nil {
log.WithFields(log.Fields{
"err": err,
"driver_name_provided": driverName,
}).Error("Can't resolve driver type")
}
log.Info("Using Driver:", DbDriver)
postgresConfig := Config{
Hostname: dbHostname,
Port: dbPort,
DatabaseName: dbName,
Username: dbUsername,
Password: dbPassword,
Driver: DbDriver,
}
DB, err := NewPostgresDB(postgresConfig)
if err != nil {
loghelper.LogError(err).Error("Unable to connect to the DB")
return nil, err
}
return DB, err
}
// Create a driver based on the config
func createDriver(c Config) (*pgxDriver, error) {
switch c.Driver {

View File

@ -29,17 +29,18 @@ var _ = Describe("Pgx", func() {
_, err := postgres.NewPostgresDB(postgres.Config{
Driver: "PGX",
})
Expect(err).NotTo(BeNil())
Expect(err).To(HaveOccurred())
present, err := doesContainsSubstring(err.Error(), sql.DbConnectionFailedMsg)
Expect(present).To(BeTrue())
Expect(err).NotTo(HaveOccurred())
})
})
Context("The connection is successful", func() {
It("Should create a DB object", func() {
db, err := postgres.NewPostgresDB(postgres.DefaultConfig)
defer db.Close()
Expect(err).To(BeNil())
defer db.Close()
})
})
})

View File

@ -16,24 +16,28 @@ import (
// operation is a clean up function on shutting down
type Operation func(ctx context.Context) error
var (
TimeoutErr = func(timeout string) error {
return fmt.Errorf("The Timeout %s, has been elapsed, the application will forcefully exit", timeout)
}
)
// gracefulShutdown waits for termination syscalls and doing clean up operations after received it
func Shutdown(ctx context.Context, timeout time.Duration, ops map[string]Operation) (<-chan struct{}, <-chan error) {
func Shutdown(ctx context.Context, notifierCh chan os.Signal, timeout time.Duration, ops map[string]Operation) (<-chan struct{}, <-chan error) {
waitCh := make(chan struct{})
errCh := make(chan error)
go func() {
s := make(chan os.Signal, 1)
// add any other syscalls that you want to be notified with
signal.Notify(s, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP)
<-s
signal.Notify(notifierCh, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP)
<-notifierCh
log.Info("Shutting Down your application")
// set timeout for the ops to be done to prevent system hang
timeoutFunc := time.AfterFunc(timeout, func() {
log.Warnf("timeout %d ms has been elapsed, force exit", timeout.Milliseconds())
errCh <- fmt.Errorf("Application shutdown took too long.")
return
log.Warnf(TimeoutErr(timeout.String()).Error())
errCh <- TimeoutErr(timeout.String())
})
defer timeoutFunc.Stop()

22
pkg/loghelper/logreorg.go Normal file
View File

@ -0,0 +1,22 @@
package loghelper
import (
log "github.com/sirupsen/logrus"
)
// A simple helper function that will help wrap the reorg error messages.
func LogReorgError(slot string, latestBlockRoot string, err error) *log.Entry {
return log.WithFields(log.Fields{
"err": err,
"slot": slot,
"latestBlockRoot": latestBlockRoot,
})
}
// A simple helper function that will help wrap regular reorg messages.
func LogReorg(slot string, latestBlockRoot string) *log.Entry {
return log.WithFields(log.Fields{
"slot": slot,
"latestBlockRoot": latestBlockRoot,
})
}