Add final tests

This commit is contained in:
Abdul Rabbani 2022-05-17 15:10:47 -04:00
parent 4be3b9b1b4
commit 3e1de8ddd8
6 changed files with 205 additions and 68 deletions

View File

@ -30,6 +30,8 @@ type BeaconClientMetrics struct {
HeadTrackingInserts uint64 // Number of head events we successfully wrote to the DB.
HeadTrackingReorgs uint64 // Number of reorg events we successfully wrote to the DB.
HeadTrackingKnownGaps uint64 // Number of known_gaps we successfully wrote to the DB.
HeadError uint64 // Number of errors that occurred when decoding the head message.
HeadReorgError uint64 // Number of errors that occurred when decoding the reorg message.
}
// A struct that capture the Beacon Server that the Beacon Client will be interacting with and querying.

View File

@ -100,6 +100,20 @@ var _ = Describe("Capturehead", func() {
SignedBeaconBlock: filepath.Join("ssz-data", "100", "signed-beacon-block.ssz"),
BeaconState: filepath.Join("ssz-data", "100", "beacon-state.ssz"),
},
"102-wrong-ssz-1": {
HeadMessage: beaconclient.Head{
Slot: "102",
Block: "0x46f98c08b54a71dfda4d56e29ec3952b8300cd8d6b67a9b6c562ae96a7a25a42",
State: "0x9b20b114c613c1aa462e02d590b3da902b0a1377e938ed0f94dd3491d763ef67",
CurrentDutyDependentRoot: "",
PreviousDutyDependentRoot: "",
EpochTransition: false,
ExecutionOptimistic: false,
},
TestNotes: "A bad block that returns the wrong ssz objects, used for testing incorrect SSZ decoding.",
BeaconState: filepath.Join("ssz-data", "102", "signed-beacon-block.ssz"),
SignedBeaconBlock: filepath.Join("ssz-data", "102", "beacon-state.ssz"),
},
"100": {
HeadMessage: beaconclient.Head{
Slot: "100",
@ -172,6 +186,16 @@ var _ = Describe("Capturehead", func() {
SignedBeaconBlock: filepath.Join("ssz-data", "2375703", "signed-beacon-block.ssz"),
BeaconState: filepath.Join("ssz-data", "2375703", "beacon-state.ssz"),
},
"3797056": {
HeadMessage: beaconclient.Head{
Slot: "3797056",
Block: "",
State: "0xb6215b560273af63ec7e011572b60ec1ca0b0232f8ff44fcd4ed55c7526e964e",
CurrentDutyDependentRoot: "", PreviousDutyDependentRoot: "", EpochTransition: false, ExecutionOptimistic: false},
TestNotes: "An easy to process Altair Block",
SignedBeaconBlock: filepath.Join("ssz-data", "2375703", "signed-beacon-block.ssz"),
BeaconState: filepath.Join("ssz-data", "2375703", "beacon-state.ssz"),
},
}
TestConfig = Config{
protocol: protocol,
@ -193,16 +217,13 @@ var _ = Describe("Capturehead", func() {
}
})
// We might also want to add an integration test that will actually process a single event, then end.
// This will help us know that our models match that actual data being served from the beacon node.
Describe("Receiving New Head SSE messages", Label("unit", "behavioral"), func() {
Context("Correctly formatted Phase0 Block", func() {
It("Should turn it into a struct successfully.", func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset()
BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, 3, maxRetry, 0, 0)
BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, 3, maxRetry, 1, 0, 0)
validateSignedBeaconBlock(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, "0x629ae1587895043076500f4f5dcb202a47c2fc95d5b5c548cb83bc97bd2dbfe1", "0x8d3f027beef5cbd4f8b29fc831aba67a5d74768edca529f5596f07fd207865e1", "/blocks/QHVAEQBQGQ4TKNJUGAYDGNZRGM2DOZJSGZTDMMLEG5QTIYTCMRQTKYRSGNTGCMDCGI2WINLGMM2DMNJRGYYGMMTBHEZGINJSME3DGYRZGE4WE")
validateBeaconState(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, "/blocks/QHVAEQRQPBTDEOBWMEYDGNZZMMYDGOBWMEZWGN3CMUZDQZBQGVSDQMRZMY4GKYRXMIZDQMDDMM4WKZDFGE2TINBZMFTDEMDFMJRWIMBWME3WCNJW")
@ -213,36 +234,36 @@ var _ = Describe("Capturehead", func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "2375702")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset()
BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, 74240, maxRetry, 0, 0)
BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, 74240, maxRetry, 1, 0, 0)
validateSignedBeaconBlock(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, "0x83154c692b9cce50bdf56af5a933da0a020ed7ff809a6a8236301094c7f25276", "0xd74b1c60423651624de6bb301ac25808951c167ba6ecdd9b2e79b4315aee8202", "/blocks/QHVAEQRQPA2DGOJSGM3TEYZVMY3GKMZZGQ4TSZJTGFRGMOJSGQZTQODCGU4DCNJWGM4TCMBTGE2DSZRQMY2TIZRYME2DKMZXG4ZWEMJYGAZDGMBR")
validateBeaconState(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, "/blocks/QHVAEQRQPBRDMMRRGVRDKNRQGI3TGYLGGYZWKYZXMUYDCMJVG4ZGENRQMVRTCY3BGBRDAMRTGJTDQZTGGQ2GMY3EGRSWINJVMM3TKMRWMU4TMNDF")
})
})
Context("Correctly formatted Altair Test Blocks", Label("now"), func() {
Context("Correctly formatted Altair Test Blocks", func() {
It("Should turn it into a struct successfully.", func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "2375702")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset()
BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["2375703-dummy"].HeadMessage, 74240, maxRetry, 0, 0)
BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["2375703-dummy"].HeadMessage, 74240, maxRetry, 1, 0, 0)
bc = setUpTest(BeaconNodeTester.TestConfig, "2375702")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset()
BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["2375703-dummy-2"].HeadMessage, 74240, maxRetry, 0, 0)
BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["2375703-dummy-2"].HeadMessage, 74240, maxRetry, 1, 0, 0)
})
})
Context("Correctly formatted Phase0 Test Blocks", Label("now"), func() {
Context("Correctly formatted Phase0 Test Blocks", func() {
It("Should turn it into a struct successfully.", func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset()
BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["100-dummy"].HeadMessage, 3, maxRetry, 0, 0)
BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["100-dummy"].HeadMessage, 3, maxRetry, 1, 0, 0)
bc = setUpTest(BeaconNodeTester.TestConfig, "99")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset()
BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["100-dummy-2"].HeadMessage, 3, maxRetry, 0, 0)
BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["100-dummy-2"].HeadMessage, 3, maxRetry, 1, 0, 0)
})
})
@ -251,8 +272,8 @@ var _ = Describe("Capturehead", func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset()
BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, 3, maxRetry, 0, 0)
BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["101"].HeadMessage, 3, maxRetry, 0, 0)
BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, 3, maxRetry, 1, 0, 0)
BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["101"].HeadMessage, 3, maxRetry, 1, 0, 0)
})
})
Context("Two consecutive blocks with a bad parent", func() {
@ -260,8 +281,8 @@ var _ = Describe("Capturehead", func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset()
BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["100-dummy"].HeadMessage, 3, maxRetry, 0, 0)
BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["101"].HeadMessage, 3, maxRetry, 1, 1)
BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["100-dummy"].HeadMessage, 3, maxRetry, 1, 0, 0)
BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["101"].HeadMessage, 3, maxRetry, 1, 1, 1)
})
})
Context("Phase 0: We have a correctly formated SSZ SignedBeaconBlock and BeaconState", func() {
@ -274,29 +295,72 @@ var _ = Describe("Capturehead", func() {
testSszRoot(BeaconNodeTester.TestEvents["2375703"])
})
})
//Context("A single incorrectly formatted head message", func() {
// It("Should create an error, maybe also add the projected slot to the knownGaps table......")
// If it can unmarshal the head add it to knownGaps
//})
//Context("An incorrectly formatted message sandwiched between correctly formatted messages", func() {
// It("Should create an error, maybe also add the projected slot to the knownGaps table......")
//})
// Context("When there is a skipped slot", func() {
// It("Should indicate that the slot was skipped")
// })
// Context("When the slot is not properly served", func() {
// It("Should return an error, and add the slot to the knownGaps table.")
//Context("When there is a skipped slot", func() {
// It("Should indicate that the slot was skipped", func() {
// })
//})
// Context("With gaps in between head slots", func() {
// It("Should add the slots in between to the knownGaps table")
// })
// Context("With the previousBlockHash not matching the parentBlockHash", func() {
// It("Should recognize the reorg and add the previous slot to knownGaps table.")
// })
// Context("Out of order", func() {
// It("Not sure what it should do....")
// })
Context("When the proper SSZ objects are not served", Label("now"), func() {
It("Should return an error, and add the slot to the knownGaps table.", func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "101")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset()
BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["102-wrong-ssz-1"].HeadMessage, 3, maxRetry, 0, 1, 0)
knownGapCount := countKnownGapsTable(bc.Db)
Expect(knownGapCount).To(Equal(1))
start, end := queryKnownGaps(bc.Db, "102", "102")
Expect(start).To(Equal(102))
Expect(end).To(Equal(102))
})
})
})
Describe("Known Gaps Scenario", Label("unit", "behavioral"), func() {
Context("There is a gap at start up within one incrementing range.", func() {
It("Should add only a single entry to the knownGaps table.", func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "10")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset()
BeaconNodeTester.testKnownGapsMessages(bc, 100, 1, maxRetry, BeaconNodeTester.TestEvents["100"].HeadMessage)
start, end := queryKnownGaps(bc.Db, "11", "99")
Expect(start).To(Equal(11))
Expect(end).To(Equal(99))
})
})
Context("There is a gap at start up spanning multiple incrementing range.", func() {
It("Should add multiple entries to the knownGaps table.", func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "5")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset()
BeaconNodeTester.testKnownGapsMessages(bc, 10, 10, maxRetry, BeaconNodeTester.TestEvents["100"].HeadMessage)
start, end := queryKnownGaps(bc.Db, "6", "16")
Expect(start).To(Equal(6))
Expect(end).To(Equal(16))
start, end = queryKnownGaps(bc.Db, "96", "99")
Expect(start).To(Equal(96))
Expect(end).To(Equal(99))
})
})
Context("Gaps between two head messages", func() {
It("Should add the slots in-between", func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset()
BeaconNodeTester.testKnownGapsMessages(bc, 1000000, 3, maxRetry, BeaconNodeTester.TestEvents["100"].HeadMessage, BeaconNodeTester.TestEvents["2375703"].HeadMessage)
start, end := queryKnownGaps(bc.Db, "101", "1000101")
Expect(start).To(Equal(101))
Expect(end).To(Equal(1000101))
start, end = queryKnownGaps(bc.Db, "2000101", "2375702")
Expect(start).To(Equal(2000101))
Expect(end).To(Equal(2375702))
})
})
})
Describe("ReOrg Scenario", Label("unit", "behavioral"), func() {
@ -305,7 +369,6 @@ var _ = Describe("Capturehead", func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "2375702")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset()
//BeaconNodeTester.testMultipleHead(bc, TestEvents["2375703-dummy"].HeadMessage, TestEvents["2375703"].HeadMessage, 74240, 10)
BeaconNodeTester.testMultipleHead(bc, TestEvents["2375703"].HeadMessage, TestEvents["2375703-dummy"].HeadMessage, 74240, maxRetry)
})
})
@ -333,11 +396,6 @@ var _ = Describe("Capturehead", func() {
BeaconNodeTester.testMultipleReorgs(bc, TestEvents["2375703-dummy"].HeadMessage, TestEvents["2375703-dummy-2"].HeadMessage, TestEvents["2375703"].HeadMessage, 74240, maxRetry)
})
})
//Context("Reorg slot in not already in the DB", func() {
// It("Should simply have the correct slot in the DB.")
// Add to knowngaps
//})
})
})
@ -413,7 +471,7 @@ func validateBeaconState(bc *beaconclient.BeaconClient, headMessage beaconclient
}
// Wrapper function to send a head message to the beaconclient
func sendHeadMessage(bc *beaconclient.BeaconClient, head beaconclient.Head, maxRetry int) {
func sendHeadMessage(bc *beaconclient.BeaconClient, head beaconclient.Head, maxRetry int, expectedSuccessfulInserts uint64) {
data, err := json.Marshal(head)
Expect(err).ToNot(HaveOccurred())
@ -426,14 +484,14 @@ func sendHeadMessage(bc *beaconclient.BeaconClient, head beaconclient.Head, maxR
Retry: []byte{},
}
curRetry := 0
for atomic.LoadUint64(&bc.Metrics.HeadTrackingInserts) != startInserts+1 {
for atomic.LoadUint64(&bc.Metrics.HeadTrackingInserts) != startInserts+expectedSuccessfulInserts {
time.Sleep(1 * time.Second)
curRetry = curRetry + 1
if curRetry == maxRetry {
log.WithFields(log.Fields{
"startInsert": startInserts,
"currentValue": atomic.LoadUint64(&bc.Metrics.HeadTrackingInserts),
}).Error("HeadTracking Insert wasnt incremented properly.")
}).Error("HeadTracking Insert wasn't incremented properly.")
Fail("Too many retries have occurred.")
}
}
@ -472,6 +530,26 @@ func queryDbBeaconState(db sql.Database, querySlot string, queryStateRoot string
return slot, stateRoot, mh_key
}
// Count the entries in the knownGaps table.
func countKnownGapsTable(db sql.Database) int {
var count int
sqlStatement := "SELECT COUNT(*) FROM ethcl.known_gaps"
err := db.QueryRow(context.Background(), sqlStatement).Scan(&count)
Expect(err).ToNot(HaveOccurred())
return count
}
// Return the start and end slot
func queryKnownGaps(db sql.Database, queryStartGap string, QueryEndGap string) (int, int) {
sqlStatement := `SELECT start_slot, end_slot FROM ethcl.known_gaps WHERE start_slot=$1 AND end_slot=$2;`
var startGap, endGap int
row := db.QueryRow(context.Background(), sqlStatement, queryStartGap, QueryEndGap)
err := row.Scan(&startGap, &endGap)
Expect(err).ToNot(HaveOccurred())
return startGap, endGap
}
// A function that will remove all entries from the ethcl tables for you.
func clearEthclDbTables(db sql.Database) {
deleteQueries := []string{"DELETE FROM ethcl.slots;", "DELETE FROM ethcl.signed_beacon_block;", "DELETE FROM ethcl.beacon_state;", "DELETE FROM ethcl.known_gaps;"}
@ -681,9 +759,9 @@ func (tbc TestBeaconNode) testMultipleReorgs(bc *beaconclient.BeaconClient, firs
time.Sleep(1 * time.Second)
log.Info("Sending Phase0 Messages to BeaconClient")
sendHeadMessage(bc, firstHead, maxRetry)
sendHeadMessage(bc, secondHead, maxRetry)
sendHeadMessage(bc, thirdHead, maxRetry)
sendHeadMessage(bc, firstHead, maxRetry, 1)
sendHeadMessage(bc, secondHead, maxRetry, 2)
sendHeadMessage(bc, thirdHead, maxRetry, 2)
curRetry := 0
for atomic.LoadUint64(&bc.Metrics.HeadTrackingReorgs) != 2 {
@ -721,7 +799,7 @@ func (tbc TestBeaconNode) testMultipleReorgs(bc *beaconclient.BeaconClient, firs
time.Sleep(1 * time.Second)
curRetry = curRetry + 1
if curRetry == maxRetry {
Fail("Too many retries have occured.")
Fail("Too many retries have occurred.")
}
}
@ -738,17 +816,32 @@ func (tbc TestBeaconNode) testMultipleReorgs(bc *beaconclient.BeaconClient, firs
}
// A test to validate a single block was processed correctly
func (tbc TestBeaconNode) testProcessBlock(bc *beaconclient.BeaconClient, head beaconclient.Head, epoch int, maxRetry int, expectedKnownGaps uint64, expectedReorgs uint64) {
func (tbc TestBeaconNode) testProcessBlock(bc *beaconclient.BeaconClient, head beaconclient.Head, epoch int, maxRetry int, expectedSuccessInsert uint64, expectedKnownGaps uint64, expectedReorgs uint64) {
go bc.CaptureHead(tbc.TestConfig.knownGapsTableIncrement)
time.Sleep(1 * time.Second)
sendHeadMessage(bc, head, maxRetry)
if bc.Metrics.HeadTrackingKnownGaps != expectedKnownGaps {
Fail(fmt.Sprintf("Wrong gap metrics, got: %d, wanted %d", bc.Metrics.HeadTrackingKnownGaps, expectedKnownGaps))
sendHeadMessage(bc, head, maxRetry, expectedSuccessInsert)
curRetry := 0
for atomic.LoadUint64(&bc.Metrics.HeadTrackingKnownGaps) != expectedKnownGaps {
time.Sleep(1 * time.Second)
curRetry = curRetry + 1
if curRetry == maxRetry {
Fail(fmt.Sprintf("Wrong gap metrics, got: %d, wanted %d", bc.Metrics.HeadTrackingKnownGaps, expectedKnownGaps))
}
}
if bc.Metrics.HeadTrackingReorgs != expectedReorgs {
Fail(fmt.Sprintf("Wrong reorg metrics, got: %d, wanted %d", bc.Metrics.HeadTrackingKnownGaps, expectedKnownGaps))
curRetry = 0
for atomic.LoadUint64(&bc.Metrics.HeadTrackingReorgs) != expectedReorgs {
time.Sleep(1 * time.Second)
curRetry = curRetry + 1
if curRetry == maxRetry {
Fail(fmt.Sprintf("Wrong reorg metrics, got: %d, wanted %d", bc.Metrics.HeadTrackingKnownGaps, expectedKnownGaps))
}
}
if expectedSuccessInsert > 0 {
validateSlot(bc, head, epoch, "proposed")
}
validateSlot(bc, head, epoch, "proposed")
}
// A test that ensures that if two HeadMessages occur for a single slot they are marked
@ -757,8 +850,8 @@ func (tbc TestBeaconNode) testMultipleHead(bc *beaconclient.BeaconClient, firstH
go bc.CaptureHead(tbc.TestConfig.knownGapsTableIncrement)
time.Sleep(1 * time.Second)
sendHeadMessage(bc, firstHead, maxRetry)
sendHeadMessage(bc, secondHead, maxRetry)
sendHeadMessage(bc, firstHead, maxRetry, 1)
sendHeadMessage(bc, secondHead, maxRetry, 1)
curRetry := 0
for atomic.LoadUint64(&bc.Metrics.HeadTrackingReorgs) != 1 {
@ -778,6 +871,34 @@ func (tbc TestBeaconNode) testMultipleHead(bc *beaconclient.BeaconClient, firstH
validateSlot(bc, secondHead, epoch, "proposed")
}
// A test that ensures that if two HeadMessages occur for a single slot they are marked
// as proposed and forked correctly.
func (tbc TestBeaconNode) testKnownGapsMessages(bc *beaconclient.BeaconClient, tableIncrement int, expectedEntries uint64, maxRetry int, msg ...beaconclient.Head) {
go bc.CaptureHead(tableIncrement)
time.Sleep(1 * time.Second)
for _, headMsg := range msg {
sendHeadMessage(bc, headMsg, maxRetry, 1)
}
curRetry := 0
for atomic.LoadUint64(&bc.Metrics.HeadTrackingKnownGaps) != expectedEntries {
time.Sleep(1 * time.Second)
curRetry = curRetry + 1
if curRetry == maxRetry {
Fail("Too many retries have occurred.")
}
}
log.Info("Checking to make sure we have the expected number of entries in the knownGaps table.")
knownGapCount := countKnownGapsTable(bc.Db)
Expect(knownGapCount).To(Equal(int(expectedEntries)))
if atomic.LoadUint64(&bc.Metrics.HeadTrackingReorgs) != 0 {
Fail("We found reorgs when we didn't expect it")
}
}
// This function will make sure we are properly able to get the SszRoot of the SignedBeaconBlock and the BeaconState.
func testSszRoot(msg Message) {
state, vm, err := readBeaconState(msg.BeaconState)

View File

@ -322,7 +322,7 @@ func writeKnownGaps(db sql.Database, tableIncrement int, startSlot int, endSlot
EntryError: entryError.Error(),
EntryProcess: entryProcess,
}
upsertKnownGaps(db, kgModel)
upsertKnownGaps(db, kgModel, metric)
} else {
totalSlots := endSlot - startSlot
var chunks int
@ -347,15 +347,14 @@ func writeKnownGaps(db sql.Database, tableIncrement int, startSlot int, endSlot
EntryError: entryError.Error(),
EntryProcess: entryProcess,
}
upsertKnownGaps(db, kgModel)
upsertKnownGaps(db, kgModel, metric)
}
}
metric.IncrementHeadTrackingKnownGaps(1)
}
// A function to upsert a single entry to the ethcl.known_gaps table.
func upsertKnownGaps(db sql.Database, knModel DbKnownGaps) {
func upsertKnownGaps(db sql.Database, knModel DbKnownGaps, metric *BeaconClientMetrics) {
_, err := db.Exec(context.Background(), UpsertKnownGapsStmt, knModel.StartSlot, knModel.EndSlot,
knModel.CheckedOut, knModel.ReprocessingError, knModel.EntryError, knModel.EntryProcess)
if err != nil {
@ -369,6 +368,7 @@ func upsertKnownGaps(db sql.Database, knModel DbKnownGaps) {
"startSlot": knModel.StartSlot,
"endSlot": knModel.EndSlot,
}).Warn("A new gap has been added to the ethcl.known_gaps table.")
metric.IncrementHeadTrackingKnownGaps(1)
}
// A function to write the gap between the highest slot in the DB and the first processed slot.
@ -385,7 +385,7 @@ func writeStartUpGaps(db sql.Database, tableIncrement int, firstSlot int, metric
}).Fatal("Unable to get convert max block from DB to int. We must close the application or we might have undetected gaps.")
}
if maxSlot != firstSlot-1 {
writeKnownGaps(db, tableIncrement, maxSlot, firstSlot-1, fmt.Errorf(""), "startup", metric)
writeKnownGaps(db, tableIncrement, maxSlot+1, firstSlot-1, fmt.Errorf(""), "startup", metric)
}
}

View File

@ -18,7 +18,7 @@ var (
// This function will capture all the SSE events for a given SseEvents object.
// When new messages come in, it will ensure that they are decoded into JSON.
// If any errors occur, it log the error information.
func handleIncomingSseEvent[P ProcessedEvents](eventHandler *SseEvents[P]) {
func handleIncomingSseEvent[P ProcessedEvents](eventHandler *SseEvents[P], errMetricInc func(uint64)) {
go func() {
errG := new(errgroup.Group)
errG.Go(func() error {
@ -55,6 +55,7 @@ func handleIncomingSseEvent[P ProcessedEvents](eventHandler *SseEvents[P]) {
"msg": headErr.msg,
},
).Error("Unable to handle event.")
errMetricInc(1)
}
}
}
@ -77,6 +78,6 @@ func processMsg[P ProcessedEvents](msg []byte, processCh chan<- *P, errorCh chan
// Capture all of the event topics.
func (bc *BeaconClient) captureEventTopic() {
log.Info("We are capturing all SSE events")
go handleIncomingSseEvent(bc.HeadTracking)
go handleIncomingSseEvent(bc.ReOrgTracking)
go handleIncomingSseEvent(bc.HeadTracking, bc.Metrics.IncrementHeadError)
go handleIncomingSseEvent(bc.ReOrgTracking, bc.Metrics.IncrementHeadReorgError)
}

View File

@ -16,8 +16,20 @@ func (m *BeaconClientMetrics) IncrementHeadTrackingReorgs(inc uint64) {
atomic.AddUint64(&m.HeadTrackingReorgs, inc)
}
// Wrapper function to increment reorgs. If we want to use mutexes later we can easily update all
// Wrapper function to increment known gaps. If we want to use mutexes later we can easily update all
// occurrences here.
func (m *BeaconClientMetrics) IncrementHeadTrackingKnownGaps(inc uint64) {
atomic.AddUint64(&m.HeadTrackingKnownGaps, inc)
}
// Wrapper function to increment head errors. If we want to use mutexes later we can easily update all
// occurrences here.
func (m *BeaconClientMetrics) IncrementHeadError(inc uint64) {
atomic.AddUint64(&m.HeadError, inc)
}
// Wrapper function to increment reorg errors. If we want to use mutexes later we can easily update all
// occurrences here.
func (m *BeaconClientMetrics) IncrementHeadReorgError(inc uint64) {
atomic.AddUint64(&m.HeadReorgError, inc)
}

View File

@ -96,6 +96,7 @@ func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot
if err := g.Wait(); err != nil {
writeKnownGaps(ps.Db, 1, ps.Slot, ps.Slot, err, "processSlot", ps.Metrics)
return err
}
if ps.HeadOrHistoric == "head" && previousSlot == 0 && previousBlockRoot == "" {
@ -234,7 +235,7 @@ func (ps *ProcessSlot) checkPreviousSlot(previousSlot int, previousBlockRoot str
} else if previousSlot+1 != int(ps.FullBeaconState.Slot()) {
log.WithFields(log.Fields{
"previousSlot": previousSlot,
"currentSlot": ps.FullBeaconState.Slot,
"currentSlot": ps.FullBeaconState.Slot(),
}).Error("We skipped a few slots.")
writeKnownGaps(ps.Db, knownGapsTableIncrement, previousSlot+1, int(ps.FullBeaconState.Slot())-1, fmt.Errorf("Gaps during head processing"), "headGaps", ps.Metrics)
} else if previousBlockRoot != parentRoot {