diff --git a/packages/codegen/src/templates/indexer-template.handlebars b/packages/codegen/src/templates/indexer-template.handlebars index b8a308a5..b3a387f9 100644 --- a/packages/codegen/src/templates/indexer-template.handlebars +++ b/packages/codegen/src/templates/indexer-template.handlebars @@ -500,11 +500,11 @@ export class Indexer implements IndexerInterface { await this.triggerIndexingOnEvent(event, extraData); } - async processBlock (blockProgress: BlockProgress): Promise { - console.time('time:indexer#processBlock-init_state'); + async preEventsBlockProcessing (blockProgress: BlockProgress): Promise { + console.time('time:indexer#preEventsBlockProcessing-init_state'); // Call a function to create initial state for contracts. await this._baseIndexer.createInit(this, blockProgress.blockHash, blockProgress.blockNumber); - console.timeEnd('time:indexer#processBlock-init_state'); + console.timeEnd('time:indexer#preEventsBlockProcessing-init_state'); {{#if (subgraphPath)}} this._graphWatcher.updateEntityCacheFrothyBlocks(blockProgress); @@ -512,16 +512,16 @@ export class Indexer implements IndexerInterface { } {{#if (subgraphPath)}} - async processBlockAfterEvents (blockHash: string, blockNumber: number, extraData: ExtraEventData): Promise { - console.time('time:indexer#processBlockAfterEvents-mapping_code'); + async postEventsBlockProcessing (blockHash: string, blockNumber: number, extraData: ExtraEventData): Promise { + console.time('time:indexer#postEventsBlockProcessing-mapping_code'); // Call subgraph handler for block. await this._graphWatcher.handleBlock(blockHash, blockNumber, extraData); - console.timeEnd('time:indexer#processBlockAfterEvents-mapping_code'); + console.timeEnd('time:indexer#postEventsBlockProcessing-mapping_code'); - console.time('time:indexer#processBlockAfterEvents-dump_subgraph_state'); + console.time('time:indexer#postEventsBlockProcessing-dump_subgraph_state'); // Persist subgraph state to the DB. await this.dumpSubgraphState(blockHash); - console.timeEnd('time:indexer#processBlockAfterEvents-dump_subgraph_state'); + console.timeEnd('time:indexer#postEventsBlockProcessing-dump_subgraph_state'); } {{/if}} diff --git a/packages/graph-node/test/utils/indexer.ts b/packages/graph-node/test/utils/indexer.ts index 660aba1e..e92c08c7 100644 --- a/packages/graph-node/test/utils/indexer.ts +++ b/packages/graph-node/test/utils/indexer.ts @@ -260,7 +260,7 @@ export class Indexer implements IndexerInterface { return undefined; } - async processBlock (blockProgress: BlockProgressInterface): Promise { + async preEventsBlockProcessing (blockProgress: BlockProgressInterface): Promise { return undefined; } diff --git a/packages/util/src/common.ts b/packages/util/src/common.ts index 049ff22a..c9654391 100644 --- a/packages/util/src/common.ts +++ b/packages/util/src/common.ts @@ -176,8 +176,15 @@ export const processBatchEvents = async ( subgraphEventsOrder: boolean; } ): Promise => { - let dbBlock: BlockProgressInterface, updatedDbEvents: EventInterface[]; + let updatedDbEvents: EventInterface[]; let isNewContractWatched = false; + let { block: dbBlock } = data; + + // Perform any operations before processing events for this block + // (if this block hasn't already been processed) + if (!dbBlock.isComplete) { + await indexer.preEventsBlockProcessing(data.block); + } if (subgraphEventsOrder) { ({ dbBlock, updatedDbEvents, isNewContractWatched } = await _processEventsInSubgraphOrder(indexer, data, eventsInBatch || DEFAULT_EVENTS_IN_BATCH)); @@ -185,10 +192,9 @@ export const processBatchEvents = async ( ({ dbBlock, updatedDbEvents } = await _processEvents(indexer, data, eventsInBatch || DEFAULT_EVENTS_IN_BATCH)); } - if (indexer.processBlockAfterEvents) { - if (!dbBlock.isComplete) { - await indexer.processBlockAfterEvents(dbBlock.blockHash, dbBlock.blockNumber, data); - } + // Perform any operations after processing events for this block + if (indexer.postEventsBlockProcessing && !dbBlock.isComplete) { + await indexer.postEventsBlockProcessing(dbBlock.blockHash, dbBlock.blockNumber, data); } dbBlock.isComplete = true; @@ -369,6 +375,13 @@ const _processEventsInSubgraphOrder = async ( } } + if (isNewContractWatched) { + // Create init states for newly watched contracts + // (needs to be done before we start processsing their events) + assert(indexer.createInit); + await indexer.createInit(block.blockHash, block.blockNumber); + } + console.time('time:common#processEventsInSubgraphOrder-processing_initially_unwatched_events'); // In the end process events of newly watched contracts for (const updatedDbEvent of updatedDbEvents) { diff --git a/packages/util/src/index-block.ts b/packages/util/src/index-block.ts index c752f45f..c96f9bd5 100644 --- a/packages/util/src/index-block.ts +++ b/packages/util/src/index-block.ts @@ -46,9 +46,6 @@ export const indexBlock = async ( blockProgress = partialblockProgress as BlockProgressInterface; } - assert(indexer.processBlock); - await indexer.processBlock(blockProgress); - await processBatchEvents( indexer, { diff --git a/packages/util/src/indexer.ts b/packages/util/src/indexer.ts index 395ba187..53e85bb0 100644 --- a/packages/util/src/indexer.ts +++ b/packages/util/src/indexer.ts @@ -394,6 +394,22 @@ export class Indexer { return blocks; } + async getBlockWithFullBlock (blockFilter: { blockNumber?: number, blockHash?: string }): Promise<{ block: DeepPartial, fullBlock: EthFullBlock }> { + const [fullBlock] = await this.getBlocks(blockFilter); + assert(fullBlock); + + const block = { + ...fullBlock, + blockTimestamp: Number(fullBlock.timestamp), + blockNumber: Number(fullBlock.blockNumber) + }; + + return { + block: block as DeepPartial, + fullBlock + }; + } + async getBlockProgress (blockHash: string): Promise { return this._db.getBlockProgress(blockHash); } @@ -458,6 +474,17 @@ export class Indexer { const { addresses, topics } = this._createLogsFilters(eventSignaturesMap); + // Create a set of starting blocks of watched contracts in range [fromBlock, toBlock] + // TODO: Optimize (avoid template contracts) + const watchedBlockNumbers = Object.keys(this._watchedContracts).reduce((acc: Set, address: string) => { + const startBlock = this._watchedContracts[address].startingBlock; + if (startBlock >= fromBlock && startBlock <= toBlock) { + acc.add(startBlock); + } + + return acc; + }, new Set()); + const { logs } = await this._ethClient.getLogsForBlockRange({ fromBlock, toBlock, @@ -474,19 +501,13 @@ export class Indexer { // Fetch blocks with transactions for the logs returned console.time(`time:indexer#fetchAndSaveFilteredEventsAndBlocks-fetch-blocks-txs-${fromBlock}-${toBlock}`); const blocksPromises = Array.from(blockLogsMap.keys()).map(async (blockHash) => { - const [fullBlock] = await this._ethClient.getFullBlocks({ blockHash }); - assert(fullBlock); + const { block, fullBlock } = await this.getBlockWithFullBlock({ blockHash }); - const block = { - ...fullBlock, - blockTimestamp: Number(fullBlock.timestamp), - blockNumber: Number(fullBlock.blockNumber) - }; + // Remove this block from watchedBlockNumbers set as it's already fetched + assert(block.blockNumber); + watchedBlockNumbers.delete(block.blockNumber); - return { - block: block as DeepPartial, - fullBlock - }; + return { block, fullBlock }; }); const ethFullTxPromises = txHashes.map(async txHash => { @@ -496,6 +517,19 @@ export class Indexer { const blocks = await Promise.all(blocksPromises); const ethFullTxs = await Promise.all(ethFullTxPromises); + // Fetch starting blocks for watched contracts + const watchedBlocks = await Promise.all( + Array.from(watchedBlockNumbers).map(async (blockNumber) => this.getBlockWithFullBlock({ blockNumber })) + ); + + // Merge and sort the two block lists + const sortedBlocks = [...blocks, ...watchedBlocks].sort((b1, b2) => { + assert(b1.block.blockNumber); + assert(b2.block.blockNumber); + + return b1.block.blockNumber - b2.block.blockNumber; + }); + const ethFullTxsMap = ethFullTxs.reduce((acc: Map, ethFullTx) => { acc.set(ethFullTx.ethTransactionCidByTxHash.txHash, ethFullTx); return acc; @@ -505,7 +539,7 @@ export class Indexer { // Map db ready events according to blockhash console.time(`time:indexer#fetchAndSaveFilteredEventsAndBlocks-db-save-blocks-events-${fromBlock}-${toBlock}`); - const blockWithDbEventsPromises = blocks.map(async ({ block, fullBlock }) => { + const blockWithDbEventsPromises = sortedBlocks.map(async ({ block, fullBlock }) => { const blockHash = block.blockHash; assert(blockHash); const logs = blockLogsMap.get(blockHash) || []; @@ -528,7 +562,6 @@ export class Indexer { blockProgress, ethFullBlock: fullBlock, ethFullTransactions: blockEthFullTxs, - block, events: [] }; }); diff --git a/packages/util/src/job-runner.ts b/packages/util/src/job-runner.ts index f544c290..d822d146 100644 --- a/packages/util/src/job-runner.ts +++ b/packages/util/src/job-runner.ts @@ -681,10 +681,6 @@ export class JobRunner { }); } - if (!blockProgress.isComplete) { - await this._indexer.processBlock(blockProgress); - } - // Push job to event processing queue. // Block with all events processed or no events will not be processed again due to check in _processEvents. await this._pushEventProcessingJobsForBlocks([blockProgress], true); diff --git a/packages/util/src/types.ts b/packages/util/src/types.ts index 2d677a09..b4eb1c86 100644 --- a/packages/util/src/types.ts +++ b/packages/util/src/types.ts @@ -213,8 +213,8 @@ export interface IndexerInterface { getRelationsMap?: () => Map processInitialState: (contractAddress: string, blockHash: string) => Promise processStateCheckpoint: (contractAddress: string, blockHash: string) => Promise - processBlock: (blockProgres: BlockProgressInterface) => Promise - processBlockAfterEvents?: (blockHash: string, blockNumber: number, data: ExtraEventData) => Promise + preEventsBlockProcessing: (blockProgres: BlockProgressInterface) => Promise + postEventsBlockProcessing?: (blockHash: string, blockNumber: number, data: ExtraEventData) => Promise processCanonicalBlock (blockHash: string, blockNumber: number): Promise processCheckpoint (blockHash: string): Promise processCLICheckpoint (contractAddress: string, blockHash?: string): Promise