diff --git a/packages/bayc-watcher/src/cli/import-state.ts b/packages/bayc-watcher/src/cli/import-state.ts index 729d3027..41d81a0a 100644 --- a/packages/bayc-watcher/src/cli/import-state.ts +++ b/packages/bayc-watcher/src/cli/import-state.ts @@ -71,10 +71,10 @@ export const main = async (): Promise => { // Fill the snapshot block. await fillBlocks( + jobQueueConfig, jobQueue, indexer, eventWatcher, - jobQueueConfig.blockDelayInMilliSecs, { prefetch: true, startBlock: importData.snapshotBlock.blockNumber, diff --git a/packages/bayc-watcher/src/fill.ts b/packages/bayc-watcher/src/fill.ts index f20037c7..8f46d79b 100644 --- a/packages/bayc-watcher/src/fill.ts +++ b/packages/bayc-watcher/src/fill.ts @@ -49,6 +49,11 @@ export const main = async (): Promise => { type: 'number', default: 10, describe: 'Number of blocks prefetched in batch' + }, + backFill: { + type: 'boolean', + default: false, + describe: 'Fill blocks before latest indexed block' } }).argv; @@ -76,7 +81,7 @@ export const main = async (): Promise => { const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue); - await fillBlocks(jobQueue, indexer, eventWatcher, jobQueueConfig.blockDelayInMilliSecs, argv); + await fillBlocks(jobQueueConfig, jobQueue, indexer, eventWatcher, argv); }; main().catch(err => { diff --git a/packages/eden-watcher/src/cli/import-state.ts b/packages/eden-watcher/src/cli/import-state.ts index b22f5fc5..935d2fb0 100644 --- a/packages/eden-watcher/src/cli/import-state.ts +++ b/packages/eden-watcher/src/cli/import-state.ts @@ -80,10 +80,10 @@ export const main = async (): Promise => { // Fill the snapshot block. await fillBlocks( + jobQueueConfig, jobQueue, indexer, eventWatcher, - jobQueueConfig.blockDelayInMilliSecs, { prefetch: true, startBlock: importData.snapshotBlock.blockNumber, diff --git a/packages/eden-watcher/src/fill.ts b/packages/eden-watcher/src/fill.ts index b6f93651..66f48a26 100644 --- a/packages/eden-watcher/src/fill.ts +++ b/packages/eden-watcher/src/fill.ts @@ -56,6 +56,11 @@ export const main = async (): Promise => { type: 'boolean', default: false, describe: 'Fill state for subgraph entities' + }, + backFill: { + type: 'boolean', + default: false, + describe: 'Fill blocks before latest indexed block' } }).argv; @@ -98,7 +103,7 @@ export const main = async (): Promise => { const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue); - await fillBlocks(jobQueue, indexer, eventWatcher, jobQueueConfig.blockDelayInMilliSecs, argv); + await fillBlocks(jobQueueConfig, jobQueue, indexer, eventWatcher, argv); }; main().catch(err => { diff --git a/packages/erc20-watcher/src/fill.ts b/packages/erc20-watcher/src/fill.ts index 03250034..5553b5a5 100644 --- a/packages/erc20-watcher/src/fill.ts +++ b/packages/erc20-watcher/src/fill.ts @@ -52,6 +52,11 @@ export const main = async (): Promise => { type: 'number', default: 10, describe: 'Number of blocks prefetched in batch' + }, + backFill: { + type: 'boolean', + default: false, + describe: 'Fill blocks before latest indexed block' } }).argv; @@ -78,7 +83,7 @@ export const main = async (): Promise => { const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue); - await fillBlocks(jobQueue, indexer, eventWatcher, jobQueueConfig.blockDelayInMilliSecs, argv); + await fillBlocks(jobQueueConfig, jobQueue, indexer, eventWatcher, argv); }; main().catch(err => { diff --git a/packages/erc721-watcher/src/cli/import-state.ts b/packages/erc721-watcher/src/cli/import-state.ts index 729d3027..41d81a0a 100644 --- a/packages/erc721-watcher/src/cli/import-state.ts +++ b/packages/erc721-watcher/src/cli/import-state.ts @@ -71,10 +71,10 @@ export const main = async (): Promise => { // Fill the snapshot block. await fillBlocks( + jobQueueConfig, jobQueue, indexer, eventWatcher, - jobQueueConfig.blockDelayInMilliSecs, { prefetch: true, startBlock: importData.snapshotBlock.blockNumber, diff --git a/packages/erc721-watcher/src/fill.ts b/packages/erc721-watcher/src/fill.ts index f20037c7..8f46d79b 100644 --- a/packages/erc721-watcher/src/fill.ts +++ b/packages/erc721-watcher/src/fill.ts @@ -49,6 +49,11 @@ export const main = async (): Promise => { type: 'number', default: 10, describe: 'Number of blocks prefetched in batch' + }, + backFill: { + type: 'boolean', + default: false, + describe: 'Fill blocks before latest indexed block' } }).argv; @@ -76,7 +81,7 @@ export const main = async (): Promise => { const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue); - await fillBlocks(jobQueue, indexer, eventWatcher, jobQueueConfig.blockDelayInMilliSecs, argv); + await fillBlocks(jobQueueConfig, jobQueue, indexer, eventWatcher, argv); }; main().catch(err => { diff --git a/packages/graph-test-watcher/src/cli/import-state.ts b/packages/graph-test-watcher/src/cli/import-state.ts index b22f5fc5..935d2fb0 100644 --- a/packages/graph-test-watcher/src/cli/import-state.ts +++ b/packages/graph-test-watcher/src/cli/import-state.ts @@ -80,10 +80,10 @@ export const main = async (): Promise => { // Fill the snapshot block. await fillBlocks( + jobQueueConfig, jobQueue, indexer, eventWatcher, - jobQueueConfig.blockDelayInMilliSecs, { prefetch: true, startBlock: importData.snapshotBlock.blockNumber, diff --git a/packages/graph-test-watcher/src/fill.ts b/packages/graph-test-watcher/src/fill.ts index bdb79f6e..e8cfc88d 100644 --- a/packages/graph-test-watcher/src/fill.ts +++ b/packages/graph-test-watcher/src/fill.ts @@ -50,6 +50,11 @@ export const main = async (): Promise => { type: 'number', default: 10, describe: 'Number of blocks prefetched in batch' + }, + backFill: { + type: 'boolean', + default: false, + describe: 'Fill blocks before latest indexed block' } }).argv; @@ -85,7 +90,7 @@ export const main = async (): Promise => { const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue); - await fillBlocks(jobQueue, indexer, eventWatcher, jobQueueConfig.blockDelayInMilliSecs, argv); + await fillBlocks(jobQueueConfig, jobQueue, indexer, eventWatcher, argv); }; main().catch(err => { diff --git a/packages/mobymask-watcher/src/cli/import-state.ts b/packages/mobymask-watcher/src/cli/import-state.ts index 729d3027..41d81a0a 100644 --- a/packages/mobymask-watcher/src/cli/import-state.ts +++ b/packages/mobymask-watcher/src/cli/import-state.ts @@ -71,10 +71,10 @@ export const main = async (): Promise => { // Fill the snapshot block. await fillBlocks( + jobQueueConfig, jobQueue, indexer, eventWatcher, - jobQueueConfig.blockDelayInMilliSecs, { prefetch: true, startBlock: importData.snapshotBlock.blockNumber, diff --git a/packages/mobymask-watcher/src/fill.ts b/packages/mobymask-watcher/src/fill.ts index f20037c7..8f46d79b 100644 --- a/packages/mobymask-watcher/src/fill.ts +++ b/packages/mobymask-watcher/src/fill.ts @@ -49,6 +49,11 @@ export const main = async (): Promise => { type: 'number', default: 10, describe: 'Number of blocks prefetched in batch' + }, + backFill: { + type: 'boolean', + default: false, + describe: 'Fill blocks before latest indexed block' } }).argv; @@ -76,7 +81,7 @@ export const main = async (): Promise => { const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue); - await fillBlocks(jobQueue, indexer, eventWatcher, jobQueueConfig.blockDelayInMilliSecs, argv); + await fillBlocks(jobQueueConfig, jobQueue, indexer, eventWatcher, argv); }; main().catch(err => { diff --git a/packages/util/src/fill.ts b/packages/util/src/fill.ts index e72a4011..8f387954 100644 --- a/packages/util/src/fill.ts +++ b/packages/util/src/fill.ts @@ -8,24 +8,27 @@ import { JobQueue } from './job-queue'; import { EventWatcherInterface, IndexerInterface } from './types'; import { wait } from './misc'; import { processBlockByNumberWithCache } from './common'; +import { indexBlock } from './index-block'; +import { JobQueueConfig } from './config'; const log = debug('vulcanize:fill'); const DEFAULT_PREFETCH_BATCH_SIZE = 10; export const fillBlocks = async ( + jobQueConfig: JobQueueConfig, jobQueue: JobQueue, indexer: IndexerInterface, eventWatcher: EventWatcherInterface, - blockDelayInMilliSecs: number, argv: { startBlock: number, endBlock: number, prefetch?: boolean, batchBlocks?: number, + backFill?: boolean } ): Promise => { - let { startBlock, endBlock, prefetch = false, batchBlocks = DEFAULT_PREFETCH_BATCH_SIZE } = argv; + let { startBlock, endBlock, prefetch = false, batchBlocks = DEFAULT_PREFETCH_BATCH_SIZE, backFill = false } = argv; if (startBlock > endBlock) { throw new Error(`endBlock ${endBlock} should be greater than or equal to startBlock ${startBlock}`); @@ -38,7 +41,12 @@ export const fillBlocks = async ( throw new Error(`startBlock should be greater than chain head ${syncStatus.chainHeadBlockNumber}`); } - await prefetchBlocks(indexer, blockDelayInMilliSecs, { startBlock, endBlock, batchBlocks }); + await prefetchBlocks(indexer, jobQueConfig.blockDelayInMilliSecs, { startBlock, endBlock, batchBlocks }); + return; + } + + if (backFill) { + await backFillBlocks(indexer, jobQueConfig.eventsInBatch, { startBlock, endBlock }); return; } @@ -143,3 +151,16 @@ const prefetchBlocks = async ( } } }; + +const backFillBlocks = async ( + indexer: IndexerInterface, + eventsInBatch: number, + { startBlock, endBlock }: { + startBlock: number, + endBlock: number + } +) => { + for (let i = startBlock; i <= endBlock; i++) { + await indexBlock(indexer, eventsInBatch, { block: i }); + } +};