diff --git a/pkg/beaconclient/capturehistoric_test.go b/pkg/beaconclient/capturehistoric_test.go index 0f324b4..d1e4c42 100644 --- a/pkg/beaconclient/capturehistoric_test.go +++ b/pkg/beaconclient/capturehistoric_test.go @@ -94,7 +94,7 @@ var _ = Describe("Capturehistoric", func() { BeaconNodeTester.runKnownGapsProcess(bc, 2, 2, 0, 2, 0) }) }) - Context("When theres a reprocessing error", func() { + Context("When theres a reprocessing error", Label("reprocessingError"), func() { It("Should update the reprocessing error.", func() { bc := setUpTest(BeaconNodeTester.TestConfig, "99") BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) @@ -106,6 +106,68 @@ var _ = Describe("Capturehistoric", func() { }) }) }) + Describe("Running the application in Historic, Head, and KnownGaps mode", Label("unit", "historical", "full"), func() { + Context("When it recieves a head, historic and known Gaps message (in order)", func() { + It("Should process them all successfully.", func() { + bc := setUpTest(BeaconNodeTester.TestConfig, "2375702") + BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) + defer httpmock.DeactivateAndReset() + // Head + BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, 74240, maxRetry, 1, 0, 0) + + // Historical + BeaconNodeTester.writeEventToHistoricProcess(bc, 100, 100, 10) + BeaconNodeTester.runHistoricalProcess(bc, 2, 2, 0, 0, 0) + + // Known Gaps + BeaconNodeTester.writeEventToKnownGaps(bc, 101, 101) + BeaconNodeTester.runKnownGapsProcess(bc, 2, 3, 0, 0, 0) + + time.Sleep(2 * time.Second) + validatePopularBatchBlocks(bc) + }) + }) + Context("When it recieves a historic, head and known Gaps message (in order)", func() { + It("Should process them all successfully.", func() { + bc := setUpTest(BeaconNodeTester.TestConfig, "2375702") + BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) + defer httpmock.DeactivateAndReset() + // Historical + BeaconNodeTester.writeEventToHistoricProcess(bc, 100, 100, 10) + BeaconNodeTester.runHistoricalProcess(bc, 2, 1, 0, 0, 0) + + // Head + BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, 74240, maxRetry, 1, 0, 0) + + // Known Gaps + BeaconNodeTester.writeEventToKnownGaps(bc, 101, 101) + BeaconNodeTester.runKnownGapsProcess(bc, 2, 3, 0, 0, 0) + + time.Sleep(2 * time.Second) + validatePopularBatchBlocks(bc) + }) + }) + Context("When it recieves a known Gaps, historic and head message (in order)", func() { + It("Should process them all successfully.", func() { + bc := setUpTest(BeaconNodeTester.TestConfig, "2375702") + BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) + defer httpmock.DeactivateAndReset() + // Known Gaps + BeaconNodeTester.writeEventToKnownGaps(bc, 101, 101) + BeaconNodeTester.runKnownGapsProcess(bc, 2, 1, 0, 0, 0) + + // Historical + BeaconNodeTester.writeEventToHistoricProcess(bc, 100, 100, 10) + BeaconNodeTester.runHistoricalProcess(bc, 2, 2, 0, 0, 0) + + // Head + BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, 74240, maxRetry, 1, 0, 0) + + time.Sleep(2 * time.Second) + validatePopularBatchBlocks(bc) + }) + }) + }) }) // This function will write an even to the ethcl.known_gaps table @@ -223,7 +285,6 @@ func validateAllRowsCheckedOut(db sql.Database, checkStmt string) { res, err := db.Exec(context.Background(), checkStmt) Expect(err).ToNot(HaveOccurred()) rows, err := res.RowsAffected() - log.Info("rows: ", rows) Expect(err).ToNot(HaveOccurred()) Expect(rows).To(Equal(int64(0))) } diff --git a/pkg/beaconclient/databasewrite.go b/pkg/beaconclient/databasewrite.go index b30c3cc..8f14dd6 100644 --- a/pkg/beaconclient/databasewrite.go +++ b/pkg/beaconclient/databasewrite.go @@ -73,7 +73,7 @@ INSERT INTO ethcl.known_gaps (start_slot, end_slot, checked_out, reprocessing_er VALUES ($1, $2, $3, $4, $5, $6) on CONFLICT (start_slot, end_slot) DO NOTHING` UpsertKnownGapsErrorStmt string = ` UPDATE ethcl.known_gaps - SET reprocessing_error=$3 + SET reprocessing_error=$3, priority=priority+1 WHERE start_slot=$1 AND end_slot=$2;` // Get the highest slot if one exists QueryHighestSlotStmt string = "SELECT COALESCE(MAX(slot), 0) FROM ethcl.slots" diff --git a/pkg/beaconclient/processknowngaps.go b/pkg/beaconclient/processknowngaps.go index 5ea21e9..79f2d53 100644 --- a/pkg/beaconclient/processknowngaps.go +++ b/pkg/beaconclient/processknowngaps.go @@ -31,6 +31,7 @@ var ( // Get a single non-checked out row row from ethcl.known_gaps. getKgEntryStmt string = `SELECT start_slot, end_slot FROM ethcl.known_gaps WHERE checked_out=false + ORDER BY priority ASC LIMIT 1;` // Used to periodically check to see if there is a new entry in the ethcl.known_gaps table. checkKgEntryStmt string = `SELECT * FROM ethcl.known_gaps WHERE checked_out=false;`