Implement back filling blocks before latest indexed block

This commit is contained in:
nabarun 2022-11-15 13:17:54 +05:30
parent 9a4a7cf018
commit 4f0965eb34
12 changed files with 65 additions and 14 deletions

View File

@ -71,10 +71,10 @@ export const main = async (): Promise<any> => {
// Fill the snapshot block. // Fill the snapshot block.
await fillBlocks( await fillBlocks(
jobQueueConfig,
jobQueue, jobQueue,
indexer, indexer,
eventWatcher, eventWatcher,
jobQueueConfig.blockDelayInMilliSecs,
{ {
prefetch: true, prefetch: true,
startBlock: importData.snapshotBlock.blockNumber, startBlock: importData.snapshotBlock.blockNumber,

View File

@ -49,6 +49,11 @@ export const main = async (): Promise<any> => {
type: 'number', type: 'number',
default: 10, default: 10,
describe: 'Number of blocks prefetched in batch' describe: 'Number of blocks prefetched in batch'
},
backFill: {
type: 'boolean',
default: false,
describe: 'Fill blocks before latest indexed block'
} }
}).argv; }).argv;
@ -76,7 +81,7 @@ export const main = async (): Promise<any> => {
const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue); const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue);
await fillBlocks(jobQueue, indexer, eventWatcher, jobQueueConfig.blockDelayInMilliSecs, argv); await fillBlocks(jobQueueConfig, jobQueue, indexer, eventWatcher, argv);
}; };
main().catch(err => { main().catch(err => {

View File

@ -80,10 +80,10 @@ export const main = async (): Promise<any> => {
// Fill the snapshot block. // Fill the snapshot block.
await fillBlocks( await fillBlocks(
jobQueueConfig,
jobQueue, jobQueue,
indexer, indexer,
eventWatcher, eventWatcher,
jobQueueConfig.blockDelayInMilliSecs,
{ {
prefetch: true, prefetch: true,
startBlock: importData.snapshotBlock.blockNumber, startBlock: importData.snapshotBlock.blockNumber,

View File

@ -56,6 +56,11 @@ export const main = async (): Promise<any> => {
type: 'boolean', type: 'boolean',
default: false, default: false,
describe: 'Fill state for subgraph entities' describe: 'Fill state for subgraph entities'
},
backFill: {
type: 'boolean',
default: false,
describe: 'Fill blocks before latest indexed block'
} }
}).argv; }).argv;
@ -98,7 +103,7 @@ export const main = async (): Promise<any> => {
const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue); const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue);
await fillBlocks(jobQueue, indexer, eventWatcher, jobQueueConfig.blockDelayInMilliSecs, argv); await fillBlocks(jobQueueConfig, jobQueue, indexer, eventWatcher, argv);
}; };
main().catch(err => { main().catch(err => {

View File

@ -52,6 +52,11 @@ export const main = async (): Promise<any> => {
type: 'number', type: 'number',
default: 10, default: 10,
describe: 'Number of blocks prefetched in batch' describe: 'Number of blocks prefetched in batch'
},
backFill: {
type: 'boolean',
default: false,
describe: 'Fill blocks before latest indexed block'
} }
}).argv; }).argv;
@ -78,7 +83,7 @@ export const main = async (): Promise<any> => {
const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue); const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue);
await fillBlocks(jobQueue, indexer, eventWatcher, jobQueueConfig.blockDelayInMilliSecs, argv); await fillBlocks(jobQueueConfig, jobQueue, indexer, eventWatcher, argv);
}; };
main().catch(err => { main().catch(err => {

View File

@ -71,10 +71,10 @@ export const main = async (): Promise<any> => {
// Fill the snapshot block. // Fill the snapshot block.
await fillBlocks( await fillBlocks(
jobQueueConfig,
jobQueue, jobQueue,
indexer, indexer,
eventWatcher, eventWatcher,
jobQueueConfig.blockDelayInMilliSecs,
{ {
prefetch: true, prefetch: true,
startBlock: importData.snapshotBlock.blockNumber, startBlock: importData.snapshotBlock.blockNumber,

View File

@ -49,6 +49,11 @@ export const main = async (): Promise<any> => {
type: 'number', type: 'number',
default: 10, default: 10,
describe: 'Number of blocks prefetched in batch' describe: 'Number of blocks prefetched in batch'
},
backFill: {
type: 'boolean',
default: false,
describe: 'Fill blocks before latest indexed block'
} }
}).argv; }).argv;
@ -76,7 +81,7 @@ export const main = async (): Promise<any> => {
const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue); const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue);
await fillBlocks(jobQueue, indexer, eventWatcher, jobQueueConfig.blockDelayInMilliSecs, argv); await fillBlocks(jobQueueConfig, jobQueue, indexer, eventWatcher, argv);
}; };
main().catch(err => { main().catch(err => {

View File

@ -80,10 +80,10 @@ export const main = async (): Promise<any> => {
// Fill the snapshot block. // Fill the snapshot block.
await fillBlocks( await fillBlocks(
jobQueueConfig,
jobQueue, jobQueue,
indexer, indexer,
eventWatcher, eventWatcher,
jobQueueConfig.blockDelayInMilliSecs,
{ {
prefetch: true, prefetch: true,
startBlock: importData.snapshotBlock.blockNumber, startBlock: importData.snapshotBlock.blockNumber,

View File

@ -50,6 +50,11 @@ export const main = async (): Promise<any> => {
type: 'number', type: 'number',
default: 10, default: 10,
describe: 'Number of blocks prefetched in batch' describe: 'Number of blocks prefetched in batch'
},
backFill: {
type: 'boolean',
default: false,
describe: 'Fill blocks before latest indexed block'
} }
}).argv; }).argv;
@ -85,7 +90,7 @@ export const main = async (): Promise<any> => {
const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue); const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue);
await fillBlocks(jobQueue, indexer, eventWatcher, jobQueueConfig.blockDelayInMilliSecs, argv); await fillBlocks(jobQueueConfig, jobQueue, indexer, eventWatcher, argv);
}; };
main().catch(err => { main().catch(err => {

View File

@ -71,10 +71,10 @@ export const main = async (): Promise<any> => {
// Fill the snapshot block. // Fill the snapshot block.
await fillBlocks( await fillBlocks(
jobQueueConfig,
jobQueue, jobQueue,
indexer, indexer,
eventWatcher, eventWatcher,
jobQueueConfig.blockDelayInMilliSecs,
{ {
prefetch: true, prefetch: true,
startBlock: importData.snapshotBlock.blockNumber, startBlock: importData.snapshotBlock.blockNumber,

View File

@ -49,6 +49,11 @@ export const main = async (): Promise<any> => {
type: 'number', type: 'number',
default: 10, default: 10,
describe: 'Number of blocks prefetched in batch' describe: 'Number of blocks prefetched in batch'
},
backFill: {
type: 'boolean',
default: false,
describe: 'Fill blocks before latest indexed block'
} }
}).argv; }).argv;
@ -76,7 +81,7 @@ export const main = async (): Promise<any> => {
const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue); const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue);
await fillBlocks(jobQueue, indexer, eventWatcher, jobQueueConfig.blockDelayInMilliSecs, argv); await fillBlocks(jobQueueConfig, jobQueue, indexer, eventWatcher, argv);
}; };
main().catch(err => { main().catch(err => {

View File

@ -8,24 +8,27 @@ import { JobQueue } from './job-queue';
import { EventWatcherInterface, IndexerInterface } from './types'; import { EventWatcherInterface, IndexerInterface } from './types';
import { wait } from './misc'; import { wait } from './misc';
import { processBlockByNumberWithCache } from './common'; import { processBlockByNumberWithCache } from './common';
import { indexBlock } from './index-block';
import { JobQueueConfig } from './config';
const log = debug('vulcanize:fill'); const log = debug('vulcanize:fill');
const DEFAULT_PREFETCH_BATCH_SIZE = 10; const DEFAULT_PREFETCH_BATCH_SIZE = 10;
export const fillBlocks = async ( export const fillBlocks = async (
jobQueConfig: JobQueueConfig,
jobQueue: JobQueue, jobQueue: JobQueue,
indexer: IndexerInterface, indexer: IndexerInterface,
eventWatcher: EventWatcherInterface, eventWatcher: EventWatcherInterface,
blockDelayInMilliSecs: number,
argv: { argv: {
startBlock: number, startBlock: number,
endBlock: number, endBlock: number,
prefetch?: boolean, prefetch?: boolean,
batchBlocks?: number, batchBlocks?: number,
backFill?: boolean
} }
): Promise<any> => { ): Promise<any> => {
let { startBlock, endBlock, prefetch = false, batchBlocks = DEFAULT_PREFETCH_BATCH_SIZE } = argv; let { startBlock, endBlock, prefetch = false, batchBlocks = DEFAULT_PREFETCH_BATCH_SIZE, backFill = false } = argv;
if (startBlock > endBlock) { if (startBlock > endBlock) {
throw new Error(`endBlock ${endBlock} should be greater than or equal to startBlock ${startBlock}`); throw new Error(`endBlock ${endBlock} should be greater than or equal to startBlock ${startBlock}`);
@ -38,7 +41,12 @@ export const fillBlocks = async (
throw new Error(`startBlock should be greater than chain head ${syncStatus.chainHeadBlockNumber}`); throw new Error(`startBlock should be greater than chain head ${syncStatus.chainHeadBlockNumber}`);
} }
await prefetchBlocks(indexer, blockDelayInMilliSecs, { startBlock, endBlock, batchBlocks }); await prefetchBlocks(indexer, jobQueConfig.blockDelayInMilliSecs, { startBlock, endBlock, batchBlocks });
return;
}
if (backFill) {
await backFillBlocks(indexer, jobQueConfig.eventsInBatch, { startBlock, endBlock });
return; return;
} }
@ -143,3 +151,16 @@ const prefetchBlocks = async (
} }
} }
}; };
const backFillBlocks = async (
indexer: IndexerInterface,
eventsInBatch: number,
{ startBlock, endBlock }: {
startBlock: number,
endBlock: number
}
) => {
for (let i = startBlock; i <= endBlock; i++) {
await indexBlock(indexer, eventsInBatch, { block: i });
}
};