mirror of
https://github.com/cerc-io/watcher-ts
synced 2025-02-08 11:02:52 +00:00
Prune abandoned branches (#166)
* Prune chain queue, refactoring job runner. * Prune blocks on non-canonical branches. * Prune blocks not reachable from head, update canonical block. * Note to move pruning to block processing queue.
This commit is contained in:
parent
df01b6539b
commit
175fa48d71
@ -114,19 +114,55 @@ export class Database {
|
||||
});
|
||||
}
|
||||
|
||||
async updateSyncStatus (blockHash: string, blockNumber: number): Promise<SyncStatus> {
|
||||
async updateSyncStatusIndexedBlock (blockHash: string, blockNumber: number): Promise<SyncStatus> {
|
||||
return await this._conn.transaction(async (tx) => {
|
||||
const repo = tx.getRepository(SyncStatus);
|
||||
|
||||
const entity = await repo.findOne();
|
||||
assert(entity);
|
||||
|
||||
if (blockNumber >= entity.latestIndexedBlockNumber) {
|
||||
entity.latestIndexedBlockHash = blockHash;
|
||||
entity.latestIndexedBlockNumber = blockNumber;
|
||||
}
|
||||
|
||||
return await repo.save(entity);
|
||||
});
|
||||
}
|
||||
|
||||
async updateSyncStatusCanonicalBlock (blockHash: string, blockNumber: number): Promise<SyncStatus> {
|
||||
return await this._conn.transaction(async (tx) => {
|
||||
const repo = tx.getRepository(SyncStatus);
|
||||
|
||||
const entity = await repo.findOne();
|
||||
assert(entity);
|
||||
|
||||
if (blockNumber >= entity.latestCanonicalBlockNumber) {
|
||||
entity.latestCanonicalBlockHash = blockHash;
|
||||
entity.latestCanonicalBlockNumber = blockNumber;
|
||||
}
|
||||
|
||||
return await repo.save(entity);
|
||||
});
|
||||
}
|
||||
|
||||
async updateSyncStatusChainHead (blockHash: string, blockNumber: number): Promise<SyncStatus> {
|
||||
return await this._conn.transaction(async (tx) => {
|
||||
const repo = tx.getRepository(SyncStatus);
|
||||
|
||||
let entity = await repo.findOne();
|
||||
if (!entity) {
|
||||
entity = repo.create({
|
||||
chainHeadBlockHash: blockHash,
|
||||
chainHeadBlockNumber: blockNumber,
|
||||
latestCanonicalBlockHash: blockHash,
|
||||
latestCanonicalBlockNumber: blockNumber
|
||||
latestCanonicalBlockNumber: blockNumber,
|
||||
latestIndexedBlockHash: '',
|
||||
latestIndexedBlockNumber: -1
|
||||
});
|
||||
}
|
||||
|
||||
if (blockNumber >= entity.latestCanonicalBlockNumber) {
|
||||
if (blockNumber >= entity.chainHeadBlockNumber) {
|
||||
entity.chainHeadBlockHash = blockHash;
|
||||
entity.chainHeadBlockNumber = blockNumber;
|
||||
}
|
||||
@ -180,6 +216,19 @@ export class Database {
|
||||
});
|
||||
}
|
||||
|
||||
async getBlocksAtHeight (height: number, isPruned: boolean): Promise<BlockProgress[]> {
|
||||
return this._conn.getRepository(BlockProgress)
|
||||
.createQueryBuilder('block_progress')
|
||||
.where('block_number = :height AND is_pruned = :isPruned', { height, isPruned })
|
||||
.getMany();
|
||||
}
|
||||
|
||||
async markBlockAsPruned (block: BlockProgress): Promise<BlockProgress> {
|
||||
const repo = this._conn.getRepository(BlockProgress);
|
||||
block.isPruned = true;
|
||||
return repo.save(block);
|
||||
}
|
||||
|
||||
async getBlockProgress (blockHash: string): Promise<BlockProgress | undefined> {
|
||||
const repo = this._conn.getRepository(BlockProgress);
|
||||
return repo.findOne({ where: { blockHash } });
|
||||
|
@ -31,4 +31,7 @@ export class BlockProgress {
|
||||
|
||||
@Column('boolean')
|
||||
isComplete!: boolean
|
||||
|
||||
@Column('boolean', { default: false })
|
||||
isPruned!: boolean
|
||||
}
|
||||
|
@ -12,6 +12,14 @@ export class SyncStatus {
|
||||
@Column('integer')
|
||||
chainHeadBlockNumber!: number;
|
||||
|
||||
// Most recent block hash that's been indexed.
|
||||
@Column('varchar', { length: 66 })
|
||||
latestIndexedBlockHash!: string;
|
||||
|
||||
// Most recent block number that's been indexed.
|
||||
@Column('integer')
|
||||
latestIndexedBlockNumber!: number;
|
||||
|
||||
// Most recent block hash and number that we can consider as part
|
||||
// of the canonical/finalized chain. Reorgs older than this block
|
||||
// cannot be processed and processing will halt.
|
||||
|
@ -4,7 +4,7 @@ import _ from 'lodash';
|
||||
import { PubSub } from 'apollo-server-express';
|
||||
|
||||
import { EthClient } from '@vulcanize/ipld-eth-client';
|
||||
import { JobQueue } from '@vulcanize/util';
|
||||
import { JobQueue, MAX_REORG_DEPTH } from '@vulcanize/util';
|
||||
|
||||
import { Indexer } from './indexer';
|
||||
import { BlockProgress } from './entity/BlockProgress';
|
||||
@ -16,6 +16,7 @@ export const UniswapEvent = 'uniswap-event';
|
||||
export const BlockProgressEvent = 'block-progress-event';
|
||||
export const QUEUE_EVENT_PROCESSING = 'event-processing';
|
||||
export const QUEUE_BLOCK_PROCESSING = 'block-processing';
|
||||
export const QUEUE_CHAIN_PRUNING = 'chain-pruning';
|
||||
|
||||
export class EventWatcher {
|
||||
_ethClient: EthClient
|
||||
@ -45,6 +46,7 @@ export class EventWatcher {
|
||||
await this.watchBlocksAtChainHead();
|
||||
await this.initBlockProcessingOnCompleteHandler();
|
||||
await this.initEventProcessingOnCompleteHandler();
|
||||
await this.initChainPruningOnCompleteHandler();
|
||||
}
|
||||
|
||||
async watchBlocksAtChainHead (): Promise<void> {
|
||||
@ -52,7 +54,7 @@ export class EventWatcher {
|
||||
this._subscription = await this._ethClient.watchBlocks(async (value) => {
|
||||
const { blockHash, blockNumber, parentHash } = _.get(value, 'data.listen.relatedNode');
|
||||
|
||||
await this._indexer.updateSyncStatus(blockHash, blockNumber);
|
||||
await this._indexer.updateSyncStatusChainHead(blockHash, blockNumber);
|
||||
|
||||
log('watchBlock', blockHash, blockNumber);
|
||||
await this._jobQueue.pushJob(QUEUE_BLOCK_PROCESSING, { blockHash, blockNumber, parentHash });
|
||||
@ -63,6 +65,19 @@ export class EventWatcher {
|
||||
this._jobQueue.onComplete(QUEUE_BLOCK_PROCESSING, async (job) => {
|
||||
const { data: { request: { data: { blockHash, blockNumber } } } } = job;
|
||||
log(`Job onComplete block ${blockHash} ${blockNumber}`);
|
||||
|
||||
// Update sync progress.
|
||||
const syncStatus = await this._indexer.updateSyncStatusIndexedBlock(blockHash, blockNumber);
|
||||
|
||||
// Create pruning job if required.
|
||||
if (syncStatus && syncStatus.latestIndexedBlockNumber > (syncStatus.latestCanonicalBlockNumber + MAX_REORG_DEPTH)) {
|
||||
// Create a job to prune at block height (latestCanonicalBlockNumber + 1)
|
||||
const pruneBlockHeight = syncStatus.latestCanonicalBlockNumber + 1;
|
||||
// TODO: Move this to the block processing queue to run pruning jobs at a higher priority than block processing jobs.
|
||||
await this._jobQueue.pushJob(QUEUE_CHAIN_PRUNING, { pruneBlockHeight });
|
||||
}
|
||||
|
||||
// Publish block progress event.
|
||||
const blockProgress = await this._indexer.getBlockProgress(blockHash);
|
||||
if (blockProgress) {
|
||||
await this.publishBlockProgressToSubscribers(blockProgress);
|
||||
@ -96,6 +111,21 @@ export class EventWatcher {
|
||||
});
|
||||
}
|
||||
|
||||
async initChainPruningOnCompleteHandler (): Promise<void> {
|
||||
this._jobQueue.onComplete(QUEUE_CHAIN_PRUNING, async (job) => {
|
||||
const { data: { request: { data: { pruneBlockHeight } } } } = job;
|
||||
log(`Job onComplete chain pruning ${pruneBlockHeight}`);
|
||||
|
||||
const blocks = await this._indexer.getBlocksAtHeight(pruneBlockHeight, false);
|
||||
|
||||
// Only one canonical (not pruned) block should exist at the pruned height.
|
||||
assert(blocks.length === 1);
|
||||
const [block] = blocks;
|
||||
|
||||
await this._indexer.updateSyncStatusCanonicalBlock(block.blockHash, block.blockNumber);
|
||||
});
|
||||
}
|
||||
|
||||
async publishUniswapEventToSubscribers (dbEvent: Event, timeElapsedInSeconds: number): Promise<void> {
|
||||
if (dbEvent && dbEvent.eventName !== UNKNOWN_EVENT_NAME) {
|
||||
const resultEvent = this._indexer.getResultEvent(dbEvent);
|
||||
|
@ -350,8 +350,16 @@ export class Indexer {
|
||||
await this._db.saveEvents(block, dbEvents);
|
||||
}
|
||||
|
||||
async updateSyncStatus (blockHash: string, blockNumber: number): Promise<SyncStatus> {
|
||||
return this._db.updateSyncStatus(blockHash, blockNumber);
|
||||
async updateSyncStatusIndexedBlock (blockHash: string, blockNumber: number): Promise<SyncStatus> {
|
||||
return this._db.updateSyncStatusIndexedBlock(blockHash, blockNumber);
|
||||
}
|
||||
|
||||
async updateSyncStatusChainHead (blockHash: string, blockNumber: number): Promise<SyncStatus> {
|
||||
return this._db.updateSyncStatusChainHead(blockHash, blockNumber);
|
||||
}
|
||||
|
||||
async updateSyncStatusCanonicalBlock (blockHash: string, blockNumber: number): Promise<SyncStatus> {
|
||||
return this._db.updateSyncStatusCanonicalBlock(blockHash, blockNumber);
|
||||
}
|
||||
|
||||
async getSyncStatus (): Promise<SyncStatus | undefined> {
|
||||
@ -375,6 +383,41 @@ export class Indexer {
|
||||
return this._db.getBlockProgress(blockHash);
|
||||
}
|
||||
|
||||
async getBlocksAtHeight (height: number, isPruned: boolean): Promise<BlockProgress[]> {
|
||||
return this._db.getBlocksAtHeight(height, isPruned);
|
||||
}
|
||||
|
||||
async blockIsAncestor (ancestorBlockHash: string, blockHash: string, maxDepth: number): Promise<boolean> {
|
||||
assert(maxDepth > 0);
|
||||
|
||||
let depth = 0;
|
||||
let currentBlockHash = blockHash;
|
||||
let currentBlock;
|
||||
|
||||
// TODO: Use a hierarchical query to optimize this.
|
||||
while (depth < maxDepth) {
|
||||
depth++;
|
||||
|
||||
currentBlock = await this._db.getBlockProgress(currentBlockHash);
|
||||
if (!currentBlock) {
|
||||
break;
|
||||
} else {
|
||||
if (currentBlock.parentHash === ancestorBlockHash) {
|
||||
return true;
|
||||
}
|
||||
|
||||
// Descend the chain.
|
||||
currentBlockHash = currentBlock.parentHash;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
async markBlockAsPruned (block: BlockProgress): Promise<BlockProgress> {
|
||||
return this._db.markBlockAsPruned(block);
|
||||
}
|
||||
|
||||
async updateBlockProgress (blockHash: string, lastProcessedEventIndex: number): Promise<void> {
|
||||
return this._db.updateBlockProgress(blockHash, lastProcessedEventIndex);
|
||||
}
|
||||
|
@ -6,15 +6,181 @@ import debug from 'debug';
|
||||
|
||||
import { getCache } from '@vulcanize/cache';
|
||||
import { EthClient } from '@vulcanize/ipld-eth-client';
|
||||
import { getConfig, JobQueue } from '@vulcanize/util';
|
||||
import { getConfig, JobQueue, MAX_REORG_DEPTH } from '@vulcanize/util';
|
||||
|
||||
import { Indexer } from './indexer';
|
||||
import { Database } from './database';
|
||||
import { UNKNOWN_EVENT_NAME, Event } from './entity/Event';
|
||||
import { QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING } from './events';
|
||||
import { QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING, QUEUE_CHAIN_PRUNING } from './events';
|
||||
|
||||
const log = debug('vulcanize:job-runner');
|
||||
|
||||
export class JobRunner {
|
||||
_indexer: Indexer
|
||||
_jobQueue: JobQueue
|
||||
|
||||
constructor (indexer: Indexer, jobQueue: JobQueue) {
|
||||
this._indexer = indexer;
|
||||
this._jobQueue = jobQueue;
|
||||
}
|
||||
|
||||
async start (): Promise<void> {
|
||||
await this.subscribeBlockProcessingQueue();
|
||||
await this.subscribeEventProcessingQueue();
|
||||
await this.subscribeChainPruningQueue();
|
||||
}
|
||||
|
||||
async subscribeBlockProcessingQueue (): Promise<void> {
|
||||
await this._jobQueue.subscribe(QUEUE_BLOCK_PROCESSING, async (job) => {
|
||||
const { data: { blockHash, blockNumber, parentHash, priority } } = job;
|
||||
|
||||
log(`Processing block number ${blockNumber} hash ${blockHash} `);
|
||||
|
||||
// Init sync status record if none exists.
|
||||
let syncStatus = await this._indexer.getSyncStatus();
|
||||
if (!syncStatus) {
|
||||
syncStatus = await this._indexer.updateSyncStatusChainHead(blockHash, blockNumber);
|
||||
}
|
||||
|
||||
// Check if parent block has been processed yet, if not, push a high priority job to process that first and abort.
|
||||
// However, don't go beyond the `latestCanonicalBlockHash` from SyncStatus as we have to assume the reorg can't be that deep.
|
||||
if (blockHash !== syncStatus.latestCanonicalBlockHash) {
|
||||
const parent = await this._indexer.getBlockProgress(parentHash);
|
||||
if (!parent) {
|
||||
const { number: parentBlockNumber, parent: { hash: grandparentHash } } = await this._indexer.getBlock(parentHash);
|
||||
|
||||
// Create a higher priority job to index parent block and then abort.
|
||||
// We don't have to worry about aborting as this job will get retried later.
|
||||
const newPriority = (priority || 0) + 1;
|
||||
await this._jobQueue.pushJob(QUEUE_BLOCK_PROCESSING, {
|
||||
blockHash: parentHash,
|
||||
blockNumber: parentBlockNumber,
|
||||
parentHash: grandparentHash,
|
||||
priority: newPriority
|
||||
}, { priority: newPriority });
|
||||
|
||||
const message = `Parent block number ${parentBlockNumber} hash ${parentHash} of block number ${blockNumber} hash ${blockHash} not fetched yet, aborting`;
|
||||
log(message);
|
||||
|
||||
throw new Error(message);
|
||||
}
|
||||
|
||||
if (parentHash !== syncStatus.latestCanonicalBlockHash && !parent.isComplete) {
|
||||
// Parent block indexing needs to finish before this block can be indexed.
|
||||
const message = `Indexing incomplete for parent block number ${parent.blockNumber} hash ${parentHash} of block number ${blockNumber} hash ${blockHash}, aborting`;
|
||||
log(message);
|
||||
|
||||
throw new Error(message);
|
||||
}
|
||||
}
|
||||
|
||||
const events = await this._indexer.getOrFetchBlockEvents(blockHash);
|
||||
for (let ei = 0; ei < events.length; ei++) {
|
||||
await this._jobQueue.pushJob(QUEUE_EVENT_PROCESSING, { id: events[ei].id, publish: true });
|
||||
}
|
||||
|
||||
await this._jobQueue.markComplete(job);
|
||||
});
|
||||
}
|
||||
|
||||
async subscribeEventProcessingQueue (): Promise<void> {
|
||||
await this._jobQueue.subscribe(QUEUE_EVENT_PROCESSING, async (job) => {
|
||||
const { data: { id } } = job;
|
||||
|
||||
log(`Processing event ${id}`);
|
||||
|
||||
let dbEvent = await this._indexer.getEvent(id);
|
||||
assert(dbEvent);
|
||||
|
||||
const event: Event = dbEvent;
|
||||
|
||||
// Confirm that the parent block has been completely processed.
|
||||
// We don't have to worry about aborting as this job will get retried later.
|
||||
const parent = await this._indexer.getBlockProgress(event.block.parentHash);
|
||||
if (!parent || !parent.isComplete) {
|
||||
const message = `Abort processing of event ${id} as parent block not processed yet`;
|
||||
throw new Error(message);
|
||||
}
|
||||
|
||||
const blockProgress = await this._indexer.getBlockProgress(event.block.blockHash);
|
||||
assert(blockProgress);
|
||||
|
||||
const events = await this._indexer.getBlockEvents(event.block.blockHash);
|
||||
const eventIndex = events.findIndex((e: any) => e.id === event.id);
|
||||
assert(eventIndex !== -1);
|
||||
|
||||
// Check if previous event in block has been processed exactly before this and abort if not.
|
||||
if (eventIndex > 0) { // Skip the first event in the block.
|
||||
const prevIndex = eventIndex - 1;
|
||||
const prevEvent = events[prevIndex];
|
||||
if (prevEvent.index !== blockProgress.lastProcessedEventIndex) {
|
||||
throw new Error(`Events received out of order for block number ${event.block.blockNumber} hash ${event.block.blockHash},` +
|
||||
` prev event index ${prevEvent.index}, got event index ${event.index} and lastProcessedEventIndex ${blockProgress.lastProcessedEventIndex}, aborting`);
|
||||
}
|
||||
}
|
||||
|
||||
const uniContract = await this._indexer.isUniswapContract(event.contract);
|
||||
if (uniContract) {
|
||||
// We might not have parsed this event yet. This can happen if the contract was added
|
||||
// as a result of a previous event in the same block.
|
||||
if (event.eventName === UNKNOWN_EVENT_NAME) {
|
||||
const logObj = JSON.parse(event.extraInfo);
|
||||
const { eventName, eventInfo } = this._indexer.parseEventNameAndArgs(uniContract.kind, logObj);
|
||||
event.eventName = eventName;
|
||||
event.eventInfo = JSON.stringify(eventInfo);
|
||||
dbEvent = await this._indexer.saveEventEntity(event);
|
||||
}
|
||||
|
||||
dbEvent = await this._indexer.getEvent(id);
|
||||
assert(dbEvent);
|
||||
|
||||
await this._indexer.processEvent(dbEvent);
|
||||
}
|
||||
|
||||
await this._jobQueue.markComplete(job);
|
||||
});
|
||||
}
|
||||
|
||||
async subscribeChainPruningQueue (): Promise<void> {
|
||||
await this._jobQueue.subscribe(QUEUE_CHAIN_PRUNING, async (job) => {
|
||||
const pruneBlockHeight: number = job.data.pruneBlockHeight;
|
||||
|
||||
log(`Processing chain pruning at ${pruneBlockHeight}`);
|
||||
|
||||
// Assert we're at a depth where pruning is safe.
|
||||
const syncStatus = await this._indexer.getSyncStatus();
|
||||
assert(syncStatus);
|
||||
assert(syncStatus.latestIndexedBlockNumber >= (pruneBlockHeight + MAX_REORG_DEPTH));
|
||||
|
||||
// Check that we haven't already pruned at this depth.
|
||||
if (syncStatus.latestCanonicalBlockNumber >= pruneBlockHeight) {
|
||||
log(`Already pruned at block height ${pruneBlockHeight}, latestCanonicalBlockNumber ${syncStatus.latestCanonicalBlockNumber}`);
|
||||
} else {
|
||||
// Check how many branches there are at the given height/block number.
|
||||
const blocksAtHeight = await this._indexer.getBlocksAtHeight(pruneBlockHeight, false);
|
||||
|
||||
// Should be at least 1.
|
||||
assert(blocksAtHeight.length);
|
||||
|
||||
// We have more than one node at this height, so prune all nodes not reachable from head.
|
||||
// This will lead to orphaned nodes, which will get pruned at the next height.
|
||||
if (blocksAtHeight.length > 1) {
|
||||
for (let i = 0; i < blocksAtHeight.length; i++) {
|
||||
const block = blocksAtHeight[i];
|
||||
// If this block is not reachable from the latest indexed block, mark it as pruned.
|
||||
const isAncestor = await this._indexer.blockIsAncestor(block.blockHash, syncStatus.latestIndexedBlockHash, MAX_REORG_DEPTH);
|
||||
if (!isAncestor) {
|
||||
await this._indexer.markBlockAsPruned(block);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
await this._jobQueue.markComplete(job);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
export const main = async (): Promise<any> => {
|
||||
const argv = await yargs(hideBin(process.argv))
|
||||
.option('f', {
|
||||
@ -58,111 +224,8 @@ export const main = async (): Promise<any> => {
|
||||
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag });
|
||||
await jobQueue.start();
|
||||
|
||||
await jobQueue.subscribe(QUEUE_BLOCK_PROCESSING, async (job) => {
|
||||
const { data: { blockHash, blockNumber, parentHash, priority } } = job;
|
||||
|
||||
log(`Processing block number ${blockNumber} hash ${blockHash} `);
|
||||
|
||||
// Check if parent block has been processed yet, if not, push a high priority job to process that first and abort.
|
||||
// However, don't go beyond the `latestCanonicalBlockHash` from SyncStatus as we have to assume the reorg can't be that deep.
|
||||
let syncStatus = await indexer.getSyncStatus();
|
||||
if (!syncStatus) {
|
||||
syncStatus = await indexer.updateSyncStatus(blockHash, blockNumber);
|
||||
}
|
||||
|
||||
if (blockHash !== syncStatus.latestCanonicalBlockHash) {
|
||||
const parent = await indexer.getBlockProgress(parentHash);
|
||||
if (!parent) {
|
||||
const { number: parentBlockNumber, parent: { hash: grandparentHash } } = await indexer.getBlock(parentHash);
|
||||
|
||||
// Create a higher priority job to index parent block and then abort.
|
||||
// We don't have to worry about aborting as this job will get retried later.
|
||||
const newPriority = (priority || 0) + 1;
|
||||
await jobQueue.pushJob(QUEUE_BLOCK_PROCESSING, {
|
||||
blockHash: parentHash,
|
||||
blockNumber: parentBlockNumber,
|
||||
parentHash: grandparentHash,
|
||||
priority: newPriority
|
||||
}, { priority: newPriority });
|
||||
|
||||
const message = `Parent block number ${parentBlockNumber} hash ${parentHash} of block number ${blockNumber} hash ${blockHash} not fetched yet, aborting`;
|
||||
log(message);
|
||||
|
||||
throw new Error(message);
|
||||
}
|
||||
|
||||
if (parentHash !== syncStatus.latestCanonicalBlockHash && !parent.isComplete) {
|
||||
// Parent block indexing needs to finish before this block can be indexed.
|
||||
const message = `Indexing incomplete for parent block number ${parent.blockNumber} hash ${parentHash} of block number ${blockNumber} hash ${blockHash}, aborting`;
|
||||
log(message);
|
||||
|
||||
throw new Error(message);
|
||||
}
|
||||
}
|
||||
|
||||
const events = await indexer.getOrFetchBlockEvents(blockHash);
|
||||
for (let ei = 0; ei < events.length; ei++) {
|
||||
await jobQueue.pushJob(QUEUE_EVENT_PROCESSING, { id: events[ei].id, publish: true });
|
||||
}
|
||||
|
||||
await jobQueue.markComplete(job);
|
||||
});
|
||||
|
||||
await jobQueue.subscribe(QUEUE_EVENT_PROCESSING, async (job) => {
|
||||
const { data: { id } } = job;
|
||||
|
||||
log(`Processing event ${id}`);
|
||||
|
||||
let dbEvent = await indexer.getEvent(id);
|
||||
assert(dbEvent);
|
||||
|
||||
const event: Event = dbEvent;
|
||||
|
||||
// Confirm that the parent block has been completely processed.
|
||||
// We don't have to worry about aborting as this job will get retried later.
|
||||
const parent = await indexer.getBlockProgress(event.block.parentHash);
|
||||
if (!parent || !parent.isComplete) {
|
||||
const message = `Abort processing of event ${id} as parent block not processed yet`;
|
||||
throw new Error(message);
|
||||
}
|
||||
|
||||
const blockProgress = await indexer.getBlockProgress(event.block.blockHash);
|
||||
assert(blockProgress);
|
||||
|
||||
const events = await indexer.getBlockEvents(event.block.blockHash);
|
||||
const eventIndex = events.findIndex(e => e.id === event.id);
|
||||
assert(eventIndex !== -1);
|
||||
|
||||
// Check if previous event in block has been processed exactly before this and abort if not.
|
||||
if (eventIndex > 0) { // Skip the first event in the block.
|
||||
const prevIndex = eventIndex - 1;
|
||||
const prevEvent = events[prevIndex];
|
||||
if (prevEvent.index !== blockProgress.lastProcessedEventIndex) {
|
||||
throw new Error(`Events received out of order for block number ${event.block.blockNumber} hash ${event.block.blockHash},` +
|
||||
` prev event index ${prevEvent.index}, got event index ${event.index} and lastProcessedEventIndex ${blockProgress.lastProcessedEventIndex}, aborting`);
|
||||
}
|
||||
}
|
||||
|
||||
const uniContract = await indexer.isUniswapContract(event.contract);
|
||||
if (uniContract) {
|
||||
// We might not have parsed this event yet. This can happen if the contract was added
|
||||
// as a result of a previous event in the same block.
|
||||
if (event.eventName === UNKNOWN_EVENT_NAME) {
|
||||
const logObj = JSON.parse(event.extraInfo);
|
||||
const { eventName, eventInfo } = indexer.parseEventNameAndArgs(uniContract.kind, logObj);
|
||||
event.eventName = eventName;
|
||||
event.eventInfo = JSON.stringify(eventInfo);
|
||||
dbEvent = await indexer.saveEventEntity(event);
|
||||
}
|
||||
|
||||
dbEvent = await indexer.getEvent(id);
|
||||
assert(dbEvent);
|
||||
|
||||
await indexer.processEvent(dbEvent);
|
||||
}
|
||||
|
||||
await jobQueue.markComplete(job);
|
||||
});
|
||||
const jobRunner = new JobRunner(indexer, jobQueue);
|
||||
await jobRunner.start();
|
||||
};
|
||||
|
||||
main().then(() => {
|
||||
|
Loading…
Reference in New Issue
Block a user