Handle Skipped Slots #34
3
.github/workflows/on-pr.yml
vendored
3
.github/workflows/on-pr.yml
vendored
@ -25,7 +25,7 @@ on:
|
||||
- "**"
|
||||
|
||||
env:
|
||||
stack-orchestrator-ref: ${{ github.event.inputs.stack-orchestrator-ref || '21d076268730e3f25fcec6371c1aca1bf48040d8'}}
|
||||
stack-orchestrator-ref: ${{ github.event.inputs.stack-orchestrator-ref || 'feature/client-build'}}
|
||||
ipld-eth-db-ref: ${{ github.event.inputs.ipld-eth-db-ref || 'minimal-beacon-chain-schema' }}
|
||||
ssz-data-ref: ${{ github.event.inputs.ssz-data-ref || 'main' }}
|
||||
GOPATH: /tmp/go
|
||||
@ -57,6 +57,7 @@ jobs:
|
||||
echo vulcanize_ipld_eth_db=$GITHUB_WORKSPACE/ipld-eth-db/ > ./config.sh
|
||||
echo vulcanize_ipld_ethcl_indexer=$GITHUB_WORKSPACE/ipld-ethcl-indexer >> ./config.sh
|
||||
echo ethcl_capture_mode=boot >> ./config.sh
|
||||
echo ethcl_skip_sync=true >> ./config.sh
|
||||
cat ./config.sh
|
||||
|
||||
- name: Run docker compose
|
||||
|
@ -37,7 +37,7 @@ To run the application, do as follows:
|
||||
2. Run the start up command.
|
||||
|
||||
```
|
||||
go run main.go capture head --db.address localhost \
|
||||
go run -race main.go capture head --db.address localhost \
|
||||
--db.password password \
|
||||
--db.port 8077 \
|
||||
--db.username vdbm \
|
||||
@ -46,8 +46,10 @@ go run main.go capture head --db.address localhost \
|
||||
--bc.address localhost \
|
||||
--bc.port 5052 \
|
||||
--bc.connectionProtocol http \
|
||||
--t.skipSync=true \
|
||||
--log.level info \
|
||||
--log.output=true
|
||||
--log.output=true \
|
||||
--kg.increment 100
|
||||
```
|
||||
|
||||
## Running Tests
|
||||
|
@ -44,7 +44,7 @@ func bootApp() {
|
||||
log.Info("Starting the application in boot mode.")
|
||||
ctx := context.Background()
|
||||
|
||||
BC, DB, err := boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol)
|
||||
BC, DB, err := boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, testDisregardSync)
|
||||
if err != nil {
|
||||
loghelper.LogError(err).Error("Unable to Start application")
|
||||
}
|
||||
|
@ -22,8 +22,10 @@ var (
|
||||
bcAddress string
|
||||
bcPort int
|
||||
bcConnectionProtocol string
|
||||
bcType string
|
||||
maxWaitSecondsShutdown time.Duration = time.Duration(5) * time.Second
|
||||
notifierCh chan os.Signal = make(chan os.Signal, 1)
|
||||
testDisregardSync bool
|
||||
)
|
||||
|
||||
// captureCmd represents the capture command
|
||||
@ -62,14 +64,18 @@ func init() {
|
||||
exitErr(err)
|
||||
|
||||
//// Beacon Client Specific
|
||||
captureCmd.PersistentFlags().StringVarP(&bcAddress, "bc.address", "l", "", "Address to connect to beacon node (required if username is set)")
|
||||
captureCmd.PersistentFlags().IntVarP(&bcPort, "bc.port", "r", 0, "Port to connect to beacon node (required if username is set)")
|
||||
captureCmd.PersistentFlags().StringVarP(&bcAddress, "bc.address", "l", "", "Address to connect to beacon node (required)")
|
||||
captureCmd.PersistentFlags().StringVarP(&bcType, "bc.type", "", "lighthouse", "The beacon client we are using, options are prysm and lighthouse.")
|
||||
captureCmd.PersistentFlags().IntVarP(&bcPort, "bc.port", "r", 0, "Port to connect to beacon node (required )")
|
||||
captureCmd.PersistentFlags().StringVarP(&bcConnectionProtocol, "bc.connectionProtocol", "", "http", "protocol for connecting to the beacon node.")
|
||||
err = captureCmd.MarkPersistentFlagRequired("bc.address")
|
||||
exitErr(err)
|
||||
err = captureCmd.MarkPersistentFlagRequired("bc.port")
|
||||
exitErr(err)
|
||||
|
||||
//// Testing Specific
|
||||
captureCmd.PersistentFlags().BoolVar(&testDisregardSync, "t.skipSync", false, "Should we disregard the head sync?")
|
||||
|
||||
// Bind Flags with Viper
|
||||
//// DB Flags
|
||||
err = viper.BindPFlag("db.username", captureCmd.PersistentFlags().Lookup("db.username"))
|
||||
@ -82,12 +88,18 @@ func init() {
|
||||
exitErr(err)
|
||||
err = viper.BindPFlag("db.name", captureCmd.PersistentFlags().Lookup("db.name"))
|
||||
exitErr(err)
|
||||
err = viper.BindPFlag("db.driver", captureCmd.PersistentFlags().Lookup("db.driver"))
|
||||
err = viper.BindPFlag("t.skipSync", captureCmd.PersistentFlags().Lookup("t.skipSync"))
|
||||
exitErr(err)
|
||||
|
||||
// Testing Specific
|
||||
err = viper.BindPFlag("t.driver", captureCmd.PersistentFlags().Lookup("db.driver"))
|
||||
exitErr(err)
|
||||
|
||||
// LH specific
|
||||
err = viper.BindPFlag("bc.address", captureCmd.PersistentFlags().Lookup("bc.address"))
|
||||
exitErr(err)
|
||||
err = viper.BindPFlag("bc.type", captureCmd.PersistentFlags().Lookup("bc.type"))
|
||||
exitErr(err)
|
||||
err = viper.BindPFlag("bc.port", captureCmd.PersistentFlags().Lookup("bc.port"))
|
||||
exitErr(err)
|
||||
err = viper.BindPFlag("bc.connectionProtocol", captureCmd.PersistentFlags().Lookup("bc.connectionProtocol"))
|
||||
|
@ -6,6 +6,7 @@ package cmd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/spf13/cobra"
|
||||
@ -35,11 +36,16 @@ func startHeadTracking() {
|
||||
log.Info("Starting the application in head tracking mode.")
|
||||
ctx := context.Background()
|
||||
|
||||
BC, DB, err := boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol)
|
||||
BC, DB, err := boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, testDisregardSync)
|
||||
if err != nil {
|
||||
loghelper.LogError(err).Error("Unable to Start application")
|
||||
if DB != nil {
|
||||
DB.Close()
|
||||
}
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
log.Info("The Beacon Client has booted successfully!")
|
||||
// Capture head blocks
|
||||
go BC.CaptureHead(kgTableIncrement)
|
||||
|
||||
|
@ -11,7 +11,8 @@ echo /root/ipld-ethcl-indexer capture ${CAPTURE_MODE} --db.address $DB_ADDRESS \
|
||||
--db.driver $DB_DRIVER \
|
||||
--bc.address $BC_ADDRESS \
|
||||
--bc.port $BC_PORT \
|
||||
--log.level $LOG_LEVEL
|
||||
--log.level $LOG_LEVEL\
|
||||
--t.skipSync=$SKIP_SYNC
|
||||
|
||||
/root/ipld-ethcl-indexer capture ${CAPTURE_MODE} --db.address $DB_ADDRESS \
|
||||
--db.password $DB_PASSWORD \
|
||||
@ -21,7 +22,8 @@ echo /root/ipld-ethcl-indexer capture ${CAPTURE_MODE} --db.address $DB_ADDRESS \
|
||||
--db.driver $DB_DRIVER \
|
||||
--bc.address $BC_ADDRESS \
|
||||
--bc.port $BC_PORT \
|
||||
--log.level $LOG_LEVEL
|
||||
--log.level $LOG_LEVEL \
|
||||
--t.skipSync=$SKIP_SYNC
|
||||
|
||||
rv=$?
|
||||
|
||||
|
@ -2,6 +2,7 @@ package boot
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
@ -25,7 +26,8 @@ var (
|
||||
//
|
||||
// 2. Connect to the database.
|
||||
//
|
||||
func BootApplication(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string, bcAddress string, bcPort int, bcConnectionProtocol string) (*beaconclient.BeaconClient, sql.Database, error) {
|
||||
// 3. Make sure the node is synced, unless disregardSync is true.
|
||||
func BootApplication(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string, bcAddress string, bcPort int, bcConnectionProtocol string, disregardSync bool) (*beaconclient.BeaconClient, sql.Database, error) {
|
||||
log.Info("Booting the Application")
|
||||
|
||||
log.Debug("Creating the Beacon Client")
|
||||
@ -38,23 +40,40 @@ func BootApplication(ctx context.Context, dbHostname string, dbPort int, dbName
|
||||
}
|
||||
|
||||
log.Debug("Setting up DB connection")
|
||||
DB, err := postgres.SetupPostgresDb(dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName)
|
||||
DB, err = postgres.SetupPostgresDb(dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
BC.Db = DB
|
||||
|
||||
var status bool
|
||||
if !disregardSync {
|
||||
status, err = BC.CheckHeadSync()
|
||||
if err != nil {
|
||||
log.Error("Unable to get the nodes sync status")
|
||||
return BC, DB, err
|
||||
}
|
||||
if status {
|
||||
log.Error("The node is still syncing..")
|
||||
err = fmt.Errorf("The node is still syncing.")
|
||||
return BC, DB, err
|
||||
}
|
||||
} else {
|
||||
log.Warn("We are not checking to see if the node has synced to head.")
|
||||
}
|
||||
return BC, DB, nil
|
||||
}
|
||||
|
||||
// Add retry logic to ensure that we are give the Beacon Client and the DB time to start.
|
||||
func BootApplicationWithRetry(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string, bcAddress string, bcPort int, bcConnectionProtocol string) (*beaconclient.BeaconClient, sql.Database, error) {
|
||||
func BootApplicationWithRetry(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string, bcAddress string, bcPort int, bcConnectionProtocol string, disregardSync bool) (*beaconclient.BeaconClient, sql.Database, error) {
|
||||
var err error
|
||||
for i := 0; i < maxRetry; i++ {
|
||||
BC, DB, err = BootApplication(ctx, dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName, bcAddress, bcPort, bcConnectionProtocol)
|
||||
BC, DB, err = BootApplication(ctx, dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName, bcAddress, bcPort, bcConnectionProtocol, disregardSync)
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"retryNumber": i,
|
||||
"err": err,
|
||||
}).Warn("Unable to boot application. Going to try again")
|
||||
time.Sleep(time.Duration(retryInterval) * time.Second)
|
||||
continue
|
||||
|
@ -21,29 +21,36 @@ var _ = Describe("Boot", func() {
|
||||
bcConnectionProtocol string = "http"
|
||||
)
|
||||
Describe("Booting the application", Label("integration"), func() {
|
||||
Context("When the DB and BC are both up and running", func() {
|
||||
Context("When the DB and BC are both up and running, and we skip checking for a synced head", func() {
|
||||
It("Should connect successfully", func() {
|
||||
_, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol)
|
||||
_, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, true)
|
||||
defer db.Close()
|
||||
Expect(err).To(BeNil())
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
})
|
||||
})
|
||||
Context("When the DB and BC are both up and running, and we check for a synced head", func() {
|
||||
It("Should not connect successfully", func() {
|
||||
_, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, false)
|
||||
defer db.Close()
|
||||
Expect(err).To(HaveOccurred())
|
||||
})
|
||||
})
|
||||
Context("When the DB is running but not the BC", func() {
|
||||
It("Should not connect successfully", func() {
|
||||
_, _, err := boot.BootApplication(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol)
|
||||
Expect(err).ToNot(BeNil())
|
||||
_, _, err := boot.BootApplication(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol, true)
|
||||
Expect(err).To(HaveOccurred())
|
||||
})
|
||||
})
|
||||
Context("When the BC is running but not the DB", func() {
|
||||
It("Should not connect successfully", func() {
|
||||
_, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol)
|
||||
Expect(err).ToNot(BeNil())
|
||||
_, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, true)
|
||||
Expect(err).To(HaveOccurred())
|
||||
})
|
||||
})
|
||||
Context("When neither the BC or DB are running", func() {
|
||||
It("Should not connect successfully", func() {
|
||||
_, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol)
|
||||
Expect(err).ToNot(BeNil())
|
||||
_, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol, true)
|
||||
Expect(err).To(HaveOccurred())
|
||||
})
|
||||
})
|
||||
})
|
||||
|
@ -40,7 +40,7 @@ var _ = Describe("Shutdown", func() {
|
||||
)
|
||||
BeforeEach(func() {
|
||||
ctx = context.Background()
|
||||
BC, DB, err = boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol)
|
||||
BC, DB, err = boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, true)
|
||||
notifierCh = make(chan os.Signal, 1)
|
||||
Expect(err).To(BeNil())
|
||||
})
|
||||
|
@ -16,6 +16,10 @@ var (
|
||||
bcReorgTopicEndpoint = "/eth/v1/events?topics=chain_reorg" // Endpoint used to subscribe to the head of the chain
|
||||
BcBlockQueryEndpoint = "/eth/v2/beacon/blocks/" // Endpoint to query individual Blocks
|
||||
BcStateQueryEndpoint = "/eth/v2/debug/beacon/states/" // Endpoint to query individual States
|
||||
BcSyncStatusEndpoint = "/eth/v1/node/syncing"
|
||||
BcBlockRootEndpoint = func(slot string) string {
|
||||
return "/eth/v1/beacon/blocks/" + slot + "/root"
|
||||
}
|
||||
bcSlotsPerEpoch = 32 // Number of slots in a single Epoch
|
||||
//bcSlotPerHistoricalVector = 8192 // The number of slots in a historic vector.
|
||||
//bcFinalizedTopicEndpoint = "/eth/v1/events?topics=finalized_checkpoint" // Endpoint used to subscribe to the head of the chain
|
||||
|
57
pkg/beaconclient/checksyncstatus.go
Normal file
57
pkg/beaconclient/checksyncstatus.go
Normal file
@ -0,0 +1,57 @@
|
||||
package beaconclient
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
|
||||
)
|
||||
|
||||
// The sync response
|
||||
type Sync struct {
|
||||
Data SyncData `json:"data"`
|
||||
}
|
||||
|
||||
// The sync data
|
||||
type SyncData struct {
|
||||
IsSync bool `json:"is_syncing"`
|
||||
HeadSlot string `json:"head_slot"`
|
||||
SyncDistance string `json:"sync_distance"`
|
||||
}
|
||||
|
||||
// This function will check to see if we are synced up with the head of chain.
|
||||
//{"data":{"is_syncing":true,"head_slot":"62528","sync_distance":"3734299"}}
|
||||
func (bc BeaconClient) CheckHeadSync() (bool, error) {
|
||||
bcSync := bc.ServerEndpoint + BcSyncStatusEndpoint
|
||||
resp, err := http.Get(bcSync)
|
||||
|
||||
if err != nil {
|
||||
loghelper.LogEndpoint(bcSync).Error("Unable to check the sync status")
|
||||
return true, err
|
||||
}
|
||||
|
||||
if resp.StatusCode < 200 || resp.StatusCode > 299 {
|
||||
loghelper.LogEndpoint(bcSync).WithFields(log.Fields{"returnCode": resp.StatusCode}).Error("Error when getting the sync status")
|
||||
return true, fmt.Errorf("Querying the sync status returned a non 2xx status code, code provided: %d", resp.StatusCode)
|
||||
}
|
||||
|
||||
defer resp.Body.Close()
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return true, err
|
||||
}
|
||||
|
||||
var syncStatus Sync
|
||||
if err := json.Unmarshal(body, &syncStatus); err != nil {
|
||||
loghelper.LogEndpoint(bcSync).WithFields(log.Fields{
|
||||
"rawMessage": string(body),
|
||||
"err": err,
|
||||
}).Error("Unable to unmarshal sync status")
|
||||
return true, err
|
||||
}
|
||||
|
||||
return syncStatus.Data.IsSync, nil
|
||||
}
|
@ -22,8 +22,8 @@ func (bc BeaconClient) CheckBeaconClient() error {
|
||||
}
|
||||
|
||||
if resp.StatusCode < 200 || resp.StatusCode > 299 {
|
||||
log.Error("We recieved a non 2xx status code when checking the health of the beacon node.")
|
||||
log.Error("Health Endpoint Status Code: ", resp.StatusCode)
|
||||
loghelper.LogEndpoint(bcEndpoint).Error("We recieved a non 2xx status code when checking the health of the beacon node.")
|
||||
loghelper.LogEndpoint(bcEndpoint).Error("Health Endpoint Status Code: ", resp.StatusCode)
|
||||
return fmt.Errorf("beacon Node Provided a non 2xx status code, code provided: %d", resp.StatusCode)
|
||||
}
|
||||
|
||||
|
@ -42,7 +42,7 @@ type DbSlots struct {
|
||||
Slot string // The slot.
|
||||
BlockRoot string // The block root
|
||||
StateRoot string // The state root
|
||||
Status string // The status, it can be proposed | forked | missed.
|
||||
Status string // The status, it can be proposed | forked | skipped.
|
||||
}
|
||||
|
||||
// A struct to capture whats being written to ethcl.signed_beacon_block table.
|
||||
|
@ -37,7 +37,7 @@ type ProcessSlot struct {
|
||||
StateRoot string // The hex encoded string of the StateRoot.
|
||||
ParentBlockRoot string // The hex encoded string of the parent block.
|
||||
Status string // The status of the block
|
||||
HeadOrHistoric string // Is this the head or a historic slot. This is critical when trying to analyze errors and missed slots.
|
||||
HeadOrHistoric string // Is this the head or a historic slot. This is critical when trying to analyze errors and skipped slots.
|
||||
Db sql.Database // The DB object used to write to the DB.
|
||||
Metrics *BeaconClientMetrics // An object to keep track of the beaconclient metrics
|
||||
// BeaconBlock
|
||||
@ -66,26 +66,31 @@ func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot
|
||||
Metrics: metrics,
|
||||
}
|
||||
|
||||
// Get the SignedBeaconBlock.
|
||||
err := ps.getSignedBeaconBlock(serverAddress)
|
||||
// Get the BeaconState.
|
||||
err := ps.getBeaconState(serverAddress)
|
||||
if err != nil {
|
||||
writeKnownGaps(ps.Db, 1, ps.Slot, ps.Slot, err, "processSlot")
|
||||
return err
|
||||
}
|
||||
|
||||
// Get the BeaconState.
|
||||
err = ps.getBeaconState(serverAddress)
|
||||
// Get the SignedBeaconBlock.
|
||||
err = ps.getSignedBeaconBlock(serverAddress)
|
||||
if err != nil {
|
||||
writeKnownGaps(ps.Db, 1, ps.Slot, ps.Slot, err, "processSlot")
|
||||
return err
|
||||
}
|
||||
|
||||
if ps.HeadOrHistoric == "head" && previousSlot == 0 && previousBlockRoot == "" {
|
||||
writeStartUpGaps(db, knownGapsTableIncrement, ps.Slot)
|
||||
}
|
||||
|
||||
// Get this object ready to write
|
||||
dw := ps.createWriteObjects()
|
||||
|
||||
blockRootEndpoint := serverAddress + BcBlockRootEndpoint(strconv.Itoa(ps.Slot))
|
||||
dw, err := ps.createWriteObjects(blockRootEndpoint)
|
||||
if err != nil {
|
||||
writeKnownGaps(ps.Db, 1, ps.Slot, ps.Slot, err, "blockRoot")
|
||||
return err
|
||||
}
|
||||
// Write the object to the DB.
|
||||
err = dw.writeFullSlot()
|
||||
if err != nil {
|
||||
@ -97,7 +102,7 @@ func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot
|
||||
if headOrHistoric != "head" && headOrHistoric != "historic" {
|
||||
return fmt.Errorf("headOrHistoric must be either historic or head!")
|
||||
}
|
||||
if ps.HeadOrHistoric == "head" && previousSlot != 0 && previousBlockRoot != "" {
|
||||
if ps.HeadOrHistoric == "head" && previousSlot != 0 && previousBlockRoot != "" && ps.Status != "skipped" {
|
||||
ps.checkPreviousSlot(previousSlot, previousBlockRoot, knownGapsTableIncrement)
|
||||
}
|
||||
return nil
|
||||
@ -135,7 +140,11 @@ func (ps *ProcessSlot) getSignedBeaconBlock(serverAddress string) error {
|
||||
}
|
||||
|
||||
if rc != 200 {
|
||||
ps.checkMissedSlot()
|
||||
ps.FullSignedBeaconBlock = &st.SignedBeaconBlock{}
|
||||
ps.SszSignedBeaconBlock = []byte{}
|
||||
ps.ParentBlockRoot = ""
|
||||
ps.Status = "skipped"
|
||||
return nil
|
||||
}
|
||||
|
||||
ps.FullSignedBeaconBlock = &st.SignedBeaconBlock{}
|
||||
@ -199,8 +208,6 @@ func (ps *ProcessSlot) checkPreviousSlot(previousSlot int, previousBlockRoot str
|
||||
"currentSlot": ps.FullBeaconState.Slot,
|
||||
}).Error("We skipped a few slots.")
|
||||
writeKnownGaps(ps.Db, knownGapsTableIncrement, previousSlot+1, int(ps.FullBeaconState.Slot)-1, fmt.Errorf("Gaps during head processing"), "headGaps")
|
||||
// Check to see if the slot was skipped.
|
||||
// Call our known_gaps function.
|
||||
} else if previousBlockRoot != parentRoot {
|
||||
log.WithFields(log.Fields{
|
||||
"previousBlockRoot": previousBlockRoot,
|
||||
@ -213,29 +220,20 @@ func (ps *ProcessSlot) checkPreviousSlot(previousSlot int, previousBlockRoot str
|
||||
}
|
||||
}
|
||||
|
||||
// Add logic for checking a missed Slot
|
||||
// IF the state is present but block is not, then it was skipped???
|
||||
// If the state and block are both absent, then the block might be missing??
|
||||
// IF state is absent but block is not, there might be an issue with the LH client.
|
||||
// Check the previous and following slot?
|
||||
// Check if head or historic.
|
||||
|
||||
// 1. BeaconBlock is 404.
|
||||
// 2. check check /lighthouse/database/info to make sure the oldest_block_slot == 0 and anchor == null. This indicates that I don't have any gaps in the DB.
|
||||
// 3. I query BeaconState for slot X, and get a BeaconState.
|
||||
// 4. Although for good measure you should also check that the head is at a slot >= X using something like /eth/v1/node/syncing/ or /eth/v1/beacon/headers/head
|
||||
func (ps *ProcessSlot) checkMissedSlot() {
|
||||
|
||||
}
|
||||
|
||||
// Transforms all the raw data into DB models that can be written to the DB.
|
||||
func (ps *ProcessSlot) createWriteObjects() *DatabaseWriter {
|
||||
func (ps *ProcessSlot) createWriteObjects(blockRootEndpoint string) (*DatabaseWriter, error) {
|
||||
var (
|
||||
stateRoot string
|
||||
blockRoot string
|
||||
status string
|
||||
eth1BlockHash string
|
||||
)
|
||||
|
||||
if ps.Status == "skipped" {
|
||||
stateRoot = ""
|
||||
blockRoot = ""
|
||||
eth1BlockHash = ""
|
||||
} else {
|
||||
if ps.StateRoot != "" {
|
||||
stateRoot = ps.StateRoot
|
||||
} else {
|
||||
@ -243,20 +241,16 @@ func (ps *ProcessSlot) createWriteObjects() *DatabaseWriter {
|
||||
log.Debug("StateRoot: ", stateRoot)
|
||||
}
|
||||
|
||||
// MUST RESOLVE!
|
||||
if ps.BlockRoot != "" {
|
||||
blockRoot = ps.BlockRoot
|
||||
} else {
|
||||
log.Info("We need to add logic")
|
||||
// We need to get the state of Slot + 1, then we can run the below.
|
||||
// WE can query it for each run, or we can leave it blank, and update it.
|
||||
// I just want to avoid getting the same state twice, especially since the state can get heavy.
|
||||
// blockRoot = "0x" + hex.EncodeToString(ps.FullBeaconState.GetBlockRoots()[ps.Slot%bcSlotPerHistoricalVector][:])
|
||||
// log.Debug("Block Root: ", blockRoot)
|
||||
// log.Debug("ps.Slott: ", ps.Slot)
|
||||
// Maybe we can use the helper down the road.
|
||||
//blockRootRaw, _ := helper.BlockRootAtSlot(ps.FullBeaconState, ps.FullSignedBeaconBlock.Block.Slot)
|
||||
//blockRoot = string(blockRootRaw)
|
||||
var err error
|
||||
blockRoot, err = queryBlockRoot(blockRootEndpoint, strconv.Itoa(ps.Slot))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
eth1BlockHash = "0x" + hex.EncodeToString(ps.FullSignedBeaconBlock.Block.Body.Eth1Data.BlockHash)
|
||||
}
|
||||
|
||||
if ps.Status != "" {
|
||||
@ -265,11 +259,9 @@ func (ps *ProcessSlot) createWriteObjects() *DatabaseWriter {
|
||||
status = "proposed"
|
||||
}
|
||||
|
||||
eth1BlockHash := "0x" + hex.EncodeToString(ps.FullSignedBeaconBlock.Block.Body.Eth1Data.BlockHash)
|
||||
|
||||
dw := CreateDatabaseWrite(ps.Db, ps.Slot, stateRoot, blockRoot, ps.ParentBlockRoot, eth1BlockHash, status, ps.Metrics)
|
||||
dw.rawSignedBeaconBlock = ps.SszSignedBeaconBlock
|
||||
dw.rawBeaconState = ps.SszBeaconState
|
||||
|
||||
return dw
|
||||
return dw, nil
|
||||
}
|
||||
|
@ -3,6 +3,7 @@
|
||||
package beaconclient
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
@ -35,3 +36,46 @@ func querySsz(endpoint string, slot string) ([]byte, int, error) {
|
||||
}
|
||||
return body, rc, nil
|
||||
}
|
||||
|
||||
// Object to unmarshal the BlockRootResponse
|
||||
type BlockRootResponse struct {
|
||||
Data BlockRootMessage `json:"data"`
|
||||
}
|
||||
|
||||
// Object to unmarshal the BlockRoot Message
|
||||
type BlockRootMessage struct {
|
||||
Root string `json:"root"`
|
||||
}
|
||||
|
||||
// A function to query the blockroot for a given slot.
|
||||
func queryBlockRoot(endpoint string, slot string) (string, error) {
|
||||
log.WithFields(log.Fields{"endpoint": endpoint}).Debug("Querying endpoint")
|
||||
client := &http.Client{}
|
||||
req, err := http.NewRequest("GET", endpoint, nil)
|
||||
if err != nil {
|
||||
loghelper.LogSlotError(slot, err).Error("Unable to create a request!")
|
||||
return "", fmt.Errorf("Unable to create a request!: %s", err.Error())
|
||||
}
|
||||
req.Header.Set("Accept", "application/json")
|
||||
response, err := client.Do(req)
|
||||
if err != nil {
|
||||
loghelper.LogSlotError(slot, err).Error("Unable to query Beacon Node!")
|
||||
return "", fmt.Errorf("Unable to query Beacon Node: %s", err.Error())
|
||||
}
|
||||
defer response.Body.Close()
|
||||
body, err := ioutil.ReadAll(response.Body)
|
||||
if err != nil {
|
||||
loghelper.LogSlotError(slot, err).Error("Unable to turn response into a []bytes array!")
|
||||
return "", fmt.Errorf("Unable to turn response into a []bytes array!: %s", err.Error())
|
||||
}
|
||||
|
||||
resp := BlockRootResponse{}
|
||||
if err := json.Unmarshal(body, &resp); err != nil {
|
||||
loghelper.LogEndpoint(endpoint).WithFields(log.Fields{
|
||||
"rawMessage": string(body),
|
||||
"err": err,
|
||||
}).Error("Unable to unmarshal the block root")
|
||||
return "", err
|
||||
}
|
||||
return resp.Data.Root, nil
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user