Refactor to create init states for template contracts

This commit is contained in:
Prathamesh Musale 2023-11-27 14:41:34 +05:30
parent 220f3ddf24
commit 39af78050b
7 changed files with 29 additions and 24 deletions

View File

@ -487,11 +487,11 @@ export class Indexer implements IndexerInterface {
await this.triggerIndexingOnEvent(event, extraData); await this.triggerIndexingOnEvent(event, extraData);
} }
async processBlock (blockProgress: BlockProgress): Promise<void> { async preEventsBlockProcessing (blockProgress: BlockProgress): Promise<void> {
console.time('time:indexer#processBlock-init_state'); console.time('time:indexer#preEventsBlockProcessing-init_state');
// Call a function to create initial state for contracts. // Call a function to create initial state for contracts.
await this._baseIndexer.createInit(this, blockProgress.blockHash, blockProgress.blockNumber); await this._baseIndexer.createInit(this, blockProgress.blockHash, blockProgress.blockNumber);
console.timeEnd('time:indexer#processBlock-init_state'); console.timeEnd('time:indexer#preEventsBlockProcessing-init_state');
{{#if (subgraphPath)}} {{#if (subgraphPath)}}
this._graphWatcher.updateEntityCacheFrothyBlocks(blockProgress); this._graphWatcher.updateEntityCacheFrothyBlocks(blockProgress);
@ -499,16 +499,16 @@ export class Indexer implements IndexerInterface {
} }
{{#if (subgraphPath)}} {{#if (subgraphPath)}}
async processBlockAfterEvents (blockHash: string, blockNumber: number, extraData: ExtraEventData): Promise<void> { async postEventsBlockProcessing (blockHash: string, blockNumber: number, extraData: ExtraEventData): Promise<void> {
console.time('time:indexer#processBlockAfterEvents-mapping_code'); console.time('time:indexer#postEventsBlockProcessing-mapping_code');
// Call subgraph handler for block. // Call subgraph handler for block.
await this._graphWatcher.handleBlock(blockHash, blockNumber, extraData); await this._graphWatcher.handleBlock(blockHash, blockNumber, extraData);
console.timeEnd('time:indexer#processBlockAfterEvents-mapping_code'); console.timeEnd('time:indexer#postEventsBlockProcessing-mapping_code');
console.time('time:indexer#processBlockAfterEvents-dump_subgraph_state'); console.time('time:indexer#postEventsBlockProcessing-dump_subgraph_state');
// Persist subgraph state to the DB. // Persist subgraph state to the DB.
await this.dumpSubgraphState(blockHash); await this.dumpSubgraphState(blockHash);
console.timeEnd('time:indexer#processBlockAfterEvents-dump_subgraph_state'); console.timeEnd('time:indexer#postEventsBlockProcessing-dump_subgraph_state');
} }
{{/if}} {{/if}}

View File

@ -259,7 +259,7 @@ export class Indexer implements IndexerInterface {
return undefined; return undefined;
} }
async processBlock (blockProgress: BlockProgressInterface): Promise<void> { async preEventsBlockProcessing (blockProgress: BlockProgressInterface): Promise<void> {
return undefined; return undefined;
} }

View File

@ -174,8 +174,15 @@ export const processBatchEvents = async (
subgraphEventsOrder: boolean; subgraphEventsOrder: boolean;
} }
): Promise<boolean> => { ): Promise<boolean> => {
let dbBlock: BlockProgressInterface, updatedDbEvents: EventInterface[]; let updatedDbEvents: EventInterface[];
let isNewContractWatched = false; let isNewContractWatched = false;
let { block: dbBlock } = data;
// Perform any operations before processing events for this block
// (if this block hasn't already been processed)
if (!dbBlock.isComplete) {
await indexer.preEventsBlockProcessing(data.block);
}
if (subgraphEventsOrder) { if (subgraphEventsOrder) {
({ dbBlock, updatedDbEvents, isNewContractWatched } = await _processEventsInSubgraphOrder(indexer, data, eventsInBatch || DEFAULT_EVENTS_IN_BATCH)); ({ dbBlock, updatedDbEvents, isNewContractWatched } = await _processEventsInSubgraphOrder(indexer, data, eventsInBatch || DEFAULT_EVENTS_IN_BATCH));
@ -183,10 +190,9 @@ export const processBatchEvents = async (
({ dbBlock, updatedDbEvents } = await _processEvents(indexer, data, eventsInBatch || DEFAULT_EVENTS_IN_BATCH)); ({ dbBlock, updatedDbEvents } = await _processEvents(indexer, data, eventsInBatch || DEFAULT_EVENTS_IN_BATCH));
} }
if (indexer.processBlockAfterEvents) { // Perform any operations after processing events for this block
if (!dbBlock.isComplete) { if (indexer.postEventsBlockProcessing && !dbBlock.isComplete) {
await indexer.processBlockAfterEvents(dbBlock.blockHash, dbBlock.blockNumber, data); await indexer.postEventsBlockProcessing(dbBlock.blockHash, dbBlock.blockNumber, data);
}
} }
dbBlock.isComplete = true; dbBlock.isComplete = true;
@ -361,6 +367,13 @@ const _processEventsInSubgraphOrder = async (
} }
} }
if (isNewContractWatched) {
// Create init states for newly watched contracts
// (needs to be done before we start processsing their events)
assert(indexer.createInit);
await indexer.createInit(block.blockHash, block.blockNumber);
}
console.time('time:common#processEventsInSubgraphOrder-processing_initially_unwatched_events'); console.time('time:common#processEventsInSubgraphOrder-processing_initially_unwatched_events');
// In the end process events of newly watched contracts // In the end process events of newly watched contracts
for (const updatedDbEvent of updatedDbEvents) { for (const updatedDbEvent of updatedDbEvents) {

View File

@ -46,9 +46,6 @@ export const indexBlock = async (
blockProgress = partialblockProgress as BlockProgressInterface; blockProgress = partialblockProgress as BlockProgressInterface;
} }
assert(indexer.processBlock);
await indexer.processBlock(blockProgress);
await processBatchEvents( await processBatchEvents(
indexer, indexer,
{ {

View File

@ -464,7 +464,6 @@ export class Indexer {
blockProgress, blockProgress,
ethFullBlock: fullBlock, ethFullBlock: fullBlock,
ethFullTransactions: blockEthFullTxs, ethFullTransactions: blockEthFullTxs,
block,
events: [] events: []
}; };
}); });

View File

@ -605,10 +605,6 @@ export class JobRunner {
}); });
} }
if (!blockProgress.isComplete) {
await this._indexer.processBlock(blockProgress);
}
// Push job to event processing queue. // Push job to event processing queue.
// Block with all events processed or no events will not be processed again due to check in _processEvents. // Block with all events processed or no events will not be processed again due to check in _processEvents.
const eventsProcessingJob: EventsJobData = { const eventsProcessingJob: EventsJobData = {

View File

@ -211,8 +211,8 @@ export interface IndexerInterface {
getRelationsMap?: () => Map<any, { [key: string]: any }> getRelationsMap?: () => Map<any, { [key: string]: any }>
processInitialState: (contractAddress: string, blockHash: string) => Promise<any> processInitialState: (contractAddress: string, blockHash: string) => Promise<any>
processStateCheckpoint: (contractAddress: string, blockHash: string) => Promise<boolean> processStateCheckpoint: (contractAddress: string, blockHash: string) => Promise<boolean>
processBlock: (blockProgres: BlockProgressInterface) => Promise<void> preEventsBlockProcessing: (blockProgres: BlockProgressInterface) => Promise<void>
processBlockAfterEvents?: (blockHash: string, blockNumber: number, data: ExtraEventData) => Promise<void> postEventsBlockProcessing?: (blockHash: string, blockNumber: number, data: ExtraEventData) => Promise<void>
processCanonicalBlock (blockHash: string, blockNumber: number): Promise<void> processCanonicalBlock (blockHash: string, blockNumber: number): Promise<void>
processCheckpoint (blockHash: string): Promise<void> processCheckpoint (blockHash: string): Promise<void>
processCLICheckpoint (contractAddress: string, blockHash?: string): Promise<string | undefined> processCLICheckpoint (contractAddress: string, blockHash?: string): Promise<string | undefined>