ipld-eth-beacon-indexer/pkg/beaconclient/databasewrite.go
Abdul Rabbani 7e8d63c667
Feature/22 test handling incoming events (#30)
* Checkpoint before the weekend

* Update location for SetupPostgresDB

* Include first functioning tests for processing head

* Fix gitignore

* Test CaptureHead | Add Metrics | Handle Test Race Conditions

This Commit allows us to:

* Test the `CaptureHead` function.
* Test parsing a single Head message.
* Test a Reorg condition.
* Add Metrics. This is primarily used for testing but can have future use cases.
* Rearrange the test due to race conditions introduced by reusing a variable. `BeforeEach` can't be used to update `BC`.

* Update and finalize testing at this stage

* Update code and CI/CD

* Fix lint errors

* Update CICD and fail when file not found.

* Update test to have failed as expected.
2022-05-12 09:52:13 -04:00

262 lines
9.4 KiB
Go

package beaconclient
import (
"context"
"fmt"
"math/rand"
"strconv"
"time"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
)
var (
// Statement to upsert to the ethcl.slots table.
UpsertSlotsStmt string = `
INSERT INTO ethcl.slots (epoch, slot, block_root, state_root, status)
VALUES ($1, $2, $3, $4, $5) ON CONFLICT (slot, block_root) DO NOTHING`
// Statement to upsert to the ethcl.signed_beacon_blocks table.
UpsertSignedBeaconBlockStmt string = `
INSERT INTO ethcl.signed_beacon_block (slot, block_root, parent_block_root, mh_key)
VALUES ($1, $2, $3, $4) ON CONFLICT (slot, block_root) DO NOTHING`
// Statement to upsert to the ethcl.beacon_state table.
UpsertBeaconState string = `
INSERT INTO ethcl.beacon_state (slot, state_root, mh_key)
VALUES ($1, $2, $3) ON CONFLICT (slot, state_root) DO NOTHING`
// Statement to upsert to the public.blocks table.
UpsertBlocksStmt string = `
INSERT INTO public.blocks (key, data)
VALUES ($1, $2) ON CONFLICT (key) DO NOTHING`
UpdateForkedStmt string = `UPDATE ethcl.slots
SET status='forked'
WHERE slot=$1 AND block_root<>$2
RETURNING block_root;`
UpdateProposedStmt string = `UPDATE ethcl.slots
SET status='proposed'
WHERE slot=$1 AND block_root=$2
RETURNING block_root;`
CheckProposedStmt string = `SELECT slot, block_root
FROM ethcl.slots
WHERE slot=$1 AND block_root=$2;`
)
// Put all functionality to prepare the write object
// And write it in this file.
// Remove any of it from the processslot file.
type DatabaseWriter struct {
Db sql.Database
Metrics *BeaconClientMetrics
DbSlots *DbSlots
DbSignedBeaconBlock *DbSignedBeaconBlock
DbBeaconState *DbBeaconState
rawBeaconState []byte
rawSignedBeaconBlock []byte
}
func CreateDatabaseWrite(db sql.Database, slot int, stateRoot string, blockRoot string, parentBlockRoot string, status string, metrics *BeaconClientMetrics) *DatabaseWriter {
dw := &DatabaseWriter{
Db: db,
Metrics: metrics,
}
dw.prepareSlotsModel(slot, stateRoot, blockRoot, status)
dw.prepareSignedBeaconBlockModel(slot, blockRoot, parentBlockRoot)
dw.prepareBeaconStateModel(slot, stateRoot)
return dw
}
// Write functions to write each all together...
// Should I do one atomic write?
// Create the model for the ethcl.slots table
func (dw *DatabaseWriter) prepareSlotsModel(slot int, stateRoot string, blockRoot string, status string) {
dw.DbSlots = &DbSlots{
Epoch: calculateEpoch(slot, bcSlotsPerEpoch),
Slot: strconv.Itoa(slot),
StateRoot: stateRoot,
BlockRoot: blockRoot,
Status: status,
}
log.Debug("dw.DbSlots: ", dw.DbSlots)
}
// Create the model for the ethcl.signed_beacon_block table.
func (dw *DatabaseWriter) prepareSignedBeaconBlockModel(slot int, blockRoot string, parentBlockRoot string) {
dw.DbSignedBeaconBlock = &DbSignedBeaconBlock{
Slot: strconv.Itoa(slot),
BlockRoot: blockRoot,
ParentBlock: parentBlockRoot,
MhKey: calculateMhKey(),
}
log.Debug("dw.DbSignedBeaconBlock: ", dw.DbSignedBeaconBlock)
}
// Create the model for the ethcl.beacon_state table.
func (dw *DatabaseWriter) prepareBeaconStateModel(slot int, stateRoot string) {
dw.DbBeaconState = &DbBeaconState{
Slot: strconv.Itoa(slot),
StateRoot: stateRoot,
MhKey: calculateMhKey(),
}
log.Debug("dw.DbBeaconState: ", dw.DbBeaconState)
}
// Write all the data for a given slot.
func (dw *DatabaseWriter) writeFullSlot() {
// Add errors for each function call
// If an error occurs, write to knownGaps table.
dw.writeSlots()
dw.writeSignedBeaconBlocks()
dw.writeBeaconState()
dw.Metrics.IncrementHeadTrackingInserts(1)
}
// Write the information for the generic slots table. For now this is only one function.
// But in the future if we need to incorporate any FK's or perform any actions to write to the
// slots table we can do it all here.
func (dw *DatabaseWriter) writeSlots() {
dw.upsertSlots()
}
// Upsert to the ethcl.slots table.
func (dw *DatabaseWriter) upsertSlots() {
_, err := dw.Db.Exec(context.Background(), UpsertSlotsStmt, dw.DbSlots.Epoch, dw.DbSlots.Slot, dw.DbSlots.BlockRoot, dw.DbSlots.StateRoot, dw.DbSlots.Status)
if err != nil {
loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("Unable to write to the slot to the ethcl.slots table")
}
}
// Write the information for the signed_beacon_block.
func (dw *DatabaseWriter) writeSignedBeaconBlocks() {
dw.upsertPublicBlocks(dw.DbSignedBeaconBlock.MhKey, dw.rawSignedBeaconBlock)
dw.upsertSignedBeaconBlock()
}
// Upsert to public.blocks.
func (dw *DatabaseWriter) upsertPublicBlocks(key string, data []byte) {
_, err := dw.Db.Exec(context.Background(), UpsertBlocksStmt, key, data)
if err != nil {
loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("Unable to write to the slot to the public.blocks table")
}
}
// Upsert to the ethcl.signed_beacon_block table.
func (dw *DatabaseWriter) upsertSignedBeaconBlock() {
_, err := dw.Db.Exec(context.Background(), UpsertSignedBeaconBlockStmt, dw.DbSignedBeaconBlock.Slot, dw.DbSignedBeaconBlock.BlockRoot, dw.DbSignedBeaconBlock.ParentBlock, dw.DbSignedBeaconBlock.MhKey)
if err != nil {
loghelper.LogSlotError(dw.DbSlots.Slot, err).WithFields(log.Fields{"block_root": dw.DbSignedBeaconBlock.BlockRoot}).Error("Unable to write to the slot to the ethcl.signed_beacon_block table")
}
}
// Write the information for the beacon_state.
func (dw *DatabaseWriter) writeBeaconState() {
dw.upsertPublicBlocks(dw.DbBeaconState.MhKey, dw.rawBeaconState)
dw.upsertBeaconState()
}
// Upsert to the ethcl.beacon_state table.
func (dw *DatabaseWriter) upsertBeaconState() {
_, err := dw.Db.Exec(context.Background(), UpsertBeaconState, dw.DbBeaconState.Slot, dw.DbBeaconState.StateRoot, dw.DbBeaconState.MhKey)
if err != nil {
loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("Unable to write to the slot to the ethcl.beacon_state table")
}
}
// Update a given slot to be marked as forked. Provide the slot and the latest latestBlockRoot.
// We will mark all entries for the given slot that don't match the provided latestBlockRoot as forked.
func writeReorgs(db sql.Database, slot string, latestBlockRoot string, metrics *BeaconClientMetrics) {
forkCount, err := updateForked(db, slot, latestBlockRoot)
if err != nil {
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We ran into some trouble while updating all forks.")
// Add to knownGaps Table
}
proposedCount, err := updateProposed(db, slot, latestBlockRoot)
if err != nil {
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We ran into some trouble while trying to update the proposed slot.")
// Add to knownGaps Table
}
if forkCount > 0 {
loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{
"forkCount": forkCount,
}).Info("Updated rows that were forked.")
} else {
loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{
"forkCount": forkCount,
}).Warn("There were no forked rows to update.")
}
if proposedCount == 1 {
loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{
"proposedCount": proposedCount,
}).Info("Updated the row that should have been marked as proposed.")
} else if proposedCount > 1 {
loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{
"proposedCount": proposedCount,
}).Error("Too many rows were marked as proposed!")
} else if proposedCount == 0 {
var count int
err := db.QueryRow(context.Background(), CheckProposedStmt, slot, latestBlockRoot).Scan(count)
if err != nil {
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("Unable to query proposed rows after reorg.")
}
if count != 1 {
loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{
"proposedCount": count,
}).Warn("The proposed block was not marked as proposed...")
} else {
loghelper.LogReorg(slot, latestBlockRoot).Info("Updated the row that should have been marked as proposed.")
}
}
metrics.IncrementHeadTrackingReorgs(1)
}
// Update the slots table by marking the old slot's as forked.
func updateForked(db sql.Database, slot string, latestBlockRoot string) (int64, error) {
res, err := db.Exec(context.Background(), UpdateForkedStmt, slot, latestBlockRoot)
if err != nil {
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We are unable to update the ethcl.slots table with the forked slots")
return 0, err
}
count, err := res.RowsAffected()
if err != nil {
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("Unable to figure out how many entries were marked as forked.")
return 0, err
}
return count, err
}
func updateProposed(db sql.Database, slot string, latestBlockRoot string) (int64, error) {
res, err := db.Exec(context.Background(), UpdateProposedStmt, slot, latestBlockRoot)
if err != nil {
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We are unable to update the ethcl.slots table with the proposed slot.")
return 0, err
}
count, err := res.RowsAffected()
if err != nil {
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("Unable to figure out how many entries were marked as proposed")
return 0, err
}
return count, err
}
// Dummy function for calculating the mhKey.
func calculateMhKey() string {
rand.Seed(time.Now().UnixNano())
b := make([]byte, 10)
rand.Read(b)
return fmt.Sprintf("%x", b)[:10]
}
// A quick helper function to calculate the epoch.
func calculateEpoch(slot int, slotPerEpoch int) string {
epoch := slot / slotPerEpoch
return strconv.Itoa(epoch)
}