diff --git a/.github/workflows/on-pr.yml b/.github/workflows/on-pr.yml
index 259788e..1e7f347 100644
--- a/.github/workflows/on-pr.yml
+++ b/.github/workflows/on-pr.yml
@@ -26,7 +26,7 @@ on:
env:
stack-orchestrator-ref: ${{ github.event.inputs.stack-orchestrator-ref || 'main'}}
- ipld-ethcl-db-ref: ${{ github.event.inputs.ipld-ethcl-db-ref || 'feature/schema-ipld-ethcl-indexer' }}
+ ipld-ethcl-db-ref: ${{ github.event.inputs.ipld-ethcl-db-ref || 'feature/historic-processing' }}
ssz-data-ref: ${{ github.event.inputs.ssz-data-ref || 'main' }}
GOPATH: /tmp/go
jobs:
@@ -206,4 +206,5 @@ jobs:
- name: golangci-lint
uses: golangci/golangci-lint-action@v3
with:
- args: --timeout 90s
+ args: --timeout 90s --disable deadcode,unused
+# args: --timeout 90s --disable deadcode,
diff --git a/README.md b/README.md
index cf285e2..2b5ab8b 100644
--- a/README.md
+++ b/README.md
@@ -37,7 +37,7 @@ To run the application, do as follows:
2. Run the start up command.
```
-go run -race main.go capture head --db.address localhost \
+go run -race main.go capture historic --db.address localhost \
--db.password password \
--db.port 8076 \
--db.username vdbm \
@@ -45,11 +45,14 @@ go run -race main.go capture head --db.address localhost \
--db.driver PGX \
--bc.address localhost \
--bc.port 5052 \
+ --bc.maxHistoricProcessWorker 2 \
+ --bc.maxKnownGapsWorker 2 \
+ --bc.knownGapsProcess=true \
--bc.connectionProtocol http \
- --t.skipSync=true \
- --log.level info \
+ --t.skipSync=false \
+ --log.level debug \
--log.output=true \
- --kg.increment 100
+ --kg.increment 1000000
```
## Running Tests
diff --git a/cmd/boot.go b/cmd/boot.go
index 3f31e35..9fc3532 100644
--- a/cmd/boot.go
+++ b/cmd/boot.go
@@ -44,7 +44,8 @@ func bootApp() {
log.Info("Starting the application in boot mode.")
ctx := context.Background()
- BC, DB, err := boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, "head", testDisregardSync)
+ BC, DB, err := boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver,
+ bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, kgTableIncrement, "head", testDisregardSync)
if err != nil {
loghelper.LogError(err).Error("Unable to Start application")
}
diff --git a/cmd/capture.go b/cmd/capture.go
index 5916bfa..aad3aef 100644
--- a/cmd/capture.go
+++ b/cmd/capture.go
@@ -17,6 +17,7 @@
package cmd
import (
+ "fmt"
"os"
"time"
@@ -25,21 +26,25 @@ import (
)
var (
- dbUsername string
- dbPassword string
- dbName string
- dbAddress string
- dbDriver string
- dbPort int
- bcAddress string
- bcPort int
- bcBootRetryInterval int
- bcBootMaxRetry int
- bcConnectionProtocol string
- bcType string
- maxWaitSecondsShutdown time.Duration = time.Duration(5) * time.Second
- notifierCh chan os.Signal = make(chan os.Signal, 1)
- testDisregardSync bool
+ dbUsername string
+ dbPassword string
+ dbName string
+ dbAddress string
+ dbDriver string
+ dbPort int
+ bcAddress string
+ bcPort int
+ bcBootRetryInterval int
+ bcBootMaxRetry int
+ bcConnectionProtocol string
+ bcType string
+ bcMaxHistoricProcessWorker int
+ kgMaxWorker int
+ kgTableIncrement int
+ kgProcessGaps bool
+ maxWaitSecondsShutdown time.Duration = time.Duration(20) * time.Second
+ notifierCh chan os.Signal = make(chan os.Signal, 1)
+ testDisregardSync bool
)
// captureCmd represents the capture command
@@ -64,18 +69,18 @@ func init() {
captureCmd.PersistentFlags().StringVarP(&dbName, "db.name", "n", "", "Database name connect to DB(required)")
captureCmd.PersistentFlags().StringVarP(&dbDriver, "db.driver", "", "", "Database Driver to connect to DB(required)")
captureCmd.PersistentFlags().IntVarP(&dbPort, "db.port", "", 0, "Port to connect to DB(required)")
- err := captureCmd.MarkPersistentFlagRequired("db.username")
- exitErr(err)
- err = captureCmd.MarkPersistentFlagRequired("db.password")
- exitErr(err)
- err = captureCmd.MarkPersistentFlagRequired("db.address")
- exitErr(err)
- err = captureCmd.MarkPersistentFlagRequired("db.port")
- exitErr(err)
- err = captureCmd.MarkPersistentFlagRequired("db.name")
- exitErr(err)
- err = captureCmd.MarkPersistentFlagRequired("db.driver")
- exitErr(err)
+ //err := captureCmd.MarkPersistentFlagRequired("db.username")
+ // exitErr(err)
+ // err = captureCmd.MarkPersistentFlagRequired("db.password")
+ // exitErr(err)
+ // err = captureCmd.MarkPersistentFlagRequired("db.address")
+ // exitErr(err)
+ // err = captureCmd.MarkPersistentFlagRequired("db.port")
+ // exitErr(err)
+ // err = captureCmd.MarkPersistentFlagRequired("db.name")
+ // exitErr(err)
+ // err = captureCmd.MarkPersistentFlagRequired("db.driver")
+ // exitErr(err)
//// Beacon Client Specific
captureCmd.PersistentFlags().StringVarP(&bcAddress, "bc.address", "l", "", "Address to connect to beacon node (required)")
@@ -84,17 +89,23 @@ func init() {
captureCmd.PersistentFlags().StringVarP(&bcConnectionProtocol, "bc.connectionProtocol", "", "http", "protocol for connecting to the beacon node.")
captureCmd.PersistentFlags().IntVarP(&bcBootRetryInterval, "bc.bootRetryInterval", "", 30, "The amount of time to wait between retries while booting the application")
captureCmd.PersistentFlags().IntVarP(&bcBootMaxRetry, "bc.bootMaxRetry", "", 5, "The amount of time to wait between retries while booting the application")
- err = captureCmd.MarkPersistentFlagRequired("bc.address")
- exitErr(err)
- err = captureCmd.MarkPersistentFlagRequired("bc.port")
- exitErr(err)
+ captureCmd.PersistentFlags().IntVarP(&bcMaxHistoricProcessWorker, "bc.maxHistoricProcessWorker", "", 30, "The number of workers that should be actively processing slots from the ethcl.historic_process table. Be careful of system memory.")
+ // err = captureCmd.MarkPersistentFlagRequired("bc.address")
+ // exitErr(err)
+ // err = captureCmd.MarkPersistentFlagRequired("bc.port")
+ // exitErr(err)
+
+ //// Known Gaps specific
+ captureCmd.PersistentFlags().BoolVarP(&kgProcessGaps, "kg.processKnownGaps", "", true, "Should we process the slots within the ethcl.known_gaps table.")
+ captureCmd.PersistentFlags().IntVarP(&kgTableIncrement, "kg.increment", "", 10000, "The max slots within a single entry to the known_gaps table.")
+ captureCmd.PersistentFlags().IntVarP(&kgMaxWorker, "kg.maxKnownGapsWorker", "", 30, "The number of workers that should be actively processing slots from the ethcl.known_gaps table. Be careful of system memory.")
//// Testing Specific
captureCmd.PersistentFlags().BoolVar(&testDisregardSync, "t.skipSync", false, "Should we disregard the head sync?")
// Bind Flags with Viper
//// DB Flags
- err = viper.BindPFlag("db.username", captureCmd.PersistentFlags().Lookup("db.username"))
+ err := viper.BindPFlag("db.username", captureCmd.PersistentFlags().Lookup("db.username"))
exitErr(err)
err = viper.BindPFlag("db.password", captureCmd.PersistentFlags().Lookup("db.password"))
exitErr(err)
@@ -104,14 +115,12 @@ func init() {
exitErr(err)
err = viper.BindPFlag("db.name", captureCmd.PersistentFlags().Lookup("db.name"))
exitErr(err)
+
+ //// Testing Specific
err = viper.BindPFlag("t.skipSync", captureCmd.PersistentFlags().Lookup("t.skipSync"))
exitErr(err)
- // Testing Specific
- err = viper.BindPFlag("t.driver", captureCmd.PersistentFlags().Lookup("db.driver"))
- exitErr(err)
-
- // LH specific
+ //// LH specific
err = viper.BindPFlag("bc.address", captureCmd.PersistentFlags().Lookup("bc.address"))
exitErr(err)
err = viper.BindPFlag("bc.type", captureCmd.PersistentFlags().Lookup("bc.type"))
@@ -124,14 +133,25 @@ func init() {
exitErr(err)
err = viper.BindPFlag("bc.bootMaxRetry", captureCmd.PersistentFlags().Lookup("bc.bootMaxRetry"))
exitErr(err)
+ err = viper.BindPFlag("bc.maxHistoricProcessWorker", captureCmd.PersistentFlags().Lookup("bc.maxHistoricProcessWorker"))
+ exitErr(err)
// Here you will define your flags and configuration settings.
+ //// Known Gap Specific
+ err = viper.BindPFlag("kg.processKnownGaps", captureCmd.PersistentFlags().Lookup("kg.processKnownGaps"))
+ exitErr(err)
+ err = viper.BindPFlag("kg.increment", captureCmd.PersistentFlags().Lookup("kg.increment"))
+ exitErr(err)
+ err = viper.BindPFlag("kg.processKnownGaps", captureCmd.PersistentFlags().Lookup("kg.maxKnownGapsWorker"))
+ exitErr(err)
+
}
// Helper function to catch any errors.
// We need to capture these errors for the linter.
func exitErr(err error) {
if err != nil {
+ fmt.Println("Error: ", err)
os.Exit(1)
}
}
diff --git a/cmd/head.go b/cmd/head.go
index d5544a4..7589fd5 100644
--- a/cmd/head.go
+++ b/cmd/head.go
@@ -18,7 +18,7 @@ package cmd
import (
"context"
- "os"
+ "fmt"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
@@ -26,10 +26,7 @@ import (
"github.com/vulcanize/ipld-ethcl-indexer/internal/boot"
"github.com/vulcanize/ipld-ethcl-indexer/internal/shutdown"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
-)
-
-var (
- kgTableIncrement int
+ "golang.org/x/sync/errgroup"
)
// headCmd represents the head command
@@ -48,21 +45,35 @@ func startHeadTracking() {
log.Info("Starting the application in head tracking mode.")
ctx := context.Background()
- BC, DB, err := boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, "historic", testDisregardSync)
+ Bc, Db, err := boot.BootApplicationWithRetry(ctx, viper.GetString("db.address"), viper.GetInt("db.port"), viper.GetString("db.name"), viper.GetString("db.username"), viper.GetString("db.password"), viper.GetString("db.driver"),
+ viper.GetString("bc.address"), viper.GetInt("bc.port"), viper.GetString("bc.connectionProtocol"), viper.GetString("bc.type"), viper.GetInt("bc.bootRetryInterval"), viper.GetInt("bc.bootMaxRetry"),
+ viper.GetInt("kg.increment"), "head", viper.GetBool("t.skipSync"))
if err != nil {
- loghelper.LogError(err).Error("Unable to Start application")
- if DB != nil {
- DB.Close()
- }
- os.Exit(1)
+ StopApplicationPreBoot(err, Db)
}
log.Info("The Beacon Client has booted successfully!")
// Capture head blocks
- go BC.CaptureHead(kgTableIncrement)
+ go Bc.CaptureHead()
+ if viper.GetBool("kg.processKnownGaps") {
+ go func() {
+ errG := new(errgroup.Group)
+ errG.Go(func() error {
+ errs := Bc.ProcessKnownGaps(viper.GetInt("kg.maxKnownGapsWorker"))
+ if len(errs) != 0 {
+ log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing knownGaps")
+ return fmt.Errorf("Application ended because there were too many error when attempting to process knownGaps")
+ }
+ return nil
+ })
+ if err := errG.Wait(); err != nil {
+ loghelper.LogError(err).Error("Error with knownGaps processing")
+ }
+ }()
+ }
// Shutdown when the time is right.
- err = shutdown.ShutdownServices(ctx, notifierCh, maxWaitSecondsShutdown, DB, BC)
+ err = shutdown.ShutdownServices(ctx, notifierCh, maxWaitSecondsShutdown, Db, Bc)
if err != nil {
loghelper.LogError(err).Error("Ungracefully Shutdown ipld-ethcl-indexer!")
} else {
@@ -73,9 +84,4 @@ func startHeadTracking() {
func init() {
captureCmd.AddCommand(headCmd)
-
- // Known Gaps specific
- captureCmd.PersistentFlags().IntVarP(&kgTableIncrement, "kg.increment", "", 10000, "The max slots within a single entry to the known_gaps table.")
- err := viper.BindPFlag("kg.increment", captureCmd.PersistentFlags().Lookup("kg.increment"))
- exitErr(err)
}
diff --git a/cmd/historic.go b/cmd/historic.go
index d52cb0d..519d00e 100644
--- a/cmd/historic.go
+++ b/cmd/historic.go
@@ -18,13 +18,17 @@ package cmd
import (
"context"
+ "fmt"
"os"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
+ "github.com/spf13/viper"
"github.com/vulcanize/ipld-ethcl-indexer/internal/boot"
+ "github.com/vulcanize/ipld-ethcl-indexer/internal/shutdown"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
+ "golang.org/x/sync/errgroup"
)
// historicCmd represents the historic command
@@ -43,9 +47,49 @@ func startHistoricProcessing() {
log.Info("Starting the application in head tracking mode.")
ctx := context.Background()
- _, DB, err := boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, "historic", testDisregardSync)
+ Bc, Db, err := boot.BootApplicationWithRetry(ctx, viper.GetString("db.address"), viper.GetInt("db.port"), viper.GetString("db.name"), viper.GetString("db.username"), viper.GetString("db.password"), viper.GetString("db.driver"),
+ viper.GetString("bc.address"), viper.GetInt("bc.port"), viper.GetString("bc.connectionProtocol"), viper.GetString("bc.type"), viper.GetInt("bc.bootRetryInterval"), viper.GetInt("bc.bootMaxRetry"),
+ viper.GetInt("kg.increment"), "head", viper.GetBool("t.skipSync"))
if err != nil {
- StopApplicationPreBoot(err, DB)
+ StopApplicationPreBoot(err, Db)
+ }
+
+ errG, _ := errgroup.WithContext(context.Background())
+
+ errG.Go(func() error {
+ errs := Bc.CaptureHistoric(viper.GetInt("bc.maxHistoricProcessWorker"))
+ if len(errs) != 0 {
+ if len(errs) != 0 {
+ log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing historic events")
+ return fmt.Errorf("Application ended because there were too many error when attempting to process historic")
+ }
+ }
+ return nil
+ })
+
+ if viper.GetBool("kg.processKnownGaps") {
+ go func() {
+ errG := new(errgroup.Group)
+ errG.Go(func() error {
+ errs := Bc.ProcessKnownGaps(viper.GetInt("kg.maxKnownGapsWorker"))
+ if len(errs) != 0 {
+ log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing knownGaps")
+ return fmt.Errorf("Application ended because there were too many error when attempting to process knownGaps")
+ }
+ return nil
+ })
+ if err := errG.Wait(); err != nil {
+ loghelper.LogError(err).Error("Error with knownGaps processing")
+ }
+ }()
+ }
+
+ // Shutdown when the time is right.
+ err = shutdown.ShutdownServices(ctx, notifierCh, maxWaitSecondsShutdown, Db, Bc)
+ if err != nil {
+ loghelper.LogError(err).Error("Ungracefully Shutdown ipld-ethcl-indexer!")
+ } else {
+ log.Info("Gracefully shutdown ipld-ethcl-indexer")
}
}
diff --git a/cmd/root.go b/cmd/root.go
index 2625dbe..68e8e78 100644
--- a/cmd/root.go
+++ b/cmd/root.go
@@ -47,6 +47,7 @@ It can either do this will keeping track of head, or backfilling historic data.`
func Execute() {
err := rootCmd.Execute()
if err != nil {
+ fmt.Println("Err when executing rootCmd", err)
os.Exit(1)
}
}
diff --git a/example.ipld-ethcl-indexer-config.json b/example.ipld-ethcl-indexer-config.json
new file mode 100644
index 0000000..82513b4
--- /dev/null
+++ b/example.ipld-ethcl-indexer-config.json
@@ -0,0 +1,33 @@
+{
+ "db": {
+ "address": "localhost",
+ "password": "password",
+ "port": 8076,
+ "username": "vdbm",
+ "name": "vulcanize_testing",
+ "driver": "PGX"
+ },
+ "bc": {
+ "address": "10.203.8.51",
+ "port": 5052,
+ "type": "lighthouse",
+ "bootRetryInterval": 30,
+ "bootMaxRetry": 5,
+ "maxHistoricProcessWorker": 2,
+ "connectionProtocol": "http"
+ },
+ "t": {
+ "skipSync": false
+ },
+ "log": {
+ "level": "debug",
+ "output": true,
+ "file": "./ipld-ethcl-indexer.log",
+ "format": "json"
+ },
+ "kg": {
+ "increment": 10000,
+ "processKnownGaps": true,
+ "maxKnownGapsWorker": 2
+ }
+}
diff --git a/internal/boot/boot.go b/internal/boot/boot.go
index 3802407..e726cf8 100644
--- a/internal/boot/boot.go
+++ b/internal/boot/boot.go
@@ -41,11 +41,12 @@ var (
// 2. Connect to the database.
//
// 3. Make sure the node is synced, unless disregardSync is true.
-func BootApplication(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string, bcAddress string, bcPort int, bcConnectionProtocol string, disregardSync bool) (*beaconclient.BeaconClient, sql.Database, error) {
+func BootApplication(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string,
+ bcAddress string, bcPort int, bcConnectionProtocol string, bcKgTableIncrement int, disregardSync bool) (*beaconclient.BeaconClient, sql.Database, error) {
log.Info("Booting the Application")
log.Debug("Creating the Beacon Client")
- BC = beaconclient.CreateBeaconClient(ctx, bcConnectionProtocol, bcAddress, bcPort)
+ BC = beaconclient.CreateBeaconClient(ctx, bcConnectionProtocol, bcAddress, bcPort, bcKgTableIncrement)
log.Debug("Checking Beacon Client")
err := BC.CheckBeaconClient()
@@ -81,32 +82,65 @@ func BootApplication(ctx context.Context, dbHostname string, dbPort int, dbName
// Add retry logic to ensure that we are give the Beacon Client and the DB time to start.
func BootApplicationWithRetry(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string,
- bcAddress string, bcPort int, bcConnectionProtocol string, bcType string, bcRetryInterval int, bcMaxRetry int, startUpMode string, disregardSync bool) (*beaconclient.BeaconClient, sql.Database, error) {
+ bcAddress string, bcPort int, bcConnectionProtocol string, bcType string, bcRetryInterval int, bcMaxRetry int, bcKgTableIncrement int, startUpMode string, disregardSync bool) (*beaconclient.BeaconClient, sql.Database, error) {
var err error
- for i := 0; i < bcMaxRetry; i++ {
- BC, DB, err = BootApplication(ctx, dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName, bcAddress, bcPort, bcConnectionProtocol, disregardSync)
- if err != nil {
- log.WithFields(log.Fields{
- "retryNumber": i,
- "err": err,
- }).Warn("Unable to boot application. Going to try again")
- time.Sleep(time.Duration(bcRetryInterval) * time.Second)
- continue
+
+ if bcMaxRetry < 0 {
+ i := 0
+ for {
+ BC, DB, err = BootApplication(ctx, dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName,
+ bcAddress, bcPort, bcConnectionProtocol, bcKgTableIncrement, disregardSync)
+ if err != nil {
+ log.WithFields(log.Fields{
+ "retryNumber": i,
+ "err": err,
+ }).Warn("Unable to boot application. Going to try again")
+ time.Sleep(time.Duration(bcRetryInterval) * time.Second)
+ i = i + 1
+ continue
+ }
+ break
+ }
+ } else {
+ for i := 0; i < bcMaxRetry; i++ {
+ BC, DB, err = BootApplication(ctx, dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName,
+ bcAddress, bcPort, bcConnectionProtocol, bcKgTableIncrement, disregardSync)
+ if err != nil {
+ log.WithFields(log.Fields{
+ "retryNumber": i,
+ "err": err,
+ }).Warn("Unable to boot application. Going to try again")
+ time.Sleep(time.Duration(bcRetryInterval) * time.Second)
+ continue
+ }
+ break
}
- break
}
switch strings.ToLower(startUpMode) {
case "head":
- log.Debug("No further actions needed to boot the application at this phase.")
+ BC.PerformHeadTracking = true
case "historic":
log.Debug("Performing additional boot steps for historical processing")
+ BC.PerformHistoricalProcessing = true
+ // This field is not currently used.
+ // The idea is, that if we are doing historially processing and we get a slot
+ // greater than this slot, then we would rerun this function.
+ // this would ensure that we have the slots necessary for processing
+ // within the beacon server.
+
+ // We can implement this feature if we notice any errors.
headSlot, err := BC.GetLatestSlotInBeaconServer(bcType)
if err != nil {
return BC, DB, err
}
BC.UpdateLatestSlotInBeaconServer(int64(headSlot))
// Add another switch case for bcType if its ever needed.
+ default:
+ log.WithFields(log.Fields{
+ "startUpMode": startUpMode,
+ }).Error("The startUpMode provided is not handled.")
}
+
return BC, DB, err
}
diff --git a/internal/boot/boot_test.go b/internal/boot/boot_test.go
index 44e1b2c..aa77e56 100644
--- a/internal/boot/boot_test.go
+++ b/internal/boot/boot_test.go
@@ -37,44 +37,45 @@ var _ = Describe("Boot", func() {
bcType string = "lighthouse"
bcBootRetryInterval int = 1
bcBootMaxRetry int = 5
+ bcKgTableIncrement int = 10
)
Describe("Booting the application", Label("integration"), func() {
Context("When the DB and BC are both up and running, we skip checking for a synced head, and we are processing head", func() {
It("Should connect successfully", func() {
- _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, "head", true)
+ _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", true)
defer db.Close()
Expect(err).ToNot(HaveOccurred())
})
})
Context("When the DB and BC are both up and running, we skip checking for a synced head, and we are processing historic ", func() {
It("Should connect successfully", func() {
- _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, "historic", true)
+ _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "historic", true)
defer db.Close()
Expect(err).ToNot(HaveOccurred())
})
})
Context("When the DB and BC are both up and running, and we check for a synced head", func() {
It("Should not connect successfully", func() {
- _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, "head", false)
+ _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", false)
defer db.Close()
Expect(err).To(HaveOccurred())
})
})
Context("When the DB is running but not the BC", func() {
It("Should not connect successfully", func() {
- _, _, err := boot.BootApplication(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol, true)
+ _, _, err := boot.BootApplication(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol, bcKgTableIncrement, true)
Expect(err).To(HaveOccurred())
})
})
Context("When the BC is running but not the DB", func() {
It("Should not connect successfully", func() {
- _, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, true)
+ _, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcKgTableIncrement, true)
Expect(err).To(HaveOccurred())
})
})
Context("When neither the BC or DB are running", func() {
It("Should not connect successfully", func() {
- _, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol, true)
+ _, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol, bcKgTableIncrement, true)
Expect(err).To(HaveOccurred())
})
})
diff --git a/internal/shutdown/shutdown_test.go b/internal/shutdown/shutdown_test.go
index 705b9df..0bbf38d 100644
--- a/internal/shutdown/shutdown_test.go
+++ b/internal/shutdown/shutdown_test.go
@@ -49,6 +49,7 @@ var _ = Describe("Shutdown", func() {
bcType string = "lighthouse"
bcBootRetryInterval int = 1
bcBootMaxRetry int = 5
+ bcKgTableIncrement int = 10
maxWaitSecondsShutdown time.Duration = time.Duration(1) * time.Second
DB sql.Database
BC *beaconclient.BeaconClient
@@ -58,7 +59,8 @@ var _ = Describe("Shutdown", func() {
)
BeforeEach(func() {
ctx = context.Background()
- BC, DB, err = boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, "head", true)
+ BC, DB, err = boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress,
+ bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", true)
notifierCh = make(chan os.Signal, 1)
Expect(err).To(BeNil())
})
diff --git a/pkg/beaconclient/beaconclient.go b/pkg/beaconclient/beaconclient.go
index 357eee3..d277201 100644
--- a/pkg/beaconclient/beaconclient.go
+++ b/pkg/beaconclient/beaconclient.go
@@ -43,12 +43,11 @@ var (
// A struct that capture the Beacon Server that the Beacon Client will be interacting with and querying.
type BeaconClient struct {
- Context context.Context // A context generic context with multiple uses.
- ServerEndpoint string // What is the endpoint of the beacon server.
- PerformHistoricalProcessing bool // Should we perform historical processing?
- Db sql.Database // Database object used for reads and writes.
- Metrics *BeaconClientMetrics // An object used to keep track of certain BeaconClient Metrics.
- KnownGapTableIncrement int // The max number of slots within a single known_gaps table entry.
+ Context context.Context // A context generic context with multiple uses.
+ ServerEndpoint string // What is the endpoint of the beacon server.
+ Db sql.Database // Database object used for reads and writes.
+ Metrics *BeaconClientMetrics // An object used to keep track of certain BeaconClient Metrics.
+ KnownGapTableIncrement int // The max number of slots within a single known_gaps table entry.
// Used for Head Tracking
@@ -65,7 +64,8 @@ type BeaconClient struct {
// The latest available slot within the Beacon Server. We can't query any slot greater than this.
// This value is lazily updated. Therefore at times it will be outdated.
- LatestSlotInBeaconServer int64
+ LatestSlotInBeaconServer int64
+ PerformHistoricalProcessing bool // Should we perform historical processing?
}
// A struct to keep track of relevant the head event topic.
@@ -84,20 +84,21 @@ type SseError struct {
}
// A Function to create the BeaconClient.
-func CreateBeaconClient(ctx context.Context, connectionProtocol string, bcAddress string, bcPort int) *BeaconClient {
+func CreateBeaconClient(ctx context.Context, connectionProtocol string, bcAddress string, bcPort int, bcKgTableIncrement int) *BeaconClient {
endpoint := fmt.Sprintf("%s://%s:%d", connectionProtocol, bcAddress, bcPort)
log.Info("Creating the BeaconClient")
return &BeaconClient{
- Context: ctx,
- ServerEndpoint: endpoint,
- HeadTracking: createSseEvent[Head](endpoint, BcHeadTopicEndpoint),
- ReOrgTracking: createSseEvent[ChainReorg](endpoint, bcReorgTopicEndpoint),
+ Context: ctx,
+ ServerEndpoint: endpoint,
+ KnownGapTableIncrement: bcKgTableIncrement,
+ HeadTracking: createSseEvent[Head](endpoint, BcHeadTopicEndpoint),
+ ReOrgTracking: createSseEvent[ChainReorg](endpoint, bcReorgTopicEndpoint),
Metrics: &BeaconClientMetrics{
- HeadTrackingInserts: 0,
- HeadTrackingReorgs: 0,
- HeadTrackingKnownGaps: 0,
- HeadError: 0,
- HeadReorgError: 0,
+ SlotInserts: 0,
+ ReorgInserts: 0,
+ KnownGapsInserts: 0,
+ HeadError: 0,
+ HeadReorgError: 0,
},
//FinalizationTracking: createSseEvent[FinalizedCheckpoint](endpoint, bcFinalizedTopicEndpoint),
}
diff --git a/pkg/beaconclient/capturehead.go b/pkg/beaconclient/capturehead.go
index df70fd0..328fad9 100644
--- a/pkg/beaconclient/capturehead.go
+++ b/pkg/beaconclient/capturehead.go
@@ -25,12 +25,9 @@ import (
)
// This function will perform all the heavy lifting for tracking the head of the chain.
-func (bc *BeaconClient) CaptureHead(knownGapsTableIncrement int) {
- bc.KnownGapTableIncrement = knownGapsTableIncrement
+func (bc *BeaconClient) CaptureHead() {
log.Info("We are tracking the head of the chain.")
- //bc.tempHelper()
go bc.handleHead()
- //go bc.handleFinalizedCheckpoint()
go bc.handleReorg()
bc.captureEventTopic()
}
diff --git a/pkg/beaconclient/capturehead_test.go b/pkg/beaconclient/capturehead_test.go
index 34f3bbf..459b2af 100644
--- a/pkg/beaconclient/capturehead_test.go
+++ b/pkg/beaconclient/capturehead_test.go
@@ -44,6 +44,172 @@ import (
"github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql/postgres"
)
+var (
+ address string = "localhost"
+ port int = 8080
+ protocol string = "http"
+ dbHost string = "localhost"
+ dbPort int = 8076
+ dbName string = "vulcanize_testing"
+ dbUser string = "vdbm"
+ dbPassword string = "password"
+ dbDriver string = "pgx"
+ dummyParentRoot string = "46f98c08b54a71dfda4d56e29ec3952b8300cd8d6b67a9b6c562ae96a7a25a42"
+ knownGapsTableIncrement int = 100000
+ maxRetry int = 120
+
+ TestEvents = map[string]Message{
+ "100-dummy": {
+ HeadMessage: beaconclient.Head{
+ Slot: "100",
+ Block: "04955400371347e26f61d7a4bbda5b23fa0b25d5fc465160f2a92d52a63b919b",
+ State: "36d5c9a129979b4502bd9a06e57a742810ecbc3fa55a0361c0723c92c1782bfa",
+ CurrentDutyDependentRoot: "",
+ PreviousDutyDependentRoot: "",
+ EpochTransition: false,
+ ExecutionOptimistic: false,
+ },
+ TestNotes: "A block that is supposed to replicate slot 100, but contains some dummy test information.",
+ MimicConfig: &MimicConfig{
+ ForkVersion: "phase0",
+ },
+ SignedBeaconBlock: filepath.Join("ssz-data", "100", "signed-beacon-block.ssz"),
+ BeaconState: filepath.Join("ssz-data", "100", "beacon-state.ssz"),
+ },
+ "100-dummy-2": {
+ HeadMessage: beaconclient.Head{
+ Slot: "100",
+ Block: "04955400371347e26f61d7a4bbda5b23fa0b25d5fc465160f2a9aaaaaaaaaaaa",
+ State: "36d5c9a129979b4502bd9a06e57a742810ecbc3fa55a0361c072bbbbbbbbbbbb",
+ CurrentDutyDependentRoot: "",
+ PreviousDutyDependentRoot: "",
+ EpochTransition: false,
+ ExecutionOptimistic: false,
+ },
+ TestNotes: "A block that is supposed to replicate slot 100, but contains some dummy test information.",
+ MimicConfig: &MimicConfig{
+ ForkVersion: "phase0",
+ },
+ SignedBeaconBlock: filepath.Join("ssz-data", "100", "signed-beacon-block.ssz"),
+ BeaconState: filepath.Join("ssz-data", "100", "beacon-state.ssz"),
+ },
+ "102-wrong-ssz-1": {
+ HeadMessage: beaconclient.Head{
+ Slot: "102",
+ Block: "0x46f98c08b54a71dfda4d56e29ec3952b8300cd8d6b67a9b6c562ae96a7a25a42",
+ State: "0x9b20b114c613c1aa462e02d590b3da902b0a1377e938ed0f94dd3491d763ef67",
+ CurrentDutyDependentRoot: "",
+ PreviousDutyDependentRoot: "",
+ EpochTransition: false,
+ ExecutionOptimistic: false,
+ },
+ TestNotes: "A bad block that returns the wrong ssz objects, used for testing incorrect SSZ decoding.",
+ BeaconState: filepath.Join("ssz-data", "102", "signed-beacon-block.ssz"),
+ SignedBeaconBlock: filepath.Join("ssz-data", "102", "beacon-state.ssz"),
+ },
+ "100": {
+ HeadMessage: beaconclient.Head{
+ Slot: "100",
+ Block: "0x582187e97f7520bb69eea014c3834c964c45259372a0eaaea3f032013797996b",
+ State: "0xf286a0379c0386a3c7be28d05d829f8eb7b280cc9ede15449af20ebcd06a7a56",
+ CurrentDutyDependentRoot: "",
+ PreviousDutyDependentRoot: "",
+ EpochTransition: false,
+ ExecutionOptimistic: false,
+ },
+ TestNotes: "An easy to process Phase 0 block",
+ SignedBeaconBlock: filepath.Join("ssz-data", "100", "signed-beacon-block.ssz"),
+ BeaconState: filepath.Join("ssz-data", "100", "beacon-state.ssz"),
+ },
+ "101": {
+ HeadMessage: beaconclient.Head{
+ Slot: "101",
+ Block: "0xabe1a972e512182d04f0d4a5c9c25f9ee57c2e9d0ff3f4c4c82fd42d13d31083",
+ State: "0xcb04aa2edbf13c7bb7e7bd9b621ced6832e0075e89147352eac3019a824ce847",
+ CurrentDutyDependentRoot: "",
+ PreviousDutyDependentRoot: "",
+ EpochTransition: false,
+ ExecutionOptimistic: false,
+ },
+ TestNotes: "An easy to process Phase 0 block",
+ SignedBeaconBlock: filepath.Join("ssz-data", "101", "signed-beacon-block.ssz"),
+ BeaconState: filepath.Join("ssz-data", "101", "beacon-state.ssz"),
+ },
+ "2375703-dummy": {
+ HeadMessage: beaconclient.Head{
+ Slot: "2375703",
+ Block: "c9fb337b62e2a0dae4f27ab49913132570f7f2cab3f23ad99f4d07508a8e648e",
+ State: "0299a145bcda2c8f5e7d2e068ee101861edbee2ec1db2d5e1d850b0d265aef5f",
+ CurrentDutyDependentRoot: "",
+ PreviousDutyDependentRoot: "",
+ EpochTransition: false,
+ ExecutionOptimistic: false,
+ },
+ TestNotes: "This is a dummy message that is used for reorgs",
+ MimicConfig: &MimicConfig{
+ ForkVersion: "altair",
+ },
+ SignedBeaconBlock: filepath.Join("ssz-data", "2375703", "signed-beacon-block.ssz"),
+ BeaconState: filepath.Join("ssz-data", "2375703", "beacon-state.ssz"),
+ },
+ "2375703-dummy-2": {
+ HeadMessage: beaconclient.Head{
+ Slot: "2375703",
+ Block: "c9fb337b62e2a0dae4f27ab49913132570f7f2cab3f23ad99f4d07508aaaaaaa",
+ State: "0299a145bcda2c8f5e7d2e068ee101861edbee2ec1db2d5e1d850b0d2bbbbbbb",
+ CurrentDutyDependentRoot: "",
+ PreviousDutyDependentRoot: "",
+ EpochTransition: false,
+ ExecutionOptimistic: false,
+ },
+ TestNotes: "This is a dummy message that is used for reorgs",
+ MimicConfig: &MimicConfig{
+ ForkVersion: "altair",
+ },
+ SignedBeaconBlock: filepath.Join("ssz-data", "2375703", "signed-beacon-block.ssz"),
+ BeaconState: filepath.Join("ssz-data", "2375703", "beacon-state.ssz"),
+ },
+ "2375703": {
+ HeadMessage: beaconclient.Head{
+ Slot: "2375703",
+ Block: "0x4392372c5f6e39499e31bf924388b5815639103149f0f54f8a453773b1802301",
+ State: "0xb6215b560273af63ec7e011572b60ec1ca0b0232f8ff44fcd4ed55c7526e964e",
+ CurrentDutyDependentRoot: "", PreviousDutyDependentRoot: "", EpochTransition: false, ExecutionOptimistic: false},
+ TestNotes: "An easy to process Altair Block",
+ SignedBeaconBlock: filepath.Join("ssz-data", "2375703", "signed-beacon-block.ssz"),
+ BeaconState: filepath.Join("ssz-data", "2375703", "beacon-state.ssz"),
+ },
+ "3797056": {
+ HeadMessage: beaconclient.Head{
+ Slot: "3797056",
+ Block: "",
+ State: "0xb6215b560273af63ec7e011572b60ec1ca0b0232f8ff44fcd4ed55c7526e964e",
+ CurrentDutyDependentRoot: "", PreviousDutyDependentRoot: "", EpochTransition: false, ExecutionOptimistic: false},
+ TestNotes: "An easy to process Altair Block",
+ SignedBeaconBlock: filepath.Join("ssz-data", "2375703", "signed-beacon-block.ssz"),
+ BeaconState: filepath.Join("ssz-data", "2375703", "beacon-state.ssz"),
+ },
+ }
+ TestConfig = Config{
+ protocol: protocol,
+ address: address,
+ port: port,
+ dummyParentRoot: dummyParentRoot,
+ dbHost: dbHost,
+ dbPort: dbPort,
+ dbName: dbName,
+ dbUser: dbUser,
+ dbPassword: dbPassword,
+ dbDriver: dbDriver,
+ knownGapsTableIncrement: knownGapsTableIncrement,
+ }
+
+ BeaconNodeTester = TestBeaconNode{
+ TestEvents: TestEvents,
+ TestConfig: TestConfig,
+ }
+)
+
type Message struct {
HeadMessage beaconclient.Head // The head messsage that will be streamed to the BeaconClient
TestNotes string // A small explanation of the purpose this structure plays in the testing landscape.
@@ -61,177 +227,6 @@ type MimicConfig struct {
var _ = Describe("Capturehead", func() {
- var (
- TestConfig Config
- BeaconNodeTester TestBeaconNode
- address string = "localhost"
- port int = 8080
- protocol string = "http"
- TestEvents map[string]Message
- dbHost string = "localhost"
- dbPort int = 8076
- dbName string = "vulcanize_testing"
- dbUser string = "vdbm"
- dbPassword string = "password"
- dbDriver string = "pgx"
- dummyParentRoot string = "46f98c08b54a71dfda4d56e29ec3952b8300cd8d6b67a9b6c562ae96a7a25a42"
- knownGapsTableIncrement int = 100000
- maxRetry int = 60
- )
-
- BeforeEach(func() {
- TestEvents = map[string]Message{
- "100-dummy": {
- HeadMessage: beaconclient.Head{
- Slot: "100",
- Block: "04955400371347e26f61d7a4bbda5b23fa0b25d5fc465160f2a92d52a63b919b",
- State: "36d5c9a129979b4502bd9a06e57a742810ecbc3fa55a0361c0723c92c1782bfa",
- CurrentDutyDependentRoot: "",
- PreviousDutyDependentRoot: "",
- EpochTransition: false,
- ExecutionOptimistic: false,
- },
- TestNotes: "A block that is supposed to replicate slot 100, but contains some dummy test information.",
- MimicConfig: &MimicConfig{
- ForkVersion: "phase0",
- },
- SignedBeaconBlock: filepath.Join("ssz-data", "100", "signed-beacon-block.ssz"),
- BeaconState: filepath.Join("ssz-data", "100", "beacon-state.ssz"),
- },
- "100-dummy-2": {
- HeadMessage: beaconclient.Head{
- Slot: "100",
- Block: "04955400371347e26f61d7a4bbda5b23fa0b25d5fc465160f2a9aaaaaaaaaaaa",
- State: "36d5c9a129979b4502bd9a06e57a742810ecbc3fa55a0361c072bbbbbbbbbbbb",
- CurrentDutyDependentRoot: "",
- PreviousDutyDependentRoot: "",
- EpochTransition: false,
- ExecutionOptimistic: false,
- },
- TestNotes: "A block that is supposed to replicate slot 100, but contains some dummy test information.",
- MimicConfig: &MimicConfig{
- ForkVersion: "phase0",
- },
- SignedBeaconBlock: filepath.Join("ssz-data", "100", "signed-beacon-block.ssz"),
- BeaconState: filepath.Join("ssz-data", "100", "beacon-state.ssz"),
- },
- "102-wrong-ssz-1": {
- HeadMessage: beaconclient.Head{
- Slot: "102",
- Block: "0x46f98c08b54a71dfda4d56e29ec3952b8300cd8d6b67a9b6c562ae96a7a25a42",
- State: "0x9b20b114c613c1aa462e02d590b3da902b0a1377e938ed0f94dd3491d763ef67",
- CurrentDutyDependentRoot: "",
- PreviousDutyDependentRoot: "",
- EpochTransition: false,
- ExecutionOptimistic: false,
- },
- TestNotes: "A bad block that returns the wrong ssz objects, used for testing incorrect SSZ decoding.",
- BeaconState: filepath.Join("ssz-data", "102", "signed-beacon-block.ssz"),
- SignedBeaconBlock: filepath.Join("ssz-data", "102", "beacon-state.ssz"),
- },
- "100": {
- HeadMessage: beaconclient.Head{
- Slot: "100",
- Block: "0x582187e97f7520bb69eea014c3834c964c45259372a0eaaea3f032013797996b",
- State: "0xf286a0379c0386a3c7be28d05d829f8eb7b280cc9ede15449af20ebcd06a7a56",
- CurrentDutyDependentRoot: "",
- PreviousDutyDependentRoot: "",
- EpochTransition: false,
- ExecutionOptimistic: false,
- },
- TestNotes: "An easy to process Phase 0 block",
- SignedBeaconBlock: filepath.Join("ssz-data", "100", "signed-beacon-block.ssz"),
- BeaconState: filepath.Join("ssz-data", "100", "beacon-state.ssz"),
- },
- "101": {
- HeadMessage: beaconclient.Head{
- Slot: "101",
- Block: "0xabe1a972e512182d04f0d4a5c9c25f9ee57c2e9d0ff3f4c4c82fd42d13d31083",
- State: "0xcb04aa2edbf13c7bb7e7bd9b621ced6832e0075e89147352eac3019a824ce847",
- CurrentDutyDependentRoot: "",
- PreviousDutyDependentRoot: "",
- EpochTransition: false,
- ExecutionOptimistic: false,
- },
- TestNotes: "An easy to process Phase 0 block",
- SignedBeaconBlock: filepath.Join("ssz-data", "101", "signed-beacon-block.ssz"),
- BeaconState: filepath.Join("ssz-data", "101", "beacon-state.ssz"),
- },
- "2375703-dummy": {
- HeadMessage: beaconclient.Head{
- Slot: "2375703",
- Block: "c9fb337b62e2a0dae4f27ab49913132570f7f2cab3f23ad99f4d07508a8e648e",
- State: "0299a145bcda2c8f5e7d2e068ee101861edbee2ec1db2d5e1d850b0d265aef5f",
- CurrentDutyDependentRoot: "",
- PreviousDutyDependentRoot: "",
- EpochTransition: false,
- ExecutionOptimistic: false,
- },
- TestNotes: "This is a dummy message that is used for reorgs",
- MimicConfig: &MimicConfig{
- ForkVersion: "altair",
- },
- SignedBeaconBlock: filepath.Join("ssz-data", "2375703", "signed-beacon-block.ssz"),
- BeaconState: filepath.Join("ssz-data", "2375703", "beacon-state.ssz"),
- },
- "2375703-dummy-2": {
- HeadMessage: beaconclient.Head{
- Slot: "2375703",
- Block: "c9fb337b62e2a0dae4f27ab49913132570f7f2cab3f23ad99f4d07508aaaaaaa",
- State: "0299a145bcda2c8f5e7d2e068ee101861edbee2ec1db2d5e1d850b0d2bbbbbbb",
- CurrentDutyDependentRoot: "",
- PreviousDutyDependentRoot: "",
- EpochTransition: false,
- ExecutionOptimistic: false,
- },
- TestNotes: "This is a dummy message that is used for reorgs",
- MimicConfig: &MimicConfig{
- ForkVersion: "altair",
- },
- SignedBeaconBlock: filepath.Join("ssz-data", "2375703", "signed-beacon-block.ssz"),
- BeaconState: filepath.Join("ssz-data", "2375703", "beacon-state.ssz"),
- },
- "2375703": {
- HeadMessage: beaconclient.Head{
- Slot: "2375703",
- Block: "0x4392372c5f6e39499e31bf924388b5815639103149f0f54f8a453773b1802301",
- State: "0xb6215b560273af63ec7e011572b60ec1ca0b0232f8ff44fcd4ed55c7526e964e",
- CurrentDutyDependentRoot: "", PreviousDutyDependentRoot: "", EpochTransition: false, ExecutionOptimistic: false},
- TestNotes: "An easy to process Altair Block",
- SignedBeaconBlock: filepath.Join("ssz-data", "2375703", "signed-beacon-block.ssz"),
- BeaconState: filepath.Join("ssz-data", "2375703", "beacon-state.ssz"),
- },
- "3797056": {
- HeadMessage: beaconclient.Head{
- Slot: "3797056",
- Block: "",
- State: "0xb6215b560273af63ec7e011572b60ec1ca0b0232f8ff44fcd4ed55c7526e964e",
- CurrentDutyDependentRoot: "", PreviousDutyDependentRoot: "", EpochTransition: false, ExecutionOptimistic: false},
- TestNotes: "An easy to process Altair Block",
- SignedBeaconBlock: filepath.Join("ssz-data", "2375703", "signed-beacon-block.ssz"),
- BeaconState: filepath.Join("ssz-data", "2375703", "beacon-state.ssz"),
- },
- }
- TestConfig = Config{
- protocol: protocol,
- address: address,
- port: port,
- dummyParentRoot: dummyParentRoot,
- dbHost: dbHost,
- dbPort: dbPort,
- dbName: dbName,
- dbUser: dbUser,
- dbPassword: dbPassword,
- dbDriver: dbDriver,
- knownGapsTableIncrement: knownGapsTableIncrement,
- }
-
- BeaconNodeTester = TestBeaconNode{
- TestEvents: TestEvents,
- TestConfig: TestConfig,
- }
- })
-
Describe("Receiving New Head SSE messages", Label("unit", "behavioral"), func() {
Context("Correctly formatted Phase0 Block", func() {
It("Should turn it into a struct successfully.", func() {
@@ -315,7 +310,7 @@ var _ = Describe("Capturehead", func() {
// })
//})
- Context("When the proper SSZ objects are not served", Label("now"), func() {
+ Context("When the proper SSZ objects are not served", func() {
It("Should return an error, and add the slot to the knownGaps table.", func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "101")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
@@ -403,7 +398,7 @@ var _ = Describe("Capturehead", func() {
BeaconNodeTester.testMultipleReorgs(bc, TestEvents["100-dummy"].HeadMessage, TestEvents["100-dummy-2"].HeadMessage, TestEvents["100"].HeadMessage, 3, maxRetry)
})
})
- Context("Altair: Multiple reorgs have occurred on this slot", Label("new"), func() {
+ Context("Altair: Multiple reorgs have occurred on this slot", func() {
It("The previous blocks should be marked as 'forked', the new block should be the only one marked as 'proposed'.", func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "2375702")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
@@ -435,7 +430,7 @@ type Config struct {
// Must run before each test. We can't use the beforeEach because of the way
// Gingko treats race conditions.
func setUpTest(config Config, maxSlot string) *beaconclient.BeaconClient {
- bc := *beaconclient.CreateBeaconClient(context.Background(), config.protocol, config.address, config.port)
+ bc := *beaconclient.CreateBeaconClient(context.Background(), config.protocol, config.address, config.port, config.knownGapsTableIncrement)
db, err := postgres.SetupPostgresDb(config.dbHost, config.dbPort, config.dbName, config.dbUser, config.dbPassword, config.dbDriver)
Expect(err).ToNot(HaveOccurred())
@@ -491,7 +486,7 @@ func sendHeadMessage(bc *beaconclient.BeaconClient, head beaconclient.Head, maxR
data, err := json.Marshal(head)
Expect(err).ToNot(HaveOccurred())
- startInserts := atomic.LoadUint64(&bc.Metrics.HeadTrackingInserts)
+ startInserts := atomic.LoadUint64(&bc.Metrics.SlotInserts)
bc.HeadTracking.MessagesCh <- &sse.Event{
ID: []byte{},
Data: data,
@@ -499,13 +494,14 @@ func sendHeadMessage(bc *beaconclient.BeaconClient, head beaconclient.Head, maxR
Retry: []byte{},
}
curRetry := 0
- for atomic.LoadUint64(&bc.Metrics.HeadTrackingInserts) != startInserts+expectedSuccessfulInserts {
+ for atomic.LoadUint64(&bc.Metrics.SlotInserts) != startInserts+expectedSuccessfulInserts {
time.Sleep(1 * time.Second)
curRetry = curRetry + 1
if curRetry == maxRetry {
log.WithFields(log.Fields{
- "startInsert": startInserts,
- "currentValue": atomic.LoadUint64(&bc.Metrics.HeadTrackingInserts),
+ "startInsert": startInserts,
+ "expectedSuccessfulInserts": expectedSuccessfulInserts,
+ "currentValue": atomic.LoadUint64(&bc.Metrics.SlotInserts),
}).Error("HeadTracking Insert wasn't incremented properly.")
Fail("Too many retries have occurred.")
}
@@ -517,9 +513,10 @@ func queryDbSlotAndBlock(db sql.Database, querySlot string, queryBlockRoot strin
sqlStatement := `SELECT epoch, slot, block_root, state_root, status FROM ethcl.slots WHERE slot=$1 AND block_root=$2;`
var epoch, slot int
var blockRoot, stateRoot, status string
- row := db.QueryRow(context.Background(), sqlStatement, querySlot, queryBlockRoot)
- err := row.Scan(&epoch, &slot, &blockRoot, &stateRoot, &status)
+ log.Debug("Starting to query the ethcl.slots table, ", querySlot, " ", queryBlockRoot)
+ err := db.QueryRow(context.Background(), sqlStatement, querySlot, queryBlockRoot).Scan(&epoch, &slot, &blockRoot, &stateRoot, &status)
Expect(err).ToNot(HaveOccurred())
+ log.Debug("Querying the ethcl.slots table complete")
return epoch, slot, blockRoot, stateRoot, status
}
@@ -567,7 +564,7 @@ func queryKnownGaps(db sql.Database, queryStartGap string, QueryEndGap string) (
// A function that will remove all entries from the ethcl tables for you.
func clearEthclDbTables(db sql.Database) {
- deleteQueries := []string{"DELETE FROM ethcl.slots;", "DELETE FROM ethcl.signed_beacon_block;", "DELETE FROM ethcl.beacon_state;", "DELETE FROM ethcl.known_gaps;"}
+ deleteQueries := []string{"DELETE FROM ethcl.slots;", "DELETE FROM ethcl.signed_beacon_block;", "DELETE FROM ethcl.beacon_state;", "DELETE FROM ethcl.known_gaps;", "DELETE FROM ethcl.historic_process;"}
for _, queries := range deleteQueries {
_, err := db.Exec(context.Background(), queries)
Expect(err).ToNot(HaveOccurred())
@@ -670,6 +667,33 @@ func (tbc TestBeaconNode) SetupBeaconNodeMock(TestEvents map[string]Message, pro
return httpmock.NewBytesResponse(200, dat), nil
},
)
+ // Not needed but could be useful to have.
+ blockRootUrl := `=~^` + protocol + "://" + address + ":" + strconv.Itoa(port) + "/eth/v1/beacon/blocks/" + `([^/]+)` + "/root"
+ httpmock.RegisterResponder("GET", blockRootUrl,
+ func(req *http.Request) (*http.Response, error) {
+ // Get ID from request
+ slot := httpmock.MustGetSubmatch(req, 1)
+ dat, err := tbc.provideBlockRoot(slot)
+ if err != nil {
+ Expect(err).NotTo(HaveOccurred())
+ return httpmock.NewStringResponse(404, fmt.Sprintf("Unable to find block root for %s", slot)), err
+ }
+ return httpmock.NewBytesResponse(200, dat), nil
+ },
+ )
+}
+
+// Provide the Block root
+func (tbc TestBeaconNode) provideBlockRoot(slot string) ([]byte, error) {
+
+ for _, val := range tbc.TestEvents {
+ if val.HeadMessage.Slot == slot && val.MimicConfig == nil {
+ block, err := hex.DecodeString(val.HeadMessage.Block[2:])
+ Expect(err).ToNot(HaveOccurred())
+ return block, nil
+ }
+ }
+ return nil, fmt.Errorf("Unable to find the Blockroot in test object.")
}
// A function to mimic querying the state from the beacon node. We simply get the SSZ file are return it.
@@ -679,12 +703,12 @@ func (tbc TestBeaconNode) provideSsz(slotIdentifier string, sszIdentifier string
for _, val := range tbc.TestEvents {
if sszIdentifier == "state" {
- if val.HeadMessage.Slot == slotIdentifier || val.HeadMessage.State == slotIdentifier {
+ if (val.HeadMessage.Slot == slotIdentifier && val.MimicConfig == nil) || val.HeadMessage.State == slotIdentifier {
slotFile = val.BeaconState
Message = val
}
} else if sszIdentifier == "block" {
- if val.HeadMessage.Slot == slotIdentifier || val.HeadMessage.Block == slotIdentifier {
+ if (val.HeadMessage.Slot == slotIdentifier && val.MimicConfig == nil) || val.HeadMessage.Block == slotIdentifier {
slotFile = val.SignedBeaconBlock
Message = val
}
@@ -770,16 +794,16 @@ func (tbc TestBeaconNode) provideSsz(slotIdentifier string, sszIdentifier string
// Helper function to test three reorg messages. There are going to be many functions like this,
// Because we need to test the same logic for multiple phases.
func (tbc TestBeaconNode) testMultipleReorgs(bc *beaconclient.BeaconClient, firstHead beaconclient.Head, secondHead beaconclient.Head, thirdHead beaconclient.Head, epoch int, maxRetry int) {
- go bc.CaptureHead(tbc.TestConfig.knownGapsTableIncrement)
+ go bc.CaptureHead()
time.Sleep(1 * time.Second)
- log.Info("Sending Phase0 Messages to BeaconClient")
+ log.Info("Sending Messages to BeaconClient")
sendHeadMessage(bc, firstHead, maxRetry, 1)
sendHeadMessage(bc, secondHead, maxRetry, 1)
sendHeadMessage(bc, thirdHead, maxRetry, 1)
curRetry := 0
- for atomic.LoadUint64(&bc.Metrics.HeadTrackingReorgs) != 2 {
+ for atomic.LoadUint64(&bc.Metrics.ReorgInserts) != 2 {
time.Sleep(1 * time.Second)
curRetry = curRetry + 1
if curRetry == maxRetry {
@@ -810,7 +834,7 @@ func (tbc TestBeaconNode) testMultipleReorgs(bc *beaconclient.BeaconClient, firs
}
curRetry = 0
- for atomic.LoadUint64(&bc.Metrics.HeadTrackingReorgs) != 3 {
+ for atomic.LoadUint64(&bc.Metrics.ReorgInserts) != 3 {
time.Sleep(1 * time.Second)
curRetry = curRetry + 1
if curRetry == maxRetry {
@@ -818,7 +842,7 @@ func (tbc TestBeaconNode) testMultipleReorgs(bc *beaconclient.BeaconClient, firs
}
}
- if bc.Metrics.HeadTrackingKnownGaps != 0 {
+ if bc.Metrics.KnownGapsInserts != 0 {
Fail("We found gaps when processing a single block")
}
@@ -832,25 +856,25 @@ func (tbc TestBeaconNode) testMultipleReorgs(bc *beaconclient.BeaconClient, firs
// A test to validate a single block was processed correctly
func (tbc TestBeaconNode) testProcessBlock(bc *beaconclient.BeaconClient, head beaconclient.Head, epoch int, maxRetry int, expectedSuccessInsert uint64, expectedKnownGaps uint64, expectedReorgs uint64) {
- go bc.CaptureHead(tbc.TestConfig.knownGapsTableIncrement)
+ go bc.CaptureHead()
time.Sleep(1 * time.Second)
sendHeadMessage(bc, head, maxRetry, expectedSuccessInsert)
curRetry := 0
- for atomic.LoadUint64(&bc.Metrics.HeadTrackingKnownGaps) != expectedKnownGaps {
+ for atomic.LoadUint64(&bc.Metrics.KnownGapsInserts) != expectedKnownGaps {
time.Sleep(1 * time.Second)
curRetry = curRetry + 1
if curRetry == maxRetry {
- Fail(fmt.Sprintf("Wrong gap metrics, got: %d, wanted %d", bc.Metrics.HeadTrackingKnownGaps, expectedKnownGaps))
+ Fail(fmt.Sprintf("Wrong gap metrics, got: %d, wanted %d", bc.Metrics.KnownGapsInserts, expectedKnownGaps))
}
}
curRetry = 0
- for atomic.LoadUint64(&bc.Metrics.HeadTrackingReorgs) != expectedReorgs {
+ for atomic.LoadUint64(&bc.Metrics.ReorgInserts) != expectedReorgs {
time.Sleep(1 * time.Second)
curRetry = curRetry + 1
if curRetry == maxRetry {
- Fail(fmt.Sprintf("Wrong reorg metrics, got: %d, wanted %d", bc.Metrics.HeadTrackingKnownGaps, expectedKnownGaps))
+ Fail(fmt.Sprintf("Wrong reorg metrics, got: %d, wanted %d", bc.Metrics.KnownGapsInserts, expectedKnownGaps))
}
}
@@ -862,14 +886,14 @@ func (tbc TestBeaconNode) testProcessBlock(bc *beaconclient.BeaconClient, head b
// A test that ensures that if two HeadMessages occur for a single slot they are marked
// as proposed and forked correctly.
func (tbc TestBeaconNode) testMultipleHead(bc *beaconclient.BeaconClient, firstHead beaconclient.Head, secondHead beaconclient.Head, epoch int, maxRetry int) {
- go bc.CaptureHead(tbc.TestConfig.knownGapsTableIncrement)
+ go bc.CaptureHead()
time.Sleep(1 * time.Second)
sendHeadMessage(bc, firstHead, maxRetry, 1)
sendHeadMessage(bc, secondHead, maxRetry, 1)
curRetry := 0
- for atomic.LoadUint64(&bc.Metrics.HeadTrackingReorgs) != 1 {
+ for atomic.LoadUint64(&bc.Metrics.ReorgInserts) != 1 {
time.Sleep(1 * time.Second)
curRetry = curRetry + 1
if curRetry == maxRetry {
@@ -877,7 +901,7 @@ func (tbc TestBeaconNode) testMultipleHead(bc *beaconclient.BeaconClient, firstH
}
}
- if bc.Metrics.HeadTrackingKnownGaps != 0 {
+ if bc.Metrics.KnownGapsInserts != 0 {
Fail("We found gaps when processing a single block")
}
@@ -889,7 +913,8 @@ func (tbc TestBeaconNode) testMultipleHead(bc *beaconclient.BeaconClient, firstH
// A test that ensures that if two HeadMessages occur for a single slot they are marked
// as proposed and forked correctly.
func (tbc TestBeaconNode) testKnownGapsMessages(bc *beaconclient.BeaconClient, tableIncrement int, expectedEntries uint64, maxRetry int, msg ...beaconclient.Head) {
- go bc.CaptureHead(tableIncrement)
+ bc.KnownGapTableIncrement = tableIncrement
+ go bc.CaptureHead()
time.Sleep(1 * time.Second)
for _, headMsg := range msg {
@@ -897,7 +922,7 @@ func (tbc TestBeaconNode) testKnownGapsMessages(bc *beaconclient.BeaconClient, t
}
curRetry := 0
- for atomic.LoadUint64(&bc.Metrics.HeadTrackingKnownGaps) != expectedEntries {
+ for atomic.LoadUint64(&bc.Metrics.KnownGapsInserts) != expectedEntries {
time.Sleep(1 * time.Second)
curRetry = curRetry + 1
if curRetry == maxRetry {
@@ -909,7 +934,7 @@ func (tbc TestBeaconNode) testKnownGapsMessages(bc *beaconclient.BeaconClient, t
knownGapCount := countKnownGapsTable(bc.Db)
Expect(knownGapCount).To(Equal(int(expectedEntries)))
- if atomic.LoadUint64(&bc.Metrics.HeadTrackingReorgs) != 0 {
+ if atomic.LoadUint64(&bc.Metrics.ReorgInserts) != 0 {
Fail("We found reorgs when we didn't expect it")
}
}
diff --git a/pkg/beaconclient/capturehistoric.go b/pkg/beaconclient/capturehistoric.go
new file mode 100644
index 0000000..3c4d29b
--- /dev/null
+++ b/pkg/beaconclient/capturehistoric.go
@@ -0,0 +1,136 @@
+// VulcanizeDB
+// Copyright © 2022 Vulcanize
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+// This file will call all the functions to start and stop capturing the head of the beacon chain.
+
+package beaconclient
+
+import (
+ "fmt"
+
+ log "github.com/sirupsen/logrus"
+ "github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql"
+ "golang.org/x/sync/errgroup"
+)
+
+// This function will perform all the heavy lifting for tracking the head of the chain.
+func (bc *BeaconClient) CaptureHistoric(maxWorkers int) []error {
+ log.Info("We are starting the historical processing service.")
+ hp := historicProcessing{db: bc.Db, metrics: bc.Metrics}
+ errs := handleBatchProcess(maxWorkers, hp, hp.db, bc.ServerEndpoint, bc.Metrics)
+ log.Debug("Exiting Historical")
+ return errs
+}
+
+// An interface to enforce any batch processing. Currently there are two use cases for this.
+//
+// 1. Historic Processing
+//
+// 2. Known Gaps Processing
+type BatchProcessing interface {
+ getSlotRange(chan<- slotsToProcess) []error // Write the slots to process in a channel, return an error if you cant get the next slots to write.
+ handleProcessingErrors(<-chan batchHistoricError)
+ removeTableEntry(<-chan slotsToProcess) error // With the provided start and end slot, remove the entry from the database.
+}
+
+// A struct to pass around indicating a table entry for slots to process.
+type slotsToProcess struct {
+ startSlot int // The start slot
+ endSlot int // The end slot
+}
+
+type batchHistoricError struct {
+ err error // The error that occurred when attempting to a slot
+ errProcess string // The process that caused the error.
+ slot int // The slot which the error is for.
+}
+
+// Wrapper function for the BatchProcessing interface.
+// This function will take the structure that needs batch processing.
+// It follows a generic format.
+// Get new entries from any given table.
+// 1. Add it to the slotsCh.
+//
+// 2. Run the maximum specified workers to handle individual slots. We need a maximum because we don't want
+// To store too many SSZ objects in memory.
+//
+// 3. Process the slots and send the err to the ErrCh. Each structure can define how it wants its own errors handled.
+//
+// 4. Remove the slot entry from the DB.
+//
+// 5. Handle any errors.
+func handleBatchProcess(maxWorkers int, bp BatchProcessing, db sql.Database, serverEndpoint string, metrics *BeaconClientMetrics) []error {
+ slotsCh := make(chan slotsToProcess)
+ workCh := make(chan int)
+ processedCh := make(chan slotsToProcess)
+ errCh := make(chan batchHistoricError)
+ finishCh := make(chan []error, 1)
+
+ // Start workers
+ for w := 1; w <= maxWorkers; w++ {
+ log.WithFields(log.Fields{"maxWorkers": maxWorkers}).Debug("Starting batch processing workers")
+ go processSlotRangeWorker(workCh, errCh, db, serverEndpoint, metrics)
+ }
+
+ // Process all ranges and send each individual slot to the worker.
+ go func() {
+ for slots := range slotsCh {
+ if slots.startSlot > slots.endSlot {
+ log.Error("We received a batch process request where the startSlot is greater than the end slot.")
+ errCh <- batchHistoricError{
+ err: fmt.Errorf("We received a startSlot where the start was greater than the end."),
+ errProcess: "RangeOrder",
+ slot: slots.startSlot,
+ }
+ errCh <- batchHistoricError{
+ err: fmt.Errorf("We received a endSlot where the start was greater than the end."),
+ errProcess: "RangeOrder",
+ slot: slots.endSlot,
+ }
+ } else {
+ for i := slots.startSlot; i <= slots.endSlot; i++ {
+ workCh <- i
+ }
+ processedCh <- slots
+ }
+ }
+ }()
+
+ // Remove entries, end the application if a row cannot be removed..
+ go func() {
+ errG := new(errgroup.Group)
+ errG.Go(func() error {
+ return bp.removeTableEntry(processedCh)
+ })
+ if err := errG.Wait(); err != nil {
+ finishCh <- []error{err}
+ }
+ }()
+ // Process errors from slot processing.
+ go bp.handleProcessingErrors(errCh)
+
+ // Get slots from the DB.
+ go func() {
+ errs := bp.getSlotRange(slotsCh) // Periodically adds new entries....
+ if errs != nil {
+ finishCh <- errs
+ }
+ finishCh <- nil
+ }()
+
+ errs := <-finishCh
+ log.Debug("Finishing the batchProcess")
+ return errs
+}
diff --git a/pkg/beaconclient/capturehistoric_test.go b/pkg/beaconclient/capturehistoric_test.go
new file mode 100644
index 0000000..0a288c3
--- /dev/null
+++ b/pkg/beaconclient/capturehistoric_test.go
@@ -0,0 +1,70 @@
+package beaconclient_test
+
+import (
+ "context"
+ "fmt"
+ "sync/atomic"
+ "time"
+
+ "github.com/jarcoal/httpmock"
+ . "github.com/onsi/ginkgo/v2"
+ . "github.com/onsi/gomega"
+ log "github.com/sirupsen/logrus"
+ "github.com/vulcanize/ipld-ethcl-indexer/pkg/beaconclient"
+)
+
+var _ = Describe("Capturehistoric", func() {
+
+ Describe("Run the application in historic mode", Label("unit", "behavioral"), func() {
+ Context("Phase0: When we need to process a single block in the ethcl.historic_process table.", Label("now"), func() {
+ It("Successfully Process the Block", func() {
+ bc := setUpTest(BeaconNodeTester.TestConfig, "99")
+ BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
+ defer httpmock.DeactivateAndReset()
+ BeaconNodeTester.writeEventToHistoricProcess(bc, 100, 101, 10)
+ BeaconNodeTester.runBatchProcess(bc, 2, 100, 101, 0, 0)
+
+ time.Sleep(2 * time.Second)
+ validateSlot(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, 3, "proposed")
+ validateSignedBeaconBlock(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, "0x629ae1587895043076500f4f5dcb202a47c2fc95d5b5c548cb83bc97bd2dbfe1", "0x8d3f027beef5cbd4f8b29fc831aba67a5d74768edca529f5596f07fd207865e1", "/blocks/QHVAEQBQGQ4TKNJUGAYDGNZRGM2DOZJSGZTDMMLEG5QTIYTCMRQTKYRSGNTGCMDCGI2WINLGMM2DMNJRGYYGMMTBHEZGINJSME3DGYRZGE4WE")
+ validateBeaconState(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, "/blocks/QHVAEQRQPBTDEOBWMEYDGNZZMMYDGOBWMEZWGN3CMUZDQZBQGVSDQMRZMY4GKYRXMIZDQMDDMM4WKZDFGE2TINBZMFTDEMDFMJRWIMBWME3WCNJW")
+
+ //validateSignedBeaconBlock(bc, BeaconNodeTester.TestEvents["101"].HeadMessage, "0x629ae1587895043076500f4f5dcb202a47c2fc95d5b5c548cb83bc97bd2dbfe1", "0x8d3f027beef5cbd4f8b29fc831aba67a5d74768edca529f5596f07fd207865e1", "/blocks/QHVAEQBQGQ4TKNJUGAYDGNZRGM2DOZJSGZTDMMLEG5QTIYTCMRQTKYRSGNTGCMDCGI2WINLGMM2DMNJRGYYGMMTBHEZGINJSME3DGYRZGE4WE")
+ //validateBeaconState(bc, BeaconNodeTester.TestEvents["101"].HeadMessage, "/blocks/QHVAEQRQPBTDEOBWMEYDGNZZMMYDGOBWMEZWGN3CMUZDQZBQGVSDQMRZMY4GKYRXMIZDQMDDMM4WKZDFGE2TINBZMFTDEMDFMJRWIMBWME3WCNJW")
+
+ })
+ })
+ })
+})
+
+// This function will write an even to the ethcl.historic_process table
+func (tbc TestBeaconNode) writeEventToHistoricProcess(bc *beaconclient.BeaconClient, startSlot, endSlot, priority int) {
+ log.Debug("We are writing the necessary events to batch process")
+ insertHistoricProcessingStmt := `INSERT INTO ethcl.historic_process (start_slot, end_slot, priority)
+ VALUES ($1, $2, $3);`
+ res, err := bc.Db.Exec(context.Background(), insertHistoricProcessingStmt, startSlot, endSlot, priority)
+ Expect(err)
+ rows, err := res.RowsAffected()
+ if rows != 1 {
+ Fail("We didnt write...")
+ }
+ Expect(err)
+}
+
+// Start the batch processing function, and check for the correct inserted slots.
+func (tbc TestBeaconNode) runBatchProcess(bc *beaconclient.BeaconClient, maxWorkers int, startSlot uint64, endSlot uint64, expectedReorgs uint64, expectedKnownGaps uint64) {
+ go bc.CaptureHistoric(maxWorkers)
+ diff := endSlot - startSlot + 1
+
+ curRetry := 0
+ for atomic.LoadUint64(&bc.Metrics.SlotInserts) != diff {
+ time.Sleep(1 * time.Second)
+ curRetry = curRetry + 1
+ if curRetry == maxRetry {
+ Fail(fmt.Sprintf("Too many retries have occurred. The number of inserts expects %d, the number that actually occurred, %d", atomic.LoadUint64(&bc.Metrics.SlotInserts), diff))
+ }
+ }
+
+ Expect(atomic.LoadUint64(&bc.Metrics.KnownGapsInserts)).To(Equal(expectedKnownGaps))
+ Expect(atomic.LoadUint64(&bc.Metrics.ReorgInserts)).To(Equal(expectedKnownGaps))
+}
diff --git a/pkg/beaconclient/databasewrite.go b/pkg/beaconclient/databasewrite.go
index 84d160e..fd0ac6c 100644
--- a/pkg/beaconclient/databasewrite.go
+++ b/pkg/beaconclient/databasewrite.go
@@ -23,6 +23,7 @@ import (
log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
+ "golang.org/x/sync/errgroup"
)
var (
@@ -53,10 +54,19 @@ VALUES ($1, $2) ON CONFLICT (key) DO NOTHING`
CheckProposedStmt string = `SELECT slot, block_root
FROM ethcl.slots
WHERE slot=$1 AND block_root=$2;`
+ // Used to get a single slot from the table if it exists
+ QueryBySlotStmt string = `SELECT slot
+ FROM ethcl.slots
+ WHERE slot=$1`
// Statement to insert known_gaps. We don't pass in timestamp, we let the server take care of that one.
UpsertKnownGapsStmt string = `
INSERT INTO ethcl.known_gaps (start_slot, end_slot, checked_out, reprocessing_error, entry_error, entry_process)
VALUES ($1, $2, $3, $4, $5, $6) on CONFLICT (start_slot, end_slot) DO NOTHING`
+ UpsertKnownGapsErrorStmt string = `
+ UPDATE ethcl.known_gaps
+ SET reprocessing_error=$3
+ WHERE start_slot=$1 AND end_slot=$2;`
+ // Get the highest slot if one exists
QueryHighestSlotStmt string = "SELECT COALESCE(MAX(slot), 0) FROM ethcl.slots"
)
@@ -148,19 +158,24 @@ func (dw *DatabaseWriter) writeFullSlot() error {
}).Debug("Starting to write to the DB.")
err := dw.writeSlots()
if err != nil {
+ loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("We couldn't write to the ethcl.slots table...")
return err
}
+ log.Debug("We finished writing to the ethcl.slots table.")
if dw.DbSlots.Status != "skipped" {
- err = dw.writeSignedBeaconBlocks()
- if err != nil {
- return err
- }
- err = dw.writeBeaconState()
- if err != nil {
+ errG, _ := errgroup.WithContext(context.Background())
+ errG.Go(func() error {
+ return dw.writeSignedBeaconBlocks()
+ })
+ errG.Go(func() error {
+ return dw.writeBeaconState()
+ })
+ if err := errG.Wait(); err != nil {
+ loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("We couldn't write to the ethcl block or state table...")
return err
}
}
- dw.Metrics.IncrementHeadTrackingInserts(1)
+ dw.Metrics.IncrementSlotInserts(1)
return nil
}
@@ -291,7 +306,7 @@ func writeReorgs(db sql.Database, slot string, latestBlockRoot string, metrics *
}
}
- metrics.IncrementHeadTrackingReorgs(1)
+ metrics.IncrementReorgsInsert(1)
}
// Update the slots table by marking the old slot's as forked.
@@ -383,7 +398,7 @@ func upsertKnownGaps(db sql.Database, knModel DbKnownGaps, metric *BeaconClientM
"startSlot": knModel.StartSlot,
"endSlot": knModel.EndSlot,
}).Warn("A new gap has been added to the ethcl.known_gaps table.")
- metric.IncrementHeadTrackingKnownGaps(1)
+ metric.IncrementKnownGapsInserts(1)
}
// A function to write the gap between the highest slot in the DB and the first processed slot.
@@ -404,8 +419,50 @@ func writeStartUpGaps(db sql.Database, tableIncrement int, firstSlot int, metric
}
}
+// A function to update a knownGap range with a reprocessing error.
+func updateKnownGapErrors(db sql.Database, startSlot int, endSlot int, reprocessingErr error, metric *BeaconClientMetrics) error {
+ res, err := db.Exec(context.Background(), UpsertKnownGapsErrorStmt, startSlot, endSlot, reprocessingErr.Error())
+ if err != nil {
+ loghelper.LogSlotRangeError(strconv.Itoa(startSlot), strconv.Itoa(endSlot), err).Error("Unable to update reprocessing_error")
+ metric.IncrementKnownGapsProcessingError(1)
+ return err
+ }
+ row, err := res.RowsAffected()
+ if err != nil {
+ loghelper.LogSlotRangeError(strconv.Itoa(startSlot), strconv.Itoa(endSlot), err).Error("Unable to count rows affected when trying to update reprocessing_error.")
+ metric.IncrementKnownGapsProcessingError(1)
+ return err
+ }
+ if row != 1 {
+ loghelper.LogSlotRangeError(strconv.Itoa(startSlot), strconv.Itoa(endSlot), err).WithFields(log.Fields{
+ "rowCount": row,
+ }).Error("The rows affected by the upsert for reprocessing_error is not 1.")
+ metric.IncrementKnownGapsProcessingError(1)
+ return err
+ }
+ metric.IncrementKnownGapsProcessed(1)
+ return nil
+}
+
// A quick helper function to calculate the epoch.
func calculateEpoch(slot int, slotPerEpoch int) string {
epoch := slot / slotPerEpoch
return strconv.Itoa(epoch)
}
+
+// A helper function to check to see if the slot is processed.
+func isSlotProcessed(db sql.Database, checkProcessStmt string, slot string) (bool, error) {
+ processRow, err := db.Exec(context.Background(), checkProcessStmt, slot)
+ if err != nil {
+ return false, err
+ }
+ row, err := processRow.RowsAffected()
+ if err != nil {
+ return false, err
+ }
+
+ if row > 0 {
+ return true, nil
+ }
+ return false, nil
+}
diff --git a/pkg/beaconclient/healthcheck_test.go b/pkg/beaconclient/healthcheck_test.go
index dd60bef..06ba2dd 100644
--- a/pkg/beaconclient/healthcheck_test.go
+++ b/pkg/beaconclient/healthcheck_test.go
@@ -25,8 +25,8 @@ import (
var _ = Describe("Healthcheck", func() {
var (
- BC = beaconclient.CreateBeaconClient(context.Background(), "http", "localhost", 5052)
- errBc = beaconclient.CreateBeaconClient(context.Background(), "http", "blah-blah", 1010)
+ BC = beaconclient.CreateBeaconClient(context.Background(), "http", "localhost", 5052, 10)
+ errBc = beaconclient.CreateBeaconClient(context.Background(), "http", "blah-blah", 1010, 10)
)
Describe("Connecting to the lighthouse client", Label("integration"), func() {
Context("When the client is running", func() {
diff --git a/pkg/beaconclient/incomingsse.go b/pkg/beaconclient/incomingsse.go
index 42fee02..98e44b6 100644
--- a/pkg/beaconclient/incomingsse.go
+++ b/pkg/beaconclient/incomingsse.go
@@ -94,5 +94,5 @@ func processMsg[P ProcessedEvents](msg []byte, processCh chan<- *P, errorCh chan
func (bc *BeaconClient) captureEventTopic() {
log.Info("We are capturing all SSE events")
go handleIncomingSseEvent(bc.HeadTracking, bc.Metrics.IncrementHeadError)
- go handleIncomingSseEvent(bc.ReOrgTracking, bc.Metrics.IncrementHeadReorgError)
+ go handleIncomingSseEvent(bc.ReOrgTracking, bc.Metrics.IncrementReorgError)
}
diff --git a/pkg/beaconclient/metrics.go b/pkg/beaconclient/metrics.go
index ecf8da5..3304ea5 100644
--- a/pkg/beaconclient/metrics.go
+++ b/pkg/beaconclient/metrics.go
@@ -21,29 +21,43 @@ import (
// A structure utilized for keeping track of various metrics. Currently, mostly used in testing.
type BeaconClientMetrics struct {
- HeadTrackingInserts uint64 // Number of head events we successfully wrote to the DB.
- HeadTrackingReorgs uint64 // Number of reorg events we successfully wrote to the DB.
- HeadTrackingKnownGaps uint64 // Number of known_gaps we successfully wrote to the DB.
- HeadError uint64 // Number of errors that occurred when decoding the head message.
- HeadReorgError uint64 // Number of errors that occurred when decoding the reorg message.
+ SlotInserts uint64 // Number of head events we successfully wrote to the DB.
+ ReorgInserts uint64 // Number of reorg events we successfully wrote to the DB.
+ KnownGapsInserts uint64 // Number of known_gaps we successfully wrote to the DB.
+ knownGapsProcessed uint64 // Number of knownGaps processed.
+ KnownGapsProcessingError uint64 // Number of errors that occurred while processing a knownGap
+ HeadError uint64 // Number of errors that occurred when decoding the head message.
+ HeadReorgError uint64 // Number of errors that occurred when decoding the reorg message.
}
// Wrapper function to increment inserts. If we want to use mutexes later we can easily update all
// occurrences here.
-func (m *BeaconClientMetrics) IncrementHeadTrackingInserts(inc uint64) {
- atomic.AddUint64(&m.HeadTrackingInserts, inc)
+func (m *BeaconClientMetrics) IncrementSlotInserts(inc uint64) {
+ atomic.AddUint64(&m.SlotInserts, inc)
}
// Wrapper function to increment reorgs. If we want to use mutexes later we can easily update all
// occurrences here.
-func (m *BeaconClientMetrics) IncrementHeadTrackingReorgs(inc uint64) {
- atomic.AddUint64(&m.HeadTrackingReorgs, inc)
+func (m *BeaconClientMetrics) IncrementReorgsInsert(inc uint64) {
+ atomic.AddUint64(&m.ReorgInserts, inc)
}
// Wrapper function to increment known gaps. If we want to use mutexes later we can easily update all
// occurrences here.
-func (m *BeaconClientMetrics) IncrementHeadTrackingKnownGaps(inc uint64) {
- atomic.AddUint64(&m.HeadTrackingKnownGaps, inc)
+func (m *BeaconClientMetrics) IncrementKnownGapsInserts(inc uint64) {
+ atomic.AddUint64(&m.KnownGapsInserts, inc)
+}
+
+// Wrapper function to increment known gaps processed. If we want to use mutexes later we can easily update all
+// occurrences here.
+func (m *BeaconClientMetrics) IncrementKnownGapsProcessed(inc uint64) {
+ atomic.AddUint64(&m.knownGapsProcessed, inc)
+}
+
+// Wrapper function to increment known gaps processing error. If we want to use mutexes later we can easily update all
+// occurrences here.
+func (m *BeaconClientMetrics) IncrementKnownGapsProcessingError(inc uint64) {
+ atomic.AddUint64(&m.KnownGapsProcessingError, inc)
}
// Wrapper function to increment head errors. If we want to use mutexes later we can easily update all
@@ -54,6 +68,6 @@ func (m *BeaconClientMetrics) IncrementHeadError(inc uint64) {
// Wrapper function to increment reorg errors. If we want to use mutexes later we can easily update all
// occurrences here.
-func (m *BeaconClientMetrics) IncrementHeadReorgError(inc uint64) {
+func (m *BeaconClientMetrics) IncrementReorgError(inc uint64) {
atomic.AddUint64(&m.HeadReorgError, inc)
}
diff --git a/pkg/beaconclient/processevents.go b/pkg/beaconclient/processevents.go
index 15e4e16..7610191 100644
--- a/pkg/beaconclient/processevents.go
+++ b/pkg/beaconclient/processevents.go
@@ -23,9 +23,6 @@ import (
"strconv"
log "github.com/sirupsen/logrus"
- "github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql"
- "github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
- "golang.org/x/sync/errgroup"
)
// This function will perform the necessary steps to handle a reorg.
@@ -63,19 +60,7 @@ func (bc *BeaconClient) handleHead() {
log.WithFields(log.Fields{"head": head}).Debug("We are going to start processing the slot.")
- go func(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, metrics *BeaconClientMetrics, knownGapsTableIncrement int) {
- errG := new(errgroup.Group)
- errG.Go(func() error {
- err = processHeadSlot(db, serverAddress, slot, blockRoot, stateRoot, previousSlot, previousBlockRoot, metrics, knownGapsTableIncrement)
- if err != nil {
- return err
- }
- return nil
- })
- if err := errG.Wait(); err != nil {
- loghelper.LogSlotError(strconv.Itoa(slot), err).Error("Unable to process a slot")
- }
- }(bc.Db, bc.ServerEndpoint, slot, head.Block, head.State, bc.PreviousSlot, bc.PreviousBlockRoot, bc.Metrics, bc.KnownGapTableIncrement)
+ go processHeadSlot(bc.Db, bc.ServerEndpoint, slot, head.Block, head.State, bc.PreviousSlot, bc.PreviousBlockRoot, bc.Metrics, bc.KnownGapTableIncrement)
log.WithFields(log.Fields{"head": head.Slot}).Debug("We finished calling processHeadSlot.")
@@ -83,5 +68,4 @@ func (bc *BeaconClient) handleHead() {
bc.PreviousSlot = slot
bc.PreviousBlockRoot = head.Block
}
-
}
diff --git a/pkg/beaconclient/processhistoric.go b/pkg/beaconclient/processhistoric.go
new file mode 100644
index 0000000..2ba3bd4
--- /dev/null
+++ b/pkg/beaconclient/processhistoric.go
@@ -0,0 +1,223 @@
+// VulcanizeDB
+// Copyright © 2022 Vulcanize
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+// This file contains all the code to process historic slots.
+
+package beaconclient
+
+import (
+ "context"
+ "fmt"
+ "strconv"
+ "time"
+
+ "github.com/jackc/pgx/v4"
+ log "github.com/sirupsen/logrus"
+ "github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql"
+ "github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
+)
+
+var (
+ // Get a single highest priority and non-checked out row row from ethcl.historical_process
+ getHpEntryStmt string = `SELECT start_slot, end_slot FROM ethcl.historic_process
+ WHERE checked_out=false
+ ORDER BY priority ASC
+ LIMIT 1;`
+ // Used to periodically check to see if there is a new entry in the ethcl.historic_process table.
+ checkHpEntryStmt string = `SELECT * FROM ethcl.historic_process WHERE checked_out=false;`
+ // Used to checkout a row from the ethcl.historic_process table
+ lockHpEntryStmt string = `UPDATE ethcl.historic_process
+ SET checked_out=true
+ WHERE start_slot=$1 AND end_slot=$2;`
+ // Used to delete an entry from the knownGaps table
+ deleteHpEntryStmt string = `DELETE FROM ethcl.historic_process
+ WHERE start_slot=$1 AND end_slot=$2;`
+)
+
+type historicProcessing struct {
+ db sql.Database
+ metrics *BeaconClientMetrics
+}
+
+// Get a single row of historical slots from the table.
+func (hp historicProcessing) getSlotRange(slotCh chan<- slotsToProcess) []error {
+ return getBatchProcessRow(hp.db, getHpEntryStmt, checkHpEntryStmt, lockHpEntryStmt, slotCh)
+}
+
+// Remove the table entry.
+func (hp historicProcessing) removeTableEntry(processCh <-chan slotsToProcess) error {
+ return removeRowPostProcess(hp.db, processCh, QueryBySlotStmt, deleteHpEntryStmt)
+}
+
+// Remove the table entry.
+func (hp historicProcessing) handleProcessingErrors(errMessages <-chan batchHistoricError) {
+ for {
+ errMs := <-errMessages
+ loghelper.LogSlotError(strconv.Itoa(errMs.slot), errMs.err)
+ writeKnownGaps(hp.db, 1, errMs.slot, errMs.slot, errMs.err, errMs.errProcess, hp.metrics)
+ }
+}
+
+// Process the slot range.
+func processSlotRangeWorker(workCh <-chan int, errCh chan<- batchHistoricError, db sql.Database, serverAddress string, metrics *BeaconClientMetrics) {
+ for slot := range workCh {
+ log.Debug("Handling slot: ", slot)
+ err, errProcess := handleHistoricSlot(db, serverAddress, slot, metrics)
+ errMs := batchHistoricError{
+ err: err,
+ errProcess: errProcess,
+ slot: slot,
+ }
+ if err != nil {
+ errCh <- errMs
+ }
+ }
+}
+
+// A wrapper function that insert the start_slot and end_slot from a single row into a channel.
+// It also locks the row by updating the checked_out column.
+// The statement for getting the start_slot and end_slot must be provided.
+// The statement for "locking" the row must also be provided.
+func getBatchProcessRow(db sql.Database, getStartEndSlotStmt string, checkNewRowsStmt string, checkOutRowStmt string, slotCh chan<- slotsToProcess) []error {
+ errCount := make([]error, 0)
+
+ // 5 is an arbitrary number. It allows us to retry a few times before
+ // ending the application.
+ prevErrCount := 0
+ for len(errCount) < 5 {
+ if len(errCount) != prevErrCount {
+ log.WithFields(log.Fields{
+ "errCount": errCount,
+ }).Error("New error entry added")
+ }
+ processRow, err := db.Exec(context.Background(), checkNewRowsStmt)
+ if err != nil {
+ errCount = append(errCount, err)
+ }
+ row, err := processRow.RowsAffected()
+ if err != nil {
+ errCount = append(errCount, err)
+ }
+ if row < 1 {
+ time.Sleep(1000 * time.Millisecond)
+ log.Debug("We are checking rows, be patient")
+ continue
+ }
+ log.Debug("We found a new row")
+ ctx := context.Background()
+
+ // Setup TX
+ tx, err := db.Begin(ctx)
+ if err != nil {
+ loghelper.LogError(err).Error("We are unable to Begin a SQL transaction")
+ errCount = append(errCount, err)
+ continue
+ }
+ defer func() {
+ err := tx.Rollback(ctx)
+ if err != nil {
+ loghelper.LogError(err).Error("We were unable to Rollback a transaction")
+ errCount = append(errCount, err)
+ }
+ }()
+
+ // Query the DB for slots.
+ sp := slotsToProcess{}
+ err = tx.QueryRow(ctx, getStartEndSlotStmt).Scan(&sp.startSlot, &sp.endSlot)
+ if err != nil {
+ if err == pgx.ErrNoRows {
+ time.Sleep(100 * time.Millisecond)
+ continue
+ }
+ loghelper.LogSlotRangeStatementError(strconv.Itoa(sp.startSlot), strconv.Itoa(sp.endSlot), getStartEndSlotStmt, err).Error("Unable to get a row")
+ errCount = append(errCount, err)
+ continue
+ }
+
+ // Checkout the Row
+ res, err := tx.Exec(ctx, checkOutRowStmt, sp.startSlot, sp.endSlot)
+ if err != nil {
+ loghelper.LogSlotRangeStatementError(strconv.Itoa(sp.startSlot), strconv.Itoa(sp.endSlot), checkOutRowStmt, err).Error("Unable to checkout the row")
+ errCount = append(errCount, err)
+ continue
+ }
+ rows, err := res.RowsAffected()
+ if err != nil {
+ loghelper.LogSlotRangeStatementError(strconv.Itoa(sp.startSlot), strconv.Itoa(sp.endSlot), checkOutRowStmt, fmt.Errorf("Unable to determine the rows affected when trying to checkout a row."))
+ errCount = append(errCount, err)
+ continue
+ }
+ if rows > 1 {
+ loghelper.LogSlotRangeStatementError(strconv.Itoa(sp.startSlot), strconv.Itoa(sp.endSlot), checkOutRowStmt, err).WithFields(log.Fields{
+ "rowsReturn": rows,
+ }).Error("We locked too many rows.....")
+ errCount = append(errCount, err)
+ continue
+ }
+ if rows == 0 {
+ loghelper.LogSlotRangeStatementError(strconv.Itoa(sp.startSlot), strconv.Itoa(sp.endSlot), checkOutRowStmt, err).WithFields(log.Fields{
+ "rowsReturn": rows,
+ }).Error("We did not lock a single row.")
+ errCount = append(errCount, err)
+ continue
+ }
+ err = tx.Commit(ctx)
+ if err != nil {
+ loghelper.LogSlotRangeError(strconv.Itoa(sp.startSlot), strconv.Itoa(sp.endSlot), err).Error("Unable commit transactions.")
+ errCount = append(errCount, err)
+ continue
+ }
+ slotCh <- sp
+ }
+ log.WithFields(log.Fields{
+ "ErrCount": errCount,
+ }).Error("The ErrCounter")
+ return errCount
+}
+
+// After a row has been processed it should be removed from its appropriate table.
+func removeRowPostProcess(db sql.Database, processCh <-chan slotsToProcess, checkProcessedStmt, removeStmt string) error {
+ errCh := make(chan error)
+ for {
+ slots := <-processCh
+ // Make sure the start and end slot exist in the slots table.
+ go func() {
+ finishedProcess := false
+ for !finishedProcess {
+ isStartProcess, err := isSlotProcessed(db, checkProcessedStmt, strconv.Itoa(slots.startSlot))
+ if err != nil {
+ errCh <- err
+ }
+ isEndProcess, err := isSlotProcessed(db, checkProcessedStmt, strconv.Itoa(slots.endSlot))
+ if err != nil {
+ errCh <- err
+ }
+ if isStartProcess && isEndProcess {
+ finishedProcess = true
+ }
+ }
+
+ _, err := db.Exec(context.Background(), removeStmt, strconv.Itoa(slots.startSlot), strconv.Itoa(slots.endSlot))
+ if err != nil {
+ errCh <- err
+ }
+
+ }()
+ if len(errCh) != 0 {
+ return <-errCh
+ }
+ }
+}
diff --git a/pkg/beaconclient/processknowngaps.go b/pkg/beaconclient/processknowngaps.go
new file mode 100644
index 0000000..6c07c02
--- /dev/null
+++ b/pkg/beaconclient/processknowngaps.go
@@ -0,0 +1,101 @@
+// VulcanizeDB
+// Copyright © 2022 Vulcanize
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+// This file contains all the code to process historic slots.
+
+package beaconclient
+
+import (
+ "context"
+ "strconv"
+
+ log "github.com/sirupsen/logrus"
+ "github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql"
+ "github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
+)
+
+var (
+ // Get a single non-checked out row row from ethcl.known_gaps.
+ getKgEntryStmt string = `SELECT start_slot, end_slot FROM ethcl.known_gaps
+ WHERE checked_out=false
+ LIMIT 1;`
+ // Used to periodically check to see if there is a new entry in the ethcl.known_gaps table.
+ checkKgEntryStmt string = `SELECT * FROM ethcl.known_gaps WHERE checked_out=false;`
+ // Used to checkout a row from the ethcl.known_gaps table
+ lockKgEntryStmt string = `UPDATE ethcl.known_gaps
+ SET checked_out=true
+ WHERE start_slot=$1 AND end_slot=$2;`
+ // Used to delete an entry from the knownGaps table
+ deleteKgEntryStmt string = `DELETE FROM ethcl.known_gaps
+ WHERE start_slot=$1 AND end_slot=$2;`
+ // Used to check to see if a single slot exists in the known_gaps table.
+ checkKgSingleSlotStmt string = `SELECT start_slot, end_slot FROM ethcl.known_gaps
+ WHERE start_slot=$1 AND end_slot=$2;`
+)
+
+type knownGapsProcessing struct {
+ db sql.Database
+ metrics *BeaconClientMetrics
+}
+
+// This function will perform all the heavy lifting for tracking the head of the chain.
+func (bc *BeaconClient) ProcessKnownGaps(maxWorkers int) []error {
+ log.Info("We are starting the known gaps processing service.")
+ hp := knownGapsProcessing{db: bc.Db, metrics: bc.Metrics}
+ errs := handleBatchProcess(maxWorkers, hp, hp.db, bc.ServerEndpoint, bc.Metrics)
+ log.Debug("Exiting known gaps processing service")
+ return errs
+}
+
+// Get a single row of historical slots from the table.
+func (kgp knownGapsProcessing) getSlotRange(slotCh chan<- slotsToProcess) []error {
+ return getBatchProcessRow(kgp.db, getKgEntryStmt, checkKgEntryStmt, lockKgEntryStmt, slotCh)
+}
+
+// Remove the table entry.
+func (kgp knownGapsProcessing) removeTableEntry(processCh <-chan slotsToProcess) error {
+ return removeRowPostProcess(kgp.db, processCh, QueryBySlotStmt, deleteKgEntryStmt)
+}
+
+// Remove the table entry.
+func (kgp knownGapsProcessing) handleProcessingErrors(errMessages <-chan batchHistoricError) {
+ for {
+ errMs := <-errMessages
+
+ // Check to see if this if this entry already exists.
+ res, err := kgp.db.Exec(context.Background(), checkKgSingleSlotStmt, errMs.slot, errMs.slot)
+ if err != nil {
+ loghelper.LogSlotError(strconv.Itoa(errMs.slot), err).Error("Unable to see if this slot is in the ethcl.known_gaps table")
+ }
+
+ rows, err := res.RowsAffected()
+ if err != nil {
+ loghelper.LogSlotError(strconv.Itoa(errMs.slot), err).WithFields(log.Fields{
+ "queryStatement": checkKgSingleSlotStmt,
+ }).Error("Unable to get the number of rows affected by this statement.")
+ }
+
+ if rows > 0 {
+ loghelper.LogSlotError(strconv.Itoa(errMs.slot), errMs.err).Error("We received an error when processing a knownGap")
+ err = updateKnownGapErrors(kgp.db, errMs.slot, errMs.slot, errMs.err, kgp.metrics)
+ if err != nil {
+ loghelper.LogSlotError(strconv.Itoa(errMs.slot), err).Error("Error processing known gap")
+ }
+ } else {
+ writeKnownGaps(kgp.db, 1, errMs.slot, errMs.slot, errMs.err, errMs.errProcess, kgp.metrics)
+ }
+ }
+}
diff --git a/pkg/beaconclient/processslot.go b/pkg/beaconclient/processslot.go
index 4e60109..146d53c 100644
--- a/pkg/beaconclient/processslot.go
+++ b/pkg/beaconclient/processslot.go
@@ -45,7 +45,6 @@ var (
return fmt.Sprintf("Unable to properly unmarshal the Slot field in the %s.", obj)
}
ParentRootUnmarshalError = "Unable to properly unmarshal the ParentRoot field in the SignedBeaconBlock."
- MissingIdentifiedError = "Can't query state without a set slot or block_root"
MissingEth1Data = "Can't get the Eth1 block_hash"
VersionedUnmarshalerError = "Unable to create a versioned unmarshaler"
)
@@ -78,7 +77,9 @@ type ProcessSlot struct {
}
// This function will do all the work to process the slot and write it to the DB.
-func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, headOrHistoric string, metrics *BeaconClientMetrics, knownGapsTableIncrement int) error {
+// It will return the error and error process. The error process is used for providing reach detail to the
+// known_gaps table.
+func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, headOrHistoric string, metrics *BeaconClientMetrics, knownGapsTableIncrement int) (error, string) {
ps := &ProcessSlot{
Slot: slot,
BlockRoot: blockRoot,
@@ -110,60 +111,55 @@ func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot
})
if err := g.Wait(); err != nil {
- writeKnownGaps(ps.Db, 1, ps.Slot, ps.Slot, err, "processSlot", ps.Metrics)
- return err
- }
-
- if ps.HeadOrHistoric == "head" && previousSlot == 0 && previousBlockRoot == "" {
- writeStartUpGaps(db, knownGapsTableIncrement, ps.Slot, ps.Metrics)
+ return err, "processSlot"
}
// Get this object ready to write
- blockRootEndpoint := serverAddress + BcBlockRootEndpoint(strconv.Itoa(ps.Slot))
- dw, err := ps.createWriteObjects(blockRootEndpoint)
+ dw, err := ps.createWriteObjects()
if err != nil {
- writeKnownGaps(ps.Db, 1, ps.Slot, ps.Slot, err, "blockRoot", ps.Metrics)
- return err
+ return err, "blockRoot"
}
// Write the object to the DB.
err = dw.writeFullSlot()
if err != nil {
- writeKnownGaps(ps.Db, 1, ps.Slot, ps.Slot, err, "processSlot", ps.Metrics)
- return err
+ return err, "processSlot"
}
// Handle any reorgs or skipped slots.
headOrHistoric = strings.ToLower(headOrHistoric)
if headOrHistoric != "head" && headOrHistoric != "historic" {
- return fmt.Errorf("headOrHistoric must be either historic or head!")
+ return fmt.Errorf("headOrHistoric must be either historic or head!"), ""
}
if ps.HeadOrHistoric == "head" && previousSlot != 0 && previousBlockRoot != "" && ps.Status != "skipped" {
ps.checkPreviousSlot(previousSlot, previousBlockRoot, knownGapsTableIncrement)
}
- return nil
+ return nil, ""
}
// Handle a slot that is at head. A wrapper function for calling `handleFullSlot`.
-func processHeadSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, metrics *BeaconClientMetrics, knownGapsTableIncrement int) error {
- return processFullSlot(db, serverAddress, slot, blockRoot, stateRoot, previousSlot, previousBlockRoot, "head", metrics, knownGapsTableIncrement)
+func processHeadSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, metrics *BeaconClientMetrics, knownGapsTableIncrement int) {
+ // Get the knownGaps at startUp.
+ if previousSlot == 0 && previousBlockRoot == "" {
+ writeStartUpGaps(db, knownGapsTableIncrement, slot, metrics)
+ }
+ err, errReason := processFullSlot(db, serverAddress, slot, blockRoot, stateRoot, previousSlot, previousBlockRoot, "head", metrics, knownGapsTableIncrement)
+ if err != nil {
+ writeKnownGaps(db, knownGapsTableIncrement, slot, slot, err, errReason, metrics)
+ }
}
// Handle a historic slot. A wrapper function for calling `handleFullSlot`.
-// Commented because of the linter...... LOL
-//func handleHistoricSlot(db sql.Database, serverAddress string, slot int) error {
-// return handleFullSlot(db, serverAddress, slot, "", "", 0, "", "historic")
-//}
+func handleHistoricSlot(db sql.Database, serverAddress string, slot int, metrics *BeaconClientMetrics) (error, string) {
+ return processFullSlot(db, serverAddress, slot, "", "", 0, "", "historic", metrics, 1)
+}
// Update the SszSignedBeaconBlock and FullSignedBeaconBlock object with their respective values.
func (ps *ProcessSlot) getSignedBeaconBlock(serverAddress string, vmCh <-chan *dt.VersionedUnmarshaler) error {
var blockIdentifier string // Used to query the block
if ps.BlockRoot != "" {
blockIdentifier = ps.BlockRoot
- } else if ps.Slot != 0 {
- blockIdentifier = strconv.Itoa(ps.Slot)
} else {
- log.Error(MissingIdentifiedError)
- return fmt.Errorf(MissingIdentifiedError)
+ blockIdentifier = strconv.Itoa(ps.Slot)
}
blockEndpoint := serverAddress + BcBlockQueryEndpoint + blockIdentifier
var err error
@@ -208,11 +204,8 @@ func (ps *ProcessSlot) getBeaconState(serverEndpoint string, vmCh chan<- *dt.Ver
var stateIdentifier string // Used to query the state
if ps.StateRoot != "" {
stateIdentifier = ps.StateRoot
- } else if ps.Slot != 0 {
- stateIdentifier = strconv.Itoa(ps.Slot)
} else {
- log.Error(MissingIdentifiedError)
- return fmt.Errorf(MissingIdentifiedError)
+ stateIdentifier = strconv.Itoa(ps.Slot)
}
stateEndpoint := serverEndpoint + BcStateQueryEndpoint + stateIdentifier
ps.SszBeaconState, _, _ = querySsz(stateEndpoint, strconv.Itoa(ps.Slot))
@@ -243,10 +236,15 @@ func (ps *ProcessSlot) checkPreviousSlot(previousSlot int, previousBlockRoot str
parentRoot := "0x" + hex.EncodeToString(ps.FullSignedBeaconBlock.Block().ParentRoot())
if previousSlot == int(ps.FullBeaconState.Slot()) {
log.WithFields(log.Fields{
- "slot": ps.FullBeaconState.Slot,
+ "slot": ps.FullBeaconState.Slot(),
"fork": true,
}).Warn("A fork occurred! The previous slot and current slot match.")
writeReorgs(ps.Db, strconv.Itoa(ps.Slot), ps.BlockRoot, ps.Metrics)
+ } else if previousSlot > int(ps.FullBeaconState.Slot()) {
+ log.WithFields(log.Fields{
+ "previousSlot": previousSlot,
+ "curSlot": int(ps.FullBeaconState.Slot()),
+ }).Warn("We noticed the previous slot is greater than the current slot.")
} else if previousSlot+1 != int(ps.FullBeaconState.Slot()) {
log.WithFields(log.Fields{
"previousSlot": previousSlot,
@@ -265,7 +263,7 @@ func (ps *ProcessSlot) checkPreviousSlot(previousSlot int, previousBlockRoot str
}
// Transforms all the raw data into DB models that can be written to the DB.
-func (ps *ProcessSlot) createWriteObjects(blockRootEndpoint string) (*DatabaseWriter, error) {
+func (ps *ProcessSlot) createWriteObjects() (*DatabaseWriter, error) {
var (
stateRoot string
blockRoot string
@@ -289,10 +287,13 @@ func (ps *ProcessSlot) createWriteObjects(blockRootEndpoint string) (*DatabaseWr
blockRoot = ps.BlockRoot
} else {
var err error
- blockRoot, err = queryBlockRoot(blockRootEndpoint, strconv.Itoa(ps.Slot))
+ rawBlockRoot, err := ps.FullSignedBeaconBlock.Block().HashTreeRoot()
+ //blockRoot, err = queryBlockRoot(blockRootEndpoint, strconv.Itoa(ps.Slot))
if err != nil {
return nil, err
}
+ blockRoot = "0x" + hex.EncodeToString(rawBlockRoot[:])
+ log.WithFields(log.Fields{"blockRoot": blockRoot}).Debug("Block Root from ssz:")
}
eth1BlockHash = "0x" + hex.EncodeToString(ps.FullSignedBeaconBlock.Block().Body().Eth1Data().BlockHash)
}
diff --git a/pkg/loghelper/logerror.go b/pkg/loghelper/logerror.go
index 94a5069..f6dce63 100644
--- a/pkg/loghelper/logerror.go
+++ b/pkg/loghelper/logerror.go
@@ -27,9 +27,26 @@ func LogError(err error) *log.Entry {
})
}
+// A simple herlper function to log slot and error.
func LogSlotError(slot string, err error) *log.Entry {
return log.WithFields(log.Fields{
"err": err,
"slot": slot,
})
}
+
+func LogSlotRangeError(startSlot string, endSlot string, err error) *log.Entry {
+ return log.WithFields(log.Fields{
+ "err": err,
+ "startSlot": startSlot,
+ "endSlot": endSlot,
+ })
+}
+func LogSlotRangeStatementError(startSlot string, endSlot string, statement string, err error) *log.Entry {
+ return log.WithFields(log.Fields{
+ "err": err,
+ "startSlot": startSlot,
+ "endSlot": endSlot,
+ "SqlStatement": statement,
+ })
+}