2022-05-24 20:18:55 +00:00
// VulcanizeDB
// Copyright © 2022 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
// This file contains all the code to process historic slots.
package beaconclient
import (
"context"
"fmt"
"strconv"
"time"
"github.com/jackc/pgx/v4"
log "github.com/sirupsen/logrus"
2022-06-09 20:12:08 +00:00
"github.com/vulcanize/ipld-eth-beacon-indexer/pkg/database/sql"
"github.com/vulcanize/ipld-eth-beacon-indexer/pkg/loghelper"
2022-05-24 20:18:55 +00:00
)
var (
	// getHpEntryStmt selects the start_slot and end_slot of a single row from
	// eth_beacon.historic_process that is not checked out, picking the row with
	// the lowest priority value.
	getHpEntryStmt = `SELECT start_slot, end_slot FROM eth_beacon.historic_process
	WHERE checked_out=false
	ORDER BY priority ASC
	LIMIT 1;`
	// checkHpEntryStmt is used to periodically check whether there is any entry in the
	// eth_beacon.historic_process table that is not checked out.
	checkHpEntryStmt = `SELECT * FROM eth_beacon.historic_process WHERE checked_out=false;`
	// lockHpEntryStmt checks out a row of the eth_beacon.historic_process table.
	// Parameters: $1 start_slot, $2 end_slot, $3 value stored in checked_out_by.
	lockHpEntryStmt = `UPDATE eth_beacon.historic_process
	SET checked_out=true, checked_out_by=$3
	WHERE start_slot=$1 AND end_slot=$2;`
	// deleteHpEntryStmt deletes an entry from the eth_beacon.historic_process table.
	// Parameters: $1 start_slot, $2 end_slot.
	deleteHpEntryStmt = `DELETE FROM eth_beacon.historic_process
	WHERE start_slot=$1 AND end_slot=$2;`
	// releaseHpLockStmt "un"-checks-out every row whose checked_out_by equals $1,
	// i.e. every row that the given node has checked out.
	releaseHpLockStmt = `UPDATE eth_beacon.historic_process
	SET checked_out=false, checked_out_by=null
	WHERE checked_out_by=$1`
)
2022-06-08 14:26:27 +00:00
type HistoricProcessing struct {
2022-06-03 16:47:13 +00:00
db sql . Database //db connection
metrics * BeaconClientMetrics // metrics for beaconclient
uniqueNodeIdentifier int // node unique identifier.
2022-05-24 20:18:55 +00:00
}
// Get a single row of historical slots from the table.
2022-06-08 14:26:27 +00:00
func ( hp HistoricProcessing ) getSlotRange ( ctx context . Context , slotCh chan <- slotsToProcess ) [ ] error {
return getBatchProcessRow ( ctx , hp . db , getHpEntryStmt , checkHpEntryStmt , lockHpEntryStmt , slotCh , strconv . Itoa ( hp . uniqueNodeIdentifier ) )
2022-05-24 20:18:55 +00:00
}
// Remove the table entry.
2022-06-08 14:26:27 +00:00
func ( hp HistoricProcessing ) removeTableEntry ( ctx context . Context , processCh <- chan slotsToProcess ) error {
return removeRowPostProcess ( ctx , hp . db , processCh , QueryBySlotStmt , deleteHpEntryStmt )
2022-05-24 20:18:55 +00:00
}
// Remove the table entry.
2022-06-08 14:26:27 +00:00
func ( hp HistoricProcessing ) handleProcessingErrors ( ctx context . Context , errMessages <- chan batchHistoricError ) {
2022-05-24 20:18:55 +00:00
for {
2022-06-08 14:26:27 +00:00
select {
case <- ctx . Done ( ) :
return
case errMs := <- errMessages :
loghelper . LogSlotError ( strconv . Itoa ( errMs . slot ) , errMs . err )
writeKnownGaps ( hp . db , 1 , errMs . slot , errMs . slot , errMs . err , errMs . errProcess , hp . metrics )
}
2022-05-24 20:18:55 +00:00
}
}
2022-06-09 20:12:08 +00:00
// "un"-checkout the rows held by this DB in the eth_beacon.historical_process table.
2022-06-08 14:26:27 +00:00
func ( hp HistoricProcessing ) releaseDbLocks ( cancel context . CancelFunc ) error {
2022-06-09 18:53:35 +00:00
cancel ( )
2022-06-09 20:12:08 +00:00
log . Debug ( "Updating all the entries to eth_beacon.historical processing" )
2022-06-08 14:26:27 +00:00
log . Debug ( "Db: " , hp . db )
log . Debug ( "hp.uniqueNodeIdentifier " , hp . uniqueNodeIdentifier )
2022-06-03 16:47:13 +00:00
res , err := hp . db . Exec ( context . Background ( ) , releaseHpLockStmt , hp . uniqueNodeIdentifier )
if err != nil {
2022-06-09 20:12:08 +00:00
return fmt . Errorf ( "Unable to remove lock from eth_beacon.historical_processing table for node %d, error is %e" , hp . uniqueNodeIdentifier , err )
2022-06-03 16:47:13 +00:00
}
2022-06-09 20:12:08 +00:00
log . Debug ( "Update all the entries to eth_beacon.historical processing" )
2022-06-03 16:47:13 +00:00
rows , err := res . RowsAffected ( )
if err != nil {
2022-06-09 20:12:08 +00:00
return fmt . Errorf ( "Unable to calculated number of rows affected by releasing locks from eth_beacon.historical_processing table for node %d, error is %e" , hp . uniqueNodeIdentifier , err )
2022-06-03 16:47:13 +00:00
}
log . WithField ( "rowCount" , rows ) . Info ( "Released historicalProcess locks for specified rows." )
return nil
}
2022-05-24 20:18:55 +00:00
// Process the slot range.
2022-06-08 14:26:27 +00:00
func processSlotRangeWorker ( ctx context . Context , workCh <- chan int , errCh chan <- batchHistoricError , db sql . Database , serverAddress string , metrics * BeaconClientMetrics , checkDb bool ) {
for {
select {
case <- ctx . Done ( ) :
return
case slot := <- workCh :
log . Debug ( "Handling slot: " , slot )
2022-06-09 18:53:35 +00:00
err , errProcess := handleHistoricSlot ( ctx , db , serverAddress , slot , metrics , checkDb )
2022-06-08 14:26:27 +00:00
if err != nil {
errMs := batchHistoricError {
err : err ,
errProcess : errProcess ,
slot : slot ,
}
errCh <- errMs
2022-06-06 13:02:43 +00:00
}
2022-05-24 20:18:55 +00:00
}
}
}
// A wrapper function that insert the start_slot and end_slot from a single row into a channel.
// It also locks the row by updating the checked_out column.
// The statement for getting the start_slot and end_slot must be provided.
// The statement for "locking" the row must also be provided.
2022-06-08 14:26:27 +00:00
func getBatchProcessRow ( ctx context . Context , db sql . Database , getStartEndSlotStmt string , checkNewRowsStmt string , checkOutRowStmt string , slotCh chan <- slotsToProcess , uniqueNodeIdentifier string ) [ ] error {
2022-05-24 20:18:55 +00:00
errCount := make ( [ ] error , 0 )
// 5 is an arbitrary number. It allows us to retry a few times before
// ending the application.
prevErrCount := 0
for len ( errCount ) < 5 {
2022-06-08 14:26:27 +00:00
select {
case <- ctx . Done ( ) :
return errCount
default :
if len ( errCount ) != prevErrCount {
log . WithFields ( log . Fields {
"errCount" : errCount ,
} ) . Error ( "New error entry added" )
}
processRow , err := db . Exec ( context . Background ( ) , checkNewRowsStmt )
if err != nil {
2022-05-24 20:18:55 +00:00
errCount = append ( errCount , err )
}
2022-06-08 14:26:27 +00:00
row , err := processRow . RowsAffected ( )
if err != nil {
errCount = append ( errCount , err )
2022-05-24 20:18:55 +00:00
}
2022-06-08 14:26:27 +00:00
if row < 1 {
time . Sleep ( 1000 * time . Millisecond )
log . Debug ( "We are checking rows, be patient" )
break
}
log . Debug ( "We found a new row" )
dbCtx := context . Background ( )
2022-05-24 20:18:55 +00:00
2022-06-08 14:26:27 +00:00
// Setup TX
tx , err := db . Begin ( dbCtx )
if err != nil {
loghelper . LogError ( err ) . Error ( "We are unable to Begin a SQL transaction" )
errCount = append ( errCount , err )
break
}
defer func ( ) {
err := tx . Rollback ( dbCtx )
if err != nil && err != pgx . ErrTxClosed {
loghelper . LogError ( err ) . Error ( "We were unable to Rollback a transaction" )
errCount = append ( errCount , err )
}
} ( )
// Query the DB for slots.
sp := slotsToProcess { }
err = tx . QueryRow ( dbCtx , getStartEndSlotStmt ) . Scan ( & sp . startSlot , & sp . endSlot )
if err != nil {
if err == pgx . ErrNoRows {
time . Sleep ( 100 * time . Millisecond )
break
}
loghelper . LogSlotRangeStatementError ( strconv . Itoa ( sp . startSlot ) , strconv . Itoa ( sp . endSlot ) , getStartEndSlotStmt , err ) . Error ( "Unable to get a row" )
errCount = append ( errCount , err )
break
}
// Checkout the Row
res , err := tx . Exec ( dbCtx , checkOutRowStmt , sp . startSlot , sp . endSlot , uniqueNodeIdentifier )
if err != nil {
loghelper . LogSlotRangeStatementError ( strconv . Itoa ( sp . startSlot ) , strconv . Itoa ( sp . endSlot ) , checkOutRowStmt , err ) . Error ( "Unable to checkout the row" )
errCount = append ( errCount , err )
break
}
rows , err := res . RowsAffected ( )
if err != nil {
loghelper . LogSlotRangeStatementError ( strconv . Itoa ( sp . startSlot ) , strconv . Itoa ( sp . endSlot ) , checkOutRowStmt , fmt . Errorf ( "Unable to determine the rows affected when trying to checkout a row." ) )
errCount = append ( errCount , err )
break
}
if rows > 1 {
loghelper . LogSlotRangeStatementError ( strconv . Itoa ( sp . startSlot ) , strconv . Itoa ( sp . endSlot ) , checkOutRowStmt , err ) . WithFields ( log . Fields {
"rowsReturn" : rows ,
} ) . Error ( "We locked too many rows....." )
errCount = append ( errCount , err )
break
}
if rows == 0 {
loghelper . LogSlotRangeStatementError ( strconv . Itoa ( sp . startSlot ) , strconv . Itoa ( sp . endSlot ) , checkOutRowStmt , err ) . WithFields ( log . Fields {
"rowsReturn" : rows ,
} ) . Error ( "We did not lock a single row." )
errCount = append ( errCount , err )
break
}
err = tx . Commit ( dbCtx )
if err != nil {
loghelper . LogSlotRangeError ( strconv . Itoa ( sp . startSlot ) , strconv . Itoa ( sp . endSlot ) , err ) . Error ( "Unable commit transactions." )
errCount = append ( errCount , err )
break
}
log . WithField ( "slots" , sp ) . Debug ( "Added a new slots to be processed" )
slotCh <- sp
2022-05-24 20:18:55 +00:00
}
}
log . WithFields ( log . Fields {
"ErrCount" : errCount ,
} ) . Error ( "The ErrCounter" )
return errCount
}
// After a row has been processed it should be removed from its appropriate table.
2022-06-08 14:26:27 +00:00
func removeRowPostProcess ( ctx context . Context , db sql . Database , processCh <- chan slotsToProcess , checkProcessedStmt , removeStmt string ) error {
2022-05-24 20:18:55 +00:00
errCh := make ( chan error )
for {
2022-06-08 14:26:27 +00:00
select {
case <- ctx . Done ( ) :
return nil
case slots := <- processCh :
// Make sure the start and end slot exist in the slots table.
go func ( ) {
finishedProcess := false
for ! finishedProcess {
isStartProcess , err := isSlotProcessed ( db , checkProcessedStmt , strconv . Itoa ( slots . startSlot ) )
if err != nil {
errCh <- err
}
isEndProcess , err := isSlotProcessed ( db , checkProcessedStmt , strconv . Itoa ( slots . endSlot ) )
if err != nil {
errCh <- err
}
if isStartProcess && isEndProcess {
finishedProcess = true
}
2022-05-24 20:18:55 +00:00
}
2022-06-08 14:26:27 +00:00
_ , err := db . Exec ( context . Background ( ) , removeStmt , strconv . Itoa ( slots . startSlot ) , strconv . Itoa ( slots . endSlot ) )
2022-05-24 20:18:55 +00:00
if err != nil {
errCh <- err
}
2022-06-08 14:26:27 +00:00
} ( )
if len ( errCh ) != 0 {
return <- errCh
2022-05-24 20:18:55 +00:00
}
}
}
}