Implement chain pruning in uni-info-watcher (#222)

* Refactor code for chain pruning.

* Implement chain pruning in uni-info-watcher.

* Refactor pruning code to util.

Co-authored-by: prathamesh0 <prathamesh.musale0@gmail.com>
Co-authored-by: nabarun <nabarun@deepstacksoft.com>
This commit is contained in:
Ashwin Phatak 2021-08-19 13:27:32 +05:30 committed by GitHub
parent b8b216ea5b
commit 35068b2c3d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 663 additions and 500 deletions

View File

@ -3,9 +3,9 @@
//
import assert from 'assert';
import { Brackets, Connection, ConnectionOptions, createConnection, DeepPartial, FindConditions, FindOneOptions, LessThanOrEqual, QueryRunner, Repository } from 'typeorm';
import { SnakeNamingStrategy } from 'typeorm-naming-strategies';
import { MAX_REORG_DEPTH } from '@vulcanize/util';
import { Brackets, Connection, ConnectionOptions, DeepPartial, FindConditions, FindOneOptions, LessThanOrEqual, QueryRunner, Repository } from 'typeorm';
import { MAX_REORG_DEPTH, Database as BaseDatabase } from '@vulcanize/util';
import { EventSyncProgress } from './entity/EventProgress';
import { Factory } from './entity/Factory';
@ -72,30 +72,24 @@ interface Where {
export class Database {
_config: ConnectionOptions
_conn!: Connection
_baseDatabase: BaseDatabase
constructor (config: ConnectionOptions) {
assert(config);
this._config = config;
this._baseDatabase = new BaseDatabase(this._config);
}
async init (): Promise<void> {
assert(!this._conn);
this._conn = await createConnection({
...this._config,
namingStrategy: new SnakeNamingStrategy()
});
this._conn = await this._baseDatabase.init();
}
async close (): Promise<void> {
return this._conn.close();
return this._baseDatabase.close();
}
async createTransactionRunner (): Promise<QueryRunner> {
const queryRunner = this._conn.createQueryRunner();
await queryRunner.connect();
await queryRunner.startTransaction();
return queryRunner;
return this._baseDatabase.createTransactionRunner();
}
async getFactory (queryRunner: QueryRunner, { id, blockHash }: DeepPartial<Factory>): Promise<Factory | undefined> {
@ -731,27 +725,22 @@ export class Database {
}
}
async getEvent (id: string): Promise<Event | undefined> {
return this._conn.getRepository(Event).findOne(id, { relations: ['block'] });
}
async updateSyncStatus (queryRunner: QueryRunner, blockHash: string, blockNumber: number): Promise<SyncStatus> {
async updateSyncStatusIndexedBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number): Promise<SyncStatus> {
const repo = queryRunner.manager.getRepository(SyncStatus);
let entity = await repo.findOne();
if (!entity) {
entity = repo.create({
latestCanonicalBlockHash: blockHash,
latestCanonicalBlockNumber: blockNumber
});
}
return this._baseDatabase.updateSyncStatusIndexedBlock(repo, blockHash, blockNumber);
}
if (blockNumber >= entity.latestCanonicalBlockNumber) {
entity.chainHeadBlockHash = blockHash;
entity.chainHeadBlockNumber = blockNumber;
}
async updateSyncStatusCanonicalBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number): Promise<SyncStatus> {
const repo = queryRunner.manager.getRepository(SyncStatus);
return await repo.save(entity);
return this._baseDatabase.updateSyncStatusCanonicalBlock(repo, blockHash, blockNumber);
}
async updateSyncStatusChainHead (queryRunner: QueryRunner, blockHash: string, blockNumber: number): Promise<SyncStatus> {
const repo = queryRunner.manager.getRepository(SyncStatus);
return this._baseDatabase.updateSyncStatusChainHead(repo, blockHash, blockNumber);
}
async getSyncStatus (queryRunner: QueryRunner): Promise<SyncStatus | undefined> {
@ -759,6 +748,22 @@ export class Database {
return repo.findOne();
}
async getEvent (id: string): Promise<Event | undefined> {
return this._conn.getRepository(Event).findOne(id, { relations: ['block'] });
}
async getBlocksAtHeight (height: number, isPruned: boolean): Promise<BlockProgress[]> {
const repo = this._conn.getRepository(BlockProgress);
return this._baseDatabase.getBlocksAtHeight(repo, height, isPruned);
}
async markBlockAsPruned (queryRunner: QueryRunner, block: BlockProgress): Promise<BlockProgress> {
const repo = queryRunner.manager.getRepository(BlockProgress);
return this._baseDatabase.markBlockAsPruned(repo, block);
}
async getBlockProgress (blockHash: string): Promise<BlockProgress | undefined> {
const repo = this._conn.getRepository(BlockProgress);
return repo.findOne({ where: { blockHash } });

View File

@ -16,6 +16,14 @@ export class SyncStatus {
@Column('integer')
chainHeadBlockNumber!: number;
// Most recent block hash that's been indexed.
@Column('varchar', { length: 66 })
latestIndexedBlockHash!: string;
// Most recent block number that's been indexed.
@Column('integer')
latestIndexedBlockNumber!: number;
// Most recent block hash and number that we can consider as part
// of the canonical/finalized chain. Reorgs older than this block
// cannot be processed and processing will halt.

View File

@ -4,11 +4,10 @@
import assert from 'assert';
import debug from 'debug';
import _ from 'lodash';
import { PubSub } from 'apollo-server-express';
import { EthClient } from '@vulcanize/ipld-eth-client';
import { EventWatcher as BaseEventWatcher, EventWatcherInterface, JobQueue, QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING } from '@vulcanize/util';
import { EventWatcher as BaseEventWatcher, EventWatcherInterface, JobQueue, QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING, QUEUE_CHAIN_PRUNING } from '@vulcanize/util';
import { Indexer } from './indexer';
@ -123,65 +122,55 @@ export class EventWatcher implements EventWatcherInterface {
_subscription?: ZenObservable.Subscription
_pubsub: PubSub
_jobQueue: JobQueue
_eventWatcher: BaseEventWatcher
_baseEventWatcher: BaseEventWatcher
constructor (ethClient: EthClient, indexer: Indexer, pubsub: PubSub, jobQueue: JobQueue) {
this._ethClient = ethClient;
this._indexer = indexer;
this._pubsub = pubsub;
this._jobQueue = jobQueue;
this._eventWatcher = new BaseEventWatcher(this._ethClient, this._indexer, this._pubsub, this._jobQueue);
this._baseEventWatcher = new BaseEventWatcher(this._ethClient, this._indexer, this._pubsub, this._jobQueue);
}
getBlockProgressEventIterator (): AsyncIterator<any> {
return this._eventWatcher.getBlockProgressEventIterator();
return this._baseEventWatcher.getBlockProgressEventIterator();
}
async start (): Promise<void> {
assert(!this._subscription, 'subscription already started');
log('Started watching upstream events...');
await this.watchBlocksAtChainHead();
await this.initBlockProcessingOnCompleteHandler();
await this.initEventProcessingOnCompleteHandler();
await this.watchBlocksAtChainHead();
await this.initChainPruningOnCompleteHandler();
}
async stop (): Promise<void> {
if (this._subscription) {
log('Stopped watching upstream events');
this._subscription.unsubscribe();
}
this._baseEventWatcher.stop();
}
async watchBlocksAtChainHead (): Promise<void> {
this._subscription = await this._ethClient.watchBlocks(async (value) => {
await this._baseEventWatcher.blocksHandler(value);
});
}
async initBlockProcessingOnCompleteHandler (): Promise<void> {
await this._jobQueue.onComplete(QUEUE_BLOCK_PROCESSING, async (job) => {
const { data: { request: { data: { blockHash, blockNumber } } } } = job;
log(`Job onComplete block ${blockHash} ${blockNumber}`);
// Publish block progress event.
const blockProgress = await this._indexer.getBlockProgress(blockHash);
if (blockProgress) {
await this._eventWatcher.publishBlockProgressToSubscribers(blockProgress);
}
await this._baseEventWatcher.blockProcessingCompleteHandler(job);
});
}
async initEventProcessingOnCompleteHandler (): Promise<void> {
await this._jobQueue.onComplete(QUEUE_EVENT_PROCESSING, async (job) => {
await this._eventWatcher.eventProcessingCompleteHandler(job);
await this._baseEventWatcher.eventProcessingCompleteHandler(job);
});
}
async watchBlocksAtChainHead (): Promise<void> {
log('Started watching upstream blocks...');
this._subscription = await this._ethClient.watchBlocks(async (value) => {
const { blockHash, blockNumber, parentHash, timestamp } = _.get(value, 'data.listen.relatedNode');
await this._indexer.updateSyncStatus(blockHash, blockNumber);
log('watchBlock', blockHash, blockNumber);
await this._jobQueue.pushJob(QUEUE_BLOCK_PROCESSING, { blockHash, blockNumber, parentHash, timestamp });
async initChainPruningOnCompleteHandler (): Promise<void> {
this._jobQueue.onComplete(QUEUE_CHAIN_PRUNING, async (job) => {
await this._baseEventWatcher.chainPruningCompleteHandler(job);
});
}
}

View File

@ -74,7 +74,7 @@ export const main = async (): Promise<any> => {
// Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
const pubsub = new PubSub();
const indexer = new Indexer(db, uniClient, erc20Client, ethClient);
const indexer = new Indexer(db, uniClient, erc20Client, ethClient, config);
assert(jobQueueConfig, 'Missing job queue config');
const { dbConnectionString, maxCompletionLag } = jobQueueConfig;

View File

@ -11,7 +11,7 @@ import { utils } from 'ethers';
import { Client as UniClient } from '@vulcanize/uni-watcher';
import { Client as ERC20Client } from '@vulcanize/erc20-watcher';
import { EthClient } from '@vulcanize/ipld-eth-client';
import { IndexerInterface } from '@vulcanize/util';
import { Config, IndexerInterface, wait, Indexer as BaseIndexer } from '@vulcanize/util';
import { findEthPerToken, getEthPriceInUSD, getTrackedAmountUSD, sqrtPriceX96ToTokenPrices, WHITELIST_TOKENS } from './utils/pricing';
import { updatePoolDayData, updatePoolHourData, updateTokenDayData, updateTokenHourData, updateUniswapDayData } from './utils/interval-updates';
@ -49,8 +49,10 @@ export class Indexer implements IndexerInterface {
_uniClient: UniClient
_erc20Client: ERC20Client
_ethClient: EthClient
_config: Config
_baseIndexer: BaseIndexer
constructor (db: Database, uniClient: UniClient, erc20Client: ERC20Client, ethClient: EthClient) {
constructor (db: Database, uniClient: UniClient, erc20Client: ERC20Client, ethClient: EthClient, config: Config) {
assert(db);
assert(uniClient);
assert(erc20Client);
@ -60,6 +62,8 @@ export class Indexer implements IndexerInterface {
this._uniClient = uniClient;
this._erc20Client = erc20Client;
this._ethClient = ethClient;
this._config = config;
this._baseIndexer = new BaseIndexer(this._db);
}
getResultEvent (event: Event): ResultEvent {
@ -94,6 +98,10 @@ export class Indexer implements IndexerInterface {
const blockProgress = await this._db.getBlockProgress(block.blockHash);
if (!blockProgress) {
const { jobQueue: { jobDelay } } = this._config;
assert(jobDelay);
// Delay to allow uni-watcher to process block.
await wait(jobDelay);
// Fetch and save events first and make a note in the event sync progress table.
await this._fetchAndSaveEvents(block);
log('getBlockEvents: db miss, fetching from upstream server');
@ -170,21 +178,16 @@ export class Indexer implements IndexerInterface {
log('Event processing completed for', eventName);
}
async updateSyncStatus (blockHash: string, blockNumber: number): Promise<SyncStatus> {
const dbTx = await this._db.createTransactionRunner();
let res;
async updateSyncStatusIndexedBlock (blockHash: string, blockNumber: number): Promise<SyncStatus> {
return this._baseIndexer.updateSyncStatusIndexedBlock(blockHash, blockNumber);
}
try {
res = await this._db.updateSyncStatus(dbTx, blockHash, blockNumber);
await dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
throw error;
} finally {
await dbTx.release();
}
async updateSyncStatusChainHead (blockHash: string, blockNumber: number): Promise<SyncStatus> {
return this._baseIndexer.updateSyncStatusChainHead(blockHash, blockNumber);
}
return res;
async updateSyncStatusCanonicalBlock (blockHash: string, blockNumber: number): Promise<SyncStatus> {
return this._baseIndexer.updateSyncStatusCanonicalBlock(blockHash, blockNumber);
}
async getSyncStatus (): Promise<SyncStatus | undefined> {
@ -241,6 +244,18 @@ export class Indexer implements IndexerInterface {
return this._db.getBlockProgress(blockHash);
}
async getBlocksAtHeight (height: number, isPruned: boolean): Promise<BlockProgress[]> {
return this._baseIndexer.getBlocksAtHeight(height, isPruned);
}
async blockIsAncestor (ancestorBlockHash: string, blockHash: string, maxDepth: number): Promise<boolean> {
return this._baseIndexer.blockIsAncestor(ancestorBlockHash, blockHash, maxDepth);
}
async markBlockAsPruned (block: BlockProgress): Promise<BlockProgress> {
return this._baseIndexer.markBlockAsPruned(block);
}
async updateBlockProgress (blockHash: string, lastProcessedEventIndex: number): Promise<void> {
const dbTx = await this._db.createTransactionRunner();
let res;

View File

@ -10,16 +10,76 @@ import debug from 'debug';
import { Client as ERC20Client } from '@vulcanize/erc20-watcher';
import { Client as UniClient } from '@vulcanize/uni-watcher';
import { getConfig, JobQueue, wait, QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING } from '@vulcanize/util';
import { getConfig, JobQueue, QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING, QUEUE_CHAIN_PRUNING, JobRunner as BaseJobRunner } from '@vulcanize/util';
import { getCache } from '@vulcanize/cache';
import { EthClient } from '@vulcanize/ipld-eth-client';
import { Indexer } from './indexer';
import { Database } from './database';
import { Event } from './entity/Event';
const log = debug('vulcanize:job-runner');
export class JobRunner {
_indexer: Indexer
_jobQueue: JobQueue
_baseJobRunner: BaseJobRunner
constructor (indexer: Indexer, jobQueue: JobQueue) {
this._indexer = indexer;
this._jobQueue = jobQueue;
this._baseJobRunner = new BaseJobRunner(this._indexer, this._jobQueue);
}
async start (): Promise<void> {
await this.subscribeBlockProcessingQueue();
await this.subscribeEventProcessingQueue();
await this.subscribeChainPruningQueue();
}
async subscribeBlockProcessingQueue (): Promise<void> {
await this._jobQueue.subscribe(QUEUE_BLOCK_PROCESSING, async (job) => {
await this._baseJobRunner.processBlock(job);
const { data: { blockHash, blockNumber, parentHash, timestamp } } = job;
// Check if block is being already processed.
// TODO: Debug issue block getting processed twice without this check. Can reproduce with NFPM.mint().
const blockProgress = await this._indexer.getBlockProgress(blockHash);
if (!blockProgress) {
const events = await this._indexer.getOrFetchBlockEvents({ blockHash, blockNumber, parentHash, blockTimestamp: timestamp });
for (let ei = 0; ei < events.length; ei++) {
await this._jobQueue.pushJob(QUEUE_EVENT_PROCESSING, { id: events[ei].id, publish: true });
}
}
await this._jobQueue.markComplete(job);
});
}
async subscribeEventProcessingQueue (): Promise<void> {
await this._jobQueue.subscribe(QUEUE_EVENT_PROCESSING, async (job) => {
const event = await this._baseJobRunner.processEvent(job);
// Check if event is processed.
if (!event.block.isComplete && event.index !== event.block.lastProcessedEventIndex) {
await this._indexer.processEvent(event);
}
await this._jobQueue.markComplete(job);
});
}
async subscribeChainPruningQueue (): Promise<void> {
await this._jobQueue.subscribe(QUEUE_CHAIN_PRUNING, async (job) => {
await this._baseJobRunner.pruneChain(job);
await this._jobQueue.markComplete(job);
});
}
}
export const main = async (): Promise<any> => {
const argv = await yargs(hideBin(process.argv))
.option('f', {
@ -60,7 +120,7 @@ export const main = async (): Promise<any> => {
const erc20Client = new ERC20Client(tokenWatcher);
const indexer = new Indexer(db, uniClient, erc20Client, ethClient);
const indexer = new Indexer(db, uniClient, erc20Client, ethClient, config);
assert(jobQueueConfig, 'Missing job queue config');
@ -71,107 +131,8 @@ export const main = async (): Promise<any> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag });
await jobQueue.start();
await jobQueue.subscribe(QUEUE_BLOCK_PROCESSING, async (job) => {
const { data: { blockHash, blockNumber, parentHash, timestamp, priority } } = job;
log(`Processing block number ${blockNumber} hash ${blockHash} `);
// Init sync status record if none exists.
let syncStatus = await indexer.getSyncStatus();
if (!syncStatus) {
syncStatus = await indexer.updateSyncStatus(blockHash, blockNumber);
}
// Check if parent block has been processed yet, if not, push a high priority job to process that first and abort.
// However, don't go beyond the `latestCanonicalBlockHash` from SyncStatus as we have to assume the reorg can't be that deep.
if (blockHash !== syncStatus.latestCanonicalBlockHash) {
const parent = await indexer.getBlockProgress(parentHash);
if (!parent) {
const { number: parentBlockNumber, parent: { hash: grandparentHash }, timestamp: parentTimestamp } = await indexer.getBlock(parentHash);
// Create a higher priority job to index parent block and then abort.
// We don't have to worry about aborting as this job will get retried later.
const newPriority = (priority || 0) + 1;
await jobQueue.pushJob(QUEUE_BLOCK_PROCESSING, {
blockHash: parentHash,
blockNumber: parentBlockNumber,
parentHash: grandparentHash,
timestamp: parentTimestamp,
priority: newPriority
}, { priority: newPriority });
const message = `Parent block number ${parentBlockNumber} hash ${parentHash} of block number ${blockNumber} hash ${blockHash} not fetched yet, aborting`;
log(message);
throw new Error(message);
}
if (parentHash !== syncStatus.latestCanonicalBlockHash && !parent.isComplete) {
// Parent block indexing needs to finish before this block can be indexed.
const message = `Indexing incomplete for parent block number ${parent.blockNumber} hash ${parentHash} of block number ${blockNumber} hash ${blockHash}, aborting`;
log(message);
throw new Error(message);
}
}
// Check if block is being already processed.
const blockProgress = await indexer.getBlockProgress(blockHash);
if (!blockProgress) {
// Delay to allow uni-watcher to process block.
await wait(jobDelay);
const events = await indexer.getOrFetchBlockEvents({ blockHash, blockNumber, parentHash, blockTimestamp: timestamp });
for (let ei = 0; ei < events.length; ei++) {
const { id } = events[ei];
await jobQueue.pushJob(QUEUE_EVENT_PROCESSING, { id });
}
}
await jobQueue.markComplete(job);
});
await jobQueue.subscribe(QUEUE_EVENT_PROCESSING, async (job) => {
const { data: { id } } = job;
log(`Processing event ${id}`);
const dbEvent = await db.getEvent(id);
assert(dbEvent);
const event: Event = dbEvent;
// Confirm that the parent block has been completely processed.
// We don't have to worry about aborting as this job will get retried later.
const parent = await indexer.getBlockProgress(event.block.parentHash);
if (!parent || !parent.isComplete) {
const message = `Abort processing of event ${id} as parent block not processed yet`;
throw new Error(message);
}
const blockProgress = await indexer.getBlockProgress(event.block.blockHash);
assert(blockProgress);
const events = await indexer.getBlockEvents(event.block.blockHash);
const eventIndex = events.findIndex(e => e.id === event.id);
assert(eventIndex !== -1);
// Check if previous event in block has been processed exactly before this and abort if not.
if (eventIndex > 0) { // Skip the first event in the block.
const prevIndex = eventIndex - 1;
const prevEvent = events[prevIndex];
if (prevEvent.index !== blockProgress.lastProcessedEventIndex) {
throw new Error(`Events received out of order for block number ${event.block.blockNumber} hash ${event.block.blockHash},` +
` prev event index ${prevEvent.index}, got event index ${event.index} and lastProcessedEventIndex ${blockProgress.lastProcessedEventIndex}, aborting`);
}
}
// Check if event is processed.
if (!dbEvent.block.isComplete && event.index !== blockProgress.lastProcessedEventIndex) {
await indexer.processEvent(dbEvent);
}
await jobQueue.markComplete(job);
});
const jobRunner = new JobRunner(indexer, jobQueue);
await jobRunner.start();
};
main().then(() => {

View File

@ -74,7 +74,7 @@ export const main = async (): Promise<any> => {
const uniClient = new UniClient(uniWatcher);
const erc20Client = new ERC20Client(tokenWatcher);
const indexer = new Indexer(db, uniClient, erc20Client, ethClient);
const indexer = new Indexer(db, uniClient, erc20Client, ethClient, config);
assert(jobQueueConfig, 'Missing job queue config');

View File

@ -187,7 +187,7 @@ export const insertDummyBlock = async (db: TestDatabase, parentBlock: Block): Pr
blockTimestamp,
parentHash
};
await db.updateSyncStatus(dbTx, blockHash, blockNumber);
await db.updateSyncStatusChainHead(dbTx, blockHash, blockNumber);
await db.saveEvents(dbTx, block, []);
await dbTx.commitTransaction();

View File

@ -4,8 +4,9 @@
import assert from 'assert';
import _ from 'lodash';
import { Connection, ConnectionOptions, createConnection, DeepPartial, QueryRunner } from 'typeorm';
import { SnakeNamingStrategy } from 'typeorm-naming-strategies';
import { Connection, ConnectionOptions, DeepPartial, QueryRunner } from 'typeorm';
import { Database as BaseDatabase } from '@vulcanize/util';
import { Event, UNKNOWN_EVENT_NAME } from './entity/Event';
import { Contract } from './entity/Contract';
@ -15,30 +16,24 @@ import { SyncStatus } from './entity/SyncStatus';
export class Database {
_config: ConnectionOptions
_conn!: Connection
_baseDatabase: BaseDatabase
constructor (config: ConnectionOptions) {
assert(config);
this._config = config;
this._baseDatabase = new BaseDatabase(this._config);
}
async init (): Promise<void> {
assert(!this._conn);
this._conn = await createConnection({
...this._config,
namingStrategy: new SnakeNamingStrategy()
});
this._conn = await this._baseDatabase.init();
}
async close (): Promise<void> {
return this._conn.close();
return this._baseDatabase.close();
}
async createTransactionRunner (): Promise<QueryRunner> {
const queryRunner = this._conn.createQueryRunner();
await queryRunner.connect();
await queryRunner.startTransaction();
return queryRunner;
return this._baseDatabase.createTransactionRunner();
}
async getBlockEvents (blockHash: string): Promise<Event[]> {
@ -124,52 +119,19 @@ export class Database {
async updateSyncStatusIndexedBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number): Promise<SyncStatus> {
const repo = queryRunner.manager.getRepository(SyncStatus);
const entity = await repo.findOne();
assert(entity);
if (blockNumber >= entity.latestIndexedBlockNumber) {
entity.latestIndexedBlockHash = blockHash;
entity.latestIndexedBlockNumber = blockNumber;
}
return await repo.save(entity);
return this._baseDatabase.updateSyncStatusIndexedBlock(repo, blockHash, blockNumber);
}
async updateSyncStatusCanonicalBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number): Promise<SyncStatus> {
const repo = queryRunner.manager.getRepository(SyncStatus);
const entity = await repo.findOne();
assert(entity);
if (blockNumber >= entity.latestCanonicalBlockNumber) {
entity.latestCanonicalBlockHash = blockHash;
entity.latestCanonicalBlockNumber = blockNumber;
}
return await repo.save(entity);
return this._baseDatabase.updateSyncStatusCanonicalBlock(repo, blockHash, blockNumber);
}
async updateSyncStatusChainHead (queryRunner: QueryRunner, blockHash: string, blockNumber: number): Promise<SyncStatus> {
const repo = queryRunner.manager.getRepository(SyncStatus);
let entity = await repo.findOne();
if (!entity) {
entity = repo.create({
chainHeadBlockHash: blockHash,
chainHeadBlockNumber: blockNumber,
latestCanonicalBlockHash: blockHash,
latestCanonicalBlockNumber: blockNumber,
latestIndexedBlockHash: '',
latestIndexedBlockNumber: -1
});
}
if (blockNumber >= entity.chainHeadBlockNumber) {
entity.chainHeadBlockHash = blockHash;
entity.chainHeadBlockNumber = blockNumber;
}
return await repo.save(entity);
return this._baseDatabase.updateSyncStatusChainHead(repo, blockHash, blockNumber);
}
async getSyncStatus (): Promise<SyncStatus | undefined> {
@ -216,16 +178,15 @@ export class Database {
}
async getBlocksAtHeight (height: number, isPruned: boolean): Promise<BlockProgress[]> {
return this._conn.getRepository(BlockProgress)
.createQueryBuilder('block_progress')
.where('block_number = :height AND is_pruned = :isPruned', { height, isPruned })
.getMany();
const repo = this._conn.getRepository(BlockProgress);
return this._baseDatabase.getBlocksAtHeight(repo, height, isPruned);
}
async markBlockAsPruned (queryRunner: QueryRunner, block: BlockProgress): Promise<BlockProgress> {
const repo = queryRunner.manager.getRepository(BlockProgress);
block.isPruned = true;
return repo.save(block);
return this._baseDatabase.markBlockAsPruned(repo, block);
}
async getBlockProgress (blockHash: string): Promise<BlockProgress | undefined> {

View File

@ -4,8 +4,10 @@
import { Entity, PrimaryGeneratedColumn, Column } from 'typeorm';
import { SyncStatusInterface } from '@vulcanize/util';
@Entity()
export class SyncStatus {
export class SyncStatus implements SyncStatusInterface {
@PrimaryGeneratedColumn()
id!: number;

View File

@ -4,14 +4,12 @@
import assert from 'assert';
import debug from 'debug';
import _ from 'lodash';
import { PubSub } from 'apollo-server-express';
import { EthClient } from '@vulcanize/ipld-eth-client';
import {
JobQueue,
EventWatcher as BaseEventWatcher,
MAX_REORG_DEPTH,
QUEUE_BLOCK_PROCESSING,
QUEUE_EVENT_PROCESSING,
QUEUE_CHAIN_PRUNING,
@ -31,14 +29,14 @@ export class EventWatcher implements EventWatcherInterface {
_subscription?: ZenObservable.Subscription
_pubsub: PubSub
_jobQueue: JobQueue
_eventWatcher: BaseEventWatcher
_baseEventWatcher: BaseEventWatcher
constructor (ethClient: EthClient, indexer: Indexer, pubsub: PubSub, jobQueue: JobQueue) {
this._ethClient = ethClient;
this._indexer = indexer;
this._pubsub = pubsub;
this._jobQueue = jobQueue;
this._eventWatcher = new BaseEventWatcher(this._ethClient, this._indexer, this._pubsub, this._jobQueue);
this._baseEventWatcher = new BaseEventWatcher(this._ethClient, this._indexer, this._pubsub, this._jobQueue);
}
getEventIterator (): AsyncIterator<any> {
@ -46,7 +44,7 @@ export class EventWatcher implements EventWatcherInterface {
}
getBlockProgressEventIterator (): AsyncIterator<any> {
return this._eventWatcher.getBlockProgressEventIterator();
return this._baseEventWatcher.getBlockProgressEventIterator();
}
async start (): Promise<void> {
@ -58,46 +56,26 @@ export class EventWatcher implements EventWatcherInterface {
await this.initChainPruningOnCompleteHandler();
}
async stop (): Promise<void> {
this._baseEventWatcher.stop();
}
async watchBlocksAtChainHead (): Promise<void> {
log('Started watching upstream blocks...');
this._subscription = await this._ethClient.watchBlocks(async (value) => {
const { blockHash, blockNumber, parentHash, timestamp } = _.get(value, 'data.listen.relatedNode');
await this._indexer.updateSyncStatusChainHead(blockHash, blockNumber);
log('watchBlock', blockHash, blockNumber);
await this._jobQueue.pushJob(QUEUE_BLOCK_PROCESSING, { blockHash, blockNumber, parentHash, timestamp });
await this._baseEventWatcher.blocksHandler(value);
});
}
async initBlockProcessingOnCompleteHandler (): Promise<void> {
this._jobQueue.onComplete(QUEUE_BLOCK_PROCESSING, async (job) => {
const { data: { request: { data: { blockHash, blockNumber } } } } = job;
log(`Job onComplete block ${blockHash} ${blockNumber}`);
// Update sync progress.
const syncStatus = await this._indexer.updateSyncStatusIndexedBlock(blockHash, blockNumber);
// Create pruning job if required.
if (syncStatus && syncStatus.latestIndexedBlockNumber > (syncStatus.latestCanonicalBlockNumber + MAX_REORG_DEPTH)) {
// Create a job to prune at block height (latestCanonicalBlockNumber + 1)
const pruneBlockHeight = syncStatus.latestCanonicalBlockNumber + 1;
// TODO: Move this to the block processing queue to run pruning jobs at a higher priority than block processing jobs.
await this._jobQueue.pushJob(QUEUE_CHAIN_PRUNING, { pruneBlockHeight });
}
// Publish block progress event.
const blockProgress = await this._indexer.getBlockProgress(blockHash);
if (blockProgress) {
await this._eventWatcher.publishBlockProgressToSubscribers(blockProgress);
}
await this._baseEventWatcher.blockProcessingCompleteHandler(job);
});
}
async initEventProcessingOnCompleteHandler (): Promise<void> {
await this._jobQueue.onComplete(QUEUE_EVENT_PROCESSING, async (job) => {
const dbEvent = await this._eventWatcher.eventProcessingCompleteHandler(job);
const dbEvent = await this._baseEventWatcher.eventProcessingCompleteHandler(job);
const { data: { request, failed, state, createdOn } } = job;
@ -116,16 +94,7 @@ export class EventWatcher implements EventWatcherInterface {
async initChainPruningOnCompleteHandler (): Promise<void> {
this._jobQueue.onComplete(QUEUE_CHAIN_PRUNING, async (job) => {
const { data: { request: { data: { pruneBlockHeight } } } } = job;
log(`Job onComplete chain pruning ${pruneBlockHeight}`);
const blocks = await this._indexer.getBlocksAtHeight(pruneBlockHeight, false);
// Only one canonical (not pruned) block should exist at the pruned height.
assert(blocks.length === 1);
const [block] = blocks;
await this._indexer.updateSyncStatusCanonicalBlock(block.blockHash, block.blockNumber);
await this._baseEventWatcher.chainPruningCompleteHandler(job);
});
}

View File

@ -10,7 +10,7 @@ import assert from 'assert';
import { EthClient } from '@vulcanize/ipld-eth-client';
import { GetStorageAt, getStorageValue, StorageLayout } from '@vulcanize/solidity-mapper';
import { Config } from '@vulcanize/util';
import { Config, IndexerInterface, Indexer as BaseIndexer } from '@vulcanize/util';
import { Database } from './database';
import { Event, UNKNOWN_EVENT_NAME } from './entity/Event';
@ -46,12 +46,13 @@ interface ValueResult {
}
}
export class Indexer {
export class Indexer implements IndexerInterface {
_config: Config;
_db: Database
_ethClient: EthClient
_postgraphileClient: EthClient
_getStorageAt: GetStorageAt
_baseIndexer: BaseIndexer
_factoryContract: ethers.utils.Interface
_poolContract: ethers.utils.Interface
@ -63,6 +64,7 @@ export class Indexer {
this._ethClient = ethClient;
this._postgraphileClient = postgraphileClient;
this._getStorageAt = this._ethClient.getStorageAt.bind(this._ethClient);
this._baseIndexer = new BaseIndexer(this._db);
this._factoryContract = new ethers.utils.Interface(factoryABI);
this._poolContract = new ethers.utils.Interface(poolABI);
@ -408,54 +410,15 @@ export class Indexer {
}
async updateSyncStatusIndexedBlock (blockHash: string, blockNumber: number): Promise<SyncStatus> {
const dbTx = await this._db.createTransactionRunner();
let res;
try {
res = await this._db.updateSyncStatusIndexedBlock(dbTx, blockHash, blockNumber);
await dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
throw error;
} finally {
await dbTx.release();
}
return res;
return this._baseIndexer.updateSyncStatusIndexedBlock(blockHash, blockNumber);
}
async updateSyncStatusChainHead (blockHash: string, blockNumber: number): Promise<SyncStatus> {
const dbTx = await this._db.createTransactionRunner();
let res;
try {
res = await this._db.updateSyncStatusChainHead(dbTx, blockHash, blockNumber);
await dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
throw error;
} finally {
await dbTx.release();
}
return res;
return this._baseIndexer.updateSyncStatusChainHead(blockHash, blockNumber);
}
async updateSyncStatusCanonicalBlock (blockHash: string, blockNumber: number): Promise<SyncStatus> {
const dbTx = await this._db.createTransactionRunner();
let res;
try {
res = await this._db.updateSyncStatusCanonicalBlock(dbTx, blockHash, blockNumber);
await dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
throw error;
} finally {
await dbTx.release();
}
return res;
return this._baseIndexer.updateSyncStatusCanonicalBlock(blockHash, blockNumber);
}
async getSyncStatus (): Promise<SyncStatus | undefined> {
@ -493,51 +456,15 @@ export class Indexer {
}
async getBlocksAtHeight (height: number, isPruned: boolean): Promise<BlockProgress[]> {
return this._db.getBlocksAtHeight(height, isPruned);
return this._baseIndexer.getBlocksAtHeight(height, isPruned);
}
async blockIsAncestor (ancestorBlockHash: string, blockHash: string, maxDepth: number): Promise<boolean> {
assert(maxDepth > 0);
let depth = 0;
let currentBlockHash = blockHash;
let currentBlock;
// TODO: Use a hierarchical query to optimize this.
while (depth < maxDepth) {
depth++;
currentBlock = await this._db.getBlockProgress(currentBlockHash);
if (!currentBlock) {
break;
} else {
if (currentBlock.parentHash === ancestorBlockHash) {
return true;
}
// Descend the chain.
currentBlockHash = currentBlock.parentHash;
}
}
return false;
return this._baseIndexer.blockIsAncestor(ancestorBlockHash, blockHash, maxDepth);
}
async markBlockAsPruned (block: BlockProgress): Promise<BlockProgress> {
const dbTx = await this._db.createTransactionRunner();
let res;
try {
res = await this._db.markBlockAsPruned(dbTx, block);
await dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
throw error;
} finally {
await dbTx.release();
}
return res;
return this._baseIndexer.markBlockAsPruned(block);
}
async updateBlockProgress (blockHash: string, lastProcessedEventIndex: number): Promise<void> {

View File

@ -10,21 +10,23 @@ import debug from 'debug';
import { getCache } from '@vulcanize/cache';
import { EthClient } from '@vulcanize/ipld-eth-client';
import { getConfig, JobQueue, MAX_REORG_DEPTH, QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING, QUEUE_CHAIN_PRUNING } from '@vulcanize/util';
import { getConfig, JobQueue, JobRunner as BaseJobRunner, QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING, QUEUE_CHAIN_PRUNING } from '@vulcanize/util';
import { Indexer } from './indexer';
import { Database } from './database';
import { UNKNOWN_EVENT_NAME, Event } from './entity/Event';
import { UNKNOWN_EVENT_NAME } from './entity/Event';
const log = debug('vulcanize:job-runner');
export class JobRunner {
_indexer: Indexer
_jobQueue: JobQueue
_baseJobRunner: BaseJobRunner
constructor (indexer: Indexer, jobQueue: JobQueue) {
this._indexer = indexer;
this._jobQueue = jobQueue;
this._baseJobRunner = new BaseJobRunner(this._indexer, this._jobQueue);
}
async start (): Promise<void> {
@ -35,48 +37,9 @@ export class JobRunner {
async subscribeBlockProcessingQueue (): Promise<void> {
await this._jobQueue.subscribe(QUEUE_BLOCK_PROCESSING, async (job) => {
const { data: { blockHash, blockNumber, parentHash, timestamp, priority } } = job;
await this._baseJobRunner.processBlock(job);
log(`Processing block number ${blockNumber} hash ${blockHash} `);
// Init sync status record if none exists.
let syncStatus = await this._indexer.getSyncStatus();
if (!syncStatus) {
syncStatus = await this._indexer.updateSyncStatusChainHead(blockHash, blockNumber);
}
// Check if parent block has been processed yet, if not, push a high priority job to process that first and abort.
// However, don't go beyond the `latestCanonicalBlockHash` from SyncStatus as we have to assume the reorg can't be that deep.
if (blockHash !== syncStatus.latestCanonicalBlockHash) {
const parent = await this._indexer.getBlockProgress(parentHash);
if (!parent) {
const { number: parentBlockNumber, parent: { hash: grandparentHash }, timestamp: parentTimestamp } = await this._indexer.getBlock(parentHash);
// Create a higher priority job to index parent block and then abort.
// We don't have to worry about aborting as this job will get retried later.
const newPriority = (priority || 0) + 1;
await this._jobQueue.pushJob(QUEUE_BLOCK_PROCESSING, {
blockHash: parentHash,
blockNumber: parentBlockNumber,
parentHash: grandparentHash,
timestamp: parentTimestamp,
priority: newPriority
}, { priority: newPriority });
const message = `Parent block number ${parentBlockNumber} hash ${parentHash} of block number ${blockNumber} hash ${blockHash} not fetched yet, aborting`;
log(message);
throw new Error(message);
}
if (parentHash !== syncStatus.latestCanonicalBlockHash && !parent.isComplete) {
// Parent block indexing needs to finish before this block can be indexed.
const message = `Indexing incomplete for parent block number ${parent.blockNumber} hash ${parentHash} of block number ${blockNumber} hash ${blockHash}, aborting`;
log(message);
throw new Error(message);
}
}
const { data: { blockHash, blockNumber, parentHash, timestamp } } = job;
const events = await this._indexer.getOrFetchBlockEvents({ blockHash, blockNumber, parentHash, blockTimestamp: timestamp });
for (let ei = 0; ei < events.length; ei++) {
@ -89,40 +52,11 @@ export class JobRunner {
async subscribeEventProcessingQueue (): Promise<void> {
await this._jobQueue.subscribe(QUEUE_EVENT_PROCESSING, async (job) => {
const event = await this._baseJobRunner.processEvent(job);
let dbEvent;
const { data: { id } } = job;
log(`Processing event ${id}`);
let dbEvent = await this._indexer.getEvent(id);
assert(dbEvent);
const event: Event = dbEvent;
// Confirm that the parent block has been completely processed.
// We don't have to worry about aborting as this job will get retried later.
const parent = await this._indexer.getBlockProgress(event.block.parentHash);
if (!parent || !parent.isComplete) {
const message = `Abort processing of event ${id} as parent block not processed yet`;
throw new Error(message);
}
const blockProgress = await this._indexer.getBlockProgress(event.block.blockHash);
assert(blockProgress);
const events = await this._indexer.getBlockEvents(event.block.blockHash);
const eventIndex = events.findIndex((e: any) => e.id === event.id);
assert(eventIndex !== -1);
// Check if previous event in block has been processed exactly before this and abort if not.
if (eventIndex > 0) { // Skip the first event in the block.
const prevIndex = eventIndex - 1;
const prevEvent = events[prevIndex];
if (prevEvent.index !== blockProgress.lastProcessedEventIndex) {
throw new Error(`Events received out of order for block number ${event.block.blockNumber} hash ${event.block.blockHash},` +
` prev event index ${prevEvent.index}, got event index ${event.index} and lastProcessedEventIndex ${blockProgress.lastProcessedEventIndex}, aborting`);
}
}
const uniContract = await this._indexer.isUniswapContract(event.contract);
if (uniContract) {
// We might not have parsed this event yet. This can happen if the contract was added
@ -147,38 +81,7 @@ export class JobRunner {
async subscribeChainPruningQueue (): Promise<void> {
await this._jobQueue.subscribe(QUEUE_CHAIN_PRUNING, async (job) => {
const pruneBlockHeight: number = job.data.pruneBlockHeight;
log(`Processing chain pruning at ${pruneBlockHeight}`);
// Assert we're at a depth where pruning is safe.
const syncStatus = await this._indexer.getSyncStatus();
assert(syncStatus);
assert(syncStatus.latestIndexedBlockNumber >= (pruneBlockHeight + MAX_REORG_DEPTH));
// Check that we haven't already pruned at this depth.
if (syncStatus.latestCanonicalBlockNumber >= pruneBlockHeight) {
log(`Already pruned at block height ${pruneBlockHeight}, latestCanonicalBlockNumber ${syncStatus.latestCanonicalBlockNumber}`);
} else {
// Check how many branches there are at the given height/block number.
const blocksAtHeight = await this._indexer.getBlocksAtHeight(pruneBlockHeight, false);
// Should be at least 1.
assert(blocksAtHeight.length);
// We have more than one node at this height, so prune all nodes not reachable from head.
// This will lead to orphaned nodes, which will get pruned at the next height.
if (blocksAtHeight.length > 1) {
for (let i = 0; i < blocksAtHeight.length; i++) {
const block = blocksAtHeight[i];
// If this block is not reachable from the latest indexed block, mark it as pruned.
const isAncestor = await this._indexer.blockIsAncestor(block.blockHash, syncStatus.latestIndexedBlockHash, MAX_REORG_DEPTH);
if (!isAncestor) {
await this._indexer.markBlockAsPruned(block);
}
}
}
}
await this._baseJobRunner.pruneChain(job);
await this._jobQueue.markComplete(job);
});

View File

@ -10,3 +10,5 @@ export * from './src/index';
export * from './src/fill';
export * from './src/events';
export * from './src/types';
export * from './src/indexer';
export * from './src/job-runner';

View File

@ -2,22 +2,96 @@
// Copyright 2021 Vulcanize, Inc.
//
import Decimal from 'decimal.js';
import { ValueTransformer } from 'typeorm';
import assert from 'assert';
import { Connection, ConnectionOptions, createConnection, QueryRunner, Repository } from 'typeorm';
import { SnakeNamingStrategy } from 'typeorm-naming-strategies';
export const decimalTransformer: ValueTransformer = {
to: (value?: Decimal) => {
if (value) {
return value.toString();
}
import { BlockProgressInterface, SyncStatusInterface } from './types';
return value;
},
from: (value?: string) => {
if (value) {
return new Decimal(value);
}
export class Database {
_config: ConnectionOptions
_conn!: Connection
return value;
constructor (config: ConnectionOptions) {
assert(config);
this._config = config;
}
};
async init (): Promise<Connection> {
assert(!this._conn);
this._conn = await createConnection({
...this._config,
namingStrategy: new SnakeNamingStrategy()
});
return this._conn;
}
async close (): Promise<void> {
return this._conn.close();
}
async createTransactionRunner (): Promise<QueryRunner> {
const queryRunner = this._conn.createQueryRunner();
await queryRunner.connect();
await queryRunner.startTransaction();
return queryRunner;
}
async updateSyncStatusIndexedBlock (repo: Repository<SyncStatusInterface>, blockHash: string, blockNumber: number): Promise<SyncStatusInterface> {
const entity = await repo.findOne();
assert(entity);
if (blockNumber >= entity.latestIndexedBlockNumber) {
entity.latestIndexedBlockHash = blockHash;
entity.latestIndexedBlockNumber = blockNumber;
}
return await repo.save(entity);
}
async updateSyncStatusCanonicalBlock (repo: Repository<SyncStatusInterface>, blockHash: string, blockNumber: number): Promise<SyncStatusInterface> {
const entity = await repo.findOne();
assert(entity);
if (blockNumber >= entity.latestCanonicalBlockNumber) {
entity.latestCanonicalBlockHash = blockHash;
entity.latestCanonicalBlockNumber = blockNumber;
}
return await repo.save(entity);
}
async updateSyncStatusChainHead (repo: Repository<SyncStatusInterface>, blockHash: string, blockNumber: number): Promise<SyncStatusInterface> {
let entity = await repo.findOne();
if (!entity) {
entity = repo.create({
chainHeadBlockHash: blockHash,
chainHeadBlockNumber: blockNumber,
latestCanonicalBlockHash: blockHash,
latestCanonicalBlockNumber: blockNumber,
latestIndexedBlockHash: '',
latestIndexedBlockNumber: -1
});
}
if (blockNumber >= entity.chainHeadBlockNumber) {
entity.chainHeadBlockHash = blockHash;
entity.chainHeadBlockNumber = blockNumber;
}
return await repo.save(entity);
}
async getBlocksAtHeight (repo: Repository<BlockProgressInterface>, height: number, isPruned: boolean): Promise<BlockProgressInterface[]> {
return repo.createQueryBuilder('block_progress')
.where('block_number = :height AND is_pruned = :isPruned', { height, isPruned })
.getMany();
}
async markBlockAsPruned (repo: Repository<BlockProgressInterface>, block: BlockProgressInterface): Promise<BlockProgressInterface> {
block.isPruned = true;
return repo.save(block);
}
}

View File

@ -5,11 +5,13 @@
import assert from 'assert';
import debug from 'debug';
import { PubSub } from 'apollo-server-express';
import _ from 'lodash';
import { EthClient } from '@vulcanize/ipld-eth-client';
import { JobQueue } from './job-queue';
import { BlockProgressInterface, EventInterface, IndexerInterface } from './types';
import { QUEUE_BLOCK_PROCESSING, QUEUE_CHAIN_PRUNING, MAX_REORG_DEPTH } from './constants';
const log = debug('vulcanize:events');
@ -33,6 +35,66 @@ export class EventWatcher {
return this._pubsub.asyncIterator([BlockProgressEvent]);
}
async blocksHandler (value: any): Promise<void> {
const { blockHash, blockNumber, parentHash, timestamp } = _.get(value, 'data.listen.relatedNode');
await this._indexer.updateSyncStatusChainHead(blockHash, blockNumber);
log('watchBlock', blockHash, blockNumber);
await this._jobQueue.pushJob(QUEUE_BLOCK_PROCESSING, { blockHash, blockNumber, parentHash, timestamp });
}
async blockProcessingCompleteHandler (job: any): Promise<void> {
const { data: { request: { data: { blockHash, blockNumber } } } } = job;
log(`Job onComplete block ${blockHash} ${blockNumber}`);
// Update sync progress.
const syncStatus = await this._indexer.updateSyncStatusIndexedBlock(blockHash, blockNumber);
// Create pruning job if required.
if (syncStatus && syncStatus.latestIndexedBlockNumber > (syncStatus.latestCanonicalBlockNumber + MAX_REORG_DEPTH)) {
// Create a job to prune at block height (latestCanonicalBlockNumber + 1)
const pruneBlockHeight = syncStatus.latestCanonicalBlockNumber + 1;
// TODO: Move this to the block processing queue to run pruning jobs at a higher priority than block processing jobs.
await this._jobQueue.pushJob(QUEUE_CHAIN_PRUNING, { pruneBlockHeight });
}
// Publish block progress event.
const blockProgress = await this._indexer.getBlockProgress(blockHash);
if (blockProgress) {
await this.publishBlockProgressToSubscribers(blockProgress);
}
}
async eventProcessingCompleteHandler (job: any): Promise<EventInterface> {
const { data: { request } } = job;
const dbEvent = await this._indexer.getEvent(request.data.id);
assert(dbEvent);
await this._indexer.updateBlockProgress(dbEvent.block.blockHash, dbEvent.index);
const blockProgress = await this._indexer.getBlockProgress(dbEvent.block.blockHash);
if (blockProgress) {
await this.publishBlockProgressToSubscribers(blockProgress);
}
return dbEvent;
}
async chainPruningCompleteHandler (job:any): Promise<void> {
const { data: { request: { data: { pruneBlockHeight } } } } = job;
log(`Job onComplete chain pruning ${pruneBlockHeight}`);
const blocks = await this._indexer.getBlocksAtHeight(pruneBlockHeight, false);
// Only one canonical (not pruned) block should exist at the pruned height.
assert(blocks.length === 1);
const [block] = blocks;
await this._indexer.updateSyncStatusCanonicalBlock(block.blockHash, block.blockNumber);
}
async publishBlockProgressToSubscribers (blockProgress: BlockProgressInterface): Promise<void> {
const { blockHash, blockNumber, numEvents, numProcessedEvents, isComplete } = blockProgress;
@ -54,19 +116,4 @@ export class EventWatcher {
this._subscription.unsubscribe();
}
}
async eventProcessingCompleteHandler (job: any): Promise<EventInterface> {
const { data: { request } } = job;
const dbEvent = await this._indexer.getEvent(request.data.id);
assert(dbEvent);
await this._indexer.updateBlockProgress(dbEvent.block.blockHash, dbEvent.index);
const blockProgress = await this._indexer.getBlockProgress(dbEvent.block.blockHash);
if (blockProgress) {
await this.publishBlockProgressToSubscribers(blockProgress);
}
return dbEvent;
}
}

View File

@ -2,8 +2,31 @@
// Copyright 2021 Vulcanize, Inc.
//
import Decimal from 'decimal.js';
import { ValueTransformer } from 'typeorm';
/**
* Method to wait for specified time.
* @param time Time to wait in milliseconds
*/
export const wait = async (time: number): Promise<void> => new Promise(resolve => setTimeout(resolve, time));
/**
* Transformer used by typeorm entity for Decimal type fields
*/
export const decimalTransformer: ValueTransformer = {
to: (value?: Decimal) => {
if (value) {
return value.toString();
}
return value;
},
from: (value?: string) => {
if (value) {
return new Decimal(value);
}
return value;
}
};

View File

@ -0,0 +1,114 @@
//
// Copyright 2021 Vulcanize, Inc.
//
import assert from 'assert';
import { BlockProgressInterface, DatabaseInterface, SyncStatusInterface } from './types';
export class Indexer {
_db: DatabaseInterface;
constructor (db: DatabaseInterface) {
this._db = db;
}
async updateSyncStatusIndexedBlock (blockHash: string, blockNumber: number): Promise<SyncStatusInterface> {
const dbTx = await this._db.createTransactionRunner();
let res;
try {
res = await this._db.updateSyncStatusIndexedBlock(dbTx, blockHash, blockNumber);
await dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
throw error;
} finally {
await dbTx.release();
}
return res;
}
async updateSyncStatusChainHead (blockHash: string, blockNumber: number): Promise<SyncStatusInterface> {
const dbTx = await this._db.createTransactionRunner();
let res;
try {
res = await this._db.updateSyncStatusChainHead(dbTx, blockHash, blockNumber);
await dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
throw error;
} finally {
await dbTx.release();
}
return res;
}
async updateSyncStatusCanonicalBlock (blockHash: string, blockNumber: number): Promise<SyncStatusInterface> {
const dbTx = await this._db.createTransactionRunner();
let res;
try {
res = await this._db.updateSyncStatusCanonicalBlock(dbTx, blockHash, blockNumber);
await dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
throw error;
} finally {
await dbTx.release();
}
return res;
}
async getBlocksAtHeight (height: number, isPruned: boolean): Promise<BlockProgressInterface[]> {
return this._db.getBlocksAtHeight(height, isPruned);
}
async blockIsAncestor (ancestorBlockHash: string, blockHash: string, maxDepth: number): Promise<boolean> {
assert(maxDepth > 0);
let depth = 0;
let currentBlockHash = blockHash;
let currentBlock;
// TODO: Use a hierarchical query to optimize this.
while (depth < maxDepth) {
depth++;
currentBlock = await this._db.getBlockProgress(currentBlockHash);
if (!currentBlock) {
break;
} else {
if (currentBlock.parentHash === ancestorBlockHash) {
return true;
}
// Descend the chain.
currentBlockHash = currentBlock.parentHash;
}
}
return false;
}
async markBlockAsPruned (block: BlockProgressInterface): Promise<BlockProgressInterface> {
const dbTx = await this._db.createTransactionRunner();
let res;
try {
res = await this._db.markBlockAsPruned(dbTx, block);
await dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
throw error;
} finally {
await dbTx.release();
}
return res;
}
}

View File

@ -0,0 +1,140 @@
//
// Copyright 2021 Vulcanize, Inc.
//
import assert from 'assert';
import debug from 'debug';
import { MAX_REORG_DEPTH, QUEUE_BLOCK_PROCESSING } from './constants';
import { JobQueue } from './job-queue';
import { EventInterface, IndexerInterface } from './types';
const log = debug('vulcanize:job-runner');
export class JobRunner {
_indexer: IndexerInterface
_jobQueue: JobQueue
constructor (indexer: IndexerInterface, jobQueue: JobQueue) {
this._indexer = indexer;
this._jobQueue = jobQueue;
}
async processBlock (job: any): Promise<void> {
const { data: { blockHash, blockNumber, parentHash, priority } } = job;
log(`Processing block number ${blockNumber} hash ${blockHash} `);
// Init sync status record if none exists.
let syncStatus = await this._indexer.getSyncStatus();
if (!syncStatus) {
syncStatus = await this._indexer.updateSyncStatusChainHead(blockHash, blockNumber);
}
// Check if parent block has been processed yet, if not, push a high priority job to process that first and abort.
// However, don't go beyond the `latestCanonicalBlockHash` from SyncStatus as we have to assume the reorg can't be that deep.
if (blockHash !== syncStatus.latestCanonicalBlockHash) {
const parent = await this._indexer.getBlockProgress(parentHash);
if (!parent) {
const { number: parentBlockNumber, parent: { hash: grandparentHash }, timestamp: parentTimestamp } = await this._indexer.getBlock(parentHash);
// Create a higher priority job to index parent block and then abort.
// We don't have to worry about aborting as this job will get retried later.
const newPriority = (priority || 0) + 1;
await this._jobQueue.pushJob(QUEUE_BLOCK_PROCESSING, {
blockHash: parentHash,
blockNumber: parentBlockNumber,
parentHash: grandparentHash,
timestamp: parentTimestamp,
priority: newPriority
}, { priority: newPriority });
const message = `Parent block number ${parentBlockNumber} hash ${parentHash} of block number ${blockNumber} hash ${blockHash} not fetched yet, aborting`;
log(message);
throw new Error(message);
}
if (parentHash !== syncStatus.latestCanonicalBlockHash && !parent.isComplete) {
// Parent block indexing needs to finish before this block can be indexed.
const message = `Indexing incomplete for parent block number ${parent.blockNumber} hash ${parentHash} of block number ${blockNumber} hash ${blockHash}, aborting`;
log(message);
throw new Error(message);
}
}
}
async processEvent (job: any): Promise<EventInterface> {
const { data: { id } } = job;
log(`Processing event ${id}`);
const dbEvent = await this._indexer.getEvent(id);
assert(dbEvent);
const event = dbEvent;
// Confirm that the parent block has been completely processed.
// We don't have to worry about aborting as this job will get retried later.
const parent = await this._indexer.getBlockProgress(event.block.parentHash);
if (!parent || !parent.isComplete) {
const message = `Abort processing of event ${id} as parent block not processed yet`;
throw new Error(message);
}
const blockProgress = await this._indexer.getBlockProgress(event.block.blockHash);
assert(blockProgress);
const events = await this._indexer.getBlockEvents(event.block.blockHash);
const eventIndex = events.findIndex((e: any) => e.id === event.id);
assert(eventIndex !== -1);
// Check if previous event in block has been processed exactly before this and abort if not.
if (eventIndex > 0) { // Skip the first event in the block.
const prevIndex = eventIndex - 1;
const prevEvent = events[prevIndex];
if (prevEvent.index !== blockProgress.lastProcessedEventIndex) {
throw new Error(`Events received out of order for block number ${event.block.blockNumber} hash ${event.block.blockHash},` +
` prev event index ${prevEvent.index}, got event index ${event.index} and lastProcessedEventIndex ${blockProgress.lastProcessedEventIndex}, aborting`);
}
}
return event;
}
async pruneChain (job: any): Promise<void> {
const pruneBlockHeight: number = job.data.pruneBlockHeight;
log(`Processing chain pruning at ${pruneBlockHeight}`);
// Assert we're at a depth where pruning is safe.
const syncStatus = await this._indexer.getSyncStatus();
assert(syncStatus);
assert(syncStatus.latestIndexedBlockNumber >= (pruneBlockHeight + MAX_REORG_DEPTH));
// Check that we haven't already pruned at this depth.
if (syncStatus.latestCanonicalBlockNumber >= pruneBlockHeight) {
log(`Already pruned at block height ${pruneBlockHeight}, latestCanonicalBlockNumber ${syncStatus.latestCanonicalBlockNumber}`);
} else {
// Check how many branches there are at the given height/block number.
const blocksAtHeight = await this._indexer.getBlocksAtHeight(pruneBlockHeight, false);
// Should be at least 1.
assert(blocksAtHeight.length);
// We have more than one node at this height, so prune all nodes not reachable from head.
// This will lead to orphaned nodes, which will get pruned at the next height.
if (blocksAtHeight.length > 1) {
for (let i = 0; i < blocksAtHeight.length; i++) {
const block = blocksAtHeight[i];
// If this block is not reachable from the latest indexed block, mark it as pruned.
const isAncestor = await this._indexer.blockIsAncestor(block.blockHash, syncStatus.latestIndexedBlockHash, MAX_REORG_DEPTH);
if (!isAncestor) {
await this._indexer.markBlockAsPruned(block);
}
}
}
}
}
}

View File

@ -2,6 +2,8 @@
// Copyright 2021 Vulcanize, Inc.
//
import { QueryRunner } from 'typeorm';
export interface BlockProgressInterface {
id: number;
blockHash: string;
@ -19,6 +21,8 @@ export interface SyncStatusInterface {
id: number;
chainHeadBlockHash: string;
chainHeadBlockNumber: number;
latestIndexedBlockHash: string;
latestIndexedBlockNumber: number;
latestCanonicalBlockHash: string;
latestCanonicalBlockNumber: number;
}
@ -38,7 +42,16 @@ export interface EventInterface {
export interface IndexerInterface {
getBlockProgress (blockHash: string): Promise<BlockProgressInterface | undefined>
getEvent (id: string): Promise<EventInterface | undefined>
getSyncStatus (): Promise<SyncStatusInterface | undefined>;
getBlock (blockHash: string): Promise<any>
getBlocksAtHeight (height: number, isPruned: boolean): Promise<BlockProgressInterface[]>;
getBlockEvents (blockHash: string): Promise<Array<EventInterface>>
blockIsAncestor (ancestorBlockHash: string, blockHash: string, maxDepth: number): Promise<boolean>;
updateBlockProgress (blockHash: string, lastProcessedEventIndex: number): Promise<void>
updateSyncStatusChainHead (blockHash: string, blockNumber: number): Promise<SyncStatusInterface>
updateSyncStatusIndexedBlock (blockHash: string, blockNumber: number): Promise<SyncStatusInterface>
updateSyncStatusCanonicalBlock (blockHash: string, blockNumber: number): Promise<SyncStatusInterface>
markBlockAsPruned (block: BlockProgressInterface): Promise<BlockProgressInterface>;
}
export interface EventWatcherInterface {
@ -46,3 +59,13 @@ export interface EventWatcherInterface {
initBlockProcessingOnCompleteHandler (): Promise<void>
initEventProcessingOnCompleteHandler (): Promise<void>
}
export interface DatabaseInterface {
createTransactionRunner(): Promise<QueryRunner>;
getBlocksAtHeight (height: number, isPruned: boolean): Promise<BlockProgressInterface[]>;
getBlockProgress (blockHash: string): Promise<BlockProgressInterface | undefined>;
markBlockAsPruned (queryRunner: QueryRunner, block: BlockProgressInterface): Promise<BlockProgressInterface>;
updateSyncStatusIndexedBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number): Promise<SyncStatusInterface>;
updateSyncStatusChainHead (queryRunner: QueryRunner, blockHash: string, blockNumber: number): Promise<SyncStatusInterface>;
updateSyncStatusCanonicalBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number): Promise<SyncStatusInterface>;
}