Handle subgraph template create when filterLogsByAddresses set to true (#447)

* Handle subgraph template create when filterLogsByAddresses set to true

* Fix historical processing stopping after running for multiple batches

* Add new method in graph-node test dummy indexer
Nabarun Gogoi 2023-11-03 14:08:55 +05:30 committed by GitHub
parent 7f37dac888
commit 0d7e3ddc8b
9 changed files with 168 additions and 41 deletions

View File

@@ -30,8 +30,8 @@
filterLogsByTopics = false
# Boolean to switch between modes of processing events when starting the server.
# Setting to true will fetch filtered events and required blocks in a range of blocks and then process them
# Setting to false will fetch blocks consecutively with its events and then process them (Behaviour is followed in realtime processing near head)
# Setting to true will fetch filtered events and required blocks in a range of blocks and then process them.
# Setting to false will fetch blocks consecutively with its events and then process them (Behaviour is followed in realtime processing near head).
useBlockRanges = true
# Max block range for which to return events in eventsInRange GQL query.
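
A minimal config sketch combining these flags (TOML layout and the [server] section name are assumed from the ServerConfig interface below; values illustrative):

    [server]
      # Only fetch logs for watched contract addresses; enables the
      # mid-block backfill path added in this commit
      filterLogsByAddresses = true
      filterLogsByTopics = false
      # Fetch filtered events for whole block ranges during historical sync
      useBlockRanges = true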

View File

@@ -119,6 +119,14 @@ export class Indexer implements IndexerInterface {
return [];
}
async fetchEventsForContracts (blockHash: string, blockNumber: number, addresses: string[]): Promise<DeepPartial<EventInterface>[]> {
assert(blockHash);
assert(blockNumber);
assert(addresses);
return [];
}
async saveBlockAndFetchEvents (block: BlockProgressInterface): Promise<[BlockProgressInterface, DeepPartial<EventInterface>[]]> {
return [block, []];
}

View File

@@ -281,12 +281,19 @@ export const _fetchBatchBlocks = async (
* @param block
* @param eventsInBatch
*/
export const processBatchEvents = async (indexer: IndexerInterface, block: BlockProgressInterface, eventsInBatch: number, subgraphEventsOrder: boolean): Promise<void> => {
let dbBlock: BlockProgressInterface, dbEvents: EventInterface[];
export const processBatchEvents = async (
indexer: IndexerInterface,
block: BlockProgressInterface,
eventsInBatch: number,
subgraphEventsOrder: boolean
): Promise<boolean> => {
let dbBlock: BlockProgressInterface, updatedDbEvents: EventInterface[];
let isNewContractWatched = false;
if (subgraphEventsOrder) {
({ dbBlock, dbEvents } = await _processEventsInSubgraphOrder(indexer, block, eventsInBatch || DEFAULT_EVENTS_IN_BATCH));
({ dbBlock, updatedDbEvents, isNewContractWatched } = await _processEventsInSubgraphOrder(indexer, block, eventsInBatch || DEFAULT_EVENTS_IN_BATCH));
} else {
({ dbBlock, dbEvents } = await _processEvents(indexer, block, eventsInBatch || DEFAULT_EVENTS_IN_BATCH));
({ dbBlock, updatedDbEvents } = await _processEvents(indexer, block, eventsInBatch || DEFAULT_EVENTS_IN_BATCH));
}
if (indexer.processBlockAfterEvents) {
@@ -300,13 +307,15 @@ export const processBatchEvents = async (indexer: IndexerInterface, block: Block
console.time('time:common#processBatchEvents-updateBlockProgress-saveEvents');
await Promise.all([
indexer.updateBlockProgress(dbBlock, dbBlock.lastProcessedEventIndex),
indexer.saveEvents(dbEvents)
indexer.saveEvents(updatedDbEvents)
]);
console.timeEnd('time:common#processBatchEvents-updateBlockProgress-saveEvents');
return isNewContractWatched;
};
const _processEvents = async (indexer: IndexerInterface, block: BlockProgressInterface, eventsInBatch: number): Promise<{ dbBlock: BlockProgressInterface, dbEvents: EventInterface[] }> => {
const dbEvents: EventInterface[] = [];
const _processEvents = async (indexer: IndexerInterface, block: BlockProgressInterface, eventsInBatch: number): Promise<{ dbBlock: BlockProgressInterface, updatedDbEvents: EventInterface[] }> => {
const updatedDbEvents: EventInterface[] = [];
let page = 0;
let numFetchedEvents = 0;
@@ -344,7 +353,7 @@ const _processEvents = async (indexer: IndexerInterface, block: BlockProgressInt
if (event.eventName === UNKNOWN_EVENT_NAME) {
// Parse the unknown event and save updated event to the db
event = _parseUnknownEvent(indexer, event, watchedContract.kind);
dbEvents.push(event);
updatedDbEvents.push(event);
}
await indexer.processEvent(event);
@@ -357,15 +366,18 @@ const _processEvents = async (indexer: IndexerInterface, block: BlockProgressInt
console.timeEnd('time:common#processEvents-processing_events_batch');
}
return { dbBlock: block, dbEvents };
// TODO: Fetch and reprocess events if filterLogsByAddresses is true and new contracts are found
return { dbBlock: block, updatedDbEvents: updatedDbEvents };
};
const _processEventsInSubgraphOrder = async (indexer: IndexerInterface, block: BlockProgressInterface, eventsInBatch: number): Promise<{ dbBlock: BlockProgressInterface, dbEvents: EventInterface[] }> => {
const _processEventsInSubgraphOrder = async (indexer: IndexerInterface, block: BlockProgressInterface, eventsInBatch: number): Promise<{ dbBlock: BlockProgressInterface, updatedDbEvents: EventInterface[], isNewContractWatched: boolean }> => {
// Create list of initially watched contracts
const initiallyWatchedContracts: string[] = indexer.getWatchedContracts().map(contract => contract.address);
const unwatchedContractEvents: EventInterface[] = [];
let isNewContractWatched = false;
const dbEvents: EventInterface[] = [];
const updatedDbEvents: EventInterface[] = [];
let page = 0;
let numFetchedEvents = 0;
@@ -408,9 +420,26 @@ const _processEventsInSubgraphOrder = async (indexer: IndexerInterface, block: B
console.timeEnd('time:common#processEventsInSubgraphOrder-processing_events_batch');
}
console.time('time:common#processEventsInSubgraphOrder-processing_unwatched_events');
const watchedContracts = indexer.getWatchedContracts().map(contract => contract.address);
// At last, process all the events of newly watched contracts
// Check if there are new watched contracts
if (watchedContracts.length > initiallyWatchedContracts.length) {
isNewContractWatched = true;
// Check if filterLogsByAddresses is set to true
if (indexer.serverConfig.filterLogsByAddresses) {
// Fetch and parse events for newly watched contracts
const newContracts = watchedContracts.filter(contract => !initiallyWatchedContracts.includes(contract));
const events = await indexer.fetchEventsForContracts(block.blockHash, block.blockNumber, newContracts);
events.forEach(event => {
event.block = block;
updatedDbEvents.push(event as EventInterface);
});
}
}
// Parse events of initially unwatched contracts
for (let event of unwatchedContractEvents) {
const watchedContract = indexer.isWatchedContract(event.contract);
@@ -420,19 +449,22 @@ const _processEventsInSubgraphOrder = async (indexer: IndexerInterface, block: B
if (event.eventName === UNKNOWN_EVENT_NAME) {
// Parse the unknown event and save updated event to the db
event = _parseUnknownEvent(indexer, event, watchedContract.kind);
dbEvents.push(event);
updatedDbEvents.push(event);
}
await indexer.processEvent(event);
}
block.lastProcessedEventIndex = Math.max(block.lastProcessedEventIndex + 1, event.index);
block.numProcessedEvents++;
}
console.timeEnd('time:common#processEventsInSubgraphOrder-processing_unwatched_events');
console.time('time:common#processEventsInSubgraphOrder-processing_initially_unwatched_events');
// In the end process events of newly watched contracts
for (const updatedDbEvent of updatedDbEvents) {
await indexer.processEvent(updatedDbEvent);
return { dbBlock: block, dbEvents };
block.lastProcessedEventIndex = Math.max(block.lastProcessedEventIndex + 1, updatedDbEvent.index);
block.numProcessedEvents++;
}
console.timeEnd('time:common#processEventsInSubgraphOrder-processing_initially_unwatched_events');
return { dbBlock: block, updatedDbEvents: updatedDbEvents, isNewContractWatched };
};
const _getEventsBatch = async (indexer: IndexerInterface, blockHash: string, eventsInBatch: number, page: number): Promise<EventInterface[]> => {
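
In plain terms: with filterLogsByAddresses enabled, eth_getLogs is filtered to the addresses watched at fetch time, so a contract created from a subgraph template mid-block contributes no events to the current batch. The code above detects the newly watched contracts and backfills their events before finishing the block. A condensed sketch of that backfill (the helper and the factory/pool fixtures are hypothetical; types as declared in this repo):

    // Hypothetical helper: a factory handler creates a pool contract via a
    // subgraph template while this block's events are being processed
    async function backfillNewContractEvents (
      indexer: IndexerInterface,
      block: BlockProgressInterface,
      initiallyWatched: string[] // addresses watched when the batch was fetched, e.g. [factory]
    ): Promise<EventInterface[]> {
      const nowWatched = indexer.getWatchedContracts().map(contract => contract.address); // [factory, pool]
      const newContracts = nowWatched.filter(address => !initiallyWatched.includes(address));
      if (!newContracts.length || !indexer.serverConfig.filterLogsByAddresses) {
        return [];
      }
      // The pool's logs were excluded by the eth_getLogs address filter,
      // so fetch them for this block now and process them after all other events
      const events = await indexer.fetchEventsForContracts(block.blockHash, block.blockNumber, newContracts);
      return events as EventInterface[];
    }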

View File

@@ -206,8 +206,8 @@ export interface ServerConfig {
clearEntitiesCacheInterval: number;
// Boolean to switch between modes of processing events when starting the server.
// Setting to true will fetch filtered events and required blocks in a range of blocks and then process them
// Setting to false will fetch blocks consecutively with its events and then process them (Behaviour is followed in realtime processing near head)
// Setting to true will fetch filtered events and required blocks in a range of blocks and then process them.
// Setting to false will fetch blocks consecutively with its events and then process them (Behaviour is followed in realtime processing near head).
useBlockRanges: boolean;
// Boolean to skip updating entity fields required in state creation and not required in the frontend.

View File

@@ -18,6 +18,9 @@ import { ServerConfig } from './config';
const EVENT = 'event';
// TODO: Make configurable
const HISTORICAL_MAX_FETCH_AHEAD = 20_000;
const log = debug('vulcanize:events');
export const BlockProgressEvent = 'block-progress-event';
@@ -94,7 +97,13 @@ export class EventWatcher {
// Check if filter for logs is enabled
// Check if starting block for watcher is before latest canonical block
if (this._serverConfig.useBlockRanges && startBlockNumber < latestCanonicalBlockNumber) {
await this.startHistoricalBlockProcessing(startBlockNumber, latestCanonicalBlockNumber);
let endBlockNumber = latestCanonicalBlockNumber;
if (HISTORICAL_MAX_FETCH_AHEAD > 0) {
endBlockNumber = Math.min(startBlockNumber + HISTORICAL_MAX_FETCH_AHEAD, endBlockNumber);
}
await this.startHistoricalBlockProcessing(startBlockNumber, endBlockNumber);
return;
}
@@ -103,6 +112,8 @@
}
async startHistoricalBlockProcessing (startBlockNumber: number, endBlockNumber: number): Promise<void> {
// TODO: Wait for events job queue to be empty so that historical processing does not move far ahead
this._historicalProcessingEndBlockNumber = endBlockNumber;
log(`Starting historical block processing up to block ${this._historicalProcessingEndBlockNumber}`);
@@ -184,15 +195,15 @@ export class EventWatcher {
async historicalProcessingCompleteHandler (job: PgBoss.Job<any>): Promise<void> {
const { id, data: { failed, request: { data } } } = job;
const { blockNumber, processingEndBlockNumber }: HistoricalJobData = data;
const { blockNumber, isComplete }: HistoricalJobData = data;
if (failed) {
if (failed || isComplete) {
log(`Job ${id} for queue ${QUEUE_HISTORICAL_PROCESSING} failed or was already complete`);
return;
}
// TODO: Get batch size from config
const batchEndBlockNumber = Math.min(blockNumber + HISTORICAL_BLOCKS_BATCH_SIZE, processingEndBlockNumber);
const batchEndBlockNumber = Math.min(blockNumber + HISTORICAL_BLOCKS_BATCH_SIZE, this._historicalProcessingEndBlockNumber);
const nextBatchStartBlockNumber = batchEndBlockNumber + 1;
log(`Historical block processing completed for block range: ${blockNumber} to ${batchEndBlockNumber}`);
@@ -201,12 +212,15 @@
const [block] = await this._indexer.getBlocks({ blockNumber: this._historicalProcessingEndBlockNumber });
const historicalProcessingEndBlockHash = block ? block.blockHash : constants.AddressZero;
// Update sync status to max of latest processed block or latest canonical block
const syncStatus = await this._indexer.forceUpdateSyncStatus(historicalProcessingEndBlockHash, this._historicalProcessingEndBlockNumber);
// Update sync status chain head and canonical block to end block of historical processing
const [syncStatus] = await Promise.all([
this._indexer.updateSyncStatusCanonicalBlock(historicalProcessingEndBlockHash, this._historicalProcessingEndBlockNumber, true),
this._indexer.updateSyncStatusChainHead(historicalProcessingEndBlockHash, this._historicalProcessingEndBlockNumber, true)
]);
log(`Sync status canonical block updated to ${syncStatus.latestCanonicalBlockNumber}`);
// Start realtime processing
this.startBlockProcessing();
return;
}
@@ -215,7 +229,7 @@
QUEUE_HISTORICAL_PROCESSING,
{
blockNumber: nextBatchStartBlockNumber,
processingEndBlockNumber
processingEndBlockNumber: this._historicalProcessingEndBlockNumber
}
);
}
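
The batch chaining in the completion handler above is plain windowing arithmetic; a worked sketch with the new defaults (block numbers invented):

    // HISTORICAL_BLOCKS_BATCH_SIZE = 2000, historical end block = 15000
    const blockNumber = 10000; // start block of the batch that just completed
    const batchEndBlockNumber = Math.min(blockNumber + 2000, 15000); // 12000
    const nextBatchStartBlockNumber = batchEndBlockNumber + 1; // 12001
    // Another QUEUE_HISTORICAL_PROCESSING job is pushed with this start block;
    // chaining stops once batchEndBlockNumber reaches the historical end block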

View File

@@ -465,7 +465,19 @@ export class Indexer {
// Fetch events (to be saved to db) for a particular block
async fetchEvents (blockHash: string, blockNumber: number, eventSignaturesMap: Map<string, string[]>, parseEventNameAndArgs: (kind: string, logObj: any) => any): Promise<DeepPartial<EventInterface>[]> {
const { addresses, topics } = this._createLogsFilters(eventSignaturesMap);
const { logs, transactions } = await this._fetchLogsAndTransactions(blockHash, blockNumber, addresses, topics);
return this.createDbEventsFromLogsAndTxs(blockHash, logs, transactions, parseEventNameAndArgs);
}
async fetchEventsForContracts (blockHash: string, blockNumber: number, addresses: string[], eventSignaturesMap: Map<string, string[]>, parseEventNameAndArgs: (kind: string, logObj: any) => any): Promise<DeepPartial<EventInterface>[]> {
const { topics } = this._createLogsFilters(eventSignaturesMap);
const { logs, transactions } = await this._fetchLogsAndTransactions(blockHash, blockNumber, addresses, topics);
return this.createDbEventsFromLogsAndTxs(blockHash, logs, transactions, parseEventNameAndArgs);
}
async _fetchLogsAndTransactions (blockHash: string, blockNumber: number, addresses?: string[], topics?: string[][]): Promise<{ logs: any[]; transactions: any[] }> {
const logsPromise = await this._ethClient.getLogs({
blockHash,
blockNumber: blockNumber.toString(),
@@ -490,7 +502,7 @@ export class Indexer {
}
] = await Promise.all([logsPromise, transactionsPromise]);
return this.createDbEventsFromLogsAndTxs(blockHash, logs, transactions, parseEventNameAndArgs);
return { logs, transactions };
}
// Create events to be saved to db for a block given blockHash, logs, transactions and a parser function
@@ -682,7 +694,7 @@ export class Indexer {
return res;
}
async saveEvents (dbEvents: EventInterface[]): Promise<void> {
async saveEvents (dbEvents: DeepPartial<EventInterface>[]): Promise<void> {
const dbTx = await this._db.createTransactionRunner();
try {
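
The refactor leaves two entry points over the shared _fetchLogsAndTransactions helper; a hedged sketch of the differing call sites (newAddresses is a hypothetical list of newly watched contract addresses):

    // Full-block fetch: the log filter addresses come from the server config
    const blockEvents = await indexer.fetchEvents(blockHash, blockNumber, eventSignaturesMap, parseEventNameAndArgs);
    // Targeted backfill: same topics filter, but logs are restricted to the
    // explicitly passed addresses
    const backfilledEvents = await indexer.fetchEventsForContracts(blockHash, blockNumber, newAddresses, eventSignaturesMap, parseEventNameAndArgs);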

View File

@@ -128,8 +128,8 @@ export class JobQueue {
);
}
async markComplete (job: PgBoss.Job): Promise<void> {
this._boss.complete(job.id);
async markComplete (job: PgBoss.Job, data: object = {}): Promise<void> {
this._boss.complete(job.id, { ...job.data, ...data });
}
async pushJob (queue: string, job: any, options: PgBoss.PublishOptions = {}): Promise<void> {
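
The extra argument rides along as pg-boss completion data (per pg-boss's complete(id, data)); a hedged usage sketch:

    // Tag the completed job so the completion handler can tell that this
    // batch range is finished and should not chain another batch
    await jobQueue.markComplete(job, { isComplete: true });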

View File

@@ -41,11 +41,12 @@ const log = debug('vulcanize:job-runner');
const EVENTS_PROCESSING_RETRY_WAIT = 2000;
// TODO: Get batch size from config
export const HISTORICAL_BLOCKS_BATCH_SIZE = 100;
export const HISTORICAL_BLOCKS_BATCH_SIZE = 2000;
export interface HistoricalJobData {
blockNumber: number;
processingEndBlockNumber: number;
isComplete?: boolean;
}
export class JobRunner {
@@ -54,8 +55,11 @@ export class JobRunner {
_jobQueueConfig: JobQueueConfig;
_blockProcessStartTime?: Date;
_endBlockProcessTimer?: () => void;
_historicalProcessingCompletedUpto?: number;
// TODO: Check and remove events (always set to empty list as fetched from DB) from map structure
_blockAndEventsMap: Map<string, PrefetchedBlock> = new Map();
_lastHistoricalProcessingEndBlockNumber = 0;
_shutDown = false;
_signalCount = 0;
@@ -152,6 +156,30 @@ export class JobRunner {
async processHistoricalBlocks (job: PgBoss.JobWithDoneCallback<HistoricalJobData, HistoricalJobData>): Promise<void> {
const { data: { blockNumber: startBlock, processingEndBlockNumber } } = job;
if (this._historicalProcessingCompletedUpto) {
if (startBlock < this._historicalProcessingCompletedUpto) {
await this.jobQueue.deleteAllJobs();
// Remove all watcher blocks and events data if startBlock is less than this._historicalProcessingCompletedUpto
// This occurs when a new contract is added (with filterLogsByAddresses set to true) and historical processing is restarted from a previous block
log(`Restarting historical processing from block ${startBlock}`);
await this._indexer.resetWatcherToBlock(startBlock - 1);
} else {
// Check that startBlock is one greater than previous batch end block
if (startBlock - 1 !== this._historicalProcessingCompletedUpto) {
// TODO: Debug jobQueue deleteAllJobs not working
await this.jobQueue.markComplete(
job,
{ isComplete: true }
);
return;
}
}
}
this._lastHistoricalProcessingEndBlockNumber = processingEndBlockNumber;
const endBlock = Math.min(startBlock + HISTORICAL_BLOCKS_BATCH_SIZE, processingEndBlockNumber);
log(`Processing historical blocks from ${startBlock} to ${endBlock}`);
@@ -175,7 +203,12 @@
));
await Promise.all(pushJobForBlockPromises);
await this.jobQueue.markComplete(job);
this._historicalProcessingCompletedUpto = endBlock;
await this.jobQueue.markComplete(
job,
{ isComplete: true }
);
}
async processEvent (job: any): Promise<EventInterface | void> {
@@ -521,7 +554,12 @@ export class JobRunner {
const { block } = prefetchedBlock;
console.time('time:job-runner#_processEvents-events');
await processBatchEvents(this._indexer, block, this._jobQueueConfig.eventsInBatch, this._jobQueueConfig.subgraphEventsOrder);
const isNewContractWatched = await processBatchEvents(
this._indexer,
block,
this._jobQueueConfig.eventsInBatch,
this._jobQueueConfig.subgraphEventsOrder
);
console.timeEnd('time:job-runner#_processEvents-events');
// Update metrics
@@ -530,6 +568,28 @@
this._blockAndEventsMap.delete(block.blockHash);
// Check if a new contract was added and filterLogsByAddresses is set to true
if (isNewContractWatched && this._indexer.serverConfig.filterLogsByAddresses) {
// Delete jobs for any pending events and blocks processing
await this.jobQueue.deleteAllJobs();
// Check if historical processing is running and the current block being processed was triggered by historical processing
if (this._historicalProcessingCompletedUpto && this._historicalProcessingCompletedUpto > block.blockNumber) {
const nextBlockNumberToProcess = block.blockNumber + 1;
// Push a new job to restart historical block processing after the current block
log('New contract added in historical processing with filterLogsByAddresses set to true');
await this.jobQueue.pushJob(
QUEUE_HISTORICAL_PROCESSING,
{
blockNumber: nextBlockNumberToProcess,
processingEndBlockNumber: this._lastHistoricalProcessingEndBlockNumber
},
{ priority: 1 }
);
}
}
if (this._endBlockProcessTimer) {
this._endBlockProcessTimer();
}
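
Taken together, the guard in processHistoricalBlocks admits three cases for an incoming batch; a hypothetical trace (block numbers invented, batch size 2000):

    // _historicalProcessingCompletedUpto = 4000
    // startBlock = 3001 -> behind: delete queued jobs, resetWatcherToBlock(3000),
    //   reprocess from 3001 (a new contract was watched with filterLogsByAddresses = true)
    // startBlock = 4001 -> contiguous: process blocks 4001..min(6001, end block)
    // startBlock = 5001 -> gap: markComplete(job, { isComplete: true }) and skip the job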

View File

@@ -98,6 +98,7 @@ export interface IndexerInterface {
fetchEventsAndSaveBlocks (blocks: DeepPartial<BlockProgressInterface>[]): Promise<{ blockProgress: BlockProgressInterface, events: DeepPartial<EventInterface>[] }[]>
saveBlockAndFetchEvents (block: DeepPartial<BlockProgressInterface>): Promise<[BlockProgressInterface, DeepPartial<EventInterface>[]]>
fetchAndSaveFilteredEventsAndBlocks (startBlock: number, endBlock: number): Promise<{ blockProgress: BlockProgressInterface, events: DeepPartial<EventInterface>[] }[]>
fetchEventsForContracts (blockHash: string, blockNumber: number, addresses: string[]): Promise<DeepPartial<EventInterface>[]>
removeUnknownEvents (block: BlockProgressInterface): Promise<void>
updateBlockProgress (block: BlockProgressInterface, lastProcessedEventIndex: number): Promise<BlockProgressInterface>
updateSyncStatusChainHead (blockHash: string, blockNumber: number, force?: boolean): Promise<SyncStatusInterface>
@@ -108,7 +109,7 @@ export interface IndexerInterface {
updateStateSyncStatusCheckpointBlock (blockNumber: number, force?: boolean): Promise<StateSyncStatusInterface>
markBlocksAsPruned (blocks: BlockProgressInterface[]): Promise<void>
saveEventEntity (dbEvent: EventInterface): Promise<EventInterface>
saveEvents (dbEvents: EventInterface[]): Promise<void>
saveEvents (dbEvents: DeepPartial<EventInterface>[]): Promise<void>
processEvent (event: EventInterface): Promise<void>
parseEventNameAndArgs?: (kind: string, logObj: any) => any
isWatchedContract: (address: string) => ContractInterface | undefined;