Refactor util code to be reused (#241)

* Ignore watch contract jobs in event processing complete handler

* Update job-queue config and handle errors on job completion hook

* Update graph decimal implementation

* Return generic type from method to read watcher config

* Export fill prefetch batch size default value
This commit is contained in:
prathamesh0 2022-11-18 04:59:06 -06:00 committed by GitHub
parent cc8fcffaa1
commit f3c65cbd64
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 85 additions and 90 deletions

View File

@ -10,7 +10,7 @@ import debug from 'debug';
import { getCache } from '@cerc-io/cache';
import { EthClient } from '@cerc-io/ipld-eth-client';
import { DEFAULT_CONFIG_PATH, getConfig, JobQueue } from '@cerc-io/util';
import { Config, DEFAULT_CONFIG_PATH, getConfig, JobQueue } from '@cerc-io/util';
import { Database } from './database';
import { QUEUE_TX_TRACING } from './tx-watcher';
@ -43,7 +43,7 @@ export const main = async (): Promise<any> => {
}
}).argv;
const config = await getConfig(argv.configFile);
const config = await getConfig<Config>(argv.configFile);
assert(config.server, 'Missing server config');

View File

@ -11,7 +11,7 @@ import debug from 'debug';
import { getCache } from '@cerc-io/cache';
import { EthClient } from '@cerc-io/ipld-eth-client';
import { TracingClient } from '@cerc-io/tracing-client';
import { getConfig, JobQueue, DEFAULT_CONFIG_PATH } from '@cerc-io/util';
import { getConfig, JobQueue, DEFAULT_CONFIG_PATH, Config } from '@cerc-io/util';
import { Indexer } from './indexer';
import { Database } from './database';
@ -30,7 +30,7 @@ export const main = async (): Promise<any> => {
})
.argv;
const config = await getConfig(argv.f);
const config: Config = await getConfig(argv.f);
assert(config.server, 'Missing server config');

View File

@ -13,7 +13,7 @@ import debug from 'debug';
import { getCache } from '@cerc-io/cache';
import { EthClient } from '@cerc-io/ipld-eth-client';
import { TracingClient } from '@cerc-io/tracing-client';
import { getConfig, JobQueue, DEFAULT_CONFIG_PATH, createAndStartServer } from '@cerc-io/util';
import { getConfig, JobQueue, DEFAULT_CONFIG_PATH, createAndStartServer, Config } from '@cerc-io/util';
import typeDefs from './schema';
@ -35,7 +35,7 @@ export const main = async (): Promise<any> => {
})
.argv;
const config = await getConfig(argv.f);
const config: Config = await getConfig(argv.f);
assert(config.server, 'Missing server config');

View File

@ -4,7 +4,7 @@
import debug from 'debug';
import { getConfig, resetJobs } from '@cerc-io/util';
import { getConfig, resetJobs, Config } from '@cerc-io/util';
const log = debug('vulcanize:reset-job-queue');
@ -15,7 +15,7 @@ export const desc = 'Reset job queue';
export const builder = {};
export const handler = async (argv: any): Promise<void> => {
const config = await getConfig(argv.configFile);
const config: Config = await getConfig(argv.configFile);
await resetJobs(config);
log('Job queue reset successfully');

View File

@ -4,7 +4,7 @@
import debug from 'debug';
import { getConfig } from '@cerc-io/util';
import { getConfig, Config } from '@cerc-io/util';
import { Database } from '../../database';
@ -22,7 +22,7 @@ export const builder = {
export const handler = async (argv: any): Promise<void> => {
const { blockNumber } = argv;
const config = await getConfig(argv.configFile);
const config: Config = await getConfig(argv.configFile);
// Initialize database
const db = new Database(config.database);

View File

@ -6,7 +6,7 @@ import debug from 'debug';
import { MoreThan } from 'typeorm';
import assert from 'assert';
import { getConfig, initClients, resetJobs, JobQueue } from '@cerc-io/util';
import { getConfig, initClients, resetJobs, JobQueue, Config } from '@cerc-io/util';
{{#if (subgraphPath)}}
import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node';
{{/if}}
@ -32,7 +32,7 @@ export const builder = {
};
export const handler = async (argv: any): Promise<void> => {
const config = await getConfig(argv.configFile);
const config: Config = await getConfig(argv.configFile);
await resetJobs(config);
const { ethClient, ethProvider } = await initClients(config);

View File

@ -4,7 +4,7 @@
import debug from 'debug';
import { getConfig, resetJobs } from '@cerc-io/util';
import { getConfig, resetJobs, Config } from '@cerc-io/util';
const log = debug('vulcanize:reset-job-queue');
@ -15,7 +15,7 @@ export const desc = 'Reset job queue';
export const builder = {};
export const handler = async (argv: any): Promise<void> => {
const config = await getConfig(argv.configFile);
const config: Config = await getConfig(argv.configFile);
await resetJobs(config);
log('Job queue reset successfully');

View File

@ -4,7 +4,7 @@
import debug from 'debug';
import { getConfig } from '@cerc-io/util';
import { getConfig, Config } from '@cerc-io/util';
import { Database } from '../../database';
@ -22,7 +22,7 @@ export const builder = {
export const handler = async (argv: any): Promise<void> => {
const { blockNumber } = argv;
const config = await getConfig(argv.configFile);
const config: Config = await getConfig(argv.configFile);
// Initialize database
const db = new Database(config.database);

View File

@ -5,7 +5,7 @@
import debug from 'debug';
import assert from 'assert';
import { getConfig, initClients, resetJobs, JobQueue } from '@cerc-io/util';
import { getConfig, initClients, resetJobs, JobQueue, Config } from '@cerc-io/util';
import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node';
import { Database, ENTITY_TO_LATEST_ENTITY_MAP } from '../../database';
@ -24,7 +24,7 @@ export const builder = {
};
export const handler = async (argv: any): Promise<void> => {
const config = await getConfig(argv.configFile);
const config: Config = await getConfig(argv.configFile);
await resetJobs(config);
const { ethClient, ethProvider } = await initClients(config);

View File

@ -4,7 +4,7 @@
import debug from 'debug';
import { getConfig, resetJobs } from '@cerc-io/util';
import { getConfig, resetJobs, Config } from '@cerc-io/util';
const log = debug('vulcanize:reset-job-queue');
@ -15,7 +15,7 @@ export const desc = 'Reset job queue';
export const builder = {};
export const handler = async (argv: any): Promise<void> => {
const config = await getConfig(argv.configFile);
const config: Config = await getConfig(argv.configFile);
await resetJobs(config);
log('Job queue reset successfully');

View File

@ -3,16 +3,12 @@
//
import debug from 'debug';
import { MoreThan } from 'typeorm';
import assert from 'assert';
import { getConfig, initClients, JobQueue, resetJobs } from '@cerc-io/util';
import { getConfig, initClients, JobQueue, resetJobs, Config } from '@cerc-io/util';
import { Database } from '../../database';
import { Indexer } from '../../indexer';
import { BlockProgress } from '../../entity/BlockProgress';
import { Allowance } from '../../entity/Allowance';
import { Balance } from '../../entity/Balance';
const log = debug('vulcanize:reset-watcher');
@ -27,7 +23,7 @@ export const builder = {
};
export const handler = async (argv: any): Promise<void> => {
const config = await getConfig(argv.configFile);
const config: Config = await getConfig(argv.configFile);
await resetJobs(config);
const { ethClient, ethProvider } = await initClients(config);

View File

@ -4,7 +4,7 @@
import debug from 'debug';
import { getConfig, resetJobs } from '@cerc-io/util';
import { getConfig, resetJobs, Config } from '@cerc-io/util';
const log = debug('vulcanize:reset-job-queue');
@ -15,7 +15,7 @@ export const desc = 'Reset job queue';
export const builder = {};
export const handler = async (argv: any): Promise<void> => {
const config = await getConfig(argv.configFile);
const config: Config = await getConfig(argv.configFile);
await resetJobs(config);
log('Job queue reset successfully');

View File

@ -3,29 +3,12 @@
//
import debug from 'debug';
import { MoreThan } from 'typeorm';
import assert from 'assert';
import { getConfig, initClients, resetJobs, JobQueue } from '@cerc-io/util';
import { getConfig, initClients, resetJobs, JobQueue, Config } from '@cerc-io/util';
import { Database } from '../../database';
import { Indexer } from '../../indexer';
import { BlockProgress } from '../../entity/BlockProgress';
import { SupportsInterface } from '../../entity/SupportsInterface';
import { BalanceOf } from '../../entity/BalanceOf';
import { OwnerOf } from '../../entity/OwnerOf';
import { GetApproved } from '../../entity/GetApproved';
import { IsApprovedForAll } from '../../entity/IsApprovedForAll';
import { Name } from '../../entity/Name';
import { Symbol } from '../../entity/Symbol';
import { TokenURI } from '../../entity/TokenURI';
import { _Name } from '../../entity/_Name';
import { _Symbol } from '../../entity/_Symbol';
import { _Owners } from '../../entity/_Owners';
import { _Balances } from '../../entity/_Balances';
import { _TokenApprovals } from '../../entity/_TokenApprovals';
import { _OperatorApprovals } from '../../entity/_OperatorApprovals';
const log = debug('vulcanize:reset-watcher');
@ -40,7 +23,7 @@ export const builder = {
};
export const handler = async (argv: any): Promise<void> => {
const config = await getConfig(argv.configFile);
const config: Config = await getConfig(argv.configFile);
await resetJobs(config);
const { ethClient, ethProvider } = await initClients(config);

View File

@ -8,8 +8,7 @@ import debug from 'debug';
import path from 'path';
import assert from 'assert';
import { SnakeNamingStrategy } from 'typeorm-naming-strategies';
import _ from 'lodash';
import { getConfig as getWatcherConfig, wait, Database as BaseDatabase } from '@cerc-io/util';
import { getConfig as getWatcherConfig, wait, Database as BaseDatabase, Config as WatcherConfig } from '@cerc-io/util';
import { GraphQLClient } from '@cerc-io/ipld-eth-client';
import {
@ -122,7 +121,7 @@ export const main = async (): Promise<void> => {
if (config.watcher) {
const watcherConfigPath = path.resolve(path.dirname(configFile), config.watcher.configPath);
const entitiesDir = path.resolve(path.dirname(configFile), config.watcher.entitiesDir);
const watcherConfig = await getWatcherConfig(watcherConfigPath);
const watcherConfig: WatcherConfig = await getWatcherConfig(watcherConfigPath);
const baseDatabase = new BaseDatabase({ ...watcherConfig.database, entities: [entitiesDir] });
await baseDatabase.init();

View File

@ -4,7 +4,7 @@
import debug from 'debug';
import { getConfig, resetJobs } from '@cerc-io/util';
import { getConfig, resetJobs, Config } from '@cerc-io/util';
const log = debug('vulcanize:reset-job-queue');
@ -15,7 +15,7 @@ export const desc = 'Reset job queue';
export const builder = {};
export const handler = async (argv: any): Promise<void> => {
const config = await getConfig(argv.configFile);
const config: Config = await getConfig(argv.configFile);
await resetJobs(config);
log('Job queue reset successfully');

View File

@ -5,7 +5,7 @@
import debug from 'debug';
import assert from 'assert';
import { getConfig, initClients, resetJobs, JobQueue } from '@cerc-io/util';
import { getConfig, initClients, resetJobs, JobQueue, Config } from '@cerc-io/util';
import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node';
import { Database } from '../../database';
@ -24,7 +24,7 @@ export const builder = {
};
export const handler = async (argv: any): Promise<void> => {
const config = await getConfig(argv.configFile);
const config: Config = await getConfig(argv.configFile);
await resetJobs(config);
const { ethClient, ethProvider } = await initClients(config);

View File

@ -4,7 +4,7 @@
import debug from 'debug';
import { getConfig, resetJobs } from '@cerc-io/util';
import { getConfig, resetJobs, Config } from '@cerc-io/util';
const log = debug('vulcanize:reset-job-queue');
@ -15,7 +15,7 @@ export const desc = 'Reset job queue';
export const builder = {};
export const handler = async (argv: any): Promise<void> => {
const config = await getConfig(argv.configFile);
const config: Config = await getConfig(argv.configFile);
await resetJobs(config);
log('Job queue reset successfully');

View File

@ -3,20 +3,12 @@
//
import debug from 'debug';
import { MoreThan } from 'typeorm';
import assert from 'assert';
import { getConfig, initClients, resetJobs, JobQueue } from '@cerc-io/util';
import { getConfig, initClients, resetJobs, JobQueue, Config } from '@cerc-io/util';
import { Database } from '../../database';
import { Indexer } from '../../indexer';
import { BlockProgress } from '../../entity/BlockProgress';
import { MultiNonce } from '../../entity/MultiNonce';
import { _Owner } from '../../entity/_Owner';
import { IsRevoked } from '../../entity/IsRevoked';
import { IsPhisher } from '../../entity/IsPhisher';
import { IsMember } from '../../entity/IsMember';
const log = debug('vulcanize:reset-watcher');
@ -31,7 +23,7 @@ export const builder = {
};
export const handler = async (argv: any): Promise<void> => {
const config = await getConfig(argv.configFile);
const config: Config = await getConfig(argv.configFile);
await resetJobs(config);
const { ethClient, ethProvider } = await initClients(config);

View File

@ -73,14 +73,6 @@ export interface UpstreamConfig {
rpcProviderEndpoint: string;
}
traceProviderEndpoint: string;
uniWatcher: {
gqlEndpoint: string;
gqlSubscriptionEndpoint: string;
};
tokenWatcher: {
gqlEndpoint: string;
gqlSubscriptionEndpoint: string;
}
}
export interface GQLMetricsConfig {
@ -101,7 +93,7 @@ export interface Config {
metrics: MetricsConfig,
}
export const getConfig = async (configFile: string): Promise<Config> => {
export const getConfig = async<ConfigType> (configFile: string): Promise<ConfigType> => {
const configFilePath = path.resolve(configFile);
const fileExists = await fs.pathExists(configFilePath);
if (!fileExists) {

View File

@ -24,4 +24,6 @@ export const UNKNOWN_EVENT_NAME = '__unknown__';
export const KIND_ACTIVE = 'active';
export const KIND_LAZY = 'lazy';
export const DEFAULT_PREFETCH_BATCH_SIZE = 10;
export const DEFAULT_MAX_GQL_CACHE_SIZE = Math.pow(2, 20) * 8; // 8 MB

View File

@ -10,7 +10,7 @@ import { EthClient } from '@cerc-io/ipld-eth-client';
import { JobQueue } from './job-queue';
import { BlockProgressInterface, EventInterface, IndexerInterface } from './types';
import { MAX_REORG_DEPTH, JOB_KIND_PRUNE, JOB_KIND_INDEX, UNKNOWN_EVENT_NAME } from './constants';
import { MAX_REORG_DEPTH, JOB_KIND_PRUNE, JOB_KIND_INDEX, UNKNOWN_EVENT_NAME, JOB_KIND_EVENTS } from './constants';
import { createPruningJob, processBlockByNumberWithCache } from './common';
import { UpstreamConfig } from './config';
import { OrderDirection } from './database';
@ -98,7 +98,12 @@ export class EventWatcher {
}
async eventProcessingCompleteHandler (job: any): Promise<EventInterface[]> {
const { data: { request: { data: { blockHash } } } } = job;
const { data: { request: { data: { kind, blockHash } } } } = job;
// Ignore jobs other than JOB_KIND_EVENTS
if (kind !== JOB_KIND_EVENTS) {
return [];
}
assert(blockHash);
const blockProgress = await this._indexer.getBlockProgress(blockHash);

View File

@ -8,11 +8,10 @@ import { JobQueue } from './job-queue';
import { EventWatcherInterface, IndexerInterface } from './types';
import { wait } from './misc';
import { processBlockByNumberWithCache } from './common';
import { DEFAULT_PREFETCH_BATCH_SIZE } from './constants';
const log = debug('vulcanize:fill');
const DEFAULT_PREFETCH_BATCH_SIZE = 10;
export const fillBlocks = async (
jobQueue: JobQueue,
indexer: IndexerInterface,

View File

@ -29,6 +29,11 @@ export class GraphDecimal {
return this.value.toString();
}
toJSON (): string {
// Using fixed-point notation in preparing entity state.
return this.toFixed();
}
toFixed (): string {
this._checkOutOfRange(this);
@ -91,6 +96,13 @@ export class GraphDecimal {
return new GraphDecimal(this.value.div(param));
}
pow (n: Decimal.Value | GraphDecimal): GraphDecimal {
this._checkOutOfRange(this);
const param = this._checkOutOfRange(n);
return new GraphDecimal(this.value.pow(param));
}
isZero (): boolean {
this._checkOutOfRange(this);
@ -115,14 +127,14 @@ export class GraphDecimal {
this._checkOutOfRange(this);
const param = this._checkOutOfRange(n);
return this.value.lessThan(param);
return this.value.greaterThan(param);
}
gt (n: Decimal.Value | GraphDecimal): boolean {
this._checkOutOfRange(this);
const param = this._checkOutOfRange(n);
return this.value.lessThan(param);
return this.value.gt(param);
}
comparedTo (n: Decimal.Value | GraphDecimal): number {

View File

@ -3,7 +3,7 @@
//
import assert from 'assert';
import { DeepPartial, EntityTarget, FindConditions, FindManyOptions, LessThanOrEqual, MoreThan } from 'typeorm';
import { DeepPartial, EntityTarget, FindConditions, FindManyOptions, MoreThan } from 'typeorm';
import debug from 'debug';
import JSONbig from 'json-bigint';
import { ethers } from 'ethers';

View File

@ -26,7 +26,7 @@ export class JobQueue {
constructor (config: Config) {
this._config = config;
this._boss = new PgBoss({
// https://github.com/timgit/pg-boss/blob/master/docs/configuration.md
// https://github.com/timgit/pg-boss/blob/6.1.0/docs/configuration.md
connectionString: this._config.dbConnectionString,
onComplete: true,
@ -37,9 +37,11 @@ export class JobQueue {
retryBackoff: true,
// Time before active job fails by expiration.
expireInHours: 24 * 7, // 7 days
expireInHours: 24 * 1, // 1 day
retentionDays: 30, // 30 days
retentionDays: 1, // 1 day
deleteAfterHours: 1, // 1 hour
newJobCheckInterval: 100,
@ -106,11 +108,24 @@ export class JobQueue {
}
async onComplete (queue: string, callback: JobCallback): Promise<string> {
return await this._boss.onComplete(queue, { teamSize: JOBS_PER_INTERVAL, teamConcurrency: 1 }, async (job: any) => {
const { id, data: { failed, createdOn } } = job;
log(`Job onComplete for queue ${queue} job ${id} created ${createdOn} success ${!failed}`);
await callback(job);
});
return await this._boss.onComplete(
queue,
{
teamSize: JOBS_PER_INTERVAL,
teamConcurrency: 1
},
async (job: any) => {
try {
const { id, data: { failed, createdOn } } = job;
log(`Job onComplete for queue ${queue} job ${id} created ${createdOn} success ${!failed}`);
await callback(job);
} catch (error) {
log(`Error in onComplete handler for ${queue} job ${job.id}`);
log(error);
throw error;
}
}
);
}
async markComplete (job: any): Promise<void> {