Optimize and reduce queries in block processing job (#308)

* Improvement TODOs

* Get parent and current block in single query

* Remove block progress queries and events query

* Reduce queries in fetching batch events by using query builder

* Implement changes in codegen package
This commit is contained in:
nikugogoi 2021-12-13 15:38:34 +05:30 committed by GitHub
parent f56f7a823f
commit b345d25bb8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 270 additions and 188 deletions

View File

@ -6,7 +6,7 @@ import assert from 'assert';
import { Connection, ConnectionOptions, DeepPartial, FindConditions, QueryRunner, FindManyOptions } from 'typeorm';
import path from 'path';
import { Database as BaseDatabase } from '@vulcanize/util';
import { Database as BaseDatabase, QueryOptions, Where } from '@vulcanize/util';
import { Contract } from './entity/Contract';
import { Event } from './entity/Event';
@ -105,13 +105,14 @@ export class Database {
return this._baseDatabase.saveEventEntity(repo, entity);
}
async getBlockEvents (blockHash: string, where: FindConditions<Event>): Promise<Event[]> {
async getBlockEvents (blockHash: string, where: Where, queryOptions: QueryOptions): Promise<Event[]> {
const repo = this._conn.getRepository(Event);
return this._baseDatabase.getBlockEvents(repo, blockHash, where);
return this._baseDatabase.getBlockEvents(repo, blockHash, options);
return this._baseDatabase.getBlockEvents(repo, blockHash, where, queryOptions);
}
async saveEvents (queryRunner: QueryRunner, block: DeepPartial<BlockProgress>, events: DeepPartial<Event>[]): Promise<void> {
async saveEvents (queryRunner: QueryRunner, block: DeepPartial<BlockProgress>, events: DeepPartial<Event>[]): Promise<BlockProgress> {
const blockRepo = queryRunner.manager.getRepository(BlockProgress);
const eventRepo = queryRunner.manager.getRepository(Event);
@ -173,6 +174,12 @@ export class Database {
return this._baseDatabase.getBlockProgress(repo, blockHash);
}
async getBlockProgressEntities (where: FindConditions<BlockProgress>, options: FindManyOptions<BlockProgress>): Promise<BlockProgress[]> {
const repo = this._conn.getRepository(BlockProgress);
return this._baseDatabase.getBlockProgressEntities(repo, where, options);
}
async updateBlockProgress (queryRunner: QueryRunner, blockHash: string, lastProcessedEventIndex: number): Promise<void> {
const repo = queryRunner.manager.getRepository(BlockProgress);

View File

@ -5,14 +5,14 @@
import assert from 'assert';
import debug from 'debug';
import { JsonFragment } from '@ethersproject/abi';
import { DeepPartial } from 'typeorm';
import { DeepPartial, FindConditions } from 'typeorm';
import JSONbig from 'json-bigint';
import { ethers } from 'ethers';
import { BaseProvider } from '@ethersproject/providers';
import { EthClient } from '@vulcanize/ipld-eth-client';
import { StorageLayout } from '@vulcanize/solidity-mapper';
import { EventInterface, Indexer as BaseIndexer, ValueResult, UNKNOWN_EVENT_NAME } from '@vulcanize/util';
import { EventInterface, Indexer as BaseIndexer, ValueResult, UNKNOWN_EVENT_NAME, Where, QueryOptions } from '@vulcanize/util';
import { Database } from './database';
import { Contract } from './entity/Contract';
@ -260,16 +260,20 @@ export class Indexer {
return this._baseIndexer.getBlockProgress(blockHash);
}
async getBlockProgressEntities (where: FindConditions<BlockProgress>, options: FindManyOptions<BlockProgress>): Promise<BlockProgress[]> {
return this._baseIndexer.getBlockProgressEntities(where, options);
}
async getBlocksAtHeight (height: number, isPruned: boolean): Promise<BlockProgress[]> {
return this._baseIndexer.getBlocksAtHeight(height, isPruned);
}
async getOrFetchBlockEvents (block: DeepPartial<BlockProgress>): Promise<Array<EventInterface>> {
return this._baseIndexer.getOrFetchBlockEvents(block, this._fetchAndSaveEvents.bind(this));
async fetchBlockEvents (block: DeepPartial<BlockProgress>): Promise<BlockProgress> {
return this._baseIndexer.fetchBlockEvents(block, this._fetchAndSaveEvents.bind(this));
}
async getBlockEvents (blockHash: string): Promise<Array<Event>> {
return this._baseIndexer.getBlockEvents(blockHash);
async getBlockEvents (blockHash: string, where: Where, queryOptions: QueryOptions): Promise<Array<Event>> {
return this._baseIndexer.getBlockEvents(blockHash, where, queryOptions);
}
async removeUnknownEvents (block: BlockProgress): Promise<void> {
@ -288,7 +292,7 @@ export class Indexer {
return this._baseIndexer.getAncestorAtDepth(blockHash, depth);
}
async _fetchAndSaveEvents ({ blockHash }: DeepPartial<BlockProgress>): Promise<void> {
async _fetchAndSaveEvents ({ blockHash }: DeepPartial<BlockProgress>): Promise<BlockProgress> {
assert(blockHash);
let { block, logs } = await this._ethClient.getLogs({ blockHash });
@ -377,8 +381,10 @@ export class Indexer {
parentHash: block.parent.hash
};
await this._db.saveEvents(dbTx, block, dbEvents);
const blockProgress = await this._db.saveEvents(dbTx, block, dbEvents);
await dbTx.commitTransaction();
return blockProgress;
} catch (error) {
await dbTx.rollbackTransaction();
throw error;

View File

@ -6,7 +6,7 @@ import assert from 'assert';
import { Connection, ConnectionOptions, DeepPartial, FindConditions, FindManyOptions, QueryRunner } from 'typeorm';
import path from 'path';
import { Database as BaseDatabase } from '@vulcanize/util';
import { Database as BaseDatabase, QueryOptions, Where } from '@vulcanize/util';
import { Allowance } from './entity/Allowance';
import { Balance } from './entity/Balance';
@ -101,13 +101,13 @@ export class Database {
return this._baseDatabase.saveEventEntity(repo, entity);
}
async getBlockEvents (blockHash: string, options: FindManyOptions<Event>): Promise<Event[]> {
async getBlockEvents (blockHash: string, where: Where, queryOptions: QueryOptions): Promise<Event[]> {
const repo = this._conn.getRepository(Event);
return this._baseDatabase.getBlockEvents(repo, blockHash, options);
return this._baseDatabase.getBlockEvents(repo, blockHash, where, queryOptions);
}
async saveEvents (queryRunner: QueryRunner, block: DeepPartial<BlockProgress>, events: DeepPartial<Event>[]): Promise<void> {
async saveEvents (queryRunner: QueryRunner, block: DeepPartial<BlockProgress>, events: DeepPartial<Event>[]): Promise<BlockProgress> {
const blockRepo = queryRunner.manager.getRepository(BlockProgress);
const eventRepo = queryRunner.manager.getRepository(Event);
@ -167,6 +167,12 @@ export class Database {
return this._baseDatabase.getBlockProgress(repo, blockHash);
}
async getBlockProgressEntities (where: FindConditions<BlockProgress>, options: FindManyOptions<BlockProgress>): Promise<BlockProgress[]> {
const repo = this._conn.getRepository(BlockProgress);
return this._baseDatabase.getBlockProgressEntities(repo, where, options);
}
async updateBlockProgress (queryRunner: QueryRunner, block: BlockProgress, lastProcessedEventIndex: number): Promise<BlockProgress> {
const repo = queryRunner.manager.getRepository(BlockProgress);

View File

@ -5,14 +5,14 @@
import assert from 'assert';
import debug from 'debug';
import { JsonFragment } from '@ethersproject/abi';
import { DeepPartial, FindManyOptions } from 'typeorm';
import { DeepPartial, FindConditions, FindManyOptions } from 'typeorm';
import JSONbig from 'json-bigint';
import { ethers } from 'ethers';
import { BaseProvider } from '@ethersproject/providers';
import { EthClient } from '@vulcanize/ipld-eth-client';
import { StorageLayout } from '@vulcanize/solidity-mapper';
import { EventInterface, Indexer as BaseIndexer, ValueResult, UNKNOWN_EVENT_NAME, JobQueue } from '@vulcanize/util';
import { Indexer as BaseIndexer, ValueResult, UNKNOWN_EVENT_NAME, JobQueue, Where, QueryOptions } from '@vulcanize/util';
import { Database } from './database';
import { Event } from './entity/Event';
@ -344,16 +344,20 @@ export class Indexer {
return this._baseIndexer.getBlockProgress(blockHash);
}
async getBlockProgressEntities (where: FindConditions<BlockProgress>, options: FindManyOptions<BlockProgress>): Promise<BlockProgress[]> {
return this._baseIndexer.getBlockProgressEntities(where, options);
}
async getBlocksAtHeight (height: number, isPruned: boolean): Promise<BlockProgress[]> {
return this._baseIndexer.getBlocksAtHeight(height, isPruned);
}
async getOrFetchBlockEvents (block: DeepPartial<BlockProgress>): Promise<Array<EventInterface>> {
return this._baseIndexer.getOrFetchBlockEvents(block, this._fetchAndSaveEvents.bind(this));
async fetchBlockEvents (block: DeepPartial<BlockProgress>): Promise<BlockProgress> {
return this._baseIndexer.fetchBlockEvents(block, this._fetchAndSaveEvents.bind(this));
}
async getBlockEvents (blockHash: string, options: FindManyOptions<Event>): Promise<Array<Event>> {
return this._baseIndexer.getBlockEvents(blockHash, options);
async getBlockEvents (blockHash: string, where: Where, queryOptions: QueryOptions): Promise<Array<Event>> {
return this._baseIndexer.getBlockEvents(blockHash, where, queryOptions);
}
async removeUnknownEvents (block: BlockProgress): Promise<void> {
@ -372,7 +376,7 @@ export class Indexer {
return this._baseIndexer.getAncestorAtDepth(blockHash, depth);
}
async _fetchAndSaveEvents ({ blockHash }: DeepPartial<BlockProgress>): Promise<void> {
async _fetchAndSaveEvents ({ blockHash }: DeepPartial<BlockProgress>): Promise<BlockProgress> {
assert(blockHash);
let { block, logs } = await this._ethClient.getLogs({ blockHash });
@ -443,8 +447,10 @@ export class Indexer {
parentHash: block.parent.hash
};
await this._db.saveEvents(dbTx, block, dbEvents);
const blockProgress = await this._db.saveEvents(dbTx, block, dbEvents);
await dbTx.commitTransaction();
return blockProgress;
} catch (error) {
await dbTx.rollbackTransaction();
throw error;

View File

@ -603,13 +603,13 @@ export class Database implements DatabaseInterface {
return this._baseDatabase.saveEventEntity(repo, entity);
}
async getBlockEvents (blockHash: string, options?: FindManyOptions<Event>): Promise<Event[]> {
async getBlockEvents (blockHash: string, where: Where, queryOptions: QueryOptions): Promise<Event[]> {
const repo = this._conn.getRepository(Event);
return this._baseDatabase.getBlockEvents(repo, blockHash, options);
return this._baseDatabase.getBlockEvents(repo, blockHash, where, queryOptions);
}
async saveEvents (queryRunner: QueryRunner, block: DeepPartial<BlockProgress>, events: DeepPartial<Event>[]): Promise<void> {
async saveEvents (queryRunner: QueryRunner, block: DeepPartial<BlockProgress>, events: DeepPartial<Event>[]): Promise<BlockProgress> {
const blockRepo = queryRunner.manager.getRepository(BlockProgress);
const eventRepo = queryRunner.manager.getRepository(Event);
@ -663,6 +663,12 @@ export class Database implements DatabaseInterface {
return this._baseDatabase.getBlockProgress(repo, blockHash);
}
async getBlockProgressEntities (where: FindConditions<BlockProgress>, options: FindManyOptions<BlockProgress>): Promise<BlockProgress[]> {
const repo = this._conn.getRepository(BlockProgress);
return this._baseDatabase.getBlockProgressEntities(repo, where, options);
}
async updateBlockProgress (queryRunner: QueryRunner, block: BlockProgress, lastProcessedEventIndex: number): Promise<BlockProgress> {
const repo = queryRunner.manager.getRepository(BlockProgress);

View File

@ -4,14 +4,14 @@
import assert from 'assert';
import debug from 'debug';
import { DeepPartial, FindManyOptions, QueryRunner } from 'typeorm';
import { DeepPartial, FindConditions, FindManyOptions, QueryRunner } from 'typeorm';
import JSONbig from 'json-bigint';
import { providers, utils, BigNumber } from 'ethers';
import { Client as UniClient } from '@vulcanize/uni-watcher';
import { Client as ERC20Client } from '@vulcanize/erc20-watcher';
import { EthClient } from '@vulcanize/ipld-eth-client';
import { IndexerInterface, Indexer as BaseIndexer, QueryOptions, OrderDirection, BlockHeight, Relation, GraphDecimal, JobQueue } from '@vulcanize/util';
import { IndexerInterface, Indexer as BaseIndexer, QueryOptions, OrderDirection, BlockHeight, Relation, GraphDecimal, JobQueue, Where } from '@vulcanize/util';
import { findEthPerToken, getEthPriceInUSD, getTrackedAmountUSD, sqrtPriceX96ToTokenPrices, WHITELIST_TOKENS } from './utils/pricing';
import { updatePoolDayData, updatePoolHourData, updateTickDayData, updateTokenDayData, updateTokenHourData, updateUniswapDayData } from './utils/interval-updates';
@ -306,12 +306,12 @@ export class Indexer implements IndexerInterface {
return this._baseIndexer.getAncestorAtDepth(blockHash, depth);
}
async getOrFetchBlockEvents (block: DeepPartial<BlockProgress>): Promise<Array<Event>> {
return this._baseIndexer.getOrFetchBlockEvents(block, this._fetchAndSaveEvents.bind(this));
async fetchBlockEvents (block: DeepPartial<BlockProgress>): Promise<BlockProgress> {
return this._baseIndexer.fetchBlockEvents(block, this._fetchAndSaveEvents.bind(this));
}
async getBlockEvents (blockHash: string, options: FindManyOptions<Event>): Promise<Array<Event>> {
return this._baseIndexer.getBlockEvents(blockHash, options);
async getBlockEvents (blockHash: string, where: Where, queryOptions: QueryOptions): Promise<Array<Event>> {
return this._baseIndexer.getBlockEvents(blockHash, where, queryOptions);
}
async removeUnknownEvents (block: BlockProgress): Promise<void> {
@ -346,6 +346,10 @@ export class Indexer implements IndexerInterface {
return this._baseIndexer.getBlockProgress(blockHash);
}
async getBlockProgressEntities (where: FindConditions<BlockProgress>, options: FindManyOptions<BlockProgress>): Promise<BlockProgress[]> {
return this._baseIndexer.getBlockProgressEntities(where, options);
}
async getBlocksAtHeight (height: number, isPruned: boolean): Promise<BlockProgress[]> {
return this._baseIndexer.getBlocksAtHeight(height, isPruned);
}
@ -354,7 +358,7 @@ export class Indexer implements IndexerInterface {
return this._baseIndexer.updateBlockProgress(block, lastProcessedEventIndex);
}
async _fetchAndSaveEvents (block: DeepPartial<BlockProgress>): Promise<void> {
async _fetchAndSaveEvents (block: DeepPartial<BlockProgress>): Promise<BlockProgress> {
assert(block.blockHash);
const events = await this._uniClient.getEvents(block.blockHash);
const dbEvents: Array<DeepPartial<Event>> = [];
@ -385,8 +389,10 @@ export class Indexer implements IndexerInterface {
const dbTx = await this._db.createTransactionRunner();
try {
await this._db.saveEvents(dbTx, block, dbEvents);
const blockProgress = await this._db.saveEvents(dbTx, block, dbEvents);
await dbTx.commitTransaction();
return blockProgress;
} catch (error) {
await dbTx.rollbackTransaction();
throw error;

View File

@ -6,7 +6,7 @@ import assert from 'assert';
import { Connection, ConnectionOptions, DeepPartial, QueryRunner, FindConditions, FindManyOptions } from 'typeorm';
import path from 'path';
import { Database as BaseDatabase, DatabaseInterface } from '@vulcanize/util';
import { Database as BaseDatabase, DatabaseInterface, QueryOptions, Where } from '@vulcanize/util';
import { Event } from './entity/Event';
import { Contract } from './entity/Contract';
@ -78,13 +78,13 @@ export class Database implements DatabaseInterface {
return this._baseDatabase.saveEventEntity(repo, entity);
}
async getBlockEvents (blockHash: string, options: FindManyOptions<Event>): Promise<Event[]> {
async getBlockEvents (blockHash: string, where: Where, queryOptions: QueryOptions): Promise<Event[]> {
const repo = this._conn.getRepository(Event);
return this._baseDatabase.getBlockEvents(repo, blockHash, options);
return this._baseDatabase.getBlockEvents(repo, blockHash, where, queryOptions);
}
async saveEvents (queryRunner: QueryRunner, block: DeepPartial<BlockProgress>, events: DeepPartial<Event>[]): Promise<void> {
async saveEvents (queryRunner: QueryRunner, block: DeepPartial<BlockProgress>, events: DeepPartial<Event>[]): Promise<BlockProgress> {
const blockRepo = queryRunner.manager.getRepository(BlockProgress);
const eventRepo = queryRunner.manager.getRepository(Event);
@ -138,6 +138,12 @@ export class Database implements DatabaseInterface {
return this._baseDatabase.getBlockProgress(repo, blockHash);
}
async getBlockProgressEntities (where: FindConditions<BlockProgress>, options: FindManyOptions<BlockProgress>): Promise<BlockProgress[]> {
const repo = this._conn.getRepository(BlockProgress);
return this._baseDatabase.getBlockProgressEntities(repo, where, options);
}
async updateBlockProgress (queryRunner: QueryRunner, block: BlockProgress, lastProcessedEventIndex: number): Promise<BlockProgress> {
const repo = queryRunner.manager.getRepository(BlockProgress);

View File

@ -3,13 +3,13 @@
//
import debug from 'debug';
import { DeepPartial, FindManyOptions, QueryRunner } from 'typeorm';
import { DeepPartial, FindConditions, FindManyOptions, QueryRunner } from 'typeorm';
import JSONbig from 'json-bigint';
import { ethers } from 'ethers';
import assert from 'assert';
import { EthClient } from '@vulcanize/ipld-eth-client';
import { IndexerInterface, Indexer as BaseIndexer, ValueResult, JobQueue } from '@vulcanize/util';
import { IndexerInterface, Indexer as BaseIndexer, ValueResult, JobQueue, Where, QueryOptions } from '@vulcanize/util';
import { Database } from './database';
import { Event, UNKNOWN_EVENT_NAME } from './entity/Event';
@ -368,12 +368,12 @@ export class Indexer implements IndexerInterface {
}
// Note: Some event names might be unknown at this point, as earlier events might not yet be processed.
async getOrFetchBlockEvents (block: DeepPartial<BlockProgress>): Promise<Array<Event>> {
return this._baseIndexer.getOrFetchBlockEvents(block, this._fetchAndSaveEvents.bind(this));
async fetchBlockEvents (block: DeepPartial<BlockProgress>): Promise<BlockProgress> {
return this._baseIndexer.fetchBlockEvents(block, this._fetchAndSaveEvents.bind(this));
}
async getBlockEvents (blockHash: string, options: FindManyOptions<Event>): Promise<Array<Event>> {
return this._baseIndexer.getBlockEvents(blockHash, options);
async getBlockEvents (blockHash: string, where: Where, queryOptions: QueryOptions): Promise<Array<Event>> {
return this._baseIndexer.getBlockEvents(blockHash, where, queryOptions);
}
async removeUnknownEvents (block: BlockProgress): Promise<void> {
@ -408,6 +408,10 @@ export class Indexer implements IndexerInterface {
return this._baseIndexer.getBlockProgress(blockHash);
}
async getBlockProgressEntities (where: FindConditions<BlockProgress>, options: FindManyOptions<BlockProgress>): Promise<BlockProgress[]> {
return this._baseIndexer.getBlockProgressEntities(where, options);
}
async getBlocksAtHeight (height: number, isPruned: boolean): Promise<BlockProgress[]> {
return this._baseIndexer.getBlocksAtHeight(height, isPruned);
}
@ -424,7 +428,7 @@ export class Indexer implements IndexerInterface {
return this._baseIndexer.getAncestorAtDepth(blockHash, depth);
}
async _fetchAndSaveEvents ({ blockHash }: DeepPartial<BlockProgress>): Promise<void> {
async _fetchAndSaveEvents ({ blockHash }: DeepPartial<BlockProgress>): Promise<BlockProgress> {
assert(blockHash);
const logsPromise = this._ethClient.getLogs({ blockHash });
@ -518,8 +522,10 @@ export class Indexer implements IndexerInterface {
parentHash: block.parent?.hash
};
await this._db.saveEvents(dbTx, block, dbEvents);
const blockProgress = await this._db.saveEvents(dbTx, block, dbEvents);
await dbTx.commitTransaction();
return blockProgress;
} catch (error) {
await dbTx.rollbackTransaction();
throw error;

View File

@ -13,7 +13,8 @@ import {
FindManyOptions,
In,
QueryRunner,
Repository
Repository,
SelectQueryBuilder
} from 'typeorm';
import { SnakeNamingStrategy } from 'typeorm-naming-strategies';
import _ from 'lodash';
@ -147,6 +148,12 @@ export class Database {
return repo.findOne({ where: { blockHash } });
}
async getBlockProgressEntities (repo: Repository<BlockProgressInterface>, where: FindConditions<BlockProgressInterface>, options: FindManyOptions<BlockProgressInterface>): Promise<BlockProgressInterface[]> {
options.where = where;
return repo.find(options);
}
async getBlocksAtHeight (repo: Repository<BlockProgressInterface>, height: number, isPruned: boolean): Promise<BlockProgressInterface[]> {
return repo.createQueryBuilder('block_progress')
.where('block_number = :height AND is_pruned = :isPruned', { height, isPruned })
@ -166,7 +173,7 @@ export class Database {
}
const { generatedMaps } = await repo.createQueryBuilder()
.update(block)
.update()
.set(block)
.where('id = :id', { id: block.id })
.whereEntity(block)
@ -189,29 +196,23 @@ export class Database {
return repo.findOne(id, { relations: ['block'] });
}
async getBlockEvents (repo: Repository<EventInterface>, blockHash: string, options: FindManyOptions<EventInterface> = {}): Promise<EventInterface[]> {
if (!Array.isArray(options.where)) {
options.where = [options.where || {}];
}
async getBlockEvents (repo: Repository<EventInterface>, blockHash: string, where: Where = {}, queryOptions: QueryOptions = {}): Promise<EventInterface[]> {
let queryBuilder = repo.createQueryBuilder('event')
.innerJoinAndSelect('event.block', 'block')
.where('block.block_hash = :blockHash AND block.is_pruned = false', { blockHash });
options.where.forEach((where: FindConditions<EventInterface> = {}) => {
where.block = {
...where.block,
blockHash
};
});
queryBuilder = this._buildQuery(repo, queryBuilder, where, queryOptions);
queryBuilder.addOrderBy('event.id', 'ASC');
options.relations = ['block'];
const { limit = DEFAULT_LIMIT, skip = DEFAULT_SKIP } = queryOptions;
options.order = {
...options.order,
id: 'ASC'
};
queryBuilder = queryBuilder.offset(skip)
.limit(limit);
return repo.find(options);
return queryBuilder.getMany();
}
async saveEvents (blockRepo: Repository<BlockProgressInterface>, eventRepo: Repository<EventInterface>, block: DeepPartial<BlockProgressInterface>, events: DeepPartial<EventInterface>[]): Promise<void> {
async saveEvents (blockRepo: Repository<BlockProgressInterface>, eventRepo: Repository<EventInterface>, block: DeepPartial<BlockProgressInterface>, events: DeepPartial<EventInterface>[]): Promise<BlockProgressInterface> {
const {
blockHash,
blockNumber,
@ -229,29 +230,32 @@ export class Database {
// (1) Save all the events in the database.
// (2) Add an entry to the block progress table.
const numEvents = events.length;
let blockProgress = await blockRepo.findOne({ where: { blockHash } });
if (!blockProgress) {
const entity = blockRepo.create({
blockHash,
parentHash,
blockNumber,
blockTimestamp,
numEvents,
numProcessedEvents: 0,
lastProcessedEventIndex: -1,
isComplete: (numEvents === 0)
});
const entity = blockRepo.create({
blockHash,
parentHash,
blockNumber,
blockTimestamp,
numEvents,
numProcessedEvents: 0,
lastProcessedEventIndex: -1,
isComplete: (numEvents === 0)
});
blockProgress = await blockRepo.save(entity);
const blockProgress = await blockRepo.save(entity);
// Bulk insert events.
events.forEach(event => {
event.block = blockProgress;
});
// Bulk insert events.
events.forEach(event => {
event.block = blockProgress;
});
await eventRepo.createQueryBuilder().insert().values(events).execute();
}
await eventRepo.createQueryBuilder()
.insert()
.values(events)
.updateEntity(false)
.execute();
return blockProgress;
}
async getEntities<Entity> (queryRunner: QueryRunner, entity: new () => Entity, findConditions?: FindConditions<Entity>): Promise<Entity[]> {
@ -396,63 +400,13 @@ export class Database {
selectQueryBuilder = selectQueryBuilder.leftJoinAndSelect(property, alias);
});
Object.entries(where).forEach(([field, filters]) => {
filters.forEach((filter, index) => {
// Form the where clause.
let { not, operator, value } = filter;
const columnMetadata = repo.metadata.findColumnWithPropertyName(field);
assert(columnMetadata);
let whereClause = `${tableName}.${columnMetadata.propertyAliasName} `;
selectQueryBuilder = this._buildQuery(repo, selectQueryBuilder, where, queryOptions);
if (not) {
if (operator === 'equals') {
whereClause += '!';
} else {
whereClause += 'NOT ';
}
}
whereClause += `${OPERATOR_MAP[operator]} `;
if (['contains', 'starts'].some(el => el === operator)) {
whereClause += '%:';
} else if (operator === 'in') {
whereClause += '(:...';
} else {
// Convert to string type value as bigint type throws error in query.
value = value.toString();
whereClause += ':';
}
const variableName = `${field}${index}`;
whereClause += variableName;
if (['contains', 'ends'].some(el => el === operator)) {
whereClause += '%';
} else if (operator === 'in') {
whereClause += ')';
if (!value.length) {
whereClause = 'FALSE';
}
}
selectQueryBuilder = selectQueryBuilder.andWhere(whereClause, { [variableName]: value });
});
});
const { limit = DEFAULT_LIMIT, orderBy, orderDirection, skip = DEFAULT_SKIP } = queryOptions;
const { limit = DEFAULT_LIMIT, skip = DEFAULT_SKIP } = queryOptions;
selectQueryBuilder = selectQueryBuilder.skip(skip)
.take(limit);
if (orderBy) {
const columnMetadata = repo.metadata.findColumnWithPropertyName(orderBy);
assert(columnMetadata);
selectQueryBuilder = selectQueryBuilder.orderBy(`${tableName}.${columnMetadata.propertyAliasName}`, orderDirection === 'desc' ? 'DESC' : 'ASC');
}
return selectQueryBuilder.getMany();
}
@ -586,4 +540,64 @@ export class Database {
return repo.save(entity);
}
_buildQuery<Entity> (repo: Repository<Entity>, selectQueryBuilder: SelectQueryBuilder<Entity>, where: Where = {}, queryOptions: QueryOptions = {}): SelectQueryBuilder<Entity> {
const { tableName } = repo.metadata;
Object.entries(where).forEach(([field, filters]) => {
filters.forEach((filter, index) => {
// Form the where clause.
let { not, operator, value } = filter;
const columnMetadata = repo.metadata.findColumnWithPropertyName(field);
assert(columnMetadata);
let whereClause = `${tableName}.${columnMetadata.propertyAliasName} `;
if (not) {
if (operator === 'equals') {
whereClause += '!';
} else {
whereClause += 'NOT ';
}
}
whereClause += `${OPERATOR_MAP[operator]} `;
if (['contains', 'starts'].some(el => el === operator)) {
whereClause += '%:';
} else if (operator === 'in') {
whereClause += '(:...';
} else {
// Convert to string type value as bigint type throws error in query.
value = value.toString();
whereClause += ':';
}
const variableName = `${field}${index}`;
whereClause += variableName;
if (['contains', 'ends'].some(el => el === operator)) {
whereClause += '%';
} else if (operator === 'in') {
whereClause += ')';
if (!value.length) {
whereClause = 'FALSE';
}
}
selectQueryBuilder = selectQueryBuilder.andWhere(whereClause, { [variableName]: value });
});
});
const { orderBy, orderDirection } = queryOptions;
if (orderBy) {
const columnMetadata = repo.metadata.findColumnWithPropertyName(orderBy);
assert(columnMetadata);
selectQueryBuilder = selectQueryBuilder.orderBy(`${tableName}.${columnMetadata.propertyAliasName}`, orderDirection === 'desc' ? 'DESC' : 'ASC');
}
return selectQueryBuilder;
}
}

View File

@ -5,7 +5,6 @@
import assert from 'assert';
import debug from 'debug';
import { PubSub } from 'apollo-server-express';
import { Not } from 'typeorm';
import { EthClient } from '@vulcanize/ipld-eth-client';
@ -14,6 +13,7 @@ import { BlockProgressInterface, EventInterface, IndexerInterface } from './type
import { MAX_REORG_DEPTH, JOB_KIND_PRUNE, JOB_KIND_INDEX, UNKNOWN_EVENT_NAME } from './constants';
import { createPruningJob, processBlockByNumber } from './common';
import { UpstreamConfig } from './config';
import { OrderDirection } from './database';
const log = debug('vulcanize:events');
@ -116,12 +116,13 @@ export class EventWatcher {
return this._indexer.getBlockEvents(
blockProgress.blockHash,
{
where: {
eventName: Not(UNKNOWN_EVENT_NAME)
},
order: {
index: 'ASC'
}
eventName: [
{ value: UNKNOWN_EVENT_NAME, not: true, operator: 'equals' }
]
},
{
orderBy: 'index',
orderDirection: OrderDirection.asc
}
);
}

View File

@ -3,7 +3,7 @@
//
import assert from 'assert';
import { DeepPartial, FindConditions, FindManyOptions, Not } from 'typeorm';
import { DeepPartial, FindConditions, FindManyOptions } from 'typeorm';
import debug from 'debug';
import { ethers } from 'ethers';
@ -13,6 +13,7 @@ import { GetStorageAt, getStorageValue, StorageLayout } from '@vulcanize/solidit
import { BlockProgressInterface, DatabaseInterface, EventInterface, SyncStatusInterface, ContractInterface } from './types';
import { UNKNOWN_EVENT_NAME, JOB_KIND_CONTRACT, QUEUE_EVENT_PROCESSING } from './constants';
import { JobQueue } from './job-queue';
import { Where, QueryOptions } from './database';
const MAX_EVENTS_BLOCK_RANGE = 1000;
@ -151,6 +152,10 @@ export class Indexer {
return this._db.getBlockProgress(blockHash);
}
async getBlockProgressEntities (where: FindConditions<BlockProgressInterface>, options: FindManyOptions<BlockProgressInterface>): Promise<BlockProgressInterface[]> {
return this._db.getBlockProgressEntities(where, options);
}
async getBlocksAtHeight (height: number, isPruned: boolean): Promise<BlockProgressInterface[]> {
return this._db.getBlocksAtHeight(height, isPruned);
}
@ -189,23 +194,18 @@ export class Indexer {
return this._db.getEvent(id);
}
async getOrFetchBlockEvents (block: DeepPartial<BlockProgressInterface>, fetchAndSaveEvents: (block: DeepPartial<BlockProgressInterface>) => Promise<void>): Promise<Array<EventInterface>> {
async fetchBlockEvents (block: DeepPartial<BlockProgressInterface>, fetchAndSaveEvents: (block: DeepPartial<BlockProgressInterface>) => Promise<BlockProgressInterface>): Promise<BlockProgressInterface> {
assert(block.blockHash);
const blockProgress = await this._db.getBlockProgress(block.blockHash);
if (!blockProgress) {
// Fetch and save events first and make a note in the event sync progress table.
log(`getBlockEvents: db miss, fetching from upstream server ${block.blockHash}`);
await fetchAndSaveEvents(block);
}
const events = await this._db.getBlockEvents(block.blockHash);
log(`getBlockEvents: db hit, ${block.blockHash} num events: ${events.length}`);
log(`getBlockEvents: fetching from upstream server ${block.blockHash}`);
const blockProgress = await fetchAndSaveEvents(block);
log(`getBlockEvents: fetched for block: ${blockProgress.blockHash} num events: ${blockProgress.numEvents}`);
return events;
return blockProgress;
}
async getBlockEvents (blockHash: string, options: FindManyOptions<EventInterface> = {}): Promise<Array<EventInterface>> {
return this._db.getBlockEvents(blockHash, options);
async getBlockEvents (blockHash: string, where: Where, queryOptions: QueryOptions): Promise<Array<EventInterface>> {
return this._db.getBlockEvents(blockHash, where, queryOptions);
}
async getEventsByFilter (blockHash: string, contract: string, name: string | null): Promise<Array<EventInterface>> {
@ -216,19 +216,27 @@ export class Indexer {
}
}
const where: FindConditions<EventInterface> = {
eventName: Not(UNKNOWN_EVENT_NAME)
const where: Where = {
eventName: [{
value: UNKNOWN_EVENT_NAME,
not: true,
operator: 'equals'
}]
};
if (contract) {
where.contract = contract;
where.contract = [
{ value: contract, operator: 'equals', not: false }
];
}
if (name) {
where.eventName = name;
where.eventName = [
{ value: name, operator: 'equals', not: false }
];
}
const events = await this._db.getBlockEvents(blockHash, { where });
const events = await this._db.getBlockEvents(blockHash, where);
log(`getEvents: db hit, num events: ${events.length}`);
return events;

View File

@ -4,14 +4,15 @@
import assert from 'assert';
import debug from 'debug';
import { MoreThanOrEqual } from 'typeorm';
import { In } from 'typeorm';
import { JobQueueConfig } from './config';
import { JOB_KIND_INDEX, JOB_KIND_PRUNE, JOB_KIND_EVENTS, JOB_KIND_CONTRACT, MAX_REORG_DEPTH, QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING, UNKNOWN_EVENT_NAME } from './constants';
import { JobQueue } from './job-queue';
import { EventInterface, IndexerInterface, SyncStatusInterface, BlockProgressInterface } from './types';
import { EventInterface, IndexerInterface, SyncStatusInterface } from './types';
import { wait } from './misc';
import { createPruningJob } from './common';
import { OrderDirection } from './database';
const DEFAULT_EVENTS_IN_BATCH = 50;
@ -21,7 +22,6 @@ export class JobRunner {
_indexer: IndexerInterface
_jobQueue: JobQueue
_jobQueueConfig: JobQueueConfig
_blockInProcess?: BlockProgressInterface
constructor (jobQueueConfig: JobQueueConfig, indexer: IndexerInterface, jobQueue: JobQueue) {
this._jobQueueConfig = jobQueueConfig;
@ -121,12 +121,21 @@ export class JobRunner {
throw new Error(message);
}
let [parentBlock, blockProgress] = await this._indexer.getBlockProgressEntities(
{
blockHash: In([parentHash, blockHash])
},
{
order: {
blockNumber: 'ASC'
}
}
);
// Check if parent block has been processed yet, if not, push a high priority job to process that first and abort.
// However, don't go beyond the `latestCanonicalBlockHash` from SyncStatus as we have to assume the reorg can't be that deep.
if (blockHash !== syncStatus.latestCanonicalBlockHash) {
const parent = await this._indexer.getBlockProgress(parentHash);
if (!parent) {
if (!parentBlock || parentBlock.blockHash !== parentHash) {
const blocks = await this._indexer.getBlocks({ blockHash: parentHash });
if (!blocks.length) {
@ -156,9 +165,9 @@ export class JobRunner {
throw new Error(message);
}
if (!parent.isComplete) {
if (!parentBlock.isComplete) {
// Parent block indexing needs to finish before this block can be indexed.
const message = `Indexing incomplete for parent block number ${parent.blockNumber} hash ${parentHash} of block number ${blockNumber} hash ${blockHash}, aborting`;
const message = `Indexing incomplete for parent block number ${parentBlock.blockNumber} hash ${parentHash} of block number ${blockNumber} hash ${blockHash}, aborting`;
log(message);
throw new Error(message);
@ -166,19 +175,15 @@ export class JobRunner {
}
// Check if block is being already processed.
const blockProgress = await this._indexer.getBlockProgress(blockHash);
if (!blockProgress) {
const { jobDelayInMilliSecs = 0 } = this._jobQueueConfig;
// Delay required to process block.
await wait(jobDelayInMilliSecs);
const events = await this._indexer.getOrFetchBlockEvents({ blockHash, blockNumber, parentHash, blockTimestamp: timestamp });
blockProgress = await this._indexer.fetchBlockEvents({ blockHash, blockNumber, parentHash, blockTimestamp: timestamp });
if (events.length) {
const block = events[0].block;
await this._jobQueue.pushJob(QUEUE_EVENT_PROCESSING, { kind: JOB_KIND_EVENTS, blockHash: block.blockHash, publish: true });
if (blockProgress.numEvents) {
await this._jobQueue.pushJob(QUEUE_EVENT_PROCESSING, { kind: JOB_KIND_EVENTS, blockHash: blockProgress.blockHash, publish: true });
}
}
}
@ -194,13 +199,14 @@ export class JobRunner {
const events: EventInterface[] = await this._indexer.getBlockEvents(
blockHash,
{
take: this._jobQueueConfig.eventsInBatch || DEFAULT_EVENTS_IN_BATCH,
where: {
index: MoreThanOrEqual(block.lastProcessedEventIndex + 1)
},
order: {
index: 'ASC'
}
index: [
{ value: block.lastProcessedEventIndex + 1, operator: 'gte', not: false }
]
},
{
limit: this._jobQueueConfig.eventsInBatch || DEFAULT_EVENTS_IN_BATCH,
orderBy: 'index',
orderDirection: OrderDirection.asc
}
);

View File

@ -4,6 +4,8 @@
import { Connection, DeepPartial, FindConditions, FindManyOptions, QueryRunner } from 'typeorm';
import { Where, QueryOptions } from './database';
export interface BlockProgressInterface {
id: number;
blockHash: string;
@ -48,13 +50,14 @@ export interface ContractInterface {
export interface IndexerInterface {
getBlockProgress (blockHash: string): Promise<BlockProgressInterface | undefined>
getBlockProgressEntities (where: FindConditions<BlockProgressInterface>, options: FindManyOptions<BlockProgressInterface>): Promise<BlockProgressInterface[]>
getEvent (id: string): Promise<EventInterface | undefined>
getSyncStatus (): Promise<SyncStatusInterface | undefined>;
getBlocks (blockFilter: { blockHash?: string, blockNumber?: number }): Promise<any>
getBlocksAtHeight (height: number, isPruned: boolean): Promise<BlockProgressInterface[]>;
getBlockEvents (blockHash: string, options: FindManyOptions<EventInterface>): Promise<Array<EventInterface>>
getBlockEvents (blockHash: string, where: Where, queryOptions: QueryOptions): Promise<Array<EventInterface>>
getAncestorAtDepth (blockHash: string, depth: number): Promise<string>
getOrFetchBlockEvents (block: DeepPartial<BlockProgressInterface>): Promise<Array<EventInterface>>
fetchBlockEvents (block: DeepPartial<BlockProgressInterface>): Promise<BlockProgressInterface>
removeUnknownEvents (block: BlockProgressInterface): Promise<void>
updateBlockProgress (block: BlockProgressInterface, lastProcessedEventIndex: number): Promise<BlockProgressInterface>
updateSyncStatusChainHead (blockHash: string, blockNumber: number): Promise<SyncStatusInterface>
@ -79,7 +82,8 @@ export interface DatabaseInterface {
createTransactionRunner(): Promise<QueryRunner>;
getBlocksAtHeight (height: number, isPruned: boolean): Promise<BlockProgressInterface[]>;
getBlockProgress (blockHash: string): Promise<BlockProgressInterface | undefined>;
getBlockEvents (blockHash: string, where?: FindManyOptions<EventInterface>): Promise<EventInterface[]>;
getBlockProgressEntities (where: FindConditions<BlockProgressInterface>, options: FindManyOptions<BlockProgressInterface>): Promise<BlockProgressInterface[]>
getBlockEvents (blockHash: string, where?: Where, queryOptions?: QueryOptions): Promise<EventInterface[]>;
getEvent (id: string): Promise<EventInterface | undefined>
getSyncStatus (queryRunner: QueryRunner): Promise<SyncStatusInterface | undefined>
getAncestorAtDepth (blockHash: string, depth: number): Promise<string>
@ -90,7 +94,7 @@ export interface DatabaseInterface {
updateSyncStatusIndexedBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number, force?: boolean): Promise<SyncStatusInterface>;
updateSyncStatusChainHead (queryRunner: QueryRunner, blockHash: string, blockNumber: number): Promise<SyncStatusInterface>;
updateSyncStatusCanonicalBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number, force?: boolean): Promise<SyncStatusInterface>;
saveEvents (queryRunner: QueryRunner, block: DeepPartial<BlockProgressInterface>, events: DeepPartial<EventInterface>[]): Promise<void>;
saveEvents (queryRunner: QueryRunner, block: DeepPartial<BlockProgressInterface>, events: DeepPartial<EventInterface>[]): Promise<BlockProgressInterface>;
saveEventEntity (queryRunner: QueryRunner, entity: EventInterface): Promise<EventInterface>;
removeEntities<Entity> (queryRunner: QueryRunner, entity: new () => Entity, findConditions?: FindManyOptions<Entity> | FindConditions<Entity>): Promise<void>;
getContracts?: () => Promise<ContractInterface[]>