diff --git a/.github/workflows/on-pr.yml b/.github/workflows/on-pr.yml index 1e7f347..ac5b146 100644 --- a/.github/workflows/on-pr.yml +++ b/.github/workflows/on-pr.yml @@ -22,189 +22,15 @@ on: - "!LICENSE" - "!.github/workflows/**" - ".github/workflows/on-pr.yml" + - ".github/workflows/tests.yml" - "**" -env: - stack-orchestrator-ref: ${{ github.event.inputs.stack-orchestrator-ref || 'main'}} - ipld-ethcl-db-ref: ${{ github.event.inputs.ipld-ethcl-db-ref || 'feature/historic-processing' }} - ssz-data-ref: ${{ github.event.inputs.ssz-data-ref || 'main' }} - GOPATH: /tmp/go jobs: - build: - name: Run Docker Build - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v2 - with: - path: "./ipld-ethcl-indexer" - - - uses: actions/checkout@v3 - with: - ref: ${{ env.stack-orchestrator-ref }} - path: "./stack-orchestrator/" - repository: vulcanize/stack-orchestrator - fetch-depth: 0 - - - uses: actions/checkout@v3 - with: - ref: ${{ env.ipld-ethcl-db-ref }} - repository: vulcanize/ipld-ethcl-db - path: "./ipld-ethcl-db/" - token: ${{ secrets.GH_PAT }} - fetch-depth: 0 - - - name: Create config file - run: | - echo vulcanize_ipld_ethcl_db=$GITHUB_WORKSPACE/ipld-ethcl-db/ > ./config.sh - echo vulcanize_ipld_ethcl_indexer=$GITHUB_WORKSPACE/ipld-ethcl-indexer >> ./config.sh - echo ethcl_capture_mode=boot >> ./config.sh - echo ethcl_skip_sync=true >> ./config.sh - echo ethcl_known_gap_increment=1000000 >> ./config.sh - cat ./config.sh - - - name: Run docker compose - run: | - docker-compose \ - -f "$GITHUB_WORKSPACE/stack-orchestrator/docker/local/docker-compose-ethcl-db.yml" \ - -f "$GITHUB_WORKSPACE/stack-orchestrator/docker/latest/docker-compose-lighthouse.yml" \ - -f "$GITHUB_WORKSPACE/stack-orchestrator/docker/local/docker-compose-ipld-ethcl-indexer.yml" \ - --env-file ./config.sh \ - up -d --build - - - name: Check to make sure HEALTH file is present - shell: bash - run: | - until $(docker compose -f "$GITHUB_WORKSPACE/stack-orchestrator/docker/local/docker-compose-ipld-ethcl-indexer.yml" cp ipld-ethcl-indexer:/root/HEALTH ./HEALTH) ; do sleep 10; done - cat ./HEALTH - if [[ "$(cat ./HEALTH)" -eq "0" ]]; then echo "Application boot successful" && (exit 0); else docker compose -f "$GITHUB_WORKSPACE/stack-orchestrator/docker/local/docker-compose-ipld-ethcl-indexer.yml" cp ipld-ethcl-indexer:/root/ipld-ethcl-indexer.log . && cat ipld-ethcl-indexer.log && (exit 1); fi - - unit-test: - name: Run Unit Tests - runs-on: ubuntu-latest - ## IF you want to update the default branch for `pull_request runs, do it after the ||` - steps: - - name: Create GOPATH - run: mkdir -p /tmp/go - - - uses: actions/checkout@v2 - with: - path: "./ipld-ethcl-indexer" - - - uses: actions/checkout@v3 - with: - ref: ${{ env.stack-orchestrator-ref }} - path: "./stack-orchestrator/" - repository: vulcanize/stack-orchestrator - fetch-depth: 0 - - - uses: actions/checkout@v3 - with: - ref: ${{ env.ipld-ethcl-db-ref }} - repository: vulcanize/ipld-ethcl-db - path: "./ipld-ethcl-db/" - token: ${{ secrets.GH_PAT }} - fetch-depth: 0 - - - uses: actions/checkout@v3 - with: - ref: ${{ env.ssz-data-ref }} - repository: vulcanize/ssz-data - path: "./ipld-ethcl-indexer/pkg/beaconclient/ssz-data" - fetch-depth: 0 - - - name: Create config file - run: | - echo vulcanize_ipld_ethcl_db=$GITHUB_WORKSPACE/ipld-ethcl-db/ > ./config.sh - echo vulcanize_ipld_ethcl_indexer=$GITHUB_WORKSPACE/ipld-ethcl-indexer >> ./config.sh - cat ./config.sh - - - name: Run docker compose - run: | - docker-compose \ - -f "$GITHUB_WORKSPACE/stack-orchestrator/docker/local/docker-compose-ethcl-db.yml" \ - --env-file ./config.sh \ - up -d --build - - - uses: actions/setup-go@v3 - with: - go-version: ">=1.17.0" - check-latest: true - - - name: Install packages - run: | - go install github.com/onsi/ginkgo/v2/ginkgo@latest - which ginkgo - - - name: Run the tests using Make - run: | - cd ipld-ethcl-indexer - make unit-test-ci - - integration-test: - name: Run Integration Tests - runs-on: ubuntu-latest - steps: - - name: Create GOPATH - run: mkdir -p /tmp/go - - - uses: actions/checkout@v2 - with: - path: "./ipld-ethcl-indexer" - - - uses: actions/checkout@v3 - with: - ref: ${{ env.stack-orchestrator-ref }} - path: "./stack-orchestrator/" - repository: vulcanize/stack-orchestrator - fetch-depth: 0 - - - uses: actions/checkout@v3 - with: - ref: ${{ env.ipld-ethcl-db-ref }} - repository: vulcanize/ipld-ethcl-db - path: "./ipld-ethcl-db/" - token: ${{ secrets.GH_PAT }} - fetch-depth: 0 - - - name: Create config file - run: | - echo vulcanize_ipld_ethcl_db=$GITHUB_WORKSPACE/ipld-ethcl-db/ > ./config.sh - echo vulcanize_ipld_ethcl_indexer=$GITHUB_WORKSPACE/ipld-ethcl-indexer >> ./config.sh - echo ethcl_capture_mode=boot >> ./config.sh - cat ./config.sh - - - name: Run docker compose - run: | - docker-compose \ - -f "$GITHUB_WORKSPACE/stack-orchestrator/docker/local/docker-compose-ethcl-db.yml" \ - -f "$GITHUB_WORKSPACE/stack-orchestrator/docker/latest/docker-compose-lighthouse.yml" \ - --env-file ./config.sh \ - up -d --build - - - uses: actions/setup-go@v3 - with: - go-version: ">=1.17.0" - check-latest: true - - - name: Install packages - run: | - go install github.com/onsi/ginkgo/v2/ginkgo@latest - which ginkgo - - - name: Run the tests using Make - run: | - cd ipld-ethcl-indexer - make integration-test-ci - - golangci: - runs-on: ubuntu-latest - steps: - - uses: actions/setup-go@v3 - with: - go-version: ">=1.17.0" - - uses: actions/checkout@v3 - - name: golangci-lint - uses: golangci/golangci-lint-action@v3 - with: - args: --timeout 90s --disable deadcode,unused -# args: --timeout 90s --disable deadcode, + trigger-tests: + uses: ./.github/workflows/tests.yml + with: + stack-orchestrator-ref: ${{ github.event.inputs.stack-orchestrator-ref }} + ipld-ethcl-db-ref: ${{ github.event.inputs.ipld-ethcl-db-ref }} + ssz-data-ref: ${{ github.event.inputs.ssz-data-ref }} + secrets: + GHA_KEY: ${{secrets.GHA_KEY}} diff --git a/.github/workflows/on-publish.yml b/.github/workflows/on-publish.yml index 801a1c9..11ffbaa 100644 --- a/.github/workflows/on-publish.yml +++ b/.github/workflows/on-publish.yml @@ -3,9 +3,18 @@ on: release: types: [published, edited] jobs: + trigger-tests: + uses: ./.github/workflows/tests.yml + with: + stack-orchestrator-ref: ${{ github.event.inputs.stack-orchestrator-ref }} + ipld-ethcl-db-ref: ${{ github.event.inputs.ipld-ethcl-db-ref }} + ssz-data-ref: ${{ github.event.inputs.ssz-data-ref }} + secrets: + GHA_KEY: ${{secrets.GHA_KEY}} build: name: Run docker build runs-on: ubuntu-latest + needs: trigger-tests steps: - uses: actions/checkout@v2 - name: Get the version diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml new file mode 100644 index 0000000..8077c67 --- /dev/null +++ b/.github/workflows/tests.yml @@ -0,0 +1,201 @@ +name: Test the stack. +on: + workflow_call: + inputs: + stack-orchestrator-ref: + required: false + type: string + ipld-ethcl-db-ref: + required: false + type: string + ssz-data-ref: + required: false + type: string + secrets: + GHA_KEY: + required: true + +env: + stack-orchestrator-ref: ${{ inputs.stack-orchestrator-ref || 'develop' }} + ipld-ethcl-db-ref: ${{ inputs.ipld-ethcl-db-ref || 'feature/historic-processing' }} + ssz-data-ref: ${{ inputs.ssz-data-ref || 'main' }} + GOPATH: /tmp/go +jobs: + build: + name: Run Docker Build + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + with: + path: "./ipld-ethcl-indexer" + + - uses: actions/checkout@v3 + with: + ref: ${{ env.stack-orchestrator-ref }} + path: "./stack-orchestrator/" + repository: vulcanize/stack-orchestrator + fetch-depth: 0 + + - uses: actions/checkout@v3 + with: + ref: ${{ env.ipld-ethcl-db-ref }} + repository: vulcanize/ipld-ethcl-db + path: "./ipld-ethcl-db/" + ssh-key: ${{secrets.GHA_KEY}} + fetch-depth: 0 + + - name: Create config file + run: | + echo vulcanize_ipld_ethcl_db=$GITHUB_WORKSPACE/ipld-ethcl-db/ > ./config.sh + echo vulcanize_ipld_ethcl_indexer=$GITHUB_WORKSPACE/ipld-ethcl-indexer >> ./config.sh + echo ethcl_capture_mode=boot >> ./config.sh + echo ethcl_skip_sync=true >> ./config.sh + echo ethcl_known_gap_increment=1000000 >> ./config.sh + cat ./config.sh + + - name: Run docker compose + run: | + docker-compose \ + -f "$GITHUB_WORKSPACE/stack-orchestrator/docker/local/docker-compose-ethcl-db.yml" \ + -f "$GITHUB_WORKSPACE/stack-orchestrator/docker/latest/docker-compose-lighthouse.yml" \ + -f "$GITHUB_WORKSPACE/stack-orchestrator/docker/local/docker-compose-ipld-ethcl-indexer.yml" \ + --env-file ./config.sh \ + up -d --build + + - name: Check to make sure HEALTH file is present + shell: bash + run: | + until $(docker compose -f "$GITHUB_WORKSPACE/stack-orchestrator/docker/local/docker-compose-ipld-ethcl-indexer.yml" cp ipld-ethcl-indexer:/root/HEALTH ./HEALTH) ; do sleep 10; done + cat ./HEALTH + if [[ "$(cat ./HEALTH)" -eq "0" ]]; then echo "Application boot successful" && (exit 0); else docker compose -f "$GITHUB_WORKSPACE/stack-orchestrator/docker/local/docker-compose-ipld-ethcl-indexer.yml" cp ipld-ethcl-indexer:/root/ipld-ethcl-indexer.log . && cat ipld-ethcl-indexer.log && (exit 1); fi + + unit-test: + name: Run Unit Tests + runs-on: ubuntu-latest + ## IF you want to update the default branch for `pull_request runs, do it after the ||` + steps: + - name: Create GOPATH + run: mkdir -p /tmp/go + + - uses: actions/checkout@v2 + with: + path: "./ipld-ethcl-indexer" + + - uses: actions/checkout@v3 + with: + ref: ${{ env.stack-orchestrator-ref }} + path: "./stack-orchestrator/" + repository: vulcanize/stack-orchestrator + fetch-depth: 0 + + - uses: actions/checkout@v3 + with: + ref: ${{ env.ipld-ethcl-db-ref }} + repository: vulcanize/ipld-ethcl-db + path: "./ipld-ethcl-db/" + ssh-key: ${{ secrets.GHA_KEY }} + fetch-depth: 0 + + - uses: actions/checkout@v3 + with: + ref: ${{ env.ssz-data-ref }} + repository: vulcanize/ssz-data + path: "./ipld-ethcl-indexer/pkg/beaconclient/ssz-data" + fetch-depth: 0 + + - name: Create config file + run: | + echo vulcanize_ipld_ethcl_db=$GITHUB_WORKSPACE/ipld-ethcl-db/ > ./config.sh + echo vulcanize_ipld_ethcl_indexer=$GITHUB_WORKSPACE/ipld-ethcl-indexer >> ./config.sh + cat ./config.sh + + - name: Run docker compose + run: | + docker-compose \ + -f "$GITHUB_WORKSPACE/stack-orchestrator/docker/local/docker-compose-ethcl-db.yml" \ + --env-file ./config.sh \ + up -d --build + + - uses: actions/setup-go@v3 + with: + go-version: ">=1.17.0" + check-latest: true + + - name: Install packages + run: | + go install github.com/onsi/ginkgo/v2/ginkgo@latest + which ginkgo + + - name: Run the tests using Make + run: | + cd ipld-ethcl-indexer + make unit-test-ci + + integration-test: + name: Run Integration Tests + runs-on: ubuntu-latest + steps: + - name: Create GOPATH + run: mkdir -p /tmp/go + + - uses: actions/checkout@v2 + with: + path: "./ipld-ethcl-indexer" + + - uses: actions/checkout@v3 + with: + ref: ${{ env.stack-orchestrator-ref }} + path: "./stack-orchestrator/" + repository: vulcanize/stack-orchestrator + fetch-depth: 0 + + - uses: actions/checkout@v3 + with: + ref: ${{ env.ipld-ethcl-db-ref }} + repository: vulcanize/ipld-ethcl-db + path: "./ipld-ethcl-db/" + ssh-key: ${{secrets.GHA_KEY}} + fetch-depth: 0 + + - name: Create config file + run: | + echo vulcanize_ipld_ethcl_db=$GITHUB_WORKSPACE/ipld-ethcl-db/ > ./config.sh + echo vulcanize_ipld_ethcl_indexer=$GITHUB_WORKSPACE/ipld-ethcl-indexer >> ./config.sh + echo ethcl_capture_mode=boot >> ./config.sh + cat ./config.sh + + - name: Run docker compose + run: | + docker-compose \ + -f "$GITHUB_WORKSPACE/stack-orchestrator/docker/local/docker-compose-ethcl-db.yml" \ + -f "$GITHUB_WORKSPACE/stack-orchestrator/docker/latest/docker-compose-lighthouse.yml" \ + --env-file ./config.sh \ + up -d --build + + - uses: actions/setup-go@v3 + with: + go-version: ">=1.17.0" + check-latest: true + + - name: Install packages + run: | + go install github.com/onsi/ginkgo/v2/ginkgo@latest + which ginkgo + + - name: Run the tests using Make + run: | + cd ipld-ethcl-indexer + make integration-test-ci + + golangci: + runs-on: ubuntu-latest + steps: + - uses: actions/setup-go@v3 + with: + go-version: ">=1.17.0" + - uses: actions/checkout@v3 + - name: golangci-lint + uses: golangci/golangci-lint-action@v3 + with: + args: --timeout 90s --disable deadcode,unused +# args: --timeout 90s --disable deadcode, diff --git a/cmd/boot.go b/cmd/boot.go index 9fc3532..6f98729 100644 --- a/cmd/boot.go +++ b/cmd/boot.go @@ -23,6 +23,7 @@ import ( log "github.com/sirupsen/logrus" "github.com/spf13/cobra" + "github.com/spf13/viper" "github.com/vulcanize/ipld-ethcl-indexer/internal/boot" "github.com/vulcanize/ipld-ethcl-indexer/internal/shutdown" "github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper" @@ -44,10 +45,11 @@ func bootApp() { log.Info("Starting the application in boot mode.") ctx := context.Background() - BC, DB, err := boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, - bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, kgTableIncrement, "head", testDisregardSync) + Bc, Db, err := boot.BootApplicationWithRetry(ctx, viper.GetString("db.address"), viper.GetInt("db.port"), viper.GetString("db.name"), viper.GetString("db.username"), viper.GetString("db.password"), viper.GetString("db.driver"), + viper.GetString("bc.address"), viper.GetInt("bc.port"), viper.GetString("bc.connectionProtocol"), viper.GetString("bc.type"), viper.GetInt("bc.bootRetryInterval"), viper.GetInt("bc.bootMaxRetry"), + viper.GetInt("kg.increment"), "boot", viper.GetBool("t.skipSync"), viper.GetInt("bc.uniqueNodeIdentifier")) if err != nil { - loghelper.LogError(err).Error("Unable to Start application") + StopApplicationPreBoot(err, Db) } log.Info("Boot complete, we are going to shutdown.") @@ -58,7 +60,7 @@ func bootApp() { notifierCh <- syscall.SIGTERM }() - err = shutdown.ShutdownServices(ctx, notifierCh, maxWaitSecondsShutdown, DB, BC) + err = shutdown.ShutdownBoot(ctx, notifierCh, maxWaitSecondsShutdown, Db, Bc) if err != nil { loghelper.LogError(err).Error("Ungracefully Shutdown ipld-ethcl-indexer!") } else { diff --git a/cmd/capture.go b/cmd/capture.go index e1b46c8..d4ad6fc 100644 --- a/cmd/capture.go +++ b/cmd/capture.go @@ -39,6 +39,7 @@ var ( bcConnectionProtocol string bcType string bcMaxHistoricProcessWorker int + bcUniqueNodeIdentifier int kgMaxWorker int kgTableIncrement int kgProcessGaps bool @@ -93,6 +94,7 @@ func init() { captureCmd.PersistentFlags().IntVarP(&bcBootRetryInterval, "bc.bootRetryInterval", "", 30, "The amount of time to wait between retries while booting the application") captureCmd.PersistentFlags().IntVarP(&bcBootMaxRetry, "bc.bootMaxRetry", "", 5, "The amount of time to wait between retries while booting the application") captureCmd.PersistentFlags().IntVarP(&bcMaxHistoricProcessWorker, "bc.maxHistoricProcessWorker", "", 30, "The number of workers that should be actively processing slots from the ethcl.historic_process table. Be careful of system memory.") + captureCmd.PersistentFlags().IntVarP(&bcUniqueNodeIdentifier, "bc.uniqueNodeIdentifier", "", 0, "The unique identifier of this application. Each application connecting to the DB should have a unique identifier.") // err = captureCmd.MarkPersistentFlagRequired("bc.address") // exitErr(err) // err = captureCmd.MarkPersistentFlagRequired("bc.port") @@ -123,6 +125,8 @@ func init() { exitErr(err) err = viper.BindPFlag("db.name", captureCmd.PersistentFlags().Lookup("db.name")) exitErr(err) + err = viper.BindPFlag("db.driver", captureCmd.PersistentFlags().Lookup("db.driver")) + exitErr(err) //// Testing Specific err = viper.BindPFlag("t.skipSync", captureCmd.PersistentFlags().Lookup("t.skipSync")) @@ -143,6 +147,8 @@ func init() { exitErr(err) err = viper.BindPFlag("bc.maxHistoricProcessWorker", captureCmd.PersistentFlags().Lookup("bc.maxHistoricProcessWorker")) exitErr(err) + err = viper.BindPFlag("bc.uniqueNodeIdentifier", captureCmd.PersistentFlags().Lookup("bc.uniqueNodeIdentifier")) + exitErr(err) // Here you will define your flags and configuration settings. //// Known Gap Specific diff --git a/cmd/head.go b/cmd/head.go index 1006fe1..f2dd0e5 100644 --- a/cmd/head.go +++ b/cmd/head.go @@ -50,7 +50,7 @@ func startHeadTracking() { Bc, Db, err := boot.BootApplicationWithRetry(ctx, viper.GetString("db.address"), viper.GetInt("db.port"), viper.GetString("db.name"), viper.GetString("db.username"), viper.GetString("db.password"), viper.GetString("db.driver"), viper.GetString("bc.address"), viper.GetInt("bc.port"), viper.GetString("bc.connectionProtocol"), viper.GetString("bc.type"), viper.GetInt("bc.bootRetryInterval"), viper.GetInt("bc.bootMaxRetry"), - viper.GetInt("kg.increment"), "head", viper.GetBool("t.skipSync")) + viper.GetInt("kg.increment"), "head", viper.GetBool("t.skipSync"), viper.GetInt("bc.uniqueNodeIdentifier")) if err != nil { StopApplicationPreBoot(err, Db) } @@ -81,7 +81,7 @@ func startHeadTracking() { } // Shutdown when the time is right. - err = shutdown.ShutdownServices(ctx, notifierCh, maxWaitSecondsShutdown, Db, Bc) + err = shutdown.ShutdownHeadTracking(ctx, notifierCh, maxWaitSecondsShutdown, Db, Bc) if err != nil { loghelper.LogError(err).Error("Ungracefully Shutdown ipld-ethcl-indexer!") } else { @@ -94,6 +94,7 @@ func init() { captureCmd.AddCommand(headCmd) } +// Start prometheus server func serveProm(addr string) { mux := http.NewServeMux() mux.Handle("/metrics", promhttp.Handler()) diff --git a/cmd/historic.go b/cmd/historic.go index cfd9d03..e9d8d95 100644 --- a/cmd/historic.go +++ b/cmd/historic.go @@ -50,7 +50,7 @@ func startHistoricProcessing() { Bc, Db, err := boot.BootApplicationWithRetry(ctx, viper.GetString("db.address"), viper.GetInt("db.port"), viper.GetString("db.name"), viper.GetString("db.username"), viper.GetString("db.password"), viper.GetString("db.driver"), viper.GetString("bc.address"), viper.GetInt("bc.port"), viper.GetString("bc.connectionProtocol"), viper.GetString("bc.type"), viper.GetInt("bc.bootRetryInterval"), viper.GetInt("bc.bootMaxRetry"), - viper.GetInt("kg.increment"), "head", viper.GetBool("t.skipSync")) + viper.GetInt("kg.increment"), "historic", viper.GetBool("t.skipSync"), viper.GetInt("bc.uniqueNodeIdentifier")) if err != nil { StopApplicationPreBoot(err, Db) } @@ -91,7 +91,7 @@ func startHistoricProcessing() { } // Shutdown when the time is right. - err = shutdown.ShutdownServices(ctx, notifierCh, maxWaitSecondsShutdown, Db, Bc) + err = shutdown.ShutdownHistoricProcessing(ctx, notifierCh, maxWaitSecondsShutdown, Db, Bc) if err != nil { loghelper.LogError(err).Error("Ungracefully Shutdown ipld-ethcl-indexer!") } else { diff --git a/example.ipld-ethcl-indexer-config.json b/example.ipld-ethcl-indexer-config.json index 0929773..69746ca 100644 --- a/example.ipld-ethcl-indexer-config.json +++ b/example.ipld-ethcl-indexer-config.json @@ -14,7 +14,8 @@ "bootRetryInterval": 30, "bootMaxRetry": 5, "maxHistoricProcessWorker": 2, - "connectionProtocol": "http" + "connectionProtocol": "http", + "uniqueNodeIdentifier": 100 }, "t": { "skipSync": true diff --git a/go.mod b/go.mod index 5bb8589..3c844c8 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/multiformats/go-multihash v0.1.0 github.com/onsi/ginkgo/v2 v2.1.4 github.com/onsi/gomega v1.19.0 + github.com/prometheus/client_golang v1.12.1 github.com/r3labs/sse v0.0.0-20210224172625-26fe804710bc github.com/sirupsen/logrus v1.8.1 ) @@ -57,7 +58,6 @@ require ( github.com/multiformats/go-varint v0.0.6 // indirect github.com/opentracing/opentracing-go v1.2.0 // indirect github.com/pkg/errors v0.9.1 // indirect - github.com/prometheus/client_golang v1.12.1 // indirect github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/common v0.32.1 // indirect github.com/prometheus/procfs v0.7.3 // indirect diff --git a/internal/boot/boot.go b/internal/boot/boot.go index e726cf8..b1e476b 100644 --- a/internal/boot/boot.go +++ b/internal/boot/boot.go @@ -42,14 +42,17 @@ var ( // // 3. Make sure the node is synced, unless disregardSync is true. func BootApplication(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string, - bcAddress string, bcPort int, bcConnectionProtocol string, bcKgTableIncrement int, disregardSync bool) (*beaconclient.BeaconClient, sql.Database, error) { + bcAddress string, bcPort int, bcConnectionProtocol string, bcKgTableIncrement int, disregardSync bool, uniqueNodeIdentifier int) (*beaconclient.BeaconClient, sql.Database, error) { log.Info("Booting the Application") log.Debug("Creating the Beacon Client") - BC = beaconclient.CreateBeaconClient(ctx, bcConnectionProtocol, bcAddress, bcPort, bcKgTableIncrement) + Bc, err := beaconclient.CreateBeaconClient(ctx, bcConnectionProtocol, bcAddress, bcPort, bcKgTableIncrement, uniqueNodeIdentifier) + if err != nil { + return Bc, nil, err + } log.Debug("Checking Beacon Client") - err := BC.CheckBeaconClient() + err = Bc.CheckBeaconClient() if err != nil { return nil, nil, err } @@ -60,36 +63,37 @@ func BootApplication(ctx context.Context, dbHostname string, dbPort int, dbName return nil, nil, err } - BC.Db = DB + Bc.Db = DB var status bool if !disregardSync { - status, err = BC.CheckHeadSync() + status, err = Bc.CheckHeadSync() if err != nil { log.Error("Unable to get the nodes sync status") - return BC, DB, err + return Bc, DB, err } if status { log.Error("The node is still syncing..") err = fmt.Errorf("The node is still syncing.") - return BC, DB, err + return Bc, DB, err } } else { log.Warn("We are not checking to see if the node has synced to head.") } - return BC, DB, nil + return Bc, DB, nil } // Add retry logic to ensure that we are give the Beacon Client and the DB time to start. func BootApplicationWithRetry(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string, - bcAddress string, bcPort int, bcConnectionProtocol string, bcType string, bcRetryInterval int, bcMaxRetry int, bcKgTableIncrement int, startUpMode string, disregardSync bool) (*beaconclient.BeaconClient, sql.Database, error) { + bcAddress string, bcPort int, bcConnectionProtocol string, bcType string, bcRetryInterval int, bcMaxRetry int, bcKgTableIncrement int, + startUpMode string, disregardSync bool, uniqueNodeIdentifier int) (*beaconclient.BeaconClient, sql.Database, error) { var err error if bcMaxRetry < 0 { i := 0 for { BC, DB, err = BootApplication(ctx, dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName, - bcAddress, bcPort, bcConnectionProtocol, bcKgTableIncrement, disregardSync) + bcAddress, bcPort, bcConnectionProtocol, bcKgTableIncrement, disregardSync, uniqueNodeIdentifier) if err != nil { log.WithFields(log.Fields{ "retryNumber": i, @@ -104,7 +108,7 @@ func BootApplicationWithRetry(ctx context.Context, dbHostname string, dbPort int } else { for i := 0; i < bcMaxRetry; i++ { BC, DB, err = BootApplication(ctx, dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName, - bcAddress, bcPort, bcConnectionProtocol, bcKgTableIncrement, disregardSync) + bcAddress, bcPort, bcConnectionProtocol, bcKgTableIncrement, disregardSync, uniqueNodeIdentifier) if err != nil { log.WithFields(log.Fields{ "retryNumber": i, @@ -136,6 +140,8 @@ func BootApplicationWithRetry(ctx context.Context, dbHostname string, dbPort int } BC.UpdateLatestSlotInBeaconServer(int64(headSlot)) // Add another switch case for bcType if its ever needed. + case "boot": + log.Debug("Running application in boot mode.") default: log.WithFields(log.Fields{ "startUpMode": startUpMode, diff --git a/internal/boot/boot_test.go b/internal/boot/boot_test.go index aa77e56..ef621ae 100644 --- a/internal/boot/boot_test.go +++ b/internal/boot/boot_test.go @@ -38,44 +38,52 @@ var _ = Describe("Boot", func() { bcBootRetryInterval int = 1 bcBootMaxRetry int = 5 bcKgTableIncrement int = 10 + bcUniqueIdentifier int = 100 ) Describe("Booting the application", Label("integration"), func() { Context("When the DB and BC are both up and running, we skip checking for a synced head, and we are processing head", func() { It("Should connect successfully", func() { - _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", true) + _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", true, bcUniqueIdentifier) defer db.Close() Expect(err).ToNot(HaveOccurred()) }) }) Context("When the DB and BC are both up and running, we skip checking for a synced head, and we are processing historic ", func() { It("Should connect successfully", func() { - _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "historic", true) + _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "historic", true, bcUniqueIdentifier) defer db.Close() Expect(err).ToNot(HaveOccurred()) }) }) Context("When the DB and BC are both up and running, and we check for a synced head", func() { It("Should not connect successfully", func() { - _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", false) + _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", false, bcUniqueIdentifier) + defer db.Close() + Expect(err).To(HaveOccurred()) + }) + }) + Context("When the DB and BC are both up and running, we skip checking for a synced head, but the unique identifier is 0", func() { + It("Should not connect successfully", func() { + _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", false, 0) defer db.Close() Expect(err).To(HaveOccurred()) }) }) Context("When the DB is running but not the BC", func() { It("Should not connect successfully", func() { - _, _, err := boot.BootApplication(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol, bcKgTableIncrement, true) + _, _, err := boot.BootApplication(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol, bcKgTableIncrement, true, bcUniqueIdentifier) Expect(err).To(HaveOccurred()) }) }) Context("When the BC is running but not the DB", func() { It("Should not connect successfully", func() { - _, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcKgTableIncrement, true) + _, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcKgTableIncrement, true, bcUniqueIdentifier) Expect(err).To(HaveOccurred()) }) }) Context("When neither the BC or DB are running", func() { It("Should not connect successfully", func() { - _, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol, bcKgTableIncrement, true) + _, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol, bcKgTableIncrement, true, bcUniqueIdentifier) Expect(err).To(HaveOccurred()) }) }) diff --git a/internal/shutdown/shutdown.go b/internal/shutdown/shutdown.go index 22c7310..228b760 100644 --- a/internal/shutdown/shutdown.go +++ b/internal/shutdown/shutdown.go @@ -27,18 +27,9 @@ import ( ) // Shutdown all the internal services for the application. -func ShutdownServices(ctx context.Context, notifierCh chan os.Signal, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) error { - successCh, errCh := gracefulshutdown.Shutdown(ctx, notifierCh, waitTime, map[string]gracefulshutdown.Operation{ - // Combining DB shutdown with BC because BC needs DB open to cleanly shutdown. - "beaconClient": func(ctx context.Context) error { - defer DB.Close() - err := BC.StopHeadTracking() - if err != nil { - loghelper.LogError(err).Error("Unable to trigger shutdown of head tracking") - } - return err - }, - }) +func ShutdownServices(ctx context.Context, notifierCh chan os.Signal, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient, shutdownOperations map[string]gracefulshutdown.Operation) error { + //successCh, errCh := gracefulshutdown.Shutdown(ctx, notifierCh, waitTime, ) + successCh, errCh := gracefulshutdown.Shutdown(ctx, notifierCh, waitTime, shutdownOperations) select { case <-successCh: @@ -47,3 +38,55 @@ func ShutdownServices(ctx context.Context, notifierCh chan os.Signal, waitTime t return err } } + +// Wrapper function for shutting down the head tracking process. +func ShutdownHeadTracking(ctx context.Context, notifierCh chan os.Signal, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) error { + return ShutdownServices(ctx, notifierCh, waitTime, DB, BC, map[string]gracefulshutdown.Operation{ + // Combining DB shutdown with BC because BC needs DB open to cleanly shutdown. + "beaconClient": func(ctx context.Context) error { + defer DB.Close() + err := BC.StopHeadTracking() + if err != nil { + loghelper.LogError(err).Error("Unable to trigger shutdown of head tracking") + } + if BC.KnownGapsProcess != (beaconclient.KnownGapsProcessing{}) { + err = BC.StopKnownGapsProcessing() + if err != nil { + loghelper.LogError(err).Error("Unable to stop processing known gaps") + } + } + return err + }, + }) +} + +// Wrapper function for shutting down the head tracking process. +func ShutdownHistoricProcessing(ctx context.Context, notifierCh chan os.Signal, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) error { + return ShutdownServices(ctx, notifierCh, waitTime, DB, BC, map[string]gracefulshutdown.Operation{ + // Combining DB shutdown with BC because BC needs DB open to cleanly shutdown. + "beaconClient": func(ctx context.Context) error { + defer DB.Close() + err := BC.StopHistoric() + if err != nil { + loghelper.LogError(err).Error("Unable to stop processing historic") + } + if BC.KnownGapsProcess != (beaconclient.KnownGapsProcessing{}) { + err = BC.StopKnownGapsProcessing() + if err != nil { + loghelper.LogError(err).Error("Unable to stop processing known gaps") + } + } + return err + }, + }) +} + +// Wrapper function for shutting down the application in boot mode. +func ShutdownBoot(ctx context.Context, notifierCh chan os.Signal, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) error { + return ShutdownServices(ctx, notifierCh, waitTime, DB, BC, map[string]gracefulshutdown.Operation{ + // Combining DB shutdown with BC because BC needs DB open to cleanly shutdown. + "Database": func(ctx context.Context) error { + return DB.Close() + }, + }) +} diff --git a/internal/shutdown/shutdown_test.go b/internal/shutdown/shutdown_test.go index 0bbf38d..c6fa00e 100644 --- a/internal/shutdown/shutdown_test.go +++ b/internal/shutdown/shutdown_test.go @@ -35,42 +35,44 @@ import ( "github.com/vulcanize/ipld-ethcl-indexer/pkg/gracefulshutdown" ) +var ( + dbAddress string = "localhost" + dbPort int = 8076 + dbName string = "vulcanize_testing" + dbUsername string = "vdbm" + dbPassword string = "password" + dbDriver string = "PGX" + bcAddress string = "localhost" + bcPort int = 5052 + bcConnectionProtocol string = "http" + bcType string = "lighthouse" + bcBootRetryInterval int = 1 + bcBootMaxRetry int = 5 + bcKgTableIncrement int = 10 + bcUniqueIdentifier int = 100 + maxWaitSecondsShutdown time.Duration = time.Duration(1) * time.Second + DB sql.Database + BC *beaconclient.BeaconClient + err error + ctx context.Context + notifierCh chan os.Signal +) + var _ = Describe("Shutdown", func() { - var ( - dbAddress string = "localhost" - dbPort int = 8076 - dbName string = "vulcanize_testing" - dbUsername string = "vdbm" - dbPassword string = "password" - dbDriver string = "PGX" - bcAddress string = "localhost" - bcPort int = 5052 - bcConnectionProtocol string = "http" - bcType string = "lighthouse" - bcBootRetryInterval int = 1 - bcBootMaxRetry int = 5 - bcKgTableIncrement int = 10 - maxWaitSecondsShutdown time.Duration = time.Duration(1) * time.Second - DB sql.Database - BC *beaconclient.BeaconClient - err error - ctx context.Context - notifierCh chan os.Signal - ) BeforeEach(func() { ctx = context.Background() BC, DB, err = boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, - bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", true) + bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", true, bcUniqueIdentifier) notifierCh = make(chan os.Signal, 1) Expect(err).To(BeNil()) }) - Describe("Run Shutdown Function,", Label("integration"), func() { + Describe("Run Shutdown Function for head tracking,", Label("integration"), func() { Context("When Channels are empty,", func() { It("Should Shutdown Successfully.", func() { go func() { log.Debug("Starting shutdown chan") - err = shutdown.ShutdownServices(ctx, notifierCh, maxWaitSecondsShutdown, DB, BC) + err = shutdown.ShutdownHeadTracking(ctx, notifierCh, maxWaitSecondsShutdown, DB, BC) log.Debug("We have completed the shutdown...") Expect(err).ToNot(HaveOccurred()) }() @@ -82,7 +84,7 @@ var _ = Describe("Shutdown", func() { //log.SetLevel(log.DebugLevel) go func() { log.Debug("Starting shutdown chan") - err = shutdown.ShutdownServices(ctx, notifierCh, maxWaitSecondsShutdown, DB, BC) + err = shutdown.ShutdownHeadTracking(ctx, notifierCh, maxWaitSecondsShutdown, DB, BC) log.Debug("We have completed the shutdown...") Expect(err).ToNot(HaveOccurred()) shutdownCh <- true @@ -115,7 +117,7 @@ var _ = Describe("Shutdown", func() { //log.SetLevel(log.DebugLevel) go func() { log.Debug("Starting shutdown chan") - err = shutdown.ShutdownServices(ctx, notifierCh, maxWaitSecondsShutdown, DB, BC) + err = shutdown.ShutdownHeadTracking(ctx, notifierCh, maxWaitSecondsShutdown, DB, BC) log.Debug("We have completed the shutdown...") Expect(err).To(MatchError(gracefulshutdown.TimeoutErr(maxWaitSecondsShutdown.String()))) shutdownCh <- true diff --git a/pkg/beaconclient/beaconclient.go b/pkg/beaconclient/beaconclient.go index 15f24b2..e9fd5c0 100644 --- a/pkg/beaconclient/beaconclient.go +++ b/pkg/beaconclient/beaconclient.go @@ -18,6 +18,7 @@ package beaconclient import ( "context" "fmt" + "math/rand" "github.com/r3labs/sse" log "github.com/sirupsen/logrus" @@ -48,6 +49,8 @@ type BeaconClient struct { Db sql.Database // Database object used for reads and writes. Metrics *BeaconClientMetrics // An object used to keep track of certain BeaconClient Metrics. KnownGapTableIncrement int // The max number of slots within a single known_gaps table entry. + UniqueNodeIdentifier int // The unique identifier within the cluster of this individual node. + KnownGapsProcess KnownGapsProcessing // object keeping track of knowngaps processing // Used for Head Tracking @@ -65,7 +68,8 @@ type BeaconClient struct { // The latest available slot within the Beacon Server. We can't query any slot greater than this. // This value is lazily updated. Therefore at times it will be outdated. LatestSlotInBeaconServer int64 - PerformHistoricalProcessing bool // Should we perform historical processing? + PerformHistoricalProcessing bool // Should we perform historical processing? + HistoricalProcess historicProcessing // object keeping track of historical processing } // A struct to keep track of relevant the head event topic. @@ -84,7 +88,17 @@ type SseError struct { } // A Function to create the BeaconClient. -func CreateBeaconClient(ctx context.Context, connectionProtocol string, bcAddress string, bcPort int, bcKgTableIncrement int) *BeaconClient { +func CreateBeaconClient(ctx context.Context, connectionProtocol string, bcAddress string, bcPort int, bcKgTableIncrement int, uniqueNodeIdentifier int) (*BeaconClient, error) { + if uniqueNodeIdentifier == 0 { + uniqueNodeIdentifier := rand.Int() + log.WithField("randomUniqueNodeIdentifier", uniqueNodeIdentifier).Warn("No uniqueNodeIdentifier provided, we are going to use a randomly generated one.") + } + + metrics, err := CreateBeaconClientMetrics() + if err != nil { + return nil, err + } + endpoint := fmt.Sprintf("%s://%s:%d", connectionProtocol, bcAddress, bcPort) log.Info("Creating the BeaconClient") return &BeaconClient{ @@ -93,9 +107,10 @@ func CreateBeaconClient(ctx context.Context, connectionProtocol string, bcAddres KnownGapTableIncrement: bcKgTableIncrement, HeadTracking: createSseEvent[Head](endpoint, BcHeadTopicEndpoint), ReOrgTracking: createSseEvent[ChainReorg](endpoint, bcReorgTopicEndpoint), - Metrics: CreateBeaconClientMetrics(), + Metrics: metrics, + UniqueNodeIdentifier: uniqueNodeIdentifier, //FinalizationTracking: createSseEvent[FinalizedCheckpoint](endpoint, bcFinalizedTopicEndpoint), - } + }, nil } // Create all the channels to handle a SSE events diff --git a/pkg/beaconclient/capturehead_test.go b/pkg/beaconclient/capturehead_test.go index 459b2af..0856cbe 100644 --- a/pkg/beaconclient/capturehead_test.go +++ b/pkg/beaconclient/capturehead_test.go @@ -54,6 +54,7 @@ var ( dbUser string = "vdbm" dbPassword string = "password" dbDriver string = "pgx" + bcUniqueIdentifier int = 100 dummyParentRoot string = "46f98c08b54a71dfda4d56e29ec3952b8300cd8d6b67a9b6c562ae96a7a25a42" knownGapsTableIncrement int = 100000 maxRetry int = 120 @@ -202,6 +203,7 @@ var ( dbPassword: dbPassword, dbDriver: dbDriver, knownGapsTableIncrement: knownGapsTableIncrement, + bcUniqueIdentifier: bcUniqueIdentifier, } BeaconNodeTester = TestBeaconNode{ @@ -421,6 +423,7 @@ type Config struct { dbPassword string dbDriver string knownGapsTableIncrement int + bcUniqueIdentifier int } ////////////////////////////////////////////////////// @@ -430,7 +433,8 @@ type Config struct { // Must run before each test. We can't use the beforeEach because of the way // Gingko treats race conditions. func setUpTest(config Config, maxSlot string) *beaconclient.BeaconClient { - bc := *beaconclient.CreateBeaconClient(context.Background(), config.protocol, config.address, config.port, config.knownGapsTableIncrement) + bc, err := beaconclient.CreateBeaconClient(context.Background(), config.protocol, config.address, config.port, config.knownGapsTableIncrement, config.bcUniqueIdentifier) + Expect(err).ToNot(HaveOccurred()) db, err := postgres.SetupPostgresDb(config.dbHost, config.dbPort, config.dbName, config.dbUser, config.dbPassword, config.dbDriver) Expect(err).ToNot(HaveOccurred()) @@ -441,7 +445,7 @@ func setUpTest(config Config, maxSlot string) *beaconclient.BeaconClient { writeSlot(db, maxSlot) bc.Db = db - return &bc + return bc } // A helper function to validate the expected output from the ethcl.slots table. diff --git a/pkg/beaconclient/capturehistoric.go b/pkg/beaconclient/capturehistoric.go index 3c4d29b..cd88c90 100644 --- a/pkg/beaconclient/capturehistoric.go +++ b/pkg/beaconclient/capturehistoric.go @@ -22,29 +22,47 @@ import ( log "github.com/sirupsen/logrus" "github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql" + "github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper" "golang.org/x/sync/errgroup" ) // This function will perform all the heavy lifting for tracking the head of the chain. func (bc *BeaconClient) CaptureHistoric(maxWorkers int) []error { log.Info("We are starting the historical processing service.") - hp := historicProcessing{db: bc.Db, metrics: bc.Metrics} - errs := handleBatchProcess(maxWorkers, hp, hp.db, bc.ServerEndpoint, bc.Metrics) + bc.HistoricalProcess = historicProcessing{db: bc.Db, metrics: bc.Metrics} + errs := handleBatchProcess(maxWorkers, bc.HistoricalProcess, bc.HistoricalProcess.finishProcessing, bc.HistoricalProcess.db, bc.ServerEndpoint, bc.Metrics) log.Debug("Exiting Historical") return errs } +// This function will perform all the necessary clean up tasks for stopping historical processing. +func (bc *BeaconClient) StopHistoric() error { + log.Info("We are stopping the historical processing service.") + err := bc.HistoricalProcess.releaseDbLocks() + if err != nil { + loghelper.LogError(err).WithField("uniqueIdentifier", bc.UniqueNodeIdentifier).Error("We were unable to remove the locks from the ethcl.historic_processing table. Manual Intervention is needed!") + } + return nil +} + // An interface to enforce any batch processing. Currently there are two use cases for this. // // 1. Historic Processing // // 2. Known Gaps Processing type BatchProcessing interface { - getSlotRange(chan<- slotsToProcess) []error // Write the slots to process in a channel, return an error if you cant get the next slots to write. - handleProcessingErrors(<-chan batchHistoricError) - removeTableEntry(<-chan slotsToProcess) error // With the provided start and end slot, remove the entry from the database. + getSlotRange(chan<- slotsToProcess) []error // Write the slots to process in a channel, return an error if you cant get the next slots to write. + handleProcessingErrors(<-chan batchHistoricError) // Custom logic to handle errors. + removeTableEntry(<-chan slotsToProcess) error // With the provided start and end slot, remove the entry from the database. + releaseDbLocks() error // Update the checked_out column to false for whatever table is being updated. } +/// ^^^ +// Might be better to remove the interface and create a single struct that historicalProcessing +// and knownGapsProcessing can use. The struct would contain all the SQL strings that they need. +// And the only difference in logic for processing would be within the error handling. +// Which can be a function we pass into handleBatchProcess() + // A struct to pass around indicating a table entry for slots to process. type slotsToProcess struct { startSlot int // The start slot @@ -71,12 +89,12 @@ type batchHistoricError struct { // 4. Remove the slot entry from the DB. // // 5. Handle any errors. -func handleBatchProcess(maxWorkers int, bp BatchProcessing, db sql.Database, serverEndpoint string, metrics *BeaconClientMetrics) []error { +func handleBatchProcess(maxWorkers int, bp BatchProcessing, finishCh chan int, db sql.Database, serverEndpoint string, metrics *BeaconClientMetrics) []error { slotsCh := make(chan slotsToProcess) workCh := make(chan int) processedCh := make(chan slotsToProcess) errCh := make(chan batchHistoricError) - finishCh := make(chan []error, 1) + finalErrCh := make(chan []error, 1) // Start workers for w := 1; w <= maxWorkers; w++ { @@ -115,7 +133,7 @@ func handleBatchProcess(maxWorkers int, bp BatchProcessing, db sql.Database, ser return bp.removeTableEntry(processedCh) }) if err := errG.Wait(); err != nil { - finishCh <- []error{err} + finalErrCh <- []error{err} } }() // Process errors from slot processing. @@ -125,12 +143,17 @@ func handleBatchProcess(maxWorkers int, bp BatchProcessing, db sql.Database, ser go func() { errs := bp.getSlotRange(slotsCh) // Periodically adds new entries.... if errs != nil { - finishCh <- errs + finalErrCh <- errs } - finishCh <- nil + finalErrCh <- nil }() - - errs := <-finishCh - log.Debug("Finishing the batchProcess") - return errs + log.Debug("Waiting for shutdown signal from channel") + select { + case <-finishCh: + log.Debug("Received shutdown signal from channel") + return nil + case errs := <-finalErrCh: + log.Debug("Finishing the batchProcess") + return errs + } } diff --git a/pkg/beaconclient/healthcheck_test.go b/pkg/beaconclient/healthcheck_test.go index df92398..c09e3d8 100644 --- a/pkg/beaconclient/healthcheck_test.go +++ b/pkg/beaconclient/healthcheck_test.go @@ -25,19 +25,22 @@ import ( var _ = Describe("Healthcheck", func() { var ( - BC *beaconclient.BeaconClient + Bc *beaconclient.BeaconClient errBc *beaconclient.BeaconClient ) BeforeEach(func() { - BC = beaconclient.CreateBeaconClient(context.Background(), "http", "localhost", 5052, 10) - errBc = beaconclient.CreateBeaconClient(context.Background(), "http", "blah-blah", 1010, 10) + var err error + Bc, err = beaconclient.CreateBeaconClient(context.Background(), "http", "localhost", 5052, 10, bcUniqueIdentifier) + Expect(err).ToNot(HaveOccurred()) + errBc, err = beaconclient.CreateBeaconClient(context.Background(), "http", "blah-blah", 1010, 10, bcUniqueIdentifier) + Expect(err).ToNot(HaveOccurred()) }) Describe("Connecting to the lighthouse client", Label("integration"), func() { Context("When the client is running", func() { It("We should connect successfully", func() { - err := BC.CheckBeaconClient() + err := Bc.CheckBeaconClient() Expect(err).To(BeNil()) }) }) diff --git a/pkg/beaconclient/metrics.go b/pkg/beaconclient/metrics.go index 6156e2d..086d7a7 100644 --- a/pkg/beaconclient/metrics.go +++ b/pkg/beaconclient/metrics.go @@ -24,7 +24,7 @@ import ( ) //Create a metric struct and register each channel with prometheus -func CreateBeaconClientMetrics() *BeaconClientMetrics { +func CreateBeaconClientMetrics() (*BeaconClientMetrics, error) { metrics := &BeaconClientMetrics{ SlotInserts: 0, ReorgInserts: 0, @@ -34,17 +34,38 @@ func CreateBeaconClientMetrics() *BeaconClientMetrics { HeadError: 0, HeadReorgError: 0, } - prometheusRegisterHelper("slot_inserts", "Keeps track of the number of slots we have inserted.", &metrics.SlotInserts) - prometheusRegisterHelper("reorg_inserts", "Keeps track of the number of reorgs we have inserted.", &metrics.ReorgInserts) - prometheusRegisterHelper("known_gaps_inserts", "Keeps track of the number of known gaps we have inserted.", &metrics.KnownGapsInserts) - prometheusRegisterHelper("known_gaps_processed", "Keeps track of the number of known gaps we processed.", &metrics.knownGapsProcessed) - prometheusRegisterHelper("known_gaps_processing_error", "Keeps track of the number of known gaps we had errors processing.", &metrics.KnownGapsProcessingError) - prometheusRegisterHelper("head_error", "Keeps track of the number of errors we had processing head messages.", &metrics.HeadError) - prometheusRegisterHelper("head_reorg_error", "Keeps track of the number of errors we had processing reorg messages.", &metrics.HeadReorgError) - return metrics + err := prometheusRegisterHelper("slot_inserts", "Keeps track of the number of slots we have inserted.", &metrics.SlotInserts) + if err != nil { + return nil, err + } + err = prometheusRegisterHelper("reorg_inserts", "Keeps track of the number of reorgs we have inserted.", &metrics.ReorgInserts) + if err != nil { + return nil, err + } + err = prometheusRegisterHelper("known_gaps_inserts", "Keeps track of the number of known gaps we have inserted.", &metrics.KnownGapsInserts) + if err != nil { + return nil, err + } + err = prometheusRegisterHelper("known_gaps_processed", "Keeps track of the number of known gaps we processed.", &metrics.knownGapsProcessed) + if err != nil { + return nil, err + } + err = prometheusRegisterHelper("known_gaps_processing_error", "Keeps track of the number of known gaps we had errors processing.", &metrics.KnownGapsProcessingError) + if err != nil { + return nil, err + } + err = prometheusRegisterHelper("head_error", "Keeps track of the number of errors we had processing head messages.", &metrics.HeadError) + if err != nil { + return nil, err + } + err = prometheusRegisterHelper("head_reorg_error", "Keeps track of the number of errors we had processing reorg messages.", &metrics.HeadReorgError) + if err != nil { + return nil, err + } + return metrics, nil } -func prometheusRegisterHelper(name string, help string, varPointer *uint64) { +func prometheusRegisterHelper(name string, help string, varPointer *uint64) error { err := prometheus.Register(prometheus.NewCounterFunc( prometheus.CounterOpts{ Namespace: "beacon_client", @@ -58,7 +79,9 @@ func prometheusRegisterHelper(name string, help string, varPointer *uint64) { })) if err != nil && err.Error() != "duplicate metrics collector registration attempted" { loghelper.LogError(err).WithField("name", name).Error("Unable to register counter.") + return err } + return nil } // A structure utilized for keeping track of various metrics. Currently, mostly used in testing. diff --git a/pkg/beaconclient/processhistoric.go b/pkg/beaconclient/processhistoric.go index 2ba3bd4..eb3e71b 100644 --- a/pkg/beaconclient/processhistoric.go +++ b/pkg/beaconclient/processhistoric.go @@ -40,21 +40,27 @@ var ( checkHpEntryStmt string = `SELECT * FROM ethcl.historic_process WHERE checked_out=false;` // Used to checkout a row from the ethcl.historic_process table lockHpEntryStmt string = `UPDATE ethcl.historic_process - SET checked_out=true + SET checked_out=true, checked_out_by=$3 WHERE start_slot=$1 AND end_slot=$2;` - // Used to delete an entry from the knownGaps table + // Used to delete an entry from the ethcl.historic_process table deleteHpEntryStmt string = `DELETE FROM ethcl.historic_process WHERE start_slot=$1 AND end_slot=$2;` + // Used to update every single row that this node has checked out. + releaseHpLockStmt string = `UPDATE ethcl.historic_process + SET checked_out=false + WHERE checked_out_by=$1` ) type historicProcessing struct { - db sql.Database - metrics *BeaconClientMetrics + db sql.Database //db connection + metrics *BeaconClientMetrics // metrics for beaconclient + uniqueNodeIdentifier int // node unique identifier. + finishProcessing chan int // A channel which indicates to the process handleBatchProcess function that its time to end. } // Get a single row of historical slots from the table. func (hp historicProcessing) getSlotRange(slotCh chan<- slotsToProcess) []error { - return getBatchProcessRow(hp.db, getHpEntryStmt, checkHpEntryStmt, lockHpEntryStmt, slotCh) + return getBatchProcessRow(hp.db, getHpEntryStmt, checkHpEntryStmt, lockHpEntryStmt, slotCh, strconv.Itoa(hp.uniqueNodeIdentifier)) } // Remove the table entry. @@ -71,6 +77,22 @@ func (hp historicProcessing) handleProcessingErrors(errMessages <-chan batchHist } } +func (hp historicProcessing) releaseDbLocks() error { + go func() { hp.finishProcessing <- 1 }() + log.Debug("Updating all the entries to ethcl.historical processing") + res, err := hp.db.Exec(context.Background(), releaseHpLockStmt, hp.uniqueNodeIdentifier) + if err != nil { + return fmt.Errorf("Unable to remove lock from ethcl.historical_processing table for node %d, error is %e", hp.uniqueNodeIdentifier, err) + } + log.Debug("Update all the entries to ethcl.historical processing") + rows, err := res.RowsAffected() + if err != nil { + return fmt.Errorf("Unable to calculated number of rows affected by releasing locks from ethcl.historical_processing table for node %d, error is %e", hp.uniqueNodeIdentifier, err) + } + log.WithField("rowCount", rows).Info("Released historicalProcess locks for specified rows.") + return nil +} + // Process the slot range. func processSlotRangeWorker(workCh <-chan int, errCh chan<- batchHistoricError, db sql.Database, serverAddress string, metrics *BeaconClientMetrics) { for slot := range workCh { @@ -91,7 +113,7 @@ func processSlotRangeWorker(workCh <-chan int, errCh chan<- batchHistoricError, // It also locks the row by updating the checked_out column. // The statement for getting the start_slot and end_slot must be provided. // The statement for "locking" the row must also be provided. -func getBatchProcessRow(db sql.Database, getStartEndSlotStmt string, checkNewRowsStmt string, checkOutRowStmt string, slotCh chan<- slotsToProcess) []error { +func getBatchProcessRow(db sql.Database, getStartEndSlotStmt string, checkNewRowsStmt string, checkOutRowStmt string, slotCh chan<- slotsToProcess, uniqueNodeIdentifier string) []error { errCount := make([]error, 0) // 5 is an arbitrary number. It allows us to retry a few times before @@ -148,7 +170,7 @@ func getBatchProcessRow(db sql.Database, getStartEndSlotStmt string, checkNewRow } // Checkout the Row - res, err := tx.Exec(ctx, checkOutRowStmt, sp.startSlot, sp.endSlot) + res, err := tx.Exec(ctx, checkOutRowStmt, sp.startSlot, sp.endSlot, uniqueNodeIdentifier) if err != nil { loghelper.LogSlotRangeStatementError(strconv.Itoa(sp.startSlot), strconv.Itoa(sp.endSlot), checkOutRowStmt, err).Error("Unable to checkout the row") errCount = append(errCount, err) diff --git a/pkg/beaconclient/processknowngaps.go b/pkg/beaconclient/processknowngaps.go index 6c07c02..01762da 100644 --- a/pkg/beaconclient/processknowngaps.go +++ b/pkg/beaconclient/processknowngaps.go @@ -36,7 +36,7 @@ var ( checkKgEntryStmt string = `SELECT * FROM ethcl.known_gaps WHERE checked_out=false;` // Used to checkout a row from the ethcl.known_gaps table lockKgEntryStmt string = `UPDATE ethcl.known_gaps - SET checked_out=true + SET checked_out=true, checked_out_by=$3 WHERE start_slot=$1 AND end_slot=$2;` // Used to delete an entry from the knownGaps table deleteKgEntryStmt string = `DELETE FROM ethcl.known_gaps @@ -44,34 +44,50 @@ var ( // Used to check to see if a single slot exists in the known_gaps table. checkKgSingleSlotStmt string = `SELECT start_slot, end_slot FROM ethcl.known_gaps WHERE start_slot=$1 AND end_slot=$2;` + // Used to update every single row that this node has checked out. + releaseKgLockStmt string = `UPDATE ethcl.known_gaps + SET checked_out=false + WHERE checked_out_by=$1` ) -type knownGapsProcessing struct { - db sql.Database - metrics *BeaconClientMetrics +type KnownGapsProcessing struct { + db sql.Database //db connection + metrics *BeaconClientMetrics // metrics for beaconclient + uniqueNodeIdentifier int // node unique identifier. + finishProcessing chan int // A channel which indicates to the process handleBatchProcess function that its time to end. } // This function will perform all the heavy lifting for tracking the head of the chain. func (bc *BeaconClient) ProcessKnownGaps(maxWorkers int) []error { log.Info("We are starting the known gaps processing service.") - hp := knownGapsProcessing{db: bc.Db, metrics: bc.Metrics} - errs := handleBatchProcess(maxWorkers, hp, hp.db, bc.ServerEndpoint, bc.Metrics) + bc.KnownGapsProcess = KnownGapsProcessing{db: bc.Db, uniqueNodeIdentifier: bc.UniqueNodeIdentifier, metrics: bc.Metrics, finishProcessing: make(chan int)} + errs := handleBatchProcess(maxWorkers, bc.KnownGapsProcess, bc.KnownGapsProcess.finishProcessing, bc.KnownGapsProcess.db, bc.ServerEndpoint, bc.Metrics) log.Debug("Exiting known gaps processing service") return errs } +// This function will perform all the necessary clean up tasks for stopping historical processing. +func (bc *BeaconClient) StopKnownGapsProcessing() error { + log.Info("We are stopping the historical processing service.") + err := bc.KnownGapsProcess.releaseDbLocks() + if err != nil { + loghelper.LogError(err).WithField("uniqueIdentifier", bc.UniqueNodeIdentifier).Error("We were unable to remove the locks from the ethcl.known_gaps table. Manual Intervention is needed!") + } + return nil +} + // Get a single row of historical slots from the table. -func (kgp knownGapsProcessing) getSlotRange(slotCh chan<- slotsToProcess) []error { - return getBatchProcessRow(kgp.db, getKgEntryStmt, checkKgEntryStmt, lockKgEntryStmt, slotCh) +func (kgp KnownGapsProcessing) getSlotRange(slotCh chan<- slotsToProcess) []error { + return getBatchProcessRow(kgp.db, getKgEntryStmt, checkKgEntryStmt, lockKgEntryStmt, slotCh, strconv.Itoa(kgp.uniqueNodeIdentifier)) } // Remove the table entry. -func (kgp knownGapsProcessing) removeTableEntry(processCh <-chan slotsToProcess) error { +func (kgp KnownGapsProcessing) removeTableEntry(processCh <-chan slotsToProcess) error { return removeRowPostProcess(kgp.db, processCh, QueryBySlotStmt, deleteKgEntryStmt) } // Remove the table entry. -func (kgp knownGapsProcessing) handleProcessingErrors(errMessages <-chan batchHistoricError) { +func (kgp KnownGapsProcessing) handleProcessingErrors(errMessages <-chan batchHistoricError) { for { errMs := <-errMessages @@ -99,3 +115,18 @@ func (kgp knownGapsProcessing) handleProcessingErrors(errMessages <-chan batchHi } } } + +// Updated checked_out column for the uniqueNodeIdentifier. +func (kgp KnownGapsProcessing) releaseDbLocks() error { + go func() { kgp.finishProcessing <- 1 }() + res, err := kgp.db.Exec(context.Background(), releaseKgLockStmt, kgp.uniqueNodeIdentifier) + if err != nil { + return err + } + rows, err := res.RowsAffected() + if err != nil { + return err + } + log.WithField("rowCount", rows).Info("Released knownGaps locks for specified rows.") + return nil +} diff --git a/pkg/database/sql/postgres/database.go b/pkg/database/sql/postgres/database.go index c8451f8..16cb936 100644 --- a/pkg/database/sql/postgres/database.go +++ b/pkg/database/sql/postgres/database.go @@ -49,7 +49,7 @@ func SetupPostgresDb(dbHostname string, dbPort int, dbName string, dbUsername st "driver_name_provided": driverName, }).Error("Can't resolve driver type") } - log.Info("Using Driver:", DbDriver) + log.Info("Using Driver: ", DbDriver) postgresConfig := Config{ Hostname: dbHostname,