Include all changes needed for historical and knownGaps processing #59

Merged
abdulrabbani00 merged 11 commits from develop into main 2022-06-10 13:45:42 +00:00
21 changed files with 526 additions and 300 deletions
Showing only changes of commit e3b4fad3c7 - Show all commits

View File

@ -22,189 +22,15 @@ on:
- "!LICENSE" - "!LICENSE"
- "!.github/workflows/**" - "!.github/workflows/**"
- ".github/workflows/on-pr.yml" - ".github/workflows/on-pr.yml"
- ".github/workflows/tests.yml"
- "**" - "**"
env:
stack-orchestrator-ref: ${{ github.event.inputs.stack-orchestrator-ref || 'main'}}
ipld-ethcl-db-ref: ${{ github.event.inputs.ipld-ethcl-db-ref || 'feature/historic-processing' }}
ssz-data-ref: ${{ github.event.inputs.ssz-data-ref || 'main' }}
GOPATH: /tmp/go
jobs: jobs:
build: trigger-tests:
name: Run Docker Build uses: ./.github/workflows/tests.yml
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
with: with:
path: "./ipld-ethcl-indexer" stack-orchestrator-ref: ${{ github.event.inputs.stack-orchestrator-ref }}
ipld-ethcl-db-ref: ${{ github.event.inputs.ipld-ethcl-db-ref }}
- uses: actions/checkout@v3 ssz-data-ref: ${{ github.event.inputs.ssz-data-ref }}
with: secrets:
ref: ${{ env.stack-orchestrator-ref }} GHA_KEY: ${{secrets.GHA_KEY}}
path: "./stack-orchestrator/"
repository: vulcanize/stack-orchestrator
fetch-depth: 0
- uses: actions/checkout@v3
with:
ref: ${{ env.ipld-ethcl-db-ref }}
repository: vulcanize/ipld-ethcl-db
path: "./ipld-ethcl-db/"
token: ${{ secrets.GH_PAT }}
fetch-depth: 0
- name: Create config file
run: |
echo vulcanize_ipld_ethcl_db=$GITHUB_WORKSPACE/ipld-ethcl-db/ > ./config.sh
echo vulcanize_ipld_ethcl_indexer=$GITHUB_WORKSPACE/ipld-ethcl-indexer >> ./config.sh
echo ethcl_capture_mode=boot >> ./config.sh
echo ethcl_skip_sync=true >> ./config.sh
echo ethcl_known_gap_increment=1000000 >> ./config.sh
cat ./config.sh
- name: Run docker compose
run: |
docker-compose \
-f "$GITHUB_WORKSPACE/stack-orchestrator/docker/local/docker-compose-ethcl-db.yml" \
-f "$GITHUB_WORKSPACE/stack-orchestrator/docker/latest/docker-compose-lighthouse.yml" \
-f "$GITHUB_WORKSPACE/stack-orchestrator/docker/local/docker-compose-ipld-ethcl-indexer.yml" \
--env-file ./config.sh \
up -d --build
- name: Check to make sure HEALTH file is present
shell: bash
run: |
until $(docker compose -f "$GITHUB_WORKSPACE/stack-orchestrator/docker/local/docker-compose-ipld-ethcl-indexer.yml" cp ipld-ethcl-indexer:/root/HEALTH ./HEALTH) ; do sleep 10; done
cat ./HEALTH
if [[ "$(cat ./HEALTH)" -eq "0" ]]; then echo "Application boot successful" && (exit 0); else docker compose -f "$GITHUB_WORKSPACE/stack-orchestrator/docker/local/docker-compose-ipld-ethcl-indexer.yml" cp ipld-ethcl-indexer:/root/ipld-ethcl-indexer.log . && cat ipld-ethcl-indexer.log && (exit 1); fi
unit-test:
name: Run Unit Tests
runs-on: ubuntu-latest
## IF you want to update the default branch for `pull_request runs, do it after the ||`
steps:
- name: Create GOPATH
run: mkdir -p /tmp/go
- uses: actions/checkout@v2
with:
path: "./ipld-ethcl-indexer"
- uses: actions/checkout@v3
with:
ref: ${{ env.stack-orchestrator-ref }}
path: "./stack-orchestrator/"
repository: vulcanize/stack-orchestrator
fetch-depth: 0
- uses: actions/checkout@v3
with:
ref: ${{ env.ipld-ethcl-db-ref }}
repository: vulcanize/ipld-ethcl-db
path: "./ipld-ethcl-db/"
token: ${{ secrets.GH_PAT }}
fetch-depth: 0
- uses: actions/checkout@v3
with:
ref: ${{ env.ssz-data-ref }}
repository: vulcanize/ssz-data
path: "./ipld-ethcl-indexer/pkg/beaconclient/ssz-data"
fetch-depth: 0
- name: Create config file
run: |
echo vulcanize_ipld_ethcl_db=$GITHUB_WORKSPACE/ipld-ethcl-db/ > ./config.sh
echo vulcanize_ipld_ethcl_indexer=$GITHUB_WORKSPACE/ipld-ethcl-indexer >> ./config.sh
cat ./config.sh
- name: Run docker compose
run: |
docker-compose \
-f "$GITHUB_WORKSPACE/stack-orchestrator/docker/local/docker-compose-ethcl-db.yml" \
--env-file ./config.sh \
up -d --build
- uses: actions/setup-go@v3
with:
go-version: ">=1.17.0"
check-latest: true
- name: Install packages
run: |
go install github.com/onsi/ginkgo/v2/ginkgo@latest
which ginkgo
- name: Run the tests using Make
run: |
cd ipld-ethcl-indexer
make unit-test-ci
integration-test:
name: Run Integration Tests
runs-on: ubuntu-latest
steps:
- name: Create GOPATH
run: mkdir -p /tmp/go
- uses: actions/checkout@v2
with:
path: "./ipld-ethcl-indexer"
- uses: actions/checkout@v3
with:
ref: ${{ env.stack-orchestrator-ref }}
path: "./stack-orchestrator/"
repository: vulcanize/stack-orchestrator
fetch-depth: 0
- uses: actions/checkout@v3
with:
ref: ${{ env.ipld-ethcl-db-ref }}
repository: vulcanize/ipld-ethcl-db
path: "./ipld-ethcl-db/"
token: ${{ secrets.GH_PAT }}
fetch-depth: 0
- name: Create config file
run: |
echo vulcanize_ipld_ethcl_db=$GITHUB_WORKSPACE/ipld-ethcl-db/ > ./config.sh
echo vulcanize_ipld_ethcl_indexer=$GITHUB_WORKSPACE/ipld-ethcl-indexer >> ./config.sh
echo ethcl_capture_mode=boot >> ./config.sh
cat ./config.sh
- name: Run docker compose
run: |
docker-compose \
-f "$GITHUB_WORKSPACE/stack-orchestrator/docker/local/docker-compose-ethcl-db.yml" \
-f "$GITHUB_WORKSPACE/stack-orchestrator/docker/latest/docker-compose-lighthouse.yml" \
--env-file ./config.sh \
up -d --build
- uses: actions/setup-go@v3
with:
go-version: ">=1.17.0"
check-latest: true
- name: Install packages
run: |
go install github.com/onsi/ginkgo/v2/ginkgo@latest
which ginkgo
- name: Run the tests using Make
run: |
cd ipld-ethcl-indexer
make integration-test-ci
golangci:
runs-on: ubuntu-latest
steps:
- uses: actions/setup-go@v3
with:
go-version: ">=1.17.0"
- uses: actions/checkout@v3
- name: golangci-lint
uses: golangci/golangci-lint-action@v3
with:
args: --timeout 90s --disable deadcode,unused
# args: --timeout 90s --disable deadcode,

View File

@ -3,9 +3,18 @@ on:
release: release:
types: [published, edited] types: [published, edited]
jobs: jobs:
trigger-tests:
uses: ./.github/workflows/tests.yml
with:
stack-orchestrator-ref: ${{ github.event.inputs.stack-orchestrator-ref }}
ipld-ethcl-db-ref: ${{ github.event.inputs.ipld-ethcl-db-ref }}
ssz-data-ref: ${{ github.event.inputs.ssz-data-ref }}
secrets:
GHA_KEY: ${{secrets.GHA_KEY}}
build: build:
name: Run docker build name: Run docker build
runs-on: ubuntu-latest runs-on: ubuntu-latest
needs: trigger-tests
steps: steps:
- uses: actions/checkout@v2 - uses: actions/checkout@v2
- name: Get the version - name: Get the version

201
.github/workflows/tests.yml vendored Normal file
View File

@ -0,0 +1,201 @@
name: Test the stack.
on:
workflow_call:
inputs:
stack-orchestrator-ref:
required: false
type: string
ipld-ethcl-db-ref:
required: false
type: string
ssz-data-ref:
required: false
type: string
secrets:
GHA_KEY:
required: true
env:
stack-orchestrator-ref: ${{ inputs.stack-orchestrator-ref || 'develop' }}
ipld-ethcl-db-ref: ${{ inputs.ipld-ethcl-db-ref || 'feature/historic-processing' }}
ssz-data-ref: ${{ inputs.ssz-data-ref || 'main' }}
GOPATH: /tmp/go
jobs:
build:
name: Run Docker Build
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
with:
path: "./ipld-ethcl-indexer"
- uses: actions/checkout@v3
with:
ref: ${{ env.stack-orchestrator-ref }}
path: "./stack-orchestrator/"
repository: vulcanize/stack-orchestrator
fetch-depth: 0
- uses: actions/checkout@v3
with:
ref: ${{ env.ipld-ethcl-db-ref }}
repository: vulcanize/ipld-ethcl-db
path: "./ipld-ethcl-db/"
ssh-key: ${{secrets.GHA_KEY}}
fetch-depth: 0
- name: Create config file
run: |
echo vulcanize_ipld_ethcl_db=$GITHUB_WORKSPACE/ipld-ethcl-db/ > ./config.sh
echo vulcanize_ipld_ethcl_indexer=$GITHUB_WORKSPACE/ipld-ethcl-indexer >> ./config.sh
echo ethcl_capture_mode=boot >> ./config.sh
echo ethcl_skip_sync=true >> ./config.sh
echo ethcl_known_gap_increment=1000000 >> ./config.sh
cat ./config.sh
- name: Run docker compose
run: |
docker-compose \
-f "$GITHUB_WORKSPACE/stack-orchestrator/docker/local/docker-compose-ethcl-db.yml" \
-f "$GITHUB_WORKSPACE/stack-orchestrator/docker/latest/docker-compose-lighthouse.yml" \
-f "$GITHUB_WORKSPACE/stack-orchestrator/docker/local/docker-compose-ipld-ethcl-indexer.yml" \
--env-file ./config.sh \
up -d --build
- name: Check to make sure HEALTH file is present
shell: bash
run: |
until $(docker compose -f "$GITHUB_WORKSPACE/stack-orchestrator/docker/local/docker-compose-ipld-ethcl-indexer.yml" cp ipld-ethcl-indexer:/root/HEALTH ./HEALTH) ; do sleep 10; done
cat ./HEALTH
if [[ "$(cat ./HEALTH)" -eq "0" ]]; then echo "Application boot successful" && (exit 0); else docker compose -f "$GITHUB_WORKSPACE/stack-orchestrator/docker/local/docker-compose-ipld-ethcl-indexer.yml" cp ipld-ethcl-indexer:/root/ipld-ethcl-indexer.log . && cat ipld-ethcl-indexer.log && (exit 1); fi
unit-test:
name: Run Unit Tests
runs-on: ubuntu-latest
## IF you want to update the default branch for `pull_request runs, do it after the ||`
steps:
- name: Create GOPATH
run: mkdir -p /tmp/go
- uses: actions/checkout@v2
with:
path: "./ipld-ethcl-indexer"
- uses: actions/checkout@v3
with:
ref: ${{ env.stack-orchestrator-ref }}
path: "./stack-orchestrator/"
repository: vulcanize/stack-orchestrator
fetch-depth: 0
- uses: actions/checkout@v3
with:
ref: ${{ env.ipld-ethcl-db-ref }}
repository: vulcanize/ipld-ethcl-db
path: "./ipld-ethcl-db/"
ssh-key: ${{ secrets.GHA_KEY }}
fetch-depth: 0
- uses: actions/checkout@v3
with:
ref: ${{ env.ssz-data-ref }}
repository: vulcanize/ssz-data
path: "./ipld-ethcl-indexer/pkg/beaconclient/ssz-data"
fetch-depth: 0
- name: Create config file
run: |
echo vulcanize_ipld_ethcl_db=$GITHUB_WORKSPACE/ipld-ethcl-db/ > ./config.sh
echo vulcanize_ipld_ethcl_indexer=$GITHUB_WORKSPACE/ipld-ethcl-indexer >> ./config.sh
cat ./config.sh
- name: Run docker compose
run: |
docker-compose \
-f "$GITHUB_WORKSPACE/stack-orchestrator/docker/local/docker-compose-ethcl-db.yml" \
--env-file ./config.sh \
up -d --build
- uses: actions/setup-go@v3
with:
go-version: ">=1.17.0"
check-latest: true
- name: Install packages
run: |
go install github.com/onsi/ginkgo/v2/ginkgo@latest
which ginkgo
- name: Run the tests using Make
run: |
cd ipld-ethcl-indexer
make unit-test-ci
integration-test:
name: Run Integration Tests
runs-on: ubuntu-latest
steps:
- name: Create GOPATH
run: mkdir -p /tmp/go
- uses: actions/checkout@v2
with:
path: "./ipld-ethcl-indexer"
- uses: actions/checkout@v3
with:
ref: ${{ env.stack-orchestrator-ref }}
path: "./stack-orchestrator/"
repository: vulcanize/stack-orchestrator
fetch-depth: 0
- uses: actions/checkout@v3
with:
ref: ${{ env.ipld-ethcl-db-ref }}
repository: vulcanize/ipld-ethcl-db
path: "./ipld-ethcl-db/"
ssh-key: ${{secrets.GHA_KEY}}
fetch-depth: 0
- name: Create config file
run: |
echo vulcanize_ipld_ethcl_db=$GITHUB_WORKSPACE/ipld-ethcl-db/ > ./config.sh
echo vulcanize_ipld_ethcl_indexer=$GITHUB_WORKSPACE/ipld-ethcl-indexer >> ./config.sh
echo ethcl_capture_mode=boot >> ./config.sh
cat ./config.sh
- name: Run docker compose
run: |
docker-compose \
-f "$GITHUB_WORKSPACE/stack-orchestrator/docker/local/docker-compose-ethcl-db.yml" \
-f "$GITHUB_WORKSPACE/stack-orchestrator/docker/latest/docker-compose-lighthouse.yml" \
--env-file ./config.sh \
up -d --build
- uses: actions/setup-go@v3
with:
go-version: ">=1.17.0"
check-latest: true
- name: Install packages
run: |
go install github.com/onsi/ginkgo/v2/ginkgo@latest
which ginkgo
- name: Run the tests using Make
run: |
cd ipld-ethcl-indexer
make integration-test-ci
golangci:
runs-on: ubuntu-latest
steps:
- uses: actions/setup-go@v3
with:
go-version: ">=1.17.0"
- uses: actions/checkout@v3
- name: golangci-lint
uses: golangci/golangci-lint-action@v3
with:
args: --timeout 90s --disable deadcode,unused
# args: --timeout 90s --disable deadcode,

View File

@ -23,6 +23,7 @@ import (
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/spf13/cobra" "github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/vulcanize/ipld-ethcl-indexer/internal/boot" "github.com/vulcanize/ipld-ethcl-indexer/internal/boot"
"github.com/vulcanize/ipld-ethcl-indexer/internal/shutdown" "github.com/vulcanize/ipld-ethcl-indexer/internal/shutdown"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper" "github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
@ -44,10 +45,11 @@ func bootApp() {
log.Info("Starting the application in boot mode.") log.Info("Starting the application in boot mode.")
ctx := context.Background() ctx := context.Background()
BC, DB, err := boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, Bc, Db, err := boot.BootApplicationWithRetry(ctx, viper.GetString("db.address"), viper.GetInt("db.port"), viper.GetString("db.name"), viper.GetString("db.username"), viper.GetString("db.password"), viper.GetString("db.driver"),
bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, kgTableIncrement, "head", testDisregardSync) viper.GetString("bc.address"), viper.GetInt("bc.port"), viper.GetString("bc.connectionProtocol"), viper.GetString("bc.type"), viper.GetInt("bc.bootRetryInterval"), viper.GetInt("bc.bootMaxRetry"),
viper.GetInt("kg.increment"), "boot", viper.GetBool("t.skipSync"), viper.GetInt("bc.uniqueNodeIdentifier"))
if err != nil { if err != nil {
loghelper.LogError(err).Error("Unable to Start application") StopApplicationPreBoot(err, Db)
} }
log.Info("Boot complete, we are going to shutdown.") log.Info("Boot complete, we are going to shutdown.")
@ -58,7 +60,7 @@ func bootApp() {
notifierCh <- syscall.SIGTERM notifierCh <- syscall.SIGTERM
}() }()
err = shutdown.ShutdownServices(ctx, notifierCh, maxWaitSecondsShutdown, DB, BC) err = shutdown.ShutdownBoot(ctx, notifierCh, maxWaitSecondsShutdown, Db, Bc)
if err != nil { if err != nil {
loghelper.LogError(err).Error("Ungracefully Shutdown ipld-ethcl-indexer!") loghelper.LogError(err).Error("Ungracefully Shutdown ipld-ethcl-indexer!")
} else { } else {

View File

@ -39,6 +39,7 @@ var (
bcConnectionProtocol string bcConnectionProtocol string
bcType string bcType string
bcMaxHistoricProcessWorker int bcMaxHistoricProcessWorker int
bcUniqueNodeIdentifier int
kgMaxWorker int kgMaxWorker int
kgTableIncrement int kgTableIncrement int
kgProcessGaps bool kgProcessGaps bool
@ -93,6 +94,7 @@ func init() {
captureCmd.PersistentFlags().IntVarP(&bcBootRetryInterval, "bc.bootRetryInterval", "", 30, "The amount of time to wait between retries while booting the application") captureCmd.PersistentFlags().IntVarP(&bcBootRetryInterval, "bc.bootRetryInterval", "", 30, "The amount of time to wait between retries while booting the application")
captureCmd.PersistentFlags().IntVarP(&bcBootMaxRetry, "bc.bootMaxRetry", "", 5, "The amount of time to wait between retries while booting the application") captureCmd.PersistentFlags().IntVarP(&bcBootMaxRetry, "bc.bootMaxRetry", "", 5, "The amount of time to wait between retries while booting the application")
captureCmd.PersistentFlags().IntVarP(&bcMaxHistoricProcessWorker, "bc.maxHistoricProcessWorker", "", 30, "The number of workers that should be actively processing slots from the ethcl.historic_process table. Be careful of system memory.") captureCmd.PersistentFlags().IntVarP(&bcMaxHistoricProcessWorker, "bc.maxHistoricProcessWorker", "", 30, "The number of workers that should be actively processing slots from the ethcl.historic_process table. Be careful of system memory.")
captureCmd.PersistentFlags().IntVarP(&bcUniqueNodeIdentifier, "bc.uniqueNodeIdentifier", "", 0, "The unique identifier of this application. Each application connecting to the DB should have a unique identifier.")
// err = captureCmd.MarkPersistentFlagRequired("bc.address") // err = captureCmd.MarkPersistentFlagRequired("bc.address")
// exitErr(err) // exitErr(err)
// err = captureCmd.MarkPersistentFlagRequired("bc.port") // err = captureCmd.MarkPersistentFlagRequired("bc.port")
@ -123,6 +125,8 @@ func init() {
exitErr(err) exitErr(err)
err = viper.BindPFlag("db.name", captureCmd.PersistentFlags().Lookup("db.name")) err = viper.BindPFlag("db.name", captureCmd.PersistentFlags().Lookup("db.name"))
exitErr(err) exitErr(err)
err = viper.BindPFlag("db.driver", captureCmd.PersistentFlags().Lookup("db.driver"))
exitErr(err)
//// Testing Specific //// Testing Specific
err = viper.BindPFlag("t.skipSync", captureCmd.PersistentFlags().Lookup("t.skipSync")) err = viper.BindPFlag("t.skipSync", captureCmd.PersistentFlags().Lookup("t.skipSync"))
@ -143,6 +147,8 @@ func init() {
exitErr(err) exitErr(err)
err = viper.BindPFlag("bc.maxHistoricProcessWorker", captureCmd.PersistentFlags().Lookup("bc.maxHistoricProcessWorker")) err = viper.BindPFlag("bc.maxHistoricProcessWorker", captureCmd.PersistentFlags().Lookup("bc.maxHistoricProcessWorker"))
exitErr(err) exitErr(err)
err = viper.BindPFlag("bc.uniqueNodeIdentifier", captureCmd.PersistentFlags().Lookup("bc.uniqueNodeIdentifier"))
exitErr(err)
// Here you will define your flags and configuration settings. // Here you will define your flags and configuration settings.
//// Known Gap Specific //// Known Gap Specific

View File

@ -50,7 +50,7 @@ func startHeadTracking() {
Bc, Db, err := boot.BootApplicationWithRetry(ctx, viper.GetString("db.address"), viper.GetInt("db.port"), viper.GetString("db.name"), viper.GetString("db.username"), viper.GetString("db.password"), viper.GetString("db.driver"), Bc, Db, err := boot.BootApplicationWithRetry(ctx, viper.GetString("db.address"), viper.GetInt("db.port"), viper.GetString("db.name"), viper.GetString("db.username"), viper.GetString("db.password"), viper.GetString("db.driver"),
viper.GetString("bc.address"), viper.GetInt("bc.port"), viper.GetString("bc.connectionProtocol"), viper.GetString("bc.type"), viper.GetInt("bc.bootRetryInterval"), viper.GetInt("bc.bootMaxRetry"), viper.GetString("bc.address"), viper.GetInt("bc.port"), viper.GetString("bc.connectionProtocol"), viper.GetString("bc.type"), viper.GetInt("bc.bootRetryInterval"), viper.GetInt("bc.bootMaxRetry"),
viper.GetInt("kg.increment"), "head", viper.GetBool("t.skipSync")) viper.GetInt("kg.increment"), "head", viper.GetBool("t.skipSync"), viper.GetInt("bc.uniqueNodeIdentifier"))
if err != nil { if err != nil {
StopApplicationPreBoot(err, Db) StopApplicationPreBoot(err, Db)
} }
@ -81,7 +81,7 @@ func startHeadTracking() {
} }
// Shutdown when the time is right. // Shutdown when the time is right.
err = shutdown.ShutdownServices(ctx, notifierCh, maxWaitSecondsShutdown, Db, Bc) err = shutdown.ShutdownHeadTracking(ctx, notifierCh, maxWaitSecondsShutdown, Db, Bc)
if err != nil { if err != nil {
loghelper.LogError(err).Error("Ungracefully Shutdown ipld-ethcl-indexer!") loghelper.LogError(err).Error("Ungracefully Shutdown ipld-ethcl-indexer!")
} else { } else {
@ -94,6 +94,7 @@ func init() {
captureCmd.AddCommand(headCmd) captureCmd.AddCommand(headCmd)
} }
// Start prometheus server
func serveProm(addr string) { func serveProm(addr string) {
mux := http.NewServeMux() mux := http.NewServeMux()
mux.Handle("/metrics", promhttp.Handler()) mux.Handle("/metrics", promhttp.Handler())

View File

@ -50,7 +50,7 @@ func startHistoricProcessing() {
Bc, Db, err := boot.BootApplicationWithRetry(ctx, viper.GetString("db.address"), viper.GetInt("db.port"), viper.GetString("db.name"), viper.GetString("db.username"), viper.GetString("db.password"), viper.GetString("db.driver"), Bc, Db, err := boot.BootApplicationWithRetry(ctx, viper.GetString("db.address"), viper.GetInt("db.port"), viper.GetString("db.name"), viper.GetString("db.username"), viper.GetString("db.password"), viper.GetString("db.driver"),
viper.GetString("bc.address"), viper.GetInt("bc.port"), viper.GetString("bc.connectionProtocol"), viper.GetString("bc.type"), viper.GetInt("bc.bootRetryInterval"), viper.GetInt("bc.bootMaxRetry"), viper.GetString("bc.address"), viper.GetInt("bc.port"), viper.GetString("bc.connectionProtocol"), viper.GetString("bc.type"), viper.GetInt("bc.bootRetryInterval"), viper.GetInt("bc.bootMaxRetry"),
viper.GetInt("kg.increment"), "head", viper.GetBool("t.skipSync")) viper.GetInt("kg.increment"), "historic", viper.GetBool("t.skipSync"), viper.GetInt("bc.uniqueNodeIdentifier"))
if err != nil { if err != nil {
StopApplicationPreBoot(err, Db) StopApplicationPreBoot(err, Db)
} }
@ -91,7 +91,7 @@ func startHistoricProcessing() {
} }
// Shutdown when the time is right. // Shutdown when the time is right.
err = shutdown.ShutdownServices(ctx, notifierCh, maxWaitSecondsShutdown, Db, Bc) err = shutdown.ShutdownHistoricProcessing(ctx, notifierCh, maxWaitSecondsShutdown, Db, Bc)
if err != nil { if err != nil {
loghelper.LogError(err).Error("Ungracefully Shutdown ipld-ethcl-indexer!") loghelper.LogError(err).Error("Ungracefully Shutdown ipld-ethcl-indexer!")
} else { } else {

View File

@ -14,7 +14,8 @@
"bootRetryInterval": 30, "bootRetryInterval": 30,
"bootMaxRetry": 5, "bootMaxRetry": 5,
"maxHistoricProcessWorker": 2, "maxHistoricProcessWorker": 2,
"connectionProtocol": "http" "connectionProtocol": "http",
"uniqueNodeIdentifier": 100
}, },
"t": { "t": {
"skipSync": true "skipSync": true

2
go.mod
View File

@ -9,6 +9,7 @@ require (
github.com/multiformats/go-multihash v0.1.0 github.com/multiformats/go-multihash v0.1.0
github.com/onsi/ginkgo/v2 v2.1.4 github.com/onsi/ginkgo/v2 v2.1.4
github.com/onsi/gomega v1.19.0 github.com/onsi/gomega v1.19.0
github.com/prometheus/client_golang v1.12.1
github.com/r3labs/sse v0.0.0-20210224172625-26fe804710bc github.com/r3labs/sse v0.0.0-20210224172625-26fe804710bc
github.com/sirupsen/logrus v1.8.1 github.com/sirupsen/logrus v1.8.1
) )
@ -57,7 +58,6 @@ require (
github.com/multiformats/go-varint v0.0.6 // indirect github.com/multiformats/go-varint v0.0.6 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pkg/errors v0.9.1 // indirect github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.12.1 // indirect
github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.32.1 // indirect github.com/prometheus/common v0.32.1 // indirect
github.com/prometheus/procfs v0.7.3 // indirect github.com/prometheus/procfs v0.7.3 // indirect

View File

@ -42,14 +42,17 @@ var (
// //
// 3. Make sure the node is synced, unless disregardSync is true. // 3. Make sure the node is synced, unless disregardSync is true.
func BootApplication(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string, func BootApplication(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string,
bcAddress string, bcPort int, bcConnectionProtocol string, bcKgTableIncrement int, disregardSync bool) (*beaconclient.BeaconClient, sql.Database, error) { bcAddress string, bcPort int, bcConnectionProtocol string, bcKgTableIncrement int, disregardSync bool, uniqueNodeIdentifier int) (*beaconclient.BeaconClient, sql.Database, error) {
log.Info("Booting the Application") log.Info("Booting the Application")
log.Debug("Creating the Beacon Client") log.Debug("Creating the Beacon Client")
BC = beaconclient.CreateBeaconClient(ctx, bcConnectionProtocol, bcAddress, bcPort, bcKgTableIncrement) Bc, err := beaconclient.CreateBeaconClient(ctx, bcConnectionProtocol, bcAddress, bcPort, bcKgTableIncrement, uniqueNodeIdentifier)
if err != nil {
return Bc, nil, err
}
log.Debug("Checking Beacon Client") log.Debug("Checking Beacon Client")
err := BC.CheckBeaconClient() err = Bc.CheckBeaconClient()
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
@ -60,36 +63,37 @@ func BootApplication(ctx context.Context, dbHostname string, dbPort int, dbName
return nil, nil, err return nil, nil, err
} }
BC.Db = DB Bc.Db = DB
var status bool var status bool
if !disregardSync { if !disregardSync {
status, err = BC.CheckHeadSync() status, err = Bc.CheckHeadSync()
if err != nil { if err != nil {
log.Error("Unable to get the nodes sync status") log.Error("Unable to get the nodes sync status")
return BC, DB, err return Bc, DB, err
} }
if status { if status {
log.Error("The node is still syncing..") log.Error("The node is still syncing..")
err = fmt.Errorf("The node is still syncing.") err = fmt.Errorf("The node is still syncing.")
return BC, DB, err return Bc, DB, err
} }
} else { } else {
log.Warn("We are not checking to see if the node has synced to head.") log.Warn("We are not checking to see if the node has synced to head.")
} }
return BC, DB, nil return Bc, DB, nil
} }
// Add retry logic to ensure that we are give the Beacon Client and the DB time to start. // Add retry logic to ensure that we are give the Beacon Client and the DB time to start.
func BootApplicationWithRetry(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string, func BootApplicationWithRetry(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string,
bcAddress string, bcPort int, bcConnectionProtocol string, bcType string, bcRetryInterval int, bcMaxRetry int, bcKgTableIncrement int, startUpMode string, disregardSync bool) (*beaconclient.BeaconClient, sql.Database, error) { bcAddress string, bcPort int, bcConnectionProtocol string, bcType string, bcRetryInterval int, bcMaxRetry int, bcKgTableIncrement int,
startUpMode string, disregardSync bool, uniqueNodeIdentifier int) (*beaconclient.BeaconClient, sql.Database, error) {
var err error var err error
if bcMaxRetry < 0 { if bcMaxRetry < 0 {
i := 0 i := 0
for { for {
BC, DB, err = BootApplication(ctx, dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName, BC, DB, err = BootApplication(ctx, dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName,
bcAddress, bcPort, bcConnectionProtocol, bcKgTableIncrement, disregardSync) bcAddress, bcPort, bcConnectionProtocol, bcKgTableIncrement, disregardSync, uniqueNodeIdentifier)
if err != nil { if err != nil {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"retryNumber": i, "retryNumber": i,
@ -104,7 +108,7 @@ func BootApplicationWithRetry(ctx context.Context, dbHostname string, dbPort int
} else { } else {
for i := 0; i < bcMaxRetry; i++ { for i := 0; i < bcMaxRetry; i++ {
BC, DB, err = BootApplication(ctx, dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName, BC, DB, err = BootApplication(ctx, dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName,
bcAddress, bcPort, bcConnectionProtocol, bcKgTableIncrement, disregardSync) bcAddress, bcPort, bcConnectionProtocol, bcKgTableIncrement, disregardSync, uniqueNodeIdentifier)
if err != nil { if err != nil {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"retryNumber": i, "retryNumber": i,
@ -136,6 +140,8 @@ func BootApplicationWithRetry(ctx context.Context, dbHostname string, dbPort int
} }
BC.UpdateLatestSlotInBeaconServer(int64(headSlot)) BC.UpdateLatestSlotInBeaconServer(int64(headSlot))
// Add another switch case for bcType if its ever needed. // Add another switch case for bcType if its ever needed.
case "boot":
log.Debug("Running application in boot mode.")
default: default:
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"startUpMode": startUpMode, "startUpMode": startUpMode,

View File

@ -38,44 +38,52 @@ var _ = Describe("Boot", func() {
bcBootRetryInterval int = 1 bcBootRetryInterval int = 1
bcBootMaxRetry int = 5 bcBootMaxRetry int = 5
bcKgTableIncrement int = 10 bcKgTableIncrement int = 10
bcUniqueIdentifier int = 100
) )
Describe("Booting the application", Label("integration"), func() { Describe("Booting the application", Label("integration"), func() {
Context("When the DB and BC are both up and running, we skip checking for a synced head, and we are processing head", func() { Context("When the DB and BC are both up and running, we skip checking for a synced head, and we are processing head", func() {
It("Should connect successfully", func() { It("Should connect successfully", func() {
_, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", true) _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", true, bcUniqueIdentifier)
defer db.Close() defer db.Close()
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
}) })
}) })
Context("When the DB and BC are both up and running, we skip checking for a synced head, and we are processing historic ", func() { Context("When the DB and BC are both up and running, we skip checking for a synced head, and we are processing historic ", func() {
It("Should connect successfully", func() { It("Should connect successfully", func() {
_, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "historic", true) _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "historic", true, bcUniqueIdentifier)
defer db.Close() defer db.Close()
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
}) })
}) })
Context("When the DB and BC are both up and running, and we check for a synced head", func() { Context("When the DB and BC are both up and running, and we check for a synced head", func() {
It("Should not connect successfully", func() { It("Should not connect successfully", func() {
_, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", false) _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", false, bcUniqueIdentifier)
defer db.Close()
Expect(err).To(HaveOccurred())
})
})
Context("When the DB and BC are both up and running, we skip checking for a synced head, but the unique identifier is 0", func() {
It("Should not connect successfully", func() {
_, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", false, 0)
defer db.Close() defer db.Close()
Expect(err).To(HaveOccurred()) Expect(err).To(HaveOccurred())
}) })
}) })
Context("When the DB is running but not the BC", func() { Context("When the DB is running but not the BC", func() {
It("Should not connect successfully", func() { It("Should not connect successfully", func() {
_, _, err := boot.BootApplication(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol, bcKgTableIncrement, true) _, _, err := boot.BootApplication(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol, bcKgTableIncrement, true, bcUniqueIdentifier)
Expect(err).To(HaveOccurred()) Expect(err).To(HaveOccurred())
}) })
}) })
Context("When the BC is running but not the DB", func() { Context("When the BC is running but not the DB", func() {
It("Should not connect successfully", func() { It("Should not connect successfully", func() {
_, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcKgTableIncrement, true) _, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcKgTableIncrement, true, bcUniqueIdentifier)
Expect(err).To(HaveOccurred()) Expect(err).To(HaveOccurred())
}) })
}) })
Context("When neither the BC or DB are running", func() { Context("When neither the BC or DB are running", func() {
It("Should not connect successfully", func() { It("Should not connect successfully", func() {
_, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol, bcKgTableIncrement, true) _, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol, bcKgTableIncrement, true, bcUniqueIdentifier)
Expect(err).To(HaveOccurred()) Expect(err).To(HaveOccurred())
}) })
}) })

View File

@ -27,18 +27,9 @@ import (
) )
// Shutdown all the internal services for the application. // Shutdown all the internal services for the application.
func ShutdownServices(ctx context.Context, notifierCh chan os.Signal, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) error { func ShutdownServices(ctx context.Context, notifierCh chan os.Signal, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient, shutdownOperations map[string]gracefulshutdown.Operation) error {
successCh, errCh := gracefulshutdown.Shutdown(ctx, notifierCh, waitTime, map[string]gracefulshutdown.Operation{ //successCh, errCh := gracefulshutdown.Shutdown(ctx, notifierCh, waitTime, )
// Combining DB shutdown with BC because BC needs DB open to cleanly shutdown. successCh, errCh := gracefulshutdown.Shutdown(ctx, notifierCh, waitTime, shutdownOperations)
"beaconClient": func(ctx context.Context) error {
defer DB.Close()
err := BC.StopHeadTracking()
if err != nil {
loghelper.LogError(err).Error("Unable to trigger shutdown of head tracking")
}
return err
},
})
select { select {
case <-successCh: case <-successCh:
@ -47,3 +38,55 @@ func ShutdownServices(ctx context.Context, notifierCh chan os.Signal, waitTime t
return err return err
} }
} }
// Wrapper function for shutting down the head tracking process.
func ShutdownHeadTracking(ctx context.Context, notifierCh chan os.Signal, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) error {
return ShutdownServices(ctx, notifierCh, waitTime, DB, BC, map[string]gracefulshutdown.Operation{
// Combining DB shutdown with BC because BC needs DB open to cleanly shutdown.
"beaconClient": func(ctx context.Context) error {
defer DB.Close()
err := BC.StopHeadTracking()
if err != nil {
loghelper.LogError(err).Error("Unable to trigger shutdown of head tracking")
}
if BC.KnownGapsProcess != (beaconclient.KnownGapsProcessing{}) {
err = BC.StopKnownGapsProcessing()
if err != nil {
loghelper.LogError(err).Error("Unable to stop processing known gaps")
}
}
return err
},
})
}
// Wrapper function for shutting down the head tracking process.
func ShutdownHistoricProcessing(ctx context.Context, notifierCh chan os.Signal, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) error {
return ShutdownServices(ctx, notifierCh, waitTime, DB, BC, map[string]gracefulshutdown.Operation{
// Combining DB shutdown with BC because BC needs DB open to cleanly shutdown.
"beaconClient": func(ctx context.Context) error {
defer DB.Close()
err := BC.StopHistoric()
if err != nil {
loghelper.LogError(err).Error("Unable to stop processing historic")
}
if BC.KnownGapsProcess != (beaconclient.KnownGapsProcessing{}) {
err = BC.StopKnownGapsProcessing()
if err != nil {
loghelper.LogError(err).Error("Unable to stop processing known gaps")
}
}
return err
},
})
}
// Wrapper function for shutting down the application in boot mode.
func ShutdownBoot(ctx context.Context, notifierCh chan os.Signal, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) error {
return ShutdownServices(ctx, notifierCh, waitTime, DB, BC, map[string]gracefulshutdown.Operation{
// Combining DB shutdown with BC because BC needs DB open to cleanly shutdown.
"Database": func(ctx context.Context) error {
return DB.Close()
},
})
}

View File

@ -35,8 +35,7 @@ import (
"github.com/vulcanize/ipld-ethcl-indexer/pkg/gracefulshutdown" "github.com/vulcanize/ipld-ethcl-indexer/pkg/gracefulshutdown"
) )
var _ = Describe("Shutdown", func() { var (
var (
dbAddress string = "localhost" dbAddress string = "localhost"
dbPort int = 8076 dbPort int = 8076
dbName string = "vulcanize_testing" dbName string = "vulcanize_testing"
@ -50,27 +49,30 @@ var _ = Describe("Shutdown", func() {
bcBootRetryInterval int = 1 bcBootRetryInterval int = 1
bcBootMaxRetry int = 5 bcBootMaxRetry int = 5
bcKgTableIncrement int = 10 bcKgTableIncrement int = 10
bcUniqueIdentifier int = 100
maxWaitSecondsShutdown time.Duration = time.Duration(1) * time.Second maxWaitSecondsShutdown time.Duration = time.Duration(1) * time.Second
DB sql.Database DB sql.Database
BC *beaconclient.BeaconClient BC *beaconclient.BeaconClient
err error err error
ctx context.Context ctx context.Context
notifierCh chan os.Signal notifierCh chan os.Signal
) )
var _ = Describe("Shutdown", func() {
BeforeEach(func() { BeforeEach(func() {
ctx = context.Background() ctx = context.Background()
BC, DB, err = boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, BC, DB, err = boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress,
bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", true) bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", true, bcUniqueIdentifier)
notifierCh = make(chan os.Signal, 1) notifierCh = make(chan os.Signal, 1)
Expect(err).To(BeNil()) Expect(err).To(BeNil())
}) })
Describe("Run Shutdown Function,", Label("integration"), func() { Describe("Run Shutdown Function for head tracking,", Label("integration"), func() {
Context("When Channels are empty,", func() { Context("When Channels are empty,", func() {
It("Should Shutdown Successfully.", func() { It("Should Shutdown Successfully.", func() {
go func() { go func() {
log.Debug("Starting shutdown chan") log.Debug("Starting shutdown chan")
err = shutdown.ShutdownServices(ctx, notifierCh, maxWaitSecondsShutdown, DB, BC) err = shutdown.ShutdownHeadTracking(ctx, notifierCh, maxWaitSecondsShutdown, DB, BC)
log.Debug("We have completed the shutdown...") log.Debug("We have completed the shutdown...")
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
}() }()
@ -82,7 +84,7 @@ var _ = Describe("Shutdown", func() {
//log.SetLevel(log.DebugLevel) //log.SetLevel(log.DebugLevel)
go func() { go func() {
log.Debug("Starting shutdown chan") log.Debug("Starting shutdown chan")
err = shutdown.ShutdownServices(ctx, notifierCh, maxWaitSecondsShutdown, DB, BC) err = shutdown.ShutdownHeadTracking(ctx, notifierCh, maxWaitSecondsShutdown, DB, BC)
log.Debug("We have completed the shutdown...") log.Debug("We have completed the shutdown...")
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
shutdownCh <- true shutdownCh <- true
@ -115,7 +117,7 @@ var _ = Describe("Shutdown", func() {
//log.SetLevel(log.DebugLevel) //log.SetLevel(log.DebugLevel)
go func() { go func() {
log.Debug("Starting shutdown chan") log.Debug("Starting shutdown chan")
err = shutdown.ShutdownServices(ctx, notifierCh, maxWaitSecondsShutdown, DB, BC) err = shutdown.ShutdownHeadTracking(ctx, notifierCh, maxWaitSecondsShutdown, DB, BC)
log.Debug("We have completed the shutdown...") log.Debug("We have completed the shutdown...")
Expect(err).To(MatchError(gracefulshutdown.TimeoutErr(maxWaitSecondsShutdown.String()))) Expect(err).To(MatchError(gracefulshutdown.TimeoutErr(maxWaitSecondsShutdown.String())))
shutdownCh <- true shutdownCh <- true

View File

@ -18,6 +18,7 @@ package beaconclient
import ( import (
"context" "context"
"fmt" "fmt"
"math/rand"
"github.com/r3labs/sse" "github.com/r3labs/sse"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@ -48,6 +49,8 @@ type BeaconClient struct {
Db sql.Database // Database object used for reads and writes. Db sql.Database // Database object used for reads and writes.
Metrics *BeaconClientMetrics // An object used to keep track of certain BeaconClient Metrics. Metrics *BeaconClientMetrics // An object used to keep track of certain BeaconClient Metrics.
KnownGapTableIncrement int // The max number of slots within a single known_gaps table entry. KnownGapTableIncrement int // The max number of slots within a single known_gaps table entry.
UniqueNodeIdentifier int // The unique identifier within the cluster of this individual node.
KnownGapsProcess KnownGapsProcessing // object keeping track of knowngaps processing
// Used for Head Tracking // Used for Head Tracking
@ -66,6 +69,7 @@ type BeaconClient struct {
// This value is lazily updated. Therefore at times it will be outdated. // This value is lazily updated. Therefore at times it will be outdated.
LatestSlotInBeaconServer int64 LatestSlotInBeaconServer int64
PerformHistoricalProcessing bool // Should we perform historical processing? PerformHistoricalProcessing bool // Should we perform historical processing?
HistoricalProcess historicProcessing // object keeping track of historical processing
} }
// A struct to keep track of relevant the head event topic. // A struct to keep track of relevant the head event topic.
@ -84,7 +88,17 @@ type SseError struct {
} }
// A Function to create the BeaconClient. // A Function to create the BeaconClient.
func CreateBeaconClient(ctx context.Context, connectionProtocol string, bcAddress string, bcPort int, bcKgTableIncrement int) *BeaconClient { func CreateBeaconClient(ctx context.Context, connectionProtocol string, bcAddress string, bcPort int, bcKgTableIncrement int, uniqueNodeIdentifier int) (*BeaconClient, error) {
if uniqueNodeIdentifier == 0 {
uniqueNodeIdentifier := rand.Int()
log.WithField("randomUniqueNodeIdentifier", uniqueNodeIdentifier).Warn("No uniqueNodeIdentifier provided, we are going to use a randomly generated one.")
}
metrics, err := CreateBeaconClientMetrics()
if err != nil {
return nil, err
}
endpoint := fmt.Sprintf("%s://%s:%d", connectionProtocol, bcAddress, bcPort) endpoint := fmt.Sprintf("%s://%s:%d", connectionProtocol, bcAddress, bcPort)
log.Info("Creating the BeaconClient") log.Info("Creating the BeaconClient")
return &BeaconClient{ return &BeaconClient{
@ -93,9 +107,10 @@ func CreateBeaconClient(ctx context.Context, connectionProtocol string, bcAddres
KnownGapTableIncrement: bcKgTableIncrement, KnownGapTableIncrement: bcKgTableIncrement,
HeadTracking: createSseEvent[Head](endpoint, BcHeadTopicEndpoint), HeadTracking: createSseEvent[Head](endpoint, BcHeadTopicEndpoint),
ReOrgTracking: createSseEvent[ChainReorg](endpoint, bcReorgTopicEndpoint), ReOrgTracking: createSseEvent[ChainReorg](endpoint, bcReorgTopicEndpoint),
Metrics: CreateBeaconClientMetrics(), Metrics: metrics,
UniqueNodeIdentifier: uniqueNodeIdentifier,
//FinalizationTracking: createSseEvent[FinalizedCheckpoint](endpoint, bcFinalizedTopicEndpoint), //FinalizationTracking: createSseEvent[FinalizedCheckpoint](endpoint, bcFinalizedTopicEndpoint),
} }, nil
} }
// Create all the channels to handle a SSE events // Create all the channels to handle a SSE events

View File

@ -54,6 +54,7 @@ var (
dbUser string = "vdbm" dbUser string = "vdbm"
dbPassword string = "password" dbPassword string = "password"
dbDriver string = "pgx" dbDriver string = "pgx"
bcUniqueIdentifier int = 100
dummyParentRoot string = "46f98c08b54a71dfda4d56e29ec3952b8300cd8d6b67a9b6c562ae96a7a25a42" dummyParentRoot string = "46f98c08b54a71dfda4d56e29ec3952b8300cd8d6b67a9b6c562ae96a7a25a42"
knownGapsTableIncrement int = 100000 knownGapsTableIncrement int = 100000
maxRetry int = 120 maxRetry int = 120
@ -202,6 +203,7 @@ var (
dbPassword: dbPassword, dbPassword: dbPassword,
dbDriver: dbDriver, dbDriver: dbDriver,
knownGapsTableIncrement: knownGapsTableIncrement, knownGapsTableIncrement: knownGapsTableIncrement,
bcUniqueIdentifier: bcUniqueIdentifier,
} }
BeaconNodeTester = TestBeaconNode{ BeaconNodeTester = TestBeaconNode{
@ -421,6 +423,7 @@ type Config struct {
dbPassword string dbPassword string
dbDriver string dbDriver string
knownGapsTableIncrement int knownGapsTableIncrement int
bcUniqueIdentifier int
} }
////////////////////////////////////////////////////// //////////////////////////////////////////////////////
@ -430,7 +433,8 @@ type Config struct {
// Must run before each test. We can't use the beforeEach because of the way // Must run before each test. We can't use the beforeEach because of the way
// Gingko treats race conditions. // Gingko treats race conditions.
func setUpTest(config Config, maxSlot string) *beaconclient.BeaconClient { func setUpTest(config Config, maxSlot string) *beaconclient.BeaconClient {
bc := *beaconclient.CreateBeaconClient(context.Background(), config.protocol, config.address, config.port, config.knownGapsTableIncrement) bc, err := beaconclient.CreateBeaconClient(context.Background(), config.protocol, config.address, config.port, config.knownGapsTableIncrement, config.bcUniqueIdentifier)
Expect(err).ToNot(HaveOccurred())
db, err := postgres.SetupPostgresDb(config.dbHost, config.dbPort, config.dbName, config.dbUser, config.dbPassword, config.dbDriver) db, err := postgres.SetupPostgresDb(config.dbHost, config.dbPort, config.dbName, config.dbUser, config.dbPassword, config.dbDriver)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
@ -441,7 +445,7 @@ func setUpTest(config Config, maxSlot string) *beaconclient.BeaconClient {
writeSlot(db, maxSlot) writeSlot(db, maxSlot)
bc.Db = db bc.Db = db
return &bc return bc
} }
// A helper function to validate the expected output from the ethcl.slots table. // A helper function to validate the expected output from the ethcl.slots table.

View File

@ -22,18 +22,29 @@ import (
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql" "github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
) )
// This function will perform all the heavy lifting for tracking the head of the chain. // This function will perform all the heavy lifting for tracking the head of the chain.
func (bc *BeaconClient) CaptureHistoric(maxWorkers int) []error { func (bc *BeaconClient) CaptureHistoric(maxWorkers int) []error {
log.Info("We are starting the historical processing service.") log.Info("We are starting the historical processing service.")
hp := historicProcessing{db: bc.Db, metrics: bc.Metrics} bc.HistoricalProcess = historicProcessing{db: bc.Db, metrics: bc.Metrics}
errs := handleBatchProcess(maxWorkers, hp, hp.db, bc.ServerEndpoint, bc.Metrics) errs := handleBatchProcess(maxWorkers, bc.HistoricalProcess, bc.HistoricalProcess.finishProcessing, bc.HistoricalProcess.db, bc.ServerEndpoint, bc.Metrics)
log.Debug("Exiting Historical") log.Debug("Exiting Historical")
return errs return errs
} }
// This function will perform all the necessary clean up tasks for stopping historical processing.
func (bc *BeaconClient) StopHistoric() error {
log.Info("We are stopping the historical processing service.")
err := bc.HistoricalProcess.releaseDbLocks()
if err != nil {
loghelper.LogError(err).WithField("uniqueIdentifier", bc.UniqueNodeIdentifier).Error("We were unable to remove the locks from the ethcl.historic_processing table. Manual Intervention is needed!")
}
return nil
}
// An interface to enforce any batch processing. Currently there are two use cases for this. // An interface to enforce any batch processing. Currently there are two use cases for this.
// //
// 1. Historic Processing // 1. Historic Processing
@ -41,10 +52,17 @@ func (bc *BeaconClient) CaptureHistoric(maxWorkers int) []error {
// 2. Known Gaps Processing // 2. Known Gaps Processing
type BatchProcessing interface { type BatchProcessing interface {
getSlotRange(chan<- slotsToProcess) []error // Write the slots to process in a channel, return an error if you cant get the next slots to write. getSlotRange(chan<- slotsToProcess) []error // Write the slots to process in a channel, return an error if you cant get the next slots to write.
handleProcessingErrors(<-chan batchHistoricError) handleProcessingErrors(<-chan batchHistoricError) // Custom logic to handle errors.
removeTableEntry(<-chan slotsToProcess) error // With the provided start and end slot, remove the entry from the database. removeTableEntry(<-chan slotsToProcess) error // With the provided start and end slot, remove the entry from the database.
releaseDbLocks() error // Update the checked_out column to false for whatever table is being updated.
} }
/// ^^^
// Might be better to remove the interface and create a single struct that historicalProcessing
// and knownGapsProcessing can use. The struct would contain all the SQL strings that they need.
// And the only difference in logic for processing would be within the error handling.
// Which can be a function we pass into handleBatchProcess()
// A struct to pass around indicating a table entry for slots to process. // A struct to pass around indicating a table entry for slots to process.
type slotsToProcess struct { type slotsToProcess struct {
startSlot int // The start slot startSlot int // The start slot
@ -71,12 +89,12 @@ type batchHistoricError struct {
// 4. Remove the slot entry from the DB. // 4. Remove the slot entry from the DB.
// //
// 5. Handle any errors. // 5. Handle any errors.
func handleBatchProcess(maxWorkers int, bp BatchProcessing, db sql.Database, serverEndpoint string, metrics *BeaconClientMetrics) []error { func handleBatchProcess(maxWorkers int, bp BatchProcessing, finishCh chan int, db sql.Database, serverEndpoint string, metrics *BeaconClientMetrics) []error {
slotsCh := make(chan slotsToProcess) slotsCh := make(chan slotsToProcess)
workCh := make(chan int) workCh := make(chan int)
processedCh := make(chan slotsToProcess) processedCh := make(chan slotsToProcess)
errCh := make(chan batchHistoricError) errCh := make(chan batchHistoricError)
finishCh := make(chan []error, 1) finalErrCh := make(chan []error, 1)
// Start workers // Start workers
for w := 1; w <= maxWorkers; w++ { for w := 1; w <= maxWorkers; w++ {
@ -115,7 +133,7 @@ func handleBatchProcess(maxWorkers int, bp BatchProcessing, db sql.Database, ser
return bp.removeTableEntry(processedCh) return bp.removeTableEntry(processedCh)
}) })
if err := errG.Wait(); err != nil { if err := errG.Wait(); err != nil {
finishCh <- []error{err} finalErrCh <- []error{err}
} }
}() }()
// Process errors from slot processing. // Process errors from slot processing.
@ -125,12 +143,17 @@ func handleBatchProcess(maxWorkers int, bp BatchProcessing, db sql.Database, ser
go func() { go func() {
errs := bp.getSlotRange(slotsCh) // Periodically adds new entries.... errs := bp.getSlotRange(slotsCh) // Periodically adds new entries....
if errs != nil { if errs != nil {
finishCh <- errs finalErrCh <- errs
} }
finishCh <- nil finalErrCh <- nil
}() }()
log.Debug("Waiting for shutdown signal from channel")
errs := <-finishCh select {
case <-finishCh:
log.Debug("Received shutdown signal from channel")
return nil
case errs := <-finalErrCh:
log.Debug("Finishing the batchProcess") log.Debug("Finishing the batchProcess")
return errs return errs
}
} }

View File

@ -25,19 +25,22 @@ import (
var _ = Describe("Healthcheck", func() { var _ = Describe("Healthcheck", func() {
var ( var (
BC *beaconclient.BeaconClient Bc *beaconclient.BeaconClient
errBc *beaconclient.BeaconClient errBc *beaconclient.BeaconClient
) )
BeforeEach(func() { BeforeEach(func() {
BC = beaconclient.CreateBeaconClient(context.Background(), "http", "localhost", 5052, 10) var err error
errBc = beaconclient.CreateBeaconClient(context.Background(), "http", "blah-blah", 1010, 10) Bc, err = beaconclient.CreateBeaconClient(context.Background(), "http", "localhost", 5052, 10, bcUniqueIdentifier)
Expect(err).ToNot(HaveOccurred())
errBc, err = beaconclient.CreateBeaconClient(context.Background(), "http", "blah-blah", 1010, 10, bcUniqueIdentifier)
Expect(err).ToNot(HaveOccurred())
}) })
Describe("Connecting to the lighthouse client", Label("integration"), func() { Describe("Connecting to the lighthouse client", Label("integration"), func() {
Context("When the client is running", func() { Context("When the client is running", func() {
It("We should connect successfully", func() { It("We should connect successfully", func() {
err := BC.CheckBeaconClient() err := Bc.CheckBeaconClient()
Expect(err).To(BeNil()) Expect(err).To(BeNil())
}) })
}) })

View File

@ -24,7 +24,7 @@ import (
) )
//Create a metric struct and register each channel with prometheus //Create a metric struct and register each channel with prometheus
func CreateBeaconClientMetrics() *BeaconClientMetrics { func CreateBeaconClientMetrics() (*BeaconClientMetrics, error) {
metrics := &BeaconClientMetrics{ metrics := &BeaconClientMetrics{
SlotInserts: 0, SlotInserts: 0,
ReorgInserts: 0, ReorgInserts: 0,
@ -34,17 +34,38 @@ func CreateBeaconClientMetrics() *BeaconClientMetrics {
HeadError: 0, HeadError: 0,
HeadReorgError: 0, HeadReorgError: 0,
} }
prometheusRegisterHelper("slot_inserts", "Keeps track of the number of slots we have inserted.", &metrics.SlotInserts) err := prometheusRegisterHelper("slot_inserts", "Keeps track of the number of slots we have inserted.", &metrics.SlotInserts)
prometheusRegisterHelper("reorg_inserts", "Keeps track of the number of reorgs we have inserted.", &metrics.ReorgInserts) if err != nil {
prometheusRegisterHelper("known_gaps_inserts", "Keeps track of the number of known gaps we have inserted.", &metrics.KnownGapsInserts) return nil, err
prometheusRegisterHelper("known_gaps_processed", "Keeps track of the number of known gaps we processed.", &metrics.knownGapsProcessed) }
prometheusRegisterHelper("known_gaps_processing_error", "Keeps track of the number of known gaps we had errors processing.", &metrics.KnownGapsProcessingError) err = prometheusRegisterHelper("reorg_inserts", "Keeps track of the number of reorgs we have inserted.", &metrics.ReorgInserts)
prometheusRegisterHelper("head_error", "Keeps track of the number of errors we had processing head messages.", &metrics.HeadError) if err != nil {
prometheusRegisterHelper("head_reorg_error", "Keeps track of the number of errors we had processing reorg messages.", &metrics.HeadReorgError) return nil, err
return metrics }
err = prometheusRegisterHelper("known_gaps_inserts", "Keeps track of the number of known gaps we have inserted.", &metrics.KnownGapsInserts)
if err != nil {
return nil, err
}
err = prometheusRegisterHelper("known_gaps_processed", "Keeps track of the number of known gaps we processed.", &metrics.knownGapsProcessed)
if err != nil {
return nil, err
}
err = prometheusRegisterHelper("known_gaps_processing_error", "Keeps track of the number of known gaps we had errors processing.", &metrics.KnownGapsProcessingError)
if err != nil {
return nil, err
}
err = prometheusRegisterHelper("head_error", "Keeps track of the number of errors we had processing head messages.", &metrics.HeadError)
if err != nil {
return nil, err
}
err = prometheusRegisterHelper("head_reorg_error", "Keeps track of the number of errors we had processing reorg messages.", &metrics.HeadReorgError)
if err != nil {
return nil, err
}
return metrics, nil
} }
func prometheusRegisterHelper(name string, help string, varPointer *uint64) { func prometheusRegisterHelper(name string, help string, varPointer *uint64) error {
err := prometheus.Register(prometheus.NewCounterFunc( err := prometheus.Register(prometheus.NewCounterFunc(
prometheus.CounterOpts{ prometheus.CounterOpts{
Namespace: "beacon_client", Namespace: "beacon_client",
@ -58,7 +79,9 @@ func prometheusRegisterHelper(name string, help string, varPointer *uint64) {
})) }))
if err != nil && err.Error() != "duplicate metrics collector registration attempted" { if err != nil && err.Error() != "duplicate metrics collector registration attempted" {
loghelper.LogError(err).WithField("name", name).Error("Unable to register counter.") loghelper.LogError(err).WithField("name", name).Error("Unable to register counter.")
return err
} }
return nil
} }
// A structure utilized for keeping track of various metrics. Currently, mostly used in testing. // A structure utilized for keeping track of various metrics. Currently, mostly used in testing.

View File

@ -40,21 +40,27 @@ var (
checkHpEntryStmt string = `SELECT * FROM ethcl.historic_process WHERE checked_out=false;` checkHpEntryStmt string = `SELECT * FROM ethcl.historic_process WHERE checked_out=false;`
// Used to checkout a row from the ethcl.historic_process table // Used to checkout a row from the ethcl.historic_process table
lockHpEntryStmt string = `UPDATE ethcl.historic_process lockHpEntryStmt string = `UPDATE ethcl.historic_process
SET checked_out=true SET checked_out=true, checked_out_by=$3
WHERE start_slot=$1 AND end_slot=$2;` WHERE start_slot=$1 AND end_slot=$2;`
// Used to delete an entry from the knownGaps table // Used to delete an entry from the ethcl.historic_process table
deleteHpEntryStmt string = `DELETE FROM ethcl.historic_process deleteHpEntryStmt string = `DELETE FROM ethcl.historic_process
WHERE start_slot=$1 AND end_slot=$2;` WHERE start_slot=$1 AND end_slot=$2;`
// Used to update every single row that this node has checked out.
releaseHpLockStmt string = `UPDATE ethcl.historic_process
SET checked_out=false
WHERE checked_out_by=$1`
) )
type historicProcessing struct { type historicProcessing struct {
db sql.Database db sql.Database //db connection
metrics *BeaconClientMetrics metrics *BeaconClientMetrics // metrics for beaconclient
uniqueNodeIdentifier int // node unique identifier.
finishProcessing chan int // A channel which indicates to the process handleBatchProcess function that its time to end.
} }
// Get a single row of historical slots from the table. // Get a single row of historical slots from the table.
func (hp historicProcessing) getSlotRange(slotCh chan<- slotsToProcess) []error { func (hp historicProcessing) getSlotRange(slotCh chan<- slotsToProcess) []error {
return getBatchProcessRow(hp.db, getHpEntryStmt, checkHpEntryStmt, lockHpEntryStmt, slotCh) return getBatchProcessRow(hp.db, getHpEntryStmt, checkHpEntryStmt, lockHpEntryStmt, slotCh, strconv.Itoa(hp.uniqueNodeIdentifier))
} }
// Remove the table entry. // Remove the table entry.
@ -71,6 +77,22 @@ func (hp historicProcessing) handleProcessingErrors(errMessages <-chan batchHist
} }
} }
func (hp historicProcessing) releaseDbLocks() error {
go func() { hp.finishProcessing <- 1 }()
log.Debug("Updating all the entries to ethcl.historical processing")
res, err := hp.db.Exec(context.Background(), releaseHpLockStmt, hp.uniqueNodeIdentifier)
if err != nil {
return fmt.Errorf("Unable to remove lock from ethcl.historical_processing table for node %d, error is %e", hp.uniqueNodeIdentifier, err)
}
log.Debug("Update all the entries to ethcl.historical processing")
rows, err := res.RowsAffected()
if err != nil {
return fmt.Errorf("Unable to calculated number of rows affected by releasing locks from ethcl.historical_processing table for node %d, error is %e", hp.uniqueNodeIdentifier, err)
}
log.WithField("rowCount", rows).Info("Released historicalProcess locks for specified rows.")
return nil
}
// Process the slot range. // Process the slot range.
func processSlotRangeWorker(workCh <-chan int, errCh chan<- batchHistoricError, db sql.Database, serverAddress string, metrics *BeaconClientMetrics) { func processSlotRangeWorker(workCh <-chan int, errCh chan<- batchHistoricError, db sql.Database, serverAddress string, metrics *BeaconClientMetrics) {
for slot := range workCh { for slot := range workCh {
@ -91,7 +113,7 @@ func processSlotRangeWorker(workCh <-chan int, errCh chan<- batchHistoricError,
// It also locks the row by updating the checked_out column. // It also locks the row by updating the checked_out column.
// The statement for getting the start_slot and end_slot must be provided. // The statement for getting the start_slot and end_slot must be provided.
// The statement for "locking" the row must also be provided. // The statement for "locking" the row must also be provided.
func getBatchProcessRow(db sql.Database, getStartEndSlotStmt string, checkNewRowsStmt string, checkOutRowStmt string, slotCh chan<- slotsToProcess) []error { func getBatchProcessRow(db sql.Database, getStartEndSlotStmt string, checkNewRowsStmt string, checkOutRowStmt string, slotCh chan<- slotsToProcess, uniqueNodeIdentifier string) []error {
errCount := make([]error, 0) errCount := make([]error, 0)
// 5 is an arbitrary number. It allows us to retry a few times before // 5 is an arbitrary number. It allows us to retry a few times before
@ -148,7 +170,7 @@ func getBatchProcessRow(db sql.Database, getStartEndSlotStmt string, checkNewRow
} }
// Checkout the Row // Checkout the Row
res, err := tx.Exec(ctx, checkOutRowStmt, sp.startSlot, sp.endSlot) res, err := tx.Exec(ctx, checkOutRowStmt, sp.startSlot, sp.endSlot, uniqueNodeIdentifier)
if err != nil { if err != nil {
loghelper.LogSlotRangeStatementError(strconv.Itoa(sp.startSlot), strconv.Itoa(sp.endSlot), checkOutRowStmt, err).Error("Unable to checkout the row") loghelper.LogSlotRangeStatementError(strconv.Itoa(sp.startSlot), strconv.Itoa(sp.endSlot), checkOutRowStmt, err).Error("Unable to checkout the row")
errCount = append(errCount, err) errCount = append(errCount, err)

View File

@ -36,7 +36,7 @@ var (
checkKgEntryStmt string = `SELECT * FROM ethcl.known_gaps WHERE checked_out=false;` checkKgEntryStmt string = `SELECT * FROM ethcl.known_gaps WHERE checked_out=false;`
// Used to checkout a row from the ethcl.known_gaps table // Used to checkout a row from the ethcl.known_gaps table
lockKgEntryStmt string = `UPDATE ethcl.known_gaps lockKgEntryStmt string = `UPDATE ethcl.known_gaps
SET checked_out=true SET checked_out=true, checked_out_by=$3
WHERE start_slot=$1 AND end_slot=$2;` WHERE start_slot=$1 AND end_slot=$2;`
// Used to delete an entry from the knownGaps table // Used to delete an entry from the knownGaps table
deleteKgEntryStmt string = `DELETE FROM ethcl.known_gaps deleteKgEntryStmt string = `DELETE FROM ethcl.known_gaps
@ -44,34 +44,50 @@ var (
// Used to check to see if a single slot exists in the known_gaps table. // Used to check to see if a single slot exists in the known_gaps table.
checkKgSingleSlotStmt string = `SELECT start_slot, end_slot FROM ethcl.known_gaps checkKgSingleSlotStmt string = `SELECT start_slot, end_slot FROM ethcl.known_gaps
WHERE start_slot=$1 AND end_slot=$2;` WHERE start_slot=$1 AND end_slot=$2;`
// Used to update every single row that this node has checked out.
releaseKgLockStmt string = `UPDATE ethcl.known_gaps
SET checked_out=false
WHERE checked_out_by=$1`
) )
type knownGapsProcessing struct { type KnownGapsProcessing struct {
db sql.Database db sql.Database //db connection
metrics *BeaconClientMetrics metrics *BeaconClientMetrics // metrics for beaconclient
uniqueNodeIdentifier int // node unique identifier.
finishProcessing chan int // A channel which indicates to the process handleBatchProcess function that its time to end.
} }
// This function will perform all the heavy lifting for tracking the head of the chain. // This function will perform all the heavy lifting for tracking the head of the chain.
func (bc *BeaconClient) ProcessKnownGaps(maxWorkers int) []error { func (bc *BeaconClient) ProcessKnownGaps(maxWorkers int) []error {
log.Info("We are starting the known gaps processing service.") log.Info("We are starting the known gaps processing service.")
hp := knownGapsProcessing{db: bc.Db, metrics: bc.Metrics} bc.KnownGapsProcess = KnownGapsProcessing{db: bc.Db, uniqueNodeIdentifier: bc.UniqueNodeIdentifier, metrics: bc.Metrics, finishProcessing: make(chan int)}
errs := handleBatchProcess(maxWorkers, hp, hp.db, bc.ServerEndpoint, bc.Metrics) errs := handleBatchProcess(maxWorkers, bc.KnownGapsProcess, bc.KnownGapsProcess.finishProcessing, bc.KnownGapsProcess.db, bc.ServerEndpoint, bc.Metrics)
log.Debug("Exiting known gaps processing service") log.Debug("Exiting known gaps processing service")
return errs return errs
} }
// This function will perform all the necessary clean up tasks for stopping historical processing.
func (bc *BeaconClient) StopKnownGapsProcessing() error {
log.Info("We are stopping the historical processing service.")
err := bc.KnownGapsProcess.releaseDbLocks()
if err != nil {
loghelper.LogError(err).WithField("uniqueIdentifier", bc.UniqueNodeIdentifier).Error("We were unable to remove the locks from the ethcl.known_gaps table. Manual Intervention is needed!")
}
return nil
}
// Get a single row of historical slots from the table. // Get a single row of historical slots from the table.
func (kgp knownGapsProcessing) getSlotRange(slotCh chan<- slotsToProcess) []error { func (kgp KnownGapsProcessing) getSlotRange(slotCh chan<- slotsToProcess) []error {
return getBatchProcessRow(kgp.db, getKgEntryStmt, checkKgEntryStmt, lockKgEntryStmt, slotCh) return getBatchProcessRow(kgp.db, getKgEntryStmt, checkKgEntryStmt, lockKgEntryStmt, slotCh, strconv.Itoa(kgp.uniqueNodeIdentifier))
} }
// Remove the table entry. // Remove the table entry.
func (kgp knownGapsProcessing) removeTableEntry(processCh <-chan slotsToProcess) error { func (kgp KnownGapsProcessing) removeTableEntry(processCh <-chan slotsToProcess) error {
return removeRowPostProcess(kgp.db, processCh, QueryBySlotStmt, deleteKgEntryStmt) return removeRowPostProcess(kgp.db, processCh, QueryBySlotStmt, deleteKgEntryStmt)
} }
// Remove the table entry. // Remove the table entry.
func (kgp knownGapsProcessing) handleProcessingErrors(errMessages <-chan batchHistoricError) { func (kgp KnownGapsProcessing) handleProcessingErrors(errMessages <-chan batchHistoricError) {
for { for {
errMs := <-errMessages errMs := <-errMessages
@ -99,3 +115,18 @@ func (kgp knownGapsProcessing) handleProcessingErrors(errMessages <-chan batchHi
} }
} }
} }
// Updated checked_out column for the uniqueNodeIdentifier.
func (kgp KnownGapsProcessing) releaseDbLocks() error {
go func() { kgp.finishProcessing <- 1 }()
res, err := kgp.db.Exec(context.Background(), releaseKgLockStmt, kgp.uniqueNodeIdentifier)
if err != nil {
return err
}
rows, err := res.RowsAffected()
if err != nil {
return err
}
log.WithField("rowCount", rows).Info("Released knownGaps locks for specified rows.")
return nil
}

View File

@ -49,7 +49,7 @@ func SetupPostgresDb(dbHostname string, dbPort int, dbName string, dbUsername st
"driver_name_provided": driverName, "driver_name_provided": driverName,
}).Error("Can't resolve driver type") }).Error("Can't resolve driver type")
} }
log.Info("Using Driver:", DbDriver) log.Info("Using Driver: ", DbDriver)
postgresConfig := Config{ postgresConfig := Config{
Hostname: dbHostname, Hostname: dbHostname,