Slight Performance improvements with Logging (#67) #68

Merged
abdulrabbani00 merged 1 commits from develop into main 2022-06-15 15:51:38 +00:00
11 changed files with 140 additions and 63 deletions

View File

@ -49,6 +49,7 @@ jobs:
echo vulcanize_ipld_eth_beacon_db=$GITHUB_WORKSPACE/ipld-eth-beacon-db/ > ./config.sh
echo vulcanize_ipld_eth_beacon_indexer=$GITHUB_WORKSPACE/ipld-eth-beacon-indexer >> ./config.sh
echo eth_beacon_config_file=$GITHUB_WORKSPACE/ipld-eth-beacon-indexer/config/cicd/boot.ipld-eth-beacon-indexer.json >> ./config.sh
echo eth_beacon_capture_mode=boot >> ./config.sh
cat ./config.sh
- name: Run docker compose
@ -159,7 +160,6 @@ jobs:
run: |
echo vulcanize_ipld_eth_beacon_db=$GITHUB_WORKSPACE/ipld-eth-beacon-db/ > ./config.sh
echo vulcanize_ipld_eth_beacon_indexer=$GITHUB_WORKSPACE/ipld-eth-beacon-indexer >> ./config.sh
echo eth_beacon_capture_mode=boot >> ./config.sh
cat ./config.sh
- name: Run docker compose

2
.gitignore vendored
View File

@ -8,4 +8,6 @@ temp/*
pkg/beaconclient/ssz-data/
*.test
ipld-eth-beacon-indexer.log
ipld-eth-beacon-indexer
config/local.ipld-eth-beacon-indexer-config.json
config/docker.ipld-eth-beacon-indexer-config.json

View File

@ -1,7 +1,7 @@
FROM golang:1.18-alpine as builder
WORKDIR /go/src/github.com/vulcanize/ipld-eth-beacon-indexer
RUN apk --no-cache add ca-certificates make git g++ linux-headers
RUN apk --no-cache add ca-certificates make git g++ linux-headers libstdc++
ENV GO111MODULE=on
COPY go.mod .
@ -9,11 +9,11 @@ COPY go.sum .
RUN go mod tidy; go mod download
COPY . .
RUN GCO_ENABLED=0 GOOS=linux go build -race -a -installsuffix cgo -ldflags '-extldflags "-static"' -o ipld-eth-beacon-indexer .
RUN GCO_ENABLED=0 GOOS=linux go build -race -ldflags="-s -w" -o ipld-eth-beacon-indexer .
RUN chmod +x ipld-eth-beacon-indexer
FROM frolvlad/alpine-bash:latest
RUN apk --no-cache add ca-certificates
RUN apk --no-cache add ca-certificates libstdc++
WORKDIR /root/
COPY --from=builder /go/src/github.com/vulcanize/ipld-eth-beacon-indexer/ipld-eth-beacon-indexer /root/ipld-eth-beacon-indexer
ADD entrypoint.sh .

View File

@ -67,6 +67,7 @@ unit-test-local:
go fmt ./...
$(GINKGO) -r --label-filter unit \
--randomize-all --randomize-suites \
--flake-attempts=3 \
--fail-on-pending --keep-going \
--trace
@ -75,7 +76,8 @@ unit-test-ci:
go vet ./...
go fmt ./...
$(GINKGO) -r --label-filter unit \
--randomize-all --randomize-suites \
--randomize-all --randomize-suites
--flake-attempts=3 \
--fail-on-pending --keep-going \
--cover --coverprofile=cover.profile \
--trace --json-report=report.json

View File

@ -5,19 +5,20 @@ echo "Starting ipld-eth-beacon-indexer"
echo /root/ipld-eth-beacon-indexer capture ${CAPTURE_MODE} --config /root/ipld-eth-beacon-config.json > /root/ipld-eth-beacon-indexer.output
/root/ipld-eth-beacon-indexer capture ${CAPTURE_MODE} --config /root/ipld-eth-beacon-config.json > /root/ipld-eth-beacon-indexer.output
rv=$?
if [ ${CAPTURE_MODE} == "boot" ]; then
/root/ipld-eth-beacon-indexer capture ${CAPTURE_MODE} --config /root/ipld-eth-beacon-config.json > /root/ipld-eth-beacon-indexer.output
rv=$?
if [ $rv != 0 ]; then
echo "ipld-eth-beacon-indexer failed"
echo $rv > /root/HEALTH
echo $rv
cat /root/ipld-eth-beacon-indexer.output
if [ $rv != 0 ]; then
echo "ipld-eth-beacon-indexer boot failed"
else
echo "ipld-eth-beacon-indexer boot succeeded"
fi
echo $rv > /root/HEALTH
echo $rv
cat /root/ipld-eth-beacon-indexer.output
tail -f /dev/null
else
echo "ipld-eth-beacon-indexer succeeded"
echo $rv > /root/HEALTH
echo $rv
cat /root/ipld-eth-beacon-indexer.output
fi
tail -f /dev/null
exec /root/ipld-eth-beacon-indexer capture ${CAPTURE_MODE} --config /root/ipld-eth-beacon-config.json > /root/ipld-eth-beacon-indexer.output
fi

View File

@ -31,7 +31,7 @@ import (
func (bc *BeaconClient) CaptureHistoric(ctx context.Context, maxWorkers int) []error {
log.Info("We are starting the historical processing service.")
bc.HistoricalProcess = HistoricProcessing{db: bc.Db, metrics: bc.Metrics, uniqueNodeIdentifier: bc.UniqueNodeIdentifier}
errs := handleBatchProcess(ctx, maxWorkers, bc.HistoricalProcess, bc.HistoricalProcess.db, bc.ServerEndpoint, bc.Metrics, bc.CheckDb)
errs := handleBatchProcess(ctx, maxWorkers, bc.HistoricalProcess, bc.HistoricalProcess.db, bc.ServerEndpoint, bc.Metrics, bc.CheckDb, bc.Metrics.IncrementHistoricSlotProcessed)
log.Debug("Exiting Historical")
return errs
}
@ -39,7 +39,8 @@ func (bc *BeaconClient) CaptureHistoric(ctx context.Context, maxWorkers int) []e
// This function will perform all the necessary clean up tasks for stopping historical processing.
func (bc *BeaconClient) StopHistoric(cancel context.CancelFunc) error {
log.Info("We are stopping the historical processing service.")
err := bc.HistoricalProcess.releaseDbLocks(cancel)
cancel()
err := bc.HistoricalProcess.releaseDbLocks()
if err != nil {
loghelper.LogError(err).WithField("uniqueIdentifier", bc.UniqueNodeIdentifier).Error("We were unable to remove the locks from the eth_beacon.historic_processing table. Manual Intervention is needed!")
}
@ -55,7 +56,7 @@ type BatchProcessing interface {
getSlotRange(context.Context, chan<- slotsToProcess) []error // Write the slots to process in a channel, return an error if you cant get the next slots to write.
handleProcessingErrors(context.Context, <-chan batchHistoricError) // Custom logic to handle errors.
removeTableEntry(context.Context, <-chan slotsToProcess) error // With the provided start and end slot, remove the entry from the database.
releaseDbLocks(context.CancelFunc) error // Update the checked_out column to false for whatever table is being updated.
releaseDbLocks() error // Update the checked_out column to false for whatever table is being updated.
}
/// ^^^
@ -90,19 +91,24 @@ type batchHistoricError struct {
// 4. Remove the slot entry from the DB.
//
// 5. Handle any errors.
func handleBatchProcess(ctx context.Context, maxWorkers int, bp BatchProcessing, db sql.Database, serverEndpoint string, metrics *BeaconClientMetrics, checkDb bool) []error {
func handleBatchProcess(ctx context.Context, maxWorkers int, bp BatchProcessing, db sql.Database, serverEndpoint string, metrics *BeaconClientMetrics, checkDb bool, incrementTracker func(uint64)) []error {
slotsCh := make(chan slotsToProcess)
workCh := make(chan int)
processedCh := make(chan slotsToProcess)
errCh := make(chan batchHistoricError)
finalErrCh := make(chan []error, 1)
// Checkout Rows with same node Identifier.
err := bp.releaseDbLocks()
if err != nil {
loghelper.LogError(err).Error(("We are unable to un-checkout entries at the start!"))
}
// Start workers
for w := 1; w <= maxWorkers; w++ {
log.WithFields(log.Fields{"maxWorkers": maxWorkers}).Debug("Starting batch processing workers")
log.WithFields(log.Fields{"maxWorkers": maxWorkers}).Debug("Starting batch processing workers")
// Pass in function to increment metric! KnownGapProcessing or HistoricProcessing
go processSlotRangeWorker(ctx, workCh, errCh, db, serverEndpoint, metrics, checkDb)
go processSlotRangeWorker(ctx, workCh, errCh, db, serverEndpoint, metrics, checkDb, incrementTracker)
}
// Process all ranges and send each individual slot to the worker.

View File

@ -90,12 +90,12 @@ type DatabaseWriter struct {
DbSlots *DbSlots
DbSignedBeaconBlock *DbSignedBeaconBlock
DbBeaconState *DbBeaconState
rawBeaconState []byte
rawSignedBeaconBlock []byte
rawBeaconState *[]byte
rawSignedBeaconBlock *[]byte
}
func CreateDatabaseWrite(db sql.Database, slot int, stateRoot string, blockRoot string, parentBlockRoot string,
eth1BlockHash string, status string, rawSignedBeaconBlock []byte, rawBeaconState []byte, metrics *BeaconClientMetrics) (*DatabaseWriter, error) {
eth1BlockHash string, status string, rawSignedBeaconBlock *[]byte, rawBeaconState *[]byte, metrics *BeaconClientMetrics) (*DatabaseWriter, error) {
ctx := context.Background()
tx, err := db.Begin(ctx)
if err != nil {
@ -194,6 +194,7 @@ func (dw *DatabaseWriter) transactFullSlot() error {
// return err
//}
// Might want to seperate writing to public.blocks so we can do this concurrently...
// Cant concurrently write because we are using a transaction.
err := dw.transactSignedBeaconBlocks()
if err != nil {
loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("We couldn't write to the eth_beacon block table...")
@ -240,8 +241,8 @@ func (dw *DatabaseWriter) transactSignedBeaconBlocks() error {
}
// Upsert to public.blocks.
func (dw *DatabaseWriter) upsertPublicBlocks(key string, data []byte) error {
_, err := dw.Tx.Exec(dw.Ctx, UpsertBlocksStmt, key, data)
func (dw *DatabaseWriter) upsertPublicBlocks(key string, data *[]byte) error {
_, err := dw.Tx.Exec(dw.Ctx, UpsertBlocksStmt, key, *data)
if err != nil {
loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("Unable to write to the slot to the public.blocks table")
return err

View File

@ -54,6 +54,10 @@ func CreateBeaconClientMetrics() (*BeaconClientMetrics, error) {
if err != nil {
return nil, err
}
err = prometheusRegisterHelper("historic_slots_processed", "Keeps track of the number of historic slots we successfully processed.", &metrics.HistoricSlotProcessed)
if err != nil {
return nil, err
}
err = prometheusRegisterHelper("head_error", "Keeps track of the number of errors we had processing head messages.", &metrics.HeadError)
if err != nil {
return nil, err
@ -91,6 +95,7 @@ type BeaconClientMetrics struct {
KnownGapsInserts uint64 // Number of known_gaps we successfully wrote to the DB.
KnownGapsProcessed uint64 // Number of knownGaps processed.
KnownGapsReprocessError uint64 // Number of knownGaps that were updated with an error.
HistoricSlotProcessed uint64 // Number of historic slots successfully processed.
HeadError uint64 // Number of errors that occurred when decoding the head message.
HeadReorgError uint64 // Number of errors that occurred when decoding the reorg message.
}
@ -135,6 +140,11 @@ func (m *BeaconClientMetrics) IncrementReorgError(inc uint64) {
// Wrapper function to increment the number of knownGaps that were updated with reprocessing errors.
//If we want to use mutexes later we can easily update all occurrences here.
func (m *BeaconClientMetrics) IncrementKnownGapsReprocessError(inc uint64) {
log.Debug("Incrementing Known Gap Reprocessing: ", &m.KnownGapsReprocessError)
atomic.AddUint64(&m.KnownGapsReprocessError, inc)
}
// Wrapper function to increment the number of historicSlots that were processed successfully.
// If we want to use mutexes later we can easily update all occurrences here.
func (m *BeaconClientMetrics) IncrementHistoricSlotProcessed(inc uint64) {
atomic.AddUint64(&m.HistoricSlotProcessed, inc)
}

View File

@ -81,11 +81,8 @@ func (hp HistoricProcessing) handleProcessingErrors(ctx context.Context, errMess
}
// "un"-checkout the rows held by this DB in the eth_beacon.historical_process table.
func (hp HistoricProcessing) releaseDbLocks(cancel context.CancelFunc) error {
cancel()
func (hp HistoricProcessing) releaseDbLocks() error {
log.Debug("Updating all the entries to eth_beacon.historical processing")
log.Debug("Db: ", hp.db)
log.Debug("hp.uniqueNodeIdentifier ", hp.uniqueNodeIdentifier)
res, err := hp.db.Exec(context.Background(), releaseHpLockStmt, hp.uniqueNodeIdentifier)
if err != nil {
return fmt.Errorf("Unable to remove lock from eth_beacon.historical_processing table for node %d, error is %e", hp.uniqueNodeIdentifier, err)
@ -100,7 +97,7 @@ func (hp HistoricProcessing) releaseDbLocks(cancel context.CancelFunc) error {
}
// Process the slot range.
func processSlotRangeWorker(ctx context.Context, workCh <-chan int, errCh chan<- batchHistoricError, db sql.Database, serverAddress string, metrics *BeaconClientMetrics, checkDb bool) {
func processSlotRangeWorker(ctx context.Context, workCh <-chan int, errCh chan<- batchHistoricError, db sql.Database, serverAddress string, metrics *BeaconClientMetrics, checkDb bool, incrementTracker func(uint64)) {
for {
select {
case <-ctx.Done():
@ -115,6 +112,8 @@ func processSlotRangeWorker(ctx context.Context, workCh <-chan int, errCh chan<-
slot: slot,
}
errCh <- errMs
} else {
incrementTracker(1)
}
}
}
@ -149,7 +148,7 @@ func getBatchProcessRow(ctx context.Context, db sql.Database, getStartEndSlotStm
errCount = append(errCount, err)
}
if row < 1 {
time.Sleep(1000 * time.Millisecond)
time.Sleep(3 * time.Second)
log.Debug("We are checking rows, be patient")
break
}
@ -176,7 +175,7 @@ func getBatchProcessRow(ctx context.Context, db sql.Database, getStartEndSlotStm
err = tx.QueryRow(dbCtx, getStartEndSlotStmt).Scan(&sp.startSlot, &sp.endSlot)
if err != nil {
if err == pgx.ErrNoRows {
time.Sleep(100 * time.Millisecond)
time.Sleep(1 * time.Second)
break
}
loghelper.LogSlotRangeStatementError(strconv.Itoa(sp.startSlot), strconv.Itoa(sp.endSlot), getStartEndSlotStmt, err).Error("Unable to get a row")
@ -253,7 +252,7 @@ func removeRowPostProcess(ctx context.Context, db sql.Database, processCh <-chan
if isStartProcess && isEndProcess {
break
}
time.Sleep(1000 * time.Millisecond)
time.Sleep(3 * time.Second)
}
_, err := db.Exec(context.Background(), removeStmt, strconv.Itoa(slots.startSlot), strconv.Itoa(slots.endSlot))

View File

@ -61,7 +61,7 @@ type KnownGapsProcessing struct {
func (bc *BeaconClient) ProcessKnownGaps(ctx context.Context, maxWorkers int) []error {
log.Info("We are starting the known gaps processing service.")
bc.KnownGapsProcess = KnownGapsProcessing{db: bc.Db, uniqueNodeIdentifier: bc.UniqueNodeIdentifier, metrics: bc.Metrics}
errs := handleBatchProcess(ctx, maxWorkers, bc.KnownGapsProcess, bc.KnownGapsProcess.db, bc.ServerEndpoint, bc.Metrics, bc.CheckDb)
errs := handleBatchProcess(ctx, maxWorkers, bc.KnownGapsProcess, bc.KnownGapsProcess.db, bc.ServerEndpoint, bc.Metrics, bc.CheckDb, bc.Metrics.IncrementKnownGapsProcessed)
log.Debug("Exiting known gaps processing service")
return errs
}
@ -69,7 +69,8 @@ func (bc *BeaconClient) ProcessKnownGaps(ctx context.Context, maxWorkers int) []
// This function will perform all the necessary clean up tasks for stopping historical processing.
func (bc *BeaconClient) StopKnownGapsProcessing(cancel context.CancelFunc) error {
log.Info("We are stopping the known gaps processing service.")
err := bc.KnownGapsProcess.releaseDbLocks(cancel)
cancel()
err := bc.KnownGapsProcess.releaseDbLocks()
if err != nil {
loghelper.LogError(err).WithField("uniqueIdentifier", bc.UniqueNodeIdentifier).Error("We were unable to remove the locks from the eth_beacon.known_gaps table. Manual Intervention is needed!")
}
@ -121,11 +122,8 @@ func (kgp KnownGapsProcessing) handleProcessingErrors(ctx context.Context, errMe
}
// Updated checked_out column for the uniqueNodeIdentifier.
func (kgp KnownGapsProcessing) releaseDbLocks(cancel context.CancelFunc) error {
cancel()
func (kgp KnownGapsProcessing) releaseDbLocks() error {
log.Debug("Updating all the entries to eth_beacon.known_gaps")
log.Debug("Db: ", kgp.db)
log.Debug("kgp.uniqueNodeIdentifier ", kgp.uniqueNodeIdentifier)
res, err := kgp.db.Exec(context.Background(), releaseKgLockStmt, kgp.uniqueNodeIdentifier)
if err != nil {
return err

View File

@ -25,6 +25,7 @@ import (
"fmt"
"strconv"
"strings"
"time"
"github.com/jackc/pgx/v4"
si "github.com/prysmaticlabs/prysm/consensus-types/interfaces"
@ -50,15 +51,16 @@ var (
type ProcessSlot struct {
// Generic
Slot int // The slot number.
Epoch int // The epoch number.
BlockRoot string // The hex encoded string of the BlockRoot.
StateRoot string // The hex encoded string of the StateRoot.
ParentBlockRoot string // The hex encoded string of the parent block.
Status string // The status of the block
HeadOrHistoric string // Is this the head or a historic slot. This is critical when trying to analyze errors and skipped slots.
Db sql.Database // The DB object used to write to the DB.
Metrics *BeaconClientMetrics // An object to keep track of the beaconclient metrics
Slot int // The slot number.
Epoch int // The epoch number.
BlockRoot string // The hex encoded string of the BlockRoot.
StateRoot string // The hex encoded string of the StateRoot.
ParentBlockRoot string // The hex encoded string of the parent block.
Status string // The status of the block
HeadOrHistoric string // Is this the head or a historic slot. This is critical when trying to analyze errors and skipped slots.
Db sql.Database // The DB object used to write to the DB.
Metrics *BeaconClientMetrics // An object to keep track of the beaconclient metrics
PerformanceMetrics PerformanceMetrics // An object to keep track of performance metrics.
// BeaconBlock
SszSignedBeaconBlock []byte // The entire SSZ encoded SignedBeaconBlock
@ -74,6 +76,19 @@ type ProcessSlot struct {
DbBeaconState *DbBeaconState // The model being written to the state table.
}
type PerformanceMetrics struct {
BeaconNodeBlockRetrievalTime time.Duration // How long it took to get the BeaconBlock from the Beacon Node.
BeaconNodeStateRetrievalTime time.Duration // How long it took to get the BeaconState from the Beacon Node.
ParseBeaconObjectForHash time.Duration // How long it took to get some information from the beacon objects.
CheckDbPreProcessing time.Duration // How long it takes to check the DB before processing a block.
CreateDbWriteObject time.Duration // How long it takes to create a DB write object.
TransactSlotOnly time.Duration // How long it takes to transact the slot information only.
CheckReorg time.Duration // How long it takes to check for Reorgs
CommitTransaction time.Duration // How long it takes to commit the final transaction.
TotalDbTransaction time.Duration // How long it takes from start to committing the entire DB transaction.
TotalProcessing time.Duration // How long it took to process the entire slot.
}
// This function will do all the work to process the slot and write it to the DB.
// It will return the error and error process. The error process is used for providing reach detail to the
// known_gaps table.
@ -82,6 +97,7 @@ func processFullSlot(ctx context.Context, db sql.Database, serverAddress string,
case <-ctx.Done():
return nil, ""
default:
totalStart := time.Now()
ps := &ProcessSlot{
Slot: slot,
BlockRoot: blockRoot,
@ -89,6 +105,18 @@ func processFullSlot(ctx context.Context, db sql.Database, serverAddress string,
HeadOrHistoric: headOrHistoric,
Db: db,
Metrics: metrics,
PerformanceMetrics: PerformanceMetrics{
BeaconNodeBlockRetrievalTime: 0,
BeaconNodeStateRetrievalTime: 0,
ParseBeaconObjectForHash: 0,
CheckDbPreProcessing: 0,
CreateDbWriteObject: 0,
TransactSlotOnly: 0,
CheckReorg: 0,
CommitTransaction: 0,
TotalDbTransaction: 0,
TotalProcessing: 0,
},
}
g, _ := errgroup.WithContext(context.Background())
@ -100,10 +128,12 @@ func processFullSlot(ctx context.Context, db sql.Database, serverAddress string,
case <-ctx.Done():
return nil
default:
start := time.Now()
err := ps.getBeaconState(serverAddress, vUnmarshalerCh)
if err != nil {
return err
}
ps.PerformanceMetrics.BeaconNodeStateRetrievalTime = time.Since(start)
return nil
}
})
@ -114,10 +144,12 @@ func processFullSlot(ctx context.Context, db sql.Database, serverAddress string,
case <-ctx.Done():
return nil
default:
start := time.Now()
err := ps.getSignedBeaconBlock(serverAddress, vUnmarshalerCh)
if err != nil {
return err
}
ps.PerformanceMetrics.BeaconNodeBlockRetrievalTime = time.Since(start)
return nil
}
})
@ -126,11 +158,15 @@ func processFullSlot(ctx context.Context, db sql.Database, serverAddress string,
return err, "processSlot"
}
parseBeaconTime := time.Now()
finalBlockRoot, finalStateRoot, finalEth1BlockHash, err := ps.provideFinalHash()
if err != nil {
return err, "CalculateBlockRoot"
}
ps.PerformanceMetrics.ParseBeaconObjectForHash = time.Since(parseBeaconTime)
if checkDb {
checkDbTime := time.Now()
inDb, err := IsSlotInDb(ctx, ps.Db, strconv.Itoa(ps.Slot), finalBlockRoot, finalStateRoot)
if err != nil {
return err, "checkDb"
@ -139,26 +175,35 @@ func processFullSlot(ctx context.Context, db sql.Database, serverAddress string,
log.WithField("slot", slot).Info("Slot already in the DB.")
return nil, ""
}
ps.PerformanceMetrics.CheckDbPreProcessing = time.Since(checkDbTime)
}
// Get this object ready to write
createDbWriteTime := time.Now()
dw, err := ps.createWriteObjects(finalBlockRoot, finalStateRoot, finalEth1BlockHash)
if err != nil {
return err, "blockRoot"
}
ps.PerformanceMetrics.CreateDbWriteObject = time.Since(createDbWriteTime)
// Write the object to the DB.
dbFullTransactionTime := time.Now()
defer func() {
err := dw.Tx.Rollback(dw.Ctx)
if err != nil && err != pgx.ErrTxClosed {
loghelper.LogError(err).Error("We were unable to Rollback a transaction")
}
}()
transactionTime := time.Now()
err = dw.transactFullSlot()
if err != nil {
return err, "processSlot"
}
ps.PerformanceMetrics.TransactSlotOnly = time.Since(transactionTime)
// Handle any reorgs or skipped slots.
reorgTime := time.Now()
headOrHistoric = strings.ToLower(headOrHistoric)
if headOrHistoric != "head" && headOrHistoric != "historic" {
return fmt.Errorf("headOrHistoric must be either historic or head!"), ""
@ -166,11 +211,23 @@ func processFullSlot(ctx context.Context, db sql.Database, serverAddress string,
if ps.HeadOrHistoric == "head" && previousSlot != 0 && previousBlockRoot != "" && ps.Status != "skipped" {
ps.checkPreviousSlot(dw.Tx, dw.Ctx, previousSlot, previousBlockRoot, knownGapsTableIncrement)
}
ps.PerformanceMetrics.CheckReorg = time.Since(reorgTime)
// Commit the transaction
commitTime := time.Now()
if err = dw.Tx.Commit(dw.Ctx); err != nil {
return err, "transactionCommit"
}
ps.PerformanceMetrics.CommitTransaction = time.Since(commitTime)
// Total metric capture time.
ps.PerformanceMetrics.TotalDbTransaction = time.Since(dbFullTransactionTime)
ps.PerformanceMetrics.TotalProcessing = time.Since(totalStart)
log.WithFields(log.Fields{
"slot": slot,
"performanceMetrics": fmt.Sprintf("%+v\n", ps.PerformanceMetrics),
}).Debug("Performance Metric output!")
return nil, ""
}
@ -261,23 +318,24 @@ func (ps *ProcessSlot) getBeaconState(serverEndpoint string, vmCh chan<- *dt.Ver
// Check to make sure that the previous block we processed is the parent of the current block.
func (ps *ProcessSlot) checkPreviousSlot(tx sql.Tx, ctx context.Context, previousSlot int, previousBlockRoot string, knownGapsTableIncrement int) {
parentRoot := "0x" + hex.EncodeToString(ps.FullSignedBeaconBlock.Block().ParentRoot())
if previousSlot == int(ps.FullBeaconState.Slot()) {
slot := int(ps.FullBeaconState.Slot())
if previousSlot == slot {
log.WithFields(log.Fields{
"slot": ps.FullBeaconState.Slot(),
"slot": slot,
"fork": true,
}).Warn("A fork occurred! The previous slot and current slot match.")
transactReorgs(tx, ctx, strconv.Itoa(ps.Slot), ps.BlockRoot, ps.Metrics)
} else if previousSlot > int(ps.FullBeaconState.Slot()) {
} else if previousSlot > slot {
log.WithFields(log.Fields{
"previousSlot": previousSlot,
"curSlot": int(ps.FullBeaconState.Slot()),
"curSlot": slot,
}).Warn("We noticed the previous slot is greater than the current slot.")
} else if previousSlot+1 != int(ps.FullBeaconState.Slot()) {
} else if previousSlot+1 != slot {
log.WithFields(log.Fields{
"previousSlot": previousSlot,
"currentSlot": ps.FullBeaconState.Slot(),
"currentSlot": slot,
}).Error("We skipped a few slots.")
transactKnownGaps(tx, ctx, knownGapsTableIncrement, previousSlot+1, int(ps.FullBeaconState.Slot())-1, fmt.Errorf("Gaps during head processing"), "headGaps", ps.Metrics)
transactKnownGaps(tx, ctx, knownGapsTableIncrement, previousSlot+1, slot-1, fmt.Errorf("Gaps during head processing"), "headGaps", ps.Metrics)
} else if previousBlockRoot != parentRoot {
log.WithFields(log.Fields{
"previousBlockRoot": previousBlockRoot,
@ -298,7 +356,7 @@ func (ps *ProcessSlot) createWriteObjects(blockRoot, stateRoot, eth1BlockHash st
status = "proposed"
}
dw, err := CreateDatabaseWrite(ps.Db, ps.Slot, stateRoot, blockRoot, ps.ParentBlockRoot, eth1BlockHash, status, ps.SszSignedBeaconBlock, ps.SszBeaconState, ps.Metrics)
dw, err := CreateDatabaseWrite(ps.Db, ps.Slot, stateRoot, blockRoot, ps.ParentBlockRoot, eth1BlockHash, status, &ps.SszSignedBeaconBlock, &ps.SszBeaconState, ps.Metrics)
if err != nil {
return dw, err
}