From 1ca74548fffea50c5ecb296445c00d3c3b3c2957 Mon Sep 17 00:00:00 2001 From: prathamesh0 <42446521+prathamesh0@users.noreply.github.com> Date: Thu, 16 May 2024 16:34:07 +0530 Subject: [PATCH] Fetch job queue counts for metrics on scraping (#509) --- packages/cli/src/job-runner.ts | 2 +- packages/util/src/job-queue.ts | 37 ++++++++---------------------- packages/util/src/metrics.ts | 42 +++++++++++++++++++++++++++++----- 3 files changed, 46 insertions(+), 35 deletions(-) diff --git a/packages/cli/src/job-runner.ts b/packages/cli/src/job-runner.ts index 635325e2..c9f2d720 100644 --- a/packages/cli/src/job-runner.ts +++ b/packages/cli/src/job-runner.ts @@ -154,7 +154,7 @@ export class JobRunnerCmd { await startJobRunner(jobRunner); jobRunner.handleShutdown(); - await startMetricsServer(config, indexer, this._currentEndpointIndex); + await startMetricsServer(config, jobQueue, indexer, this._currentEndpointIndex); } _getArgv (): any { diff --git a/packages/util/src/job-queue.ts b/packages/util/src/job-queue.ts index 49fdffa8..12378e0e 100644 --- a/packages/util/src/job-queue.ts +++ b/packages/util/src/job-queue.ts @@ -4,9 +4,9 @@ import assert from 'assert'; import debug from 'debug'; -import PgBoss from 'pg-boss'; +import PgBoss, { MonitorStates } from 'pg-boss'; -import { jobCount, lastJobCompletedOn } from './metrics'; +import { lastJobCompletedOn } from './metrics'; import { wait } from './misc'; interface Config { @@ -48,35 +48,10 @@ export class JobQueue { deleteAfterHours: 1, // 1 hour - newJobCheckInterval: 100, - - // Time interval for firing monitor-states event. - monitorStateIntervalSeconds: 10 + newJobCheckInterval: 100 }); this._boss.on('error', error => log(error)); - - this._boss.on('monitor-states', monitorStates => { - jobCount.set({ state: 'all' }, monitorStates.all); - jobCount.set({ state: 'created' }, monitorStates.created); - jobCount.set({ state: 'retry' }, monitorStates.retry); - jobCount.set({ state: 'active' }, monitorStates.active); - jobCount.set({ state: 'completed' }, monitorStates.completed); - jobCount.set({ state: 'expired' }, monitorStates.expired); - jobCount.set({ state: 'cancelled' }, monitorStates.cancelled); - jobCount.set({ state: 'failed' }, monitorStates.failed); - - Object.entries(monitorStates.queues).forEach(([name, counts]) => { - jobCount.set({ state: 'all', name }, counts.all); - jobCount.set({ state: 'created', name }, counts.created); - jobCount.set({ state: 'retry', name }, counts.retry); - jobCount.set({ state: 'active', name }, counts.active); - jobCount.set({ state: 'completed', name }, counts.completed); - jobCount.set({ state: 'expired', name }, counts.expired); - jobCount.set({ state: 'cancelled', name }, counts.cancelled); - jobCount.set({ state: 'failed', name }, counts.failed); - }); - }); } get maxCompletionLag (): number { @@ -178,4 +153,10 @@ export class JobQueue { await wait(EMPTY_QUEUE_CHECK_INTERVAL); } } + + async getJobCounts (): Promise { + // Use any as countStates() method is not present in the types + const monitorStates = await (this._boss as any).countStates(); + return monitorStates; + } } diff --git a/packages/util/src/metrics.ts b/packages/util/src/metrics.ts index 670fbe3f..9605d318 100644 --- a/packages/util/src/metrics.ts +++ b/packages/util/src/metrics.ts @@ -12,6 +12,7 @@ import JsonRpcProvider = ethers.providers.JsonRpcProvider; import { Config } from './config'; import { IndexerInterface } from './types'; +import { JobQueue } from './job-queue'; const DB_SIZE_QUERY = 'SELECT pg_database_size(current_database())'; @@ -27,11 +28,6 @@ export async function fetchLatestBlockNumber (provider: JsonRpcProvider): Promis } // Create custom metrics -export const jobCount = new client.Gauge({ - name: 'pgboss_jobs_total', - help: 'Total entries in job table', - labelNames: ['state', 'name'] as const -}); export const lastJobCompletedOn = new client.Gauge({ name: 'pgboss_last_job_completed_timestamp_seconds', @@ -116,7 +112,7 @@ const upstreamEndpointsMetric = new client.Gauge({ // Export metrics on a server const app: Application = express(); -export const startMetricsServer = async (config: Config, indexer: IndexerInterface, endpointIndexes = { rpcProviderEndpoint: 0 }): Promise => { +export const startMetricsServer = async (config: Config, jobQueue: JobQueue, indexer: IndexerInterface, endpointIndexes = { rpcProviderEndpoint: 0 }): Promise => { if (!config.metrics) { log('Metrics is disabled. To enable add metrics host and port.'); return; @@ -142,6 +138,8 @@ export const startMetricsServer = async (config: Config, indexer: IndexerInterfa } }); + await registerJobQueueMetrics(jobQueue); + await registerWatcherConfigMetrics(config); setActiveUpstreamEndpointMetric(config, endpointIndexes.rpcProviderEndpoint); @@ -246,3 +244,35 @@ const registerWatcherConfigMetrics = async ({ server, upstream, jobQueue }: Conf watcherConfigMetric.set({ category: 'jobqueue', field: 'historical_logs_block_range' }, Number(jobQueue.historicalLogsBlockRange)); watcherConfigMetric.set({ category: 'jobqueue', field: 'historical_max_fetch_ahead' }, Number(jobQueue.historicalMaxFetchAhead)); }; + +const registerJobQueueMetrics = async (jobQueue: JobQueue): Promise => { + // eslint-disable-next-line no-new + new client.Gauge({ + name: 'pgboss_jobs_total', + help: 'Total entries in job table', + labelNames: ['state', 'name'] as const, + async collect () { + const jobCounts = await jobQueue.getJobCounts(); + + this.set({ state: 'all' }, jobCounts.all); + this.set({ state: 'created' }, jobCounts.created); + this.set({ state: 'retry' }, jobCounts.retry); + this.set({ state: 'active' }, jobCounts.active); + this.set({ state: 'completed' }, jobCounts.completed); + this.set({ state: 'expired' }, jobCounts.expired); + this.set({ state: 'cancelled' }, jobCounts.cancelled); + this.set({ state: 'failed' }, jobCounts.failed); + + Object.entries(jobCounts.queues as Array).forEach(([name, counts]) => { + this.set({ state: 'all', name }, counts.all); + this.set({ state: 'created', name }, counts.created); + this.set({ state: 'retry', name }, counts.retry); + this.set({ state: 'active', name }, counts.active); + this.set({ state: 'completed', name }, counts.completed); + this.set({ state: 'expired', name }, counts.expired); + this.set({ state: 'cancelled', name }, counts.cancelled); + this.set({ state: 'failed', name }, counts.failed); + }); + } + }); +};