Add Prometheus metrics and write SQL file if it has data.

This commit is contained in:
Abdul Rabbani 2022-03-30 12:53:44 -04:00
parent b960661807
commit fc6a9379e1
5 changed files with 100 additions and 43 deletions

View File

@ -9,7 +9,8 @@ The folder will allow developers to clone/move related repositories to this dire
It is recommended that you move the following repositories under this folder. Keep the repository names! It is recommended that you move the following repositories under this folder. Keep the repository names!
- `vulcanize/foundry-tests` - `vulcanize/foundry-tests`
- `vulcanize/hive`
- `vulcanize/ipld-eth-db` - `vulcanize/ipld-eth-db`
## Symlinks ## Symlinks
You can also create symlinks in this folder with the location of your repositories.

View File

@ -19,8 +19,10 @@ package statediff
import ( import (
"context" "context"
"fmt" "fmt"
"io/ioutil"
"math/big" "math/big"
"os" "os"
"strings"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql" "github.com/ethereum/go-ethereum/statediff/indexer/database/sql"
@ -36,11 +38,6 @@ var (
defaultWriteFilePath = "./known_gaps.sql" defaultWriteFilePath = "./known_gaps.sql"
) )
type KnownGaps interface {
PushKnownGaps(startingBlockNumber *big.Int, endingBlockNumber *big.Int, checkedOut bool, processingKey int64) error
FindAndUpdateGaps(latestBlockOnChain *big.Int, expectedDifference *big.Int, processingKey int64) error
}
type KnownGapsState struct { type KnownGapsState struct {
// Should we check for gaps by looking at the DB and comparing the latest block with head // Should we check for gaps by looking at the DB and comparing the latest block with head
checkForGaps bool checkForGaps bool
@ -61,10 +58,15 @@ type KnownGapsState struct {
writeFilePath string writeFilePath string
// DB object to use for reading and writing to the DB // DB object to use for reading and writing to the DB
db sql.Database db sql.Database
//Do we have entries in the local sql file that need to be written to the DB
sqlFileWaitingForWrite bool
// Metrics object used to track metrics.
statediffMetrics statediffMetricsHandles
} }
// Unused
func NewKnownGapsState(checkForGaps bool, processingKey int64, expectedDifference *big.Int, func NewKnownGapsState(checkForGaps bool, processingKey int64, expectedDifference *big.Int,
errorState bool, writeFilePath string, db sql.Database) *KnownGapsState { errorState bool, writeFilePath string, db sql.Database, statediffMetrics statediffMetricsHandles) *KnownGapsState {
return &KnownGapsState{ return &KnownGapsState{
checkForGaps: checkForGaps, checkForGaps: checkForGaps,
@ -73,11 +75,12 @@ func NewKnownGapsState(checkForGaps bool, processingKey int64, expectedDifferenc
errorState: errorState, errorState: errorState,
writeFilePath: writeFilePath, writeFilePath: writeFilePath,
db: db, db: db,
statediffMetrics: statediffMetrics,
} }
} }
func MinMax(array []*big.Int) (*big.Int, *big.Int) { func minMax(array []*big.Int) (*big.Int, *big.Int) {
var max *big.Int = array[0] var max *big.Int = array[0]
var min *big.Int = array[0] var min *big.Int = array[0]
for _, value := range array { for _, value := range array {
@ -96,7 +99,7 @@ func MinMax(array []*big.Int) (*big.Int, *big.Int) {
// 2. Write to sql file locally. // 2. Write to sql file locally.
// 3. Write to prometheus directly. // 3. Write to prometheus directly.
// 4. Logs and error. // 4. Logs and error.
func (kg *KnownGapsState) PushKnownGaps(startingBlockNumber *big.Int, endingBlockNumber *big.Int, checkedOut bool, processingKey int64) error { func (kg *KnownGapsState) pushKnownGaps(startingBlockNumber *big.Int, endingBlockNumber *big.Int, checkedOut bool, processingKey int64) error {
if startingBlockNumber.Cmp(endingBlockNumber) != -1 { if startingBlockNumber.Cmp(endingBlockNumber) != -1 {
return fmt.Errorf("Starting Block %d, is greater than ending block %d", startingBlockNumber, endingBlockNumber) return fmt.Errorf("Starting Block %d, is greater than ending block %d", startingBlockNumber, endingBlockNumber)
} }
@ -106,9 +109,13 @@ func (kg *KnownGapsState) PushKnownGaps(startingBlockNumber *big.Int, endingBloc
CheckedOut: checkedOut, CheckedOut: checkedOut,
ProcessingKey: processingKey, ProcessingKey: processingKey,
} }
log.Info("Writing known gaps to the DB")
log.Info("Updating Metrics for the start and end block")
//kg.statediffMetrics.knownGapStart.Update(startingBlockNumber.Int64())
//kg.statediffMetrics.knownGapEnd.Update(endingBlockNumber.Int64())
var writeErr error var writeErr error
log.Info("Writing known gaps to the DB")
if kg.db != nil { if kg.db != nil {
dbErr := kg.upsertKnownGaps(knownGap) dbErr := kg.upsertKnownGaps(knownGap)
if dbErr != nil { if dbErr != nil {
@ -117,22 +124,24 @@ func (kg *KnownGapsState) PushKnownGaps(startingBlockNumber *big.Int, endingBloc
} }
} else { } else {
writeErr = kg.upsertKnownGapsFile(knownGap) writeErr = kg.upsertKnownGapsFile(knownGap)
} }
if writeErr != nil { if writeErr != nil {
log.Info("Unsuccessful when writing to a file", "Error", writeErr) log.Error("Unsuccessful when writing to a file", "Error", writeErr)
return writeErr log.Error("Updating Metrics for the start and end error block")
log.Error("Unable to write the following Gaps to DB or File", "startBlock", startingBlockNumber, "endBlock", endingBlockNumber)
kg.statediffMetrics.knownGapErrorStart.Update(startingBlockNumber.Int64())
kg.statediffMetrics.knownGapErrorEnd.Update(endingBlockNumber.Int64())
} }
return nil return nil
} }
// This is a simple wrapper function to write gaps from a knownErrorBlocks array. // This is a simple wrapper function to write gaps from a knownErrorBlocks array.
func (kg *KnownGapsState) captureErrorBlocks(knownErrorBlocks []*big.Int) { func (kg *KnownGapsState) captureErrorBlocks(knownErrorBlocks []*big.Int) {
startErrorBlock, endErrorBlock := MinMax(knownErrorBlocks) startErrorBlock, endErrorBlock := minMax(knownErrorBlocks)
log.Warn("The following Gaps were found", "knownErrorBlocks", knownErrorBlocks) log.Warn("The following Gaps were found", "knownErrorBlocks", knownErrorBlocks)
log.Warn("Updating known Gaps table", "startErrorBlock", startErrorBlock, "endErrorBlock", endErrorBlock, "processingKey", kg.processingKey) log.Warn("Updating known Gaps table", "startErrorBlock", startErrorBlock, "endErrorBlock", endErrorBlock, "processingKey", kg.processingKey)
kg.PushKnownGaps(startErrorBlock, endErrorBlock, false, kg.processingKey) kg.pushKnownGaps(startErrorBlock, endErrorBlock, false, kg.processingKey)
} }
@ -156,9 +165,9 @@ func isGap(latestBlockInDb *big.Int, latestBlockOnChain *big.Int, expectedDiffer
// TODO: // TODO:
// REmove the return value // REmove the return value
// Write to file if err in writing to DB // Write to file if err in writing to DB
func (kg *KnownGapsState) FindAndUpdateGaps(latestBlockOnChain *big.Int, expectedDifference *big.Int, processingKey int64) error { func (kg *KnownGapsState) findAndUpdateGaps(latestBlockOnChain *big.Int, expectedDifference *big.Int, processingKey int64) error {
// Make this global // Make this global
latestBlockInDb, err := kg.QueryDbToBigInt(dbQueryString) latestBlockInDb, err := kg.queryDbToBigInt(dbQueryString)
if err != nil { if err != nil {
return err return err
} }
@ -171,7 +180,7 @@ func (kg *KnownGapsState) FindAndUpdateGaps(latestBlockOnChain *big.Int, expecte
endBlock.Sub(latestBlockOnChain, expectedDifference) endBlock.Sub(latestBlockOnChain, expectedDifference)
log.Warn("Found Gaps starting at", "startBlock", startBlock, "endingBlock", endBlock) log.Warn("Found Gaps starting at", "startBlock", startBlock, "endingBlock", endBlock)
err := kg.PushKnownGaps(startBlock, endBlock, false, processingKey) err := kg.pushKnownGaps(startBlock, endBlock, false, processingKey)
if err != nil { if err != nil {
log.Error("We were unable to write the following gap to the DB", "start Block", startBlock, "endBlock", endBlock, "error", err) log.Error("We were unable to write the following gap to the DB", "start Block", startBlock, "endBlock", endBlock, "error", err)
return err return err
@ -212,11 +221,37 @@ func (kg *KnownGapsState) upsertKnownGapsFile(knownGaps models.KnownGapsModel) e
return err return err
} }
log.Info("Wrote the gaps to a local SQL file") log.Info("Wrote the gaps to a local SQL file")
kg.sqlFileWaitingForWrite = true
return nil
}
func (kg *KnownGapsState) writeSqlFileStmtToDb() error {
log.Info("Writing the local SQL file for KnownGaps to the DB")
file, ioErr := ioutil.ReadFile(kg.writeFilePath)
if ioErr != nil {
log.Error("Unable to open local SQL File for writing")
return ioErr
}
requests := strings.Split(string(file), ";")
for _, request := range requests {
_, err := kg.db.Exec(context.Background(), request)
if err != nil {
log.Error("Unable to run insert statement from file to the DB")
return err
}
}
if err := os.Truncate(kg.writeFilePath, 0); err != nil {
log.Info("Failed to empty knownGaps file after inserting statements to the DB", "error", err)
}
kg.sqlFileWaitingForWrite = false
return nil return nil
} }
// This is a simple wrapper function which will run QueryRow on the DB // This is a simple wrapper function which will run QueryRow on the DB
func (kg *KnownGapsState) QueryDb(queryString string) (string, error) { func (kg *KnownGapsState) queryDb(queryString string) (string, error) {
var ret string var ret string
err := kg.db.QueryRow(context.Background(), queryString).Scan(&ret) err := kg.db.QueryRow(context.Background(), queryString).Scan(&ret)
if err != nil { if err != nil {
@ -228,9 +263,9 @@ func (kg *KnownGapsState) QueryDb(queryString string) (string, error) {
// This function is a simple wrapper which will call QueryDb but the return value will be // This function is a simple wrapper which will call QueryDb but the return value will be
// a big int instead of a string // a big int instead of a string
func (kg *KnownGapsState) QueryDbToBigInt(queryString string) (*big.Int, error) { func (kg *KnownGapsState) queryDbToBigInt(queryString string) (*big.Int, error) {
ret := new(big.Int) ret := new(big.Int)
res, err := kg.QueryDb(queryString) res, err := kg.queryDb(queryString)
if err != nil { if err != nil {
return ret, err return ret, err
} }

View File

@ -3,12 +3,11 @@ package statediff
import ( import (
"context" "context"
"fmt" "fmt"
"io/ioutil"
"math/big" "math/big"
"os" "os"
"strings"
"testing" "testing"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql" "github.com/ethereum/go-ethereum/statediff/indexer/database/sql"
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres" "github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres"
@ -68,6 +67,7 @@ func testWriteToDb(t *testing.T, tests []gapValues, wipeDbBeforeStart bool) {
processingKey: tc.processingKey, processingKey: tc.processingKey,
expectedDifference: big.NewInt(tc.expectedDif), expectedDifference: big.NewInt(tc.expectedDif),
db: db, db: db,
statediffMetrics: RegisterStatediffMetrics(metrics.DefaultRegistry),
} }
service := &Service{ service := &Service{
KnownGaps: knownGaps, KnownGaps: knownGaps,
@ -106,6 +106,7 @@ func testWriteToFile(t *testing.T, tests []gapValues, wipeDbBeforeStart bool) {
processingKey: tc.processingKey, processingKey: tc.processingKey,
expectedDifference: big.NewInt(tc.expectedDif), expectedDifference: big.NewInt(tc.expectedDif),
writeFilePath: knownGapsFilePath, writeFilePath: knownGapsFilePath,
statediffMetrics: RegisterStatediffMetrics(metrics.DefaultRegistry),
db: nil, // Only set to nil to be verbose that we can't use it db: nil, // Only set to nil to be verbose that we can't use it
} }
service := &Service{ service := &Service{
@ -116,18 +117,13 @@ func testWriteToFile(t *testing.T, tests []gapValues, wipeDbBeforeStart bool) {
service.KnownGaps.knownErrorBlocks = knownErrorBlocks service.KnownGaps.knownErrorBlocks = knownErrorBlocks
testCaptureErrorBlocks(t, service) testCaptureErrorBlocks(t, service)
file, ioErr := ioutil.ReadFile(knownGapsFilePath)
require.NoError(t, ioErr)
requests := strings.Split(string(file), ";")
newDb := setupDb(t) newDb := setupDb(t)
service.KnownGaps.db = newDb service.KnownGaps.db = newDb
for _, request := range requests { if service.KnownGaps.sqlFileWaitingForWrite {
_, err := newDb.Exec(context.Background(), request) writeErr := service.KnownGaps.writeSqlFileStmtToDb()
require.NoError(t, err) require.NoError(t, writeErr)
} }
// Validate that the upsert was done correctly. // Validate that the upsert was done correctly.
validateUpsert(t, service, tc.knownErrorBlocksStart, tc.knownErrorBlocksEnd) validateUpsert(t, service, tc.knownErrorBlocksStart, tc.knownErrorBlocksEnd)
tearDown(t, newDb) tearDown(t, newDb)
@ -145,12 +141,13 @@ func testFindAndUpdateGaps(t *testing.T, wipeDbBeforeStart bool) {
processingKey: 1, processingKey: 1,
expectedDifference: big.NewInt(1), expectedDifference: big.NewInt(1),
db: db, db: db,
statediffMetrics: RegisterStatediffMetrics(metrics.DefaultRegistry),
} }
service := &Service{ service := &Service{
KnownGaps: knownGaps, KnownGaps: knownGaps,
} }
latestBlockInDb, err := service.KnownGaps.QueryDbToBigInt("SELECT MAX(block_number) FROM eth.header_cids") latestBlockInDb, err := service.KnownGaps.queryDbToBigInt("SELECT MAX(block_number) FROM eth.header_cids")
if err != nil { if err != nil {
t.Skip("Can't find a block in the eth.header_cids table.. Please put one there") t.Skip("Can't find a block in the eth.header_cids table.. Please put one there")
} }
@ -165,7 +162,7 @@ func testFindAndUpdateGaps(t *testing.T, wipeDbBeforeStart bool) {
t.Log("The latest block on the chain is: ", latestBlockOnChain) t.Log("The latest block on the chain is: ", latestBlockOnChain)
t.Log("The latest block on the DB is: ", latestBlockInDb) t.Log("The latest block on the DB is: ", latestBlockInDb)
gapUpsertErr := service.KnownGaps.FindAndUpdateGaps(latestBlockOnChain, expectedDifference, 0) gapUpsertErr := service.KnownGaps.findAndUpdateGaps(latestBlockOnChain, expectedDifference, 0)
require.NoError(t, gapUpsertErr) require.NoError(t, gapUpsertErr)
startBlock := big.NewInt(0) startBlock := big.NewInt(0)
@ -195,7 +192,7 @@ func validateUpsert(t *testing.T, service *Service, startingBlock int64, endingB
t.Logf("Starting to query blocks: %d - %d", startingBlock, endingBlock) t.Logf("Starting to query blocks: %d - %d", startingBlock, endingBlock)
queryString := fmt.Sprintf("SELECT starting_block_number from eth.known_gaps WHERE starting_block_number = %d AND ending_block_number = %d", startingBlock, endingBlock) queryString := fmt.Sprintf("SELECT starting_block_number from eth.known_gaps WHERE starting_block_number = %d AND ending_block_number = %d", startingBlock, endingBlock)
_, queryErr := service.KnownGaps.QueryDb(queryString) // Figure out the string. _, queryErr := service.KnownGaps.queryDb(queryString) // Figure out the string.
t.Logf("Updated Known Gaps table starting from, %d, and ending at, %d", startingBlock, endingBlock) t.Logf("Updated Known Gaps table starting from, %d, and ending at, %d", startingBlock, endingBlock)
require.NoError(t, queryErr) require.NoError(t, queryErr)
} }

View File

@ -50,6 +50,14 @@ type statediffMetricsHandles struct {
// Current length of chainEvent channels // Current length of chainEvent channels
serviceLoopChannelLen metrics.Gauge serviceLoopChannelLen metrics.Gauge
writeLoopChannelLen metrics.Gauge writeLoopChannelLen metrics.Gauge
// The start block of the known gap
knownGapStart metrics.Gauge
// The end block of the known gap
knownGapEnd metrics.Gauge
// A known gaps start block which had an error being written to the DB
knownGapErrorStart metrics.Gauge
// A known gaps end block which had an error being written to the DB
knownGapErrorEnd metrics.Gauge
} }
func RegisterStatediffMetrics(reg metrics.Registry) statediffMetricsHandles { func RegisterStatediffMetrics(reg metrics.Registry) statediffMetricsHandles {
@ -59,6 +67,10 @@ func RegisterStatediffMetrics(reg metrics.Registry) statediffMetricsHandles {
lastStatediffHeight: metrics.NewGauge(), lastStatediffHeight: metrics.NewGauge(),
serviceLoopChannelLen: metrics.NewGauge(), serviceLoopChannelLen: metrics.NewGauge(),
writeLoopChannelLen: metrics.NewGauge(), writeLoopChannelLen: metrics.NewGauge(),
knownGapStart: metrics.NewGauge(),
knownGapEnd: metrics.NewGauge(),
knownGapErrorStart: metrics.NewGauge(),
knownGapErrorEnd: metrics.NewGauge(),
} }
subsys := "service" subsys := "service"
reg.Register(metricName(subsys, "last_sync_height"), ctx.lastSyncHeight) reg.Register(metricName(subsys, "last_sync_height"), ctx.lastSyncHeight)
@ -66,5 +78,9 @@ func RegisterStatediffMetrics(reg metrics.Registry) statediffMetricsHandles {
reg.Register(metricName(subsys, "last_statediff_height"), ctx.lastStatediffHeight) reg.Register(metricName(subsys, "last_statediff_height"), ctx.lastStatediffHeight)
reg.Register(metricName(subsys, "service_loop_channel_len"), ctx.serviceLoopChannelLen) reg.Register(metricName(subsys, "service_loop_channel_len"), ctx.serviceLoopChannelLen)
reg.Register(metricName(subsys, "write_loop_channel_len"), ctx.writeLoopChannelLen) reg.Register(metricName(subsys, "write_loop_channel_len"), ctx.writeLoopChannelLen)
reg.Register(metricName(subsys, "known_gaps_start"), ctx.knownGapStart)
reg.Register(metricName(subsys, "known_gaps_end"), ctx.knownGapEnd)
reg.Register(metricName(subsys, "known_gaps_error_start"), ctx.knownGapErrorStart)
reg.Register(metricName(subsys, "known_gaps_error_end"), ctx.knownGapErrorEnd)
return ctx return ctx
} }

View File

@ -190,6 +190,8 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params
errorState: false, errorState: false,
writeFilePath: params.KnownGapsFilePath, writeFilePath: params.KnownGapsFilePath,
db: db, db: db,
statediffMetrics: statediffMetrics,
sqlFileWaitingForWrite: false,
} }
if params.IndexerConfig.Type() == shared.POSTGRES { if params.IndexerConfig.Type() == shared.POSTGRES {
knownGaps.checkForGaps = true knownGaps.checkForGaps = true
@ -336,7 +338,7 @@ func (sds *Service) writeLoopWorker(params workerParams) {
// Check and update the gaps table. // Check and update the gaps table.
if sds.KnownGaps.checkForGaps && !sds.KnownGaps.errorState { if sds.KnownGaps.checkForGaps && !sds.KnownGaps.errorState {
log.Info("Checking for Gaps at", "current block", currentBlock.Number()) log.Info("Checking for Gaps at", "current block", currentBlock.Number())
go sds.KnownGaps.FindAndUpdateGaps(currentBlock.Number(), sds.KnownGaps.expectedDifference, sds.KnownGaps.processingKey) go sds.KnownGaps.findAndUpdateGaps(currentBlock.Number(), sds.KnownGaps.expectedDifference, sds.KnownGaps.processingKey)
sds.KnownGaps.checkForGaps = false sds.KnownGaps.checkForGaps = false
} }
@ -357,11 +359,17 @@ func (sds *Service) writeLoopWorker(params workerParams) {
staticKnownErrorBlocks := make([]*big.Int, len(sds.KnownGaps.knownErrorBlocks)) staticKnownErrorBlocks := make([]*big.Int, len(sds.KnownGaps.knownErrorBlocks))
copy(staticKnownErrorBlocks, sds.KnownGaps.knownErrorBlocks) copy(staticKnownErrorBlocks, sds.KnownGaps.knownErrorBlocks)
sds.KnownGaps.knownErrorBlocks = nil sds.KnownGaps.knownErrorBlocks = nil
log.Debug("Starting capturedMissedBlocks")
go sds.KnownGaps.captureErrorBlocks(staticKnownErrorBlocks) go sds.KnownGaps.captureErrorBlocks(staticKnownErrorBlocks)
} }
if sds.KnownGaps.sqlFileWaitingForWrite {
log.Info("There are entries in the SQL file for knownGaps that should be written")
err := sds.KnownGaps.writeSqlFileStmtToDb()
if err != nil {
log.Error("Unable to write KnownGap sql file to DB")
}
}
// TODO: how to handle with concurrent workers // TODO: how to handle with concurrent workers
statediffMetrics.lastStatediffHeight.Update(int64(currentBlock.Number().Uint64())) statediffMetrics.lastStatediffHeight.Update(int64(currentBlock.Number().Uint64()))
case <-sds.QuitChan: case <-sds.QuitChan: