From 36dd5d6331f893f9566b77218c9cb845c6b2f592 Mon Sep 17 00:00:00 2001 From: Thomas E Lackey Date: Wed, 28 Sep 2022 13:52:13 -0500 Subject: [PATCH] use Slot rather than string or uint64 --- cmd/full.go | 5 +- cmd/head.go | 3 +- cmd/historic.go | 5 +- pkg/beaconclient/beaconclient.go | 4 +- pkg/beaconclient/capturehead_test.go | 31 ++++---- pkg/beaconclient/capturehistoric.go | 20 ++--- pkg/beaconclient/consensus.go | 29 ++++++- pkg/beaconclient/databasewrite.go | 113 ++++++++++++--------------- pkg/beaconclient/models.go | 8 +- pkg/beaconclient/processevents.go | 11 ++- pkg/beaconclient/processhistoric.go | 26 +++--- pkg/beaconclient/processknowngaps.go | 12 +-- pkg/beaconclient/processslot.go | 43 +++++----- pkg/beaconclient/queryserver.go | 8 +- pkg/loghelper/logerror.go | 6 +- pkg/loghelper/logreorg.go | 4 +- 16 files changed, 176 insertions(+), 152 deletions(-) diff --git a/cmd/full.go b/cmd/full.go index 31c7658..b90a10d 100644 --- a/cmd/full.go +++ b/cmd/full.go @@ -19,6 +19,7 @@ package cmd import ( "context" "fmt" + "github.com/vulcanize/ipld-eth-beacon-indexer/pkg/beaconclient" "strconv" log "github.com/sirupsen/logrus" @@ -81,7 +82,7 @@ func startFullProcessing() { errG, _ := errgroup.WithContext(context.Background()) errG.Go(func() error { - errs := Bc.CaptureHistoric(hpContext, viper.GetInt("bc.maxHistoricProcessWorker"), viper.GetUint64("bc.minimumSlot")) + errs := Bc.CaptureHistoric(hpContext, viper.GetInt("bc.maxHistoricProcessWorker"), beaconclient.Slot(viper.GetUint64("bc.minimumSlot"))) if len(errs) != 0 { if len(errs) != 0 { log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing historic events") @@ -95,7 +96,7 @@ func startFullProcessing() { go func() { errG := new(errgroup.Group) errG.Go(func() error { - errs := Bc.ProcessKnownGaps(kgCtx, viper.GetInt("kg.maxKnownGapsWorker"), viper.GetUint64("kg.minimumSlot")) + errs := Bc.ProcessKnownGaps(kgCtx, viper.GetInt("kg.maxKnownGapsWorker"), beaconclient.Slot(viper.GetUint64("kg.minimumSlot"))) if len(errs) != 0 { log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing knownGaps") return fmt.Errorf("Application ended because there were too many error when attempting to process knownGaps") diff --git a/cmd/head.go b/cmd/head.go index 789440b..8debd6d 100644 --- a/cmd/head.go +++ b/cmd/head.go @@ -19,6 +19,7 @@ package cmd import ( "context" "fmt" + "github.com/vulcanize/ipld-eth-beacon-indexer/pkg/beaconclient" "net/http" "strconv" @@ -69,7 +70,7 @@ func startHeadTracking() { go func() { errG := new(errgroup.Group) errG.Go(func() error { - errs := Bc.ProcessKnownGaps(kgCtx, viper.GetInt("kg.maxKnownGapsWorker"), viper.GetUint64("kg.minimumSlot")) + errs := Bc.ProcessKnownGaps(kgCtx, viper.GetInt("kg.maxKnownGapsWorker"), beaconclient.Slot(viper.GetUint64("kg.minimumSlot"))) if len(errs) != 0 { log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing knownGaps") return fmt.Errorf("Application ended because there were too many error when attempting to process knownGaps") diff --git a/cmd/historic.go b/cmd/historic.go index 482e200..eab8ca0 100644 --- a/cmd/historic.go +++ b/cmd/historic.go @@ -19,6 +19,7 @@ package cmd import ( "context" "fmt" + "github.com/vulcanize/ipld-eth-beacon-indexer/pkg/beaconclient" "os" "strconv" @@ -65,7 +66,7 @@ func startHistoricProcessing() { errG, _ := errgroup.WithContext(context.Background()) errG.Go(func() error { - errs := Bc.CaptureHistoric(hpContext, viper.GetInt("bc.maxHistoricProcessWorker"), viper.GetUint64("bc.minimumSlot")) + errs := Bc.CaptureHistoric(hpContext, viper.GetInt("bc.maxHistoricProcessWorker"), beaconclient.Slot(viper.GetUint64("bc.minimumSlot"))) if len(errs) != 0 { if len(errs) != 0 { log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing historic events") @@ -80,7 +81,7 @@ func startHistoricProcessing() { go func() { errG := new(errgroup.Group) errG.Go(func() error { - errs := Bc.ProcessKnownGaps(kgContext, viper.GetInt("kg.maxKnownGapsWorker"), viper.GetUint64("kg.minimumSlot")) + errs := Bc.ProcessKnownGaps(kgContext, viper.GetInt("kg.maxKnownGapsWorker"), beaconclient.Slot(viper.GetUint64("kg.minimumSlot"))) if len(errs) != 0 { log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing knownGaps") return fmt.Errorf("Application ended because there were too many error when attempting to process knownGaps") diff --git a/pkg/beaconclient/beaconclient.go b/pkg/beaconclient/beaconclient.go index af10c25..9a855d1 100644 --- a/pkg/beaconclient/beaconclient.go +++ b/pkg/beaconclient/beaconclient.go @@ -58,8 +58,8 @@ type BeaconClient struct { // Used for Head Tracking PerformHeadTracking bool // Should we track head? - StartingSlot uint64 // If we're performing head tracking. What is the first slot we processed. - PreviousSlot uint64 // Whats the previous slot we processed + StartingSlot Slot // If we're performing head tracking. What is the first slot we processed. + PreviousSlot Slot // Whats the previous slot we processed PreviousBlockRoot string // Whats the previous block root, used to check the next blocks parent. HeadTracking *SseEvents[Head] // Track the head block ReOrgTracking *SseEvents[ChainReorg] // Track all Reorgs diff --git a/pkg/beaconclient/capturehead_test.go b/pkg/beaconclient/capturehead_test.go index 4ca4623..ba62f14 100644 --- a/pkg/beaconclient/capturehead_test.go +++ b/pkg/beaconclient/capturehead_test.go @@ -565,10 +565,10 @@ func setUpTest(config Config, maxSlot string) *beaconclient.BeaconClient { } // A helper function to validate the expected output from the eth_beacon.slots table. -func validateSlot(bc *beaconclient.BeaconClient, headMessage beaconclient.Head, correctEpoch uint64, correctStatus string) { +func validateSlot(bc *beaconclient.BeaconClient, headMessage beaconclient.Head, correctEpoch beaconclient.Epoch, correctStatus string) { epoch, dbSlot, blockRoot, stateRoot, status := queryDbSlotAndBlock(bc.Db, headMessage.Slot, headMessage.Block) log.Info("validateSlot: ", headMessage) - baseSlot, err := strconv.ParseUint(headMessage.Slot, 10, 64) + baseSlot, err := beaconclient.ParseSlot(headMessage.Slot) Expect(err).ToNot(HaveOccurred()) Expect(dbSlot).To(Equal(baseSlot)) Expect(epoch).To(Equal(correctEpoch)) @@ -583,9 +583,9 @@ func validateSignedBeaconBlock(bc *beaconclient.BeaconClient, headMessage beacon correctExecutionPayloadHeader *beaconclient.DbExecutionPayloadHeader) { dbSignedBlock := queryDbSignedBeaconBlock(bc.Db, headMessage.Slot, headMessage.Block) log.Info("validateSignedBeaconBlock: ", headMessage) - baseSlot, err := strconv.ParseUint(headMessage.Slot, 10, 64) + baseSlot, err := beaconclient.ParseSlot(headMessage.Slot) Expect(err).ToNot(HaveOccurred()) - Expect(dbSignedBlock.Slot).To(Equal(baseSlot)) + Expect(dbSignedBlock.Slot).To(Equal(baseSlot.Number())) Expect(dbSignedBlock.BlockRoot).To(Equal(headMessage.Block)) Expect(dbSignedBlock.ParentBlock).To(Equal(correctParentRoot)) Expect(dbSignedBlock.Eth1DataBlockHash).To(Equal(correctEth1DataBlockHash)) @@ -597,7 +597,7 @@ func validateSignedBeaconBlock(bc *beaconclient.BeaconClient, headMessage beacon func validateBeaconState(bc *beaconclient.BeaconClient, headMessage beaconclient.Head, correctMhKey string) { dbSlot, stateRoot, mhKey := queryDbBeaconState(bc.Db, headMessage.Slot, headMessage.State) log.Info("validateBeaconState: ", headMessage) - baseSlot, err := strconv.ParseUint(headMessage.Slot, 10, 64) + baseSlot, err := beaconclient.ParseSlot(headMessage.Slot) Expect(err).ToNot(HaveOccurred()) Expect(dbSlot).To(Equal(baseSlot)) Expect(stateRoot).To(Equal(headMessage.State)) @@ -633,9 +633,10 @@ func sendHeadMessage(bc *beaconclient.BeaconClient, head beaconclient.Head, maxR } // A helper function to query the eth_beacon.slots table based on the slot and block_root -func queryDbSlotAndBlock(db sql.Database, querySlot string, queryBlockRoot string) (uint64, uint64, string, string, string) { +func queryDbSlotAndBlock(db sql.Database, querySlot string, queryBlockRoot string) (beaconclient.Epoch, beaconclient.Slot, string, string, string) { sqlStatement := `SELECT epoch, slot, block_root, state_root, status FROM eth_beacon.slots WHERE slot=$1 AND block_root=$2;` - var epoch, slot uint64 + var epoch beaconclient.Epoch + var slot beaconclient.Slot var blockRoot, stateRoot, status string log.Debug("Starting to query the eth_beacon.slots table, ", querySlot, " ", queryBlockRoot) err := db.QueryRow(context.Background(), sqlStatement, querySlot, queryBlockRoot).Scan(&epoch, &slot, &blockRoot, &stateRoot, &status) @@ -651,7 +652,7 @@ func queryDbSignedBeaconBlock(db sql.Database, querySlot string, queryBlockRoot payload_parent_hash, payload_state_root, payload_receipts_root, payload_transactions_root FROM eth_beacon.signed_block WHERE slot=$1 AND block_root=$2;` - var slot uint64 + var slot beaconclient.Slot var payloadBlockNumber, payloadTimestamp *uint64 var blockRoot, parentBlockRoot, eth1DataBlockHash, mhKey string var payloadBlockHash, payloadParentHash, payloadStateRoot, payloadReceiptsRoot, payloadTransactionsRoot *string @@ -663,7 +664,7 @@ func queryDbSignedBeaconBlock(db sql.Database, querySlot string, queryBlockRoot Expect(err).ToNot(HaveOccurred()) signedBlock := beaconclient.DbSignedBeaconBlock{ - Slot: slot, + Slot: slot.Number(), BlockRoot: blockRoot, ParentBlock: parentBlockRoot, Eth1DataBlockHash: eth1DataBlockHash, @@ -687,9 +688,9 @@ func queryDbSignedBeaconBlock(db sql.Database, querySlot string, queryBlockRoot } // A helper function to query the eth_beacon.signed_block table based on the slot and block_root. -func queryDbBeaconState(db sql.Database, querySlot string, queryStateRoot string) (uint64, string, string) { +func queryDbBeaconState(db sql.Database, querySlot string, queryStateRoot string) (beaconclient.Slot, string, string) { sqlStatement := `SELECT slot, state_root, mh_key FROM eth_beacon.state WHERE slot=$1 AND state_root=$2;` - var slot uint64 + var slot beaconclient.Slot var stateRoot, mhKey string row := db.QueryRow(context.Background(), sqlStatement, querySlot, queryStateRoot) err := row.Scan(&slot, &stateRoot, &mhKey) @@ -926,7 +927,7 @@ func (tbc TestBeaconNode) provideSsz(slotIdentifier string, sszIdentifier string // Helper function to test three reorg messages. There are going to be many functions like this, // Because we need to test the same logic for multiple phases. -func (tbc TestBeaconNode) testMultipleReorgs(bc *beaconclient.BeaconClient, firstHead beaconclient.Head, secondHead beaconclient.Head, thirdHead beaconclient.Head, epoch uint64, maxRetry int) { +func (tbc TestBeaconNode) testMultipleReorgs(bc *beaconclient.BeaconClient, firstHead beaconclient.Head, secondHead beaconclient.Head, thirdHead beaconclient.Head, epoch beaconclient.Epoch, maxRetry int) { go bc.CaptureHead() time.Sleep(1 * time.Second) @@ -958,7 +959,7 @@ func (tbc TestBeaconNode) testMultipleReorgs(bc *beaconclient.BeaconClient, firs NewHeadBlock: secondHead.Block, OldHeadState: thirdHead.State, NewHeadState: secondHead.State, - Epoch: strconv.FormatUint(epoch, 10), + Epoch: epoch.Format(), ExecutionOptimistic: false, }) Expect(err).ToNot(HaveOccurred()) @@ -988,7 +989,7 @@ func (tbc TestBeaconNode) testMultipleReorgs(bc *beaconclient.BeaconClient, firs } // A test to validate a single block was processed correctly -func (tbc TestBeaconNode) testProcessBlock(bc *beaconclient.BeaconClient, head beaconclient.Head, epoch uint64, maxRetry int, expectedSuccessInsert uint64, expectedKnownGaps uint64, expectedReorgs uint64) { +func (tbc TestBeaconNode) testProcessBlock(bc *beaconclient.BeaconClient, head beaconclient.Head, epoch beaconclient.Epoch, maxRetry int, expectedSuccessInsert uint64, expectedKnownGaps uint64, expectedReorgs uint64) { go bc.CaptureHead() time.Sleep(1 * time.Second) sendHeadMessage(bc, head, maxRetry, expectedSuccessInsert) @@ -1018,7 +1019,7 @@ func (tbc TestBeaconNode) testProcessBlock(bc *beaconclient.BeaconClient, head b // A test that ensures that if two HeadMessages occur for a single slot they are marked // as proposed and forked correctly. -func (tbc TestBeaconNode) testMultipleHead(bc *beaconclient.BeaconClient, firstHead beaconclient.Head, secondHead beaconclient.Head, epoch uint64, maxRetry int) { +func (tbc TestBeaconNode) testMultipleHead(bc *beaconclient.BeaconClient, firstHead beaconclient.Head, secondHead beaconclient.Head, epoch beaconclient.Epoch, maxRetry int) { go bc.CaptureHead() time.Sleep(1 * time.Second) diff --git a/pkg/beaconclient/capturehistoric.go b/pkg/beaconclient/capturehistoric.go index 24312bf..ac695ce 100644 --- a/pkg/beaconclient/capturehistoric.go +++ b/pkg/beaconclient/capturehistoric.go @@ -27,7 +27,7 @@ import ( ) // This function will perform all the heavy lifting for tracking the head of the chain. -func (bc *BeaconClient) CaptureHistoric(ctx context.Context, maxWorkers int, minimumSlot uint64) []error { +func (bc *BeaconClient) CaptureHistoric(ctx context.Context, maxWorkers int, minimumSlot Slot) []error { log.Info("We are starting the historical processing service.") bc.HistoricalProcess = HistoricProcessing{db: bc.Db, metrics: bc.Metrics, uniqueNodeIdentifier: bc.UniqueNodeIdentifier} errs := handleBatchProcess(ctx, maxWorkers, bc.HistoricalProcess, bc.SlotProcessingDetails(), bc.Metrics.IncrementHistoricSlotProcessed, minimumSlot) @@ -52,10 +52,10 @@ func (bc *BeaconClient) StopHistoric(cancel context.CancelFunc) error { // // 2. Known Gaps Processing type BatchProcessing interface { - getSlotRange(context.Context, chan<- slotsToProcess, uint64) []error // Write the slots to process in a channel, return an error if you cant get the next slots to write. - handleProcessingErrors(context.Context, <-chan batchHistoricError) // Custom logic to handle errors. - removeTableEntry(context.Context, <-chan slotsToProcess) error // With the provided start and end slot, remove the entry from the database. - releaseDbLocks() error // Update the checked_out column to false for whatever table is being updated. + getSlotRange(context.Context, chan<- slotsToProcess, Slot) []error // Write the slots to process in a channel, return an error if you cant get the next slots to write. + handleProcessingErrors(context.Context, <-chan batchHistoricError) // Custom logic to handle errors. + removeTableEntry(context.Context, <-chan slotsToProcess) error // With the provided start and end slot, remove the entry from the database. + releaseDbLocks() error // Update the checked_out column to false for whatever table is being updated. } /// ^^^ @@ -66,14 +66,14 @@ type BatchProcessing interface { // A struct to pass around indicating a table entry for slots to process. type slotsToProcess struct { - startSlot uint64 // The start slot - endSlot uint64 // The end slot + startSlot Slot // The start slot + endSlot Slot // The end slot } type batchHistoricError struct { err error // The error that occurred when attempting to a slot errProcess string // The process that caused the error. - slot uint64 // The slot which the error is for. + slot Slot // The slot which the error is for. } // Wrapper function for the BatchProcessing interface. @@ -90,9 +90,9 @@ type batchHistoricError struct { // 4. Remove the slot entry from the DB. // // 5. Handle any errors. -func handleBatchProcess(ctx context.Context, maxWorkers int, bp BatchProcessing, spd SlotProcessingDetails, incrementTracker func(uint64), minimumSlot uint64) []error { +func handleBatchProcess(ctx context.Context, maxWorkers int, bp BatchProcessing, spd SlotProcessingDetails, incrementTracker func(uint64), minimumSlot Slot) []error { slotsCh := make(chan slotsToProcess) - workCh := make(chan uint64) + workCh := make(chan Slot) processedCh := make(chan slotsToProcess) errCh := make(chan batchHistoricError) finalErrCh := make(chan []error, 1) diff --git a/pkg/beaconclient/consensus.go b/pkg/beaconclient/consensus.go index f4a25b2..76bd388 100644 --- a/pkg/beaconclient/consensus.go +++ b/pkg/beaconclient/consensus.go @@ -11,14 +11,41 @@ import ( "github.com/protolambda/ztyp/codec" "github.com/protolambda/ztyp/tree" log "github.com/sirupsen/logrus" + "strconv" ) type Eth1Data common.Eth1Data type Root common.Root type Signature common.BLSSignature -type Slot common.Slot +type Slot uint64 +type Epoch uint64 type ExecutionPayloadHeader common.ExecutionPayloadHeader +func ParseSlot(v string) (Slot, error) { + slotNum, err := strconv.ParseUint(v, 10, 64) + return Slot(slotNum), err +} + +func (s *Slot) Format() string { + return strconv.FormatUint(uint64(*s), 10) +} + +func (s *Slot) Number() uint64 { + return uint64(*s) +} + +func (s *Slot) Plus(v uint64) Slot { + return Slot(v + s.Number()) +} + +func (s *Slot) PlusInt(v int) Slot { + return s.Plus(uint64(v)) +} + +func (e *Epoch) Format() string { + return strconv.FormatUint(uint64(*e), 10) +} + type BeaconBlock struct { spec *common.Spec bellatrix *bellatrix.BeaconBlock diff --git a/pkg/beaconclient/databasewrite.go b/pkg/beaconclient/databasewrite.go index 7434c89..b2470be 100644 --- a/pkg/beaconclient/databasewrite.go +++ b/pkg/beaconclient/databasewrite.go @@ -18,8 +18,6 @@ package beaconclient import ( "context" "fmt" - "strconv" - "github.com/jackc/pgx/v4" log "github.com/sirupsen/logrus" "github.com/vulcanize/ipld-eth-beacon-indexer/pkg/database/sql" @@ -100,7 +98,7 @@ type DatabaseWriter struct { rawSignedBeaconBlock *[]byte } -func CreateDatabaseWrite(db sql.Database, slot uint64, stateRoot string, blockRoot string, parentBlockRoot string, +func CreateDatabaseWrite(db sql.Database, slot Slot, stateRoot string, blockRoot string, parentBlockRoot string, eth1DataBlockHash string, payloadHeader *ExecutionPayloadHeader, status string, rawSignedBeaconBlock *[]byte, rawBeaconState *[]byte, metrics *BeaconClientMetrics) (*DatabaseWriter, error) { ctx := context.Background() tx, err := db.Begin(ctx) @@ -130,10 +128,10 @@ func CreateDatabaseWrite(db sql.Database, slot uint64, stateRoot string, blockRo // Write functions to write each all together... // Should I do one atomic write? // Create the model for the eth_beacon.slots table -func (dw *DatabaseWriter) prepareSlotsModel(slot uint64, stateRoot string, blockRoot string, status string) { +func (dw *DatabaseWriter) prepareSlotsModel(slot Slot, stateRoot string, blockRoot string, status string) { dw.DbSlots = &DbSlots{ Epoch: calculateEpoch(slot, bcSlotsPerEpoch), - Slot: strconv.FormatUint((slot), 10), + Slot: slot.Number(), StateRoot: stateRoot, BlockRoot: blockRoot, Status: status, @@ -143,14 +141,14 @@ func (dw *DatabaseWriter) prepareSlotsModel(slot uint64, stateRoot string, block } // Create the model for the eth_beacon.signed_block table. -func (dw *DatabaseWriter) prepareSignedBeaconBlockModel(slot uint64, blockRoot string, parentBlockRoot string, eth1DataBlockHash string, +func (dw *DatabaseWriter) prepareSignedBeaconBlockModel(slot Slot, blockRoot string, parentBlockRoot string, eth1DataBlockHash string, payloadHeader *ExecutionPayloadHeader) error { mhKey, err := MultihashKeyFromSSZRoot([]byte(dw.DbSlots.BlockRoot)) if err != nil { return err } dw.DbSignedBeaconBlock = &DbSignedBeaconBlock{ - Slot: slot, + Slot: slot.Number(), BlockRoot: blockRoot, ParentBlock: parentBlockRoot, Eth1DataBlockHash: eth1DataBlockHash, @@ -175,13 +173,13 @@ func (dw *DatabaseWriter) prepareSignedBeaconBlockModel(slot uint64, blockRoot s } // Create the model for the eth_beacon.state table. -func (dw *DatabaseWriter) prepareBeaconStateModel(slot uint64, stateRoot string) error { +func (dw *DatabaseWriter) prepareBeaconStateModel(slot Slot, stateRoot string) error { mhKey, err := MultihashKeyFromSSZRoot([]byte(dw.DbSlots.StateRoot)) if err != nil { return err } dw.DbBeaconState = &DbBeaconState{ - Slot: slot, + Slot: slot.Number(), StateRoot: stateRoot, MhKey: mhKey, } @@ -343,56 +341,51 @@ func (dw *DatabaseWriter) upsertBeaconState() error { // Update a given slot to be marked as forked within a transaction. Provide the slot and the latest latestBlockRoot. // We will mark all entries for the given slot that don't match the provided latestBlockRoot as forked. -func transactReorgs(tx sql.Tx, ctx context.Context, slot string, latestBlockRoot string, metrics *BeaconClientMetrics) { - slotNum, strErr := strconv.ParseUint(slot, 10, 64) - if strErr != nil { - loghelper.LogReorgError(slot, latestBlockRoot, strErr).Error("We can't convert the slot to an int...") - } - +func transactReorgs(tx sql.Tx, ctx context.Context, slot Slot, latestBlockRoot string, metrics *BeaconClientMetrics) { forkCount, err := updateForked(tx, ctx, slot, latestBlockRoot) if err != nil { - loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We ran into some trouble while updating all forks.") - transactKnownGaps(tx, ctx, 1, slotNum, slotNum, err, "reorg", metrics) + loghelper.LogReorgError(slot.Number(), latestBlockRoot, err).Error("We ran into some trouble while updating all forks.") + transactKnownGaps(tx, ctx, 1, slot, slot, err, "reorg", metrics) } proposedCount, err := updateProposed(tx, ctx, slot, latestBlockRoot) if err != nil { - loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We ran into some trouble while trying to update the proposed slot.") - transactKnownGaps(tx, ctx, 1, slotNum, slotNum, err, "reorg", metrics) + loghelper.LogReorgError(slot.Number(), latestBlockRoot, err).Error("We ran into some trouble while trying to update the proposed slot.") + transactKnownGaps(tx, ctx, 1, slot, slot, err, "reorg", metrics) } if forkCount > 0 { - loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{ + loghelper.LogReorg(slot.Number(), latestBlockRoot).WithFields(log.Fields{ "forkCount": forkCount, }).Info("Updated rows that were forked.") } else { - loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{ + loghelper.LogReorg(slot.Number(), latestBlockRoot).WithFields(log.Fields{ "forkCount": forkCount, }).Warn("There were no forked rows to update.") } if proposedCount == 1 { - loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{ + loghelper.LogReorg(slot.Number(), latestBlockRoot).WithFields(log.Fields{ "proposedCount": proposedCount, }).Info("Updated the row that should have been marked as proposed.") } else if proposedCount > 1 { - loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{ + loghelper.LogReorg(slot.Number(), latestBlockRoot).WithFields(log.Fields{ "proposedCount": proposedCount, }).Error("Too many rows were marked as proposed!") - transactKnownGaps(tx, ctx, 1, slotNum, slotNum, fmt.Errorf("Too many rows were marked as unproposed."), "reorg", metrics) + transactKnownGaps(tx, ctx, 1, slot, slot, fmt.Errorf("Too many rows were marked as unproposed."), "reorg", metrics) } else if proposedCount == 0 { - transactKnownGaps(tx, ctx, 1, slotNum, slotNum, fmt.Errorf("Unable to find properly proposed row in DB"), "reorg", metrics) - loghelper.LogReorg(slot, latestBlockRoot).Info("Updated the row that should have been marked as proposed.") + transactKnownGaps(tx, ctx, 1, slot, slot, fmt.Errorf("Unable to find properly proposed row in DB"), "reorg", metrics) + loghelper.LogReorg(slot.Number(), latestBlockRoot).Info("Updated the row that should have been marked as proposed.") } metrics.IncrementReorgsInsert(1) } // Wrapper function that will create a transaction and execute the function. -func writeReorgs(db sql.Database, slot string, latestBlockRoot string, metrics *BeaconClientMetrics) { +func writeReorgs(db sql.Database, slot Slot, latestBlockRoot string, metrics *BeaconClientMetrics) { ctx := context.Background() tx, err := db.Begin(ctx) if err != nil { - loghelper.LogReorgError(slot, latestBlockRoot, err).Fatal("Unable to create a new transaction for reorgs") + loghelper.LogReorgError(slot.Number(), latestBlockRoot, err).Fatal("Unable to create a new transaction for reorgs") } defer func() { err := tx.Rollback(ctx) @@ -402,35 +395,35 @@ func writeReorgs(db sql.Database, slot string, latestBlockRoot string, metrics * }() transactReorgs(tx, ctx, slot, latestBlockRoot, metrics) if err = tx.Commit(ctx); err != nil { - loghelper.LogReorgError(slot, latestBlockRoot, err).Fatal("Unable to execute the transaction for reorgs") + loghelper.LogReorgError(slot.Number(), latestBlockRoot, err).Fatal("Unable to execute the transaction for reorgs") } } // Update the slots table by marking the old slot's as forked. -func updateForked(tx sql.Tx, ctx context.Context, slot string, latestBlockRoot string) (int64, error) { +func updateForked(tx sql.Tx, ctx context.Context, slot Slot, latestBlockRoot string) (int64, error) { res, err := tx.Exec(ctx, UpdateForkedStmt, slot, latestBlockRoot) if err != nil { - loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We are unable to update the eth_beacon.slots table with the forked slots") + loghelper.LogReorgError(slot.Number(), latestBlockRoot, err).Error("We are unable to update the eth_beacon.slots table with the forked slots") return 0, err } count, err := res.RowsAffected() if err != nil { - loghelper.LogReorgError(slot, latestBlockRoot, err).Error("Unable to figure out how many entries were marked as forked.") + loghelper.LogReorgError(slot.Number(), latestBlockRoot, err).Error("Unable to figure out how many entries were marked as forked.") return 0, err } return count, err } // Mark a slot as proposed. -func updateProposed(tx sql.Tx, ctx context.Context, slot string, latestBlockRoot string) (int64, error) { +func updateProposed(tx sql.Tx, ctx context.Context, slot Slot, latestBlockRoot string) (int64, error) { res, err := tx.Exec(ctx, UpdateProposedStmt, slot, latestBlockRoot) if err != nil { - loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We are unable to update the eth_beacon.slots table with the proposed slot.") + loghelper.LogReorgError(slot.Number(), latestBlockRoot, err).Error("We are unable to update the eth_beacon.slots table with the proposed slot.") return 0, err } count, err := res.RowsAffected() if err != nil { - loghelper.LogReorgError(slot, latestBlockRoot, err).Error("Unable to figure out how many entries were marked as proposed") + loghelper.LogReorgError(slot.Number(), latestBlockRoot, err).Error("Unable to figure out how many entries were marked as proposed") return 0, err } @@ -440,17 +433,17 @@ func updateProposed(tx sql.Tx, ctx context.Context, slot string, latestBlockRoot // A wrapper function to call upsertKnownGaps. This function will break down the range of known_gaps into // smaller chunks. For example, instead of having an entry of 1-101, if we increment the entries by 10 slots, we would // have 10 entries as follows: 1-10, 11-20, etc... -func transactKnownGaps(tx sql.Tx, ctx context.Context, tableIncrement int, startSlot uint64, endSlot uint64, entryError error, entryProcess string, metric *BeaconClientMetrics) { +func transactKnownGaps(tx sql.Tx, ctx context.Context, tableIncrement int, startSlot Slot, endSlot Slot, entryError error, entryProcess string, metric *BeaconClientMetrics) { var entryErrorMsg string if entryError == nil { entryErrorMsg = "" } else { entryErrorMsg = entryError.Error() } - if endSlot-startSlot <= uint64(tableIncrement) { + if endSlot.Number()-startSlot.Number() <= uint64(tableIncrement) { kgModel := DbKnownGaps{ - StartSlot: strconv.FormatUint(startSlot, 10), - EndSlot: strconv.FormatUint(endSlot, 10), + StartSlot: startSlot.Number(), + EndSlot: endSlot.Number(), CheckedOut: false, ReprocessingError: "", EntryError: entryErrorMsg, @@ -458,7 +451,7 @@ func transactKnownGaps(tx sql.Tx, ctx context.Context, tableIncrement int, start } upsertKnownGaps(tx, ctx, kgModel, metric) } else { - totalSlots := endSlot - startSlot + totalSlots := endSlot.Number() - startSlot.Number() var chunks int chunks = int(totalSlots / uint64(tableIncrement)) if totalSlots%uint64(tableIncrement) != 0 { @@ -466,16 +459,16 @@ func transactKnownGaps(tx sql.Tx, ctx context.Context, tableIncrement int, start } for i := 0; i < chunks; i++ { - var tempStart, tempEnd uint64 - tempStart = startSlot + (uint64(i * tableIncrement)) + var tempStart, tempEnd Slot + tempStart = startSlot.PlusInt(i * tableIncrement) if i+1 == chunks { tempEnd = endSlot } else { - tempEnd = startSlot + uint64((i+1)*tableIncrement) + tempEnd = startSlot.PlusInt((i + 1) * tableIncrement) } kgModel := DbKnownGaps{ - StartSlot: strconv.FormatUint(tempStart, 10), - EndSlot: strconv.FormatUint(tempEnd, 10), + StartSlot: tempStart.Number(), + EndSlot: tempEnd.Number(), CheckedOut: false, ReprocessingError: "", EntryError: entryErrorMsg, @@ -488,11 +481,11 @@ func transactKnownGaps(tx sql.Tx, ctx context.Context, tableIncrement int, start // Wrapper function, instead of adding the knownGaps entries to a transaction, it will // create the transaction and write it. -func writeKnownGaps(db sql.Database, tableIncrement int, startSlot uint64, endSlot uint64, entryError error, entryProcess string, metric *BeaconClientMetrics) { +func writeKnownGaps(db sql.Database, tableIncrement int, startSlot Slot, endSlot Slot, entryError error, entryProcess string, metric *BeaconClientMetrics) { ctx := context.Background() tx, err := db.Begin(ctx) if err != nil { - loghelper.LogSlotRangeError(strconv.FormatUint(startSlot, 10), strconv.FormatUint(endSlot, 10), err).Fatal("Unable to create a new transaction for knownGaps") + loghelper.LogSlotRangeError(startSlot.Number(), endSlot.Number(), err).Fatal("Unable to create a new transaction for knownGaps") } defer func() { err := tx.Rollback(ctx) @@ -502,8 +495,7 @@ func writeKnownGaps(db sql.Database, tableIncrement int, startSlot uint64, endSl }() transactKnownGaps(tx, ctx, tableIncrement, startSlot, endSlot, entryError, entryProcess, metric) if err = tx.Commit(ctx); err != nil { - loghelper.LogSlotRangeError(strconv.FormatUint(startSlot, 10), strconv.FormatUint(endSlot, 10), err).Fatal("Unable to execute the transaction for knownGaps") - + loghelper.LogSlotRangeError(startSlot.Number(), endSlot.Number(), err).Fatal("Unable to execute the transaction for knownGaps") } } @@ -526,8 +518,8 @@ func upsertKnownGaps(tx sql.Tx, ctx context.Context, knModel DbKnownGaps, metric } // A function to write the gap between the highest slot in the DB and the first processed slot. -func writeStartUpGaps(db sql.Database, tableIncrement int, firstSlot uint64, metric *BeaconClientMetrics) { - var maxSlot uint64 +func writeStartUpGaps(db sql.Database, tableIncrement int, firstSlot Slot, metric *BeaconClientMetrics) { + var maxSlot Slot err := db.QueryRow(context.Background(), QueryHighestSlotStmt).Scan(&maxSlot) if err != nil { loghelper.LogError(err).Fatal("Unable to get the max block from the DB. We must close the application or we might have undetected gaps.") @@ -555,19 +547,19 @@ func writeStartUpGaps(db sql.Database, tableIncrement int, firstSlot uint64, met } // A function to update a knownGap range with a reprocessing error. -func updateKnownGapErrors(db sql.Database, startSlot uint64, endSlot uint64, reprocessingErr error, metric *BeaconClientMetrics) error { +func updateKnownGapErrors(db sql.Database, startSlot Slot, endSlot Slot, reprocessingErr error, metric *BeaconClientMetrics) error { res, err := db.Exec(context.Background(), UpsertKnownGapsErrorStmt, startSlot, endSlot, reprocessingErr.Error()) if err != nil { - loghelper.LogSlotRangeError(strconv.FormatUint(startSlot, 10), strconv.FormatUint(endSlot, 10), err).Error("Unable to update reprocessing_error") + loghelper.LogSlotRangeError(startSlot.Number(), endSlot.Number(), err).Error("Unable to update reprocessing_error") return err } row, err := res.RowsAffected() if err != nil { - loghelper.LogSlotRangeError(strconv.FormatUint(startSlot, 10), strconv.FormatUint(endSlot, 10), err).Error("Unable to count rows affected when trying to update reprocessing_error.") + loghelper.LogSlotRangeError(startSlot.Number(), endSlot.Number(), err).Error("Unable to count rows affected when trying to update reprocessing_error.") return err } if row != 1 { - loghelper.LogSlotRangeError(strconv.FormatUint(startSlot, 10), strconv.FormatUint(endSlot, 10), err).WithFields(log.Fields{ + loghelper.LogSlotRangeError(startSlot.Number(), endSlot.Number(), err).WithFields(log.Fields{ "rowCount": row, }).Error("The rows affected by the upsert for reprocessing_error is not 1.") metric.IncrementKnownGapsReprocessError(1) @@ -578,13 +570,12 @@ func updateKnownGapErrors(db sql.Database, startSlot uint64, endSlot uint64, rep } // A quick helper function to calculate the epoch. -func calculateEpoch(slot uint64, slotPerEpoch uint64) string { - epoch := slot / slotPerEpoch - return strconv.FormatUint(epoch, 10) +func calculateEpoch(slot Slot, slotPerEpoch uint64) uint64 { + return slot.Number() / slotPerEpoch } // A helper function to check to see if the slot is processed. -func isSlotProcessed(db sql.Database, checkProcessStmt string, slot string) (bool, error) { +func isSlotProcessed(db sql.Database, checkProcessStmt string, slot Slot) (bool, error) { processRow, err := db.Exec(context.Background(), checkProcessStmt, slot) if err != nil { return false, err @@ -601,7 +592,7 @@ func isSlotProcessed(db sql.Database, checkProcessStmt string, slot string) (boo // Check to see if this slot is in the DB. Check eth_beacon.slots, eth_beacon.signed_block // and eth_beacon.state. If the slot exists, return true -func IsSlotInDb(ctx context.Context, db sql.Database, slot string, blockRoot string, stateRoot string) (bool, error) { +func IsSlotInDb(ctx context.Context, db sql.Database, slot Slot, blockRoot string, stateRoot string) (bool, error) { var ( isInBeaconState bool isInSignedBeaconBlock bool @@ -644,7 +635,7 @@ func IsSlotInDb(ctx context.Context, db sql.Database, slot string, blockRoot str // Provide a statement, slot, and root, and this function will check to see // if the slot and root exist in the table. -func checkSlotAndRoot(db sql.Database, statement, slot, root string) (bool, error) { +func checkSlotAndRoot(db sql.Database, statement string, slot Slot, root string) (bool, error) { processRow, err := db.Exec(context.Background(), statement, slot, root) if err != nil { return false, err diff --git a/pkg/beaconclient/models.go b/pkg/beaconclient/models.go index 67b082c..26e2aba 100644 --- a/pkg/beaconclient/models.go +++ b/pkg/beaconclient/models.go @@ -53,8 +53,8 @@ type ChainReorg struct { // A struct to capture whats being written to the eth-beacon.slots table. type DbSlots struct { - Epoch string // The epoch. - Slot string // The slot. + Epoch uint64 // The epoch. + Slot uint64 // The slot. BlockRoot string // The block root StateRoot string // The state root Status string // The status, it can be proposed | forked | skipped. @@ -90,8 +90,8 @@ type DbBeaconState struct { // A structure to capture whats being written to the eth-beacon.known_gaps table. type DbKnownGaps struct { - StartSlot string // The start slot for known_gaps, inclusive. - EndSlot string // The end slot for known_gaps, inclusive. + StartSlot uint64 // The start slot for known_gaps, inclusive. + EndSlot uint64 // The end slot for known_gaps, inclusive. CheckedOut bool // Indicates if any process is currently processing this entry. ReprocessingError string // The error that occurred when attempting to reprocess these entries. EntryError string // The error that caused this entry to be added to the table. Could be null. diff --git a/pkg/beaconclient/processevents.go b/pkg/beaconclient/processevents.go index 3d1f518..b291204 100644 --- a/pkg/beaconclient/processevents.go +++ b/pkg/beaconclient/processevents.go @@ -20,9 +20,8 @@ package beaconclient import ( "fmt" - "strconv" - log "github.com/sirupsen/logrus" + "github.com/vulcanize/ipld-eth-beacon-indexer/pkg/loghelper" ) // This function will perform the necessary steps to handle a reorg. @@ -31,7 +30,11 @@ func (bc *BeaconClient) handleReorg() { for { reorg := <-bc.ReOrgTracking.ProcessCh log.WithFields(log.Fields{"reorg": reorg}).Debug("Received a new reorg message.") - writeReorgs(bc.Db, reorg.Slot, reorg.NewHeadBlock, bc.Metrics) + slot, err := ParseSlot(reorg.Slot) + if nil != err { + loghelper.LogSlotError(slot.Number(), err) + } + writeReorgs(bc.Db, slot, reorg.NewHeadBlock, bc.Metrics) } } @@ -42,7 +45,7 @@ func (bc *BeaconClient) handleHead() { for { head := <-bc.HeadTracking.ProcessCh // Process all the work here. - slot, err := strconv.ParseUint(head.Slot, 10, 64) + slot, err := ParseSlot(head.Slot) if err != nil { bc.HeadTracking.ErrorCh <- &SseError{ err: fmt.Errorf("Unable to turn the slot from string to int: %s", head.Slot), diff --git a/pkg/beaconclient/processhistoric.go b/pkg/beaconclient/processhistoric.go index cf32d77..b1ef9d6 100644 --- a/pkg/beaconclient/processhistoric.go +++ b/pkg/beaconclient/processhistoric.go @@ -58,7 +58,7 @@ type HistoricProcessing struct { } // Get a single row of historical slots from the table. -func (hp HistoricProcessing) getSlotRange(ctx context.Context, slotCh chan<- slotsToProcess, minimumSlot uint64) []error { +func (hp HistoricProcessing) getSlotRange(ctx context.Context, slotCh chan<- slotsToProcess, minimumSlot Slot) []error { return getBatchProcessRow(ctx, hp.db, getHpEntryStmt, checkHpEntryStmt, lockHpEntryStmt, slotCh, strconv.Itoa(hp.uniqueNodeIdentifier), minimumSlot) } @@ -74,7 +74,7 @@ func (hp HistoricProcessing) handleProcessingErrors(ctx context.Context, errMess case <-ctx.Done(): return case errMs := <-errMessages: - loghelper.LogSlotError(strconv.FormatUint(errMs.slot, 10), errMs.err) + loghelper.LogSlotError(errMs.slot.Number(), errMs.err) writeKnownGaps(hp.db, 1, errMs.slot, errMs.slot, errMs.err, errMs.errProcess, hp.metrics) } } @@ -97,7 +97,7 @@ func (hp HistoricProcessing) releaseDbLocks() error { } // Process the slot range. -func processSlotRangeWorker(ctx context.Context, workCh <-chan uint64, errCh chan<- batchHistoricError, spd SlotProcessingDetails, incrementTracker func(uint64)) { +func processSlotRangeWorker(ctx context.Context, workCh <-chan Slot, errCh chan<- batchHistoricError, spd SlotProcessingDetails, incrementTracker func(uint64)) { for { select { case <-ctx.Done(): @@ -123,7 +123,7 @@ func processSlotRangeWorker(ctx context.Context, workCh <-chan uint64, errCh cha // It also locks the row by updating the checked_out column. // The statement for getting the start_slot and end_slot must be provided. // The statement for "locking" the row must also be provided. -func getBatchProcessRow(ctx context.Context, db sql.Database, getStartEndSlotStmt string, checkNewRowsStmt string, checkOutRowStmt string, slotCh chan<- slotsToProcess, uniqueNodeIdentifier string, minimumSlot uint64) []error { +func getBatchProcessRow(ctx context.Context, db sql.Database, getStartEndSlotStmt string, checkNewRowsStmt string, checkOutRowStmt string, slotCh chan<- slotsToProcess, uniqueNodeIdentifier string, minimumSlot Slot) []error { errCount := make([]error, 0) // 5 is an arbitrary number. It allows us to retry a few times before @@ -178,7 +178,7 @@ func getBatchProcessRow(ctx context.Context, db sql.Database, getStartEndSlotStm time.Sleep(1 * time.Second) break } - loghelper.LogSlotRangeStatementError(strconv.FormatUint(sp.startSlot, 10), strconv.FormatUint(sp.endSlot, 10), getStartEndSlotStmt, err).Error("Unable to get a row") + loghelper.LogSlotRangeStatementError(sp.startSlot.Number(), sp.endSlot.Number(), getStartEndSlotStmt, err).Error("Unable to get a row") errCount = append(errCount, err) break } @@ -186,25 +186,25 @@ func getBatchProcessRow(ctx context.Context, db sql.Database, getStartEndSlotStm // Checkout the Row res, err := tx.Exec(dbCtx, checkOutRowStmt, sp.startSlot, sp.endSlot, uniqueNodeIdentifier) if err != nil { - loghelper.LogSlotRangeStatementError(strconv.FormatUint(sp.startSlot, 10), strconv.FormatUint(sp.endSlot, 10), checkOutRowStmt, err).Error("Unable to checkout the row") + loghelper.LogSlotRangeStatementError(sp.startSlot.Number(), sp.endSlot.Number(), checkOutRowStmt, err).Error("Unable to checkout the row") errCount = append(errCount, err) break } rows, err := res.RowsAffected() if err != nil { - loghelper.LogSlotRangeStatementError(strconv.FormatUint(sp.startSlot, 10), strconv.FormatUint(sp.endSlot, 10), checkOutRowStmt, fmt.Errorf("Unable to determine the rows affected when trying to checkout a row.")) + loghelper.LogSlotRangeStatementError(sp.startSlot.Number(), sp.endSlot.Number(), checkOutRowStmt, fmt.Errorf("Unable to determine the rows affected when trying to checkout a row.")) errCount = append(errCount, err) break } if rows > 1 { - loghelper.LogSlotRangeStatementError(strconv.FormatUint(sp.startSlot, 10), strconv.FormatUint(sp.endSlot, 10), checkOutRowStmt, err).WithFields(log.Fields{ + loghelper.LogSlotRangeStatementError(sp.startSlot.Number(), sp.endSlot.Number(), checkOutRowStmt, err).WithFields(log.Fields{ "rowsReturn": rows, }).Error("We locked too many rows.....") errCount = append(errCount, err) break } if rows == 0 { - loghelper.LogSlotRangeStatementError(strconv.FormatUint(sp.startSlot, 10), strconv.FormatUint(sp.endSlot, 10), checkOutRowStmt, err).WithFields(log.Fields{ + loghelper.LogSlotRangeStatementError(sp.startSlot.Number(), sp.endSlot.Number(), checkOutRowStmt, err).WithFields(log.Fields{ "rowsReturn": rows, }).Error("We did not lock a single row.") errCount = append(errCount, err) @@ -212,7 +212,7 @@ func getBatchProcessRow(ctx context.Context, db sql.Database, getStartEndSlotStm } err = tx.Commit(dbCtx) if err != nil { - loghelper.LogSlotRangeError(strconv.FormatUint(sp.startSlot, 10), strconv.FormatUint(sp.endSlot, 10), err).Error("Unable commit transactions.") + loghelper.LogSlotRangeError(sp.startSlot.Number(), sp.endSlot.Number(), err).Error("Unable commit transactions.") errCount = append(errCount, err) break } @@ -241,11 +241,11 @@ func removeRowPostProcess(ctx context.Context, db sql.Database, processCh <-chan "endSlot": slots.endSlot, }).Debug("Starting to check to see if the following slots have been processed") for { - isStartProcess, err := isSlotProcessed(db, checkProcessedStmt, strconv.FormatUint(slots.startSlot, 10)) + isStartProcess, err := isSlotProcessed(db, checkProcessedStmt, slots.startSlot) if err != nil { errCh <- err } - isEndProcess, err := isSlotProcessed(db, checkProcessedStmt, strconv.FormatUint(slots.endSlot, 10)) + isEndProcess, err := isSlotProcessed(db, checkProcessedStmt, slots.endSlot) if err != nil { errCh <- err } @@ -255,7 +255,7 @@ func removeRowPostProcess(ctx context.Context, db sql.Database, processCh <-chan time.Sleep(3 * time.Second) } - _, err := db.Exec(context.Background(), removeStmt, strconv.FormatUint(slots.startSlot, 10), strconv.FormatUint(slots.endSlot, 10)) + _, err := db.Exec(context.Background(), removeStmt, slots.startSlot.Number(), slots.endSlot.Number()) if err != nil { errCh <- err } diff --git a/pkg/beaconclient/processknowngaps.go b/pkg/beaconclient/processknowngaps.go index 7bc9168..1e35742 100644 --- a/pkg/beaconclient/processknowngaps.go +++ b/pkg/beaconclient/processknowngaps.go @@ -58,7 +58,7 @@ type KnownGapsProcessing struct { } // This function will perform all the heavy lifting for tracking the head of the chain. -func (bc *BeaconClient) ProcessKnownGaps(ctx context.Context, maxWorkers int, minimumSlot uint64) []error { +func (bc *BeaconClient) ProcessKnownGaps(ctx context.Context, maxWorkers int, minimumSlot Slot) []error { log.Info("We are starting the known gaps processing service.") bc.KnownGapsProcess = KnownGapsProcessing{db: bc.Db, uniqueNodeIdentifier: bc.UniqueNodeIdentifier, metrics: bc.Metrics} errs := handleBatchProcess(ctx, maxWorkers, bc.KnownGapsProcess, bc.SlotProcessingDetails(), bc.Metrics.IncrementKnownGapsProcessed, minimumSlot) @@ -78,7 +78,7 @@ func (bc *BeaconClient) StopKnownGapsProcessing(cancel context.CancelFunc) error } // Get a single row of historical slots from the table. -func (kgp KnownGapsProcessing) getSlotRange(ctx context.Context, slotCh chan<- slotsToProcess, minimumSlot uint64) []error { +func (kgp KnownGapsProcessing) getSlotRange(ctx context.Context, slotCh chan<- slotsToProcess, minimumSlot Slot) []error { return getBatchProcessRow(ctx, kgp.db, getKgEntryStmt, checkKgEntryStmt, lockKgEntryStmt, slotCh, strconv.Itoa(kgp.uniqueNodeIdentifier), minimumSlot) } @@ -97,21 +97,21 @@ func (kgp KnownGapsProcessing) handleProcessingErrors(ctx context.Context, errMe // Check to see if this if this entry already exists. res, err := kgp.db.Exec(context.Background(), checkKgSingleSlotStmt, errMs.slot, errMs.slot) if err != nil { - loghelper.LogSlotError(strconv.FormatUint(errMs.slot, 10), err).Error("Unable to see if this slot is in the eth_beacon.known_gaps table") + loghelper.LogSlotError(errMs.slot.Number(), err).Error("Unable to see if this slot is in the eth_beacon.known_gaps table") } rows, err := res.RowsAffected() if err != nil { - loghelper.LogSlotError(strconv.FormatUint(errMs.slot, 10), err).WithFields(log.Fields{ + loghelper.LogSlotError(errMs.slot.Number(), err).WithFields(log.Fields{ "queryStatement": checkKgSingleSlotStmt, }).Error("Unable to get the number of rows affected by this statement.") } if rows > 0 { - loghelper.LogSlotError(strconv.FormatUint(errMs.slot, 10), errMs.err).Error("We received an error when processing a knownGap") + loghelper.LogSlotError(errMs.slot.Number(), errMs.err).Error("We received an error when processing a knownGap") err = updateKnownGapErrors(kgp.db, errMs.slot, errMs.slot, errMs.err, kgp.metrics) if err != nil { - loghelper.LogSlotError(strconv.FormatUint(errMs.slot, 10), err).Error("Error processing known gap") + loghelper.LogSlotError(errMs.slot.Number(), err).Error("Error processing known gap") } } else { writeKnownGaps(kgp.db, 1, errMs.slot, errMs.slot, errMs.err, errMs.errProcess, kgp.metrics) diff --git a/pkg/beaconclient/processslot.go b/pkg/beaconclient/processslot.go index 26eedc4..5d5eafb 100644 --- a/pkg/beaconclient/processslot.go +++ b/pkg/beaconclient/processslot.go @@ -23,7 +23,6 @@ import ( "context" "encoding/hex" "fmt" - "strconv" "strings" "time" @@ -45,8 +44,8 @@ type SlotProcessingDetails struct { PerformBeaconStateProcessing bool // Should we process BeaconStates? PerformBeaconBlockProcessing bool // Should we process BeaconBlocks? - StartingSlot uint64 // If we're performing head tracking. What is the first slot we processed. - PreviousSlot uint64 // Whats the previous slot we processed + StartingSlot Slot // If we're performing head tracking. What is the first slot we processed. + PreviousSlot Slot // Whats the previous slot we processed PreviousBlockRoot string // Whats the previous block root, used to check the next blocks parent. } @@ -71,8 +70,8 @@ func (bc *BeaconClient) SlotProcessingDetails() SlotProcessingDetails { type ProcessSlot struct { // Generic - Slot uint64 // The slot number. - Epoch uint64 // The epoch number. + Slot Slot // The slot number. + Epoch Epoch // The epoch number. BlockRoot string // The hex encoded string of the BlockRoot. StateRoot string // The hex encoded string of the StateRoot. ParentBlockRoot string // The hex encoded string of the parent block. @@ -114,10 +113,10 @@ type PerformanceMetrics struct { // known_gaps table. func processFullSlot( ctx context.Context, - slot uint64, + slot Slot, blockRoot string, stateRoot string, - previousSlot uint64, + previousSlot Slot, previousBlockRoot string, knownGapsTableIncrement int, headOrHistoric string, @@ -201,7 +200,7 @@ func processFullSlot( checkDbTime := time.Now() var blockRequired bool if spd.PerformBeaconBlockProcessing { - blockExists, err := checkSlotAndRoot(ps.Db, CheckSignedBeaconBlockStmt, strconv.FormatUint(ps.Slot, 10), finalBlockRoot) + blockExists, err := checkSlotAndRoot(ps.Db, CheckSignedBeaconBlockStmt, ps.Slot, finalBlockRoot) if err != nil { return err, "checkDb" } @@ -210,7 +209,7 @@ func processFullSlot( var stateRequired bool if spd.PerformBeaconStateProcessing { - stateExists, err := checkSlotAndRoot(ps.Db, CheckBeaconStateStmt, strconv.FormatUint(ps.Slot, 10), finalStateRoot) + stateExists, err := checkSlotAndRoot(ps.Db, CheckBeaconStateStmt, ps.Slot, finalStateRoot) if err != nil { return err, "checkDb" } @@ -280,7 +279,7 @@ func processFullSlot( } // Handle a slot that is at head. A wrapper function for calling `handleFullSlot`. -func processHeadSlot(slot uint64, blockRoot string, stateRoot string, spd SlotProcessingDetails) { +func processHeadSlot(slot Slot, blockRoot string, stateRoot string, spd SlotProcessingDetails) { // Get the knownGaps at startUp if spd.PreviousSlot == 0 && spd.PreviousBlockRoot == "" { writeStartUpGaps(spd.Db, spd.KnownGapTableIncrement, slot, spd.Metrics) @@ -294,7 +293,7 @@ func processHeadSlot(slot uint64, blockRoot string, stateRoot string, spd SlotPr } // Handle a historic slot. A wrapper function for calling `handleFullSlot`. -func handleHistoricSlot(ctx context.Context, slot uint64, spd SlotProcessingDetails) (error, string) { +func handleHistoricSlot(ctx context.Context, slot Slot, spd SlotProcessingDetails) (error, string) { return processFullSlot(ctx, slot, "", "", 0, "", 1, "historic", &spd) } @@ -305,14 +304,14 @@ func (ps *ProcessSlot) getSignedBeaconBlock(serverAddress string) error { if ps.BlockRoot != "" { blockIdentifier = ps.BlockRoot } else { - blockIdentifier = strconv.FormatUint(ps.Slot, 10) + blockIdentifier = ps.Slot.Format() } blockEndpoint := serverAddress + BcBlockQueryEndpoint + blockIdentifier - sszSignedBeaconBlock, rc, err := querySsz(blockEndpoint, strconv.FormatUint(ps.Slot, 10)) + sszSignedBeaconBlock, rc, err := querySsz(blockEndpoint, ps.Slot) if err != nil || rc != 200 { - loghelper.LogSlotError(strconv.FormatUint(ps.Slot, 10), err).Error("Unable to properly query the slot.") + loghelper.LogSlotError(ps.Slot.Number(), err).Error("Unable to properly query the slot.") ps.FullSignedBeaconBlock = nil ps.SszSignedBeaconBlock = []byte{} ps.ParentBlockRoot = "" @@ -328,7 +327,7 @@ func (ps *ProcessSlot) getSignedBeaconBlock(serverAddress string) error { var signedBeaconBlock SignedBeaconBlock err = signedBeaconBlock.UnmarshalSSZ(sszSignedBeaconBlock) if err != nil { - loghelper.LogSlotError(strconv.FormatUint(ps.Slot, 10), err).Error("Unable to unmarshal SignedBeaconBlock for slot.") + loghelper.LogSlotError(ps.Slot.Number(), err).Error("Unable to unmarshal SignedBeaconBlock for slot.") ps.FullSignedBeaconBlock = nil ps.SszSignedBeaconBlock = []byte{} ps.ParentBlockRoot = "" @@ -349,20 +348,20 @@ func (ps *ProcessSlot) getBeaconState(serverEndpoint string) error { if ps.StateRoot != "" { stateIdentifier = ps.StateRoot } else { - stateIdentifier = strconv.FormatUint(ps.Slot, 10) + stateIdentifier = ps.Slot.Format() } stateEndpoint := serverEndpoint + BcStateQueryEndpoint + stateIdentifier - sszBeaconState, _, err := querySsz(stateEndpoint, strconv.FormatUint(ps.Slot, 10)) + sszBeaconState, _, err := querySsz(stateEndpoint, ps.Slot) if err != nil { - loghelper.LogSlotError(strconv.FormatUint(ps.Slot, 10), err).Error("Unable to properly query the BeaconState.") + loghelper.LogSlotError(ps.Slot.Number(), err).Error("Unable to properly query the BeaconState.") return err } var beaconState BeaconState err = beaconState.UnmarshalSSZ(sszBeaconState) if err != nil { - loghelper.LogSlotError(strconv.FormatUint(ps.Slot, 10), err).Error("Unable to unmarshal the BeaconState.") + loghelper.LogSlotError(ps.Slot.Number(), err).Error("Unable to unmarshal the BeaconState.") return err } @@ -372,7 +371,7 @@ func (ps *ProcessSlot) getBeaconState(serverEndpoint string) error { } // Check to make sure that the previous block we processed is the parent of the current block. -func (ps *ProcessSlot) checkPreviousSlot(tx sql.Tx, ctx context.Context, previousSlot uint64, previousBlockRoot string, knownGapsTableIncrement int) { +func (ps *ProcessSlot) checkPreviousSlot(tx sql.Tx, ctx context.Context, previousSlot Slot, previousBlockRoot string, knownGapsTableIncrement int) { if nil == ps.FullSignedBeaconBlock { log.Debug("Can't check block root, no current block.") return @@ -384,7 +383,7 @@ func (ps *ProcessSlot) checkPreviousSlot(tx sql.Tx, ctx context.Context, previou "slot": slot, "fork": true, }).Warn("A fork occurred! The previous slot and current slot match.") - transactReorgs(tx, ctx, strconv.FormatUint(ps.Slot, 10), ps.BlockRoot, ps.Metrics) + transactReorgs(tx, ctx, ps.Slot, ps.BlockRoot, ps.Metrics) } else if previousSlot > slot { log.WithFields(log.Fields{ "previousSlot": previousSlot, @@ -401,7 +400,7 @@ func (ps *ProcessSlot) checkPreviousSlot(tx sql.Tx, ctx context.Context, previou "previousBlockRoot": previousBlockRoot, "currentBlockParent": parentRoot, }).Error("The previousBlockRoot does not match the current blocks parent, an unprocessed fork might have occurred.") - transactReorgs(tx, ctx, strconv.FormatUint(previousSlot, 10), parentRoot, ps.Metrics) + transactReorgs(tx, ctx, previousSlot, parentRoot, ps.Metrics) } else { log.Debug("Previous Slot and Current Slot are one distance from each other.") } diff --git a/pkg/beaconclient/queryserver.go b/pkg/beaconclient/queryserver.go index 9ce4ffe..4971e62 100644 --- a/pkg/beaconclient/queryserver.go +++ b/pkg/beaconclient/queryserver.go @@ -37,18 +37,18 @@ type BlockRootMessage struct { } // A helper function to query endpoints that utilize slots. -func querySsz(endpoint string, slot string) ([]byte, int, error) { +func querySsz(endpoint string, slot Slot) ([]byte, int, error) { log.WithFields(log.Fields{"endpoint": endpoint}).Debug("Querying endpoint") client := &http.Client{} req, err := http.NewRequest("GET", endpoint, nil) if err != nil { - loghelper.LogSlotError(slot, err).Error("Unable to create a request!") + loghelper.LogSlotError(slot.Number(), err).Error("Unable to create a request!") return nil, 0, fmt.Errorf("Unable to create a request!: %s", err.Error()) } req.Header.Set("Accept", "application/octet-stream") response, err := client.Do(req) if err != nil { - loghelper.LogSlotError(slot, err).Error("Unable to query Beacon Node!") + loghelper.LogSlotError(slot.Number(), err).Error("Unable to query Beacon Node!") return nil, 0, fmt.Errorf("Unable to query Beacon Node: %s", err.Error()) } defer response.Body.Close() @@ -61,7 +61,7 @@ func querySsz(endpoint string, slot string) ([]byte, int, error) { body, err := ioutil.ReadAll(response.Body) if err != nil { - loghelper.LogSlotError(slot, err).Error("Unable to turn response into a []bytes array!") + loghelper.LogSlotError(slot.Number(), err).Error("Unable to turn response into a []bytes array!") return nil, rc, fmt.Errorf("Unable to turn response into a []bytes array!: %s", err.Error()) } diff --git a/pkg/loghelper/logerror.go b/pkg/loghelper/logerror.go index f6dce63..067d50d 100644 --- a/pkg/loghelper/logerror.go +++ b/pkg/loghelper/logerror.go @@ -28,21 +28,21 @@ func LogError(err error) *log.Entry { } // A simple herlper function to log slot and error. -func LogSlotError(slot string, err error) *log.Entry { +func LogSlotError(slot uint64, err error) *log.Entry { return log.WithFields(log.Fields{ "err": err, "slot": slot, }) } -func LogSlotRangeError(startSlot string, endSlot string, err error) *log.Entry { +func LogSlotRangeError(startSlot uint64, endSlot uint64, err error) *log.Entry { return log.WithFields(log.Fields{ "err": err, "startSlot": startSlot, "endSlot": endSlot, }) } -func LogSlotRangeStatementError(startSlot string, endSlot string, statement string, err error) *log.Entry { +func LogSlotRangeStatementError(startSlot uint64, endSlot uint64, statement string, err error) *log.Entry { return log.WithFields(log.Fields{ "err": err, "startSlot": startSlot, diff --git a/pkg/loghelper/logreorg.go b/pkg/loghelper/logreorg.go index 64776cf..8784e21 100644 --- a/pkg/loghelper/logreorg.go +++ b/pkg/loghelper/logreorg.go @@ -20,7 +20,7 @@ import ( ) // A simple helper function that will help wrap the reorg error messages. -func LogReorgError(slot string, latestBlockRoot string, err error) *log.Entry { +func LogReorgError(slot uint64, latestBlockRoot string, err error) *log.Entry { return log.WithFields(log.Fields{ "err": err, "slot": slot, @@ -29,7 +29,7 @@ func LogReorgError(slot string, latestBlockRoot string, err error) *log.Entry { } // A simple helper function that will help wrap regular reorg messages. -func LogReorg(slot string, latestBlockRoot string) *log.Entry { +func LogReorg(slot uint64, latestBlockRoot string) *log.Entry { return log.WithFields(log.Fields{ "slot": slot, "latestBlockRoot": latestBlockRoot,