mirror of
https://github.com/cerc-io/watcher-ts
synced 2025-08-01 12:12:07 +00:00
Add checks for and create hooks, checkpoint and IPFS jobs in job-runner (#99)
* Add fields for checkpoint and IPFS in IpldStatus table * Mark block processing job complete in watchers
This commit is contained in:
parent
dee517e444
commit
3638d56787
@ -107,6 +107,14 @@ export const handler = async (argv: any): Promise<void> => {
|
|||||||
await indexer.updateIPLDStatusHooksBlock(blockProgress.blockNumber, true);
|
await indexer.updateIPLDStatusHooksBlock(blockProgress.blockNumber, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (ipldStatus.latestCheckpointBlockNumber > blockProgress.blockNumber) {
|
||||||
|
await indexer.updateIPLDStatusCheckpointBlock(blockProgress.blockNumber, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (ipldStatus.latestIPFSBlockNumber > blockProgress.blockNumber) {
|
||||||
|
await indexer.updateIPLDStatusIPFSBlock(blockProgress.blockNumber, true);
|
||||||
|
}
|
||||||
|
|
||||||
await indexer.updateSyncStatusChainHead(blockProgress.blockHash, blockProgress.blockNumber, true);
|
await indexer.updateSyncStatusChainHead(blockProgress.blockHash, blockProgress.blockNumber, true);
|
||||||
|
|
||||||
dbTx.commitTransaction();
|
dbTx.commitTransaction();
|
||||||
|
@ -92,6 +92,18 @@ export class Database implements IPLDDatabaseInterface {
|
|||||||
return this._baseDatabase.updateIPLDStatusHooksBlock(repo, blockNumber, force);
|
return this._baseDatabase.updateIPLDStatusHooksBlock(repo, blockNumber, force);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async updateIPLDStatusCheckpointBlock (queryRunner: QueryRunner, blockNumber: number, force?: boolean): Promise<IpldStatus> {
|
||||||
|
const repo = queryRunner.manager.getRepository(IpldStatus);
|
||||||
|
|
||||||
|
return this._baseDatabase.updateIPLDStatusCheckpointBlock(repo, blockNumber, force);
|
||||||
|
}
|
||||||
|
|
||||||
|
async updateIPLDStatusIPFSBlock (queryRunner: QueryRunner, blockNumber: number, force?: boolean): Promise<IpldStatus> {
|
||||||
|
const repo = queryRunner.manager.getRepository(IpldStatus);
|
||||||
|
|
||||||
|
return this._baseDatabase.updateIPLDStatusIPFSBlock(repo, blockNumber, force);
|
||||||
|
}
|
||||||
|
|
||||||
async getContracts (): Promise<Contract[]> {
|
async getContracts (): Promise<Contract[]> {
|
||||||
const repo = this._conn.getRepository(Contract);
|
const repo = this._conn.getRepository(Contract);
|
||||||
|
|
||||||
|
@ -16,5 +16,5 @@ export class IpldStatus {
|
|||||||
latestCheckpointBlockNumber!: number;
|
latestCheckpointBlockNumber!: number;
|
||||||
|
|
||||||
@Column('integer', { nullable: true })
|
@Column('integer', { nullable: true })
|
||||||
latestIpfsBlockNumber!: number;
|
latestIPFSBlockNumber!: number;
|
||||||
}
|
}
|
||||||
|
@ -13,12 +13,8 @@ import {
|
|||||||
EventWatcherInterface,
|
EventWatcherInterface,
|
||||||
QUEUE_BLOCK_PROCESSING,
|
QUEUE_BLOCK_PROCESSING,
|
||||||
QUEUE_EVENT_PROCESSING,
|
QUEUE_EVENT_PROCESSING,
|
||||||
QUEUE_BLOCK_CHECKPOINT,
|
|
||||||
QUEUE_HOOKS,
|
|
||||||
QUEUE_IPFS,
|
|
||||||
UNKNOWN_EVENT_NAME,
|
UNKNOWN_EVENT_NAME,
|
||||||
UpstreamConfig,
|
UpstreamConfig
|
||||||
JOB_KIND_PRUNE
|
|
||||||
} from '@vulcanize/util';
|
} from '@vulcanize/util';
|
||||||
|
|
||||||
import { Indexer } from './indexer';
|
import { Indexer } from './indexer';
|
||||||
@ -60,8 +56,6 @@ export class EventWatcher implements EventWatcherInterface {
|
|||||||
|
|
||||||
await this.initBlockProcessingOnCompleteHandler();
|
await this.initBlockProcessingOnCompleteHandler();
|
||||||
await this.initEventProcessingOnCompleteHandler();
|
await this.initEventProcessingOnCompleteHandler();
|
||||||
await this.initHooksOnCompleteHandler();
|
|
||||||
await this.initBlockCheckpointOnCompleteHandler();
|
|
||||||
this._baseEventWatcher.startBlockProcessing();
|
this._baseEventWatcher.startBlockProcessing();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -71,7 +65,7 @@ export class EventWatcher implements EventWatcherInterface {
|
|||||||
|
|
||||||
async initBlockProcessingOnCompleteHandler (): Promise<void> {
|
async initBlockProcessingOnCompleteHandler (): Promise<void> {
|
||||||
this._jobQueue.onComplete(QUEUE_BLOCK_PROCESSING, async (job) => {
|
this._jobQueue.onComplete(QUEUE_BLOCK_PROCESSING, async (job) => {
|
||||||
const { id, data: { failed, request: { data: { kind } } } } = job;
|
const { id, data: { failed } } = job;
|
||||||
|
|
||||||
if (failed) {
|
if (failed) {
|
||||||
log(`Job ${id} for queue ${QUEUE_BLOCK_PROCESSING} failed`);
|
log(`Job ${id} for queue ${QUEUE_BLOCK_PROCESSING} failed`);
|
||||||
@ -79,11 +73,6 @@ export class EventWatcher implements EventWatcherInterface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
await this._baseEventWatcher.blockProcessingCompleteHandler(job);
|
await this._baseEventWatcher.blockProcessingCompleteHandler(job);
|
||||||
|
|
||||||
// If it's a pruning job: Create a hooks job.
|
|
||||||
if (kind === JOB_KIND_PRUNE) {
|
|
||||||
await this.createHooksJob();
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -117,27 +106,6 @@ export class EventWatcher implements EventWatcherInterface {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
async initHooksOnCompleteHandler (): Promise<void> {
|
|
||||||
this._jobQueue.onComplete(QUEUE_HOOKS, async (job) => {
|
|
||||||
const { data: { request: { data: { blockNumber, blockHash } } } } = job;
|
|
||||||
|
|
||||||
await this._indexer.updateIPLDStatusHooksBlock(blockNumber);
|
|
||||||
|
|
||||||
// Create a checkpoint job after completion of a hook job.
|
|
||||||
await this.createCheckpointJob(blockHash, blockNumber);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
async initBlockCheckpointOnCompleteHandler (): Promise<void> {
|
|
||||||
this._jobQueue.onComplete(QUEUE_BLOCK_CHECKPOINT, async (job) => {
|
|
||||||
const { data: { request: { data: { blockHash } } } } = job;
|
|
||||||
|
|
||||||
if (this._indexer.isIPFSConfigured()) {
|
|
||||||
await this.createIPFSPutJob(blockHash);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
async publishEventToSubscribers (dbEvent: Event, timeElapsedInSeconds: number): Promise<void> {
|
async publishEventToSubscribers (dbEvent: Event, timeElapsedInSeconds: number): Promise<void> {
|
||||||
if (dbEvent && dbEvent.eventName !== UNKNOWN_EVENT_NAME) {
|
if (dbEvent && dbEvent.eventName !== UNKNOWN_EVENT_NAME) {
|
||||||
const resultEvent = this._indexer.getResultEvent(dbEvent);
|
const resultEvent = this._indexer.getResultEvent(dbEvent);
|
||||||
@ -150,38 +118,4 @@ export class EventWatcher implements EventWatcherInterface {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async createHooksJob (): Promise<void> {
|
|
||||||
// Get the latest canonical block
|
|
||||||
const latestCanonicalBlock = await this._indexer.getLatestCanonicalBlock();
|
|
||||||
|
|
||||||
// Create a hooks job for parent block of latestCanonicalBlock because pruning for first block is skipped as it is assumed to be a canonical block.
|
|
||||||
await this._jobQueue.pushJob(
|
|
||||||
QUEUE_HOOKS,
|
|
||||||
{
|
|
||||||
blockHash: latestCanonicalBlock.parentHash,
|
|
||||||
blockNumber: latestCanonicalBlock.blockNumber - 1
|
|
||||||
}
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
async createCheckpointJob (blockHash: string, blockNumber: number): Promise<void> {
|
|
||||||
await this._jobQueue.pushJob(
|
|
||||||
QUEUE_BLOCK_CHECKPOINT,
|
|
||||||
{
|
|
||||||
blockHash,
|
|
||||||
blockNumber
|
|
||||||
}
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
async createIPFSPutJob (blockHash: string): Promise<void> {
|
|
||||||
const ipldBlocks = await this._indexer.getIPLDBlocksByHash(blockHash);
|
|
||||||
|
|
||||||
for (const ipldBlock of ipldBlocks) {
|
|
||||||
const data = this._indexer.getIPLDData(ipldBlock);
|
|
||||||
|
|
||||||
await this._jobQueue.pushJob(QUEUE_IPFS, { data });
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -267,9 +267,7 @@ export class Indexer implements IndexerInterface {
|
|||||||
return createStateCheckpoint(this, contractAddress, blockHash);
|
return createStateCheckpoint(this, contractAddress, blockHash);
|
||||||
}
|
}
|
||||||
|
|
||||||
async processCanonicalBlock (job: any): Promise<void> {
|
async processCanonicalBlock (blockHash: string): Promise<void> {
|
||||||
const { data: { blockHash } } = job;
|
|
||||||
|
|
||||||
console.time('time:indexer#processCanonicalBlock-finalize_auto_diffs');
|
console.time('time:indexer#processCanonicalBlock-finalize_auto_diffs');
|
||||||
|
|
||||||
// Finalize staged diff blocks if any.
|
// Finalize staged diff blocks if any.
|
||||||
@ -281,13 +279,11 @@ export class Indexer implements IndexerInterface {
|
|||||||
await createStateDiff(this, blockHash);
|
await createStateDiff(this, blockHash);
|
||||||
}
|
}
|
||||||
|
|
||||||
async processCheckpoint (job: any): Promise<void> {
|
async processCheckpoint (blockHash: string): Promise<void> {
|
||||||
// Return if checkpointInterval is <= 0.
|
// Return if checkpointInterval is <= 0.
|
||||||
const checkpointInterval = this._serverConfig.checkpointInterval;
|
const checkpointInterval = this._serverConfig.checkpointInterval;
|
||||||
if (checkpointInterval <= 0) return;
|
if (checkpointInterval <= 0) return;
|
||||||
|
|
||||||
const { data: { blockHash } } = job;
|
|
||||||
|
|
||||||
console.time('time:indexer#processCheckpoint-checkpoint');
|
console.time('time:indexer#processCheckpoint-checkpoint');
|
||||||
|
|
||||||
await this._baseIndexer.processCheckpoint(this, blockHash, checkpointInterval);
|
await this._baseIndexer.processCheckpoint(this, blockHash, checkpointInterval);
|
||||||
@ -854,6 +850,40 @@ export class Indexer implements IndexerInterface {
|
|||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async updateIPLDStatusCheckpointBlock (blockNumber: number, force?: boolean): Promise<IpldStatus> {
|
||||||
|
const dbTx = await this._db.createTransactionRunner();
|
||||||
|
let res;
|
||||||
|
|
||||||
|
try {
|
||||||
|
res = await this._db.updateIPLDStatusCheckpointBlock(dbTx, blockNumber, force);
|
||||||
|
await dbTx.commitTransaction();
|
||||||
|
} catch (error) {
|
||||||
|
await dbTx.rollbackTransaction();
|
||||||
|
throw error;
|
||||||
|
} finally {
|
||||||
|
await dbTx.release();
|
||||||
|
}
|
||||||
|
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
||||||
|
async updateIPLDStatusIPFSBlock (blockNumber: number, force?: boolean): Promise<IpldStatus> {
|
||||||
|
const dbTx = await this._db.createTransactionRunner();
|
||||||
|
let res;
|
||||||
|
|
||||||
|
try {
|
||||||
|
res = await this._db.updateIPLDStatusIPFSBlock(dbTx, blockNumber, force);
|
||||||
|
await dbTx.commitTransaction();
|
||||||
|
} catch (error) {
|
||||||
|
await dbTx.rollbackTransaction();
|
||||||
|
throw error;
|
||||||
|
} finally {
|
||||||
|
await dbTx.release();
|
||||||
|
}
|
||||||
|
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
||||||
async getLatestCanonicalBlock (): Promise<BlockProgress> {
|
async getLatestCanonicalBlock (): Promise<BlockProgress> {
|
||||||
const syncStatus = await this.getSyncStatus();
|
const syncStatus = await this.getSyncStatus();
|
||||||
assert(syncStatus);
|
assert(syncStatus);
|
||||||
|
@ -19,6 +19,7 @@ import {
|
|||||||
QUEUE_BLOCK_CHECKPOINT,
|
QUEUE_BLOCK_CHECKPOINT,
|
||||||
QUEUE_HOOKS,
|
QUEUE_HOOKS,
|
||||||
QUEUE_IPFS,
|
QUEUE_IPFS,
|
||||||
|
JOB_KIND_PRUNE,
|
||||||
JobQueueConfig,
|
JobQueueConfig,
|
||||||
DEFAULT_CONFIG_PATH,
|
DEFAULT_CONFIG_PATH,
|
||||||
initClients
|
initClients
|
||||||
@ -53,9 +54,16 @@ export class JobRunner {
|
|||||||
|
|
||||||
async subscribeBlockProcessingQueue (): Promise<void> {
|
async subscribeBlockProcessingQueue (): Promise<void> {
|
||||||
await this._jobQueue.subscribe(QUEUE_BLOCK_PROCESSING, async (job) => {
|
await this._jobQueue.subscribe(QUEUE_BLOCK_PROCESSING, async (job) => {
|
||||||
// TODO Call pre-block hook here (Directly or indirectly (Like done through indexer.processEvent for events)).
|
|
||||||
|
|
||||||
await this._baseJobRunner.processBlock(job);
|
await this._baseJobRunner.processBlock(job);
|
||||||
|
|
||||||
|
const { data: { kind } } = job;
|
||||||
|
|
||||||
|
// If it's a pruning job: Create a hooks job.
|
||||||
|
if (kind === JOB_KIND_PRUNE) {
|
||||||
|
await this.createHooksJob();
|
||||||
|
}
|
||||||
|
|
||||||
|
await this._jobQueue.markComplete(job);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -67,12 +75,17 @@ export class JobRunner {
|
|||||||
|
|
||||||
async subscribeHooksQueue (): Promise<void> {
|
async subscribeHooksQueue (): Promise<void> {
|
||||||
await this._jobQueue.subscribe(QUEUE_HOOKS, async (job) => {
|
await this._jobQueue.subscribe(QUEUE_HOOKS, async (job) => {
|
||||||
const { data: { blockNumber } } = job;
|
const { data: { blockHash, blockNumber } } = job;
|
||||||
|
|
||||||
|
// Get the current IPLD Status.
|
||||||
const ipldStatus = await this._indexer.getIPLDStatus();
|
const ipldStatus = await this._indexer.getIPLDStatus();
|
||||||
|
|
||||||
if (ipldStatus) {
|
if (ipldStatus) {
|
||||||
if (ipldStatus.latestHooksBlockNumber < (blockNumber - 1)) {
|
if (ipldStatus.latestHooksBlockNumber < (blockNumber - 1)) {
|
||||||
|
// Create hooks job for parent block.
|
||||||
|
const [parentBlock] = await this._indexer.getBlocksAtHeight(blockNumber - 1, false);
|
||||||
|
await this.createHooksJob(parentBlock.blockHash, parentBlock.blockNumber);
|
||||||
|
|
||||||
const message = `Hooks for blockNumber ${blockNumber - 1} not processed yet, aborting`;
|
const message = `Hooks for blockNumber ${blockNumber - 1} not processed yet, aborting`;
|
||||||
log(message);
|
log(message);
|
||||||
|
|
||||||
@ -86,7 +99,14 @@ export class JobRunner {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
await this._indexer.processCanonicalBlock(job);
|
// Process the hooks for the given block number.
|
||||||
|
await this._indexer.processCanonicalBlock(blockHash);
|
||||||
|
|
||||||
|
// Update the IPLD status.
|
||||||
|
await this._indexer.updateIPLDStatusHooksBlock(blockNumber);
|
||||||
|
|
||||||
|
// Create a checkpoint job after completion of a hook job.
|
||||||
|
await this.createCheckpointJob(blockHash, blockNumber);
|
||||||
|
|
||||||
await this._jobQueue.markComplete(job);
|
await this._jobQueue.markComplete(job);
|
||||||
});
|
});
|
||||||
@ -94,7 +114,41 @@ export class JobRunner {
|
|||||||
|
|
||||||
async subscribeBlockCheckpointQueue (): Promise<void> {
|
async subscribeBlockCheckpointQueue (): Promise<void> {
|
||||||
await this._jobQueue.subscribe(QUEUE_BLOCK_CHECKPOINT, async (job) => {
|
await this._jobQueue.subscribe(QUEUE_BLOCK_CHECKPOINT, async (job) => {
|
||||||
await this._indexer.processCheckpoint(job);
|
const { data: { blockHash, blockNumber } } = job;
|
||||||
|
|
||||||
|
// Get the current IPLD Status.
|
||||||
|
const ipldStatus = await this._indexer.getIPLDStatus();
|
||||||
|
assert(ipldStatus);
|
||||||
|
|
||||||
|
if (ipldStatus.latestCheckpointBlockNumber >= 0) {
|
||||||
|
if (ipldStatus.latestCheckpointBlockNumber < (blockNumber - 1)) {
|
||||||
|
// Create a checkpoint job for parent block.
|
||||||
|
const [parentBlock] = await this._indexer.getBlocksAtHeight(blockNumber - 1, false);
|
||||||
|
await this.createCheckpointJob(parentBlock.blockHash, parentBlock.blockNumber);
|
||||||
|
|
||||||
|
const message = `Checkpoints for blockNumber ${blockNumber - 1} not processed yet, aborting`;
|
||||||
|
log(message);
|
||||||
|
|
||||||
|
throw new Error(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (ipldStatus.latestCheckpointBlockNumber > (blockNumber - 1)) {
|
||||||
|
log(`Checkpoints for blockNumber ${blockNumber} already processed`);
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Process checkpoints for the given block.
|
||||||
|
await this._indexer.processCheckpoint(blockHash);
|
||||||
|
|
||||||
|
// Update the IPLD status.
|
||||||
|
await this._indexer.updateIPLDStatusCheckpointBlock(blockNumber);
|
||||||
|
|
||||||
|
// Create an IPFS job after completion of a checkpoint job.
|
||||||
|
if (this._indexer.isIPFSConfigured()) {
|
||||||
|
await this.createIPFSPutJob(blockHash, blockNumber);
|
||||||
|
}
|
||||||
|
|
||||||
await this._jobQueue.markComplete(job);
|
await this._jobQueue.markComplete(job);
|
||||||
});
|
});
|
||||||
@ -102,13 +156,84 @@ export class JobRunner {
|
|||||||
|
|
||||||
async subscribeIPFSQueue (): Promise<void> {
|
async subscribeIPFSQueue (): Promise<void> {
|
||||||
await this._jobQueue.subscribe(QUEUE_IPFS, async (job) => {
|
await this._jobQueue.subscribe(QUEUE_IPFS, async (job) => {
|
||||||
const { data: { data } } = job;
|
const { data: { blockHash, blockNumber } } = job;
|
||||||
|
|
||||||
await this._indexer.pushToIPFS(data);
|
const ipldStatus = await this._indexer.getIPLDStatus();
|
||||||
|
assert(ipldStatus);
|
||||||
|
|
||||||
|
if (ipldStatus.latestIPFSBlockNumber >= 0) {
|
||||||
|
if (ipldStatus.latestIPFSBlockNumber < (blockNumber - 1)) {
|
||||||
|
// Create a IPFS job for parent block.
|
||||||
|
const [parentBlock] = await this._indexer.getBlocksAtHeight(blockNumber - 1, false);
|
||||||
|
await this.createIPFSPutJob(parentBlock.blockHash, parentBlock.blockNumber);
|
||||||
|
|
||||||
|
const message = `IPFS for blockNumber ${blockNumber - 1} not processed yet, aborting`;
|
||||||
|
log(message);
|
||||||
|
|
||||||
|
throw new Error(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (ipldStatus.latestIPFSBlockNumber > (blockNumber - 1)) {
|
||||||
|
log(`IPFS for blockNumber ${blockNumber} already processed`);
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get IPLDBlocks for the given blocHash.
|
||||||
|
const ipldBlocks = await this._indexer.getIPLDBlocksByHash(blockHash);
|
||||||
|
|
||||||
|
// Push all the IPLDBlocks to IPFS.
|
||||||
|
for (const ipldBlock of ipldBlocks) {
|
||||||
|
const data = this._indexer.getIPLDData(ipldBlock);
|
||||||
|
await this._indexer.pushToIPFS(data);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update the IPLD status.
|
||||||
|
await this._indexer.updateIPLDStatusIPFSBlock(blockNumber);
|
||||||
|
|
||||||
await this._jobQueue.markComplete(job);
|
await this._jobQueue.markComplete(job);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async createHooksJob (blockHash?: string, blockNumber?: number): Promise<void> {
|
||||||
|
if (!blockNumber || !blockHash) {
|
||||||
|
// Get the latest canonical block
|
||||||
|
const latestCanonicalBlock = await this._indexer.getLatestCanonicalBlock();
|
||||||
|
|
||||||
|
// Create a hooks job for parent block of latestCanonicalBlock because pruning for first block is skipped as it is assumed to be a canonical block.
|
||||||
|
blockHash = latestCanonicalBlock.parentHash;
|
||||||
|
blockNumber = latestCanonicalBlock.blockNumber - 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
await this._jobQueue.pushJob(
|
||||||
|
QUEUE_HOOKS,
|
||||||
|
{
|
||||||
|
blockHash,
|
||||||
|
blockNumber
|
||||||
|
}
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
async createCheckpointJob (blockHash: string, blockNumber: number): Promise<void> {
|
||||||
|
await this._jobQueue.pushJob(
|
||||||
|
QUEUE_BLOCK_CHECKPOINT,
|
||||||
|
{
|
||||||
|
blockHash,
|
||||||
|
blockNumber
|
||||||
|
}
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
async createIPFSPutJob (blockHash: string, blockNumber: number): Promise<void> {
|
||||||
|
await this._jobQueue.pushJob(
|
||||||
|
QUEUE_IPFS,
|
||||||
|
{
|
||||||
|
blockHash,
|
||||||
|
blockNumber
|
||||||
|
}
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
export const main = async (): Promise<any> => {
|
export const main = async (): Promise<any> => {
|
||||||
|
@ -91,8 +91,16 @@ export const handler = async (argv: any): Promise<void> => {
|
|||||||
await indexer.updateSyncStatusCanonicalBlock(blockProgress.blockHash, blockProgress.blockNumber, true);
|
await indexer.updateSyncStatusCanonicalBlock(blockProgress.blockHash, blockProgress.blockNumber, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (ipldStatus.latestProcessedBlockNumber > blockProgress.blockNumber) {
|
if (ipldStatus.latestHooksBlockNumber > blockProgress.blockNumber) {
|
||||||
await indexer.updateHookStatusProcessedBlock(blockProgress.blockNumber, true);
|
await indexer.updateIPLDStatusHooksBlock(blockProgress.blockNumber, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (ipldStatus.latestCheckpointBlockNumber > blockProgress.blockNumber) {
|
||||||
|
await indexer.updateIPLDStatusCheckpointBlock(blockProgress.blockNumber, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (ipldStatus.latestIPFSBlockNumber > blockProgress.blockNumber) {
|
||||||
|
await indexer.updateIPLDStatusIPFSBlock(blockProgress.blockNumber, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
await indexer.updateSyncStatusChainHead(blockProgress.blockHash, blockProgress.blockNumber, true);
|
await indexer.updateSyncStatusChainHead(blockProgress.blockHash, blockProgress.blockNumber, true);
|
||||||
|
@ -126,6 +126,18 @@ export class Database implements IPLDDatabaseInterface {
|
|||||||
return this._baseDatabase.updateIPLDStatusHooksBlock(repo, blockNumber, force);
|
return this._baseDatabase.updateIPLDStatusHooksBlock(repo, blockNumber, force);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async updateIPLDStatusCheckpointBlock (queryRunner: QueryRunner, blockNumber: number, force?: boolean): Promise<IpldStatus> {
|
||||||
|
const repo = queryRunner.manager.getRepository(IpldStatus);
|
||||||
|
|
||||||
|
return this._baseDatabase.updateIPLDStatusCheckpointBlock(repo, blockNumber, force);
|
||||||
|
}
|
||||||
|
|
||||||
|
async updateIPLDStatusIPFSBlock (queryRunner: QueryRunner, blockNumber: number, force?: boolean): Promise<IpldStatus> {
|
||||||
|
const repo = queryRunner.manager.getRepository(IpldStatus);
|
||||||
|
|
||||||
|
return this._baseDatabase.updateIPLDStatusIPFSBlock(repo, blockNumber, force);
|
||||||
|
}
|
||||||
|
|
||||||
async getContracts (): Promise<Contract[]> {
|
async getContracts (): Promise<Contract[]> {
|
||||||
const repo = this._conn.getRepository(Contract);
|
const repo = this._conn.getRepository(Contract);
|
||||||
|
|
||||||
|
@ -16,5 +16,5 @@ export class IpldStatus {
|
|||||||
latestCheckpointBlockNumber!: number;
|
latestCheckpointBlockNumber!: number;
|
||||||
|
|
||||||
@Column('integer', { nullable: true })
|
@Column('integer', { nullable: true })
|
||||||
latestIpfsBlockNumber!: number;
|
latestIPFSBlockNumber!: number;
|
||||||
}
|
}
|
||||||
|
@ -13,12 +13,8 @@ import {
|
|||||||
EventWatcherInterface,
|
EventWatcherInterface,
|
||||||
QUEUE_BLOCK_PROCESSING,
|
QUEUE_BLOCK_PROCESSING,
|
||||||
QUEUE_EVENT_PROCESSING,
|
QUEUE_EVENT_PROCESSING,
|
||||||
QUEUE_BLOCK_CHECKPOINT,
|
|
||||||
QUEUE_HOOKS,
|
|
||||||
QUEUE_IPFS,
|
|
||||||
UNKNOWN_EVENT_NAME,
|
UNKNOWN_EVENT_NAME,
|
||||||
UpstreamConfig,
|
UpstreamConfig
|
||||||
JOB_KIND_PRUNE
|
|
||||||
} from '@vulcanize/util';
|
} from '@vulcanize/util';
|
||||||
|
|
||||||
import { Indexer } from './indexer';
|
import { Indexer } from './indexer';
|
||||||
@ -60,8 +56,6 @@ export class EventWatcher implements EventWatcherInterface {
|
|||||||
|
|
||||||
await this.initBlockProcessingOnCompleteHandler();
|
await this.initBlockProcessingOnCompleteHandler();
|
||||||
await this.initEventProcessingOnCompleteHandler();
|
await this.initEventProcessingOnCompleteHandler();
|
||||||
await this.initHooksOnCompleteHandler();
|
|
||||||
await this.initBlockCheckpointOnCompleteHandler();
|
|
||||||
this._baseEventWatcher.startBlockProcessing();
|
this._baseEventWatcher.startBlockProcessing();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -71,7 +65,7 @@ export class EventWatcher implements EventWatcherInterface {
|
|||||||
|
|
||||||
async initBlockProcessingOnCompleteHandler (): Promise<void> {
|
async initBlockProcessingOnCompleteHandler (): Promise<void> {
|
||||||
this._jobQueue.onComplete(QUEUE_BLOCK_PROCESSING, async (job) => {
|
this._jobQueue.onComplete(QUEUE_BLOCK_PROCESSING, async (job) => {
|
||||||
const { id, data: { failed, request: { data: { kind } } } } = job;
|
const { id, data: { failed } } = job;
|
||||||
|
|
||||||
if (failed) {
|
if (failed) {
|
||||||
log(`Job ${id} for queue ${QUEUE_BLOCK_PROCESSING} failed`);
|
log(`Job ${id} for queue ${QUEUE_BLOCK_PROCESSING} failed`);
|
||||||
@ -79,11 +73,6 @@ export class EventWatcher implements EventWatcherInterface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
await this._baseEventWatcher.blockProcessingCompleteHandler(job);
|
await this._baseEventWatcher.blockProcessingCompleteHandler(job);
|
||||||
|
|
||||||
// If it's a pruning job: Create a hooks job.
|
|
||||||
if (kind === JOB_KIND_PRUNE) {
|
|
||||||
await this.createHooksJob();
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -117,27 +106,6 @@ export class EventWatcher implements EventWatcherInterface {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
async initHooksOnCompleteHandler (): Promise<void> {
|
|
||||||
this._jobQueue.onComplete(QUEUE_HOOKS, async (job) => {
|
|
||||||
const { data: { request: { data: { blockNumber, blockHash } } } } = job;
|
|
||||||
|
|
||||||
await this._indexer.updateIPLDStatusHooksBlock(blockNumber);
|
|
||||||
|
|
||||||
// Create a checkpoint job after completion of a hook job.
|
|
||||||
await this.createCheckpointJob(blockHash, blockNumber);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
async initBlockCheckpointOnCompleteHandler (): Promise<void> {
|
|
||||||
this._jobQueue.onComplete(QUEUE_BLOCK_CHECKPOINT, async (job) => {
|
|
||||||
const { data: { request: { data: { blockHash } } } } = job;
|
|
||||||
|
|
||||||
if (this._indexer.isIPFSConfigured()) {
|
|
||||||
await this.createIPFSPutJob(blockHash);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
async publishEventToSubscribers (dbEvent: Event, timeElapsedInSeconds: number): Promise<void> {
|
async publishEventToSubscribers (dbEvent: Event, timeElapsedInSeconds: number): Promise<void> {
|
||||||
if (dbEvent && dbEvent.eventName !== UNKNOWN_EVENT_NAME) {
|
if (dbEvent && dbEvent.eventName !== UNKNOWN_EVENT_NAME) {
|
||||||
const resultEvent = this._indexer.getResultEvent(dbEvent);
|
const resultEvent = this._indexer.getResultEvent(dbEvent);
|
||||||
@ -150,38 +118,4 @@ export class EventWatcher implements EventWatcherInterface {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async createHooksJob (): Promise<void> {
|
|
||||||
// Get the latest canonical block
|
|
||||||
const latestCanonicalBlock = await this._indexer.getLatestCanonicalBlock();
|
|
||||||
|
|
||||||
// Create a hooks job for parent block of latestCanonicalBlock because pruning for first block is skipped as it is assumed to be a canonical block.
|
|
||||||
await this._jobQueue.pushJob(
|
|
||||||
QUEUE_HOOKS,
|
|
||||||
{
|
|
||||||
blockHash: latestCanonicalBlock.parentHash,
|
|
||||||
blockNumber: latestCanonicalBlock.blockNumber - 1
|
|
||||||
}
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
async createCheckpointJob (blockHash: string, blockNumber: number): Promise<void> {
|
|
||||||
await this._jobQueue.pushJob(
|
|
||||||
QUEUE_BLOCK_CHECKPOINT,
|
|
||||||
{
|
|
||||||
blockHash,
|
|
||||||
blockNumber
|
|
||||||
}
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
async createIPFSPutJob (blockHash: string): Promise<void> {
|
|
||||||
const ipldBlocks = await this._indexer.getIPLDBlocksByHash(blockHash);
|
|
||||||
|
|
||||||
for (const ipldBlock of ipldBlocks) {
|
|
||||||
const data = this._indexer.getIPLDData(ipldBlock);
|
|
||||||
|
|
||||||
await this._jobQueue.pushJob(QUEUE_IPFS, { data });
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -266,9 +266,7 @@ export class Indexer implements IndexerInterface {
|
|||||||
return createStateCheckpoint(this, contractAddress, blockHash);
|
return createStateCheckpoint(this, contractAddress, blockHash);
|
||||||
}
|
}
|
||||||
|
|
||||||
async processCanonicalBlock (job: any): Promise<void> {
|
async processCanonicalBlock (blockHash: string): Promise<void> {
|
||||||
const { data: { blockHash } } = job;
|
|
||||||
|
|
||||||
// Finalize staged diff blocks if any.
|
// Finalize staged diff blocks if any.
|
||||||
await this._baseIndexer.finalizeDiffStaged(blockHash);
|
await this._baseIndexer.finalizeDiffStaged(blockHash);
|
||||||
|
|
||||||
@ -276,12 +274,11 @@ export class Indexer implements IndexerInterface {
|
|||||||
await createStateDiff(this, blockHash);
|
await createStateDiff(this, blockHash);
|
||||||
}
|
}
|
||||||
|
|
||||||
async processCheckpoint (job: any): Promise<void> {
|
async processCheckpoint (blockHash: string): Promise<void> {
|
||||||
// Return if checkpointInterval is <= 0.
|
// Return if checkpointInterval is <= 0.
|
||||||
const checkpointInterval = this._serverConfig.checkpointInterval;
|
const checkpointInterval = this._serverConfig.checkpointInterval;
|
||||||
if (checkpointInterval <= 0) return;
|
if (checkpointInterval <= 0) return;
|
||||||
|
|
||||||
const { data: { blockHash } } = job;
|
|
||||||
await this._baseIndexer.processCheckpoint(this, blockHash, checkpointInterval);
|
await this._baseIndexer.processCheckpoint(this, blockHash, checkpointInterval);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -430,6 +427,40 @@ export class Indexer implements IndexerInterface {
|
|||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async updateIPLDStatusCheckpointBlock (blockNumber: number, force?: boolean): Promise<IpldStatus> {
|
||||||
|
const dbTx = await this._db.createTransactionRunner();
|
||||||
|
let res;
|
||||||
|
|
||||||
|
try {
|
||||||
|
res = await this._db.updateIPLDStatusCheckpointBlock(dbTx, blockNumber, force);
|
||||||
|
await dbTx.commitTransaction();
|
||||||
|
} catch (error) {
|
||||||
|
await dbTx.rollbackTransaction();
|
||||||
|
throw error;
|
||||||
|
} finally {
|
||||||
|
await dbTx.release();
|
||||||
|
}
|
||||||
|
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
||||||
|
async updateIPLDStatusIPFSBlock (blockNumber: number, force?: boolean): Promise<IpldStatus> {
|
||||||
|
const dbTx = await this._db.createTransactionRunner();
|
||||||
|
let res;
|
||||||
|
|
||||||
|
try {
|
||||||
|
res = await this._db.updateIPLDStatusIPFSBlock(dbTx, blockNumber, force);
|
||||||
|
await dbTx.commitTransaction();
|
||||||
|
} catch (error) {
|
||||||
|
await dbTx.rollbackTransaction();
|
||||||
|
throw error;
|
||||||
|
} finally {
|
||||||
|
await dbTx.release();
|
||||||
|
}
|
||||||
|
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
||||||
async getLatestCanonicalBlock (): Promise<BlockProgress> {
|
async getLatestCanonicalBlock (): Promise<BlockProgress> {
|
||||||
const syncStatus = await this.getSyncStatus();
|
const syncStatus = await this.getSyncStatus();
|
||||||
assert(syncStatus);
|
assert(syncStatus);
|
||||||
|
@ -19,6 +19,7 @@ import {
|
|||||||
QUEUE_BLOCK_CHECKPOINT,
|
QUEUE_BLOCK_CHECKPOINT,
|
||||||
QUEUE_HOOKS,
|
QUEUE_HOOKS,
|
||||||
QUEUE_IPFS,
|
QUEUE_IPFS,
|
||||||
|
JOB_KIND_PRUNE,
|
||||||
JobQueueConfig,
|
JobQueueConfig,
|
||||||
DEFAULT_CONFIG_PATH,
|
DEFAULT_CONFIG_PATH,
|
||||||
initClients
|
initClients
|
||||||
@ -53,9 +54,16 @@ export class JobRunner {
|
|||||||
|
|
||||||
async subscribeBlockProcessingQueue (): Promise<void> {
|
async subscribeBlockProcessingQueue (): Promise<void> {
|
||||||
await this._jobQueue.subscribe(QUEUE_BLOCK_PROCESSING, async (job) => {
|
await this._jobQueue.subscribe(QUEUE_BLOCK_PROCESSING, async (job) => {
|
||||||
// TODO Call pre-block hook here (Directly or indirectly (Like done through indexer.processEvent for events)).
|
|
||||||
|
|
||||||
await this._baseJobRunner.processBlock(job);
|
await this._baseJobRunner.processBlock(job);
|
||||||
|
|
||||||
|
const { data: { kind } } = job;
|
||||||
|
|
||||||
|
// If it's a pruning job: Create a hooks job.
|
||||||
|
if (kind === JOB_KIND_PRUNE) {
|
||||||
|
await this.createHooksJob();
|
||||||
|
}
|
||||||
|
|
||||||
|
await this._jobQueue.markComplete(job);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -67,12 +75,17 @@ export class JobRunner {
|
|||||||
|
|
||||||
async subscribeHooksQueue (): Promise<void> {
|
async subscribeHooksQueue (): Promise<void> {
|
||||||
await this._jobQueue.subscribe(QUEUE_HOOKS, async (job) => {
|
await this._jobQueue.subscribe(QUEUE_HOOKS, async (job) => {
|
||||||
const { data: { blockNumber } } = job;
|
const { data: { blockHash, blockNumber } } = job;
|
||||||
|
|
||||||
|
// Get the current IPLD Status.
|
||||||
const ipldStatus = await this._indexer.getIPLDStatus();
|
const ipldStatus = await this._indexer.getIPLDStatus();
|
||||||
|
|
||||||
if (ipldStatus) {
|
if (ipldStatus) {
|
||||||
if (ipldStatus.latestHooksBlockNumber < (blockNumber - 1)) {
|
if (ipldStatus.latestHooksBlockNumber < (blockNumber - 1)) {
|
||||||
|
// Create hooks job for parent block.
|
||||||
|
const [parentBlock] = await this._indexer.getBlocksAtHeight(blockNumber - 1, false);
|
||||||
|
await this.createHooksJob(parentBlock.blockHash, parentBlock.blockNumber);
|
||||||
|
|
||||||
const message = `Hooks for blockNumber ${blockNumber - 1} not processed yet, aborting`;
|
const message = `Hooks for blockNumber ${blockNumber - 1} not processed yet, aborting`;
|
||||||
log(message);
|
log(message);
|
||||||
|
|
||||||
@ -86,7 +99,14 @@ export class JobRunner {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
await this._indexer.processCanonicalBlock(job);
|
// Process the hooks for the given block number.
|
||||||
|
await this._indexer.processCanonicalBlock(blockHash);
|
||||||
|
|
||||||
|
// Update the IPLD status.
|
||||||
|
await this._indexer.updateIPLDStatusHooksBlock(blockNumber);
|
||||||
|
|
||||||
|
// Create a checkpoint job after completion of a hook job.
|
||||||
|
await this.createCheckpointJob(blockHash, blockNumber);
|
||||||
|
|
||||||
await this._jobQueue.markComplete(job);
|
await this._jobQueue.markComplete(job);
|
||||||
});
|
});
|
||||||
@ -94,7 +114,41 @@ export class JobRunner {
|
|||||||
|
|
||||||
async subscribeBlockCheckpointQueue (): Promise<void> {
|
async subscribeBlockCheckpointQueue (): Promise<void> {
|
||||||
await this._jobQueue.subscribe(QUEUE_BLOCK_CHECKPOINT, async (job) => {
|
await this._jobQueue.subscribe(QUEUE_BLOCK_CHECKPOINT, async (job) => {
|
||||||
await this._indexer.processCheckpoint(job);
|
const { data: { blockHash, blockNumber } } = job;
|
||||||
|
|
||||||
|
// Get the current IPLD Status.
|
||||||
|
const ipldStatus = await this._indexer.getIPLDStatus();
|
||||||
|
assert(ipldStatus);
|
||||||
|
|
||||||
|
if (ipldStatus.latestCheckpointBlockNumber >= 0) {
|
||||||
|
if (ipldStatus.latestCheckpointBlockNumber < (blockNumber - 1)) {
|
||||||
|
// Create a checkpoint job for parent block.
|
||||||
|
const [parentBlock] = await this._indexer.getBlocksAtHeight(blockNumber - 1, false);
|
||||||
|
await this.createCheckpointJob(parentBlock.blockHash, parentBlock.blockNumber);
|
||||||
|
|
||||||
|
const message = `Checkpoints for blockNumber ${blockNumber - 1} not processed yet, aborting`;
|
||||||
|
log(message);
|
||||||
|
|
||||||
|
throw new Error(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (ipldStatus.latestCheckpointBlockNumber > (blockNumber - 1)) {
|
||||||
|
log(`Checkpoints for blockNumber ${blockNumber} already processed`);
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Process checkpoints for the given block.
|
||||||
|
await this._indexer.processCheckpoint(blockHash);
|
||||||
|
|
||||||
|
// Update the IPLD status.
|
||||||
|
await this._indexer.updateIPLDStatusCheckpointBlock(blockNumber);
|
||||||
|
|
||||||
|
// Create an IPFS job after completion of a checkpoint job.
|
||||||
|
if (this._indexer.isIPFSConfigured()) {
|
||||||
|
await this.createIPFSPutJob(blockHash, blockNumber);
|
||||||
|
}
|
||||||
|
|
||||||
await this._jobQueue.markComplete(job);
|
await this._jobQueue.markComplete(job);
|
||||||
});
|
});
|
||||||
@ -102,13 +156,84 @@ export class JobRunner {
|
|||||||
|
|
||||||
async subscribeIPFSQueue (): Promise<void> {
|
async subscribeIPFSQueue (): Promise<void> {
|
||||||
await this._jobQueue.subscribe(QUEUE_IPFS, async (job) => {
|
await this._jobQueue.subscribe(QUEUE_IPFS, async (job) => {
|
||||||
const { data: { data } } = job;
|
const { data: { blockHash, blockNumber } } = job;
|
||||||
|
|
||||||
await this._indexer.pushToIPFS(data);
|
const ipldStatus = await this._indexer.getIPLDStatus();
|
||||||
|
assert(ipldStatus);
|
||||||
|
|
||||||
|
if (ipldStatus.latestIPFSBlockNumber >= 0) {
|
||||||
|
if (ipldStatus.latestIPFSBlockNumber < (blockNumber - 1)) {
|
||||||
|
// Create a IPFS job for parent block.
|
||||||
|
const [parentBlock] = await this._indexer.getBlocksAtHeight(blockNumber - 1, false);
|
||||||
|
await this.createIPFSPutJob(parentBlock.blockHash, parentBlock.blockNumber);
|
||||||
|
|
||||||
|
const message = `IPFS for blockNumber ${blockNumber - 1} not processed yet, aborting`;
|
||||||
|
log(message);
|
||||||
|
|
||||||
|
throw new Error(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (ipldStatus.latestIPFSBlockNumber > (blockNumber - 1)) {
|
||||||
|
log(`IPFS for blockNumber ${blockNumber} already processed`);
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get IPLDBlocks for the given blocHash.
|
||||||
|
const ipldBlocks = await this._indexer.getIPLDBlocksByHash(blockHash);
|
||||||
|
|
||||||
|
// Push all the IPLDBlocks to IPFS.
|
||||||
|
for (const ipldBlock of ipldBlocks) {
|
||||||
|
const data = this._indexer.getIPLDData(ipldBlock);
|
||||||
|
await this._indexer.pushToIPFS(data);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update the IPLD status.
|
||||||
|
await this._indexer.updateIPLDStatusIPFSBlock(blockNumber);
|
||||||
|
|
||||||
await this._jobQueue.markComplete(job);
|
await this._jobQueue.markComplete(job);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async createHooksJob (blockHash?: string, blockNumber?: number): Promise<void> {
|
||||||
|
if (!blockNumber || !blockHash) {
|
||||||
|
// Get the latest canonical block
|
||||||
|
const latestCanonicalBlock = await this._indexer.getLatestCanonicalBlock();
|
||||||
|
|
||||||
|
// Create a hooks job for parent block of latestCanonicalBlock because pruning for first block is skipped as it is assumed to be a canonical block.
|
||||||
|
blockHash = latestCanonicalBlock.parentHash;
|
||||||
|
blockNumber = latestCanonicalBlock.blockNumber - 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
await this._jobQueue.pushJob(
|
||||||
|
QUEUE_HOOKS,
|
||||||
|
{
|
||||||
|
blockHash,
|
||||||
|
blockNumber
|
||||||
|
}
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
async createCheckpointJob (blockHash: string, blockNumber: number): Promise<void> {
|
||||||
|
await this._jobQueue.pushJob(
|
||||||
|
QUEUE_BLOCK_CHECKPOINT,
|
||||||
|
{
|
||||||
|
blockHash,
|
||||||
|
blockNumber
|
||||||
|
}
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
async createIPFSPutJob (blockHash: string, blockNumber: number): Promise<void> {
|
||||||
|
await this._jobQueue.pushJob(
|
||||||
|
QUEUE_IPFS,
|
||||||
|
{
|
||||||
|
blockHash,
|
||||||
|
blockNumber
|
||||||
|
}
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
export const main = async (): Promise<any> => {
|
export const main = async (): Promise<any> => {
|
||||||
|
@ -49,6 +49,8 @@ export class JobRunner {
|
|||||||
async subscribeBlockProcessingQueue (): Promise<void> {
|
async subscribeBlockProcessingQueue (): Promise<void> {
|
||||||
await this._jobQueue.subscribe(QUEUE_BLOCK_PROCESSING, async (job) => {
|
await this._jobQueue.subscribe(QUEUE_BLOCK_PROCESSING, async (job) => {
|
||||||
await this._baseJobRunner.processBlock(job);
|
await this._baseJobRunner.processBlock(job);
|
||||||
|
|
||||||
|
await this._jobQueue.markComplete(job);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -46,6 +46,8 @@ export class JobRunner {
|
|||||||
async subscribeBlockProcessingQueue (): Promise<void> {
|
async subscribeBlockProcessingQueue (): Promise<void> {
|
||||||
await this._jobQueue.subscribe(QUEUE_BLOCK_PROCESSING, async (job) => {
|
await this._jobQueue.subscribe(QUEUE_BLOCK_PROCESSING, async (job) => {
|
||||||
await this._baseJobRunner.processBlock(job);
|
await this._baseJobRunner.processBlock(job);
|
||||||
|
|
||||||
|
await this._jobQueue.markComplete(job);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -46,14 +46,6 @@ export const fillBlocks = async (
|
|||||||
await eventWatcher.initBlockProcessingOnCompleteHandler();
|
await eventWatcher.initBlockProcessingOnCompleteHandler();
|
||||||
await eventWatcher.initEventProcessingOnCompleteHandler();
|
await eventWatcher.initEventProcessingOnCompleteHandler();
|
||||||
|
|
||||||
if (eventWatcher.initHooksOnCompleteHandler) {
|
|
||||||
await eventWatcher.initHooksOnCompleteHandler();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (eventWatcher.initBlockCheckpointOnCompleteHandler) {
|
|
||||||
await eventWatcher.initBlockCheckpointOnCompleteHandler();
|
|
||||||
}
|
|
||||||
|
|
||||||
const numberOfBlocks = endBlock - startBlock + 1;
|
const numberOfBlocks = endBlock - startBlock + 1;
|
||||||
|
|
||||||
processBlockByNumber(jobQueue, indexer, blockDelayInMilliSecs, startBlock);
|
processBlockByNumber(jobQueue, indexer, blockDelayInMilliSecs, startBlock);
|
||||||
|
@ -3,6 +3,7 @@
|
|||||||
//
|
//
|
||||||
|
|
||||||
import { FindConditions, MoreThan, Repository } from 'typeorm';
|
import { FindConditions, MoreThan, Repository } from 'typeorm';
|
||||||
|
import assert from 'assert';
|
||||||
|
|
||||||
import { IPLDBlockInterface, IpldStatusInterface, StateKind } from './types';
|
import { IPLDBlockInterface, IpldStatusInterface, StateKind } from './types';
|
||||||
import { Database } from './database';
|
import { Database } from './database';
|
||||||
@ -150,7 +151,9 @@ export class IPLDDatabase extends Database {
|
|||||||
|
|
||||||
if (!entity) {
|
if (!entity) {
|
||||||
entity = repo.create({
|
entity = repo.create({
|
||||||
latestHooksBlockNumber: blockNumber
|
latestHooksBlockNumber: blockNumber,
|
||||||
|
latestCheckpointBlockNumber: -1,
|
||||||
|
latestIPFSBlockNumber: -1
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -160,4 +163,26 @@ export class IPLDDatabase extends Database {
|
|||||||
|
|
||||||
return repo.save(entity);
|
return repo.save(entity);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async updateIPLDStatusCheckpointBlock (repo: Repository<IpldStatusInterface>, blockNumber: number, force?: boolean): Promise<IpldStatusInterface> {
|
||||||
|
const entity = await repo.findOne();
|
||||||
|
assert(entity);
|
||||||
|
|
||||||
|
if (force || blockNumber > entity.latestCheckpointBlockNumber) {
|
||||||
|
entity.latestCheckpointBlockNumber = blockNumber;
|
||||||
|
}
|
||||||
|
|
||||||
|
return repo.save(entity);
|
||||||
|
}
|
||||||
|
|
||||||
|
async updateIPLDStatusIPFSBlock (repo: Repository<IpldStatusInterface>, blockNumber: number, force?: boolean): Promise<IpldStatusInterface> {
|
||||||
|
const entity = await repo.findOne();
|
||||||
|
assert(entity);
|
||||||
|
|
||||||
|
if (force || blockNumber > entity.latestIPFSBlockNumber) {
|
||||||
|
entity.latestIPFSBlockNumber = blockNumber;
|
||||||
|
}
|
||||||
|
|
||||||
|
return repo.save(entity);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -71,7 +71,7 @@ export class IPLDIndexer extends Indexer {
|
|||||||
async getLatestHooksProcessedBlock (): Promise<BlockProgressInterface> {
|
async getLatestHooksProcessedBlock (): Promise<BlockProgressInterface> {
|
||||||
// Get current hookStatus.
|
// Get current hookStatus.
|
||||||
const ipldStatus = await this._ipldDb.getIPLDStatus();
|
const ipldStatus = await this._ipldDb.getIPLDStatus();
|
||||||
assert(ipldStatus, 'ipld status not found');
|
assert(ipldStatus, 'IPLD status not found');
|
||||||
|
|
||||||
// Get all the blocks at height hookStatus.latestProcessedBlockNumber.
|
// Get all the blocks at height hookStatus.latestProcessedBlockNumber.
|
||||||
const blocksAtHeight = await this.getBlocksAtHeight(ipldStatus.latestHooksBlockNumber, false);
|
const blocksAtHeight = await this.getBlocksAtHeight(ipldStatus.latestHooksBlockNumber, false);
|
||||||
|
@ -58,8 +58,6 @@ export class JobRunner {
|
|||||||
log(`Invalid Job kind ${kind} in QUEUE_BLOCK_PROCESSING.`);
|
log(`Invalid Job kind ${kind} in QUEUE_BLOCK_PROCESSING.`);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
await this._jobQueue.markComplete(job);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async processEvent (job: any): Promise<EventInterface | void> {
|
async processEvent (job: any): Promise<EventInterface | void> {
|
||||||
|
@ -42,7 +42,7 @@ export interface IpldStatusInterface {
|
|||||||
id: number;
|
id: number;
|
||||||
latestHooksBlockNumber: number;
|
latestHooksBlockNumber: number;
|
||||||
latestCheckpointBlockNumber: number;
|
latestCheckpointBlockNumber: number;
|
||||||
latestIpfsBlockNumber: number
|
latestIPFSBlockNumber: number
|
||||||
}
|
}
|
||||||
|
|
||||||
export interface EventInterface {
|
export interface EventInterface {
|
||||||
@ -107,8 +107,6 @@ export interface EventWatcherInterface {
|
|||||||
getBlockProgressEventIterator (): AsyncIterator<any>
|
getBlockProgressEventIterator (): AsyncIterator<any>
|
||||||
initBlockProcessingOnCompleteHandler (): Promise<void>
|
initBlockProcessingOnCompleteHandler (): Promise<void>
|
||||||
initEventProcessingOnCompleteHandler (): Promise<void>
|
initEventProcessingOnCompleteHandler (): Promise<void>
|
||||||
initHooksOnCompleteHandler?: () => Promise<void>
|
|
||||||
initBlockCheckpointOnCompleteHandler?: () => Promise<void>
|
|
||||||
}
|
}
|
||||||
|
|
||||||
export interface DatabaseInterface {
|
export interface DatabaseInterface {
|
||||||
|
Loading…
Reference in New Issue
Block a user