From f3c65cbd64a988472049e65cbd7ca784a23aaa5d Mon Sep 17 00:00:00 2001 From: prathamesh0 <42446521+prathamesh0@users.noreply.github.com> Date: Fri, 18 Nov 2022 04:59:06 -0600 Subject: [PATCH] Refactor util code to be reused (#241) * Ignore watch contract jobs in event processing complete handler * Update job-queue config and handle errors on job completion hook * Update graph decimal implementation * Return generic type from method to read watcher config * Export fill prefetch batch size default value --- packages/address-watcher/src/fill.ts | 4 +-- packages/address-watcher/src/job-runner.ts | 4 +-- packages/address-watcher/src/server.ts | 4 +-- .../reset-job-queue-template.handlebars | 4 +-- .../templates/reset-state-template.handlebars | 4 +-- .../reset-watcher-template.handlebars | 4 +-- .../src/cli/reset-cmds/job-queue.ts | 4 +-- .../eden-watcher/src/cli/reset-cmds/state.ts | 4 +-- .../src/cli/reset-cmds/watcher.ts | 4 +-- .../src/cli/reset-cmds/job-queue.ts | 4 +-- .../src/cli/reset-cmds/watcher.ts | 8 ++--- .../src/cli/reset-cmds/job-queue.ts | 4 +-- .../src/cli/reset-cmds/watcher.ts | 21 ++----------- .../src/cli/compare/compare-blocks.ts | 5 ++- .../src/cli/reset-cmds/job-queue.ts | 4 +-- .../src/cli/reset-cmds/watcher.ts | 4 +-- .../src/cli/reset-cmds/job-queue.ts | 4 +-- .../src/cli/reset-cmds/watcher.ts | 12 ++----- packages/util/src/config.ts | 10 +----- packages/util/src/constants.ts | 2 ++ packages/util/src/events.ts | 9 ++++-- packages/util/src/fill.ts | 3 +- packages/util/src/graph-decimal.ts | 16 ++++++++-- packages/util/src/indexer.ts | 2 +- packages/util/src/job-queue.ts | 31 ++++++++++++++----- 25 files changed, 85 insertions(+), 90 deletions(-) diff --git a/packages/address-watcher/src/fill.ts b/packages/address-watcher/src/fill.ts index 67da0491..154f619f 100644 --- a/packages/address-watcher/src/fill.ts +++ b/packages/address-watcher/src/fill.ts @@ -10,7 +10,7 @@ import debug from 'debug'; import { getCache } from '@cerc-io/cache'; import { EthClient } from '@cerc-io/ipld-eth-client'; -import { DEFAULT_CONFIG_PATH, getConfig, JobQueue } from '@cerc-io/util'; +import { Config, DEFAULT_CONFIG_PATH, getConfig, JobQueue } from '@cerc-io/util'; import { Database } from './database'; import { QUEUE_TX_TRACING } from './tx-watcher'; @@ -43,7 +43,7 @@ export const main = async (): Promise => { } }).argv; - const config = await getConfig(argv.configFile); + const config = await getConfig(argv.configFile); assert(config.server, 'Missing server config'); diff --git a/packages/address-watcher/src/job-runner.ts b/packages/address-watcher/src/job-runner.ts index b090b556..704c65a5 100644 --- a/packages/address-watcher/src/job-runner.ts +++ b/packages/address-watcher/src/job-runner.ts @@ -11,7 +11,7 @@ import debug from 'debug'; import { getCache } from '@cerc-io/cache'; import { EthClient } from '@cerc-io/ipld-eth-client'; import { TracingClient } from '@cerc-io/tracing-client'; -import { getConfig, JobQueue, DEFAULT_CONFIG_PATH } from '@cerc-io/util'; +import { getConfig, JobQueue, DEFAULT_CONFIG_PATH, Config } from '@cerc-io/util'; import { Indexer } from './indexer'; import { Database } from './database'; @@ -30,7 +30,7 @@ export const main = async (): Promise => { }) .argv; - const config = await getConfig(argv.f); + const config: Config = await getConfig(argv.f); assert(config.server, 'Missing server config'); diff --git a/packages/address-watcher/src/server.ts b/packages/address-watcher/src/server.ts index a7e8b291..0979a149 100644 --- a/packages/address-watcher/src/server.ts +++ b/packages/address-watcher/src/server.ts @@ -13,7 +13,7 @@ import debug from 'debug'; import { getCache } from '@cerc-io/cache'; import { EthClient } from '@cerc-io/ipld-eth-client'; import { TracingClient } from '@cerc-io/tracing-client'; -import { getConfig, JobQueue, DEFAULT_CONFIG_PATH, createAndStartServer } from '@cerc-io/util'; +import { getConfig, JobQueue, DEFAULT_CONFIG_PATH, createAndStartServer, Config } from '@cerc-io/util'; import typeDefs from './schema'; @@ -35,7 +35,7 @@ export const main = async (): Promise => { }) .argv; - const config = await getConfig(argv.f); + const config: Config = await getConfig(argv.f); assert(config.server, 'Missing server config'); diff --git a/packages/codegen/src/templates/reset-job-queue-template.handlebars b/packages/codegen/src/templates/reset-job-queue-template.handlebars index 520cd8b9..c33cbfd6 100644 --- a/packages/codegen/src/templates/reset-job-queue-template.handlebars +++ b/packages/codegen/src/templates/reset-job-queue-template.handlebars @@ -4,7 +4,7 @@ import debug from 'debug'; -import { getConfig, resetJobs } from '@cerc-io/util'; +import { getConfig, resetJobs, Config } from '@cerc-io/util'; const log = debug('vulcanize:reset-job-queue'); @@ -15,7 +15,7 @@ export const desc = 'Reset job queue'; export const builder = {}; export const handler = async (argv: any): Promise => { - const config = await getConfig(argv.configFile); + const config: Config = await getConfig(argv.configFile); await resetJobs(config); log('Job queue reset successfully'); diff --git a/packages/codegen/src/templates/reset-state-template.handlebars b/packages/codegen/src/templates/reset-state-template.handlebars index e5a9918f..500c62a8 100644 --- a/packages/codegen/src/templates/reset-state-template.handlebars +++ b/packages/codegen/src/templates/reset-state-template.handlebars @@ -4,7 +4,7 @@ import debug from 'debug'; -import { getConfig } from '@cerc-io/util'; +import { getConfig, Config } from '@cerc-io/util'; import { Database } from '../../database'; @@ -22,7 +22,7 @@ export const builder = { export const handler = async (argv: any): Promise => { const { blockNumber } = argv; - const config = await getConfig(argv.configFile); + const config: Config = await getConfig(argv.configFile); // Initialize database const db = new Database(config.database); diff --git a/packages/codegen/src/templates/reset-watcher-template.handlebars b/packages/codegen/src/templates/reset-watcher-template.handlebars index 762d023a..d65b3e4c 100644 --- a/packages/codegen/src/templates/reset-watcher-template.handlebars +++ b/packages/codegen/src/templates/reset-watcher-template.handlebars @@ -6,7 +6,7 @@ import debug from 'debug'; import { MoreThan } from 'typeorm'; import assert from 'assert'; -import { getConfig, initClients, resetJobs, JobQueue } from '@cerc-io/util'; +import { getConfig, initClients, resetJobs, JobQueue, Config } from '@cerc-io/util'; {{#if (subgraphPath)}} import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node'; {{/if}} @@ -32,7 +32,7 @@ export const builder = { }; export const handler = async (argv: any): Promise => { - const config = await getConfig(argv.configFile); + const config: Config = await getConfig(argv.configFile); await resetJobs(config); const { ethClient, ethProvider } = await initClients(config); diff --git a/packages/eden-watcher/src/cli/reset-cmds/job-queue.ts b/packages/eden-watcher/src/cli/reset-cmds/job-queue.ts index 520cd8b9..c33cbfd6 100644 --- a/packages/eden-watcher/src/cli/reset-cmds/job-queue.ts +++ b/packages/eden-watcher/src/cli/reset-cmds/job-queue.ts @@ -4,7 +4,7 @@ import debug from 'debug'; -import { getConfig, resetJobs } from '@cerc-io/util'; +import { getConfig, resetJobs, Config } from '@cerc-io/util'; const log = debug('vulcanize:reset-job-queue'); @@ -15,7 +15,7 @@ export const desc = 'Reset job queue'; export const builder = {}; export const handler = async (argv: any): Promise => { - const config = await getConfig(argv.configFile); + const config: Config = await getConfig(argv.configFile); await resetJobs(config); log('Job queue reset successfully'); diff --git a/packages/eden-watcher/src/cli/reset-cmds/state.ts b/packages/eden-watcher/src/cli/reset-cmds/state.ts index 0ed58e20..efd8b3c4 100644 --- a/packages/eden-watcher/src/cli/reset-cmds/state.ts +++ b/packages/eden-watcher/src/cli/reset-cmds/state.ts @@ -4,7 +4,7 @@ import debug from 'debug'; -import { getConfig } from '@cerc-io/util'; +import { getConfig, Config } from '@cerc-io/util'; import { Database } from '../../database'; @@ -22,7 +22,7 @@ export const builder = { export const handler = async (argv: any): Promise => { const { blockNumber } = argv; - const config = await getConfig(argv.configFile); + const config: Config = await getConfig(argv.configFile); // Initialize database const db = new Database(config.database); diff --git a/packages/eden-watcher/src/cli/reset-cmds/watcher.ts b/packages/eden-watcher/src/cli/reset-cmds/watcher.ts index e69412cb..7ceba205 100644 --- a/packages/eden-watcher/src/cli/reset-cmds/watcher.ts +++ b/packages/eden-watcher/src/cli/reset-cmds/watcher.ts @@ -5,7 +5,7 @@ import debug from 'debug'; import assert from 'assert'; -import { getConfig, initClients, resetJobs, JobQueue } from '@cerc-io/util'; +import { getConfig, initClients, resetJobs, JobQueue, Config } from '@cerc-io/util'; import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node'; import { Database, ENTITY_TO_LATEST_ENTITY_MAP } from '../../database'; @@ -24,7 +24,7 @@ export const builder = { }; export const handler = async (argv: any): Promise => { - const config = await getConfig(argv.configFile); + const config: Config = await getConfig(argv.configFile); await resetJobs(config); const { ethClient, ethProvider } = await initClients(config); diff --git a/packages/erc20-watcher/src/cli/reset-cmds/job-queue.ts b/packages/erc20-watcher/src/cli/reset-cmds/job-queue.ts index 520cd8b9..c33cbfd6 100644 --- a/packages/erc20-watcher/src/cli/reset-cmds/job-queue.ts +++ b/packages/erc20-watcher/src/cli/reset-cmds/job-queue.ts @@ -4,7 +4,7 @@ import debug from 'debug'; -import { getConfig, resetJobs } from '@cerc-io/util'; +import { getConfig, resetJobs, Config } from '@cerc-io/util'; const log = debug('vulcanize:reset-job-queue'); @@ -15,7 +15,7 @@ export const desc = 'Reset job queue'; export const builder = {}; export const handler = async (argv: any): Promise => { - const config = await getConfig(argv.configFile); + const config: Config = await getConfig(argv.configFile); await resetJobs(config); log('Job queue reset successfully'); diff --git a/packages/erc20-watcher/src/cli/reset-cmds/watcher.ts b/packages/erc20-watcher/src/cli/reset-cmds/watcher.ts index 293d8f04..39d2ac1e 100644 --- a/packages/erc20-watcher/src/cli/reset-cmds/watcher.ts +++ b/packages/erc20-watcher/src/cli/reset-cmds/watcher.ts @@ -3,16 +3,12 @@ // import debug from 'debug'; -import { MoreThan } from 'typeorm'; import assert from 'assert'; -import { getConfig, initClients, JobQueue, resetJobs } from '@cerc-io/util'; +import { getConfig, initClients, JobQueue, resetJobs, Config } from '@cerc-io/util'; import { Database } from '../../database'; import { Indexer } from '../../indexer'; -import { BlockProgress } from '../../entity/BlockProgress'; -import { Allowance } from '../../entity/Allowance'; -import { Balance } from '../../entity/Balance'; const log = debug('vulcanize:reset-watcher'); @@ -27,7 +23,7 @@ export const builder = { }; export const handler = async (argv: any): Promise => { - const config = await getConfig(argv.configFile); + const config: Config = await getConfig(argv.configFile); await resetJobs(config); const { ethClient, ethProvider } = await initClients(config); diff --git a/packages/erc721-watcher/src/cli/reset-cmds/job-queue.ts b/packages/erc721-watcher/src/cli/reset-cmds/job-queue.ts index 520cd8b9..c33cbfd6 100644 --- a/packages/erc721-watcher/src/cli/reset-cmds/job-queue.ts +++ b/packages/erc721-watcher/src/cli/reset-cmds/job-queue.ts @@ -4,7 +4,7 @@ import debug from 'debug'; -import { getConfig, resetJobs } from '@cerc-io/util'; +import { getConfig, resetJobs, Config } from '@cerc-io/util'; const log = debug('vulcanize:reset-job-queue'); @@ -15,7 +15,7 @@ export const desc = 'Reset job queue'; export const builder = {}; export const handler = async (argv: any): Promise => { - const config = await getConfig(argv.configFile); + const config: Config = await getConfig(argv.configFile); await resetJobs(config); log('Job queue reset successfully'); diff --git a/packages/erc721-watcher/src/cli/reset-cmds/watcher.ts b/packages/erc721-watcher/src/cli/reset-cmds/watcher.ts index 030d462c..0ed2cf0e 100644 --- a/packages/erc721-watcher/src/cli/reset-cmds/watcher.ts +++ b/packages/erc721-watcher/src/cli/reset-cmds/watcher.ts @@ -3,29 +3,12 @@ // import debug from 'debug'; -import { MoreThan } from 'typeorm'; import assert from 'assert'; -import { getConfig, initClients, resetJobs, JobQueue } from '@cerc-io/util'; +import { getConfig, initClients, resetJobs, JobQueue, Config } from '@cerc-io/util'; import { Database } from '../../database'; import { Indexer } from '../../indexer'; -import { BlockProgress } from '../../entity/BlockProgress'; - -import { SupportsInterface } from '../../entity/SupportsInterface'; -import { BalanceOf } from '../../entity/BalanceOf'; -import { OwnerOf } from '../../entity/OwnerOf'; -import { GetApproved } from '../../entity/GetApproved'; -import { IsApprovedForAll } from '../../entity/IsApprovedForAll'; -import { Name } from '../../entity/Name'; -import { Symbol } from '../../entity/Symbol'; -import { TokenURI } from '../../entity/TokenURI'; -import { _Name } from '../../entity/_Name'; -import { _Symbol } from '../../entity/_Symbol'; -import { _Owners } from '../../entity/_Owners'; -import { _Balances } from '../../entity/_Balances'; -import { _TokenApprovals } from '../../entity/_TokenApprovals'; -import { _OperatorApprovals } from '../../entity/_OperatorApprovals'; const log = debug('vulcanize:reset-watcher'); @@ -40,7 +23,7 @@ export const builder = { }; export const handler = async (argv: any): Promise => { - const config = await getConfig(argv.configFile); + const config: Config = await getConfig(argv.configFile); await resetJobs(config); const { ethClient, ethProvider } = await initClients(config); diff --git a/packages/graph-node/src/cli/compare/compare-blocks.ts b/packages/graph-node/src/cli/compare/compare-blocks.ts index 25e5face..d63f54a1 100644 --- a/packages/graph-node/src/cli/compare/compare-blocks.ts +++ b/packages/graph-node/src/cli/compare/compare-blocks.ts @@ -8,8 +8,7 @@ import debug from 'debug'; import path from 'path'; import assert from 'assert'; import { SnakeNamingStrategy } from 'typeorm-naming-strategies'; -import _ from 'lodash'; -import { getConfig as getWatcherConfig, wait, Database as BaseDatabase } from '@cerc-io/util'; +import { getConfig as getWatcherConfig, wait, Database as BaseDatabase, Config as WatcherConfig } from '@cerc-io/util'; import { GraphQLClient } from '@cerc-io/ipld-eth-client'; import { @@ -122,7 +121,7 @@ export const main = async (): Promise => { if (config.watcher) { const watcherConfigPath = path.resolve(path.dirname(configFile), config.watcher.configPath); const entitiesDir = path.resolve(path.dirname(configFile), config.watcher.entitiesDir); - const watcherConfig = await getWatcherConfig(watcherConfigPath); + const watcherConfig: WatcherConfig = await getWatcherConfig(watcherConfigPath); const baseDatabase = new BaseDatabase({ ...watcherConfig.database, entities: [entitiesDir] }); await baseDatabase.init(); diff --git a/packages/graph-test-watcher/src/cli/reset-cmds/job-queue.ts b/packages/graph-test-watcher/src/cli/reset-cmds/job-queue.ts index 520cd8b9..c33cbfd6 100644 --- a/packages/graph-test-watcher/src/cli/reset-cmds/job-queue.ts +++ b/packages/graph-test-watcher/src/cli/reset-cmds/job-queue.ts @@ -4,7 +4,7 @@ import debug from 'debug'; -import { getConfig, resetJobs } from '@cerc-io/util'; +import { getConfig, resetJobs, Config } from '@cerc-io/util'; const log = debug('vulcanize:reset-job-queue'); @@ -15,7 +15,7 @@ export const desc = 'Reset job queue'; export const builder = {}; export const handler = async (argv: any): Promise => { - const config = await getConfig(argv.configFile); + const config: Config = await getConfig(argv.configFile); await resetJobs(config); log('Job queue reset successfully'); diff --git a/packages/graph-test-watcher/src/cli/reset-cmds/watcher.ts b/packages/graph-test-watcher/src/cli/reset-cmds/watcher.ts index 665dc69a..445fcde7 100644 --- a/packages/graph-test-watcher/src/cli/reset-cmds/watcher.ts +++ b/packages/graph-test-watcher/src/cli/reset-cmds/watcher.ts @@ -5,7 +5,7 @@ import debug from 'debug'; import assert from 'assert'; -import { getConfig, initClients, resetJobs, JobQueue } from '@cerc-io/util'; +import { getConfig, initClients, resetJobs, JobQueue, Config } from '@cerc-io/util'; import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node'; import { Database } from '../../database'; @@ -24,7 +24,7 @@ export const builder = { }; export const handler = async (argv: any): Promise => { - const config = await getConfig(argv.configFile); + const config: Config = await getConfig(argv.configFile); await resetJobs(config); const { ethClient, ethProvider } = await initClients(config); diff --git a/packages/mobymask-watcher/src/cli/reset-cmds/job-queue.ts b/packages/mobymask-watcher/src/cli/reset-cmds/job-queue.ts index 520cd8b9..c33cbfd6 100644 --- a/packages/mobymask-watcher/src/cli/reset-cmds/job-queue.ts +++ b/packages/mobymask-watcher/src/cli/reset-cmds/job-queue.ts @@ -4,7 +4,7 @@ import debug from 'debug'; -import { getConfig, resetJobs } from '@cerc-io/util'; +import { getConfig, resetJobs, Config } from '@cerc-io/util'; const log = debug('vulcanize:reset-job-queue'); @@ -15,7 +15,7 @@ export const desc = 'Reset job queue'; export const builder = {}; export const handler = async (argv: any): Promise => { - const config = await getConfig(argv.configFile); + const config: Config = await getConfig(argv.configFile); await resetJobs(config); log('Job queue reset successfully'); diff --git a/packages/mobymask-watcher/src/cli/reset-cmds/watcher.ts b/packages/mobymask-watcher/src/cli/reset-cmds/watcher.ts index 32fb5b68..0ed2cf0e 100644 --- a/packages/mobymask-watcher/src/cli/reset-cmds/watcher.ts +++ b/packages/mobymask-watcher/src/cli/reset-cmds/watcher.ts @@ -3,20 +3,12 @@ // import debug from 'debug'; -import { MoreThan } from 'typeorm'; import assert from 'assert'; -import { getConfig, initClients, resetJobs, JobQueue } from '@cerc-io/util'; +import { getConfig, initClients, resetJobs, JobQueue, Config } from '@cerc-io/util'; import { Database } from '../../database'; import { Indexer } from '../../indexer'; -import { BlockProgress } from '../../entity/BlockProgress'; - -import { MultiNonce } from '../../entity/MultiNonce'; -import { _Owner } from '../../entity/_Owner'; -import { IsRevoked } from '../../entity/IsRevoked'; -import { IsPhisher } from '../../entity/IsPhisher'; -import { IsMember } from '../../entity/IsMember'; const log = debug('vulcanize:reset-watcher'); @@ -31,7 +23,7 @@ export const builder = { }; export const handler = async (argv: any): Promise => { - const config = await getConfig(argv.configFile); + const config: Config = await getConfig(argv.configFile); await resetJobs(config); const { ethClient, ethProvider } = await initClients(config); diff --git a/packages/util/src/config.ts b/packages/util/src/config.ts index 416eead7..874bde91 100644 --- a/packages/util/src/config.ts +++ b/packages/util/src/config.ts @@ -73,14 +73,6 @@ export interface UpstreamConfig { rpcProviderEndpoint: string; } traceProviderEndpoint: string; - uniWatcher: { - gqlEndpoint: string; - gqlSubscriptionEndpoint: string; - }; - tokenWatcher: { - gqlEndpoint: string; - gqlSubscriptionEndpoint: string; - } } export interface GQLMetricsConfig { @@ -101,7 +93,7 @@ export interface Config { metrics: MetricsConfig, } -export const getConfig = async (configFile: string): Promise => { +export const getConfig = async (configFile: string): Promise => { const configFilePath = path.resolve(configFile); const fileExists = await fs.pathExists(configFilePath); if (!fileExists) { diff --git a/packages/util/src/constants.ts b/packages/util/src/constants.ts index 8f260e9f..5cf1d010 100644 --- a/packages/util/src/constants.ts +++ b/packages/util/src/constants.ts @@ -24,4 +24,6 @@ export const UNKNOWN_EVENT_NAME = '__unknown__'; export const KIND_ACTIVE = 'active'; export const KIND_LAZY = 'lazy'; +export const DEFAULT_PREFETCH_BATCH_SIZE = 10; + export const DEFAULT_MAX_GQL_CACHE_SIZE = Math.pow(2, 20) * 8; // 8 MB diff --git a/packages/util/src/events.ts b/packages/util/src/events.ts index f32b98d9..52ed865f 100644 --- a/packages/util/src/events.ts +++ b/packages/util/src/events.ts @@ -10,7 +10,7 @@ import { EthClient } from '@cerc-io/ipld-eth-client'; import { JobQueue } from './job-queue'; import { BlockProgressInterface, EventInterface, IndexerInterface } from './types'; -import { MAX_REORG_DEPTH, JOB_KIND_PRUNE, JOB_KIND_INDEX, UNKNOWN_EVENT_NAME } from './constants'; +import { MAX_REORG_DEPTH, JOB_KIND_PRUNE, JOB_KIND_INDEX, UNKNOWN_EVENT_NAME, JOB_KIND_EVENTS } from './constants'; import { createPruningJob, processBlockByNumberWithCache } from './common'; import { UpstreamConfig } from './config'; import { OrderDirection } from './database'; @@ -98,7 +98,12 @@ export class EventWatcher { } async eventProcessingCompleteHandler (job: any): Promise { - const { data: { request: { data: { blockHash } } } } = job; + const { data: { request: { data: { kind, blockHash } } } } = job; + + // Ignore jobs other than JOB_KIND_EVENTS + if (kind !== JOB_KIND_EVENTS) { + return []; + } assert(blockHash); const blockProgress = await this._indexer.getBlockProgress(blockHash); diff --git a/packages/util/src/fill.ts b/packages/util/src/fill.ts index e72a4011..35579219 100644 --- a/packages/util/src/fill.ts +++ b/packages/util/src/fill.ts @@ -8,11 +8,10 @@ import { JobQueue } from './job-queue'; import { EventWatcherInterface, IndexerInterface } from './types'; import { wait } from './misc'; import { processBlockByNumberWithCache } from './common'; +import { DEFAULT_PREFETCH_BATCH_SIZE } from './constants'; const log = debug('vulcanize:fill'); -const DEFAULT_PREFETCH_BATCH_SIZE = 10; - export const fillBlocks = async ( jobQueue: JobQueue, indexer: IndexerInterface, diff --git a/packages/util/src/graph-decimal.ts b/packages/util/src/graph-decimal.ts index 7a7f2a1c..e2a08674 100644 --- a/packages/util/src/graph-decimal.ts +++ b/packages/util/src/graph-decimal.ts @@ -29,6 +29,11 @@ export class GraphDecimal { return this.value.toString(); } + toJSON (): string { + // Using fixed-point notation in preparing entity state. + return this.toFixed(); + } + toFixed (): string { this._checkOutOfRange(this); @@ -91,6 +96,13 @@ export class GraphDecimal { return new GraphDecimal(this.value.div(param)); } + pow (n: Decimal.Value | GraphDecimal): GraphDecimal { + this._checkOutOfRange(this); + const param = this._checkOutOfRange(n); + + return new GraphDecimal(this.value.pow(param)); + } + isZero (): boolean { this._checkOutOfRange(this); @@ -115,14 +127,14 @@ export class GraphDecimal { this._checkOutOfRange(this); const param = this._checkOutOfRange(n); - return this.value.lessThan(param); + return this.value.greaterThan(param); } gt (n: Decimal.Value | GraphDecimal): boolean { this._checkOutOfRange(this); const param = this._checkOutOfRange(n); - return this.value.lessThan(param); + return this.value.gt(param); } comparedTo (n: Decimal.Value | GraphDecimal): number { diff --git a/packages/util/src/indexer.ts b/packages/util/src/indexer.ts index 7cd11d86..f1529407 100644 --- a/packages/util/src/indexer.ts +++ b/packages/util/src/indexer.ts @@ -3,7 +3,7 @@ // import assert from 'assert'; -import { DeepPartial, EntityTarget, FindConditions, FindManyOptions, LessThanOrEqual, MoreThan } from 'typeorm'; +import { DeepPartial, EntityTarget, FindConditions, FindManyOptions, MoreThan } from 'typeorm'; import debug from 'debug'; import JSONbig from 'json-bigint'; import { ethers } from 'ethers'; diff --git a/packages/util/src/job-queue.ts b/packages/util/src/job-queue.ts index 066ea94f..88a7d56c 100644 --- a/packages/util/src/job-queue.ts +++ b/packages/util/src/job-queue.ts @@ -26,7 +26,7 @@ export class JobQueue { constructor (config: Config) { this._config = config; this._boss = new PgBoss({ - // https://github.com/timgit/pg-boss/blob/master/docs/configuration.md + // https://github.com/timgit/pg-boss/blob/6.1.0/docs/configuration.md connectionString: this._config.dbConnectionString, onComplete: true, @@ -37,9 +37,11 @@ export class JobQueue { retryBackoff: true, // Time before active job fails by expiration. - expireInHours: 24 * 7, // 7 days + expireInHours: 24 * 1, // 1 day - retentionDays: 30, // 30 days + retentionDays: 1, // 1 day + + deleteAfterHours: 1, // 1 hour newJobCheckInterval: 100, @@ -106,11 +108,24 @@ export class JobQueue { } async onComplete (queue: string, callback: JobCallback): Promise { - return await this._boss.onComplete(queue, { teamSize: JOBS_PER_INTERVAL, teamConcurrency: 1 }, async (job: any) => { - const { id, data: { failed, createdOn } } = job; - log(`Job onComplete for queue ${queue} job ${id} created ${createdOn} success ${!failed}`); - await callback(job); - }); + return await this._boss.onComplete( + queue, + { + teamSize: JOBS_PER_INTERVAL, + teamConcurrency: 1 + }, + async (job: any) => { + try { + const { id, data: { failed, createdOn } } = job; + log(`Job onComplete for queue ${queue} job ${id} created ${createdOn} success ${!failed}`); + await callback(job); + } catch (error) { + log(`Error in onComplete handler for ${queue} job ${job.id}`); + log(error); + throw error; + } + } + ); } async markComplete (job: any): Promise {