From 91ed2fefe6f50951deebb4885cb7827fbbb919fa Mon Sep 17 00:00:00 2001 From: Abdul Rabbani Date: Mon, 23 May 2022 14:29:25 -0400 Subject: [PATCH] Seperate checking for new entries and locking them --- pkg/beaconclient/capturehead_test.go | 11 +-- pkg/beaconclient/capturehistoric_test.go | 3 +- pkg/beaconclient/databasewrite.go | 4 ++ pkg/beaconclient/healthcheck_test.go | 4 +- pkg/beaconclient/processhistoric.go | 89 +++++++++++++++++++++--- 5 files changed, 93 insertions(+), 18 deletions(-) diff --git a/pkg/beaconclient/capturehead_test.go b/pkg/beaconclient/capturehead_test.go index fc31f11..459b2af 100644 --- a/pkg/beaconclient/capturehead_test.go +++ b/pkg/beaconclient/capturehead_test.go @@ -514,10 +514,9 @@ func queryDbSlotAndBlock(db sql.Database, querySlot string, queryBlockRoot strin var epoch, slot int var blockRoot, stateRoot, status string log.Debug("Starting to query the ethcl.slots table, ", querySlot, " ", queryBlockRoot) - row := db.QueryRow(context.Background(), sqlStatement, querySlot, queryBlockRoot) - log.Debug("Querying the ethcl.slots table complete") - err := row.Scan(&epoch, &slot, &blockRoot, &stateRoot, &status) + err := db.QueryRow(context.Background(), sqlStatement, querySlot, queryBlockRoot).Scan(&epoch, &slot, &blockRoot, &stateRoot, &status) Expect(err).ToNot(HaveOccurred()) + log.Debug("Querying the ethcl.slots table complete") return epoch, slot, blockRoot, stateRoot, status } @@ -668,6 +667,7 @@ func (tbc TestBeaconNode) SetupBeaconNodeMock(TestEvents map[string]Message, pro return httpmock.NewBytesResponse(200, dat), nil }, ) + // Not needed but could be useful to have. blockRootUrl := `=~^` + protocol + "://" + address + ":" + strconv.Itoa(port) + "/eth/v1/beacon/blocks/" + `([^/]+)` + "/root" httpmock.RegisterResponder("GET", blockRootUrl, func(req *http.Request) (*http.Response, error) { @@ -683,6 +683,7 @@ func (tbc TestBeaconNode) SetupBeaconNodeMock(TestEvents map[string]Message, pro ) } +// Provide the Block root func (tbc TestBeaconNode) provideBlockRoot(slot string) ([]byte, error) { for _, val := range tbc.TestEvents { @@ -702,12 +703,12 @@ func (tbc TestBeaconNode) provideSsz(slotIdentifier string, sszIdentifier string for _, val := range tbc.TestEvents { if sszIdentifier == "state" { - if val.HeadMessage.Slot == slotIdentifier || val.HeadMessage.State == slotIdentifier { + if (val.HeadMessage.Slot == slotIdentifier && val.MimicConfig == nil) || val.HeadMessage.State == slotIdentifier { slotFile = val.BeaconState Message = val } } else if sszIdentifier == "block" { - if val.HeadMessage.Slot == slotIdentifier || val.HeadMessage.Block == slotIdentifier { + if (val.HeadMessage.Slot == slotIdentifier && val.MimicConfig == nil) || val.HeadMessage.Block == slotIdentifier { slotFile = val.SignedBeaconBlock Message = val } diff --git a/pkg/beaconclient/capturehistoric_test.go b/pkg/beaconclient/capturehistoric_test.go index ff1e0a2..23a6785 100644 --- a/pkg/beaconclient/capturehistoric_test.go +++ b/pkg/beaconclient/capturehistoric_test.go @@ -24,7 +24,8 @@ var _ = Describe("Capturehistoric", func() { BeaconNodeTester.writeEventToHistoricProcess(bc, 100, 101, 10) BeaconNodeTester.runBatchProcess(bc, 2, 100, 101, 0, 0) - //validateSlot(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, 3, "proposed") + time.Sleep(2 * time.Second) + validateSlot(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, 3, "proposed") //validateSignedBeaconBlock(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, "0x629ae1587895043076500f4f5dcb202a47c2fc95d5b5c548cb83bc97bd2dbfe1", "0x8d3f027beef5cbd4f8b29fc831aba67a5d74768edca529f5596f07fd207865e1", "/blocks/QHVAEQBQGQ4TKNJUGAYDGNZRGM2DOZJSGZTDMMLEG5QTIYTCMRQTKYRSGNTGCMDCGI2WINLGMM2DMNJRGYYGMMTBHEZGINJSME3DGYRZGE4WE") //validateBeaconState(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, "/blocks/QHVAEQRQPBTDEOBWMEYDGNZZMMYDGOBWMEZWGN3CMUZDQZBQGVSDQMRZMY4GKYRXMIZDQMDDMM4WKZDFGE2TINBZMFTDEMDFMJRWIMBWME3WCNJW") diff --git a/pkg/beaconclient/databasewrite.go b/pkg/beaconclient/databasewrite.go index cc67dbc..d044f88 100644 --- a/pkg/beaconclient/databasewrite.go +++ b/pkg/beaconclient/databasewrite.go @@ -54,6 +54,10 @@ VALUES ($1, $2) ON CONFLICT (key) DO NOTHING` CheckProposedStmt string = `SELECT slot, block_root FROM ethcl.slots WHERE slot=$1 AND block_root=$2;` + // Used to get a single slot from the table if it exists + QueryBySlotStmt string = `SELECT slot + FROM ethcl.slots + WHERE slot=$1` // Statement to insert known_gaps. We don't pass in timestamp, we let the server take care of that one. UpsertKnownGapsStmt string = ` INSERT INTO ethcl.known_gaps (start_slot, end_slot, checked_out, reprocessing_error, entry_error, entry_process) diff --git a/pkg/beaconclient/healthcheck_test.go b/pkg/beaconclient/healthcheck_test.go index 8052856..06ba2dd 100644 --- a/pkg/beaconclient/healthcheck_test.go +++ b/pkg/beaconclient/healthcheck_test.go @@ -25,8 +25,8 @@ import ( var _ = Describe("Healthcheck", func() { var ( - BC = beaconclient.CreateBeaconClient(context.Background(), "http", "localhost", 10, 5052) - errBc = beaconclient.CreateBeaconClient(context.Background(), "http", "blah-blah", 10, 1010) + BC = beaconclient.CreateBeaconClient(context.Background(), "http", "localhost", 5052, 10) + errBc = beaconclient.CreateBeaconClient(context.Background(), "http", "blah-blah", 1010, 10) ) Describe("Connecting to the lighthouse client", Label("integration"), func() { Context("When the client is running", func() { diff --git a/pkg/beaconclient/processhistoric.go b/pkg/beaconclient/processhistoric.go index b4d6706..7a853ce 100644 --- a/pkg/beaconclient/processhistoric.go +++ b/pkg/beaconclient/processhistoric.go @@ -32,11 +32,14 @@ import ( var ( // Get a single highest priority and non-checked out row. - getBpEntryStmt string = `SELECT start_slot, end_slot FROM ethcl.historic_process + getHpEntryStmt string = `SELECT start_slot, end_slot FROM ethcl.historic_process WHERE checked_out=false ORDER BY priority ASC LIMIT 1;` - lockBpEntryStmt string = `UPDATE ethcl.historic_process + // Used to periodically check to see if there is a new entry in the ethcl_historic_process table. + checkHpEntryStmt string = `INSERT INTO ethcl.historic_process (start_slot, end_slot) VALUES ($1, $2) ON CONFLICT (start_slot, end_slot) DO NOTHING;` + // Used to get the highest priority row that is not checked out, and to check it out within the ethcl.historic_process table. + lockHpEntryStmt string = `UPDATE ethcl.historic_process SET checked_out=true WHERE start_slot=$1 AND end_slot=$2;` deleteSlotsEntryStmt string = `DELETE FROM ethcl.historic_process @@ -50,18 +53,19 @@ type historicProcessing struct { // Get a single row of historical slots from the table. func (hp historicProcessing) getSlotRange(slotCh chan<- slotsToProcess) []error { - return getBatchProcessRow(hp.db, getBpEntryStmt, lockBpEntryStmt, slotCh) + return getBatchProcessRow(hp.db, getHpEntryStmt, checkHpEntryStmt, lockHpEntryStmt, slotCh) } // Remove the table entry. func (hp historicProcessing) removeTableEntry(processCh <-chan slotsToProcess) error { - return removeRowPostProcess(hp.db, processCh, deleteSlotsEntryStmt) + return removeRowPostProcess(hp.db, processCh, QueryBySlotStmt, deleteSlotsEntryStmt) } // Remove the table entry. func (hp historicProcessing) handleProcessingErrors(errMessages <-chan batchHistoricError) { for { errMs := <-errMessages + loghelper.LogSlotError(strconv.Itoa(errMs.slot), errMs.err) writeKnownGaps(hp.db, 1, errMs.slot, errMs.slot, errMs.err, errMs.errProcess, hp.metrics) } } @@ -86,19 +90,42 @@ func processSlotRangeWorker(workCh <-chan int, errCh chan<- batchHistoricError, // It also locks the row by updating the checked_out column. // The statement for getting the start_slot and end_slot must be provided. // The statement for "locking" the row must also be provided. -func getBatchProcessRow(db sql.Database, getStartEndSlotStmt string, checkOutRowStmt string, slotCh chan<- slotsToProcess) []error { +func getBatchProcessRow(db sql.Database, getStartEndSlotStmt string, checkNewRowsStmt string, checkOutRowStmt string, slotCh chan<- slotsToProcess) []error { errCount := make([]error, 0) for len(errCount) < 5 { + processRow, err := db.Exec(context.Background(), checkNewRowsStmt) + if err != nil { + errCount = append(errCount, err) + } + row, err := processRow.RowsAffected() + if err != nil { + errCount = append(errCount, err) + } + if row < 1 { + time.Sleep(100 * time.Millisecond) + continue + } + log.Debug("Found a row, going to start processing.") + log.WithFields(log.Fields{ + "ErrCount": errCount, + }).Debug("The ErrCounter") ctx := context.Background() // Setup TX tx, err := db.Begin(ctx) if err != nil { + loghelper.LogError(err).Error("We are unable to Begin a SQL transaction") errCount = append(errCount, err) continue } - defer tx.Rollback(ctx) + defer func() { + err := tx.Rollback(ctx) + if err != nil { + loghelper.LogError(err).Error("We were unable to Rollback a transaction") + errCount = append(errCount, err) + } + }() // Query the DB for slots. sp := slotsToProcess{} @@ -148,17 +175,59 @@ func getBatchProcessRow(db sql.Database, getStartEndSlotStmt string, checkOutRow } slotCh <- sp } + log.WithFields(log.Fields{ + "ErrCount": errCount, + }).Error("The ErrCounter") return errCount } // After a row has been processed it should be removed from its appropriate table. -func removeRowPostProcess(db sql.Database, processCh <-chan slotsToProcess, removeStmt string) error { +func removeRowPostProcess(db sql.Database, processCh <-chan slotsToProcess, checkProcessedStmt, removeStmt string) error { + errCh := make(chan error, 0) for { slots := <-processCh // Make sure the start and end slot exist in the slots table. - _, err := db.Exec(context.Background(), removeStmt, strconv.Itoa(slots.startSlot), slots.endSlot) - if err != nil { - return err + go func() { + finishedProcess := false + for finishedProcess == false { + isStartProcess, err := isSlotProcessed(db, checkProcessedStmt, strconv.Itoa(slots.startSlot)) + if err != nil { + errCh <- err + } + isEndProcess, err := isSlotProcessed(db, checkProcessedStmt, strconv.Itoa(slots.endSlot)) + if err != nil { + errCh <- err + } + if isStartProcess && isEndProcess { + finishedProcess = true + } + } + + _, err := db.Exec(context.Background(), removeStmt, strconv.Itoa(slots.startSlot), strconv.Itoa(slots.endSlot)) + if err != nil { + errCh <- err + } + + }() + if len(errCh) != 0 { + return <-errCh } } } + +// A helper function to check to see if the slot is processed. +func isSlotProcessed(db sql.Database, checkProcessStmt string, slot string) (bool, error) { + processRow, err := db.Exec(context.Background(), checkProcessStmt, slot) + if err != nil { + return false, err + } + row, err := processRow.RowsAffected() + if err != nil { + return false, err + } + + if row > 0 { + return true, nil + } + return false, nil +}