diff --git a/packages/codegen/src/indexer.ts b/packages/codegen/src/indexer.ts index cae33f5d..3b1aa663 100644 --- a/packages/codegen/src/indexer.ts +++ b/packages/codegen/src/indexer.ts @@ -45,6 +45,7 @@ export class Indexer { const queryObject = { name, + entityName: '', getQueryName: '', saveQueryName: '', params: _.cloneDeep(params), @@ -56,10 +57,12 @@ export class Indexer { if (name.charAt(0) === '_') { const capitalizedName = `${name.charAt(1).toUpperCase()}${name.slice(2)}`; + queryObject.entityName = `_${capitalizedName}`; queryObject.getQueryName = `_get${capitalizedName}`; queryObject.saveQueryName = `_save${capitalizedName}`; } else { const capitalizedName = `${name.charAt(0).toUpperCase()}${name.slice(1)}`; + queryObject.entityName = capitalizedName; queryObject.getQueryName = `get${capitalizedName}`; queryObject.saveQueryName = `save${capitalizedName}`; } diff --git a/packages/codegen/src/templates/database-template.handlebars b/packages/codegen/src/templates/database-template.handlebars index 3d2ed7f7..9ffe7829 100644 --- a/packages/codegen/src/templates/database-template.handlebars +++ b/packages/codegen/src/templates/database-template.handlebars @@ -3,7 +3,7 @@ // import assert from 'assert'; -import { Connection, ConnectionOptions, DeepPartial, FindConditions, QueryRunner, FindManyOptions } from 'typeorm'; +import { Connection, ConnectionOptions, DeepPartial, FindConditions, QueryRunner, FindManyOptions, EntityTarget } from 'typeorm'; import path from 'path'; import { Database as BaseDatabase, DatabaseInterface, QueryOptions, StateKind, Where } from '@cerc-io/util'; @@ -265,6 +265,10 @@ export class Database implements DatabaseInterface { return this._baseDatabase.removeEntities(queryRunner, entity, findConditions); } + async deleteEntitiesByConditions (queryRunner: QueryRunner, entity: EntityTarget, findConditions: FindConditions): Promise { + await this._baseDatabase.deleteEntitiesByConditions(queryRunner, entity, findConditions); + } + async getAncestorAtDepth (blockHash: string, depth: number): Promise { return this._baseDatabase.getAncestorAtDepth(blockHash, depth); } diff --git a/packages/codegen/src/templates/indexer-template.handlebars b/packages/codegen/src/templates/indexer-template.handlebars index 957137fc..dd9857df 100644 --- a/packages/codegen/src/templates/indexer-template.handlebars +++ b/packages/codegen/src/templates/indexer-template.handlebars @@ -50,6 +50,9 @@ import { StateSyncStatus } from './entity/StateSyncStatus'; import { BlockProgress } from './entity/BlockProgress'; import { State } from './entity/State'; +{{#each queries as | query |}} +import { {{query.entityName}} } from './entity/{{query.entityName}}'; +{{/each}} {{#each subgraphEntities as | subgraphEntity |}} import { {{subgraphEntity.className}} } from './entity/{{subgraphEntity.className}}'; {{/each}} @@ -537,8 +540,8 @@ export class Indexer implements IndexerInterface { return this._baseIndexer.getBlocksAtHeight(height, isPruned); } - async fetchBlockWithEvents (block: DeepPartial): Promise { - return this._baseIndexer.fetchBlockWithEvents(block, this._fetchAndSaveEvents.bind(this)); + async saveBlockAndFetchEvents (block: DeepPartial): Promise<[BlockProgress, DeepPartial[]]> { + return this._baseIndexer.saveBlockAndFetchEvents(block, this._saveBlockAndFetchEvents.bind(this)); } async getBlockEvents (blockHash: string, where: Where, queryOptions: QueryOptions): Promise> { @@ -594,6 +597,18 @@ export class Indexer implements IndexerInterface { this._subgraphStateMap.clear(); } + async resetWatcherToBlock (blockNumber: number): Promise { + const entities = [ + {{#each queries as | query |}} + {{query.entityName}}, + {{/each}} + {{#each subgraphEntities as | subgraphEntity |}} + {{subgraphEntity.className}}, + {{/each}} + ]; + await this._baseIndexer.resetWatcherToBlock(blockNumber, entities); + } + _populateEntityTypesMap (): void { {{#each subgraphEntities as | subgraphEntity |}} this._entityTypesMap.set('{{subgraphEntity.className}}', { diff --git a/packages/codegen/src/templates/job-runner-template.handlebars b/packages/codegen/src/templates/job-runner-template.handlebars index 5c25a80b..9d96479b 100644 --- a/packages/codegen/src/templates/job-runner-template.handlebars +++ b/packages/codegen/src/templates/job-runner-template.handlebars @@ -49,6 +49,8 @@ export class JobRunner { } async start (): Promise { + await this._jobQueue.deleteAllJobs(); + await this._baseJobRunner.resetToPrevIndexedBlock(); await this.subscribeBlockProcessingQueue(); await this.subscribeEventProcessingQueue(); await this.subscribeBlockCheckpointQueue(); diff --git a/packages/codegen/src/templates/reset-watcher-template.handlebars b/packages/codegen/src/templates/reset-watcher-template.handlebars index 77e05404..c54c2c86 100644 --- a/packages/codegen/src/templates/reset-watcher-template.handlebars +++ b/packages/codegen/src/templates/reset-watcher-template.handlebars @@ -67,58 +67,6 @@ export const handler = async (argv: any): Promise => { await graphWatcher.init(); {{/if}} - const blockProgresses = await indexer.getBlocksAtHeight(argv.blockNumber, false); - assert(blockProgresses.length, `No blocks at specified block number ${argv.blockNumber}`); - assert(!blockProgresses.some(block => !block.isComplete), `Incomplete block at block number ${argv.blockNumber} with unprocessed events`); - const [blockProgress] = blockProgresses; - - const dbTx = await db.createTransactionRunner(); - - try { - const entities = [BlockProgress - {{~#each queries as | query |~}} - , {{query.entityName}} - {{~/each~}} - ]; - - const removeEntitiesPromise = entities.map(async entityClass => { - return db.removeEntities(dbTx, entityClass, { blockNumber: MoreThan(argv.blockNumber) }); - }); - - await Promise.all(removeEntitiesPromise); - - const syncStatus = await indexer.getSyncStatus(); - assert(syncStatus, 'Missing syncStatus'); - - if (syncStatus.latestIndexedBlockNumber > blockProgress.blockNumber) { - await indexer.updateSyncStatusIndexedBlock(blockProgress.blockHash, blockProgress.blockNumber, true); - } - - if (syncStatus.latestCanonicalBlockNumber > blockProgress.blockNumber) { - await indexer.updateSyncStatusCanonicalBlock(blockProgress.blockHash, blockProgress.blockNumber, true); - } - - const stateSyncStatus = await indexer.getStateSyncStatus(); - - if (stateSyncStatus) { - if (stateSyncStatus.latestIndexedBlockNumber > blockProgress.blockNumber) { - await indexer.updateStateSyncStatusIndexedBlock(blockProgress.blockNumber, true); - } - - if (stateSyncStatus.latestCheckpointBlockNumber > blockProgress.blockNumber) { - await indexer.updateStateSyncStatusCheckpointBlock(blockProgress.blockNumber, true); - } - } - - await indexer.updateSyncStatusChainHead(blockProgress.blockHash, blockProgress.blockNumber, true); - - dbTx.commitTransaction(); - } catch (error) { - await dbTx.rollbackTransaction(); - throw error; - } finally { - await dbTx.release(); - } - + await indexer.resetWatcherToBlock(argv.blockNumber); log('Reset watcher successfully'); }; diff --git a/packages/codegen/src/templates/server-template.handlebars b/packages/codegen/src/templates/server-template.handlebars index 58db08a1..c5636ed7 100644 --- a/packages/codegen/src/templates/server-template.handlebars +++ b/packages/codegen/src/templates/server-template.handlebars @@ -76,6 +76,8 @@ export const main = async (): Promise => { if (watcherKind === KIND_ACTIVE) { await jobQueue.start(); + // Delete jobs to prevent creating jobs after completion of processing previous block. + await jobQueue.deleteAllJobs(); await eventWatcher.start(); } diff --git a/packages/eden-watcher/src/cli/reset-cmds/watcher.ts b/packages/eden-watcher/src/cli/reset-cmds/watcher.ts index 28faa5c9..bc7784f8 100644 --- a/packages/eden-watcher/src/cli/reset-cmds/watcher.ts +++ b/packages/eden-watcher/src/cli/reset-cmds/watcher.ts @@ -73,52 +73,6 @@ export const handler = async (argv: any): Promise => { graphWatcher.setIndexer(indexer); await graphWatcher.init(); - const blockProgresses = await indexer.getBlocksAtHeight(argv.blockNumber, false); - assert(blockProgresses.length, `No blocks at specified block number ${argv.blockNumber}`); - assert(!blockProgresses.some(block => !block.isComplete), `Incomplete block at block number ${argv.blockNumber} with unprocessed events`); - const [blockProgress] = blockProgresses; - - const dbTx = await db.createTransactionRunner(); - - try { - const entities = [BlockProgress, Producer, ProducerSet, ProducerSetChange, ProducerRewardCollectorChange, RewardScheduleEntry, RewardSchedule, ProducerEpoch, Block, Epoch, SlotClaim, Slot, Staker, Network, Distributor, Distribution, Claim, Slash, Account]; - - for (const entity of entities) { - await db.deleteEntitiesByConditions(dbTx, entity, { blockNumber: MoreThan(argv.blockNumber) }); - } - - const syncStatus = await indexer.getSyncStatus(); - assert(syncStatus, 'Missing syncStatus'); - - if (syncStatus.latestIndexedBlockNumber > blockProgress.blockNumber) { - await indexer.updateSyncStatusIndexedBlock(blockProgress.blockHash, blockProgress.blockNumber, true); - } - - if (syncStatus.latestCanonicalBlockNumber > blockProgress.blockNumber) { - await indexer.updateSyncStatusCanonicalBlock(blockProgress.blockHash, blockProgress.blockNumber, true); - } - - const stateSyncStatus = await indexer.getStateSyncStatus(); - - if (stateSyncStatus) { - if (stateSyncStatus.latestIndexedBlockNumber > blockProgress.blockNumber) { - await indexer.updateStateSyncStatusIndexedBlock(blockProgress.blockNumber, true); - } - - if (stateSyncStatus.latestCheckpointBlockNumber > blockProgress.blockNumber) { - await indexer.updateStateSyncStatusCheckpointBlock(blockProgress.blockNumber, true); - } - } - - await indexer.updateSyncStatusChainHead(blockProgress.blockHash, blockProgress.blockNumber, true); - - dbTx.commitTransaction(); - } catch (error) { - await dbTx.rollbackTransaction(); - throw error; - } finally { - await dbTx.release(); - } - + await indexer.resetWatcherToBlock(argv.blockNumber); log('Reset watcher successfully'); }; diff --git a/packages/eden-watcher/src/database.ts b/packages/eden-watcher/src/database.ts index 5b7ebceb..15af0048 100644 --- a/packages/eden-watcher/src/database.ts +++ b/packages/eden-watcher/src/database.ts @@ -3,7 +3,7 @@ // import assert from 'assert'; -import { Connection, ConnectionOptions, DeepPartial, FindConditions, QueryRunner, FindManyOptions } from 'typeorm'; +import { Connection, ConnectionOptions, DeepPartial, FindConditions, QueryRunner, FindManyOptions, EntityTarget } from 'typeorm'; import path from 'path'; import { Database as BaseDatabase, DatabaseInterface, QueryOptions, StateKind, Where } from '@cerc-io/util'; @@ -225,7 +225,7 @@ export class Database implements DatabaseInterface { return this._baseDatabase.removeEntities(queryRunner, entity, findConditions); } - async deleteEntitiesByConditions (queryRunner: QueryRunner, entity: new () => Entity, findConditions: FindConditions): Promise { + async deleteEntitiesByConditions (queryRunner: QueryRunner, entity: EntityTarget, findConditions: FindConditions): Promise { await this._baseDatabase.deleteEntitiesByConditions(queryRunner, entity, findConditions); } diff --git a/packages/eden-watcher/src/indexer.ts b/packages/eden-watcher/src/indexer.ts index d2546bc3..e8f19f52 100644 --- a/packages/eden-watcher/src/indexer.ts +++ b/packages/eden-watcher/src/indexer.ts @@ -519,6 +519,11 @@ export class Indexer implements IndexerInterface { this._subgraphStateMap.clear(); } + async resetWatcherToBlock (blockNumber: number): Promise { + const entities = [ProducerSet, Producer, RewardSchedule, RewardScheduleEntry, Network, Staker, ProducerEpoch, Epoch, Block, SlotClaim, Slot, Distributor, Distribution, Claim, Account, Slash]; + await this._baseIndexer.resetWatcherToBlock(blockNumber, entities); + } + _populateEntityTypesMap (): void { this._entityTypesMap.set( 'Producer', diff --git a/packages/eden-watcher/src/job-runner.ts b/packages/eden-watcher/src/job-runner.ts index 09db8431..0de67991 100644 --- a/packages/eden-watcher/src/job-runner.ts +++ b/packages/eden-watcher/src/job-runner.ts @@ -44,6 +44,8 @@ export class JobRunner { } async start (): Promise { + await this._jobQueue.deleteAllJobs(); + await this._baseJobRunner.resetToPrevIndexedBlock(); await this.subscribeBlockProcessingQueue(); await this.subscribeEventProcessingQueue(); await this.subscribeBlockCheckpointQueue(); diff --git a/packages/eden-watcher/src/server.ts b/packages/eden-watcher/src/server.ts index b34b0be6..e7b083f8 100644 --- a/packages/eden-watcher/src/server.ts +++ b/packages/eden-watcher/src/server.ts @@ -14,7 +14,7 @@ import debug from 'debug'; import 'graphql-import-node'; import { createServer } from 'http'; -import { DEFAULT_CONFIG_PATH, getConfig, Config, JobQueue, KIND_ACTIVE, initClients, startGQLMetricsServer } from '@cerc-io/util'; +import { DEFAULT_CONFIG_PATH, getConfig, Config, JobQueue, KIND_ACTIVE, initClients, startGQLMetricsServer, resetJobs } from '@cerc-io/util'; import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node'; import { createResolvers } from './resolvers'; @@ -70,6 +70,8 @@ export const main = async (): Promise => { if (watcherKind === KIND_ACTIVE) { await jobQueue.start(); + // Delete jobs to prevent creating jobs after completion of processing previous block. + await jobQueue.deleteAllJobs(); await eventWatcher.start(); } diff --git a/packages/erc20-watcher/src/cli/reset-cmds/watcher.ts b/packages/erc20-watcher/src/cli/reset-cmds/watcher.ts index b21aaf60..293d8f04 100644 --- a/packages/erc20-watcher/src/cli/reset-cmds/watcher.ts +++ b/packages/erc20-watcher/src/cli/reset-cmds/watcher.ts @@ -48,35 +48,6 @@ export const handler = async (argv: any): Promise => { const syncStatus = await indexer.getSyncStatus(); assert(syncStatus, 'Missing syncStatus'); - const blockProgresses = await indexer.getBlocksAtHeight(argv.blockNumber, false); - assert(blockProgresses.length, `No blocks at specified block number ${argv.blockNumber}`); - assert(!blockProgresses.some(block => !block.isComplete), `Incomplete block at block number ${argv.blockNumber} with unprocessed events`); - const [blockProgress] = blockProgresses; - - const dbTx = await db.createTransactionRunner(); - - try { - for (const entity of [BlockProgress, Allowance, Balance]) { - await db.deleteEntitiesByConditions(dbTx, entity, { blockNumber: MoreThan(argv.blockNumber) }); - } - - if (syncStatus.latestIndexedBlockNumber > blockProgress.blockNumber) { - await indexer.updateSyncStatusIndexedBlock(blockProgress.blockHash, blockProgress.blockNumber, true); - } - - if (syncStatus.latestCanonicalBlockNumber > blockProgress.blockNumber) { - await indexer.updateSyncStatusCanonicalBlock(blockProgress.blockHash, blockProgress.blockNumber, true); - } - - await indexer.updateSyncStatusChainHead(blockProgress.blockHash, blockProgress.blockNumber, true); - - dbTx.commitTransaction(); - } catch (error) { - await dbTx.rollbackTransaction(); - throw error; - } finally { - await dbTx.release(); - } - + await indexer.resetWatcherToBlock(argv.blockNumber); log('Reset watcher successfully'); }; diff --git a/packages/erc20-watcher/src/database.ts b/packages/erc20-watcher/src/database.ts index ff69b49c..0161f09b 100644 --- a/packages/erc20-watcher/src/database.ts +++ b/packages/erc20-watcher/src/database.ts @@ -94,6 +94,18 @@ export class Database implements DatabaseInterface { return this._baseDatabase.getStateSyncStatus(repo); } + async updateStateSyncStatusIndexedBlock (queryRunner: QueryRunner, blockNumber: number, force?: boolean): Promise { + const repo = queryRunner.manager.getRepository(StateSyncStatus); + + return this._baseDatabase.updateStateSyncStatusIndexedBlock(repo, blockNumber, force); + } + + async updateStateSyncStatusCheckpointBlock (queryRunner: QueryRunner, blockNumber: number, force?: boolean): Promise { + const repo = queryRunner.manager.getRepository(StateSyncStatus); + + return this._baseDatabase.updateStateSyncStatusCheckpointBlock(repo, blockNumber, force); + } + async getBalance ({ blockHash, token, owner }: { blockHash: string, token: string, owner: string }): Promise { return this._conn.getRepository(Balance) .createQueryBuilder('balance') diff --git a/packages/erc20-watcher/src/indexer.ts b/packages/erc20-watcher/src/indexer.ts index 989eb837..d8769943 100644 --- a/packages/erc20-watcher/src/indexer.ts +++ b/packages/erc20-watcher/src/indexer.ts @@ -23,6 +23,8 @@ import artifacts from './artifacts/ERC20.json'; import { BlockProgress } from './entity/BlockProgress'; import { Contract } from './entity/Contract'; import { State } from './entity/State'; +import { Allowance } from './entity/Allowance'; +import { Balance } from './entity/Balance'; const log = debug('vulcanize:indexer'); const JSONbigNative = JSONbig({ useNativeBigInt: true }); @@ -254,6 +256,11 @@ export class Indexer implements IndexerInterface { // TODO Implement } + async processInitialState (contractAddress: string, blockHash: string): Promise { + // TODO: Call initial state hook. + return {}; + } + async processCheckpoint (blockHash: string): Promise { // TODO Implement } @@ -422,6 +429,11 @@ export class Indexer implements IndexerInterface { return this._baseIndexer.getAncestorAtDepth(blockHash, depth); } + async resetWatcherToBlock (blockNumber: number): Promise { + const entities = [Allowance, Balance]; + await this._baseIndexer.resetWatcherToBlock(blockNumber, entities); + } + async _saveBlockAndFetchEvents ({ cid: blockCid, blockHash, diff --git a/packages/erc20-watcher/src/job-runner.ts b/packages/erc20-watcher/src/job-runner.ts index 3ff3a7ff..f112f6f8 100644 --- a/packages/erc20-watcher/src/job-runner.ts +++ b/packages/erc20-watcher/src/job-runner.ts @@ -40,6 +40,8 @@ export class JobRunner { } async start (): Promise { + await this._jobQueue.deleteAllJobs(); + await this._baseJobRunner.resetToPrevIndexedBlock(); await this.subscribeBlockProcessingQueue(); await this.subscribeEventProcessingQueue(); this._baseJobRunner.handleShutdown(); diff --git a/packages/erc20-watcher/src/server.ts b/packages/erc20-watcher/src/server.ts index 317b8b01..74b89463 100644 --- a/packages/erc20-watcher/src/server.ts +++ b/packages/erc20-watcher/src/server.ts @@ -62,6 +62,8 @@ export const main = async (): Promise => { if (watcherKind === KIND_ACTIVE) { await jobQueue.start(); + // Delete jobs to prevent creating jobs after completion of processing previous block. + await jobQueue.deleteAllJobs(); await eventWatcher.start(); } diff --git a/packages/erc721-watcher/demo.md b/packages/erc721-watcher/demo.md index 4b207775..881ab183 100644 --- a/packages/erc721-watcher/demo.md +++ b/packages/erc721-watcher/demo.md @@ -86,18 +86,18 @@ yarn && yarn build ``` -* Run the watcher: - - ```bash - yarn server - ``` - * Run the job-runner: ```bash yarn job-runner ``` +* Run the watcher: + + ```bash + yarn server + ``` + * Deploy an ERC721 token: ```bash diff --git a/packages/erc721-watcher/src/cli/reset-cmds/watcher.ts b/packages/erc721-watcher/src/cli/reset-cmds/watcher.ts index d1181020..030d462c 100644 --- a/packages/erc721-watcher/src/cli/reset-cmds/watcher.ts +++ b/packages/erc721-watcher/src/cli/reset-cmds/watcher.ts @@ -60,52 +60,6 @@ export const handler = async (argv: any): Promise => { const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue); await indexer.init(); - const blockProgresses = await indexer.getBlocksAtHeight(argv.blockNumber, false); - assert(blockProgresses.length, `No blocks at specified block number ${argv.blockNumber}`); - assert(!blockProgresses.some(block => !block.isComplete), `Incomplete block at block number ${argv.blockNumber} with unprocessed events`); - const [blockProgress] = blockProgresses; - - const dbTx = await db.createTransactionRunner(); - - try { - const entities = [BlockProgress, SupportsInterface, BalanceOf, OwnerOf, GetApproved, IsApprovedForAll, Name, Symbol, TokenURI, _Name, _Symbol, _Owners, _Balances, _TokenApprovals, _OperatorApprovals]; - - for (const entity of entities) { - await db.deleteEntitiesByConditions(dbTx, entity, { blockNumber: MoreThan(argv.blockNumber) }); - } - - const syncStatus = await indexer.getSyncStatus(); - assert(syncStatus, 'Missing syncStatus'); - - if (syncStatus.latestIndexedBlockNumber > blockProgress.blockNumber) { - await indexer.updateSyncStatusIndexedBlock(blockProgress.blockHash, blockProgress.blockNumber, true); - } - - if (syncStatus.latestCanonicalBlockNumber > blockProgress.blockNumber) { - await indexer.updateSyncStatusCanonicalBlock(blockProgress.blockHash, blockProgress.blockNumber, true); - } - - const stateSyncStatus = await indexer.getStateSyncStatus(); - - if (stateSyncStatus) { - if (stateSyncStatus.latestIndexedBlockNumber > blockProgress.blockNumber) { - await indexer.updateStateSyncStatusIndexedBlock(blockProgress.blockNumber, true); - } - - if (stateSyncStatus.latestCheckpointBlockNumber > blockProgress.blockNumber) { - await indexer.updateStateSyncStatusCheckpointBlock(blockProgress.blockNumber, true); - } - } - - await indexer.updateSyncStatusChainHead(blockProgress.blockHash, blockProgress.blockNumber, true); - - dbTx.commitTransaction(); - } catch (error) { - await dbTx.rollbackTransaction(); - throw error; - } finally { - await dbTx.release(); - } - + await indexer.resetWatcherToBlock(argv.blockNumber); log('Reset watcher successfully'); }; diff --git a/packages/erc721-watcher/src/indexer.ts b/packages/erc721-watcher/src/indexer.ts index dde7de9e..00e79411 100644 --- a/packages/erc721-watcher/src/indexer.ts +++ b/packages/erc721-watcher/src/indexer.ts @@ -38,6 +38,20 @@ import { SyncStatus } from './entity/SyncStatus'; import { StateSyncStatus } from './entity/StateSyncStatus'; import { BlockProgress } from './entity/BlockProgress'; import { State } from './entity/State'; +import { SupportsInterface } from './entity/SupportsInterface'; +import { BalanceOf } from './entity/BalanceOf'; +import { OwnerOf } from './entity/OwnerOf'; +import { GetApproved } from './entity/GetApproved'; +import { IsApprovedForAll } from './entity/IsApprovedForAll'; +import { Name } from './entity/Name'; +import { Symbol } from './entity/Symbol'; +import { TokenURI } from './entity/TokenURI'; +import { _Name } from './entity/_Name'; +import { _Symbol } from './entity/_Symbol'; +import { _Owners } from './entity/_Owners'; +import { _Balances } from './entity/_Balances'; +import { _TokenApprovals } from './entity/_TokenApprovals'; +import { _OperatorApprovals } from './entity/_OperatorApprovals'; import { TransferCount } from './entity/TransferCount'; const log = debug('vulcanize:indexer'); @@ -863,6 +877,11 @@ export class Indexer implements IndexerInterface { return this._baseIndexer.getAncestorAtDepth(blockHash, depth); } + async resetWatcherToBlock (blockNumber: number): Promise { + const entities = [SupportsInterface, BalanceOf, OwnerOf, GetApproved, IsApprovedForAll, Name, Symbol, TokenURI, _Name, _Symbol, _Owners, _Balances, _TokenApprovals, _OperatorApprovals, TransferCount]; + await this._baseIndexer.resetWatcherToBlock(blockNumber, entities); + } + async _saveBlockAndFetchEvents ({ cid: blockCid, blockHash, diff --git a/packages/erc721-watcher/src/job-runner.ts b/packages/erc721-watcher/src/job-runner.ts index 2fc66644..eb324fac 100644 --- a/packages/erc721-watcher/src/job-runner.ts +++ b/packages/erc721-watcher/src/job-runner.ts @@ -42,6 +42,8 @@ export class JobRunner { } async start (): Promise { + await this._jobQueue.deleteAllJobs(); + await this._baseJobRunner.resetToPrevIndexedBlock(); await this.subscribeBlockProcessingQueue(); await this.subscribeEventProcessingQueue(); await this.subscribeBlockCheckpointQueue(); diff --git a/packages/erc721-watcher/src/server.ts b/packages/erc721-watcher/src/server.ts index b79c9678..c67f0339 100644 --- a/packages/erc721-watcher/src/server.ts +++ b/packages/erc721-watcher/src/server.ts @@ -61,6 +61,8 @@ export const main = async (): Promise => { if (watcherKind === KIND_ACTIVE) { await jobQueue.start(); + // Delete jobs to prevent creating jobs after completion of processing previous block. + await jobQueue.deleteAllJobs(); await eventWatcher.start(); } diff --git a/packages/graph-node/src/cli/compare/compare-blocks.ts b/packages/graph-node/src/cli/compare/compare-blocks.ts index efd88867..59eab62a 100644 --- a/packages/graph-node/src/cli/compare/compare-blocks.ts +++ b/packages/graph-node/src/cli/compare/compare-blocks.ts @@ -200,7 +200,6 @@ export const main = async (): Promise => { for (const [queryName, entityName] of Object.entries(queryNames)) { try { log(`At block ${blockNumber} for query ${queryName}:`); - let resultDiff = ''; if (fetchIds) { const queryLimit = config.queries.queryLimits[queryName]; @@ -230,11 +229,15 @@ export const main = async (): Promise => { } if (diff) { - resultDiff = diff; + log('Results mismatch:', diff); + diffFound = true; + } else { + log('Results match.'); } } } else { if (updatedEntities.has(entityName)) { + let resultDiff; let result; let skip = 0; @@ -271,14 +274,14 @@ export const main = async (): Promise => { // eslint-disable-next-line no-unmodified-loop-condition paginate ); - } - } - if (resultDiff) { - log('Results mismatch:', resultDiff); - diffFound = true; - } else { - log('Results match.'); + if (resultDiff) { + log('Results mismatch:', resultDiff); + diffFound = true; + } else { + log('Results match.'); + } + } } } catch (err: any) { log('Error:', err.message); diff --git a/packages/graph-node/src/database.ts b/packages/graph-node/src/database.ts index 5f46f577..6ba41119 100644 --- a/packages/graph-node/src/database.ts +++ b/packages/graph-node/src/database.ts @@ -41,7 +41,8 @@ const DEFAULT_CLEAR_ENTITIES_CACHE_INTERVAL = 1000; export enum ENTITY_QUERY_TYPE { SINGULAR, DISTINCT_ON, - GROUP_BY + GROUP_BY, + UNIQUE } interface CachedEntities { @@ -378,6 +379,10 @@ export class Database { entities = await this.getEntitiesDistinctOn(queryRunner, entity, block, where, queryOptions); break; + case ENTITY_QUERY_TYPE.UNIQUE: + entities = await this.getEntitiesUnique(queryRunner, entity, block, where, queryOptions); + break; + default: // Use group by query if entity query type is not specified in map. entities = await this.getEntitiesGroupBy(queryRunner, entity, block, where, queryOptions); @@ -413,6 +418,11 @@ export class Database { .andWhere('blockProgress.is_pruned = :isPruned', { isPruned: false }) .groupBy('subTable.id'); + if (where.id) { + subQuery = this._baseDatabase.buildQuery(repo, subQuery, { id: where.id }); + delete where.id; + } + if (block.hash) { const { canonicalBlockNumber, blockHashes } = await this._baseDatabase.getFrothyRegion(queryRunner, block.hash); @@ -473,6 +483,11 @@ export class Database { .addOrderBy('subTable.id', 'ASC') .addOrderBy('subTable.block_number', 'DESC'); + if (where.id) { + subQuery = this._baseDatabase.buildQuery(repo, subQuery, { id: where.id }); + delete where.id; + } + if (block.hash) { const { canonicalBlockNumber, blockHashes } = await this._baseDatabase.getFrothyRegion(queryRunner, block.hash); @@ -554,6 +569,56 @@ export class Database { return entities as Entity[]; } + async getEntitiesUnique ( + queryRunner: QueryRunner, + entity: new () => Entity, + block: BlockHeight, + where: Where = {}, + queryOptions: QueryOptions = {} + ): Promise { + const repo = queryRunner.manager.getRepository(entity); + const { tableName } = repo.metadata; + + let selectQueryBuilder = repo.createQueryBuilder(tableName) + .addFrom('block_progress', 'blockProgress') + .where(`${tableName}.block_hash = blockProgress.block_hash`) + .andWhere('blockProgress.is_pruned = :isPruned', { isPruned: false }); + + if (block.hash) { + const { canonicalBlockNumber, blockHashes } = await this._baseDatabase.getFrothyRegion(queryRunner, block.hash); + + selectQueryBuilder = selectQueryBuilder + .andWhere(new Brackets(qb => { + qb.where(`${tableName}.block_hash IN (:...blockHashes)`, { blockHashes }) + .orWhere(`${tableName}.block_number <= :canonicalBlockNumber`, { canonicalBlockNumber }); + })); + } + + if (block.number) { + selectQueryBuilder = selectQueryBuilder.andWhere(`${tableName}.block_number <= :blockNumber`, { blockNumber: block.number }); + } + + selectQueryBuilder = this._baseDatabase.buildQuery(repo, selectQueryBuilder, where); + + if (queryOptions.orderBy) { + selectQueryBuilder = this._baseDatabase.orderQuery(repo, selectQueryBuilder, queryOptions); + } + + selectQueryBuilder = this._baseDatabase.orderQuery(repo, selectQueryBuilder, { ...queryOptions, orderBy: 'id' }); + + if (queryOptions.skip) { + selectQueryBuilder = selectQueryBuilder.offset(queryOptions.skip); + } + + if (queryOptions.limit) { + selectQueryBuilder = selectQueryBuilder.limit(queryOptions.limit); + } + + const entities = await selectQueryBuilder.getMany(); + + return entities as Entity[]; + } + async loadEntitiesRelations ( queryRunner: QueryRunner, block: BlockHeight, diff --git a/packages/graph-node/test/utils/indexer.ts b/packages/graph-node/test/utils/indexer.ts index 45205889..44333c72 100644 --- a/packages/graph-node/test/utils/indexer.ts +++ b/packages/graph-node/test/utils/indexer.ts @@ -197,6 +197,10 @@ export class Indexer implements IndexerInterface { updateStateStatusMap (address: string, stateStatus: StateStatus): void { return undefined; } + + async resetWatcherToBlock (blockNumber: number): Promise { + return undefined; + } } class ServerConfig implements ServerConfigInterface { diff --git a/packages/graph-test-watcher/src/cli/reset-cmds/watcher.ts b/packages/graph-test-watcher/src/cli/reset-cmds/watcher.ts index 4206d38f..2fc935f9 100644 --- a/packages/graph-test-watcher/src/cli/reset-cmds/watcher.ts +++ b/packages/graph-test-watcher/src/cli/reset-cmds/watcher.ts @@ -61,52 +61,6 @@ export const handler = async (argv: any): Promise => { graphWatcher.setIndexer(indexer); await graphWatcher.init(); - const blockProgresses = await indexer.getBlocksAtHeight(argv.blockNumber, false); - assert(blockProgresses.length, `No blocks at specified block number ${argv.blockNumber}`); - assert(!blockProgresses.some(block => !block.isComplete), `Incomplete block at block number ${argv.blockNumber} with unprocessed events`); - const [blockProgress] = blockProgresses; - - const dbTx = await db.createTransactionRunner(); - - try { - const entities = [BlockProgress, GetMethod, _Test, Author, Category, Blog]; - - for (const entity of entities) { - await db.deleteEntitiesByConditions(dbTx, entity, { blockNumber: MoreThan(argv.blockNumber) }); - } - - const syncStatus = await indexer.getSyncStatus(); - assert(syncStatus, 'Missing syncStatus'); - - if (syncStatus.latestIndexedBlockNumber > blockProgress.blockNumber) { - await indexer.updateSyncStatusIndexedBlock(blockProgress.blockHash, blockProgress.blockNumber, true); - } - - if (syncStatus.latestCanonicalBlockNumber > blockProgress.blockNumber) { - await indexer.updateSyncStatusCanonicalBlock(blockProgress.blockHash, blockProgress.blockNumber, true); - } - - const stateSyncStatus = await indexer.getStateSyncStatus(); - - if (stateSyncStatus) { - if (stateSyncStatus.latestIndexedBlockNumber > blockProgress.blockNumber) { - await indexer.updateStateSyncStatusIndexedBlock(blockProgress.blockNumber, true); - } - - if (stateSyncStatus.latestCheckpointBlockNumber > blockProgress.blockNumber) { - await indexer.updateStateSyncStatusCheckpointBlock(blockProgress.blockNumber, true); - } - } - - await indexer.updateSyncStatusChainHead(blockProgress.blockHash, blockProgress.blockNumber, true); - - dbTx.commitTransaction(); - } catch (error) { - await dbTx.rollbackTransaction(); - throw error; - } finally { - await dbTx.release(); - } - + await indexer.resetWatcherToBlock(argv.blockNumber); log('Reset watcher successfully'); }; diff --git a/packages/graph-test-watcher/src/indexer.ts b/packages/graph-test-watcher/src/indexer.ts index a8f989e4..fcf968ee 100644 --- a/packages/graph-test-watcher/src/indexer.ts +++ b/packages/graph-test-watcher/src/indexer.ts @@ -524,6 +524,11 @@ export class Indexer implements IndexerInterface { this._subgraphStateMap.clear(); } + async resetWatcherToBlock (blockNumber: number): Promise { + const entities = [Author, Blog, Category]; + await this._baseIndexer.resetWatcherToBlock(blockNumber, entities); + } + _populateEntityTypesMap (): void { this._entityTypesMap.set( 'Author', diff --git a/packages/graph-test-watcher/src/job-runner.ts b/packages/graph-test-watcher/src/job-runner.ts index 09db8431..0de67991 100644 --- a/packages/graph-test-watcher/src/job-runner.ts +++ b/packages/graph-test-watcher/src/job-runner.ts @@ -44,6 +44,8 @@ export class JobRunner { } async start (): Promise { + await this._jobQueue.deleteAllJobs(); + await this._baseJobRunner.resetToPrevIndexedBlock(); await this.subscribeBlockProcessingQueue(); await this.subscribeEventProcessingQueue(); await this.subscribeBlockCheckpointQueue(); diff --git a/packages/graph-test-watcher/src/server.ts b/packages/graph-test-watcher/src/server.ts index be6c5af1..b7bf1bae 100644 --- a/packages/graph-test-watcher/src/server.ts +++ b/packages/graph-test-watcher/src/server.ts @@ -70,6 +70,8 @@ export const main = async (): Promise => { if (watcherKind === KIND_ACTIVE) { await jobQueue.start(); + // Delete jobs to prevent creating jobs after completion of processing previous block. + await jobQueue.deleteAllJobs(); await eventWatcher.start(); } diff --git a/packages/mobymask-watcher/demo.md b/packages/mobymask-watcher/demo.md index f86bc84b..48ae7b6a 100644 --- a/packages/mobymask-watcher/demo.md +++ b/packages/mobymask-watcher/demo.md @@ -86,18 +86,18 @@ yarn && yarn build ``` -* Change directory to `packages/mobymask-watcher/` and run the watcher: - - ```bash - yarn server - ``` - * Run the job-runner: ```bash yarn job-runner ``` +* Change directory to `packages/mobymask-watcher/` and run the watcher: + + ```bash + yarn server + ``` + * Clone the [MobyMask](https://github.com/cerc-io/MobyMask) repo. * Checkout to the branch with changes for using this watcher: diff --git a/packages/mobymask-watcher/environments/local.toml b/packages/mobymask-watcher/environments/local.toml index 27b59f26..625311d5 100644 --- a/packages/mobymask-watcher/environments/local.toml +++ b/packages/mobymask-watcher/environments/local.toml @@ -47,6 +47,6 @@ maxCompletionLagInSecs = 300 jobDelayInMilliSecs = 100 eventsInBatch = 50 - blockDelayInMilliSecs = 60000 + blockDelayInMilliSecs = 2000 prefetchBlocksInMem = true prefetchBlockCount = 10 diff --git a/packages/mobymask-watcher/src/cli/reset-cmds/watcher.ts b/packages/mobymask-watcher/src/cli/reset-cmds/watcher.ts index fd611873..32fb5b68 100644 --- a/packages/mobymask-watcher/src/cli/reset-cmds/watcher.ts +++ b/packages/mobymask-watcher/src/cli/reset-cmds/watcher.ts @@ -51,52 +51,6 @@ export const handler = async (argv: any): Promise => { const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue); await indexer.init(); - const blockProgresses = await indexer.getBlocksAtHeight(argv.blockNumber, false); - assert(blockProgresses.length, `No blocks at specified block number ${argv.blockNumber}`); - assert(!blockProgresses.some(block => !block.isComplete), `Incomplete block at block number ${argv.blockNumber} with unprocessed events`); - const [blockProgress] = blockProgresses; - - const dbTx = await db.createTransactionRunner(); - - try { - const entities = [BlockProgress, MultiNonce, _Owner, IsRevoked, IsPhisher, IsMember]; - - for (const entity of entities) { - await db.deleteEntitiesByConditions(dbTx, entity, { blockNumber: MoreThan(argv.blockNumber) }); - } - - const syncStatus = await indexer.getSyncStatus(); - assert(syncStatus, 'Missing syncStatus'); - - if (syncStatus.latestIndexedBlockNumber > blockProgress.blockNumber) { - await indexer.updateSyncStatusIndexedBlock(blockProgress.blockHash, blockProgress.blockNumber, true); - } - - if (syncStatus.latestCanonicalBlockNumber > blockProgress.blockNumber) { - await indexer.updateSyncStatusCanonicalBlock(blockProgress.blockHash, blockProgress.blockNumber, true); - } - - const stateSyncStatus = await indexer.getStateSyncStatus(); - - if (stateSyncStatus) { - if (stateSyncStatus.latestIndexedBlockNumber > blockProgress.blockNumber) { - await indexer.updateStateSyncStatusIndexedBlock(blockProgress.blockNumber, true); - } - - if (stateSyncStatus.latestCheckpointBlockNumber > blockProgress.blockNumber) { - await indexer.updateStateSyncStatusCheckpointBlock(blockProgress.blockNumber, true); - } - } - - await indexer.updateSyncStatusChainHead(blockProgress.blockHash, blockProgress.blockNumber, true); - - dbTx.commitTransaction(); - } catch (error) { - await dbTx.rollbackTransaction(); - throw error; - } finally { - await dbTx.release(); - } - + await indexer.resetWatcherToBlock(argv.blockNumber); log('Reset watcher successfully'); }; diff --git a/packages/mobymask-watcher/src/indexer.ts b/packages/mobymask-watcher/src/indexer.ts index 6c2ecc0d..4e294ba7 100644 --- a/packages/mobymask-watcher/src/indexer.ts +++ b/packages/mobymask-watcher/src/indexer.ts @@ -607,6 +607,11 @@ export class Indexer implements IndexerInterface { return this._contractMap.get(kind); } + async resetWatcherToBlock (blockNumber: number): Promise { + const entities = [MultiNonce, _Owner, IsRevoked, IsPhisher, IsMember]; + await this._baseIndexer.resetWatcherToBlock(blockNumber, entities); + } + async _saveBlockAndFetchEvents ({ cid: blockCid, blockHash, diff --git a/packages/mobymask-watcher/src/job-runner.ts b/packages/mobymask-watcher/src/job-runner.ts index 2fc66644..eb324fac 100644 --- a/packages/mobymask-watcher/src/job-runner.ts +++ b/packages/mobymask-watcher/src/job-runner.ts @@ -42,6 +42,8 @@ export class JobRunner { } async start (): Promise { + await this._jobQueue.deleteAllJobs(); + await this._baseJobRunner.resetToPrevIndexedBlock(); await this.subscribeBlockProcessingQueue(); await this.subscribeEventProcessingQueue(); await this.subscribeBlockCheckpointQueue(); diff --git a/packages/mobymask-watcher/src/server.ts b/packages/mobymask-watcher/src/server.ts index e72a7b51..3aeb726e 100644 --- a/packages/mobymask-watcher/src/server.ts +++ b/packages/mobymask-watcher/src/server.ts @@ -61,6 +61,8 @@ export const main = async (): Promise => { if (watcherKind === KIND_ACTIVE) { await jobQueue.start(); + // Delete jobs to prevent creating jobs after completion of processing previous block. + await jobQueue.deleteAllJobs(); await eventWatcher.start(); } diff --git a/packages/util/src/common.ts b/packages/util/src/common.ts index 01dbba53..fa9d94fd 100644 --- a/packages/util/src/common.ts +++ b/packages/util/src/common.ts @@ -63,47 +63,39 @@ export const fetchBlocksAtHeight = async ( const { blockNumber } = job.data; let blocks = []; - // Check for blocks in cache if prefetchBlocksInMem flag set. - if (jobQueueConfig.prefetchBlocksInMem) { - // Get blocks prefetched in memory. - blocks = getPrefetchedBlocksAtHeight(prefetchedBlocksMap, blockNumber); + // Try fetching blocks from the db. + const blockProgressEntities = await indexer.getBlocksAtHeight(blockNumber, false); + blocks = blockProgressEntities.map((block: any) => { + block.timestamp = block.blockTimestamp; - // If not found in cache, fetch the next batch. - if (!blocks.length) { - // Wait for blocks to be prefetched. - console.time('time:common#fetchBlocks-_prefetchBlocks'); - await _prefetchBlocks(blockNumber, indexer, jobQueueConfig, prefetchedBlocksMap); - console.timeEnd('time:common#fetchBlocks-_prefetchBlocks'); - - blocks = getPrefetchedBlocksAtHeight(prefetchedBlocksMap, blockNumber); - } - - log('size:common#_fetchBlocks-_prefetchedBlocksMap-size:', prefetchedBlocksMap.size); - } + return block; + }); + // If blocks not found in the db: if (!blocks.length) { - log(`common#cache-miss-${blockNumber}`); - const blockProgressEntities = await indexer.getBlocksAtHeight(blockNumber, false); + // Check for blocks in cache if prefetchBlocksInMem flag set. + if (jobQueueConfig.prefetchBlocksInMem) { + // Get blocks prefetched in memory. + blocks = getPrefetchedBlocksAtHeight(prefetchedBlocksMap, blockNumber); - blocks = blockProgressEntities.map((block: any) => { - block.timestamp = block.blockTimestamp; + // If not found in cache, fetch the next batch. + if (!blocks.length) { + log(`common#cache-miss-${blockNumber}`); - return block; - }); - } + // Wait for blocks to be prefetched. + console.time('time:common#fetchBlocks-_prefetchBlocks'); + await _prefetchBlocks(blockNumber, indexer, jobQueueConfig, prefetchedBlocksMap); + console.timeEnd('time:common#fetchBlocks-_prefetchBlocks'); - // Try fetching blocks from eth-server until found. - while (!blocks.length) { - console.time('time:common#_fetchBlocks-eth-server'); - blocks = await indexer.getBlocks({ blockNumber }); - console.timeEnd('time:common#_fetchBlocks-eth-server'); + blocks = getPrefetchedBlocksAtHeight(prefetchedBlocksMap, blockNumber); + } - if (!blocks.length) { - log(`No blocks fetched for block number ${blockNumber}, retrying after ${jobQueueConfig.blockDelayInMilliSecs} ms delay.`); - await wait(jobQueueConfig.blockDelayInMilliSecs); + log('size:common#_fetchBlocks-_prefetchedBlocksMap-size:', prefetchedBlocksMap.size); } } + assert(blocks.length, 'Blocks not fetched'); + const blocksToBeIndexed: DeepPartial[] = []; for (const block of blocks) { const { cid, blockHash, blockNumber, parentHash, timestamp } = block; @@ -150,7 +142,15 @@ export const _prefetchBlocks = async ( * @param startBlock * @param endBlock */ -export const _fetchBatchBlocks = async (indexer: IndexerInterface, jobQueueConfig: JobQueueConfig, startBlock: number, endBlock: number): Promise => { +export const _fetchBatchBlocks = async ( + indexer: IndexerInterface, + jobQueueConfig: JobQueueConfig, + startBlock: number, + endBlock: number +): Promise<{ + blockProgress: BlockProgressInterface, + events: DeepPartial[] +}[]> => { const blockNumbers = [...Array(endBlock - startBlock).keys()].map(n => n + startBlock); let blocks = []; @@ -181,15 +181,26 @@ export const _fetchBatchBlocks = async (indexer: IndexerInterface, jobQueueConfi await wait(jobQueueConfig.jobDelayInMilliSecs); } - // TODO Catch errors and continue to process available events instead of retrying for whole range because of an error. - const blockAndEventPromises = blocks.map(async block => { + console.time('time:common#fetchBatchBlocks-saveBlockAndFetchEvents'); + const blockAndEventsPromises = blocks.map(async block => { block.blockTimestamp = block.timestamp; - const [blockProgress, events] = await indexer.saveBlockAndFetchEvents(block); - return { blockProgress, events }; + try { + const [blockProgress, events] = await indexer.saveBlockAndFetchEvents(block); + return { blockProgress, events }; + } catch (error) { + log(error); + return null; + } }); - return Promise.all(blockAndEventPromises); + const blockAndEventsList = await Promise.all(blockAndEventsPromises); + console.timeEnd('time:common#fetchBatchBlocks-saveBlockAndFetchEvents'); + + return blockAndEventsList.filter(blockAndEvent => blockAndEvent !== null) as { + blockProgress: BlockProgressInterface, + events: DeepPartial[] + }[]; }; /** diff --git a/packages/util/src/database.ts b/packages/util/src/database.ts index ab257979..995bfeef 100644 --- a/packages/util/src/database.ts +++ b/packages/util/src/database.ts @@ -9,6 +9,7 @@ import { ConnectionOptions, createConnection, DeepPartial, + EntityTarget, FindConditions, FindManyOptions, In, @@ -335,7 +336,7 @@ export class Database { await repo.remove(entities); } - async deleteEntitiesByConditions (queryRunner: QueryRunner, entity: new () => Entity, findConditions: FindConditions): Promise { + async deleteEntitiesByConditions (queryRunner: QueryRunner, entity: EntityTarget, findConditions: FindConditions): Promise { const repo = queryRunner.manager.getRepository(entity); await repo.delete(findConditions); diff --git a/packages/util/src/indexer.ts b/packages/util/src/indexer.ts index 33e5606c..0013ad90 100644 --- a/packages/util/src/indexer.ts +++ b/packages/util/src/indexer.ts @@ -3,7 +3,7 @@ // import assert from 'assert'; -import { DeepPartial, FindConditions, FindManyOptions } from 'typeorm'; +import { DeepPartial, EntityTarget, FindConditions, FindManyOptions, MoreThan } from 'typeorm'; import debug from 'debug'; import JSONbig from 'json-bigint'; import { ethers } from 'ethers'; @@ -981,6 +981,54 @@ export class Indexer { } } + async resetWatcherToBlock (blockNumber: number, entities: EntityTarget<{ blockNumber: number }>[]): Promise { + const blockProgresses = await this.getBlocksAtHeight(blockNumber, false); + assert(blockProgresses.length, `No blocks at specified block number ${blockNumber}`); + assert(!blockProgresses.some(block => !block.isComplete), `Incomplete block at block number ${blockNumber} with unprocessed events`); + const [blockProgress] = blockProgresses; + const dbTx = await this._db.createTransactionRunner(); + + try { + for (const entity of entities) { + await this._db.deleteEntitiesByConditions(dbTx, entity, { blockNumber: MoreThan(blockNumber) }); + } + + await this._db.deleteEntitiesByConditions(dbTx, 'block_progress', { blockNumber: MoreThan(blockNumber) }); + + const syncStatus = await this.getSyncStatus(); + assert(syncStatus, 'Missing syncStatus'); + + if (syncStatus.latestIndexedBlockNumber > blockProgress.blockNumber) { + await this.updateSyncStatusIndexedBlock(blockProgress.blockHash, blockProgress.blockNumber, true); + } + + if (syncStatus.latestCanonicalBlockNumber > blockProgress.blockNumber) { + await this.updateSyncStatusCanonicalBlock(blockProgress.blockHash, blockProgress.blockNumber, true); + } + + const stateSyncStatus = await this._db.getStateSyncStatus(); + + if (stateSyncStatus) { + if (stateSyncStatus.latestIndexedBlockNumber > blockProgress.blockNumber) { + await this._db.updateStateSyncStatusIndexedBlock(dbTx, blockNumber, true); + } + + if (stateSyncStatus.latestCheckpointBlockNumber > blockProgress.blockNumber) { + await this._db.updateStateSyncStatusCheckpointBlock(dbTx, blockNumber, true); + } + } + + await this.updateSyncStatusChainHead(blockProgress.blockHash, blockProgress.blockNumber, true); + + dbTx.commitTransaction(); + } catch (error) { + await dbTx.rollbackTransaction(); + throw error; + } finally { + await dbTx.release(); + } + } + updateStateStatusMap (address: string, stateStatus: StateStatus): void { // Get and update State status for the contract. const oldStateStatus = this._stateStatusMap[address]; diff --git a/packages/util/src/job-runner.ts b/packages/util/src/job-runner.ts index f180355f..8d89e8ff 100644 --- a/packages/util/src/job-runner.ts +++ b/packages/util/src/job-runner.ts @@ -172,6 +172,17 @@ export class JobRunner { await this._jobQueue.markComplete(job); } + async resetToPrevIndexedBlock (): Promise { + const syncStatus = await this._indexer.getSyncStatus(); + + if (syncStatus) { + // Resetting to block before latest indexed as all events might not be processed in latest indexed block. + // Reprocessing of events in subgraph watchers is not possible as DB transaction is not implemented. + // TODO: Check updating latestIndexedBlock after blockProgress.isComplete is set to true. + await this._indexer.resetWatcherToBlock(syncStatus.latestIndexedBlockNumber - 1); + } + } + handleShutdown (): void { process.on('SIGINT', this._processShutdown.bind(this)); process.on('SIGTERM', this._processShutdown.bind(this)); diff --git a/packages/util/src/types.ts b/packages/util/src/types.ts index 243f582c..8dd923af 100644 --- a/packages/util/src/types.ts +++ b/packages/util/src/types.ts @@ -2,7 +2,7 @@ // Copyright 2021 Vulcanize, Inc. // -import { Connection, DeepPartial, FindConditions, FindManyOptions, QueryRunner } from 'typeorm'; +import { Connection, DeepPartial, EntityTarget, FindConditions, FindManyOptions, QueryRunner } from 'typeorm'; import { MappingKey, StorageLayout } from '@cerc-io/solidity-mapper'; import { ServerConfig } from './config'; @@ -120,6 +120,7 @@ export interface IndexerInterface { updateSubgraphState?: (contractAddress: string, data: any) => void updateStateStatusMap (address: string, stateStatus: StateStatus): void getStateData (state: StateInterface): any + resetWatcherToBlock (blockNumber: number): Promise } export interface EventWatcherInterface { @@ -130,7 +131,8 @@ export interface EventWatcherInterface { export interface DatabaseInterface { _conn: Connection; - createTransactionRunner(): Promise; + close (): Promise; + createTransactionRunner (): Promise; getBlocksAtHeight (height: number, isPruned: boolean): Promise; getBlockProgress (blockHash: string): Promise; getBlockProgressEntities (where: FindConditions, options: FindManyOptions): Promise @@ -150,6 +152,7 @@ export interface DatabaseInterface { saveBlockWithEvents (queryRunner: QueryRunner, block: DeepPartial, events: DeepPartial[]): Promise; saveEventEntity (queryRunner: QueryRunner, entity: EventInterface): Promise; removeEntities (queryRunner: QueryRunner, entity: new () => Entity, findConditions?: FindManyOptions | FindConditions): Promise; + deleteEntitiesByConditions (queryRunner: QueryRunner, entity: EntityTarget, findConditions: FindConditions): Promise getContracts?: () => Promise saveContract?: (queryRunner: QueryRunner, contractAddress: string, kind: string, checkpoint: boolean, startingBlock: number) => Promise getLatestState (contractAddress: string, kind: StateKind | null, blockNumber?: number): Promise @@ -159,6 +162,8 @@ export interface DatabaseInterface { removeStates(dbTx: QueryRunner, blockNumber: number, kind: StateKind): Promise saveOrUpdateState (dbTx: QueryRunner, state: StateInterface): Promise getStateSyncStatus (): Promise + updateStateSyncStatusIndexedBlock (queryRunner: QueryRunner, blockNumber: number, force?: boolean): Promise + updateStateSyncStatusCheckpointBlock (queryRunner: QueryRunner, blockNumber: number, force?: boolean): Promise } export interface GraphDatabaseInterface {