2022-05-18 16:12:54 +00:00
// VulcanizeDB
// Copyright © 2022 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
2022-05-06 15:03:15 +00:00
package beaconclient
import (
"context"
"fmt"
"strconv"
2022-06-06 13:02:43 +00:00
"github.com/jackc/pgx/v4"
2022-05-06 15:03:15 +00:00
log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
2022-05-24 20:18:55 +00:00
"golang.org/x/sync/errgroup"
2022-05-06 15:03:15 +00:00
)
// SQL statements used by the beacon client to read from and write to the database.
// Placeholders are PostgreSQL positional parameters ($1, $2, ...), which must be
// written without any whitespace between the dollar sign and the number.
var (
	// Statement to upsert to the ethcl.slots table.
	UpsertSlotsStmt string = `
INSERT INTO ethcl.slots (epoch, slot, block_root, state_root, status)
VALUES ($1, $2, $3, $4, $5) ON CONFLICT (slot, block_root) DO NOTHING`
	// Statement to upsert to the ethcl.signed_beacon_block table.
	UpsertSignedBeaconBlockStmt string = `
INSERT INTO ethcl.signed_beacon_block (slot, block_root, parent_block_root, eth1_block_hash, mh_key)
VALUES ($1, $2, $3, $4, $5) ON CONFLICT (slot, block_root) DO NOTHING`
	// Statement to upsert to the ethcl.beacon_state table.
	UpsertBeaconState string = `
INSERT INTO ethcl.beacon_state (slot, state_root, mh_key)
VALUES ($1, $2, $3) ON CONFLICT (slot, state_root) DO NOTHING`
	// Statement to upsert to the public.blocks table.
	UpsertBlocksStmt string = `
INSERT INTO public.blocks (key, data)
VALUES ($1, $2) ON CONFLICT (key) DO NOTHING`
	// Mark every row of a slot whose block_root differs from the provided one as forked.
	UpdateForkedStmt string = `UPDATE ethcl.slots
SET status='forked'
WHERE slot=$1 AND block_root<>$2
RETURNING block_root;`
	// Mark the row matching the provided slot and block_root as proposed.
	UpdateProposedStmt string = `UPDATE ethcl.slots
SET status='proposed'
WHERE slot=$1 AND block_root=$2
RETURNING block_root;`
	// Check to see if the slot and block_root exist in ethcl.slots.
	CheckProposedStmt string = `SELECT slot, block_root
FROM ethcl.slots
WHERE slot=$1 AND block_root=$2;`
	// Check to see if the slot and block_root exist in ethcl.signed_beacon_block
	CheckSignedBeaconBlockStmt string = `SELECT slot, block_root
FROM ethcl.signed_beacon_block
WHERE slot=$1 AND block_root=$2`
	// Check to see if the slot and state_root exist in ethcl.beacon_state
	CheckBeaconStateStmt string = `SELECT slot, state_root
FROM ethcl.beacon_state
WHERE slot=$1 AND state_root=$2`
	// Used to get a single slot from the table if it exists
	QueryBySlotStmt string = `SELECT slot
FROM ethcl.slots
WHERE slot=$1`
	// Statement to insert known_gaps. We don't pass in timestamp, we let the server take care of that one.
	UpsertKnownGapsStmt string = `
INSERT INTO ethcl.known_gaps (start_slot, end_slot, checked_out, reprocessing_error, entry_error, entry_process)
VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (start_slot, end_slot) DO NOTHING`
	// Statement to record a reprocessing error on an existing known_gaps row and bump its priority.
	// Despite the name this is a plain UPDATE; it does not insert a row when none matches.
	UpsertKnownGapsErrorStmt string = `
UPDATE ethcl.known_gaps
SET reprocessing_error=$3, priority=priority+1
WHERE start_slot=$1 AND end_slot=$2;`
	// Get the highest slot if one exists
	QueryHighestSlotStmt string = "SELECT COALESCE(MAX(slot), 0) FROM ethcl.slots"
)
// Put all functionality to prepare the write object
// And write it in this file.
// Remove any of it from the processslot file.
type DatabaseWriter struct {
Db sql . Database
2022-06-06 13:02:43 +00:00
Tx sql . Tx
Ctx context . Context
2022-05-12 13:52:13 +00:00
Metrics * BeaconClientMetrics
2022-05-06 15:03:15 +00:00
DbSlots * DbSlots
DbSignedBeaconBlock * DbSignedBeaconBlock
DbBeaconState * DbBeaconState
rawBeaconState [ ] byte
rawSignedBeaconBlock [ ] byte
}
2022-05-13 14:46:13 +00:00
func CreateDatabaseWrite ( db sql . Database , slot int , stateRoot string , blockRoot string , parentBlockRoot string ,
eth1BlockHash string , status string , rawSignedBeaconBlock [ ] byte , rawBeaconState [ ] byte , metrics * BeaconClientMetrics ) ( * DatabaseWriter , error ) {
2022-06-06 13:02:43 +00:00
ctx := context . Background ( )
tx , err := db . Begin ( ctx )
if err != nil {
loghelper . LogError ( err ) . Error ( "We are unable to Begin a SQL transaction" )
}
2022-05-06 15:03:15 +00:00
dw := & DatabaseWriter {
2022-05-13 14:46:13 +00:00
Db : db ,
2022-06-06 13:02:43 +00:00
Tx : tx ,
Ctx : ctx ,
2022-05-13 14:46:13 +00:00
rawBeaconState : rawBeaconState ,
rawSignedBeaconBlock : rawSignedBeaconBlock ,
Metrics : metrics ,
2022-05-06 15:03:15 +00:00
}
dw . prepareSlotsModel ( slot , stateRoot , blockRoot , status )
2022-06-06 13:02:43 +00:00
err = dw . prepareSignedBeaconBlockModel ( slot , blockRoot , parentBlockRoot , eth1BlockHash )
2022-05-13 14:46:13 +00:00
if err != nil {
return nil , err
}
err = dw . prepareBeaconStateModel ( slot , stateRoot )
if err != nil {
return nil , err
}
return dw , err
2022-05-06 15:03:15 +00:00
}
// Write functions to write each all together...
// Should I do one atomic write?
// prepareSlotsModel builds the ethcl.slots row model and stores it on the writer.
// The epoch is derived from the slot via calculateEpoch and bcSlotsPerEpoch, and
// the slot number is stored in its decimal string form.
func (dw *DatabaseWriter) prepareSlotsModel(slot int, stateRoot string, blockRoot string, status string) {
	slotsModel := DbSlots{
		Epoch:     calculateEpoch(slot, bcSlotsPerEpoch),
		Slot:      strconv.Itoa(slot),
		StateRoot: stateRoot,
		BlockRoot: blockRoot,
		Status:    status,
	}
	dw.DbSlots = &slotsModel
	log.Debug("dw.DbSlots: ", dw.DbSlots)
}
// Create the model for the ethcl.signed_beacon_block table.
2022-05-13 14:46:13 +00:00
func ( dw * DatabaseWriter ) prepareSignedBeaconBlockModel ( slot int , blockRoot string , parentBlockRoot string , eth1BlockHash string ) error {
mhKey , err := MultihashKeyFromSSZRoot ( [ ] byte ( dw . DbSlots . BlockRoot ) )
if err != nil {
return err
}
2022-05-06 15:03:15 +00:00
dw . DbSignedBeaconBlock = & DbSignedBeaconBlock {
2022-05-12 19:44:05 +00:00
Slot : strconv . Itoa ( slot ) ,
BlockRoot : blockRoot ,
ParentBlock : parentBlockRoot ,
Eth1BlockHash : eth1BlockHash ,
2022-05-13 14:46:13 +00:00
MhKey : mhKey ,
2022-05-06 15:03:15 +00:00
}
log . Debug ( "dw.DbSignedBeaconBlock: " , dw . DbSignedBeaconBlock )
2022-05-13 14:46:13 +00:00
return nil
2022-05-06 15:03:15 +00:00
}
// Create the model for the ethcl.beacon_state table.
2022-05-13 14:46:13 +00:00
func ( dw * DatabaseWriter ) prepareBeaconStateModel ( slot int , stateRoot string ) error {
mhKey , err := MultihashKeyFromSSZRoot ( [ ] byte ( dw . DbSlots . StateRoot ) )
if err != nil {
return err
}
2022-05-06 15:03:15 +00:00
dw . DbBeaconState = & DbBeaconState {
Slot : strconv . Itoa ( slot ) ,
StateRoot : stateRoot ,
2022-05-13 14:46:13 +00:00
MhKey : mhKey ,
2022-05-06 15:03:15 +00:00
}
log . Debug ( "dw.DbBeaconState: " , dw . DbBeaconState )
2022-05-13 14:46:13 +00:00
return nil
2022-05-06 15:03:15 +00:00
}
2022-06-06 13:02:43 +00:00
// Add all the data for a given slot to a SQL transaction.
// Originally it wrote to each table individually.
func ( dw * DatabaseWriter ) transactFullSlot ( ) error {
2022-05-06 15:03:15 +00:00
// If an error occurs, write to knownGaps table.
2022-05-17 20:05:15 +00:00
log . WithFields ( log . Fields {
"slot" : dw . DbSlots . Slot ,
} ) . Debug ( "Starting to write to the DB." )
2022-06-06 13:02:43 +00:00
err := dw . transactSlots ( )
2022-05-12 19:44:05 +00:00
if err != nil {
2022-05-24 20:18:55 +00:00
loghelper . LogSlotError ( dw . DbSlots . Slot , err ) . Error ( "We couldn't write to the ethcl.slots table..." )
2022-05-12 19:44:05 +00:00
return err
}
2022-05-24 20:18:55 +00:00
log . Debug ( "We finished writing to the ethcl.slots table." )
2022-05-13 14:46:13 +00:00
if dw . DbSlots . Status != "skipped" {
2022-06-06 13:02:43 +00:00
//errG, _ := errgroup.WithContext(context.Background())
//errG.Go(func() error {
// return dw.transactSignedBeaconBlocks()
//})
//errG.Go(func() error {
// return dw.transactBeaconState()
//})
//if err := errG.Wait(); err != nil {
// loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("We couldn't write to the ethcl block or state table...")
// return err
//}
// Might want to seperate writing to public.blocks so we can do this concurrently...
err := dw . transactSignedBeaconBlocks ( )
if err != nil {
loghelper . LogSlotError ( dw . DbSlots . Slot , err ) . Error ( "We couldn't write to the ethcl block table..." )
return err
}
err = dw . transactBeaconState ( )
if err != nil {
loghelper . LogSlotError ( dw . DbSlots . Slot , err ) . Error ( "We couldn't write to the ethcl state table..." )
2022-05-13 14:46:13 +00:00
return err
}
2022-05-12 19:44:05 +00:00
}
2022-05-24 20:18:55 +00:00
dw . Metrics . IncrementSlotInserts ( 1 )
2022-05-12 19:44:05 +00:00
return nil
2022-05-06 15:03:15 +00:00
}
2022-06-06 13:02:43 +00:00
// Add data for the ethcl.slots table to a transaction. For now this is only one function.
2022-05-06 15:03:15 +00:00
// But in the future if we need to incorporate any FK's or perform any actions to write to the
// slots table we can do it all here.
2022-06-06 13:02:43 +00:00
func ( dw * DatabaseWriter ) transactSlots ( ) error {
2022-05-12 19:44:05 +00:00
return dw . upsertSlots ( )
2022-05-06 15:03:15 +00:00
}
// Upsert to the ethcl.slots table.
2022-05-12 19:44:05 +00:00
func ( dw * DatabaseWriter ) upsertSlots ( ) error {
2022-06-06 13:02:43 +00:00
_ , err := dw . Tx . Exec ( dw . Ctx , UpsertSlotsStmt , dw . DbSlots . Epoch , dw . DbSlots . Slot , dw . DbSlots . BlockRoot , dw . DbSlots . StateRoot , dw . DbSlots . Status )
2022-05-06 15:03:15 +00:00
if err != nil {
loghelper . LogSlotError ( dw . DbSlots . Slot , err ) . Error ( "Unable to write to the slot to the ethcl.slots table" )
2022-05-12 19:44:05 +00:00
return err
2022-05-06 15:03:15 +00:00
}
2022-05-12 19:44:05 +00:00
return nil
2022-05-06 15:03:15 +00:00
}
2022-06-06 13:02:43 +00:00
// Add the information for the signed_beacon_block to a transaction.
func ( dw * DatabaseWriter ) transactSignedBeaconBlocks ( ) error {
2022-05-12 19:44:05 +00:00
err := dw . upsertPublicBlocks ( dw . DbSignedBeaconBlock . MhKey , dw . rawSignedBeaconBlock )
if err != nil {
return err
}
err = dw . upsertSignedBeaconBlock ( )
if err != nil {
return err
}
return nil
2022-05-06 15:03:15 +00:00
}
// Upsert to public.blocks.
2022-05-12 19:44:05 +00:00
func ( dw * DatabaseWriter ) upsertPublicBlocks ( key string , data [ ] byte ) error {
2022-06-06 13:02:43 +00:00
_ , err := dw . Tx . Exec ( dw . Ctx , UpsertBlocksStmt , key , data )
2022-05-06 15:03:15 +00:00
if err != nil {
loghelper . LogSlotError ( dw . DbSlots . Slot , err ) . Error ( "Unable to write to the slot to the public.blocks table" )
2022-05-12 19:44:05 +00:00
return err
2022-05-06 15:03:15 +00:00
}
2022-05-12 19:44:05 +00:00
return nil
2022-05-06 15:03:15 +00:00
}
// Upsert to the ethcl.signed_beacon_block table.
2022-05-12 19:44:05 +00:00
func ( dw * DatabaseWriter ) upsertSignedBeaconBlock ( ) error {
2022-06-06 13:02:43 +00:00
_ , err := dw . Tx . Exec ( dw . Ctx , UpsertSignedBeaconBlockStmt , dw . DbSignedBeaconBlock . Slot , dw . DbSignedBeaconBlock . BlockRoot , dw . DbSignedBeaconBlock . ParentBlock , dw . DbSignedBeaconBlock . Eth1BlockHash , dw . DbSignedBeaconBlock . MhKey )
2022-05-06 15:03:15 +00:00
if err != nil {
2022-05-12 13:52:13 +00:00
loghelper . LogSlotError ( dw . DbSlots . Slot , err ) . WithFields ( log . Fields { "block_root" : dw . DbSignedBeaconBlock . BlockRoot } ) . Error ( "Unable to write to the slot to the ethcl.signed_beacon_block table" )
2022-05-12 19:44:05 +00:00
return err
2022-05-06 15:03:15 +00:00
}
2022-05-12 19:44:05 +00:00
return nil
2022-05-06 15:03:15 +00:00
}
2022-06-06 13:02:43 +00:00
// Add the information for the beacon_state to a transaction.
func ( dw * DatabaseWriter ) transactBeaconState ( ) error {
2022-05-12 19:44:05 +00:00
err := dw . upsertPublicBlocks ( dw . DbBeaconState . MhKey , dw . rawBeaconState )
if err != nil {
return err
}
err = dw . upsertBeaconState ( )
if err != nil {
return err
}
return nil
2022-05-06 15:03:15 +00:00
}
// Upsert to the ethcl.beacon_state table.
2022-05-12 19:44:05 +00:00
func ( dw * DatabaseWriter ) upsertBeaconState ( ) error {
2022-06-06 13:02:43 +00:00
_ , err := dw . Tx . Exec ( dw . Ctx , UpsertBeaconState , dw . DbBeaconState . Slot , dw . DbBeaconState . StateRoot , dw . DbBeaconState . MhKey )
2022-05-06 15:03:15 +00:00
if err != nil {
2022-05-12 13:52:13 +00:00
loghelper . LogSlotError ( dw . DbSlots . Slot , err ) . Error ( "Unable to write to the slot to the ethcl.beacon_state table" )
2022-05-12 19:44:05 +00:00
return err
2022-05-06 15:03:15 +00:00
}
2022-05-12 19:44:05 +00:00
return nil
2022-05-06 15:03:15 +00:00
}
2022-06-06 13:02:43 +00:00
// Update a given slot to be marked as forked within a transaction. Provide the slot and the latest latestBlockRoot.
2022-05-06 15:03:15 +00:00
// We will mark all entries for the given slot that don't match the provided latestBlockRoot as forked.
2022-06-06 13:02:43 +00:00
func transactReorgs ( tx sql . Tx , ctx context . Context , slot string , latestBlockRoot string , metrics * BeaconClientMetrics ) {
2022-05-12 19:44:05 +00:00
slotNum , strErr := strconv . Atoi ( slot )
if strErr != nil {
loghelper . LogReorgError ( slot , latestBlockRoot , strErr ) . Error ( "We can't convert the slot to an int..." )
}
2022-06-06 13:02:43 +00:00
forkCount , err := updateForked ( tx , ctx , slot , latestBlockRoot )
2022-05-06 15:03:15 +00:00
if err != nil {
2022-05-12 13:52:13 +00:00
loghelper . LogReorgError ( slot , latestBlockRoot , err ) . Error ( "We ran into some trouble while updating all forks." )
2022-06-06 13:02:43 +00:00
transactKnownGaps ( tx , ctx , 1 , slotNum , slotNum , err , "reorg" , metrics )
2022-05-12 13:52:13 +00:00
}
2022-06-06 13:02:43 +00:00
proposedCount , err := updateProposed ( tx , ctx , slot , latestBlockRoot )
2022-05-12 13:52:13 +00:00
if err != nil {
loghelper . LogReorgError ( slot , latestBlockRoot , err ) . Error ( "We ran into some trouble while trying to update the proposed slot." )
2022-06-06 13:02:43 +00:00
transactKnownGaps ( tx , ctx , 1 , slotNum , slotNum , err , "reorg" , metrics )
2022-05-12 13:52:13 +00:00
}
if forkCount > 0 {
loghelper . LogReorg ( slot , latestBlockRoot ) . WithFields ( log . Fields {
"forkCount" : forkCount ,
} ) . Info ( "Updated rows that were forked." )
} else {
loghelper . LogReorg ( slot , latestBlockRoot ) . WithFields ( log . Fields {
"forkCount" : forkCount ,
} ) . Warn ( "There were no forked rows to update." )
}
if proposedCount == 1 {
loghelper . LogReorg ( slot , latestBlockRoot ) . WithFields ( log . Fields {
"proposedCount" : proposedCount ,
} ) . Info ( "Updated the row that should have been marked as proposed." )
} else if proposedCount > 1 {
loghelper . LogReorg ( slot , latestBlockRoot ) . WithFields ( log . Fields {
"proposedCount" : proposedCount ,
} ) . Error ( "Too many rows were marked as proposed!" )
2022-06-06 13:02:43 +00:00
transactKnownGaps ( tx , ctx , 1 , slotNum , slotNum , fmt . Errorf ( "Too many rows were marked as unproposed." ) , "reorg" , metrics )
2022-05-12 13:52:13 +00:00
} else if proposedCount == 0 {
2022-06-06 13:02:43 +00:00
transactKnownGaps ( tx , ctx , 1 , slotNum , slotNum , fmt . Errorf ( "Unable to find properly proposed row in DB" ) , "reorg" , metrics )
loghelper . LogReorg ( slot , latestBlockRoot ) . Info ( "Updated the row that should have been marked as proposed." )
2022-05-12 13:52:13 +00:00
}
2022-05-24 20:18:55 +00:00
metrics . IncrementReorgsInsert ( 1 )
2022-05-12 13:52:13 +00:00
}
2022-06-06 13:02:43 +00:00
// writeReorgs is a wrapper around transactReorgs: it begins a transaction on db,
// applies the reorg updates for slot and latestBlockRoot inside it, and commits.
//
// Failing to begin or to commit the transaction is logged at Fatal level. The
// deferred Rollback is a safety net; pgx.ErrTxClosed is ignored there since that
// is what Rollback reports once the transaction has already been closed.
func writeReorgs(db sql.Database, slot string, latestBlockRoot string, metrics *BeaconClientMetrics) {
	ctx := context.Background()

	tx, beginErr := db.Begin(ctx)
	if beginErr != nil {
		loghelper.LogReorgError(slot, latestBlockRoot, beginErr).Fatal("Unable to create a new transaction for reorgs")
	}
	defer func() {
		rollbackErr := tx.Rollback(ctx)
		if rollbackErr == nil || rollbackErr == pgx.ErrTxClosed {
			return
		}
		loghelper.LogError(rollbackErr).Error("We were unable to Rollback a transaction for reorgs")
	}()

	transactReorgs(tx, ctx, slot, latestBlockRoot, metrics)

	commitErr := tx.Commit(ctx)
	if commitErr != nil {
		loghelper.LogReorgError(slot, latestBlockRoot, commitErr).Fatal("Unable to execute the transaction for reorgs")
	}
}
2022-05-12 13:52:13 +00:00
// Update the slots table by marking the old slot's as forked.
2022-06-06 13:02:43 +00:00
func updateForked ( tx sql . Tx , ctx context . Context , slot string , latestBlockRoot string ) ( int64 , error ) {
res , err := tx . Exec ( ctx , UpdateForkedStmt , slot , latestBlockRoot )
2022-05-12 13:52:13 +00:00
if err != nil {
loghelper . LogReorgError ( slot , latestBlockRoot , err ) . Error ( "We are unable to update the ethcl.slots table with the forked slots" )
return 0 , err
}
count , err := res . RowsAffected ( )
if err != nil {
loghelper . LogReorgError ( slot , latestBlockRoot , err ) . Error ( "Unable to figure out how many entries were marked as forked." )
return 0 , err
}
return count , err
}
2022-06-06 13:02:43 +00:00
// Mark a slot as proposed.
func updateProposed ( tx sql . Tx , ctx context . Context , slot string , latestBlockRoot string ) ( int64 , error ) {
res , err := tx . Exec ( ctx , UpdateProposedStmt , slot , latestBlockRoot )
2022-05-12 13:52:13 +00:00
if err != nil {
loghelper . LogReorgError ( slot , latestBlockRoot , err ) . Error ( "We are unable to update the ethcl.slots table with the proposed slot." )
2022-05-06 15:03:15 +00:00
return 0 , err
}
count , err := res . RowsAffected ( )
if err != nil {
2022-05-12 13:52:13 +00:00
loghelper . LogReorgError ( slot , latestBlockRoot , err ) . Error ( "Unable to figure out how many entries were marked as proposed" )
2022-05-06 15:03:15 +00:00
return 0 , err
}
2022-05-12 13:52:13 +00:00
return count , err
2022-05-06 15:03:15 +00:00
}
2022-06-06 13:02:43 +00:00
// A wrapper function to call upsertKnownGaps. This function will break down the range of known_gaps into
2022-05-12 19:44:05 +00:00
// smaller chunks. For example, instead of having an entry of 1-101, if we increment the entries by 10 slots, we would
// have 10 entries as follows: 1-10, 11-20, etc...
2022-06-06 13:02:43 +00:00
func transactKnownGaps ( tx sql . Tx , ctx context . Context , tableIncrement int , startSlot int , endSlot int , entryError error , entryProcess string , metric * BeaconClientMetrics ) {
var entryErrorMsg string
if entryError == nil {
entryErrorMsg = ""
} else {
entryErrorMsg = entryError . Error ( )
}
2022-05-12 19:44:05 +00:00
if endSlot - startSlot <= tableIncrement {
kgModel := DbKnownGaps {
StartSlot : strconv . Itoa ( startSlot ) ,
EndSlot : strconv . Itoa ( endSlot ) ,
CheckedOut : false ,
ReprocessingError : "" ,
2022-06-06 13:02:43 +00:00
EntryError : entryErrorMsg ,
2022-05-12 19:44:05 +00:00
EntryProcess : entryProcess ,
}
2022-06-06 13:02:43 +00:00
upsertKnownGaps ( tx , ctx , kgModel , metric )
2022-05-17 20:05:15 +00:00
} else {
totalSlots := endSlot - startSlot
var chunks int
chunks = totalSlots / tableIncrement
if totalSlots % tableIncrement != 0 {
chunks = chunks + 1
2022-05-12 19:44:05 +00:00
}
2022-05-17 20:05:15 +00:00
for i := 0 ; i < chunks ; i ++ {
var tempStart , tempEnd int
tempStart = startSlot + ( i * tableIncrement )
if i + 1 == chunks {
tempEnd = endSlot
} else {
tempEnd = startSlot + ( ( i + 1 ) * tableIncrement )
}
kgModel := DbKnownGaps {
StartSlot : strconv . Itoa ( tempStart ) ,
EndSlot : strconv . Itoa ( tempEnd ) ,
CheckedOut : false ,
ReprocessingError : "" ,
2022-06-06 13:02:43 +00:00
EntryError : entryErrorMsg ,
2022-05-17 20:05:15 +00:00
EntryProcess : entryProcess ,
}
2022-06-06 13:02:43 +00:00
upsertKnownGaps ( tx , ctx , kgModel , metric )
2022-05-12 19:44:05 +00:00
}
}
2022-06-06 13:02:43 +00:00
}
2022-05-12 19:44:05 +00:00
2022-06-06 13:02:43 +00:00
// Wrapper function, instead of adding the knownGaps entries to a transaction, it will
// create the transaction and write it.
func writeKnownGaps ( db sql . Database , tableIncrement int , startSlot int , endSlot int , entryError error , entryProcess string , metric * BeaconClientMetrics ) {
ctx := context . Background ( )
tx , err := db . Begin ( ctx )
if err != nil {
loghelper . LogSlotRangeError ( strconv . Itoa ( startSlot ) , strconv . Itoa ( endSlot ) , err ) . Fatal ( "Unable to create a new transaction for knownGaps" )
}
defer func ( ) {
err := tx . Rollback ( ctx )
if err != nil && err != pgx . ErrTxClosed {
loghelper . LogError ( err ) . Error ( "We were unable to Rollback a transaction for reorgs" )
}
} ( )
transactKnownGaps ( tx , ctx , tableIncrement , startSlot , endSlot , entryError , entryProcess , metric )
if err = tx . Commit ( ctx ) ; err != nil {
loghelper . LogSlotRangeError ( strconv . Itoa ( startSlot ) , strconv . Itoa ( endSlot ) , err ) . Fatal ( "Unable to execute the transaction for knownGaps" )
}
2022-05-12 19:44:05 +00:00
}
// A function to upsert a single entry to the ethcl.known_gaps table.
2022-06-06 13:02:43 +00:00
func upsertKnownGaps ( tx sql . Tx , ctx context . Context , knModel DbKnownGaps , metric * BeaconClientMetrics ) {
_ , err := tx . Exec ( ctx , UpsertKnownGapsStmt , knModel . StartSlot , knModel . EndSlot ,
2022-05-12 19:44:05 +00:00
knModel . CheckedOut , knModel . ReprocessingError , knModel . EntryError , knModel . EntryProcess )
if err != nil {
log . WithFields ( log . Fields {
"err" : err ,
"startSlot" : knModel . StartSlot ,
"endSlot" : knModel . EndSlot ,
} ) . Fatal ( "We are unable to write to the ethcl.known_gaps table!!! We will stop the application because of that." )
}
log . WithFields ( log . Fields {
"startSlot" : knModel . StartSlot ,
"endSlot" : knModel . EndSlot ,
} ) . Warn ( "A new gap has been added to the ethcl.known_gaps table." )
2022-05-24 20:18:55 +00:00
metric . IncrementKnownGapsInserts ( 1 )
2022-05-12 19:44:05 +00:00
}
// A function to write the gap between the highest slot in the DB and the first processed slot.
2022-05-17 20:05:15 +00:00
func writeStartUpGaps ( db sql . Database , tableIncrement int , firstSlot int , metric * BeaconClientMetrics ) {
2022-05-12 19:44:05 +00:00
var maxSlot int
err := db . QueryRow ( context . Background ( ) , QueryHighestSlotStmt ) . Scan ( & maxSlot )
if err != nil {
loghelper . LogError ( err ) . Fatal ( "Unable to get the max block from the DB. We must close the application or we might have undetected gaps." )
}
if err != nil {
loghelper . LogError ( err ) . WithFields ( log . Fields {
"maxSlot" : maxSlot ,
} ) . Fatal ( "Unable to get convert max block from DB to int. We must close the application or we might have undetected gaps." )
}
2022-05-17 20:05:15 +00:00
if maxSlot != firstSlot - 1 {
2022-06-09 15:56:24 +00:00
if maxSlot < firstSlot - 1 {
writeKnownGaps ( db , tableIncrement , maxSlot + 1 , firstSlot - 1 , fmt . Errorf ( "" ) , "startup" , metric )
} else {
log . WithFields ( log . Fields {
"maxSlot" : maxSlot ,
"firstSlot" : firstSlot ,
} ) . Warn ( "The maxSlot in the DB is greater than or equal to the first Slot we are processing." )
}
2022-05-17 20:05:15 +00:00
}
2022-05-12 19:44:05 +00:00
}
2022-05-24 20:18:55 +00:00
// A function to update a knownGap range with a reprocessing error.
func updateKnownGapErrors ( db sql . Database , startSlot int , endSlot int , reprocessingErr error , metric * BeaconClientMetrics ) error {
res , err := db . Exec ( context . Background ( ) , UpsertKnownGapsErrorStmt , startSlot , endSlot , reprocessingErr . Error ( ) )
if err != nil {
loghelper . LogSlotRangeError ( strconv . Itoa ( startSlot ) , strconv . Itoa ( endSlot ) , err ) . Error ( "Unable to update reprocessing_error" )
return err
}
row , err := res . RowsAffected ( )
if err != nil {
loghelper . LogSlotRangeError ( strconv . Itoa ( startSlot ) , strconv . Itoa ( endSlot ) , err ) . Error ( "Unable to count rows affected when trying to update reprocessing_error." )
return err
}
if row != 1 {
loghelper . LogSlotRangeError ( strconv . Itoa ( startSlot ) , strconv . Itoa ( endSlot ) , err ) . WithFields ( log . Fields {
"rowCount" : row ,
} ) . Error ( "The rows affected by the upsert for reprocessing_error is not 1." )
2022-06-08 14:26:27 +00:00
metric . IncrementKnownGapsReprocessError ( 1 )
2022-05-24 20:18:55 +00:00
return err
}
2022-06-08 14:26:27 +00:00
metric . IncrementKnownGapsReprocessError ( 1 )
2022-05-24 20:18:55 +00:00
return nil
}
2022-05-06 15:03:15 +00:00
// calculateEpoch returns, as a decimal string, the epoch that slot belongs to:
// the integer quotient of slot divided by slotPerEpoch.
// slotPerEpoch must not be zero.
func calculateEpoch(slot int, slotPerEpoch int) string {
	return strconv.Itoa(slot / slotPerEpoch)
}
2022-05-24 20:18:55 +00:00
// A helper function to check to see if the slot is processed.
func isSlotProcessed ( db sql . Database , checkProcessStmt string , slot string ) ( bool , error ) {
processRow , err := db . Exec ( context . Background ( ) , checkProcessStmt , slot )
if err != nil {
return false , err
}
row , err := processRow . RowsAffected ( )
if err != nil {
return false , err
}
2022-06-06 13:02:43 +00:00
if row > 0 {
return true , nil
}
return false , nil
}
2022-05-24 20:18:55 +00:00
2022-06-06 13:02:43 +00:00
// IsSlotInDb reports whether the slot is fully present in the DB: it returns true
// only when the slot and stateRoot exist in ethcl.beacon_state AND the slot and
// blockRoot exist in ethcl.signed_beacon_block. The two lookups run concurrently.
//
// If either lookup fails, the failure is logged and IsSlotInDb returns false
// together with the first error reported by the group.
func IsSlotInDb(db sql.Database, slot string, blockRoot string, stateRoot string) (bool, error) {
	var (
		isInBeaconState       bool
		isInSignedBeaconBlock bool
	)
	errG, _ := errgroup.WithContext(context.Background())
	// Each goroutine keeps its error in a local variable and writes only to its own
	// result flag, so the two goroutines never touch the same memory.
	errG.Go(func() error {
		found, err := checkSlotAndRoot(db, CheckBeaconStateStmt, slot, stateRoot)
		if err != nil {
			loghelper.LogError(err).Error("Unable to check if the slot and stateroot exist in ethcl.beacon_state")
			return err
		}
		isInBeaconState = found
		return nil
	})
	errG.Go(func() error {
		found, err := checkSlotAndRoot(db, CheckSignedBeaconBlockStmt, slot, blockRoot)
		if err != nil {
			loghelper.LogError(err).Error("Unable to check if the slot and block_root exist in ethcl.signed_beacon_block")
			return err
		}
		isInSignedBeaconBlock = found
		return nil
	})
	// Wait returns only after both goroutines have finished, so reading the flags
	// afterwards is safe.
	if err := errG.Wait(); err != nil {
		return false, err
	}
	return isInBeaconState && isInSignedBeaconBlock, nil
}
// Provide a statement, slot, and root, and this function will check to see
// if the slot and root exist in the table.
func checkSlotAndRoot ( db sql . Database , statement , slot , root string ) ( bool , error ) {
processRow , err := db . Exec ( context . Background ( ) , statement , slot , root )
if err != nil {
return false , err
}
row , err := processRow . RowsAffected ( )
if err != nil {
return false , err
}
2022-05-24 20:18:55 +00:00
if row > 0 {
return true , nil
}
return false , nil
}