Process template events at last when following subgraph events order (#439)

* Process template events at last when following subgraph events order

* Update mock test indexer
This commit is contained in:
prathamesh0 2023-10-26 12:05:29 +05:30 committed by GitHub
parent b63a93d8a0
commit 445d5a9293
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 156 additions and 18 deletions

View File

@ -95,7 +95,7 @@ export class IndexBlockCmd {
assert(indexer); assert(indexer);
assert(database); assert(database);
await indexBlock(indexer, config.jobQueue.eventsInBatch, this._argv); await indexBlock(indexer, config.jobQueue.eventsInBatch, config.jobQueue.subgraphEventsOrder, this._argv);
await database.close(); await database.close();
} }

View File

@ -564,6 +564,10 @@ export class Indexer implements IndexerInterface {
return this._baseIndexer.saveEventEntity(dbEvent); return this._baseIndexer.saveEventEntity(dbEvent);
} }
async saveEvents (dbEvents: Event[]): Promise<void> {
return this._baseIndexer.saveEvents(dbEvents);
}
async getEventsByFilter (blockHash: string, contract?: string, name?: string): Promise<Array<Event>> { async getEventsByFilter (blockHash: string, contract?: string, name?: string): Promise<Array<Event>> {
return this._baseIndexer.getEventsByFilter(blockHash, contract, name); return this._baseIndexer.getEventsByFilter(blockHash, contract, name);
} }
@ -572,6 +576,10 @@ export class Indexer implements IndexerInterface {
return this._baseIndexer.isWatchedContract(address); return this._baseIndexer.isWatchedContract(address);
} }
getWatchedContracts (): Contract[] {
return this._baseIndexer.getWatchedContracts();
}
getContractsByKind (kind: string): Contract[] { getContractsByKind (kind: string): Contract[] {
return this._baseIndexer.getContractsByKind(kind); return this._baseIndexer.getContractsByKind(kind);
} }

View File

@ -177,6 +177,10 @@ export class Indexer implements IndexerInterface {
return dbEvent; return dbEvent;
} }
async saveEvents (dbEvents: EventInterface[]): Promise<void> {
assert(dbEvents);
}
async processEvent (event: EventInterface): Promise<void> { async processEvent (event: EventInterface): Promise<void> {
assert(event); assert(event);
} }
@ -201,6 +205,10 @@ export class Indexer implements IndexerInterface {
return undefined; return undefined;
} }
getWatchedContracts (): ContractInterface[] {
return [];
}
async watchContract (address: string, kind: string, checkpoint: boolean, startingBlock: number): Promise<void> { async watchContract (address: string, kind: string, checkpoint: boolean, startingBlock: number): Promise<void> {
return undefined; return undefined;
} }

View File

@ -255,12 +255,37 @@ export const _fetchBatchBlocks = async (
* @param block * @param block
* @param eventsInBatch * @param eventsInBatch
*/ */
export const processBatchEvents = async (indexer: IndexerInterface, block: BlockProgressInterface, eventsInBatch: number): Promise<void> => { export const processBatchEvents = async (indexer: IndexerInterface, block: BlockProgressInterface, eventsInBatch: number, subgraphEventsOrder: boolean): Promise<void> => {
let dbBlock: BlockProgressInterface, dbEvents: EventInterface[];
if (subgraphEventsOrder) {
({ dbBlock, dbEvents } = await processEventsInSubgraphOrder(indexer, block, eventsInBatch));
} else {
({ dbBlock, dbEvents } = await processEvents(indexer, block, eventsInBatch));
}
if (indexer.processBlockAfterEvents) {
if (!dbBlock.isComplete) {
await indexer.processBlockAfterEvents(block.blockHash, block.blockNumber);
}
}
dbBlock.isComplete = true;
console.time('time:common#processBatchEvents-updateBlockProgress-saveEvents');
await Promise.all([
indexer.updateBlockProgress(dbBlock, dbBlock.lastProcessedEventIndex),
indexer.saveEvents(dbEvents)
]);
console.timeEnd('time:common#processBatchEvents-updateBlockProgress-saveEvents');
};
export const processEvents = async (indexer: IndexerInterface, block: BlockProgressInterface, eventsInBatch: number): Promise<{ dbBlock: BlockProgressInterface, dbEvents: EventInterface[] }> => {
const dbEvents: EventInterface[] = [];
let page = 0; let page = 0;
// Check if block processing is complete. // Check if block processing is complete.
while (block.numProcessedEvents < block.numEvents) { while (block.numProcessedEvents < block.numEvents) {
console.time('time:common#processBacthEvents-fetching_events_batch'); console.time('time:common#processEvents-fetching_events_batch');
// Fetch events in batches // Fetch events in batches
const events = await indexer.getBlockEvents( const events = await indexer.getBlockEvents(
@ -274,16 +299,16 @@ export const processBatchEvents = async (indexer: IndexerInterface, block: Block
} }
); );
console.timeEnd('time:common#processBacthEvents-fetching_events_batch'); console.timeEnd('time:common#processEvents-fetching_events_batch');
if (events.length) { if (events.length) {
log(`Processing events batch from index ${events[0].index} to ${events[0].index + events.length - 1}`); log(`Processing events batch from index ${events[0].index} to ${events[0].index + events.length - 1}`);
} }
console.time('time:common#processBatchEvents-processing_events_batch'); console.time('time:common#processEvents-processing_events_batch');
// Process events in loop // Process events in loop
for (let event of events) { for (const event of events) {
// Skipping check for order of events processing since logIndex in FEVM is not index of log in block // Skipping check for order of events processing since logIndex in FEVM is not index of log in block
// Check was introduced to avoid reprocessing block events incase of restarts. But currently on restarts, unprocessed block is removed and reprocessed from first event log // Check was introduced to avoid reprocessing block events incase of restarts. But currently on restarts, unprocessed block is removed and reprocessed from first event log
// if (event.index <= block.lastProcessedEventIndex) { // if (event.index <= block.lastProcessedEventIndex) {
@ -308,28 +333,122 @@ export const processBatchEvents = async (indexer: IndexerInterface, block: Block
...logObj, ...logObj,
eventSignature eventSignature
}); });
event = await indexer.saveEventEntity(event);
// Save updated event to the db
dbEvents.push(event);
} }
await indexer.processEvent(event); await indexer.processEvent(event);
} }
block = await indexer.updateBlockProgress(block, event.index); block.lastProcessedEventIndex = event.index;
block.numProcessedEvents++;
} }
console.timeEnd('time:common#processBatchEvents-processing_events_batch'); console.timeEnd('time:common#processEvents-processing_events_batch');
} }
if (indexer.processBlockAfterEvents) { return { dbBlock: block, dbEvents };
if (!block.isComplete) { };
await indexer.processBlockAfterEvents(block.blockHash, block.blockNumber);
export const processEventsInSubgraphOrder = async (indexer: IndexerInterface, block: BlockProgressInterface, eventsInBatch: number): Promise<{ dbBlock: BlockProgressInterface, dbEvents: EventInterface[] }> => {
// Create list of initially watched contracts
const initiallyWatchedContracts: string[] = indexer.getWatchedContracts().map(contract => contract.address);
const unwatchedContractEvents: EventInterface[] = [];
const dbEvents: EventInterface[] = [];
let page = 0;
// Check if we are out of events.
let numFetchedEvents = 0;
while (numFetchedEvents < block.numEvents) {
console.time('time:common#processEventsInSubgraphOrder-fetching_events_batch');
// Fetch events in batches
const events = await indexer.getBlockEvents(
block.blockHash,
{},
{
skip: (page++) * (eventsInBatch || DEFAULT_EVENTS_IN_BATCH),
limit: eventsInBatch || DEFAULT_EVENTS_IN_BATCH,
orderBy: 'index',
orderDirection: OrderDirection.asc
}
);
numFetchedEvents += events.length;
console.timeEnd('time:common#processEventsInSubgraphOrder-fetching_events_batch');
if (events.length) {
log(`Processing events batch from index ${events[0].index} to ${events[0].index + events.length - 1}`);
} }
console.time('time:common#processEventsInSubgraphOrder-processing_events_batch');
// First process events for initially watched contracts
const watchedContractEvents: EventInterface[] = [];
events.forEach(event => {
if (initiallyWatchedContracts.includes(event.contract)) {
watchedContractEvents.push(event);
} else {
unwatchedContractEvents.push(event);
}
});
// Process known events in a loop
for (const event of watchedContractEvents) {
// Skipping check for order of events processing since logIndex in FEVM is not index of log in block
// Check was introduced to avoid reprocessing block events incase of restarts. But currently on restarts, unprocessed block is removed and reprocessed from first event log
// if (event.index <= block.lastProcessedEventIndex) {
// throw new Error(`Events received out of order for block number ${block.blockNumber} hash ${block.blockHash}, got event index ${eventIndex} and lastProcessedEventIndex ${block.lastProcessedEventIndex}, aborting`);
// }
await indexer.processEvent(event);
block.lastProcessedEventIndex = event.index;
block.numProcessedEvents++;
}
console.timeEnd('time:common#processEventsInSubgraphOrder-processing_events_batch');
} }
block.isComplete = true; console.time('time:common#processEventsInSubgraphOrder-processing_unwatched_events');
console.time('time:common#processBatchEvents-updateBlockProgress');
await indexer.updateBlockProgress(block, block.lastProcessedEventIndex); // At last, process all the events of newly watched contracts
console.timeEnd('time:common#processBatchEvents-updateBlockProgress'); for (const event of unwatchedContractEvents) {
const watchedContract = indexer.isWatchedContract(event.contract);
if (watchedContract) {
// We might not have parsed this event yet. This can happen if the contract was added
// as a result of a previous event in the same block.
if (event.eventName === UNKNOWN_EVENT_NAME) {
const logObj = JSON.parse(event.extraInfo);
assert(indexer.parseEventNameAndArgs);
assert(typeof watchedContract !== 'boolean');
const { eventName, eventInfo, eventSignature } = indexer.parseEventNameAndArgs(watchedContract.kind, logObj);
event.eventName = eventName;
event.eventInfo = JSONbigNative.stringify(eventInfo);
event.extraInfo = JSONbigNative.stringify({
...logObj,
eventSignature
});
// Save updated event to the db
dbEvents.push(event);
}
await indexer.processEvent(event);
}
block.lastProcessedEventIndex = Math.max(block.lastProcessedEventIndex + 1, event.index);
block.numProcessedEvents++;
}
console.timeEnd('time:common#processEventsInSubgraphOrder-processing_unwatched_events');
return { dbBlock: block, dbEvents };
}; };
/** /**

View File

@ -10,6 +10,7 @@ import { processBatchEvents } from './common';
export const indexBlock = async ( export const indexBlock = async (
indexer: IndexerInterface, indexer: IndexerInterface,
eventsInBatch: number, eventsInBatch: number,
subgraphEventsOrder: boolean,
argv: { argv: {
block: number, block: number,
} }
@ -44,6 +45,6 @@ export const indexBlock = async (
assert(indexer.processBlock); assert(indexer.processBlock);
await indexer.processBlock(blockProgress); await indexer.processBlock(blockProgress);
await processBatchEvents(indexer, blockProgress, eventsInBatch); await processBatchEvents(indexer, blockProgress, eventsInBatch, subgraphEventsOrder);
} }
}; };

View File

@ -461,7 +461,7 @@ export class JobRunner {
const { block } = prefetchedBlock; const { block } = prefetchedBlock;
console.time('time:job-runner#_processEvents-events'); console.time('time:job-runner#_processEvents-events');
await processBatchEvents(this._indexer, block, this._jobQueueConfig.eventsInBatch); await processBatchEvents(this._indexer, block, this._jobQueueConfig.eventsInBatch, this._jobQueueConfig.subgraphEventsOrder);
console.timeEnd('time:job-runner#_processEvents-events'); console.timeEnd('time:job-runner#_processEvents-events');
// Update metrics // Update metrics

View File

@ -106,9 +106,11 @@ export interface IndexerInterface {
updateStateSyncStatusCheckpointBlock (blockNumber: number, force?: boolean): Promise<StateSyncStatusInterface> updateStateSyncStatusCheckpointBlock (blockNumber: number, force?: boolean): Promise<StateSyncStatusInterface>
markBlocksAsPruned (blocks: BlockProgressInterface[]): Promise<void> markBlocksAsPruned (blocks: BlockProgressInterface[]): Promise<void>
saveEventEntity (dbEvent: EventInterface): Promise<EventInterface> saveEventEntity (dbEvent: EventInterface): Promise<EventInterface>
saveEvents (dbEvents: EventInterface[]): Promise<void>
processEvent (event: EventInterface): Promise<void> processEvent (event: EventInterface): Promise<void>
parseEventNameAndArgs?: (kind: string, logObj: any) => any parseEventNameAndArgs?: (kind: string, logObj: any) => any
isWatchedContract: (address: string) => ContractInterface | undefined; isWatchedContract: (address: string) => ContractInterface | undefined;
getWatchedContracts: () => ContractInterface[]
getContractsByKind?: (kind: string) => ContractInterface[] getContractsByKind?: (kind: string) => ContractInterface[]
addContracts?: () => Promise<void> addContracts?: () => Promise<void>
cacheContract: (contract: ContractInterface) => void; cacheContract: (contract: ContractInterface) => void;