Fetch job queue counts for metrics on scraping (#509)

This commit is contained in:
prathamesh0 2024-05-16 16:34:07 +05:30 committed by GitHub
parent 20fa6ceaa6
commit 1ca74548ff
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 46 additions and 35 deletions

View File

@ -154,7 +154,7 @@ export class JobRunnerCmd {
await startJobRunner(jobRunner); await startJobRunner(jobRunner);
jobRunner.handleShutdown(); jobRunner.handleShutdown();
await startMetricsServer(config, indexer, this._currentEndpointIndex); await startMetricsServer(config, jobQueue, indexer, this._currentEndpointIndex);
} }
_getArgv (): any { _getArgv (): any {

View File

@ -4,9 +4,9 @@
import assert from 'assert'; import assert from 'assert';
import debug from 'debug'; import debug from 'debug';
import PgBoss from 'pg-boss'; import PgBoss, { MonitorStates } from 'pg-boss';
import { jobCount, lastJobCompletedOn } from './metrics'; import { lastJobCompletedOn } from './metrics';
import { wait } from './misc'; import { wait } from './misc';
interface Config { interface Config {
@ -48,35 +48,10 @@ export class JobQueue {
deleteAfterHours: 1, // 1 hour deleteAfterHours: 1, // 1 hour
newJobCheckInterval: 100, newJobCheckInterval: 100
// Time interval for firing monitor-states event.
monitorStateIntervalSeconds: 10
}); });
this._boss.on('error', error => log(error)); this._boss.on('error', error => log(error));
this._boss.on('monitor-states', monitorStates => {
jobCount.set({ state: 'all' }, monitorStates.all);
jobCount.set({ state: 'created' }, monitorStates.created);
jobCount.set({ state: 'retry' }, monitorStates.retry);
jobCount.set({ state: 'active' }, monitorStates.active);
jobCount.set({ state: 'completed' }, monitorStates.completed);
jobCount.set({ state: 'expired' }, monitorStates.expired);
jobCount.set({ state: 'cancelled' }, monitorStates.cancelled);
jobCount.set({ state: 'failed' }, monitorStates.failed);
Object.entries(monitorStates.queues).forEach(([name, counts]) => {
jobCount.set({ state: 'all', name }, counts.all);
jobCount.set({ state: 'created', name }, counts.created);
jobCount.set({ state: 'retry', name }, counts.retry);
jobCount.set({ state: 'active', name }, counts.active);
jobCount.set({ state: 'completed', name }, counts.completed);
jobCount.set({ state: 'expired', name }, counts.expired);
jobCount.set({ state: 'cancelled', name }, counts.cancelled);
jobCount.set({ state: 'failed', name }, counts.failed);
});
});
} }
get maxCompletionLag (): number { get maxCompletionLag (): number {
@ -178,4 +153,10 @@ export class JobQueue {
await wait(EMPTY_QUEUE_CHECK_INTERVAL); await wait(EMPTY_QUEUE_CHECK_INTERVAL);
} }
} }
async getJobCounts (): Promise<MonitorStates> {
// Use any as countStates() method is not present in the types
const monitorStates = await (this._boss as any).countStates();
return monitorStates;
}
} }

View File

@ -12,6 +12,7 @@ import JsonRpcProvider = ethers.providers.JsonRpcProvider;
import { Config } from './config'; import { Config } from './config';
import { IndexerInterface } from './types'; import { IndexerInterface } from './types';
import { JobQueue } from './job-queue';
const DB_SIZE_QUERY = 'SELECT pg_database_size(current_database())'; const DB_SIZE_QUERY = 'SELECT pg_database_size(current_database())';
@ -27,11 +28,6 @@ export async function fetchLatestBlockNumber (provider: JsonRpcProvider): Promis
} }
// Create custom metrics // Create custom metrics
export const jobCount = new client.Gauge({
name: 'pgboss_jobs_total',
help: 'Total entries in job table',
labelNames: ['state', 'name'] as const
});
export const lastJobCompletedOn = new client.Gauge({ export const lastJobCompletedOn = new client.Gauge({
name: 'pgboss_last_job_completed_timestamp_seconds', name: 'pgboss_last_job_completed_timestamp_seconds',
@ -116,7 +112,7 @@ const upstreamEndpointsMetric = new client.Gauge({
// Export metrics on a server // Export metrics on a server
const app: Application = express(); const app: Application = express();
export const startMetricsServer = async (config: Config, indexer: IndexerInterface, endpointIndexes = { rpcProviderEndpoint: 0 }): Promise<void> => { export const startMetricsServer = async (config: Config, jobQueue: JobQueue, indexer: IndexerInterface, endpointIndexes = { rpcProviderEndpoint: 0 }): Promise<void> => {
if (!config.metrics) { if (!config.metrics) {
log('Metrics is disabled. To enable add metrics host and port.'); log('Metrics is disabled. To enable add metrics host and port.');
return; return;
@ -142,6 +138,8 @@ export const startMetricsServer = async (config: Config, indexer: IndexerInterfa
} }
}); });
await registerJobQueueMetrics(jobQueue);
await registerWatcherConfigMetrics(config); await registerWatcherConfigMetrics(config);
setActiveUpstreamEndpointMetric(config, endpointIndexes.rpcProviderEndpoint); setActiveUpstreamEndpointMetric(config, endpointIndexes.rpcProviderEndpoint);
@ -246,3 +244,35 @@ const registerWatcherConfigMetrics = async ({ server, upstream, jobQueue }: Conf
watcherConfigMetric.set({ category: 'jobqueue', field: 'historical_logs_block_range' }, Number(jobQueue.historicalLogsBlockRange)); watcherConfigMetric.set({ category: 'jobqueue', field: 'historical_logs_block_range' }, Number(jobQueue.historicalLogsBlockRange));
watcherConfigMetric.set({ category: 'jobqueue', field: 'historical_max_fetch_ahead' }, Number(jobQueue.historicalMaxFetchAhead)); watcherConfigMetric.set({ category: 'jobqueue', field: 'historical_max_fetch_ahead' }, Number(jobQueue.historicalMaxFetchAhead));
}; };
const registerJobQueueMetrics = async (jobQueue: JobQueue): Promise<void> => {
// eslint-disable-next-line no-new
new client.Gauge({
name: 'pgboss_jobs_total',
help: 'Total entries in job table',
labelNames: ['state', 'name'] as const,
async collect () {
const jobCounts = await jobQueue.getJobCounts();
this.set({ state: 'all' }, jobCounts.all);
this.set({ state: 'created' }, jobCounts.created);
this.set({ state: 'retry' }, jobCounts.retry);
this.set({ state: 'active' }, jobCounts.active);
this.set({ state: 'completed' }, jobCounts.completed);
this.set({ state: 'expired' }, jobCounts.expired);
this.set({ state: 'cancelled' }, jobCounts.cancelled);
this.set({ state: 'failed' }, jobCounts.failed);
Object.entries(jobCounts.queues as Array<any>).forEach(([name, counts]) => {
this.set({ state: 'all', name }, counts.all);
this.set({ state: 'created', name }, counts.created);
this.set({ state: 'retry', name }, counts.retry);
this.set({ state: 'active', name }, counts.active);
this.set({ state: 'completed', name }, counts.completed);
this.set({ state: 'expired', name }, counts.expired);
this.set({ state: 'cancelled', name }, counts.cancelled);
this.set({ state: 'failed', name }, counts.failed);
});
}
});
};