mirror of
https://github.com/cerc-io/watcher-ts
synced 2025-01-08 12:28:05 +00:00
Add prometheus metrics support for watchers (#152)
* Add prometheus metrics endpoint in watcher * Add event, sync status and DB size metrics * Fix subgraph watchers DB entities directory path * Make watcher metrics optional using config
This commit is contained in:
parent
1bcabd64f2
commit
a15305450c
@ -51,7 +51,7 @@ const main = async (): Promise<void> => {
|
||||
await db.init();
|
||||
{{#if (subgraphPath)}}
|
||||
|
||||
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*'));
|
||||
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, '../entity/*'));
|
||||
await graphDb.init();
|
||||
|
||||
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);
|
||||
|
@ -24,6 +24,10 @@
|
||||
# Use -1 for skipping check on block range.
|
||||
maxEventsBlockRange = 1000
|
||||
|
||||
[metrics]
|
||||
host = "127.0.0.1"
|
||||
port = 9000
|
||||
|
||||
[database]
|
||||
type = "postgres"
|
||||
host = "localhost"
|
||||
|
@ -46,7 +46,7 @@ const main = async (): Promise<void> => {
|
||||
await db.init();
|
||||
{{#if (subgraphPath)}}
|
||||
|
||||
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*'));
|
||||
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, '../entity/*'));
|
||||
await graphDb.init();
|
||||
|
||||
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);
|
||||
|
@ -50,7 +50,7 @@ export const main = async (): Promise<any> => {
|
||||
await db.init();
|
||||
{{#if (subgraphPath)}}
|
||||
|
||||
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*'));
|
||||
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, '../entity/*'));
|
||||
await graphDb.init();
|
||||
|
||||
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);
|
||||
|
@ -47,7 +47,7 @@ const main = async (): Promise<void> => {
|
||||
await db.init();
|
||||
{{#if (subgraphPath)}}
|
||||
|
||||
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*'));
|
||||
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, '../entity/*'));
|
||||
await graphDb.init();
|
||||
|
||||
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);
|
||||
|
@ -48,7 +48,7 @@ const main = async (): Promise<void> => {
|
||||
await db.init();
|
||||
{{#if (subgraphPath)}}
|
||||
|
||||
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*'));
|
||||
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, '../entity/*'));
|
||||
await graphDb.init();
|
||||
|
||||
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);
|
||||
|
@ -24,7 +24,8 @@ import {
|
||||
JOB_KIND_PRUNE,
|
||||
JobQueueConfig,
|
||||
DEFAULT_CONFIG_PATH,
|
||||
initClients
|
||||
initClients,
|
||||
startMetricsServer
|
||||
} from '@vulcanize/util';
|
||||
{{#if (subgraphPath)}}
|
||||
import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node';
|
||||
@ -286,6 +287,8 @@ export const main = async (): Promise<any> => {
|
||||
|
||||
const jobRunner = new JobRunner(jobQueueConfig, indexer, jobQueue);
|
||||
await jobRunner.start();
|
||||
|
||||
startMetricsServer(config, indexer);
|
||||
};
|
||||
|
||||
main().then(() => {
|
||||
|
@ -44,7 +44,7 @@ export const handler = async (argv: any): Promise<void> => {
|
||||
await db.init();
|
||||
{{#if (subgraphPath)}}
|
||||
|
||||
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*'));
|
||||
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, '../entity/*'));
|
||||
await graphDb.init();
|
||||
|
||||
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);
|
||||
|
@ -64,7 +64,7 @@ const main = async (): Promise<void> => {
|
||||
await db.init();
|
||||
{{#if (subgraphPath)}}
|
||||
|
||||
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*'));
|
||||
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, '../entity/*'));
|
||||
await graphDb.init();
|
||||
|
||||
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);
|
||||
|
@ -22,6 +22,10 @@
|
||||
# Use -1 for skipping check on block range.
|
||||
maxEventsBlockRange = 1000
|
||||
|
||||
[metrics]
|
||||
host = "127.0.0.1"
|
||||
port = 9000
|
||||
|
||||
[database]
|
||||
type = "postgres"
|
||||
host = "localhost"
|
||||
|
@ -46,7 +46,7 @@ const main = async (): Promise<void> => {
|
||||
const db = new Database(config.database);
|
||||
await db.init();
|
||||
|
||||
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*'));
|
||||
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, '../entity/*'));
|
||||
await graphDb.init();
|
||||
|
||||
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);
|
||||
|
@ -43,7 +43,7 @@ const main = async (): Promise<void> => {
|
||||
const db = new Database(config.database);
|
||||
await db.init();
|
||||
|
||||
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*'));
|
||||
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, '../entity/*'));
|
||||
await graphDb.init();
|
||||
|
||||
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);
|
||||
|
@ -47,7 +47,7 @@ export const main = async (): Promise<any> => {
|
||||
const db = new Database(config.database);
|
||||
await db.init();
|
||||
|
||||
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*'));
|
||||
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, '../entity/*'));
|
||||
await graphDb.init();
|
||||
|
||||
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);
|
||||
|
@ -42,7 +42,7 @@ const main = async (): Promise<void> => {
|
||||
const db = new Database(config.database);
|
||||
await db.init();
|
||||
|
||||
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*'));
|
||||
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, '../entity/*'));
|
||||
await graphDb.init();
|
||||
|
||||
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);
|
||||
|
@ -43,7 +43,7 @@ const main = async (): Promise<void> => {
|
||||
const db = new Database(config.database);
|
||||
await db.init();
|
||||
|
||||
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*'));
|
||||
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, '../entity/*'));
|
||||
await graphDb.init();
|
||||
|
||||
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);
|
||||
|
@ -53,7 +53,7 @@ export const handler = async (argv: any): Promise<void> => {
|
||||
const db = new Database(config.database);
|
||||
await db.init();
|
||||
|
||||
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*'));
|
||||
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, '../entity/*'));
|
||||
await graphDb.init();
|
||||
|
||||
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);
|
||||
|
@ -59,7 +59,7 @@ const main = async (): Promise<void> => {
|
||||
const db = new Database(config.database);
|
||||
await db.init();
|
||||
|
||||
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*'));
|
||||
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, '../entity/*'));
|
||||
await graphDb.init();
|
||||
|
||||
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);
|
||||
|
@ -22,7 +22,8 @@ import {
|
||||
JOB_KIND_PRUNE,
|
||||
JobQueueConfig,
|
||||
DEFAULT_CONFIG_PATH,
|
||||
initClients
|
||||
initClients,
|
||||
startMetricsServer
|
||||
} from '@vulcanize/util';
|
||||
import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node';
|
||||
|
||||
@ -278,6 +279,8 @@ export const main = async (): Promise<any> => {
|
||||
|
||||
const jobRunner = new JobRunner(jobQueueConfig, indexer, jobQueue);
|
||||
await jobRunner.start();
|
||||
|
||||
startMetricsServer(config, indexer);
|
||||
};
|
||||
|
||||
main().then(() => {
|
||||
|
@ -4,6 +4,10 @@
|
||||
mode = "eth_call"
|
||||
kind = "lazy"
|
||||
|
||||
[metrics]
|
||||
host = "127.0.0.1"
|
||||
port = 9000
|
||||
|
||||
[database]
|
||||
type = "postgres"
|
||||
host = "localhost"
|
||||
|
@ -17,7 +17,8 @@ import {
|
||||
QUEUE_EVENT_PROCESSING,
|
||||
JobQueueConfig,
|
||||
DEFAULT_CONFIG_PATH,
|
||||
initClients
|
||||
initClients,
|
||||
startMetricsServer
|
||||
} from '@vulcanize/util';
|
||||
|
||||
import { Indexer } from './indexer';
|
||||
@ -87,6 +88,8 @@ export const main = async (): Promise<any> => {
|
||||
|
||||
const jobRunner = new JobRunner(jobQueueConfig, indexer, jobQueue);
|
||||
await jobRunner.start();
|
||||
|
||||
startMetricsServer(config, indexer);
|
||||
};
|
||||
|
||||
main().then(() => {
|
||||
|
@ -19,6 +19,10 @@
|
||||
# Use -1 for skipping check on block range.
|
||||
maxEventsBlockRange = 1000
|
||||
|
||||
[metrics]
|
||||
host = "127.0.0.1"
|
||||
port = 9000
|
||||
|
||||
[database]
|
||||
type = "postgres"
|
||||
host = "localhost"
|
||||
|
@ -21,7 +21,8 @@ import {
|
||||
JOB_KIND_PRUNE,
|
||||
JobQueueConfig,
|
||||
DEFAULT_CONFIG_PATH,
|
||||
initClients
|
||||
initClients,
|
||||
startMetricsServer
|
||||
} from '@vulcanize/util';
|
||||
|
||||
import { Indexer } from './indexer';
|
||||
@ -265,6 +266,8 @@ export const main = async (): Promise<any> => {
|
||||
|
||||
const jobRunner = new JobRunner(jobQueueConfig, indexer, jobQueue);
|
||||
await jobRunner.start();
|
||||
|
||||
startMetricsServer(config, indexer);
|
||||
};
|
||||
|
||||
main().then(() => {
|
||||
|
@ -22,6 +22,10 @@
|
||||
# Use -1 for skipping check on block range.
|
||||
maxEventsBlockRange = 1000
|
||||
|
||||
[metrics]
|
||||
host = "127.0.0.1"
|
||||
port = 9000
|
||||
|
||||
[database]
|
||||
type = "postgres"
|
||||
host = "localhost"
|
||||
|
@ -46,7 +46,7 @@ const main = async (): Promise<void> => {
|
||||
const db = new Database(config.database);
|
||||
await db.init();
|
||||
|
||||
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*'));
|
||||
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, '../entity/*'));
|
||||
await graphDb.init();
|
||||
|
||||
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);
|
||||
|
@ -43,7 +43,7 @@ const main = async (): Promise<void> => {
|
||||
const db = new Database(config.database);
|
||||
await db.init();
|
||||
|
||||
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*'));
|
||||
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, '../entity/*'));
|
||||
await graphDb.init();
|
||||
|
||||
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);
|
||||
|
@ -47,7 +47,7 @@ export const main = async (): Promise<any> => {
|
||||
const db = new Database(config.database);
|
||||
await db.init();
|
||||
|
||||
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*'));
|
||||
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, '../entity/*'));
|
||||
await graphDb.init();
|
||||
|
||||
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);
|
||||
|
@ -42,7 +42,7 @@ const main = async (): Promise<void> => {
|
||||
const db = new Database(config.database);
|
||||
await db.init();
|
||||
|
||||
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*'));
|
||||
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, '../entity/*'));
|
||||
await graphDb.init();
|
||||
|
||||
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);
|
||||
|
@ -43,7 +43,7 @@ const main = async (): Promise<void> => {
|
||||
const db = new Database(config.database);
|
||||
await db.init();
|
||||
|
||||
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*'));
|
||||
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, '../entity/*'));
|
||||
await graphDb.init();
|
||||
|
||||
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);
|
||||
|
@ -41,7 +41,7 @@ export const handler = async (argv: any): Promise<void> => {
|
||||
const db = new Database(config.database);
|
||||
await db.init();
|
||||
|
||||
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*'));
|
||||
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, '../entity/*'));
|
||||
await graphDb.init();
|
||||
|
||||
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);
|
||||
|
@ -59,7 +59,7 @@ const main = async (): Promise<void> => {
|
||||
const db = new Database(config.database);
|
||||
await db.init();
|
||||
|
||||
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*'));
|
||||
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, '../entity/*'));
|
||||
await graphDb.init();
|
||||
|
||||
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);
|
||||
|
@ -22,7 +22,8 @@ import {
|
||||
JOB_KIND_PRUNE,
|
||||
JobQueueConfig,
|
||||
DEFAULT_CONFIG_PATH,
|
||||
initClients
|
||||
initClients,
|
||||
startMetricsServer
|
||||
} from '@vulcanize/util';
|
||||
import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node';
|
||||
|
||||
@ -278,6 +279,8 @@ export const main = async (): Promise<any> => {
|
||||
|
||||
const jobRunner = new JobRunner(jobQueueConfig, indexer, jobQueue);
|
||||
await jobRunner.start();
|
||||
|
||||
startMetricsServer(config, indexer);
|
||||
};
|
||||
|
||||
main().then(() => {
|
||||
|
@ -19,6 +19,10 @@
|
||||
# Use -1 for skipping check on block range.
|
||||
maxEventsBlockRange = -1
|
||||
|
||||
[metrics]
|
||||
host = "127.0.0.1"
|
||||
port = 9000
|
||||
|
||||
[database]
|
||||
type = "postgres"
|
||||
host = "localhost"
|
||||
|
@ -21,7 +21,8 @@ import {
|
||||
JOB_KIND_PRUNE,
|
||||
JobQueueConfig,
|
||||
DEFAULT_CONFIG_PATH,
|
||||
initClients
|
||||
initClients,
|
||||
startMetricsServer
|
||||
} from '@vulcanize/util';
|
||||
|
||||
import { Indexer } from './indexer';
|
||||
@ -265,6 +266,8 @@ export const main = async (): Promise<any> => {
|
||||
|
||||
const jobRunner = new JobRunner(jobQueueConfig, indexer, jobQueue);
|
||||
await jobRunner.start();
|
||||
|
||||
startMetricsServer(config, indexer);
|
||||
};
|
||||
|
||||
main().then(() => {
|
||||
|
@ -5,6 +5,10 @@
|
||||
# Mode demo whitelists all tokens so that entity values get updated.
|
||||
mode = "demo"
|
||||
|
||||
[metrics]
|
||||
host = "127.0.0.1"
|
||||
port = 9000
|
||||
|
||||
[database]
|
||||
type = "postgres"
|
||||
host = "localhost"
|
||||
|
@ -20,7 +20,8 @@ import {
|
||||
JobRunner as BaseJobRunner,
|
||||
JobQueueConfig,
|
||||
DEFAULT_CONFIG_PATH,
|
||||
initClients
|
||||
initClients,
|
||||
startMetricsServer
|
||||
} from '@vulcanize/util';
|
||||
|
||||
import { Indexer } from './indexer';
|
||||
@ -99,6 +100,8 @@ export const main = async (): Promise<any> => {
|
||||
|
||||
const jobRunner = new JobRunner(jobQueueConfig, indexer, jobQueue);
|
||||
await jobRunner.start();
|
||||
|
||||
startMetricsServer(config, indexer);
|
||||
};
|
||||
|
||||
main().then(() => {
|
||||
|
@ -2,6 +2,10 @@
|
||||
host = "127.0.0.1"
|
||||
port = 3003
|
||||
|
||||
[metrics]
|
||||
host = "127.0.0.1"
|
||||
port = 9000
|
||||
|
||||
[database]
|
||||
type = "postgres"
|
||||
host = "localhost"
|
||||
|
@ -17,7 +17,8 @@ import {
|
||||
QUEUE_EVENT_PROCESSING,
|
||||
JobQueueConfig,
|
||||
DEFAULT_CONFIG_PATH,
|
||||
initClients
|
||||
initClients,
|
||||
startMetricsServer
|
||||
} from '@vulcanize/util';
|
||||
|
||||
import { Indexer } from './indexer';
|
||||
@ -89,6 +90,8 @@ export const main = async (): Promise<any> => {
|
||||
|
||||
const jobRunner = new JobRunner(jobQueueConfig, indexer, jobQueue);
|
||||
await jobRunner.start();
|
||||
|
||||
startMetricsServer(config, indexer);
|
||||
};
|
||||
|
||||
main().then(() => {
|
||||
|
@ -18,3 +18,4 @@ export * from './src/ipld-indexer';
|
||||
export * from './src/ipld-database';
|
||||
export * from './src/ipfs';
|
||||
export * from './src/index-block';
|
||||
export * from './src/metrics';
|
||||
|
@ -9,11 +9,12 @@
|
||||
"decimal.js": "^10.3.1",
|
||||
"ethers": "^5.4.4",
|
||||
"fs-extra": "^10.0.0",
|
||||
"ipfs-http-client": "^56.0.3",
|
||||
"lodash": "^4.17.21",
|
||||
"multiformats": "^9.4.8",
|
||||
"pg-boss": "^6.1.0",
|
||||
"toml": "^3.0.0",
|
||||
"ipfs-http-client": "^56.0.3"
|
||||
"prom-client": "^14.0.1",
|
||||
"toml": "^3.0.0"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@types/fs-extra": "^9.0.11",
|
||||
|
@ -92,7 +92,7 @@ export const processBlockByNumber = async (
|
||||
}
|
||||
}
|
||||
|
||||
await indexer.updateSyncStatusChainHead(blocks[0].blockHash, blocks[0].blockNumber);
|
||||
await indexer.updateSyncStatusChainHead(blocks[0].blockHash, Number(blocks[0].blockNumber));
|
||||
|
||||
return;
|
||||
}
|
||||
|
@ -56,11 +56,17 @@ export interface UpstreamConfig {
|
||||
}
|
||||
}
|
||||
|
||||
export interface MetricsConfig {
|
||||
host: string;
|
||||
port: number;
|
||||
}
|
||||
|
||||
export interface Config {
|
||||
server: ServerConfig;
|
||||
database: ConnectionOptions;
|
||||
upstream: UpstreamConfig,
|
||||
jobQueue: JobQueueConfig
|
||||
jobQueue: JobQueueConfig,
|
||||
metrics: MetricsConfig,
|
||||
}
|
||||
|
||||
export const getConfig = async (configFile: string): Promise<Config> => {
|
||||
|
@ -12,6 +12,7 @@ import {
|
||||
FindConditions,
|
||||
FindManyOptions,
|
||||
In,
|
||||
Not,
|
||||
QueryRunner,
|
||||
Repository,
|
||||
SelectQueryBuilder
|
||||
@ -21,6 +22,7 @@ import _ from 'lodash';
|
||||
|
||||
import { BlockProgressInterface, ContractInterface, EventInterface, SyncStatusInterface } from './types';
|
||||
import { MAX_REORG_DEPTH, UNKNOWN_EVENT_NAME } from './constants';
|
||||
import { blockProgressCount, eventCount } from './metrics';
|
||||
|
||||
const DEFAULT_LIMIT = 100;
|
||||
const DEFAULT_SKIP = 0;
|
||||
@ -69,6 +71,8 @@ export type Relation = string | { property: string, alias: string }
|
||||
export class Database {
|
||||
_config: ConnectionOptions
|
||||
_conn!: Connection
|
||||
_blockCount = 0
|
||||
_eventCount = 0
|
||||
|
||||
constructor (config: ConnectionOptions) {
|
||||
assert(config);
|
||||
@ -83,6 +87,9 @@ export class Database {
|
||||
namingStrategy: new SnakeNamingStrategy()
|
||||
});
|
||||
|
||||
await this._fetchBlockCount();
|
||||
await this._fetchEventCount();
|
||||
|
||||
return this._conn;
|
||||
}
|
||||
|
||||
@ -249,10 +256,18 @@ export class Database {
|
||||
});
|
||||
|
||||
const blockProgress = await blockRepo.save(entity);
|
||||
this._blockCount++;
|
||||
blockProgressCount.set(this._blockCount);
|
||||
|
||||
let blockEventCount = 0;
|
||||
|
||||
// Bulk insert events.
|
||||
events.forEach(event => {
|
||||
event.block = blockProgress;
|
||||
|
||||
if (event.eventName !== UNKNOWN_EVENT_NAME) {
|
||||
blockEventCount++;
|
||||
}
|
||||
});
|
||||
|
||||
const eventBatches = _.chunk(events, INSERT_EVENTS_BATCH);
|
||||
@ -266,6 +281,8 @@ export class Database {
|
||||
});
|
||||
|
||||
await Promise.all(insertPromises);
|
||||
this._eventCount += blockEventCount;
|
||||
eventCount.set(this._eventCount);
|
||||
|
||||
return blockProgress;
|
||||
}
|
||||
@ -369,7 +386,11 @@ export class Database {
|
||||
}
|
||||
|
||||
async saveEventEntity (repo: Repository<EventInterface>, entity: EventInterface): Promise<EventInterface> {
|
||||
return await repo.save(entity);
|
||||
const event = await repo.save(entity);
|
||||
this._eventCount++;
|
||||
eventCount.set(this._eventCount);
|
||||
|
||||
return event;
|
||||
}
|
||||
|
||||
async getModelEntities<Entity> (queryRunner: QueryRunner, entity: new () => Entity, block: BlockHeight, where: Where = {}, queryOptions: QueryOptions = {}, relations: Relation[] = []): Promise<Entity[]> {
|
||||
@ -562,6 +583,24 @@ export class Database {
|
||||
return repo.save(entity);
|
||||
}
|
||||
|
||||
async _fetchBlockCount (): Promise<void> {
|
||||
this._blockCount = await this._conn.getRepository('block_progress')
|
||||
.count();
|
||||
|
||||
blockProgressCount.set(this._blockCount);
|
||||
}
|
||||
|
||||
async _fetchEventCount (): Promise<void> {
|
||||
this._eventCount = await this._conn.getRepository('event')
|
||||
.count({
|
||||
where: {
|
||||
eventName: Not(UNKNOWN_EVENT_NAME)
|
||||
}
|
||||
});
|
||||
|
||||
eventCount.set(this._eventCount);
|
||||
}
|
||||
|
||||
_buildQuery<Entity> (repo: Repository<Entity>, selectQueryBuilder: SelectQueryBuilder<Entity>, where: Where = {}, queryOptions: QueryOptions = {}): SelectQueryBuilder<Entity> {
|
||||
const { tableName } = repo.metadata;
|
||||
|
||||
|
@ -6,6 +6,8 @@ import assert from 'assert';
|
||||
import debug from 'debug';
|
||||
import PgBoss from 'pg-boss';
|
||||
|
||||
import { jobCount, lastJobCompletedOn } from './metrics';
|
||||
|
||||
interface Config {
|
||||
dbConnectionString: string
|
||||
maxCompletionLag: number
|
||||
@ -39,10 +41,35 @@ export class JobQueue {
|
||||
|
||||
retentionDays: 30, // 30 days
|
||||
|
||||
newJobCheckInterval: 100
|
||||
newJobCheckInterval: 100,
|
||||
|
||||
// Time interval for firing monitor-states event.
|
||||
monitorStateIntervalSeconds: 10
|
||||
});
|
||||
|
||||
this._boss.on('error', error => log(error));
|
||||
|
||||
this._boss.on('monitor-states', monitorStates => {
|
||||
jobCount.set({ state: 'all' }, monitorStates.all);
|
||||
jobCount.set({ state: 'created' }, monitorStates.created);
|
||||
jobCount.set({ state: 'retry' }, monitorStates.retry);
|
||||
jobCount.set({ state: 'active' }, monitorStates.active);
|
||||
jobCount.set({ state: 'completed' }, monitorStates.completed);
|
||||
jobCount.set({ state: 'expired' }, monitorStates.expired);
|
||||
jobCount.set({ state: 'cancelled' }, monitorStates.cancelled);
|
||||
jobCount.set({ state: 'failed' }, monitorStates.failed);
|
||||
|
||||
Object.entries(monitorStates.queues).forEach(([name, counts]) => {
|
||||
jobCount.set({ state: 'all', name }, counts.all);
|
||||
jobCount.set({ state: 'created', name }, counts.created);
|
||||
jobCount.set({ state: 'retry', name }, counts.retry);
|
||||
jobCount.set({ state: 'active', name }, counts.active);
|
||||
jobCount.set({ state: 'completed', name }, counts.completed);
|
||||
jobCount.set({ state: 'expired', name }, counts.expired);
|
||||
jobCount.set({ state: 'cancelled', name }, counts.cancelled);
|
||||
jobCount.set({ state: 'failed', name }, counts.failed);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
get maxCompletionLag (): number {
|
||||
@ -68,6 +95,7 @@ export class JobQueue {
|
||||
try {
|
||||
log(`Processing queue ${queue} job ${job.id}...`);
|
||||
await callback(job);
|
||||
lastJobCompletedOn.setToCurrentTime({ name: queue });
|
||||
} catch (error) {
|
||||
log(`Error in queue ${queue} job ${job.id}`);
|
||||
log(error);
|
||||
|
@ -20,6 +20,7 @@ import { JobQueue } from './job-queue';
|
||||
import { EventInterface, IndexerInterface, IPLDIndexerInterface, SyncStatusInterface } from './types';
|
||||
import { wait } from './misc';
|
||||
import { createPruningJob, processBatchEvents } from './common';
|
||||
import { lastBlockNumEvents, lastBlockProcessDuration, lastProcessedBlockNumber } from './metrics';
|
||||
|
||||
const log = debug('vulcanize:job-runner');
|
||||
|
||||
@ -28,6 +29,7 @@ export class JobRunner {
|
||||
_jobQueue: JobQueue
|
||||
_jobQueueConfig: JobQueueConfig
|
||||
_blockProcessStartTime?: Date
|
||||
_endBlockProcessTimer?: () => void
|
||||
|
||||
constructor (jobQueueConfig: JobQueueConfig, indexer: IndexerInterface, jobQueue: JobQueue) {
|
||||
this._indexer = indexer;
|
||||
@ -246,6 +248,16 @@ export class JobRunner {
|
||||
await processBatchEvents(this._indexer, block, this._jobQueueConfig.eventsInBatch);
|
||||
|
||||
console.timeEnd('time:job-runner#_processEvents-events');
|
||||
|
||||
// Update metrics
|
||||
lastProcessedBlockNumber.set(block.blockNumber);
|
||||
lastBlockNumEvents.set(block.numEvents);
|
||||
|
||||
if (this._endBlockProcessTimer) {
|
||||
this._endBlockProcessTimer();
|
||||
}
|
||||
|
||||
this._endBlockProcessTimer = lastBlockProcessDuration.startTimer();
|
||||
}
|
||||
|
||||
async _updateWatchedContracts (job: any): Promise<void> {
|
||||
|
134
packages/util/src/metrics.ts
Normal file
134
packages/util/src/metrics.ts
Normal file
@ -0,0 +1,134 @@
|
||||
//
|
||||
// Copyright 2022 Vulcanize, Inc.
|
||||
//
|
||||
|
||||
import * as client from 'prom-client';
|
||||
import express, { Application } from 'express';
|
||||
import { createConnection } from 'typeorm';
|
||||
import debug from 'debug';
|
||||
import assert from 'assert';
|
||||
|
||||
import { Config } from './config';
|
||||
import { IndexerInterface } from './types';
|
||||
|
||||
const DB_SIZE_QUERY = 'SELECT pg_database_size(current_database())';
|
||||
|
||||
const log = debug('vulcanize:metrics');
|
||||
|
||||
// Create custom metrics
|
||||
export const jobCount = new client.Gauge({
|
||||
name: 'pgboss_jobs_total',
|
||||
help: 'Total entries in job table',
|
||||
labelNames: ['state', 'name'] as const
|
||||
});
|
||||
|
||||
export const lastJobCompletedOn = new client.Gauge({
|
||||
name: 'pgboss_last_job_completed_timestamp_seconds',
|
||||
help: 'Last job completed timestamp',
|
||||
labelNames: ['name'] as const
|
||||
});
|
||||
|
||||
export const lastProcessedBlockNumber = new client.Gauge({
|
||||
name: 'last_processed_block_number',
|
||||
help: 'Last processed block number'
|
||||
});
|
||||
|
||||
export const lastBlockProcessDuration = new client.Gauge({
|
||||
name: 'last_block_process_duration_seconds',
|
||||
help: 'Last block process duration (seconds)'
|
||||
});
|
||||
|
||||
export const lastBlockNumEvents = new client.Gauge({
|
||||
name: 'last_block_num_events_total',
|
||||
help: 'Number of events in the last block'
|
||||
});
|
||||
|
||||
export const blockProgressCount = new client.Gauge({
|
||||
name: 'block_progress_total',
|
||||
help: 'Total entries in block_progress table'
|
||||
});
|
||||
|
||||
export const eventCount = new client.Gauge({
|
||||
name: 'event_total',
|
||||
help: 'Total entries in event table'
|
||||
});
|
||||
|
||||
// Export metrics on a server
|
||||
const app: Application = express();
|
||||
|
||||
export const startMetricsServer = async (config: Config, indexer: IndexerInterface): Promise<void> => {
|
||||
if (!config.metrics) {
|
||||
log('Metrics is disabled. To enable add metrics host and port.');
|
||||
return;
|
||||
}
|
||||
|
||||
assert(config.metrics.host, 'Missing config for metrics host');
|
||||
assert(config.metrics.port, 'Missing config for metrics port');
|
||||
|
||||
// eslint-disable-next-line no-new
|
||||
new client.Gauge({
|
||||
name: 'sync_status_block_number',
|
||||
help: 'Sync status table info',
|
||||
labelNames: ['kind'] as const,
|
||||
async collect () {
|
||||
const syncStatus = await indexer.getSyncStatus();
|
||||
|
||||
if (syncStatus) {
|
||||
this.set({ kind: 'latest_indexed' }, syncStatus.latestIndexedBlockNumber);
|
||||
this.set({ kind: 'latest_canonical' }, syncStatus.latestCanonicalBlockNumber);
|
||||
this.set({ kind: 'chain_head' }, syncStatus.chainHeadBlockNumber);
|
||||
this.set({ kind: 'intial_indexed' }, syncStatus.initialIndexedBlockNumber);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
await registerDBSizeMetrics(config);
|
||||
|
||||
// Collect default metrics
|
||||
client.collectDefaultMetrics();
|
||||
|
||||
app.get('/metrics', async (req, res) => {
|
||||
res.setHeader('Content-Type', client.register.contentType);
|
||||
const metrics = await client.register.metrics();
|
||||
res.send(metrics);
|
||||
});
|
||||
|
||||
app.listen(config.metrics.port, config.metrics.host, () => {
|
||||
log(`Metrics exposed at http://${config.metrics.host}:${config.metrics.port}/metrics`);
|
||||
});
|
||||
};
|
||||
|
||||
const registerDBSizeMetrics = async ({ database, jobQueue }: Config): Promise<void> => {
|
||||
const [watcherConn, jobQueueConn] = await Promise.all([
|
||||
createConnection({
|
||||
...database,
|
||||
name: 'metrics-watcher-connection',
|
||||
synchronize: false
|
||||
}),
|
||||
createConnection({
|
||||
type: 'postgres',
|
||||
url: jobQueue.dbConnectionString,
|
||||
name: 'metrics-job-queue-connection',
|
||||
synchronize: false
|
||||
})
|
||||
]);
|
||||
|
||||
// eslint-disable-next-line no-new
|
||||
new client.Gauge({
|
||||
name: 'database_size_bytes',
|
||||
help: 'Total entries in event table',
|
||||
labelNames: ['type'] as const,
|
||||
async collect () {
|
||||
const [
|
||||
[{ pg_database_size: watcherDBSize }],
|
||||
[{ pg_database_size: jobQueueDBSize }]
|
||||
] = await Promise.all([
|
||||
watcherConn.query(DB_SIZE_QUERY),
|
||||
jobQueueConn.query(DB_SIZE_QUERY)
|
||||
]);
|
||||
|
||||
this.set({ type: 'watcher' }, Number(watcherDBSize));
|
||||
this.set({ type: 'job-queue' }, Number(jobQueueDBSize));
|
||||
}
|
||||
});
|
||||
};
|
19
yarn.lock
19
yarn.lock
@ -4176,6 +4176,11 @@ bindings@^1.2.1:
|
||||
dependencies:
|
||||
file-uri-to-path "1.0.0"
|
||||
|
||||
bintrees@1.0.2:
|
||||
version "1.0.2"
|
||||
resolved "https://registry.yarnpkg.com/bintrees/-/bintrees-1.0.2.tgz#49f896d6e858a4a499df85c38fb399b9aff840f8"
|
||||
integrity sha512-VOMgTMwjAaUG580SXn3LacVgjurrbMme7ZZNYGSSV7mmtY6QQRh0Eg3pwIcntQ77DErK1L0NxkbetjcoXzVwKw==
|
||||
|
||||
bip39@2.5.0:
|
||||
version "2.5.0"
|
||||
resolved "https://registry.yarnpkg.com/bip39/-/bip39-2.5.0.tgz#51cbd5179460504a63ea3c000db3f787ca051235"
|
||||
@ -11718,6 +11723,13 @@ progress@^2.0.0:
|
||||
resolved "https://registry.yarnpkg.com/progress/-/progress-2.0.3.tgz#7e8cf8d8f5b8f239c1bc68beb4eb78567d572ef8"
|
||||
integrity sha512-7PiHtLll5LdnKIMw100I+8xJXR5gW2QwWYkT6iJva0bXitZKa/XMrSbdmg3r2Xnaidz9Qumd0VPaMrZlF9V9sA==
|
||||
|
||||
prom-client@^14.0.1:
|
||||
version "14.0.1"
|
||||
resolved "https://registry.yarnpkg.com/prom-client/-/prom-client-14.0.1.tgz#bdd9583e02ec95429677c0e013712d42ef1f86a8"
|
||||
integrity sha512-HxTArb6fkOntQHoRGvv4qd/BkorjliiuO2uSWC2KC17MUTKYttWdDoXX/vxOhQdkoECEM9BBH0pj2l8G8kev6w==
|
||||
dependencies:
|
||||
tdigest "^0.1.1"
|
||||
|
||||
promise-inflight@^1.0.1:
|
||||
version "1.0.1"
|
||||
resolved "https://registry.yarnpkg.com/promise-inflight/-/promise-inflight-1.0.1.tgz#98472870bf228132fcbdd868129bad12c3c029e3"
|
||||
@ -13475,6 +13487,13 @@ tar@^6.1.11:
|
||||
mkdirp "^1.0.3"
|
||||
yallist "^4.0.0"
|
||||
|
||||
tdigest@^0.1.1:
|
||||
version "0.1.2"
|
||||
resolved "https://registry.yarnpkg.com/tdigest/-/tdigest-0.1.2.tgz#96c64bac4ff10746b910b0e23b515794e12faced"
|
||||
integrity sha512-+G0LLgjjo9BZX2MfdvPfH+MKLCrxlXSYec5DaPYP1fe6Iyhf0/fSmJ0bFiZ1F8BT6cGXl2LpltQptzjXKWEkKA==
|
||||
dependencies:
|
||||
bintrees "1.0.2"
|
||||
|
||||
temp-dir@^1.0.0:
|
||||
version "1.0.0"
|
||||
resolved "https://registry.yarnpkg.com/temp-dir/-/temp-dir-1.0.0.tgz#0a7c0ea26d3a39afa7e0ebea9c1fc0bc4daa011d"
|
||||
|
Loading…
Reference in New Issue
Block a user