Test the application for v1 release. #37
@@ -27,8 +27,9 @@ var (
 
 // A structure utilized for keeping track of various metrics. Currently, mostly used in testing.
 type BeaconClientMetrics struct {
-    HeadTrackingInserts uint64 // Number of head events we wrote to the DB.
-    HeadTrackingReorgs  uint64 // The number of reorg events written to the DB.
+    HeadTrackingInserts   uint64 // Number of head events we successfully wrote to the DB.
+    HeadTrackingReorgs    uint64 // Number of reorg events we successfully wrote to the DB.
+    HeadTrackingKnownGaps uint64 // Number of known_gaps we successfully wrote to the DB.
 }
 
 // A struct that captures the Beacon Server that the Beacon Client will be interacting with and querying.
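Since `CaptureHead` runs in its own goroutine while the tests poll these counters, reads should go through `sync/atomic` just like the writes in the `Increment*` wrappers further down. A minimal read-side sketch, assuming only the struct above (`Snapshot` is a hypothetical helper, not part of this PR):

```go
package beaconclient

import "sync/atomic"

// Snapshot returns a consistent copy of the three counters. Plain field reads
// would race with the atomic.AddUint64 calls in the Increment* wrappers, so
// every read goes through atomic.LoadUint64. Hypothetical helper, not in this PR.
func (m *BeaconClientMetrics) Snapshot() (inserts, reorgs, knownGaps uint64) {
	return atomic.LoadUint64(&m.HeadTrackingInserts),
		atomic.LoadUint64(&m.HeadTrackingReorgs),
		atomic.LoadUint64(&m.HeadTrackingKnownGaps)
}
```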
@@ -57,7 +57,7 @@ var _ = Describe("Capturehead", func() {
     dbDriver                string = "pgx"
     dummyParentRoot         string = "46f98c08b54a71dfda4d56e29ec3952b8300cd8d6b67a9b6c562ae96a7a25a42"
     knownGapsTableIncrement int    = 100000
-    maxRetry                int    = 30
+    maxRetry                int    = 60
 )
 
     BeforeEach(func() {
@@ -106,6 +106,20 @@ var _ = Describe("Capturehead", func() {
             SignedBeaconBlock: filepath.Join("ssz-data", "100", "signed-beacon-block.ssz"),
             BeaconState:       filepath.Join("ssz-data", "100", "beacon-state.ssz"),
         },
+        "101": {
+            HeadMessage: beaconclient.Head{
+                Slot:                      "101",
+                Block:                     "0xabe1a972e512182d04f0d4a5c9c25f9ee57c2e9d0ff3f4c4c82fd42d13d31083",
+                State:                     "0xcb04aa2edbf13c7bb7e7bd9b621ced6832e0075e89147352eac3019a824ce847",
+                CurrentDutyDependentRoot:  "",
+                PreviousDutyDependentRoot: "",
+                EpochTransition:           false,
+                ExecutionOptimistic:       false,
+            },
+            TestNotes:         "An easy to process Phase 0 block",
+            SignedBeaconBlock: filepath.Join("ssz-data", "101", "signed-beacon-block.ssz"),
+            BeaconState:       filepath.Join("ssz-data", "101", "beacon-state.ssz"),
+        },
         "2375703-dummy": {
             HeadMessage: beaconclient.Head{
                 Slot: "2375703",
@@ -170,16 +184,27 @@ var _ = Describe("Capturehead", func() {
     // We might also want to add an integration test that will actually process a single event, then end.
     // This will help us know that our models match the actual data being served from the beacon node.
 
-    Describe("Receiving New Head SSE messages", Label("unit", "behavioral"), func() {
+    Describe("Receiving New Head SSE messages", Label("unit", "behavioral", "dry"), func() {
         Context("Correctly formatted Phase0 Block", func() {
             It("Should turn it into a struct successfully.", func() {
-                BeaconNodeTester.testProcessBlock(BeaconNodeTester.TestEvents["100"].HeadMessage, 3, maxRetry)
+                bc := setUpTest(BeaconNodeTester.TestConfig, "99")
+                BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
+                defer httpmock.DeactivateAndReset()
+                BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, 3, maxRetry)
+
             })
         })
         Context("Correctly formatted Altair Block", func() {
             It("Should turn it into a struct successfully.", func() {
-                BeaconNodeTester.testProcessBlock(BeaconNodeTester.TestEvents["2375703"].HeadMessage, 74240, maxRetry)
+                bc := setUpTest(BeaconNodeTester.TestConfig, "2375702")
+                BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
+                defer httpmock.DeactivateAndReset()
+                BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, 74240, maxRetry)
+            })
+        })
+        Context("Two consecutive Blocks", func() {
+            It("Should handle both blocks correctly, without any reorgs or known_gaps", func() {
 
             })
         })
         //Context("A single incorrectly formatted head message", func() {
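Each test now activates the HTTP mock itself and tears it down with `defer httpmock.DeactivateAndReset()`, so no mock state leaks between specs. For readers unfamiliar with jarcoal/httpmock, a minimal sketch of the activate/register/reset pattern that `SetupBeaconNodeMock` presumably wraps — the endpoint and body here are placeholders, not the real beacon routes:

```go
package main

import (
	"fmt"
	"io"
	"net/http"

	"github.com/jarcoal/httpmock"
)

func main() {
	// Intercept http.DefaultTransport; the tests above undo this per spec
	// with `defer httpmock.DeactivateAndReset()`.
	httpmock.Activate()
	defer httpmock.DeactivateAndReset()

	// Hypothetical endpoint standing in for the routes SetupBeaconNodeMock registers.
	httpmock.RegisterResponder("GET", "http://localhost:5052/eth/v1/node/health",
		httpmock.NewStringResponder(200, "ok"))

	resp, err := http.Get("http://localhost:5052/eth/v1/node/health")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body)
	fmt.Println(resp.StatusCode, string(body)) // 200 ok
}
```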
@@ -210,22 +235,35 @@ var _ = Describe("Capturehead", func() {
     Describe("ReOrg Scenario", Label("unit", "behavioral"), func() {
         Context("Altair: Multiple head messages for the same slot.", func() {
             It("The previous block should be marked as 'forked', the new block should be the only one marked as 'proposed'.", func() {
-                BeaconNodeTester.testMultipleHead(TestEvents["2375703-dummy"].HeadMessage, TestEvents["2375703"].HeadMessage, 74240, maxRetry)
+                bc := setUpTest(BeaconNodeTester.TestConfig, "2375702")
+                BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
+                defer httpmock.DeactivateAndReset()
+                //BeaconNodeTester.testMultipleHead(bc, TestEvents["2375703-dummy"].HeadMessage, TestEvents["2375703"].HeadMessage, 74240, 10)
+                BeaconNodeTester.testMultipleHead(bc, TestEvents["2375703"].HeadMessage, TestEvents["2375703-dummy"].HeadMessage, 74240, maxRetry)
             })
         })
         Context("Phase0: Multiple head messages for the same slot.", func() {
             It("The previous block should be marked as 'forked', the new block should be the only one marked as 'proposed'.", func() {
-                BeaconNodeTester.testMultipleHead(TestEvents["100-dummy"].HeadMessage, TestEvents["100"].HeadMessage, 3, maxRetry)
+                bc := setUpTest(BeaconNodeTester.TestConfig, "99")
+                BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
+                defer httpmock.DeactivateAndReset()
+                BeaconNodeTester.testMultipleHead(bc, TestEvents["100-dummy"].HeadMessage, TestEvents["100"].HeadMessage, 3, maxRetry)
             })
         })
-        Context("Phase 0: Multiple reorgs have occurred on this slot", Label("new"), func() {
+        Context("Phase 0: Multiple reorgs have occurred on this slot", func() {
             It("The previous blocks should be marked as 'forked', the new block should be the only one marked as 'proposed'.", func() {
-                BeaconNodeTester.testMultipleReorgs(TestEvents["100-dummy"].HeadMessage, TestEvents["100-dummy-2"].HeadMessage, TestEvents["100"].HeadMessage, 3, maxRetry)
+                bc := setUpTest(BeaconNodeTester.TestConfig, "99")
+                BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
+                defer httpmock.DeactivateAndReset()
+                BeaconNodeTester.testMultipleReorgs(bc, TestEvents["100-dummy"].HeadMessage, TestEvents["100-dummy-2"].HeadMessage, TestEvents["100"].HeadMessage, 3, maxRetry)
             })
         })
         Context("Altair: Multiple reorgs have occurred on this slot", Label("new"), func() {
             It("The previous blocks should be marked as 'forked', the new block should be the only one marked as 'proposed'.", func() {
-                BeaconNodeTester.testMultipleReorgs(TestEvents["2375703-dummy"].HeadMessage, TestEvents["2375703-dummy-2"].HeadMessage, TestEvents["2375703"].HeadMessage, 74240, maxRetry)
+                bc := setUpTest(BeaconNodeTester.TestConfig, "2375702")
+                BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
+                defer httpmock.DeactivateAndReset()
+                BeaconNodeTester.testMultipleReorgs(bc, TestEvents["2375703-dummy"].HeadMessage, TestEvents["2375703-dummy-2"].HeadMessage, TestEvents["2375703"].HeadMessage, 74240, maxRetry)
             })
         })
         //Context("Reorg slot is not already in the DB", func() {
@@ -256,13 +294,16 @@ type Config struct {
 
 // Must run before each test. We can't use the beforeEach because of the way
 // Ginkgo treats race conditions.
-func setUpTest(config Config) *beaconclient.BeaconClient {
+func setUpTest(config Config, maxSlot string) *beaconclient.BeaconClient {
     bc := *beaconclient.CreateBeaconClient(context.Background(), config.protocol, config.address, config.port)
     db, err := postgres.SetupPostgresDb(config.dbHost, config.dbPort, config.dbName, config.dbUser, config.dbPassword, config.dbDriver)
     Expect(err).ToNot(HaveOccurred())
 
     // Drop all records from the DB.
     clearEthclDbTables(db)
+
+    // Add a slot to the ethcl.slots table so we can control how known_gaps are handled.
+    writeSlot(db, maxSlot)
     bc.Db = db
 
     return &bc
@@ -298,7 +339,11 @@ func sendHeadMessage(bc *beaconclient.BeaconClient, head beaconclient.Head, maxR
         time.Sleep(1 * time.Second)
         curRetry = curRetry + 1
         if curRetry == maxRetry {
-            Fail(" Too many retries have occured.")
+            log.WithFields(log.Fields{
+                "startInsert":  startInserts,
+                "currentValue": atomic.LoadUint64(&bc.Metrics.HeadTrackingInserts),
+            }).Error("HeadTracking Insert wasn't incremented properly.")
+            Fail("Too many retries have occurred.")
         }
     }
 }
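The sleep-and-retry loop above reappears in `testMultipleReorgs` and `testMultipleHead` below with only the counter and target changing. A hedged sketch of a shared helper that could replace the copies — `waitForCounter` is hypothetical and assumes the `sync/atomic`, `time`, and `fmt` imports this file already uses:

```go
// waitForCounter polls an atomic counter once per second until it reaches
// want, giving up after maxRetry attempts. Hypothetical consolidation of the
// retry loops in this file, not part of this PR.
func waitForCounter(counter *uint64, want uint64, maxRetry int) error {
	for curRetry := 0; curRetry < maxRetry; curRetry++ {
		if atomic.LoadUint64(counter) == want {
			return nil
		}
		time.Sleep(1 * time.Second)
	}
	return fmt.Errorf("counter stuck at %d, wanted %d", atomic.LoadUint64(counter), want)
}
```

A call site would then read something like `Expect(waitForCounter(&bc.Metrics.HeadTrackingReorgs, 1, maxRetry)).To(Succeed())`.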
@@ -321,7 +366,12 @@ func clearEthclDbTables(db sql.Database) {
         _, err := db.Exec(context.Background(), queries)
         Expect(err).ToNot(HaveOccurred())
     }
+}
 
+// Write an entry to the ethcl.slots table with just a slot number
+func writeSlot(db sql.Database, slot string) {
+    _, err := db.Exec(context.Background(), beaconclient.UpsertSlotsStmt, "0", slot, "", "", "")
+    Expect(err).ToNot(HaveOccurred())
 }
 
 // An object that is used to aggregate test functions. Test functions are needed because we need to
@@ -440,11 +490,7 @@ func (tbc TestBeaconNode) provideSsz(slotIdentifier string, sszIdentifier string
 
 // Helper function to test three reorg messages. There are going to be many functions like this,
 // because we need to test the same logic for multiple phases.
-func (tbc TestBeaconNode) testMultipleReorgs(firstHead beaconclient.Head, secondHead beaconclient.Head, thirdHead beaconclient.Head, epoch int, maxRetry int) {
-    bc := setUpTest(tbc.TestConfig)
-    tbc.SetupBeaconNodeMock(tbc.TestEvents, tbc.TestConfig.protocol, tbc.TestConfig.address, tbc.TestConfig.port, tbc.TestConfig.dummyParentRoot)
-    defer httpmock.DeactivateAndReset()
-
+func (tbc TestBeaconNode) testMultipleReorgs(bc *beaconclient.BeaconClient, firstHead beaconclient.Head, secondHead beaconclient.Head, thirdHead beaconclient.Head, epoch int, maxRetry int) {
     go bc.CaptureHead(tbc.TestConfig.knownGapsTableIncrement)
     time.Sleep(1 * time.Second)
 
@@ -458,11 +504,11 @@ func (tbc TestBeaconNode) testMultipleReorgs(firstHead beaconclient.Head, second
         time.Sleep(1 * time.Second)
         curRetry = curRetry + 1
         if curRetry == maxRetry {
-            Fail(" Too many retries have occured.")
+            Fail(" Too many retries have occurred.")
         }
     }
 
-    log.Info("Checking Phase0 to make sure the fork was marked properly.")
+    log.Info("Checking to make sure the fork was marked properly.")
     validateSlot(bc, &firstHead, epoch, "forked")
     validateSlot(bc, &secondHead, epoch, "forked")
     validateSlot(bc, &thirdHead, epoch, "proposed")
@@ -493,6 +539,10 @@ func (tbc TestBeaconNode) testMultipleReorgs(firstHead beaconclient.Head, second
         }
     }
 
+    if bc.Metrics.HeadTrackingKnownGaps != 0 {
+        Fail("We found gaps when processing a single block")
+    }
+
     log.Info("Make sure the forks were properly updated!")
 
     validateSlot(bc, &firstHead, epoch, "forked")
@@ -502,39 +552,44 @@ func (tbc TestBeaconNode) testMultipleReorgs(firstHead beaconclient.Head, second
 }
 
 // A test to validate a single block was processed correctly
-func (tbc TestBeaconNode) testProcessBlock(head beaconclient.Head, epoch int, maxRetry int) {
-    bc := setUpTest(tbc.TestConfig)
-    tbc.SetupBeaconNodeMock(tbc.TestEvents, tbc.TestConfig.protocol, tbc.TestConfig.address, tbc.TestConfig.port, tbc.TestConfig.dummyParentRoot)
-    defer httpmock.DeactivateAndReset()
-
+func (tbc TestBeaconNode) testProcessBlock(bc *beaconclient.BeaconClient, head beaconclient.Head, epoch int, maxRetry int) {
     go bc.CaptureHead(tbc.TestConfig.knownGapsTableIncrement)
     time.Sleep(1 * time.Second)
     sendHeadMessage(bc, head, maxRetry)
+    if bc.Metrics.HeadTrackingKnownGaps != 0 {
+        Fail("We found gaps when processing a single block")
+    }
+    if bc.Metrics.HeadTrackingReorgs != 0 {
+        Fail("We found reorgs when processing a single block")
+    }
     validateSlot(bc, &head, epoch, "proposed")
 }
 
 // A test that ensures that if two HeadMessages occur for a single slot they are marked
 // as proposed and forked correctly.
-func (tbc TestBeaconNode) testMultipleHead(firstHead beaconclient.Head, secondHead beaconclient.Head, epoch int, maxRetry int) {
-    bc := setUpTest(tbc.TestConfig)
-    tbc.SetupBeaconNodeMock(tbc.TestEvents, tbc.TestConfig.protocol, tbc.TestConfig.address, tbc.TestConfig.port, tbc.TestConfig.dummyParentRoot)
-    defer httpmock.DeactivateAndReset()
+func (tbc TestBeaconNode) testMultipleHead(bc *beaconclient.BeaconClient, firstHead beaconclient.Head, secondHead beaconclient.Head, epoch int, maxRetry int) {
 
     go bc.CaptureHead(tbc.TestConfig.knownGapsTableIncrement)
     time.Sleep(1 * time.Second)
 
     sendHeadMessage(bc, firstHead, maxRetry)
+    log.Info("First Head has been processed.")
     sendHeadMessage(bc, secondHead, maxRetry)
+    log.Info("Second Head has been processed.")
 
     curRetry := 0
     for atomic.LoadUint64(&bc.Metrics.HeadTrackingReorgs) != 1 {
         time.Sleep(1 * time.Second)
         curRetry = curRetry + 1
         if curRetry == maxRetry {
-            Fail(" Too many retries have occured.")
+            Fail(" Too many retries have occurred.")
         }
     }
 
+    if bc.Metrics.HeadTrackingKnownGaps != 0 {
+        Fail("We found gaps when processing a single block")
+    }
+
     log.Info("Checking Altair to make sure the fork was marked properly.")
     validateSlot(bc, &firstHead, epoch, "forked")
     validateSlot(bc, &secondHead, epoch, "proposed")
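One note on the new assertions: `testProcessBlock` and `testMultipleHead` read `HeadTrackingKnownGaps` and `HeadTrackingReorgs` as plain fields while the `CaptureHead` goroutine may still be writing them via `atomic.AddUint64`, which `go test -race` will flag. A sketch of the same checks with atomic reads (`expectNoSideEffects` is a hypothetical helper, assuming this test file's existing imports):

```go
// expectNoSideEffects collapses the repeated zero-metric checks above into one
// helper and uses atomic loads so it stays race-free even while CaptureHead is
// running. Hypothetical helper, not part of this PR.
func expectNoSideEffects(bc *beaconclient.BeaconClient) {
	if atomic.LoadUint64(&bc.Metrics.HeadTrackingKnownGaps) != 0 {
		Fail("We found gaps when processing a single block")
	}
	if atomic.LoadUint64(&bc.Metrics.HeadTrackingReorgs) != 0 {
		Fail("We found reorgs when processing a single block")
	}
}
```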
@@ -127,8 +127,10 @@ func (dw *DatabaseWriter) prepareBeaconStateModel(slot int, stateRoot string) er
 
 // Write all the data for a given slot.
 func (dw *DatabaseWriter) writeFullSlot() error {
-    // Add errors for each function call
     // If an error occurs, write to knownGaps table.
+    log.WithFields(log.Fields{
+        "slot": dw.DbSlots.Slot,
+    }).Debug("Starting to write to the DB.")
     err := dw.writeSlots()
     if err != nil {
         return err
@@ -231,12 +233,12 @@ func writeReorgs(db sql.Database, slot string, latestBlockRoot string, metrics *
     forkCount, err := updateForked(db, slot, latestBlockRoot)
     if err != nil {
         loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We ran into some trouble while updating all forks.")
-        writeKnownGaps(db, 1, slotNum, slotNum, err, "reorg")
+        writeKnownGaps(db, 1, slotNum, slotNum, err, "reorg", metrics)
     }
     proposedCount, err := updateProposed(db, slot, latestBlockRoot)
     if err != nil {
         loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We ran into some trouble while trying to update the proposed slot.")
-        writeKnownGaps(db, 1, slotNum, slotNum, err, "reorg")
+        writeKnownGaps(db, 1, slotNum, slotNum, err, "reorg", metrics)
     }
 
     if forkCount > 0 {
@@ -257,19 +259,19 @@ func writeReorgs(db sql.Database, slot string, latestBlockRoot string, metrics *
         loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{
             "proposedCount": proposedCount,
         }).Error("Too many rows were marked as proposed!")
-        writeKnownGaps(db, 1, slotNum, slotNum, err, "reorg")
+        writeKnownGaps(db, 1, slotNum, slotNum, err, "reorg", metrics)
     } else if proposedCount == 0 {
         var count int
         err := db.QueryRow(context.Background(), CheckProposedStmt, slot, latestBlockRoot).Scan(&count)
         if err != nil {
             loghelper.LogReorgError(slot, latestBlockRoot, err).Error("Unable to query proposed rows after reorg.")
-            writeKnownGaps(db, 1, slotNum, slotNum, err, "reorg")
+            writeKnownGaps(db, 1, slotNum, slotNum, err, "reorg", metrics)
         }
         if count != 1 {
             loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{
                 "proposedCount": count,
             }).Warn("The proposed block was not marked as proposed...")
-            writeKnownGaps(db, 1, slotNum, slotNum, err, "reorg")
+            writeKnownGaps(db, 1, slotNum, slotNum, err, "reorg", metrics)
         } else {
             loghelper.LogReorg(slot, latestBlockRoot).Info("Updated the row that should have been marked as proposed.")
         }
@@ -311,7 +313,7 @@ func updateProposed(db sql.Database, slot string, latestBlockRoot string) (int64
 // A wrapper function to call upsertKnownGaps. This function will break down the range of known_gaps into
 // smaller chunks. For example, instead of having an entry of 1-101, if we increment the entries by 10 slots, we would
 // have 10 entries as follows: 1-10, 11-20, etc...
-func writeKnownGaps(db sql.Database, tableIncrement int, startSlot int, endSlot int, entryError error, entryProcess string) {
+func writeKnownGaps(db sql.Database, tableIncrement int, startSlot int, endSlot int, entryError error, entryProcess string, metric *BeaconClientMetrics) {
     if endSlot-startSlot <= tableIncrement {
         kgModel := DbKnownGaps{
             StartSlot: strconv.Itoa(startSlot),
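A standalone sketch of the chunking that comment describes, assuming nothing beyond the standard library — `chunkRange` is illustrative, not the PR's implementation, and note that a 1–101 range at increment 10 actually yields eleven entries, with the last covering the remainder:

```go
package main

import "fmt"

// chunkRange splits the inclusive range [startSlot, endSlot] into pieces at
// most increment slots wide, mirroring how writeKnownGaps breaks a large gap
// into multiple known_gaps rows.
func chunkRange(startSlot, endSlot, increment int) [][2]int {
	var chunks [][2]int
	for start := startSlot; start <= endSlot; start += increment {
		end := start + increment - 1
		if end > endSlot {
			end = endSlot // clamp the final, partial chunk
		}
		chunks = append(chunks, [2]int{start, end})
	}
	return chunks
}

func main() {
	fmt.Println(chunkRange(1, 101, 10))
	// Output: [[1 10] [11 20] [21 30] [31 40] [41 50] [51 60] [61 70] [71 80] [81 90] [91 100] [101 101]]
}
```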
@@ -349,6 +351,7 @@ func writeKnownGaps(db sql.Database, tableIncrement int, startSlot int, endSlot
             upsertKnownGaps(db, kgModel)
         }
     }
+    metric.IncrementHeadTrackingKnownGaps(1)
 
 }
 
@@ -370,7 +373,7 @@ func upsertKnownGaps(db sql.Database, knModel DbKnownGaps) {
 }
 
 // A function to write the gap between the highest slot in the DB and the first processed slot.
-func writeStartUpGaps(db sql.Database, tableIncrement int, firstSlot int) {
+func writeStartUpGaps(db sql.Database, tableIncrement int, firstSlot int, metric *BeaconClientMetrics) {
     var maxSlot int
     err := db.QueryRow(context.Background(), QueryHighestSlotStmt).Scan(&maxSlot)
     if err != nil {
@@ -382,7 +385,9 @@ func writeStartUpGaps(db sql.Database, tableIncrement int, firstSlot int) {
             "maxSlot": maxSlot,
         }).Fatal("Unable to convert max block from DB to int. We must close the application or we might have undetected gaps.")
     }
-    writeKnownGaps(db, tableIncrement, maxSlot, firstSlot, fmt.Errorf(""), "startup")
+    if maxSlot != firstSlot-1 {
+        writeKnownGaps(db, tableIncrement, maxSlot, firstSlot-1, fmt.Errorf(""), "startup", metric)
+    }
 }
 
 // A quick helper function to calculate the epoch.
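Concretely: if the highest slot already in ethcl.slots is 100 and the first slot processed after startup is 105, the guarded call records a known_gaps entry covering 100 through 104; when processing resumes at slot 101, maxSlot equals firstSlot-1 and nothing is written, whereas the old unconditional call would have recorded a spurious 100–101 entry.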
@@ -1,10 +1,15 @@
 package beaconclient
 
-import "sync/atomic"
+import (
+    "sync/atomic"
+
+    log "github.com/sirupsen/logrus"
+)
 
 // Wrapper function to increment inserts. If we want to use mutexes later we can easily update all
 // occurrences here.
 func (m *BeaconClientMetrics) IncrementHeadTrackingInserts(inc uint64) {
+    log.Info("Updating the insert ")
     atomic.AddUint64(&m.HeadTrackingInserts, inc)
 }
 
@@ -13,3 +18,9 @@ func (m *BeaconClientMetrics) IncrementHeadTrackingInserts(inc uint64) {
 func (m *BeaconClientMetrics) IncrementHeadTrackingReorgs(inc uint64) {
     atomic.AddUint64(&m.HeadTrackingReorgs, inc)
 }
+
+// Wrapper function to increment known gaps. If we want to use mutexes later we can easily update all
+// occurrences here.
+func (m *BeaconClientMetrics) IncrementHeadTrackingKnownGaps(inc uint64) {
+    atomic.AddUint64(&m.HeadTrackingKnownGaps, inc)
+}
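A quick demonstration of why these wrappers go through `sync/atomic`: the counters are bumped from the `CaptureHead` goroutine while the tests read them concurrently. A self-contained sketch with simplified stand-ins for `BeaconClientMetrics`:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type metrics struct{ knownGaps uint64 }

// IncrementKnownGaps mirrors the wrapper above: atomic.AddUint64 keeps
// concurrent increments from losing updates.
func (m *metrics) IncrementKnownGaps(inc uint64) { atomic.AddUint64(&m.knownGaps, inc) }

func main() {
	m := &metrics{}
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			m.IncrementKnownGaps(1)
		}()
	}
	wg.Wait()
	fmt.Println(atomic.LoadUint64(&m.knownGaps)) // always 100
}
```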
@@ -43,7 +43,7 @@ func (bc *BeaconClient) handleHead() {
             "lastProcessedSlot": bc.PreviousSlot,
             "errorMessages":     errorSlots,
         }).Warn("We added slots to the knownGaps table because we got bad head messages.")
-        writeKnownGaps(bc.Db, bc.KnownGapTableIncrement, bc.PreviousSlot, bcSlotsPerEpoch+errorSlots, fmt.Errorf("Bad Head Messages"), "headProcessing")
+        writeKnownGaps(bc.Db, bc.KnownGapTableIncrement, bc.PreviousSlot, bcSlotsPerEpoch+errorSlots, fmt.Errorf("Bad Head Messages"), "headProcessing", bc.Metrics)
     }
 
     log.WithFields(log.Fields{"head": head}).Debug("We are going to start processing the slot.")
@@ -89,24 +89,25 @@ func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot
     })
 
     if err := g.Wait(); err != nil {
-        writeKnownGaps(ps.Db, 1, ps.Slot, ps.Slot, err, "processSlot")
+        writeKnownGaps(ps.Db, 1, ps.Slot, ps.Slot, err, "processSlot", ps.Metrics)
     }
 
     if ps.HeadOrHistoric == "head" && previousSlot == 0 && previousBlockRoot == "" {
-        writeStartUpGaps(db, knownGapsTableIncrement, ps.Slot)
+        writeStartUpGaps(db, knownGapsTableIncrement, ps.Slot, ps.Metrics)
     }
 
     // Get this object ready to write
     blockRootEndpoint := serverAddress + BcBlockRootEndpoint(strconv.Itoa(ps.Slot))
     dw, err := ps.createWriteObjects(blockRootEndpoint)
     if err != nil {
-        writeKnownGaps(ps.Db, 1, ps.Slot, ps.Slot, err, "blockRoot")
+        writeKnownGaps(ps.Db, 1, ps.Slot, ps.Slot, err, "blockRoot", ps.Metrics)
         return err
     }
     // Write the object to the DB.
     err = dw.writeFullSlot()
     if err != nil {
-        writeKnownGaps(ps.Db, 1, ps.Slot, ps.Slot, err, "processSlot")
+        writeKnownGaps(ps.Db, 1, ps.Slot, ps.Slot, err, "processSlot", ps.Metrics)
+        return err
     }
 
     // Handle any reorgs or skipped slots.
@@ -221,14 +222,14 @@ func (ps *ProcessSlot) checkPreviousSlot(previousSlot int, previousBlockRoot str
             "previousSlot": previousSlot,
             "currentSlot":  ps.FullBeaconState.Slot,
         }).Error("We skipped a few slots.")
-        writeKnownGaps(ps.Db, knownGapsTableIncrement, previousSlot+1, int(ps.FullBeaconState.Slot)-1, fmt.Errorf("Gaps during head processing"), "headGaps")
+        writeKnownGaps(ps.Db, knownGapsTableIncrement, previousSlot+1, int(ps.FullBeaconState.Slot)-1, fmt.Errorf("Gaps during head processing"), "headGaps", ps.Metrics)
     } else if previousBlockRoot != parentRoot {
         log.WithFields(log.Fields{
             "previousBlockRoot":  previousBlockRoot,
             "currentBlockParent": parentRoot,
         }).Error("The previousBlockRoot does not match the current block's parent, an unprocessed fork might have occurred.")
         writeReorgs(ps.Db, strconv.Itoa(previousSlot), parentRoot, ps.Metrics)
-        writeKnownGaps(ps.Db, 1, ps.Slot-1, ps.Slot-1, fmt.Errorf("Incorrect Parent"), "processSlot")
+        writeKnownGaps(ps.Db, 1, ps.Slot-1, ps.Slot-1, fmt.Errorf("Incorrect Parent"), "processSlot", ps.Metrics)
     } else {
         log.Debug("Previous Slot and Current Slot are one distance from each other.")
     }