Handle Skipped Slots (#34)

* Ensure that the node is synced at boot time

* Update test + add logic for checking skipped slots

* Update boot check

* Add skip_sync to config.

* Update a test so it fails
This commit is contained in:
Abdul Rabbani 2022-05-13 08:48:31 -04:00 committed by Abdul Rabbani
parent 8cab81f12e
commit b6e7e503f2
15 changed files with 226 additions and 80 deletions

View File

@ -25,7 +25,7 @@ on:
- "**" - "**"
env: env:
stack-orchestrator-ref: ${{ github.event.inputs.stack-orchestrator-ref || '21d076268730e3f25fcec6371c1aca1bf48040d8'}} stack-orchestrator-ref: ${{ github.event.inputs.stack-orchestrator-ref || 'feature/client-build'}}
ipld-eth-db-ref: ${{ github.event.inputs.ipld-eth-db-ref || 'minimal-beacon-chain-schema' }} ipld-eth-db-ref: ${{ github.event.inputs.ipld-eth-db-ref || 'minimal-beacon-chain-schema' }}
ssz-data-ref: ${{ github.event.inputs.ssz-data-ref || 'main' }} ssz-data-ref: ${{ github.event.inputs.ssz-data-ref || 'main' }}
GOPATH: /tmp/go GOPATH: /tmp/go
@ -57,6 +57,7 @@ jobs:
echo vulcanize_ipld_eth_db=$GITHUB_WORKSPACE/ipld-eth-db/ > ./config.sh echo vulcanize_ipld_eth_db=$GITHUB_WORKSPACE/ipld-eth-db/ > ./config.sh
echo vulcanize_ipld_ethcl_indexer=$GITHUB_WORKSPACE/ipld-ethcl-indexer >> ./config.sh echo vulcanize_ipld_ethcl_indexer=$GITHUB_WORKSPACE/ipld-ethcl-indexer >> ./config.sh
echo ethcl_capture_mode=boot >> ./config.sh echo ethcl_capture_mode=boot >> ./config.sh
echo ethcl_skip_sync=true >> ./config.sh
cat ./config.sh cat ./config.sh
- name: Run docker compose - name: Run docker compose

View File

@ -37,7 +37,7 @@ To run the application, do as follows:
2. Run the start up command. 2. Run the start up command.
``` ```
go run main.go capture head --db.address localhost \ go run -race main.go capture head --db.address localhost \
--db.password password \ --db.password password \
--db.port 8077 \ --db.port 8077 \
--db.username vdbm \ --db.username vdbm \
@ -46,8 +46,10 @@ go run main.go capture head --db.address localhost \
--bc.address localhost \ --bc.address localhost \
--bc.port 5052 \ --bc.port 5052 \
--bc.connectionProtocol http \ --bc.connectionProtocol http \
--t.skipSync=true \
--log.level info \ --log.level info \
--log.output=true --log.output=true \
--kg.increment 100
``` ```
## Running Tests ## Running Tests

View File

@ -44,7 +44,7 @@ func bootApp() {
log.Info("Starting the application in boot mode.") log.Info("Starting the application in boot mode.")
ctx := context.Background() ctx := context.Background()
BC, DB, err := boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol) BC, DB, err := boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, testDisregardSync)
if err != nil { if err != nil {
loghelper.LogError(err).Error("Unable to Start application") loghelper.LogError(err).Error("Unable to Start application")
} }

View File

@ -22,8 +22,10 @@ var (
bcAddress string bcAddress string
bcPort int bcPort int
bcConnectionProtocol string bcConnectionProtocol string
bcType string
maxWaitSecondsShutdown time.Duration = time.Duration(5) * time.Second maxWaitSecondsShutdown time.Duration = time.Duration(5) * time.Second
notifierCh chan os.Signal = make(chan os.Signal, 1) notifierCh chan os.Signal = make(chan os.Signal, 1)
testDisregardSync bool
) )
// captureCmd represents the capture command // captureCmd represents the capture command
@ -62,14 +64,18 @@ func init() {
exitErr(err) exitErr(err)
//// Beacon Client Specific //// Beacon Client Specific
captureCmd.PersistentFlags().StringVarP(&bcAddress, "bc.address", "l", "", "Address to connect to beacon node (required if username is set)") captureCmd.PersistentFlags().StringVarP(&bcAddress, "bc.address", "l", "", "Address to connect to beacon node (required)")
captureCmd.PersistentFlags().IntVarP(&bcPort, "bc.port", "r", 0, "Port to connect to beacon node (required if username is set)") captureCmd.PersistentFlags().StringVarP(&bcType, "bc.type", "", "lighthouse", "The beacon client we are using, options are prysm and lighthouse.")
captureCmd.PersistentFlags().IntVarP(&bcPort, "bc.port", "r", 0, "Port to connect to beacon node (required )")
captureCmd.PersistentFlags().StringVarP(&bcConnectionProtocol, "bc.connectionProtocol", "", "http", "protocol for connecting to the beacon node.") captureCmd.PersistentFlags().StringVarP(&bcConnectionProtocol, "bc.connectionProtocol", "", "http", "protocol for connecting to the beacon node.")
err = captureCmd.MarkPersistentFlagRequired("bc.address") err = captureCmd.MarkPersistentFlagRequired("bc.address")
exitErr(err) exitErr(err)
err = captureCmd.MarkPersistentFlagRequired("bc.port") err = captureCmd.MarkPersistentFlagRequired("bc.port")
exitErr(err) exitErr(err)
//// Testing Specific
captureCmd.PersistentFlags().BoolVar(&testDisregardSync, "t.skipSync", false, "Should we disregard the head sync?")
// Bind Flags with Viper // Bind Flags with Viper
//// DB Flags //// DB Flags
err = viper.BindPFlag("db.username", captureCmd.PersistentFlags().Lookup("db.username")) err = viper.BindPFlag("db.username", captureCmd.PersistentFlags().Lookup("db.username"))
@ -82,12 +88,18 @@ func init() {
exitErr(err) exitErr(err)
err = viper.BindPFlag("db.name", captureCmd.PersistentFlags().Lookup("db.name")) err = viper.BindPFlag("db.name", captureCmd.PersistentFlags().Lookup("db.name"))
exitErr(err) exitErr(err)
err = viper.BindPFlag("db.driver", captureCmd.PersistentFlags().Lookup("db.driver")) err = viper.BindPFlag("t.skipSync", captureCmd.PersistentFlags().Lookup("t.skipSync"))
exitErr(err)
// Testing Specific
err = viper.BindPFlag("t.driver", captureCmd.PersistentFlags().Lookup("db.driver"))
exitErr(err) exitErr(err)
// LH specific // LH specific
err = viper.BindPFlag("bc.address", captureCmd.PersistentFlags().Lookup("bc.address")) err = viper.BindPFlag("bc.address", captureCmd.PersistentFlags().Lookup("bc.address"))
exitErr(err) exitErr(err)
err = viper.BindPFlag("bc.type", captureCmd.PersistentFlags().Lookup("bc.type"))
exitErr(err)
err = viper.BindPFlag("bc.port", captureCmd.PersistentFlags().Lookup("bc.port")) err = viper.BindPFlag("bc.port", captureCmd.PersistentFlags().Lookup("bc.port"))
exitErr(err) exitErr(err)
err = viper.BindPFlag("bc.connectionProtocol", captureCmd.PersistentFlags().Lookup("bc.connectionProtocol")) err = viper.BindPFlag("bc.connectionProtocol", captureCmd.PersistentFlags().Lookup("bc.connectionProtocol"))

View File

@ -6,6 +6,7 @@ package cmd
import ( import (
"context" "context"
"os"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/spf13/cobra" "github.com/spf13/cobra"
@ -35,11 +36,16 @@ func startHeadTracking() {
log.Info("Starting the application in head tracking mode.") log.Info("Starting the application in head tracking mode.")
ctx := context.Background() ctx := context.Background()
BC, DB, err := boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol) BC, DB, err := boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, testDisregardSync)
if err != nil { if err != nil {
loghelper.LogError(err).Error("Unable to Start application") loghelper.LogError(err).Error("Unable to Start application")
if DB != nil {
DB.Close()
}
os.Exit(1)
} }
log.Info("The Beacon Client has booted successfully!")
// Capture head blocks // Capture head blocks
go BC.CaptureHead(kgTableIncrement) go BC.CaptureHead(kgTableIncrement)

View File

@ -11,7 +11,8 @@ echo /root/ipld-ethcl-indexer capture ${CAPTURE_MODE} --db.address $DB_ADDRESS \
--db.driver $DB_DRIVER \ --db.driver $DB_DRIVER \
--bc.address $BC_ADDRESS \ --bc.address $BC_ADDRESS \
--bc.port $BC_PORT \ --bc.port $BC_PORT \
--log.level $LOG_LEVEL --log.level $LOG_LEVEL\
--t.skipSync=$SKIP_SYNC
/root/ipld-ethcl-indexer capture ${CAPTURE_MODE} --db.address $DB_ADDRESS \ /root/ipld-ethcl-indexer capture ${CAPTURE_MODE} --db.address $DB_ADDRESS \
--db.password $DB_PASSWORD \ --db.password $DB_PASSWORD \
@ -21,7 +22,8 @@ echo /root/ipld-ethcl-indexer capture ${CAPTURE_MODE} --db.address $DB_ADDRESS \
--db.driver $DB_DRIVER \ --db.driver $DB_DRIVER \
--bc.address $BC_ADDRESS \ --bc.address $BC_ADDRESS \
--bc.port $BC_PORT \ --bc.port $BC_PORT \
--log.level $LOG_LEVEL --log.level $LOG_LEVEL \
--t.skipSync=$SKIP_SYNC
rv=$? rv=$?

View File

@ -2,6 +2,7 @@ package boot
import ( import (
"context" "context"
"fmt"
"time" "time"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@ -25,7 +26,8 @@ var (
// //
// 2. Connect to the database. // 2. Connect to the database.
// //
func BootApplication(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string, bcAddress string, bcPort int, bcConnectionProtocol string) (*beaconclient.BeaconClient, sql.Database, error) { // 3. Make sure the node is synced, unless disregardSync is true.
func BootApplication(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string, bcAddress string, bcPort int, bcConnectionProtocol string, disregardSync bool) (*beaconclient.BeaconClient, sql.Database, error) {
log.Info("Booting the Application") log.Info("Booting the Application")
log.Debug("Creating the Beacon Client") log.Debug("Creating the Beacon Client")
@ -38,23 +40,40 @@ func BootApplication(ctx context.Context, dbHostname string, dbPort int, dbName
} }
log.Debug("Setting up DB connection") log.Debug("Setting up DB connection")
DB, err := postgres.SetupPostgresDb(dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName) DB, err = postgres.SetupPostgresDb(dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
BC.Db = DB BC.Db = DB
var status bool
if !disregardSync {
status, err = BC.CheckHeadSync()
if err != nil {
log.Error("Unable to get the nodes sync status")
return BC, DB, err
}
if status {
log.Error("The node is still syncing..")
err = fmt.Errorf("The node is still syncing.")
return BC, DB, err
}
} else {
log.Warn("We are not checking to see if the node has synced to head.")
}
return BC, DB, nil return BC, DB, nil
} }
// Add retry logic to ensure that we are give the Beacon Client and the DB time to start. // Add retry logic to ensure that we are give the Beacon Client and the DB time to start.
func BootApplicationWithRetry(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string, bcAddress string, bcPort int, bcConnectionProtocol string) (*beaconclient.BeaconClient, sql.Database, error) { func BootApplicationWithRetry(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string, bcAddress string, bcPort int, bcConnectionProtocol string, disregardSync bool) (*beaconclient.BeaconClient, sql.Database, error) {
var err error var err error
for i := 0; i < maxRetry; i++ { for i := 0; i < maxRetry; i++ {
BC, DB, err = BootApplication(ctx, dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName, bcAddress, bcPort, bcConnectionProtocol) BC, DB, err = BootApplication(ctx, dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName, bcAddress, bcPort, bcConnectionProtocol, disregardSync)
if err != nil { if err != nil {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"retryNumber": i, "retryNumber": i,
"err": err,
}).Warn("Unable to boot application. Going to try again") }).Warn("Unable to boot application. Going to try again")
time.Sleep(time.Duration(retryInterval) * time.Second) time.Sleep(time.Duration(retryInterval) * time.Second)
continue continue

View File

@ -21,29 +21,36 @@ var _ = Describe("Boot", func() {
bcConnectionProtocol string = "http" bcConnectionProtocol string = "http"
) )
Describe("Booting the application", Label("integration"), func() { Describe("Booting the application", Label("integration"), func() {
Context("When the DB and BC are both up and running", func() { Context("When the DB and BC are both up and running, and we skip checking for a synced head", func() {
It("Should connect successfully", func() { It("Should connect successfully", func() {
_, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol) _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, true)
defer db.Close() defer db.Close()
Expect(err).To(BeNil()) Expect(err).ToNot(HaveOccurred())
})
})
Context("When the DB and BC are both up and running, and we check for a synced head", func() {
It("Should not connect successfully", func() {
_, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, false)
defer db.Close()
Expect(err).To(HaveOccurred())
}) })
}) })
Context("When the DB is running but not the BC", func() { Context("When the DB is running but not the BC", func() {
It("Should not connect successfully", func() { It("Should not connect successfully", func() {
_, _, err := boot.BootApplication(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol) _, _, err := boot.BootApplication(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol, true)
Expect(err).ToNot(BeNil()) Expect(err).To(HaveOccurred())
}) })
}) })
Context("When the BC is running but not the DB", func() { Context("When the BC is running but not the DB", func() {
It("Should not connect successfully", func() { It("Should not connect successfully", func() {
_, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol) _, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, true)
Expect(err).ToNot(BeNil()) Expect(err).To(HaveOccurred())
}) })
}) })
Context("When neither the BC or DB are running", func() { Context("When neither the BC or DB are running", func() {
It("Should not connect successfully", func() { It("Should not connect successfully", func() {
_, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol) _, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol, true)
Expect(err).ToNot(BeNil()) Expect(err).To(HaveOccurred())
}) })
}) })
}) })

View File

@ -40,7 +40,7 @@ var _ = Describe("Shutdown", func() {
) )
BeforeEach(func() { BeforeEach(func() {
ctx = context.Background() ctx = context.Background()
BC, DB, err = boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol) BC, DB, err = boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, true)
notifierCh = make(chan os.Signal, 1) notifierCh = make(chan os.Signal, 1)
Expect(err).To(BeNil()) Expect(err).To(BeNil())
}) })

View File

@ -16,7 +16,11 @@ var (
bcReorgTopicEndpoint = "/eth/v1/events?topics=chain_reorg" // Endpoint used to subscribe to the head of the chain bcReorgTopicEndpoint = "/eth/v1/events?topics=chain_reorg" // Endpoint used to subscribe to the head of the chain
BcBlockQueryEndpoint = "/eth/v2/beacon/blocks/" // Endpoint to query individual Blocks BcBlockQueryEndpoint = "/eth/v2/beacon/blocks/" // Endpoint to query individual Blocks
BcStateQueryEndpoint = "/eth/v2/debug/beacon/states/" // Endpoint to query individual States BcStateQueryEndpoint = "/eth/v2/debug/beacon/states/" // Endpoint to query individual States
bcSlotsPerEpoch = 32 // Number of slots in a single Epoch BcSyncStatusEndpoint = "/eth/v1/node/syncing"
BcBlockRootEndpoint = func(slot string) string {
return "/eth/v1/beacon/blocks/" + slot + "/root"
}
bcSlotsPerEpoch = 32 // Number of slots in a single Epoch
//bcSlotPerHistoricalVector = 8192 // The number of slots in a historic vector. //bcSlotPerHistoricalVector = 8192 // The number of slots in a historic vector.
//bcFinalizedTopicEndpoint = "/eth/v1/events?topics=finalized_checkpoint" // Endpoint used to subscribe to the head of the chain //bcFinalizedTopicEndpoint = "/eth/v1/events?topics=finalized_checkpoint" // Endpoint used to subscribe to the head of the chain
) )

View File

@ -0,0 +1,57 @@
package beaconclient
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
)
// The sync response
type Sync struct {
Data SyncData `json:"data"`
}
// The sync data
type SyncData struct {
IsSync bool `json:"is_syncing"`
HeadSlot string `json:"head_slot"`
SyncDistance string `json:"sync_distance"`
}
// This function will check to see if we are synced up with the head of chain.
//{"data":{"is_syncing":true,"head_slot":"62528","sync_distance":"3734299"}}
func (bc BeaconClient) CheckHeadSync() (bool, error) {
bcSync := bc.ServerEndpoint + BcSyncStatusEndpoint
resp, err := http.Get(bcSync)
if err != nil {
loghelper.LogEndpoint(bcSync).Error("Unable to check the sync status")
return true, err
}
if resp.StatusCode < 200 || resp.StatusCode > 299 {
loghelper.LogEndpoint(bcSync).WithFields(log.Fields{"returnCode": resp.StatusCode}).Error("Error when getting the sync status")
return true, fmt.Errorf("Querying the sync status returned a non 2xx status code, code provided: %d", resp.StatusCode)
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return true, err
}
var syncStatus Sync
if err := json.Unmarshal(body, &syncStatus); err != nil {
loghelper.LogEndpoint(bcSync).WithFields(log.Fields{
"rawMessage": string(body),
"err": err,
}).Error("Unable to unmarshal sync status")
return true, err
}
return syncStatus.Data.IsSync, nil
}

View File

@ -22,8 +22,8 @@ func (bc BeaconClient) CheckBeaconClient() error {
} }
if resp.StatusCode < 200 || resp.StatusCode > 299 { if resp.StatusCode < 200 || resp.StatusCode > 299 {
log.Error("We recieved a non 2xx status code when checking the health of the beacon node.") loghelper.LogEndpoint(bcEndpoint).Error("We recieved a non 2xx status code when checking the health of the beacon node.")
log.Error("Health Endpoint Status Code: ", resp.StatusCode) loghelper.LogEndpoint(bcEndpoint).Error("Health Endpoint Status Code: ", resp.StatusCode)
return fmt.Errorf("beacon Node Provided a non 2xx status code, code provided: %d", resp.StatusCode) return fmt.Errorf("beacon Node Provided a non 2xx status code, code provided: %d", resp.StatusCode)
} }

View File

@ -42,7 +42,7 @@ type DbSlots struct {
Slot string // The slot. Slot string // The slot.
BlockRoot string // The block root BlockRoot string // The block root
StateRoot string // The state root StateRoot string // The state root
Status string // The status, it can be proposed | forked | missed. Status string // The status, it can be proposed | forked | skipped.
} }
// A struct to capture whats being written to ethcl.signed_beacon_block table. // A struct to capture whats being written to ethcl.signed_beacon_block table.

View File

@ -37,7 +37,7 @@ type ProcessSlot struct {
StateRoot string // The hex encoded string of the StateRoot. StateRoot string // The hex encoded string of the StateRoot.
ParentBlockRoot string // The hex encoded string of the parent block. ParentBlockRoot string // The hex encoded string of the parent block.
Status string // The status of the block Status string // The status of the block
HeadOrHistoric string // Is this the head or a historic slot. This is critical when trying to analyze errors and missed slots. HeadOrHistoric string // Is this the head or a historic slot. This is critical when trying to analyze errors and skipped slots.
Db sql.Database // The DB object used to write to the DB. Db sql.Database // The DB object used to write to the DB.
Metrics *BeaconClientMetrics // An object to keep track of the beaconclient metrics Metrics *BeaconClientMetrics // An object to keep track of the beaconclient metrics
// BeaconBlock // BeaconBlock
@ -66,26 +66,31 @@ func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot
Metrics: metrics, Metrics: metrics,
} }
// Get the SignedBeaconBlock. // Get the BeaconState.
err := ps.getSignedBeaconBlock(serverAddress) err := ps.getBeaconState(serverAddress)
if err != nil { if err != nil {
writeKnownGaps(ps.Db, 1, ps.Slot, ps.Slot, err, "processSlot") writeKnownGaps(ps.Db, 1, ps.Slot, ps.Slot, err, "processSlot")
return err return err
} }
// Get the BeaconState. // Get the SignedBeaconBlock.
err = ps.getBeaconState(serverAddress) err = ps.getSignedBeaconBlock(serverAddress)
if err != nil { if err != nil {
writeKnownGaps(ps.Db, 1, ps.Slot, ps.Slot, err, "processSlot") writeKnownGaps(ps.Db, 1, ps.Slot, ps.Slot, err, "processSlot")
return err return err
} }
if ps.HeadOrHistoric == "head" && previousSlot == 0 && previousBlockRoot == "" { if ps.HeadOrHistoric == "head" && previousSlot == 0 && previousBlockRoot == "" {
writeStartUpGaps(db, knownGapsTableIncrement, ps.Slot) writeStartUpGaps(db, knownGapsTableIncrement, ps.Slot)
} }
// Get this object ready to write // Get this object ready to write
dw := ps.createWriteObjects() blockRootEndpoint := serverAddress + BcBlockRootEndpoint(strconv.Itoa(ps.Slot))
dw, err := ps.createWriteObjects(blockRootEndpoint)
if err != nil {
writeKnownGaps(ps.Db, 1, ps.Slot, ps.Slot, err, "blockRoot")
return err
}
// Write the object to the DB. // Write the object to the DB.
err = dw.writeFullSlot() err = dw.writeFullSlot()
if err != nil { if err != nil {
@ -97,7 +102,7 @@ func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot
if headOrHistoric != "head" && headOrHistoric != "historic" { if headOrHistoric != "head" && headOrHistoric != "historic" {
return fmt.Errorf("headOrHistoric must be either historic or head!") return fmt.Errorf("headOrHistoric must be either historic or head!")
} }
if ps.HeadOrHistoric == "head" && previousSlot != 0 && previousBlockRoot != "" { if ps.HeadOrHistoric == "head" && previousSlot != 0 && previousBlockRoot != "" && ps.Status != "skipped" {
ps.checkPreviousSlot(previousSlot, previousBlockRoot, knownGapsTableIncrement) ps.checkPreviousSlot(previousSlot, previousBlockRoot, knownGapsTableIncrement)
} }
return nil return nil
@ -135,7 +140,11 @@ func (ps *ProcessSlot) getSignedBeaconBlock(serverAddress string) error {
} }
if rc != 200 { if rc != 200 {
ps.checkMissedSlot() ps.FullSignedBeaconBlock = &st.SignedBeaconBlock{}
ps.SszSignedBeaconBlock = []byte{}
ps.ParentBlockRoot = ""
ps.Status = "skipped"
return nil
} }
ps.FullSignedBeaconBlock = &st.SignedBeaconBlock{} ps.FullSignedBeaconBlock = &st.SignedBeaconBlock{}
@ -199,8 +208,6 @@ func (ps *ProcessSlot) checkPreviousSlot(previousSlot int, previousBlockRoot str
"currentSlot": ps.FullBeaconState.Slot, "currentSlot": ps.FullBeaconState.Slot,
}).Error("We skipped a few slots.") }).Error("We skipped a few slots.")
writeKnownGaps(ps.Db, knownGapsTableIncrement, previousSlot+1, int(ps.FullBeaconState.Slot)-1, fmt.Errorf("Gaps during head processing"), "headGaps") writeKnownGaps(ps.Db, knownGapsTableIncrement, previousSlot+1, int(ps.FullBeaconState.Slot)-1, fmt.Errorf("Gaps during head processing"), "headGaps")
// Check to see if the slot was skipped.
// Call our known_gaps function.
} else if previousBlockRoot != parentRoot { } else if previousBlockRoot != parentRoot {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"previousBlockRoot": previousBlockRoot, "previousBlockRoot": previousBlockRoot,
@ -213,50 +220,37 @@ func (ps *ProcessSlot) checkPreviousSlot(previousSlot int, previousBlockRoot str
} }
} }
// Add logic for checking a missed Slot
// IF the state is present but block is not, then it was skipped???
// If the state and block are both absent, then the block might be missing??
// IF state is absent but block is not, there might be an issue with the LH client.
// Check the previous and following slot?
// Check if head or historic.
// 1. BeaconBlock is 404.
// 2. check check /lighthouse/database/info to make sure the oldest_block_slot == 0 and anchor == null. This indicates that I don't have any gaps in the DB.
// 3. I query BeaconState for slot X, and get a BeaconState.
// 4. Although for good measure you should also check that the head is at a slot >= X using something like /eth/v1/node/syncing/ or /eth/v1/beacon/headers/head
func (ps *ProcessSlot) checkMissedSlot() {
}
// Transforms all the raw data into DB models that can be written to the DB. // Transforms all the raw data into DB models that can be written to the DB.
func (ps *ProcessSlot) createWriteObjects() *DatabaseWriter { func (ps *ProcessSlot) createWriteObjects(blockRootEndpoint string) (*DatabaseWriter, error) {
var ( var (
stateRoot string stateRoot string
blockRoot string blockRoot string
status string status string
eth1BlockHash string
) )
if ps.StateRoot != "" { if ps.Status == "skipped" {
stateRoot = ps.StateRoot stateRoot = ""
blockRoot = ""
eth1BlockHash = ""
} else { } else {
stateRoot = "0x" + hex.EncodeToString(ps.FullSignedBeaconBlock.Block.StateRoot) if ps.StateRoot != "" {
log.Debug("StateRoot: ", stateRoot) stateRoot = ps.StateRoot
} } else {
stateRoot = "0x" + hex.EncodeToString(ps.FullSignedBeaconBlock.Block.StateRoot)
log.Debug("StateRoot: ", stateRoot)
}
// MUST RESOLVE! if ps.BlockRoot != "" {
if ps.BlockRoot != "" { blockRoot = ps.BlockRoot
blockRoot = ps.BlockRoot } else {
} else { var err error
log.Info("We need to add logic") blockRoot, err = queryBlockRoot(blockRootEndpoint, strconv.Itoa(ps.Slot))
// We need to get the state of Slot + 1, then we can run the below. if err != nil {
// WE can query it for each run, or we can leave it blank, and update it. return nil, err
// I just want to avoid getting the same state twice, especially since the state can get heavy. }
// blockRoot = "0x" + hex.EncodeToString(ps.FullBeaconState.GetBlockRoots()[ps.Slot%bcSlotPerHistoricalVector][:]) }
// log.Debug("Block Root: ", blockRoot) eth1BlockHash = "0x" + hex.EncodeToString(ps.FullSignedBeaconBlock.Block.Body.Eth1Data.BlockHash)
// log.Debug("ps.Slott: ", ps.Slot)
// Maybe we can use the helper down the road.
//blockRootRaw, _ := helper.BlockRootAtSlot(ps.FullBeaconState, ps.FullSignedBeaconBlock.Block.Slot)
//blockRoot = string(blockRootRaw)
} }
if ps.Status != "" { if ps.Status != "" {
@ -265,11 +259,9 @@ func (ps *ProcessSlot) createWriteObjects() *DatabaseWriter {
status = "proposed" status = "proposed"
} }
eth1BlockHash := "0x" + hex.EncodeToString(ps.FullSignedBeaconBlock.Block.Body.Eth1Data.BlockHash)
dw := CreateDatabaseWrite(ps.Db, ps.Slot, stateRoot, blockRoot, ps.ParentBlockRoot, eth1BlockHash, status, ps.Metrics) dw := CreateDatabaseWrite(ps.Db, ps.Slot, stateRoot, blockRoot, ps.ParentBlockRoot, eth1BlockHash, status, ps.Metrics)
dw.rawSignedBeaconBlock = ps.SszSignedBeaconBlock dw.rawSignedBeaconBlock = ps.SszSignedBeaconBlock
dw.rawBeaconState = ps.SszBeaconState dw.rawBeaconState = ps.SszBeaconState
return dw return dw, nil
} }

View File

@ -3,6 +3,7 @@
package beaconclient package beaconclient
import ( import (
"encoding/json"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
@ -35,3 +36,46 @@ func querySsz(endpoint string, slot string) ([]byte, int, error) {
} }
return body, rc, nil return body, rc, nil
} }
// Object to unmarshal the BlockRootResponse
type BlockRootResponse struct {
Data BlockRootMessage `json:"data"`
}
// Object to unmarshal the BlockRoot Message
type BlockRootMessage struct {
Root string `json:"root"`
}
// A function to query the blockroot for a given slot.
func queryBlockRoot(endpoint string, slot string) (string, error) {
log.WithFields(log.Fields{"endpoint": endpoint}).Debug("Querying endpoint")
client := &http.Client{}
req, err := http.NewRequest("GET", endpoint, nil)
if err != nil {
loghelper.LogSlotError(slot, err).Error("Unable to create a request!")
return "", fmt.Errorf("Unable to create a request!: %s", err.Error())
}
req.Header.Set("Accept", "application/json")
response, err := client.Do(req)
if err != nil {
loghelper.LogSlotError(slot, err).Error("Unable to query Beacon Node!")
return "", fmt.Errorf("Unable to query Beacon Node: %s", err.Error())
}
defer response.Body.Close()
body, err := ioutil.ReadAll(response.Body)
if err != nil {
loghelper.LogSlotError(slot, err).Error("Unable to turn response into a []bytes array!")
return "", fmt.Errorf("Unable to turn response into a []bytes array!: %s", err.Error())
}
resp := BlockRootResponse{}
if err := json.Unmarshal(body, &resp); err != nil {
loghelper.LogEndpoint(endpoint).WithFields(log.Fields{
"rawMessage": string(body),
"err": err,
}).Error("Unable to unmarshal the block root")
return "", err
}
return resp.Data.Root, nil
}