Seperate checking for new entries and locking them

This commit is contained in:
Abdul Rabbani 2022-05-23 14:29:25 -04:00
parent bf21c0ac35
commit 91ed2fefe6
5 changed files with 93 additions and 18 deletions

View File

@ -514,10 +514,9 @@ func queryDbSlotAndBlock(db sql.Database, querySlot string, queryBlockRoot strin
var epoch, slot int var epoch, slot int
var blockRoot, stateRoot, status string var blockRoot, stateRoot, status string
log.Debug("Starting to query the ethcl.slots table, ", querySlot, " ", queryBlockRoot) log.Debug("Starting to query the ethcl.slots table, ", querySlot, " ", queryBlockRoot)
row := db.QueryRow(context.Background(), sqlStatement, querySlot, queryBlockRoot) err := db.QueryRow(context.Background(), sqlStatement, querySlot, queryBlockRoot).Scan(&epoch, &slot, &blockRoot, &stateRoot, &status)
log.Debug("Querying the ethcl.slots table complete")
err := row.Scan(&epoch, &slot, &blockRoot, &stateRoot, &status)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
log.Debug("Querying the ethcl.slots table complete")
return epoch, slot, blockRoot, stateRoot, status return epoch, slot, blockRoot, stateRoot, status
} }
@ -668,6 +667,7 @@ func (tbc TestBeaconNode) SetupBeaconNodeMock(TestEvents map[string]Message, pro
return httpmock.NewBytesResponse(200, dat), nil return httpmock.NewBytesResponse(200, dat), nil
}, },
) )
// Not needed but could be useful to have.
blockRootUrl := `=~^` + protocol + "://" + address + ":" + strconv.Itoa(port) + "/eth/v1/beacon/blocks/" + `([^/]+)` + "/root" blockRootUrl := `=~^` + protocol + "://" + address + ":" + strconv.Itoa(port) + "/eth/v1/beacon/blocks/" + `([^/]+)` + "/root"
httpmock.RegisterResponder("GET", blockRootUrl, httpmock.RegisterResponder("GET", blockRootUrl,
func(req *http.Request) (*http.Response, error) { func(req *http.Request) (*http.Response, error) {
@ -683,6 +683,7 @@ func (tbc TestBeaconNode) SetupBeaconNodeMock(TestEvents map[string]Message, pro
) )
} }
// Provide the Block root
func (tbc TestBeaconNode) provideBlockRoot(slot string) ([]byte, error) { func (tbc TestBeaconNode) provideBlockRoot(slot string) ([]byte, error) {
for _, val := range tbc.TestEvents { for _, val := range tbc.TestEvents {
@ -702,12 +703,12 @@ func (tbc TestBeaconNode) provideSsz(slotIdentifier string, sszIdentifier string
for _, val := range tbc.TestEvents { for _, val := range tbc.TestEvents {
if sszIdentifier == "state" { if sszIdentifier == "state" {
if val.HeadMessage.Slot == slotIdentifier || val.HeadMessage.State == slotIdentifier { if (val.HeadMessage.Slot == slotIdentifier && val.MimicConfig == nil) || val.HeadMessage.State == slotIdentifier {
slotFile = val.BeaconState slotFile = val.BeaconState
Message = val Message = val
} }
} else if sszIdentifier == "block" { } else if sszIdentifier == "block" {
if val.HeadMessage.Slot == slotIdentifier || val.HeadMessage.Block == slotIdentifier { if (val.HeadMessage.Slot == slotIdentifier && val.MimicConfig == nil) || val.HeadMessage.Block == slotIdentifier {
slotFile = val.SignedBeaconBlock slotFile = val.SignedBeaconBlock
Message = val Message = val
} }

View File

@ -24,7 +24,8 @@ var _ = Describe("Capturehistoric", func() {
BeaconNodeTester.writeEventToHistoricProcess(bc, 100, 101, 10) BeaconNodeTester.writeEventToHistoricProcess(bc, 100, 101, 10)
BeaconNodeTester.runBatchProcess(bc, 2, 100, 101, 0, 0) BeaconNodeTester.runBatchProcess(bc, 2, 100, 101, 0, 0)
//validateSlot(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, 3, "proposed") time.Sleep(2 * time.Second)
validateSlot(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, 3, "proposed")
//validateSignedBeaconBlock(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, "0x629ae1587895043076500f4f5dcb202a47c2fc95d5b5c548cb83bc97bd2dbfe1", "0x8d3f027beef5cbd4f8b29fc831aba67a5d74768edca529f5596f07fd207865e1", "/blocks/QHVAEQBQGQ4TKNJUGAYDGNZRGM2DOZJSGZTDMMLEG5QTIYTCMRQTKYRSGNTGCMDCGI2WINLGMM2DMNJRGYYGMMTBHEZGINJSME3DGYRZGE4WE") //validateSignedBeaconBlock(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, "0x629ae1587895043076500f4f5dcb202a47c2fc95d5b5c548cb83bc97bd2dbfe1", "0x8d3f027beef5cbd4f8b29fc831aba67a5d74768edca529f5596f07fd207865e1", "/blocks/QHVAEQBQGQ4TKNJUGAYDGNZRGM2DOZJSGZTDMMLEG5QTIYTCMRQTKYRSGNTGCMDCGI2WINLGMM2DMNJRGYYGMMTBHEZGINJSME3DGYRZGE4WE")
//validateBeaconState(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, "/blocks/QHVAEQRQPBTDEOBWMEYDGNZZMMYDGOBWMEZWGN3CMUZDQZBQGVSDQMRZMY4GKYRXMIZDQMDDMM4WKZDFGE2TINBZMFTDEMDFMJRWIMBWME3WCNJW") //validateBeaconState(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, "/blocks/QHVAEQRQPBTDEOBWMEYDGNZZMMYDGOBWMEZWGN3CMUZDQZBQGVSDQMRZMY4GKYRXMIZDQMDDMM4WKZDFGE2TINBZMFTDEMDFMJRWIMBWME3WCNJW")

View File

@ -54,6 +54,10 @@ VALUES ($1, $2) ON CONFLICT (key) DO NOTHING`
CheckProposedStmt string = `SELECT slot, block_root CheckProposedStmt string = `SELECT slot, block_root
FROM ethcl.slots FROM ethcl.slots
WHERE slot=$1 AND block_root=$2;` WHERE slot=$1 AND block_root=$2;`
// Used to get a single slot from the table if it exists
QueryBySlotStmt string = `SELECT slot
FROM ethcl.slots
WHERE slot=$1`
// Statement to insert known_gaps. We don't pass in timestamp, we let the server take care of that one. // Statement to insert known_gaps. We don't pass in timestamp, we let the server take care of that one.
UpsertKnownGapsStmt string = ` UpsertKnownGapsStmt string = `
INSERT INTO ethcl.known_gaps (start_slot, end_slot, checked_out, reprocessing_error, entry_error, entry_process) INSERT INTO ethcl.known_gaps (start_slot, end_slot, checked_out, reprocessing_error, entry_error, entry_process)

View File

@ -25,8 +25,8 @@ import (
var _ = Describe("Healthcheck", func() { var _ = Describe("Healthcheck", func() {
var ( var (
BC = beaconclient.CreateBeaconClient(context.Background(), "http", "localhost", 10, 5052) BC = beaconclient.CreateBeaconClient(context.Background(), "http", "localhost", 5052, 10)
errBc = beaconclient.CreateBeaconClient(context.Background(), "http", "blah-blah", 10, 1010) errBc = beaconclient.CreateBeaconClient(context.Background(), "http", "blah-blah", 1010, 10)
) )
Describe("Connecting to the lighthouse client", Label("integration"), func() { Describe("Connecting to the lighthouse client", Label("integration"), func() {
Context("When the client is running", func() { Context("When the client is running", func() {

View File

@ -32,11 +32,14 @@ import (
var ( var (
// Get a single highest priority and non-checked out row. // Get a single highest priority and non-checked out row.
getBpEntryStmt string = `SELECT start_slot, end_slot FROM ethcl.historic_process getHpEntryStmt string = `SELECT start_slot, end_slot FROM ethcl.historic_process
WHERE checked_out=false WHERE checked_out=false
ORDER BY priority ASC ORDER BY priority ASC
LIMIT 1;` LIMIT 1;`
lockBpEntryStmt string = `UPDATE ethcl.historic_process // Used to periodically check to see if there is a new entry in the ethcl_historic_process table.
checkHpEntryStmt string = `INSERT INTO ethcl.historic_process (start_slot, end_slot) VALUES ($1, $2) ON CONFLICT (start_slot, end_slot) DO NOTHING;`
// Used to get the highest priority row that is not checked out, and to check it out within the ethcl.historic_process table.
lockHpEntryStmt string = `UPDATE ethcl.historic_process
SET checked_out=true SET checked_out=true
WHERE start_slot=$1 AND end_slot=$2;` WHERE start_slot=$1 AND end_slot=$2;`
deleteSlotsEntryStmt string = `DELETE FROM ethcl.historic_process deleteSlotsEntryStmt string = `DELETE FROM ethcl.historic_process
@ -50,18 +53,19 @@ type historicProcessing struct {
// Get a single row of historical slots from the table. // Get a single row of historical slots from the table.
func (hp historicProcessing) getSlotRange(slotCh chan<- slotsToProcess) []error { func (hp historicProcessing) getSlotRange(slotCh chan<- slotsToProcess) []error {
return getBatchProcessRow(hp.db, getBpEntryStmt, lockBpEntryStmt, slotCh) return getBatchProcessRow(hp.db, getHpEntryStmt, checkHpEntryStmt, lockHpEntryStmt, slotCh)
} }
// Remove the table entry. // Remove the table entry.
func (hp historicProcessing) removeTableEntry(processCh <-chan slotsToProcess) error { func (hp historicProcessing) removeTableEntry(processCh <-chan slotsToProcess) error {
return removeRowPostProcess(hp.db, processCh, deleteSlotsEntryStmt) return removeRowPostProcess(hp.db, processCh, QueryBySlotStmt, deleteSlotsEntryStmt)
} }
// Remove the table entry. // Remove the table entry.
func (hp historicProcessing) handleProcessingErrors(errMessages <-chan batchHistoricError) { func (hp historicProcessing) handleProcessingErrors(errMessages <-chan batchHistoricError) {
for { for {
errMs := <-errMessages errMs := <-errMessages
loghelper.LogSlotError(strconv.Itoa(errMs.slot), errMs.err)
writeKnownGaps(hp.db, 1, errMs.slot, errMs.slot, errMs.err, errMs.errProcess, hp.metrics) writeKnownGaps(hp.db, 1, errMs.slot, errMs.slot, errMs.err, errMs.errProcess, hp.metrics)
} }
} }
@ -86,19 +90,42 @@ func processSlotRangeWorker(workCh <-chan int, errCh chan<- batchHistoricError,
// It also locks the row by updating the checked_out column. // It also locks the row by updating the checked_out column.
// The statement for getting the start_slot and end_slot must be provided. // The statement for getting the start_slot and end_slot must be provided.
// The statement for "locking" the row must also be provided. // The statement for "locking" the row must also be provided.
func getBatchProcessRow(db sql.Database, getStartEndSlotStmt string, checkOutRowStmt string, slotCh chan<- slotsToProcess) []error { func getBatchProcessRow(db sql.Database, getStartEndSlotStmt string, checkNewRowsStmt string, checkOutRowStmt string, slotCh chan<- slotsToProcess) []error {
errCount := make([]error, 0) errCount := make([]error, 0)
for len(errCount) < 5 { for len(errCount) < 5 {
processRow, err := db.Exec(context.Background(), checkNewRowsStmt)
if err != nil {
errCount = append(errCount, err)
}
row, err := processRow.RowsAffected()
if err != nil {
errCount = append(errCount, err)
}
if row < 1 {
time.Sleep(100 * time.Millisecond)
continue
}
log.Debug("Found a row, going to start processing.")
log.WithFields(log.Fields{
"ErrCount": errCount,
}).Debug("The ErrCounter")
ctx := context.Background() ctx := context.Background()
// Setup TX // Setup TX
tx, err := db.Begin(ctx) tx, err := db.Begin(ctx)
if err != nil { if err != nil {
loghelper.LogError(err).Error("We are unable to Begin a SQL transaction")
errCount = append(errCount, err) errCount = append(errCount, err)
continue continue
} }
defer tx.Rollback(ctx) defer func() {
err := tx.Rollback(ctx)
if err != nil {
loghelper.LogError(err).Error("We were unable to Rollback a transaction")
errCount = append(errCount, err)
}
}()
// Query the DB for slots. // Query the DB for slots.
sp := slotsToProcess{} sp := slotsToProcess{}
@ -148,17 +175,59 @@ func getBatchProcessRow(db sql.Database, getStartEndSlotStmt string, checkOutRow
} }
slotCh <- sp slotCh <- sp
} }
log.WithFields(log.Fields{
"ErrCount": errCount,
}).Error("The ErrCounter")
return errCount return errCount
} }
// After a row has been processed it should be removed from its appropriate table. // After a row has been processed it should be removed from its appropriate table.
func removeRowPostProcess(db sql.Database, processCh <-chan slotsToProcess, removeStmt string) error { func removeRowPostProcess(db sql.Database, processCh <-chan slotsToProcess, checkProcessedStmt, removeStmt string) error {
errCh := make(chan error, 0)
for { for {
slots := <-processCh slots := <-processCh
// Make sure the start and end slot exist in the slots table. // Make sure the start and end slot exist in the slots table.
_, err := db.Exec(context.Background(), removeStmt, strconv.Itoa(slots.startSlot), slots.endSlot) go func() {
if err != nil { finishedProcess := false
return err for finishedProcess == false {
isStartProcess, err := isSlotProcessed(db, checkProcessedStmt, strconv.Itoa(slots.startSlot))
if err != nil {
errCh <- err
}
isEndProcess, err := isSlotProcessed(db, checkProcessedStmt, strconv.Itoa(slots.endSlot))
if err != nil {
errCh <- err
}
if isStartProcess && isEndProcess {
finishedProcess = true
}
}
_, err := db.Exec(context.Background(), removeStmt, strconv.Itoa(slots.startSlot), strconv.Itoa(slots.endSlot))
if err != nil {
errCh <- err
}
}()
if len(errCh) != 0 {
return <-errCh
} }
} }
} }
// A helper function to check to see if the slot is processed.
func isSlotProcessed(db sql.Database, checkProcessStmt string, slot string) (bool, error) {
processRow, err := db.Exec(context.Background(), checkProcessStmt, slot)
if err != nil {
return false, err
}
row, err := processRow.RowsAffected()
if err != nil {
return false, err
}
if row > 0 {
return true, nil
}
return false, nil
}