Implement IpldStatus table for IPLD related jobs (#98)

This commit is contained in:
nikugogoi 2021-12-24 17:20:03 +05:30 committed by nabarun
parent d385db2775
commit dee517e444
17 changed files with 98 additions and 84 deletions

View File

@ -76,8 +76,8 @@ export const handler = async (argv: any): Promise<void> => {
const syncStatus = await indexer.getSyncStatus();
assert(syncStatus, 'Missing syncStatus');
const hooksStatus = await indexer.getHookStatus();
assert(hooksStatus, 'Missing hooksStatus');
const ipldStatus = await indexer.getIPLDStatus();
assert(ipldStatus, 'Missing ipldStatus');
const blockProgresses = await indexer.getBlocksAtHeight(argv.blockNumber, false);
assert(blockProgresses.length, `No blocks at specified block number ${argv.blockNumber}`);
@ -103,8 +103,8 @@ export const handler = async (argv: any): Promise<void> => {
await indexer.updateSyncStatusCanonicalBlock(blockProgress.blockHash, blockProgress.blockNumber, true);
}
if (hooksStatus.latestProcessedBlockNumber > blockProgress.blockNumber) {
await indexer.updateHookStatusProcessedBlock(blockProgress.blockNumber, true);
if (ipldStatus.latestHooksBlockNumber > blockProgress.blockNumber) {
await indexer.updateIPLDStatusHooksBlock(blockProgress.blockNumber, true);
}
await indexer.updateSyncStatusChainHead(blockProgress.blockHash, blockProgress.blockNumber, true);

View File

@ -11,7 +11,7 @@ import { IPLDDatabase as BaseDatabase, IPLDDatabaseInterface, QueryOptions, Stat
import { Contract } from './entity/Contract';
import { Event } from './entity/Event';
import { SyncStatus } from './entity/SyncStatus';
import { HookStatus } from './entity/HookStatus';
import { IpldStatus } from './entity/IpldStatus';
import { BlockProgress } from './entity/BlockProgress';
import { IPLDBlock } from './entity/IPLDBlock';
@ -80,16 +80,16 @@ export class Database implements IPLDDatabaseInterface {
await this._baseDatabase.removeIPLDBlocks(repo, blockNumber, kind);
}
async getHookStatus (): Promise<HookStatus | undefined> {
const repo = this._conn.getRepository(HookStatus);
async getIPLDStatus (): Promise<IpldStatus | undefined> {
const repo = this._conn.getRepository(IpldStatus);
return this._baseDatabase.getHookStatus(repo);
return this._baseDatabase.getIPLDStatus(repo);
}
async updateHookStatusProcessedBlock (queryRunner: QueryRunner, blockNumber: number, force?: boolean): Promise<HookStatus> {
const repo = queryRunner.manager.getRepository(HookStatus);
async updateIPLDStatusHooksBlock (queryRunner: QueryRunner, blockNumber: number, force?: boolean): Promise<IpldStatus> {
const repo = queryRunner.manager.getRepository(IpldStatus);
return this._baseDatabase.updateHookStatusProcessedBlock(repo, blockNumber, force);
return this._baseDatabase.updateIPLDStatusHooksBlock(repo, blockNumber, force);
}
async getContracts (): Promise<Contract[]> {

View File

@ -1,14 +0,0 @@
//
// Copyright 2021 Vulcanize, Inc.
//
import { Entity, PrimaryGeneratedColumn, Column } from 'typeorm';
@Entity()
export class HookStatus {
@PrimaryGeneratedColumn()
id!: number;
@Column('integer')
latestProcessedBlockNumber!: number;
}

View File

@ -0,0 +1,20 @@
//
// Copyright 2021 Vulcanize, Inc.
//
import { Entity, PrimaryGeneratedColumn, Column } from 'typeorm';
@Entity()
export class IpldStatus {
@PrimaryGeneratedColumn()
id!: number;
@Column('integer')
latestHooksBlockNumber!: number;
@Column('integer', { nullable: true })
latestCheckpointBlockNumber!: number;
@Column('integer', { nullable: true })
latestIpfsBlockNumber!: number;
}

View File

@ -121,7 +121,7 @@ export class EventWatcher implements EventWatcherInterface {
this._jobQueue.onComplete(QUEUE_HOOKS, async (job) => {
const { data: { request: { data: { blockNumber, blockHash } } } } = job;
await this._indexer.updateHookStatusProcessedBlock(blockNumber);
await this._indexer.updateIPLDStatusHooksBlock(blockNumber);
// Create a checkpoint job after completion of a hook job.
await this.createCheckpointJob(blockHash, blockNumber);

View File

@ -31,7 +31,7 @@ import { Database } from './database';
import { Contract } from './entity/Contract';
import { Event } from './entity/Event';
import { SyncStatus } from './entity/SyncStatus';
import { HookStatus } from './entity/HookStatus';
import { IpldStatus } from './entity/IpldStatus';
import { BlockProgress } from './entity/BlockProgress';
import { IPLDBlock } from './entity/IPLDBlock';
import EdenNetworkArtifacts from './artifacts/EdenNetwork.json';
@ -833,16 +833,16 @@ export class Indexer implements IndexerInterface {
};
}
async getHookStatus (): Promise<HookStatus | undefined> {
return this._db.getHookStatus();
async getIPLDStatus (): Promise<IpldStatus | undefined> {
return this._db.getIPLDStatus();
}
async updateHookStatusProcessedBlock (blockNumber: number, force?: boolean): Promise<HookStatus> {
async updateIPLDStatusHooksBlock (blockNumber: number, force?: boolean): Promise<IpldStatus> {
const dbTx = await this._db.createTransactionRunner();
let res;
try {
res = await this._db.updateHookStatusProcessedBlock(dbTx, blockNumber, force);
res = await this._db.updateIPLDStatusHooksBlock(dbTx, blockNumber, force);
await dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();

View File

@ -69,17 +69,17 @@ export class JobRunner {
await this._jobQueue.subscribe(QUEUE_HOOKS, async (job) => {
const { data: { blockNumber } } = job;
const hookStatus = await this._indexer.getHookStatus();
const ipldStatus = await this._indexer.getIPLDStatus();
if (hookStatus) {
if (hookStatus.latestProcessedBlockNumber < (blockNumber - 1)) {
if (ipldStatus) {
if (ipldStatus.latestHooksBlockNumber < (blockNumber - 1)) {
const message = `Hooks for blockNumber ${blockNumber - 1} not processed yet, aborting`;
log(message);
throw new Error(message);
}
if (hookStatus.latestProcessedBlockNumber > (blockNumber - 1)) {
if (ipldStatus.latestHooksBlockNumber > (blockNumber - 1)) {
log(`Hooks for blockNumber ${blockNumber} already processed`);
return;

View File

@ -64,8 +64,8 @@ export const handler = async (argv: any): Promise<void> => {
const syncStatus = await indexer.getSyncStatus();
assert(syncStatus, 'Missing syncStatus');
const hooksStatus = await indexer.getHookStatus();
assert(hooksStatus, 'Missing hooksStatus');
const ipldStatus = await indexer.getIPLDStatus();
assert(ipldStatus, 'Missing ipldStatus');
const blockProgresses = await indexer.getBlocksAtHeight(argv.blockNumber, false);
assert(blockProgresses.length, `No blocks at specified block number ${argv.blockNumber}`);
@ -91,7 +91,7 @@ export const handler = async (argv: any): Promise<void> => {
await indexer.updateSyncStatusCanonicalBlock(blockProgress.blockHash, blockProgress.blockNumber, true);
}
if (hooksStatus.latestProcessedBlockNumber > blockProgress.blockNumber) {
if (ipldStatus.latestProcessedBlockNumber > blockProgress.blockNumber) {
await indexer.updateHookStatusProcessedBlock(blockProgress.blockNumber, true);
}

View File

@ -11,7 +11,7 @@ import { IPLDDatabase as BaseDatabase, IPLDDatabaseInterface, QueryOptions, Stat
import { Contract } from './entity/Contract';
import { Event } from './entity/Event';
import { SyncStatus } from './entity/SyncStatus';
import { HookStatus } from './entity/HookStatus';
import { IpldStatus } from './entity/IpldStatus';
import { BlockProgress } from './entity/BlockProgress';
import { IPLDBlock } from './entity/IPLDBlock';
@ -114,16 +114,16 @@ export class Database implements IPLDDatabaseInterface {
await this._baseDatabase.removeIPLDBlocks(repo, blockNumber, kind);
}
async getHookStatus (): Promise<HookStatus | undefined> {
const repo = this._conn.getRepository(HookStatus);
async getIPLDStatus (): Promise<IpldStatus | undefined> {
const repo = this._conn.getRepository(IpldStatus);
return this._baseDatabase.getHookStatus(repo);
return this._baseDatabase.getIPLDStatus(repo);
}
async updateHookStatusProcessedBlock (queryRunner: QueryRunner, blockNumber: number, force?: boolean): Promise<HookStatus> {
const repo = queryRunner.manager.getRepository(HookStatus);
async updateIPLDStatusHooksBlock (queryRunner: QueryRunner, blockNumber: number, force?: boolean): Promise<IpldStatus> {
const repo = queryRunner.manager.getRepository(IpldStatus);
return this._baseDatabase.updateHookStatusProcessedBlock(repo, blockNumber, force);
return this._baseDatabase.updateIPLDStatusHooksBlock(repo, blockNumber, force);
}
async getContracts (): Promise<Contract[]> {

View File

@ -1,14 +0,0 @@
//
// Copyright 2021 Vulcanize, Inc.
//
import { Entity, PrimaryGeneratedColumn, Column } from 'typeorm';
@Entity()
export class HookStatus {
@PrimaryGeneratedColumn()
id!: number;
@Column('integer')
latestProcessedBlockNumber!: number;
}

View File

@ -0,0 +1,20 @@
//
// Copyright 2021 Vulcanize, Inc.
//
import { Entity, PrimaryGeneratedColumn, Column } from 'typeorm';
@Entity()
export class IpldStatus {
@PrimaryGeneratedColumn()
id!: number;
@Column('integer')
latestHooksBlockNumber!: number;
@Column('integer', { nullable: true })
latestCheckpointBlockNumber!: number;
@Column('integer', { nullable: true })
latestIpfsBlockNumber!: number;
}

View File

@ -121,7 +121,7 @@ export class EventWatcher implements EventWatcherInterface {
this._jobQueue.onComplete(QUEUE_HOOKS, async (job) => {
const { data: { request: { data: { blockNumber, blockHash } } } } = job;
await this._indexer.updateHookStatusProcessedBlock(blockNumber);
await this._indexer.updateIPLDStatusHooksBlock(blockNumber);
// Create a checkpoint job after completion of a hook job.
await this.createCheckpointJob(blockHash, blockNumber);

View File

@ -33,7 +33,7 @@ import { Database } from './database';
import { Contract } from './entity/Contract';
import { Event } from './entity/Event';
import { SyncStatus } from './entity/SyncStatus';
import { HookStatus } from './entity/HookStatus';
import { IpldStatus } from './entity/IpldStatus';
import { BlockProgress } from './entity/BlockProgress';
import { IPLDBlock } from './entity/IPLDBlock';
import artifacts from './artifacts/Example.json';
@ -409,16 +409,16 @@ export class Indexer implements IndexerInterface {
};
}
async getHookStatus (): Promise<HookStatus | undefined> {
return this._db.getHookStatus();
async getIPLDStatus (): Promise<IpldStatus | undefined> {
return this._db.getIPLDStatus();
}
async updateHookStatusProcessedBlock (blockNumber: number, force?: boolean): Promise<HookStatus> {
async updateIPLDStatusHooksBlock (blockNumber: number, force?: boolean): Promise<IpldStatus> {
const dbTx = await this._db.createTransactionRunner();
let res;
try {
res = await this._db.updateHookStatusProcessedBlock(dbTx, blockNumber, force);
res = await this._db.updateIPLDStatusHooksBlock(dbTx, blockNumber, force);
await dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();

View File

@ -69,17 +69,17 @@ export class JobRunner {
await this._jobQueue.subscribe(QUEUE_HOOKS, async (job) => {
const { data: { blockNumber } } = job;
const hookStatus = await this._indexer.getHookStatus();
const ipldStatus = await this._indexer.getIPLDStatus();
if (hookStatus) {
if (hookStatus.latestProcessedBlockNumber < (blockNumber - 1)) {
if (ipldStatus) {
if (ipldStatus.latestHooksBlockNumber < (blockNumber - 1)) {
const message = `Hooks for blockNumber ${blockNumber - 1} not processed yet, aborting`;
log(message);
throw new Error(message);
}
if (hookStatus.latestProcessedBlockNumber > (blockNumber - 1)) {
if (ipldStatus.latestHooksBlockNumber > (blockNumber - 1)) {
log(`Hooks for blockNumber ${blockNumber} already processed`);
return;

View File

@ -4,7 +4,7 @@
import { FindConditions, MoreThan, Repository } from 'typeorm';
import { IPLDBlockInterface, HookStatusInterface, StateKind } from './types';
import { IPLDBlockInterface, IpldStatusInterface, StateKind } from './types';
import { Database } from './database';
import { MAX_REORG_DEPTH } from './constants';
@ -141,21 +141,21 @@ export class IPLDDatabase extends Database {
}
}
async getHookStatus (repo: Repository<HookStatusInterface>): Promise<HookStatusInterface | undefined> {
async getIPLDStatus (repo: Repository<IpldStatusInterface>): Promise<IpldStatusInterface | undefined> {
return repo.findOne();
}
async updateHookStatusProcessedBlock (repo: Repository<HookStatusInterface>, blockNumber: number, force?: boolean): Promise<HookStatusInterface> {
async updateIPLDStatusHooksBlock (repo: Repository<IpldStatusInterface>, blockNumber: number, force?: boolean): Promise<IpldStatusInterface> {
let entity = await repo.findOne();
if (!entity) {
entity = repo.create({
latestProcessedBlockNumber: blockNumber
latestHooksBlockNumber: blockNumber
});
}
if (force || blockNumber > entity.latestProcessedBlockNumber) {
entity.latestProcessedBlockNumber = blockNumber;
if (force || blockNumber > entity.latestHooksBlockNumber) {
entity.latestHooksBlockNumber = blockNumber;
}
return repo.save(entity);

View File

@ -70,11 +70,11 @@ export class IPLDIndexer extends Indexer {
async getLatestHooksProcessedBlock (): Promise<BlockProgressInterface> {
// Get current hookStatus.
const hookStatus = await this._ipldDb.getHookStatus();
assert(hookStatus, 'Hook status not found');
const ipldStatus = await this._ipldDb.getIPLDStatus();
assert(ipldStatus, 'ipld status not found');
// Get all the blocks at height hookStatus.latestProcessedBlockNumber.
const blocksAtHeight = await this.getBlocksAtHeight(hookStatus.latestProcessedBlockNumber, false);
const blocksAtHeight = await this.getBlocksAtHeight(ipldStatus.latestHooksBlockNumber, false);
// There can exactly one block at hookStatus.latestProcessedBlockNumber height.
assert(blocksAtHeight.length === 1);
@ -288,11 +288,11 @@ export class IPLDIndexer extends Indexer {
assert(currentBlock.isComplete, 'Block for a checkpoint should be marked as complete');
// Get current hookStatus.
const hookStatus = await this._ipldDb.getHookStatus();
assert(hookStatus);
const ipldStatus = await this._ipldDb.getIPLDStatus();
assert(ipldStatus);
// Make sure the hooks have been processed for the block.
assert(currentBlock.blockNumber <= hookStatus.latestProcessedBlockNumber, 'Block for a checkpoint should have hooks processed');
assert(currentBlock.blockNumber <= ipldStatus.latestHooksBlockNumber, 'Block for a checkpoint should have hooks processed');
// Call state checkpoint hook and check if default checkpoint is disabled.
assert(indexer.processStateCheckpoint);

View File

@ -38,9 +38,11 @@ export interface SyncStatusInterface {
latestCanonicalBlockNumber: number;
}
export interface HookStatusInterface {
export interface IpldStatusInterface {
id: number;
latestProcessedBlockNumber: number
latestHooksBlockNumber: number;
latestCheckpointBlockNumber: number;
latestIpfsBlockNumber: number
}
export interface EventInterface {
@ -140,5 +142,5 @@ export interface IPLDDatabaseInterface extends DatabaseInterface {
getNewIPLDBlock (): IPLDBlockInterface
removeIPLDBlocks(dbTx: QueryRunner, blockNumber: number, kind: StateKind): Promise<void>
saveOrUpdateIPLDBlock (dbTx: QueryRunner, ipldBlock: IPLDBlockInterface): Promise<IPLDBlockInterface>
getHookStatus (): Promise<HookStatusInterface | undefined>
getIPLDStatus (): Promise<IpldStatusInterface | undefined>
}