Slight Performance improvements with Logging (#67) #68
2
.github/workflows/generic-testing.yml
vendored
2
.github/workflows/generic-testing.yml
vendored
@ -49,6 +49,7 @@ jobs:
|
|||||||
echo vulcanize_ipld_eth_beacon_db=$GITHUB_WORKSPACE/ipld-eth-beacon-db/ > ./config.sh
|
echo vulcanize_ipld_eth_beacon_db=$GITHUB_WORKSPACE/ipld-eth-beacon-db/ > ./config.sh
|
||||||
echo vulcanize_ipld_eth_beacon_indexer=$GITHUB_WORKSPACE/ipld-eth-beacon-indexer >> ./config.sh
|
echo vulcanize_ipld_eth_beacon_indexer=$GITHUB_WORKSPACE/ipld-eth-beacon-indexer >> ./config.sh
|
||||||
echo eth_beacon_config_file=$GITHUB_WORKSPACE/ipld-eth-beacon-indexer/config/cicd/boot.ipld-eth-beacon-indexer.json >> ./config.sh
|
echo eth_beacon_config_file=$GITHUB_WORKSPACE/ipld-eth-beacon-indexer/config/cicd/boot.ipld-eth-beacon-indexer.json >> ./config.sh
|
||||||
|
echo eth_beacon_capture_mode=boot >> ./config.sh
|
||||||
cat ./config.sh
|
cat ./config.sh
|
||||||
|
|
||||||
- name: Run docker compose
|
- name: Run docker compose
|
||||||
@ -159,7 +160,6 @@ jobs:
|
|||||||
run: |
|
run: |
|
||||||
echo vulcanize_ipld_eth_beacon_db=$GITHUB_WORKSPACE/ipld-eth-beacon-db/ > ./config.sh
|
echo vulcanize_ipld_eth_beacon_db=$GITHUB_WORKSPACE/ipld-eth-beacon-db/ > ./config.sh
|
||||||
echo vulcanize_ipld_eth_beacon_indexer=$GITHUB_WORKSPACE/ipld-eth-beacon-indexer >> ./config.sh
|
echo vulcanize_ipld_eth_beacon_indexer=$GITHUB_WORKSPACE/ipld-eth-beacon-indexer >> ./config.sh
|
||||||
echo eth_beacon_capture_mode=boot >> ./config.sh
|
|
||||||
cat ./config.sh
|
cat ./config.sh
|
||||||
|
|
||||||
- name: Run docker compose
|
- name: Run docker compose
|
||||||
|
2
.gitignore
vendored
2
.gitignore
vendored
@ -8,4 +8,6 @@ temp/*
|
|||||||
pkg/beaconclient/ssz-data/
|
pkg/beaconclient/ssz-data/
|
||||||
*.test
|
*.test
|
||||||
ipld-eth-beacon-indexer.log
|
ipld-eth-beacon-indexer.log
|
||||||
|
ipld-eth-beacon-indexer
|
||||||
config/local.ipld-eth-beacon-indexer-config.json
|
config/local.ipld-eth-beacon-indexer-config.json
|
||||||
|
config/docker.ipld-eth-beacon-indexer-config.json
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
FROM golang:1.18-alpine as builder
|
FROM golang:1.18-alpine as builder
|
||||||
|
|
||||||
WORKDIR /go/src/github.com/vulcanize/ipld-eth-beacon-indexer
|
WORKDIR /go/src/github.com/vulcanize/ipld-eth-beacon-indexer
|
||||||
RUN apk --no-cache add ca-certificates make git g++ linux-headers
|
RUN apk --no-cache add ca-certificates make git g++ linux-headers libstdc++
|
||||||
|
|
||||||
ENV GO111MODULE=on
|
ENV GO111MODULE=on
|
||||||
COPY go.mod .
|
COPY go.mod .
|
||||||
@ -9,11 +9,11 @@ COPY go.sum .
|
|||||||
RUN go mod tidy; go mod download
|
RUN go mod tidy; go mod download
|
||||||
COPY . .
|
COPY . .
|
||||||
|
|
||||||
RUN GCO_ENABLED=0 GOOS=linux go build -race -a -installsuffix cgo -ldflags '-extldflags "-static"' -o ipld-eth-beacon-indexer .
|
RUN GCO_ENABLED=0 GOOS=linux go build -race -ldflags="-s -w" -o ipld-eth-beacon-indexer .
|
||||||
RUN chmod +x ipld-eth-beacon-indexer
|
RUN chmod +x ipld-eth-beacon-indexer
|
||||||
|
|
||||||
FROM frolvlad/alpine-bash:latest
|
FROM frolvlad/alpine-bash:latest
|
||||||
RUN apk --no-cache add ca-certificates
|
RUN apk --no-cache add ca-certificates libstdc++
|
||||||
WORKDIR /root/
|
WORKDIR /root/
|
||||||
COPY --from=builder /go/src/github.com/vulcanize/ipld-eth-beacon-indexer/ipld-eth-beacon-indexer /root/ipld-eth-beacon-indexer
|
COPY --from=builder /go/src/github.com/vulcanize/ipld-eth-beacon-indexer/ipld-eth-beacon-indexer /root/ipld-eth-beacon-indexer
|
||||||
ADD entrypoint.sh .
|
ADD entrypoint.sh .
|
||||||
|
4
Makefile
4
Makefile
@ -67,6 +67,7 @@ unit-test-local:
|
|||||||
go fmt ./...
|
go fmt ./...
|
||||||
$(GINKGO) -r --label-filter unit \
|
$(GINKGO) -r --label-filter unit \
|
||||||
--randomize-all --randomize-suites \
|
--randomize-all --randomize-suites \
|
||||||
|
--flake-attempts=3 \
|
||||||
--fail-on-pending --keep-going \
|
--fail-on-pending --keep-going \
|
||||||
--trace
|
--trace
|
||||||
|
|
||||||
@ -75,7 +76,8 @@ unit-test-ci:
|
|||||||
go vet ./...
|
go vet ./...
|
||||||
go fmt ./...
|
go fmt ./...
|
||||||
$(GINKGO) -r --label-filter unit \
|
$(GINKGO) -r --label-filter unit \
|
||||||
--randomize-all --randomize-suites \
|
--randomize-all --randomize-suites
|
||||||
|
--flake-attempts=3 \
|
||||||
--fail-on-pending --keep-going \
|
--fail-on-pending --keep-going \
|
||||||
--cover --coverprofile=cover.profile \
|
--cover --coverprofile=cover.profile \
|
||||||
--trace --json-report=report.json
|
--trace --json-report=report.json
|
||||||
|
@ -5,19 +5,20 @@ echo "Starting ipld-eth-beacon-indexer"
|
|||||||
|
|
||||||
echo /root/ipld-eth-beacon-indexer capture ${CAPTURE_MODE} --config /root/ipld-eth-beacon-config.json > /root/ipld-eth-beacon-indexer.output
|
echo /root/ipld-eth-beacon-indexer capture ${CAPTURE_MODE} --config /root/ipld-eth-beacon-config.json > /root/ipld-eth-beacon-indexer.output
|
||||||
|
|
||||||
|
if [ ${CAPTURE_MODE} == "boot" ]; then
|
||||||
/root/ipld-eth-beacon-indexer capture ${CAPTURE_MODE} --config /root/ipld-eth-beacon-config.json > /root/ipld-eth-beacon-indexer.output
|
/root/ipld-eth-beacon-indexer capture ${CAPTURE_MODE} --config /root/ipld-eth-beacon-config.json > /root/ipld-eth-beacon-indexer.output
|
||||||
rv=$?
|
rv=$?
|
||||||
|
|
||||||
if [ $rv != 0 ]; then
|
if [ $rv != 0 ]; then
|
||||||
echo "ipld-eth-beacon-indexer failed"
|
echo "ipld-eth-beacon-indexer boot failed"
|
||||||
echo $rv > /root/HEALTH
|
|
||||||
echo $rv
|
|
||||||
cat /root/ipld-eth-beacon-indexer.output
|
|
||||||
else
|
else
|
||||||
echo "ipld-eth-beacon-indexer succeeded"
|
echo "ipld-eth-beacon-indexer boot succeeded"
|
||||||
|
fi
|
||||||
echo $rv > /root/HEALTH
|
echo $rv > /root/HEALTH
|
||||||
echo $rv
|
echo $rv
|
||||||
cat /root/ipld-eth-beacon-indexer.output
|
cat /root/ipld-eth-beacon-indexer.output
|
||||||
fi
|
|
||||||
|
|
||||||
tail -f /dev/null
|
tail -f /dev/null
|
||||||
|
else
|
||||||
|
exec /root/ipld-eth-beacon-indexer capture ${CAPTURE_MODE} --config /root/ipld-eth-beacon-config.json > /root/ipld-eth-beacon-indexer.output
|
||||||
|
fi
|
@ -31,7 +31,7 @@ import (
|
|||||||
func (bc *BeaconClient) CaptureHistoric(ctx context.Context, maxWorkers int) []error {
|
func (bc *BeaconClient) CaptureHistoric(ctx context.Context, maxWorkers int) []error {
|
||||||
log.Info("We are starting the historical processing service.")
|
log.Info("We are starting the historical processing service.")
|
||||||
bc.HistoricalProcess = HistoricProcessing{db: bc.Db, metrics: bc.Metrics, uniqueNodeIdentifier: bc.UniqueNodeIdentifier}
|
bc.HistoricalProcess = HistoricProcessing{db: bc.Db, metrics: bc.Metrics, uniqueNodeIdentifier: bc.UniqueNodeIdentifier}
|
||||||
errs := handleBatchProcess(ctx, maxWorkers, bc.HistoricalProcess, bc.HistoricalProcess.db, bc.ServerEndpoint, bc.Metrics, bc.CheckDb)
|
errs := handleBatchProcess(ctx, maxWorkers, bc.HistoricalProcess, bc.HistoricalProcess.db, bc.ServerEndpoint, bc.Metrics, bc.CheckDb, bc.Metrics.IncrementHistoricSlotProcessed)
|
||||||
log.Debug("Exiting Historical")
|
log.Debug("Exiting Historical")
|
||||||
return errs
|
return errs
|
||||||
}
|
}
|
||||||
@ -39,7 +39,8 @@ func (bc *BeaconClient) CaptureHistoric(ctx context.Context, maxWorkers int) []e
|
|||||||
// This function will perform all the necessary clean up tasks for stopping historical processing.
|
// This function will perform all the necessary clean up tasks for stopping historical processing.
|
||||||
func (bc *BeaconClient) StopHistoric(cancel context.CancelFunc) error {
|
func (bc *BeaconClient) StopHistoric(cancel context.CancelFunc) error {
|
||||||
log.Info("We are stopping the historical processing service.")
|
log.Info("We are stopping the historical processing service.")
|
||||||
err := bc.HistoricalProcess.releaseDbLocks(cancel)
|
cancel()
|
||||||
|
err := bc.HistoricalProcess.releaseDbLocks()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
loghelper.LogError(err).WithField("uniqueIdentifier", bc.UniqueNodeIdentifier).Error("We were unable to remove the locks from the eth_beacon.historic_processing table. Manual Intervention is needed!")
|
loghelper.LogError(err).WithField("uniqueIdentifier", bc.UniqueNodeIdentifier).Error("We were unable to remove the locks from the eth_beacon.historic_processing table. Manual Intervention is needed!")
|
||||||
}
|
}
|
||||||
@ -55,7 +56,7 @@ type BatchProcessing interface {
|
|||||||
getSlotRange(context.Context, chan<- slotsToProcess) []error // Write the slots to process in a channel, return an error if you cant get the next slots to write.
|
getSlotRange(context.Context, chan<- slotsToProcess) []error // Write the slots to process in a channel, return an error if you cant get the next slots to write.
|
||||||
handleProcessingErrors(context.Context, <-chan batchHistoricError) // Custom logic to handle errors.
|
handleProcessingErrors(context.Context, <-chan batchHistoricError) // Custom logic to handle errors.
|
||||||
removeTableEntry(context.Context, <-chan slotsToProcess) error // With the provided start and end slot, remove the entry from the database.
|
removeTableEntry(context.Context, <-chan slotsToProcess) error // With the provided start and end slot, remove the entry from the database.
|
||||||
releaseDbLocks(context.CancelFunc) error // Update the checked_out column to false for whatever table is being updated.
|
releaseDbLocks() error // Update the checked_out column to false for whatever table is being updated.
|
||||||
}
|
}
|
||||||
|
|
||||||
/// ^^^
|
/// ^^^
|
||||||
@ -90,19 +91,24 @@ type batchHistoricError struct {
|
|||||||
// 4. Remove the slot entry from the DB.
|
// 4. Remove the slot entry from the DB.
|
||||||
//
|
//
|
||||||
// 5. Handle any errors.
|
// 5. Handle any errors.
|
||||||
func handleBatchProcess(ctx context.Context, maxWorkers int, bp BatchProcessing, db sql.Database, serverEndpoint string, metrics *BeaconClientMetrics, checkDb bool) []error {
|
func handleBatchProcess(ctx context.Context, maxWorkers int, bp BatchProcessing, db sql.Database, serverEndpoint string, metrics *BeaconClientMetrics, checkDb bool, incrementTracker func(uint64)) []error {
|
||||||
slotsCh := make(chan slotsToProcess)
|
slotsCh := make(chan slotsToProcess)
|
||||||
workCh := make(chan int)
|
workCh := make(chan int)
|
||||||
processedCh := make(chan slotsToProcess)
|
processedCh := make(chan slotsToProcess)
|
||||||
errCh := make(chan batchHistoricError)
|
errCh := make(chan batchHistoricError)
|
||||||
finalErrCh := make(chan []error, 1)
|
finalErrCh := make(chan []error, 1)
|
||||||
|
|
||||||
|
// Checkout Rows with same node Identifier.
|
||||||
|
err := bp.releaseDbLocks()
|
||||||
|
if err != nil {
|
||||||
|
loghelper.LogError(err).Error(("We are unable to un-checkout entries at the start!"))
|
||||||
|
}
|
||||||
|
|
||||||
// Start workers
|
// Start workers
|
||||||
for w := 1; w <= maxWorkers; w++ {
|
for w := 1; w <= maxWorkers; w++ {
|
||||||
log.WithFields(log.Fields{"maxWorkers": maxWorkers}).Debug("Starting batch processing workers")
|
log.WithFields(log.Fields{"maxWorkers": maxWorkers}).Debug("Starting batch processing workers")
|
||||||
|
|
||||||
// Pass in function to increment metric! KnownGapProcessing or HistoricProcessing
|
go processSlotRangeWorker(ctx, workCh, errCh, db, serverEndpoint, metrics, checkDb, incrementTracker)
|
||||||
go processSlotRangeWorker(ctx, workCh, errCh, db, serverEndpoint, metrics, checkDb)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Process all ranges and send each individual slot to the worker.
|
// Process all ranges and send each individual slot to the worker.
|
||||||
|
@ -90,12 +90,12 @@ type DatabaseWriter struct {
|
|||||||
DbSlots *DbSlots
|
DbSlots *DbSlots
|
||||||
DbSignedBeaconBlock *DbSignedBeaconBlock
|
DbSignedBeaconBlock *DbSignedBeaconBlock
|
||||||
DbBeaconState *DbBeaconState
|
DbBeaconState *DbBeaconState
|
||||||
rawBeaconState []byte
|
rawBeaconState *[]byte
|
||||||
rawSignedBeaconBlock []byte
|
rawSignedBeaconBlock *[]byte
|
||||||
}
|
}
|
||||||
|
|
||||||
func CreateDatabaseWrite(db sql.Database, slot int, stateRoot string, blockRoot string, parentBlockRoot string,
|
func CreateDatabaseWrite(db sql.Database, slot int, stateRoot string, blockRoot string, parentBlockRoot string,
|
||||||
eth1BlockHash string, status string, rawSignedBeaconBlock []byte, rawBeaconState []byte, metrics *BeaconClientMetrics) (*DatabaseWriter, error) {
|
eth1BlockHash string, status string, rawSignedBeaconBlock *[]byte, rawBeaconState *[]byte, metrics *BeaconClientMetrics) (*DatabaseWriter, error) {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
tx, err := db.Begin(ctx)
|
tx, err := db.Begin(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -194,6 +194,7 @@ func (dw *DatabaseWriter) transactFullSlot() error {
|
|||||||
// return err
|
// return err
|
||||||
//}
|
//}
|
||||||
// Might want to seperate writing to public.blocks so we can do this concurrently...
|
// Might want to seperate writing to public.blocks so we can do this concurrently...
|
||||||
|
// Cant concurrently write because we are using a transaction.
|
||||||
err := dw.transactSignedBeaconBlocks()
|
err := dw.transactSignedBeaconBlocks()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("We couldn't write to the eth_beacon block table...")
|
loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("We couldn't write to the eth_beacon block table...")
|
||||||
@ -240,8 +241,8 @@ func (dw *DatabaseWriter) transactSignedBeaconBlocks() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Upsert to public.blocks.
|
// Upsert to public.blocks.
|
||||||
func (dw *DatabaseWriter) upsertPublicBlocks(key string, data []byte) error {
|
func (dw *DatabaseWriter) upsertPublicBlocks(key string, data *[]byte) error {
|
||||||
_, err := dw.Tx.Exec(dw.Ctx, UpsertBlocksStmt, key, data)
|
_, err := dw.Tx.Exec(dw.Ctx, UpsertBlocksStmt, key, *data)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("Unable to write to the slot to the public.blocks table")
|
loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("Unable to write to the slot to the public.blocks table")
|
||||||
return err
|
return err
|
||||||
|
@ -54,6 +54,10 @@ func CreateBeaconClientMetrics() (*BeaconClientMetrics, error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
err = prometheusRegisterHelper("historic_slots_processed", "Keeps track of the number of historic slots we successfully processed.", &metrics.HistoricSlotProcessed)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
err = prometheusRegisterHelper("head_error", "Keeps track of the number of errors we had processing head messages.", &metrics.HeadError)
|
err = prometheusRegisterHelper("head_error", "Keeps track of the number of errors we had processing head messages.", &metrics.HeadError)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -91,6 +95,7 @@ type BeaconClientMetrics struct {
|
|||||||
KnownGapsInserts uint64 // Number of known_gaps we successfully wrote to the DB.
|
KnownGapsInserts uint64 // Number of known_gaps we successfully wrote to the DB.
|
||||||
KnownGapsProcessed uint64 // Number of knownGaps processed.
|
KnownGapsProcessed uint64 // Number of knownGaps processed.
|
||||||
KnownGapsReprocessError uint64 // Number of knownGaps that were updated with an error.
|
KnownGapsReprocessError uint64 // Number of knownGaps that were updated with an error.
|
||||||
|
HistoricSlotProcessed uint64 // Number of historic slots successfully processed.
|
||||||
HeadError uint64 // Number of errors that occurred when decoding the head message.
|
HeadError uint64 // Number of errors that occurred when decoding the head message.
|
||||||
HeadReorgError uint64 // Number of errors that occurred when decoding the reorg message.
|
HeadReorgError uint64 // Number of errors that occurred when decoding the reorg message.
|
||||||
}
|
}
|
||||||
@ -135,6 +140,11 @@ func (m *BeaconClientMetrics) IncrementReorgError(inc uint64) {
|
|||||||
// Wrapper function to increment the number of knownGaps that were updated with reprocessing errors.
|
// Wrapper function to increment the number of knownGaps that were updated with reprocessing errors.
|
||||||
//If we want to use mutexes later we can easily update all occurrences here.
|
//If we want to use mutexes later we can easily update all occurrences here.
|
||||||
func (m *BeaconClientMetrics) IncrementKnownGapsReprocessError(inc uint64) {
|
func (m *BeaconClientMetrics) IncrementKnownGapsReprocessError(inc uint64) {
|
||||||
log.Debug("Incrementing Known Gap Reprocessing: ", &m.KnownGapsReprocessError)
|
|
||||||
atomic.AddUint64(&m.KnownGapsReprocessError, inc)
|
atomic.AddUint64(&m.KnownGapsReprocessError, inc)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Wrapper function to increment the number of historicSlots that were processed successfully.
|
||||||
|
// If we want to use mutexes later we can easily update all occurrences here.
|
||||||
|
func (m *BeaconClientMetrics) IncrementHistoricSlotProcessed(inc uint64) {
|
||||||
|
atomic.AddUint64(&m.HistoricSlotProcessed, inc)
|
||||||
|
}
|
||||||
|
@ -81,11 +81,8 @@ func (hp HistoricProcessing) handleProcessingErrors(ctx context.Context, errMess
|
|||||||
}
|
}
|
||||||
|
|
||||||
// "un"-checkout the rows held by this DB in the eth_beacon.historical_process table.
|
// "un"-checkout the rows held by this DB in the eth_beacon.historical_process table.
|
||||||
func (hp HistoricProcessing) releaseDbLocks(cancel context.CancelFunc) error {
|
func (hp HistoricProcessing) releaseDbLocks() error {
|
||||||
cancel()
|
|
||||||
log.Debug("Updating all the entries to eth_beacon.historical processing")
|
log.Debug("Updating all the entries to eth_beacon.historical processing")
|
||||||
log.Debug("Db: ", hp.db)
|
|
||||||
log.Debug("hp.uniqueNodeIdentifier ", hp.uniqueNodeIdentifier)
|
|
||||||
res, err := hp.db.Exec(context.Background(), releaseHpLockStmt, hp.uniqueNodeIdentifier)
|
res, err := hp.db.Exec(context.Background(), releaseHpLockStmt, hp.uniqueNodeIdentifier)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("Unable to remove lock from eth_beacon.historical_processing table for node %d, error is %e", hp.uniqueNodeIdentifier, err)
|
return fmt.Errorf("Unable to remove lock from eth_beacon.historical_processing table for node %d, error is %e", hp.uniqueNodeIdentifier, err)
|
||||||
@ -100,7 +97,7 @@ func (hp HistoricProcessing) releaseDbLocks(cancel context.CancelFunc) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Process the slot range.
|
// Process the slot range.
|
||||||
func processSlotRangeWorker(ctx context.Context, workCh <-chan int, errCh chan<- batchHistoricError, db sql.Database, serverAddress string, metrics *BeaconClientMetrics, checkDb bool) {
|
func processSlotRangeWorker(ctx context.Context, workCh <-chan int, errCh chan<- batchHistoricError, db sql.Database, serverAddress string, metrics *BeaconClientMetrics, checkDb bool, incrementTracker func(uint64)) {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
@ -115,6 +112,8 @@ func processSlotRangeWorker(ctx context.Context, workCh <-chan int, errCh chan<-
|
|||||||
slot: slot,
|
slot: slot,
|
||||||
}
|
}
|
||||||
errCh <- errMs
|
errCh <- errMs
|
||||||
|
} else {
|
||||||
|
incrementTracker(1)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -149,7 +148,7 @@ func getBatchProcessRow(ctx context.Context, db sql.Database, getStartEndSlotStm
|
|||||||
errCount = append(errCount, err)
|
errCount = append(errCount, err)
|
||||||
}
|
}
|
||||||
if row < 1 {
|
if row < 1 {
|
||||||
time.Sleep(1000 * time.Millisecond)
|
time.Sleep(3 * time.Second)
|
||||||
log.Debug("We are checking rows, be patient")
|
log.Debug("We are checking rows, be patient")
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
@ -176,7 +175,7 @@ func getBatchProcessRow(ctx context.Context, db sql.Database, getStartEndSlotStm
|
|||||||
err = tx.QueryRow(dbCtx, getStartEndSlotStmt).Scan(&sp.startSlot, &sp.endSlot)
|
err = tx.QueryRow(dbCtx, getStartEndSlotStmt).Scan(&sp.startSlot, &sp.endSlot)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == pgx.ErrNoRows {
|
if err == pgx.ErrNoRows {
|
||||||
time.Sleep(100 * time.Millisecond)
|
time.Sleep(1 * time.Second)
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
loghelper.LogSlotRangeStatementError(strconv.Itoa(sp.startSlot), strconv.Itoa(sp.endSlot), getStartEndSlotStmt, err).Error("Unable to get a row")
|
loghelper.LogSlotRangeStatementError(strconv.Itoa(sp.startSlot), strconv.Itoa(sp.endSlot), getStartEndSlotStmt, err).Error("Unable to get a row")
|
||||||
@ -253,7 +252,7 @@ func removeRowPostProcess(ctx context.Context, db sql.Database, processCh <-chan
|
|||||||
if isStartProcess && isEndProcess {
|
if isStartProcess && isEndProcess {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
time.Sleep(1000 * time.Millisecond)
|
time.Sleep(3 * time.Second)
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err := db.Exec(context.Background(), removeStmt, strconv.Itoa(slots.startSlot), strconv.Itoa(slots.endSlot))
|
_, err := db.Exec(context.Background(), removeStmt, strconv.Itoa(slots.startSlot), strconv.Itoa(slots.endSlot))
|
||||||
|
@ -61,7 +61,7 @@ type KnownGapsProcessing struct {
|
|||||||
func (bc *BeaconClient) ProcessKnownGaps(ctx context.Context, maxWorkers int) []error {
|
func (bc *BeaconClient) ProcessKnownGaps(ctx context.Context, maxWorkers int) []error {
|
||||||
log.Info("We are starting the known gaps processing service.")
|
log.Info("We are starting the known gaps processing service.")
|
||||||
bc.KnownGapsProcess = KnownGapsProcessing{db: bc.Db, uniqueNodeIdentifier: bc.UniqueNodeIdentifier, metrics: bc.Metrics}
|
bc.KnownGapsProcess = KnownGapsProcessing{db: bc.Db, uniqueNodeIdentifier: bc.UniqueNodeIdentifier, metrics: bc.Metrics}
|
||||||
errs := handleBatchProcess(ctx, maxWorkers, bc.KnownGapsProcess, bc.KnownGapsProcess.db, bc.ServerEndpoint, bc.Metrics, bc.CheckDb)
|
errs := handleBatchProcess(ctx, maxWorkers, bc.KnownGapsProcess, bc.KnownGapsProcess.db, bc.ServerEndpoint, bc.Metrics, bc.CheckDb, bc.Metrics.IncrementKnownGapsProcessed)
|
||||||
log.Debug("Exiting known gaps processing service")
|
log.Debug("Exiting known gaps processing service")
|
||||||
return errs
|
return errs
|
||||||
}
|
}
|
||||||
@ -69,7 +69,8 @@ func (bc *BeaconClient) ProcessKnownGaps(ctx context.Context, maxWorkers int) []
|
|||||||
// This function will perform all the necessary clean up tasks for stopping historical processing.
|
// This function will perform all the necessary clean up tasks for stopping historical processing.
|
||||||
func (bc *BeaconClient) StopKnownGapsProcessing(cancel context.CancelFunc) error {
|
func (bc *BeaconClient) StopKnownGapsProcessing(cancel context.CancelFunc) error {
|
||||||
log.Info("We are stopping the known gaps processing service.")
|
log.Info("We are stopping the known gaps processing service.")
|
||||||
err := bc.KnownGapsProcess.releaseDbLocks(cancel)
|
cancel()
|
||||||
|
err := bc.KnownGapsProcess.releaseDbLocks()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
loghelper.LogError(err).WithField("uniqueIdentifier", bc.UniqueNodeIdentifier).Error("We were unable to remove the locks from the eth_beacon.known_gaps table. Manual Intervention is needed!")
|
loghelper.LogError(err).WithField("uniqueIdentifier", bc.UniqueNodeIdentifier).Error("We were unable to remove the locks from the eth_beacon.known_gaps table. Manual Intervention is needed!")
|
||||||
}
|
}
|
||||||
@ -121,11 +122,8 @@ func (kgp KnownGapsProcessing) handleProcessingErrors(ctx context.Context, errMe
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Updated checked_out column for the uniqueNodeIdentifier.
|
// Updated checked_out column for the uniqueNodeIdentifier.
|
||||||
func (kgp KnownGapsProcessing) releaseDbLocks(cancel context.CancelFunc) error {
|
func (kgp KnownGapsProcessing) releaseDbLocks() error {
|
||||||
cancel()
|
|
||||||
log.Debug("Updating all the entries to eth_beacon.known_gaps")
|
log.Debug("Updating all the entries to eth_beacon.known_gaps")
|
||||||
log.Debug("Db: ", kgp.db)
|
|
||||||
log.Debug("kgp.uniqueNodeIdentifier ", kgp.uniqueNodeIdentifier)
|
|
||||||
res, err := kgp.db.Exec(context.Background(), releaseKgLockStmt, kgp.uniqueNodeIdentifier)
|
res, err := kgp.db.Exec(context.Background(), releaseKgLockStmt, kgp.uniqueNodeIdentifier)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -25,6 +25,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/jackc/pgx/v4"
|
"github.com/jackc/pgx/v4"
|
||||||
si "github.com/prysmaticlabs/prysm/consensus-types/interfaces"
|
si "github.com/prysmaticlabs/prysm/consensus-types/interfaces"
|
||||||
@ -59,6 +60,7 @@ type ProcessSlot struct {
|
|||||||
HeadOrHistoric string // Is this the head or a historic slot. This is critical when trying to analyze errors and skipped slots.
|
HeadOrHistoric string // Is this the head or a historic slot. This is critical when trying to analyze errors and skipped slots.
|
||||||
Db sql.Database // The DB object used to write to the DB.
|
Db sql.Database // The DB object used to write to the DB.
|
||||||
Metrics *BeaconClientMetrics // An object to keep track of the beaconclient metrics
|
Metrics *BeaconClientMetrics // An object to keep track of the beaconclient metrics
|
||||||
|
PerformanceMetrics PerformanceMetrics // An object to keep track of performance metrics.
|
||||||
// BeaconBlock
|
// BeaconBlock
|
||||||
|
|
||||||
SszSignedBeaconBlock []byte // The entire SSZ encoded SignedBeaconBlock
|
SszSignedBeaconBlock []byte // The entire SSZ encoded SignedBeaconBlock
|
||||||
@ -74,6 +76,19 @@ type ProcessSlot struct {
|
|||||||
DbBeaconState *DbBeaconState // The model being written to the state table.
|
DbBeaconState *DbBeaconState // The model being written to the state table.
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type PerformanceMetrics struct {
|
||||||
|
BeaconNodeBlockRetrievalTime time.Duration // How long it took to get the BeaconBlock from the Beacon Node.
|
||||||
|
BeaconNodeStateRetrievalTime time.Duration // How long it took to get the BeaconState from the Beacon Node.
|
||||||
|
ParseBeaconObjectForHash time.Duration // How long it took to get some information from the beacon objects.
|
||||||
|
CheckDbPreProcessing time.Duration // How long it takes to check the DB before processing a block.
|
||||||
|
CreateDbWriteObject time.Duration // How long it takes to create a DB write object.
|
||||||
|
TransactSlotOnly time.Duration // How long it takes to transact the slot information only.
|
||||||
|
CheckReorg time.Duration // How long it takes to check for Reorgs
|
||||||
|
CommitTransaction time.Duration // How long it takes to commit the final transaction.
|
||||||
|
TotalDbTransaction time.Duration // How long it takes from start to committing the entire DB transaction.
|
||||||
|
TotalProcessing time.Duration // How long it took to process the entire slot.
|
||||||
|
}
|
||||||
|
|
||||||
// This function will do all the work to process the slot and write it to the DB.
|
// This function will do all the work to process the slot and write it to the DB.
|
||||||
// It will return the error and error process. The error process is used for providing reach detail to the
|
// It will return the error and error process. The error process is used for providing reach detail to the
|
||||||
// known_gaps table.
|
// known_gaps table.
|
||||||
@ -82,6 +97,7 @@ func processFullSlot(ctx context.Context, db sql.Database, serverAddress string,
|
|||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return nil, ""
|
return nil, ""
|
||||||
default:
|
default:
|
||||||
|
totalStart := time.Now()
|
||||||
ps := &ProcessSlot{
|
ps := &ProcessSlot{
|
||||||
Slot: slot,
|
Slot: slot,
|
||||||
BlockRoot: blockRoot,
|
BlockRoot: blockRoot,
|
||||||
@ -89,6 +105,18 @@ func processFullSlot(ctx context.Context, db sql.Database, serverAddress string,
|
|||||||
HeadOrHistoric: headOrHistoric,
|
HeadOrHistoric: headOrHistoric,
|
||||||
Db: db,
|
Db: db,
|
||||||
Metrics: metrics,
|
Metrics: metrics,
|
||||||
|
PerformanceMetrics: PerformanceMetrics{
|
||||||
|
BeaconNodeBlockRetrievalTime: 0,
|
||||||
|
BeaconNodeStateRetrievalTime: 0,
|
||||||
|
ParseBeaconObjectForHash: 0,
|
||||||
|
CheckDbPreProcessing: 0,
|
||||||
|
CreateDbWriteObject: 0,
|
||||||
|
TransactSlotOnly: 0,
|
||||||
|
CheckReorg: 0,
|
||||||
|
CommitTransaction: 0,
|
||||||
|
TotalDbTransaction: 0,
|
||||||
|
TotalProcessing: 0,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
g, _ := errgroup.WithContext(context.Background())
|
g, _ := errgroup.WithContext(context.Background())
|
||||||
@ -100,10 +128,12 @@ func processFullSlot(ctx context.Context, db sql.Database, serverAddress string,
|
|||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return nil
|
return nil
|
||||||
default:
|
default:
|
||||||
|
start := time.Now()
|
||||||
err := ps.getBeaconState(serverAddress, vUnmarshalerCh)
|
err := ps.getBeaconState(serverAddress, vUnmarshalerCh)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
ps.PerformanceMetrics.BeaconNodeStateRetrievalTime = time.Since(start)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
@ -114,10 +144,12 @@ func processFullSlot(ctx context.Context, db sql.Database, serverAddress string,
|
|||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return nil
|
return nil
|
||||||
default:
|
default:
|
||||||
|
start := time.Now()
|
||||||
err := ps.getSignedBeaconBlock(serverAddress, vUnmarshalerCh)
|
err := ps.getSignedBeaconBlock(serverAddress, vUnmarshalerCh)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
ps.PerformanceMetrics.BeaconNodeBlockRetrievalTime = time.Since(start)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
@ -126,11 +158,15 @@ func processFullSlot(ctx context.Context, db sql.Database, serverAddress string,
|
|||||||
return err, "processSlot"
|
return err, "processSlot"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
parseBeaconTime := time.Now()
|
||||||
finalBlockRoot, finalStateRoot, finalEth1BlockHash, err := ps.provideFinalHash()
|
finalBlockRoot, finalStateRoot, finalEth1BlockHash, err := ps.provideFinalHash()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err, "CalculateBlockRoot"
|
return err, "CalculateBlockRoot"
|
||||||
}
|
}
|
||||||
|
ps.PerformanceMetrics.ParseBeaconObjectForHash = time.Since(parseBeaconTime)
|
||||||
|
|
||||||
if checkDb {
|
if checkDb {
|
||||||
|
checkDbTime := time.Now()
|
||||||
inDb, err := IsSlotInDb(ctx, ps.Db, strconv.Itoa(ps.Slot), finalBlockRoot, finalStateRoot)
|
inDb, err := IsSlotInDb(ctx, ps.Db, strconv.Itoa(ps.Slot), finalBlockRoot, finalStateRoot)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err, "checkDb"
|
return err, "checkDb"
|
||||||
@ -139,26 +175,35 @@ func processFullSlot(ctx context.Context, db sql.Database, serverAddress string,
|
|||||||
log.WithField("slot", slot).Info("Slot already in the DB.")
|
log.WithField("slot", slot).Info("Slot already in the DB.")
|
||||||
return nil, ""
|
return nil, ""
|
||||||
}
|
}
|
||||||
|
ps.PerformanceMetrics.CheckDbPreProcessing = time.Since(checkDbTime)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get this object ready to write
|
// Get this object ready to write
|
||||||
|
createDbWriteTime := time.Now()
|
||||||
dw, err := ps.createWriteObjects(finalBlockRoot, finalStateRoot, finalEth1BlockHash)
|
dw, err := ps.createWriteObjects(finalBlockRoot, finalStateRoot, finalEth1BlockHash)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err, "blockRoot"
|
return err, "blockRoot"
|
||||||
}
|
}
|
||||||
|
ps.PerformanceMetrics.CreateDbWriteObject = time.Since(createDbWriteTime)
|
||||||
|
|
||||||
// Write the object to the DB.
|
// Write the object to the DB.
|
||||||
|
dbFullTransactionTime := time.Now()
|
||||||
defer func() {
|
defer func() {
|
||||||
err := dw.Tx.Rollback(dw.Ctx)
|
err := dw.Tx.Rollback(dw.Ctx)
|
||||||
if err != nil && err != pgx.ErrTxClosed {
|
if err != nil && err != pgx.ErrTxClosed {
|
||||||
loghelper.LogError(err).Error("We were unable to Rollback a transaction")
|
loghelper.LogError(err).Error("We were unable to Rollback a transaction")
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
transactionTime := time.Now()
|
||||||
err = dw.transactFullSlot()
|
err = dw.transactFullSlot()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err, "processSlot"
|
return err, "processSlot"
|
||||||
}
|
}
|
||||||
|
ps.PerformanceMetrics.TransactSlotOnly = time.Since(transactionTime)
|
||||||
|
|
||||||
// Handle any reorgs or skipped slots.
|
// Handle any reorgs or skipped slots.
|
||||||
|
reorgTime := time.Now()
|
||||||
headOrHistoric = strings.ToLower(headOrHistoric)
|
headOrHistoric = strings.ToLower(headOrHistoric)
|
||||||
if headOrHistoric != "head" && headOrHistoric != "historic" {
|
if headOrHistoric != "head" && headOrHistoric != "historic" {
|
||||||
return fmt.Errorf("headOrHistoric must be either historic or head!"), ""
|
return fmt.Errorf("headOrHistoric must be either historic or head!"), ""
|
||||||
@ -166,11 +211,23 @@ func processFullSlot(ctx context.Context, db sql.Database, serverAddress string,
|
|||||||
if ps.HeadOrHistoric == "head" && previousSlot != 0 && previousBlockRoot != "" && ps.Status != "skipped" {
|
if ps.HeadOrHistoric == "head" && previousSlot != 0 && previousBlockRoot != "" && ps.Status != "skipped" {
|
||||||
ps.checkPreviousSlot(dw.Tx, dw.Ctx, previousSlot, previousBlockRoot, knownGapsTableIncrement)
|
ps.checkPreviousSlot(dw.Tx, dw.Ctx, previousSlot, previousBlockRoot, knownGapsTableIncrement)
|
||||||
}
|
}
|
||||||
|
ps.PerformanceMetrics.CheckReorg = time.Since(reorgTime)
|
||||||
|
|
||||||
// Commit the transaction
|
// Commit the transaction
|
||||||
|
commitTime := time.Now()
|
||||||
if err = dw.Tx.Commit(dw.Ctx); err != nil {
|
if err = dw.Tx.Commit(dw.Ctx); err != nil {
|
||||||
return err, "transactionCommit"
|
return err, "transactionCommit"
|
||||||
}
|
}
|
||||||
|
ps.PerformanceMetrics.CommitTransaction = time.Since(commitTime)
|
||||||
|
|
||||||
|
// Total metric capture time.
|
||||||
|
ps.PerformanceMetrics.TotalDbTransaction = time.Since(dbFullTransactionTime)
|
||||||
|
ps.PerformanceMetrics.TotalProcessing = time.Since(totalStart)
|
||||||
|
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"slot": slot,
|
||||||
|
"performanceMetrics": fmt.Sprintf("%+v\n", ps.PerformanceMetrics),
|
||||||
|
}).Debug("Performance Metric output!")
|
||||||
|
|
||||||
return nil, ""
|
return nil, ""
|
||||||
}
|
}
|
||||||
@ -261,23 +318,24 @@ func (ps *ProcessSlot) getBeaconState(serverEndpoint string, vmCh chan<- *dt.Ver
|
|||||||
// Check to make sure that the previous block we processed is the parent of the current block.
|
// Check to make sure that the previous block we processed is the parent of the current block.
|
||||||
func (ps *ProcessSlot) checkPreviousSlot(tx sql.Tx, ctx context.Context, previousSlot int, previousBlockRoot string, knownGapsTableIncrement int) {
|
func (ps *ProcessSlot) checkPreviousSlot(tx sql.Tx, ctx context.Context, previousSlot int, previousBlockRoot string, knownGapsTableIncrement int) {
|
||||||
parentRoot := "0x" + hex.EncodeToString(ps.FullSignedBeaconBlock.Block().ParentRoot())
|
parentRoot := "0x" + hex.EncodeToString(ps.FullSignedBeaconBlock.Block().ParentRoot())
|
||||||
if previousSlot == int(ps.FullBeaconState.Slot()) {
|
slot := int(ps.FullBeaconState.Slot())
|
||||||
|
if previousSlot == slot {
|
||||||
log.WithFields(log.Fields{
|
log.WithFields(log.Fields{
|
||||||
"slot": ps.FullBeaconState.Slot(),
|
"slot": slot,
|
||||||
"fork": true,
|
"fork": true,
|
||||||
}).Warn("A fork occurred! The previous slot and current slot match.")
|
}).Warn("A fork occurred! The previous slot and current slot match.")
|
||||||
transactReorgs(tx, ctx, strconv.Itoa(ps.Slot), ps.BlockRoot, ps.Metrics)
|
transactReorgs(tx, ctx, strconv.Itoa(ps.Slot), ps.BlockRoot, ps.Metrics)
|
||||||
} else if previousSlot > int(ps.FullBeaconState.Slot()) {
|
} else if previousSlot > slot {
|
||||||
log.WithFields(log.Fields{
|
log.WithFields(log.Fields{
|
||||||
"previousSlot": previousSlot,
|
"previousSlot": previousSlot,
|
||||||
"curSlot": int(ps.FullBeaconState.Slot()),
|
"curSlot": slot,
|
||||||
}).Warn("We noticed the previous slot is greater than the current slot.")
|
}).Warn("We noticed the previous slot is greater than the current slot.")
|
||||||
} else if previousSlot+1 != int(ps.FullBeaconState.Slot()) {
|
} else if previousSlot+1 != slot {
|
||||||
log.WithFields(log.Fields{
|
log.WithFields(log.Fields{
|
||||||
"previousSlot": previousSlot,
|
"previousSlot": previousSlot,
|
||||||
"currentSlot": ps.FullBeaconState.Slot(),
|
"currentSlot": slot,
|
||||||
}).Error("We skipped a few slots.")
|
}).Error("We skipped a few slots.")
|
||||||
transactKnownGaps(tx, ctx, knownGapsTableIncrement, previousSlot+1, int(ps.FullBeaconState.Slot())-1, fmt.Errorf("Gaps during head processing"), "headGaps", ps.Metrics)
|
transactKnownGaps(tx, ctx, knownGapsTableIncrement, previousSlot+1, slot-1, fmt.Errorf("Gaps during head processing"), "headGaps", ps.Metrics)
|
||||||
} else if previousBlockRoot != parentRoot {
|
} else if previousBlockRoot != parentRoot {
|
||||||
log.WithFields(log.Fields{
|
log.WithFields(log.Fields{
|
||||||
"previousBlockRoot": previousBlockRoot,
|
"previousBlockRoot": previousBlockRoot,
|
||||||
@ -298,7 +356,7 @@ func (ps *ProcessSlot) createWriteObjects(blockRoot, stateRoot, eth1BlockHash st
|
|||||||
status = "proposed"
|
status = "proposed"
|
||||||
}
|
}
|
||||||
|
|
||||||
dw, err := CreateDatabaseWrite(ps.Db, ps.Slot, stateRoot, blockRoot, ps.ParentBlockRoot, eth1BlockHash, status, ps.SszSignedBeaconBlock, ps.SszBeaconState, ps.Metrics)
|
dw, err := CreateDatabaseWrite(ps.Db, ps.Slot, stateRoot, blockRoot, ps.ParentBlockRoot, eth1BlockHash, status, &ps.SszSignedBeaconBlock, &ps.SszBeaconState, ps.Metrics)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return dw, err
|
return dw, err
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user