From 3e1de8ddd80f1f0eea5c62ce7dde05926d638c58 Mon Sep 17 00:00:00 2001 From: Abdul Rabbani Date: Tue, 17 May 2022 15:10:47 -0400 Subject: [PATCH] Add final tests --- pkg/beaconclient/beaconclient.go | 2 + pkg/beaconclient/capturehead_test.go | 237 ++++++++++++++++++++------- pkg/beaconclient/databasewrite.go | 10 +- pkg/beaconclient/incomingsse.go | 7 +- pkg/beaconclient/metrics.go | 14 +- pkg/beaconclient/processslot.go | 3 +- 6 files changed, 205 insertions(+), 68 deletions(-) diff --git a/pkg/beaconclient/beaconclient.go b/pkg/beaconclient/beaconclient.go index ae0e843..cb8df7d 100644 --- a/pkg/beaconclient/beaconclient.go +++ b/pkg/beaconclient/beaconclient.go @@ -30,6 +30,8 @@ type BeaconClientMetrics struct { HeadTrackingInserts uint64 // Number of head events we successfully wrote to the DB. HeadTrackingReorgs uint64 // Number of reorg events we successfully wrote to the DB. HeadTrackingKnownGaps uint64 // Number of known_gaps we successfully wrote to the DB. + HeadError uint64 // Number of errors that occurred when decoding the head message. + HeadReorgError uint64 // Number of errors that occurred when decoding the reorg message. } // A struct that capture the Beacon Server that the Beacon Client will be interacting with and querying. diff --git a/pkg/beaconclient/capturehead_test.go b/pkg/beaconclient/capturehead_test.go index ef3b259..2781b96 100644 --- a/pkg/beaconclient/capturehead_test.go +++ b/pkg/beaconclient/capturehead_test.go @@ -100,6 +100,20 @@ var _ = Describe("Capturehead", func() { SignedBeaconBlock: filepath.Join("ssz-data", "100", "signed-beacon-block.ssz"), BeaconState: filepath.Join("ssz-data", "100", "beacon-state.ssz"), }, + "102-wrong-ssz-1": { + HeadMessage: beaconclient.Head{ + Slot: "102", + Block: "0x46f98c08b54a71dfda4d56e29ec3952b8300cd8d6b67a9b6c562ae96a7a25a42", + State: "0x9b20b114c613c1aa462e02d590b3da902b0a1377e938ed0f94dd3491d763ef67", + CurrentDutyDependentRoot: "", + PreviousDutyDependentRoot: "", + EpochTransition: false, + ExecutionOptimistic: false, + }, + TestNotes: "A bad block that returns the wrong ssz objects, used for testing incorrect SSZ decoding.", + BeaconState: filepath.Join("ssz-data", "102", "signed-beacon-block.ssz"), + SignedBeaconBlock: filepath.Join("ssz-data", "102", "beacon-state.ssz"), + }, "100": { HeadMessage: beaconclient.Head{ Slot: "100", @@ -172,6 +186,16 @@ var _ = Describe("Capturehead", func() { SignedBeaconBlock: filepath.Join("ssz-data", "2375703", "signed-beacon-block.ssz"), BeaconState: filepath.Join("ssz-data", "2375703", "beacon-state.ssz"), }, + "3797056": { + HeadMessage: beaconclient.Head{ + Slot: "3797056", + Block: "", + State: "0xb6215b560273af63ec7e011572b60ec1ca0b0232f8ff44fcd4ed55c7526e964e", + CurrentDutyDependentRoot: "", PreviousDutyDependentRoot: "", EpochTransition: false, ExecutionOptimistic: false}, + TestNotes: "An easy to process Altair Block", + SignedBeaconBlock: filepath.Join("ssz-data", "2375703", "signed-beacon-block.ssz"), + BeaconState: filepath.Join("ssz-data", "2375703", "beacon-state.ssz"), + }, } TestConfig = Config{ protocol: protocol, @@ -193,16 +217,13 @@ var _ = Describe("Capturehead", func() { } }) - // We might also want to add an integration test that will actually process a single event, then end. - // This will help us know that our models match that actual data being served from the beacon node. - Describe("Receiving New Head SSE messages", Label("unit", "behavioral"), func() { Context("Correctly formatted Phase0 Block", func() { It("Should turn it into a struct successfully.", func() { bc := setUpTest(BeaconNodeTester.TestConfig, "99") BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) defer httpmock.DeactivateAndReset() - BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, 3, maxRetry, 0, 0) + BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, 3, maxRetry, 1, 0, 0) validateSignedBeaconBlock(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, "0x629ae1587895043076500f4f5dcb202a47c2fc95d5b5c548cb83bc97bd2dbfe1", "0x8d3f027beef5cbd4f8b29fc831aba67a5d74768edca529f5596f07fd207865e1", "/blocks/QHVAEQBQGQ4TKNJUGAYDGNZRGM2DOZJSGZTDMMLEG5QTIYTCMRQTKYRSGNTGCMDCGI2WINLGMM2DMNJRGYYGMMTBHEZGINJSME3DGYRZGE4WE") validateBeaconState(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, "/blocks/QHVAEQRQPBTDEOBWMEYDGNZZMMYDGOBWMEZWGN3CMUZDQZBQGVSDQMRZMY4GKYRXMIZDQMDDMM4WKZDFGE2TINBZMFTDEMDFMJRWIMBWME3WCNJW") @@ -213,36 +234,36 @@ var _ = Describe("Capturehead", func() { bc := setUpTest(BeaconNodeTester.TestConfig, "2375702") BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) defer httpmock.DeactivateAndReset() - BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, 74240, maxRetry, 0, 0) + BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, 74240, maxRetry, 1, 0, 0) validateSignedBeaconBlock(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, "0x83154c692b9cce50bdf56af5a933da0a020ed7ff809a6a8236301094c7f25276", "0xd74b1c60423651624de6bb301ac25808951c167ba6ecdd9b2e79b4315aee8202", "/blocks/QHVAEQRQPA2DGOJSGM3TEYZVMY3GKMZZGQ4TSZJTGFRGMOJSGQZTQODCGU4DCNJWGM4TCMBTGE2DSZRQMY2TIZRYME2DKMZXG4ZWEMJYGAZDGMBR") validateBeaconState(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, "/blocks/QHVAEQRQPBRDMMRRGVRDKNRQGI3TGYLGGYZWKYZXMUYDCMJVG4ZGENRQMVRTCY3BGBRDAMRTGJTDQZTGGQ2GMY3EGRSWINJVMM3TKMRWMU4TMNDF") }) }) - Context("Correctly formatted Altair Test Blocks", Label("now"), func() { + Context("Correctly formatted Altair Test Blocks", func() { It("Should turn it into a struct successfully.", func() { bc := setUpTest(BeaconNodeTester.TestConfig, "2375702") BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) defer httpmock.DeactivateAndReset() - BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["2375703-dummy"].HeadMessage, 74240, maxRetry, 0, 0) + BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["2375703-dummy"].HeadMessage, 74240, maxRetry, 1, 0, 0) bc = setUpTest(BeaconNodeTester.TestConfig, "2375702") BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) defer httpmock.DeactivateAndReset() - BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["2375703-dummy-2"].HeadMessage, 74240, maxRetry, 0, 0) + BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["2375703-dummy-2"].HeadMessage, 74240, maxRetry, 1, 0, 0) }) }) - Context("Correctly formatted Phase0 Test Blocks", Label("now"), func() { + Context("Correctly formatted Phase0 Test Blocks", func() { It("Should turn it into a struct successfully.", func() { bc := setUpTest(BeaconNodeTester.TestConfig, "99") BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) defer httpmock.DeactivateAndReset() - BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["100-dummy"].HeadMessage, 3, maxRetry, 0, 0) + BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["100-dummy"].HeadMessage, 3, maxRetry, 1, 0, 0) bc = setUpTest(BeaconNodeTester.TestConfig, "99") BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) defer httpmock.DeactivateAndReset() - BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["100-dummy-2"].HeadMessage, 3, maxRetry, 0, 0) + BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["100-dummy-2"].HeadMessage, 3, maxRetry, 1, 0, 0) }) }) @@ -251,8 +272,8 @@ var _ = Describe("Capturehead", func() { bc := setUpTest(BeaconNodeTester.TestConfig, "99") BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) defer httpmock.DeactivateAndReset() - BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, 3, maxRetry, 0, 0) - BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["101"].HeadMessage, 3, maxRetry, 0, 0) + BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, 3, maxRetry, 1, 0, 0) + BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["101"].HeadMessage, 3, maxRetry, 1, 0, 0) }) }) Context("Two consecutive blocks with a bad parent", func() { @@ -260,8 +281,8 @@ var _ = Describe("Capturehead", func() { bc := setUpTest(BeaconNodeTester.TestConfig, "99") BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) defer httpmock.DeactivateAndReset() - BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["100-dummy"].HeadMessage, 3, maxRetry, 0, 0) - BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["101"].HeadMessage, 3, maxRetry, 1, 1) + BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["100-dummy"].HeadMessage, 3, maxRetry, 1, 0, 0) + BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["101"].HeadMessage, 3, maxRetry, 1, 1, 1) }) }) Context("Phase 0: We have a correctly formated SSZ SignedBeaconBlock and BeaconState", func() { @@ -274,29 +295,72 @@ var _ = Describe("Capturehead", func() { testSszRoot(BeaconNodeTester.TestEvents["2375703"]) }) }) - //Context("A single incorrectly formatted head message", func() { - // It("Should create an error, maybe also add the projected slot to the knownGaps table......") - // If it can unmarshal the head add it to knownGaps - //}) - //Context("An incorrectly formatted message sandwiched between correctly formatted messages", func() { - // It("Should create an error, maybe also add the projected slot to the knownGaps table......") - //}) - // Context("When there is a skipped slot", func() { - // It("Should indicate that the slot was skipped") - // }) - // Context("When the slot is not properly served", func() { - // It("Should return an error, and add the slot to the knownGaps table.") + //Context("When there is a skipped slot", func() { + // It("Should indicate that the slot was skipped", func() { + // }) //}) - // Context("With gaps in between head slots", func() { - // It("Should add the slots in between to the knownGaps table") - // }) - // Context("With the previousBlockHash not matching the parentBlockHash", func() { - // It("Should recognize the reorg and add the previous slot to knownGaps table.") - // }) - // Context("Out of order", func() { - // It("Not sure what it should do....") - // }) + Context("When the proper SSZ objects are not served", Label("now"), func() { + It("Should return an error, and add the slot to the knownGaps table.", func() { + bc := setUpTest(BeaconNodeTester.TestConfig, "101") + BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) + defer httpmock.DeactivateAndReset() + BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["102-wrong-ssz-1"].HeadMessage, 3, maxRetry, 0, 1, 0) + + knownGapCount := countKnownGapsTable(bc.Db) + Expect(knownGapCount).To(Equal(1)) + + start, end := queryKnownGaps(bc.Db, "102", "102") + Expect(start).To(Equal(102)) + Expect(end).To(Equal(102)) + }) + }) + }) + + Describe("Known Gaps Scenario", Label("unit", "behavioral"), func() { + Context("There is a gap at start up within one incrementing range.", func() { + It("Should add only a single entry to the knownGaps table.", func() { + bc := setUpTest(BeaconNodeTester.TestConfig, "10") + BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) + defer httpmock.DeactivateAndReset() + BeaconNodeTester.testKnownGapsMessages(bc, 100, 1, maxRetry, BeaconNodeTester.TestEvents["100"].HeadMessage) + start, end := queryKnownGaps(bc.Db, "11", "99") + Expect(start).To(Equal(11)) + Expect(end).To(Equal(99)) + }) + }) + Context("There is a gap at start up spanning multiple incrementing range.", func() { + It("Should add multiple entries to the knownGaps table.", func() { + bc := setUpTest(BeaconNodeTester.TestConfig, "5") + BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) + defer httpmock.DeactivateAndReset() + BeaconNodeTester.testKnownGapsMessages(bc, 10, 10, maxRetry, BeaconNodeTester.TestEvents["100"].HeadMessage) + + start, end := queryKnownGaps(bc.Db, "6", "16") + Expect(start).To(Equal(6)) + Expect(end).To(Equal(16)) + + start, end = queryKnownGaps(bc.Db, "96", "99") + Expect(start).To(Equal(96)) + Expect(end).To(Equal(99)) + }) + }) + Context("Gaps between two head messages", func() { + It("Should add the slots in-between", func() { + bc := setUpTest(BeaconNodeTester.TestConfig, "99") + BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) + defer httpmock.DeactivateAndReset() + BeaconNodeTester.testKnownGapsMessages(bc, 1000000, 3, maxRetry, BeaconNodeTester.TestEvents["100"].HeadMessage, BeaconNodeTester.TestEvents["2375703"].HeadMessage) + + start, end := queryKnownGaps(bc.Db, "101", "1000101") + Expect(start).To(Equal(101)) + Expect(end).To(Equal(1000101)) + + start, end = queryKnownGaps(bc.Db, "2000101", "2375702") + Expect(start).To(Equal(2000101)) + Expect(end).To(Equal(2375702)) + }) + }) }) Describe("ReOrg Scenario", Label("unit", "behavioral"), func() { @@ -305,7 +369,6 @@ var _ = Describe("Capturehead", func() { bc := setUpTest(BeaconNodeTester.TestConfig, "2375702") BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) defer httpmock.DeactivateAndReset() - //BeaconNodeTester.testMultipleHead(bc, TestEvents["2375703-dummy"].HeadMessage, TestEvents["2375703"].HeadMessage, 74240, 10) BeaconNodeTester.testMultipleHead(bc, TestEvents["2375703"].HeadMessage, TestEvents["2375703-dummy"].HeadMessage, 74240, maxRetry) }) }) @@ -333,11 +396,6 @@ var _ = Describe("Capturehead", func() { BeaconNodeTester.testMultipleReorgs(bc, TestEvents["2375703-dummy"].HeadMessage, TestEvents["2375703-dummy-2"].HeadMessage, TestEvents["2375703"].HeadMessage, 74240, maxRetry) }) }) - //Context("Reorg slot in not already in the DB", func() { - // It("Should simply have the correct slot in the DB.") - // Add to knowngaps - //}) - }) }) @@ -413,7 +471,7 @@ func validateBeaconState(bc *beaconclient.BeaconClient, headMessage beaconclient } // Wrapper function to send a head message to the beaconclient -func sendHeadMessage(bc *beaconclient.BeaconClient, head beaconclient.Head, maxRetry int) { +func sendHeadMessage(bc *beaconclient.BeaconClient, head beaconclient.Head, maxRetry int, expectedSuccessfulInserts uint64) { data, err := json.Marshal(head) Expect(err).ToNot(HaveOccurred()) @@ -426,14 +484,14 @@ func sendHeadMessage(bc *beaconclient.BeaconClient, head beaconclient.Head, maxR Retry: []byte{}, } curRetry := 0 - for atomic.LoadUint64(&bc.Metrics.HeadTrackingInserts) != startInserts+1 { + for atomic.LoadUint64(&bc.Metrics.HeadTrackingInserts) != startInserts+expectedSuccessfulInserts { time.Sleep(1 * time.Second) curRetry = curRetry + 1 if curRetry == maxRetry { log.WithFields(log.Fields{ "startInsert": startInserts, "currentValue": atomic.LoadUint64(&bc.Metrics.HeadTrackingInserts), - }).Error("HeadTracking Insert wasnt incremented properly.") + }).Error("HeadTracking Insert wasn't incremented properly.") Fail("Too many retries have occurred.") } } @@ -472,6 +530,26 @@ func queryDbBeaconState(db sql.Database, querySlot string, queryStateRoot string return slot, stateRoot, mh_key } +// Count the entries in the knownGaps table. +func countKnownGapsTable(db sql.Database) int { + var count int + sqlStatement := "SELECT COUNT(*) FROM ethcl.known_gaps" + err := db.QueryRow(context.Background(), sqlStatement).Scan(&count) + Expect(err).ToNot(HaveOccurred()) + return count +} + +// Return the start and end slot +func queryKnownGaps(db sql.Database, queryStartGap string, QueryEndGap string) (int, int) { + sqlStatement := `SELECT start_slot, end_slot FROM ethcl.known_gaps WHERE start_slot=$1 AND end_slot=$2;` + var startGap, endGap int + row := db.QueryRow(context.Background(), sqlStatement, queryStartGap, QueryEndGap) + err := row.Scan(&startGap, &endGap) + Expect(err).ToNot(HaveOccurred()) + return startGap, endGap + +} + // A function that will remove all entries from the ethcl tables for you. func clearEthclDbTables(db sql.Database) { deleteQueries := []string{"DELETE FROM ethcl.slots;", "DELETE FROM ethcl.signed_beacon_block;", "DELETE FROM ethcl.beacon_state;", "DELETE FROM ethcl.known_gaps;"} @@ -681,9 +759,9 @@ func (tbc TestBeaconNode) testMultipleReorgs(bc *beaconclient.BeaconClient, firs time.Sleep(1 * time.Second) log.Info("Sending Phase0 Messages to BeaconClient") - sendHeadMessage(bc, firstHead, maxRetry) - sendHeadMessage(bc, secondHead, maxRetry) - sendHeadMessage(bc, thirdHead, maxRetry) + sendHeadMessage(bc, firstHead, maxRetry, 1) + sendHeadMessage(bc, secondHead, maxRetry, 2) + sendHeadMessage(bc, thirdHead, maxRetry, 2) curRetry := 0 for atomic.LoadUint64(&bc.Metrics.HeadTrackingReorgs) != 2 { @@ -721,7 +799,7 @@ func (tbc TestBeaconNode) testMultipleReorgs(bc *beaconclient.BeaconClient, firs time.Sleep(1 * time.Second) curRetry = curRetry + 1 if curRetry == maxRetry { - Fail("Too many retries have occured.") + Fail("Too many retries have occurred.") } } @@ -738,17 +816,32 @@ func (tbc TestBeaconNode) testMultipleReorgs(bc *beaconclient.BeaconClient, firs } // A test to validate a single block was processed correctly -func (tbc TestBeaconNode) testProcessBlock(bc *beaconclient.BeaconClient, head beaconclient.Head, epoch int, maxRetry int, expectedKnownGaps uint64, expectedReorgs uint64) { +func (tbc TestBeaconNode) testProcessBlock(bc *beaconclient.BeaconClient, head beaconclient.Head, epoch int, maxRetry int, expectedSuccessInsert uint64, expectedKnownGaps uint64, expectedReorgs uint64) { go bc.CaptureHead(tbc.TestConfig.knownGapsTableIncrement) time.Sleep(1 * time.Second) - sendHeadMessage(bc, head, maxRetry) - if bc.Metrics.HeadTrackingKnownGaps != expectedKnownGaps { - Fail(fmt.Sprintf("Wrong gap metrics, got: %d, wanted %d", bc.Metrics.HeadTrackingKnownGaps, expectedKnownGaps)) + sendHeadMessage(bc, head, maxRetry, expectedSuccessInsert) + + curRetry := 0 + for atomic.LoadUint64(&bc.Metrics.HeadTrackingKnownGaps) != expectedKnownGaps { + time.Sleep(1 * time.Second) + curRetry = curRetry + 1 + if curRetry == maxRetry { + Fail(fmt.Sprintf("Wrong gap metrics, got: %d, wanted %d", bc.Metrics.HeadTrackingKnownGaps, expectedKnownGaps)) + } } - if bc.Metrics.HeadTrackingReorgs != expectedReorgs { - Fail(fmt.Sprintf("Wrong reorg metrics, got: %d, wanted %d", bc.Metrics.HeadTrackingKnownGaps, expectedKnownGaps)) + + curRetry = 0 + for atomic.LoadUint64(&bc.Metrics.HeadTrackingReorgs) != expectedReorgs { + time.Sleep(1 * time.Second) + curRetry = curRetry + 1 + if curRetry == maxRetry { + Fail(fmt.Sprintf("Wrong reorg metrics, got: %d, wanted %d", bc.Metrics.HeadTrackingKnownGaps, expectedKnownGaps)) + } + } + + if expectedSuccessInsert > 0 { + validateSlot(bc, head, epoch, "proposed") } - validateSlot(bc, head, epoch, "proposed") } // A test that ensures that if two HeadMessages occur for a single slot they are marked @@ -757,8 +850,8 @@ func (tbc TestBeaconNode) testMultipleHead(bc *beaconclient.BeaconClient, firstH go bc.CaptureHead(tbc.TestConfig.knownGapsTableIncrement) time.Sleep(1 * time.Second) - sendHeadMessage(bc, firstHead, maxRetry) - sendHeadMessage(bc, secondHead, maxRetry) + sendHeadMessage(bc, firstHead, maxRetry, 1) + sendHeadMessage(bc, secondHead, maxRetry, 1) curRetry := 0 for atomic.LoadUint64(&bc.Metrics.HeadTrackingReorgs) != 1 { @@ -778,6 +871,34 @@ func (tbc TestBeaconNode) testMultipleHead(bc *beaconclient.BeaconClient, firstH validateSlot(bc, secondHead, epoch, "proposed") } +// A test that ensures that if two HeadMessages occur for a single slot they are marked +// as proposed and forked correctly. +func (tbc TestBeaconNode) testKnownGapsMessages(bc *beaconclient.BeaconClient, tableIncrement int, expectedEntries uint64, maxRetry int, msg ...beaconclient.Head) { + go bc.CaptureHead(tableIncrement) + time.Sleep(1 * time.Second) + + for _, headMsg := range msg { + sendHeadMessage(bc, headMsg, maxRetry, 1) + } + + curRetry := 0 + for atomic.LoadUint64(&bc.Metrics.HeadTrackingKnownGaps) != expectedEntries { + time.Sleep(1 * time.Second) + curRetry = curRetry + 1 + if curRetry == maxRetry { + Fail("Too many retries have occurred.") + } + } + + log.Info("Checking to make sure we have the expected number of entries in the knownGaps table.") + knownGapCount := countKnownGapsTable(bc.Db) + Expect(knownGapCount).To(Equal(int(expectedEntries))) + + if atomic.LoadUint64(&bc.Metrics.HeadTrackingReorgs) != 0 { + Fail("We found reorgs when we didn't expect it") + } +} + // This function will make sure we are properly able to get the SszRoot of the SignedBeaconBlock and the BeaconState. func testSszRoot(msg Message) { state, vm, err := readBeaconState(msg.BeaconState) diff --git a/pkg/beaconclient/databasewrite.go b/pkg/beaconclient/databasewrite.go index 9bad1e8..43803fb 100644 --- a/pkg/beaconclient/databasewrite.go +++ b/pkg/beaconclient/databasewrite.go @@ -322,7 +322,7 @@ func writeKnownGaps(db sql.Database, tableIncrement int, startSlot int, endSlot EntryError: entryError.Error(), EntryProcess: entryProcess, } - upsertKnownGaps(db, kgModel) + upsertKnownGaps(db, kgModel, metric) } else { totalSlots := endSlot - startSlot var chunks int @@ -347,15 +347,14 @@ func writeKnownGaps(db sql.Database, tableIncrement int, startSlot int, endSlot EntryError: entryError.Error(), EntryProcess: entryProcess, } - upsertKnownGaps(db, kgModel) + upsertKnownGaps(db, kgModel, metric) } } - metric.IncrementHeadTrackingKnownGaps(1) } // A function to upsert a single entry to the ethcl.known_gaps table. -func upsertKnownGaps(db sql.Database, knModel DbKnownGaps) { +func upsertKnownGaps(db sql.Database, knModel DbKnownGaps, metric *BeaconClientMetrics) { _, err := db.Exec(context.Background(), UpsertKnownGapsStmt, knModel.StartSlot, knModel.EndSlot, knModel.CheckedOut, knModel.ReprocessingError, knModel.EntryError, knModel.EntryProcess) if err != nil { @@ -369,6 +368,7 @@ func upsertKnownGaps(db sql.Database, knModel DbKnownGaps) { "startSlot": knModel.StartSlot, "endSlot": knModel.EndSlot, }).Warn("A new gap has been added to the ethcl.known_gaps table.") + metric.IncrementHeadTrackingKnownGaps(1) } // A function to write the gap between the highest slot in the DB and the first processed slot. @@ -385,7 +385,7 @@ func writeStartUpGaps(db sql.Database, tableIncrement int, firstSlot int, metric }).Fatal("Unable to get convert max block from DB to int. We must close the application or we might have undetected gaps.") } if maxSlot != firstSlot-1 { - writeKnownGaps(db, tableIncrement, maxSlot, firstSlot-1, fmt.Errorf(""), "startup", metric) + writeKnownGaps(db, tableIncrement, maxSlot+1, firstSlot-1, fmt.Errorf(""), "startup", metric) } } diff --git a/pkg/beaconclient/incomingsse.go b/pkg/beaconclient/incomingsse.go index bd5abc5..4cb8ef4 100644 --- a/pkg/beaconclient/incomingsse.go +++ b/pkg/beaconclient/incomingsse.go @@ -18,7 +18,7 @@ var ( // This function will capture all the SSE events for a given SseEvents object. // When new messages come in, it will ensure that they are decoded into JSON. // If any errors occur, it log the error information. -func handleIncomingSseEvent[P ProcessedEvents](eventHandler *SseEvents[P]) { +func handleIncomingSseEvent[P ProcessedEvents](eventHandler *SseEvents[P], errMetricInc func(uint64)) { go func() { errG := new(errgroup.Group) errG.Go(func() error { @@ -55,6 +55,7 @@ func handleIncomingSseEvent[P ProcessedEvents](eventHandler *SseEvents[P]) { "msg": headErr.msg, }, ).Error("Unable to handle event.") + errMetricInc(1) } } } @@ -77,6 +78,6 @@ func processMsg[P ProcessedEvents](msg []byte, processCh chan<- *P, errorCh chan // Capture all of the event topics. func (bc *BeaconClient) captureEventTopic() { log.Info("We are capturing all SSE events") - go handleIncomingSseEvent(bc.HeadTracking) - go handleIncomingSseEvent(bc.ReOrgTracking) + go handleIncomingSseEvent(bc.HeadTracking, bc.Metrics.IncrementHeadError) + go handleIncomingSseEvent(bc.ReOrgTracking, bc.Metrics.IncrementHeadReorgError) } diff --git a/pkg/beaconclient/metrics.go b/pkg/beaconclient/metrics.go index 36c78b7..2778198 100644 --- a/pkg/beaconclient/metrics.go +++ b/pkg/beaconclient/metrics.go @@ -16,8 +16,20 @@ func (m *BeaconClientMetrics) IncrementHeadTrackingReorgs(inc uint64) { atomic.AddUint64(&m.HeadTrackingReorgs, inc) } -// Wrapper function to increment reorgs. If we want to use mutexes later we can easily update all +// Wrapper function to increment known gaps. If we want to use mutexes later we can easily update all // occurrences here. func (m *BeaconClientMetrics) IncrementHeadTrackingKnownGaps(inc uint64) { atomic.AddUint64(&m.HeadTrackingKnownGaps, inc) } + +// Wrapper function to increment head errors. If we want to use mutexes later we can easily update all +// occurrences here. +func (m *BeaconClientMetrics) IncrementHeadError(inc uint64) { + atomic.AddUint64(&m.HeadError, inc) +} + +// Wrapper function to increment reorg errors. If we want to use mutexes later we can easily update all +// occurrences here. +func (m *BeaconClientMetrics) IncrementHeadReorgError(inc uint64) { + atomic.AddUint64(&m.HeadReorgError, inc) +} diff --git a/pkg/beaconclient/processslot.go b/pkg/beaconclient/processslot.go index d445bd3..7c4082d 100644 --- a/pkg/beaconclient/processslot.go +++ b/pkg/beaconclient/processslot.go @@ -96,6 +96,7 @@ func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot if err := g.Wait(); err != nil { writeKnownGaps(ps.Db, 1, ps.Slot, ps.Slot, err, "processSlot", ps.Metrics) + return err } if ps.HeadOrHistoric == "head" && previousSlot == 0 && previousBlockRoot == "" { @@ -234,7 +235,7 @@ func (ps *ProcessSlot) checkPreviousSlot(previousSlot int, previousBlockRoot str } else if previousSlot+1 != int(ps.FullBeaconState.Slot()) { log.WithFields(log.Fields{ "previousSlot": previousSlot, - "currentSlot": ps.FullBeaconState.Slot, + "currentSlot": ps.FullBeaconState.Slot(), }).Error("We skipped a few slots.") writeKnownGaps(ps.Db, knownGapsTableIncrement, previousSlot+1, int(ps.FullBeaconState.Slot())-1, fmt.Errorf("Gaps during head processing"), "headGaps", ps.Metrics) } else if previousBlockRoot != parentRoot {