2022-05-06 15:03:15 +00:00
package beaconclient
import (
"context"
"fmt"
"strconv"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
)
var (
// Statement to upsert to the ethcl.slots table.
UpsertSlotsStmt string = `
INSERT INTO ethcl . slots ( epoch , slot , block_root , state_root , status )
VALUES ( $ 1 , $ 2 , $ 3 , $ 4 , $ 5 ) ON CONFLICT ( slot , block_root ) DO NOTHING `
// Statement to upsert to the ethcl.signed_beacon_blocks table.
UpsertSignedBeaconBlockStmt string = `
2022-05-12 19:44:05 +00:00
INSERT INTO ethcl . signed_beacon_block ( slot , block_root , parent_block_root , eth1_block_hash , mh_key )
VALUES ( $ 1 , $ 2 , $ 3 , $ 4 , $ 5 ) ON CONFLICT ( slot , block_root ) DO NOTHING `
2022-05-06 15:03:15 +00:00
// Statement to upsert to the ethcl.beacon_state table.
UpsertBeaconState string = `
INSERT INTO ethcl . beacon_state ( slot , state_root , mh_key )
VALUES ( $ 1 , $ 2 , $ 3 ) ON CONFLICT ( slot , state_root ) DO NOTHING `
// Statement to upsert to the public.blocks table.
UpsertBlocksStmt string = `
INSERT INTO public . blocks ( key , data )
VALUES ( $ 1 , $ 2 ) ON CONFLICT ( key ) DO NOTHING `
2022-05-12 13:52:13 +00:00
UpdateForkedStmt string = ` UPDATE ethcl . slots
SET status = ' forked '
WHERE slot = $ 1 AND block_root < > $ 2
RETURNING block_root ; `
UpdateProposedStmt string = ` UPDATE ethcl . slots
SET status = ' proposed '
WHERE slot = $ 1 AND block_root = $ 2
RETURNING block_root ; `
CheckProposedStmt string = ` SELECT slot , block_root
FROM ethcl . slots
WHERE slot = $ 1 AND block_root = $ 2 ; `
2022-05-12 19:44:05 +00:00
// Statement to insert known_gaps. We don't pass in timestamp, we let the server take care of that one.
UpsertKnownGapsStmt string = `
INSERT INTO ethcl . known_gaps ( start_slot , end_slot , checked_out , reprocessing_error , entry_error , entry_process )
VALUES ( $ 1 , $ 2 , $ 3 , $ 4 , $ 5 , $ 6 ) on CONFLICT ( start_slot , end_slot ) DO NOTHING `
QueryHighestSlotStmt string = "SELECT COALESCE(MAX(slot), 0) FROM ethcl.slots"
2022-05-06 15:03:15 +00:00
)
// Put all functionality to prepare the write object
// And write it in this file.
// Remove any of it from the processslot file.
type DatabaseWriter struct {
Db sql . Database
2022-05-12 13:52:13 +00:00
Metrics * BeaconClientMetrics
2022-05-06 15:03:15 +00:00
DbSlots * DbSlots
DbSignedBeaconBlock * DbSignedBeaconBlock
DbBeaconState * DbBeaconState
rawBeaconState [ ] byte
rawSignedBeaconBlock [ ] byte
}
2022-05-13 13:51:51 +00:00
func CreateDatabaseWrite ( db sql . Database , slot int , stateRoot string , blockRoot string , parentBlockRoot string ,
eth1BlockHash string , status string , rawSignedBeaconBlock [ ] byte , rawBeaconState [ ] byte , metrics * BeaconClientMetrics ) ( * DatabaseWriter , error ) {
2022-05-06 15:03:15 +00:00
dw := & DatabaseWriter {
2022-05-13 13:51:51 +00:00
Db : db ,
rawBeaconState : rawBeaconState ,
rawSignedBeaconBlock : rawSignedBeaconBlock ,
Metrics : metrics ,
2022-05-06 15:03:15 +00:00
}
dw . prepareSlotsModel ( slot , stateRoot , blockRoot , status )
2022-05-13 13:04:54 +00:00
err := dw . prepareSignedBeaconBlockModel ( slot , blockRoot , parentBlockRoot , eth1BlockHash )
if err != nil {
return nil , err
}
err = dw . prepareBeaconStateModel ( slot , stateRoot )
if err != nil {
return nil , err
}
return dw , err
2022-05-06 15:03:15 +00:00
}
// Write functions to write each all together...
// Should I do one atomic write?
// Create the model for the ethcl.slots table
func ( dw * DatabaseWriter ) prepareSlotsModel ( slot int , stateRoot string , blockRoot string , status string ) {
dw . DbSlots = & DbSlots {
Epoch : calculateEpoch ( slot , bcSlotsPerEpoch ) ,
Slot : strconv . Itoa ( slot ) ,
StateRoot : stateRoot ,
BlockRoot : blockRoot ,
Status : status ,
}
log . Debug ( "dw.DbSlots: " , dw . DbSlots )
}
// Create the model for the ethcl.signed_beacon_block table.
2022-05-13 13:04:54 +00:00
func ( dw * DatabaseWriter ) prepareSignedBeaconBlockModel ( slot int , blockRoot string , parentBlockRoot string , eth1BlockHash string ) error {
2022-05-13 14:20:23 +00:00
mhKey , err := MultihashKeyFromSSZRoot ( [ ] byte ( dw . DbSlots . BlockRoot ) )
2022-05-13 13:04:54 +00:00
if err != nil {
return err
}
2022-05-06 15:03:15 +00:00
dw . DbSignedBeaconBlock = & DbSignedBeaconBlock {
2022-05-12 19:44:05 +00:00
Slot : strconv . Itoa ( slot ) ,
BlockRoot : blockRoot ,
ParentBlock : parentBlockRoot ,
Eth1BlockHash : eth1BlockHash ,
2022-05-13 13:04:54 +00:00
MhKey : mhKey ,
2022-05-06 15:03:15 +00:00
}
log . Debug ( "dw.DbSignedBeaconBlock: " , dw . DbSignedBeaconBlock )
2022-05-13 13:04:54 +00:00
return nil
2022-05-06 15:03:15 +00:00
}
// Create the model for the ethcl.beacon_state table.
2022-05-13 13:04:54 +00:00
func ( dw * DatabaseWriter ) prepareBeaconStateModel ( slot int , stateRoot string ) error {
2022-05-13 14:20:23 +00:00
mhKey , err := MultihashKeyFromSSZRoot ( [ ] byte ( dw . DbSlots . StateRoot ) )
2022-05-13 13:04:54 +00:00
if err != nil {
return err
}
2022-05-06 15:03:15 +00:00
dw . DbBeaconState = & DbBeaconState {
Slot : strconv . Itoa ( slot ) ,
StateRoot : stateRoot ,
2022-05-13 13:04:54 +00:00
MhKey : mhKey ,
2022-05-06 15:03:15 +00:00
}
log . Debug ( "dw.DbBeaconState: " , dw . DbBeaconState )
2022-05-13 13:04:54 +00:00
return nil
2022-05-06 15:03:15 +00:00
}
// Write all the data for a given slot.
2022-05-12 19:44:05 +00:00
func ( dw * DatabaseWriter ) writeFullSlot ( ) error {
2022-05-06 15:03:15 +00:00
// Add errors for each function call
// If an error occurs, write to knownGaps table.
2022-05-12 19:44:05 +00:00
err := dw . writeSlots ( )
if err != nil {
return err
}
2022-05-13 14:20:23 +00:00
if dw . DbSlots . Status != "skipped" {
err = dw . writeSignedBeaconBlocks ( )
if err != nil {
return err
}
err = dw . writeBeaconState ( )
if err != nil {
return err
}
2022-05-12 19:44:05 +00:00
}
2022-05-12 13:52:13 +00:00
dw . Metrics . IncrementHeadTrackingInserts ( 1 )
2022-05-12 19:44:05 +00:00
return nil
2022-05-06 15:03:15 +00:00
}
// Write the information for the generic slots table. For now this is only one function.
// But in the future if we need to incorporate any FK's or perform any actions to write to the
// slots table we can do it all here.
2022-05-12 19:44:05 +00:00
func ( dw * DatabaseWriter ) writeSlots ( ) error {
return dw . upsertSlots ( )
2022-05-06 15:03:15 +00:00
}
// Upsert to the ethcl.slots table.
2022-05-12 19:44:05 +00:00
func ( dw * DatabaseWriter ) upsertSlots ( ) error {
2022-05-06 15:03:15 +00:00
_ , err := dw . Db . Exec ( context . Background ( ) , UpsertSlotsStmt , dw . DbSlots . Epoch , dw . DbSlots . Slot , dw . DbSlots . BlockRoot , dw . DbSlots . StateRoot , dw . DbSlots . Status )
if err != nil {
loghelper . LogSlotError ( dw . DbSlots . Slot , err ) . Error ( "Unable to write to the slot to the ethcl.slots table" )
2022-05-12 19:44:05 +00:00
return err
2022-05-06 15:03:15 +00:00
}
2022-05-12 19:44:05 +00:00
return nil
2022-05-06 15:03:15 +00:00
}
// Write the information for the signed_beacon_block.
2022-05-12 19:44:05 +00:00
func ( dw * DatabaseWriter ) writeSignedBeaconBlocks ( ) error {
err := dw . upsertPublicBlocks ( dw . DbSignedBeaconBlock . MhKey , dw . rawSignedBeaconBlock )
if err != nil {
return err
}
err = dw . upsertSignedBeaconBlock ( )
if err != nil {
return err
}
return nil
2022-05-06 15:03:15 +00:00
}
// Upsert to public.blocks.
2022-05-12 19:44:05 +00:00
func ( dw * DatabaseWriter ) upsertPublicBlocks ( key string , data [ ] byte ) error {
2022-05-06 15:03:15 +00:00
_ , err := dw . Db . Exec ( context . Background ( ) , UpsertBlocksStmt , key , data )
if err != nil {
loghelper . LogSlotError ( dw . DbSlots . Slot , err ) . Error ( "Unable to write to the slot to the public.blocks table" )
2022-05-12 19:44:05 +00:00
return err
2022-05-06 15:03:15 +00:00
}
2022-05-12 19:44:05 +00:00
return nil
2022-05-06 15:03:15 +00:00
}
// Upsert to the ethcl.signed_beacon_block table.
2022-05-12 19:44:05 +00:00
func ( dw * DatabaseWriter ) upsertSignedBeaconBlock ( ) error {
_ , err := dw . Db . Exec ( context . Background ( ) , UpsertSignedBeaconBlockStmt , dw . DbSignedBeaconBlock . Slot , dw . DbSignedBeaconBlock . BlockRoot , dw . DbSignedBeaconBlock . ParentBlock , dw . DbSignedBeaconBlock . Eth1BlockHash , dw . DbSignedBeaconBlock . MhKey )
2022-05-06 15:03:15 +00:00
if err != nil {
2022-05-12 13:52:13 +00:00
loghelper . LogSlotError ( dw . DbSlots . Slot , err ) . WithFields ( log . Fields { "block_root" : dw . DbSignedBeaconBlock . BlockRoot } ) . Error ( "Unable to write to the slot to the ethcl.signed_beacon_block table" )
2022-05-12 19:44:05 +00:00
return err
2022-05-06 15:03:15 +00:00
}
2022-05-12 19:44:05 +00:00
return nil
2022-05-06 15:03:15 +00:00
}
// Write the information for the beacon_state.
2022-05-12 19:44:05 +00:00
func ( dw * DatabaseWriter ) writeBeaconState ( ) error {
err := dw . upsertPublicBlocks ( dw . DbBeaconState . MhKey , dw . rawBeaconState )
if err != nil {
return err
}
err = dw . upsertBeaconState ( )
if err != nil {
return err
}
return nil
2022-05-06 15:03:15 +00:00
}
// Upsert to the ethcl.beacon_state table.
2022-05-12 19:44:05 +00:00
func ( dw * DatabaseWriter ) upsertBeaconState ( ) error {
2022-05-06 15:03:15 +00:00
_ , err := dw . Db . Exec ( context . Background ( ) , UpsertBeaconState , dw . DbBeaconState . Slot , dw . DbBeaconState . StateRoot , dw . DbBeaconState . MhKey )
if err != nil {
2022-05-12 13:52:13 +00:00
loghelper . LogSlotError ( dw . DbSlots . Slot , err ) . Error ( "Unable to write to the slot to the ethcl.beacon_state table" )
2022-05-12 19:44:05 +00:00
return err
2022-05-06 15:03:15 +00:00
}
2022-05-12 19:44:05 +00:00
return nil
2022-05-06 15:03:15 +00:00
}
// Update a given slot to be marked as forked. Provide the slot and the latest latestBlockRoot.
// We will mark all entries for the given slot that don't match the provided latestBlockRoot as forked.
2022-05-12 13:52:13 +00:00
func writeReorgs ( db sql . Database , slot string , latestBlockRoot string , metrics * BeaconClientMetrics ) {
2022-05-12 19:44:05 +00:00
slotNum , strErr := strconv . Atoi ( slot )
if strErr != nil {
loghelper . LogReorgError ( slot , latestBlockRoot , strErr ) . Error ( "We can't convert the slot to an int..." )
}
2022-05-12 13:52:13 +00:00
forkCount , err := updateForked ( db , slot , latestBlockRoot )
2022-05-06 15:03:15 +00:00
if err != nil {
2022-05-12 13:52:13 +00:00
loghelper . LogReorgError ( slot , latestBlockRoot , err ) . Error ( "We ran into some trouble while updating all forks." )
2022-05-12 19:44:05 +00:00
writeKnownGaps ( db , 1 , slotNum , slotNum , err , "reorg" )
2022-05-12 13:52:13 +00:00
}
proposedCount , err := updateProposed ( db , slot , latestBlockRoot )
if err != nil {
loghelper . LogReorgError ( slot , latestBlockRoot , err ) . Error ( "We ran into some trouble while trying to update the proposed slot." )
2022-05-12 19:44:05 +00:00
writeKnownGaps ( db , 1 , slotNum , slotNum , err , "reorg" )
2022-05-12 13:52:13 +00:00
}
if forkCount > 0 {
loghelper . LogReorg ( slot , latestBlockRoot ) . WithFields ( log . Fields {
"forkCount" : forkCount ,
} ) . Info ( "Updated rows that were forked." )
} else {
loghelper . LogReorg ( slot , latestBlockRoot ) . WithFields ( log . Fields {
"forkCount" : forkCount ,
} ) . Warn ( "There were no forked rows to update." )
}
if proposedCount == 1 {
loghelper . LogReorg ( slot , latestBlockRoot ) . WithFields ( log . Fields {
"proposedCount" : proposedCount ,
} ) . Info ( "Updated the row that should have been marked as proposed." )
} else if proposedCount > 1 {
loghelper . LogReorg ( slot , latestBlockRoot ) . WithFields ( log . Fields {
"proposedCount" : proposedCount ,
} ) . Error ( "Too many rows were marked as proposed!" )
2022-05-12 19:44:05 +00:00
writeKnownGaps ( db , 1 , slotNum , slotNum , err , "reorg" )
2022-05-12 13:52:13 +00:00
} else if proposedCount == 0 {
var count int
err := db . QueryRow ( context . Background ( ) , CheckProposedStmt , slot , latestBlockRoot ) . Scan ( count )
if err != nil {
loghelper . LogReorgError ( slot , latestBlockRoot , err ) . Error ( "Unable to query proposed rows after reorg." )
2022-05-12 19:44:05 +00:00
writeKnownGaps ( db , 1 , slotNum , slotNum , err , "reorg" )
2022-05-12 13:52:13 +00:00
}
if count != 1 {
loghelper . LogReorg ( slot , latestBlockRoot ) . WithFields ( log . Fields {
"proposedCount" : count ,
} ) . Warn ( "The proposed block was not marked as proposed..." )
2022-05-12 19:44:05 +00:00
writeKnownGaps ( db , 1 , slotNum , slotNum , err , "reorg" )
2022-05-12 13:52:13 +00:00
} else {
loghelper . LogReorg ( slot , latestBlockRoot ) . Info ( "Updated the row that should have been marked as proposed." )
}
}
metrics . IncrementHeadTrackingReorgs ( 1 )
}
// Update the slots table by marking the old slot's as forked.
func updateForked ( db sql . Database , slot string , latestBlockRoot string ) ( int64 , error ) {
res , err := db . Exec ( context . Background ( ) , UpdateForkedStmt , slot , latestBlockRoot )
if err != nil {
loghelper . LogReorgError ( slot , latestBlockRoot , err ) . Error ( "We are unable to update the ethcl.slots table with the forked slots" )
return 0 , err
}
count , err := res . RowsAffected ( )
if err != nil {
loghelper . LogReorgError ( slot , latestBlockRoot , err ) . Error ( "Unable to figure out how many entries were marked as forked." )
return 0 , err
}
return count , err
}
func updateProposed ( db sql . Database , slot string , latestBlockRoot string ) ( int64 , error ) {
res , err := db . Exec ( context . Background ( ) , UpdateProposedStmt , slot , latestBlockRoot )
if err != nil {
loghelper . LogReorgError ( slot , latestBlockRoot , err ) . Error ( "We are unable to update the ethcl.slots table with the proposed slot." )
2022-05-06 15:03:15 +00:00
return 0 , err
}
count , err := res . RowsAffected ( )
if err != nil {
2022-05-12 13:52:13 +00:00
loghelper . LogReorgError ( slot , latestBlockRoot , err ) . Error ( "Unable to figure out how many entries were marked as proposed" )
2022-05-06 15:03:15 +00:00
return 0 , err
}
2022-05-12 13:52:13 +00:00
return count , err
2022-05-06 15:03:15 +00:00
}
2022-05-12 19:44:05 +00:00
// A wrapper function to call upsertKnownGaps. This function will break down the range of known_gaos into
// smaller chunks. For example, instead of having an entry of 1-101, if we increment the entries by 10 slots, we would
// have 10 entries as follows: 1-10, 11-20, etc...
func writeKnownGaps ( db sql . Database , tableIncrement int , startSlot int , endSlot int , entryError error , entryProcess string ) {
if endSlot - startSlot <= tableIncrement {
kgModel := DbKnownGaps {
StartSlot : strconv . Itoa ( startSlot ) ,
EndSlot : strconv . Itoa ( endSlot ) ,
CheckedOut : false ,
ReprocessingError : "" ,
EntryError : entryError . Error ( ) ,
EntryProcess : entryProcess ,
}
upsertKnownGaps ( db , kgModel )
}
totalSlots := endSlot - startSlot
var chunks int
chunks = totalSlots / tableIncrement
if totalSlots % tableIncrement != 0 {
chunks = chunks + 1
}
for i := 0 ; i < chunks ; i ++ {
var tempStart , tempEnd int
tempStart = startSlot + ( i * tableIncrement )
if i + 1 == chunks {
tempEnd = endSlot
} else {
tempEnd = startSlot + ( ( i + 1 ) * tableIncrement )
}
kgModel := DbKnownGaps {
StartSlot : strconv . Itoa ( tempStart ) ,
EndSlot : strconv . Itoa ( tempEnd ) ,
CheckedOut : false ,
ReprocessingError : "" ,
EntryError : entryError . Error ( ) ,
EntryProcess : entryProcess ,
}
upsertKnownGaps ( db , kgModel )
}
}
// A function to upsert a single entry to the ethcl.known_gaps table.
func upsertKnownGaps ( db sql . Database , knModel DbKnownGaps ) {
_ , err := db . Exec ( context . Background ( ) , UpsertKnownGapsStmt , knModel . StartSlot , knModel . EndSlot ,
knModel . CheckedOut , knModel . ReprocessingError , knModel . EntryError , knModel . EntryProcess )
if err != nil {
log . WithFields ( log . Fields {
"err" : err ,
"startSlot" : knModel . StartSlot ,
"endSlot" : knModel . EndSlot ,
} ) . Fatal ( "We are unable to write to the ethcl.known_gaps table!!! We will stop the application because of that." )
}
log . WithFields ( log . Fields {
"startSlot" : knModel . StartSlot ,
"endSlot" : knModel . EndSlot ,
} ) . Warn ( "A new gap has been added to the ethcl.known_gaps table." )
}
// A function to write the gap between the highest slot in the DB and the first processed slot.
func writeStartUpGaps ( db sql . Database , tableIncrement int , firstSlot int ) {
var maxSlot int
err := db . QueryRow ( context . Background ( ) , QueryHighestSlotStmt ) . Scan ( & maxSlot )
if err != nil {
loghelper . LogError ( err ) . Fatal ( "Unable to get the max block from the DB. We must close the application or we might have undetected gaps." )
}
if err != nil {
loghelper . LogError ( err ) . WithFields ( log . Fields {
"maxSlot" : maxSlot ,
} ) . Fatal ( "Unable to get convert max block from DB to int. We must close the application or we might have undetected gaps." )
}
writeKnownGaps ( db , tableIncrement , maxSlot , firstSlot , fmt . Errorf ( "" ) , "startup" )
}
2022-05-06 15:03:15 +00:00
// A quick helper function to calculate the epoch.
func calculateEpoch ( slot int , slotPerEpoch int ) string {
epoch := slot / slotPerEpoch
return strconv . Itoa ( epoch )
}