Slight Performance improvements with Logging (#67) #68

Merged
abdulrabbani00 merged 1 commits from develop into main 2022-06-15 15:51:38 +00:00
11 changed files with 140 additions and 63 deletions

View File

@ -49,6 +49,7 @@ jobs:
echo vulcanize_ipld_eth_beacon_db=$GITHUB_WORKSPACE/ipld-eth-beacon-db/ > ./config.sh echo vulcanize_ipld_eth_beacon_db=$GITHUB_WORKSPACE/ipld-eth-beacon-db/ > ./config.sh
echo vulcanize_ipld_eth_beacon_indexer=$GITHUB_WORKSPACE/ipld-eth-beacon-indexer >> ./config.sh echo vulcanize_ipld_eth_beacon_indexer=$GITHUB_WORKSPACE/ipld-eth-beacon-indexer >> ./config.sh
echo eth_beacon_config_file=$GITHUB_WORKSPACE/ipld-eth-beacon-indexer/config/cicd/boot.ipld-eth-beacon-indexer.json >> ./config.sh echo eth_beacon_config_file=$GITHUB_WORKSPACE/ipld-eth-beacon-indexer/config/cicd/boot.ipld-eth-beacon-indexer.json >> ./config.sh
echo eth_beacon_capture_mode=boot >> ./config.sh
cat ./config.sh cat ./config.sh
- name: Run docker compose - name: Run docker compose
@ -159,7 +160,6 @@ jobs:
run: | run: |
echo vulcanize_ipld_eth_beacon_db=$GITHUB_WORKSPACE/ipld-eth-beacon-db/ > ./config.sh echo vulcanize_ipld_eth_beacon_db=$GITHUB_WORKSPACE/ipld-eth-beacon-db/ > ./config.sh
echo vulcanize_ipld_eth_beacon_indexer=$GITHUB_WORKSPACE/ipld-eth-beacon-indexer >> ./config.sh echo vulcanize_ipld_eth_beacon_indexer=$GITHUB_WORKSPACE/ipld-eth-beacon-indexer >> ./config.sh
echo eth_beacon_capture_mode=boot >> ./config.sh
cat ./config.sh cat ./config.sh
- name: Run docker compose - name: Run docker compose

2
.gitignore vendored
View File

@ -8,4 +8,6 @@ temp/*
pkg/beaconclient/ssz-data/ pkg/beaconclient/ssz-data/
*.test *.test
ipld-eth-beacon-indexer.log ipld-eth-beacon-indexer.log
ipld-eth-beacon-indexer
config/local.ipld-eth-beacon-indexer-config.json config/local.ipld-eth-beacon-indexer-config.json
config/docker.ipld-eth-beacon-indexer-config.json

View File

@ -1,7 +1,7 @@
FROM golang:1.18-alpine as builder FROM golang:1.18-alpine as builder
WORKDIR /go/src/github.com/vulcanize/ipld-eth-beacon-indexer WORKDIR /go/src/github.com/vulcanize/ipld-eth-beacon-indexer
RUN apk --no-cache add ca-certificates make git g++ linux-headers RUN apk --no-cache add ca-certificates make git g++ linux-headers libstdc++
ENV GO111MODULE=on ENV GO111MODULE=on
COPY go.mod . COPY go.mod .
@ -9,11 +9,11 @@ COPY go.sum .
RUN go mod tidy; go mod download RUN go mod tidy; go mod download
COPY . . COPY . .
RUN GCO_ENABLED=0 GOOS=linux go build -race -a -installsuffix cgo -ldflags '-extldflags "-static"' -o ipld-eth-beacon-indexer . RUN GCO_ENABLED=0 GOOS=linux go build -race -ldflags="-s -w" -o ipld-eth-beacon-indexer .
RUN chmod +x ipld-eth-beacon-indexer RUN chmod +x ipld-eth-beacon-indexer
FROM frolvlad/alpine-bash:latest FROM frolvlad/alpine-bash:latest
RUN apk --no-cache add ca-certificates RUN apk --no-cache add ca-certificates libstdc++
WORKDIR /root/ WORKDIR /root/
COPY --from=builder /go/src/github.com/vulcanize/ipld-eth-beacon-indexer/ipld-eth-beacon-indexer /root/ipld-eth-beacon-indexer COPY --from=builder /go/src/github.com/vulcanize/ipld-eth-beacon-indexer/ipld-eth-beacon-indexer /root/ipld-eth-beacon-indexer
ADD entrypoint.sh . ADD entrypoint.sh .

View File

@ -67,6 +67,7 @@ unit-test-local:
go fmt ./... go fmt ./...
$(GINKGO) -r --label-filter unit \ $(GINKGO) -r --label-filter unit \
--randomize-all --randomize-suites \ --randomize-all --randomize-suites \
--flake-attempts=3 \
--fail-on-pending --keep-going \ --fail-on-pending --keep-going \
--trace --trace
@ -75,7 +76,8 @@ unit-test-ci:
go vet ./... go vet ./...
go fmt ./... go fmt ./...
$(GINKGO) -r --label-filter unit \ $(GINKGO) -r --label-filter unit \
--randomize-all --randomize-suites \ --randomize-all --randomize-suites
--flake-attempts=3 \
--fail-on-pending --keep-going \ --fail-on-pending --keep-going \
--cover --coverprofile=cover.profile \ --cover --coverprofile=cover.profile \
--trace --json-report=report.json --trace --json-report=report.json

View File

@ -5,19 +5,20 @@ echo "Starting ipld-eth-beacon-indexer"
echo /root/ipld-eth-beacon-indexer capture ${CAPTURE_MODE} --config /root/ipld-eth-beacon-config.json > /root/ipld-eth-beacon-indexer.output echo /root/ipld-eth-beacon-indexer capture ${CAPTURE_MODE} --config /root/ipld-eth-beacon-config.json > /root/ipld-eth-beacon-indexer.output
/root/ipld-eth-beacon-indexer capture ${CAPTURE_MODE} --config /root/ipld-eth-beacon-config.json > /root/ipld-eth-beacon-indexer.output if [ ${CAPTURE_MODE} == "boot" ]; then
rv=$? /root/ipld-eth-beacon-indexer capture ${CAPTURE_MODE} --config /root/ipld-eth-beacon-config.json > /root/ipld-eth-beacon-indexer.output
rv=$?
if [ $rv != 0 ]; then if [ $rv != 0 ]; then
echo "ipld-eth-beacon-indexer failed" echo "ipld-eth-beacon-indexer boot failed"
echo $rv > /root/HEALTH else
echo $rv echo "ipld-eth-beacon-indexer boot succeeded"
cat /root/ipld-eth-beacon-indexer.output fi
echo $rv > /root/HEALTH
echo $rv
cat /root/ipld-eth-beacon-indexer.output
tail -f /dev/null
else else
echo "ipld-eth-beacon-indexer succeeded" exec /root/ipld-eth-beacon-indexer capture ${CAPTURE_MODE} --config /root/ipld-eth-beacon-config.json > /root/ipld-eth-beacon-indexer.output
echo $rv > /root/HEALTH
echo $rv
cat /root/ipld-eth-beacon-indexer.output
fi fi
tail -f /dev/null

View File

@ -31,7 +31,7 @@ import (
func (bc *BeaconClient) CaptureHistoric(ctx context.Context, maxWorkers int) []error { func (bc *BeaconClient) CaptureHistoric(ctx context.Context, maxWorkers int) []error {
log.Info("We are starting the historical processing service.") log.Info("We are starting the historical processing service.")
bc.HistoricalProcess = HistoricProcessing{db: bc.Db, metrics: bc.Metrics, uniqueNodeIdentifier: bc.UniqueNodeIdentifier} bc.HistoricalProcess = HistoricProcessing{db: bc.Db, metrics: bc.Metrics, uniqueNodeIdentifier: bc.UniqueNodeIdentifier}
errs := handleBatchProcess(ctx, maxWorkers, bc.HistoricalProcess, bc.HistoricalProcess.db, bc.ServerEndpoint, bc.Metrics, bc.CheckDb) errs := handleBatchProcess(ctx, maxWorkers, bc.HistoricalProcess, bc.HistoricalProcess.db, bc.ServerEndpoint, bc.Metrics, bc.CheckDb, bc.Metrics.IncrementHistoricSlotProcessed)
log.Debug("Exiting Historical") log.Debug("Exiting Historical")
return errs return errs
} }
@ -39,7 +39,8 @@ func (bc *BeaconClient) CaptureHistoric(ctx context.Context, maxWorkers int) []e
// This function will perform all the necessary clean up tasks for stopping historical processing. // This function will perform all the necessary clean up tasks for stopping historical processing.
func (bc *BeaconClient) StopHistoric(cancel context.CancelFunc) error { func (bc *BeaconClient) StopHistoric(cancel context.CancelFunc) error {
log.Info("We are stopping the historical processing service.") log.Info("We are stopping the historical processing service.")
err := bc.HistoricalProcess.releaseDbLocks(cancel) cancel()
err := bc.HistoricalProcess.releaseDbLocks()
if err != nil { if err != nil {
loghelper.LogError(err).WithField("uniqueIdentifier", bc.UniqueNodeIdentifier).Error("We were unable to remove the locks from the eth_beacon.historic_processing table. Manual Intervention is needed!") loghelper.LogError(err).WithField("uniqueIdentifier", bc.UniqueNodeIdentifier).Error("We were unable to remove the locks from the eth_beacon.historic_processing table. Manual Intervention is needed!")
} }
@ -55,7 +56,7 @@ type BatchProcessing interface {
getSlotRange(context.Context, chan<- slotsToProcess) []error // Write the slots to process in a channel, return an error if you cant get the next slots to write. getSlotRange(context.Context, chan<- slotsToProcess) []error // Write the slots to process in a channel, return an error if you cant get the next slots to write.
handleProcessingErrors(context.Context, <-chan batchHistoricError) // Custom logic to handle errors. handleProcessingErrors(context.Context, <-chan batchHistoricError) // Custom logic to handle errors.
removeTableEntry(context.Context, <-chan slotsToProcess) error // With the provided start and end slot, remove the entry from the database. removeTableEntry(context.Context, <-chan slotsToProcess) error // With the provided start and end slot, remove the entry from the database.
releaseDbLocks(context.CancelFunc) error // Update the checked_out column to false for whatever table is being updated. releaseDbLocks() error // Update the checked_out column to false for whatever table is being updated.
} }
/// ^^^ /// ^^^
@ -90,19 +91,24 @@ type batchHistoricError struct {
// 4. Remove the slot entry from the DB. // 4. Remove the slot entry from the DB.
// //
// 5. Handle any errors. // 5. Handle any errors.
func handleBatchProcess(ctx context.Context, maxWorkers int, bp BatchProcessing, db sql.Database, serverEndpoint string, metrics *BeaconClientMetrics, checkDb bool) []error { func handleBatchProcess(ctx context.Context, maxWorkers int, bp BatchProcessing, db sql.Database, serverEndpoint string, metrics *BeaconClientMetrics, checkDb bool, incrementTracker func(uint64)) []error {
slotsCh := make(chan slotsToProcess) slotsCh := make(chan slotsToProcess)
workCh := make(chan int) workCh := make(chan int)
processedCh := make(chan slotsToProcess) processedCh := make(chan slotsToProcess)
errCh := make(chan batchHistoricError) errCh := make(chan batchHistoricError)
finalErrCh := make(chan []error, 1) finalErrCh := make(chan []error, 1)
// Checkout Rows with same node Identifier.
err := bp.releaseDbLocks()
if err != nil {
loghelper.LogError(err).Error(("We are unable to un-checkout entries at the start!"))
}
// Start workers // Start workers
for w := 1; w <= maxWorkers; w++ { for w := 1; w <= maxWorkers; w++ {
log.WithFields(log.Fields{"maxWorkers": maxWorkers}).Debug("Starting batch processing workers") log.WithFields(log.Fields{"maxWorkers": maxWorkers}).Debug("Starting batch processing workers")
// Pass in function to increment metric! KnownGapProcessing or HistoricProcessing go processSlotRangeWorker(ctx, workCh, errCh, db, serverEndpoint, metrics, checkDb, incrementTracker)
go processSlotRangeWorker(ctx, workCh, errCh, db, serverEndpoint, metrics, checkDb)
} }
// Process all ranges and send each individual slot to the worker. // Process all ranges and send each individual slot to the worker.

View File

@ -90,12 +90,12 @@ type DatabaseWriter struct {
DbSlots *DbSlots DbSlots *DbSlots
DbSignedBeaconBlock *DbSignedBeaconBlock DbSignedBeaconBlock *DbSignedBeaconBlock
DbBeaconState *DbBeaconState DbBeaconState *DbBeaconState
rawBeaconState []byte rawBeaconState *[]byte
rawSignedBeaconBlock []byte rawSignedBeaconBlock *[]byte
} }
func CreateDatabaseWrite(db sql.Database, slot int, stateRoot string, blockRoot string, parentBlockRoot string, func CreateDatabaseWrite(db sql.Database, slot int, stateRoot string, blockRoot string, parentBlockRoot string,
eth1BlockHash string, status string, rawSignedBeaconBlock []byte, rawBeaconState []byte, metrics *BeaconClientMetrics) (*DatabaseWriter, error) { eth1BlockHash string, status string, rawSignedBeaconBlock *[]byte, rawBeaconState *[]byte, metrics *BeaconClientMetrics) (*DatabaseWriter, error) {
ctx := context.Background() ctx := context.Background()
tx, err := db.Begin(ctx) tx, err := db.Begin(ctx)
if err != nil { if err != nil {
@ -194,6 +194,7 @@ func (dw *DatabaseWriter) transactFullSlot() error {
// return err // return err
//} //}
// Might want to seperate writing to public.blocks so we can do this concurrently... // Might want to seperate writing to public.blocks so we can do this concurrently...
// Cant concurrently write because we are using a transaction.
err := dw.transactSignedBeaconBlocks() err := dw.transactSignedBeaconBlocks()
if err != nil { if err != nil {
loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("We couldn't write to the eth_beacon block table...") loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("We couldn't write to the eth_beacon block table...")
@ -240,8 +241,8 @@ func (dw *DatabaseWriter) transactSignedBeaconBlocks() error {
} }
// Upsert to public.blocks. // Upsert to public.blocks.
func (dw *DatabaseWriter) upsertPublicBlocks(key string, data []byte) error { func (dw *DatabaseWriter) upsertPublicBlocks(key string, data *[]byte) error {
_, err := dw.Tx.Exec(dw.Ctx, UpsertBlocksStmt, key, data) _, err := dw.Tx.Exec(dw.Ctx, UpsertBlocksStmt, key, *data)
if err != nil { if err != nil {
loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("Unable to write to the slot to the public.blocks table") loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("Unable to write to the slot to the public.blocks table")
return err return err

View File

@ -54,6 +54,10 @@ func CreateBeaconClientMetrics() (*BeaconClientMetrics, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
err = prometheusRegisterHelper("historic_slots_processed", "Keeps track of the number of historic slots we successfully processed.", &metrics.HistoricSlotProcessed)
if err != nil {
return nil, err
}
err = prometheusRegisterHelper("head_error", "Keeps track of the number of errors we had processing head messages.", &metrics.HeadError) err = prometheusRegisterHelper("head_error", "Keeps track of the number of errors we had processing head messages.", &metrics.HeadError)
if err != nil { if err != nil {
return nil, err return nil, err
@ -91,6 +95,7 @@ type BeaconClientMetrics struct {
KnownGapsInserts uint64 // Number of known_gaps we successfully wrote to the DB. KnownGapsInserts uint64 // Number of known_gaps we successfully wrote to the DB.
KnownGapsProcessed uint64 // Number of knownGaps processed. KnownGapsProcessed uint64 // Number of knownGaps processed.
KnownGapsReprocessError uint64 // Number of knownGaps that were updated with an error. KnownGapsReprocessError uint64 // Number of knownGaps that were updated with an error.
HistoricSlotProcessed uint64 // Number of historic slots successfully processed.
HeadError uint64 // Number of errors that occurred when decoding the head message. HeadError uint64 // Number of errors that occurred when decoding the head message.
HeadReorgError uint64 // Number of errors that occurred when decoding the reorg message. HeadReorgError uint64 // Number of errors that occurred when decoding the reorg message.
} }
@ -135,6 +140,11 @@ func (m *BeaconClientMetrics) IncrementReorgError(inc uint64) {
// Wrapper function to increment the number of knownGaps that were updated with reprocessing errors. // Wrapper function to increment the number of knownGaps that were updated with reprocessing errors.
//If we want to use mutexes later we can easily update all occurrences here. //If we want to use mutexes later we can easily update all occurrences here.
func (m *BeaconClientMetrics) IncrementKnownGapsReprocessError(inc uint64) { func (m *BeaconClientMetrics) IncrementKnownGapsReprocessError(inc uint64) {
log.Debug("Incrementing Known Gap Reprocessing: ", &m.KnownGapsReprocessError)
atomic.AddUint64(&m.KnownGapsReprocessError, inc) atomic.AddUint64(&m.KnownGapsReprocessError, inc)
} }
// Wrapper function to increment the number of historicSlots that were processed successfully.
// If we want to use mutexes later we can easily update all occurrences here.
func (m *BeaconClientMetrics) IncrementHistoricSlotProcessed(inc uint64) {
atomic.AddUint64(&m.HistoricSlotProcessed, inc)
}

View File

@ -81,11 +81,8 @@ func (hp HistoricProcessing) handleProcessingErrors(ctx context.Context, errMess
} }
// "un"-checkout the rows held by this DB in the eth_beacon.historical_process table. // "un"-checkout the rows held by this DB in the eth_beacon.historical_process table.
func (hp HistoricProcessing) releaseDbLocks(cancel context.CancelFunc) error { func (hp HistoricProcessing) releaseDbLocks() error {
cancel()
log.Debug("Updating all the entries to eth_beacon.historical processing") log.Debug("Updating all the entries to eth_beacon.historical processing")
log.Debug("Db: ", hp.db)
log.Debug("hp.uniqueNodeIdentifier ", hp.uniqueNodeIdentifier)
res, err := hp.db.Exec(context.Background(), releaseHpLockStmt, hp.uniqueNodeIdentifier) res, err := hp.db.Exec(context.Background(), releaseHpLockStmt, hp.uniqueNodeIdentifier)
if err != nil { if err != nil {
return fmt.Errorf("Unable to remove lock from eth_beacon.historical_processing table for node %d, error is %e", hp.uniqueNodeIdentifier, err) return fmt.Errorf("Unable to remove lock from eth_beacon.historical_processing table for node %d, error is %e", hp.uniqueNodeIdentifier, err)
@ -100,7 +97,7 @@ func (hp HistoricProcessing) releaseDbLocks(cancel context.CancelFunc) error {
} }
// Process the slot range. // Process the slot range.
func processSlotRangeWorker(ctx context.Context, workCh <-chan int, errCh chan<- batchHistoricError, db sql.Database, serverAddress string, metrics *BeaconClientMetrics, checkDb bool) { func processSlotRangeWorker(ctx context.Context, workCh <-chan int, errCh chan<- batchHistoricError, db sql.Database, serverAddress string, metrics *BeaconClientMetrics, checkDb bool, incrementTracker func(uint64)) {
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
@ -115,6 +112,8 @@ func processSlotRangeWorker(ctx context.Context, workCh <-chan int, errCh chan<-
slot: slot, slot: slot,
} }
errCh <- errMs errCh <- errMs
} else {
incrementTracker(1)
} }
} }
} }
@ -149,7 +148,7 @@ func getBatchProcessRow(ctx context.Context, db sql.Database, getStartEndSlotStm
errCount = append(errCount, err) errCount = append(errCount, err)
} }
if row < 1 { if row < 1 {
time.Sleep(1000 * time.Millisecond) time.Sleep(3 * time.Second)
log.Debug("We are checking rows, be patient") log.Debug("We are checking rows, be patient")
break break
} }
@ -176,7 +175,7 @@ func getBatchProcessRow(ctx context.Context, db sql.Database, getStartEndSlotStm
err = tx.QueryRow(dbCtx, getStartEndSlotStmt).Scan(&sp.startSlot, &sp.endSlot) err = tx.QueryRow(dbCtx, getStartEndSlotStmt).Scan(&sp.startSlot, &sp.endSlot)
if err != nil { if err != nil {
if err == pgx.ErrNoRows { if err == pgx.ErrNoRows {
time.Sleep(100 * time.Millisecond) time.Sleep(1 * time.Second)
break break
} }
loghelper.LogSlotRangeStatementError(strconv.Itoa(sp.startSlot), strconv.Itoa(sp.endSlot), getStartEndSlotStmt, err).Error("Unable to get a row") loghelper.LogSlotRangeStatementError(strconv.Itoa(sp.startSlot), strconv.Itoa(sp.endSlot), getStartEndSlotStmt, err).Error("Unable to get a row")
@ -253,7 +252,7 @@ func removeRowPostProcess(ctx context.Context, db sql.Database, processCh <-chan
if isStartProcess && isEndProcess { if isStartProcess && isEndProcess {
break break
} }
time.Sleep(1000 * time.Millisecond) time.Sleep(3 * time.Second)
} }
_, err := db.Exec(context.Background(), removeStmt, strconv.Itoa(slots.startSlot), strconv.Itoa(slots.endSlot)) _, err := db.Exec(context.Background(), removeStmt, strconv.Itoa(slots.startSlot), strconv.Itoa(slots.endSlot))

View File

@ -61,7 +61,7 @@ type KnownGapsProcessing struct {
func (bc *BeaconClient) ProcessKnownGaps(ctx context.Context, maxWorkers int) []error { func (bc *BeaconClient) ProcessKnownGaps(ctx context.Context, maxWorkers int) []error {
log.Info("We are starting the known gaps processing service.") log.Info("We are starting the known gaps processing service.")
bc.KnownGapsProcess = KnownGapsProcessing{db: bc.Db, uniqueNodeIdentifier: bc.UniqueNodeIdentifier, metrics: bc.Metrics} bc.KnownGapsProcess = KnownGapsProcessing{db: bc.Db, uniqueNodeIdentifier: bc.UniqueNodeIdentifier, metrics: bc.Metrics}
errs := handleBatchProcess(ctx, maxWorkers, bc.KnownGapsProcess, bc.KnownGapsProcess.db, bc.ServerEndpoint, bc.Metrics, bc.CheckDb) errs := handleBatchProcess(ctx, maxWorkers, bc.KnownGapsProcess, bc.KnownGapsProcess.db, bc.ServerEndpoint, bc.Metrics, bc.CheckDb, bc.Metrics.IncrementKnownGapsProcessed)
log.Debug("Exiting known gaps processing service") log.Debug("Exiting known gaps processing service")
return errs return errs
} }
@ -69,7 +69,8 @@ func (bc *BeaconClient) ProcessKnownGaps(ctx context.Context, maxWorkers int) []
// This function will perform all the necessary clean up tasks for stopping historical processing. // This function will perform all the necessary clean up tasks for stopping historical processing.
func (bc *BeaconClient) StopKnownGapsProcessing(cancel context.CancelFunc) error { func (bc *BeaconClient) StopKnownGapsProcessing(cancel context.CancelFunc) error {
log.Info("We are stopping the known gaps processing service.") log.Info("We are stopping the known gaps processing service.")
err := bc.KnownGapsProcess.releaseDbLocks(cancel) cancel()
err := bc.KnownGapsProcess.releaseDbLocks()
if err != nil { if err != nil {
loghelper.LogError(err).WithField("uniqueIdentifier", bc.UniqueNodeIdentifier).Error("We were unable to remove the locks from the eth_beacon.known_gaps table. Manual Intervention is needed!") loghelper.LogError(err).WithField("uniqueIdentifier", bc.UniqueNodeIdentifier).Error("We were unable to remove the locks from the eth_beacon.known_gaps table. Manual Intervention is needed!")
} }
@ -121,11 +122,8 @@ func (kgp KnownGapsProcessing) handleProcessingErrors(ctx context.Context, errMe
} }
// Updated checked_out column for the uniqueNodeIdentifier. // Updated checked_out column for the uniqueNodeIdentifier.
func (kgp KnownGapsProcessing) releaseDbLocks(cancel context.CancelFunc) error { func (kgp KnownGapsProcessing) releaseDbLocks() error {
cancel()
log.Debug("Updating all the entries to eth_beacon.known_gaps") log.Debug("Updating all the entries to eth_beacon.known_gaps")
log.Debug("Db: ", kgp.db)
log.Debug("kgp.uniqueNodeIdentifier ", kgp.uniqueNodeIdentifier)
res, err := kgp.db.Exec(context.Background(), releaseKgLockStmt, kgp.uniqueNodeIdentifier) res, err := kgp.db.Exec(context.Background(), releaseKgLockStmt, kgp.uniqueNodeIdentifier)
if err != nil { if err != nil {
return err return err

View File

@ -25,6 +25,7 @@ import (
"fmt" "fmt"
"strconv" "strconv"
"strings" "strings"
"time"
"github.com/jackc/pgx/v4" "github.com/jackc/pgx/v4"
si "github.com/prysmaticlabs/prysm/consensus-types/interfaces" si "github.com/prysmaticlabs/prysm/consensus-types/interfaces"
@ -50,15 +51,16 @@ var (
type ProcessSlot struct { type ProcessSlot struct {
// Generic // Generic
Slot int // The slot number. Slot int // The slot number.
Epoch int // The epoch number. Epoch int // The epoch number.
BlockRoot string // The hex encoded string of the BlockRoot. BlockRoot string // The hex encoded string of the BlockRoot.
StateRoot string // The hex encoded string of the StateRoot. StateRoot string // The hex encoded string of the StateRoot.
ParentBlockRoot string // The hex encoded string of the parent block. ParentBlockRoot string // The hex encoded string of the parent block.
Status string // The status of the block Status string // The status of the block
HeadOrHistoric string // Is this the head or a historic slot. This is critical when trying to analyze errors and skipped slots. HeadOrHistoric string // Is this the head or a historic slot. This is critical when trying to analyze errors and skipped slots.
Db sql.Database // The DB object used to write to the DB. Db sql.Database // The DB object used to write to the DB.
Metrics *BeaconClientMetrics // An object to keep track of the beaconclient metrics Metrics *BeaconClientMetrics // An object to keep track of the beaconclient metrics
PerformanceMetrics PerformanceMetrics // An object to keep track of performance metrics.
// BeaconBlock // BeaconBlock
SszSignedBeaconBlock []byte // The entire SSZ encoded SignedBeaconBlock SszSignedBeaconBlock []byte // The entire SSZ encoded SignedBeaconBlock
@ -74,6 +76,19 @@ type ProcessSlot struct {
DbBeaconState *DbBeaconState // The model being written to the state table. DbBeaconState *DbBeaconState // The model being written to the state table.
} }
type PerformanceMetrics struct {
BeaconNodeBlockRetrievalTime time.Duration // How long it took to get the BeaconBlock from the Beacon Node.
BeaconNodeStateRetrievalTime time.Duration // How long it took to get the BeaconState from the Beacon Node.
ParseBeaconObjectForHash time.Duration // How long it took to get some information from the beacon objects.
CheckDbPreProcessing time.Duration // How long it takes to check the DB before processing a block.
CreateDbWriteObject time.Duration // How long it takes to create a DB write object.
TransactSlotOnly time.Duration // How long it takes to transact the slot information only.
CheckReorg time.Duration // How long it takes to check for Reorgs
CommitTransaction time.Duration // How long it takes to commit the final transaction.
TotalDbTransaction time.Duration // How long it takes from start to committing the entire DB transaction.
TotalProcessing time.Duration // How long it took to process the entire slot.
}
// This function will do all the work to process the slot and write it to the DB. // This function will do all the work to process the slot and write it to the DB.
// It will return the error and error process. The error process is used for providing reach detail to the // It will return the error and error process. The error process is used for providing reach detail to the
// known_gaps table. // known_gaps table.
@ -82,6 +97,7 @@ func processFullSlot(ctx context.Context, db sql.Database, serverAddress string,
case <-ctx.Done(): case <-ctx.Done():
return nil, "" return nil, ""
default: default:
totalStart := time.Now()
ps := &ProcessSlot{ ps := &ProcessSlot{
Slot: slot, Slot: slot,
BlockRoot: blockRoot, BlockRoot: blockRoot,
@ -89,6 +105,18 @@ func processFullSlot(ctx context.Context, db sql.Database, serverAddress string,
HeadOrHistoric: headOrHistoric, HeadOrHistoric: headOrHistoric,
Db: db, Db: db,
Metrics: metrics, Metrics: metrics,
PerformanceMetrics: PerformanceMetrics{
BeaconNodeBlockRetrievalTime: 0,
BeaconNodeStateRetrievalTime: 0,
ParseBeaconObjectForHash: 0,
CheckDbPreProcessing: 0,
CreateDbWriteObject: 0,
TransactSlotOnly: 0,
CheckReorg: 0,
CommitTransaction: 0,
TotalDbTransaction: 0,
TotalProcessing: 0,
},
} }
g, _ := errgroup.WithContext(context.Background()) g, _ := errgroup.WithContext(context.Background())
@ -100,10 +128,12 @@ func processFullSlot(ctx context.Context, db sql.Database, serverAddress string,
case <-ctx.Done(): case <-ctx.Done():
return nil return nil
default: default:
start := time.Now()
err := ps.getBeaconState(serverAddress, vUnmarshalerCh) err := ps.getBeaconState(serverAddress, vUnmarshalerCh)
if err != nil { if err != nil {
return err return err
} }
ps.PerformanceMetrics.BeaconNodeStateRetrievalTime = time.Since(start)
return nil return nil
} }
}) })
@ -114,10 +144,12 @@ func processFullSlot(ctx context.Context, db sql.Database, serverAddress string,
case <-ctx.Done(): case <-ctx.Done():
return nil return nil
default: default:
start := time.Now()
err := ps.getSignedBeaconBlock(serverAddress, vUnmarshalerCh) err := ps.getSignedBeaconBlock(serverAddress, vUnmarshalerCh)
if err != nil { if err != nil {
return err return err
} }
ps.PerformanceMetrics.BeaconNodeBlockRetrievalTime = time.Since(start)
return nil return nil
} }
}) })
@ -126,11 +158,15 @@ func processFullSlot(ctx context.Context, db sql.Database, serverAddress string,
return err, "processSlot" return err, "processSlot"
} }
parseBeaconTime := time.Now()
finalBlockRoot, finalStateRoot, finalEth1BlockHash, err := ps.provideFinalHash() finalBlockRoot, finalStateRoot, finalEth1BlockHash, err := ps.provideFinalHash()
if err != nil { if err != nil {
return err, "CalculateBlockRoot" return err, "CalculateBlockRoot"
} }
ps.PerformanceMetrics.ParseBeaconObjectForHash = time.Since(parseBeaconTime)
if checkDb { if checkDb {
checkDbTime := time.Now()
inDb, err := IsSlotInDb(ctx, ps.Db, strconv.Itoa(ps.Slot), finalBlockRoot, finalStateRoot) inDb, err := IsSlotInDb(ctx, ps.Db, strconv.Itoa(ps.Slot), finalBlockRoot, finalStateRoot)
if err != nil { if err != nil {
return err, "checkDb" return err, "checkDb"
@ -139,26 +175,35 @@ func processFullSlot(ctx context.Context, db sql.Database, serverAddress string,
log.WithField("slot", slot).Info("Slot already in the DB.") log.WithField("slot", slot).Info("Slot already in the DB.")
return nil, "" return nil, ""
} }
ps.PerformanceMetrics.CheckDbPreProcessing = time.Since(checkDbTime)
} }
// Get this object ready to write // Get this object ready to write
createDbWriteTime := time.Now()
dw, err := ps.createWriteObjects(finalBlockRoot, finalStateRoot, finalEth1BlockHash) dw, err := ps.createWriteObjects(finalBlockRoot, finalStateRoot, finalEth1BlockHash)
if err != nil { if err != nil {
return err, "blockRoot" return err, "blockRoot"
} }
ps.PerformanceMetrics.CreateDbWriteObject = time.Since(createDbWriteTime)
// Write the object to the DB. // Write the object to the DB.
dbFullTransactionTime := time.Now()
defer func() { defer func() {
err := dw.Tx.Rollback(dw.Ctx) err := dw.Tx.Rollback(dw.Ctx)
if err != nil && err != pgx.ErrTxClosed { if err != nil && err != pgx.ErrTxClosed {
loghelper.LogError(err).Error("We were unable to Rollback a transaction") loghelper.LogError(err).Error("We were unable to Rollback a transaction")
} }
}() }()
transactionTime := time.Now()
err = dw.transactFullSlot() err = dw.transactFullSlot()
if err != nil { if err != nil {
return err, "processSlot" return err, "processSlot"
} }
ps.PerformanceMetrics.TransactSlotOnly = time.Since(transactionTime)
// Handle any reorgs or skipped slots. // Handle any reorgs or skipped slots.
reorgTime := time.Now()
headOrHistoric = strings.ToLower(headOrHistoric) headOrHistoric = strings.ToLower(headOrHistoric)
if headOrHistoric != "head" && headOrHistoric != "historic" { if headOrHistoric != "head" && headOrHistoric != "historic" {
return fmt.Errorf("headOrHistoric must be either historic or head!"), "" return fmt.Errorf("headOrHistoric must be either historic or head!"), ""
@ -166,11 +211,23 @@ func processFullSlot(ctx context.Context, db sql.Database, serverAddress string,
if ps.HeadOrHistoric == "head" && previousSlot != 0 && previousBlockRoot != "" && ps.Status != "skipped" { if ps.HeadOrHistoric == "head" && previousSlot != 0 && previousBlockRoot != "" && ps.Status != "skipped" {
ps.checkPreviousSlot(dw.Tx, dw.Ctx, previousSlot, previousBlockRoot, knownGapsTableIncrement) ps.checkPreviousSlot(dw.Tx, dw.Ctx, previousSlot, previousBlockRoot, knownGapsTableIncrement)
} }
ps.PerformanceMetrics.CheckReorg = time.Since(reorgTime)
// Commit the transaction // Commit the transaction
commitTime := time.Now()
if err = dw.Tx.Commit(dw.Ctx); err != nil { if err = dw.Tx.Commit(dw.Ctx); err != nil {
return err, "transactionCommit" return err, "transactionCommit"
} }
ps.PerformanceMetrics.CommitTransaction = time.Since(commitTime)
// Total metric capture time.
ps.PerformanceMetrics.TotalDbTransaction = time.Since(dbFullTransactionTime)
ps.PerformanceMetrics.TotalProcessing = time.Since(totalStart)
log.WithFields(log.Fields{
"slot": slot,
"performanceMetrics": fmt.Sprintf("%+v\n", ps.PerformanceMetrics),
}).Debug("Performance Metric output!")
return nil, "" return nil, ""
} }
@ -261,23 +318,24 @@ func (ps *ProcessSlot) getBeaconState(serverEndpoint string, vmCh chan<- *dt.Ver
// Check to make sure that the previous block we processed is the parent of the current block. // Check to make sure that the previous block we processed is the parent of the current block.
func (ps *ProcessSlot) checkPreviousSlot(tx sql.Tx, ctx context.Context, previousSlot int, previousBlockRoot string, knownGapsTableIncrement int) { func (ps *ProcessSlot) checkPreviousSlot(tx sql.Tx, ctx context.Context, previousSlot int, previousBlockRoot string, knownGapsTableIncrement int) {
parentRoot := "0x" + hex.EncodeToString(ps.FullSignedBeaconBlock.Block().ParentRoot()) parentRoot := "0x" + hex.EncodeToString(ps.FullSignedBeaconBlock.Block().ParentRoot())
if previousSlot == int(ps.FullBeaconState.Slot()) { slot := int(ps.FullBeaconState.Slot())
if previousSlot == slot {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"slot": ps.FullBeaconState.Slot(), "slot": slot,
"fork": true, "fork": true,
}).Warn("A fork occurred! The previous slot and current slot match.") }).Warn("A fork occurred! The previous slot and current slot match.")
transactReorgs(tx, ctx, strconv.Itoa(ps.Slot), ps.BlockRoot, ps.Metrics) transactReorgs(tx, ctx, strconv.Itoa(ps.Slot), ps.BlockRoot, ps.Metrics)
} else if previousSlot > int(ps.FullBeaconState.Slot()) { } else if previousSlot > slot {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"previousSlot": previousSlot, "previousSlot": previousSlot,
"curSlot": int(ps.FullBeaconState.Slot()), "curSlot": slot,
}).Warn("We noticed the previous slot is greater than the current slot.") }).Warn("We noticed the previous slot is greater than the current slot.")
} else if previousSlot+1 != int(ps.FullBeaconState.Slot()) { } else if previousSlot+1 != slot {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"previousSlot": previousSlot, "previousSlot": previousSlot,
"currentSlot": ps.FullBeaconState.Slot(), "currentSlot": slot,
}).Error("We skipped a few slots.") }).Error("We skipped a few slots.")
transactKnownGaps(tx, ctx, knownGapsTableIncrement, previousSlot+1, int(ps.FullBeaconState.Slot())-1, fmt.Errorf("Gaps during head processing"), "headGaps", ps.Metrics) transactKnownGaps(tx, ctx, knownGapsTableIncrement, previousSlot+1, slot-1, fmt.Errorf("Gaps during head processing"), "headGaps", ps.Metrics)
} else if previousBlockRoot != parentRoot { } else if previousBlockRoot != parentRoot {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"previousBlockRoot": previousBlockRoot, "previousBlockRoot": previousBlockRoot,
@ -298,7 +356,7 @@ func (ps *ProcessSlot) createWriteObjects(blockRoot, stateRoot, eth1BlockHash st
status = "proposed" status = "proposed"
} }
dw, err := CreateDatabaseWrite(ps.Db, ps.Slot, stateRoot, blockRoot, ps.ParentBlockRoot, eth1BlockHash, status, ps.SszSignedBeaconBlock, ps.SszBeaconState, ps.Metrics) dw, err := CreateDatabaseWrite(ps.Db, ps.Slot, stateRoot, blockRoot, ps.ParentBlockRoot, eth1BlockHash, status, &ps.SszSignedBeaconBlock, &ps.SszBeaconState, ps.Metrics)
if err != nil { if err != nil {
return dw, err return dw, err
} }