mirror of
https://github.com/cerc-io/watcher-ts
synced 2025-01-08 12:28:05 +00:00
Handle restarts during historical processing in watcher (#455)
* Reset to latest processed block on restarting job-runner * Update sync status during historical processing in job-runner * Codegen changes * Use sync status latest processed block for subgraph _meta GQL query * Set job per interval for subscribing events queue to 1 * Fix events processing skipped for blocks after template create
This commit is contained in:
parent
97bd4014d6
commit
f2c5f67777
@ -114,7 +114,8 @@ export class JobRunnerCmd {
|
||||
|
||||
// Delete all active and pending (before completed) jobs to start job-runner without old queued jobs
|
||||
await jobRunner.jobQueue.deleteAllJobs('completed');
|
||||
await jobRunner.resetToPrevIndexedBlock();
|
||||
|
||||
await jobRunner.resetToLatestProcessedBlock();
|
||||
await indexer.updateSyncStatusIndexingError(false);
|
||||
|
||||
await startJobRunner(jobRunner);
|
||||
|
@ -27,6 +27,17 @@ columns:
|
||||
pgType: integer
|
||||
tsType: number
|
||||
columnType: Column
|
||||
- name: latestProcessedBlockHash
|
||||
pgType: varchar
|
||||
tsType: string
|
||||
columnType: Column
|
||||
columnOptions:
|
||||
- option: length
|
||||
value: 66
|
||||
- name: latestProcessedBlockNumber
|
||||
pgType: integer
|
||||
tsType: number
|
||||
columnType: Column
|
||||
- name: latestCanonicalBlockHash
|
||||
pgType: varchar
|
||||
tsType: string
|
||||
|
@ -460,16 +460,28 @@ export class Schema {
|
||||
}
|
||||
|
||||
_addMeta (): void {
|
||||
const typeComposer = this._composer.createObjectTC({
|
||||
// Create the Block type.
|
||||
const metaBlocktypeComposer = this._composer.createObjectTC({
|
||||
name: '_MetaBlock_',
|
||||
fields: {
|
||||
hash: 'Bytes',
|
||||
number: 'Int!',
|
||||
timestamp: 'Int'
|
||||
}
|
||||
});
|
||||
|
||||
this._composer.addSchemaMustHaveType(metaBlocktypeComposer);
|
||||
|
||||
const metaTypeComposer = this._composer.createObjectTC({
|
||||
name: '_Meta_',
|
||||
fields: {
|
||||
block: this._composer.getOTC('_Block_').NonNull,
|
||||
block: metaBlocktypeComposer.NonNull,
|
||||
deployment: { type: new GraphQLNonNull(GraphQLString) },
|
||||
hasIndexingErrors: { type: new GraphQLNonNull(GraphQLBoolean) }
|
||||
}
|
||||
});
|
||||
|
||||
this._composer.addSchemaMustHaveType(typeComposer);
|
||||
this._composer.addSchemaMustHaveType(metaTypeComposer);
|
||||
|
||||
this._composer.Query.addFields({
|
||||
_meta: {
|
||||
|
@ -253,13 +253,13 @@ export class Database implements DatabaseInterface {
|
||||
return this._baseDatabase.updateSyncStatusChainHead(repo, blockHash, blockNumber, force);
|
||||
}
|
||||
|
||||
async forceUpdateSyncStatus (queryRunner: QueryRunner, blockHash: string, blockNumber: number): Promise<SyncStatus> {
|
||||
async updateSyncStatusProcessedBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number, force = false): Promise<SyncStatus> {
|
||||
const repo = queryRunner.manager.getRepository(SyncStatus);
|
||||
|
||||
return this._baseDatabase.forceUpdateSyncStatus(repo, blockHash, blockNumber);
|
||||
return this._baseDatabase.updateSyncStatusProcessedBlock(repo, blockHash, blockNumber, force);
|
||||
}
|
||||
|
||||
async updateSyncStatusIndexingError (queryRunner: QueryRunner, hasIndexingError: boolean): Promise<SyncStatus> {
|
||||
async updateSyncStatusIndexingError (queryRunner: QueryRunner, hasIndexingError: boolean): Promise<SyncStatus | undefined> {
|
||||
const repo = queryRunner.manager.getRepository(SyncStatus);
|
||||
|
||||
return this._baseDatabase.updateSyncStatusIndexingError(repo, hasIndexingError);
|
||||
|
@ -661,11 +661,11 @@ export class Indexer implements IndexerInterface {
|
||||
return syncStatus;
|
||||
}
|
||||
|
||||
async forceUpdateSyncStatus (blockHash: string, blockNumber: number): Promise<SyncStatus> {
|
||||
return this._baseIndexer.forceUpdateSyncStatus(blockHash, blockNumber);
|
||||
async updateSyncStatusProcessedBlock (blockHash: string, blockNumber: number, force = false): Promise<SyncStatus> {
|
||||
return this._baseIndexer.updateSyncStatusProcessedBlock(blockHash, blockNumber, force);
|
||||
}
|
||||
|
||||
async updateSyncStatusIndexingError (hasIndexingError: boolean): Promise<SyncStatus> {
|
||||
async updateSyncStatusIndexingError (hasIndexingError: boolean): Promise<SyncStatus | undefined> {
|
||||
return this._baseIndexer.updateSyncStatusIndexingError(hasIndexingError);
|
||||
}
|
||||
|
||||
|
@ -170,17 +170,18 @@ export class Indexer implements IndexerInterface {
|
||||
return {} as SyncStatusInterface;
|
||||
}
|
||||
|
||||
async forceUpdateSyncStatus (blockHash: string, blockNumber: number): Promise<SyncStatusInterface> {
|
||||
async updateSyncStatusProcessedBlock (blockHash: string, blockNumber: number, force?: boolean): Promise<SyncStatusInterface> {
|
||||
assert(blockNumber);
|
||||
assert(blockHash);
|
||||
assert(force);
|
||||
|
||||
return {} as SyncStatusInterface;
|
||||
}
|
||||
|
||||
async updateSyncStatusIndexingError (hasIndexingError: boolean): Promise<SyncStatusInterface> {
|
||||
async updateSyncStatusIndexingError (hasIndexingError: boolean): Promise<SyncStatusInterface | undefined> {
|
||||
assert(hasIndexingError);
|
||||
|
||||
return {} as SyncStatusInterface;
|
||||
return undefined;
|
||||
}
|
||||
|
||||
async markBlocksAsPruned (blocks: BlockProgressInterface[]): Promise<void> {
|
||||
|
@ -285,8 +285,8 @@ export class EthClient implements EthClientInterface {
|
||||
'eth_getLogs',
|
||||
[{
|
||||
address: addresses.map(address => address.toLowerCase()),
|
||||
fromBlock: fromBlock && utils.hexlify(fromBlock),
|
||||
toBlock: toBlock && utils.hexlify(toBlock),
|
||||
fromBlock: fromBlock && utils.hexValue(fromBlock),
|
||||
toBlock: toBlock && utils.hexValue(toBlock),
|
||||
blockHash,
|
||||
topics
|
||||
}]
|
||||
|
@ -384,20 +384,20 @@ const _processEventsInSubgraphOrder = async (indexer: IndexerInterface, block: B
|
||||
|
||||
// Check if we are out of events.
|
||||
while (numFetchedEvents < block.numEvents) {
|
||||
console.time('time:common#processEventsInSubgraphOrder-fetching_events_batch');
|
||||
console.time(`time:common#processEventsInSubgraphOrder-fetching_events_batch-${block.blockNumber}`);
|
||||
|
||||
// Fetch events in batches
|
||||
const events = await _getEventsBatch(indexer, block.blockHash, eventsInBatch, page);
|
||||
page++;
|
||||
numFetchedEvents += events.length;
|
||||
|
||||
console.timeEnd('time:common#processEventsInSubgraphOrder-fetching_events_batch');
|
||||
console.timeEnd(`time:common#processEventsInSubgraphOrder-fetching_events_batch-${block.blockNumber}`);
|
||||
|
||||
if (events.length) {
|
||||
log(`Processing events batch from index ${events[0].index} to ${events[0].index + events.length - 1}`);
|
||||
}
|
||||
|
||||
console.time('time:common#processEventsInSubgraphOrder-processing_events_batch');
|
||||
console.time(`time:common#processEventsInSubgraphOrder-processing_events_batch-${block.blockNumber}`);
|
||||
|
||||
// First process events for initially watched contracts
|
||||
const watchedContractEvents: EventInterface[] = [];
|
||||
@ -417,7 +417,7 @@ const _processEventsInSubgraphOrder = async (indexer: IndexerInterface, block: B
|
||||
block.numProcessedEvents++;
|
||||
}
|
||||
|
||||
console.timeEnd('time:common#processEventsInSubgraphOrder-processing_events_batch');
|
||||
console.timeEnd(`time:common#processEventsInSubgraphOrder-processing_events_batch-${block.blockNumber}`);
|
||||
}
|
||||
|
||||
const watchedContracts = indexer.getWatchedContracts().map(contract => contract.address);
|
||||
|
@ -169,6 +169,8 @@ export class Database {
|
||||
latestCanonicalBlockNumber: blockNumber,
|
||||
latestIndexedBlockHash: '',
|
||||
latestIndexedBlockNumber: -1,
|
||||
latestProcessedBlockHash: '',
|
||||
latestProcessedBlockNumber: -1,
|
||||
initialIndexedBlockHash: blockHash,
|
||||
initialIndexedBlockNumber: blockNumber
|
||||
});
|
||||
@ -182,29 +184,24 @@ export class Database {
|
||||
return await repo.save(entity);
|
||||
}
|
||||
|
||||
async forceUpdateSyncStatus (repo: Repository<SyncStatusInterface>, blockHash: string, blockNumber: number): Promise<SyncStatusInterface> {
|
||||
let entity = await repo.findOne();
|
||||
async updateSyncStatusProcessedBlock (repo: Repository<SyncStatusInterface>, blockHash: string, blockNumber: number, force = false): Promise<SyncStatusInterface> {
|
||||
const entity = await repo.findOne();
|
||||
assert(entity);
|
||||
|
||||
if (!entity) {
|
||||
entity = repo.create({
|
||||
initialIndexedBlockHash: blockHash,
|
||||
initialIndexedBlockNumber: blockNumber
|
||||
});
|
||||
if (force || blockNumber >= entity.latestProcessedBlockNumber) {
|
||||
entity.latestProcessedBlockHash = blockHash;
|
||||
entity.latestProcessedBlockNumber = blockNumber;
|
||||
}
|
||||
|
||||
entity.chainHeadBlockHash = blockHash;
|
||||
entity.chainHeadBlockNumber = blockNumber;
|
||||
entity.latestCanonicalBlockHash = blockHash;
|
||||
entity.latestCanonicalBlockNumber = blockNumber;
|
||||
entity.latestIndexedBlockHash = blockHash;
|
||||
entity.latestIndexedBlockNumber = blockNumber;
|
||||
|
||||
return await repo.save(entity);
|
||||
}
|
||||
|
||||
async updateSyncStatusIndexingError (repo: Repository<SyncStatusInterface>, hasIndexingError: boolean): Promise<SyncStatusInterface> {
|
||||
async updateSyncStatusIndexingError (repo: Repository<SyncStatusInterface>, hasIndexingError: boolean): Promise<SyncStatusInterface | undefined> {
|
||||
const entity = await repo.findOne();
|
||||
assert(entity);
|
||||
|
||||
if (!entity) {
|
||||
return;
|
||||
}
|
||||
|
||||
entity.hasIndexingError = hasIndexingError;
|
||||
|
||||
|
@ -6,7 +6,6 @@ import assert from 'assert';
|
||||
import debug from 'debug';
|
||||
import { PubSub } from 'graphql-subscriptions';
|
||||
import PgBoss from 'pg-boss';
|
||||
import { constants } from 'ethers';
|
||||
|
||||
import { JobQueue } from './job-queue';
|
||||
import { BlockProgressInterface, EventInterface, IndexerInterface, EthClient, EventsJobData, EventsQueueJobKind } from './types';
|
||||
@ -15,13 +14,9 @@ import { createPruningJob, processBlockByNumber } from './common';
|
||||
import { OrderDirection } from './database';
|
||||
import { HistoricalJobData, HistoricalJobResponseData } from './job-runner';
|
||||
import { JobQueueConfig, ServerConfig } from './config';
|
||||
import { wait } from './misc';
|
||||
|
||||
const EVENT = 'event';
|
||||
|
||||
// Time to wait for events queue to be empty
|
||||
const EMPTY_EVENTS_QUEUE_WAIT_TIME = 5000;
|
||||
|
||||
const DEFAULT_HISTORICAL_MAX_FETCH_AHEAD = 20_000;
|
||||
|
||||
const log = debug('vulcanize:events');
|
||||
@ -121,7 +116,7 @@ export class EventWatcher {
|
||||
|
||||
async startHistoricalBlockProcessing (startBlockNumber: number, endBlockNumber: number): Promise<void> {
|
||||
// Wait for events job queue to be empty so that historical processing does not move far ahead
|
||||
await this._waitForEmptyEventsQueue();
|
||||
await this._jobQueue.waitForEmptyQueue(QUEUE_EVENT_PROCESSING);
|
||||
|
||||
this._historicalProcessingEndBlockNumber = endBlockNumber;
|
||||
log(`Starting historical block processing in batches from ${startBlockNumber} up to block ${this._historicalProcessingEndBlockNumber}`);
|
||||
@ -136,19 +131,6 @@ export class EventWatcher {
|
||||
);
|
||||
}
|
||||
|
||||
async _waitForEmptyEventsQueue (): Promise<void> {
|
||||
while (true) {
|
||||
// Get queue size for active and pending jobs
|
||||
const queueSize = await this._jobQueue.getQueueSize(QUEUE_EVENT_PROCESSING, 'completed');
|
||||
|
||||
if (queueSize === 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
await wait(EMPTY_EVENTS_QUEUE_WAIT_TIME);
|
||||
}
|
||||
}
|
||||
|
||||
async startRealtimeBlockProcessing (startBlockNumber: number): Promise<void> {
|
||||
log(`Starting realtime block processing from block ${startBlockNumber}`);
|
||||
await processBlockByNumber(this._jobQueue, startBlockNumber);
|
||||
@ -233,16 +215,6 @@ export class EventWatcher {
|
||||
|
||||
// Check if historical processing end block is reached
|
||||
if (nextBatchStartBlockNumber > this._historicalProcessingEndBlockNumber) {
|
||||
const [block] = await this._indexer.getBlocks({ blockNumber: this._historicalProcessingEndBlockNumber });
|
||||
const historicalProcessingEndBlockHash = block ? block.blockHash : constants.AddressZero;
|
||||
|
||||
// Update sync status chain head and canonical block to end block of historical processing
|
||||
const [syncStatus] = await Promise.all([
|
||||
this._indexer.updateSyncStatusCanonicalBlock(historicalProcessingEndBlockHash, this._historicalProcessingEndBlockNumber, true),
|
||||
this._indexer.updateSyncStatusChainHead(historicalProcessingEndBlockHash, this._historicalProcessingEndBlockNumber, true)
|
||||
]);
|
||||
log(`Sync status canonical block updated to ${syncStatus.latestCanonicalBlockNumber}`);
|
||||
|
||||
// Start realtime processing
|
||||
this.startBlockProcessing();
|
||||
return;
|
||||
|
@ -92,11 +92,9 @@ export type ResultEvent = {
|
||||
|
||||
export type ResultMeta = {
|
||||
block: {
|
||||
cid: string | null;
|
||||
hash: string;
|
||||
hash: string | null;
|
||||
number: number;
|
||||
timestamp: number;
|
||||
parentHash: string;
|
||||
timestamp: number | null;
|
||||
};
|
||||
deployment: string;
|
||||
hasIndexingErrors: boolean;
|
||||
@ -146,37 +144,42 @@ export class Indexer {
|
||||
}
|
||||
|
||||
async getMetaData (block: BlockHeight): Promise<ResultMeta | null> {
|
||||
let resultBlock: BlockProgressInterface | undefined;
|
||||
const resultBlock: ResultMeta['block'] = {
|
||||
hash: block.hash ?? null,
|
||||
number: block.number ?? 0,
|
||||
timestamp: null
|
||||
};
|
||||
|
||||
const syncStatus = await this.getSyncStatus();
|
||||
assert(syncStatus);
|
||||
|
||||
if (block.hash) {
|
||||
resultBlock = await this.getBlockProgress(block.hash);
|
||||
const blockProgress = await this.getBlockProgress(block.hash);
|
||||
assert(blockProgress, 'No block with hash found');
|
||||
resultBlock.number = blockProgress.blockNumber;
|
||||
resultBlock.timestamp = blockProgress.blockTimestamp;
|
||||
} else {
|
||||
const blockHeight = block.number ? block.number : syncStatus.latestIndexedBlockNumber - 1;
|
||||
let blockHeight = block.number;
|
||||
|
||||
if (!blockHeight) {
|
||||
blockHeight = syncStatus.latestProcessedBlockNumber;
|
||||
}
|
||||
|
||||
// Get all the blocks at a height
|
||||
const blocksAtHeight = await this.getBlocksAtHeight(blockHeight, false);
|
||||
const [blockProgress] = await this.getBlocksAtHeight(blockHeight, false);
|
||||
|
||||
if (blocksAtHeight.length) {
|
||||
resultBlock = blocksAtHeight[0];
|
||||
if (blockProgress) {
|
||||
resultBlock.hash = blockProgress.blockHash;
|
||||
resultBlock.number = blockProgress.blockNumber;
|
||||
resultBlock.timestamp = blockProgress.blockTimestamp;
|
||||
}
|
||||
}
|
||||
|
||||
return resultBlock
|
||||
? {
|
||||
block: {
|
||||
cid: resultBlock.cid,
|
||||
number: resultBlock.blockNumber,
|
||||
hash: resultBlock.blockHash,
|
||||
timestamp: resultBlock.blockTimestamp,
|
||||
parentHash: resultBlock.parentHash
|
||||
},
|
||||
deployment: '',
|
||||
hasIndexingErrors: syncStatus.hasIndexingError
|
||||
}
|
||||
: null;
|
||||
return {
|
||||
block: resultBlock,
|
||||
hasIndexingErrors: syncStatus.hasIndexingError,
|
||||
deployment: ''
|
||||
};
|
||||
}
|
||||
|
||||
async getSyncStatus (): Promise<SyncStatusInterface | undefined> {
|
||||
@ -247,12 +250,12 @@ export class Indexer {
|
||||
return res;
|
||||
}
|
||||
|
||||
async forceUpdateSyncStatus (blockHash: string, blockNumber: number): Promise<SyncStatusInterface> {
|
||||
async updateSyncStatusProcessedBlock (blockHash: string, blockNumber: number, force = false): Promise<SyncStatusInterface> {
|
||||
const dbTx = await this._db.createTransactionRunner();
|
||||
let res;
|
||||
|
||||
try {
|
||||
res = await this._db.forceUpdateSyncStatus(dbTx, blockHash, blockNumber);
|
||||
res = await this._db.updateSyncStatusProcessedBlock(dbTx, blockHash, blockNumber, force);
|
||||
await dbTx.commitTransaction();
|
||||
} catch (error) {
|
||||
await dbTx.rollbackTransaction();
|
||||
@ -264,7 +267,7 @@ export class Indexer {
|
||||
return res;
|
||||
}
|
||||
|
||||
async updateSyncStatusIndexingError (hasIndexingError: boolean): Promise<SyncStatusInterface> {
|
||||
async updateSyncStatusIndexingError (hasIndexingError: boolean): Promise<SyncStatusInterface | undefined> {
|
||||
const dbTx = await this._db.createTransactionRunner();
|
||||
let res;
|
||||
|
||||
@ -1320,6 +1323,10 @@ export class Indexer {
|
||||
await this.updateSyncStatusIndexedBlock(blockProgress.blockHash, blockProgress.blockNumber, true);
|
||||
}
|
||||
|
||||
if (syncStatus.latestProcessedBlockNumber > blockProgress.blockNumber) {
|
||||
await this.updateSyncStatusProcessedBlock(blockProgress.blockHash, blockProgress.blockNumber, true);
|
||||
}
|
||||
|
||||
if (syncStatus.latestCanonicalBlockNumber > blockProgress.blockNumber) {
|
||||
await this.updateSyncStatusCanonicalBlock(blockProgress.blockHash, blockProgress.blockNumber, true);
|
||||
}
|
||||
|
@ -7,6 +7,7 @@ import debug from 'debug';
|
||||
import PgBoss from 'pg-boss';
|
||||
|
||||
import { jobCount, lastJobCompletedOn } from './metrics';
|
||||
import { wait } from './misc';
|
||||
|
||||
interface Config {
|
||||
dbConnectionString: string
|
||||
@ -15,7 +16,11 @@ interface Config {
|
||||
|
||||
type JobCallback = (job: PgBoss.JobWithDoneCallback<any, any>) => Promise<void>;
|
||||
|
||||
const JOBS_PER_INTERVAL = 5;
|
||||
// Default number of jobs fetched from DB per polling interval (newJobCheckInterval)
|
||||
const DEFAULT_JOBS_PER_INTERVAL = 5;
|
||||
|
||||
// Interval time to check for events queue to be empty
|
||||
const EMPTY_QUEUE_CHECK_INTERVAL = 5000;
|
||||
|
||||
const log = debug('vulcanize:job-queue');
|
||||
|
||||
@ -86,12 +91,13 @@ export class JobQueue {
|
||||
await this._boss.stop();
|
||||
}
|
||||
|
||||
async subscribe (queue: string, callback: JobCallback): Promise<string> {
|
||||
async subscribe (queue: string, callback: JobCallback, subscribeOptions: PgBoss.SubscribeOptions = {}): Promise<string> {
|
||||
return await this._boss.subscribe(
|
||||
queue,
|
||||
{
|
||||
teamSize: JOBS_PER_INTERVAL,
|
||||
teamConcurrency: 1
|
||||
teamSize: DEFAULT_JOBS_PER_INTERVAL,
|
||||
teamConcurrency: 1,
|
||||
...subscribeOptions
|
||||
},
|
||||
async (job) => {
|
||||
try {
|
||||
@ -111,7 +117,7 @@ export class JobQueue {
|
||||
return await this._boss.onComplete(
|
||||
queue,
|
||||
{
|
||||
teamSize: JOBS_PER_INTERVAL,
|
||||
teamSize: DEFAULT_JOBS_PER_INTERVAL,
|
||||
teamConcurrency: 1
|
||||
},
|
||||
async (job: PgBoss.JobWithDoneCallback<any, any>) => {
|
||||
@ -154,4 +160,17 @@ export class JobQueue {
|
||||
async getQueueSize (name: string, before: PgBoss.Subscription['state'] = 'active'): Promise<number> {
|
||||
return this._boss.getQueueSize(name, { before });
|
||||
}
|
||||
|
||||
async waitForEmptyQueue (queue: string): Promise<void> {
|
||||
while (true) {
|
||||
// Get queue size for active and pending jobs
|
||||
const queueSize = await this.getQueueSize(queue, 'completed');
|
||||
|
||||
if (queueSize === 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
await wait(EMPTY_QUEUE_CHECK_INTERVAL);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -4,7 +4,7 @@
|
||||
|
||||
import assert from 'assert';
|
||||
import debug from 'debug';
|
||||
import { ethers } from 'ethers';
|
||||
import { constants, ethers } from 'ethers';
|
||||
import { DeepPartial, In } from 'typeorm';
|
||||
import PgBoss from 'pg-boss';
|
||||
|
||||
@ -78,15 +78,27 @@ export class JobRunner {
|
||||
}
|
||||
|
||||
async subscribeHistoricalProcessingQueue (): Promise<void> {
|
||||
await this.jobQueue.subscribe(QUEUE_HISTORICAL_PROCESSING, async (job) => {
|
||||
await this.jobQueue.subscribe(
|
||||
QUEUE_HISTORICAL_PROCESSING,
|
||||
async (job) => {
|
||||
await this.processHistoricalBlocks(job);
|
||||
});
|
||||
},
|
||||
{
|
||||
teamSize: 1
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
async subscribeEventProcessingQueue (): Promise<void> {
|
||||
await this.jobQueue.subscribe(QUEUE_EVENT_PROCESSING, async (job) => {
|
||||
await this.jobQueue.subscribe(
|
||||
QUEUE_EVENT_PROCESSING,
|
||||
async (job) => {
|
||||
await this.processEvent(job);
|
||||
});
|
||||
},
|
||||
{
|
||||
teamSize: 1
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
async subscribeHooksQueue (): Promise<void> {
|
||||
@ -162,6 +174,9 @@ export class JobRunner {
|
||||
if (startBlock < this._historicalProcessingCompletedUpto) {
|
||||
await this.jobQueue.deleteJobs(QUEUE_HISTORICAL_PROCESSING);
|
||||
|
||||
// Wait for events queue to be empty
|
||||
await this.jobQueue.waitForEmptyQueue(QUEUE_EVENT_PROCESSING);
|
||||
|
||||
// Remove all watcher blocks and events data if startBlock is less than this._historicalProcessingCompletedUpto
|
||||
// This occurs when new contract is added (with filterLogsByAddresses set to true) and historical processing is restarted from a previous block
|
||||
log(`Restarting historical processing from block ${startBlock}`);
|
||||
@ -191,8 +206,48 @@ export class JobRunner {
|
||||
endBlock
|
||||
);
|
||||
|
||||
let batchEndBlockHash = constants.AddressZero;
|
||||
const blocksLength = blocks.length;
|
||||
|
||||
if (blocksLength) {
|
||||
// Push event processing job for each block
|
||||
const pushJobForBlockPromises = blocks.map(async block => {
|
||||
const pushEventProcessingJobsForBlocksPromise = this._pushEventProcessingJobsForBlocks(blocks);
|
||||
|
||||
if (blocks[blocksLength - 1].blockNumber === endBlock) {
|
||||
// If in blocks returned end block is same as the batch end block, set batchEndBlockHash
|
||||
batchEndBlockHash = blocks[blocksLength - 1].blockHash;
|
||||
} else {
|
||||
// Else fetch block hash from upstream for batch end block
|
||||
const [block] = await this._indexer.getBlocks({ blockNumber: endBlock });
|
||||
|
||||
if (block) {
|
||||
batchEndBlockHash = block.blockHash;
|
||||
}
|
||||
}
|
||||
|
||||
await pushEventProcessingJobsForBlocksPromise;
|
||||
}
|
||||
|
||||
// Update sync status canonical, indexed and chain head block to end block
|
||||
await Promise.all([
|
||||
this._indexer.updateSyncStatusCanonicalBlock(batchEndBlockHash, endBlock, true),
|
||||
this._indexer.updateSyncStatusIndexedBlock(batchEndBlockHash, endBlock, true),
|
||||
this._indexer.updateSyncStatusChainHead(batchEndBlockHash, endBlock, true)
|
||||
]);
|
||||
log(`Sync status canonical, indexed and chain head block updated to ${endBlock}`);
|
||||
|
||||
this._historicalProcessingCompletedUpto = endBlock;
|
||||
|
||||
await this.jobQueue.markComplete(
|
||||
job,
|
||||
{ isComplete: true, endBlock }
|
||||
);
|
||||
}
|
||||
|
||||
async _pushEventProcessingJobsForBlocks (blocks: BlockProgressInterface[]): Promise<void> {
|
||||
// Push event processing job for each block
|
||||
// const pushJobForBlockPromises = blocks.map(async block => {
|
||||
for (const block of blocks) {
|
||||
const eventsProcessingJob: EventsJobData = {
|
||||
kind: EventsQueueJobKind.EVENTS,
|
||||
blockHash: block.blockHash,
|
||||
@ -201,16 +256,9 @@ export class JobRunner {
|
||||
// Publishing when realtime processing is listening to events will cause problems
|
||||
publish: false
|
||||
};
|
||||
this.jobQueue.pushJob(QUEUE_EVENT_PROCESSING, eventsProcessingJob);
|
||||
});
|
||||
|
||||
await Promise.all(pushJobForBlockPromises);
|
||||
this._historicalProcessingCompletedUpto = endBlock;
|
||||
|
||||
await this.jobQueue.markComplete(
|
||||
job,
|
||||
{ isComplete: true, endBlock }
|
||||
);
|
||||
await this.jobQueue.pushJob(QUEUE_EVENT_PROCESSING, eventsProcessingJob);
|
||||
}
|
||||
}
|
||||
|
||||
async processEvent (job: PgBoss.JobWithDoneCallback<EventsJobData | ContractJobData, any>): Promise<EventInterface | void> {
|
||||
@ -305,7 +353,7 @@ export class JobRunner {
|
||||
await this.jobQueue.markComplete(job);
|
||||
}
|
||||
|
||||
async resetToPrevIndexedBlock (): Promise<void> {
|
||||
async resetToLatestProcessedBlock (): Promise<void> {
|
||||
const syncStatus = await this._indexer.getSyncStatus();
|
||||
|
||||
// Watcher running for first time if syncStatus does not exist
|
||||
@ -313,17 +361,13 @@ export class JobRunner {
|
||||
return;
|
||||
}
|
||||
|
||||
const blockProgress = await this._indexer.getBlockProgress(syncStatus.latestIndexedBlockHash);
|
||||
const blockProgress = await this._indexer.getBlockProgress(syncStatus.latestProcessedBlockHash);
|
||||
assert(blockProgress);
|
||||
assert(blockProgress.isComplete);
|
||||
|
||||
// Don't reset to previous block if block is complete (all events processed)
|
||||
if (blockProgress.isComplete) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Resetting to block before latest indexed block as all events should be processed in the previous block.
|
||||
// Reprocessing of events in subgraph watchers is not possible as DB transaction is not implemented.
|
||||
await this._indexer.resetWatcherToBlock(syncStatus.latestIndexedBlockNumber - 1);
|
||||
// Resetting to block with events that have been processed completely
|
||||
// Reprocessing of block events in subgraph watchers is not possible as DB transaction is not implemented.
|
||||
await this._indexer.resetWatcherToBlock(blockProgress.blockNumber);
|
||||
}
|
||||
|
||||
handleShutdown (): void {
|
||||
@ -554,17 +598,17 @@ export class JobRunner {
|
||||
|
||||
const prefetchedBlock = this._blockAndEventsMap.get(blockHash);
|
||||
assert(prefetchedBlock);
|
||||
|
||||
const { block } = prefetchedBlock;
|
||||
log(`Processing events for block ${block.blockNumber}`);
|
||||
|
||||
console.time('time:job-runner#_processEvents-events');
|
||||
console.time(`time:job-runner#_processEvents-events-${block.blockNumber}`);
|
||||
const isNewContractWatched = await processBatchEvents(
|
||||
this._indexer,
|
||||
block,
|
||||
this._jobQueueConfig.eventsInBatch,
|
||||
this._jobQueueConfig.subgraphEventsOrder
|
||||
);
|
||||
console.timeEnd('time:job-runner#_processEvents-events');
|
||||
console.timeEnd(`time:job-runner#_processEvents-events-${block.blockNumber}`);
|
||||
|
||||
// Update metrics
|
||||
lastProcessedBlockNumber.set(block.blockNumber);
|
||||
@ -574,13 +618,13 @@ export class JobRunner {
|
||||
|
||||
// Check if new contract was added and filterLogsByAddresses is set to true
|
||||
if (isNewContractWatched && this._indexer.upstreamConfig.ethServer.filterLogsByAddresses) {
|
||||
// Delete jobs for any pending events processing
|
||||
await this.jobQueue.deleteJobs(QUEUE_EVENT_PROCESSING);
|
||||
|
||||
// Check if historical processing is running and that current block is being processed was trigerred by historical processing
|
||||
if (this._historicalProcessingCompletedUpto && this._historicalProcessingCompletedUpto > block.blockNumber) {
|
||||
const nextBlockNumberToProcess = block.blockNumber + 1;
|
||||
|
||||
// Delete jobs for any pending historical processing
|
||||
await this.jobQueue.deleteJobs(QUEUE_HISTORICAL_PROCESSING);
|
||||
|
||||
// Push a new job to restart historical blocks processing after current block
|
||||
log('New contract added in historical processing with filterLogsByAddresses set to true');
|
||||
await this.jobQueue.pushJob(
|
||||
@ -592,6 +636,9 @@ export class JobRunner {
|
||||
{ priority: 1 }
|
||||
);
|
||||
}
|
||||
|
||||
// Delete jobs for any pending events processing
|
||||
await this.jobQueue.deleteJobs(QUEUE_EVENT_PROCESSING);
|
||||
}
|
||||
|
||||
if (this._endBlockProcessTimer) {
|
||||
@ -599,6 +646,7 @@ export class JobRunner {
|
||||
}
|
||||
|
||||
this._endBlockProcessTimer = lastBlockProcessDuration.startTimer();
|
||||
await this._indexer.updateSyncStatusProcessedBlock(block.blockHash, block.blockNumber);
|
||||
|
||||
// If this was a retry attempt, unset the indexing error flag in sync status
|
||||
if (isRetryAttempt) {
|
||||
@ -620,18 +668,18 @@ export class JobRunner {
|
||||
|
||||
// TODO: Remove processed entities for current block to avoid reprocessing of events
|
||||
|
||||
// Catch event processing error and push to job queue after some time with higher priority
|
||||
// Catch event processing error and push job again to job queue with higher priority
|
||||
log(`Retrying event processing after ${EVENTS_PROCESSING_RETRY_WAIT} ms`);
|
||||
await wait(EVENTS_PROCESSING_RETRY_WAIT);
|
||||
|
||||
// TODO: Stop job for next block in queue (in historical processing)
|
||||
|
||||
const eventsProcessingRetryJob: EventsJobData = { ...jobData, isRetryAttempt: true };
|
||||
|
||||
await this.jobQueue.pushJob(
|
||||
QUEUE_EVENT_PROCESSING,
|
||||
eventsProcessingRetryJob,
|
||||
{ priority: 1 }
|
||||
);
|
||||
|
||||
// Wait for some time before retrying job
|
||||
await wait(EVENTS_PROCESSING_RETRY_WAIT);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -39,6 +39,8 @@ export interface SyncStatusInterface {
|
||||
chainHeadBlockNumber: number;
|
||||
latestIndexedBlockHash: string;
|
||||
latestIndexedBlockNumber: number;
|
||||
latestProcessedBlockHash: string;
|
||||
latestProcessedBlockNumber: number;
|
||||
latestCanonicalBlockHash: string;
|
||||
latestCanonicalBlockNumber: number;
|
||||
initialIndexedBlockHash: string;
|
||||
@ -107,8 +109,8 @@ export interface IndexerInterface {
|
||||
updateSyncStatusChainHead (blockHash: string, blockNumber: number, force?: boolean): Promise<SyncStatusInterface>
|
||||
updateSyncStatusIndexedBlock (blockHash: string, blockNumber: number, force?: boolean): Promise<SyncStatusInterface>
|
||||
updateSyncStatusCanonicalBlock (blockHash: string, blockNumber: number, force?: boolean): Promise<SyncStatusInterface>
|
||||
forceUpdateSyncStatus (blockHash: string, blockNumber: number): Promise<SyncStatusInterface>
|
||||
updateSyncStatusIndexingError (hasIndexingError: boolean): Promise<SyncStatusInterface>
|
||||
updateSyncStatusIndexingError (hasIndexingError: boolean): Promise<SyncStatusInterface | undefined>
|
||||
updateSyncStatusProcessedBlock (blockHash: string, blockNumber: number, force?: boolean): Promise<SyncStatusInterface>
|
||||
updateStateSyncStatusIndexedBlock (blockNumber: number, force?: boolean): Promise<StateSyncStatusInterface | undefined>
|
||||
updateStateSyncStatusCheckpointBlock (blockNumber: number, force?: boolean): Promise<StateSyncStatusInterface>
|
||||
markBlocksAsPruned (blocks: BlockProgressInterface[]): Promise<void>
|
||||
@ -171,8 +173,8 @@ export interface DatabaseInterface {
|
||||
updateSyncStatusIndexedBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number, force?: boolean): Promise<SyncStatusInterface>;
|
||||
updateSyncStatusChainHead (queryRunner: QueryRunner, blockHash: string, blockNumber: number, force?: boolean): Promise<SyncStatusInterface>;
|
||||
updateSyncStatusCanonicalBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number, force?: boolean): Promise<SyncStatusInterface>;
|
||||
forceUpdateSyncStatus (queryRunner: QueryRunner, blockHash: string, blockNumber: number): Promise<SyncStatusInterface>;
|
||||
updateSyncStatusIndexingError (queryRunner: QueryRunner, hasIndexingError: boolean): Promise<SyncStatusInterface>;
|
||||
updateSyncStatusIndexingError (queryRunner: QueryRunner, hasIndexingError: boolean): Promise<SyncStatusInterface | undefined>;
|
||||
updateSyncStatusProcessedBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number, force?: boolean): Promise<SyncStatusInterface>;
|
||||
saveEvents (queryRunner: QueryRunner, events: DeepPartial<EventInterface>[]): Promise<void>;
|
||||
saveBlockWithEvents (queryRunner: QueryRunner, block: DeepPartial<BlockProgressInterface>, events: DeepPartial<EventInterface>[]): Promise<BlockProgressInterface>;
|
||||
saveEventEntity (queryRunner: QueryRunner, entity: EventInterface): Promise<EventInterface>;
|
||||
|
Loading…
Reference in New Issue
Block a user