Feature/44 read write historic slots #46

Merged
abdulrabbani00 merged 10 commits from feature/44-read-write-historic-slots into develop 2022-05-24 20:18:55 +00:00
11 changed files with 232 additions and 96 deletions
Showing only changes of commit 4457888cc1

View File

@@ -17,6 +17,7 @@
package cmd
import (
"fmt"
"os"
"time"
@@ -39,8 +40,10 @@ var (
bcType string
bcIsProcessKnownGaps bool
bcMaxHistoricProcessWorker int
bcMaxKnownGapsWorker int
maxWaitSecondsShutdown time.Duration = time.Duration(5) * time.Second
kgMaxWorker int
kgTableIncrement int
kgProcessGaps bool
maxWaitSecondsShutdown time.Duration = time.Duration(20) * time.Second
notifierCh chan os.Signal = make(chan os.Signal, 1)
testDisregardSync bool
)
@@ -67,18 +70,18 @@ func init() {
captureCmd.PersistentFlags().StringVarP(&dbName, "db.name", "n", "", "Database name to connect to DB(required)")
captureCmd.PersistentFlags().StringVarP(&dbDriver, "db.driver", "", "", "Database Driver to connect to DB(required)")
captureCmd.PersistentFlags().IntVarP(&dbPort, "db.port", "", 0, "Port to connect to DB(required)")
err := captureCmd.MarkPersistentFlagRequired("db.username")
exitErr(err)
err = captureCmd.MarkPersistentFlagRequired("db.password")
exitErr(err)
err = captureCmd.MarkPersistentFlagRequired("db.address")
exitErr(err)
err = captureCmd.MarkPersistentFlagRequired("db.port")
exitErr(err)
err = captureCmd.MarkPersistentFlagRequired("db.name")
exitErr(err)
err = captureCmd.MarkPersistentFlagRequired("db.driver")
exitErr(err)
//err := captureCmd.MarkPersistentFlagRequired("db.username")
// exitErr(err)
// err = captureCmd.MarkPersistentFlagRequired("db.password")
// exitErr(err)
// err = captureCmd.MarkPersistentFlagRequired("db.address")
// exitErr(err)
// err = captureCmd.MarkPersistentFlagRequired("db.port")
// exitErr(err)
// err = captureCmd.MarkPersistentFlagRequired("db.name")
// exitErr(err)
// err = captureCmd.MarkPersistentFlagRequired("db.driver")
// exitErr(err)
//// Beacon Client Specific
captureCmd.PersistentFlags().StringVarP(&bcAddress, "bc.address", "l", "", "Address to connect to beacon node (required)")
@@ -88,19 +91,22 @@ func init() {
captureCmd.PersistentFlags().IntVarP(&bcBootRetryInterval, "bc.bootRetryInterval", "", 30, "The amount of time to wait between retries while booting the application")
captureCmd.PersistentFlags().IntVarP(&bcBootMaxRetry, "bc.bootMaxRetry", "", 5, "The maximum number of retries while booting the application")
captureCmd.PersistentFlags().IntVarP(&bcMaxHistoricProcessWorker, "bc.maxHistoricProcessWorker", "", 30, "The number of workers that should be actively processing slots from the ethcl.historic_process table. Be careful of system memory.")
captureCmd.PersistentFlags().IntVarP(&bcMaxKnownGapsWorker, "bc.maxKnownGapsWorker", "", 30, "The number of workers that should be actively processing slots from the ethcl.known_gaps table. Be careful of system memory.")
captureCmd.PersistentFlags().BoolVar(&bcIsProcessKnownGaps, "bc.knownGapsProcess", false, "Should we process entries from the knownGaps table as they occur?")
err = captureCmd.MarkPersistentFlagRequired("bc.address")
exitErr(err)
err = captureCmd.MarkPersistentFlagRequired("bc.port")
exitErr(err)
// err = captureCmd.MarkPersistentFlagRequired("bc.address")
// exitErr(err)
// err = captureCmd.MarkPersistentFlagRequired("bc.port")
// exitErr(err)
//// Known Gaps specific
captureCmd.PersistentFlags().BoolVarP(&kgProcessGaps, "kg.processKnownGaps", "", true, "Should we process the slots within the ethcl.known_gaps table.")
captureCmd.PersistentFlags().IntVarP(&kgTableIncrement, "kg.increment", "", 10000, "The max slots within a single entry to the known_gaps table.")
captureCmd.PersistentFlags().IntVarP(&kgMaxWorker, "kg.maxKnownGapsWorker", "", 30, "The number of workers that should be actively processing slots from the ethcl.known_gaps table. Be careful of system memory.")
//// Testing Specific
captureCmd.PersistentFlags().BoolVar(&testDisregardSync, "t.skipSync", false, "Should we disregard the head sync?")
// Bind Flags with Viper
//// DB Flags
err = viper.BindPFlag("db.username", captureCmd.PersistentFlags().Lookup("db.username"))
err := viper.BindPFlag("db.username", captureCmd.PersistentFlags().Lookup("db.username"))
exitErr(err)
err = viper.BindPFlag("db.password", captureCmd.PersistentFlags().Lookup("db.password"))
exitErr(err)
@@ -111,13 +117,11 @@ func init() {
err = viper.BindPFlag("db.name", captureCmd.PersistentFlags().Lookup("db.name"))
exitErr(err)
// Testing Specific
//// Testing Specific
err = viper.BindPFlag("t.skipSync", captureCmd.PersistentFlags().Lookup("t.skipSync"))
exitErr(err)
err = viper.BindPFlag("t.driver", captureCmd.PersistentFlags().Lookup("db.driver"))
exitErr(err)
// LH specific
//// LH specific
err = viper.BindPFlag("bc.address", captureCmd.PersistentFlags().Lookup("bc.address"))
exitErr(err)
err = viper.BindPFlag("bc.type", captureCmd.PersistentFlags().Lookup("bc.type"))
@@ -130,22 +134,25 @@ func init() {
exitErr(err)
err = viper.BindPFlag("bc.bootMaxRetry", captureCmd.PersistentFlags().Lookup("bc.bootMaxRetry"))
exitErr(err)
err = viper.BindPFlag("bc.bootMaxRetry", captureCmd.PersistentFlags().Lookup("bc.bootMaxRetry"))
exitErr(err)
err = viper.BindPFlag("bc.knownGapsProcess", captureCmd.PersistentFlags().Lookup("bc.knownGapsProcess"))
exitErr(err)
err = viper.BindPFlag("bc.maxHistoricProcessWorker", captureCmd.PersistentFlags().Lookup("bc.maxHistoricProcessWorker"))
exitErr(err)
err = viper.BindPFlag("bc.maxKnownGapsWorker", captureCmd.PersistentFlags().Lookup("bc.maxKnownGapsWorker"))
exitErr(err)
// Here you will define your flags and configuration settings.
//// Known Gap Specific
err = viper.BindPFlag("kg.processKnownGaps", captureCmd.PersistentFlags().Lookup("kg.processKnownGaps"))
exitErr(err)
err = viper.BindPFlag("kg.increment", captureCmd.PersistentFlags().Lookup("kg.increment"))
exitErr(err)
err = viper.BindPFlag("kg.processKnownGaps", captureCmd.PersistentFlags().Lookup("kg.maxKnownGapsWorker"))
exitErr(err)
}
// Helper function to catch any errors.
// We need to capture these errors for the linter.
func exitErr(err error) {
if err != nil {
fmt.Println("Error: ", err)
os.Exit(1)
}
}

View File

@@ -29,10 +29,6 @@ import (
"golang.org/x/sync/errgroup"
)
var (
kgTableIncrement int
)
// headCmd represents the head command
var headCmd = &cobra.Command{
Use: "head",
@@ -49,8 +45,9 @@ func startHeadTracking() {
log.Info("Starting the application in head tracking mode.")
ctx := context.Background()
Bc, Db, err := boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver,
bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, kgTableIncrement, "head", testDisregardSync)
Bc, Db, err := boot.BootApplicationWithRetry(ctx, viper.GetString("db.address"), viper.GetInt("db.port"), viper.GetString("db.name"), viper.GetString("db.username"), viper.GetString("db.password"), viper.GetString("db.driver"),
viper.GetString("bc.address"), viper.GetInt("bc.port"), viper.GetString("bc.connectionProtocol"), viper.GetString("bc.type"), viper.GetInt("bc.bootRetryInterval"), viper.GetInt("bc.bootMaxRetry"),
viper.GetInt("kg.increment"), "head", viper.GetBool("t.skipSync"))
if err != nil {
StopApplicationPreBoot(err, Db)
}
@@ -58,10 +55,11 @@ func startHeadTracking() {
log.Info("The Beacon Client has booted successfully!")
// Capture head blocks
go Bc.CaptureHead()
if bcIsProcessKnownGaps {
if viper.GetBool("kg.processKnownGaps") {
go func() {
errG := new(errgroup.Group)
errG.Go(func() error {
errs := Bc.ProcessKnownGaps(bcMaxKnownGapsWorker)
errs := Bc.ProcessKnownGaps(viper.GetInt("kg.maxKnownGapsWorker"))
if len(errs) != 0 {
log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing knownGaps")
return fmt.Errorf("Application ended because there were too many error when attempting to process knownGaps")
@@ -71,6 +69,7 @@ func startHeadTracking() {
if err := errG.Wait(); err != nil {
loghelper.LogError(err).Error("Error with knownGaps processing")
}
}()
}
// Shutdown when the time is right.
@@ -85,9 +84,4 @@ func startHeadTracking() {
func init() {
captureCmd.AddCommand(headCmd)
// Known Gaps specific
captureCmd.PersistentFlags().IntVarP(&kgTableIncrement, "kg.increment", "", 10000, "The max slots within a single entry to the known_gaps table.")
err := viper.BindPFlag("kg.increment", captureCmd.PersistentFlags().Lookup("kg.increment"))
exitErr(err)
}
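The knownGaps goroutine above relies on golang.org/x/sync/errgroup to collect errors from its workers. For readers unfamiliar with the package, here is a minimal self-contained sketch of the same pattern (a generic example, not code from this repository):

package main

import (
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	errG := new(errgroup.Group)
	errG.Go(func() error {
		// Long-running work goes here; a non-nil return value is
		// surfaced by errG.Wait() once the goroutine finishes.
		return fmt.Errorf("example failure")
	})
	if err := errG.Wait(); err != nil {
		fmt.Println("worker error:", err)
	}
}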

View File

@@ -23,6 +23,7 @@ import (
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/vulcanize/ipld-ethcl-indexer/internal/boot"
"github.com/vulcanize/ipld-ethcl-indexer/internal/shutdown"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql"
@@ -46,8 +47,9 @@ func startHistoricProcessing() {
log.Info("Starting the application in head tracking mode.")
ctx := context.Background()
Bc, Db, err := boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver,
bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, kgTableIncrement, "historic", testDisregardSync)
Bc, Db, err := boot.BootApplicationWithRetry(ctx, viper.GetString("db.address"), viper.GetInt("db.port"), viper.GetString("db.name"), viper.GetString("db.username"), viper.GetString("db.password"), viper.GetString("db.driver"),
viper.GetString("bc.address"), viper.GetInt("bc.port"), viper.GetString("bc.connectionProtocol"), viper.GetString("bc.type"), viper.GetInt("bc.bootRetryInterval"), viper.GetInt("bc.bootMaxRetry"),
viper.GetInt("kg.increment"), "head", viper.GetBool("t.skipSync"))
if err != nil {
StopApplicationPreBoot(err, Db)
}
@@ -55,7 +57,7 @@ func startHistoricProcessing() {
errG, _ := errgroup.WithContext(context.Background())
errG.Go(func() error {
errs := Bc.CaptureHistoric(bcMaxHistoricProcessWorker)
errs := Bc.CaptureHistoric(viper.GetInt("bc.maxHistoricProcessWorker"))
if len(errs) != 0 {
if len(errs) != 0 {
log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing historic events")
@@ -65,21 +67,22 @@ func startHistoricProcessing() {
return nil
})
if bcIsProcessKnownGaps {
if viper.GetBool("kg.processKnownGaps") {
go func() {
errG := new(errgroup.Group)
errG.Go(func() error {
errs := Bc.ProcessKnownGaps(bcMaxKnownGapsWorker)
errs := Bc.ProcessKnownGaps(viper.GetInt("kg.maxKnownGapsWorker"))
if len(errs) != 0 {
log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing knownGaps")
return fmt.Errorf("Application ended because there were too many error when attempting to process knownGaps")
}
return nil
})
}
if err := errG.Wait(); err != nil {
loghelper.LogError(err).Error("Error with knownGaps processing")
}
log.Debug("WE ARE AT CHECKPOINT")
}()
}
// Shutdown when the time is right.
err = shutdown.ShutdownServices(ctx, notifierCh, maxWaitSecondsShutdown, Db, Bc)

View File

@@ -47,6 +47,7 @@ It can either do this while keeping track of head, or backfilling historic data.`
func Execute() {
err := rootCmd.Execute()
if err != nil {
fmt.Println("Err when executing rootCmd", err)
os.Exit(1)
}
}

View File

@@ -0,0 +1,33 @@
{
"db": {
"address": "localhost",
"password": "password",
"port": 8076,
"username": "vdbm",
"name": "vulcanize_testing",
"driver": "PGX"
},
"bc": {
"address": "10.203.8.51",
"port": 5052,
"type": "lighthouse",
"bootRetryInterval": 30,
"bootMaxRetry": 5,
"maxHistoricProcessWorker": 2,
"connectionProtocol": "http"
},
"t": {
"skipSync": false
},
"log": {
"level": "debug",
"output": true,
"file": "./ipld-ethcl-indexer.log",
"format": "json"
},
"kg": {
"increment": 10000,
"processKnownGaps": true,
"maxKnownGapsWorker": 2
}
}
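For context, a file like the one above is read through viper, so its values back the viper.Get* calls used throughout cmd/; explicitly set CLI flags still take precedence over it. A minimal loading sketch, assuming the file path is supplied by the root command's config wiring (which is not shown in this commit):

// Hypothetical loading sketch, not part of this commit.
package main

import (
	"log"

	"github.com/spf13/viper"
)

func main() {
	// Hypothetical path; in the indexer the location would come from CLI wiring.
	viper.SetConfigFile("./config.json")
	if err := viper.ReadInConfig(); err != nil {
		log.Fatalf("unable to read config: %v", err)
	}
	// These values now back the viper.Get* lookups used in the cmd package.
	log.Printf("kg.increment=%d kg.maxKnownGapsWorker=%d",
		viper.GetInt("kg.increment"), viper.GetInt("kg.maxKnownGapsWorker"))
}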

View File

@@ -84,6 +84,24 @@ func BootApplication(ctx context.Context, dbHostname string, dbPort int, dbName
func BootApplicationWithRetry(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string,
bcAddress string, bcPort int, bcConnectionProtocol string, bcType string, bcRetryInterval int, bcMaxRetry int, bcKgTableIncrement int, startUpMode string, disregardSync bool) (*beaconclient.BeaconClient, sql.Database, error) {
var err error
if bcMaxRetry < 0 {
i := 0
for {
BC, DB, err = BootApplication(ctx, dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName,
bcAddress, bcPort, bcConnectionProtocol, bcKgTableIncrement, disregardSync)
if err != nil {
log.WithFields(log.Fields{
"retryNumber": i,
"err": err,
}).Warn("Unable to boot application. Going to try again")
time.Sleep(time.Duration(bcRetryInterval) * time.Second)
i = i + 1
continue
}
break
}
} else {
for i := 0; i < bcMaxRetry; i++ {
BC, DB, err = BootApplication(ctx, dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName,
bcAddress, bcPort, bcConnectionProtocol, bcKgTableIncrement, disregardSync)
@@ -97,6 +115,7 @@ func BootApplicationWithRetry(ctx context.Context, dbHostname string, dbPort int
}
break
}
}
switch strings.ToLower(startUpMode) {
case "head":
@@ -104,6 +123,13 @@ func BootApplicationWithRetry(ctx context.Context, dbHostname string, dbPort int
case "historic":
log.Debug("Performing additional boot steps for historical processing")
BC.PerformHistoricalProcessing = true
// This field is not currently used.
// The idea is that if we are doing historical processing and we get a slot
// greater than this slot, then we would rerun this function.
// This would ensure that we have the slots necessary for processing
// within the beacon server.
// We can implement this feature if we notice any errors.
headSlot, err := BC.GetLatestSlotInBeaconServer(bcType)
if err != nil {
return BC, DB, err

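The new branch above makes a negative bcMaxRetry mean "retry indefinitely", sleeping bcRetryInterval seconds between attempts, while a non-negative value keeps the bounded loop. A hypothetical caller sketch from the cmd package, mirroring the viper-driven calls in this diff but with an illustrative -1 (this exact invocation does not appear in the commit):

// Hypothetical snippet inside a cmd function; -1 retries until boot succeeds.
Bc, Db, err := boot.BootApplicationWithRetry(ctx,
	viper.GetString("db.address"), viper.GetInt("db.port"), viper.GetString("db.name"),
	viper.GetString("db.username"), viper.GetString("db.password"), viper.GetString("db.driver"),
	viper.GetString("bc.address"), viper.GetInt("bc.port"), viper.GetString("bc.connectionProtocol"),
	viper.GetString("bc.type"), viper.GetInt("bc.bootRetryInterval"), -1,
	viper.GetInt("kg.increment"), "historic", viper.GetBool("t.skipSync"))
if err != nil {
	StopApplicationPreBoot(err, Db)
}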
View File

@@ -62,6 +62,11 @@ VALUES ($1, $2) ON CONFLICT (key) DO NOTHING`
UpsertKnownGapsStmt string = `
INSERT INTO ethcl.known_gaps (start_slot, end_slot, checked_out, reprocessing_error, entry_error, entry_process)
VALUES ($1, $2, $3, $4, $5, $6) on CONFLICT (start_slot, end_slot) DO NOTHING`
UpsertKnownGapsErrorStmt string = `
UPDATE ethcl.known_gaps
SET reprocessing_error=$3
WHERE start_slot=$1 AND end_slot=$2;`
// Get the highest slot if one exists
QueryHighestSlotStmt string = "SELECT COALESCE(MAX(slot), 0) FROM ethcl.slots"
)
@@ -414,6 +419,31 @@ func writeStartUpGaps(db sql.Database, tableIncrement int, firstSlot int, metric
}
}
// A function to update a knownGap range with a reprocessing error.
func updateKnownGapErrors(db sql.Database, startSlot int, endSlot int, reprocessingErr error, metric *BeaconClientMetrics) error {
res, err := db.Exec(context.Background(), UpsertKnownGapsErrorStmt, startSlot, endSlot, reprocessingErr.Error())
if err != nil {
loghelper.LogSlotRangeError(strconv.Itoa(startSlot), strconv.Itoa(endSlot), err).Error("Unable to update reprocessing_error")
metric.IncrementKnownGapsProcessingError(1)
return err
}
row, err := res.RowsAffected()
if err != nil {
loghelper.LogSlotRangeError(strconv.Itoa(startSlot), strconv.Itoa(endSlot), err).Error("Unable to count rows affected when trying to update reprocessing_error.")
metric.IncrementKnownGapsProcessingError(1)
return err
}
if row != 1 {
loghelper.LogSlotRangeError(strconv.Itoa(startSlot), strconv.Itoa(endSlot), err).WithFields(log.Fields{
"rowCount": row,
}).Error("The rows affected by the upsert for reprocessing_error is not 1.")
metric.IncrementKnownGapsProcessingError(1)
return err
}
metric.IncrementKnownGapsProcessed(1)
return nil
}
// A quick helper function to calculate the epoch.
func calculateEpoch(slot int, slotPerEpoch int) string {
epoch := slot / slotPerEpoch

View File

@@ -24,6 +24,8 @@ type BeaconClientMetrics struct {
SlotInserts uint64 // Number of head events we successfully wrote to the DB.
ReorgInserts uint64 // Number of reorg events we successfully wrote to the DB.
KnownGapsInserts uint64 // Number of known_gaps we successfully wrote to the DB.
knownGapsProcessed uint64 // Number of knownGaps processed.
KnownGapsProcessingError uint64 // Number of errors that occurred while processing a knownGap
HeadError uint64 // Number of errors that occurred when decoding the head message.
HeadReorgError uint64 // Number of errors that occurred when decoding the reorg message.
}
@@ -46,6 +48,18 @@ func (m *BeaconClientMetrics) IncrementKnownGapsInserts(inc uint64) {
atomic.AddUint64(&m.KnownGapsInserts, inc)
}
// Wrapper function to increment known gaps processed. If we want to use mutexes later we can easily update all
// occurrences here.
func (m *BeaconClientMetrics) IncrementKnownGapsProcessed(inc uint64) {
atomic.AddUint64(&m.knownGapsProcessed, inc)
}
// Wrapper function to increment known gaps processing error. If we want to use mutexes later we can easily update all
// occurrences here.
func (m *BeaconClientMetrics) IncrementKnownGapsProcessingError(inc uint64) {
atomic.AddUint64(&m.KnownGapsProcessingError, inc)
}
// Wrapper function to increment head errors. If we want to use mutexes later we can easily update all
// occurrences here.
func (m *BeaconClientMetrics) IncrementHeadError(inc uint64) {

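Because the new counters are updated with atomic.AddUint64, any read side (periodic logging, a metrics endpoint, tests) should use atomic loads as well. A hypothetical helper in the same package, not part of this diff:

// Hypothetical read-side helper; field names match the struct above,
// including the unexported knownGapsProcessed.
func (m *BeaconClientMetrics) KnownGapsSnapshot() (processed, failed uint64) {
	processed = atomic.LoadUint64(&m.knownGapsProcessed)
	failed = atomic.LoadUint64(&m.KnownGapsProcessingError)
	return processed, failed
}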
View File

@@ -68,5 +68,4 @@ func (bc *BeaconClient) handleHead() {
bc.PreviousSlot = slot
bc.PreviousBlockRoot = head.Block
}
}

View File

@@ -19,6 +19,7 @@
package beaconclient
import (
"context"
"strconv"
log "github.com/sirupsen/logrus"
@@ -40,6 +41,9 @@ var (
// Used to delete an entry from the knownGaps table
deleteKgEntryStmt string = `DELETE FROM ethcl.known_gaps
WHERE start_slot=$1 AND end_slot=$2;`
// Used to check to see if a single slot exists in the known_gaps table.
checkKgSingleSlotStmt string = `SELECT start_slot, end_slot FROM ethcl.known_gaps
WHERE start_slot=$1 AND end_slot=$2;`
)
type knownGapsProcessing struct {
@@ -70,7 +74,28 @@ func (kgp knownGapsProcessing) removeTableEntry(processCh <-chan slotsToProcess)
func (kgp knownGapsProcessing) handleProcessingErrors(errMessages <-chan batchHistoricError) {
for {
errMs := <-errMessages
loghelper.LogSlotError(strconv.Itoa(errMs.slot), errMs.err)
// Check to see if this entry already exists.
res, err := kgp.db.Exec(context.Background(), checkKgSingleSlotStmt, errMs.slot, errMs.slot)
if err != nil {
loghelper.LogSlotError(strconv.Itoa(errMs.slot), err).Error("Unable to see if this slot is in the ethcl.known_gaps table")
}
rows, err := res.RowsAffected()
if err != nil {
loghelper.LogSlotError(strconv.Itoa(errMs.slot), err).WithFields(log.Fields{
"queryStatement": checkKgSingleSlotStmt,
}).Error("Unable to get the number of rows affected by this statement.")
}
if rows > 0 {
loghelper.LogSlotError(strconv.Itoa(errMs.slot), errMs.err).Error("We received an error when processing a knownGap")
err = updateKnownGapErrors(kgp.db, errMs.slot, errMs.slot, errMs.err, kgp.metrics)
if err != nil {
loghelper.LogSlotError(strconv.Itoa(errMs.slot), err).Error("Error processing known gap")
}
} else {
writeKnownGaps(kgp.db, 1, errMs.slot, errMs.slot, errMs.err, errMs.errProcess, kgp.metrics)
}
}
}
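handleProcessingErrors consumes batchHistoricError values from its channel; the type itself is defined elsewhere in the package and is not part of this diff. From the fields used here (errMs.slot, errMs.err, errMs.errProcess), its shape is presumably roughly the following (inferred, not confirmed by this commit):

// Inferred shape only; the real definition lives outside this diff.
type batchHistoricError struct {
	err        error  // the error hit while processing the slot
	errProcess string // which processing step produced the error
	slot       int    // the slot that failed
}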

View File

@@ -114,13 +114,8 @@ func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot
return err, "processSlot"
}
if ps.HeadOrHistoric == "head" && previousSlot == 0 && previousBlockRoot == "" {
writeStartUpGaps(db, knownGapsTableIncrement, ps.Slot, ps.Metrics)
}
// Get this object ready to write
blockRootEndpoint := serverAddress + BcBlockRootEndpoint(strconv.Itoa(ps.Slot))
dw, err := ps.createWriteObjects(blockRootEndpoint)
dw, err := ps.createWriteObjects()
if err != nil {
return err, "blockRoot"
}
@@ -143,6 +138,10 @@ func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot
// Handle a slot that is at head. A wrapper function for calling `processFullSlot`.
func processHeadSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, metrics *BeaconClientMetrics, knownGapsTableIncrement int) {
// Get the knownGaps at startUp.
if previousSlot == 0 && previousBlockRoot == "" {
writeStartUpGaps(db, knownGapsTableIncrement, slot, metrics)
}
err, errReason := processFullSlot(db, serverAddress, slot, blockRoot, stateRoot, previousSlot, previousBlockRoot, "head", metrics, knownGapsTableIncrement)
if err != nil {
writeKnownGaps(db, knownGapsTableIncrement, slot, slot, err, errReason, metrics)
@@ -241,6 +240,11 @@ func (ps *ProcessSlot) checkPreviousSlot(previousSlot int, previousBlockRoot str
"fork": true,
}).Warn("A fork occurred! The previous slot and current slot match.")
writeReorgs(ps.Db, strconv.Itoa(ps.Slot), ps.BlockRoot, ps.Metrics)
} else if previousSlot > int(ps.FullBeaconState.Slot()) {
log.WithFields(log.Fields{
"previousSlot": previousSlot,
"curSlot": int(ps.FullBeaconState.Slot()),
}).Warn("We noticed the previous slot is greater than the current slot.")
} else if previousSlot+1 != int(ps.FullBeaconState.Slot()) {
log.WithFields(log.Fields{
"previousSlot": previousSlot,
@@ -259,7 +263,7 @@ func (ps *ProcessSlot) checkPreviousSlot(previousSlot int, previousBlockRoot str
}
// Transforms all the raw data into DB models that can be written to the DB.
func (ps *ProcessSlot) createWriteObjects(blockRootEndpoint string) (*DatabaseWriter, error) {
func (ps *ProcessSlot) createWriteObjects() (*DatabaseWriter, error) {
var (
stateRoot string
blockRoot string