Handle Skipped Slots #34

Merged
abdulrabbani00 merged 5 commits from feature/31-handle-skipped-slot into develop 2022-05-13 12:48:31 +00:00
10 changed files with 128 additions and 24 deletions
Showing only changes of commit da72e37625 - Show all commits

View File

@ -37,7 +37,7 @@ To run the application, do as follows:
2. Run the start up command.
```
go run main.go capture head --db.address localhost \
go run -race main.go capture head --db.address localhost \
--db.password password \
--db.port 8077 \
--db.username vdbm \
@ -46,8 +46,10 @@ go run main.go capture head --db.address localhost \
--bc.address localhost \
--bc.port 5052 \
--bc.connectionProtocol http \
--t.skipSync=true \
--log.level info \
--log.output=true
--log.output=true \
--kg.increment 100
```
## Running Tests

View File

@ -44,7 +44,7 @@ func bootApp() {
log.Info("Starting the application in boot mode.")
ctx := context.Background()
BC, DB, err := boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol)
BC, DB, err := boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, testDisregardSync)
if err != nil {
loghelper.LogError(err).Error("Unable to Start application")
}

View File

@ -22,8 +22,10 @@ var (
bcAddress string
bcPort int
bcConnectionProtocol string
bcType string
maxWaitSecondsShutdown time.Duration = time.Duration(5) * time.Second
notifierCh chan os.Signal = make(chan os.Signal, 1)
testDisregardSync bool
)
// captureCmd represents the capture command
@ -62,14 +64,18 @@ func init() {
exitErr(err)
//// Beacon Client Specific
captureCmd.PersistentFlags().StringVarP(&bcAddress, "bc.address", "l", "", "Address to connect to beacon node (required if username is set)")
captureCmd.PersistentFlags().IntVarP(&bcPort, "bc.port", "r", 0, "Port to connect to beacon node (required if username is set)")
captureCmd.PersistentFlags().StringVarP(&bcAddress, "bc.address", "l", "", "Address to connect to beacon node (required)")
captureCmd.PersistentFlags().StringVarP(&bcType, "bc.type", "", "lighthouse", "The beacon client we are using, options are prysm and lighthouse.")
captureCmd.PersistentFlags().IntVarP(&bcPort, "bc.port", "r", 0, "Port to connect to beacon node (required )")
captureCmd.PersistentFlags().StringVarP(&bcConnectionProtocol, "bc.connectionProtocol", "", "http", "protocol for connecting to the beacon node.")
err = captureCmd.MarkPersistentFlagRequired("bc.address")
exitErr(err)
err = captureCmd.MarkPersistentFlagRequired("bc.port")
exitErr(err)
//// Testing Specific
captureCmd.PersistentFlags().BoolVar(&testDisregardSync, "t.skipSync", false, "Should we disregard the head sync?")
// Bind Flags with Viper
//// DB Flags
err = viper.BindPFlag("db.username", captureCmd.PersistentFlags().Lookup("db.username"))
@ -82,12 +88,18 @@ func init() {
exitErr(err)
err = viper.BindPFlag("db.name", captureCmd.PersistentFlags().Lookup("db.name"))
exitErr(err)
err = viper.BindPFlag("db.driver", captureCmd.PersistentFlags().Lookup("db.driver"))
err = viper.BindPFlag("t.skipSync", captureCmd.PersistentFlags().Lookup("t.skipSync"))
exitErr(err)
// Testing Specific
err = viper.BindPFlag("t.driver", captureCmd.PersistentFlags().Lookup("db.driver"))
exitErr(err)
// LH specific
err = viper.BindPFlag("bc.address", captureCmd.PersistentFlags().Lookup("bc.address"))
exitErr(err)
err = viper.BindPFlag("bc.type", captureCmd.PersistentFlags().Lookup("bc.type"))
exitErr(err)
err = viper.BindPFlag("bc.port", captureCmd.PersistentFlags().Lookup("bc.port"))
exitErr(err)
err = viper.BindPFlag("bc.connectionProtocol", captureCmd.PersistentFlags().Lookup("bc.connectionProtocol"))

View File

@ -6,6 +6,7 @@ package cmd
import (
"context"
"os"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
@ -35,11 +36,16 @@ func startHeadTracking() {
log.Info("Starting the application in head tracking mode.")
ctx := context.Background()
BC, DB, err := boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol)
BC, DB, err := boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, testDisregardSync)
if err != nil {
loghelper.LogError(err).Error("Unable to Start application")
if DB != nil {
DB.Close()
}
os.Exit(1)
}
log.Info("The Beacon Client has booted successfully!")
// Capture head blocks
go BC.CaptureHead(kgTableIncrement)

View File

@ -2,6 +2,7 @@ package boot
import (
"context"
"fmt"
"time"
log "github.com/sirupsen/logrus"
@ -25,7 +26,8 @@ var (
//
// 2. Connect to the database.
//
func BootApplication(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string, bcAddress string, bcPort int, bcConnectionProtocol string) (*beaconclient.BeaconClient, sql.Database, error) {
// 3. Make sure the node is synced, unless disregardSync is true.
func BootApplication(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string, bcAddress string, bcPort int, bcConnectionProtocol string, disregardSync bool) (*beaconclient.BeaconClient, sql.Database, error) {
log.Info("Booting the Application")
log.Debug("Creating the Beacon Client")
@ -38,23 +40,40 @@ func BootApplication(ctx context.Context, dbHostname string, dbPort int, dbName
}
log.Debug("Setting up DB connection")
DB, err := postgres.SetupPostgresDb(dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName)
DB, err = postgres.SetupPostgresDb(dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName)
if err != nil {
return nil, nil, err
}
BC.Db = DB
var status bool
if !disregardSync {
status, err = BC.CheckHeadSync()
if err != nil {
log.Error("Unable to get the nodes sync status")
return BC, DB, err
}
if status != false {
log.Error("The node is still syncing..")
err = fmt.Errorf("The node is still syncing.")
return BC, DB, err
}
} else {
log.Warn("We are not checking to see if the node has synced to head.")
}
return BC, DB, nil
}
// Add retry logic to ensure that we are give the Beacon Client and the DB time to start.
func BootApplicationWithRetry(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string, bcAddress string, bcPort int, bcConnectionProtocol string) (*beaconclient.BeaconClient, sql.Database, error) {
func BootApplicationWithRetry(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string, bcAddress string, bcPort int, bcConnectionProtocol string, disregardSync bool) (*beaconclient.BeaconClient, sql.Database, error) {
var err error
for i := 0; i < maxRetry; i++ {
BC, DB, err = BootApplication(ctx, dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName, bcAddress, bcPort, bcConnectionProtocol)
BC, DB, err = BootApplication(ctx, dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName, bcAddress, bcPort, bcConnectionProtocol, disregardSync)
if err != nil {
log.WithFields(log.Fields{
"retryNumber": i,
"err": err,
}).Warn("Unable to boot application. Going to try again")
time.Sleep(time.Duration(retryInterval) * time.Second)
continue

View File

@ -21,29 +21,36 @@ var _ = Describe("Boot", func() {
bcConnectionProtocol string = "http"
)
Describe("Booting the application", Label("integration"), func() {
Context("When the DB and BC are both up and running", func() {
Context("When the DB and BC are both up and running, and we skip checking for a synced head", func() {
It("Should connect successfully", func() {
_, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol)
_, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, true)
defer db.Close()
Expect(err).To(BeNil())
Expect(err).To(HaveOccurred())
})
})
Context("When the DB and BC are both up and running, and we check for a synced head", func() {
It("Should not connect successfully", func() {
_, db, err := boot.BootApplication(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, false)
defer db.Close()
Expect(err).To(HaveOccurred())
})
})
Context("When the DB is running but not the BC", func() {
It("Should not connect successfully", func() {
_, _, err := boot.BootApplication(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol)
Expect(err).ToNot(BeNil())
_, _, err := boot.BootApplication(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol, true)
Expect(err).ToNot(HaveOccurred())
})
})
Context("When the BC is running but not the DB", func() {
It("Should not connect successfully", func() {
_, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol)
Expect(err).ToNot(BeNil())
_, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, true)
Expect(err).ToNot(HaveOccurred())
})
})
Context("When neither the BC or DB are running", func() {
It("Should not connect successfully", func() {
_, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol)
Expect(err).ToNot(BeNil())
_, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol, true)
Expect(err).ToNot(HaveOccurred())
})
})
})

View File

@ -40,7 +40,7 @@ var _ = Describe("Shutdown", func() {
)
BeforeEach(func() {
ctx = context.Background()
BC, DB, err = boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol)
BC, DB, err = boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, true)
notifierCh = make(chan os.Signal, 1)
Expect(err).To(BeNil())
})

View File

@ -16,7 +16,8 @@ var (
bcReorgTopicEndpoint = "/eth/v1/events?topics=chain_reorg" // Endpoint used to subscribe to the head of the chain
BcBlockQueryEndpoint = "/eth/v2/beacon/blocks/" // Endpoint to query individual Blocks
BcStateQueryEndpoint = "/eth/v2/debug/beacon/states/" // Endpoint to query individual States
bcSlotsPerEpoch = 32 // Number of slots in a single Epoch
BcSyncStatusEndpoint = "/eth/v1/node/syncing"
bcSlotsPerEpoch = 32 // Number of slots in a single Epoch
//bcSlotPerHistoricalVector = 8192 // The number of slots in a historic vector.
//bcFinalizedTopicEndpoint = "/eth/v1/events?topics=finalized_checkpoint" // Endpoint used to subscribe to the head of the chain
)

View File

@ -0,0 +1,57 @@
package beaconclient
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
)
// The sync response
type Sync struct {
Data SyncData `json:"data"`
}
// The sync data
type SyncData struct {
IsSync bool `json:"is_syncing"`
HeadSlot string `json:"head_slot"`
SyncDistance string `json:"sync_distance"`
}
// This function will check to see if we are synced up with the head of chain.
//{"data":{"is_syncing":true,"head_slot":"62528","sync_distance":"3734299"}}
func (bc BeaconClient) CheckHeadSync() (bool, error) {
bcSync := bc.ServerEndpoint + BcSyncStatusEndpoint
resp, err := http.Get(bcSync)
if err != nil {
loghelper.LogEndpoint(bcSync).Error("Unable to check the sync status")
return true, err
}
if resp.StatusCode < 200 || resp.StatusCode > 299 {
loghelper.LogEndpoint(bcSync).WithFields(log.Fields{"returnCode": resp.StatusCode}).Error("Error when getting the sync status")
return true, fmt.Errorf("Querying the sync status returned a non 2xx status code, code provided: %d", resp.StatusCode)
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return true, err
}
var syncStatus Sync
if err := json.Unmarshal(body, &syncStatus); err != nil {
loghelper.LogEndpoint(bcSync).WithFields(log.Fields{
"rawMessage": string(body),
"err": err,
}).Error("Unable to unmarshal sync status")
return true, err
}
return syncStatus.Data.IsSync, nil
}

View File

@ -22,8 +22,8 @@ func (bc BeaconClient) CheckBeaconClient() error {
}
if resp.StatusCode < 200 || resp.StatusCode > 299 {
log.Error("We recieved a non 2xx status code when checking the health of the beacon node.")
log.Error("Health Endpoint Status Code: ", resp.StatusCode)
loghelper.LogEndpoint(bcEndpoint).Error("We recieved a non 2xx status code when checking the health of the beacon node.")
loghelper.LogEndpoint(bcEndpoint).Error("Health Endpoint Status Code: ", resp.StatusCode)
return fmt.Errorf("beacon Node Provided a non 2xx status code, code provided: %d", resp.StatusCode)
}