diff --git a/packages/codegen/src/templates/indexer-template.handlebars b/packages/codegen/src/templates/indexer-template.handlebars index 148b480c..e28a4693 100644 --- a/packages/codegen/src/templates/indexer-template.handlebars +++ b/packages/codegen/src/templates/indexer-template.handlebars @@ -487,11 +487,11 @@ export class Indexer implements IndexerInterface { await this.triggerIndexingOnEvent(event, extraData); } - async processBlock (blockProgress: BlockProgress): Promise { - console.time('time:indexer#processBlock-init_state'); + async preEventsBlockProcessing (blockProgress: BlockProgress): Promise { + console.time('time:indexer#preEventsBlockProcessing-init_state'); // Call a function to create initial state for contracts. await this._baseIndexer.createInit(this, blockProgress.blockHash, blockProgress.blockNumber); - console.timeEnd('time:indexer#processBlock-init_state'); + console.timeEnd('time:indexer#preEventsBlockProcessing-init_state'); {{#if (subgraphPath)}} this._graphWatcher.updateEntityCacheFrothyBlocks(blockProgress); @@ -499,16 +499,16 @@ export class Indexer implements IndexerInterface { } {{#if (subgraphPath)}} - async processBlockAfterEvents (blockHash: string, blockNumber: number, extraData: ExtraEventData): Promise { - console.time('time:indexer#processBlockAfterEvents-mapping_code'); + async postEventsBlockProcessing (blockHash: string, blockNumber: number, extraData: ExtraEventData): Promise { + console.time('time:indexer#postEventsBlockProcessing-mapping_code'); // Call subgraph handler for block. await this._graphWatcher.handleBlock(blockHash, blockNumber, extraData); - console.timeEnd('time:indexer#processBlockAfterEvents-mapping_code'); + console.timeEnd('time:indexer#postEventsBlockProcessing-mapping_code'); - console.time('time:indexer#processBlockAfterEvents-dump_subgraph_state'); + console.time('time:indexer#postEventsBlockProcessing-dump_subgraph_state'); // Persist subgraph state to the DB. await this.dumpSubgraphState(blockHash); - console.timeEnd('time:indexer#processBlockAfterEvents-dump_subgraph_state'); + console.timeEnd('time:indexer#postEventsBlockProcessing-dump_subgraph_state'); } {{/if}} diff --git a/packages/graph-node/test/utils/indexer.ts b/packages/graph-node/test/utils/indexer.ts index c11c4a4c..2d7a4306 100644 --- a/packages/graph-node/test/utils/indexer.ts +++ b/packages/graph-node/test/utils/indexer.ts @@ -259,7 +259,7 @@ export class Indexer implements IndexerInterface { return undefined; } - async processBlock (blockProgress: BlockProgressInterface): Promise { + async preEventsBlockProcessing (blockProgress: BlockProgressInterface): Promise { return undefined; } diff --git a/packages/util/src/common.ts b/packages/util/src/common.ts index e882b8b5..6d6bb60f 100644 --- a/packages/util/src/common.ts +++ b/packages/util/src/common.ts @@ -174,8 +174,15 @@ export const processBatchEvents = async ( subgraphEventsOrder: boolean; } ): Promise => { - let dbBlock: BlockProgressInterface, updatedDbEvents: EventInterface[]; + let updatedDbEvents: EventInterface[]; let isNewContractWatched = false; + let { block: dbBlock } = data; + + // Perform any operations before processing events for this block + // (if this block hasn't already been processed) + if (!dbBlock.isComplete) { + await indexer.preEventsBlockProcessing(data.block); + } if (subgraphEventsOrder) { ({ dbBlock, updatedDbEvents, isNewContractWatched } = await _processEventsInSubgraphOrder(indexer, data, eventsInBatch || DEFAULT_EVENTS_IN_BATCH)); @@ -183,10 +190,9 @@ export const processBatchEvents = async ( ({ dbBlock, updatedDbEvents } = await _processEvents(indexer, data, eventsInBatch || DEFAULT_EVENTS_IN_BATCH)); } - if (indexer.processBlockAfterEvents) { - if (!dbBlock.isComplete) { - await indexer.processBlockAfterEvents(dbBlock.blockHash, dbBlock.blockNumber, data); - } + // Perform any operations after processing events for this block + if (indexer.postEventsBlockProcessing && !dbBlock.isComplete) { + await indexer.postEventsBlockProcessing(dbBlock.blockHash, dbBlock.blockNumber, data); } dbBlock.isComplete = true; @@ -361,6 +367,13 @@ const _processEventsInSubgraphOrder = async ( } } + if (isNewContractWatched) { + // Create init states for newly watched contracts + // (needs to be done before we start processsing their events) + assert(indexer.createInit); + await indexer.createInit(block.blockHash, block.blockNumber); + } + console.time('time:common#processEventsInSubgraphOrder-processing_initially_unwatched_events'); // In the end process events of newly watched contracts for (const updatedDbEvent of updatedDbEvents) { diff --git a/packages/util/src/index-block.ts b/packages/util/src/index-block.ts index c752f45f..c96f9bd5 100644 --- a/packages/util/src/index-block.ts +++ b/packages/util/src/index-block.ts @@ -46,9 +46,6 @@ export const indexBlock = async ( blockProgress = partialblockProgress as BlockProgressInterface; } - assert(indexer.processBlock); - await indexer.processBlock(blockProgress); - await processBatchEvents( indexer, { diff --git a/packages/util/src/indexer.ts b/packages/util/src/indexer.ts index 7825a466..9ce29c16 100644 --- a/packages/util/src/indexer.ts +++ b/packages/util/src/indexer.ts @@ -464,7 +464,6 @@ export class Indexer { blockProgress, ethFullBlock: fullBlock, ethFullTransactions: blockEthFullTxs, - block, events: [] }; }); diff --git a/packages/util/src/job-runner.ts b/packages/util/src/job-runner.ts index 93c2fc09..54f16f79 100644 --- a/packages/util/src/job-runner.ts +++ b/packages/util/src/job-runner.ts @@ -605,10 +605,6 @@ export class JobRunner { }); } - if (!blockProgress.isComplete) { - await this._indexer.processBlock(blockProgress); - } - // Push job to event processing queue. // Block with all events processed or no events will not be processed again due to check in _processEvents. const eventsProcessingJob: EventsJobData = { diff --git a/packages/util/src/types.ts b/packages/util/src/types.ts index f1ed7d7a..bf1ec832 100644 --- a/packages/util/src/types.ts +++ b/packages/util/src/types.ts @@ -211,8 +211,8 @@ export interface IndexerInterface { getRelationsMap?: () => Map processInitialState: (contractAddress: string, blockHash: string) => Promise processStateCheckpoint: (contractAddress: string, blockHash: string) => Promise - processBlock: (blockProgres: BlockProgressInterface) => Promise - processBlockAfterEvents?: (blockHash: string, blockNumber: number, data: ExtraEventData) => Promise + preEventsBlockProcessing: (blockProgres: BlockProgressInterface) => Promise + postEventsBlockProcessing?: (blockHash: string, blockNumber: number, data: ExtraEventData) => Promise processCanonicalBlock (blockHash: string, blockNumber: number): Promise processCheckpoint (blockHash: string): Promise processCLICheckpoint (contractAddress: string, blockHash?: string): Promise