From 26965f372fd0950981823db208efcc6144c7aac8 Mon Sep 17 00:00:00 2001 From: Ashwin Phatak Date: Thu, 22 Jul 2021 16:32:39 +0530 Subject: [PATCH] Handle reorgs upto known canonical block hash (#157) * Handle reorgs upto known canonical block hash. * Process block events strictly in order. * Support filling old blocks, process in-order. * Always publish block progress. --- packages/uni-watcher/src/database.ts | 41 ++++++++- .../uni-watcher/src/entity/BlockProgress.ts | 3 + packages/uni-watcher/src/entity/SyncStatus.ts | 23 +++++ packages/uni-watcher/src/events.ts | 42 ++++++---- packages/uni-watcher/src/fill.ts | 4 +- packages/uni-watcher/src/indexer.ts | 32 +++++-- packages/uni-watcher/src/job-runner.ts | 84 +++++++++++++++++-- packages/util/src/job-queue.ts | 4 +- 8 files changed, 197 insertions(+), 36 deletions(-) create mode 100644 packages/uni-watcher/src/entity/SyncStatus.ts diff --git a/packages/uni-watcher/src/database.ts b/packages/uni-watcher/src/database.ts index c3f81800..efe3ec62 100644 --- a/packages/uni-watcher/src/database.ts +++ b/packages/uni-watcher/src/database.ts @@ -6,6 +6,7 @@ import { SnakeNamingStrategy } from 'typeorm-naming-strategies'; import { Event, UNKNOWN_EVENT_NAME } from './entity/Event'; import { Contract } from './entity/Contract'; import { BlockProgress } from './entity/BlockProgress'; +import { SyncStatus } from './entity/SyncStatus'; export class Database { _config: ConnectionOptions @@ -97,18 +98,48 @@ export class Database { blockTimestamp, numEvents, numProcessedEvents: 0, + lastProcessedEventIndex: -1, isComplete: (numEvents === 0) }); blockProgress = await blockProgressRepo.save(entity); // Bulk insert events. - events.forEach(event => event.block = blockProgress); + events.forEach(event => { + event.block = blockProgress; + }); + await tx.createQueryBuilder().insert().into(Event).values(events).execute(); } }); } + async updateSyncStatus (blockHash: string, blockNumber: number): Promise { + return await this._conn.transaction(async (tx) => { + const repo = tx.getRepository(SyncStatus); + + let entity = await repo.findOne(); + if (!entity) { + entity = repo.create({ + latestCanonicalBlockHash: blockHash, + latestCanonicalBlockNumber: blockNumber + }); + } + + if (blockNumber >= entity.latestCanonicalBlockNumber) { + entity.chainHeadBlockHash = blockHash; + entity.chainHeadBlockNumber = blockNumber; + } + + return await repo.save(entity); + }); + } + + async getSyncStatus (): Promise { + const repo = this._conn.getRepository(SyncStatus); + return repo.findOne(); + } + async getEvent (id: string): Promise { return this._conn.getRepository(Event).findOne(id, { relations: ['block'] }); } @@ -154,15 +185,21 @@ export class Database { return repo.findOne({ where: { blockHash } }); } - async updateBlockProgress (blockHash: string): Promise { + async updateBlockProgress (blockHash: string, lastProcessedEventIndex: number): Promise { await this._conn.transaction(async (tx) => { const repo = tx.getRepository(BlockProgress); const entity = await repo.findOne({ where: { blockHash } }); if (entity && !entity.isComplete) { + if (lastProcessedEventIndex <= entity.lastProcessedEventIndex) { + throw new Error(`Events processed out of order ${blockHash}, was ${entity.lastProcessedEventIndex}, got ${lastProcessedEventIndex}`); + } + + entity.lastProcessedEventIndex = lastProcessedEventIndex; entity.numProcessedEvents++; if (entity.numProcessedEvents >= entity.numEvents) { entity.isComplete = true; } + await repo.save(entity); } }); diff --git a/packages/uni-watcher/src/entity/BlockProgress.ts b/packages/uni-watcher/src/entity/BlockProgress.ts index ef732154..1e9aa8d9 100644 --- a/packages/uni-watcher/src/entity/BlockProgress.ts +++ b/packages/uni-watcher/src/entity/BlockProgress.ts @@ -26,6 +26,9 @@ export class BlockProgress { @Column('integer') numProcessedEvents!: number; + @Column('integer') + lastProcessedEventIndex!: number; + @Column('boolean') isComplete!: boolean } diff --git a/packages/uni-watcher/src/entity/SyncStatus.ts b/packages/uni-watcher/src/entity/SyncStatus.ts new file mode 100644 index 00000000..4a9c167f --- /dev/null +++ b/packages/uni-watcher/src/entity/SyncStatus.ts @@ -0,0 +1,23 @@ +import { Entity, PrimaryGeneratedColumn, Column } from 'typeorm'; + +@Entity() +export class SyncStatus { + @PrimaryGeneratedColumn() + id!: number; + + // Latest block hash and number from the chain itself. + @Column('varchar', { length: 66 }) + chainHeadBlockHash!: string; + + @Column('integer') + chainHeadBlockNumber!: number; + + // Most recent block hash and number that we can consider as part + // of the canonical/finalized chain. Reorgs older than this block + // cannot be processed and processing will halt. + @Column('varchar', { length: 66 }) + latestCanonicalBlockHash!: string; + + @Column('integer') + latestCanonicalBlockNumber!: number; +} diff --git a/packages/uni-watcher/src/events.ts b/packages/uni-watcher/src/events.ts index 20a26fdb..6b157b5e 100644 --- a/packages/uni-watcher/src/events.ts +++ b/packages/uni-watcher/src/events.ts @@ -8,7 +8,7 @@ import { JobQueue } from '@vulcanize/util'; import { Indexer } from './indexer'; import { BlockProgress } from './entity/BlockProgress'; -import { UNKNOWN_EVENT_NAME } from './entity/Event'; +import { Event, UNKNOWN_EVENT_NAME } from './entity/Event'; const log = debug('vulcanize:events'); @@ -42,22 +42,44 @@ export class EventWatcher { async start (): Promise { assert(!this._subscription, 'subscription already started'); - log('Started watching upstream blocks...'); + await this.watchBlocksAtChainHead(); + await this.initBlockProcessingOnCompleteHandler(); + await this.initEventProcessingOnCompleteHandler(); + } + async watchBlocksAtChainHead () { + log('Started watching upstream blocks...'); + this._subscription = await this._ethClient.watchBlocks(async (value) => { + const { blockHash, blockNumber, parentHash } = _.get(value, 'data.listen.relatedNode'); + + await this._indexer.updateSyncStatus(blockHash, blockNumber); + + log('watchBlock', blockHash, blockNumber); + await this._jobQueue.pushJob(QUEUE_BLOCK_PROCESSING, { blockHash, blockNumber, parentHash }); + }); + } + + async initBlockProcessingOnCompleteHandler () { this._jobQueue.onComplete(QUEUE_BLOCK_PROCESSING, async (job) => { const { data: { request: { data: { blockHash, blockNumber } } } } = job; log(`Job onComplete block ${blockHash} ${blockNumber}`); + const blockProgress = await this._indexer.getBlockProgress(blockHash); + if (blockProgress) { + await this.publishBlockProgressToSubscribers(blockProgress); + } }); + } + async initEventProcessingOnCompleteHandler () { this._jobQueue.onComplete(QUEUE_EVENT_PROCESSING, async (job) => { const { data: { request, failed, state, createdOn } } = job; const dbEvent = await this._indexer.getEvent(request.data.id); assert(dbEvent); - await this._indexer.updateBlockProgress(dbEvent.block.blockHash); + await this._indexer.updateBlockProgress(dbEvent.block.blockHash, dbEvent.index); const blockProgress = await this._indexer.getBlockProgress(dbEvent.block.blockHash); - if (blockProgress && request.data.publishBlockProgress) { + if (blockProgress) { await this.publishBlockProgressToSubscribers(blockProgress); } @@ -66,23 +88,15 @@ export class EventWatcher { if (!failed && state === 'completed' && request.data.publish) { // Check for max acceptable lag time between request and sending results to live subscribers. if (timeElapsedInSeconds <= this._jobQueue.maxCompletionLag) { - return await this.publishUniswapEventToSubscribers(request.data.id, timeElapsedInSeconds); + return await this.publishUniswapEventToSubscribers(dbEvent, timeElapsedInSeconds); } else { log(`event ${request.data.id} is too old (${timeElapsedInSeconds}s), not broadcasting to live subscribers`); } } }); - - this._subscription = await this._ethClient.watchBlocks(async (value) => { - const { blockHash, blockNumber } = _.get(value, 'data.listen.relatedNode'); - log('watchBlock', blockHash, blockNumber); - await this._jobQueue.pushJob(QUEUE_BLOCK_PROCESSING, { blockHash, blockNumber }); - }); } - async publishUniswapEventToSubscribers (id: string, timeElapsedInSeconds: number): Promise { - const dbEvent = await this._indexer.getEvent(id); - + async publishUniswapEventToSubscribers (dbEvent: Event, timeElapsedInSeconds: number): Promise { if (dbEvent && dbEvent.eventName !== UNKNOWN_EVENT_NAME) { const resultEvent = this._indexer.getResultEvent(dbEvent); diff --git a/packages/uni-watcher/src/fill.ts b/packages/uni-watcher/src/fill.ts index be9fe2ea..2c1333bd 100644 --- a/packages/uni-watcher/src/fill.ts +++ b/packages/uni-watcher/src/fill.ts @@ -75,12 +75,12 @@ export const main = async (): Promise => { const result = await ethClient.getBlockWithTransactions(blockNumber.toString()); const { allEthHeaderCids: { nodes: blockNodes } } = result; for (let bi = 0; bi < blockNodes.length; bi++) { - const { blockHash, blockNumber } = blockNodes[bi]; + const { blockHash, blockNumber, parentHash } = blockNodes[bi]; const blockProgress = await db.getBlockProgress(blockHash); if (blockProgress) { log(`Block number ${blockNumber}, block hash ${blockHash} already known, skip filling`); } else { - await jobQueue.pushJob(QUEUE_BLOCK_PROCESSING, { blockHash, blockNumber }); + await jobQueue.pushJob(QUEUE_BLOCK_PROCESSING, { blockHash, blockNumber, parentHash }); } } } diff --git a/packages/uni-watcher/src/indexer.ts b/packages/uni-watcher/src/indexer.ts index 9444bb36..c38bdcc7 100644 --- a/packages/uni-watcher/src/indexer.ts +++ b/packages/uni-watcher/src/indexer.ts @@ -16,6 +16,7 @@ import { Contract, KIND_FACTORY, KIND_POOL, KIND_NFPM } from './entity/Contract' import { abi as factoryABI, storageLayout as factoryStorageLayout } from './artifacts/factory.json'; import { abi as nfpmABI, storageLayout as nfpmStorageLayout } from './artifacts/NonfungiblePositionManager.json'; import poolABI from './artifacts/pool.json'; +import { SyncStatus } from './entity/SyncStatus'; // TODO: Move to config. const MAX_EVENTS_BLOCK_RANGE = 1000; @@ -96,16 +97,20 @@ export class Indexer { const blockProgress = await this._db.getBlockProgress(blockHash); if (!blockProgress) { // Fetch and save events first and make a note in the event sync progress table. + log(`getBlockEvents: db miss, fetching from upstream server ${blockHash}`); await this.fetchAndSaveEvents(blockHash); - log('getBlockEvents: db miss, fetching from upstream server'); } const events = await this._db.getBlockEvents(blockHash); - log(`getBlockEvents: db hit, num events: ${events.length}`); + log(`getBlockEvents: db hit, ${blockHash} num events: ${events.length}`); return events; } + async getBlockEvents (blockHash: string): Promise> { + return this._db.getBlockEvents(blockHash); + } + async getEventsByFilter (blockHash: string, contract: string, name: string | null): Promise> { if (contract) { const uniContract = await this.isUniswapContract(contract); @@ -345,6 +350,19 @@ export class Indexer { await this._db.saveEvents(block, dbEvents); } + async updateSyncStatus (blockHash: string, blockNumber: number): Promise { + return this._db.updateSyncStatus(blockHash, blockNumber); + } + + async getSyncStatus (): Promise { + return this._db.getSyncStatus(); + } + + async getBlock (blockHash: string): Promise { + const { block } = await this._ethClient.getLogs({ blockHash }); + return block; + } + async getEvent (id: string): Promise { return this._db.getEvent(id); } @@ -357,8 +375,8 @@ export class Indexer { return this._db.getBlockProgress(blockHash); } - async updateBlockProgress (blockHash: string): Promise { - return this._db.updateBlockProgress(blockHash); + async updateBlockProgress (blockHash: string, lastProcessedEventIndex: number): Promise { + return this._db.updateBlockProgress(blockHash, lastProcessedEventIndex); } async getProcessedBlockCountForRange (fromBlockNumber: number, toBlockNumber: number): Promise<{ expected: number, actual: number }> { @@ -377,7 +395,7 @@ export class Indexer { return this._db.getEventsInRange(fromBlockNumber, toBlockNumber); } - async position (blockHash: string, tokenId: string) { + async position (blockHash: string, tokenId: string): Promise { const nfpmContract = await this._db.getLatestContract('nfpm'); assert(nfpmContract, 'No NFPM contract watched.'); const { value, proof } = await this._getStorageValue(nfpmStorageLayout, blockHash, nfpmContract.address, '_positions', BigInt(tokenId)); @@ -388,7 +406,7 @@ export class Indexer { }; } - async poolIdToPoolKey (blockHash: string, poolId: string) { + async poolIdToPoolKey (blockHash: string, poolId: string): Promise { const nfpmContract = await this._db.getLatestContract('nfpm'); assert(nfpmContract, 'No NFPM contract watched.'); const { value, proof } = await this._getStorageValue(nfpmStorageLayout, blockHash, nfpmContract.address, '_poolIdToPoolKey', BigInt(poolId)); @@ -399,7 +417,7 @@ export class Indexer { }; } - async getPool (blockHash: string, token0: string, token1: string, fee: string) { + async getPool (blockHash: string, token0: string, token1: string, fee: string): Promise { const factoryContract = await this._db.getLatestContract('factory'); assert(factoryContract, 'No Factory contract watched.'); const { value, proof } = await this._getStorageValue(factoryStorageLayout, blockHash, factoryContract.address, 'getPool', token0, token1, BigInt(fee)); diff --git a/packages/uni-watcher/src/job-runner.ts b/packages/uni-watcher/src/job-runner.ts index 17397e87..555421ef 100644 --- a/packages/uni-watcher/src/job-runner.ts +++ b/packages/uni-watcher/src/job-runner.ts @@ -10,7 +10,7 @@ import { getConfig, JobQueue } from '@vulcanize/util'; import { Indexer } from './indexer'; import { Database } from './database'; -import { UNKNOWN_EVENT_NAME } from './entity/Event'; +import { UNKNOWN_EVENT_NAME, Event } from './entity/Event'; import { QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING } from './events'; const log = debug('vulcanize:job-runner'); @@ -59,10 +59,47 @@ export const main = async (): Promise => { await jobQueue.start(); await jobQueue.subscribe(QUEUE_BLOCK_PROCESSING, async (job) => { - const { data: { blockHash, blockNumber } } = job; + const { data: { blockHash, blockNumber, parentHash, priority } } = job; log(`Processing block number ${blockNumber} hash ${blockHash} `); + // Check if parent block has been processed yet, if not, push a high priority job to process that first and abort. + // However, don't go beyond the `latestCanonicalBlockHash` from SyncStatus as we have to assume the reorg can't be that deep. + let syncStatus = await indexer.getSyncStatus(); + if (!syncStatus) { + syncStatus = await indexer.updateSyncStatus(blockHash, blockNumber); + } + + if (blockHash !== syncStatus.latestCanonicalBlockHash) { + const parent = await indexer.getBlockProgress(parentHash); + if (!parent) { + const { number: parentBlockNumber, parent: { hash: grandparentHash } } = await indexer.getBlock(parentHash); + + // Create a higher priority job to index parent block and then abort. + // We don't have to worry about aborting as this job will get retried later. + const newPriority = (priority || 0) + 1; + await jobQueue.pushJob(QUEUE_BLOCK_PROCESSING, { + blockHash: parentHash, + blockNumber: parentBlockNumber, + parentHash: grandparentHash, + priority: newPriority + }, { priority: newPriority }); + + const message = `Parent block number ${parentBlockNumber} hash ${parentHash} of block number ${blockNumber} hash ${blockHash} not fetched yet, aborting`; + log(message); + + throw new Error(message); + } + + if (parentHash !== syncStatus.latestCanonicalBlockHash && !parent.isComplete) { + // Parent block indexing needs to finish before this block can be indexed. + const message = `Indexing incomplete for parent block number ${parent.blockNumber} hash ${parentHash} of block number ${blockNumber} hash ${blockHash}, aborting`; + log(message); + + throw new Error(message); + } + } + const events = await indexer.getOrFetchBlockEvents(blockHash); for (let ei = 0; ei < events.length; ei++) { await jobQueue.pushJob(QUEUE_EVENT_PROCESSING, { id: events[ei].id, publish: true }); @@ -79,20 +116,49 @@ export const main = async (): Promise => { let dbEvent = await indexer.getEvent(id); assert(dbEvent); - const uniContract = await indexer.isUniswapContract(dbEvent.contract); + const event: Event = dbEvent; + + // Confirm that the parent block has been completely processed. + // We don't have to worry about aborting as this job will get retried later. + const parent = await indexer.getBlockProgress(event.block.parentHash); + if (!parent || !parent.isComplete) { + const message = `Abort processing of event ${id} as parent block not processed yet`; + throw new Error(message); + } + + const blockProgress = await indexer.getBlockProgress(event.block.blockHash); + assert(blockProgress); + + const events = await indexer.getBlockEvents(event.block.blockHash); + const eventIndex = events.findIndex(e => e.id === event.id); + assert(eventIndex !== -1); + + // Check if previous event in block has been processed exactly before this and abort if not. + if (eventIndex > 0) { // Skip the first event in the block. + const prevIndex = eventIndex - 1; + const prevEvent = events[prevIndex]; + if (prevEvent.index !== blockProgress.lastProcessedEventIndex) { + throw new Error(`Events received out of order for block number ${event.block.blockNumber} hash ${event.block.blockHash},` + + ` prev event index ${prevEvent.index}, got event index ${event.index} and lastProcessedEventIndex ${blockProgress.lastProcessedEventIndex}, aborting`); + } + } + + const uniContract = await indexer.isUniswapContract(event.contract); if (uniContract) { // We might not have parsed this event yet. This can happen if the contract was added // as a result of a previous event in the same block. - if (dbEvent.eventName === UNKNOWN_EVENT_NAME) { - const logObj = JSON.parse(dbEvent.extraInfo); + if (event.eventName === UNKNOWN_EVENT_NAME) { + const logObj = JSON.parse(event.extraInfo); const { eventName, eventInfo } = indexer.parseEventNameAndArgs(uniContract.kind, logObj); - dbEvent.eventName = eventName; - dbEvent.eventInfo = JSON.stringify(eventInfo); - dbEvent = await indexer.saveEventEntity(dbEvent); + event.eventName = eventName; + event.eventInfo = JSON.stringify(eventInfo); + dbEvent = await indexer.saveEventEntity(event); } dbEvent = await indexer.getEvent(id); - await indexer.processEvent(dbEvent!); + assert(dbEvent); + + await indexer.processEvent(dbEvent); } await jobQueue.markComplete(job); diff --git a/packages/util/src/job-queue.ts b/packages/util/src/job-queue.ts index 0c1dd144..c12e1ebc 100644 --- a/packages/util/src/job-queue.ts +++ b/packages/util/src/job-queue.ts @@ -65,10 +65,10 @@ export class JobQueue { this._boss.complete(job.id); } - async pushJob (queue: string, job: any): Promise { + async pushJob (queue: string, job: any, options: PgBoss.PublishOptions = {}): Promise { assert(this._boss); - const jobId = await this._boss.publish(queue, job); + const jobId = await this._boss.publish(queue, job, options); log(`Created job in queue ${queue}: ${jobId}`); } }