Reset watcher to previous indexed block on start (#207)

* Reset watcher to previous indexed block before start

* Implement changes in other watchers

* Save successfully fetched blocks and events to prefetch cache

* Add unique query for transaction table

* Check db for blocks before fetching from eth-server

* Show all mismatches at a block

Co-authored-by: prathamesh0 <prathamesh.musale0@gmail.com>
nikugogoi 2022-11-03 14:01:10 +05:30 committed by GitHub
parent 306bbb73ca
commit 52c42f4e84
39 changed files with 328 additions and 341 deletions
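
The core of the change can be summarized in one place: on start, each watcher's job-runner now clears stale jobs and rewinds the watcher to the block before the latest indexed block, so a partially processed block is re-indexed from scratch. Below is a minimal sketch of that combined flow, using simplified stand-in interfaces; the real classes live in `@cerc-io/util` and the generated watchers.

```typescript
// Sketch only: simplified interfaces standing in for the real watcher classes.
interface SyncStatus { latestIndexedBlockNumber: number }

interface Indexer {
  getSyncStatus (): Promise<SyncStatus | undefined>;
  // Deletes entities above blockNumber and rewinds sync status (see the util Indexer diff below).
  resetWatcherToBlock (blockNumber: number): Promise<void>;
}

class JobRunner {
  constructor (private indexer: Indexer, private jobQueue: { deleteAllJobs (): Promise<void> }) {}

  async start (): Promise<void> {
    // Drop jobs queued before the restart.
    await this.jobQueue.deleteAllJobs();

    // Rewind to the block before the latest indexed one, since events in the
    // latest indexed block may not all have been processed before shutdown.
    await this.resetToPrevIndexedBlock();

    // ...subscribe block/event processing queues as before...
  }

  async resetToPrevIndexedBlock (): Promise<void> {
    const syncStatus = await this.indexer.getSyncStatus();

    if (syncStatus) {
      await this.indexer.resetWatcherToBlock(syncStatus.latestIndexedBlockNumber - 1);
    }
  }
}
```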

View File

@ -45,6 +45,7 @@ export class Indexer {
const queryObject = {
name,
entityName: '',
getQueryName: '',
saveQueryName: '',
params: _.cloneDeep(params),
@ -56,10 +57,12 @@ export class Indexer {
if (name.charAt(0) === '_') {
const capitalizedName = `${name.charAt(1).toUpperCase()}${name.slice(2)}`;
queryObject.entityName = `_${capitalizedName}`;
queryObject.getQueryName = `_get${capitalizedName}`;
queryObject.saveQueryName = `_save${capitalizedName}`;
} else {
const capitalizedName = `${name.charAt(0).toUpperCase()}${name.slice(1)}`;
queryObject.entityName = capitalizedName;
queryObject.getQueryName = `get${capitalizedName}`;
queryObject.saveQueryName = `save${capitalizedName}`;
}
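
As a worked example of the naming convention above: a query named `balanceOf` yields `BalanceOf` / `getBalanceOf` / `saveBalanceOf`, while a storage-mode query such as `_owners` yields `_Owners` / `_getOwners` / `_saveOwners`. A standalone sketch of that logic, extracted for illustration (not the generated code itself):

```typescript
// Standalone sketch of the capitalization logic added above.
function buildQueryNames (name: string): { entityName: string, getQueryName: string, saveQueryName: string } {
  if (name.charAt(0) === '_') {
    const capitalizedName = `${name.charAt(1).toUpperCase()}${name.slice(2)}`;
    return {
      entityName: `_${capitalizedName}`,
      getQueryName: `_get${capitalizedName}`,
      saveQueryName: `_save${capitalizedName}`
    };
  }

  const capitalizedName = `${name.charAt(0).toUpperCase()}${name.slice(1)}`;
  return {
    entityName: capitalizedName,
    getQueryName: `get${capitalizedName}`,
    saveQueryName: `save${capitalizedName}`
  };
}

// buildQueryNames('balanceOf') => { entityName: 'BalanceOf', getQueryName: 'getBalanceOf', saveQueryName: 'saveBalanceOf' }
// buildQueryNames('_owners')   => { entityName: '_Owners',  getQueryName: '_getOwners',  saveQueryName: '_saveOwners' }
```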

View File

@ -3,7 +3,7 @@
//
import assert from 'assert';
import { Connection, ConnectionOptions, DeepPartial, FindConditions, QueryRunner, FindManyOptions } from 'typeorm';
import { Connection, ConnectionOptions, DeepPartial, FindConditions, QueryRunner, FindManyOptions, EntityTarget } from 'typeorm';
import path from 'path';
import { Database as BaseDatabase, DatabaseInterface, QueryOptions, StateKind, Where } from '@cerc-io/util';
@ -265,6 +265,10 @@ export class Database implements DatabaseInterface {
return this._baseDatabase.removeEntities(queryRunner, entity, findConditions);
}
async deleteEntitiesByConditions<Entity> (queryRunner: QueryRunner, entity: EntityTarget<Entity>, findConditions: FindConditions<Entity>): Promise<void> {
await this._baseDatabase.deleteEntitiesByConditions(queryRunner, entity, findConditions);
}
async getAncestorAtDepth (blockHash: string, depth: number): Promise<string> {
return this._baseDatabase.getAncestorAtDepth(blockHash, depth);
}

View File

@ -50,6 +50,9 @@ import { StateSyncStatus } from './entity/StateSyncStatus';
import { BlockProgress } from './entity/BlockProgress';
import { State } from './entity/State';
{{#each queries as | query |}}
import { {{query.entityName}} } from './entity/{{query.entityName}}';
{{/each}}
{{#each subgraphEntities as | subgraphEntity |}}
import { {{subgraphEntity.className}} } from './entity/{{subgraphEntity.className}}';
{{/each}}
@ -537,8 +540,8 @@ export class Indexer implements IndexerInterface {
return this._baseIndexer.getBlocksAtHeight(height, isPruned);
}
async fetchBlockWithEvents (block: DeepPartial<BlockProgress>): Promise<BlockProgress> {
return this._baseIndexer.fetchBlockWithEvents(block, this._fetchAndSaveEvents.bind(this));
async saveBlockAndFetchEvents (block: DeepPartial<BlockProgress>): Promise<[BlockProgress, DeepPartial<Event>[]]> {
return this._baseIndexer.saveBlockAndFetchEvents(block, this._saveBlockAndFetchEvents.bind(this));
}
async getBlockEvents (blockHash: string, where: Where, queryOptions: QueryOptions): Promise<Array<Event>> {
@ -594,6 +597,18 @@ export class Indexer implements IndexerInterface {
this._subgraphStateMap.clear();
}
async resetWatcherToBlock (blockNumber: number): Promise<void> {
const entities = [
{{#each queries as | query |}}
{{query.entityName}},
{{/each}}
{{#each subgraphEntities as | subgraphEntity |}}
{{subgraphEntity.className}},
{{/each}}
];
await this._baseIndexer.resetWatcherToBlock(blockNumber, entities);
}
_populateEntityTypesMap (): void {
{{#each subgraphEntities as | subgraphEntity |}}
this._entityTypesMap.set('{{subgraphEntity.className}}', {

View File

@ -49,6 +49,8 @@ export class JobRunner {
}
async start (): Promise<void> {
await this._jobQueue.deleteAllJobs();
await this._baseJobRunner.resetToPrevIndexedBlock();
await this.subscribeBlockProcessingQueue();
await this.subscribeEventProcessingQueue();
await this.subscribeBlockCheckpointQueue();

View File

@ -67,58 +67,6 @@ export const handler = async (argv: any): Promise<void> => {
await graphWatcher.init();
{{/if}}
const blockProgresses = await indexer.getBlocksAtHeight(argv.blockNumber, false);
assert(blockProgresses.length, `No blocks at specified block number ${argv.blockNumber}`);
assert(!blockProgresses.some(block => !block.isComplete), `Incomplete block at block number ${argv.blockNumber} with unprocessed events`);
const [blockProgress] = blockProgresses;
const dbTx = await db.createTransactionRunner();
try {
const entities = [BlockProgress
{{~#each queries as | query |~}}
, {{query.entityName}}
{{~/each~}}
];
const removeEntitiesPromise = entities.map(async entityClass => {
return db.removeEntities<any>(dbTx, entityClass, { blockNumber: MoreThan(argv.blockNumber) });
});
await Promise.all(removeEntitiesPromise);
const syncStatus = await indexer.getSyncStatus();
assert(syncStatus, 'Missing syncStatus');
if (syncStatus.latestIndexedBlockNumber > blockProgress.blockNumber) {
await indexer.updateSyncStatusIndexedBlock(blockProgress.blockHash, blockProgress.blockNumber, true);
}
if (syncStatus.latestCanonicalBlockNumber > blockProgress.blockNumber) {
await indexer.updateSyncStatusCanonicalBlock(blockProgress.blockHash, blockProgress.blockNumber, true);
}
const stateSyncStatus = await indexer.getStateSyncStatus();
if (stateSyncStatus) {
if (stateSyncStatus.latestIndexedBlockNumber > blockProgress.blockNumber) {
await indexer.updateStateSyncStatusIndexedBlock(blockProgress.blockNumber, true);
}
if (stateSyncStatus.latestCheckpointBlockNumber > blockProgress.blockNumber) {
await indexer.updateStateSyncStatusCheckpointBlock(blockProgress.blockNumber, true);
}
}
await indexer.updateSyncStatusChainHead(blockProgress.blockHash, blockProgress.blockNumber, true);
dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
throw error;
} finally {
await dbTx.release();
}
await indexer.resetWatcherToBlock(argv.blockNumber);
log('Reset watcher successfully');
};
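
With the block checks, entity deletion and sync-status updates moved into the base indexer (see the `@cerc-io/util` diff further below), each watcher's reset CLI handler collapses to a single call. A hedged sketch of the resulting shape; the helper name and types here are illustrative, not the exact generated handler:

```typescript
import debug from 'debug';

const log = debug('vulcanize:reset-watcher');

// Minimal stand-in for a watcher Indexer, for illustration only.
interface IndexerLike {
  resetWatcherToBlock (blockNumber: number): Promise<void>;
}

// Config, database and indexer initialization stay the same as before; the
// handler body itself now just delegates to the indexer.
export const resetWatcher = async (indexer: IndexerLike, blockNumber: number): Promise<void> => {
  await indexer.resetWatcherToBlock(blockNumber);
  log('Reset watcher successfully');
};
```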

View File

@ -76,6 +76,8 @@ export const main = async (): Promise<any> => {
if (watcherKind === KIND_ACTIVE) {
await jobQueue.start();
// Delete jobs to prevent creating jobs after completion of processing previous block.
await jobQueue.deleteAllJobs();
await eventWatcher.start();
}

View File

@ -73,52 +73,6 @@ export const handler = async (argv: any): Promise<void> => {
graphWatcher.setIndexer(indexer);
await graphWatcher.init();
const blockProgresses = await indexer.getBlocksAtHeight(argv.blockNumber, false);
assert(blockProgresses.length, `No blocks at specified block number ${argv.blockNumber}`);
assert(!blockProgresses.some(block => !block.isComplete), `Incomplete block at block number ${argv.blockNumber} with unprocessed events`);
const [blockProgress] = blockProgresses;
const dbTx = await db.createTransactionRunner();
try {
const entities = [BlockProgress, Producer, ProducerSet, ProducerSetChange, ProducerRewardCollectorChange, RewardScheduleEntry, RewardSchedule, ProducerEpoch, Block, Epoch, SlotClaim, Slot, Staker, Network, Distributor, Distribution, Claim, Slash, Account];
for (const entity of entities) {
await db.deleteEntitiesByConditions<any>(dbTx, entity, { blockNumber: MoreThan(argv.blockNumber) });
}
const syncStatus = await indexer.getSyncStatus();
assert(syncStatus, 'Missing syncStatus');
if (syncStatus.latestIndexedBlockNumber > blockProgress.blockNumber) {
await indexer.updateSyncStatusIndexedBlock(blockProgress.blockHash, blockProgress.blockNumber, true);
}
if (syncStatus.latestCanonicalBlockNumber > blockProgress.blockNumber) {
await indexer.updateSyncStatusCanonicalBlock(blockProgress.blockHash, blockProgress.blockNumber, true);
}
const stateSyncStatus = await indexer.getStateSyncStatus();
if (stateSyncStatus) {
if (stateSyncStatus.latestIndexedBlockNumber > blockProgress.blockNumber) {
await indexer.updateStateSyncStatusIndexedBlock(blockProgress.blockNumber, true);
}
if (stateSyncStatus.latestCheckpointBlockNumber > blockProgress.blockNumber) {
await indexer.updateStateSyncStatusCheckpointBlock(blockProgress.blockNumber, true);
}
}
await indexer.updateSyncStatusChainHead(blockProgress.blockHash, blockProgress.blockNumber, true);
dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
throw error;
} finally {
await dbTx.release();
}
await indexer.resetWatcherToBlock(argv.blockNumber);
log('Reset watcher successfully');
};

View File

@ -3,7 +3,7 @@
//
import assert from 'assert';
import { Connection, ConnectionOptions, DeepPartial, FindConditions, QueryRunner, FindManyOptions } from 'typeorm';
import { Connection, ConnectionOptions, DeepPartial, FindConditions, QueryRunner, FindManyOptions, EntityTarget } from 'typeorm';
import path from 'path';
import { Database as BaseDatabase, DatabaseInterface, QueryOptions, StateKind, Where } from '@cerc-io/util';
@ -225,7 +225,7 @@ export class Database implements DatabaseInterface {
return this._baseDatabase.removeEntities(queryRunner, entity, findConditions);
}
async deleteEntitiesByConditions<Entity> (queryRunner: QueryRunner, entity: new () => Entity, findConditions: FindConditions<Entity>): Promise<void> {
async deleteEntitiesByConditions<Entity> (queryRunner: QueryRunner, entity: EntityTarget<Entity>, findConditions: FindConditions<Entity>): Promise<void> {
await this._baseDatabase.deleteEntitiesByConditions(queryRunner, entity, findConditions);
}

View File

@ -519,6 +519,11 @@ export class Indexer implements IndexerInterface {
this._subgraphStateMap.clear();
}
async resetWatcherToBlock (blockNumber: number): Promise<void> {
const entities = [ProducerSet, Producer, RewardSchedule, RewardScheduleEntry, Network, Staker, ProducerEpoch, Epoch, Block, SlotClaim, Slot, Distributor, Distribution, Claim, Account, Slash];
await this._baseIndexer.resetWatcherToBlock(blockNumber, entities);
}
_populateEntityTypesMap (): void {
this._entityTypesMap.set(
'Producer',

View File

@ -44,6 +44,8 @@ export class JobRunner {
}
async start (): Promise<void> {
await this._jobQueue.deleteAllJobs();
await this._baseJobRunner.resetToPrevIndexedBlock();
await this.subscribeBlockProcessingQueue();
await this.subscribeEventProcessingQueue();
await this.subscribeBlockCheckpointQueue();

View File

@ -14,7 +14,7 @@ import debug from 'debug';
import 'graphql-import-node';
import { createServer } from 'http';
import { DEFAULT_CONFIG_PATH, getConfig, Config, JobQueue, KIND_ACTIVE, initClients, startGQLMetricsServer } from '@cerc-io/util';
import { DEFAULT_CONFIG_PATH, getConfig, Config, JobQueue, KIND_ACTIVE, initClients, startGQLMetricsServer, resetJobs } from '@cerc-io/util';
import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node';
import { createResolvers } from './resolvers';
@ -70,6 +70,8 @@ export const main = async (): Promise<any> => {
if (watcherKind === KIND_ACTIVE) {
await jobQueue.start();
// Delete jobs to prevent creating jobs after completion of processing previous block.
await jobQueue.deleteAllJobs();
await eventWatcher.start();
}

View File

@ -48,35 +48,6 @@ export const handler = async (argv: any): Promise<void> => {
const syncStatus = await indexer.getSyncStatus();
assert(syncStatus, 'Missing syncStatus');
const blockProgresses = await indexer.getBlocksAtHeight(argv.blockNumber, false);
assert(blockProgresses.length, `No blocks at specified block number ${argv.blockNumber}`);
assert(!blockProgresses.some(block => !block.isComplete), `Incomplete block at block number ${argv.blockNumber} with unprocessed events`);
const [blockProgress] = blockProgresses;
const dbTx = await db.createTransactionRunner();
try {
for (const entity of [BlockProgress, Allowance, Balance]) {
await db.deleteEntitiesByConditions<any>(dbTx, entity, { blockNumber: MoreThan(argv.blockNumber) });
}
if (syncStatus.latestIndexedBlockNumber > blockProgress.blockNumber) {
await indexer.updateSyncStatusIndexedBlock(blockProgress.blockHash, blockProgress.blockNumber, true);
}
if (syncStatus.latestCanonicalBlockNumber > blockProgress.blockNumber) {
await indexer.updateSyncStatusCanonicalBlock(blockProgress.blockHash, blockProgress.blockNumber, true);
}
await indexer.updateSyncStatusChainHead(blockProgress.blockHash, blockProgress.blockNumber, true);
dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
throw error;
} finally {
await dbTx.release();
}
await indexer.resetWatcherToBlock(argv.blockNumber);
log('Reset watcher successfully');
};

View File

@ -94,6 +94,18 @@ export class Database implements DatabaseInterface {
return this._baseDatabase.getStateSyncStatus(repo);
}
async updateStateSyncStatusIndexedBlock (queryRunner: QueryRunner, blockNumber: number, force?: boolean): Promise<StateSyncStatus> {
const repo = queryRunner.manager.getRepository(StateSyncStatus);
return this._baseDatabase.updateStateSyncStatusIndexedBlock(repo, blockNumber, force);
}
async updateStateSyncStatusCheckpointBlock (queryRunner: QueryRunner, blockNumber: number, force?: boolean): Promise<StateSyncStatus> {
const repo = queryRunner.manager.getRepository(StateSyncStatus);
return this._baseDatabase.updateStateSyncStatusCheckpointBlock(repo, blockNumber, force);
}
async getBalance ({ blockHash, token, owner }: { blockHash: string, token: string, owner: string }): Promise<Balance | undefined> {
return this._conn.getRepository(Balance)
.createQueryBuilder('balance')

View File

@ -23,6 +23,8 @@ import artifacts from './artifacts/ERC20.json';
import { BlockProgress } from './entity/BlockProgress';
import { Contract } from './entity/Contract';
import { State } from './entity/State';
import { Allowance } from './entity/Allowance';
import { Balance } from './entity/Balance';
const log = debug('vulcanize:indexer');
const JSONbigNative = JSONbig({ useNativeBigInt: true });
@ -254,6 +256,11 @@ export class Indexer implements IndexerInterface {
// TODO Implement
}
async processInitialState (contractAddress: string, blockHash: string): Promise<any> {
// TODO: Call initial state hook.
return {};
}
async processCheckpoint (blockHash: string): Promise<void> {
// TODO Implement
}
@ -422,6 +429,11 @@ export class Indexer implements IndexerInterface {
return this._baseIndexer.getAncestorAtDepth(blockHash, depth);
}
async resetWatcherToBlock (blockNumber: number): Promise<void> {
const entities = [Allowance, Balance];
await this._baseIndexer.resetWatcherToBlock(blockNumber, entities);
}
async _saveBlockAndFetchEvents ({
cid: blockCid,
blockHash,

View File

@ -40,6 +40,8 @@ export class JobRunner {
}
async start (): Promise<void> {
await this._jobQueue.deleteAllJobs();
await this._baseJobRunner.resetToPrevIndexedBlock();
await this.subscribeBlockProcessingQueue();
await this.subscribeEventProcessingQueue();
this._baseJobRunner.handleShutdown();

View File

@ -62,6 +62,8 @@ export const main = async (): Promise<any> => {
if (watcherKind === KIND_ACTIVE) {
await jobQueue.start();
// Delete jobs to prevent creating jobs after completion of processing previous block.
await jobQueue.deleteAllJobs();
await eventWatcher.start();
}

View File

@ -86,18 +86,18 @@
yarn && yarn build
```
* Run the watcher:
```bash
yarn server
```
* Run the job-runner:
```bash
yarn job-runner
```
* Run the watcher:
```bash
yarn server
```
* Deploy an ERC721 token:
```bash

View File

@ -60,52 +60,6 @@ export const handler = async (argv: any): Promise<void> => {
const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue);
await indexer.init();
const blockProgresses = await indexer.getBlocksAtHeight(argv.blockNumber, false);
assert(blockProgresses.length, `No blocks at specified block number ${argv.blockNumber}`);
assert(!blockProgresses.some(block => !block.isComplete), `Incomplete block at block number ${argv.blockNumber} with unprocessed events`);
const [blockProgress] = blockProgresses;
const dbTx = await db.createTransactionRunner();
try {
const entities = [BlockProgress, SupportsInterface, BalanceOf, OwnerOf, GetApproved, IsApprovedForAll, Name, Symbol, TokenURI, _Name, _Symbol, _Owners, _Balances, _TokenApprovals, _OperatorApprovals];
for (const entity of entities) {
await db.deleteEntitiesByConditions<any>(dbTx, entity, { blockNumber: MoreThan(argv.blockNumber) });
}
const syncStatus = await indexer.getSyncStatus();
assert(syncStatus, 'Missing syncStatus');
if (syncStatus.latestIndexedBlockNumber > blockProgress.blockNumber) {
await indexer.updateSyncStatusIndexedBlock(blockProgress.blockHash, blockProgress.blockNumber, true);
}
if (syncStatus.latestCanonicalBlockNumber > blockProgress.blockNumber) {
await indexer.updateSyncStatusCanonicalBlock(blockProgress.blockHash, blockProgress.blockNumber, true);
}
const stateSyncStatus = await indexer.getStateSyncStatus();
if (stateSyncStatus) {
if (stateSyncStatus.latestIndexedBlockNumber > blockProgress.blockNumber) {
await indexer.updateStateSyncStatusIndexedBlock(blockProgress.blockNumber, true);
}
if (stateSyncStatus.latestCheckpointBlockNumber > blockProgress.blockNumber) {
await indexer.updateStateSyncStatusCheckpointBlock(blockProgress.blockNumber, true);
}
}
await indexer.updateSyncStatusChainHead(blockProgress.blockHash, blockProgress.blockNumber, true);
dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
throw error;
} finally {
await dbTx.release();
}
await indexer.resetWatcherToBlock(argv.blockNumber);
log('Reset watcher successfully');
};

View File

@ -38,6 +38,20 @@ import { SyncStatus } from './entity/SyncStatus';
import { StateSyncStatus } from './entity/StateSyncStatus';
import { BlockProgress } from './entity/BlockProgress';
import { State } from './entity/State';
import { SupportsInterface } from './entity/SupportsInterface';
import { BalanceOf } from './entity/BalanceOf';
import { OwnerOf } from './entity/OwnerOf';
import { GetApproved } from './entity/GetApproved';
import { IsApprovedForAll } from './entity/IsApprovedForAll';
import { Name } from './entity/Name';
import { Symbol } from './entity/Symbol';
import { TokenURI } from './entity/TokenURI';
import { _Name } from './entity/_Name';
import { _Symbol } from './entity/_Symbol';
import { _Owners } from './entity/_Owners';
import { _Balances } from './entity/_Balances';
import { _TokenApprovals } from './entity/_TokenApprovals';
import { _OperatorApprovals } from './entity/_OperatorApprovals';
import { TransferCount } from './entity/TransferCount';
const log = debug('vulcanize:indexer');
@ -863,6 +877,11 @@ export class Indexer implements IndexerInterface {
return this._baseIndexer.getAncestorAtDepth(blockHash, depth);
}
async resetWatcherToBlock (blockNumber: number): Promise<void> {
const entities = [SupportsInterface, BalanceOf, OwnerOf, GetApproved, IsApprovedForAll, Name, Symbol, TokenURI, _Name, _Symbol, _Owners, _Balances, _TokenApprovals, _OperatorApprovals, TransferCount];
await this._baseIndexer.resetWatcherToBlock(blockNumber, entities);
}
async _saveBlockAndFetchEvents ({
cid: blockCid,
blockHash,

View File

@ -42,6 +42,8 @@ export class JobRunner {
}
async start (): Promise<void> {
await this._jobQueue.deleteAllJobs();
await this._baseJobRunner.resetToPrevIndexedBlock();
await this.subscribeBlockProcessingQueue();
await this.subscribeEventProcessingQueue();
await this.subscribeBlockCheckpointQueue();

View File

@ -61,6 +61,8 @@ export const main = async (): Promise<any> => {
if (watcherKind === KIND_ACTIVE) {
await jobQueue.start();
// Delete jobs to prevent creating jobs after completion of processing previous block.
await jobQueue.deleteAllJobs();
await eventWatcher.start();
}

View File

@ -200,7 +200,6 @@ export const main = async (): Promise<void> => {
for (const [queryName, entityName] of Object.entries(queryNames)) {
try {
log(`At block ${blockNumber} for query ${queryName}:`);
let resultDiff = '';
if (fetchIds) {
const queryLimit = config.queries.queryLimits[queryName];
@ -230,11 +229,15 @@ export const main = async (): Promise<void> => {
}
if (diff) {
resultDiff = diff;
log('Results mismatch:', diff);
diffFound = true;
} else {
log('Results match.');
}
}
} else {
if (updatedEntities.has(entityName)) {
let resultDiff;
let result;
let skip = 0;
@ -271,8 +274,6 @@ export const main = async (): Promise<void> => {
// eslint-disable-next-line no-unmodified-loop-condition
paginate
);
}
}
if (resultDiff) {
log('Results mismatch:', resultDiff);
@ -280,6 +281,8 @@ export const main = async (): Promise<void> => {
} else {
log('Results match.');
}
}
}
} catch (err: any) {
log('Error:', err.message);
log('Error:', JSON.stringify(err, null, 2));
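
The intent of this restructuring is easier to see outside the diff: rather than stopping at the first differing result, the comparison now logs every mismatch found at the block and only records that a difference was seen. A rough control-flow sketch (names simplified; pagination, query limits and the updated-entities filter from the diff are omitted):

```typescript
// Simplified control-flow sketch; the real CLI also handles id fetching,
// pagination and query limits as shown in the diff above.
type DiffFn = (queryName: string) => Promise<string | undefined>;

async function compareBlock (
  blockNumber: number,
  queryNames: { [queryName: string]: string },
  compare: DiffFn
): Promise<boolean> {
  let diffFound = false;

  for (const queryName of Object.keys(queryNames)) {
    console.log(`At block ${blockNumber} for query ${queryName}:`);

    const diff = await compare(queryName);

    if (diff) {
      // Log the mismatch but keep going so every mismatch at this block is reported.
      console.log('Results mismatch:', diff);
      diffFound = true;
    } else {
      console.log('Results match.');
    }
  }

  return diffFound;
}
```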

View File

@ -41,7 +41,8 @@ const DEFAULT_CLEAR_ENTITIES_CACHE_INTERVAL = 1000;
export enum ENTITY_QUERY_TYPE {
SINGULAR,
DISTINCT_ON,
GROUP_BY
GROUP_BY,
UNIQUE
}
interface CachedEntities {
@ -378,6 +379,10 @@ export class Database {
entities = await this.getEntitiesDistinctOn(queryRunner, entity, block, where, queryOptions);
break;
case ENTITY_QUERY_TYPE.UNIQUE:
entities = await this.getEntitiesUnique(queryRunner, entity, block, where, queryOptions);
break;
default:
// Use group by query if entity query type is not specified in map.
entities = await this.getEntitiesGroupBy(queryRunner, entity, block, where, queryOptions);
@ -413,6 +418,11 @@ export class Database {
.andWhere('blockProgress.is_pruned = :isPruned', { isPruned: false })
.groupBy('subTable.id');
if (where.id) {
subQuery = this._baseDatabase.buildQuery(repo, subQuery, { id: where.id });
delete where.id;
}
if (block.hash) {
const { canonicalBlockNumber, blockHashes } = await this._baseDatabase.getFrothyRegion(queryRunner, block.hash);
@ -473,6 +483,11 @@ export class Database {
.addOrderBy('subTable.id', 'ASC')
.addOrderBy('subTable.block_number', 'DESC');
if (where.id) {
subQuery = this._baseDatabase.buildQuery(repo, subQuery, { id: where.id });
delete where.id;
}
if (block.hash) {
const { canonicalBlockNumber, blockHashes } = await this._baseDatabase.getFrothyRegion(queryRunner, block.hash);
@ -554,6 +569,56 @@ export class Database {
return entities as Entity[];
}
async getEntitiesUnique<Entity> (
queryRunner: QueryRunner,
entity: new () => Entity,
block: BlockHeight,
where: Where = {},
queryOptions: QueryOptions = {}
): Promise<Entity[]> {
const repo = queryRunner.manager.getRepository(entity);
const { tableName } = repo.metadata;
let selectQueryBuilder = repo.createQueryBuilder(tableName)
.addFrom('block_progress', 'blockProgress')
.where(`${tableName}.block_hash = blockProgress.block_hash`)
.andWhere('blockProgress.is_pruned = :isPruned', { isPruned: false });
if (block.hash) {
const { canonicalBlockNumber, blockHashes } = await this._baseDatabase.getFrothyRegion(queryRunner, block.hash);
selectQueryBuilder = selectQueryBuilder
.andWhere(new Brackets(qb => {
qb.where(`${tableName}.block_hash IN (:...blockHashes)`, { blockHashes })
.orWhere(`${tableName}.block_number <= :canonicalBlockNumber`, { canonicalBlockNumber });
}));
}
if (block.number) {
selectQueryBuilder = selectQueryBuilder.andWhere(`${tableName}.block_number <= :blockNumber`, { blockNumber: block.number });
}
selectQueryBuilder = this._baseDatabase.buildQuery(repo, selectQueryBuilder, where);
if (queryOptions.orderBy) {
selectQueryBuilder = this._baseDatabase.orderQuery(repo, selectQueryBuilder, queryOptions);
}
selectQueryBuilder = this._baseDatabase.orderQuery(repo, selectQueryBuilder, { ...queryOptions, orderBy: 'id' });
if (queryOptions.skip) {
selectQueryBuilder = selectQueryBuilder.offset(queryOptions.skip);
}
if (queryOptions.limit) {
selectQueryBuilder = selectQueryBuilder.limit(queryOptions.limit);
}
const entities = await selectQueryBuilder.getMany();
return entities as Entity[];
}
async loadEntitiesRelations<Entity> (
queryRunner: QueryRunner,
block: BlockHeight,

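The new `UNIQUE` query type bypasses the `GROUP BY` / `DISTINCT ON` latest-version logic and reads the entity table directly, which suits the transaction table mentioned in the commit message since its rows are already unique. A hedged registration sketch follows; the map name, entity class and import path are assumptions for illustration only:

```typescript
// Illustration only: 'entityQueryTypeMap', the Transaction class and the import
// path are assumed names, not the actual generated code.
import { ENTITY_QUERY_TYPE } from '@cerc-io/graph-node';

class Transaction {
  id!: string;
  blockNumber!: number;
  blockHash!: string;
}

const entityQueryTypeMap: Map<new () => any, ENTITY_QUERY_TYPE> = new Map();

// Rows are unique per id, so getEntitiesUnique can query the table directly
// instead of grouping or deduplicating by latest block.
entityQueryTypeMap.set(Transaction, ENTITY_QUERY_TYPE.UNIQUE);
```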
View File

@ -197,6 +197,10 @@ export class Indexer implements IndexerInterface {
updateStateStatusMap (address: string, stateStatus: StateStatus): void {
return undefined;
}
async resetWatcherToBlock (blockNumber: number): Promise<void> {
return undefined;
}
}
class ServerConfig implements ServerConfigInterface {

View File

@ -61,52 +61,6 @@ export const handler = async (argv: any): Promise<void> => {
graphWatcher.setIndexer(indexer);
await graphWatcher.init();
const blockProgresses = await indexer.getBlocksAtHeight(argv.blockNumber, false);
assert(blockProgresses.length, `No blocks at specified block number ${argv.blockNumber}`);
assert(!blockProgresses.some(block => !block.isComplete), `Incomplete block at block number ${argv.blockNumber} with unprocessed events`);
const [blockProgress] = blockProgresses;
const dbTx = await db.createTransactionRunner();
try {
const entities = [BlockProgress, GetMethod, _Test, Author, Category, Blog];
for (const entity of entities) {
await db.deleteEntitiesByConditions<any>(dbTx, entity, { blockNumber: MoreThan(argv.blockNumber) });
}
const syncStatus = await indexer.getSyncStatus();
assert(syncStatus, 'Missing syncStatus');
if (syncStatus.latestIndexedBlockNumber > blockProgress.blockNumber) {
await indexer.updateSyncStatusIndexedBlock(blockProgress.blockHash, blockProgress.blockNumber, true);
}
if (syncStatus.latestCanonicalBlockNumber > blockProgress.blockNumber) {
await indexer.updateSyncStatusCanonicalBlock(blockProgress.blockHash, blockProgress.blockNumber, true);
}
const stateSyncStatus = await indexer.getStateSyncStatus();
if (stateSyncStatus) {
if (stateSyncStatus.latestIndexedBlockNumber > blockProgress.blockNumber) {
await indexer.updateStateSyncStatusIndexedBlock(blockProgress.blockNumber, true);
}
if (stateSyncStatus.latestCheckpointBlockNumber > blockProgress.blockNumber) {
await indexer.updateStateSyncStatusCheckpointBlock(blockProgress.blockNumber, true);
}
}
await indexer.updateSyncStatusChainHead(blockProgress.blockHash, blockProgress.blockNumber, true);
dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
throw error;
} finally {
await dbTx.release();
}
await indexer.resetWatcherToBlock(argv.blockNumber);
log('Reset watcher successfully');
};

View File

@ -524,6 +524,11 @@ export class Indexer implements IndexerInterface {
this._subgraphStateMap.clear();
}
async resetWatcherToBlock (blockNumber: number): Promise<void> {
const entities = [Author, Blog, Category];
await this._baseIndexer.resetWatcherToBlock(blockNumber, entities);
}
_populateEntityTypesMap (): void {
this._entityTypesMap.set(
'Author',

View File

@ -44,6 +44,8 @@ export class JobRunner {
}
async start (): Promise<void> {
await this._jobQueue.deleteAllJobs();
await this._baseJobRunner.resetToPrevIndexedBlock();
await this.subscribeBlockProcessingQueue();
await this.subscribeEventProcessingQueue();
await this.subscribeBlockCheckpointQueue();

View File

@ -70,6 +70,8 @@ export const main = async (): Promise<any> => {
if (watcherKind === KIND_ACTIVE) {
await jobQueue.start();
// Delete jobs to prevent creating jobs after completion of processing previous block.
await jobQueue.deleteAllJobs();
await eventWatcher.start();
}

View File

@ -86,18 +86,18 @@
yarn && yarn build
```
* Change directory to `packages/mobymask-watcher/` and run the watcher:
```bash
yarn server
```
* Run the job-runner:
```bash
yarn job-runner
```
* Change directory to `packages/mobymask-watcher/` and run the watcher:
```bash
yarn server
```
* Clone the [MobyMask](https://github.com/cerc-io/MobyMask) repo.
* Checkout to the branch with changes for using this watcher:

View File

@ -47,6 +47,6 @@
maxCompletionLagInSecs = 300
jobDelayInMilliSecs = 100
eventsInBatch = 50
blockDelayInMilliSecs = 60000
blockDelayInMilliSecs = 2000
prefetchBlocksInMem = true
prefetchBlockCount = 10

View File

@ -51,52 +51,6 @@ export const handler = async (argv: any): Promise<void> => {
const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue);
await indexer.init();
const blockProgresses = await indexer.getBlocksAtHeight(argv.blockNumber, false);
assert(blockProgresses.length, `No blocks at specified block number ${argv.blockNumber}`);
assert(!blockProgresses.some(block => !block.isComplete), `Incomplete block at block number ${argv.blockNumber} with unprocessed events`);
const [blockProgress] = blockProgresses;
const dbTx = await db.createTransactionRunner();
try {
const entities = [BlockProgress, MultiNonce, _Owner, IsRevoked, IsPhisher, IsMember];
for (const entity of entities) {
await db.deleteEntitiesByConditions<any>(dbTx, entity, { blockNumber: MoreThan(argv.blockNumber) });
}
const syncStatus = await indexer.getSyncStatus();
assert(syncStatus, 'Missing syncStatus');
if (syncStatus.latestIndexedBlockNumber > blockProgress.blockNumber) {
await indexer.updateSyncStatusIndexedBlock(blockProgress.blockHash, blockProgress.blockNumber, true);
}
if (syncStatus.latestCanonicalBlockNumber > blockProgress.blockNumber) {
await indexer.updateSyncStatusCanonicalBlock(blockProgress.blockHash, blockProgress.blockNumber, true);
}
const stateSyncStatus = await indexer.getStateSyncStatus();
if (stateSyncStatus) {
if (stateSyncStatus.latestIndexedBlockNumber > blockProgress.blockNumber) {
await indexer.updateStateSyncStatusIndexedBlock(blockProgress.blockNumber, true);
}
if (stateSyncStatus.latestCheckpointBlockNumber > blockProgress.blockNumber) {
await indexer.updateStateSyncStatusCheckpointBlock(blockProgress.blockNumber, true);
}
}
await indexer.updateSyncStatusChainHead(blockProgress.blockHash, blockProgress.blockNumber, true);
dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
throw error;
} finally {
await dbTx.release();
}
await indexer.resetWatcherToBlock(argv.blockNumber);
log('Reset watcher successfully');
};

View File

@ -607,6 +607,11 @@ export class Indexer implements IndexerInterface {
return this._contractMap.get(kind);
}
async resetWatcherToBlock (blockNumber: number): Promise<void> {
const entities = [MultiNonce, _Owner, IsRevoked, IsPhisher, IsMember];
await this._baseIndexer.resetWatcherToBlock(blockNumber, entities);
}
async _saveBlockAndFetchEvents ({
cid: blockCid,
blockHash,

View File

@ -42,6 +42,8 @@ export class JobRunner {
}
async start (): Promise<void> {
await this._jobQueue.deleteAllJobs();
await this._baseJobRunner.resetToPrevIndexedBlock();
await this.subscribeBlockProcessingQueue();
await this.subscribeEventProcessingQueue();
await this.subscribeBlockCheckpointQueue();

View File

@ -61,6 +61,8 @@ export const main = async (): Promise<any> => {
if (watcherKind === KIND_ACTIVE) {
await jobQueue.start();
// Delete jobs to prevent creating jobs after completion of processing previous block.
await jobQueue.deleteAllJobs();
await eventWatcher.start();
}

View File

@ -63,6 +63,16 @@ export const fetchBlocksAtHeight = async (
const { blockNumber } = job.data;
let blocks = [];
// Try fetching blocks from the db.
const blockProgressEntities = await indexer.getBlocksAtHeight(blockNumber, false);
blocks = blockProgressEntities.map((block: any) => {
block.timestamp = block.blockTimestamp;
return block;
});
// If blocks not found in the db:
if (!blocks.length) {
// Check for blocks in cache if prefetchBlocksInMem flag set.
if (jobQueueConfig.prefetchBlocksInMem) {
// Get blocks prefetched in memory.
@ -70,6 +80,8 @@ export const fetchBlocksAtHeight = async (
// If not found in cache, fetch the next batch.
if (!blocks.length) {
log(`common#cache-miss-${blockNumber}`);
// Wait for blocks to be prefetched.
console.time('time:common#fetchBlocks-_prefetchBlocks');
await _prefetchBlocks(blockNumber, indexer, jobQueueConfig, prefetchedBlocksMap);
@ -80,29 +92,9 @@ export const fetchBlocksAtHeight = async (
log('size:common#_fetchBlocks-_prefetchedBlocksMap-size:', prefetchedBlocksMap.size);
}
if (!blocks.length) {
log(`common#cache-miss-${blockNumber}`);
const blockProgressEntities = await indexer.getBlocksAtHeight(blockNumber, false);
blocks = blockProgressEntities.map((block: any) => {
block.timestamp = block.blockTimestamp;
return block;
});
}
// Try fetching blocks from eth-server until found.
while (!blocks.length) {
console.time('time:common#_fetchBlocks-eth-server');
blocks = await indexer.getBlocks({ blockNumber });
console.timeEnd('time:common#_fetchBlocks-eth-server');
if (!blocks.length) {
log(`No blocks fetched for block number ${blockNumber}, retrying after ${jobQueueConfig.blockDelayInMilliSecs} ms delay.`);
await wait(jobQueueConfig.blockDelayInMilliSecs);
}
}
assert(blocks.length, 'Blocks not fetched');
const blocksToBeIndexed: DeepPartial<BlockProgressInterface>[] = [];
for (const block of blocks) {
@ -150,7 +142,15 @@ export const _prefetchBlocks = async (
* @param startBlock
* @param endBlock
*/
export const _fetchBatchBlocks = async (indexer: IndexerInterface, jobQueueConfig: JobQueueConfig, startBlock: number, endBlock: number): Promise<any[]> => {
export const _fetchBatchBlocks = async (
indexer: IndexerInterface,
jobQueueConfig: JobQueueConfig,
startBlock: number,
endBlock: number
): Promise<{
blockProgress: BlockProgressInterface,
events: DeepPartial<EventInterface>[]
}[]> => {
const blockNumbers = [...Array(endBlock - startBlock).keys()].map(n => n + startBlock);
let blocks = [];
@ -181,15 +181,26 @@ export const _fetchBatchBlocks = async (indexer: IndexerInterface, jobQueueConfi
await wait(jobQueueConfig.jobDelayInMilliSecs);
}
// TODO Catch errors and continue to process available events instead of retrying for whole range because of an error.
const blockAndEventPromises = blocks.map(async block => {
console.time('time:common#fetchBatchBlocks-saveBlockAndFetchEvents');
const blockAndEventsPromises = blocks.map(async block => {
block.blockTimestamp = block.timestamp;
const [blockProgress, events] = await indexer.saveBlockAndFetchEvents(block);
try {
const [blockProgress, events] = await indexer.saveBlockAndFetchEvents(block);
return { blockProgress, events };
} catch (error) {
log(error);
return null;
}
});
return Promise.all(blockAndEventPromises);
const blockAndEventsList = await Promise.all(blockAndEventsPromises);
console.timeEnd('time:common#fetchBatchBlocks-saveBlockAndFetchEvents');
return blockAndEventsList.filter(blockAndEvent => blockAndEvent !== null) as {
blockProgress: BlockProgressInterface,
events: DeepPartial<EventInterface>[]
}[];
};
/**

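Summarizing the new lookup order in `fetchBlocksAtHeight`: the watcher's own database is checked first, then the in-memory prefetch cache (when `prefetchBlocksInMem` is enabled), and only on a cache miss are blocks prefetched from the eth-server in a batch; the old per-block retry loop against the eth-server is removed. A condensed, illustrative sketch of that order, with simplified helper signatures:

```typescript
// Condensed sketch of the new lookup order; the real implementation also
// handles timers, job data and the block-number delta checks shown above.
async function resolveBlocksAtHeight (
  blockNumber: number,
  indexer: { getBlocksAtHeight (height: number, isPruned: boolean): Promise<any[]> },
  prefetchedBlocksMap: Map<string, { block: any }>,
  prefetchBlocksInMem: boolean,
  prefetchBlocks: (blockNumber: number) => Promise<void>
): Promise<any[]> {
  const blocksFromCache = (): any[] =>
    Array.from(prefetchedBlocksMap.values())
      .map(({ block }) => block)
      .filter(block => Number(block.blockNumber) === blockNumber);

  // 1. Try the db first; blocks already indexed don't need to be refetched.
  let blocks: any[] = await indexer.getBlocksAtHeight(blockNumber, false);

  if (!blocks.length && prefetchBlocksInMem) {
    // 2. Fall back to blocks prefetched in memory.
    blocks = blocksFromCache();

    if (!blocks.length) {
      // 3. Cache miss: prefetch the next batch from the eth-server, then retry the cache.
      await prefetchBlocks(blockNumber);
      blocks = blocksFromCache();
    }
  }

  return blocks;
}
```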
View File

@ -9,6 +9,7 @@ import {
ConnectionOptions,
createConnection,
DeepPartial,
EntityTarget,
FindConditions,
FindManyOptions,
In,
@ -335,7 +336,7 @@ export class Database {
await repo.remove(entities);
}
async deleteEntitiesByConditions<Entity> (queryRunner: QueryRunner, entity: new () => Entity, findConditions: FindConditions<Entity>): Promise<void> {
async deleteEntitiesByConditions<Entity> (queryRunner: QueryRunner, entity: EntityTarget<Entity>, findConditions: FindConditions<Entity>): Promise<void> {
const repo = queryRunner.manager.getRepository(entity);
await repo.delete(findConditions);

View File

@ -3,7 +3,7 @@
//
import assert from 'assert';
import { DeepPartial, FindConditions, FindManyOptions } from 'typeorm';
import { DeepPartial, EntityTarget, FindConditions, FindManyOptions, MoreThan } from 'typeorm';
import debug from 'debug';
import JSONbig from 'json-bigint';
import { ethers } from 'ethers';
@ -981,6 +981,54 @@ export class Indexer {
}
}
async resetWatcherToBlock (blockNumber: number, entities: EntityTarget<{ blockNumber: number }>[]): Promise<void> {
const blockProgresses = await this.getBlocksAtHeight(blockNumber, false);
assert(blockProgresses.length, `No blocks at specified block number ${blockNumber}`);
assert(!blockProgresses.some(block => !block.isComplete), `Incomplete block at block number ${blockNumber} with unprocessed events`);
const [blockProgress] = blockProgresses;
const dbTx = await this._db.createTransactionRunner();
try {
for (const entity of entities) {
await this._db.deleteEntitiesByConditions(dbTx, entity, { blockNumber: MoreThan(blockNumber) });
}
await this._db.deleteEntitiesByConditions(dbTx, 'block_progress', { blockNumber: MoreThan(blockNumber) });
const syncStatus = await this.getSyncStatus();
assert(syncStatus, 'Missing syncStatus');
if (syncStatus.latestIndexedBlockNumber > blockProgress.blockNumber) {
await this.updateSyncStatusIndexedBlock(blockProgress.blockHash, blockProgress.blockNumber, true);
}
if (syncStatus.latestCanonicalBlockNumber > blockProgress.blockNumber) {
await this.updateSyncStatusCanonicalBlock(blockProgress.blockHash, blockProgress.blockNumber, true);
}
const stateSyncStatus = await this._db.getStateSyncStatus();
if (stateSyncStatus) {
if (stateSyncStatus.latestIndexedBlockNumber > blockProgress.blockNumber) {
await this._db.updateStateSyncStatusIndexedBlock(dbTx, blockNumber, true);
}
if (stateSyncStatus.latestCheckpointBlockNumber > blockProgress.blockNumber) {
await this._db.updateStateSyncStatusCheckpointBlock(dbTx, blockNumber, true);
}
}
await this.updateSyncStatusChainHead(blockProgress.blockHash, blockProgress.blockNumber, true);
dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
throw error;
} finally {
await dbTx.release();
}
}
updateStateStatusMap (address: string, stateStatus: StateStatus): void {
// Get and update State status for the contract.
const oldStateStatus = this._stateStatusMap[address];

View File

@ -172,6 +172,17 @@ export class JobRunner {
await this._jobQueue.markComplete(job);
}
async resetToPrevIndexedBlock (): Promise<void> {
const syncStatus = await this._indexer.getSyncStatus();
if (syncStatus) {
// Resetting to block before latest indexed as all events might not be processed in latest indexed block.
// Reprocessing of events in subgraph watchers is not possible as DB transaction is not implemented.
// TODO: Check updating latestIndexedBlock after blockProgress.isComplete is set to true.
await this._indexer.resetWatcherToBlock(syncStatus.latestIndexedBlockNumber - 1);
}
}
handleShutdown (): void {
process.on('SIGINT', this._processShutdown.bind(this));
process.on('SIGTERM', this._processShutdown.bind(this));

View File

@ -2,7 +2,7 @@
// Copyright 2021 Vulcanize, Inc.
//
import { Connection, DeepPartial, FindConditions, FindManyOptions, QueryRunner } from 'typeorm';
import { Connection, DeepPartial, EntityTarget, FindConditions, FindManyOptions, QueryRunner } from 'typeorm';
import { MappingKey, StorageLayout } from '@cerc-io/solidity-mapper';
import { ServerConfig } from './config';
@ -120,6 +120,7 @@ export interface IndexerInterface {
updateSubgraphState?: (contractAddress: string, data: any) => void
updateStateStatusMap (address: string, stateStatus: StateStatus): void
getStateData (state: StateInterface): any
resetWatcherToBlock (blockNumber: number): Promise<void>
}
export interface EventWatcherInterface {
@ -130,7 +131,8 @@ export interface EventWatcherInterface {
export interface DatabaseInterface {
_conn: Connection;
createTransactionRunner(): Promise<QueryRunner>;
close (): Promise<void>;
createTransactionRunner (): Promise<QueryRunner>;
getBlocksAtHeight (height: number, isPruned: boolean): Promise<BlockProgressInterface[]>;
getBlockProgress (blockHash: string): Promise<BlockProgressInterface | undefined>;
getBlockProgressEntities (where: FindConditions<BlockProgressInterface>, options: FindManyOptions<BlockProgressInterface>): Promise<BlockProgressInterface[]>
@ -150,6 +152,7 @@ export interface DatabaseInterface {
saveBlockWithEvents (queryRunner: QueryRunner, block: DeepPartial<BlockProgressInterface>, events: DeepPartial<EventInterface>[]): Promise<BlockProgressInterface>;
saveEventEntity (queryRunner: QueryRunner, entity: EventInterface): Promise<EventInterface>;
removeEntities<Entity> (queryRunner: QueryRunner, entity: new () => Entity, findConditions?: FindManyOptions<Entity> | FindConditions<Entity>): Promise<void>;
deleteEntitiesByConditions<Entity> (queryRunner: QueryRunner, entity: EntityTarget<Entity>, findConditions: FindConditions<Entity>): Promise<void>
getContracts?: () => Promise<ContractInterface[]>
saveContract?: (queryRunner: QueryRunner, contractAddress: string, kind: string, checkpoint: boolean, startingBlock: number) => Promise<ContractInterface>
getLatestState (contractAddress: string, kind: StateKind | null, blockNumber?: number): Promise<StateInterface | undefined>
@ -159,6 +162,8 @@ export interface DatabaseInterface {
removeStates(dbTx: QueryRunner, blockNumber: number, kind: StateKind): Promise<void>
saveOrUpdateState (dbTx: QueryRunner, state: StateInterface): Promise<StateInterface>
getStateSyncStatus (): Promise<StateSyncStatusInterface | undefined>
updateStateSyncStatusIndexedBlock (queryRunner: QueryRunner, blockNumber: number, force?: boolean): Promise<StateSyncStatusInterface>
updateStateSyncStatusCheckpointBlock (queryRunner: QueryRunner, blockNumber: number, force?: boolean): Promise<StateSyncStatusInterface>
}
export interface GraphDatabaseInterface {