mirror of
https://github.com/cerc-io/watcher-ts
synced 2025-07-27 02:32:07 +00:00
Fix endpoint switch on max retries of new block
This commit is contained in:
parent
e4192e7f77
commit
fe3ae1e892
@ -249,7 +249,7 @@ export class Indexer implements IndexerInterface {
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
const { block: { number } } = await this._ethClient.getBlockByHash(blockHash);
|
const { block: { number } } = await this.getBlockByHash(blockHash);
|
||||||
const blockNumber = ethers.BigNumber.from(number).toNumber();
|
const blockNumber = ethers.BigNumber.from(number).toNumber();
|
||||||
|
|
||||||
log('{{query.name}}: db miss, fetching from upstream server');
|
log('{{query.name}}: db miss, fetching from upstream server');
|
||||||
@ -679,6 +679,10 @@ export class Indexer implements IndexerInterface {
|
|||||||
return this._baseIndexer.getBlocks(blockFilter);
|
return this._baseIndexer.getBlocks(blockFilter);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async getBlockByHash (blockHash?: string): Promise<{ block: any }> {
|
||||||
|
return this._baseIndexer.getBlockByHash(blockHash);
|
||||||
|
}
|
||||||
|
|
||||||
async updateSyncStatusIndexedBlock (blockHash: string, blockNumber: number, force = false): Promise<SyncStatus> {
|
async updateSyncStatusIndexedBlock (blockHash: string, blockNumber: number, force = false): Promise<SyncStatus> {
|
||||||
return this._baseIndexer.updateSyncStatusIndexedBlock(blockHash, blockNumber, force);
|
return this._baseIndexer.updateSyncStatusIndexedBlock(blockHash, blockNumber, force);
|
||||||
}
|
}
|
||||||
|
@ -71,6 +71,22 @@ export const fetchBlocksAtHeight = async (
|
|||||||
|
|
||||||
// Try fetching blocks from eth-server until found.
|
// Try fetching blocks from eth-server until found.
|
||||||
while (!blocks.length) {
|
while (!blocks.length) {
|
||||||
|
const { block: latestBlock } = await indexer.getBlockByHash();
|
||||||
|
const blockProcessingOffset = jobQueueConfig.blockProcessingOffset ?? 0;
|
||||||
|
|
||||||
|
// Process block if it is blockProcessingOffset blocks behind latest block
|
||||||
|
if (latestBlock.number < blockNumber + blockProcessingOffset) {
|
||||||
|
// Check number of retries for fetching new block
|
||||||
|
if (jobQueueConfig.maxNewBlockRetries && newBlockRetries > jobQueueConfig.maxNewBlockRetries) {
|
||||||
|
throw new Error(NEW_BLOCK_MAX_RETRIES_ERROR);
|
||||||
|
}
|
||||||
|
|
||||||
|
newBlockRetries++;
|
||||||
|
log(`Latest block: ${latestBlock.number}, blockProcessingOffset: ${blockProcessingOffset}; retry block to process: ${blockNumber} after ${jobQueueConfig.blockDelayInMilliSecs}ms`);
|
||||||
|
await wait(jobQueueConfig.blockDelayInMilliSecs);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
console.time(`time:common#_fetchBlocks-eth-server-${blockNumber}`);
|
console.time(`time:common#_fetchBlocks-eth-server-${blockNumber}`);
|
||||||
const ethFullBlocks = await indexer.getBlocks({ blockNumber });
|
const ethFullBlocks = await indexer.getBlocks({ blockNumber });
|
||||||
console.timeEnd(`time:common#_fetchBlocks-eth-server-${blockNumber}`);
|
console.timeEnd(`time:common#_fetchBlocks-eth-server-${blockNumber}`);
|
||||||
@ -84,32 +100,21 @@ export const fetchBlocksAtHeight = async (
|
|||||||
|
|
||||||
// Fitler null blocks
|
// Fitler null blocks
|
||||||
blocks = ethFullBlocks.filter(block => Boolean(block)) as EthFullBlock[];
|
blocks = ethFullBlocks.filter(block => Boolean(block)) as EthFullBlock[];
|
||||||
|
assert(blocks.length, `Blocks at ${blockNumber} should exist as latest block is ${latestBlock}`);
|
||||||
|
|
||||||
if (!blocks.length) {
|
blocks.forEach(block => {
|
||||||
log(`No blocks fetched for block number ${blockNumber}, retrying after ${jobQueueConfig.blockDelayInMilliSecs} ms delay.`);
|
blockAndEventsMap.set(
|
||||||
|
block.blockHash,
|
||||||
// Check number of retries for fetching new block
|
{
|
||||||
if (jobQueueConfig.maxNewBlockRetries && newBlockRetries > jobQueueConfig.maxNewBlockRetries) {
|
// Block is set later in job-runner when saving to database
|
||||||
throw new Error(NEW_BLOCK_MAX_RETRIES_ERROR);
|
block: {} as BlockProgressInterface,
|
||||||
}
|
events: [],
|
||||||
|
ethFullBlock: block,
|
||||||
newBlockRetries++;
|
// Transactions are set later in job-runner when fetching events
|
||||||
await wait(jobQueueConfig.blockDelayInMilliSecs);
|
ethFullTransactions: []
|
||||||
} else {
|
}
|
||||||
blocks.forEach(block => {
|
);
|
||||||
blockAndEventsMap.set(
|
});
|
||||||
block.blockHash,
|
|
||||||
{
|
|
||||||
// Block is set later in job-runner when saving to database
|
|
||||||
block: {} as BlockProgressInterface,
|
|
||||||
events: [],
|
|
||||||
ethFullBlock: block,
|
|
||||||
// Transactions are set later in job-runner when fetching events
|
|
||||||
ethFullTransactions: []
|
|
||||||
}
|
|
||||||
);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
assert(blocks.length, 'Blocks not fetched');
|
assert(blocks.length, 'Blocks not fetched');
|
||||||
|
@ -14,7 +14,6 @@ import { createPruningJob, processBlockByNumber } from './common';
|
|||||||
import { OrderDirection } from './database';
|
import { OrderDirection } from './database';
|
||||||
import { HistoricalJobData, HistoricalJobResponseData } from './job-runner';
|
import { HistoricalJobData, HistoricalJobResponseData } from './job-runner';
|
||||||
import { JobQueueConfig, ServerConfig } from './config';
|
import { JobQueueConfig, ServerConfig } from './config';
|
||||||
import { wait } from './misc';
|
|
||||||
|
|
||||||
const EVENT = 'event';
|
const EVENT = 'event';
|
||||||
const BLOCK_PROGRESS_EVENT = 'block-progress-event';
|
const BLOCK_PROGRESS_EVENT = 'block-progress-event';
|
||||||
@ -105,7 +104,7 @@ export class EventWatcher {
|
|||||||
// Get latest block in chain and sync status from DB
|
// Get latest block in chain and sync status from DB
|
||||||
// Also get historical-processing queue size
|
// Also get historical-processing queue size
|
||||||
const [{ block: latestBlock }, syncStatus, historicalProcessingQueueSize] = await Promise.all([
|
const [{ block: latestBlock }, syncStatus, historicalProcessingQueueSize] = await Promise.all([
|
||||||
this._ethClient.getBlockByHash(),
|
this._indexer.getBlockByHash(),
|
||||||
this._indexer.getSyncStatus(),
|
this._indexer.getSyncStatus(),
|
||||||
this._jobQueue.getQueueSize(QUEUE_HISTORICAL_PROCESSING, 'completed')
|
this._jobQueue.getQueueSize(QUEUE_HISTORICAL_PROCESSING, 'completed')
|
||||||
]);
|
]);
|
||||||
@ -196,18 +195,7 @@ export class EventWatcher {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (isComplete) {
|
if (isComplete) {
|
||||||
while (true) {
|
await processBlockByNumber(this._jobQueue, blockNumber + 1);
|
||||||
const { block: latestBlock } = await this._ethClient.getBlockByHash();
|
|
||||||
|
|
||||||
// Process block if it is blockProcessingOffset blocks behind latest block
|
|
||||||
if (latestBlock.number >= blockNumber + (this._config.jobQueue.blockProcessingOffset ?? 0)) {
|
|
||||||
await processBlockByNumber(this._jobQueue, blockNumber + 1);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
log(`Latest block: ${latestBlock.number}; retry next block to process: ${blockNumber + 1} after ${this._config.jobQueue.blockDelayInMilliSecs}ms`);
|
|
||||||
await wait(this._config.jobQueue.blockDelayInMilliSecs);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -394,6 +394,10 @@ export class Indexer {
|
|||||||
return blocks;
|
return blocks;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async getBlockByHash (blockHash?: string): Promise<{ block: any }> {
|
||||||
|
return this._ethClient.getBlockByHash(blockHash);
|
||||||
|
}
|
||||||
|
|
||||||
async getBlockProgress (blockHash: string): Promise<BlockProgressInterface | undefined> {
|
async getBlockProgress (blockHash: string): Promise<BlockProgressInterface | undefined> {
|
||||||
return this._db.getBlockProgress(blockHash);
|
return this._db.getBlockProgress(blockHash);
|
||||||
}
|
}
|
||||||
|
@ -171,6 +171,7 @@ export interface IndexerInterface {
|
|||||||
getSyncStatus (): Promise<SyncStatusInterface | undefined>
|
getSyncStatus (): Promise<SyncStatusInterface | undefined>
|
||||||
getStateSyncStatus (): Promise<StateSyncStatusInterface | undefined>
|
getStateSyncStatus (): Promise<StateSyncStatusInterface | undefined>
|
||||||
getBlocks (blockFilter: { blockHash?: string, blockNumber?: number }): Promise<Array<EthFullBlock | null>>
|
getBlocks (blockFilter: { blockHash?: string, blockNumber?: number }): Promise<Array<EthFullBlock | null>>
|
||||||
|
getBlockByHash (blockHash?: string): Promise<{ block: any }>
|
||||||
getBlocksAtHeight (height: number, isPruned: boolean): Promise<BlockProgressInterface[]>
|
getBlocksAtHeight (height: number, isPruned: boolean): Promise<BlockProgressInterface[]>
|
||||||
getLatestCanonicalBlock (): Promise<BlockProgressInterface | undefined>
|
getLatestCanonicalBlock (): Promise<BlockProgressInterface | undefined>
|
||||||
getLatestStateIndexedBlock (): Promise<BlockProgressInterface>
|
getLatestStateIndexedBlock (): Promise<BlockProgressInterface>
|
||||||
|
Loading…
Reference in New Issue
Block a user