mirror of
https://github.com/cerc-io/watcher-ts
synced 2025-02-11 12:26:35 +00:00
Fix block processing job created twice when processing missing parent block (#67)
* Fix block processing job created twice in watcher * Fix block processing job for missing parent blocks
This commit is contained in:
parent
faf046d181
commit
30f3c9e694
@ -2,7 +2,7 @@
|
|||||||
// Copyright 2021 Vulcanize, Inc.
|
// Copyright 2021 Vulcanize, Inc.
|
||||||
//
|
//
|
||||||
|
|
||||||
import { Entity, PrimaryGeneratedColumn, Column, Index } from 'typeorm';
|
import { Entity, PrimaryGeneratedColumn, Column, Index, CreateDateColumn } from 'typeorm';
|
||||||
import { BlockProgressInterface } from '@vulcanize/util';
|
import { BlockProgressInterface } from '@vulcanize/util';
|
||||||
|
|
||||||
@Entity()
|
@Entity()
|
||||||
@ -42,4 +42,7 @@ export class BlockProgress implements BlockProgressInterface {
|
|||||||
|
|
||||||
@Column('boolean', { default: false })
|
@Column('boolean', { default: false })
|
||||||
isPruned!: boolean;
|
isPruned!: boolean;
|
||||||
|
|
||||||
|
@CreateDateColumn()
|
||||||
|
createdAt!: Date;
|
||||||
}
|
}
|
||||||
|
@ -159,7 +159,6 @@ export const instantiate = async (
|
|||||||
// TODO: Check for function overloading.
|
// TODO: Check for function overloading.
|
||||||
let result = await contract[functionName](...functionParams);
|
let result = await contract[functionName](...functionParams);
|
||||||
|
|
||||||
// TODO: Check for function overloading.
|
|
||||||
// Using function signature does not work.
|
// Using function signature does not work.
|
||||||
const { outputs } = contract.interface.getFunction(functionName);
|
const { outputs } = contract.interface.getFunction(functionName);
|
||||||
assert(outputs);
|
assert(outputs);
|
||||||
|
@ -143,7 +143,6 @@ export function testStructEthCall (): void {
|
|||||||
log.debug('In test struct eth call', []);
|
log.debug('In test struct eth call', []);
|
||||||
|
|
||||||
// Bind the contract to the address that emitted the event.
|
// Bind the contract to the address that emitted the event.
|
||||||
// TODO: Address.fromString throws error in WASM module instantiation.
|
|
||||||
const contractAddress = dataSource.address();
|
const contractAddress = dataSource.address();
|
||||||
const contract = Example1.bind(contractAddress);
|
const contract = Example1.bind(contractAddress);
|
||||||
|
|
||||||
|
@ -2,7 +2,7 @@
|
|||||||
// Copyright 2021 Vulcanize, Inc.
|
// Copyright 2021 Vulcanize, Inc.
|
||||||
//
|
//
|
||||||
|
|
||||||
import { Entity, PrimaryGeneratedColumn, Column, Index } from 'typeorm';
|
import { Entity, PrimaryGeneratedColumn, Column, Index, CreateDateColumn } from 'typeorm';
|
||||||
import { BlockProgressInterface } from '@vulcanize/util';
|
import { BlockProgressInterface } from '@vulcanize/util';
|
||||||
|
|
||||||
@Entity()
|
@Entity()
|
||||||
@ -42,4 +42,7 @@ export class BlockProgress implements BlockProgressInterface {
|
|||||||
|
|
||||||
@Column('boolean', { default: false })
|
@Column('boolean', { default: false })
|
||||||
isPruned!: boolean;
|
isPruned!: boolean;
|
||||||
|
|
||||||
|
@CreateDateColumn()
|
||||||
|
createdAt!: Date;
|
||||||
}
|
}
|
||||||
|
@ -50,7 +50,7 @@ export class EventWatcher {
|
|||||||
|
|
||||||
async startBlockProcessing (): Promise<void> {
|
async startBlockProcessing (): Promise<void> {
|
||||||
const syncStatus = await this._indexer.getSyncStatus();
|
const syncStatus = await this._indexer.getSyncStatus();
|
||||||
let startBlockNumber;
|
let startBlockNumber: number;
|
||||||
|
|
||||||
if (!syncStatus) {
|
if (!syncStatus) {
|
||||||
// Get latest block in chain.
|
// Get latest block in chain.
|
||||||
@ -62,7 +62,8 @@ export class EventWatcher {
|
|||||||
|
|
||||||
const { ethServer: { blockDelayInMilliSecs } } = this._upstreamConfig;
|
const { ethServer: { blockDelayInMilliSecs } } = this._upstreamConfig;
|
||||||
|
|
||||||
processBlockByNumber(this._jobQueue, this._indexer, blockDelayInMilliSecs, startBlockNumber);
|
// Wait for block processing as blockProgress event might process the same block.
|
||||||
|
await processBlockByNumber(this._jobQueue, this._indexer, blockDelayInMilliSecs, startBlockNumber);
|
||||||
|
|
||||||
// Creating an AsyncIterable from AsyncIterator to iterate over the values.
|
// Creating an AsyncIterable from AsyncIterator to iterate over the values.
|
||||||
// https://www.codementor.io/@tiagolopesferreira/asynchronous-iterators-in-javascript-jl1yg8la1#for-wait-of
|
// https://www.codementor.io/@tiagolopesferreira/asynchronous-iterators-in-javascript-jl1yg8la1#for-wait-of
|
||||||
@ -141,22 +142,28 @@ export class EventWatcher {
|
|||||||
|
|
||||||
async _handleIndexingComplete (jobData: any): Promise<void> {
|
async _handleIndexingComplete (jobData: any): Promise<void> {
|
||||||
const { blockHash, blockNumber, priority } = jobData;
|
const { blockHash, blockNumber, priority } = jobData;
|
||||||
log(`Job onComplete indexing block ${blockHash} ${blockNumber}`);
|
|
||||||
|
|
||||||
const [blockProgress, syncStatus] = await Promise.all([
|
const [blockProgress, syncStatus] = await Promise.all([
|
||||||
this._indexer.getBlockProgress(blockHash),
|
this._indexer.getBlockProgress(blockHash),
|
||||||
|
// Update sync progress.
|
||||||
this._indexer.updateSyncStatusIndexedBlock(blockHash, blockNumber)
|
this._indexer.updateSyncStatusIndexedBlock(blockHash, blockNumber)
|
||||||
]);
|
]);
|
||||||
|
|
||||||
// Create pruning job if required.
|
if (blockProgress) {
|
||||||
if (syncStatus && syncStatus.latestIndexedBlockNumber > (syncStatus.latestCanonicalBlockNumber + MAX_REORG_DEPTH)) {
|
log(`Job onComplete indexing block ${blockHash} ${blockNumber}`);
|
||||||
await createPruningJob(this._jobQueue, syncStatus.latestCanonicalBlockNumber, priority);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Publish block progress event if no events exist.
|
// Create pruning job if required.
|
||||||
// Event for blocks with events will be pusblished from eventProcessingCompleteHandler.
|
if (syncStatus && syncStatus.latestIndexedBlockNumber > (syncStatus.latestCanonicalBlockNumber + MAX_REORG_DEPTH)) {
|
||||||
if (blockProgress && blockProgress.numEvents === 0) {
|
await createPruningJob(this._jobQueue, syncStatus.latestCanonicalBlockNumber, priority);
|
||||||
await this.publishBlockProgressToSubscribers(blockProgress);
|
}
|
||||||
|
|
||||||
|
// Publish block progress event if no events exist.
|
||||||
|
// Event for blocks with events will be pusblished from eventProcessingCompleteHandler.
|
||||||
|
if (blockProgress.numEvents === 0) {
|
||||||
|
await this.publishBlockProgressToSubscribers(blockProgress);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
log(`block not indexed for ${blockHash} ${blockNumber}`);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -192,7 +192,7 @@ export class JobRunner {
|
|||||||
const message = `Parent block number ${parentBlockNumber} hash ${parentHash} of block number ${blockNumber} hash ${blockHash} not fetched yet, aborting`;
|
const message = `Parent block number ${parentBlockNumber} hash ${parentHash} of block number ${blockNumber} hash ${blockHash} not fetched yet, aborting`;
|
||||||
log(message);
|
log(message);
|
||||||
|
|
||||||
throw new Error(message);
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!parentBlock.isComplete) {
|
if (!parentBlock.isComplete) {
|
||||||
|
Loading…
Reference in New Issue
Block a user