mirror of
https://github.com/cerc-io/watcher-ts
synced 2025-01-21 10:39:06 +00:00
Handle reorgs upto known canonical block hash (#157)
* Handle reorgs upto known canonical block hash. * Process block events strictly in order. * Support filling old blocks, process in-order. * Always publish block progress.
This commit is contained in:
parent
34de3d51b3
commit
26965f372f
@ -6,6 +6,7 @@ import { SnakeNamingStrategy } from 'typeorm-naming-strategies';
|
||||
import { Event, UNKNOWN_EVENT_NAME } from './entity/Event';
|
||||
import { Contract } from './entity/Contract';
|
||||
import { BlockProgress } from './entity/BlockProgress';
|
||||
import { SyncStatus } from './entity/SyncStatus';
|
||||
|
||||
export class Database {
|
||||
_config: ConnectionOptions
|
||||
@ -97,18 +98,48 @@ export class Database {
|
||||
blockTimestamp,
|
||||
numEvents,
|
||||
numProcessedEvents: 0,
|
||||
lastProcessedEventIndex: -1,
|
||||
isComplete: (numEvents === 0)
|
||||
});
|
||||
|
||||
blockProgress = await blockProgressRepo.save(entity);
|
||||
|
||||
// Bulk insert events.
|
||||
events.forEach(event => event.block = blockProgress);
|
||||
events.forEach(event => {
|
||||
event.block = blockProgress;
|
||||
});
|
||||
|
||||
await tx.createQueryBuilder().insert().into(Event).values(events).execute();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
async updateSyncStatus (blockHash: string, blockNumber: number): Promise<SyncStatus> {
|
||||
return await this._conn.transaction(async (tx) => {
|
||||
const repo = tx.getRepository(SyncStatus);
|
||||
|
||||
let entity = await repo.findOne();
|
||||
if (!entity) {
|
||||
entity = repo.create({
|
||||
latestCanonicalBlockHash: blockHash,
|
||||
latestCanonicalBlockNumber: blockNumber
|
||||
});
|
||||
}
|
||||
|
||||
if (blockNumber >= entity.latestCanonicalBlockNumber) {
|
||||
entity.chainHeadBlockHash = blockHash;
|
||||
entity.chainHeadBlockNumber = blockNumber;
|
||||
}
|
||||
|
||||
return await repo.save(entity);
|
||||
});
|
||||
}
|
||||
|
||||
async getSyncStatus (): Promise<SyncStatus | undefined> {
|
||||
const repo = this._conn.getRepository(SyncStatus);
|
||||
return repo.findOne();
|
||||
}
|
||||
|
||||
async getEvent (id: string): Promise<Event | undefined> {
|
||||
return this._conn.getRepository(Event).findOne(id, { relations: ['block'] });
|
||||
}
|
||||
@ -154,15 +185,21 @@ export class Database {
|
||||
return repo.findOne({ where: { blockHash } });
|
||||
}
|
||||
|
||||
async updateBlockProgress (blockHash: string): Promise<void> {
|
||||
async updateBlockProgress (blockHash: string, lastProcessedEventIndex: number): Promise<void> {
|
||||
await this._conn.transaction(async (tx) => {
|
||||
const repo = tx.getRepository(BlockProgress);
|
||||
const entity = await repo.findOne({ where: { blockHash } });
|
||||
if (entity && !entity.isComplete) {
|
||||
if (lastProcessedEventIndex <= entity.lastProcessedEventIndex) {
|
||||
throw new Error(`Events processed out of order ${blockHash}, was ${entity.lastProcessedEventIndex}, got ${lastProcessedEventIndex}`);
|
||||
}
|
||||
|
||||
entity.lastProcessedEventIndex = lastProcessedEventIndex;
|
||||
entity.numProcessedEvents++;
|
||||
if (entity.numProcessedEvents >= entity.numEvents) {
|
||||
entity.isComplete = true;
|
||||
}
|
||||
|
||||
await repo.save(entity);
|
||||
}
|
||||
});
|
||||
|
@ -26,6 +26,9 @@ export class BlockProgress {
|
||||
@Column('integer')
|
||||
numProcessedEvents!: number;
|
||||
|
||||
@Column('integer')
|
||||
lastProcessedEventIndex!: number;
|
||||
|
||||
@Column('boolean')
|
||||
isComplete!: boolean
|
||||
}
|
||||
|
23
packages/uni-watcher/src/entity/SyncStatus.ts
Normal file
23
packages/uni-watcher/src/entity/SyncStatus.ts
Normal file
@ -0,0 +1,23 @@
|
||||
import { Entity, PrimaryGeneratedColumn, Column } from 'typeorm';
|
||||
|
||||
@Entity()
|
||||
export class SyncStatus {
|
||||
@PrimaryGeneratedColumn()
|
||||
id!: number;
|
||||
|
||||
// Latest block hash and number from the chain itself.
|
||||
@Column('varchar', { length: 66 })
|
||||
chainHeadBlockHash!: string;
|
||||
|
||||
@Column('integer')
|
||||
chainHeadBlockNumber!: number;
|
||||
|
||||
// Most recent block hash and number that we can consider as part
|
||||
// of the canonical/finalized chain. Reorgs older than this block
|
||||
// cannot be processed and processing will halt.
|
||||
@Column('varchar', { length: 66 })
|
||||
latestCanonicalBlockHash!: string;
|
||||
|
||||
@Column('integer')
|
||||
latestCanonicalBlockNumber!: number;
|
||||
}
|
@ -8,7 +8,7 @@ import { JobQueue } from '@vulcanize/util';
|
||||
|
||||
import { Indexer } from './indexer';
|
||||
import { BlockProgress } from './entity/BlockProgress';
|
||||
import { UNKNOWN_EVENT_NAME } from './entity/Event';
|
||||
import { Event, UNKNOWN_EVENT_NAME } from './entity/Event';
|
||||
|
||||
const log = debug('vulcanize:events');
|
||||
|
||||
@ -42,22 +42,44 @@ export class EventWatcher {
|
||||
async start (): Promise<void> {
|
||||
assert(!this._subscription, 'subscription already started');
|
||||
|
||||
log('Started watching upstream blocks...');
|
||||
await this.watchBlocksAtChainHead();
|
||||
await this.initBlockProcessingOnCompleteHandler();
|
||||
await this.initEventProcessingOnCompleteHandler();
|
||||
}
|
||||
|
||||
async watchBlocksAtChainHead () {
|
||||
log('Started watching upstream blocks...');
|
||||
this._subscription = await this._ethClient.watchBlocks(async (value) => {
|
||||
const { blockHash, blockNumber, parentHash } = _.get(value, 'data.listen.relatedNode');
|
||||
|
||||
await this._indexer.updateSyncStatus(blockHash, blockNumber);
|
||||
|
||||
log('watchBlock', blockHash, blockNumber);
|
||||
await this._jobQueue.pushJob(QUEUE_BLOCK_PROCESSING, { blockHash, blockNumber, parentHash });
|
||||
});
|
||||
}
|
||||
|
||||
async initBlockProcessingOnCompleteHandler () {
|
||||
this._jobQueue.onComplete(QUEUE_BLOCK_PROCESSING, async (job) => {
|
||||
const { data: { request: { data: { blockHash, blockNumber } } } } = job;
|
||||
log(`Job onComplete block ${blockHash} ${blockNumber}`);
|
||||
const blockProgress = await this._indexer.getBlockProgress(blockHash);
|
||||
if (blockProgress) {
|
||||
await this.publishBlockProgressToSubscribers(blockProgress);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
async initEventProcessingOnCompleteHandler () {
|
||||
this._jobQueue.onComplete(QUEUE_EVENT_PROCESSING, async (job) => {
|
||||
const { data: { request, failed, state, createdOn } } = job;
|
||||
|
||||
const dbEvent = await this._indexer.getEvent(request.data.id);
|
||||
assert(dbEvent);
|
||||
|
||||
await this._indexer.updateBlockProgress(dbEvent.block.blockHash);
|
||||
await this._indexer.updateBlockProgress(dbEvent.block.blockHash, dbEvent.index);
|
||||
const blockProgress = await this._indexer.getBlockProgress(dbEvent.block.blockHash);
|
||||
if (blockProgress && request.data.publishBlockProgress) {
|
||||
if (blockProgress) {
|
||||
await this.publishBlockProgressToSubscribers(blockProgress);
|
||||
}
|
||||
|
||||
@ -66,23 +88,15 @@ export class EventWatcher {
|
||||
if (!failed && state === 'completed' && request.data.publish) {
|
||||
// Check for max acceptable lag time between request and sending results to live subscribers.
|
||||
if (timeElapsedInSeconds <= this._jobQueue.maxCompletionLag) {
|
||||
return await this.publishUniswapEventToSubscribers(request.data.id, timeElapsedInSeconds);
|
||||
return await this.publishUniswapEventToSubscribers(dbEvent, timeElapsedInSeconds);
|
||||
} else {
|
||||
log(`event ${request.data.id} is too old (${timeElapsedInSeconds}s), not broadcasting to live subscribers`);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
this._subscription = await this._ethClient.watchBlocks(async (value) => {
|
||||
const { blockHash, blockNumber } = _.get(value, 'data.listen.relatedNode');
|
||||
log('watchBlock', blockHash, blockNumber);
|
||||
await this._jobQueue.pushJob(QUEUE_BLOCK_PROCESSING, { blockHash, blockNumber });
|
||||
});
|
||||
}
|
||||
|
||||
async publishUniswapEventToSubscribers (id: string, timeElapsedInSeconds: number): Promise<void> {
|
||||
const dbEvent = await this._indexer.getEvent(id);
|
||||
|
||||
async publishUniswapEventToSubscribers (dbEvent: Event, timeElapsedInSeconds: number): Promise<void> {
|
||||
if (dbEvent && dbEvent.eventName !== UNKNOWN_EVENT_NAME) {
|
||||
const resultEvent = this._indexer.getResultEvent(dbEvent);
|
||||
|
||||
|
@ -75,12 +75,12 @@ export const main = async (): Promise<any> => {
|
||||
const result = await ethClient.getBlockWithTransactions(blockNumber.toString());
|
||||
const { allEthHeaderCids: { nodes: blockNodes } } = result;
|
||||
for (let bi = 0; bi < blockNodes.length; bi++) {
|
||||
const { blockHash, blockNumber } = blockNodes[bi];
|
||||
const { blockHash, blockNumber, parentHash } = blockNodes[bi];
|
||||
const blockProgress = await db.getBlockProgress(blockHash);
|
||||
if (blockProgress) {
|
||||
log(`Block number ${blockNumber}, block hash ${blockHash} already known, skip filling`);
|
||||
} else {
|
||||
await jobQueue.pushJob(QUEUE_BLOCK_PROCESSING, { blockHash, blockNumber });
|
||||
await jobQueue.pushJob(QUEUE_BLOCK_PROCESSING, { blockHash, blockNumber, parentHash });
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -16,6 +16,7 @@ import { Contract, KIND_FACTORY, KIND_POOL, KIND_NFPM } from './entity/Contract'
|
||||
import { abi as factoryABI, storageLayout as factoryStorageLayout } from './artifacts/factory.json';
|
||||
import { abi as nfpmABI, storageLayout as nfpmStorageLayout } from './artifacts/NonfungiblePositionManager.json';
|
||||
import poolABI from './artifacts/pool.json';
|
||||
import { SyncStatus } from './entity/SyncStatus';
|
||||
|
||||
// TODO: Move to config.
|
||||
const MAX_EVENTS_BLOCK_RANGE = 1000;
|
||||
@ -96,16 +97,20 @@ export class Indexer {
|
||||
const blockProgress = await this._db.getBlockProgress(blockHash);
|
||||
if (!blockProgress) {
|
||||
// Fetch and save events first and make a note in the event sync progress table.
|
||||
log(`getBlockEvents: db miss, fetching from upstream server ${blockHash}`);
|
||||
await this.fetchAndSaveEvents(blockHash);
|
||||
log('getBlockEvents: db miss, fetching from upstream server');
|
||||
}
|
||||
|
||||
const events = await this._db.getBlockEvents(blockHash);
|
||||
log(`getBlockEvents: db hit, num events: ${events.length}`);
|
||||
log(`getBlockEvents: db hit, ${blockHash} num events: ${events.length}`);
|
||||
|
||||
return events;
|
||||
}
|
||||
|
||||
async getBlockEvents (blockHash: string): Promise<Array<Event>> {
|
||||
return this._db.getBlockEvents(blockHash);
|
||||
}
|
||||
|
||||
async getEventsByFilter (blockHash: string, contract: string, name: string | null): Promise<Array<Event>> {
|
||||
if (contract) {
|
||||
const uniContract = await this.isUniswapContract(contract);
|
||||
@ -345,6 +350,19 @@ export class Indexer {
|
||||
await this._db.saveEvents(block, dbEvents);
|
||||
}
|
||||
|
||||
async updateSyncStatus (blockHash: string, blockNumber: number): Promise<SyncStatus> {
|
||||
return this._db.updateSyncStatus(blockHash, blockNumber);
|
||||
}
|
||||
|
||||
async getSyncStatus (): Promise<SyncStatus | undefined> {
|
||||
return this._db.getSyncStatus();
|
||||
}
|
||||
|
||||
async getBlock (blockHash: string): Promise<any> {
|
||||
const { block } = await this._ethClient.getLogs({ blockHash });
|
||||
return block;
|
||||
}
|
||||
|
||||
async getEvent (id: string): Promise<Event | undefined> {
|
||||
return this._db.getEvent(id);
|
||||
}
|
||||
@ -357,8 +375,8 @@ export class Indexer {
|
||||
return this._db.getBlockProgress(blockHash);
|
||||
}
|
||||
|
||||
async updateBlockProgress (blockHash: string): Promise<void> {
|
||||
return this._db.updateBlockProgress(blockHash);
|
||||
async updateBlockProgress (blockHash: string, lastProcessedEventIndex: number): Promise<void> {
|
||||
return this._db.updateBlockProgress(blockHash, lastProcessedEventIndex);
|
||||
}
|
||||
|
||||
async getProcessedBlockCountForRange (fromBlockNumber: number, toBlockNumber: number): Promise<{ expected: number, actual: number }> {
|
||||
@ -377,7 +395,7 @@ export class Indexer {
|
||||
return this._db.getEventsInRange(fromBlockNumber, toBlockNumber);
|
||||
}
|
||||
|
||||
async position (blockHash: string, tokenId: string) {
|
||||
async position (blockHash: string, tokenId: string): Promise<any> {
|
||||
const nfpmContract = await this._db.getLatestContract('nfpm');
|
||||
assert(nfpmContract, 'No NFPM contract watched.');
|
||||
const { value, proof } = await this._getStorageValue(nfpmStorageLayout, blockHash, nfpmContract.address, '_positions', BigInt(tokenId));
|
||||
@ -388,7 +406,7 @@ export class Indexer {
|
||||
};
|
||||
}
|
||||
|
||||
async poolIdToPoolKey (blockHash: string, poolId: string) {
|
||||
async poolIdToPoolKey (blockHash: string, poolId: string): Promise<any> {
|
||||
const nfpmContract = await this._db.getLatestContract('nfpm');
|
||||
assert(nfpmContract, 'No NFPM contract watched.');
|
||||
const { value, proof } = await this._getStorageValue(nfpmStorageLayout, blockHash, nfpmContract.address, '_poolIdToPoolKey', BigInt(poolId));
|
||||
@ -399,7 +417,7 @@ export class Indexer {
|
||||
};
|
||||
}
|
||||
|
||||
async getPool (blockHash: string, token0: string, token1: string, fee: string) {
|
||||
async getPool (blockHash: string, token0: string, token1: string, fee: string): Promise<any> {
|
||||
const factoryContract = await this._db.getLatestContract('factory');
|
||||
assert(factoryContract, 'No Factory contract watched.');
|
||||
const { value, proof } = await this._getStorageValue(factoryStorageLayout, blockHash, factoryContract.address, 'getPool', token0, token1, BigInt(fee));
|
||||
|
@ -10,7 +10,7 @@ import { getConfig, JobQueue } from '@vulcanize/util';
|
||||
|
||||
import { Indexer } from './indexer';
|
||||
import { Database } from './database';
|
||||
import { UNKNOWN_EVENT_NAME } from './entity/Event';
|
||||
import { UNKNOWN_EVENT_NAME, Event } from './entity/Event';
|
||||
import { QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING } from './events';
|
||||
|
||||
const log = debug('vulcanize:job-runner');
|
||||
@ -59,10 +59,47 @@ export const main = async (): Promise<any> => {
|
||||
await jobQueue.start();
|
||||
|
||||
await jobQueue.subscribe(QUEUE_BLOCK_PROCESSING, async (job) => {
|
||||
const { data: { blockHash, blockNumber } } = job;
|
||||
const { data: { blockHash, blockNumber, parentHash, priority } } = job;
|
||||
|
||||
log(`Processing block number ${blockNumber} hash ${blockHash} `);
|
||||
|
||||
// Check if parent block has been processed yet, if not, push a high priority job to process that first and abort.
|
||||
// However, don't go beyond the `latestCanonicalBlockHash` from SyncStatus as we have to assume the reorg can't be that deep.
|
||||
let syncStatus = await indexer.getSyncStatus();
|
||||
if (!syncStatus) {
|
||||
syncStatus = await indexer.updateSyncStatus(blockHash, blockNumber);
|
||||
}
|
||||
|
||||
if (blockHash !== syncStatus.latestCanonicalBlockHash) {
|
||||
const parent = await indexer.getBlockProgress(parentHash);
|
||||
if (!parent) {
|
||||
const { number: parentBlockNumber, parent: { hash: grandparentHash } } = await indexer.getBlock(parentHash);
|
||||
|
||||
// Create a higher priority job to index parent block and then abort.
|
||||
// We don't have to worry about aborting as this job will get retried later.
|
||||
const newPriority = (priority || 0) + 1;
|
||||
await jobQueue.pushJob(QUEUE_BLOCK_PROCESSING, {
|
||||
blockHash: parentHash,
|
||||
blockNumber: parentBlockNumber,
|
||||
parentHash: grandparentHash,
|
||||
priority: newPriority
|
||||
}, { priority: newPriority });
|
||||
|
||||
const message = `Parent block number ${parentBlockNumber} hash ${parentHash} of block number ${blockNumber} hash ${blockHash} not fetched yet, aborting`;
|
||||
log(message);
|
||||
|
||||
throw new Error(message);
|
||||
}
|
||||
|
||||
if (parentHash !== syncStatus.latestCanonicalBlockHash && !parent.isComplete) {
|
||||
// Parent block indexing needs to finish before this block can be indexed.
|
||||
const message = `Indexing incomplete for parent block number ${parent.blockNumber} hash ${parentHash} of block number ${blockNumber} hash ${blockHash}, aborting`;
|
||||
log(message);
|
||||
|
||||
throw new Error(message);
|
||||
}
|
||||
}
|
||||
|
||||
const events = await indexer.getOrFetchBlockEvents(blockHash);
|
||||
for (let ei = 0; ei < events.length; ei++) {
|
||||
await jobQueue.pushJob(QUEUE_EVENT_PROCESSING, { id: events[ei].id, publish: true });
|
||||
@ -79,20 +116,49 @@ export const main = async (): Promise<any> => {
|
||||
let dbEvent = await indexer.getEvent(id);
|
||||
assert(dbEvent);
|
||||
|
||||
const uniContract = await indexer.isUniswapContract(dbEvent.contract);
|
||||
const event: Event = dbEvent;
|
||||
|
||||
// Confirm that the parent block has been completely processed.
|
||||
// We don't have to worry about aborting as this job will get retried later.
|
||||
const parent = await indexer.getBlockProgress(event.block.parentHash);
|
||||
if (!parent || !parent.isComplete) {
|
||||
const message = `Abort processing of event ${id} as parent block not processed yet`;
|
||||
throw new Error(message);
|
||||
}
|
||||
|
||||
const blockProgress = await indexer.getBlockProgress(event.block.blockHash);
|
||||
assert(blockProgress);
|
||||
|
||||
const events = await indexer.getBlockEvents(event.block.blockHash);
|
||||
const eventIndex = events.findIndex(e => e.id === event.id);
|
||||
assert(eventIndex !== -1);
|
||||
|
||||
// Check if previous event in block has been processed exactly before this and abort if not.
|
||||
if (eventIndex > 0) { // Skip the first event in the block.
|
||||
const prevIndex = eventIndex - 1;
|
||||
const prevEvent = events[prevIndex];
|
||||
if (prevEvent.index !== blockProgress.lastProcessedEventIndex) {
|
||||
throw new Error(`Events received out of order for block number ${event.block.blockNumber} hash ${event.block.blockHash},` +
|
||||
` prev event index ${prevEvent.index}, got event index ${event.index} and lastProcessedEventIndex ${blockProgress.lastProcessedEventIndex}, aborting`);
|
||||
}
|
||||
}
|
||||
|
||||
const uniContract = await indexer.isUniswapContract(event.contract);
|
||||
if (uniContract) {
|
||||
// We might not have parsed this event yet. This can happen if the contract was added
|
||||
// as a result of a previous event in the same block.
|
||||
if (dbEvent.eventName === UNKNOWN_EVENT_NAME) {
|
||||
const logObj = JSON.parse(dbEvent.extraInfo);
|
||||
if (event.eventName === UNKNOWN_EVENT_NAME) {
|
||||
const logObj = JSON.parse(event.extraInfo);
|
||||
const { eventName, eventInfo } = indexer.parseEventNameAndArgs(uniContract.kind, logObj);
|
||||
dbEvent.eventName = eventName;
|
||||
dbEvent.eventInfo = JSON.stringify(eventInfo);
|
||||
dbEvent = await indexer.saveEventEntity(dbEvent);
|
||||
event.eventName = eventName;
|
||||
event.eventInfo = JSON.stringify(eventInfo);
|
||||
dbEvent = await indexer.saveEventEntity(event);
|
||||
}
|
||||
|
||||
dbEvent = await indexer.getEvent(id);
|
||||
await indexer.processEvent(dbEvent!);
|
||||
assert(dbEvent);
|
||||
|
||||
await indexer.processEvent(dbEvent);
|
||||
}
|
||||
|
||||
await jobQueue.markComplete(job);
|
||||
|
@ -65,10 +65,10 @@ export class JobQueue {
|
||||
this._boss.complete(job.id);
|
||||
}
|
||||
|
||||
async pushJob (queue: string, job: any): Promise<void> {
|
||||
async pushJob (queue: string, job: any, options: PgBoss.PublishOptions = {}): Promise<void> {
|
||||
assert(this._boss);
|
||||
|
||||
const jobId = await this._boss.publish(queue, job);
|
||||
const jobId = await this._boss.publish(queue, job, options);
|
||||
log(`Created job in queue ${queue}: ${jobId}`);
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user