diff --git a/packages/codegen/README.md b/packages/codegen/README.md index 8b466b93..bfdebb6d 100644 --- a/packages/codegen/README.md +++ b/packages/codegen/README.md @@ -82,7 +82,7 @@ * Edit the custom hook function `handleEvent` (triggered on an event) in `src/hooks.ts` to perform corresponding indexing using the `Indexer` object. - * While using the indexer storage methods for indexing, pass the optional arg. `state` as `diff` or `checkpoint` if default state is desired to be generated using the state variables being indexed else pass `none`. + * While using the indexer storage methods for indexing, pass `diff` as true if default state is desired to be generated using the state variables being indexed. * Generating state: @@ -131,6 +131,20 @@ ```bash yarn checkpoint --address --block-hash [block-hash] ``` + + * To reset the watcher to a previous block number: + + * Reset state: + + ```bash + yarn reset state --block-number + ``` + + * Reset job-queue: + + ```bash + yarn reset job-queue --block-number + ``` ## Known Issues diff --git a/packages/codegen/src/templates/database-template.handlebars b/packages/codegen/src/templates/database-template.handlebars index 5c2b1092..c2c1ae35 100644 --- a/packages/codegen/src/templates/database-template.handlebars +++ b/packages/codegen/src/templates/database-template.handlebars @@ -3,7 +3,7 @@ // import assert from 'assert'; -import { Connection, ConnectionOptions, DeepPartial, FindConditions, QueryRunner, FindManyOptions, In, Between } from 'typeorm'; +import { Connection, ConnectionOptions, DeepPartial, FindConditions, QueryRunner, FindManyOptions, MoreThan } from 'typeorm'; import path from 'path'; import { Database as BaseDatabase, QueryOptions, Where, MAX_REORG_DEPTH } from '@vulcanize/util'; @@ -83,17 +83,22 @@ export class Database { return repo.find({ where, relations: ['block'] }); } - async getLatestCheckpoint (queryRunner: QueryRunner, contractAddress: string): Promise { - // Get the latest checkpoints for a contract. - const result = await queryRunner.manager.createQueryBuilder(IPLDBlock, 'ipld_block') + async getLastIPLDBlock (contractAddress: string, kind?: string): Promise { + const repo = this._conn.getRepository(IPLDBlock); + + let queryBuilder = repo.createQueryBuilder('ipld_block') .leftJoinAndSelect('ipld_block.block', 'block') .where('block.is_pruned = false') - .andWhere('ipld_block.contractAddress = :contractAddress', { contractAddress }) - .andWhere('ipld_block.kind = :kind', { kind: 'checkpoint' }) - .orderBy('ipld_block.block_id', 'DESC') - .getOne(); + .andWhere('ipld_block.contract_address = :contractAddress', { contractAddress }) + .orderBy('block.block_number', 'DESC'); - return result; + // Filter using kind if specified else order by id to give preference to checkpoint. + queryBuilder = kind + ? queryBuilder.andWhere('ipld_block.kind = :kind', { kind }) + : queryBuilder.andWhere('ipld_block.kind != :kind', { kind: 'diff_staged' }) + .addOrderBy('ipld_block.id', 'DESC'); + + return queryBuilder.getOne(); } async getPrevIPLDBlock (queryRunner: QueryRunner, blockHash: string, contractAddress: string, kind?: string): Promise { @@ -172,67 +177,23 @@ export class Database { return result; } - async getPrevIPLDBlocksAfterCheckpoint (queryRunner: QueryRunner, blockHash: string, checkpointBlockNumber: number, contractAddress: string): Promise { - const heirerchicalQuery = ` - WITH RECURSIVE cte_query AS - ( - SELECT - b.block_hash, - b.block_number, - b.parent_hash, - 1 as depth, - i.id - FROM - block_progress b - LEFT JOIN - ipld_block i ON i.block_id = b.id - AND i.contract_address = $2 - WHERE - b.block_hash = $1 - UNION ALL - SELECT - b.block_hash, - b.block_number, - b.parent_hash, - c.depth + 1, - i.id - FROM - block_progress b - LEFT JOIN - ipld_block i - ON i.block_id = b.id - AND i.contract_address = $2 - INNER JOIN - cte_query c ON c.parent_hash = b.block_hash - WHERE - c.depth < $3 - ) - SELECT - block_number, id - FROM - cte_query - ORDER BY block_number ASC - `; + async getPrevIPLDBlocksAfterCheckpoint (contractAddress: string, checkpointBlockNumber: number): Promise { + const repo = this._conn.getRepository(IPLDBlock); - // Fetching ids for previous IPLDBlocks in the frothy region. - const queryResult = await queryRunner.query(heirerchicalQuery, [blockHash, contractAddress, MAX_REORG_DEPTH]); - - let frothyIds = queryResult.map((obj: any) => obj.id); - frothyIds = frothyIds.filter((id: any) => id !== null); - - const frothyBlockNumber = queryResult[0].block_number; - - // Fetching all diff blocks after checkpoint till current blockNumber. - const ipldBlocks = await queryRunner.manager.find(IPLDBlock, { + return repo.find({ relations: ['block'], - where: [ - { contractAddress, block: { isPruned: false, blockNumber: Between(checkpointBlockNumber + 1, frothyBlockNumber - 1) } }, - { id: In(frothyIds) } - ], - order: { block: 'ASC' } + where: { + contractAddress, + kind: 'diff', + block: { + isPruned: false, + blockNumber: MoreThan(checkpointBlockNumber) + } + }, + order: { + block: 'ASC' + } }); - - return ipldBlocks; } async saveOrUpdateIPLDBlock (ipldBlock: IPLDBlock): Promise { @@ -268,6 +229,11 @@ export class Database { return repo.find({ where }); } + async getLastCompleteBlock (): Promise { + const repo = this._conn.getRepository(BlockProgress); + return repo.findOne({ where: { isComplete: true }, order: { blockNumber: 'DESC' } }); + } + async getContract (address: string): Promise { const repo = this._conn.getRepository(Contract); @@ -370,11 +336,6 @@ export class Database { return this._baseDatabase.getBlockProgressEntities(repo, where, options); } - async getLatestBlockProgress (): Promise { - const repo = this._conn.getRepository(BlockProgress); - return repo.findOne({ order: { blockNumber: 'DESC' } }); - } - async updateBlockProgress (queryRunner: QueryRunner, blockHash: string, lastProcessedEventIndex: number): Promise { const repo = queryRunner.manager.getRepository(BlockProgress); diff --git a/packages/codegen/src/templates/events-template.handlebars b/packages/codegen/src/templates/events-template.handlebars index dcc3e75d..a28b3ac0 100644 --- a/packages/codegen/src/templates/events-template.handlebars +++ b/packages/codegen/src/templates/events-template.handlebars @@ -16,6 +16,7 @@ import { QUEUE_HOOKS, UNKNOWN_EVENT_NAME, UpstreamConfig + JOB_KIND_PRUNE } from '@vulcanize/util'; import { Indexer } from './indexer'; @@ -75,6 +76,23 @@ export class EventWatcher { } await this._baseEventWatcher.blockProcessingCompleteHandler(job); + + const { data: { request: { data: { kind } } } } = job; + + // If it's a pruning job: + // Create a hook job for the latest canonical block. + if (kind === JOB_KIND_PRUNE) { + const syncStatus = await this._indexer.getSyncStatus(); + assert(syncStatus); + + this._jobQueue.pushJob( + QUEUE_HOOKS, + { + blockHash: syncStatus.latestCanonicalBlockHash, + blockNumber: syncStatus.latestCanonicalBlockNumber + } + ); + } }); } @@ -110,14 +128,18 @@ export class EventWatcher { async initHooksOnCompleteHandler (): Promise { this._jobQueue.onComplete(QUEUE_HOOKS, async (job) => { - const { data: { request: { data: { blockHash, blockNumber } } } } = job; + const { data: { request: { data: { blockNumber, blockHash } } } } = job; await this._indexer.updateHookStatusProcessedBlock(blockNumber); - // Push checkpointing job only after post-block hook job is marked complete and checkpointing is on. - if (this._indexer._serverConfig.checkpointing) { - this._jobQueue.pushJob(QUEUE_BLOCK_CHECKPOINT, { blockHash }); - } + // Create a checkpoint job after completion of a hook job. + this._jobQueue.pushJob( + QUEUE_BLOCK_CHECKPOINT, + { + blockHash, + blockNumber + } + ); }); } diff --git a/packages/codegen/src/templates/hooks-template.handlebars b/packages/codegen/src/templates/hooks-template.handlebars index 9e600051..0b7d6d50 100644 --- a/packages/codegen/src/templates/hooks-template.handlebars +++ b/packages/codegen/src/templates/hooks-template.handlebars @@ -7,7 +7,6 @@ import assert from 'assert'; import { UNKNOWN_EVENT_NAME, updateStateForMappingType, updateStateForElementaryType } from '@vulcanize/util'; import { Indexer, ResultEvent } from './indexer'; -import { BlockProgress } from './entity/BlockProgress'; const ACCOUNTS = [ '0xDC7d7A8920C8Eecc098da5B7522a5F31509b5Bfc' @@ -19,9 +18,9 @@ const ACCOUNTS = [ * @param block Concerned block. * @param contractAddress Address of the concerned contract. */ -export async function createInitialCheckpoint (indexer: Indexer, block: BlockProgress, contractAddress: string): Promise { +export async function createInitialCheckpoint (indexer: Indexer, contractAddress: string, blockHash: string): Promise { assert(indexer); - assert(block); + assert(blockHash); assert(contractAddress); // Store the initial state values in an IPLDBlock. @@ -29,12 +28,11 @@ export async function createInitialCheckpoint (indexer: Indexer, block: BlockPro // Setting the initial balances of accounts. for (const account of ACCOUNTS) { - const balance = await indexer._balances(block.blockHash, contractAddress, account); + const balance = await indexer._balances(blockHash, contractAddress, account); ipldBlockData = updateStateForMappingType(ipldBlockData, '_balances', [account], balance.value.toString()); } - const ipldBlock = await indexer.prepareIPLDBlock(block, contractAddress, ipldBlockData, 'checkpoint'); - await indexer.saveOrUpdateIPLDBlock(ipldBlock); + await indexer.createCheckpoint(contractAddress, blockHash, ipldBlockData); } /** @@ -59,7 +57,6 @@ export async function createStateDiff (indexer: Indexer, blockHash: string): Pro continue; } - const block = event.block; const contractAddress = event.contract; const eventData = indexer.getResultEvent(event); @@ -102,8 +99,7 @@ export async function createStateDiff (indexer: Indexer, blockHash: string): Pro } } - const ipldBlock = await indexer.prepareIPLDBlock(block, contractAddress, ipldBlockData, 'diff'); - await indexer.saveOrUpdateIPLDBlock(ipldBlock); + await indexer.createDiff(contractAddress, blockHash, ipldBlockData); } } diff --git a/packages/codegen/src/templates/indexer-template.handlebars b/packages/codegen/src/templates/indexer-template.handlebars index b59f6138..17a28c3e 100644 --- a/packages/codegen/src/templates/indexer-template.handlebars +++ b/packages/codegen/src/templates/indexer-template.handlebars @@ -160,9 +160,7 @@ export class Indexer { async {{query.name}} (blockHash: string, contractAddress: string {{~#each query.params}}, {{this.name~}}: {{this.type~}} {{/each}} {{~#if query.stateVariableType~}} - , state = 'none'): Promise { - assert(_.includes(['diff', 'checkpoint', 'none'], state)); - + , diff = false): Promise { {{else~}} ): Promise { {{/if}} @@ -214,19 +212,19 @@ export class Indexer { {{#if query.stateVariableType}} {{#if (compare query.stateVariableType 'Mapping')}} - if (state !== 'none') { + if (diff) { const stateUpdate = updateStateForMappingType({}, '{{query.name}}', [ {{~#each query.params}} {{~this.name}}.toString() {{~#unless @last}}, {{/unless~}} {{/each~}} ], result.value.toString()); - await this.storeIPLDData(blockHash, contractAddress, stateUpdate, state); + await this.createDiffStaged(contractAddress, blockHash, stateUpdate); } {{else if (compare query.stateVariableType 'ElementaryTypeName')}} - if (state !== 'none') { + if (diff) { const stateUpdate = updateStateForElementaryType({}, '{{query.name}}', result.value.toString()); - await this.storeIPLDData(blockHash, contractAddress, stateUpdate, state); + await this.createDiffStaged(contractAddress, blockHash, stateUpdate); } {{else}} @@ -238,49 +236,125 @@ export class Indexer { } {{/each}} - async processBlock (job: any): Promise { + async processCanonicalBlock (job: any): Promise { const { data: { blockHash } } = job; + // Finalize staged diff blocks if any. + await this.finalizeDiffStaged(blockHash); + // Call custom stateDiff hook. await createStateDiff(this, blockHash); } - async processCheckpoint (job: any): Promise { - // Create a checkpoint IPLDBlock for contracts that were checkpointed checkPointInterval blocks before. + async createDiffStaged (contractAddress: string, blockHash: string, data: any): Promise { + const block = await this.getBlockProgress(blockHash); + assert(block); + // Create a staged diff block. + const ipldBlock = await this.prepareIPLDBlock(block, contractAddress, data, 'diff_staged'); + await this.saveOrUpdateIPLDBlock(ipldBlock); + } + + async finalizeDiffStaged (blockHash: string): Promise { + const block = await this.getBlockProgress(blockHash); + assert(block); + + // Get all the staged diff blocks for the given blockHash. + const stagedBlocks = await this._db.getIPLDBlocks({ block, kind: 'diff_staged' }); + + // For each staged block, create a diff block. + for (const stagedBlock of stagedBlocks) { + const data = codec.decode(Buffer.from(stagedBlock.data)); + await this.createDiff(stagedBlock.contractAddress, stagedBlock.block.blockHash, data); + } + + // Remove all the staged diff blocks for current blockNumber. + await this.removeStagedIPLDBlocks(block.blockNumber); + } + + async createDiff (contractAddress: string, blockHash: string, data: any): Promise { + const block = await this.getBlockProgress(blockHash); + assert(block); + + // Fetch the latest checkpoint for the contract. + const checkpoint = await this.getLastIPLDBlock(contractAddress, 'checkpoint'); + + // There should be an initial checkpoint at least. + assert(checkpoint, 'Initial checkpoint doesn\'t exist'); + + // Check if the latest checkpoint is in the same block. + assert(checkpoint.block.blockHash !== block.blockHash, 'Checkpoint already created for the block hash.'); + + const ipldBlock = await this.prepareIPLDBlock(block, contractAddress, data, 'diff'); + await this.saveOrUpdateIPLDBlock(ipldBlock); + } + + async processCheckpoint (job: any): Promise { // Return if checkpointInterval is <= 0. const checkpointInterval = this._serverConfig.checkpointInterval; if (checkpointInterval <= 0) return; - const { data: { blockHash: currentBlockHash } } = job; + const { data: { blockHash, blockNumber } } = job; // Get all the contracts. const contracts = await this._db.getContracts({}); // For each contract, merge the diff till now to create a checkpoint. for (const contract of contracts) { + // Check if contract has checkpointing on. if (contract.checkpoint) { - await this.createCheckpoint(contract.address, currentBlockHash, checkpointInterval); + // If a checkpoint doesn't already exist and blockNumber is equal to startingBlock, create an initial checkpoint. + const checkpointBlock = await this.getLastIPLDBlock(contract.address, 'checkpoint'); + + if (!checkpointBlock) { + if (blockNumber === contract.startingBlock) { + await createInitialCheckpoint(this, contract.address, blockHash); + } + } else { + await this.createCheckpoint(contract.address, blockHash, undefined, checkpointInterval); + } } } } - async createCheckpoint (contractAddress: string, currentBlockHash?: string, checkpointInterval?: number): Promise { + async createCheckpoint (contractAddress: string, blockHash?: string, data?: any, checkpointInterval?: number): Promise { + const syncStatus = await this.getSyncStatus(); + assert(syncStatus); + // Getting the current block. let currentBlock; - if (currentBlockHash) { - currentBlock = await this.getBlockProgress(currentBlockHash); + + if (blockHash) { + currentBlock = await this.getBlockProgress(blockHash); } else { - currentBlock = await this._db.getLatestBlockProgress(); + // In case of empty blockHash from checkpoint CLI, get the latest canonical block for the checkpoint. + currentBlock = await this.getBlockProgress(syncStatus.latestCanonicalBlockHash); } + assert(currentBlock); - // Fetching the latest checkpoint for a contract. - // Assuming checkPointInterval > MAX_REORG_DEPTH. - const checkpointBlock = await this.getLatestCheckpoint(contractAddress); + // Data is passed in case of initial checkpoint. + // Assuming there will be no events for the contract in this block. + if (data) { + const ipldBlock = await this.prepareIPLDBlock(currentBlock, contractAddress, data, 'checkpoint'); + await this.saveOrUpdateIPLDBlock(ipldBlock); + + return; + } + + // If data is not passed, create from previous checkpoint and diffs after that. + + // Make sure the block is marked complete. + assert(currentBlock.isComplete, 'Block for a checkpoint should be marked as complete'); + + // Make sure the block is in the pruned region. + assert(currentBlock.blockNumber <= syncStatus.latestCanonicalBlockNumber, 'Block for a checkpoint should be in the pruned region'); + + // Fetch the latest checkpoint for the contract. + const checkpointBlock = await this.getLastIPLDBlock(contractAddress, 'checkpoint'); assert(checkpointBlock); - // Check if it is time for a new checkpoint. + // Check (only if checkpointInterval is passed) if it is time for a new checkpoint. if (checkpointInterval && checkpointBlock.block.blockNumber > (currentBlock.blockNumber - checkpointInterval)) { return; } @@ -288,44 +362,21 @@ export class Indexer { const { block: { blockNumber: checkpointBlockNumber } } = checkpointBlock; // Fetching all diff blocks after checkpoint. - const diffBlocks = await this.getPrevIPLDBlocksAfterCheckpoint(currentBlock.blockHash, checkpointBlockNumber, contractAddress); + const diffBlocks = await this.getPrevIPLDBlocksAfterCheckpoint(contractAddress, checkpointBlockNumber); - let checkPoint = codec.decode(Buffer.from(checkpointBlock.data)) as any; + data = codec.decode(Buffer.from(checkpointBlock.data)) as any; for (const diffBlock of diffBlocks) { const diff = codec.decode(Buffer.from(diffBlock.data)); - checkPoint = _.merge(checkPoint, diff); + data = _.merge(data, diff); } - const ipldBlock = await this.prepareIPLDBlock(currentBlock, contractAddress, checkPoint, 'checkpoint'); + const ipldBlock = await this.prepareIPLDBlock(currentBlock, contractAddress, data, 'checkpoint'); await this.saveOrUpdateIPLDBlock(ipldBlock); return currentBlock.blockHash; } - async getLatestCheckpoint (contractAddress: string): Promise { - // Get the latest checkpoints for a contract. - const dbTx = await this._db.createTransactionRunner(); - let res; - - try { - res = await this._db.getLatestCheckpoint(dbTx, contractAddress); - await dbTx.commitTransaction(); - } catch (error) { - await dbTx.rollbackTransaction(); - throw error; - } finally { - await dbTx.release(); - } - return res; - } - - async getIPLDBlocks (block: BlockProgress, contractAddress: string, kind?: string): Promise { - const ipldBlocks = await this._db.getIPLDBlocks({ block, contractAddress, kind }); - - return ipldBlocks; - } - async getIPLDBlockByCid (cid: string): Promise { const ipldBlocks = await this._db.getIPLDBlocks({ cid }); @@ -335,13 +386,8 @@ export class Indexer { return ipldBlocks[0]; } - async getPrevState (blockHash: string, contractAddress: string, kind?: string): Promise { - const ipldBlock = await this.getPrevIPLDBlock(blockHash, contractAddress, kind); - - if (ipldBlock) { - const data = codec.decode(Buffer.from(ipldBlock.data)) as any; - return data.state; - } + async getLastIPLDBlock (contractAddress: string, kind?: string): Promise { + return this._db.getLastIPLDBlock(contractAddress, kind); } async getPrevIPLDBlock (blockHash: string, contractAddress: string, kind?: string): Promise { @@ -360,47 +406,26 @@ export class Indexer { return res; } - async getPrevIPLDBlocksAfterCheckpoint (blockHash: string, checkpointBlockNumber: number, contractAddress: string): Promise { - const dbTx = await this._db.createTransactionRunner(); - let res; - - try { - res = await this._db.getPrevIPLDBlocksAfterCheckpoint(dbTx, blockHash, checkpointBlockNumber, contractAddress); - await dbTx.commitTransaction(); - } catch (error) { - await dbTx.rollbackTransaction(); - throw error; - } finally { - await dbTx.release(); - } - - return res; - } - - async storeIPLDData (blockHash: string, contractAddress: string, data: any, kind: string): Promise { - const block = await this.getBlockProgress(blockHash); - assert(block); - - const ipldBlock = await this.prepareIPLDBlock(block, contractAddress, data, kind); - await this.saveOrUpdateIPLDBlock(ipldBlock); - } - - async saveOrUpdateIPLDBlock (ipldBlock: IPLDBlock): Promise { - return this._db.saveOrUpdateIPLDBlock(ipldBlock); + async getPrevIPLDBlocksAfterCheckpoint (contractAddress: string, checkpointBlockNumber: number): Promise { + return this._db.getPrevIPLDBlocksAfterCheckpoint(contractAddress, checkpointBlockNumber); } async prepareIPLDBlock (block: BlockProgress, contractAddress: string, data: any, kind: string):Promise { - assert(_.includes(['diff', 'checkpoint'], kind)); + assert(_.includes(['diff', 'checkpoint', 'diff_staged'], kind)); - // Get an existing IPLDBlock for current block and contractAddress. - const currentIPLDBlocks = await this.getIPLDBlocks(block, contractAddress, 'diff'); - // There can be only one IPLDBlock for a (block, contractAddress, 'diff') combination. + // Get an existing 'diff' | 'diff_staged' IPLDBlock for current block, contractAddress. + let currentIPLDBlocks: IPLDBlock[] = []; + if (kind !== 'checkpoint') { + currentIPLDBlocks = await this._db.getIPLDBlocks({ block, contractAddress, kind }); + } + + // There can be only one IPLDBlock for a (block, contractAddress, kind) combination. assert(currentIPLDBlocks.length <= 1); const currentIPLDBlock = currentIPLDBlocks[0]; // Update currentIPLDBlock if it exists and is of same kind. let ipldBlock; - if (currentIPLDBlock && currentIPLDBlock.kind === kind) { + if (currentIPLDBlock) { ipldBlock = currentIPLDBlock; // Update the data field. @@ -410,7 +435,7 @@ export class Indexer { ipldBlock = new IPLDBlock(); // Fetch the parent IPLDBlock. - const parentIPLDBlock = await this.getPrevIPLDBlock(block.blockHash, contractAddress); + const parentIPLDBlock = await this.getLastIPLDBlock(contractAddress); // Setting the meta-data for an IPLDBlock (done only once per block). data.meta = { @@ -449,6 +474,24 @@ export class Indexer { return ipldBlock; } + async saveOrUpdateIPLDBlock (ipldBlock: IPLDBlock): Promise { + return this._db.saveOrUpdateIPLDBlock(ipldBlock); + } + + async removeStagedIPLDBlocks (blockNumber: number): Promise { + const dbTx = await this._db.createTransactionRunner(); + + try { + await this._db.removeEntities(dbTx, IPLDBlock, { relations: ['block'], where: { block: { blockNumber }, kind: 'diff_staged' } }); + await dbTx.commitTransaction(); + } catch (error) { + await dbTx.rollbackTransaction(); + throw error; + } finally { + await dbTx.release(); + } + } + async triggerIndexingOnEvent (event: Event): Promise { const resultEvent = this.getResultEvent(event); @@ -477,7 +520,7 @@ export class Indexer { {{#each event.params}} {{#if (compare this.type 'bigint')}} {{this.name}}: BigInt(ethers.BigNumber.from({{this.name}}).toString()) - {{else}} + {{~else}} {{this.name}} {{~/if}} {{~#unless @last}},{{/unless}} @@ -492,22 +535,21 @@ export class Indexer { return { eventName, eventInfo }; } - async watchContract (address: string, kind: string, checkpoint: boolean, startingBlock: number): Promise { + async watchContract (address: string, kind: string, checkpoint: boolean, startingBlock?: number): Promise { // Use the checksum address (https://docs.ethers.io/v5/api/utils/address/#utils-getAddress) if input to address is a contract address. // If a contract identifier is passed as address instead, no need to convert to checksum address. // Customize: use the kind input to filter out non-contract-address input to address. const formattedAddress = (kind === '__protocol__') ? address : ethers.utils.getAddress(address); - await this._db.saveContract(formattedAddress, kind, checkpoint, startingBlock); - if (checkpoint) { - // Getting the current block. - const currentBlock = await this._db.getLatestBlockProgress(); - assert(currentBlock); + if (!startingBlock) { + const syncStatus = await this.getSyncStatus(); + assert(syncStatus); - // Call custom initial checkpoint hook. - await createInitialCheckpoint(this, currentBlock, address); + startingBlock = syncStatus.latestIndexedBlockNumber; } + await this._db.saveContract(formattedAddress, kind, checkpoint, startingBlock); + return true; } diff --git a/packages/codegen/src/templates/job-runner-template.handlebars b/packages/codegen/src/templates/job-runner-template.handlebars index 44e460fe..8db8f5ce 100644 --- a/packages/codegen/src/templates/job-runner-template.handlebars +++ b/packages/codegen/src/templates/job-runner-template.handlebars @@ -78,14 +78,14 @@ export class JobRunner { const hookStatus = await this._indexer.getHookStatus(); - if (hookStatus && hookStatus.latestProcessedBlockNumber !== blockNumber - 1) { + if (hookStatus && hookStatus.latestProcessedBlockNumber < (blockNumber - 1)) { const message = `Hooks for blockNumber ${blockNumber - 1} not processed yet, aborting`; log(message); throw new Error(message); } - await this._indexer.processBlock(job); + await this._indexer.processCanonicalBlock(job); await this._jobQueue.markComplete(job); }); diff --git a/packages/codegen/src/templates/readme-template.handlebars b/packages/codegen/src/templates/readme-template.handlebars index 79f4359c..c1c0cbe8 100644 --- a/packages/codegen/src/templates/readme-template.handlebars +++ b/packages/codegen/src/templates/readme-template.handlebars @@ -47,7 +47,7 @@ * Edit the custom hook function `handleEvent` (triggered on an event) in [hooks.ts](./src/hooks.ts) to perform corresponding indexing using the `Indexer` object. - * While using the indexer storage methods for indexing, pass the optional arg. `state` as `diff` or `checkpoint` if default state is desired to be generated using the state variables being indexed else pass `none`. + * While using the indexer storage methods for indexing, pass `diff` as true if default state is desired to be generated using the state variables being indexed. * Generating state: @@ -116,4 +116,20 @@ GQL console: http://localhost:3008/graphql ``` * `address`: Address or identifier of the contract for which to create a checkpoint. - * `block-hash`: Hash of the block at which to create the checkpoint (default: current block hash). + * `block-hash`: Hash of a block (in the pruned region) at which to create the checkpoint (default: latest canonical block hash). + + * To reset the watcher to a previous block number: + + * Reset state: + + ```bash + yarn reset state --block-number + ``` + + * Reset job-queue: + + ```bash + yarn reset job-queue --block-number + ``` + + * `block-number`: Block number to reset the watcher to. diff --git a/packages/codegen/src/templates/resolvers-template.handlebars b/packages/codegen/src/templates/resolvers-template.handlebars index 7d90e654..017225fe 100644 --- a/packages/codegen/src/templates/resolvers-template.handlebars +++ b/packages/codegen/src/templates/resolvers-template.handlebars @@ -34,8 +34,9 @@ export const createResolvers = async (indexer: Indexer, eventWatcher: EventWatch }, Mutation: { - watchContract: (_: any, { address, kind, checkpoint, startingBlock = 1 }: { address: string, kind: string, checkpoint: boolean, startingBlock: number }): Promise => { + watchContract: (_: any, { address, kind, checkpoint, startingBlock }: { address: string, kind: string, checkpoint: boolean, startingBlock: number }): Promise => { log('watchContract', address, kind, checkpoint, startingBlock); + return indexer.watchContract(address, kind, checkpoint, startingBlock); } }, diff --git a/packages/codegen/src/templates/watch-contract-template.handlebars b/packages/codegen/src/templates/watch-contract-template.handlebars index 61603fc6..26866188 100644 --- a/packages/codegen/src/templates/watch-contract-template.handlebars +++ b/packages/codegen/src/templates/watch-contract-template.handlebars @@ -49,7 +49,6 @@ const main = async (): Promise => { }, startingBlock: { type: 'number', - default: 1, describe: 'Starting block' } }).argv; diff --git a/packages/util/src/job-runner.ts b/packages/util/src/job-runner.ts index 23642deb..6cefa79d 100644 --- a/packages/util/src/job-runner.ts +++ b/packages/util/src/job-runner.ts @@ -15,8 +15,7 @@ import { MAX_REORG_DEPTH, UNKNOWN_EVENT_NAME, QUEUE_BLOCK_PROCESSING, - QUEUE_EVENT_PROCESSING, - QUEUE_HOOKS + QUEUE_EVENT_PROCESSING } from './constants'; import { JobQueue } from './job-queue'; import { EventInterface, IndexerInterface, SyncStatusInterface } from './types'; @@ -235,11 +234,6 @@ export class JobRunner { await this._jobQueue.pushJob(QUEUE_EVENT_PROCESSING, { kind: JOB_KIND_EVENTS, blockHash: blockProgress.blockHash, publish: true }); } - if (!blockProgress.numEvents) { - // Push post-block hook and checkpointing jobs if there are no events as the block is already marked as complete. - await this._jobQueue.pushJob(QUEUE_HOOKS, { blockHash, blockNumber }); - } - const indexBlockDuration = new Date().getTime() - indexBlockStartTime.getTime(); log(`time:job-runner#_indexBlock: ${indexBlockDuration}ms`); } diff --git a/packages/util/src/types.ts b/packages/util/src/types.ts index 5003bbb9..6b04f69f 100644 --- a/packages/util/src/types.ts +++ b/packages/util/src/types.ts @@ -72,7 +72,6 @@ export interface IndexerInterface { parseEventNameAndArgs?: (kind: string, logObj: any) => any; isWatchedContract?: (address: string) => Promise; cacheContract?: (contract: ContractInterface) => void; - processBlock(blockHash: string): Promise; } export interface EventWatcherInterface {