Add KnownGaps table and corresponding entries #33
@ -9,7 +9,7 @@ COPY go.sum .
|
|||||||
RUN go mod tidy; go mod download
|
RUN go mod tidy; go mod download
|
||||||
COPY . .
|
COPY . .
|
||||||
|
|
||||||
RUN GCO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -ldflags '-extldflags "-static"' -o ipld-ethcl-indexer .
|
RUN GCO_ENABLED=0 GOOS=linux go build -race -a -installsuffix cgo -ldflags '-extldflags "-static"' -o ipld-ethcl-indexer .
|
||||||
RUN chmod +x ipld-ethcl-indexer
|
RUN chmod +x ipld-ethcl-indexer
|
||||||
|
|
||||||
FROM frolvlad/alpine-bash:latest
|
FROM frolvlad/alpine-bash:latest
|
||||||
|
@ -22,6 +22,21 @@ The `database` package allows us to interact with a postgres DB. We utilize the
|
|||||||
|
|
||||||
This package will contain code to interact with the beacon client.
|
This package will contain code to interact with the beacon client.
|
||||||
|
|
||||||
|
### Known Gaps
|
||||||
|
|
||||||
|
Known Gaps tracking is handled within this package. The columns are as follows:
|
||||||
|
|
||||||
|
- StartSlot - The start slot for known_gaps, inclusive.
|
||||||
|
- EndSlot - The end slot for known_gaps, inclusive.
|
||||||
|
- CheckedOut - Indicates if any process is currently processing this entry.
|
||||||
|
- ErrorMessage - Captures any error message that might have occurred when previously processing this entry.
|
||||||
|
- EntryTime - The time this range was added to the DB. This can help us catch ranges that have not been processed for a long time due to some error.
|
||||||
|
- EntryProcess - The entry process that added this process. Potential options are StartUp, Error, HeadGap.
|
||||||
|
- This can help us understand how a specific entry was added. It can be useful for debugging the application.
|
||||||
|
- StartUp - Gaps found when we started the application.
|
||||||
|
- Error - Indicates that the entry was added due to an error with processing.
|
||||||
|
- HeadGap - Indicates that gaps where found when keeping up with Head.
|
||||||
|
|
||||||
## `pkg/version`
|
## `pkg/version`
|
||||||
|
|
||||||
A generic package which can be utilized to easily version our applications.
|
A generic package which can be utilized to easily version our applications.
|
||||||
|
@ -22,7 +22,8 @@ var (
|
|||||||
bcAddress string
|
bcAddress string
|
||||||
bcPort int
|
bcPort int
|
||||||
bcConnectionProtocol string
|
bcConnectionProtocol string
|
||||||
maxWaitSecondsShutdown time.Duration = time.Duration(5) * time.Second
|
maxWaitSecondsShutdown time.Duration = time.Duration(5) * time.Second
|
||||||
|
notifierCh chan os.Signal = make(chan os.Signal, 1)
|
||||||
)
|
)
|
||||||
|
|
||||||
// captureCmd represents the capture command
|
// captureCmd represents the capture command
|
||||||
|
22
cmd/head.go
22
cmd/head.go
@ -6,15 +6,19 @@ package cmd
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"os"
|
|
||||||
|
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
|
"github.com/spf13/viper"
|
||||||
"github.com/vulcanize/ipld-ethcl-indexer/internal/boot"
|
"github.com/vulcanize/ipld-ethcl-indexer/internal/boot"
|
||||||
"github.com/vulcanize/ipld-ethcl-indexer/internal/shutdown"
|
"github.com/vulcanize/ipld-ethcl-indexer/internal/shutdown"
|
||||||
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
|
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
kgTableIncrement int
|
||||||
|
)
|
||||||
|
|
||||||
// headCmd represents the head command
|
// headCmd represents the head command
|
||||||
var headCmd = &cobra.Command{
|
var headCmd = &cobra.Command{
|
||||||
Use: "head",
|
Use: "head",
|
||||||
@ -37,10 +41,9 @@ func startHeadTracking() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Capture head blocks
|
// Capture head blocks
|
||||||
go BC.CaptureHead()
|
go BC.CaptureHead(kgTableIncrement)
|
||||||
|
|
||||||
// Shutdown when the time is right.
|
// Shutdown when the time is right.
|
||||||
notifierCh := make(chan os.Signal, 1)
|
|
||||||
err = shutdown.ShutdownServices(ctx, notifierCh, maxWaitSecondsShutdown, DB, BC)
|
err = shutdown.ShutdownServices(ctx, notifierCh, maxWaitSecondsShutdown, DB, BC)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
loghelper.LogError(err).Error("Ungracefully Shutdown ipld-ethcl-indexer!")
|
loghelper.LogError(err).Error("Ungracefully Shutdown ipld-ethcl-indexer!")
|
||||||
@ -53,13 +56,8 @@ func startHeadTracking() {
|
|||||||
func init() {
|
func init() {
|
||||||
captureCmd.AddCommand(headCmd)
|
captureCmd.AddCommand(headCmd)
|
||||||
|
|
||||||
// Here you will define your flags and configuration settings.
|
// Known Gaps specific
|
||||||
|
captureCmd.PersistentFlags().IntVarP(&kgTableIncrement, "kg.increment", "", 10000, "The max slots within a single entry to the known_gaps table.")
|
||||||
// Cobra supports Persistent Flags which will work for this command
|
err := viper.BindPFlag("kg.increment", captureCmd.PersistentFlags().Lookup("kg.increment"))
|
||||||
// and all subcommands, e.g.:
|
exitErr(err)
|
||||||
// headCmd.PersistentFlags().String("foo", "", "A help for foo")
|
|
||||||
|
|
||||||
// Cobra supports local flags which will only run when this command
|
|
||||||
// is called directly, e.g.:
|
|
||||||
// headCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle")
|
|
||||||
}
|
}
|
||||||
|
@ -34,6 +34,7 @@ type BeaconClient struct {
|
|||||||
PerformHistoricalProcessing bool // Should we perform historical processing?
|
PerformHistoricalProcessing bool // Should we perform historical processing?
|
||||||
Db sql.Database // Database object used for reads and writes.
|
Db sql.Database // Database object used for reads and writes.
|
||||||
Metrics *BeaconClientMetrics // An object used to keep track of certain BeaconClient Metrics.
|
Metrics *BeaconClientMetrics // An object used to keep track of certain BeaconClient Metrics.
|
||||||
|
KnownGapTableIncrement int // The max number of slots within a single known_gaps table entry.
|
||||||
|
|
||||||
// Used for Head Tracking
|
// Used for Head Tracking
|
||||||
PerformHeadTracking bool // Should we track head?
|
PerformHeadTracking bool // Should we track head?
|
||||||
|
@ -10,7 +10,8 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
// This function will perform all the heavy lifting for tracking the head of the chain.
|
// This function will perform all the heavy lifting for tracking the head of the chain.
|
||||||
func (bc *BeaconClient) CaptureHead() {
|
func (bc *BeaconClient) CaptureHead(knownGapsTableIncrement int) {
|
||||||
|
bc.KnownGapTableIncrement = knownGapsTableIncrement
|
||||||
log.Info("We are tracking the head of the chain.")
|
log.Info("We are tracking the head of the chain.")
|
||||||
//bc.tempHelper()
|
//bc.tempHelper()
|
||||||
go bc.handleHead()
|
go bc.handleHead()
|
||||||
|
@ -43,19 +43,20 @@ type MimicConfig struct {
|
|||||||
var _ = Describe("Capturehead", func() {
|
var _ = Describe("Capturehead", func() {
|
||||||
|
|
||||||
var (
|
var (
|
||||||
TestConfig Config
|
TestConfig Config
|
||||||
BeaconNodeTester TestBeaconNode
|
BeaconNodeTester TestBeaconNode
|
||||||
address string = "localhost"
|
address string = "localhost"
|
||||||
port int = 8080
|
port int = 8080
|
||||||
protocol string = "http"
|
protocol string = "http"
|
||||||
TestEvents map[string]Message
|
TestEvents map[string]Message
|
||||||
dbHost string = "localhost"
|
dbHost string = "localhost"
|
||||||
dbPort int = 8077
|
dbPort int = 8077
|
||||||
dbName string = "vulcanize_testing"
|
dbName string = "vulcanize_testing"
|
||||||
dbUser string = "vdbm"
|
dbUser string = "vdbm"
|
||||||
dbPassword string = "password"
|
dbPassword string = "password"
|
||||||
dbDriver string = "pgx"
|
dbDriver string = "pgx"
|
||||||
dummyParentRoot string = "46f98c08b54a71dfda4d56e29ec3952b8300cd8d6b67a9b6c562ae96a7a25a42"
|
dummyParentRoot string = "46f98c08b54a71dfda4d56e29ec3952b8300cd8d6b67a9b6c562ae96a7a25a42"
|
||||||
|
knownGapsTableIncrement int = 10000
|
||||||
)
|
)
|
||||||
|
|
||||||
BeforeEach(func() {
|
BeforeEach(func() {
|
||||||
@ -146,16 +147,17 @@ var _ = Describe("Capturehead", func() {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
TestConfig = Config{
|
TestConfig = Config{
|
||||||
protocol: protocol,
|
protocol: protocol,
|
||||||
address: address,
|
address: address,
|
||||||
port: port,
|
port: port,
|
||||||
dummyParentRoot: dummyParentRoot,
|
dummyParentRoot: dummyParentRoot,
|
||||||
dbHost: dbHost,
|
dbHost: dbHost,
|
||||||
dbPort: dbPort,
|
dbPort: dbPort,
|
||||||
dbName: dbName,
|
dbName: dbName,
|
||||||
dbUser: dbUser,
|
dbUser: dbUser,
|
||||||
dbPassword: dbPassword,
|
dbPassword: dbPassword,
|
||||||
dbDriver: dbDriver,
|
dbDriver: dbDriver,
|
||||||
|
knownGapsTableIncrement: knownGapsTableIncrement,
|
||||||
}
|
}
|
||||||
|
|
||||||
BeaconNodeTester = TestBeaconNode{
|
BeaconNodeTester = TestBeaconNode{
|
||||||
@ -234,16 +236,17 @@ var _ = Describe("Capturehead", func() {
|
|||||||
})
|
})
|
||||||
|
|
||||||
type Config struct {
|
type Config struct {
|
||||||
protocol string
|
protocol string
|
||||||
address string
|
address string
|
||||||
port int
|
port int
|
||||||
dummyParentRoot string
|
dummyParentRoot string
|
||||||
dbHost string
|
dbHost string
|
||||||
dbPort int
|
dbPort int
|
||||||
dbName string
|
dbName string
|
||||||
dbUser string
|
dbUser string
|
||||||
dbPassword string
|
dbPassword string
|
||||||
dbDriver string
|
dbDriver string
|
||||||
|
knownGapsTableIncrement int
|
||||||
}
|
}
|
||||||
|
|
||||||
//////////////////////////////////////////////////////
|
//////////////////////////////////////////////////////
|
||||||
@ -307,7 +310,7 @@ func queryDbSlotAndBlock(db sql.Database, querySlot string, queryBlockRoot strin
|
|||||||
|
|
||||||
// A function that will remove all entries from the ethcl tables for you.
|
// A function that will remove all entries from the ethcl tables for you.
|
||||||
func clearEthclDbTables(db sql.Database) {
|
func clearEthclDbTables(db sql.Database) {
|
||||||
deleteQueries := []string{"DELETE FROM ethcl.slots;", "DELETE FROM ethcl.signed_beacon_block;", "DELETE FROM ethcl.beacon_state;", "DELETE FROM ethcl.batch_processing;"}
|
deleteQueries := []string{"DELETE FROM ethcl.slots;", "DELETE FROM ethcl.signed_beacon_block;", "DELETE FROM ethcl.beacon_state;", "DELETE FROM ethcl.known_gaps;"}
|
||||||
for _, queries := range deleteQueries {
|
for _, queries := range deleteQueries {
|
||||||
_, err := db.Exec(context.Background(), queries)
|
_, err := db.Exec(context.Background(), queries)
|
||||||
Expect(err).ToNot(HaveOccurred())
|
Expect(err).ToNot(HaveOccurred())
|
||||||
@ -436,7 +439,7 @@ func (tbc TestBeaconNode) testMultipleReorgs(firstHead beaconclient.Head, second
|
|||||||
tbc.SetupBeaconNodeMock(tbc.TestEvents, tbc.TestConfig.protocol, tbc.TestConfig.address, tbc.TestConfig.port, tbc.TestConfig.dummyParentRoot)
|
tbc.SetupBeaconNodeMock(tbc.TestEvents, tbc.TestConfig.protocol, tbc.TestConfig.address, tbc.TestConfig.port, tbc.TestConfig.dummyParentRoot)
|
||||||
defer httpmock.DeactivateAndReset()
|
defer httpmock.DeactivateAndReset()
|
||||||
|
|
||||||
go bc.CaptureHead()
|
go bc.CaptureHead(tbc.TestConfig.knownGapsTableIncrement)
|
||||||
time.Sleep(1 * time.Second)
|
time.Sleep(1 * time.Second)
|
||||||
|
|
||||||
log.Info("Sending Phase0 Messages to BeaconClient")
|
log.Info("Sending Phase0 Messages to BeaconClient")
|
||||||
@ -488,7 +491,7 @@ func (tbc TestBeaconNode) testProcessBlock(head beaconclient.Head, epoch int) {
|
|||||||
tbc.SetupBeaconNodeMock(tbc.TestEvents, tbc.TestConfig.protocol, tbc.TestConfig.address, tbc.TestConfig.port, tbc.TestConfig.dummyParentRoot)
|
tbc.SetupBeaconNodeMock(tbc.TestEvents, tbc.TestConfig.protocol, tbc.TestConfig.address, tbc.TestConfig.port, tbc.TestConfig.dummyParentRoot)
|
||||||
defer httpmock.DeactivateAndReset()
|
defer httpmock.DeactivateAndReset()
|
||||||
|
|
||||||
go bc.CaptureHead()
|
go bc.CaptureHead(tbc.TestConfig.knownGapsTableIncrement)
|
||||||
time.Sleep(1 * time.Second)
|
time.Sleep(1 * time.Second)
|
||||||
sendHeadMessage(bc, head)
|
sendHeadMessage(bc, head)
|
||||||
validateSlot(bc, &head, epoch, "proposed")
|
validateSlot(bc, &head, epoch, "proposed")
|
||||||
@ -501,7 +504,7 @@ func (tbc TestBeaconNode) testMultipleHead(firstHead beaconclient.Head, secondHe
|
|||||||
tbc.SetupBeaconNodeMock(tbc.TestEvents, tbc.TestConfig.protocol, tbc.TestConfig.address, tbc.TestConfig.port, tbc.TestConfig.dummyParentRoot)
|
tbc.SetupBeaconNodeMock(tbc.TestEvents, tbc.TestConfig.protocol, tbc.TestConfig.address, tbc.TestConfig.port, tbc.TestConfig.dummyParentRoot)
|
||||||
defer httpmock.DeactivateAndReset()
|
defer httpmock.DeactivateAndReset()
|
||||||
|
|
||||||
go bc.CaptureHead()
|
go bc.CaptureHead(tbc.TestConfig.knownGapsTableIncrement)
|
||||||
time.Sleep(1 * time.Second)
|
time.Sleep(1 * time.Second)
|
||||||
|
|
||||||
sendHeadMessage(bc, firstHead)
|
sendHeadMessage(bc, firstHead)
|
||||||
|
@ -19,8 +19,8 @@ INSERT INTO ethcl.slots (epoch, slot, block_root, state_root, status)
|
|||||||
VALUES ($1, $2, $3, $4, $5) ON CONFLICT (slot, block_root) DO NOTHING`
|
VALUES ($1, $2, $3, $4, $5) ON CONFLICT (slot, block_root) DO NOTHING`
|
||||||
// Statement to upsert to the ethcl.signed_beacon_blocks table.
|
// Statement to upsert to the ethcl.signed_beacon_blocks table.
|
||||||
UpsertSignedBeaconBlockStmt string = `
|
UpsertSignedBeaconBlockStmt string = `
|
||||||
INSERT INTO ethcl.signed_beacon_block (slot, block_root, parent_block_root, mh_key)
|
INSERT INTO ethcl.signed_beacon_block (slot, block_root, parent_block_root, eth1_block_hash, mh_key)
|
||||||
VALUES ($1, $2, $3, $4) ON CONFLICT (slot, block_root) DO NOTHING`
|
VALUES ($1, $2, $3, $4, $5) ON CONFLICT (slot, block_root) DO NOTHING`
|
||||||
// Statement to upsert to the ethcl.beacon_state table.
|
// Statement to upsert to the ethcl.beacon_state table.
|
||||||
UpsertBeaconState string = `
|
UpsertBeaconState string = `
|
||||||
INSERT INTO ethcl.beacon_state (slot, state_root, mh_key)
|
INSERT INTO ethcl.beacon_state (slot, state_root, mh_key)
|
||||||
@ -40,6 +40,11 @@ VALUES ($1, $2) ON CONFLICT (key) DO NOTHING`
|
|||||||
CheckProposedStmt string = `SELECT slot, block_root
|
CheckProposedStmt string = `SELECT slot, block_root
|
||||||
FROM ethcl.slots
|
FROM ethcl.slots
|
||||||
WHERE slot=$1 AND block_root=$2;`
|
WHERE slot=$1 AND block_root=$2;`
|
||||||
|
// Statement to insert known_gaps. We don't pass in timestamp, we let the server take care of that one.
|
||||||
|
UpsertKnownGapsStmt string = `
|
||||||
|
INSERT INTO ethcl.known_gaps (start_slot, end_slot, checked_out, reprocessing_error, entry_error, entry_process)
|
||||||
|
VALUES ($1, $2, $3, $4, $5, $6) on CONFLICT (start_slot, end_slot) DO NOTHING`
|
||||||
|
QueryHighestSlotStmt string = "SELECT COALESCE(MAX(slot), 0) FROM ethcl.slots"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Put all functionality to prepare the write object
|
// Put all functionality to prepare the write object
|
||||||
@ -55,13 +60,13 @@ type DatabaseWriter struct {
|
|||||||
rawSignedBeaconBlock []byte
|
rawSignedBeaconBlock []byte
|
||||||
}
|
}
|
||||||
|
|
||||||
func CreateDatabaseWrite(db sql.Database, slot int, stateRoot string, blockRoot string, parentBlockRoot string, status string, metrics *BeaconClientMetrics) *DatabaseWriter {
|
func CreateDatabaseWrite(db sql.Database, slot int, stateRoot string, blockRoot string, parentBlockRoot string, eth1BlockHash string, status string, metrics *BeaconClientMetrics) *DatabaseWriter {
|
||||||
dw := &DatabaseWriter{
|
dw := &DatabaseWriter{
|
||||||
Db: db,
|
Db: db,
|
||||||
Metrics: metrics,
|
Metrics: metrics,
|
||||||
}
|
}
|
||||||
dw.prepareSlotsModel(slot, stateRoot, blockRoot, status)
|
dw.prepareSlotsModel(slot, stateRoot, blockRoot, status)
|
||||||
dw.prepareSignedBeaconBlockModel(slot, blockRoot, parentBlockRoot)
|
dw.prepareSignedBeaconBlockModel(slot, blockRoot, parentBlockRoot, eth1BlockHash)
|
||||||
dw.prepareBeaconStateModel(slot, stateRoot)
|
dw.prepareBeaconStateModel(slot, stateRoot)
|
||||||
return dw
|
return dw
|
||||||
}
|
}
|
||||||
@ -82,12 +87,13 @@ func (dw *DatabaseWriter) prepareSlotsModel(slot int, stateRoot string, blockRoo
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Create the model for the ethcl.signed_beacon_block table.
|
// Create the model for the ethcl.signed_beacon_block table.
|
||||||
func (dw *DatabaseWriter) prepareSignedBeaconBlockModel(slot int, blockRoot string, parentBlockRoot string) {
|
func (dw *DatabaseWriter) prepareSignedBeaconBlockModel(slot int, blockRoot string, parentBlockRoot string, eth1BlockHash string) {
|
||||||
dw.DbSignedBeaconBlock = &DbSignedBeaconBlock{
|
dw.DbSignedBeaconBlock = &DbSignedBeaconBlock{
|
||||||
Slot: strconv.Itoa(slot),
|
Slot: strconv.Itoa(slot),
|
||||||
BlockRoot: blockRoot,
|
BlockRoot: blockRoot,
|
||||||
ParentBlock: parentBlockRoot,
|
ParentBlock: parentBlockRoot,
|
||||||
MhKey: calculateMhKey(),
|
Eth1BlockHash: eth1BlockHash,
|
||||||
|
MhKey: calculateMhKey(),
|
||||||
}
|
}
|
||||||
log.Debug("dw.DbSignedBeaconBlock: ", dw.DbSignedBeaconBlock)
|
log.Debug("dw.DbSignedBeaconBlock: ", dw.DbSignedBeaconBlock)
|
||||||
}
|
}
|
||||||
@ -104,79 +110,115 @@ func (dw *DatabaseWriter) prepareBeaconStateModel(slot int, stateRoot string) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Write all the data for a given slot.
|
// Write all the data for a given slot.
|
||||||
func (dw *DatabaseWriter) writeFullSlot() {
|
func (dw *DatabaseWriter) writeFullSlot() error {
|
||||||
// Add errors for each function call
|
// Add errors for each function call
|
||||||
// If an error occurs, write to knownGaps table.
|
// If an error occurs, write to knownGaps table.
|
||||||
dw.writeSlots()
|
err := dw.writeSlots()
|
||||||
dw.writeSignedBeaconBlocks()
|
if err != nil {
|
||||||
dw.writeBeaconState()
|
return err
|
||||||
|
}
|
||||||
|
err = dw.writeSignedBeaconBlocks()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
err = dw.writeBeaconState()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
dw.Metrics.IncrementHeadTrackingInserts(1)
|
dw.Metrics.IncrementHeadTrackingInserts(1)
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Write the information for the generic slots table. For now this is only one function.
|
// Write the information for the generic slots table. For now this is only one function.
|
||||||
// But in the future if we need to incorporate any FK's or perform any actions to write to the
|
// But in the future if we need to incorporate any FK's or perform any actions to write to the
|
||||||
// slots table we can do it all here.
|
// slots table we can do it all here.
|
||||||
func (dw *DatabaseWriter) writeSlots() {
|
func (dw *DatabaseWriter) writeSlots() error {
|
||||||
dw.upsertSlots()
|
return dw.upsertSlots()
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Upsert to the ethcl.slots table.
|
// Upsert to the ethcl.slots table.
|
||||||
func (dw *DatabaseWriter) upsertSlots() {
|
func (dw *DatabaseWriter) upsertSlots() error {
|
||||||
_, err := dw.Db.Exec(context.Background(), UpsertSlotsStmt, dw.DbSlots.Epoch, dw.DbSlots.Slot, dw.DbSlots.BlockRoot, dw.DbSlots.StateRoot, dw.DbSlots.Status)
|
_, err := dw.Db.Exec(context.Background(), UpsertSlotsStmt, dw.DbSlots.Epoch, dw.DbSlots.Slot, dw.DbSlots.BlockRoot, dw.DbSlots.StateRoot, dw.DbSlots.Status)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("Unable to write to the slot to the ethcl.slots table")
|
loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("Unable to write to the slot to the ethcl.slots table")
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Write the information for the signed_beacon_block.
|
// Write the information for the signed_beacon_block.
|
||||||
func (dw *DatabaseWriter) writeSignedBeaconBlocks() {
|
func (dw *DatabaseWriter) writeSignedBeaconBlocks() error {
|
||||||
dw.upsertPublicBlocks(dw.DbSignedBeaconBlock.MhKey, dw.rawSignedBeaconBlock)
|
err := dw.upsertPublicBlocks(dw.DbSignedBeaconBlock.MhKey, dw.rawSignedBeaconBlock)
|
||||||
dw.upsertSignedBeaconBlock()
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
err = dw.upsertSignedBeaconBlock()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Upsert to public.blocks.
|
// Upsert to public.blocks.
|
||||||
func (dw *DatabaseWriter) upsertPublicBlocks(key string, data []byte) {
|
func (dw *DatabaseWriter) upsertPublicBlocks(key string, data []byte) error {
|
||||||
_, err := dw.Db.Exec(context.Background(), UpsertBlocksStmt, key, data)
|
_, err := dw.Db.Exec(context.Background(), UpsertBlocksStmt, key, data)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("Unable to write to the slot to the public.blocks table")
|
loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("Unable to write to the slot to the public.blocks table")
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Upsert to the ethcl.signed_beacon_block table.
|
// Upsert to the ethcl.signed_beacon_block table.
|
||||||
func (dw *DatabaseWriter) upsertSignedBeaconBlock() {
|
func (dw *DatabaseWriter) upsertSignedBeaconBlock() error {
|
||||||
_, err := dw.Db.Exec(context.Background(), UpsertSignedBeaconBlockStmt, dw.DbSignedBeaconBlock.Slot, dw.DbSignedBeaconBlock.BlockRoot, dw.DbSignedBeaconBlock.ParentBlock, dw.DbSignedBeaconBlock.MhKey)
|
_, err := dw.Db.Exec(context.Background(), UpsertSignedBeaconBlockStmt, dw.DbSignedBeaconBlock.Slot, dw.DbSignedBeaconBlock.BlockRoot, dw.DbSignedBeaconBlock.ParentBlock, dw.DbSignedBeaconBlock.Eth1BlockHash, dw.DbSignedBeaconBlock.MhKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
loghelper.LogSlotError(dw.DbSlots.Slot, err).WithFields(log.Fields{"block_root": dw.DbSignedBeaconBlock.BlockRoot}).Error("Unable to write to the slot to the ethcl.signed_beacon_block table")
|
loghelper.LogSlotError(dw.DbSlots.Slot, err).WithFields(log.Fields{"block_root": dw.DbSignedBeaconBlock.BlockRoot}).Error("Unable to write to the slot to the ethcl.signed_beacon_block table")
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Write the information for the beacon_state.
|
// Write the information for the beacon_state.
|
||||||
func (dw *DatabaseWriter) writeBeaconState() {
|
func (dw *DatabaseWriter) writeBeaconState() error {
|
||||||
dw.upsertPublicBlocks(dw.DbBeaconState.MhKey, dw.rawBeaconState)
|
err := dw.upsertPublicBlocks(dw.DbBeaconState.MhKey, dw.rawBeaconState)
|
||||||
dw.upsertBeaconState()
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
err = dw.upsertBeaconState()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Upsert to the ethcl.beacon_state table.
|
// Upsert to the ethcl.beacon_state table.
|
||||||
func (dw *DatabaseWriter) upsertBeaconState() {
|
func (dw *DatabaseWriter) upsertBeaconState() error {
|
||||||
_, err := dw.Db.Exec(context.Background(), UpsertBeaconState, dw.DbBeaconState.Slot, dw.DbBeaconState.StateRoot, dw.DbBeaconState.MhKey)
|
_, err := dw.Db.Exec(context.Background(), UpsertBeaconState, dw.DbBeaconState.Slot, dw.DbBeaconState.StateRoot, dw.DbBeaconState.MhKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("Unable to write to the slot to the ethcl.beacon_state table")
|
loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("Unable to write to the slot to the ethcl.beacon_state table")
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update a given slot to be marked as forked. Provide the slot and the latest latestBlockRoot.
|
// Update a given slot to be marked as forked. Provide the slot and the latest latestBlockRoot.
|
||||||
// We will mark all entries for the given slot that don't match the provided latestBlockRoot as forked.
|
// We will mark all entries for the given slot that don't match the provided latestBlockRoot as forked.
|
||||||
func writeReorgs(db sql.Database, slot string, latestBlockRoot string, metrics *BeaconClientMetrics) {
|
func writeReorgs(db sql.Database, slot string, latestBlockRoot string, metrics *BeaconClientMetrics) {
|
||||||
|
slotNum, strErr := strconv.Atoi(slot)
|
||||||
|
if strErr != nil {
|
||||||
|
loghelper.LogReorgError(slot, latestBlockRoot, strErr).Error("We can't convert the slot to an int...")
|
||||||
|
}
|
||||||
|
|
||||||
forkCount, err := updateForked(db, slot, latestBlockRoot)
|
forkCount, err := updateForked(db, slot, latestBlockRoot)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We ran into some trouble while updating all forks.")
|
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We ran into some trouble while updating all forks.")
|
||||||
// Add to knownGaps Table
|
writeKnownGaps(db, 1, slotNum, slotNum, err, "reorg")
|
||||||
}
|
}
|
||||||
proposedCount, err := updateProposed(db, slot, latestBlockRoot)
|
proposedCount, err := updateProposed(db, slot, latestBlockRoot)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We ran into some trouble while trying to update the proposed slot.")
|
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We ran into some trouble while trying to update the proposed slot.")
|
||||||
// Add to knownGaps Table
|
writeKnownGaps(db, 1, slotNum, slotNum, err, "reorg")
|
||||||
}
|
}
|
||||||
|
|
||||||
if forkCount > 0 {
|
if forkCount > 0 {
|
||||||
@ -187,7 +229,6 @@ func writeReorgs(db sql.Database, slot string, latestBlockRoot string, metrics *
|
|||||||
loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{
|
loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{
|
||||||
"forkCount": forkCount,
|
"forkCount": forkCount,
|
||||||
}).Warn("There were no forked rows to update.")
|
}).Warn("There were no forked rows to update.")
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if proposedCount == 1 {
|
if proposedCount == 1 {
|
||||||
@ -198,16 +239,19 @@ func writeReorgs(db sql.Database, slot string, latestBlockRoot string, metrics *
|
|||||||
loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{
|
loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{
|
||||||
"proposedCount": proposedCount,
|
"proposedCount": proposedCount,
|
||||||
}).Error("Too many rows were marked as proposed!")
|
}).Error("Too many rows were marked as proposed!")
|
||||||
|
writeKnownGaps(db, 1, slotNum, slotNum, err, "reorg")
|
||||||
} else if proposedCount == 0 {
|
} else if proposedCount == 0 {
|
||||||
var count int
|
var count int
|
||||||
err := db.QueryRow(context.Background(), CheckProposedStmt, slot, latestBlockRoot).Scan(count)
|
err := db.QueryRow(context.Background(), CheckProposedStmt, slot, latestBlockRoot).Scan(count)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("Unable to query proposed rows after reorg.")
|
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("Unable to query proposed rows after reorg.")
|
||||||
|
writeKnownGaps(db, 1, slotNum, slotNum, err, "reorg")
|
||||||
}
|
}
|
||||||
if count != 1 {
|
if count != 1 {
|
||||||
loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{
|
loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{
|
||||||
"proposedCount": count,
|
"proposedCount": count,
|
||||||
}).Warn("The proposed block was not marked as proposed...")
|
}).Warn("The proposed block was not marked as proposed...")
|
||||||
|
writeKnownGaps(db, 1, slotNum, slotNum, err, "reorg")
|
||||||
} else {
|
} else {
|
||||||
loghelper.LogReorg(slot, latestBlockRoot).Info("Updated the row that should have been marked as proposed.")
|
loghelper.LogReorg(slot, latestBlockRoot).Info("Updated the row that should have been marked as proposed.")
|
||||||
}
|
}
|
||||||
@ -246,6 +290,82 @@ func updateProposed(db sql.Database, slot string, latestBlockRoot string) (int64
|
|||||||
return count, err
|
return count, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// A wrapper function to call upsertKnownGaps. This function will break down the range of known_gaos into
|
||||||
|
// smaller chunks. For example, instead of having an entry of 1-101, if we increment the entries by 10 slots, we would
|
||||||
|
// have 10 entries as follows: 1-10, 11-20, etc...
|
||||||
|
func writeKnownGaps(db sql.Database, tableIncrement int, startSlot int, endSlot int, entryError error, entryProcess string) {
|
||||||
|
if endSlot-startSlot <= tableIncrement {
|
||||||
|
kgModel := DbKnownGaps{
|
||||||
|
StartSlot: strconv.Itoa(startSlot),
|
||||||
|
EndSlot: strconv.Itoa(endSlot),
|
||||||
|
CheckedOut: false,
|
||||||
|
ReprocessingError: "",
|
||||||
|
EntryError: entryError.Error(),
|
||||||
|
EntryProcess: entryProcess,
|
||||||
|
}
|
||||||
|
upsertKnownGaps(db, kgModel)
|
||||||
|
}
|
||||||
|
totalSlots := endSlot - startSlot
|
||||||
|
var chunks int
|
||||||
|
chunks = totalSlots / tableIncrement
|
||||||
|
if totalSlots%tableIncrement != 0 {
|
||||||
|
chunks = chunks + 1
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := 0; i < chunks; i++ {
|
||||||
|
var tempStart, tempEnd int
|
||||||
|
tempStart = startSlot + (i * tableIncrement)
|
||||||
|
if i+1 == chunks {
|
||||||
|
tempEnd = endSlot
|
||||||
|
} else {
|
||||||
|
tempEnd = startSlot + ((i + 1) * tableIncrement)
|
||||||
|
}
|
||||||
|
kgModel := DbKnownGaps{
|
||||||
|
StartSlot: strconv.Itoa(tempStart),
|
||||||
|
EndSlot: strconv.Itoa(tempEnd),
|
||||||
|
CheckedOut: false,
|
||||||
|
ReprocessingError: "",
|
||||||
|
EntryError: entryError.Error(),
|
||||||
|
EntryProcess: entryProcess,
|
||||||
|
}
|
||||||
|
upsertKnownGaps(db, kgModel)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// A function to upsert a single entry to the ethcl.known_gaps table.
|
||||||
|
func upsertKnownGaps(db sql.Database, knModel DbKnownGaps) {
|
||||||
|
_, err := db.Exec(context.Background(), UpsertKnownGapsStmt, knModel.StartSlot, knModel.EndSlot,
|
||||||
|
knModel.CheckedOut, knModel.ReprocessingError, knModel.EntryError, knModel.EntryProcess)
|
||||||
|
if err != nil {
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"err": err,
|
||||||
|
"startSlot": knModel.StartSlot,
|
||||||
|
"endSlot": knModel.EndSlot,
|
||||||
|
}).Fatal("We are unable to write to the ethcl.known_gaps table!!! We will stop the application because of that.")
|
||||||
|
}
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"startSlot": knModel.StartSlot,
|
||||||
|
"endSlot": knModel.EndSlot,
|
||||||
|
}).Warn("A new gap has been added to the ethcl.known_gaps table.")
|
||||||
|
}
|
||||||
|
|
||||||
|
// A function to write the gap between the highest slot in the DB and the first processed slot.
|
||||||
|
func writeStartUpGaps(db sql.Database, tableIncrement int, firstSlot int) {
|
||||||
|
var maxSlot int
|
||||||
|
err := db.QueryRow(context.Background(), QueryHighestSlotStmt).Scan(&maxSlot)
|
||||||
|
if err != nil {
|
||||||
|
loghelper.LogError(err).Fatal("Unable to get the max block from the DB. We must close the application or we might have undetected gaps.")
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
loghelper.LogError(err).WithFields(log.Fields{
|
||||||
|
"maxSlot": maxSlot,
|
||||||
|
}).Fatal("Unable to get convert max block from DB to int. We must close the application or we might have undetected gaps.")
|
||||||
|
}
|
||||||
|
writeKnownGaps(db, tableIncrement, maxSlot, firstSlot, fmt.Errorf(""), "startup")
|
||||||
|
}
|
||||||
|
|
||||||
// Dummy function for calculating the mhKey.
|
// Dummy function for calculating the mhKey.
|
||||||
func calculateMhKey() string {
|
func calculateMhKey() string {
|
||||||
rand.Seed(time.Now().UnixNano())
|
rand.Seed(time.Now().UnixNano())
|
||||||
|
@ -47,10 +47,11 @@ type DbSlots struct {
|
|||||||
|
|
||||||
// A struct to capture whats being written to ethcl.signed_beacon_block table.
|
// A struct to capture whats being written to ethcl.signed_beacon_block table.
|
||||||
type DbSignedBeaconBlock struct {
|
type DbSignedBeaconBlock struct {
|
||||||
Slot string // The slot.
|
Slot string // The slot.
|
||||||
BlockRoot string // The block root
|
BlockRoot string // The block root
|
||||||
ParentBlock string // The parent block root.
|
ParentBlock string // The parent block root.
|
||||||
MhKey string // The ipld multihash key.
|
Eth1BlockHash string // The eth1 block_hash
|
||||||
|
MhKey string // The ipld multihash key.
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -60,3 +61,14 @@ type DbBeaconState struct {
|
|||||||
StateRoot string // The state root
|
StateRoot string // The state root
|
||||||
MhKey string // The ipld multihash key.
|
MhKey string // The ipld multihash key.
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// A structure to capture whats being written to the ethcl.known_gaps table.
|
||||||
|
type DbKnownGaps struct {
|
||||||
|
StartSlot string // The start slot for known_gaps, inclusive.
|
||||||
|
EndSlot string // The end slot for known_gaps, inclusive.
|
||||||
|
CheckedOut bool // Indicates if any process is currently processing this entry.
|
||||||
|
ReprocessingError string // The error that occurred when attempting to reprocess these entries.
|
||||||
|
EntryError string // The error that caused this entry to be added to the table. Could be null.
|
||||||
|
EntryTime string // The time this range was added to the DB. This can help us catch ranges that have not been processed for a long time due to some error.
|
||||||
|
EntryProcess string // The entry process that added this process. Potential options are StartUp, Error, ManualEntry, HeadGap.
|
||||||
|
}
|
||||||
|
@ -24,6 +24,7 @@ func (bc *BeaconClient) handleReorg() {
|
|||||||
// This function will handle the latest head event.
|
// This function will handle the latest head event.
|
||||||
func (bc *BeaconClient) handleHead() {
|
func (bc *BeaconClient) handleHead() {
|
||||||
log.Info("Starting to process head.")
|
log.Info("Starting to process head.")
|
||||||
|
errorSlots := 0
|
||||||
for {
|
for {
|
||||||
head := <-bc.HeadTracking.ProcessCh
|
head := <-bc.HeadTracking.ProcessCh
|
||||||
// Process all the work here.
|
// Process all the work here.
|
||||||
@ -32,10 +33,20 @@ func (bc *BeaconClient) handleHead() {
|
|||||||
bc.HeadTracking.ErrorCh <- &SseError{
|
bc.HeadTracking.ErrorCh <- &SseError{
|
||||||
err: fmt.Errorf("Unable to turn the slot from string to int: %s", head.Slot),
|
err: fmt.Errorf("Unable to turn the slot from string to int: %s", head.Slot),
|
||||||
}
|
}
|
||||||
|
errorSlots = errorSlots + 1
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
err = processHeadSlot(bc.Db, bc.ServerEndpoint, slot, head.Block, head.State, bc.PreviousSlot, bc.PreviousBlockRoot, bc.Metrics)
|
if errorSlots != 0 && bc.PreviousSlot != 0 {
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"lastProcessedSlot": bc.PreviousSlot,
|
||||||
|
"errorMessages": errorSlots,
|
||||||
|
}).Warn("We added slots to the knownGaps table because we got bad head messages.")
|
||||||
|
writeKnownGaps(bc.Db, bc.KnownGapTableIncrement, bc.PreviousSlot, bcSlotsPerEpoch+errorSlots, fmt.Errorf("Bad Head Messages"), "headProcessing")
|
||||||
|
}
|
||||||
|
|
||||||
|
err = processHeadSlot(bc.Db, bc.ServerEndpoint, slot, head.Block, head.State, bc.PreviousSlot, bc.PreviousBlockRoot, bc.Metrics, bc.KnownGapTableIncrement)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
loghelper.LogSlotError(head.Slot, err)
|
loghelper.LogSlotError(head.Slot, err).Error("Unable to process a slot")
|
||||||
}
|
}
|
||||||
log.WithFields(log.Fields{"head": head}).Debug("Received a new head event.")
|
log.WithFields(log.Fields{"head": head}).Debug("Received a new head event.")
|
||||||
|
|
||||||
|
@ -25,6 +25,7 @@ var (
|
|||||||
}
|
}
|
||||||
ParentRootUnmarshalError = "Unable to properly unmarshal the ParentRoot field in the SignedBeaconBlock."
|
ParentRootUnmarshalError = "Unable to properly unmarshal the ParentRoot field in the SignedBeaconBlock."
|
||||||
MissingIdentifiedError = "Can't query state without a set slot or block_root"
|
MissingIdentifiedError = "Can't query state without a set slot or block_root"
|
||||||
|
MissingEth1Data = "Can't get the Eth1 block_hash"
|
||||||
)
|
)
|
||||||
|
|
||||||
type ProcessSlot struct {
|
type ProcessSlot struct {
|
||||||
@ -55,11 +56,7 @@ type ProcessSlot struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// This function will do all the work to process the slot and write it to the DB.
|
// This function will do all the work to process the slot and write it to the DB.
|
||||||
func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, headOrHistoric string, metrics *BeaconClientMetrics) error {
|
func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, headOrHistoric string, metrics *BeaconClientMetrics, knownGapsTableIncrement int) error {
|
||||||
headOrHistoric = strings.ToLower(headOrHistoric)
|
|
||||||
if headOrHistoric != "head" && headOrHistoric != "historic" {
|
|
||||||
return fmt.Errorf("headOrBatch must be either historic or head!")
|
|
||||||
}
|
|
||||||
ps := &ProcessSlot{
|
ps := &ProcessSlot{
|
||||||
Slot: slot,
|
Slot: slot,
|
||||||
BlockRoot: blockRoot,
|
BlockRoot: blockRoot,
|
||||||
@ -72,33 +69,43 @@ func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot
|
|||||||
// Get the SignedBeaconBlock.
|
// Get the SignedBeaconBlock.
|
||||||
err := ps.getSignedBeaconBlock(serverAddress)
|
err := ps.getSignedBeaconBlock(serverAddress)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
writeKnownGaps(ps.Db, 1, ps.Slot, ps.Slot, err, "processSlot")
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get the BeaconState.
|
// Get the BeaconState.
|
||||||
err = ps.getBeaconState(serverAddress)
|
err = ps.getBeaconState(serverAddress)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
writeKnownGaps(ps.Db, 1, ps.Slot, ps.Slot, err, "processSlot")
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
if ps.HeadOrHistoric == "head" && previousSlot == 0 && previousBlockRoot == "" {
|
||||||
|
writeStartUpGaps(db, knownGapsTableIncrement, ps.Slot)
|
||||||
|
}
|
||||||
|
|
||||||
// Get this object ready to write
|
// Get this object ready to write
|
||||||
dw := ps.createWriteObjects()
|
dw := ps.createWriteObjects()
|
||||||
|
|
||||||
// Write the object to the DB.
|
// Write the object to the DB.
|
||||||
dw.writeFullSlot()
|
err = dw.writeFullSlot()
|
||||||
|
if err != nil {
|
||||||
|
writeKnownGaps(ps.Db, 1, ps.Slot, ps.Slot, err, "processSlot")
|
||||||
|
}
|
||||||
|
|
||||||
// Handle any reorgs or skipped slots.
|
// Handle any reorgs or skipped slots.
|
||||||
if ps.HeadOrHistoric == "head" {
|
headOrHistoric = strings.ToLower(headOrHistoric)
|
||||||
if previousSlot != 0 && previousBlockRoot != "" {
|
if headOrHistoric != "head" && headOrHistoric != "historic" {
|
||||||
ps.checkPreviousSlot(previousSlot, previousBlockRoot)
|
return fmt.Errorf("headOrHistoric must be either historic or head!")
|
||||||
}
|
}
|
||||||
|
if ps.HeadOrHistoric == "head" && previousSlot != 0 && previousBlockRoot != "" {
|
||||||
|
ps.checkPreviousSlot(previousSlot, previousBlockRoot, knownGapsTableIncrement)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Handle a slot that is at head. A wrapper function for calling `handleFullSlot`.
|
// Handle a slot that is at head. A wrapper function for calling `handleFullSlot`.
|
||||||
func processHeadSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, metrics *BeaconClientMetrics) error {
|
func processHeadSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, metrics *BeaconClientMetrics, knownGapsTableIncrement int) error {
|
||||||
return processFullSlot(db, serverAddress, slot, blockRoot, stateRoot, previousSlot, previousBlockRoot, "head", metrics)
|
return processFullSlot(db, serverAddress, slot, blockRoot, stateRoot, previousSlot, previousBlockRoot, "head", metrics, knownGapsTableIncrement)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Handle a historic slot. A wrapper function for calling `handleFullSlot`.
|
// Handle a historic slot. A wrapper function for calling `handleFullSlot`.
|
||||||
@ -141,7 +148,11 @@ func (ps *ProcessSlot) getSignedBeaconBlock(serverAddress string) error {
|
|||||||
} else if ps.FullSignedBeaconBlock.Block.ParentRoot == nil {
|
} else if ps.FullSignedBeaconBlock.Block.ParentRoot == nil {
|
||||||
loghelper.LogSlotError(strconv.Itoa(ps.Slot), err).Error(ParentRootUnmarshalError)
|
loghelper.LogSlotError(strconv.Itoa(ps.Slot), err).Error(ParentRootUnmarshalError)
|
||||||
return fmt.Errorf(ParentRootUnmarshalError)
|
return fmt.Errorf(ParentRootUnmarshalError)
|
||||||
|
} else if hex.EncodeToString(ps.FullBeaconState.Eth1Data.BlockHash) == "" {
|
||||||
|
loghelper.LogSlotError(strconv.Itoa(ps.Slot), err).Error(MissingEth1Data)
|
||||||
|
return fmt.Errorf(MissingEth1Data)
|
||||||
}
|
}
|
||||||
|
log.Warn("We received a processing error: ", err)
|
||||||
}
|
}
|
||||||
ps.ParentBlockRoot = "0x" + hex.EncodeToString(ps.FullSignedBeaconBlock.Block.ParentRoot)
|
ps.ParentBlockRoot = "0x" + hex.EncodeToString(ps.FullSignedBeaconBlock.Block.ParentRoot)
|
||||||
return nil
|
return nil
|
||||||
@ -174,7 +185,7 @@ func (ps *ProcessSlot) getBeaconState(serverEndpoint string) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Check to make sure that the previous block we processed is the parent of the current block.
|
// Check to make sure that the previous block we processed is the parent of the current block.
|
||||||
func (ps *ProcessSlot) checkPreviousSlot(previousSlot int, previousBlockRoot string) {
|
func (ps *ProcessSlot) checkPreviousSlot(previousSlot int, previousBlockRoot string, knownGapsTableIncrement int) {
|
||||||
parentRoot := "0x" + hex.EncodeToString(ps.FullSignedBeaconBlock.Block.ParentRoot)
|
parentRoot := "0x" + hex.EncodeToString(ps.FullSignedBeaconBlock.Block.ParentRoot)
|
||||||
if previousSlot == int(ps.FullBeaconState.Slot) {
|
if previousSlot == int(ps.FullBeaconState.Slot) {
|
||||||
log.WithFields(log.Fields{
|
log.WithFields(log.Fields{
|
||||||
@ -182,21 +193,21 @@ func (ps *ProcessSlot) checkPreviousSlot(previousSlot int, previousBlockRoot str
|
|||||||
"fork": true,
|
"fork": true,
|
||||||
}).Warn("A fork occurred! The previous slot and current slot match.")
|
}).Warn("A fork occurred! The previous slot and current slot match.")
|
||||||
writeReorgs(ps.Db, strconv.Itoa(ps.Slot), ps.BlockRoot, ps.Metrics)
|
writeReorgs(ps.Db, strconv.Itoa(ps.Slot), ps.BlockRoot, ps.Metrics)
|
||||||
} else if previousSlot-1 != int(ps.FullBeaconState.Slot) {
|
} else if previousSlot+1 != int(ps.FullBeaconState.Slot) {
|
||||||
log.WithFields(log.Fields{
|
log.WithFields(log.Fields{
|
||||||
"previousSlot": previousSlot,
|
"previousSlot": previousSlot,
|
||||||
"currentSlot": ps.FullBeaconState.Slot,
|
"currentSlot": ps.FullBeaconState.Slot,
|
||||||
}).Error("We skipped a few slots.")
|
}).Error("We skipped a few slots.")
|
||||||
|
writeKnownGaps(ps.Db, knownGapsTableIncrement, previousSlot+1, int(ps.FullBeaconState.Slot)-1, fmt.Errorf("Gaps during head processing"), "headGaps")
|
||||||
// Check to see if the slot was skipped.
|
// Check to see if the slot was skipped.
|
||||||
// Call our batch processing function.
|
// Call our known_gaps function.
|
||||||
} else if previousBlockRoot != parentRoot {
|
} else if previousBlockRoot != parentRoot {
|
||||||
log.WithFields(log.Fields{
|
log.WithFields(log.Fields{
|
||||||
"previousBlockRoot": previousBlockRoot,
|
"previousBlockRoot": previousBlockRoot,
|
||||||
"currentBlockParent": parentRoot,
|
"currentBlockParent": parentRoot,
|
||||||
}).Error("The previousBlockRoot does not match the current blocks parent, an unprocessed fork might have occurred.")
|
}).Error("The previousBlockRoot does not match the current blocks parent, an unprocessed fork might have occurred.")
|
||||||
writeReorgs(ps.Db, strconv.Itoa(previousSlot), parentRoot, ps.Metrics)
|
writeReorgs(ps.Db, strconv.Itoa(previousSlot), parentRoot, ps.Metrics)
|
||||||
// Call our batch processing function.
|
writeKnownGaps(ps.Db, 1, ps.Slot-1, ps.Slot-1, fmt.Errorf("Incorrect Parent"), "processSlot")
|
||||||
// Continue with this slot.
|
|
||||||
} else {
|
} else {
|
||||||
log.Debug("Previous Slot and Current Slot are one distance from each other.")
|
log.Debug("Previous Slot and Current Slot are one distance from each other.")
|
||||||
}
|
}
|
||||||
@ -210,7 +221,7 @@ func (ps *ProcessSlot) checkPreviousSlot(previousSlot int, previousBlockRoot str
|
|||||||
// Check if head or historic.
|
// Check if head or historic.
|
||||||
|
|
||||||
// 1. BeaconBlock is 404.
|
// 1. BeaconBlock is 404.
|
||||||
// 2. check heck /lighthouse/database/info to make sure the oldest_block_slot == 0 and anchor == null. This indicates that I don't have any gaps in the DB.
|
// 2. check check /lighthouse/database/info to make sure the oldest_block_slot == 0 and anchor == null. This indicates that I don't have any gaps in the DB.
|
||||||
// 3. I query BeaconState for slot X, and get a BeaconState.
|
// 3. I query BeaconState for slot X, and get a BeaconState.
|
||||||
// 4. Although for good measure you should also check that the head is at a slot >= X using something like /eth/v1/node/syncing/ or /eth/v1/beacon/headers/head
|
// 4. Although for good measure you should also check that the head is at a slot >= X using something like /eth/v1/node/syncing/ or /eth/v1/beacon/headers/head
|
||||||
func (ps *ProcessSlot) checkMissedSlot() {
|
func (ps *ProcessSlot) checkMissedSlot() {
|
||||||
@ -254,7 +265,9 @@ func (ps *ProcessSlot) createWriteObjects() *DatabaseWriter {
|
|||||||
status = "proposed"
|
status = "proposed"
|
||||||
}
|
}
|
||||||
|
|
||||||
dw := CreateDatabaseWrite(ps.Db, ps.Slot, stateRoot, blockRoot, ps.ParentBlockRoot, status, ps.Metrics)
|
eth1BlockHash := "0x" + hex.EncodeToString(ps.FullSignedBeaconBlock.Block.Body.Eth1Data.BlockHash)
|
||||||
|
|
||||||
|
dw := CreateDatabaseWrite(ps.Db, ps.Slot, stateRoot, blockRoot, ps.ParentBlockRoot, eth1BlockHash, status, ps.Metrics)
|
||||||
dw.rawSignedBeaconBlock = ps.SszSignedBeaconBlock
|
dw.rawSignedBeaconBlock = ps.SszSignedBeaconBlock
|
||||||
dw.rawBeaconState = ps.SszBeaconState
|
dw.rawBeaconState = ps.SszBeaconState
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user