mirror of
https://github.com/cerc-io/watcher-ts
synced 2025-01-24 03:59:06 +00:00
Purge unknown events after block processing complete (#280)
This commit is contained in:
parent
47b9e6bbbd
commit
5cbcd455d2
@ -3,7 +3,7 @@
|
|||||||
//
|
//
|
||||||
|
|
||||||
import assert from 'assert';
|
import assert from 'assert';
|
||||||
import { Connection, ConnectionOptions, DeepPartial, FindConditions, QueryRunner } from 'typeorm';
|
import { Connection, ConnectionOptions, DeepPartial, FindConditions, QueryRunner, FindManyOptions } from 'typeorm';
|
||||||
import path from 'path';
|
import path from 'path';
|
||||||
|
|
||||||
import { Database as BaseDatabase } from '@vulcanize/util';
|
import { Database as BaseDatabase } from '@vulcanize/util';
|
||||||
@ -179,7 +179,7 @@ export class Database {
|
|||||||
return this._baseDatabase.updateBlockProgress(repo, blockHash, lastProcessedEventIndex);
|
return this._baseDatabase.updateBlockProgress(repo, blockHash, lastProcessedEventIndex);
|
||||||
}
|
}
|
||||||
|
|
||||||
async removeEntities<Entity> (queryRunner: QueryRunner, entity: new () => Entity, findConditions?: FindConditions<Entity>): Promise<void> {
|
async removeEntities<Entity> (queryRunner: QueryRunner, entity: new () => Entity, findConditions?: FindManyOptions<Entity> | FindConditions<Entity>): Promise<void> {
|
||||||
return this._baseDatabase.removeEntities(queryRunner, entity, findConditions);
|
return this._baseDatabase.removeEntities(queryRunner, entity, findConditions);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -91,8 +91,8 @@ export const main = async (): Promise<any> => {
|
|||||||
await fillBlocks(jobQueue, indexer, ethClient, eventWatcher, argv);
|
await fillBlocks(jobQueue, indexer, ethClient, eventWatcher, argv);
|
||||||
};
|
};
|
||||||
|
|
||||||
main().then(() => {
|
main().catch(err => {
|
||||||
process.exit();
|
|
||||||
}).catch(err => {
|
|
||||||
log(err);
|
log(err);
|
||||||
|
}).finally(() => {
|
||||||
|
process.exit();
|
||||||
});
|
});
|
||||||
|
@ -275,6 +275,10 @@ export class Indexer {
|
|||||||
return this._baseIndexer.getBlockEvents(blockHash);
|
return this._baseIndexer.getBlockEvents(blockHash);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async removeUnknownEvents (block: BlockProgress): Promise<void> {
|
||||||
|
return this._baseIndexer.removeUnknownEvents(Event, block);
|
||||||
|
}
|
||||||
|
|
||||||
async markBlocksAsPruned (blocks: BlockProgress[]): Promise<void> {
|
async markBlocksAsPruned (blocks: BlockProgress[]): Promise<void> {
|
||||||
return this._baseIndexer.markBlocksAsPruned(blocks);
|
return this._baseIndexer.markBlocksAsPruned(blocks);
|
||||||
}
|
}
|
||||||
|
@ -3,7 +3,7 @@
|
|||||||
//
|
//
|
||||||
|
|
||||||
import assert from 'assert';
|
import assert from 'assert';
|
||||||
import { Connection, ConnectionOptions, DeepPartial, FindConditions, QueryRunner } from 'typeorm';
|
import { Connection, ConnectionOptions, DeepPartial, FindConditions, FindManyOptions, QueryRunner } from 'typeorm';
|
||||||
import path from 'path';
|
import path from 'path';
|
||||||
|
|
||||||
import { Database as BaseDatabase } from '@vulcanize/util';
|
import { Database as BaseDatabase } from '@vulcanize/util';
|
||||||
@ -177,7 +177,7 @@ export class Database {
|
|||||||
return this._baseDatabase.updateBlockProgress(repo, blockHash, lastProcessedEventIndex);
|
return this._baseDatabase.updateBlockProgress(repo, blockHash, lastProcessedEventIndex);
|
||||||
}
|
}
|
||||||
|
|
||||||
async removeEntities<Entity> (queryRunner: QueryRunner, entity: new () => Entity, findConditions?: FindConditions<Entity>): Promise<void> {
|
async removeEntities<Entity> (queryRunner: QueryRunner, entity: new () => Entity, findConditions?: FindManyOptions<Entity> | FindConditions<Entity>): Promise<void> {
|
||||||
return this._baseDatabase.removeEntities(queryRunner, entity, findConditions);
|
return this._baseDatabase.removeEntities(queryRunner, entity, findConditions);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -88,8 +88,8 @@ export const main = async (): Promise<any> => {
|
|||||||
await fillBlocks(jobQueue, indexer, ethClient, eventWatcher, argv);
|
await fillBlocks(jobQueue, indexer, ethClient, eventWatcher, argv);
|
||||||
};
|
};
|
||||||
|
|
||||||
main().then(() => {
|
main().catch(err => {
|
||||||
process.exit();
|
|
||||||
}).catch(err => {
|
|
||||||
log(err);
|
log(err);
|
||||||
|
}).finally(() => {
|
||||||
|
process.exit();
|
||||||
});
|
});
|
||||||
|
@ -357,6 +357,10 @@ export class Indexer {
|
|||||||
return this._baseIndexer.getBlockEvents(blockHash);
|
return this._baseIndexer.getBlockEvents(blockHash);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async removeUnknownEvents (block: BlockProgress): Promise<void> {
|
||||||
|
return this._baseIndexer.removeUnknownEvents(Event, block);
|
||||||
|
}
|
||||||
|
|
||||||
async markBlocksAsPruned (blocks: BlockProgress[]): Promise<void> {
|
async markBlocksAsPruned (blocks: BlockProgress[]): Promise<void> {
|
||||||
return this._baseIndexer.markBlocksAsPruned(blocks);
|
return this._baseIndexer.markBlocksAsPruned(blocks);
|
||||||
}
|
}
|
||||||
|
@ -8,6 +8,7 @@ import {
|
|||||||
ConnectionOptions,
|
ConnectionOptions,
|
||||||
DeepPartial,
|
DeepPartial,
|
||||||
FindConditions,
|
FindConditions,
|
||||||
|
FindManyOptions,
|
||||||
FindOneOptions,
|
FindOneOptions,
|
||||||
LessThanOrEqual,
|
LessThanOrEqual,
|
||||||
QueryRunner
|
QueryRunner
|
||||||
@ -626,7 +627,7 @@ export class Database implements DatabaseInterface {
|
|||||||
return this._baseDatabase.getEntities(queryRunner, entity, findConditions);
|
return this._baseDatabase.getEntities(queryRunner, entity, findConditions);
|
||||||
}
|
}
|
||||||
|
|
||||||
async removeEntities<Entity> (queryRunner: QueryRunner, entity: new () => Entity, findConditions?: FindConditions<Entity>): Promise<void> {
|
async removeEntities<Entity> (queryRunner: QueryRunner, entity: new () => Entity, findConditions?: FindManyOptions<Entity> | FindConditions<Entity>): Promise<void> {
|
||||||
return this._baseDatabase.removeEntities(queryRunner, entity, findConditions);
|
return this._baseDatabase.removeEntities(queryRunner, entity, findConditions);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -89,8 +89,8 @@ export const main = async (): Promise<any> => {
|
|||||||
await fillBlocks(jobQueue, indexer, ethClient, eventWatcher, argv);
|
await fillBlocks(jobQueue, indexer, ethClient, eventWatcher, argv);
|
||||||
};
|
};
|
||||||
|
|
||||||
main().then(() => {
|
main().catch(err => {
|
||||||
process.exit();
|
|
||||||
}).catch(err => {
|
|
||||||
log(err);
|
log(err);
|
||||||
|
}).finally(() => {
|
||||||
|
process.exit();
|
||||||
});
|
});
|
||||||
|
@ -308,6 +308,10 @@ export class Indexer implements IndexerInterface {
|
|||||||
return this._baseIndexer.getBlockEvents(blockHash);
|
return this._baseIndexer.getBlockEvents(blockHash);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async removeUnknownEvents (block: BlockProgress): Promise<void> {
|
||||||
|
return this._baseIndexer.removeUnknownEvents(Event, block);
|
||||||
|
}
|
||||||
|
|
||||||
async updateSyncStatusIndexedBlock (blockHash: string, blockNumber: number, force = false): Promise<SyncStatus> {
|
async updateSyncStatusIndexedBlock (blockHash: string, blockNumber: number, force = false): Promise<SyncStatus> {
|
||||||
return this._baseIndexer.updateSyncStatusIndexedBlock(blockHash, blockNumber, force);
|
return this._baseIndexer.updateSyncStatusIndexedBlock(blockHash, blockNumber, force);
|
||||||
}
|
}
|
||||||
|
@ -3,7 +3,7 @@
|
|||||||
//
|
//
|
||||||
|
|
||||||
import assert from 'assert';
|
import assert from 'assert';
|
||||||
import { Connection, ConnectionOptions, DeepPartial, QueryRunner, FindConditions } from 'typeorm';
|
import { Connection, ConnectionOptions, DeepPartial, QueryRunner, FindConditions, FindManyOptions } from 'typeorm';
|
||||||
import path from 'path';
|
import path from 'path';
|
||||||
|
|
||||||
import { Database as BaseDatabase, DatabaseInterface } from '@vulcanize/util';
|
import { Database as BaseDatabase, DatabaseInterface } from '@vulcanize/util';
|
||||||
@ -148,7 +148,7 @@ export class Database implements DatabaseInterface {
|
|||||||
return this._baseDatabase.getEntities(queryRunner, entity, findConditions);
|
return this._baseDatabase.getEntities(queryRunner, entity, findConditions);
|
||||||
}
|
}
|
||||||
|
|
||||||
async removeEntities<Entity> (queryRunner: QueryRunner, entity: new () => Entity, findConditions?: FindConditions<Entity>): Promise<void> {
|
async removeEntities<Entity> (queryRunner: QueryRunner, entity: new () => Entity, findConditions?: FindManyOptions<Entity> | FindConditions<Entity>): Promise<void> {
|
||||||
return this._baseDatabase.removeEntities(queryRunner, entity, findConditions);
|
return this._baseDatabase.removeEntities(queryRunner, entity, findConditions);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -90,8 +90,8 @@ export const main = async (): Promise<any> => {
|
|||||||
await fillBlocks(jobQueue, indexer, ethClient, eventWatcher, argv);
|
await fillBlocks(jobQueue, indexer, ethClient, eventWatcher, argv);
|
||||||
};
|
};
|
||||||
|
|
||||||
main().then(() => {
|
main().catch(err => {
|
||||||
process.exit();
|
|
||||||
}).catch(err => {
|
|
||||||
log(err);
|
log(err);
|
||||||
|
}).finally(() => {
|
||||||
|
process.exit();
|
||||||
});
|
});
|
||||||
|
@ -324,6 +324,10 @@ export class Indexer implements IndexerInterface {
|
|||||||
return this._baseIndexer.getBlockEvents(blockHash);
|
return this._baseIndexer.getBlockEvents(blockHash);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async removeUnknownEvents (block: BlockProgress): Promise<void> {
|
||||||
|
return this._baseIndexer.removeUnknownEvents(Event, block);
|
||||||
|
}
|
||||||
|
|
||||||
async updateSyncStatusIndexedBlock (blockHash: string, blockNumber: number, force = false): Promise<SyncStatus> {
|
async updateSyncStatusIndexedBlock (blockHash: string, blockNumber: number, force = false): Promise<SyncStatus> {
|
||||||
return this._baseIndexer.updateSyncStatusIndexedBlock(blockHash, blockNumber, force);
|
return this._baseIndexer.updateSyncStatusIndexedBlock(blockHash, blockNumber, force);
|
||||||
}
|
}
|
||||||
|
@ -10,6 +10,7 @@ import {
|
|||||||
createConnection,
|
createConnection,
|
||||||
DeepPartial,
|
DeepPartial,
|
||||||
FindConditions,
|
FindConditions,
|
||||||
|
FindManyOptions,
|
||||||
In,
|
In,
|
||||||
QueryRunner,
|
QueryRunner,
|
||||||
Repository
|
Repository
|
||||||
@ -261,7 +262,7 @@ export class Database {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async removeEntities<Entity> (queryRunner: QueryRunner, entity: new () => Entity, findConditions?: FindConditions<Entity>): Promise<void> {
|
async removeEntities<Entity> (queryRunner: QueryRunner, entity: new () => Entity, findConditions?: FindManyOptions<Entity> | FindConditions<Entity>): Promise<void> {
|
||||||
const repo = queryRunner.manager.getRepository(entity);
|
const repo = queryRunner.manager.getRepository(entity);
|
||||||
|
|
||||||
const entities = await repo.find(findConditions);
|
const entities = await repo.find(findConditions);
|
||||||
|
@ -72,8 +72,13 @@ export class EventWatcher {
|
|||||||
|
|
||||||
await this._indexer.updateBlockProgress(dbEvent.block.blockHash, dbEvent.index);
|
await this._indexer.updateBlockProgress(dbEvent.block.blockHash, dbEvent.index);
|
||||||
const blockProgress = await this._indexer.getBlockProgress(dbEvent.block.blockHash);
|
const blockProgress = await this._indexer.getBlockProgress(dbEvent.block.blockHash);
|
||||||
|
|
||||||
if (blockProgress) {
|
if (blockProgress) {
|
||||||
await this.publishBlockProgressToSubscribers(blockProgress);
|
await this.publishBlockProgressToSubscribers(blockProgress);
|
||||||
|
|
||||||
|
if (blockProgress.isComplete) {
|
||||||
|
await this._indexer.removeUnknownEvents(blockProgress);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return dbEvent;
|
return dbEvent;
|
||||||
|
@ -196,6 +196,31 @@ export class Indexer {
|
|||||||
return events;
|
return events;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async removeUnknownEvents (eventEntityClass: new () => EventInterface, block: BlockProgressInterface): Promise<void> {
|
||||||
|
const dbTx = await this._db.createTransactionRunner();
|
||||||
|
|
||||||
|
try {
|
||||||
|
await this._db.removeEntities(
|
||||||
|
dbTx,
|
||||||
|
eventEntityClass,
|
||||||
|
{
|
||||||
|
where: {
|
||||||
|
block: { id: block.id },
|
||||||
|
eventName: UNKNOWN_EVENT_NAME
|
||||||
|
},
|
||||||
|
relations: ['block']
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
await dbTx.commitTransaction();
|
||||||
|
} catch (error) {
|
||||||
|
await dbTx.rollbackTransaction();
|
||||||
|
throw error;
|
||||||
|
} finally {
|
||||||
|
await dbTx.release();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
async getAncestorAtDepth (blockHash: string, depth: number): Promise<string> {
|
async getAncestorAtDepth (blockHash: string, depth: number): Promise<string> {
|
||||||
return this._db.getAncestorAtDepth(blockHash, depth);
|
return this._db.getAncestorAtDepth(blockHash, depth);
|
||||||
}
|
}
|
||||||
|
@ -2,7 +2,7 @@
|
|||||||
// Copyright 2021 Vulcanize, Inc.
|
// Copyright 2021 Vulcanize, Inc.
|
||||||
//
|
//
|
||||||
|
|
||||||
import { DeepPartial, FindConditions, QueryRunner } from 'typeorm';
|
import { DeepPartial, FindConditions, FindManyOptions, QueryRunner } from 'typeorm';
|
||||||
|
|
||||||
export interface BlockProgressInterface {
|
export interface BlockProgressInterface {
|
||||||
id: number;
|
id: number;
|
||||||
@ -55,6 +55,7 @@ export interface IndexerInterface {
|
|||||||
getBlockEvents (blockHash: string): Promise<Array<EventInterface>>
|
getBlockEvents (blockHash: string): Promise<Array<EventInterface>>
|
||||||
getAncestorAtDepth (blockHash: string, depth: number): Promise<string>
|
getAncestorAtDepth (blockHash: string, depth: number): Promise<string>
|
||||||
getOrFetchBlockEvents (block: DeepPartial<BlockProgressInterface>): Promise<Array<EventInterface>>
|
getOrFetchBlockEvents (block: DeepPartial<BlockProgressInterface>): Promise<Array<EventInterface>>
|
||||||
|
removeUnknownEvents (block: BlockProgressInterface): Promise<void>
|
||||||
updateBlockProgress (blockHash: string, lastProcessedEventIndex: number): Promise<void>
|
updateBlockProgress (blockHash: string, lastProcessedEventIndex: number): Promise<void>
|
||||||
updateSyncStatusChainHead (blockHash: string, blockNumber: number): Promise<SyncStatusInterface>
|
updateSyncStatusChainHead (blockHash: string, blockNumber: number): Promise<SyncStatusInterface>
|
||||||
updateSyncStatusIndexedBlock (blockHash: string, blockNumber: number, force?: boolean): Promise<SyncStatusInterface>
|
updateSyncStatusIndexedBlock (blockHash: string, blockNumber: number, force?: boolean): Promise<SyncStatusInterface>
|
||||||
@ -85,6 +86,6 @@ export interface DatabaseInterface {
|
|||||||
updateSyncStatusCanonicalBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number, force?: boolean): Promise<SyncStatusInterface>;
|
updateSyncStatusCanonicalBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number, force?: boolean): Promise<SyncStatusInterface>;
|
||||||
saveEvents (queryRunner: QueryRunner, block: DeepPartial<BlockProgressInterface>, events: DeepPartial<EventInterface>[]): Promise<void>;
|
saveEvents (queryRunner: QueryRunner, block: DeepPartial<BlockProgressInterface>, events: DeepPartial<EventInterface>[]): Promise<void>;
|
||||||
saveEventEntity (queryRunner: QueryRunner, entity: EventInterface): Promise<EventInterface>;
|
saveEventEntity (queryRunner: QueryRunner, entity: EventInterface): Promise<EventInterface>;
|
||||||
removeEntities<Entity> (queryRunner: QueryRunner, entity: new () => Entity, findConditions?: FindConditions<Entity>): Promise<void>;
|
removeEntities<Entity> (queryRunner: QueryRunner, entity: new () => Entity, findConditions?: FindManyOptions<Entity> | FindConditions<Entity>): Promise<void>;
|
||||||
getContract?: (address: string) => Promise<ContractInterface | undefined>
|
getContract?: (address: string) => Promise<ContractInterface | undefined>
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user