diff --git a/packages/codegen/src/templates/indexer-template.handlebars b/packages/codegen/src/templates/indexer-template.handlebars index b8a308a5..11624ccc 100644 --- a/packages/codegen/src/templates/indexer-template.handlebars +++ b/packages/codegen/src/templates/indexer-template.handlebars @@ -249,7 +249,7 @@ export class Indexer implements IndexerInterface { }; } - const { block: { number } } = await this._ethClient.getBlockByHash(blockHash); + const { block: { number } } = await this.getBlockByHash(blockHash); const blockNumber = ethers.BigNumber.from(number).toNumber(); log('{{query.name}}: db miss, fetching from upstream server'); @@ -679,6 +679,10 @@ export class Indexer implements IndexerInterface { return this._baseIndexer.getBlocks(blockFilter); } + async getBlockByHash (blockHash?: string): Promise<{ block: any }> { + return this._baseIndexer.getBlockByHash(blockHash); + } + async updateSyncStatusIndexedBlock (blockHash: string, blockNumber: number, force = false): Promise { return this._baseIndexer.updateSyncStatusIndexedBlock(blockHash, blockNumber, force); } diff --git a/packages/util/src/common.ts b/packages/util/src/common.ts index 049ff22a..f5eb1c9f 100644 --- a/packages/util/src/common.ts +++ b/packages/util/src/common.ts @@ -71,6 +71,22 @@ export const fetchBlocksAtHeight = async ( // Try fetching blocks from eth-server until found. while (!blocks.length) { + const { block: latestBlock } = await indexer.getBlockByHash(); + const blockProcessingOffset = jobQueueConfig.blockProcessingOffset ?? 0; + + // Process block if it is blockProcessingOffset blocks behind latest block + if (latestBlock.number < blockNumber + blockProcessingOffset) { + // Check number of retries for fetching new block + if (jobQueueConfig.maxNewBlockRetries && newBlockRetries > jobQueueConfig.maxNewBlockRetries) { + throw new Error(NEW_BLOCK_MAX_RETRIES_ERROR); + } + + newBlockRetries++; + log(`Latest block: ${latestBlock.number}, blockProcessingOffset: ${blockProcessingOffset}; retry block to process: ${blockNumber} after ${jobQueueConfig.blockDelayInMilliSecs}ms`); + await wait(jobQueueConfig.blockDelayInMilliSecs); + continue; + } + console.time(`time:common#_fetchBlocks-eth-server-${blockNumber}`); const ethFullBlocks = await indexer.getBlocks({ blockNumber }); console.timeEnd(`time:common#_fetchBlocks-eth-server-${blockNumber}`); @@ -84,32 +100,21 @@ export const fetchBlocksAtHeight = async ( // Fitler null blocks blocks = ethFullBlocks.filter(block => Boolean(block)) as EthFullBlock[]; + assert(blocks.length, `Blocks at ${blockNumber} should exist as latest block is ${latestBlock}`); - if (!blocks.length) { - log(`No blocks fetched for block number ${blockNumber}, retrying after ${jobQueueConfig.blockDelayInMilliSecs} ms delay.`); - - // Check number of retries for fetching new block - if (jobQueueConfig.maxNewBlockRetries && newBlockRetries > jobQueueConfig.maxNewBlockRetries) { - throw new Error(NEW_BLOCK_MAX_RETRIES_ERROR); - } - - newBlockRetries++; - await wait(jobQueueConfig.blockDelayInMilliSecs); - } else { - blocks.forEach(block => { - blockAndEventsMap.set( - block.blockHash, - { - // Block is set later in job-runner when saving to database - block: {} as BlockProgressInterface, - events: [], - ethFullBlock: block, - // Transactions are set later in job-runner when fetching events - ethFullTransactions: [] - } - ); - }); - } + blocks.forEach(block => { + blockAndEventsMap.set( + block.blockHash, + { + // Block is set later in job-runner when saving to database + block: {} as BlockProgressInterface, + events: [], + ethFullBlock: block, + // Transactions are set later in job-runner when fetching events + ethFullTransactions: [] + } + ); + }); } assert(blocks.length, 'Blocks not fetched'); diff --git a/packages/util/src/events.ts b/packages/util/src/events.ts index b6b2e5dc..ace4da5a 100644 --- a/packages/util/src/events.ts +++ b/packages/util/src/events.ts @@ -14,7 +14,6 @@ import { createPruningJob, processBlockByNumber } from './common'; import { OrderDirection } from './database'; import { HistoricalJobData, HistoricalJobResponseData } from './job-runner'; import { JobQueueConfig, ServerConfig } from './config'; -import { wait } from './misc'; const EVENT = 'event'; const BLOCK_PROGRESS_EVENT = 'block-progress-event'; @@ -105,7 +104,7 @@ export class EventWatcher { // Get latest block in chain and sync status from DB // Also get historical-processing queue size const [{ block: latestBlock }, syncStatus, historicalProcessingQueueSize] = await Promise.all([ - this._ethClient.getBlockByHash(), + this._indexer.getBlockByHash(), this._indexer.getSyncStatus(), this._jobQueue.getQueueSize(QUEUE_HISTORICAL_PROCESSING, 'completed') ]); @@ -196,18 +195,7 @@ export class EventWatcher { } if (isComplete) { - while (true) { - const { block: latestBlock } = await this._ethClient.getBlockByHash(); - - // Process block if it is blockProcessingOffset blocks behind latest block - if (latestBlock.number >= blockNumber + (this._config.jobQueue.blockProcessingOffset ?? 0)) { - await processBlockByNumber(this._jobQueue, blockNumber + 1); - break; - } - - log(`Latest block: ${latestBlock.number}; retry next block to process: ${blockNumber + 1} after ${this._config.jobQueue.blockDelayInMilliSecs}ms`); - await wait(this._config.jobQueue.blockDelayInMilliSecs); - } + await processBlockByNumber(this._jobQueue, blockNumber + 1); } } } diff --git a/packages/util/src/indexer.ts b/packages/util/src/indexer.ts index 395ba187..0fd0f0ba 100644 --- a/packages/util/src/indexer.ts +++ b/packages/util/src/indexer.ts @@ -394,6 +394,10 @@ export class Indexer { return blocks; } + async getBlockByHash (blockHash?: string): Promise<{ block: any }> { + return this._ethClient.getBlockByHash(blockHash); + } + async getBlockProgress (blockHash: string): Promise { return this._db.getBlockProgress(blockHash); } diff --git a/packages/util/src/types.ts b/packages/util/src/types.ts index 2d677a09..ee49108c 100644 --- a/packages/util/src/types.ts +++ b/packages/util/src/types.ts @@ -171,6 +171,7 @@ export interface IndexerInterface { getSyncStatus (): Promise getStateSyncStatus (): Promise getBlocks (blockFilter: { blockHash?: string, blockNumber?: number }): Promise> + getBlockByHash (blockHash?: string): Promise<{ block: any }> getBlocksAtHeight (height: number, isPruned: boolean): Promise getLatestCanonicalBlock (): Promise getLatestStateIndexedBlock (): Promise