76: Add indexing of ExecutionPayloads (and other Merge-related updates). #73

Merged
telackey merged 30 commits from telackey/the_merge into main 2022-09-29 01:39:56 +00:00
16 changed files with 176 additions and 152 deletions
Showing only changes of commit 36dd5d6331 - Show all commits

View File

@ -19,6 +19,7 @@ package cmd
import (
"context"
"fmt"
"github.com/vulcanize/ipld-eth-beacon-indexer/pkg/beaconclient"
"strconv"
log "github.com/sirupsen/logrus"
@ -81,7 +82,7 @@ func startFullProcessing() {
errG, _ := errgroup.WithContext(context.Background())
errG.Go(func() error {
errs := Bc.CaptureHistoric(hpContext, viper.GetInt("bc.maxHistoricProcessWorker"), viper.GetUint64("bc.minimumSlot"))
errs := Bc.CaptureHistoric(hpContext, viper.GetInt("bc.maxHistoricProcessWorker"), beaconclient.Slot(viper.GetUint64("bc.minimumSlot")))
if len(errs) != 0 {
if len(errs) != 0 {
log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing historic events")
@ -95,7 +96,7 @@ func startFullProcessing() {
go func() {
errG := new(errgroup.Group)
errG.Go(func() error {
errs := Bc.ProcessKnownGaps(kgCtx, viper.GetInt("kg.maxKnownGapsWorker"), viper.GetUint64("kg.minimumSlot"))
errs := Bc.ProcessKnownGaps(kgCtx, viper.GetInt("kg.maxKnownGapsWorker"), beaconclient.Slot(viper.GetUint64("kg.minimumSlot")))
if len(errs) != 0 {
log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing knownGaps")
return fmt.Errorf("Application ended because there were too many error when attempting to process knownGaps")

View File

@ -19,6 +19,7 @@ package cmd
import (
"context"
"fmt"
"github.com/vulcanize/ipld-eth-beacon-indexer/pkg/beaconclient"
"net/http"
"strconv"
@ -69,7 +70,7 @@ func startHeadTracking() {
go func() {
errG := new(errgroup.Group)
errG.Go(func() error {
errs := Bc.ProcessKnownGaps(kgCtx, viper.GetInt("kg.maxKnownGapsWorker"), viper.GetUint64("kg.minimumSlot"))
errs := Bc.ProcessKnownGaps(kgCtx, viper.GetInt("kg.maxKnownGapsWorker"), beaconclient.Slot(viper.GetUint64("kg.minimumSlot")))
if len(errs) != 0 {
log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing knownGaps")
return fmt.Errorf("Application ended because there were too many error when attempting to process knownGaps")

View File

@ -19,6 +19,7 @@ package cmd
import (
"context"
"fmt"
"github.com/vulcanize/ipld-eth-beacon-indexer/pkg/beaconclient"
"os"
"strconv"
@ -65,7 +66,7 @@ func startHistoricProcessing() {
errG, _ := errgroup.WithContext(context.Background())
errG.Go(func() error {
errs := Bc.CaptureHistoric(hpContext, viper.GetInt("bc.maxHistoricProcessWorker"), viper.GetUint64("bc.minimumSlot"))
errs := Bc.CaptureHistoric(hpContext, viper.GetInt("bc.maxHistoricProcessWorker"), beaconclient.Slot(viper.GetUint64("bc.minimumSlot")))
if len(errs) != 0 {
if len(errs) != 0 {
log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing historic events")
@ -80,7 +81,7 @@ func startHistoricProcessing() {
go func() {
errG := new(errgroup.Group)
errG.Go(func() error {
errs := Bc.ProcessKnownGaps(kgContext, viper.GetInt("kg.maxKnownGapsWorker"), viper.GetUint64("kg.minimumSlot"))
errs := Bc.ProcessKnownGaps(kgContext, viper.GetInt("kg.maxKnownGapsWorker"), beaconclient.Slot(viper.GetUint64("kg.minimumSlot")))
if len(errs) != 0 {
log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing knownGaps")
return fmt.Errorf("Application ended because there were too many error when attempting to process knownGaps")

View File

@ -58,8 +58,8 @@ type BeaconClient struct {
// Used for Head Tracking
PerformHeadTracking bool // Should we track head?
StartingSlot uint64 // If we're performing head tracking. What is the first slot we processed.
PreviousSlot uint64 // Whats the previous slot we processed
StartingSlot Slot // If we're performing head tracking. What is the first slot we processed.
PreviousSlot Slot // Whats the previous slot we processed
PreviousBlockRoot string // Whats the previous block root, used to check the next blocks parent.
HeadTracking *SseEvents[Head] // Track the head block
ReOrgTracking *SseEvents[ChainReorg] // Track all Reorgs

View File

@ -565,10 +565,10 @@ func setUpTest(config Config, maxSlot string) *beaconclient.BeaconClient {
}
// A helper function to validate the expected output from the eth_beacon.slots table.
func validateSlot(bc *beaconclient.BeaconClient, headMessage beaconclient.Head, correctEpoch uint64, correctStatus string) {
func validateSlot(bc *beaconclient.BeaconClient, headMessage beaconclient.Head, correctEpoch beaconclient.Epoch, correctStatus string) {
epoch, dbSlot, blockRoot, stateRoot, status := queryDbSlotAndBlock(bc.Db, headMessage.Slot, headMessage.Block)
log.Info("validateSlot: ", headMessage)
baseSlot, err := strconv.ParseUint(headMessage.Slot, 10, 64)
baseSlot, err := beaconclient.ParseSlot(headMessage.Slot)
Expect(err).ToNot(HaveOccurred())
Expect(dbSlot).To(Equal(baseSlot))
Expect(epoch).To(Equal(correctEpoch))
@ -583,9 +583,9 @@ func validateSignedBeaconBlock(bc *beaconclient.BeaconClient, headMessage beacon
correctExecutionPayloadHeader *beaconclient.DbExecutionPayloadHeader) {
dbSignedBlock := queryDbSignedBeaconBlock(bc.Db, headMessage.Slot, headMessage.Block)
log.Info("validateSignedBeaconBlock: ", headMessage)
baseSlot, err := strconv.ParseUint(headMessage.Slot, 10, 64)
baseSlot, err := beaconclient.ParseSlot(headMessage.Slot)
Expect(err).ToNot(HaveOccurred())
Expect(dbSignedBlock.Slot).To(Equal(baseSlot))
Expect(dbSignedBlock.Slot).To(Equal(baseSlot.Number()))
Expect(dbSignedBlock.BlockRoot).To(Equal(headMessage.Block))
Expect(dbSignedBlock.ParentBlock).To(Equal(correctParentRoot))
Expect(dbSignedBlock.Eth1DataBlockHash).To(Equal(correctEth1DataBlockHash))
@ -597,7 +597,7 @@ func validateSignedBeaconBlock(bc *beaconclient.BeaconClient, headMessage beacon
func validateBeaconState(bc *beaconclient.BeaconClient, headMessage beaconclient.Head, correctMhKey string) {
dbSlot, stateRoot, mhKey := queryDbBeaconState(bc.Db, headMessage.Slot, headMessage.State)
log.Info("validateBeaconState: ", headMessage)
baseSlot, err := strconv.ParseUint(headMessage.Slot, 10, 64)
baseSlot, err := beaconclient.ParseSlot(headMessage.Slot)
Expect(err).ToNot(HaveOccurred())
Expect(dbSlot).To(Equal(baseSlot))
Expect(stateRoot).To(Equal(headMessage.State))
@ -633,9 +633,10 @@ func sendHeadMessage(bc *beaconclient.BeaconClient, head beaconclient.Head, maxR
}
// A helper function to query the eth_beacon.slots table based on the slot and block_root
func queryDbSlotAndBlock(db sql.Database, querySlot string, queryBlockRoot string) (uint64, uint64, string, string, string) {
func queryDbSlotAndBlock(db sql.Database, querySlot string, queryBlockRoot string) (beaconclient.Epoch, beaconclient.Slot, string, string, string) {
sqlStatement := `SELECT epoch, slot, block_root, state_root, status FROM eth_beacon.slots WHERE slot=$1 AND block_root=$2;`
var epoch, slot uint64
var epoch beaconclient.Epoch
var slot beaconclient.Slot
var blockRoot, stateRoot, status string
log.Debug("Starting to query the eth_beacon.slots table, ", querySlot, " ", queryBlockRoot)
err := db.QueryRow(context.Background(), sqlStatement, querySlot, queryBlockRoot).Scan(&epoch, &slot, &blockRoot, &stateRoot, &status)
@ -651,7 +652,7 @@ func queryDbSignedBeaconBlock(db sql.Database, querySlot string, queryBlockRoot
payload_parent_hash, payload_state_root, payload_receipts_root,
payload_transactions_root FROM eth_beacon.signed_block WHERE slot=$1 AND block_root=$2;`
var slot uint64
var slot beaconclient.Slot
var payloadBlockNumber, payloadTimestamp *uint64
var blockRoot, parentBlockRoot, eth1DataBlockHash, mhKey string
var payloadBlockHash, payloadParentHash, payloadStateRoot, payloadReceiptsRoot, payloadTransactionsRoot *string
@ -663,7 +664,7 @@ func queryDbSignedBeaconBlock(db sql.Database, querySlot string, queryBlockRoot
Expect(err).ToNot(HaveOccurred())
signedBlock := beaconclient.DbSignedBeaconBlock{
Slot: slot,
Slot: slot.Number(),
BlockRoot: blockRoot,
ParentBlock: parentBlockRoot,
Eth1DataBlockHash: eth1DataBlockHash,
@ -687,9 +688,9 @@ func queryDbSignedBeaconBlock(db sql.Database, querySlot string, queryBlockRoot
}
// A helper function to query the eth_beacon.signed_block table based on the slot and block_root.
func queryDbBeaconState(db sql.Database, querySlot string, queryStateRoot string) (uint64, string, string) {
func queryDbBeaconState(db sql.Database, querySlot string, queryStateRoot string) (beaconclient.Slot, string, string) {
sqlStatement := `SELECT slot, state_root, mh_key FROM eth_beacon.state WHERE slot=$1 AND state_root=$2;`
var slot uint64
var slot beaconclient.Slot
var stateRoot, mhKey string
row := db.QueryRow(context.Background(), sqlStatement, querySlot, queryStateRoot)
err := row.Scan(&slot, &stateRoot, &mhKey)
@ -926,7 +927,7 @@ func (tbc TestBeaconNode) provideSsz(slotIdentifier string, sszIdentifier string
// Helper function to test three reorg messages. There are going to be many functions like this,
// Because we need to test the same logic for multiple phases.
func (tbc TestBeaconNode) testMultipleReorgs(bc *beaconclient.BeaconClient, firstHead beaconclient.Head, secondHead beaconclient.Head, thirdHead beaconclient.Head, epoch uint64, maxRetry int) {
func (tbc TestBeaconNode) testMultipleReorgs(bc *beaconclient.BeaconClient, firstHead beaconclient.Head, secondHead beaconclient.Head, thirdHead beaconclient.Head, epoch beaconclient.Epoch, maxRetry int) {
go bc.CaptureHead()
time.Sleep(1 * time.Second)
@ -958,7 +959,7 @@ func (tbc TestBeaconNode) testMultipleReorgs(bc *beaconclient.BeaconClient, firs
NewHeadBlock: secondHead.Block,
OldHeadState: thirdHead.State,
NewHeadState: secondHead.State,
Epoch: strconv.FormatUint(epoch, 10),
Epoch: epoch.Format(),
ExecutionOptimistic: false,
})
Expect(err).ToNot(HaveOccurred())
@ -988,7 +989,7 @@ func (tbc TestBeaconNode) testMultipleReorgs(bc *beaconclient.BeaconClient, firs
}
// A test to validate a single block was processed correctly
func (tbc TestBeaconNode) testProcessBlock(bc *beaconclient.BeaconClient, head beaconclient.Head, epoch uint64, maxRetry int, expectedSuccessInsert uint64, expectedKnownGaps uint64, expectedReorgs uint64) {
func (tbc TestBeaconNode) testProcessBlock(bc *beaconclient.BeaconClient, head beaconclient.Head, epoch beaconclient.Epoch, maxRetry int, expectedSuccessInsert uint64, expectedKnownGaps uint64, expectedReorgs uint64) {
go bc.CaptureHead()
time.Sleep(1 * time.Second)
sendHeadMessage(bc, head, maxRetry, expectedSuccessInsert)
@ -1018,7 +1019,7 @@ func (tbc TestBeaconNode) testProcessBlock(bc *beaconclient.BeaconClient, head b
// A test that ensures that if two HeadMessages occur for a single slot they are marked
// as proposed and forked correctly.
func (tbc TestBeaconNode) testMultipleHead(bc *beaconclient.BeaconClient, firstHead beaconclient.Head, secondHead beaconclient.Head, epoch uint64, maxRetry int) {
func (tbc TestBeaconNode) testMultipleHead(bc *beaconclient.BeaconClient, firstHead beaconclient.Head, secondHead beaconclient.Head, epoch beaconclient.Epoch, maxRetry int) {
go bc.CaptureHead()
time.Sleep(1 * time.Second)

View File

@ -27,7 +27,7 @@ import (
)
// This function will perform all the heavy lifting for tracking the head of the chain.
func (bc *BeaconClient) CaptureHistoric(ctx context.Context, maxWorkers int, minimumSlot uint64) []error {
func (bc *BeaconClient) CaptureHistoric(ctx context.Context, maxWorkers int, minimumSlot Slot) []error {
log.Info("We are starting the historical processing service.")
bc.HistoricalProcess = HistoricProcessing{db: bc.Db, metrics: bc.Metrics, uniqueNodeIdentifier: bc.UniqueNodeIdentifier}
errs := handleBatchProcess(ctx, maxWorkers, bc.HistoricalProcess, bc.SlotProcessingDetails(), bc.Metrics.IncrementHistoricSlotProcessed, minimumSlot)
@ -52,10 +52,10 @@ func (bc *BeaconClient) StopHistoric(cancel context.CancelFunc) error {
//
// 2. Known Gaps Processing
type BatchProcessing interface {
getSlotRange(context.Context, chan<- slotsToProcess, uint64) []error // Write the slots to process in a channel, return an error if you cant get the next slots to write.
handleProcessingErrors(context.Context, <-chan batchHistoricError) // Custom logic to handle errors.
removeTableEntry(context.Context, <-chan slotsToProcess) error // With the provided start and end slot, remove the entry from the database.
releaseDbLocks() error // Update the checked_out column to false for whatever table is being updated.
getSlotRange(context.Context, chan<- slotsToProcess, Slot) []error // Write the slots to process in a channel, return an error if you cant get the next slots to write.
handleProcessingErrors(context.Context, <-chan batchHistoricError) // Custom logic to handle errors.
removeTableEntry(context.Context, <-chan slotsToProcess) error // With the provided start and end slot, remove the entry from the database.
releaseDbLocks() error // Update the checked_out column to false for whatever table is being updated.
}
/// ^^^
@ -66,14 +66,14 @@ type BatchProcessing interface {
// A struct to pass around indicating a table entry for slots to process.
type slotsToProcess struct {
startSlot uint64 // The start slot
endSlot uint64 // The end slot
startSlot Slot // The start slot
endSlot Slot // The end slot
}
type batchHistoricError struct {
err error // The error that occurred when attempting to a slot
errProcess string // The process that caused the error.
slot uint64 // The slot which the error is for.
slot Slot // The slot which the error is for.
}
// Wrapper function for the BatchProcessing interface.
@ -90,9 +90,9 @@ type batchHistoricError struct {
// 4. Remove the slot entry from the DB.
//
// 5. Handle any errors.
func handleBatchProcess(ctx context.Context, maxWorkers int, bp BatchProcessing, spd SlotProcessingDetails, incrementTracker func(uint64), minimumSlot uint64) []error {
func handleBatchProcess(ctx context.Context, maxWorkers int, bp BatchProcessing, spd SlotProcessingDetails, incrementTracker func(uint64), minimumSlot Slot) []error {
slotsCh := make(chan slotsToProcess)
workCh := make(chan uint64)
workCh := make(chan Slot)
processedCh := make(chan slotsToProcess)
errCh := make(chan batchHistoricError)
finalErrCh := make(chan []error, 1)

View File

@ -11,14 +11,41 @@ import (
"github.com/protolambda/ztyp/codec"
"github.com/protolambda/ztyp/tree"
log "github.com/sirupsen/logrus"
"strconv"
)
type Eth1Data common.Eth1Data
type Root common.Root
type Signature common.BLSSignature
type Slot common.Slot
type Slot uint64
type Epoch uint64
type ExecutionPayloadHeader common.ExecutionPayloadHeader
func ParseSlot(v string) (Slot, error) {
slotNum, err := strconv.ParseUint(v, 10, 64)
return Slot(slotNum), err
}
func (s *Slot) Format() string {
return strconv.FormatUint(uint64(*s), 10)
}
func (s *Slot) Number() uint64 {
return uint64(*s)
}
func (s *Slot) Plus(v uint64) Slot {
return Slot(v + s.Number())
}
func (s *Slot) PlusInt(v int) Slot {
return s.Plus(uint64(v))
}
func (e *Epoch) Format() string {
return strconv.FormatUint(uint64(*e), 10)
}
type BeaconBlock struct {
spec *common.Spec
bellatrix *bellatrix.BeaconBlock

View File

@ -18,8 +18,6 @@ package beaconclient
import (
"context"
"fmt"
"strconv"
"github.com/jackc/pgx/v4"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-eth-beacon-indexer/pkg/database/sql"
@ -100,7 +98,7 @@ type DatabaseWriter struct {
rawSignedBeaconBlock *[]byte
}
func CreateDatabaseWrite(db sql.Database, slot uint64, stateRoot string, blockRoot string, parentBlockRoot string,
func CreateDatabaseWrite(db sql.Database, slot Slot, stateRoot string, blockRoot string, parentBlockRoot string,
eth1DataBlockHash string, payloadHeader *ExecutionPayloadHeader, status string, rawSignedBeaconBlock *[]byte, rawBeaconState *[]byte, metrics *BeaconClientMetrics) (*DatabaseWriter, error) {
ctx := context.Background()
tx, err := db.Begin(ctx)
@ -130,10 +128,10 @@ func CreateDatabaseWrite(db sql.Database, slot uint64, stateRoot string, blockRo
// Write functions to write each all together...
// Should I do one atomic write?
// Create the model for the eth_beacon.slots table
func (dw *DatabaseWriter) prepareSlotsModel(slot uint64, stateRoot string, blockRoot string, status string) {
func (dw *DatabaseWriter) prepareSlotsModel(slot Slot, stateRoot string, blockRoot string, status string) {
dw.DbSlots = &DbSlots{
Epoch: calculateEpoch(slot, bcSlotsPerEpoch),
Slot: strconv.FormatUint((slot), 10),
Slot: slot.Number(),
StateRoot: stateRoot,
BlockRoot: blockRoot,
Status: status,
@ -143,14 +141,14 @@ func (dw *DatabaseWriter) prepareSlotsModel(slot uint64, stateRoot string, block
}
// Create the model for the eth_beacon.signed_block table.
func (dw *DatabaseWriter) prepareSignedBeaconBlockModel(slot uint64, blockRoot string, parentBlockRoot string, eth1DataBlockHash string,
func (dw *DatabaseWriter) prepareSignedBeaconBlockModel(slot Slot, blockRoot string, parentBlockRoot string, eth1DataBlockHash string,
payloadHeader *ExecutionPayloadHeader) error {
mhKey, err := MultihashKeyFromSSZRoot([]byte(dw.DbSlots.BlockRoot))
if err != nil {
return err
}
dw.DbSignedBeaconBlock = &DbSignedBeaconBlock{
Slot: slot,
Slot: slot.Number(),
BlockRoot: blockRoot,
ParentBlock: parentBlockRoot,
Eth1DataBlockHash: eth1DataBlockHash,
@ -175,13 +173,13 @@ func (dw *DatabaseWriter) prepareSignedBeaconBlockModel(slot uint64, blockRoot s
}
// Create the model for the eth_beacon.state table.
func (dw *DatabaseWriter) prepareBeaconStateModel(slot uint64, stateRoot string) error {
func (dw *DatabaseWriter) prepareBeaconStateModel(slot Slot, stateRoot string) error {
mhKey, err := MultihashKeyFromSSZRoot([]byte(dw.DbSlots.StateRoot))
if err != nil {
return err
}
dw.DbBeaconState = &DbBeaconState{
Slot: slot,
Slot: slot.Number(),
StateRoot: stateRoot,
MhKey: mhKey,
}
@ -343,56 +341,51 @@ func (dw *DatabaseWriter) upsertBeaconState() error {
// Update a given slot to be marked as forked within a transaction. Provide the slot and the latest latestBlockRoot.
// We will mark all entries for the given slot that don't match the provided latestBlockRoot as forked.
func transactReorgs(tx sql.Tx, ctx context.Context, slot string, latestBlockRoot string, metrics *BeaconClientMetrics) {
slotNum, strErr := strconv.ParseUint(slot, 10, 64)
if strErr != nil {
loghelper.LogReorgError(slot, latestBlockRoot, strErr).Error("We can't convert the slot to an int...")
}
func transactReorgs(tx sql.Tx, ctx context.Context, slot Slot, latestBlockRoot string, metrics *BeaconClientMetrics) {
forkCount, err := updateForked(tx, ctx, slot, latestBlockRoot)
if err != nil {
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We ran into some trouble while updating all forks.")
transactKnownGaps(tx, ctx, 1, slotNum, slotNum, err, "reorg", metrics)
loghelper.LogReorgError(slot.Number(), latestBlockRoot, err).Error("We ran into some trouble while updating all forks.")
transactKnownGaps(tx, ctx, 1, slot, slot, err, "reorg", metrics)
}
proposedCount, err := updateProposed(tx, ctx, slot, latestBlockRoot)
if err != nil {
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We ran into some trouble while trying to update the proposed slot.")
transactKnownGaps(tx, ctx, 1, slotNum, slotNum, err, "reorg", metrics)
loghelper.LogReorgError(slot.Number(), latestBlockRoot, err).Error("We ran into some trouble while trying to update the proposed slot.")
transactKnownGaps(tx, ctx, 1, slot, slot, err, "reorg", metrics)
}
if forkCount > 0 {
loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{
loghelper.LogReorg(slot.Number(), latestBlockRoot).WithFields(log.Fields{
"forkCount": forkCount,
}).Info("Updated rows that were forked.")
} else {
loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{
loghelper.LogReorg(slot.Number(), latestBlockRoot).WithFields(log.Fields{
"forkCount": forkCount,
}).Warn("There were no forked rows to update.")
}
if proposedCount == 1 {
loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{
loghelper.LogReorg(slot.Number(), latestBlockRoot).WithFields(log.Fields{
"proposedCount": proposedCount,
}).Info("Updated the row that should have been marked as proposed.")
} else if proposedCount > 1 {
loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{
loghelper.LogReorg(slot.Number(), latestBlockRoot).WithFields(log.Fields{
"proposedCount": proposedCount,
}).Error("Too many rows were marked as proposed!")
transactKnownGaps(tx, ctx, 1, slotNum, slotNum, fmt.Errorf("Too many rows were marked as unproposed."), "reorg", metrics)
transactKnownGaps(tx, ctx, 1, slot, slot, fmt.Errorf("Too many rows were marked as unproposed."), "reorg", metrics)
} else if proposedCount == 0 {
transactKnownGaps(tx, ctx, 1, slotNum, slotNum, fmt.Errorf("Unable to find properly proposed row in DB"), "reorg", metrics)
loghelper.LogReorg(slot, latestBlockRoot).Info("Updated the row that should have been marked as proposed.")
transactKnownGaps(tx, ctx, 1, slot, slot, fmt.Errorf("Unable to find properly proposed row in DB"), "reorg", metrics)
loghelper.LogReorg(slot.Number(), latestBlockRoot).Info("Updated the row that should have been marked as proposed.")
}
metrics.IncrementReorgsInsert(1)
}
// Wrapper function that will create a transaction and execute the function.
func writeReorgs(db sql.Database, slot string, latestBlockRoot string, metrics *BeaconClientMetrics) {
func writeReorgs(db sql.Database, slot Slot, latestBlockRoot string, metrics *BeaconClientMetrics) {
ctx := context.Background()
tx, err := db.Begin(ctx)
if err != nil {
loghelper.LogReorgError(slot, latestBlockRoot, err).Fatal("Unable to create a new transaction for reorgs")
loghelper.LogReorgError(slot.Number(), latestBlockRoot, err).Fatal("Unable to create a new transaction for reorgs")
}
defer func() {
err := tx.Rollback(ctx)
@ -402,35 +395,35 @@ func writeReorgs(db sql.Database, slot string, latestBlockRoot string, metrics *
}()
transactReorgs(tx, ctx, slot, latestBlockRoot, metrics)
if err = tx.Commit(ctx); err != nil {
loghelper.LogReorgError(slot, latestBlockRoot, err).Fatal("Unable to execute the transaction for reorgs")
loghelper.LogReorgError(slot.Number(), latestBlockRoot, err).Fatal("Unable to execute the transaction for reorgs")
}
}
// Update the slots table by marking the old slot's as forked.
func updateForked(tx sql.Tx, ctx context.Context, slot string, latestBlockRoot string) (int64, error) {
func updateForked(tx sql.Tx, ctx context.Context, slot Slot, latestBlockRoot string) (int64, error) {
res, err := tx.Exec(ctx, UpdateForkedStmt, slot, latestBlockRoot)
if err != nil {
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We are unable to update the eth_beacon.slots table with the forked slots")
loghelper.LogReorgError(slot.Number(), latestBlockRoot, err).Error("We are unable to update the eth_beacon.slots table with the forked slots")
return 0, err
}
count, err := res.RowsAffected()
if err != nil {
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("Unable to figure out how many entries were marked as forked.")
loghelper.LogReorgError(slot.Number(), latestBlockRoot, err).Error("Unable to figure out how many entries were marked as forked.")
return 0, err
}
return count, err
}
// Mark a slot as proposed.
func updateProposed(tx sql.Tx, ctx context.Context, slot string, latestBlockRoot string) (int64, error) {
func updateProposed(tx sql.Tx, ctx context.Context, slot Slot, latestBlockRoot string) (int64, error) {
res, err := tx.Exec(ctx, UpdateProposedStmt, slot, latestBlockRoot)
if err != nil {
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We are unable to update the eth_beacon.slots table with the proposed slot.")
loghelper.LogReorgError(slot.Number(), latestBlockRoot, err).Error("We are unable to update the eth_beacon.slots table with the proposed slot.")
return 0, err
}
count, err := res.RowsAffected()
if err != nil {
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("Unable to figure out how many entries were marked as proposed")
loghelper.LogReorgError(slot.Number(), latestBlockRoot, err).Error("Unable to figure out how many entries were marked as proposed")
return 0, err
}
@ -440,17 +433,17 @@ func updateProposed(tx sql.Tx, ctx context.Context, slot string, latestBlockRoot
// A wrapper function to call upsertKnownGaps. This function will break down the range of known_gaps into
// smaller chunks. For example, instead of having an entry of 1-101, if we increment the entries by 10 slots, we would
// have 10 entries as follows: 1-10, 11-20, etc...
func transactKnownGaps(tx sql.Tx, ctx context.Context, tableIncrement int, startSlot uint64, endSlot uint64, entryError error, entryProcess string, metric *BeaconClientMetrics) {
func transactKnownGaps(tx sql.Tx, ctx context.Context, tableIncrement int, startSlot Slot, endSlot Slot, entryError error, entryProcess string, metric *BeaconClientMetrics) {
var entryErrorMsg string
if entryError == nil {
entryErrorMsg = ""
} else {
entryErrorMsg = entryError.Error()
}
if endSlot-startSlot <= uint64(tableIncrement) {
if endSlot.Number()-startSlot.Number() <= uint64(tableIncrement) {
kgModel := DbKnownGaps{
StartSlot: strconv.FormatUint(startSlot, 10),
EndSlot: strconv.FormatUint(endSlot, 10),
StartSlot: startSlot.Number(),
EndSlot: endSlot.Number(),
CheckedOut: false,
ReprocessingError: "",
EntryError: entryErrorMsg,
@ -458,7 +451,7 @@ func transactKnownGaps(tx sql.Tx, ctx context.Context, tableIncrement int, start
}
upsertKnownGaps(tx, ctx, kgModel, metric)
} else {
totalSlots := endSlot - startSlot
totalSlots := endSlot.Number() - startSlot.Number()
var chunks int
chunks = int(totalSlots / uint64(tableIncrement))
if totalSlots%uint64(tableIncrement) != 0 {
@ -466,16 +459,16 @@ func transactKnownGaps(tx sql.Tx, ctx context.Context, tableIncrement int, start
}
for i := 0; i < chunks; i++ {
var tempStart, tempEnd uint64
tempStart = startSlot + (uint64(i * tableIncrement))
var tempStart, tempEnd Slot
tempStart = startSlot.PlusInt(i * tableIncrement)
if i+1 == chunks {
tempEnd = endSlot
} else {
tempEnd = startSlot + uint64((i+1)*tableIncrement)
tempEnd = startSlot.PlusInt((i + 1) * tableIncrement)
}
kgModel := DbKnownGaps{
StartSlot: strconv.FormatUint(tempStart, 10),
EndSlot: strconv.FormatUint(tempEnd, 10),
StartSlot: tempStart.Number(),
EndSlot: tempEnd.Number(),
CheckedOut: false,
ReprocessingError: "",
EntryError: entryErrorMsg,
@ -488,11 +481,11 @@ func transactKnownGaps(tx sql.Tx, ctx context.Context, tableIncrement int, start
// Wrapper function, instead of adding the knownGaps entries to a transaction, it will
// create the transaction and write it.
func writeKnownGaps(db sql.Database, tableIncrement int, startSlot uint64, endSlot uint64, entryError error, entryProcess string, metric *BeaconClientMetrics) {
func writeKnownGaps(db sql.Database, tableIncrement int, startSlot Slot, endSlot Slot, entryError error, entryProcess string, metric *BeaconClientMetrics) {
ctx := context.Background()
tx, err := db.Begin(ctx)
if err != nil {
loghelper.LogSlotRangeError(strconv.FormatUint(startSlot, 10), strconv.FormatUint(endSlot, 10), err).Fatal("Unable to create a new transaction for knownGaps")
loghelper.LogSlotRangeError(startSlot.Number(), endSlot.Number(), err).Fatal("Unable to create a new transaction for knownGaps")
}
defer func() {
err := tx.Rollback(ctx)
@ -502,8 +495,7 @@ func writeKnownGaps(db sql.Database, tableIncrement int, startSlot uint64, endSl
}()
transactKnownGaps(tx, ctx, tableIncrement, startSlot, endSlot, entryError, entryProcess, metric)
if err = tx.Commit(ctx); err != nil {
loghelper.LogSlotRangeError(strconv.FormatUint(startSlot, 10), strconv.FormatUint(endSlot, 10), err).Fatal("Unable to execute the transaction for knownGaps")
loghelper.LogSlotRangeError(startSlot.Number(), endSlot.Number(), err).Fatal("Unable to execute the transaction for knownGaps")
}
}
@ -526,8 +518,8 @@ func upsertKnownGaps(tx sql.Tx, ctx context.Context, knModel DbKnownGaps, metric
}
// A function to write the gap between the highest slot in the DB and the first processed slot.
func writeStartUpGaps(db sql.Database, tableIncrement int, firstSlot uint64, metric *BeaconClientMetrics) {
var maxSlot uint64
func writeStartUpGaps(db sql.Database, tableIncrement int, firstSlot Slot, metric *BeaconClientMetrics) {
var maxSlot Slot
err := db.QueryRow(context.Background(), QueryHighestSlotStmt).Scan(&maxSlot)
if err != nil {
loghelper.LogError(err).Fatal("Unable to get the max block from the DB. We must close the application or we might have undetected gaps.")
@ -555,19 +547,19 @@ func writeStartUpGaps(db sql.Database, tableIncrement int, firstSlot uint64, met
}
// A function to update a knownGap range with a reprocessing error.
func updateKnownGapErrors(db sql.Database, startSlot uint64, endSlot uint64, reprocessingErr error, metric *BeaconClientMetrics) error {
func updateKnownGapErrors(db sql.Database, startSlot Slot, endSlot Slot, reprocessingErr error, metric *BeaconClientMetrics) error {
res, err := db.Exec(context.Background(), UpsertKnownGapsErrorStmt, startSlot, endSlot, reprocessingErr.Error())
if err != nil {
loghelper.LogSlotRangeError(strconv.FormatUint(startSlot, 10), strconv.FormatUint(endSlot, 10), err).Error("Unable to update reprocessing_error")
loghelper.LogSlotRangeError(startSlot.Number(), endSlot.Number(), err).Error("Unable to update reprocessing_error")
return err
}
row, err := res.RowsAffected()
if err != nil {
loghelper.LogSlotRangeError(strconv.FormatUint(startSlot, 10), strconv.FormatUint(endSlot, 10), err).Error("Unable to count rows affected when trying to update reprocessing_error.")
loghelper.LogSlotRangeError(startSlot.Number(), endSlot.Number(), err).Error("Unable to count rows affected when trying to update reprocessing_error.")
return err
}
if row != 1 {
loghelper.LogSlotRangeError(strconv.FormatUint(startSlot, 10), strconv.FormatUint(endSlot, 10), err).WithFields(log.Fields{
loghelper.LogSlotRangeError(startSlot.Number(), endSlot.Number(), err).WithFields(log.Fields{
"rowCount": row,
}).Error("The rows affected by the upsert for reprocessing_error is not 1.")
metric.IncrementKnownGapsReprocessError(1)
@ -578,13 +570,12 @@ func updateKnownGapErrors(db sql.Database, startSlot uint64, endSlot uint64, rep
}
// A quick helper function to calculate the epoch.
func calculateEpoch(slot uint64, slotPerEpoch uint64) string {
epoch := slot / slotPerEpoch
return strconv.FormatUint(epoch, 10)
func calculateEpoch(slot Slot, slotPerEpoch uint64) uint64 {
return slot.Number() / slotPerEpoch
}
// A helper function to check to see if the slot is processed.
func isSlotProcessed(db sql.Database, checkProcessStmt string, slot string) (bool, error) {
func isSlotProcessed(db sql.Database, checkProcessStmt string, slot Slot) (bool, error) {
processRow, err := db.Exec(context.Background(), checkProcessStmt, slot)
if err != nil {
return false, err
@ -601,7 +592,7 @@ func isSlotProcessed(db sql.Database, checkProcessStmt string, slot string) (boo
// Check to see if this slot is in the DB. Check eth_beacon.slots, eth_beacon.signed_block
// and eth_beacon.state. If the slot exists, return true
func IsSlotInDb(ctx context.Context, db sql.Database, slot string, blockRoot string, stateRoot string) (bool, error) {
func IsSlotInDb(ctx context.Context, db sql.Database, slot Slot, blockRoot string, stateRoot string) (bool, error) {
var (
isInBeaconState bool
isInSignedBeaconBlock bool
@ -644,7 +635,7 @@ func IsSlotInDb(ctx context.Context, db sql.Database, slot string, blockRoot str
// Provide a statement, slot, and root, and this function will check to see
// if the slot and root exist in the table.
func checkSlotAndRoot(db sql.Database, statement, slot, root string) (bool, error) {
func checkSlotAndRoot(db sql.Database, statement string, slot Slot, root string) (bool, error) {
processRow, err := db.Exec(context.Background(), statement, slot, root)
if err != nil {
return false, err

View File

@ -53,8 +53,8 @@ type ChainReorg struct {
// A struct to capture whats being written to the eth-beacon.slots table.
type DbSlots struct {
Epoch string // The epoch.
Slot string // The slot.
Epoch uint64 // The epoch.
Slot uint64 // The slot.
BlockRoot string // The block root
StateRoot string // The state root
Status string // The status, it can be proposed | forked | skipped.
@ -90,8 +90,8 @@ type DbBeaconState struct {
// A structure to capture whats being written to the eth-beacon.known_gaps table.
type DbKnownGaps struct {
StartSlot string // The start slot for known_gaps, inclusive.
EndSlot string // The end slot for known_gaps, inclusive.
StartSlot uint64 // The start slot for known_gaps, inclusive.
EndSlot uint64 // The end slot for known_gaps, inclusive.
CheckedOut bool // Indicates if any process is currently processing this entry.
ReprocessingError string // The error that occurred when attempting to reprocess these entries.
EntryError string // The error that caused this entry to be added to the table. Could be null.

View File

@ -20,9 +20,8 @@ package beaconclient
import (
"fmt"
"strconv"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-eth-beacon-indexer/pkg/loghelper"
)
// This function will perform the necessary steps to handle a reorg.
@ -31,7 +30,11 @@ func (bc *BeaconClient) handleReorg() {
for {
reorg := <-bc.ReOrgTracking.ProcessCh
log.WithFields(log.Fields{"reorg": reorg}).Debug("Received a new reorg message.")
writeReorgs(bc.Db, reorg.Slot, reorg.NewHeadBlock, bc.Metrics)
slot, err := ParseSlot(reorg.Slot)
if nil != err {
loghelper.LogSlotError(slot.Number(), err)
}
writeReorgs(bc.Db, slot, reorg.NewHeadBlock, bc.Metrics)
}
}
@ -42,7 +45,7 @@ func (bc *BeaconClient) handleHead() {
for {
head := <-bc.HeadTracking.ProcessCh
// Process all the work here.
slot, err := strconv.ParseUint(head.Slot, 10, 64)
slot, err := ParseSlot(head.Slot)
if err != nil {
bc.HeadTracking.ErrorCh <- &SseError{
err: fmt.Errorf("Unable to turn the slot from string to int: %s", head.Slot),

View File

@ -58,7 +58,7 @@ type HistoricProcessing struct {
}
// Get a single row of historical slots from the table.
func (hp HistoricProcessing) getSlotRange(ctx context.Context, slotCh chan<- slotsToProcess, minimumSlot uint64) []error {
func (hp HistoricProcessing) getSlotRange(ctx context.Context, slotCh chan<- slotsToProcess, minimumSlot Slot) []error {
return getBatchProcessRow(ctx, hp.db, getHpEntryStmt, checkHpEntryStmt, lockHpEntryStmt, slotCh, strconv.Itoa(hp.uniqueNodeIdentifier), minimumSlot)
}
@ -74,7 +74,7 @@ func (hp HistoricProcessing) handleProcessingErrors(ctx context.Context, errMess
case <-ctx.Done():
return
case errMs := <-errMessages:
loghelper.LogSlotError(strconv.FormatUint(errMs.slot, 10), errMs.err)
loghelper.LogSlotError(errMs.slot.Number(), errMs.err)
writeKnownGaps(hp.db, 1, errMs.slot, errMs.slot, errMs.err, errMs.errProcess, hp.metrics)
}
}
@ -97,7 +97,7 @@ func (hp HistoricProcessing) releaseDbLocks() error {
}
// Process the slot range.
func processSlotRangeWorker(ctx context.Context, workCh <-chan uint64, errCh chan<- batchHistoricError, spd SlotProcessingDetails, incrementTracker func(uint64)) {
func processSlotRangeWorker(ctx context.Context, workCh <-chan Slot, errCh chan<- batchHistoricError, spd SlotProcessingDetails, incrementTracker func(uint64)) {
for {
select {
case <-ctx.Done():
@ -123,7 +123,7 @@ func processSlotRangeWorker(ctx context.Context, workCh <-chan uint64, errCh cha
// It also locks the row by updating the checked_out column.
// The statement for getting the start_slot and end_slot must be provided.
// The statement for "locking" the row must also be provided.
func getBatchProcessRow(ctx context.Context, db sql.Database, getStartEndSlotStmt string, checkNewRowsStmt string, checkOutRowStmt string, slotCh chan<- slotsToProcess, uniqueNodeIdentifier string, minimumSlot uint64) []error {
func getBatchProcessRow(ctx context.Context, db sql.Database, getStartEndSlotStmt string, checkNewRowsStmt string, checkOutRowStmt string, slotCh chan<- slotsToProcess, uniqueNodeIdentifier string, minimumSlot Slot) []error {
errCount := make([]error, 0)
// 5 is an arbitrary number. It allows us to retry a few times before
@ -178,7 +178,7 @@ func getBatchProcessRow(ctx context.Context, db sql.Database, getStartEndSlotStm
time.Sleep(1 * time.Second)
break
}
loghelper.LogSlotRangeStatementError(strconv.FormatUint(sp.startSlot, 10), strconv.FormatUint(sp.endSlot, 10), getStartEndSlotStmt, err).Error("Unable to get a row")
loghelper.LogSlotRangeStatementError(sp.startSlot.Number(), sp.endSlot.Number(), getStartEndSlotStmt, err).Error("Unable to get a row")
errCount = append(errCount, err)
break
}
@ -186,25 +186,25 @@ func getBatchProcessRow(ctx context.Context, db sql.Database, getStartEndSlotStm
// Checkout the Row
res, err := tx.Exec(dbCtx, checkOutRowStmt, sp.startSlot, sp.endSlot, uniqueNodeIdentifier)
if err != nil {
loghelper.LogSlotRangeStatementError(strconv.FormatUint(sp.startSlot, 10), strconv.FormatUint(sp.endSlot, 10), checkOutRowStmt, err).Error("Unable to checkout the row")
loghelper.LogSlotRangeStatementError(sp.startSlot.Number(), sp.endSlot.Number(), checkOutRowStmt, err).Error("Unable to checkout the row")
errCount = append(errCount, err)
break
}
rows, err := res.RowsAffected()
if err != nil {
loghelper.LogSlotRangeStatementError(strconv.FormatUint(sp.startSlot, 10), strconv.FormatUint(sp.endSlot, 10), checkOutRowStmt, fmt.Errorf("Unable to determine the rows affected when trying to checkout a row."))
loghelper.LogSlotRangeStatementError(sp.startSlot.Number(), sp.endSlot.Number(), checkOutRowStmt, fmt.Errorf("Unable to determine the rows affected when trying to checkout a row."))
errCount = append(errCount, err)
break
}
if rows > 1 {
loghelper.LogSlotRangeStatementError(strconv.FormatUint(sp.startSlot, 10), strconv.FormatUint(sp.endSlot, 10), checkOutRowStmt, err).WithFields(log.Fields{
loghelper.LogSlotRangeStatementError(sp.startSlot.Number(), sp.endSlot.Number(), checkOutRowStmt, err).WithFields(log.Fields{
"rowsReturn": rows,
}).Error("We locked too many rows.....")
errCount = append(errCount, err)
break
}
if rows == 0 {
loghelper.LogSlotRangeStatementError(strconv.FormatUint(sp.startSlot, 10), strconv.FormatUint(sp.endSlot, 10), checkOutRowStmt, err).WithFields(log.Fields{
loghelper.LogSlotRangeStatementError(sp.startSlot.Number(), sp.endSlot.Number(), checkOutRowStmt, err).WithFields(log.Fields{
"rowsReturn": rows,
}).Error("We did not lock a single row.")
errCount = append(errCount, err)
@ -212,7 +212,7 @@ func getBatchProcessRow(ctx context.Context, db sql.Database, getStartEndSlotStm
}
err = tx.Commit(dbCtx)
if err != nil {
loghelper.LogSlotRangeError(strconv.FormatUint(sp.startSlot, 10), strconv.FormatUint(sp.endSlot, 10), err).Error("Unable commit transactions.")
loghelper.LogSlotRangeError(sp.startSlot.Number(), sp.endSlot.Number(), err).Error("Unable commit transactions.")
errCount = append(errCount, err)
break
}
@ -241,11 +241,11 @@ func removeRowPostProcess(ctx context.Context, db sql.Database, processCh <-chan
"endSlot": slots.endSlot,
}).Debug("Starting to check to see if the following slots have been processed")
for {
isStartProcess, err := isSlotProcessed(db, checkProcessedStmt, strconv.FormatUint(slots.startSlot, 10))
isStartProcess, err := isSlotProcessed(db, checkProcessedStmt, slots.startSlot)
if err != nil {
errCh <- err
}
isEndProcess, err := isSlotProcessed(db, checkProcessedStmt, strconv.FormatUint(slots.endSlot, 10))
isEndProcess, err := isSlotProcessed(db, checkProcessedStmt, slots.endSlot)
if err != nil {
errCh <- err
}
@ -255,7 +255,7 @@ func removeRowPostProcess(ctx context.Context, db sql.Database, processCh <-chan
time.Sleep(3 * time.Second)
}
_, err := db.Exec(context.Background(), removeStmt, strconv.FormatUint(slots.startSlot, 10), strconv.FormatUint(slots.endSlot, 10))
_, err := db.Exec(context.Background(), removeStmt, slots.startSlot.Number(), slots.endSlot.Number())
if err != nil {
errCh <- err
}

View File

@ -58,7 +58,7 @@ type KnownGapsProcessing struct {
}
// This function will perform all the heavy lifting for tracking the head of the chain.
func (bc *BeaconClient) ProcessKnownGaps(ctx context.Context, maxWorkers int, minimumSlot uint64) []error {
func (bc *BeaconClient) ProcessKnownGaps(ctx context.Context, maxWorkers int, minimumSlot Slot) []error {
log.Info("We are starting the known gaps processing service.")
bc.KnownGapsProcess = KnownGapsProcessing{db: bc.Db, uniqueNodeIdentifier: bc.UniqueNodeIdentifier, metrics: bc.Metrics}
errs := handleBatchProcess(ctx, maxWorkers, bc.KnownGapsProcess, bc.SlotProcessingDetails(), bc.Metrics.IncrementKnownGapsProcessed, minimumSlot)
@ -78,7 +78,7 @@ func (bc *BeaconClient) StopKnownGapsProcessing(cancel context.CancelFunc) error
}
// Get a single row of historical slots from the table.
func (kgp KnownGapsProcessing) getSlotRange(ctx context.Context, slotCh chan<- slotsToProcess, minimumSlot uint64) []error {
func (kgp KnownGapsProcessing) getSlotRange(ctx context.Context, slotCh chan<- slotsToProcess, minimumSlot Slot) []error {
return getBatchProcessRow(ctx, kgp.db, getKgEntryStmt, checkKgEntryStmt, lockKgEntryStmt, slotCh, strconv.Itoa(kgp.uniqueNodeIdentifier), minimumSlot)
}
@ -97,21 +97,21 @@ func (kgp KnownGapsProcessing) handleProcessingErrors(ctx context.Context, errMe
// Check to see if this if this entry already exists.
res, err := kgp.db.Exec(context.Background(), checkKgSingleSlotStmt, errMs.slot, errMs.slot)
if err != nil {
loghelper.LogSlotError(strconv.FormatUint(errMs.slot, 10), err).Error("Unable to see if this slot is in the eth_beacon.known_gaps table")
loghelper.LogSlotError(errMs.slot.Number(), err).Error("Unable to see if this slot is in the eth_beacon.known_gaps table")
}
rows, err := res.RowsAffected()
if err != nil {
loghelper.LogSlotError(strconv.FormatUint(errMs.slot, 10), err).WithFields(log.Fields{
loghelper.LogSlotError(errMs.slot.Number(), err).WithFields(log.Fields{
"queryStatement": checkKgSingleSlotStmt,
}).Error("Unable to get the number of rows affected by this statement.")
}
if rows > 0 {
loghelper.LogSlotError(strconv.FormatUint(errMs.slot, 10), errMs.err).Error("We received an error when processing a knownGap")
loghelper.LogSlotError(errMs.slot.Number(), errMs.err).Error("We received an error when processing a knownGap")
err = updateKnownGapErrors(kgp.db, errMs.slot, errMs.slot, errMs.err, kgp.metrics)
if err != nil {
loghelper.LogSlotError(strconv.FormatUint(errMs.slot, 10), err).Error("Error processing known gap")
loghelper.LogSlotError(errMs.slot.Number(), err).Error("Error processing known gap")
}
} else {
writeKnownGaps(kgp.db, 1, errMs.slot, errMs.slot, errMs.err, errMs.errProcess, kgp.metrics)

View File

@ -23,7 +23,6 @@ import (
"context"
"encoding/hex"
"fmt"
"strconv"
"strings"
"time"
@ -45,8 +44,8 @@ type SlotProcessingDetails struct {
PerformBeaconStateProcessing bool // Should we process BeaconStates?
PerformBeaconBlockProcessing bool // Should we process BeaconBlocks?
StartingSlot uint64 // If we're performing head tracking. What is the first slot we processed.
PreviousSlot uint64 // Whats the previous slot we processed
StartingSlot Slot // If we're performing head tracking. What is the first slot we processed.
PreviousSlot Slot // Whats the previous slot we processed
PreviousBlockRoot string // Whats the previous block root, used to check the next blocks parent.
}
@ -71,8 +70,8 @@ func (bc *BeaconClient) SlotProcessingDetails() SlotProcessingDetails {
type ProcessSlot struct {
// Generic
Slot uint64 // The slot number.
Epoch uint64 // The epoch number.
Slot Slot // The slot number.
Epoch Epoch // The epoch number.
BlockRoot string // The hex encoded string of the BlockRoot.
StateRoot string // The hex encoded string of the StateRoot.
ParentBlockRoot string // The hex encoded string of the parent block.
@ -114,10 +113,10 @@ type PerformanceMetrics struct {
// known_gaps table.
func processFullSlot(
ctx context.Context,
slot uint64,
slot Slot,
blockRoot string,
stateRoot string,
previousSlot uint64,
previousSlot Slot,
previousBlockRoot string,
knownGapsTableIncrement int,
headOrHistoric string,
@ -201,7 +200,7 @@ func processFullSlot(
checkDbTime := time.Now()
var blockRequired bool
if spd.PerformBeaconBlockProcessing {
blockExists, err := checkSlotAndRoot(ps.Db, CheckSignedBeaconBlockStmt, strconv.FormatUint(ps.Slot, 10), finalBlockRoot)
blockExists, err := checkSlotAndRoot(ps.Db, CheckSignedBeaconBlockStmt, ps.Slot, finalBlockRoot)
if err != nil {
return err, "checkDb"
}
@ -210,7 +209,7 @@ func processFullSlot(
var stateRequired bool
if spd.PerformBeaconStateProcessing {
stateExists, err := checkSlotAndRoot(ps.Db, CheckBeaconStateStmt, strconv.FormatUint(ps.Slot, 10), finalStateRoot)
stateExists, err := checkSlotAndRoot(ps.Db, CheckBeaconStateStmt, ps.Slot, finalStateRoot)
if err != nil {
return err, "checkDb"
}
@ -280,7 +279,7 @@ func processFullSlot(
}
// Handle a slot that is at head. A wrapper function for calling `handleFullSlot`.
func processHeadSlot(slot uint64, blockRoot string, stateRoot string, spd SlotProcessingDetails) {
func processHeadSlot(slot Slot, blockRoot string, stateRoot string, spd SlotProcessingDetails) {
// Get the knownGaps at startUp
if spd.PreviousSlot == 0 && spd.PreviousBlockRoot == "" {
writeStartUpGaps(spd.Db, spd.KnownGapTableIncrement, slot, spd.Metrics)
@ -294,7 +293,7 @@ func processHeadSlot(slot uint64, blockRoot string, stateRoot string, spd SlotPr
}
// Handle a historic slot. A wrapper function for calling `handleFullSlot`.
func handleHistoricSlot(ctx context.Context, slot uint64, spd SlotProcessingDetails) (error, string) {
func handleHistoricSlot(ctx context.Context, slot Slot, spd SlotProcessingDetails) (error, string) {
return processFullSlot(ctx, slot, "", "", 0, "",
1, "historic", &spd)
}
@ -305,14 +304,14 @@ func (ps *ProcessSlot) getSignedBeaconBlock(serverAddress string) error {
if ps.BlockRoot != "" {
blockIdentifier = ps.BlockRoot
} else {
blockIdentifier = strconv.FormatUint(ps.Slot, 10)
blockIdentifier = ps.Slot.Format()
}
blockEndpoint := serverAddress + BcBlockQueryEndpoint + blockIdentifier
sszSignedBeaconBlock, rc, err := querySsz(blockEndpoint, strconv.FormatUint(ps.Slot, 10))
sszSignedBeaconBlock, rc, err := querySsz(blockEndpoint, ps.Slot)
if err != nil || rc != 200 {
loghelper.LogSlotError(strconv.FormatUint(ps.Slot, 10), err).Error("Unable to properly query the slot.")
loghelper.LogSlotError(ps.Slot.Number(), err).Error("Unable to properly query the slot.")
ps.FullSignedBeaconBlock = nil
ps.SszSignedBeaconBlock = []byte{}
ps.ParentBlockRoot = ""
@ -328,7 +327,7 @@ func (ps *ProcessSlot) getSignedBeaconBlock(serverAddress string) error {
var signedBeaconBlock SignedBeaconBlock
err = signedBeaconBlock.UnmarshalSSZ(sszSignedBeaconBlock)
if err != nil {
loghelper.LogSlotError(strconv.FormatUint(ps.Slot, 10), err).Error("Unable to unmarshal SignedBeaconBlock for slot.")
loghelper.LogSlotError(ps.Slot.Number(), err).Error("Unable to unmarshal SignedBeaconBlock for slot.")
ps.FullSignedBeaconBlock = nil
ps.SszSignedBeaconBlock = []byte{}
ps.ParentBlockRoot = ""
@ -349,20 +348,20 @@ func (ps *ProcessSlot) getBeaconState(serverEndpoint string) error {
if ps.StateRoot != "" {
stateIdentifier = ps.StateRoot
} else {
stateIdentifier = strconv.FormatUint(ps.Slot, 10)
stateIdentifier = ps.Slot.Format()
}
stateEndpoint := serverEndpoint + BcStateQueryEndpoint + stateIdentifier
sszBeaconState, _, err := querySsz(stateEndpoint, strconv.FormatUint(ps.Slot, 10))
sszBeaconState, _, err := querySsz(stateEndpoint, ps.Slot)
if err != nil {
loghelper.LogSlotError(strconv.FormatUint(ps.Slot, 10), err).Error("Unable to properly query the BeaconState.")
loghelper.LogSlotError(ps.Slot.Number(), err).Error("Unable to properly query the BeaconState.")
return err
}
var beaconState BeaconState
err = beaconState.UnmarshalSSZ(sszBeaconState)
if err != nil {
loghelper.LogSlotError(strconv.FormatUint(ps.Slot, 10), err).Error("Unable to unmarshal the BeaconState.")
loghelper.LogSlotError(ps.Slot.Number(), err).Error("Unable to unmarshal the BeaconState.")
return err
}
@ -372,7 +371,7 @@ func (ps *ProcessSlot) getBeaconState(serverEndpoint string) error {
}
// Check to make sure that the previous block we processed is the parent of the current block.
func (ps *ProcessSlot) checkPreviousSlot(tx sql.Tx, ctx context.Context, previousSlot uint64, previousBlockRoot string, knownGapsTableIncrement int) {
func (ps *ProcessSlot) checkPreviousSlot(tx sql.Tx, ctx context.Context, previousSlot Slot, previousBlockRoot string, knownGapsTableIncrement int) {
if nil == ps.FullSignedBeaconBlock {
log.Debug("Can't check block root, no current block.")
return
@ -384,7 +383,7 @@ func (ps *ProcessSlot) checkPreviousSlot(tx sql.Tx, ctx context.Context, previou
"slot": slot,
"fork": true,
}).Warn("A fork occurred! The previous slot and current slot match.")
transactReorgs(tx, ctx, strconv.FormatUint(ps.Slot, 10), ps.BlockRoot, ps.Metrics)
transactReorgs(tx, ctx, ps.Slot, ps.BlockRoot, ps.Metrics)
} else if previousSlot > slot {
log.WithFields(log.Fields{
"previousSlot": previousSlot,
@ -401,7 +400,7 @@ func (ps *ProcessSlot) checkPreviousSlot(tx sql.Tx, ctx context.Context, previou
"previousBlockRoot": previousBlockRoot,
"currentBlockParent": parentRoot,
}).Error("The previousBlockRoot does not match the current blocks parent, an unprocessed fork might have occurred.")
transactReorgs(tx, ctx, strconv.FormatUint(previousSlot, 10), parentRoot, ps.Metrics)
transactReorgs(tx, ctx, previousSlot, parentRoot, ps.Metrics)
} else {
log.Debug("Previous Slot and Current Slot are one distance from each other.")
}

View File

@ -37,18 +37,18 @@ type BlockRootMessage struct {
}
// A helper function to query endpoints that utilize slots.
func querySsz(endpoint string, slot string) ([]byte, int, error) {
func querySsz(endpoint string, slot Slot) ([]byte, int, error) {
log.WithFields(log.Fields{"endpoint": endpoint}).Debug("Querying endpoint")
client := &http.Client{}
req, err := http.NewRequest("GET", endpoint, nil)
if err != nil {
loghelper.LogSlotError(slot, err).Error("Unable to create a request!")
loghelper.LogSlotError(slot.Number(), err).Error("Unable to create a request!")
return nil, 0, fmt.Errorf("Unable to create a request!: %s", err.Error())
}
req.Header.Set("Accept", "application/octet-stream")
response, err := client.Do(req)
if err != nil {
loghelper.LogSlotError(slot, err).Error("Unable to query Beacon Node!")
loghelper.LogSlotError(slot.Number(), err).Error("Unable to query Beacon Node!")
return nil, 0, fmt.Errorf("Unable to query Beacon Node: %s", err.Error())
}
defer response.Body.Close()
@ -61,7 +61,7 @@ func querySsz(endpoint string, slot string) ([]byte, int, error) {
body, err := ioutil.ReadAll(response.Body)
if err != nil {
loghelper.LogSlotError(slot, err).Error("Unable to turn response into a []bytes array!")
loghelper.LogSlotError(slot.Number(), err).Error("Unable to turn response into a []bytes array!")
return nil, rc, fmt.Errorf("Unable to turn response into a []bytes array!: %s", err.Error())
}

View File

@ -28,21 +28,21 @@ func LogError(err error) *log.Entry {
}
// A simple herlper function to log slot and error.
func LogSlotError(slot string, err error) *log.Entry {
func LogSlotError(slot uint64, err error) *log.Entry {
return log.WithFields(log.Fields{
"err": err,
"slot": slot,
})
}
func LogSlotRangeError(startSlot string, endSlot string, err error) *log.Entry {
func LogSlotRangeError(startSlot uint64, endSlot uint64, err error) *log.Entry {
return log.WithFields(log.Fields{
"err": err,
"startSlot": startSlot,
"endSlot": endSlot,
})
}
func LogSlotRangeStatementError(startSlot string, endSlot string, statement string, err error) *log.Entry {
func LogSlotRangeStatementError(startSlot uint64, endSlot uint64, statement string, err error) *log.Entry {
return log.WithFields(log.Fields{
"err": err,
"startSlot": startSlot,

View File

@ -20,7 +20,7 @@ import (
)
// A simple helper function that will help wrap the reorg error messages.
func LogReorgError(slot string, latestBlockRoot string, err error) *log.Entry {
func LogReorgError(slot uint64, latestBlockRoot string, err error) *log.Entry {
return log.WithFields(log.Fields{
"err": err,
"slot": slot,
@ -29,7 +29,7 @@ func LogReorgError(slot string, latestBlockRoot string, err error) *log.Entry {
}
// A simple helper function that will help wrap regular reorg messages.
func LogReorg(slot string, latestBlockRoot string) *log.Entry {
func LogReorg(slot uint64, latestBlockRoot string) *log.Entry {
return log.WithFields(log.Fields{
"slot": slot,
"latestBlockRoot": latestBlockRoot,