Add KnownGaps table and corresponding entries #33
@ -9,7 +9,7 @@ COPY go.sum .
|
||||
RUN go mod tidy; go mod download
|
||||
COPY . .
|
||||
|
||||
RUN GCO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -ldflags '-extldflags "-static"' -o ipld-ethcl-indexer .
|
||||
RUN GCO_ENABLED=0 GOOS=linux go build -race -a -installsuffix cgo -ldflags '-extldflags "-static"' -o ipld-ethcl-indexer .
|
||||
RUN chmod +x ipld-ethcl-indexer
|
||||
|
||||
FROM frolvlad/alpine-bash:latest
|
||||
|
@ -22,6 +22,21 @@ The `database` package allows us to interact with a postgres DB. We utilize the
|
||||
|
||||
This package will contain code to interact with the beacon client.
|
||||
|
||||
### Known Gaps
|
||||
|
||||
Known Gaps tracking is handled within this package. The columns are as follows:
|
||||
|
||||
- StartSlot - The start slot for known_gaps, inclusive.
|
||||
- EndSlot - The end slot for known_gaps, inclusive.
|
||||
- CheckedOut - Indicates if any process is currently processing this entry.
|
||||
- ErrorMessage - Captures any error message that might have occurred when previously processing this entry.
|
||||
- EntryTime - The time this range was added to the DB. This can help us catch ranges that have not been processed for a long time due to some error.
|
||||
- EntryProcess - The entry process that added this process. Potential options are StartUp, Error, HeadGap.
|
||||
- This can help us understand how a specific entry was added. It can be useful for debugging the application.
|
||||
- StartUp - Gaps found when we started the application.
|
||||
- Error - Indicates that the entry was added due to an error with processing.
|
||||
- HeadGap - Indicates that gaps where found when keeping up with Head.
|
||||
|
||||
## `pkg/version`
|
||||
|
||||
A generic package which can be utilized to easily version our applications.
|
||||
|
@ -22,7 +22,8 @@ var (
|
||||
bcAddress string
|
||||
bcPort int
|
||||
bcConnectionProtocol string
|
||||
maxWaitSecondsShutdown time.Duration = time.Duration(5) * time.Second
|
||||
maxWaitSecondsShutdown time.Duration = time.Duration(5) * time.Second
|
||||
notifierCh chan os.Signal = make(chan os.Signal, 1)
|
||||
)
|
||||
|
||||
// captureCmd represents the capture command
|
||||
|
22
cmd/head.go
22
cmd/head.go
@ -6,15 +6,19 @@ package cmd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/spf13/viper"
|
||||
"github.com/vulcanize/ipld-ethcl-indexer/internal/boot"
|
||||
"github.com/vulcanize/ipld-ethcl-indexer/internal/shutdown"
|
||||
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
|
||||
)
|
||||
|
||||
var (
|
||||
kgTableIncrement int
|
||||
)
|
||||
|
||||
// headCmd represents the head command
|
||||
var headCmd = &cobra.Command{
|
||||
Use: "head",
|
||||
@ -37,10 +41,9 @@ func startHeadTracking() {
|
||||
}
|
||||
|
||||
// Capture head blocks
|
||||
go BC.CaptureHead()
|
||||
go BC.CaptureHead(kgTableIncrement)
|
||||
|
||||
// Shutdown when the time is right.
|
||||
notifierCh := make(chan os.Signal, 1)
|
||||
err = shutdown.ShutdownServices(ctx, notifierCh, maxWaitSecondsShutdown, DB, BC)
|
||||
if err != nil {
|
||||
loghelper.LogError(err).Error("Ungracefully Shutdown ipld-ethcl-indexer!")
|
||||
@ -53,13 +56,8 @@ func startHeadTracking() {
|
||||
func init() {
|
||||
captureCmd.AddCommand(headCmd)
|
||||
|
||||
// Here you will define your flags and configuration settings.
|
||||
|
||||
// Cobra supports Persistent Flags which will work for this command
|
||||
// and all subcommands, e.g.:
|
||||
// headCmd.PersistentFlags().String("foo", "", "A help for foo")
|
||||
|
||||
// Cobra supports local flags which will only run when this command
|
||||
// is called directly, e.g.:
|
||||
// headCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle")
|
||||
// Known Gaps specific
|
||||
captureCmd.PersistentFlags().IntVarP(&kgTableIncrement, "kg.increment", "", 10000, "The max slots within a single entry to the known_gaps table.")
|
||||
err := viper.BindPFlag("kg.increment", captureCmd.PersistentFlags().Lookup("kg.increment"))
|
||||
exitErr(err)
|
||||
}
|
||||
|
@ -34,6 +34,7 @@ type BeaconClient struct {
|
||||
PerformHistoricalProcessing bool // Should we perform historical processing?
|
||||
Db sql.Database // Database object used for reads and writes.
|
||||
Metrics *BeaconClientMetrics // An object used to keep track of certain BeaconClient Metrics.
|
||||
KnownGapTableIncrement int // The max number of slots within a single known_gaps table entry.
|
||||
|
||||
// Used for Head Tracking
|
||||
PerformHeadTracking bool // Should we track head?
|
||||
|
@ -10,7 +10,8 @@ import (
|
||||
)
|
||||
|
||||
// This function will perform all the heavy lifting for tracking the head of the chain.
|
||||
func (bc *BeaconClient) CaptureHead() {
|
||||
func (bc *BeaconClient) CaptureHead(knownGapsTableIncrement int) {
|
||||
bc.KnownGapTableIncrement = knownGapsTableIncrement
|
||||
log.Info("We are tracking the head of the chain.")
|
||||
//bc.tempHelper()
|
||||
go bc.handleHead()
|
||||
|
@ -43,19 +43,20 @@ type MimicConfig struct {
|
||||
var _ = Describe("Capturehead", func() {
|
||||
|
||||
var (
|
||||
TestConfig Config
|
||||
BeaconNodeTester TestBeaconNode
|
||||
address string = "localhost"
|
||||
port int = 8080
|
||||
protocol string = "http"
|
||||
TestEvents map[string]Message
|
||||
dbHost string = "localhost"
|
||||
dbPort int = 8077
|
||||
dbName string = "vulcanize_testing"
|
||||
dbUser string = "vdbm"
|
||||
dbPassword string = "password"
|
||||
dbDriver string = "pgx"
|
||||
dummyParentRoot string = "46f98c08b54a71dfda4d56e29ec3952b8300cd8d6b67a9b6c562ae96a7a25a42"
|
||||
TestConfig Config
|
||||
BeaconNodeTester TestBeaconNode
|
||||
address string = "localhost"
|
||||
port int = 8080
|
||||
protocol string = "http"
|
||||
TestEvents map[string]Message
|
||||
dbHost string = "localhost"
|
||||
dbPort int = 8077
|
||||
dbName string = "vulcanize_testing"
|
||||
dbUser string = "vdbm"
|
||||
dbPassword string = "password"
|
||||
dbDriver string = "pgx"
|
||||
dummyParentRoot string = "46f98c08b54a71dfda4d56e29ec3952b8300cd8d6b67a9b6c562ae96a7a25a42"
|
||||
knownGapsTableIncrement int = 10000
|
||||
)
|
||||
|
||||
BeforeEach(func() {
|
||||
@ -146,16 +147,17 @@ var _ = Describe("Capturehead", func() {
|
||||
},
|
||||
}
|
||||
TestConfig = Config{
|
||||
protocol: protocol,
|
||||
address: address,
|
||||
port: port,
|
||||
dummyParentRoot: dummyParentRoot,
|
||||
dbHost: dbHost,
|
||||
dbPort: dbPort,
|
||||
dbName: dbName,
|
||||
dbUser: dbUser,
|
||||
dbPassword: dbPassword,
|
||||
dbDriver: dbDriver,
|
||||
protocol: protocol,
|
||||
address: address,
|
||||
port: port,
|
||||
dummyParentRoot: dummyParentRoot,
|
||||
dbHost: dbHost,
|
||||
dbPort: dbPort,
|
||||
dbName: dbName,
|
||||
dbUser: dbUser,
|
||||
dbPassword: dbPassword,
|
||||
dbDriver: dbDriver,
|
||||
knownGapsTableIncrement: knownGapsTableIncrement,
|
||||
}
|
||||
|
||||
BeaconNodeTester = TestBeaconNode{
|
||||
@ -234,16 +236,17 @@ var _ = Describe("Capturehead", func() {
|
||||
})
|
||||
|
||||
type Config struct {
|
||||
protocol string
|
||||
address string
|
||||
port int
|
||||
dummyParentRoot string
|
||||
dbHost string
|
||||
dbPort int
|
||||
dbName string
|
||||
dbUser string
|
||||
dbPassword string
|
||||
dbDriver string
|
||||
protocol string
|
||||
address string
|
||||
port int
|
||||
dummyParentRoot string
|
||||
dbHost string
|
||||
dbPort int
|
||||
dbName string
|
||||
dbUser string
|
||||
dbPassword string
|
||||
dbDriver string
|
||||
knownGapsTableIncrement int
|
||||
}
|
||||
|
||||
//////////////////////////////////////////////////////
|
||||
@ -307,7 +310,7 @@ func queryDbSlotAndBlock(db sql.Database, querySlot string, queryBlockRoot strin
|
||||
|
||||
// A function that will remove all entries from the ethcl tables for you.
|
||||
func clearEthclDbTables(db sql.Database) {
|
||||
deleteQueries := []string{"DELETE FROM ethcl.slots;", "DELETE FROM ethcl.signed_beacon_block;", "DELETE FROM ethcl.beacon_state;", "DELETE FROM ethcl.batch_processing;"}
|
||||
deleteQueries := []string{"DELETE FROM ethcl.slots;", "DELETE FROM ethcl.signed_beacon_block;", "DELETE FROM ethcl.beacon_state;", "DELETE FROM ethcl.known_gaps;"}
|
||||
for _, queries := range deleteQueries {
|
||||
_, err := db.Exec(context.Background(), queries)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
@ -436,7 +439,7 @@ func (tbc TestBeaconNode) testMultipleReorgs(firstHead beaconclient.Head, second
|
||||
tbc.SetupBeaconNodeMock(tbc.TestEvents, tbc.TestConfig.protocol, tbc.TestConfig.address, tbc.TestConfig.port, tbc.TestConfig.dummyParentRoot)
|
||||
defer httpmock.DeactivateAndReset()
|
||||
|
||||
go bc.CaptureHead()
|
||||
go bc.CaptureHead(tbc.TestConfig.knownGapsTableIncrement)
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
log.Info("Sending Phase0 Messages to BeaconClient")
|
||||
@ -488,7 +491,7 @@ func (tbc TestBeaconNode) testProcessBlock(head beaconclient.Head, epoch int) {
|
||||
tbc.SetupBeaconNodeMock(tbc.TestEvents, tbc.TestConfig.protocol, tbc.TestConfig.address, tbc.TestConfig.port, tbc.TestConfig.dummyParentRoot)
|
||||
defer httpmock.DeactivateAndReset()
|
||||
|
||||
go bc.CaptureHead()
|
||||
go bc.CaptureHead(tbc.TestConfig.knownGapsTableIncrement)
|
||||
time.Sleep(1 * time.Second)
|
||||
sendHeadMessage(bc, head)
|
||||
validateSlot(bc, &head, epoch, "proposed")
|
||||
@ -501,7 +504,7 @@ func (tbc TestBeaconNode) testMultipleHead(firstHead beaconclient.Head, secondHe
|
||||
tbc.SetupBeaconNodeMock(tbc.TestEvents, tbc.TestConfig.protocol, tbc.TestConfig.address, tbc.TestConfig.port, tbc.TestConfig.dummyParentRoot)
|
||||
defer httpmock.DeactivateAndReset()
|
||||
|
||||
go bc.CaptureHead()
|
||||
go bc.CaptureHead(tbc.TestConfig.knownGapsTableIncrement)
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
sendHeadMessage(bc, firstHead)
|
||||
|
@ -19,8 +19,8 @@ INSERT INTO ethcl.slots (epoch, slot, block_root, state_root, status)
|
||||
VALUES ($1, $2, $3, $4, $5) ON CONFLICT (slot, block_root) DO NOTHING`
|
||||
// Statement to upsert to the ethcl.signed_beacon_blocks table.
|
||||
UpsertSignedBeaconBlockStmt string = `
|
||||
INSERT INTO ethcl.signed_beacon_block (slot, block_root, parent_block_root, mh_key)
|
||||
VALUES ($1, $2, $3, $4) ON CONFLICT (slot, block_root) DO NOTHING`
|
||||
INSERT INTO ethcl.signed_beacon_block (slot, block_root, parent_block_root, eth1_block_hash, mh_key)
|
||||
VALUES ($1, $2, $3, $4, $5) ON CONFLICT (slot, block_root) DO NOTHING`
|
||||
// Statement to upsert to the ethcl.beacon_state table.
|
||||
UpsertBeaconState string = `
|
||||
INSERT INTO ethcl.beacon_state (slot, state_root, mh_key)
|
||||
@ -40,6 +40,11 @@ VALUES ($1, $2) ON CONFLICT (key) DO NOTHING`
|
||||
CheckProposedStmt string = `SELECT slot, block_root
|
||||
FROM ethcl.slots
|
||||
WHERE slot=$1 AND block_root=$2;`
|
||||
// Statement to insert known_gaps. We don't pass in timestamp, we let the server take care of that one.
|
||||
UpsertKnownGapsStmt string = `
|
||||
INSERT INTO ethcl.known_gaps (start_slot, end_slot, checked_out, reprocessing_error, entry_error, entry_process)
|
||||
VALUES ($1, $2, $3, $4, $5, $6) on CONFLICT (start_slot, end_slot) DO NOTHING`
|
||||
QueryHighestSlotStmt string = "SELECT COALESCE(MAX(slot), 0) FROM ethcl.slots"
|
||||
)
|
||||
|
||||
// Put all functionality to prepare the write object
|
||||
@ -55,13 +60,13 @@ type DatabaseWriter struct {
|
||||
rawSignedBeaconBlock []byte
|
||||
}
|
||||
|
||||
func CreateDatabaseWrite(db sql.Database, slot int, stateRoot string, blockRoot string, parentBlockRoot string, status string, metrics *BeaconClientMetrics) *DatabaseWriter {
|
||||
func CreateDatabaseWrite(db sql.Database, slot int, stateRoot string, blockRoot string, parentBlockRoot string, eth1BlockHash string, status string, metrics *BeaconClientMetrics) *DatabaseWriter {
|
||||
dw := &DatabaseWriter{
|
||||
Db: db,
|
||||
Metrics: metrics,
|
||||
}
|
||||
dw.prepareSlotsModel(slot, stateRoot, blockRoot, status)
|
||||
dw.prepareSignedBeaconBlockModel(slot, blockRoot, parentBlockRoot)
|
||||
dw.prepareSignedBeaconBlockModel(slot, blockRoot, parentBlockRoot, eth1BlockHash)
|
||||
dw.prepareBeaconStateModel(slot, stateRoot)
|
||||
return dw
|
||||
}
|
||||
@ -82,12 +87,13 @@ func (dw *DatabaseWriter) prepareSlotsModel(slot int, stateRoot string, blockRoo
|
||||
}
|
||||
|
||||
// Create the model for the ethcl.signed_beacon_block table.
|
||||
func (dw *DatabaseWriter) prepareSignedBeaconBlockModel(slot int, blockRoot string, parentBlockRoot string) {
|
||||
func (dw *DatabaseWriter) prepareSignedBeaconBlockModel(slot int, blockRoot string, parentBlockRoot string, eth1BlockHash string) {
|
||||
dw.DbSignedBeaconBlock = &DbSignedBeaconBlock{
|
||||
Slot: strconv.Itoa(slot),
|
||||
BlockRoot: blockRoot,
|
||||
ParentBlock: parentBlockRoot,
|
||||
MhKey: calculateMhKey(),
|
||||
Slot: strconv.Itoa(slot),
|
||||
BlockRoot: blockRoot,
|
||||
ParentBlock: parentBlockRoot,
|
||||
Eth1BlockHash: eth1BlockHash,
|
||||
MhKey: calculateMhKey(),
|
||||
}
|
||||
log.Debug("dw.DbSignedBeaconBlock: ", dw.DbSignedBeaconBlock)
|
||||
}
|
||||
@ -104,79 +110,115 @@ func (dw *DatabaseWriter) prepareBeaconStateModel(slot int, stateRoot string) {
|
||||
}
|
||||
|
||||
// Write all the data for a given slot.
|
||||
func (dw *DatabaseWriter) writeFullSlot() {
|
||||
func (dw *DatabaseWriter) writeFullSlot() error {
|
||||
// Add errors for each function call
|
||||
// If an error occurs, write to knownGaps table.
|
||||
dw.writeSlots()
|
||||
dw.writeSignedBeaconBlocks()
|
||||
dw.writeBeaconState()
|
||||
err := dw.writeSlots()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = dw.writeSignedBeaconBlocks()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = dw.writeBeaconState()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
dw.Metrics.IncrementHeadTrackingInserts(1)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Write the information for the generic slots table. For now this is only one function.
|
||||
// But in the future if we need to incorporate any FK's or perform any actions to write to the
|
||||
// slots table we can do it all here.
|
||||
func (dw *DatabaseWriter) writeSlots() {
|
||||
dw.upsertSlots()
|
||||
|
||||
func (dw *DatabaseWriter) writeSlots() error {
|
||||
return dw.upsertSlots()
|
||||
}
|
||||
|
||||
// Upsert to the ethcl.slots table.
|
||||
func (dw *DatabaseWriter) upsertSlots() {
|
||||
func (dw *DatabaseWriter) upsertSlots() error {
|
||||
_, err := dw.Db.Exec(context.Background(), UpsertSlotsStmt, dw.DbSlots.Epoch, dw.DbSlots.Slot, dw.DbSlots.BlockRoot, dw.DbSlots.StateRoot, dw.DbSlots.Status)
|
||||
if err != nil {
|
||||
loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("Unable to write to the slot to the ethcl.slots table")
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Write the information for the signed_beacon_block.
|
||||
func (dw *DatabaseWriter) writeSignedBeaconBlocks() {
|
||||
dw.upsertPublicBlocks(dw.DbSignedBeaconBlock.MhKey, dw.rawSignedBeaconBlock)
|
||||
dw.upsertSignedBeaconBlock()
|
||||
func (dw *DatabaseWriter) writeSignedBeaconBlocks() error {
|
||||
err := dw.upsertPublicBlocks(dw.DbSignedBeaconBlock.MhKey, dw.rawSignedBeaconBlock)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = dw.upsertSignedBeaconBlock()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Upsert to public.blocks.
|
||||
func (dw *DatabaseWriter) upsertPublicBlocks(key string, data []byte) {
|
||||
func (dw *DatabaseWriter) upsertPublicBlocks(key string, data []byte) error {
|
||||
_, err := dw.Db.Exec(context.Background(), UpsertBlocksStmt, key, data)
|
||||
if err != nil {
|
||||
loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("Unable to write to the slot to the public.blocks table")
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Upsert to the ethcl.signed_beacon_block table.
|
||||
func (dw *DatabaseWriter) upsertSignedBeaconBlock() {
|
||||
_, err := dw.Db.Exec(context.Background(), UpsertSignedBeaconBlockStmt, dw.DbSignedBeaconBlock.Slot, dw.DbSignedBeaconBlock.BlockRoot, dw.DbSignedBeaconBlock.ParentBlock, dw.DbSignedBeaconBlock.MhKey)
|
||||
func (dw *DatabaseWriter) upsertSignedBeaconBlock() error {
|
||||
_, err := dw.Db.Exec(context.Background(), UpsertSignedBeaconBlockStmt, dw.DbSignedBeaconBlock.Slot, dw.DbSignedBeaconBlock.BlockRoot, dw.DbSignedBeaconBlock.ParentBlock, dw.DbSignedBeaconBlock.Eth1BlockHash, dw.DbSignedBeaconBlock.MhKey)
|
||||
if err != nil {
|
||||
loghelper.LogSlotError(dw.DbSlots.Slot, err).WithFields(log.Fields{"block_root": dw.DbSignedBeaconBlock.BlockRoot}).Error("Unable to write to the slot to the ethcl.signed_beacon_block table")
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Write the information for the beacon_state.
|
||||
func (dw *DatabaseWriter) writeBeaconState() {
|
||||
dw.upsertPublicBlocks(dw.DbBeaconState.MhKey, dw.rawBeaconState)
|
||||
dw.upsertBeaconState()
|
||||
func (dw *DatabaseWriter) writeBeaconState() error {
|
||||
err := dw.upsertPublicBlocks(dw.DbBeaconState.MhKey, dw.rawBeaconState)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = dw.upsertBeaconState()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Upsert to the ethcl.beacon_state table.
|
||||
func (dw *DatabaseWriter) upsertBeaconState() {
|
||||
func (dw *DatabaseWriter) upsertBeaconState() error {
|
||||
_, err := dw.Db.Exec(context.Background(), UpsertBeaconState, dw.DbBeaconState.Slot, dw.DbBeaconState.StateRoot, dw.DbBeaconState.MhKey)
|
||||
if err != nil {
|
||||
loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("Unable to write to the slot to the ethcl.beacon_state table")
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Update a given slot to be marked as forked. Provide the slot and the latest latestBlockRoot.
|
||||
// We will mark all entries for the given slot that don't match the provided latestBlockRoot as forked.
|
||||
func writeReorgs(db sql.Database, slot string, latestBlockRoot string, metrics *BeaconClientMetrics) {
|
||||
slotNum, strErr := strconv.Atoi(slot)
|
||||
if strErr != nil {
|
||||
loghelper.LogReorgError(slot, latestBlockRoot, strErr).Error("We can't convert the slot to an int...")
|
||||
}
|
||||
|
||||
forkCount, err := updateForked(db, slot, latestBlockRoot)
|
||||
if err != nil {
|
||||
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We ran into some trouble while updating all forks.")
|
||||
// Add to knownGaps Table
|
||||
writeKnownGaps(db, 1, slotNum, slotNum, err, "reorg")
|
||||
}
|
||||
proposedCount, err := updateProposed(db, slot, latestBlockRoot)
|
||||
if err != nil {
|
||||
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We ran into some trouble while trying to update the proposed slot.")
|
||||
// Add to knownGaps Table
|
||||
writeKnownGaps(db, 1, slotNum, slotNum, err, "reorg")
|
||||
}
|
||||
|
||||
if forkCount > 0 {
|
||||
@ -187,7 +229,6 @@ func writeReorgs(db sql.Database, slot string, latestBlockRoot string, metrics *
|
||||
loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{
|
||||
"forkCount": forkCount,
|
||||
}).Warn("There were no forked rows to update.")
|
||||
|
||||
}
|
||||
|
||||
if proposedCount == 1 {
|
||||
@ -198,16 +239,19 @@ func writeReorgs(db sql.Database, slot string, latestBlockRoot string, metrics *
|
||||
loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{
|
||||
"proposedCount": proposedCount,
|
||||
}).Error("Too many rows were marked as proposed!")
|
||||
writeKnownGaps(db, 1, slotNum, slotNum, err, "reorg")
|
||||
} else if proposedCount == 0 {
|
||||
var count int
|
||||
err := db.QueryRow(context.Background(), CheckProposedStmt, slot, latestBlockRoot).Scan(count)
|
||||
if err != nil {
|
||||
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("Unable to query proposed rows after reorg.")
|
||||
writeKnownGaps(db, 1, slotNum, slotNum, err, "reorg")
|
||||
}
|
||||
if count != 1 {
|
||||
loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{
|
||||
"proposedCount": count,
|
||||
}).Warn("The proposed block was not marked as proposed...")
|
||||
writeKnownGaps(db, 1, slotNum, slotNum, err, "reorg")
|
||||
} else {
|
||||
loghelper.LogReorg(slot, latestBlockRoot).Info("Updated the row that should have been marked as proposed.")
|
||||
}
|
||||
@ -246,6 +290,82 @@ func updateProposed(db sql.Database, slot string, latestBlockRoot string) (int64
|
||||
return count, err
|
||||
}
|
||||
|
||||
// A wrapper function to call upsertKnownGaps. This function will break down the range of known_gaos into
|
||||
// smaller chunks. For example, instead of having an entry of 1-101, if we increment the entries by 10 slots, we would
|
||||
// have 10 entries as follows: 1-10, 11-20, etc...
|
||||
func writeKnownGaps(db sql.Database, tableIncrement int, startSlot int, endSlot int, entryError error, entryProcess string) {
|
||||
if endSlot-startSlot <= tableIncrement {
|
||||
kgModel := DbKnownGaps{
|
||||
StartSlot: strconv.Itoa(startSlot),
|
||||
EndSlot: strconv.Itoa(endSlot),
|
||||
CheckedOut: false,
|
||||
ReprocessingError: "",
|
||||
EntryError: entryError.Error(),
|
||||
EntryProcess: entryProcess,
|
||||
}
|
||||
upsertKnownGaps(db, kgModel)
|
||||
}
|
||||
totalSlots := endSlot - startSlot
|
||||
var chunks int
|
||||
chunks = totalSlots / tableIncrement
|
||||
if totalSlots%tableIncrement != 0 {
|
||||
chunks = chunks + 1
|
||||
}
|
||||
|
||||
for i := 0; i < chunks; i++ {
|
||||
var tempStart, tempEnd int
|
||||
tempStart = startSlot + (i * tableIncrement)
|
||||
if i+1 == chunks {
|
||||
tempEnd = endSlot
|
||||
} else {
|
||||
tempEnd = startSlot + ((i + 1) * tableIncrement)
|
||||
}
|
||||
kgModel := DbKnownGaps{
|
||||
StartSlot: strconv.Itoa(tempStart),
|
||||
EndSlot: strconv.Itoa(tempEnd),
|
||||
CheckedOut: false,
|
||||
ReprocessingError: "",
|
||||
EntryError: entryError.Error(),
|
||||
EntryProcess: entryProcess,
|
||||
}
|
||||
upsertKnownGaps(db, kgModel)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// A function to upsert a single entry to the ethcl.known_gaps table.
|
||||
func upsertKnownGaps(db sql.Database, knModel DbKnownGaps) {
|
||||
_, err := db.Exec(context.Background(), UpsertKnownGapsStmt, knModel.StartSlot, knModel.EndSlot,
|
||||
knModel.CheckedOut, knModel.ReprocessingError, knModel.EntryError, knModel.EntryProcess)
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"err": err,
|
||||
"startSlot": knModel.StartSlot,
|
||||
"endSlot": knModel.EndSlot,
|
||||
}).Fatal("We are unable to write to the ethcl.known_gaps table!!! We will stop the application because of that.")
|
||||
}
|
||||
log.WithFields(log.Fields{
|
||||
"startSlot": knModel.StartSlot,
|
||||
"endSlot": knModel.EndSlot,
|
||||
}).Warn("A new gap has been added to the ethcl.known_gaps table.")
|
||||
}
|
||||
|
||||
// A function to write the gap between the highest slot in the DB and the first processed slot.
|
||||
func writeStartUpGaps(db sql.Database, tableIncrement int, firstSlot int) {
|
||||
var maxSlot int
|
||||
err := db.QueryRow(context.Background(), QueryHighestSlotStmt).Scan(&maxSlot)
|
||||
if err != nil {
|
||||
loghelper.LogError(err).Fatal("Unable to get the max block from the DB. We must close the application or we might have undetected gaps.")
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
loghelper.LogError(err).WithFields(log.Fields{
|
||||
"maxSlot": maxSlot,
|
||||
}).Fatal("Unable to get convert max block from DB to int. We must close the application or we might have undetected gaps.")
|
||||
}
|
||||
writeKnownGaps(db, tableIncrement, maxSlot, firstSlot, fmt.Errorf(""), "startup")
|
||||
}
|
||||
|
||||
// Dummy function for calculating the mhKey.
|
||||
func calculateMhKey() string {
|
||||
rand.Seed(time.Now().UnixNano())
|
||||
|
@ -47,10 +47,11 @@ type DbSlots struct {
|
||||
|
||||
// A struct to capture whats being written to ethcl.signed_beacon_block table.
|
||||
type DbSignedBeaconBlock struct {
|
||||
Slot string // The slot.
|
||||
BlockRoot string // The block root
|
||||
ParentBlock string // The parent block root.
|
||||
MhKey string // The ipld multihash key.
|
||||
Slot string // The slot.
|
||||
BlockRoot string // The block root
|
||||
ParentBlock string // The parent block root.
|
||||
Eth1BlockHash string // The eth1 block_hash
|
||||
MhKey string // The ipld multihash key.
|
||||
|
||||
}
|
||||
|
||||
@ -60,3 +61,14 @@ type DbBeaconState struct {
|
||||
StateRoot string // The state root
|
||||
MhKey string // The ipld multihash key.
|
||||
}
|
||||
|
||||
// A structure to capture whats being written to the ethcl.known_gaps table.
|
||||
type DbKnownGaps struct {
|
||||
StartSlot string // The start slot for known_gaps, inclusive.
|
||||
EndSlot string // The end slot for known_gaps, inclusive.
|
||||
CheckedOut bool // Indicates if any process is currently processing this entry.
|
||||
ReprocessingError string // The error that occurred when attempting to reprocess these entries.
|
||||
EntryError string // The error that caused this entry to be added to the table. Could be null.
|
||||
EntryTime string // The time this range was added to the DB. This can help us catch ranges that have not been processed for a long time due to some error.
|
||||
EntryProcess string // The entry process that added this process. Potential options are StartUp, Error, ManualEntry, HeadGap.
|
||||
}
|
||||
|
@ -24,6 +24,7 @@ func (bc *BeaconClient) handleReorg() {
|
||||
// This function will handle the latest head event.
|
||||
func (bc *BeaconClient) handleHead() {
|
||||
log.Info("Starting to process head.")
|
||||
errorSlots := 0
|
||||
for {
|
||||
head := <-bc.HeadTracking.ProcessCh
|
||||
// Process all the work here.
|
||||
@ -32,10 +33,20 @@ func (bc *BeaconClient) handleHead() {
|
||||
bc.HeadTracking.ErrorCh <- &SseError{
|
||||
err: fmt.Errorf("Unable to turn the slot from string to int: %s", head.Slot),
|
||||
}
|
||||
errorSlots = errorSlots + 1
|
||||
continue
|
||||
}
|
||||
err = processHeadSlot(bc.Db, bc.ServerEndpoint, slot, head.Block, head.State, bc.PreviousSlot, bc.PreviousBlockRoot, bc.Metrics)
|
||||
if errorSlots != 0 && bc.PreviousSlot != 0 {
|
||||
log.WithFields(log.Fields{
|
||||
"lastProcessedSlot": bc.PreviousSlot,
|
||||
"errorMessages": errorSlots,
|
||||
}).Warn("We added slots to the knownGaps table because we got bad head messages.")
|
||||
writeKnownGaps(bc.Db, bc.KnownGapTableIncrement, bc.PreviousSlot, bcSlotsPerEpoch+errorSlots, fmt.Errorf("Bad Head Messages"), "headProcessing")
|
||||
}
|
||||
|
||||
err = processHeadSlot(bc.Db, bc.ServerEndpoint, slot, head.Block, head.State, bc.PreviousSlot, bc.PreviousBlockRoot, bc.Metrics, bc.KnownGapTableIncrement)
|
||||
if err != nil {
|
||||
loghelper.LogSlotError(head.Slot, err)
|
||||
loghelper.LogSlotError(head.Slot, err).Error("Unable to process a slot")
|
||||
}
|
||||
log.WithFields(log.Fields{"head": head}).Debug("Received a new head event.")
|
||||
|
||||
|
@ -25,6 +25,7 @@ var (
|
||||
}
|
||||
ParentRootUnmarshalError = "Unable to properly unmarshal the ParentRoot field in the SignedBeaconBlock."
|
||||
MissingIdentifiedError = "Can't query state without a set slot or block_root"
|
||||
MissingEth1Data = "Can't get the Eth1 block_hash"
|
||||
)
|
||||
|
||||
type ProcessSlot struct {
|
||||
@ -55,11 +56,7 @@ type ProcessSlot struct {
|
||||
}
|
||||
|
||||
// This function will do all the work to process the slot and write it to the DB.
|
||||
func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, headOrHistoric string, metrics *BeaconClientMetrics) error {
|
||||
headOrHistoric = strings.ToLower(headOrHistoric)
|
||||
if headOrHistoric != "head" && headOrHistoric != "historic" {
|
||||
return fmt.Errorf("headOrBatch must be either historic or head!")
|
||||
}
|
||||
func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, headOrHistoric string, metrics *BeaconClientMetrics, knownGapsTableIncrement int) error {
|
||||
ps := &ProcessSlot{
|
||||
Slot: slot,
|
||||
BlockRoot: blockRoot,
|
||||
@ -72,33 +69,43 @@ func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot
|
||||
// Get the SignedBeaconBlock.
|
||||
err := ps.getSignedBeaconBlock(serverAddress)
|
||||
if err != nil {
|
||||
writeKnownGaps(ps.Db, 1, ps.Slot, ps.Slot, err, "processSlot")
|
||||
return err
|
||||
}
|
||||
|
||||
// Get the BeaconState.
|
||||
err = ps.getBeaconState(serverAddress)
|
||||
if err != nil {
|
||||
writeKnownGaps(ps.Db, 1, ps.Slot, ps.Slot, err, "processSlot")
|
||||
return err
|
||||
}
|
||||
if ps.HeadOrHistoric == "head" && previousSlot == 0 && previousBlockRoot == "" {
|
||||
writeStartUpGaps(db, knownGapsTableIncrement, ps.Slot)
|
||||
}
|
||||
|
||||
// Get this object ready to write
|
||||
dw := ps.createWriteObjects()
|
||||
|
||||
// Write the object to the DB.
|
||||
dw.writeFullSlot()
|
||||
err = dw.writeFullSlot()
|
||||
if err != nil {
|
||||
writeKnownGaps(ps.Db, 1, ps.Slot, ps.Slot, err, "processSlot")
|
||||
}
|
||||
|
||||
// Handle any reorgs or skipped slots.
|
||||
if ps.HeadOrHistoric == "head" {
|
||||
if previousSlot != 0 && previousBlockRoot != "" {
|
||||
ps.checkPreviousSlot(previousSlot, previousBlockRoot)
|
||||
}
|
||||
headOrHistoric = strings.ToLower(headOrHistoric)
|
||||
if headOrHistoric != "head" && headOrHistoric != "historic" {
|
||||
return fmt.Errorf("headOrHistoric must be either historic or head!")
|
||||
}
|
||||
if ps.HeadOrHistoric == "head" && previousSlot != 0 && previousBlockRoot != "" {
|
||||
ps.checkPreviousSlot(previousSlot, previousBlockRoot, knownGapsTableIncrement)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Handle a slot that is at head. A wrapper function for calling `handleFullSlot`.
|
||||
func processHeadSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, metrics *BeaconClientMetrics) error {
|
||||
return processFullSlot(db, serverAddress, slot, blockRoot, stateRoot, previousSlot, previousBlockRoot, "head", metrics)
|
||||
func processHeadSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, metrics *BeaconClientMetrics, knownGapsTableIncrement int) error {
|
||||
return processFullSlot(db, serverAddress, slot, blockRoot, stateRoot, previousSlot, previousBlockRoot, "head", metrics, knownGapsTableIncrement)
|
||||
}
|
||||
|
||||
// Handle a historic slot. A wrapper function for calling `handleFullSlot`.
|
||||
@ -141,7 +148,11 @@ func (ps *ProcessSlot) getSignedBeaconBlock(serverAddress string) error {
|
||||
} else if ps.FullSignedBeaconBlock.Block.ParentRoot == nil {
|
||||
loghelper.LogSlotError(strconv.Itoa(ps.Slot), err).Error(ParentRootUnmarshalError)
|
||||
return fmt.Errorf(ParentRootUnmarshalError)
|
||||
} else if hex.EncodeToString(ps.FullBeaconState.Eth1Data.BlockHash) == "" {
|
||||
loghelper.LogSlotError(strconv.Itoa(ps.Slot), err).Error(MissingEth1Data)
|
||||
return fmt.Errorf(MissingEth1Data)
|
||||
}
|
||||
log.Warn("We received a processing error: ", err)
|
||||
}
|
||||
ps.ParentBlockRoot = "0x" + hex.EncodeToString(ps.FullSignedBeaconBlock.Block.ParentRoot)
|
||||
return nil
|
||||
@ -174,7 +185,7 @@ func (ps *ProcessSlot) getBeaconState(serverEndpoint string) error {
|
||||
}
|
||||
|
||||
// Check to make sure that the previous block we processed is the parent of the current block.
|
||||
func (ps *ProcessSlot) checkPreviousSlot(previousSlot int, previousBlockRoot string) {
|
||||
func (ps *ProcessSlot) checkPreviousSlot(previousSlot int, previousBlockRoot string, knownGapsTableIncrement int) {
|
||||
parentRoot := "0x" + hex.EncodeToString(ps.FullSignedBeaconBlock.Block.ParentRoot)
|
||||
if previousSlot == int(ps.FullBeaconState.Slot) {
|
||||
log.WithFields(log.Fields{
|
||||
@ -182,21 +193,21 @@ func (ps *ProcessSlot) checkPreviousSlot(previousSlot int, previousBlockRoot str
|
||||
"fork": true,
|
||||
}).Warn("A fork occurred! The previous slot and current slot match.")
|
||||
writeReorgs(ps.Db, strconv.Itoa(ps.Slot), ps.BlockRoot, ps.Metrics)
|
||||
} else if previousSlot-1 != int(ps.FullBeaconState.Slot) {
|
||||
} else if previousSlot+1 != int(ps.FullBeaconState.Slot) {
|
||||
log.WithFields(log.Fields{
|
||||
"previousSlot": previousSlot,
|
||||
"currentSlot": ps.FullBeaconState.Slot,
|
||||
}).Error("We skipped a few slots.")
|
||||
writeKnownGaps(ps.Db, knownGapsTableIncrement, previousSlot+1, int(ps.FullBeaconState.Slot)-1, fmt.Errorf("Gaps during head processing"), "headGaps")
|
||||
// Check to see if the slot was skipped.
|
||||
// Call our batch processing function.
|
||||
// Call our known_gaps function.
|
||||
} else if previousBlockRoot != parentRoot {
|
||||
log.WithFields(log.Fields{
|
||||
"previousBlockRoot": previousBlockRoot,
|
||||
"currentBlockParent": parentRoot,
|
||||
}).Error("The previousBlockRoot does not match the current blocks parent, an unprocessed fork might have occurred.")
|
||||
writeReorgs(ps.Db, strconv.Itoa(previousSlot), parentRoot, ps.Metrics)
|
||||
// Call our batch processing function.
|
||||
// Continue with this slot.
|
||||
writeKnownGaps(ps.Db, 1, ps.Slot-1, ps.Slot-1, fmt.Errorf("Incorrect Parent"), "processSlot")
|
||||
} else {
|
||||
log.Debug("Previous Slot and Current Slot are one distance from each other.")
|
||||
}
|
||||
@ -210,7 +221,7 @@ func (ps *ProcessSlot) checkPreviousSlot(previousSlot int, previousBlockRoot str
|
||||
// Check if head or historic.
|
||||
|
||||
// 1. BeaconBlock is 404.
|
||||
// 2. check heck /lighthouse/database/info to make sure the oldest_block_slot == 0 and anchor == null. This indicates that I don't have any gaps in the DB.
|
||||
// 2. check check /lighthouse/database/info to make sure the oldest_block_slot == 0 and anchor == null. This indicates that I don't have any gaps in the DB.
|
||||
// 3. I query BeaconState for slot X, and get a BeaconState.
|
||||
// 4. Although for good measure you should also check that the head is at a slot >= X using something like /eth/v1/node/syncing/ or /eth/v1/beacon/headers/head
|
||||
func (ps *ProcessSlot) checkMissedSlot() {
|
||||
@ -254,7 +265,9 @@ func (ps *ProcessSlot) createWriteObjects() *DatabaseWriter {
|
||||
status = "proposed"
|
||||
}
|
||||
|
||||
dw := CreateDatabaseWrite(ps.Db, ps.Slot, stateRoot, blockRoot, ps.ParentBlockRoot, status, ps.Metrics)
|
||||
eth1BlockHash := "0x" + hex.EncodeToString(ps.FullSignedBeaconBlock.Block.Body.Eth1Data.BlockHash)
|
||||
|
||||
dw := CreateDatabaseWrite(ps.Db, ps.Slot, stateRoot, blockRoot, ps.ParentBlockRoot, eth1BlockHash, status, ps.Metrics)
|
||||
dw.rawSignedBeaconBlock = ps.SszSignedBeaconBlock
|
||||
dw.rawBeaconState = ps.SszBeaconState
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user