diff --git a/.github/workflows/generic-testing.yml b/.github/workflows/generic-testing.yml index 063360e..d5f906b 100644 --- a/.github/workflows/generic-testing.yml +++ b/.github/workflows/generic-testing.yml @@ -49,6 +49,7 @@ jobs: echo vulcanize_ipld_eth_beacon_db=$GITHUB_WORKSPACE/ipld-eth-beacon-db/ > ./config.sh echo vulcanize_ipld_eth_beacon_indexer=$GITHUB_WORKSPACE/ipld-eth-beacon-indexer >> ./config.sh echo eth_beacon_config_file=$GITHUB_WORKSPACE/ipld-eth-beacon-indexer/config/cicd/boot.ipld-eth-beacon-indexer.json >> ./config.sh + echo eth_beacon_capture_mode=boot >> ./config.sh cat ./config.sh - name: Run docker compose @@ -159,7 +160,6 @@ jobs: run: | echo vulcanize_ipld_eth_beacon_db=$GITHUB_WORKSPACE/ipld-eth-beacon-db/ > ./config.sh echo vulcanize_ipld_eth_beacon_indexer=$GITHUB_WORKSPACE/ipld-eth-beacon-indexer >> ./config.sh - echo eth_beacon_capture_mode=boot >> ./config.sh cat ./config.sh - name: Run docker compose diff --git a/.gitignore b/.gitignore index bca777b..93cec94 100644 --- a/.gitignore +++ b/.gitignore @@ -8,4 +8,6 @@ temp/* pkg/beaconclient/ssz-data/ *.test ipld-eth-beacon-indexer.log +ipld-eth-beacon-indexer config/local.ipld-eth-beacon-indexer-config.json +config/docker.ipld-eth-beacon-indexer-config.json diff --git a/Dockerfile b/Dockerfile index 908313d..e467f7d 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,7 +1,7 @@ FROM golang:1.18-alpine as builder WORKDIR /go/src/github.com/vulcanize/ipld-eth-beacon-indexer -RUN apk --no-cache add ca-certificates make git g++ linux-headers +RUN apk --no-cache add ca-certificates make git g++ linux-headers libstdc++ ENV GO111MODULE=on COPY go.mod . @@ -9,11 +9,11 @@ COPY go.sum . RUN go mod tidy; go mod download COPY . . -RUN GCO_ENABLED=0 GOOS=linux go build -race -a -installsuffix cgo -ldflags '-extldflags "-static"' -o ipld-eth-beacon-indexer . +RUN GOOS=linux go build -race -ldflags="-s -w" -o ipld-eth-beacon-indexer . 
RUN chmod +x ipld-eth-beacon-indexer FROM frolvlad/alpine-bash:latest -RUN apk --no-cache add ca-certificates +RUN apk --no-cache add ca-certificates libstdc++ WORKDIR /root/ COPY --from=builder /go/src/github.com/vulcanize/ipld-eth-beacon-indexer/ipld-eth-beacon-indexer /root/ipld-eth-beacon-indexer ADD entrypoint.sh . diff --git a/Makefile b/Makefile index 4306273..7f9f925 100644 --- a/Makefile +++ b/Makefile @@ -67,6 +67,7 @@ unit-test-local: go fmt ./... $(GINKGO) -r --label-filter unit \ --randomize-all --randomize-suites \ + --flake-attempts=3 \ --fail-on-pending --keep-going \ --trace @@ -75,7 +76,8 @@ unit-test-ci: go vet ./... go fmt ./... $(GINKGO) -r --label-filter unit \ - --randomize-all --randomize-suites \ + --randomize-all --randomize-suites \ + --flake-attempts=3 \ --fail-on-pending --keep-going \ --cover --coverprofile=cover.profile \ --trace --json-report=report.json diff --git a/entrypoint.sh b/entrypoint.sh index 70858ac..52b1f0d 100755 --- a/entrypoint.sh +++ b/entrypoint.sh @@ -5,19 +5,20 @@ echo "Starting ipld-eth-beacon-indexer" echo /root/ipld-eth-beacon-indexer capture ${CAPTURE_MODE} --config /root/ipld-eth-beacon-config.json > /root/ipld-eth-beacon-indexer.output -/root/ipld-eth-beacon-indexer capture ${CAPTURE_MODE} --config /root/ipld-eth-beacon-config.json > /root/ipld-eth-beacon-indexer.output -rv=$? +if [ "${CAPTURE_MODE}" = "boot" ]; then + /root/ipld-eth-beacon-indexer capture ${CAPTURE_MODE} --config /root/ipld-eth-beacon-config.json > /root/ipld-eth-beacon-indexer.output + rv=$? 
-if [ $rv != 0 ]; then - echo "ipld-eth-beacon-indexer failed" - echo $rv > /root/HEALTH - echo $rv - cat /root/ipld-eth-beacon-indexer.output + if [ $rv != 0 ]; then + echo "ipld-eth-beacon-indexer boot failed" + else + echo "ipld-eth-beacon-indexer boot succeeded" + fi + echo $rv > /root/HEALTH + echo $rv + cat /root/ipld-eth-beacon-indexer.output + + tail -f /dev/null else - echo "ipld-eth-beacon-indexer succeeded" - echo $rv > /root/HEALTH - echo $rv - cat /root/ipld-eth-beacon-indexer.output -fi - -tail -f /dev/null \ No newline at end of file + exec /root/ipld-eth-beacon-indexer capture ${CAPTURE_MODE} --config /root/ipld-eth-beacon-config.json > /root/ipld-eth-beacon-indexer.output +fi \ No newline at end of file diff --git a/pkg/beaconclient/capturehistoric.go b/pkg/beaconclient/capturehistoric.go index af3bd69..2bf6dfc 100644 --- a/pkg/beaconclient/capturehistoric.go +++ b/pkg/beaconclient/capturehistoric.go @@ -31,7 +31,7 @@ import ( func (bc *BeaconClient) CaptureHistoric(ctx context.Context, maxWorkers int) []error { log.Info("We are starting the historical processing service.") bc.HistoricalProcess = HistoricProcessing{db: bc.Db, metrics: bc.Metrics, uniqueNodeIdentifier: bc.UniqueNodeIdentifier} - errs := handleBatchProcess(ctx, maxWorkers, bc.HistoricalProcess, bc.HistoricalProcess.db, bc.ServerEndpoint, bc.Metrics, bc.CheckDb) + errs := handleBatchProcess(ctx, maxWorkers, bc.HistoricalProcess, bc.HistoricalProcess.db, bc.ServerEndpoint, bc.Metrics, bc.CheckDb, bc.Metrics.IncrementHistoricSlotProcessed) log.Debug("Exiting Historical") return errs } @@ -39,7 +39,8 @@ func (bc *BeaconClient) CaptureHistoric(ctx context.Context, maxWorkers int) []e // This function will perform all the necessary clean up tasks for stopping historical processing. 
func (bc *BeaconClient) StopHistoric(cancel context.CancelFunc) error { log.Info("We are stopping the historical processing service.") - err := bc.HistoricalProcess.releaseDbLocks(cancel) + cancel() + err := bc.HistoricalProcess.releaseDbLocks() if err != nil { loghelper.LogError(err).WithField("uniqueIdentifier", bc.UniqueNodeIdentifier).Error("We were unable to remove the locks from the eth_beacon.historic_processing table. Manual Intervention is needed!") } @@ -55,7 +56,7 @@ type BatchProcessing interface { getSlotRange(context.Context, chan<- slotsToProcess) []error // Write the slots to process in a channel, return an error if you cant get the next slots to write. handleProcessingErrors(context.Context, <-chan batchHistoricError) // Custom logic to handle errors. removeTableEntry(context.Context, <-chan slotsToProcess) error // With the provided start and end slot, remove the entry from the database. - releaseDbLocks(context.CancelFunc) error // Update the checked_out column to false for whatever table is being updated. + releaseDbLocks() error // Update the checked_out column to false for whatever table is being updated. } /// ^^^ @@ -90,19 +91,24 @@ type batchHistoricError struct { // 4. Remove the slot entry from the DB. // // 5. Handle any errors. -func handleBatchProcess(ctx context.Context, maxWorkers int, bp BatchProcessing, db sql.Database, serverEndpoint string, metrics *BeaconClientMetrics, checkDb bool) []error { +func handleBatchProcess(ctx context.Context, maxWorkers int, bp BatchProcessing, db sql.Database, serverEndpoint string, metrics *BeaconClientMetrics, checkDb bool, incrementTracker func(uint64)) []error { slotsCh := make(chan slotsToProcess) workCh := make(chan int) processedCh := make(chan slotsToProcess) errCh := make(chan batchHistoricError) finalErrCh := make(chan []error, 1) + // Un-checkout any rows still checked out under this node identifier. 
+ err := bp.releaseDbLocks() + if err != nil { + loghelper.LogError(err).Error(("We are unable to un-checkout entries at the start!")) + } + // Start workers for w := 1; w <= maxWorkers; w++ { - log.WithFields(log.Fields{"maxWorkers": maxWorkers}).Debug("Starting batch processing workers") + log.WithFields(log.Fields{"maxWorkers": maxWorkers}).Debug("Starting batch processing workers") - // Pass in function to increment metric! KnownGapProcessing or HistoricProcessing - go processSlotRangeWorker(ctx, workCh, errCh, db, serverEndpoint, metrics, checkDb) + go processSlotRangeWorker(ctx, workCh, errCh, db, serverEndpoint, metrics, checkDb, incrementTracker) } // Process all ranges and send each individual slot to the worker. diff --git a/pkg/beaconclient/databasewrite.go b/pkg/beaconclient/databasewrite.go index 93b4d58..9ef1ec2 100644 --- a/pkg/beaconclient/databasewrite.go +++ b/pkg/beaconclient/databasewrite.go @@ -90,12 +90,12 @@ type DatabaseWriter struct { DbSlots *DbSlots DbSignedBeaconBlock *DbSignedBeaconBlock DbBeaconState *DbBeaconState - rawBeaconState []byte - rawSignedBeaconBlock []byte + rawBeaconState *[]byte + rawSignedBeaconBlock *[]byte } func CreateDatabaseWrite(db sql.Database, slot int, stateRoot string, blockRoot string, parentBlockRoot string, - eth1BlockHash string, status string, rawSignedBeaconBlock []byte, rawBeaconState []byte, metrics *BeaconClientMetrics) (*DatabaseWriter, error) { + eth1BlockHash string, status string, rawSignedBeaconBlock *[]byte, rawBeaconState *[]byte, metrics *BeaconClientMetrics) (*DatabaseWriter, error) { ctx := context.Background() tx, err := db.Begin(ctx) if err != nil { @@ -194,6 +194,7 @@ func (dw *DatabaseWriter) transactFullSlot() error { // return err //} // Might want to seperate writing to public.blocks so we can do this concurrently... + // Cant concurrently write because we are using a transaction. 
err := dw.transactSignedBeaconBlocks() if err != nil { loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("We couldn't write to the eth_beacon block table...") @@ -240,8 +241,8 @@ func (dw *DatabaseWriter) transactSignedBeaconBlocks() error { } // Upsert to public.blocks. -func (dw *DatabaseWriter) upsertPublicBlocks(key string, data []byte) error { - _, err := dw.Tx.Exec(dw.Ctx, UpsertBlocksStmt, key, data) +func (dw *DatabaseWriter) upsertPublicBlocks(key string, data *[]byte) error { + _, err := dw.Tx.Exec(dw.Ctx, UpsertBlocksStmt, key, *data) if err != nil { loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("Unable to write to the slot to the public.blocks table") return err diff --git a/pkg/beaconclient/metrics.go b/pkg/beaconclient/metrics.go index d704f1f..26842fb 100644 --- a/pkg/beaconclient/metrics.go +++ b/pkg/beaconclient/metrics.go @@ -54,6 +54,10 @@ func CreateBeaconClientMetrics() (*BeaconClientMetrics, error) { if err != nil { return nil, err } + err = prometheusRegisterHelper("historic_slots_processed", "Keeps track of the number of historic slots we successfully processed.", &metrics.HistoricSlotProcessed) + if err != nil { + return nil, err + } err = prometheusRegisterHelper("head_error", "Keeps track of the number of errors we had processing head messages.", &metrics.HeadError) if err != nil { return nil, err @@ -91,6 +95,7 @@ type BeaconClientMetrics struct { KnownGapsInserts uint64 // Number of known_gaps we successfully wrote to the DB. KnownGapsProcessed uint64 // Number of knownGaps processed. KnownGapsReprocessError uint64 // Number of knownGaps that were updated with an error. + HistoricSlotProcessed uint64 // Number of historic slots successfully processed. HeadError uint64 // Number of errors that occurred when decoding the head message. HeadReorgError uint64 // Number of errors that occurred when decoding the reorg message. 
} @@ -135,6 +140,11 @@ func (m *BeaconClientMetrics) IncrementReorgError(inc uint64) { // Wrapper function to increment the number of knownGaps that were updated with reprocessing errors. //If we want to use mutexes later we can easily update all occurrences here. func (m *BeaconClientMetrics) IncrementKnownGapsReprocessError(inc uint64) { - log.Debug("Incrementing Known Gap Reprocessing: ", &m.KnownGapsReprocessError) atomic.AddUint64(&m.KnownGapsReprocessError, inc) } + +// Wrapper function to increment the number of historicSlots that were processed successfully. +// If we want to use mutexes later we can easily update all occurrences here. +func (m *BeaconClientMetrics) IncrementHistoricSlotProcessed(inc uint64) { + atomic.AddUint64(&m.HistoricSlotProcessed, inc) +} diff --git a/pkg/beaconclient/processhistoric.go b/pkg/beaconclient/processhistoric.go index 7f2096d..c520e41 100644 --- a/pkg/beaconclient/processhistoric.go +++ b/pkg/beaconclient/processhistoric.go @@ -81,11 +81,8 @@ func (hp HistoricProcessing) handleProcessingErrors(ctx context.Context, errMess } // "un"-checkout the rows held by this DB in the eth_beacon.historical_process table. -func (hp HistoricProcessing) releaseDbLocks(cancel context.CancelFunc) error { - cancel() +func (hp HistoricProcessing) releaseDbLocks() error { log.Debug("Updating all the entries to eth_beacon.historical processing") - log.Debug("Db: ", hp.db) - log.Debug("hp.uniqueNodeIdentifier ", hp.uniqueNodeIdentifier) res, err := hp.db.Exec(context.Background(), releaseHpLockStmt, hp.uniqueNodeIdentifier) if err != nil { return fmt.Errorf("Unable to remove lock from eth_beacon.historical_processing table for node %d, error is %e", hp.uniqueNodeIdentifier, err) @@ -100,7 +97,7 @@ func (hp HistoricProcessing) releaseDbLocks(cancel context.CancelFunc) error { } // Process the slot range. 
-func processSlotRangeWorker(ctx context.Context, workCh <-chan int, errCh chan<- batchHistoricError, db sql.Database, serverAddress string, metrics *BeaconClientMetrics, checkDb bool) { +func processSlotRangeWorker(ctx context.Context, workCh <-chan int, errCh chan<- batchHistoricError, db sql.Database, serverAddress string, metrics *BeaconClientMetrics, checkDb bool, incrementTracker func(uint64)) { for { select { case <-ctx.Done(): @@ -115,6 +112,8 @@ func processSlotRangeWorker(ctx context.Context, workCh <-chan int, errCh chan<- slot: slot, } errCh <- errMs + } else { + incrementTracker(1) } } } @@ -149,7 +148,7 @@ func getBatchProcessRow(ctx context.Context, db sql.Database, getStartEndSlotStm errCount = append(errCount, err) } if row < 1 { - time.Sleep(1000 * time.Millisecond) + time.Sleep(3 * time.Second) log.Debug("We are checking rows, be patient") break } @@ -176,7 +175,7 @@ func getBatchProcessRow(ctx context.Context, db sql.Database, getStartEndSlotStm err = tx.QueryRow(dbCtx, getStartEndSlotStmt).Scan(&sp.startSlot, &sp.endSlot) if err != nil { if err == pgx.ErrNoRows { - time.Sleep(100 * time.Millisecond) + time.Sleep(1 * time.Second) break } loghelper.LogSlotRangeStatementError(strconv.Itoa(sp.startSlot), strconv.Itoa(sp.endSlot), getStartEndSlotStmt, err).Error("Unable to get a row") @@ -253,7 +252,7 @@ func removeRowPostProcess(ctx context.Context, db sql.Database, processCh <-chan if isStartProcess && isEndProcess { break } - time.Sleep(1000 * time.Millisecond) + time.Sleep(3 * time.Second) } _, err := db.Exec(context.Background(), removeStmt, strconv.Itoa(slots.startSlot), strconv.Itoa(slots.endSlot)) diff --git a/pkg/beaconclient/processknowngaps.go b/pkg/beaconclient/processknowngaps.go index c54d37c..343fc4a 100644 --- a/pkg/beaconclient/processknowngaps.go +++ b/pkg/beaconclient/processknowngaps.go @@ -61,7 +61,7 @@ type KnownGapsProcessing struct { func (bc *BeaconClient) ProcessKnownGaps(ctx context.Context, maxWorkers int) []error { 
log.Info("We are starting the known gaps processing service.") bc.KnownGapsProcess = KnownGapsProcessing{db: bc.Db, uniqueNodeIdentifier: bc.UniqueNodeIdentifier, metrics: bc.Metrics} - errs := handleBatchProcess(ctx, maxWorkers, bc.KnownGapsProcess, bc.KnownGapsProcess.db, bc.ServerEndpoint, bc.Metrics, bc.CheckDb) + errs := handleBatchProcess(ctx, maxWorkers, bc.KnownGapsProcess, bc.KnownGapsProcess.db, bc.ServerEndpoint, bc.Metrics, bc.CheckDb, bc.Metrics.IncrementKnownGapsProcessed) log.Debug("Exiting known gaps processing service") return errs } @@ -69,7 +69,8 @@ func (bc *BeaconClient) ProcessKnownGaps(ctx context.Context, maxWorkers int) [] // This function will perform all the necessary clean up tasks for stopping historical processing. func (bc *BeaconClient) StopKnownGapsProcessing(cancel context.CancelFunc) error { log.Info("We are stopping the known gaps processing service.") - err := bc.KnownGapsProcess.releaseDbLocks(cancel) + cancel() + err := bc.KnownGapsProcess.releaseDbLocks() if err != nil { loghelper.LogError(err).WithField("uniqueIdentifier", bc.UniqueNodeIdentifier).Error("We were unable to remove the locks from the eth_beacon.known_gaps table. Manual Intervention is needed!") } @@ -121,11 +122,8 @@ func (kgp KnownGapsProcessing) handleProcessingErrors(ctx context.Context, errMe } // Updated checked_out column for the uniqueNodeIdentifier. 
-func (kgp KnownGapsProcessing) releaseDbLocks(cancel context.CancelFunc) error { - cancel() +func (kgp KnownGapsProcessing) releaseDbLocks() error { log.Debug("Updating all the entries to eth_beacon.known_gaps") - log.Debug("Db: ", kgp.db) - log.Debug("kgp.uniqueNodeIdentifier ", kgp.uniqueNodeIdentifier) res, err := kgp.db.Exec(context.Background(), releaseKgLockStmt, kgp.uniqueNodeIdentifier) if err != nil { return err diff --git a/pkg/beaconclient/processslot.go b/pkg/beaconclient/processslot.go index 3fb9465..1b8f619 100644 --- a/pkg/beaconclient/processslot.go +++ b/pkg/beaconclient/processslot.go @@ -25,6 +25,7 @@ import ( "fmt" "strconv" "strings" + "time" "github.com/jackc/pgx/v4" si "github.com/prysmaticlabs/prysm/consensus-types/interfaces" @@ -50,15 +51,16 @@ var ( type ProcessSlot struct { // Generic - Slot int // The slot number. - Epoch int // The epoch number. - BlockRoot string // The hex encoded string of the BlockRoot. - StateRoot string // The hex encoded string of the StateRoot. - ParentBlockRoot string // The hex encoded string of the parent block. - Status string // The status of the block - HeadOrHistoric string // Is this the head or a historic slot. This is critical when trying to analyze errors and skipped slots. - Db sql.Database // The DB object used to write to the DB. - Metrics *BeaconClientMetrics // An object to keep track of the beaconclient metrics + Slot int // The slot number. + Epoch int // The epoch number. + BlockRoot string // The hex encoded string of the BlockRoot. + StateRoot string // The hex encoded string of the StateRoot. + ParentBlockRoot string // The hex encoded string of the parent block. + Status string // The status of the block + HeadOrHistoric string // Is this the head or a historic slot. This is critical when trying to analyze errors and skipped slots. + Db sql.Database // The DB object used to write to the DB. 
+ Metrics *BeaconClientMetrics // An object to keep track of the beaconclient metrics + PerformanceMetrics PerformanceMetrics // An object to keep track of performance metrics. // BeaconBlock SszSignedBeaconBlock []byte // The entire SSZ encoded SignedBeaconBlock @@ -74,6 +76,19 @@ type ProcessSlot struct { DbBeaconState *DbBeaconState // The model being written to the state table. } +type PerformanceMetrics struct { + BeaconNodeBlockRetrievalTime time.Duration // How long it took to get the BeaconBlock from the Beacon Node. + BeaconNodeStateRetrievalTime time.Duration // How long it took to get the BeaconState from the Beacon Node. + ParseBeaconObjectForHash time.Duration // How long it took to get some information from the beacon objects. + CheckDbPreProcessing time.Duration // How long it takes to check the DB before processing a block. + CreateDbWriteObject time.Duration // How long it takes to create a DB write object. + TransactSlotOnly time.Duration // How long it takes to transact the slot information only. + CheckReorg time.Duration // How long it takes to check for Reorgs + CommitTransaction time.Duration // How long it takes to commit the final transaction. + TotalDbTransaction time.Duration // How long it takes from start to committing the entire DB transaction. + TotalProcessing time.Duration // How long it took to process the entire slot. +} + // This function will do all the work to process the slot and write it to the DB. // It will return the error and error process. The error process is used for providing reach detail to the // known_gaps table. 
@@ -82,6 +97,7 @@ func processFullSlot(ctx context.Context, db sql.Database, serverAddress string, case <-ctx.Done(): return nil, "" default: + totalStart := time.Now() ps := &ProcessSlot{ Slot: slot, BlockRoot: blockRoot, @@ -89,6 +105,18 @@ func processFullSlot(ctx context.Context, db sql.Database, serverAddress string, HeadOrHistoric: headOrHistoric, Db: db, Metrics: metrics, + PerformanceMetrics: PerformanceMetrics{ + BeaconNodeBlockRetrievalTime: 0, + BeaconNodeStateRetrievalTime: 0, + ParseBeaconObjectForHash: 0, + CheckDbPreProcessing: 0, + CreateDbWriteObject: 0, + TransactSlotOnly: 0, + CheckReorg: 0, + CommitTransaction: 0, + TotalDbTransaction: 0, + TotalProcessing: 0, + }, } g, _ := errgroup.WithContext(context.Background()) @@ -100,10 +128,12 @@ func processFullSlot(ctx context.Context, db sql.Database, serverAddress string, case <-ctx.Done(): return nil default: + start := time.Now() err := ps.getBeaconState(serverAddress, vUnmarshalerCh) if err != nil { return err } + ps.PerformanceMetrics.BeaconNodeStateRetrievalTime = time.Since(start) return nil } }) @@ -114,10 +144,12 @@ func processFullSlot(ctx context.Context, db sql.Database, serverAddress string, case <-ctx.Done(): return nil default: + start := time.Now() err := ps.getSignedBeaconBlock(serverAddress, vUnmarshalerCh) if err != nil { return err } + ps.PerformanceMetrics.BeaconNodeBlockRetrievalTime = time.Since(start) return nil } }) @@ -126,11 +158,15 @@ func processFullSlot(ctx context.Context, db sql.Database, serverAddress string, return err, "processSlot" } + parseBeaconTime := time.Now() finalBlockRoot, finalStateRoot, finalEth1BlockHash, err := ps.provideFinalHash() if err != nil { return err, "CalculateBlockRoot" } + ps.PerformanceMetrics.ParseBeaconObjectForHash = time.Since(parseBeaconTime) + if checkDb { + checkDbTime := time.Now() inDb, err := IsSlotInDb(ctx, ps.Db, strconv.Itoa(ps.Slot), finalBlockRoot, finalStateRoot) if err != nil { return err, "checkDb" @@ -139,26 +175,35 @@ 
func processFullSlot(ctx context.Context, db sql.Database, serverAddress string, log.WithField("slot", slot).Info("Slot already in the DB.") return nil, "" } + ps.PerformanceMetrics.CheckDbPreProcessing = time.Since(checkDbTime) } // Get this object ready to write + createDbWriteTime := time.Now() dw, err := ps.createWriteObjects(finalBlockRoot, finalStateRoot, finalEth1BlockHash) if err != nil { return err, "blockRoot" } + ps.PerformanceMetrics.CreateDbWriteObject = time.Since(createDbWriteTime) + // Write the object to the DB. + dbFullTransactionTime := time.Now() defer func() { err := dw.Tx.Rollback(dw.Ctx) if err != nil && err != pgx.ErrTxClosed { loghelper.LogError(err).Error("We were unable to Rollback a transaction") } }() + + transactionTime := time.Now() err = dw.transactFullSlot() if err != nil { return err, "processSlot" } + ps.PerformanceMetrics.TransactSlotOnly = time.Since(transactionTime) // Handle any reorgs or skipped slots. + reorgTime := time.Now() headOrHistoric = strings.ToLower(headOrHistoric) if headOrHistoric != "head" && headOrHistoric != "historic" { return fmt.Errorf("headOrHistoric must be either historic or head!"), "" @@ -166,11 +211,23 @@ func processFullSlot(ctx context.Context, db sql.Database, serverAddress string, if ps.HeadOrHistoric == "head" && previousSlot != 0 && previousBlockRoot != "" && ps.Status != "skipped" { ps.checkPreviousSlot(dw.Tx, dw.Ctx, previousSlot, previousBlockRoot, knownGapsTableIncrement) } + ps.PerformanceMetrics.CheckReorg = time.Since(reorgTime) // Commit the transaction + commitTime := time.Now() if err = dw.Tx.Commit(dw.Ctx); err != nil { return err, "transactionCommit" } + ps.PerformanceMetrics.CommitTransaction = time.Since(commitTime) + + // Total metric capture time. 
+ ps.PerformanceMetrics.TotalDbTransaction = time.Since(dbFullTransactionTime) + ps.PerformanceMetrics.TotalProcessing = time.Since(totalStart) + + log.WithFields(log.Fields{ + "slot": slot, + "performanceMetrics": fmt.Sprintf("%+v\n", ps.PerformanceMetrics), + }).Debug("Performance Metric output!") return nil, "" } @@ -261,23 +318,24 @@ func (ps *ProcessSlot) getBeaconState(serverEndpoint string, vmCh chan<- *dt.Ver // Check to make sure that the previous block we processed is the parent of the current block. func (ps *ProcessSlot) checkPreviousSlot(tx sql.Tx, ctx context.Context, previousSlot int, previousBlockRoot string, knownGapsTableIncrement int) { parentRoot := "0x" + hex.EncodeToString(ps.FullSignedBeaconBlock.Block().ParentRoot()) - if previousSlot == int(ps.FullBeaconState.Slot()) { + slot := int(ps.FullBeaconState.Slot()) + if previousSlot == slot { log.WithFields(log.Fields{ - "slot": ps.FullBeaconState.Slot(), + "slot": slot, "fork": true, }).Warn("A fork occurred! The previous slot and current slot match.") transactReorgs(tx, ctx, strconv.Itoa(ps.Slot), ps.BlockRoot, ps.Metrics) - } else if previousSlot > int(ps.FullBeaconState.Slot()) { + } else if previousSlot > slot { log.WithFields(log.Fields{ "previousSlot": previousSlot, - "curSlot": int(ps.FullBeaconState.Slot()), + "curSlot": slot, }).Warn("We noticed the previous slot is greater than the current slot.") - } else if previousSlot+1 != int(ps.FullBeaconState.Slot()) { + } else if previousSlot+1 != slot { log.WithFields(log.Fields{ "previousSlot": previousSlot, - "currentSlot": ps.FullBeaconState.Slot(), + "currentSlot": slot, }).Error("We skipped a few slots.") - transactKnownGaps(tx, ctx, knownGapsTableIncrement, previousSlot+1, int(ps.FullBeaconState.Slot())-1, fmt.Errorf("Gaps during head processing"), "headGaps", ps.Metrics) + transactKnownGaps(tx, ctx, knownGapsTableIncrement, previousSlot+1, slot-1, fmt.Errorf("Gaps during head processing"), "headGaps", ps.Metrics) } else if 
previousBlockRoot != parentRoot { log.WithFields(log.Fields{ "previousBlockRoot": previousBlockRoot, @@ -298,7 +356,7 @@ func (ps *ProcessSlot) createWriteObjects(blockRoot, stateRoot, eth1BlockHash st status = "proposed" } - dw, err := CreateDatabaseWrite(ps.Db, ps.Slot, stateRoot, blockRoot, ps.ParentBlockRoot, eth1BlockHash, status, ps.SszSignedBeaconBlock, ps.SszBeaconState, ps.Metrics) + dw, err := CreateDatabaseWrite(ps.Db, ps.Slot, stateRoot, blockRoot, ps.ParentBlockRoot, eth1BlockHash, status, &ps.SszSignedBeaconBlock, &ps.SszBeaconState, ps.Metrics) if err != nil { return dw, err }