Feature/44 read write historic slots (#46)

* Update boot to set processing type.

* Handle entries from the historic_process table.

* Update on-pr.yml

* Fix head processing error

* Update names and debug

* Separate checking for new entries and locking them

* Application can process historic and known gaps (untested)

* Handle genesis

* Update remove entry for knownGaps, viper

* Disregard unused code from linter
Abdul Rabbani 2022-05-24 16:18:55 -04:00 committed by GitHub
parent 041276da81
commit 347984a547
25 changed files with 1151 additions and 379 deletions


@@ -26,7 +26,7 @@ on:
 env:
   stack-orchestrator-ref: ${{ github.event.inputs.stack-orchestrator-ref || 'main'}}
-  ipld-ethcl-db-ref: ${{ github.event.inputs.ipld-ethcl-db-ref || 'feature/schema-ipld-ethcl-indexer' }}
+  ipld-ethcl-db-ref: ${{ github.event.inputs.ipld-ethcl-db-ref || 'feature/historic-processing' }}
   ssz-data-ref: ${{ github.event.inputs.ssz-data-ref || 'main' }}
   GOPATH: /tmp/go
 jobs:
@@ -206,4 +206,5 @@ jobs:
       - name: golangci-lint
         uses: golangci/golangci-lint-action@v3
         with:
-          args: --timeout 90s
+          args: --timeout 90s --disable deadcode,unused
+          # args: --timeout 90s --disable deadcode,


@@ -37,7 +37,7 @@ To run the application, do as follows:
 2. Run the start up command.
 ```
-go run -race main.go capture head --db.address localhost \
+go run -race main.go capture historic --db.address localhost \
   --db.password password \
   --db.port 8076 \
   --db.username vdbm \
@@ -45,11 +45,14 @@ go run -race main.go capture head --db.address localhost \
   --db.driver PGX \
   --bc.address localhost \
   --bc.port 5052 \
+  --bc.maxHistoricProcessWorker 2 \
+  --kg.maxKnownGapsWorker 2 \
+  --kg.processKnownGaps=true \
   --bc.connectionProtocol http \
-  --t.skipSync=true \
-  --log.level info \
+  --t.skipSync=false \
+  --log.level debug \
   --log.output=true \
-  --kg.increment 100
+  --kg.increment 1000000
 ```
 ## Running Tests


@@ -44,7 +44,8 @@ func bootApp() {
 	log.Info("Starting the application in boot mode.")
 	ctx := context.Background()
-	BC, DB, err := boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, "head", testDisregardSync)
+	BC, DB, err := boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver,
+		bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, kgTableIncrement, "head", testDisregardSync)
 	if err != nil {
 		loghelper.LogError(err).Error("Unable to Start application")
 	}


@@ -17,6 +17,7 @@
 package cmd

 import (
+	"fmt"
 	"os"
 	"time"
@@ -25,21 +26,25 @@ import (
 )

 var (
 	dbUsername           string
 	dbPassword           string
 	dbName               string
 	dbAddress            string
 	dbDriver             string
 	dbPort               int
 	bcAddress            string
 	bcPort               int
 	bcBootRetryInterval  int
 	bcBootMaxRetry       int
 	bcConnectionProtocol string
 	bcType               string
-	maxWaitSecondsShutdown time.Duration  = time.Duration(5) * time.Second
+	bcMaxHistoricProcessWorker int
+	kgMaxWorker                int
+	kgTableIncrement           int
+	kgProcessGaps              bool
+	maxWaitSecondsShutdown time.Duration  = time.Duration(20) * time.Second
 	notifierCh             chan os.Signal = make(chan os.Signal, 1)
 	testDisregardSync      bool
 )

 // captureCmd represents the capture command
@@ -64,18 +69,18 @@ func init() {
 	captureCmd.PersistentFlags().StringVarP(&dbName, "db.name", "n", "", "Database name connect to DB(required)")
 	captureCmd.PersistentFlags().StringVarP(&dbDriver, "db.driver", "", "", "Database Driver to connect to DB(required)")
 	captureCmd.PersistentFlags().IntVarP(&dbPort, "db.port", "", 0, "Port to connect to DB(required)")
-	err := captureCmd.MarkPersistentFlagRequired("db.username")
-	exitErr(err)
-	err = captureCmd.MarkPersistentFlagRequired("db.password")
-	exitErr(err)
-	err = captureCmd.MarkPersistentFlagRequired("db.address")
-	exitErr(err)
-	err = captureCmd.MarkPersistentFlagRequired("db.port")
-	exitErr(err)
-	err = captureCmd.MarkPersistentFlagRequired("db.name")
-	exitErr(err)
-	err = captureCmd.MarkPersistentFlagRequired("db.driver")
-	exitErr(err)
+	//err := captureCmd.MarkPersistentFlagRequired("db.username")
+	// exitErr(err)
+	// err = captureCmd.MarkPersistentFlagRequired("db.password")
+	// exitErr(err)
+	// err = captureCmd.MarkPersistentFlagRequired("db.address")
+	// exitErr(err)
+	// err = captureCmd.MarkPersistentFlagRequired("db.port")
+	// exitErr(err)
+	// err = captureCmd.MarkPersistentFlagRequired("db.name")
+	// exitErr(err)
+	// err = captureCmd.MarkPersistentFlagRequired("db.driver")
+	// exitErr(err)

 	//// Beacon Client Specific
 	captureCmd.PersistentFlags().StringVarP(&bcAddress, "bc.address", "l", "", "Address to connect to beacon node (required)")
@@ -84,17 +89,23 @@ func init() {
 	captureCmd.PersistentFlags().StringVarP(&bcConnectionProtocol, "bc.connectionProtocol", "", "http", "protocol for connecting to the beacon node.")
 	captureCmd.PersistentFlags().IntVarP(&bcBootRetryInterval, "bc.bootRetryInterval", "", 30, "The amount of time to wait between retries while booting the application")
 	captureCmd.PersistentFlags().IntVarP(&bcBootMaxRetry, "bc.bootMaxRetry", "", 5, "The amount of time to wait between retries while booting the application")
-	err = captureCmd.MarkPersistentFlagRequired("bc.address")
-	exitErr(err)
-	err = captureCmd.MarkPersistentFlagRequired("bc.port")
-	exitErr(err)
+	captureCmd.PersistentFlags().IntVarP(&bcMaxHistoricProcessWorker, "bc.maxHistoricProcessWorker", "", 30, "The number of workers that should be actively processing slots from the ethcl.historic_process table. Be careful of system memory.")
+	// err = captureCmd.MarkPersistentFlagRequired("bc.address")
+	// exitErr(err)
+	// err = captureCmd.MarkPersistentFlagRequired("bc.port")
+	// exitErr(err)
+
+	//// Known Gaps specific
+	captureCmd.PersistentFlags().BoolVarP(&kgProcessGaps, "kg.processKnownGaps", "", true, "Should we process the slots within the ethcl.known_gaps table.")
+	captureCmd.PersistentFlags().IntVarP(&kgTableIncrement, "kg.increment", "", 10000, "The max slots within a single entry to the known_gaps table.")
+	captureCmd.PersistentFlags().IntVarP(&kgMaxWorker, "kg.maxKnownGapsWorker", "", 30, "The number of workers that should be actively processing slots from the ethcl.known_gaps table. Be careful of system memory.")

 	//// Testing Specific
 	captureCmd.PersistentFlags().BoolVar(&testDisregardSync, "t.skipSync", false, "Should we disregard the head sync?")

 	// Bind Flags with Viper
 	//// DB Flags
-	err = viper.BindPFlag("db.username", captureCmd.PersistentFlags().Lookup("db.username"))
+	err := viper.BindPFlag("db.username", captureCmd.PersistentFlags().Lookup("db.username"))
 	exitErr(err)
 	err = viper.BindPFlag("db.password", captureCmd.PersistentFlags().Lookup("db.password"))
 	exitErr(err)
@@ -104,14 +115,12 @@ func init() {
 	exitErr(err)
 	err = viper.BindPFlag("db.name", captureCmd.PersistentFlags().Lookup("db.name"))
 	exitErr(err)
-	//// Testing Specific
 	err = viper.BindPFlag("t.skipSync", captureCmd.PersistentFlags().Lookup("t.skipSync"))
 	exitErr(err)
-	// Testing Specific
-	err = viper.BindPFlag("t.driver", captureCmd.PersistentFlags().Lookup("db.driver"))
-	exitErr(err)
-	// LH specific
+	//// LH specific
 	err = viper.BindPFlag("bc.address", captureCmd.PersistentFlags().Lookup("bc.address"))
 	exitErr(err)
 	err = viper.BindPFlag("bc.type", captureCmd.PersistentFlags().Lookup("bc.type"))
@@ -124,14 +133,25 @@ func init() {
 	exitErr(err)
 	err = viper.BindPFlag("bc.bootMaxRetry", captureCmd.PersistentFlags().Lookup("bc.bootMaxRetry"))
 	exitErr(err)
+	err = viper.BindPFlag("bc.maxHistoricProcessWorker", captureCmd.PersistentFlags().Lookup("bc.maxHistoricProcessWorker"))
+	exitErr(err)
 	// Here you will define your flags and configuration settings.

+	//// Known Gap Specific
+	err = viper.BindPFlag("kg.processKnownGaps", captureCmd.PersistentFlags().Lookup("kg.processKnownGaps"))
+	exitErr(err)
+	err = viper.BindPFlag("kg.increment", captureCmd.PersistentFlags().Lookup("kg.increment"))
+	exitErr(err)
+	err = viper.BindPFlag("kg.maxKnownGapsWorker", captureCmd.PersistentFlags().Lookup("kg.maxKnownGapsWorker"))
+	exitErr(err)
 }

 // Helper function to catch any errors.
 // We need to capture these errors for the linter.
 func exitErr(err error) {
 	if err != nil {
+		fmt.Println("Error: ", err)
 		os.Exit(1)
 	}
 }
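The init() above now routes every flag through viper.BindPFlag, and the head and historic commands below read settings back with viper.Get* rather than through the package-level variables. A minimal sketch of why that matters: once a flag is bound, viper resolves an explicitly passed flag first, then any config-file value, then the pflag default. The program is illustrative only; it mirrors the kg.increment binding but is not part of the repo.

```
package main

import (
	"fmt"

	"github.com/spf13/pflag"
	"github.com/spf13/viper"
)

func main() {
	// Mirrors the kg.increment registration in the capture command's init().
	pflag.Int("kg.increment", 10000, "The max slots within a single entry to the known_gaps table.")
	pflag.Parse()
	if err := viper.BindPFlag("kg.increment", pflag.Lookup("kg.increment")); err != nil {
		panic(err)
	}
	// Resolution order: explicit flag > config file (if one was read) > flag default.
	fmt.Println(viper.GetInt("kg.increment"))
}
```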


@@ -18,7 +18,7 @@ package cmd

 import (
 	"context"
-	"os"
+	"fmt"

 	log "github.com/sirupsen/logrus"
 	"github.com/spf13/cobra"
@@ -26,10 +26,7 @@ import (
 	"github.com/vulcanize/ipld-ethcl-indexer/internal/boot"
 	"github.com/vulcanize/ipld-ethcl-indexer/internal/shutdown"
 	"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
-)
-
-var (
-	kgTableIncrement int
+	"golang.org/x/sync/errgroup"
 )

 // headCmd represents the head command
@@ -48,21 +45,35 @@ func startHeadTracking() {
 	log.Info("Starting the application in head tracking mode.")
 	ctx := context.Background()
-	BC, DB, err := boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, "historic", testDisregardSync)
+	Bc, Db, err := boot.BootApplicationWithRetry(ctx, viper.GetString("db.address"), viper.GetInt("db.port"), viper.GetString("db.name"), viper.GetString("db.username"), viper.GetString("db.password"), viper.GetString("db.driver"),
+		viper.GetString("bc.address"), viper.GetInt("bc.port"), viper.GetString("bc.connectionProtocol"), viper.GetString("bc.type"), viper.GetInt("bc.bootRetryInterval"), viper.GetInt("bc.bootMaxRetry"),
+		viper.GetInt("kg.increment"), "head", viper.GetBool("t.skipSync"))
 	if err != nil {
-		loghelper.LogError(err).Error("Unable to Start application")
-		if DB != nil {
-			DB.Close()
-		}
-		os.Exit(1)
+		StopApplicationPreBoot(err, Db)
 	}
 	log.Info("The Beacon Client has booted successfully!")
 	// Capture head blocks
-	go BC.CaptureHead(kgTableIncrement)
+	go Bc.CaptureHead()
+
+	if viper.GetBool("kg.processKnownGaps") {
+		go func() {
+			errG := new(errgroup.Group)
+			errG.Go(func() error {
+				errs := Bc.ProcessKnownGaps(viper.GetInt("kg.maxKnownGapsWorker"))
+				if len(errs) != 0 {
+					log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing knownGaps")
+					return fmt.Errorf("Application ended because there were too many errors when attempting to process knownGaps")
+				}
+				return nil
+			})
+			if err := errG.Wait(); err != nil {
+				loghelper.LogError(err).Error("Error with knownGaps processing")
+			}
+		}()
+	}

 	// Shutdown when the time is right.
-	err = shutdown.ShutdownServices(ctx, notifierCh, maxWaitSecondsShutdown, DB, BC)
+	err = shutdown.ShutdownServices(ctx, notifierCh, maxWaitSecondsShutdown, Db, Bc)
 	if err != nil {
 		loghelper.LogError(err).Error("Ungracefully Shutdown ipld-ethcl-indexer!")
 	} else {
@@ -73,9 +84,4 @@ func startHeadTracking() {

 func init() {
 	captureCmd.AddCommand(headCmd)
-
-	// Known Gaps specific
-	captureCmd.PersistentFlags().IntVarP(&kgTableIncrement, "kg.increment", "", 10000, "The max slots within a single entry to the known_gaps table.")
-	err := viper.BindPFlag("kg.increment", captureCmd.PersistentFlags().Lookup("kg.increment"))
-	exitErr(err)
 }
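startHeadTracking above funnels ProcessKnownGaps, which returns a slice of worker errors, through an errgroup so the slice collapses into a single error that Wait can surface. A standalone sketch of that pattern; doWork is a hypothetical stand-in, not a repo function.

```
package main

import (
	"fmt"
	"log"

	"golang.org/x/sync/errgroup"
)

// doWork stands in for a call like Bc.ProcessKnownGaps, which gathers
// every worker error instead of failing fast.
func doWork() []error {
	return []error{fmt.Errorf("slot 101 failed"), fmt.Errorf("slot 102 failed")}
}

func main() {
	errG := new(errgroup.Group)
	errG.Go(func() error {
		// Collapse the slice into one error so errG.Wait() reports the failure.
		if errs := doWork(); len(errs) != 0 {
			return fmt.Errorf("processing ended with %d errors: %v", len(errs), errs)
		}
		return nil
	})
	if err := errG.Wait(); err != nil {
		log.Println("knownGaps-style processing failed:", err)
	}
}
```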


@@ -18,13 +18,17 @@ package cmd

 import (
 	"context"
+	"fmt"
 	"os"

 	log "github.com/sirupsen/logrus"
 	"github.com/spf13/cobra"
+	"github.com/spf13/viper"
 	"github.com/vulcanize/ipld-ethcl-indexer/internal/boot"
+	"github.com/vulcanize/ipld-ethcl-indexer/internal/shutdown"
 	"github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql"
 	"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
+	"golang.org/x/sync/errgroup"
 )

 // historicCmd represents the historic command
@@ -43,9 +47,49 @@ func startHistoricProcessing() {
 	log.Info("Starting the application in head tracking mode.")
 	ctx := context.Background()
-	_, DB, err := boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, "historic", testDisregardSync)
+	Bc, Db, err := boot.BootApplicationWithRetry(ctx, viper.GetString("db.address"), viper.GetInt("db.port"), viper.GetString("db.name"), viper.GetString("db.username"), viper.GetString("db.password"), viper.GetString("db.driver"),
+		viper.GetString("bc.address"), viper.GetInt("bc.port"), viper.GetString("bc.connectionProtocol"), viper.GetString("bc.type"), viper.GetInt("bc.bootRetryInterval"), viper.GetInt("bc.bootMaxRetry"),
+		viper.GetInt("kg.increment"), "head", viper.GetBool("t.skipSync"))
 	if err != nil {
-		StopApplicationPreBoot(err, DB)
+		StopApplicationPreBoot(err, Db)
 	}
+
+	errG, _ := errgroup.WithContext(context.Background())
+	errG.Go(func() error {
+		errs := Bc.CaptureHistoric(viper.GetInt("bc.maxHistoricProcessWorker"))
+		if len(errs) != 0 {
+			log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing historic events")
+			return fmt.Errorf("Application ended because there were too many errors when attempting to process historic")
+		}
+		return nil
+	})
+
+	if viper.GetBool("kg.processKnownGaps") {
+		go func() {
+			errG := new(errgroup.Group)
+			errG.Go(func() error {
+				errs := Bc.ProcessKnownGaps(viper.GetInt("kg.maxKnownGapsWorker"))
+				if len(errs) != 0 {
+					log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing knownGaps")
+					return fmt.Errorf("Application ended because there were too many errors when attempting to process knownGaps")
+				}
+				return nil
+			})
+			if err := errG.Wait(); err != nil {
+				loghelper.LogError(err).Error("Error with knownGaps processing")
+			}
+		}()
+	}
+
+	// Shutdown when the time is right.
+	err = shutdown.ShutdownServices(ctx, notifierCh, maxWaitSecondsShutdown, Db, Bc)
+	if err != nil {
+		loghelper.LogError(err).Error("Ungracefully Shutdown ipld-ethcl-indexer!")
+	} else {
+		log.Info("Gracefully shutdown ipld-ethcl-indexer")
+	}
 }


@@ -47,6 +47,7 @@ It can either do this will keeping track of head, or backfilling historic data.`
 func Execute() {
 	err := rootCmd.Execute()
 	if err != nil {
+		fmt.Println("Err when executing rootCmd", err)
 		os.Exit(1)
 	}
 }


@@ -0,0 +1,33 @@
+{
+  "db": {
+    "address": "localhost",
+    "password": "password",
+    "port": 8076,
+    "username": "vdbm",
+    "name": "vulcanize_testing",
+    "driver": "PGX"
+  },
+  "bc": {
+    "address": "10.203.8.51",
+    "port": 5052,
+    "type": "lighthouse",
+    "bootRetryInterval": 30,
+    "bootMaxRetry": 5,
+    "maxHistoricProcessWorker": 2,
+    "connectionProtocol": "http"
+  },
+  "t": {
+    "skipSync": false
+  },
+  "log": {
+    "level": "debug",
+    "output": true,
+    "file": "./ipld-ethcl-indexer.log",
+    "format": "json"
+  },
+  "kg": {
+    "increment": 10000,
+    "processKnownGaps": true,
+    "maxKnownGapsWorker": 2
+  }
+}
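The commit does not show where this new JSON file is consumed, but its nested keys line up with the dotted flag names bound in the capture command's init(), which is the shape viper reads natively. A hedged sketch of loading it; the file path is hypothetical.

```
package main

import (
	"fmt"

	"github.com/spf13/viper"
)

func main() {
	// Hypothetical location; the diff does not wire the file in anywhere.
	viper.SetConfigFile("./config.json")
	if err := viper.ReadInConfig(); err != nil {
		panic(err)
	}
	// Dotted paths address the nested JSON keys, matching the bound flag names.
	fmt.Println(viper.GetInt("kg.increment"))                // 10000
	fmt.Println(viper.GetBool("kg.processKnownGaps"))        // true
	fmt.Println(viper.GetInt("bc.maxHistoricProcessWorker")) // 2
}
```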


@@ -41,11 +41,12 @@ var (
 // 2. Connect to the database.
 //
 // 3. Make sure the node is synced, unless disregardSync is true.
-func BootApplication(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string, bcAddress string, bcPort int, bcConnectionProtocol string, disregardSync bool) (*beaconclient.BeaconClient, sql.Database, error) {
+func BootApplication(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string,
+	bcAddress string, bcPort int, bcConnectionProtocol string, bcKgTableIncrement int, disregardSync bool) (*beaconclient.BeaconClient, sql.Database, error) {
 	log.Info("Booting the Application")
 	log.Debug("Creating the Beacon Client")
-	BC = beaconclient.CreateBeaconClient(ctx, bcConnectionProtocol, bcAddress, bcPort)
+	BC = beaconclient.CreateBeaconClient(ctx, bcConnectionProtocol, bcAddress, bcPort, bcKgTableIncrement)

 	log.Debug("Checking Beacon Client")
 	err := BC.CheckBeaconClient()
@@ -81,32 +82,65 @@ func BootApplication(ctx context.Context, dbHostname string, dbPort int, dbName
 // Add retry logic to ensure that we are giving the Beacon Client and the DB time to start.
 func BootApplicationWithRetry(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string,
-	bcAddress string, bcPort int, bcConnectionProtocol string, bcType string, bcRetryInterval int, bcMaxRetry int, startUpMode string, disregardSync bool) (*beaconclient.BeaconClient, sql.Database, error) {
+	bcAddress string, bcPort int, bcConnectionProtocol string, bcType string, bcRetryInterval int, bcMaxRetry int, bcKgTableIncrement int, startUpMode string, disregardSync bool) (*beaconclient.BeaconClient, sql.Database, error) {
 	var err error
-	for i := 0; i < bcMaxRetry; i++ {
-		BC, DB, err = BootApplication(ctx, dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName, bcAddress, bcPort, bcConnectionProtocol, disregardSync)
-		if err != nil {
-			log.WithFields(log.Fields{
-				"retryNumber": i,
-				"err":         err,
-			}).Warn("Unable to boot application. Going to try again")
-			time.Sleep(time.Duration(bcRetryInterval) * time.Second)
-			continue
-		}
-		break
-	}
+
+	if bcMaxRetry < 0 {
+		i := 0
+		for {
+			BC, DB, err = BootApplication(ctx, dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName,
+				bcAddress, bcPort, bcConnectionProtocol, bcKgTableIncrement, disregardSync)
+			if err != nil {
+				log.WithFields(log.Fields{
+					"retryNumber": i,
+					"err":         err,
+				}).Warn("Unable to boot application. Going to try again")
+				time.Sleep(time.Duration(bcRetryInterval) * time.Second)
+				i = i + 1
+				continue
+			}
+			break
+		}
+	} else {
+		for i := 0; i < bcMaxRetry; i++ {
+			BC, DB, err = BootApplication(ctx, dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName,
+				bcAddress, bcPort, bcConnectionProtocol, bcKgTableIncrement, disregardSync)
+			if err != nil {
+				log.WithFields(log.Fields{
+					"retryNumber": i,
+					"err":         err,
+				}).Warn("Unable to boot application. Going to try again")
+				time.Sleep(time.Duration(bcRetryInterval) * time.Second)
+				continue
+			}
+			break
+		}
+	}
+
 	switch strings.ToLower(startUpMode) {
 	case "head":
-		log.Debug("No further actions needed to boot the application at this phase.")
+		BC.PerformHeadTracking = true
 	case "historic":
 		log.Debug("Performing additional boot steps for historical processing")
+		BC.PerformHistoricalProcessing = true
+		// This field is not currently used.
+		// The idea is, that if we are doing historical processing and we get a slot
+		// greater than this slot, then we would rerun this function.
+		// This would ensure that we have the slots necessary for processing
+		// within the beacon server.
+		// We can implement this feature if we notice any errors.
 		headSlot, err := BC.GetLatestSlotInBeaconServer(bcType)
 		if err != nil {
 			return BC, DB, err
 		}
 		BC.UpdateLatestSlotInBeaconServer(int64(headSlot))
 		// Add another switch case for bcType if its ever needed.
+	default:
+		log.WithFields(log.Fields{
+			"startUpMode": startUpMode,
+		}).Error("The startUpMode provided is not handled.")
 	}
 	return BC, DB, err
 }
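The rewritten loop treats a negative bcMaxRetry as "retry forever", duplicating the loop body across the two branches. A sketch of the same semantics with the branches folded into one loop condition; this is an editor's illustration under stated assumptions, not the committed code.

```
package main

import (
	"errors"
	"fmt"
	"time"
)

// bootOnce stands in for a single BootApplication attempt.
func bootOnce(attempt int) error {
	if attempt < 2 {
		return errors.New("beacon node not ready")
	}
	return nil
}

// A negative maxRetry retries indefinitely; otherwise the loop is bounded,
// matching the if/else structure in BootApplicationWithRetry above.
func retryBoot(maxRetry int, interval time.Duration) error {
	var err error
	for i := 0; maxRetry < 0 || i < maxRetry; i++ {
		if err = bootOnce(i); err != nil {
			fmt.Printf("retry %d: %v\n", i, err)
			time.Sleep(interval)
			continue
		}
		break
	}
	return err
}

func main() {
	fmt.Println(retryBoot(-1, 10*time.Millisecond))
}
```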


@@ -37,44 +37,45 @@ var _ = Describe("Boot", func() {
 		bcType               string = "lighthouse"
 		bcBootRetryInterval  int    = 1
 		bcBootMaxRetry       int    = 5
+		bcKgTableIncrement   int    = 10
 	)

 	Describe("Booting the application", Label("integration"), func() {
 		Context("When the DB and BC are both up and running, we skip checking for a synced head, and we are processing head", func() {
 			It("Should connect successfully", func() {
-				_, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, "head", true)
+				_, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", true)
 				defer db.Close()
 				Expect(err).ToNot(HaveOccurred())
 			})
 		})
 		Context("When the DB and BC are both up and running, we skip checking for a synced head, and we are processing historic ", func() {
 			It("Should connect successfully", func() {
-				_, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, "historic", true)
+				_, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "historic", true)
 				defer db.Close()
 				Expect(err).ToNot(HaveOccurred())
 			})
 		})
 		Context("When the DB and BC are both up and running, and we check for a synced head", func() {
 			It("Should not connect successfully", func() {
-				_, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, "head", false)
+				_, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", false)
 				defer db.Close()
 				Expect(err).To(HaveOccurred())
 			})
 		})
 		Context("When the DB is running but not the BC", func() {
 			It("Should not connect successfully", func() {
-				_, _, err := boot.BootApplication(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol, true)
+				_, _, err := boot.BootApplication(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol, bcKgTableIncrement, true)
 				Expect(err).To(HaveOccurred())
 			})
 		})
 		Context("When the BC is running but not the DB", func() {
 			It("Should not connect successfully", func() {
-				_, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, true)
+				_, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcKgTableIncrement, true)
 				Expect(err).To(HaveOccurred())
 			})
 		})
 		Context("When neither the BC or DB are running", func() {
 			It("Should not connect successfully", func() {
-				_, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol, true)
+				_, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol, bcKgTableIncrement, true)
 				Expect(err).To(HaveOccurred())
 			})
 		})


@@ -49,6 +49,7 @@ var _ = Describe("Shutdown", func() {
 		bcType                 string        = "lighthouse"
 		bcBootRetryInterval    int           = 1
 		bcBootMaxRetry         int           = 5
+		bcKgTableIncrement     int           = 10
 		maxWaitSecondsShutdown time.Duration = time.Duration(1) * time.Second
 		DB                     sql.Database
 		BC                     *beaconclient.BeaconClient
@@ -58,7 +59,8 @@ var _ = Describe("Shutdown", func() {
 	)
 	BeforeEach(func() {
 		ctx = context.Background()
-		BC, DB, err = boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, "head", true)
+		BC, DB, err = boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress,
+			bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", true)
 		notifierCh = make(chan os.Signal, 1)
 		Expect(err).To(BeNil())
 	})


@@ -43,12 +43,11 @@ var (
 // A struct that captures the Beacon Server that the Beacon Client will be interacting with and querying.
 type BeaconClient struct {
 	Context                context.Context      // A generic context with multiple uses.
 	ServerEndpoint         string               // What is the endpoint of the beacon server.
-	PerformHistoricalProcessing bool            // Should we perform historical processing?
 	Db                     sql.Database         // Database object used for reads and writes.
 	Metrics                *BeaconClientMetrics // An object used to keep track of certain BeaconClient Metrics.
 	KnownGapTableIncrement int                  // The max number of slots within a single known_gaps table entry.

 	// Used for Head Tracking
@@ -65,7 +64,8 @@ type BeaconClient struct {
 	// The latest available slot within the Beacon Server. We can't query any slot greater than this.
 	// This value is lazily updated. Therefore at times it will be outdated.
 	LatestSlotInBeaconServer    int64
+	PerformHistoricalProcessing bool // Should we perform historical processing?
 }

 // A struct to keep track of the relevant head event topic.
@@ -84,20 +84,21 @@ type SseError struct {
 }

 // A Function to create the BeaconClient.
-func CreateBeaconClient(ctx context.Context, connectionProtocol string, bcAddress string, bcPort int) *BeaconClient {
+func CreateBeaconClient(ctx context.Context, connectionProtocol string, bcAddress string, bcPort int, bcKgTableIncrement int) *BeaconClient {
 	endpoint := fmt.Sprintf("%s://%s:%d", connectionProtocol, bcAddress, bcPort)
 	log.Info("Creating the BeaconClient")
 	return &BeaconClient{
-		Context:        ctx,
-		ServerEndpoint: endpoint,
-		HeadTracking:   createSseEvent[Head](endpoint, BcHeadTopicEndpoint),
-		ReOrgTracking:  createSseEvent[ChainReorg](endpoint, bcReorgTopicEndpoint),
+		Context:                ctx,
+		ServerEndpoint:         endpoint,
+		KnownGapTableIncrement: bcKgTableIncrement,
+		HeadTracking:           createSseEvent[Head](endpoint, BcHeadTopicEndpoint),
+		ReOrgTracking:          createSseEvent[ChainReorg](endpoint, bcReorgTopicEndpoint),
 		Metrics: &BeaconClientMetrics{
-			HeadTrackingInserts:   0,
-			HeadTrackingReorgs:    0,
-			HeadTrackingKnownGaps: 0,
-			HeadError:             0,
-			HeadReorgError:        0,
+			SlotInserts:      0,
+			ReorgInserts:     0,
+			KnownGapsInserts: 0,
+			HeadError:        0,
+			HeadReorgError:   0,
 		},
 		//FinalizationTracking: createSseEvent[FinalizedCheckpoint](endpoint, bcFinalizedTopicEndpoint),
 	}


@@ -25,12 +25,9 @@ import (
 )

 // This function will perform all the heavy lifting for tracking the head of the chain.
-func (bc *BeaconClient) CaptureHead(knownGapsTableIncrement int) {
-	bc.KnownGapTableIncrement = knownGapsTableIncrement
+func (bc *BeaconClient) CaptureHead() {
 	log.Info("We are tracking the head of the chain.")
-	//bc.tempHelper()
 	go bc.handleHead()
-	//go bc.handleFinalizedCheckpoint()
 	go bc.handleReorg()
 	bc.captureEventTopic()
 }
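With the increment now set in the constructor, starting head tracking is a no-argument call. A usage sketch against the two signatures in this diff; it skips the database wiring that BootApplication normally performs, so it illustrates the call shape only.

```
package main

import (
	"context"

	"github.com/vulcanize/ipld-ethcl-indexer/pkg/beaconclient"
)

func main() {
	// KnownGapTableIncrement (10000 here) rides in on CreateBeaconClient,
	// so CaptureHead no longer takes it as a parameter.
	bc := beaconclient.CreateBeaconClient(context.Background(), "http", "localhost", 5052, 10000)
	bc.CaptureHead() // blocks in captureEventTopic()
}
```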


@@ -44,6 +44,172 @@ import (
 	"github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql/postgres"
 )
var (
address string = "localhost"
port int = 8080
protocol string = "http"
dbHost string = "localhost"
dbPort int = 8076
dbName string = "vulcanize_testing"
dbUser string = "vdbm"
dbPassword string = "password"
dbDriver string = "pgx"
dummyParentRoot string = "46f98c08b54a71dfda4d56e29ec3952b8300cd8d6b67a9b6c562ae96a7a25a42"
knownGapsTableIncrement int = 100000
maxRetry int = 120
TestEvents = map[string]Message{
"100-dummy": {
HeadMessage: beaconclient.Head{
Slot: "100",
Block: "04955400371347e26f61d7a4bbda5b23fa0b25d5fc465160f2a92d52a63b919b",
State: "36d5c9a129979b4502bd9a06e57a742810ecbc3fa55a0361c0723c92c1782bfa",
CurrentDutyDependentRoot: "",
PreviousDutyDependentRoot: "",
EpochTransition: false,
ExecutionOptimistic: false,
},
TestNotes: "A block that is supposed to replicate slot 100, but contains some dummy test information.",
MimicConfig: &MimicConfig{
ForkVersion: "phase0",
},
SignedBeaconBlock: filepath.Join("ssz-data", "100", "signed-beacon-block.ssz"),
BeaconState: filepath.Join("ssz-data", "100", "beacon-state.ssz"),
},
"100-dummy-2": {
HeadMessage: beaconclient.Head{
Slot: "100",
Block: "04955400371347e26f61d7a4bbda5b23fa0b25d5fc465160f2a9aaaaaaaaaaaa",
State: "36d5c9a129979b4502bd9a06e57a742810ecbc3fa55a0361c072bbbbbbbbbbbb",
CurrentDutyDependentRoot: "",
PreviousDutyDependentRoot: "",
EpochTransition: false,
ExecutionOptimistic: false,
},
TestNotes: "A block that is supposed to replicate slot 100, but contains some dummy test information.",
MimicConfig: &MimicConfig{
ForkVersion: "phase0",
},
SignedBeaconBlock: filepath.Join("ssz-data", "100", "signed-beacon-block.ssz"),
BeaconState: filepath.Join("ssz-data", "100", "beacon-state.ssz"),
},
"102-wrong-ssz-1": {
HeadMessage: beaconclient.Head{
Slot: "102",
Block: "0x46f98c08b54a71dfda4d56e29ec3952b8300cd8d6b67a9b6c562ae96a7a25a42",
State: "0x9b20b114c613c1aa462e02d590b3da902b0a1377e938ed0f94dd3491d763ef67",
CurrentDutyDependentRoot: "",
PreviousDutyDependentRoot: "",
EpochTransition: false,
ExecutionOptimistic: false,
},
TestNotes: "A bad block that returns the wrong ssz objects, used for testing incorrect SSZ decoding.",
BeaconState: filepath.Join("ssz-data", "102", "signed-beacon-block.ssz"),
SignedBeaconBlock: filepath.Join("ssz-data", "102", "beacon-state.ssz"),
},
"100": {
HeadMessage: beaconclient.Head{
Slot: "100",
Block: "0x582187e97f7520bb69eea014c3834c964c45259372a0eaaea3f032013797996b",
State: "0xf286a0379c0386a3c7be28d05d829f8eb7b280cc9ede15449af20ebcd06a7a56",
CurrentDutyDependentRoot: "",
PreviousDutyDependentRoot: "",
EpochTransition: false,
ExecutionOptimistic: false,
},
TestNotes: "An easy to process Phase 0 block",
SignedBeaconBlock: filepath.Join("ssz-data", "100", "signed-beacon-block.ssz"),
BeaconState: filepath.Join("ssz-data", "100", "beacon-state.ssz"),
},
"101": {
HeadMessage: beaconclient.Head{
Slot: "101",
Block: "0xabe1a972e512182d04f0d4a5c9c25f9ee57c2e9d0ff3f4c4c82fd42d13d31083",
State: "0xcb04aa2edbf13c7bb7e7bd9b621ced6832e0075e89147352eac3019a824ce847",
CurrentDutyDependentRoot: "",
PreviousDutyDependentRoot: "",
EpochTransition: false,
ExecutionOptimistic: false,
},
TestNotes: "An easy to process Phase 0 block",
SignedBeaconBlock: filepath.Join("ssz-data", "101", "signed-beacon-block.ssz"),
BeaconState: filepath.Join("ssz-data", "101", "beacon-state.ssz"),
},
"2375703-dummy": {
HeadMessage: beaconclient.Head{
Slot: "2375703",
Block: "c9fb337b62e2a0dae4f27ab49913132570f7f2cab3f23ad99f4d07508a8e648e",
State: "0299a145bcda2c8f5e7d2e068ee101861edbee2ec1db2d5e1d850b0d265aef5f",
CurrentDutyDependentRoot: "",
PreviousDutyDependentRoot: "",
EpochTransition: false,
ExecutionOptimistic: false,
},
TestNotes: "This is a dummy message that is used for reorgs",
MimicConfig: &MimicConfig{
ForkVersion: "altair",
},
SignedBeaconBlock: filepath.Join("ssz-data", "2375703", "signed-beacon-block.ssz"),
BeaconState: filepath.Join("ssz-data", "2375703", "beacon-state.ssz"),
},
"2375703-dummy-2": {
HeadMessage: beaconclient.Head{
Slot: "2375703",
Block: "c9fb337b62e2a0dae4f27ab49913132570f7f2cab3f23ad99f4d07508aaaaaaa",
State: "0299a145bcda2c8f5e7d2e068ee101861edbee2ec1db2d5e1d850b0d2bbbbbbb",
CurrentDutyDependentRoot: "",
PreviousDutyDependentRoot: "",
EpochTransition: false,
ExecutionOptimistic: false,
},
TestNotes: "This is a dummy message that is used for reorgs",
MimicConfig: &MimicConfig{
ForkVersion: "altair",
},
SignedBeaconBlock: filepath.Join("ssz-data", "2375703", "signed-beacon-block.ssz"),
BeaconState: filepath.Join("ssz-data", "2375703", "beacon-state.ssz"),
},
"2375703": {
HeadMessage: beaconclient.Head{
Slot: "2375703",
Block: "0x4392372c5f6e39499e31bf924388b5815639103149f0f54f8a453773b1802301",
State: "0xb6215b560273af63ec7e011572b60ec1ca0b0232f8ff44fcd4ed55c7526e964e",
CurrentDutyDependentRoot: "", PreviousDutyDependentRoot: "", EpochTransition: false, ExecutionOptimistic: false},
TestNotes: "An easy to process Altair Block",
SignedBeaconBlock: filepath.Join("ssz-data", "2375703", "signed-beacon-block.ssz"),
BeaconState: filepath.Join("ssz-data", "2375703", "beacon-state.ssz"),
},
"3797056": {
HeadMessage: beaconclient.Head{
Slot: "3797056",
Block: "",
State: "0xb6215b560273af63ec7e011572b60ec1ca0b0232f8ff44fcd4ed55c7526e964e",
CurrentDutyDependentRoot: "", PreviousDutyDependentRoot: "", EpochTransition: false, ExecutionOptimistic: false},
TestNotes: "An easy to process Altair Block",
SignedBeaconBlock: filepath.Join("ssz-data", "2375703", "signed-beacon-block.ssz"),
BeaconState: filepath.Join("ssz-data", "2375703", "beacon-state.ssz"),
},
}
TestConfig = Config{
protocol: protocol,
address: address,
port: port,
dummyParentRoot: dummyParentRoot,
dbHost: dbHost,
dbPort: dbPort,
dbName: dbName,
dbUser: dbUser,
dbPassword: dbPassword,
dbDriver: dbDriver,
knownGapsTableIncrement: knownGapsTableIncrement,
}
BeaconNodeTester = TestBeaconNode{
TestEvents: TestEvents,
TestConfig: TestConfig,
}
)
type Message struct {
	HeadMessage beaconclient.Head // The head message that will be streamed to the BeaconClient
	TestNotes   string            // A small explanation of the purpose this structure plays in the testing landscape.
@@ -61,177 +227,6 @@ type MimicConfig struct {
 var _ = Describe("Capturehead", func() {
var (
TestConfig Config
BeaconNodeTester TestBeaconNode
address string = "localhost"
port int = 8080
protocol string = "http"
TestEvents map[string]Message
dbHost string = "localhost"
dbPort int = 8076
dbName string = "vulcanize_testing"
dbUser string = "vdbm"
dbPassword string = "password"
dbDriver string = "pgx"
dummyParentRoot string = "46f98c08b54a71dfda4d56e29ec3952b8300cd8d6b67a9b6c562ae96a7a25a42"
knownGapsTableIncrement int = 100000
maxRetry int = 60
)
BeforeEach(func() {
TestEvents = map[string]Message{
"100-dummy": {
HeadMessage: beaconclient.Head{
Slot: "100",
Block: "04955400371347e26f61d7a4bbda5b23fa0b25d5fc465160f2a92d52a63b919b",
State: "36d5c9a129979b4502bd9a06e57a742810ecbc3fa55a0361c0723c92c1782bfa",
CurrentDutyDependentRoot: "",
PreviousDutyDependentRoot: "",
EpochTransition: false,
ExecutionOptimistic: false,
},
TestNotes: "A block that is supposed to replicate slot 100, but contains some dummy test information.",
MimicConfig: &MimicConfig{
ForkVersion: "phase0",
},
SignedBeaconBlock: filepath.Join("ssz-data", "100", "signed-beacon-block.ssz"),
BeaconState: filepath.Join("ssz-data", "100", "beacon-state.ssz"),
},
"100-dummy-2": {
HeadMessage: beaconclient.Head{
Slot: "100",
Block: "04955400371347e26f61d7a4bbda5b23fa0b25d5fc465160f2a9aaaaaaaaaaaa",
State: "36d5c9a129979b4502bd9a06e57a742810ecbc3fa55a0361c072bbbbbbbbbbbb",
CurrentDutyDependentRoot: "",
PreviousDutyDependentRoot: "",
EpochTransition: false,
ExecutionOptimistic: false,
},
TestNotes: "A block that is supposed to replicate slot 100, but contains some dummy test information.",
MimicConfig: &MimicConfig{
ForkVersion: "phase0",
},
SignedBeaconBlock: filepath.Join("ssz-data", "100", "signed-beacon-block.ssz"),
BeaconState: filepath.Join("ssz-data", "100", "beacon-state.ssz"),
},
"102-wrong-ssz-1": {
HeadMessage: beaconclient.Head{
Slot: "102",
Block: "0x46f98c08b54a71dfda4d56e29ec3952b8300cd8d6b67a9b6c562ae96a7a25a42",
State: "0x9b20b114c613c1aa462e02d590b3da902b0a1377e938ed0f94dd3491d763ef67",
CurrentDutyDependentRoot: "",
PreviousDutyDependentRoot: "",
EpochTransition: false,
ExecutionOptimistic: false,
},
TestNotes: "A bad block that returns the wrong ssz objects, used for testing incorrect SSZ decoding.",
BeaconState: filepath.Join("ssz-data", "102", "signed-beacon-block.ssz"),
SignedBeaconBlock: filepath.Join("ssz-data", "102", "beacon-state.ssz"),
},
"100": {
HeadMessage: beaconclient.Head{
Slot: "100",
Block: "0x582187e97f7520bb69eea014c3834c964c45259372a0eaaea3f032013797996b",
State: "0xf286a0379c0386a3c7be28d05d829f8eb7b280cc9ede15449af20ebcd06a7a56",
CurrentDutyDependentRoot: "",
PreviousDutyDependentRoot: "",
EpochTransition: false,
ExecutionOptimistic: false,
},
TestNotes: "An easy to process Phase 0 block",
SignedBeaconBlock: filepath.Join("ssz-data", "100", "signed-beacon-block.ssz"),
BeaconState: filepath.Join("ssz-data", "100", "beacon-state.ssz"),
},
"101": {
HeadMessage: beaconclient.Head{
Slot: "101",
Block: "0xabe1a972e512182d04f0d4a5c9c25f9ee57c2e9d0ff3f4c4c82fd42d13d31083",
State: "0xcb04aa2edbf13c7bb7e7bd9b621ced6832e0075e89147352eac3019a824ce847",
CurrentDutyDependentRoot: "",
PreviousDutyDependentRoot: "",
EpochTransition: false,
ExecutionOptimistic: false,
},
TestNotes: "An easy to process Phase 0 block",
SignedBeaconBlock: filepath.Join("ssz-data", "101", "signed-beacon-block.ssz"),
BeaconState: filepath.Join("ssz-data", "101", "beacon-state.ssz"),
},
"2375703-dummy": {
HeadMessage: beaconclient.Head{
Slot: "2375703",
Block: "c9fb337b62e2a0dae4f27ab49913132570f7f2cab3f23ad99f4d07508a8e648e",
State: "0299a145bcda2c8f5e7d2e068ee101861edbee2ec1db2d5e1d850b0d265aef5f",
CurrentDutyDependentRoot: "",
PreviousDutyDependentRoot: "",
EpochTransition: false,
ExecutionOptimistic: false,
},
TestNotes: "This is a dummy message that is used for reorgs",
MimicConfig: &MimicConfig{
ForkVersion: "altair",
},
SignedBeaconBlock: filepath.Join("ssz-data", "2375703", "signed-beacon-block.ssz"),
BeaconState: filepath.Join("ssz-data", "2375703", "beacon-state.ssz"),
},
"2375703-dummy-2": {
HeadMessage: beaconclient.Head{
Slot: "2375703",
Block: "c9fb337b62e2a0dae4f27ab49913132570f7f2cab3f23ad99f4d07508aaaaaaa",
State: "0299a145bcda2c8f5e7d2e068ee101861edbee2ec1db2d5e1d850b0d2bbbbbbb",
CurrentDutyDependentRoot: "",
PreviousDutyDependentRoot: "",
EpochTransition: false,
ExecutionOptimistic: false,
},
TestNotes: "This is a dummy message that is used for reorgs",
MimicConfig: &MimicConfig{
ForkVersion: "altair",
},
SignedBeaconBlock: filepath.Join("ssz-data", "2375703", "signed-beacon-block.ssz"),
BeaconState: filepath.Join("ssz-data", "2375703", "beacon-state.ssz"),
},
"2375703": {
HeadMessage: beaconclient.Head{
Slot: "2375703",
Block: "0x4392372c5f6e39499e31bf924388b5815639103149f0f54f8a453773b1802301",
State: "0xb6215b560273af63ec7e011572b60ec1ca0b0232f8ff44fcd4ed55c7526e964e",
CurrentDutyDependentRoot: "", PreviousDutyDependentRoot: "", EpochTransition: false, ExecutionOptimistic: false},
TestNotes: "An easy to process Altair Block",
SignedBeaconBlock: filepath.Join("ssz-data", "2375703", "signed-beacon-block.ssz"),
BeaconState: filepath.Join("ssz-data", "2375703", "beacon-state.ssz"),
},
"3797056": {
HeadMessage: beaconclient.Head{
Slot: "3797056",
Block: "",
State: "0xb6215b560273af63ec7e011572b60ec1ca0b0232f8ff44fcd4ed55c7526e964e",
CurrentDutyDependentRoot: "", PreviousDutyDependentRoot: "", EpochTransition: false, ExecutionOptimistic: false},
TestNotes: "An easy to process Altair Block",
SignedBeaconBlock: filepath.Join("ssz-data", "2375703", "signed-beacon-block.ssz"),
BeaconState: filepath.Join("ssz-data", "2375703", "beacon-state.ssz"),
},
}
TestConfig = Config{
protocol: protocol,
address: address,
port: port,
dummyParentRoot: dummyParentRoot,
dbHost: dbHost,
dbPort: dbPort,
dbName: dbName,
dbUser: dbUser,
dbPassword: dbPassword,
dbDriver: dbDriver,
knownGapsTableIncrement: knownGapsTableIncrement,
}
BeaconNodeTester = TestBeaconNode{
TestEvents: TestEvents,
TestConfig: TestConfig,
}
})
Describe("Receiving New Head SSE messages", Label("unit", "behavioral"), func() { Describe("Receiving New Head SSE messages", Label("unit", "behavioral"), func() {
Context("Correctly formatted Phase0 Block", func() { Context("Correctly formatted Phase0 Block", func() {
It("Should turn it into a struct successfully.", func() { It("Should turn it into a struct successfully.", func() {
@@ -315,7 +310,7 @@ var _ = Describe("Capturehead", func() {
 			//	})
 			//})

-		Context("When the proper SSZ objects are not served", Label("now"), func() {
+		Context("When the proper SSZ objects are not served", func() {
 			It("Should return an error, and add the slot to the knownGaps table.", func() {
 				bc := setUpTest(BeaconNodeTester.TestConfig, "101")
 				BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
@@ -403,7 +398,7 @@ var _ = Describe("Capturehead", func() {
 				BeaconNodeTester.testMultipleReorgs(bc, TestEvents["100-dummy"].HeadMessage, TestEvents["100-dummy-2"].HeadMessage, TestEvents["100"].HeadMessage, 3, maxRetry)
 			})
 		})
-		Context("Altair: Multiple reorgs have occurred on this slot", Label("new"), func() {
+		Context("Altair: Multiple reorgs have occurred on this slot", func() {
 			It("The previous blocks should be marked as 'forked', the new block should be the only one marked as 'proposed'.", func() {
 				bc := setUpTest(BeaconNodeTester.TestConfig, "2375702")
 				BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
@@ -435,7 +430,7 @@ type Config struct {
 // Must run before each test. We can't use the beforeEach because of the way
 // Ginkgo treats race conditions.
 func setUpTest(config Config, maxSlot string) *beaconclient.BeaconClient {
-	bc := *beaconclient.CreateBeaconClient(context.Background(), config.protocol, config.address, config.port)
+	bc := *beaconclient.CreateBeaconClient(context.Background(), config.protocol, config.address, config.port, config.knownGapsTableIncrement)
 	db, err := postgres.SetupPostgresDb(config.dbHost, config.dbPort, config.dbName, config.dbUser, config.dbPassword, config.dbDriver)
 	Expect(err).ToNot(HaveOccurred())
@@ -491,7 +486,7 @@ func sendHeadMessage(bc *beaconclient.BeaconClient, head beaconclient.Head, maxR
 	data, err := json.Marshal(head)
 	Expect(err).ToNot(HaveOccurred())

-	startInserts := atomic.LoadUint64(&bc.Metrics.HeadTrackingInserts)
+	startInserts := atomic.LoadUint64(&bc.Metrics.SlotInserts)
 	bc.HeadTracking.MessagesCh <- &sse.Event{
 		ID:    []byte{},
 		Data:  data,
@@ -499,13 +494,14 @@ func sendHeadMessage(bc *beaconclient.BeaconClient, head beaconclient.Head, maxR
 		Retry: []byte{},
 	}
 	curRetry := 0
-	for atomic.LoadUint64(&bc.Metrics.HeadTrackingInserts) != startInserts+expectedSuccessfulInserts {
+	for atomic.LoadUint64(&bc.Metrics.SlotInserts) != startInserts+expectedSuccessfulInserts {
 		time.Sleep(1 * time.Second)
 		curRetry = curRetry + 1
 		if curRetry == maxRetry {
 			log.WithFields(log.Fields{
-				"startInsert":  startInserts,
-				"currentValue": atomic.LoadUint64(&bc.Metrics.HeadTrackingInserts),
+				"startInsert":               startInserts,
+				"expectedSuccessfulInserts": expectedSuccessfulInserts,
+				"currentValue":              atomic.LoadUint64(&bc.Metrics.SlotInserts),
 			}).Error("HeadTracking Insert wasn't incremented properly.")
 			Fail("Too many retries have occurred.")
 		}
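The test helpers synchronize on the renamed metrics by polling with atomic.LoadUint64 under a retry budget rather than using channels. A self-contained sketch of that pattern; the names are ours, and the sleep is shortened from the tests' one-second tick.

```
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// waitForCount polls a counter the way sendHeadMessage does, giving up
// after maxRetry ticks.
func waitForCount(counter *uint64, want uint64, maxRetry int) error {
	for curRetry := 0; atomic.LoadUint64(counter) != want; curRetry++ {
		if curRetry == maxRetry {
			return fmt.Errorf("gave up: have %d, want %d", atomic.LoadUint64(counter), want)
		}
		time.Sleep(10 * time.Millisecond)
	}
	return nil
}

func main() {
	var slotInserts uint64
	go atomic.AddUint64(&slotInserts, 1) // a worker bumping the metric
	fmt.Println(waitForCount(&slotInserts, 1, 100))
}
```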
@@ -517,9 +513,10 @@ func queryDbSlotAndBlock(db sql.Database, querySlot string, queryBlockRoot strin
 	sqlStatement := `SELECT epoch, slot, block_root, state_root, status FROM ethcl.slots WHERE slot=$1 AND block_root=$2;`
 	var epoch, slot int
 	var blockRoot, stateRoot, status string
-	row := db.QueryRow(context.Background(), sqlStatement, querySlot, queryBlockRoot)
-	err := row.Scan(&epoch, &slot, &blockRoot, &stateRoot, &status)
+	log.Debug("Starting to query the ethcl.slots table, ", querySlot, " ", queryBlockRoot)
+	err := db.QueryRow(context.Background(), sqlStatement, querySlot, queryBlockRoot).Scan(&epoch, &slot, &blockRoot, &stateRoot, &status)
 	Expect(err).ToNot(HaveOccurred())
+	log.Debug("Querying the ethcl.slots table complete")
 	return epoch, slot, blockRoot, stateRoot, status
 }
@@ -567,7 +564,7 @@ func queryKnownGaps(db sql.Database, queryStartGap string, QueryEndGap string) (
 // A function that will remove all entries from the ethcl tables for you.
 func clearEthclDbTables(db sql.Database) {
-	deleteQueries := []string{"DELETE FROM ethcl.slots;", "DELETE FROM ethcl.signed_beacon_block;", "DELETE FROM ethcl.beacon_state;", "DELETE FROM ethcl.known_gaps;"}
+	deleteQueries := []string{"DELETE FROM ethcl.slots;", "DELETE FROM ethcl.signed_beacon_block;", "DELETE FROM ethcl.beacon_state;", "DELETE FROM ethcl.known_gaps;", "DELETE FROM ethcl.historic_process;"}
 	for _, queries := range deleteQueries {
 		_, err := db.Exec(context.Background(), queries)
 		Expect(err).ToNot(HaveOccurred())
@@ -670,6 +667,33 @@ func (tbc TestBeaconNode) SetupBeaconNodeMock(TestEvents map[string]Message, pro
 			return httpmock.NewBytesResponse(200, dat), nil
 		},
 	)
+
+	// Not needed but could be useful to have.
+	blockRootUrl := `=~^` + protocol + "://" + address + ":" + strconv.Itoa(port) + "/eth/v1/beacon/blocks/" + `([^/]+)` + "/root"
+	httpmock.RegisterResponder("GET", blockRootUrl,
+		func(req *http.Request) (*http.Response, error) {
+			// Get ID from request
+			slot := httpmock.MustGetSubmatch(req, 1)
+			dat, err := tbc.provideBlockRoot(slot)
+			if err != nil {
+				Expect(err).NotTo(HaveOccurred())
+				return httpmock.NewStringResponse(404, fmt.Sprintf("Unable to find block root for %s", slot)), err
+			}
+			return httpmock.NewBytesResponse(200, dat), nil
+		},
+	)
 }

+// Provide the Block root
+func (tbc TestBeaconNode) provideBlockRoot(slot string) ([]byte, error) {
+	for _, val := range tbc.TestEvents {
+		if val.HeadMessage.Slot == slot && val.MimicConfig == nil {
+			block, err := hex.DecodeString(val.HeadMessage.Block[2:])
+			Expect(err).ToNot(HaveOccurred())
+			return block, nil
+		}
+	}
+	return nil, fmt.Errorf("Unable to find the Blockroot in test object.")
+}
// A function to mimic querying the state from the beacon node. We simply get the SSZ file and return it.
@@ -679,12 +703,12 @@ func (tbc TestBeaconNode) provideSsz(slotIdentifier string, sszIdentifier strin
 	for _, val := range tbc.TestEvents {
 		if sszIdentifier == "state" {
-			if val.HeadMessage.Slot == slotIdentifier || val.HeadMessage.State == slotIdentifier {
+			if (val.HeadMessage.Slot == slotIdentifier && val.MimicConfig == nil) || val.HeadMessage.State == slotIdentifier {
 				slotFile = val.BeaconState
 				Message = val
 			}
 		} else if sszIdentifier == "block" {
-			if val.HeadMessage.Slot == slotIdentifier || val.HeadMessage.Block == slotIdentifier {
+			if (val.HeadMessage.Slot == slotIdentifier && val.MimicConfig == nil) || val.HeadMessage.Block == slotIdentifier {
 				slotFile = val.SignedBeaconBlock
 				Message = val
 			}
@@ -770,16 +794,16 @@ func (tbc TestBeaconNode) provideSsz(slotIdentifier string, sszIdentifier strin
 // Helper function to test three reorg messages. There are going to be many functions like this,
 // Because we need to test the same logic for multiple phases.
 func (tbc TestBeaconNode) testMultipleReorgs(bc *beaconclient.BeaconClient, firstHead beaconclient.Head, secondHead beaconclient.Head, thirdHead beaconclient.Head, epoch int, maxRetry int) {
-	go bc.CaptureHead(tbc.TestConfig.knownGapsTableIncrement)
+	go bc.CaptureHead()
 	time.Sleep(1 * time.Second)

-	log.Info("Sending Phase0 Messages to BeaconClient")
+	log.Info("Sending Messages to BeaconClient")
 	sendHeadMessage(bc, firstHead, maxRetry, 1)
 	sendHeadMessage(bc, secondHead, maxRetry, 1)
 	sendHeadMessage(bc, thirdHead, maxRetry, 1)

 	curRetry := 0
-	for atomic.LoadUint64(&bc.Metrics.HeadTrackingReorgs) != 2 {
+	for atomic.LoadUint64(&bc.Metrics.ReorgInserts) != 2 {
 		time.Sleep(1 * time.Second)
 		curRetry = curRetry + 1
 		if curRetry == maxRetry {
@ -810,7 +834,7 @@ func (tbc TestBeaconNode) testMultipleReorgs(bc *beaconclient.BeaconClient, firs
} }
curRetry = 0 curRetry = 0
for atomic.LoadUint64(&bc.Metrics.HeadTrackingReorgs) != 3 { for atomic.LoadUint64(&bc.Metrics.ReorgInserts) != 3 {
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
curRetry = curRetry + 1 curRetry = curRetry + 1
if curRetry == maxRetry { if curRetry == maxRetry {
@ -818,7 +842,7 @@ func (tbc TestBeaconNode) testMultipleReorgs(bc *beaconclient.BeaconClient, firs
} }
} }
if bc.Metrics.HeadTrackingKnownGaps != 0 { if bc.Metrics.KnownGapsInserts != 0 {
Fail("We found gaps when processing a single block") Fail("We found gaps when processing a single block")
} }
@ -832,25 +856,25 @@ func (tbc TestBeaconNode) testMultipleReorgs(bc *beaconclient.BeaconClient, firs
// A test to validate a single block was processed correctly // A test to validate a single block was processed correctly
func (tbc TestBeaconNode) testProcessBlock(bc *beaconclient.BeaconClient, head beaconclient.Head, epoch int, maxRetry int, expectedSuccessInsert uint64, expectedKnownGaps uint64, expectedReorgs uint64) { func (tbc TestBeaconNode) testProcessBlock(bc *beaconclient.BeaconClient, head beaconclient.Head, epoch int, maxRetry int, expectedSuccessInsert uint64, expectedKnownGaps uint64, expectedReorgs uint64) {
go bc.CaptureHead(tbc.TestConfig.knownGapsTableIncrement) go bc.CaptureHead()
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
sendHeadMessage(bc, head, maxRetry, expectedSuccessInsert) sendHeadMessage(bc, head, maxRetry, expectedSuccessInsert)
curRetry := 0 curRetry := 0
for atomic.LoadUint64(&bc.Metrics.HeadTrackingKnownGaps) != expectedKnownGaps { for atomic.LoadUint64(&bc.Metrics.KnownGapsInserts) != expectedKnownGaps {
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
curRetry = curRetry + 1 curRetry = curRetry + 1
if curRetry == maxRetry { if curRetry == maxRetry {
Fail(fmt.Sprintf("Wrong gap metrics, got: %d, wanted %d", bc.Metrics.HeadTrackingKnownGaps, expectedKnownGaps)) Fail(fmt.Sprintf("Wrong gap metrics, got: %d, wanted %d", bc.Metrics.KnownGapsInserts, expectedKnownGaps))
} }
} }
curRetry = 0 curRetry = 0
for atomic.LoadUint64(&bc.Metrics.HeadTrackingReorgs) != expectedReorgs { for atomic.LoadUint64(&bc.Metrics.ReorgInserts) != expectedReorgs {
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
curRetry = curRetry + 1 curRetry = curRetry + 1
if curRetry == maxRetry { if curRetry == maxRetry {
Fail(fmt.Sprintf("Wrong reorg metrics, got: %d, wanted %d", bc.Metrics.HeadTrackingKnownGaps, expectedKnownGaps)) Fail(fmt.Sprintf("Wrong reorg metrics, got: %d, wanted %d", bc.Metrics.KnownGapsInserts, expectedKnownGaps))
} }
} }
@ -862,14 +886,14 @@ func (tbc TestBeaconNode) testProcessBlock(bc *beaconclient.BeaconClient, head b
// A test that ensures that if two HeadMessages occur for a single slot they are marked // A test that ensures that if two HeadMessages occur for a single slot they are marked
// as proposed and forked correctly. // as proposed and forked correctly.
func (tbc TestBeaconNode) testMultipleHead(bc *beaconclient.BeaconClient, firstHead beaconclient.Head, secondHead beaconclient.Head, epoch int, maxRetry int) { func (tbc TestBeaconNode) testMultipleHead(bc *beaconclient.BeaconClient, firstHead beaconclient.Head, secondHead beaconclient.Head, epoch int, maxRetry int) {
go bc.CaptureHead(tbc.TestConfig.knownGapsTableIncrement) go bc.CaptureHead()
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
sendHeadMessage(bc, firstHead, maxRetry, 1) sendHeadMessage(bc, firstHead, maxRetry, 1)
sendHeadMessage(bc, secondHead, maxRetry, 1) sendHeadMessage(bc, secondHead, maxRetry, 1)
curRetry := 0 curRetry := 0
for atomic.LoadUint64(&bc.Metrics.HeadTrackingReorgs) != 1 { for atomic.LoadUint64(&bc.Metrics.ReorgInserts) != 1 {
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
curRetry = curRetry + 1 curRetry = curRetry + 1
if curRetry == maxRetry { if curRetry == maxRetry {
@ -877,7 +901,7 @@ func (tbc TestBeaconNode) testMultipleHead(bc *beaconclient.BeaconClient, firstH
} }
} }
if bc.Metrics.HeadTrackingKnownGaps != 0 { if bc.Metrics.KnownGapsInserts != 0 {
Fail("We found gaps when processing a single block") Fail("We found gaps when processing a single block")
} }
@ -889,7 +913,8 @@ func (tbc TestBeaconNode) testMultipleHead(bc *beaconclient.BeaconClient, firstH
// A test that ensures that if two HeadMessages occur for a single slot they are marked // A test that ensures that if two HeadMessages occur for a single slot they are marked
// as proposed and forked correctly. // as proposed and forked correctly.
func (tbc TestBeaconNode) testKnownGapsMessages(bc *beaconclient.BeaconClient, tableIncrement int, expectedEntries uint64, maxRetry int, msg ...beaconclient.Head) { func (tbc TestBeaconNode) testKnownGapsMessages(bc *beaconclient.BeaconClient, tableIncrement int, expectedEntries uint64, maxRetry int, msg ...beaconclient.Head) {
go bc.CaptureHead(tableIncrement) bc.KnownGapTableIncrement = tableIncrement
go bc.CaptureHead()
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
for _, headMsg := range msg { for _, headMsg := range msg {
@ -897,7 +922,7 @@ func (tbc TestBeaconNode) testKnownGapsMessages(bc *beaconclient.BeaconClient, t
} }
curRetry := 0 curRetry := 0
for atomic.LoadUint64(&bc.Metrics.HeadTrackingKnownGaps) != expectedEntries { for atomic.LoadUint64(&bc.Metrics.KnownGapsInserts) != expectedEntries {
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
curRetry = curRetry + 1 curRetry = curRetry + 1
if curRetry == maxRetry { if curRetry == maxRetry {
@ -909,7 +934,7 @@ func (tbc TestBeaconNode) testKnownGapsMessages(bc *beaconclient.BeaconClient, t
knownGapCount := countKnownGapsTable(bc.Db) knownGapCount := countKnownGapsTable(bc.Db)
Expect(knownGapCount).To(Equal(int(expectedEntries))) Expect(knownGapCount).To(Equal(int(expectedEntries)))
if atomic.LoadUint64(&bc.Metrics.HeadTrackingReorgs) != 0 { if atomic.LoadUint64(&bc.Metrics.ReorgInserts) != 0 {
Fail("We found reorgs when we didn't expect it") Fail("We found reorgs when we didn't expect it")
} }
} }

View File

@ -0,0 +1,136 @@
// VulcanizeDB
// Copyright © 2022 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
// This file will call all the functions to start and stop batch processing of slots (historic and known gaps).
package beaconclient
import (
"fmt"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql"
"golang.org/x/sync/errgroup"
)
// This function will perform all the heavy lifting for processing historic slots.
func (bc *BeaconClient) CaptureHistoric(maxWorkers int) []error {
log.Info("We are starting the historical processing service.")
hp := historicProcessing{db: bc.Db, metrics: bc.Metrics}
errs := handleBatchProcess(maxWorkers, hp, hp.db, bc.ServerEndpoint, bc.Metrics)
log.Debug("Exiting Historical")
return errs
}
// An interface to enforce any batch processing. Currently there are two use cases for this.
//
// 1. Historic Processing
//
// 2. Known Gaps Processing
type BatchProcessing interface {
getSlotRange(chan<- slotsToProcess) []error // Write the slots to process in a channel; return an error if you can't get the next slots to write.
handleProcessingErrors(<-chan batchHistoricError)
removeTableEntry(<-chan slotsToProcess) error // With the provided start and end slot, remove the entry from the database.
}
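To make the BatchProcessing contract concrete, here is a hedged sketch of a hypothetical in-memory implementation; the staticProcessing name and fields are illustrative only (the real implementations in this commit are historicProcessing and knownGapsProcessing), and it reuses the slotsToProcess and batchHistoricError types defined below:
```
// staticProcessing is a hypothetical BatchProcessing implementation that
// serves a fixed set of ranges from memory instead of a DB table.
type staticProcessing struct {
	ranges []slotsToProcess
}

// Push every preconfigured range into the channel; no DB access, so no errors.
func (sp staticProcessing) getSlotRange(slotCh chan<- slotsToProcess) []error {
	for _, r := range sp.ranges {
		slotCh <- r
	}
	return nil
}

// Log each processing error as it arrives.
func (sp staticProcessing) handleProcessingErrors(errCh <-chan batchHistoricError) {
	for errMs := range errCh {
		log.WithFields(log.Fields{"slot": errMs.slot, "process": errMs.errProcess}).Error(errMs.err)
	}
}

// Nothing to delete for an in-memory source; just drain the channel.
func (sp staticProcessing) removeTableEntry(processCh <-chan slotsToProcess) error {
	for range processCh {
	}
	return nil
}
```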
// A struct to pass around indicating a table entry for slots to process.
type slotsToProcess struct {
startSlot int // The start slot
endSlot int // The end slot
}
type batchHistoricError struct {
err error // The error that occurred when attempting to process a slot.
errProcess string // The process that caused the error.
slot int // The slot which the error is for.
}
// Wrapper function for the BatchProcessing interface.
// This function will take the structure that needs batch processing.
// It follows a generic format.
//
// 1. Get new entries from any given table and add them to the slotsCh.
//
// 2. Run the maximum specified workers to handle individual slots. We need a maximum because we don't want
// to store too many SSZ objects in memory.
//
// 3. Process the slots and send the err to the ErrCh. Each structure can define how it wants its own errors handled.
//
// 4. Remove the slot entry from the DB.
//
// 5. Handle any errors.
func handleBatchProcess(maxWorkers int, bp BatchProcessing, db sql.Database, serverEndpoint string, metrics *BeaconClientMetrics) []error {
slotsCh := make(chan slotsToProcess)
workCh := make(chan int)
processedCh := make(chan slotsToProcess)
errCh := make(chan batchHistoricError)
finishCh := make(chan []error, 1)
// Start workers
for w := 1; w <= maxWorkers; w++ {
log.WithFields(log.Fields{"maxWorkers": maxWorkers}).Debug("Starting batch processing workers")
go processSlotRangeWorker(workCh, errCh, db, serverEndpoint, metrics)
}
// Process all ranges and send each individual slot to the worker.
go func() {
for slots := range slotsCh {
if slots.startSlot > slots.endSlot {
log.Error("We received a batch process request where the startSlot is greater than the end slot.")
errCh <- batchHistoricError{
err: fmt.Errorf("We received a startSlot where the start was greater than the end."),
errProcess: "RangeOrder",
slot: slots.startSlot,
}
errCh <- batchHistoricError{
err: fmt.Errorf("We received a endSlot where the start was greater than the end."),
errProcess: "RangeOrder",
slot: slots.endSlot,
}
} else {
for i := slots.startSlot; i <= slots.endSlot; i++ {
workCh <- i
}
processedCh <- slots
}
}
}()
// Remove entries; end the application if a row cannot be removed.
go func() {
errG := new(errgroup.Group)
errG.Go(func() error {
return bp.removeTableEntry(processedCh)
})
if err := errG.Wait(); err != nil {
finishCh <- []error{err}
}
}()
// Process errors from slot processing.
go bp.handleProcessingErrors(errCh)
// Get slots from the DB.
go func() {
errs := bp.getSlotRange(slotsCh) // Periodically adds new entries....
if errs != nil {
finishCh <- errs
}
finishCh <- nil
}()
errs := <-finishCh
log.Debug("Finishing the batchProcess")
return errs
}
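With the interface satisfied, kicking off a batch run is a single call. A sketch using the hypothetical staticProcessing type from above; db, serverEndpoint, and metrics are assumed to be live handles:
```
sp := staticProcessing{ranges: []slotsToProcess{{startSlot: 100, endSlot: 101}}}
if errs := handleBatchProcess(2, sp, db, serverEndpoint, metrics); len(errs) != 0 {
	log.WithFields(log.Fields{"errs": errs}).Error("Batch processing exited with errors.")
}
```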

View File

@ -0,0 +1,70 @@
package beaconclient_test
import (
"context"
"fmt"
"sync/atomic"
"time"
"github.com/jarcoal/httpmock"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/beaconclient"
)
var _ = Describe("Capturehistoric", func() {
Describe("Run the application in historic mode", Label("unit", "behavioral"), func() {
Context("Phase0: When we need to process a single block in the ethcl.historic_process table.", Label("now"), func() {
It("Successfully Process the Block", func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset()
BeaconNodeTester.writeEventToHistoricProcess(bc, 100, 101, 10)
BeaconNodeTester.runBatchProcess(bc, 2, 100, 101, 0, 0)
time.Sleep(2 * time.Second)
validateSlot(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, 3, "proposed")
validateSignedBeaconBlock(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, "0x629ae1587895043076500f4f5dcb202a47c2fc95d5b5c548cb83bc97bd2dbfe1", "0x8d3f027beef5cbd4f8b29fc831aba67a5d74768edca529f5596f07fd207865e1", "/blocks/QHVAEQBQGQ4TKNJUGAYDGNZRGM2DOZJSGZTDMMLEG5QTIYTCMRQTKYRSGNTGCMDCGI2WINLGMM2DMNJRGYYGMMTBHEZGINJSME3DGYRZGE4WE")
validateBeaconState(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, "/blocks/QHVAEQRQPBTDEOBWMEYDGNZZMMYDGOBWMEZWGN3CMUZDQZBQGVSDQMRZMY4GKYRXMIZDQMDDMM4WKZDFGE2TINBZMFTDEMDFMJRWIMBWME3WCNJW")
//validateSignedBeaconBlock(bc, BeaconNodeTester.TestEvents["101"].HeadMessage, "0x629ae1587895043076500f4f5dcb202a47c2fc95d5b5c548cb83bc97bd2dbfe1", "0x8d3f027beef5cbd4f8b29fc831aba67a5d74768edca529f5596f07fd207865e1", "/blocks/QHVAEQBQGQ4TKNJUGAYDGNZRGM2DOZJSGZTDMMLEG5QTIYTCMRQTKYRSGNTGCMDCGI2WINLGMM2DMNJRGYYGMMTBHEZGINJSME3DGYRZGE4WE")
//validateBeaconState(bc, BeaconNodeTester.TestEvents["101"].HeadMessage, "/blocks/QHVAEQRQPBTDEOBWMEYDGNZZMMYDGOBWMEZWGN3CMUZDQZBQGVSDQMRZMY4GKYRXMIZDQMDDMM4WKZDFGE2TINBZMFTDEMDFMJRWIMBWME3WCNJW")
})
})
})
})
// This function will write an event to the ethcl.historic_process table
func (tbc TestBeaconNode) writeEventToHistoricProcess(bc *beaconclient.BeaconClient, startSlot, endSlot, priority int) {
log.Debug("We are writing the necessary events to batch process")
insertHistoricProcessingStmt := `INSERT INTO ethcl.historic_process (start_slot, end_slot, priority)
VALUES ($1, $2, $3);`
res, err := bc.Db.Exec(context.Background(), insertHistoricProcessingStmt, startSlot, endSlot, priority)
Expect(err).ToNot(HaveOccurred())
rows, err := res.RowsAffected()
Expect(err).ToNot(HaveOccurred())
if rows != 1 {
Fail("We did not write a single row to the ethcl.historic_process table.")
}
}
// Start the batch processing function, and check for the correct inserted slots.
func (tbc TestBeaconNode) runBatchProcess(bc *beaconclient.BeaconClient, maxWorkers int, startSlot uint64, endSlot uint64, expectedReorgs uint64, expectedKnownGaps uint64) {
go bc.CaptureHistoric(maxWorkers)
diff := endSlot - startSlot + 1
curRetry := 0
for atomic.LoadUint64(&bc.Metrics.SlotInserts) != diff {
time.Sleep(1 * time.Second)
curRetry = curRetry + 1
if curRetry == maxRetry {
Fail(fmt.Sprintf("Too many retries have occurred. The number of inserts expects %d, the number that actually occurred, %d", atomic.LoadUint64(&bc.Metrics.SlotInserts), diff))
}
}
Expect(atomic.LoadUint64(&bc.Metrics.KnownGapsInserts)).To(Equal(expectedKnownGaps))
Expect(atomic.LoadUint64(&bc.Metrics.ReorgInserts)).To(Equal(expectedReorgs))
}

View File

@ -23,6 +23,7 @@ import (
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql" "github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper" "github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
"golang.org/x/sync/errgroup"
) )
var ( var (
@ -53,10 +54,19 @@ VALUES ($1, $2) ON CONFLICT (key) DO NOTHING`
CheckProposedStmt string = `SELECT slot, block_root CheckProposedStmt string = `SELECT slot, block_root
FROM ethcl.slots FROM ethcl.slots
WHERE slot=$1 AND block_root=$2;` WHERE slot=$1 AND block_root=$2;`
// Used to get a single slot from the table if it exists
QueryBySlotStmt string = `SELECT slot
FROM ethcl.slots
WHERE slot=$1`
// Statement to insert known_gaps. We don't pass in timestamp, we let the server take care of that one. // Statement to insert known_gaps. We don't pass in timestamp, we let the server take care of that one.
UpsertKnownGapsStmt string = ` UpsertKnownGapsStmt string = `
INSERT INTO ethcl.known_gaps (start_slot, end_slot, checked_out, reprocessing_error, entry_error, entry_process) INSERT INTO ethcl.known_gaps (start_slot, end_slot, checked_out, reprocessing_error, entry_error, entry_process)
VALUES ($1, $2, $3, $4, $5, $6) on CONFLICT (start_slot, end_slot) DO NOTHING` VALUES ($1, $2, $3, $4, $5, $6) on CONFLICT (start_slot, end_slot) DO NOTHING`
UpsertKnownGapsErrorStmt string = `
UPDATE ethcl.known_gaps
SET reprocessing_error=$3
WHERE start_slot=$1 AND end_slot=$2;`
// Get the highest slot if one exists
QueryHighestSlotStmt string = "SELECT COALESCE(MAX(slot), 0) FROM ethcl.slots" QueryHighestSlotStmt string = "SELECT COALESCE(MAX(slot), 0) FROM ethcl.slots"
) )
@ -148,19 +158,24 @@ func (dw *DatabaseWriter) writeFullSlot() error {
}).Debug("Starting to write to the DB.") }).Debug("Starting to write to the DB.")
err := dw.writeSlots() err := dw.writeSlots()
if err != nil { if err != nil {
loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("We couldn't write to the ethcl.slots table...")
return err return err
} }
log.Debug("We finished writing to the ethcl.slots table.")
if dw.DbSlots.Status != "skipped" { if dw.DbSlots.Status != "skipped" {
err = dw.writeSignedBeaconBlocks() errG, _ := errgroup.WithContext(context.Background())
if err != nil { errG.Go(func() error {
return err return dw.writeSignedBeaconBlocks()
} })
err = dw.writeBeaconState() errG.Go(func() error {
if err != nil { return dw.writeBeaconState()
})
if err := errG.Wait(); err != nil {
loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("We couldn't write to the ethcl block or state table...")
return err return err
} }
} }
dw.Metrics.IncrementHeadTrackingInserts(1) dw.Metrics.IncrementSlotInserts(1)
return nil return nil
} }
@ -291,7 +306,7 @@ func writeReorgs(db sql.Database, slot string, latestBlockRoot string, metrics *
} }
} }
metrics.IncrementHeadTrackingReorgs(1) metrics.IncrementReorgsInsert(1)
} }
// Update the slots table by marking the old slot's as forked. // Update the slots table by marking the old slot's as forked.
@ -383,7 +398,7 @@ func upsertKnownGaps(db sql.Database, knModel DbKnownGaps, metric *BeaconClientM
"startSlot": knModel.StartSlot, "startSlot": knModel.StartSlot,
"endSlot": knModel.EndSlot, "endSlot": knModel.EndSlot,
}).Warn("A new gap has been added to the ethcl.known_gaps table.") }).Warn("A new gap has been added to the ethcl.known_gaps table.")
metric.IncrementHeadTrackingKnownGaps(1) metric.IncrementKnownGapsInserts(1)
} }
// A function to write the gap between the highest slot in the DB and the first processed slot. // A function to write the gap between the highest slot in the DB and the first processed slot.
@ -404,8 +419,50 @@ func writeStartUpGaps(db sql.Database, tableIncrement int, firstSlot int, metric
} }
} }
// A function to update a knownGap range with a reprocessing error.
func updateKnownGapErrors(db sql.Database, startSlot int, endSlot int, reprocessingErr error, metric *BeaconClientMetrics) error {
res, err := db.Exec(context.Background(), UpsertKnownGapsErrorStmt, startSlot, endSlot, reprocessingErr.Error())
if err != nil {
loghelper.LogSlotRangeError(strconv.Itoa(startSlot), strconv.Itoa(endSlot), err).Error("Unable to update reprocessing_error")
metric.IncrementKnownGapsProcessingError(1)
return err
}
row, err := res.RowsAffected()
if err != nil {
loghelper.LogSlotRangeError(strconv.Itoa(startSlot), strconv.Itoa(endSlot), err).Error("Unable to count rows affected when trying to update reprocessing_error.")
metric.IncrementKnownGapsProcessingError(1)
return err
}
if row != 1 {
loghelper.LogSlotRangeError(strconv.Itoa(startSlot), strconv.Itoa(endSlot), err).WithFields(log.Fields{
"rowCount": row,
}).Error("The rows affected by the update for reprocessing_error is not 1.")
metric.IncrementKnownGapsProcessingError(1)
return fmt.Errorf("The update for reprocessing_error affected %d rows instead of 1.", row)
}
metric.IncrementKnownGapsProcessed(1)
return nil
}
// A quick helper function to calculate the epoch. // A quick helper function to calculate the epoch.
func calculateEpoch(slot int, slotPerEpoch int) string { func calculateEpoch(slot int, slotPerEpoch int) string {
epoch := slot / slotPerEpoch epoch := slot / slotPerEpoch
return strconv.Itoa(epoch) return strconv.Itoa(epoch)
} }
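As a worked example, with the mainnet value of 32 slots per epoch, integer division floors the result:
```
// Slot 103 falls in epoch 3, since 103 / 32 == 3.
fmt.Println(calculateEpoch(103, 32)) // "3"
```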
// A helper function to check to see if the slot is processed.
func isSlotProcessed(db sql.Database, checkProcessStmt string, slot string) (bool, error) {
processRow, err := db.Exec(context.Background(), checkProcessStmt, slot)
if err != nil {
return false, err
}
row, err := processRow.RowsAffected()
if err != nil {
return false, err
}
if row > 0 {
return true, nil
}
return false, nil
}
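A brief usage sketch, pairing the helper with the QueryBySlotStmt defined earlier; db is assumed to be a live handle:
```
processed, err := isSlotProcessed(db, QueryBySlotStmt, "100")
if err != nil {
	loghelper.LogSlotError("100", err).Error("Unable to check whether the slot was processed.")
} else if processed {
	log.Debug("Slot 100 is already in the ethcl.slots table.")
}
```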

View File

@ -25,8 +25,8 @@ import (
var _ = Describe("Healthcheck", func() { var _ = Describe("Healthcheck", func() {
var ( var (
BC = beaconclient.CreateBeaconClient(context.Background(), "http", "localhost", 5052) BC = beaconclient.CreateBeaconClient(context.Background(), "http", "localhost", 5052, 10)
errBc = beaconclient.CreateBeaconClient(context.Background(), "http", "blah-blah", 1010) errBc = beaconclient.CreateBeaconClient(context.Background(), "http", "blah-blah", 1010, 10)
) )
Describe("Connecting to the lighthouse client", Label("integration"), func() { Describe("Connecting to the lighthouse client", Label("integration"), func() {
Context("When the client is running", func() { Context("When the client is running", func() {

View File

@ -94,5 +94,5 @@ func processMsg[P ProcessedEvents](msg []byte, processCh chan<- *P, errorCh chan
func (bc *BeaconClient) captureEventTopic() { func (bc *BeaconClient) captureEventTopic() {
log.Info("We are capturing all SSE events") log.Info("We are capturing all SSE events")
go handleIncomingSseEvent(bc.HeadTracking, bc.Metrics.IncrementHeadError) go handleIncomingSseEvent(bc.HeadTracking, bc.Metrics.IncrementHeadError)
go handleIncomingSseEvent(bc.ReOrgTracking, bc.Metrics.IncrementHeadReorgError) go handleIncomingSseEvent(bc.ReOrgTracking, bc.Metrics.IncrementReorgError)
} }

View File

@ -21,29 +21,43 @@ import (
// A structure utilized for keeping track of various metrics. Currently, mostly used in testing. // A structure utilized for keeping track of various metrics. Currently, mostly used in testing.
type BeaconClientMetrics struct { type BeaconClientMetrics struct {
HeadTrackingInserts uint64 // Number of head events we successfully wrote to the DB. SlotInserts uint64 // Number of slots we successfully wrote to the DB.
HeadTrackingReorgs uint64 // Number of reorg events we successfully wrote to the DB. ReorgInserts uint64 // Number of reorg events we successfully wrote to the DB.
HeadTrackingKnownGaps uint64 // Number of known_gaps we successfully wrote to the DB. KnownGapsInserts uint64 // Number of known_gaps we successfully wrote to the DB.
HeadError uint64 // Number of errors that occurred when decoding the head message. knownGapsProcessed uint64 // Number of knownGaps processed.
HeadReorgError uint64 // Number of errors that occurred when decoding the reorg message. KnownGapsProcessingError uint64 // Number of errors that occurred while processing a knownGap
HeadError uint64 // Number of errors that occurred when decoding the head message.
HeadReorgError uint64 // Number of errors that occurred when decoding the reorg message.
} }
// Wrapper function to increment inserts. If we want to use mutexes later we can easily update all // Wrapper function to increment inserts. If we want to use mutexes later we can easily update all
// occurrences here. // occurrences here.
func (m *BeaconClientMetrics) IncrementHeadTrackingInserts(inc uint64) { func (m *BeaconClientMetrics) IncrementSlotInserts(inc uint64) {
atomic.AddUint64(&m.HeadTrackingInserts, inc) atomic.AddUint64(&m.SlotInserts, inc)
} }
// Wrapper function to increment reorgs. If we want to use mutexes later we can easily update all // Wrapper function to increment reorgs. If we want to use mutexes later we can easily update all
// occurrences here. // occurrences here.
func (m *BeaconClientMetrics) IncrementHeadTrackingReorgs(inc uint64) { func (m *BeaconClientMetrics) IncrementReorgsInsert(inc uint64) {
atomic.AddUint64(&m.HeadTrackingReorgs, inc) atomic.AddUint64(&m.ReorgInserts, inc)
} }
// Wrapper function to increment known gaps. If we want to use mutexes later we can easily update all // Wrapper function to increment known gaps. If we want to use mutexes later we can easily update all
// occurrences here. // occurrences here.
func (m *BeaconClientMetrics) IncrementHeadTrackingKnownGaps(inc uint64) { func (m *BeaconClientMetrics) IncrementKnownGapsInserts(inc uint64) {
atomic.AddUint64(&m.HeadTrackingKnownGaps, inc) atomic.AddUint64(&m.KnownGapsInserts, inc)
}
// Wrapper function to increment known gaps processed. If we want to use mutexes later we can easily update all
// occurrences here.
func (m *BeaconClientMetrics) IncrementKnownGapsProcessed(inc uint64) {
atomic.AddUint64(&m.knownGapsProcessed, inc)
}
// Wrapper function to increment known gaps processing error. If we want to use mutexes later we can easily update all
// occurrences here.
func (m *BeaconClientMetrics) IncrementKnownGapsProcessingError(inc uint64) {
atomic.AddUint64(&m.KnownGapsProcessingError, inc)
} }
// Wrapper function to increment head errors. If we want to use mutexes later we can easily update all // Wrapper function to increment head errors. If we want to use mutexes later we can easily update all
@ -54,6 +68,6 @@ func (m *BeaconClientMetrics) IncrementHeadError(inc uint64) {
// Wrapper function to increment reorg errors. If we want to use mutexes later we can easily update all // Wrapper function to increment reorg errors. If we want to use mutexes later we can easily update all
// occurrences here. // occurrences here.
func (m *BeaconClientMetrics) IncrementHeadReorgError(inc uint64) { func (m *BeaconClientMetrics) IncrementReorgError(inc uint64) {
atomic.AddUint64(&m.HeadReorgError, inc) atomic.AddUint64(&m.HeadReorgError, inc)
} }
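Because every counter is written with atomic.AddUint64, readers must pair it with atomic.LoadUint64, exactly as the tests above do; a minimal sketch:
```
// Reading the counter without atomic.LoadUint64 would be a data race under -race.
inserts := atomic.LoadUint64(&bc.Metrics.SlotInserts)
log.WithFields(log.Fields{"slotInserts": inserts}).Debug("Current slot insert count.")
```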

View File

@ -23,9 +23,6 @@ import (
"strconv" "strconv"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
"golang.org/x/sync/errgroup"
) )
// This function will perform the necessary steps to handle a reorg. // This function will perform the necessary steps to handle a reorg.
@ -63,19 +60,7 @@ func (bc *BeaconClient) handleHead() {
log.WithFields(log.Fields{"head": head}).Debug("We are going to start processing the slot.") log.WithFields(log.Fields{"head": head}).Debug("We are going to start processing the slot.")
go func(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, metrics *BeaconClientMetrics, knownGapsTableIncrement int) { go processHeadSlot(bc.Db, bc.ServerEndpoint, slot, head.Block, head.State, bc.PreviousSlot, bc.PreviousBlockRoot, bc.Metrics, bc.KnownGapTableIncrement)
errG := new(errgroup.Group)
errG.Go(func() error {
err = processHeadSlot(db, serverAddress, slot, blockRoot, stateRoot, previousSlot, previousBlockRoot, metrics, knownGapsTableIncrement)
if err != nil {
return err
}
return nil
})
if err := errG.Wait(); err != nil {
loghelper.LogSlotError(strconv.Itoa(slot), err).Error("Unable to process a slot")
}
}(bc.Db, bc.ServerEndpoint, slot, head.Block, head.State, bc.PreviousSlot, bc.PreviousBlockRoot, bc.Metrics, bc.KnownGapTableIncrement)
log.WithFields(log.Fields{"head": head.Slot}).Debug("We finished calling processHeadSlot.") log.WithFields(log.Fields{"head": head.Slot}).Debug("We finished calling processHeadSlot.")
@ -83,5 +68,4 @@ func (bc *BeaconClient) handleHead() {
bc.PreviousSlot = slot bc.PreviousSlot = slot
bc.PreviousBlockRoot = head.Block bc.PreviousBlockRoot = head.Block
} }
} }

View File

@ -0,0 +1,223 @@
// VulcanizeDB
// Copyright © 2022 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
// This file contains all the code to process historic slots.
package beaconclient
import (
"context"
"fmt"
"strconv"
"time"
"github.com/jackc/pgx/v4"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
)
var (
// Get the single highest-priority, non-checked-out row from ethcl.historic_process
getHpEntryStmt string = `SELECT start_slot, end_slot FROM ethcl.historic_process
WHERE checked_out=false
ORDER BY priority ASC
LIMIT 1;`
// Used to periodically check to see if there is a new entry in the ethcl.historic_process table.
checkHpEntryStmt string = `SELECT * FROM ethcl.historic_process WHERE checked_out=false;`
// Used to checkout a row from the ethcl.historic_process table
lockHpEntryStmt string = `UPDATE ethcl.historic_process
SET checked_out=true
WHERE start_slot=$1 AND end_slot=$2;`
// Used to delete an entry from the ethcl.historic_process table
deleteHpEntryStmt string = `DELETE FROM ethcl.historic_process
WHERE start_slot=$1 AND end_slot=$2;`
)
type historicProcessing struct {
db sql.Database
metrics *BeaconClientMetrics
}
// Get a single row of historical slots from the table.
func (hp historicProcessing) getSlotRange(slotCh chan<- slotsToProcess) []error {
return getBatchProcessRow(hp.db, getHpEntryStmt, checkHpEntryStmt, lockHpEntryStmt, slotCh)
}
// Remove the table entry.
func (hp historicProcessing) removeTableEntry(processCh <-chan slotsToProcess) error {
return removeRowPostProcess(hp.db, processCh, QueryBySlotStmt, deleteHpEntryStmt)
}
// Handle any errors that occur while processing a slot within the range.
func (hp historicProcessing) handleProcessingErrors(errMessages <-chan batchHistoricError) {
for {
errMs := <-errMessages
loghelper.LogSlotError(strconv.Itoa(errMs.slot), errMs.err)
writeKnownGaps(hp.db, 1, errMs.slot, errMs.slot, errMs.err, errMs.errProcess, hp.metrics)
}
}
// Process the slot range.
func processSlotRangeWorker(workCh <-chan int, errCh chan<- batchHistoricError, db sql.Database, serverAddress string, metrics *BeaconClientMetrics) {
for slot := range workCh {
log.Debug("Handling slot: ", slot)
err, errProcess := handleHistoricSlot(db, serverAddress, slot, metrics)
errMs := batchHistoricError{
err: err,
errProcess: errProcess,
slot: slot,
}
if err != nil {
errCh <- errMs
}
}
}
// A wrapper function that inserts the start_slot and end_slot from a single row into a channel.
// It also locks the row by updating the checked_out column.
// The statement for getting the start_slot and end_slot must be provided.
// The statement for "locking" the row must also be provided.
func getBatchProcessRow(db sql.Database, getStartEndSlotStmt string, checkNewRowsStmt string, checkOutRowStmt string, slotCh chan<- slotsToProcess) []error {
errCount := make([]error, 0)
// 5 is an arbitrary number. It allows us to retry a few times before
// ending the application.
prevErrCount := 0
for len(errCount) < 5 {
if len(errCount) != prevErrCount {
log.WithFields(log.Fields{
"errCount": errCount,
}).Error("New error entry added")
prevErrCount = len(errCount)
}
processRow, err := db.Exec(context.Background(), checkNewRowsStmt)
if err != nil {
errCount = append(errCount, err)
continue
}
row, err := processRow.RowsAffected()
if err != nil {
errCount = append(errCount, err)
continue
}
if row < 1 {
time.Sleep(1000 * time.Millisecond)
log.Debug("We are checking rows, be patient")
continue
}
log.Debug("We found a new row")
ctx := context.Background()
// Setup TX
tx, err := db.Begin(ctx)
if err != nil {
loghelper.LogError(err).Error("We are unable to Begin a SQL transaction")
errCount = append(errCount, err)
continue
}
defer func() {
err := tx.Rollback(ctx)
if err != nil && err != pgx.ErrTxClosed {
loghelper.LogError(err).Error("We were unable to Rollback a transaction")
errCount = append(errCount, err)
}
}()
// Query the DB for slots.
sp := slotsToProcess{}
err = tx.QueryRow(ctx, getStartEndSlotStmt).Scan(&sp.startSlot, &sp.endSlot)
if err != nil {
if err == pgx.ErrNoRows {
time.Sleep(100 * time.Millisecond)
continue
}
loghelper.LogSlotRangeStatementError(strconv.Itoa(sp.startSlot), strconv.Itoa(sp.endSlot), getStartEndSlotStmt, err).Error("Unable to get a row")
errCount = append(errCount, err)
continue
}
// Checkout the Row
res, err := tx.Exec(ctx, checkOutRowStmt, sp.startSlot, sp.endSlot)
if err != nil {
loghelper.LogSlotRangeStatementError(strconv.Itoa(sp.startSlot), strconv.Itoa(sp.endSlot), checkOutRowStmt, err).Error("Unable to checkout the row")
errCount = append(errCount, err)
continue
}
rows, err := res.RowsAffected()
if err != nil {
loghelper.LogSlotRangeStatementError(strconv.Itoa(sp.startSlot), strconv.Itoa(sp.endSlot), checkOutRowStmt, err).Error("Unable to determine the rows affected when trying to checkout a row.")
errCount = append(errCount, err)
continue
}
if rows > 1 {
loghelper.LogSlotRangeStatementError(strconv.Itoa(sp.startSlot), strconv.Itoa(sp.endSlot), checkOutRowStmt, err).WithFields(log.Fields{
"rowsReturn": rows,
}).Error("We locked too many rows.")
errCount = append(errCount, fmt.Errorf("We locked %d rows when trying to lock a single row.", rows))
continue
}
if rows == 0 {
loghelper.LogSlotRangeStatementError(strconv.Itoa(sp.startSlot), strconv.Itoa(sp.endSlot), checkOutRowStmt, err).WithFields(log.Fields{
"rowsReturn": rows,
}).Error("We did not lock a single row.")
errCount = append(errCount, fmt.Errorf("We did not lock any rows when trying to lock a single row."))
continue
}
err = tx.Commit(ctx)
if err != nil {
loghelper.LogSlotRangeError(strconv.Itoa(sp.startSlot), strconv.Itoa(sp.endSlot), err).Error("Unable to commit the transaction.")
errCount = append(errCount, err)
continue
}
slotCh <- sp
}
log.WithFields(log.Fields{
"errCount": errCount,
}).Error("Too many errors occurred while trying to get new rows to process.")
return errCount
}
// After a row has been processed it should be removed from its appropriate table.
func removeRowPostProcess(db sql.Database, processCh <-chan slotsToProcess, checkProcessedStmt, removeStmt string) error {
errCh := make(chan error, 10) // Buffered so the checking goroutines don't block and pending errors are observable below.
for {
slots := <-processCh
// Make sure the start and end slot exist in the slots table.
go func() {
finishedProcess := false
for !finishedProcess {
isStartProcess, err := isSlotProcessed(db, checkProcessedStmt, strconv.Itoa(slots.startSlot))
if err != nil {
errCh <- err
}
isEndProcess, err := isSlotProcessed(db, checkProcessedStmt, strconv.Itoa(slots.endSlot))
if err != nil {
errCh <- err
}
if isStartProcess && isEndProcess {
finishedProcess = true
} else {
// Avoid hammering the DB while we wait for both slots to be processed.
time.Sleep(3 * time.Second)
}
}
_, err := db.Exec(context.Background(), removeStmt, strconv.Itoa(slots.startSlot), strconv.Itoa(slots.endSlot))
if err != nil {
errCh <- err
}
}()
if len(errCh) != 0 {
return <-errCh
}
}
}

View File

@ -0,0 +1,101 @@
// VulcanizeDB
// Copyright © 2022 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
// This file contains all the code to process known gaps.
package beaconclient
import (
"context"
"strconv"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
)
var (
// Get a single non-checked-out row from ethcl.known_gaps.
getKgEntryStmt string = `SELECT start_slot, end_slot FROM ethcl.known_gaps
WHERE checked_out=false
LIMIT 1;`
// Used to periodically check to see if there is a new entry in the ethcl.known_gaps table.
checkKgEntryStmt string = `SELECT * FROM ethcl.known_gaps WHERE checked_out=false;`
// Used to checkout a row from the ethcl.known_gaps table
lockKgEntryStmt string = `UPDATE ethcl.known_gaps
SET checked_out=true
WHERE start_slot=$1 AND end_slot=$2;`
// Used to delete an entry from the knownGaps table
deleteKgEntryStmt string = `DELETE FROM ethcl.known_gaps
WHERE start_slot=$1 AND end_slot=$2;`
// Used to check to see if a single slot exists in the known_gaps table.
checkKgSingleSlotStmt string = `SELECT start_slot, end_slot FROM ethcl.known_gaps
WHERE start_slot=$1 AND end_slot=$2;`
)
type knownGapsProcessing struct {
db sql.Database
metrics *BeaconClientMetrics
}
// This function will perform all the heavy lifting for processing known gaps.
func (bc *BeaconClient) ProcessKnownGaps(maxWorkers int) []error {
log.Info("We are starting the known gaps processing service.")
kgp := knownGapsProcessing{db: bc.Db, metrics: bc.Metrics}
errs := handleBatchProcess(maxWorkers, kgp, kgp.db, bc.ServerEndpoint, bc.Metrics)
log.Debug("Exiting known gaps processing service")
return errs
}
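A hedged sketch of how a caller might run the historic and known-gaps services side by side; the worker counts and error handling below are illustrative, not the commit's cmd wiring:
```
go func() {
	if errs := bc.CaptureHistoric(2); len(errs) != 0 {
		log.WithFields(log.Fields{"errs": errs}).Error("Historic processing exited.")
	}
}()
go func() {
	if errs := bc.ProcessKnownGaps(2); len(errs) != 0 {
		log.WithFields(log.Fields{"errs": errs}).Error("Known gaps processing exited.")
	}
}()
```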
// Get a single row of slots to process from the ethcl.known_gaps table.
func (kgp knownGapsProcessing) getSlotRange(slotCh chan<- slotsToProcess) []error {
return getBatchProcessRow(kgp.db, getKgEntryStmt, checkKgEntryStmt, lockKgEntryStmt, slotCh)
}
// Remove the table entry.
func (kgp knownGapsProcessing) removeTableEntry(processCh <-chan slotsToProcess) error {
return removeRowPostProcess(kgp.db, processCh, QueryBySlotStmt, deleteKgEntryStmt)
}
// Handle any errors that occur while processing a known gap.
func (kgp knownGapsProcessing) handleProcessingErrors(errMessages <-chan batchHistoricError) {
for {
errMs := <-errMessages
// Check to see if this entry already exists.
res, err := kgp.db.Exec(context.Background(), checkKgSingleSlotStmt, errMs.slot, errMs.slot)
if err != nil {
loghelper.LogSlotError(strconv.Itoa(errMs.slot), err).Error("Unable to see if this slot is in the ethcl.known_gaps table")
continue
}
rows, err := res.RowsAffected()
if err != nil {
loghelper.LogSlotError(strconv.Itoa(errMs.slot), err).WithFields(log.Fields{
"queryStatement": checkKgSingleSlotStmt,
}).Error("Unable to get the number of rows affected by this statement.")
continue
}
if rows > 0 {
loghelper.LogSlotError(strconv.Itoa(errMs.slot), errMs.err).Error("We received an error when processing a knownGap")
err = updateKnownGapErrors(kgp.db, errMs.slot, errMs.slot, errMs.err, kgp.metrics)
if err != nil {
loghelper.LogSlotError(strconv.Itoa(errMs.slot), err).Error("Error processing known gap")
}
} else {
writeKnownGaps(kgp.db, 1, errMs.slot, errMs.slot, errMs.err, errMs.errProcess, kgp.metrics)
}
}
}

View File

@ -45,7 +45,6 @@ var (
return fmt.Sprintf("Unable to properly unmarshal the Slot field in the %s.", obj) return fmt.Sprintf("Unable to properly unmarshal the Slot field in the %s.", obj)
} }
ParentRootUnmarshalError = "Unable to properly unmarshal the ParentRoot field in the SignedBeaconBlock." ParentRootUnmarshalError = "Unable to properly unmarshal the ParentRoot field in the SignedBeaconBlock."
MissingIdentifiedError = "Can't query state without a set slot or block_root"
MissingEth1Data = "Can't get the Eth1 block_hash" MissingEth1Data = "Can't get the Eth1 block_hash"
VersionedUnmarshalerError = "Unable to create a versioned unmarshaler" VersionedUnmarshalerError = "Unable to create a versioned unmarshaler"
) )
@ -78,7 +77,9 @@ type ProcessSlot struct {
} }
// This function will do all the work to process the slot and write it to the DB. // This function will do all the work to process the slot and write it to the DB.
func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, headOrHistoric string, metrics *BeaconClientMetrics, knownGapsTableIncrement int) error { // It will return the error and error process. The error process is used for providing rich detail to the
// known_gaps table.
func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, headOrHistoric string, metrics *BeaconClientMetrics, knownGapsTableIncrement int) (error, string) {
ps := &ProcessSlot{ ps := &ProcessSlot{
Slot: slot, Slot: slot,
BlockRoot: blockRoot, BlockRoot: blockRoot,
@ -110,60 +111,55 @@ func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot
}) })
if err := g.Wait(); err != nil { if err := g.Wait(); err != nil {
writeKnownGaps(ps.Db, 1, ps.Slot, ps.Slot, err, "processSlot", ps.Metrics) return err, "processSlot"
return err
}
if ps.HeadOrHistoric == "head" && previousSlot == 0 && previousBlockRoot == "" {
writeStartUpGaps(db, knownGapsTableIncrement, ps.Slot, ps.Metrics)
} }
// Get this object ready to write // Get this object ready to write
blockRootEndpoint := serverAddress + BcBlockRootEndpoint(strconv.Itoa(ps.Slot)) dw, err := ps.createWriteObjects()
dw, err := ps.createWriteObjects(blockRootEndpoint)
if err != nil { if err != nil {
writeKnownGaps(ps.Db, 1, ps.Slot, ps.Slot, err, "blockRoot", ps.Metrics) return err, "blockRoot"
return err
} }
// Write the object to the DB. // Write the object to the DB.
err = dw.writeFullSlot() err = dw.writeFullSlot()
if err != nil { if err != nil {
writeKnownGaps(ps.Db, 1, ps.Slot, ps.Slot, err, "processSlot", ps.Metrics) return err, "processSlot"
return err
} }
// Handle any reorgs or skipped slots. // Handle any reorgs or skipped slots.
headOrHistoric = strings.ToLower(headOrHistoric) headOrHistoric = strings.ToLower(headOrHistoric)
if headOrHistoric != "head" && headOrHistoric != "historic" { if headOrHistoric != "head" && headOrHistoric != "historic" {
return fmt.Errorf("headOrHistoric must be either historic or head!") return fmt.Errorf("headOrHistoric must be either historic or head!"), ""
} }
if ps.HeadOrHistoric == "head" && previousSlot != 0 && previousBlockRoot != "" && ps.Status != "skipped" { if ps.HeadOrHistoric == "head" && previousSlot != 0 && previousBlockRoot != "" && ps.Status != "skipped" {
ps.checkPreviousSlot(previousSlot, previousBlockRoot, knownGapsTableIncrement) ps.checkPreviousSlot(previousSlot, previousBlockRoot, knownGapsTableIncrement)
} }
return nil return nil, ""
} }
// Handle a slot that is at head. A wrapper function for calling `handleFullSlot`. // Handle a slot that is at head. A wrapper function for calling `handleFullSlot`.
func processHeadSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, metrics *BeaconClientMetrics, knownGapsTableIncrement int) error { func processHeadSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, metrics *BeaconClientMetrics, knownGapsTableIncrement int) {
return processFullSlot(db, serverAddress, slot, blockRoot, stateRoot, previousSlot, previousBlockRoot, "head", metrics, knownGapsTableIncrement) // Get the knownGaps at startUp.
if previousSlot == 0 && previousBlockRoot == "" {
writeStartUpGaps(db, knownGapsTableIncrement, slot, metrics)
}
err, errReason := processFullSlot(db, serverAddress, slot, blockRoot, stateRoot, previousSlot, previousBlockRoot, "head", metrics, knownGapsTableIncrement)
if err != nil {
writeKnownGaps(db, knownGapsTableIncrement, slot, slot, err, errReason, metrics)
}
} }
// Handle a historic slot. A wrapper function for calling `handleFullSlot`. // Handle a historic slot. A wrapper function for calling `handleFullSlot`.
// Commented because of the linter...... LOL func handleHistoricSlot(db sql.Database, serverAddress string, slot int, metrics *BeaconClientMetrics) (error, string) {
//func handleHistoricSlot(db sql.Database, serverAddress string, slot int) error { return processFullSlot(db, serverAddress, slot, "", "", 0, "", "historic", metrics, 1)
// return handleFullSlot(db, serverAddress, slot, "", "", 0, "", "historic") }
//}
// Update the SszSignedBeaconBlock and FullSignedBeaconBlock object with their respective values. // Update the SszSignedBeaconBlock and FullSignedBeaconBlock object with their respective values.
func (ps *ProcessSlot) getSignedBeaconBlock(serverAddress string, vmCh <-chan *dt.VersionedUnmarshaler) error { func (ps *ProcessSlot) getSignedBeaconBlock(serverAddress string, vmCh <-chan *dt.VersionedUnmarshaler) error {
var blockIdentifier string // Used to query the block var blockIdentifier string // Used to query the block
if ps.BlockRoot != "" { if ps.BlockRoot != "" {
blockIdentifier = ps.BlockRoot blockIdentifier = ps.BlockRoot
} else if ps.Slot != 0 {
blockIdentifier = strconv.Itoa(ps.Slot)
} else { } else {
log.Error(MissingIdentifiedError) blockIdentifier = strconv.Itoa(ps.Slot)
return fmt.Errorf(MissingIdentifiedError)
} }
blockEndpoint := serverAddress + BcBlockQueryEndpoint + blockIdentifier blockEndpoint := serverAddress + BcBlockQueryEndpoint + blockIdentifier
var err error var err error
@ -208,11 +204,8 @@ func (ps *ProcessSlot) getBeaconState(serverEndpoint string, vmCh chan<- *dt.Ver
var stateIdentifier string // Used to query the state var stateIdentifier string // Used to query the state
if ps.StateRoot != "" { if ps.StateRoot != "" {
stateIdentifier = ps.StateRoot stateIdentifier = ps.StateRoot
} else if ps.Slot != 0 {
stateIdentifier = strconv.Itoa(ps.Slot)
} else { } else {
log.Error(MissingIdentifiedError) stateIdentifier = strconv.Itoa(ps.Slot)
return fmt.Errorf(MissingIdentifiedError)
} }
stateEndpoint := serverEndpoint + BcStateQueryEndpoint + stateIdentifier stateEndpoint := serverEndpoint + BcStateQueryEndpoint + stateIdentifier
ps.SszBeaconState, _, _ = querySsz(stateEndpoint, strconv.Itoa(ps.Slot)) ps.SszBeaconState, _, _ = querySsz(stateEndpoint, strconv.Itoa(ps.Slot))
@ -243,10 +236,15 @@ func (ps *ProcessSlot) checkPreviousSlot(previousSlot int, previousBlockRoot str
parentRoot := "0x" + hex.EncodeToString(ps.FullSignedBeaconBlock.Block().ParentRoot()) parentRoot := "0x" + hex.EncodeToString(ps.FullSignedBeaconBlock.Block().ParentRoot())
if previousSlot == int(ps.FullBeaconState.Slot()) { if previousSlot == int(ps.FullBeaconState.Slot()) {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"slot": ps.FullBeaconState.Slot, "slot": ps.FullBeaconState.Slot(),
"fork": true, "fork": true,
}).Warn("A fork occurred! The previous slot and current slot match.") }).Warn("A fork occurred! The previous slot and current slot match.")
writeReorgs(ps.Db, strconv.Itoa(ps.Slot), ps.BlockRoot, ps.Metrics) writeReorgs(ps.Db, strconv.Itoa(ps.Slot), ps.BlockRoot, ps.Metrics)
} else if previousSlot > int(ps.FullBeaconState.Slot()) {
log.WithFields(log.Fields{
"previousSlot": previousSlot,
"curSlot": int(ps.FullBeaconState.Slot()),
}).Warn("We noticed the previous slot is greater than the current slot.")
} else if previousSlot+1 != int(ps.FullBeaconState.Slot()) { } else if previousSlot+1 != int(ps.FullBeaconState.Slot()) {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"previousSlot": previousSlot, "previousSlot": previousSlot,
@ -265,7 +263,7 @@ func (ps *ProcessSlot) checkPreviousSlot(previousSlot int, previousBlockRoot str
} }
// Transforms all the raw data into DB models that can be written to the DB. // Transforms all the raw data into DB models that can be written to the DB.
func (ps *ProcessSlot) createWriteObjects(blockRootEndpoint string) (*DatabaseWriter, error) { func (ps *ProcessSlot) createWriteObjects() (*DatabaseWriter, error) {
var ( var (
stateRoot string stateRoot string
blockRoot string blockRoot string
@ -289,10 +287,13 @@ func (ps *ProcessSlot) createWriteObjects(blockRootEndpoint string) (*DatabaseWr
blockRoot = ps.BlockRoot blockRoot = ps.BlockRoot
} else { } else {
var err error var err error
blockRoot, err = queryBlockRoot(blockRootEndpoint, strconv.Itoa(ps.Slot)) rawBlockRoot, err := ps.FullSignedBeaconBlock.Block().HashTreeRoot()
//blockRoot, err = queryBlockRoot(blockRootEndpoint, strconv.Itoa(ps.Slot))
if err != nil { if err != nil {
return nil, err return nil, err
} }
blockRoot = "0x" + hex.EncodeToString(rawBlockRoot[:])
log.WithFields(log.Fields{"blockRoot": blockRoot}).Debug("Block Root from ssz:")
} }
eth1BlockHash = "0x" + hex.EncodeToString(ps.FullSignedBeaconBlock.Block().Body().Eth1Data().BlockHash) eth1BlockHash = "0x" + hex.EncodeToString(ps.FullSignedBeaconBlock.Block().Body().Eth1Data().BlockHash)
} }

View File

@ -27,9 +27,26 @@ func LogError(err error) *log.Entry {
}) })
} }
// A simple helper function to log slot and error.
func LogSlotError(slot string, err error) *log.Entry { func LogSlotError(slot string, err error) *log.Entry {
return log.WithFields(log.Fields{ return log.WithFields(log.Fields{
"err": err, "err": err,
"slot": slot, "slot": slot,
}) })
} }
// A simple helper function to log a slot range and error.
func LogSlotRangeError(startSlot string, endSlot string, err error) *log.Entry {
return log.WithFields(log.Fields{
"err": err,
"startSlot": startSlot,
"endSlot": endSlot,
})
}
// A simple helper function to log a slot range, the SQL statement being run, and the error.
func LogSlotRangeStatementError(startSlot string, endSlot string, statement string, err error) *log.Entry {
return log.WithFields(log.Fields{
"err": err,
"startSlot": startSlot,
"endSlot": endSlot,
"SqlStatement": statement,
})
}
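These helpers chain like the existing LogError and LogSlotError; an illustrative call site (the slots and statement are placeholders):
```
loghelper.LogSlotRangeStatementError("100", "200", lockHpEntryStmt, err).Error("Unable to lock the row for this slot range.")
```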