From 39af78050bfb76f6621c34bcc77859ba50f65a39 Mon Sep 17 00:00:00 2001 From: Prathamesh Musale Date: Mon, 27 Nov 2023 14:41:34 +0530 Subject: [PATCH 1/2] Refactor to create init states for template contracts --- .../src/templates/indexer-template.handlebars | 16 ++++++------- packages/graph-node/test/utils/indexer.ts | 2 +- packages/util/src/common.ts | 23 +++++++++++++++---- packages/util/src/index-block.ts | 3 --- packages/util/src/indexer.ts | 1 - packages/util/src/job-runner.ts | 4 ---- packages/util/src/types.ts | 4 ++-- 7 files changed, 29 insertions(+), 24 deletions(-) diff --git a/packages/codegen/src/templates/indexer-template.handlebars b/packages/codegen/src/templates/indexer-template.handlebars index 148b480c..e28a4693 100644 --- a/packages/codegen/src/templates/indexer-template.handlebars +++ b/packages/codegen/src/templates/indexer-template.handlebars @@ -487,11 +487,11 @@ export class Indexer implements IndexerInterface { await this.triggerIndexingOnEvent(event, extraData); } - async processBlock (blockProgress: BlockProgress): Promise { - console.time('time:indexer#processBlock-init_state'); + async preEventsBlockProcessing (blockProgress: BlockProgress): Promise { + console.time('time:indexer#preEventsBlockProcessing-init_state'); // Call a function to create initial state for contracts. await this._baseIndexer.createInit(this, blockProgress.blockHash, blockProgress.blockNumber); - console.timeEnd('time:indexer#processBlock-init_state'); + console.timeEnd('time:indexer#preEventsBlockProcessing-init_state'); {{#if (subgraphPath)}} this._graphWatcher.updateEntityCacheFrothyBlocks(blockProgress); @@ -499,16 +499,16 @@ export class Indexer implements IndexerInterface { } {{#if (subgraphPath)}} - async processBlockAfterEvents (blockHash: string, blockNumber: number, extraData: ExtraEventData): Promise { - console.time('time:indexer#processBlockAfterEvents-mapping_code'); + async postEventsBlockProcessing (blockHash: string, blockNumber: number, extraData: ExtraEventData): Promise { + console.time('time:indexer#postEventsBlockProcessing-mapping_code'); // Call subgraph handler for block. await this._graphWatcher.handleBlock(blockHash, blockNumber, extraData); - console.timeEnd('time:indexer#processBlockAfterEvents-mapping_code'); + console.timeEnd('time:indexer#postEventsBlockProcessing-mapping_code'); - console.time('time:indexer#processBlockAfterEvents-dump_subgraph_state'); + console.time('time:indexer#postEventsBlockProcessing-dump_subgraph_state'); // Persist subgraph state to the DB. await this.dumpSubgraphState(blockHash); - console.timeEnd('time:indexer#processBlockAfterEvents-dump_subgraph_state'); + console.timeEnd('time:indexer#postEventsBlockProcessing-dump_subgraph_state'); } {{/if}} diff --git a/packages/graph-node/test/utils/indexer.ts b/packages/graph-node/test/utils/indexer.ts index c11c4a4c..2d7a4306 100644 --- a/packages/graph-node/test/utils/indexer.ts +++ b/packages/graph-node/test/utils/indexer.ts @@ -259,7 +259,7 @@ export class Indexer implements IndexerInterface { return undefined; } - async processBlock (blockProgress: BlockProgressInterface): Promise { + async preEventsBlockProcessing (blockProgress: BlockProgressInterface): Promise { return undefined; } diff --git a/packages/util/src/common.ts b/packages/util/src/common.ts index e882b8b5..6d6bb60f 100644 --- a/packages/util/src/common.ts +++ b/packages/util/src/common.ts @@ -174,8 +174,15 @@ export const processBatchEvents = async ( subgraphEventsOrder: boolean; } ): Promise => { - let dbBlock: BlockProgressInterface, updatedDbEvents: EventInterface[]; + let updatedDbEvents: EventInterface[]; let isNewContractWatched = false; + let { block: dbBlock } = data; + + // Perform any operations before processing events for this block + // (if this block hasn't already been processed) + if (!dbBlock.isComplete) { + await indexer.preEventsBlockProcessing(data.block); + } if (subgraphEventsOrder) { ({ dbBlock, updatedDbEvents, isNewContractWatched } = await _processEventsInSubgraphOrder(indexer, data, eventsInBatch || DEFAULT_EVENTS_IN_BATCH)); @@ -183,10 +190,9 @@ export const processBatchEvents = async ( ({ dbBlock, updatedDbEvents } = await _processEvents(indexer, data, eventsInBatch || DEFAULT_EVENTS_IN_BATCH)); } - if (indexer.processBlockAfterEvents) { - if (!dbBlock.isComplete) { - await indexer.processBlockAfterEvents(dbBlock.blockHash, dbBlock.blockNumber, data); - } + // Perform any operations after processing events for this block + if (indexer.postEventsBlockProcessing && !dbBlock.isComplete) { + await indexer.postEventsBlockProcessing(dbBlock.blockHash, dbBlock.blockNumber, data); } dbBlock.isComplete = true; @@ -361,6 +367,13 @@ const _processEventsInSubgraphOrder = async ( } } + if (isNewContractWatched) { + // Create init states for newly watched contracts + // (needs to be done before we start processsing their events) + assert(indexer.createInit); + await indexer.createInit(block.blockHash, block.blockNumber); + } + console.time('time:common#processEventsInSubgraphOrder-processing_initially_unwatched_events'); // In the end process events of newly watched contracts for (const updatedDbEvent of updatedDbEvents) { diff --git a/packages/util/src/index-block.ts b/packages/util/src/index-block.ts index c752f45f..c96f9bd5 100644 --- a/packages/util/src/index-block.ts +++ b/packages/util/src/index-block.ts @@ -46,9 +46,6 @@ export const indexBlock = async ( blockProgress = partialblockProgress as BlockProgressInterface; } - assert(indexer.processBlock); - await indexer.processBlock(blockProgress); - await processBatchEvents( indexer, { diff --git a/packages/util/src/indexer.ts b/packages/util/src/indexer.ts index 7825a466..9ce29c16 100644 --- a/packages/util/src/indexer.ts +++ b/packages/util/src/indexer.ts @@ -464,7 +464,6 @@ export class Indexer { blockProgress, ethFullBlock: fullBlock, ethFullTransactions: blockEthFullTxs, - block, events: [] }; }); diff --git a/packages/util/src/job-runner.ts b/packages/util/src/job-runner.ts index 93c2fc09..54f16f79 100644 --- a/packages/util/src/job-runner.ts +++ b/packages/util/src/job-runner.ts @@ -605,10 +605,6 @@ export class JobRunner { }); } - if (!blockProgress.isComplete) { - await this._indexer.processBlock(blockProgress); - } - // Push job to event processing queue. // Block with all events processed or no events will not be processed again due to check in _processEvents. const eventsProcessingJob: EventsJobData = { diff --git a/packages/util/src/types.ts b/packages/util/src/types.ts index f1ed7d7a..bf1ec832 100644 --- a/packages/util/src/types.ts +++ b/packages/util/src/types.ts @@ -211,8 +211,8 @@ export interface IndexerInterface { getRelationsMap?: () => Map processInitialState: (contractAddress: string, blockHash: string) => Promise processStateCheckpoint: (contractAddress: string, blockHash: string) => Promise - processBlock: (blockProgres: BlockProgressInterface) => Promise - processBlockAfterEvents?: (blockHash: string, blockNumber: number, data: ExtraEventData) => Promise + preEventsBlockProcessing: (blockProgres: BlockProgressInterface) => Promise + postEventsBlockProcessing?: (blockHash: string, blockNumber: number, data: ExtraEventData) => Promise processCanonicalBlock (blockHash: string, blockNumber: number): Promise processCheckpoint (blockHash: string): Promise processCLICheckpoint (contractAddress: string, blockHash?: string): Promise From 0ae926224c44e53970d7fa1856e55f062a715dcc Mon Sep 17 00:00:00 2001 From: Prathamesh Musale Date: Mon, 27 Nov 2023 16:08:39 +0530 Subject: [PATCH 2/2] Index start blocks to create init states of watched contracts --- packages/util/src/indexer.ts | 58 ++++++++++++++++++++++++++++-------- 1 file changed, 46 insertions(+), 12 deletions(-) diff --git a/packages/util/src/indexer.ts b/packages/util/src/indexer.ts index 9ce29c16..1d5ebc69 100644 --- a/packages/util/src/indexer.ts +++ b/packages/util/src/indexer.ts @@ -330,6 +330,22 @@ export class Indexer { return blocks; } + async getBlockWithFullBlock (blockFilter: { blockNumber?: number, blockHash?: string }): Promise<{ block: DeepPartial, fullBlock: EthFullBlock }> { + const [fullBlock] = await this.getBlocks(blockFilter); + assert(fullBlock); + + const block = { + ...fullBlock, + blockTimestamp: Number(fullBlock.timestamp), + blockNumber: Number(fullBlock.blockNumber) + }; + + return { + block: block as DeepPartial, + fullBlock + }; + } + async getBlockProgress (blockHash: string): Promise { return this._db.getBlockProgress(blockHash); } @@ -394,6 +410,17 @@ export class Indexer { const { addresses, topics } = this._createLogsFilters(eventSignaturesMap); + // Create a set of starting blocks of watched contracts in range [fromBlock, toBlock] + // TODO: Optimize (avoid template contracts) + const watchedBlockNumbers = Object.keys(this._watchedContracts).reduce((acc: Set, address: string) => { + const startBlock = this._watchedContracts[address].startingBlock; + if (startBlock >= fromBlock && startBlock <= toBlock) { + acc.add(startBlock); + } + + return acc; + }, new Set()); + const { logs } = await this._ethClient.getLogsForBlockRange({ fromBlock, toBlock, @@ -410,19 +437,13 @@ export class Indexer { // Fetch blocks with transactions for the logs returned console.time(`time:indexer#fetchAndSaveFilteredEventsAndBlocks-fetch-blocks-txs-${fromBlock}-${toBlock}`); const blocksPromises = Array.from(blockLogsMap.keys()).map(async (blockHash) => { - const [fullBlock] = await this._ethClient.getFullBlocks({ blockHash }); - assert(fullBlock); + const { block, fullBlock } = await this.getBlockWithFullBlock({ blockHash }); - const block = { - ...fullBlock, - blockTimestamp: Number(fullBlock.timestamp), - blockNumber: Number(fullBlock.blockNumber) - }; + // Remove this block from watchedBlockNumbers set as it's already fetched + assert(block.blockNumber); + watchedBlockNumbers.delete(block.blockNumber); - return { - block: block as DeepPartial, - fullBlock - }; + return { block, fullBlock }; }); const ethFullTxPromises = txHashes.map(async txHash => { @@ -432,6 +453,19 @@ export class Indexer { const blocks = await Promise.all(blocksPromises); const ethFullTxs = await Promise.all(ethFullTxPromises); + // Fetch starting blocks for watched contracts + const watchedBlocks = await Promise.all( + Array.from(watchedBlockNumbers).map(async (blockNumber) => this.getBlockWithFullBlock({ blockNumber })) + ); + + // Merge and sort the two block lists + const sortedBlocks = [...blocks, ...watchedBlocks].sort((b1, b2) => { + assert(b1.block.blockNumber); + assert(b2.block.blockNumber); + + return b1.block.blockNumber - b2.block.blockNumber; + }); + const ethFullTxsMap = ethFullTxs.reduce((acc: Map, ethFullTx) => { acc.set(ethFullTx.ethTransactionCidByTxHash.txHash, ethFullTx); return acc; @@ -441,7 +475,7 @@ export class Indexer { // Map db ready events according to blockhash console.time(`time:indexer#fetchAndSaveFilteredEventsAndBlocks-db-save-blocks-events-${fromBlock}-${toBlock}`); - const blockWithDbEventsPromises = blocks.map(async ({ block, fullBlock }) => { + const blockWithDbEventsPromises = sortedBlocks.map(async ({ block, fullBlock }) => { const blockHash = block.blockHash; assert(blockHash); const logs = blockLogsMap.get(blockHash) || [];