diff --git a/packages/cli/src/job-runner.ts b/packages/cli/src/job-runner.ts index edac3072..959ddd2f 100644 --- a/packages/cli/src/job-runner.ts +++ b/packages/cli/src/job-runner.ts @@ -108,6 +108,7 @@ export class JobRunnerCmd { const jobRunner = new JobRunner(config.jobQueue, indexer, jobQueue); + // Delete all active and pending (before completed) jobs to start job-runner without old queued jobs await jobRunner.jobQueue.deleteAllJobs('completed'); await jobRunner.resetToPrevIndexedBlock(); diff --git a/packages/cli/src/server.ts b/packages/cli/src/server.ts index 866945ed..9a970033 100644 --- a/packages/cli/src/server.ts +++ b/packages/cli/src/server.ts @@ -281,7 +281,7 @@ export class ServerCmd { assert(eventWatcher); if (config.server.kind === KIND_ACTIVE) { - // Delete jobs before completed state to prevent creating jobs after completion of processing previous block. + // Delete all active and pending (before completed) jobs to prevent creating jobs after completion of processing previous block await jobQueue.deleteAllJobs('completed'); await eventWatcher.start(); } diff --git a/packages/codegen/src/entity.ts b/packages/codegen/src/entity.ts index 981d5a25..f6e37a0b 100644 --- a/packages/codegen/src/entity.ts +++ b/packages/codegen/src/entity.ts @@ -347,45 +347,46 @@ export class Entity { }); entityObject.columns.forEach((column: any) => { - // Implement bigintTransformer for bigint type. - if (column.tsType === 'bigint') { - column.columnOptions.push( - { - option: 'transformer', - value: 'bigintTransformer' + if (column.tsType.includes('bigint')) { + // Check if it is of array type + if (column.tsType.includes('bigint[]')) { + // Implement bigintArrayTransformer for array of bigint type. + column.columnOptions.push( + { + option: 'transformer', + value: 'bigintArrayTransformer' + } + ); + + if (importObject) { + importObject.toImport.add('bigintArrayTransformer'); + } else { + importObject = { + toImport: new Set(['bigintArrayTransformer']), + from: '@cerc-io/util' + }; + + entityObject.imports.push(importObject); } - ); - - if (importObject) { - importObject.toImport.add('bigintTransformer'); } else { - importObject = { - toImport: new Set(['bigintTransformer']), - from: '@cerc-io/util' - }; + // Implement bigintTransformer for bigint type. + column.columnOptions.push( + { + option: 'transformer', + value: 'bigintTransformer' + } + ); - entityObject.imports.push(importObject); - } - } + if (importObject) { + importObject.toImport.add('bigintTransformer'); + } else { + importObject = { + toImport: new Set(['bigintTransformer']), + from: '@cerc-io/util' + }; - // Implement bigintArrayTransformer for array of bigint type. - if (column.tsType === 'bigint[]') { - column.columnOptions.push( - { - option: 'transformer', - value: 'bigintArrayTransformer' + entityObject.imports.push(importObject); } - ); - - if (importObject) { - importObject.toImport.add('bigintArrayTransformer'); - } else { - importObject = { - toImport: new Set(['bigintArrayTransformer']), - from: '@cerc-io/util' - }; - - entityObject.imports.push(importObject); } } }); @@ -399,49 +400,48 @@ export class Entity { let isDecimalRequired = false; entityObject.columns.forEach((column: any) => { - // Implement decimalTransformer for Decimal type. - if (column.tsType === 'Decimal') { + if (column.tsType.includes('Decimal')) { isDecimalRequired = true; - column.columnOptions.push( - { - option: 'transformer', - value: 'decimalTransformer' + // Check if it is of array type + if (column.tsType.includes('Decimal[]')) { + // Implement decimalArrayTransformer for array of Decimal type. + column.columnOptions.push( + { + option: 'transformer', + value: 'decimalArrayTransformer' + } + ); + + if (importObject) { + importObject.toImport.add('decimalArrayTransformer'); + } else { + importObject = { + toImport: new Set(['decimalArrayTransformer']), + from: '@cerc-io/util' + }; + + entityObject.imports.push(importObject); } - ); - - if (importObject) { - importObject.toImport.add('decimalTransformer'); } else { - importObject = { - toImport: new Set(['decimalTransformer']), - from: '@cerc-io/util' - }; + // Implement decimalTransformer for Decimal type. + column.columnOptions.push( + { + option: 'transformer', + value: 'decimalTransformer' + } + ); - entityObject.imports.push(importObject); - } - } + if (importObject) { + importObject.toImport.add('decimalTransformer'); + } else { + importObject = { + toImport: new Set(['decimalTransformer']), + from: '@cerc-io/util' + }; - // Implement decimalArrayTransformer for array of Decimal type. - if (column.tsType === 'Decimal[]') { - isDecimalRequired = true; - - column.columnOptions.push( - { - option: 'transformer', - value: 'decimalArrayTransformer' + entityObject.imports.push(importObject); } - ); - - if (importObject) { - importObject.toImport.add('decimalArrayTransformer'); - } else { - importObject = { - toImport: new Set(['decimalArrayTransformer']), - from: '@cerc-io/util' - }; - - entityObject.imports.push(importObject); } } }); diff --git a/packages/codegen/src/templates/indexer-template.handlebars b/packages/codegen/src/templates/indexer-template.handlebars index 5948897a..1b4c06b5 100644 --- a/packages/codegen/src/templates/indexer-template.handlebars +++ b/packages/codegen/src/templates/indexer-template.handlebars @@ -661,6 +661,14 @@ export class Indexer implements IndexerInterface { return this._baseIndexer.fetchEventsAndSaveBlocks(blocks, this._eventSignaturesMap, this.parseEventNameAndArgs.bind(this)); } + async fetchAndSaveFilteredEventsAndBlocks (startBlock: number, endBlock: number): Promise<{ blockProgress: BlockProgress, events: DeepPartial[] }[]> { + return this._baseIndexer.fetchAndSaveFilteredEventsAndBlocks(startBlock, endBlock, this._eventSignaturesMap, this.parseEventNameAndArgs.bind(this)); + } + + async fetchEventsForContracts (blockHash: string, blockNumber: number, addresses: string[]): Promise[]> { + return this._baseIndexer.fetchEventsForContracts(blockHash, blockNumber, addresses, this._eventSignaturesMap, this.parseEventNameAndArgs.bind(this)); + } + async saveBlockAndFetchEvents (block: DeepPartial): Promise<[BlockProgress, DeepPartial[]]> { return this._saveBlockAndFetchEvents(block); } diff --git a/packages/util/src/job-runner.ts b/packages/util/src/job-runner.ts index 595a9858..4f35b567 100644 --- a/packages/util/src/job-runner.ts +++ b/packages/util/src/job-runner.ts @@ -171,7 +171,6 @@ export class JobRunner { } else { // Check that startBlock is one greater than previous batch end block if (startBlock - 1 !== this._historicalProcessingCompletedUpto) { - // TODO: Debug jobQueue deleteJobs for historical processing not working await this.jobQueue.markComplete( job, { isComplete: false } diff --git a/packages/util/src/misc.ts b/packages/util/src/misc.ts index 40d60a10..c5db6d4c 100644 --- a/packages/util/src/misc.ts +++ b/packages/util/src/misc.ts @@ -133,6 +133,7 @@ export const resetJobs = async (config: Config): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); + // Delete all active and pending (before completed) jobs await jobQueue.deleteAllJobs('completed'); };