Mirror of https://github.com/cerc-io/watcher-ts

commit f1f70ad7d4 (parent 458956a3a1)

Move chain pruning to block processing queue with higher priority (#235)

* Move chain pruning to block processing queue.
* Refactor common code in util.

Co-authored-by: nabarun <nabarun@deepstacksoft.com>
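The commit drops the dedicated QUEUE_CHAIN_PRUNING queue and pushes pruning work onto QUEUE_BLOCK_PROCESSING itself: every job now carries a `kind` field (JOB_KIND_INDEX or JOB_KIND_PRUNE), and prune jobs are enqueued at `priority + 1` so the queue's priority ordering runs them ahead of pending indexing work. A minimal in-memory sketch of that ordering (illustration only; `TinyQueue` is invented here, the real JobQueue in @vulcanize/util is a persistent queue):

```ts
// Illustration of priority ordering, not the watcher-ts JobQueue.
type Job = { kind: 'index' | 'prune'; priority: number; data: Record<string, unknown> };

class TinyQueue {
  private _jobs: Job[] = [];

  push (job: Job): void {
    this._jobs.push(job);
    // Higher priority dequeues first; sort is stable, so equal priorities stay FIFO.
    this._jobs.sort((a, b) => b.priority - a.priority);
  }

  pop (): Job | undefined {
    return this._jobs.shift();
  }
}

const queue = new TinyQueue();
queue.push({ kind: 'index', priority: 0, data: { blockNumber: 117 } });
queue.push({ kind: 'index', priority: 0, data: { blockNumber: 118 } });
queue.push({ kind: 'prune', priority: 1, data: { pruneBlockHeight: 101 } });

console.log(queue.pop()?.kind); // 'prune' — overtakes the two pending index jobs
```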
@@ -7,7 +7,7 @@ import debug from 'debug';
 import { PubSub } from 'apollo-server-express';
 
 import { EthClient } from '@vulcanize/ipld-eth-client';
-import { EventWatcher as BaseEventWatcher, EventWatcherInterface, JobQueue, QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING, QUEUE_CHAIN_PRUNING } from '@vulcanize/util';
+import { EventWatcher as BaseEventWatcher, EventWatcherInterface, JobQueue, QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING } from '@vulcanize/util';
 
 import { Indexer } from './indexer';
 
@@ -143,7 +143,6 @@ export class EventWatcher implements EventWatcherInterface {
     await this.watchBlocksAtChainHead();
     await this.initBlockProcessingOnCompleteHandler();
     await this.initEventProcessingOnCompleteHandler();
-    await this.initChainPruningOnCompleteHandler();
   }
 
   async stop (): Promise<void> {
@@ -167,10 +166,4 @@ export class EventWatcher implements EventWatcherInterface {
       await this._baseEventWatcher.eventProcessingCompleteHandler(job);
     });
   }
-
-  async initChainPruningOnCompleteHandler (): Promise<void> {
-    this._jobQueue.onComplete(QUEUE_CHAIN_PRUNING, async (job) => {
-      await this._baseEventWatcher.chainPruningCompleteHandler(job);
-    });
-  }
 }
@@ -17,7 +17,6 @@ import {
   JobQueue,
   QUEUE_BLOCK_PROCESSING,
   QUEUE_EVENT_PROCESSING,
-  QUEUE_CHAIN_PRUNING,
   JobRunner as BaseJobRunner,
   JobQueueConfig
 } from '@vulcanize/util';
@@ -43,7 +42,6 @@ export class JobRunner {
   async start (): Promise<void> {
     await this.subscribeBlockProcessingQueue();
     await this.subscribeEventProcessingQueue();
-    await this.subscribeChainPruningQueue();
   }
 
   async subscribeBlockProcessingQueue (): Promise<void> {
@@ -66,14 +64,6 @@ export class JobRunner {
       await this._jobQueue.markComplete(job);
     });
   }
-
-  async subscribeChainPruningQueue (): Promise<void> {
-    await this._jobQueue.subscribe(QUEUE_CHAIN_PRUNING, async (job) => {
-      await this._baseJobRunner.pruneChain(job);
-
-      await this._jobQueue.markComplete(job);
-    });
-  }
 }
 
 export const main = async (): Promise<any> => {
@@ -7,7 +7,7 @@ import { AssertionError } from 'assert';
 import 'mocha';
 import _ from 'lodash';
 
-import { getConfig, JobQueue, JobRunner } from '@vulcanize/util';
+import { getConfig, JobQueue, JobRunner, JOB_KIND_PRUNE } from '@vulcanize/util';
 import { getCache } from '@vulcanize/cache';
 import { EthClient } from '@vulcanize/ipld-eth-client';
 import { insertNDummyBlocks, removeEntities } from '@vulcanize/util/test';
@@ -125,8 +125,8 @@ describe('chain pruning', () => {
     const blocks = await indexer.getBlocksAtHeight(pruneBlockHeight, false);
     expect(blocks).to.have.lengthOf(1);
 
-    const job = { data: { pruneBlockHeight } };
-    await jobRunner.pruneChain(job);
+    const job = { data: { kind: JOB_KIND_PRUNE, pruneBlockHeight } };
+    await jobRunner.processBlock(job);
 
     // Only one canonical (not pruned) block should exist at the pruned height.
     const blocksAfterPruning = await indexer.getBlocksAtHeight(pruneBlockHeight, false);
@@ -191,8 +191,8 @@ describe('chain pruning', () => {
     const blocksBeforePruning = await indexer.getBlocksAtHeight(pruneBlockHeight, false);
     expect(blocksBeforePruning).to.have.lengthOf(3);
 
-    const job = { data: { pruneBlockHeight } };
-    await jobRunner.pruneChain(job);
+    const job = { data: { kind: JOB_KIND_PRUNE, pruneBlockHeight } };
+    await jobRunner.processBlock(job);
 
     // Only one canonical (not pruned) block should exist at the pruned height.
     const blocksAfterPruning = await indexer.getBlocksAtHeight(pruneBlockHeight, false);
@@ -277,8 +277,8 @@ describe('chain pruning', () => {
     const blocksBeforePruning = await indexer.getBlocksAtHeight(pruneBlockHeight, false);
     expect(blocksBeforePruning).to.have.lengthOf(2);
 
-    const job = { data: { pruneBlockHeight } };
-    await jobRunner.pruneChain(job);
+    const job = { data: { kind: JOB_KIND_PRUNE, pruneBlockHeight } };
+    await jobRunner.processBlock(job);
 
     // Only one canonical (not pruned) block should exist at the pruned height.
     const blocksAfterPruning = await indexer.getBlocksAtHeight(pruneBlockHeight, false);
@@ -341,8 +341,8 @@ describe('chain pruning', () => {
     const blocksBeforePruning = await indexer.getBlocksAtHeight(pruneBlockHeight, false);
     expect(blocksBeforePruning).to.have.lengthOf(2);
 
-    const job = { data: { pruneBlockHeight } };
-    await jobRunner.pruneChain(job);
+    const job = { data: { kind: JOB_KIND_PRUNE, pruneBlockHeight } };
+    await jobRunner.processBlock(job);
 
     // Only one canonical (not pruned) block should exist at the pruned height.
     const blocksAfterPruning = await indexer.getBlocksAtHeight(pruneBlockHeight, false);
@@ -405,8 +405,8 @@ describe('chain pruning', () => {
     expect(blocksBeforePruning).to.have.lengthOf(2);
 
     try {
-      const job = { data: { pruneBlockHeight } };
-      await jobRunner.pruneChain(job);
+      const job = { data: { kind: JOB_KIND_PRUNE, pruneBlockHeight } };
+      await jobRunner.processBlock(job);
       expect.fail('Job Runner should throw error for pruning at frothy region');
     } catch (error) {
       expect(error).to.be.instanceof(AssertionError);
@@ -12,7 +12,6 @@ import {
   EventWatcher as BaseEventWatcher,
   QUEUE_BLOCK_PROCESSING,
   QUEUE_EVENT_PROCESSING,
-  QUEUE_CHAIN_PRUNING,
   EventWatcherInterface
 } from '@vulcanize/util';
 
@@ -53,7 +52,6 @@ export class EventWatcher implements EventWatcherInterface {
     await this.watchBlocksAtChainHead();
     await this.initBlockProcessingOnCompleteHandler();
     await this.initEventProcessingOnCompleteHandler();
-    await this.initChainPruningOnCompleteHandler();
   }
 
   async stop (): Promise<void> {
@@ -92,12 +90,6 @@ export class EventWatcher implements EventWatcherInterface {
     });
   }
 
-  async initChainPruningOnCompleteHandler (): Promise<void> {
-    this._jobQueue.onComplete(QUEUE_CHAIN_PRUNING, async (job) => {
-      await this._baseEventWatcher.chainPruningCompleteHandler(job);
-    });
-  }
-
   async publishUniswapEventToSubscribers (dbEvent: Event, timeElapsedInSeconds: number): Promise<void> {
     if (dbEvent && dbEvent.eventName !== UNKNOWN_EVENT_NAME) {
       const resultEvent = this._indexer.getResultEvent(dbEvent);
@@ -16,7 +16,6 @@ import {
   JobRunner as BaseJobRunner,
   QUEUE_BLOCK_PROCESSING,
   QUEUE_EVENT_PROCESSING,
-  QUEUE_CHAIN_PRUNING,
   JobQueueConfig
 } from '@vulcanize/util';
 
@@ -42,7 +41,6 @@ export class JobRunner {
   async start (): Promise<void> {
     await this.subscribeBlockProcessingQueue();
     await this.subscribeEventProcessingQueue();
-    await this.subscribeChainPruningQueue();
   }
 
   async subscribeBlockProcessingQueue (): Promise<void> {
@@ -81,14 +79,6 @@ export class JobRunner {
       await this._jobQueue.markComplete(job);
     });
   }
-
-  async subscribeChainPruningQueue (): Promise<void> {
-    await this._jobQueue.subscribe(QUEUE_CHAIN_PRUNING, async (job) => {
-      await this._baseJobRunner.pruneChain(job);
-
-      await this._jobQueue.markComplete(job);
-    });
-  }
 }
 
 export const main = async (): Promise<any> => {
packages/util/src/common.ts (new file, 26 lines)
@@ -0,0 +1,26 @@
+import { JOB_KIND_PRUNE, QUEUE_BLOCK_PROCESSING } from './constants';
+import { JobQueue } from './job-queue';
+
+/**
+ * Create pruning job in QUEUE_BLOCK_PROCESSING.
+ * @param jobQueue
+ * @param latestCanonicalBlockNumber
+ * @param priority
+ */
+export const createPruningJob = async (jobQueue: JobQueue, latestCanonicalBlockNumber: number, priority = 0): Promise<void> => {
+  const pruneBlockHeight = latestCanonicalBlockNumber + 1;
+  const newPriority = priority + 1;
+
+  // Create a job to prune at block height (latestCanonicalBlockNumber + 1).
+  return jobQueue.pushJob(
+    QUEUE_BLOCK_PROCESSING,
+    {
+      kind: JOB_KIND_PRUNE,
+      pruneBlockHeight,
+      priority: newPriority
+    },
+    {
+      priority: newPriority
+    }
+  );
+};
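For context, a sketch of what a call to the new helper produces (the stub below is invented for illustration; the real JobQueue comes from @vulcanize/util):

```ts
// Invented stub standing in for the real JobQueue, to show the job shape
// produced by createPruningJob(jobQueue, 100, 0).
const pushed: Array<{ queue: string; data: unknown; options: unknown }> = [];

const jobQueueStub = {
  async pushJob (queue: string, data: unknown, options: unknown = {}): Promise<void> {
    pushed.push({ queue, data, options });
  }
};

(async () => {
  const latestCanonicalBlockNumber = 100;
  const priority = 0;

  // Same shape createPruningJob pushes: prune at height 101, priority bumped to 1.
  await jobQueueStub.pushJob(
    'block-processing',
    { kind: 'prune', pruneBlockHeight: latestCanonicalBlockNumber + 1, priority: priority + 1 },
    { priority: priority + 1 }
  );

  console.log(pushed[0]);
  // { queue: 'block-processing',
  //   data: { kind: 'prune', pruneBlockHeight: 101, priority: 1 },
  //   options: { priority: 1 } }
})();
```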
@@ -7,3 +7,6 @@ export const MAX_REORG_DEPTH = 16;
 export const QUEUE_BLOCK_PROCESSING = 'block-processing';
 export const QUEUE_EVENT_PROCESSING = 'event-processing';
 export const QUEUE_CHAIN_PRUNING = 'chain-pruning';
+
+export const JOB_KIND_INDEX = 'index';
+export const JOB_KIND_PRUNE = 'prune';
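The two JOB_KIND_* constants act as the tag that lets index and prune jobs share QUEUE_BLOCK_PROCESSING. The repo keeps job data typed as `any`, but a hypothetical tagged-union typing of the payloads (type names invented here) shows the intent:

```ts
// Illustrative only: the repo passes job data as `any`, but the kind field
// behaves like the tag of a discriminated union.
const JOB_KIND_INDEX = 'index';
const JOB_KIND_PRUNE = 'prune';

type IndexJobData = {
  kind: typeof JOB_KIND_INDEX;
  blockHash: string;
  blockNumber: number;
  parentHash: string;
  timestamp: number;
  priority?: number;
};

type PruneJobData = {
  kind: typeof JOB_KIND_PRUNE;
  pruneBlockHeight: number;
  priority: number;
};

type BlockProcessingJobData = IndexJobData | PruneJobData;

// Narrowing on kind mirrors the switch statements added in this commit.
function describeJob (data: BlockProcessingJobData): string {
  switch (data.kind) {
    case JOB_KIND_INDEX:
      return `index block ${data.blockNumber} (${data.blockHash})`;
    case JOB_KIND_PRUNE:
      return `prune at height ${data.pruneBlockHeight}`;
  }
}
```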
@@ -11,7 +11,8 @@ import { EthClient } from '@vulcanize/ipld-eth-client';
 
 import { JobQueue } from './job-queue';
 import { BlockProgressInterface, EventInterface, IndexerInterface } from './types';
-import { QUEUE_BLOCK_PROCESSING, QUEUE_CHAIN_PRUNING, MAX_REORG_DEPTH } from './constants';
+import { QUEUE_BLOCK_PROCESSING, MAX_REORG_DEPTH, JOB_KIND_PRUNE, JOB_KIND_INDEX } from './constants';
+import { createPruningJob } from './common';
 
 const log = debug('vulcanize:events');
 
@@ -42,28 +43,24 @@ export class EventWatcher {
 
     log('watchBlock', blockHash, blockNumber);
 
-    await this._jobQueue.pushJob(QUEUE_BLOCK_PROCESSING, { blockHash, blockNumber, parentHash, timestamp });
+    await this._jobQueue.pushJob(QUEUE_BLOCK_PROCESSING, { kind: JOB_KIND_INDEX, blockHash, blockNumber, parentHash, timestamp });
   }
 
   async blockProcessingCompleteHandler (job: any): Promise<void> {
-    const { data: { request: { data: { blockHash, blockNumber } } } } = job;
-    log(`Job onComplete block ${blockHash} ${blockNumber}`);
+    const { data: { request: { data } } } = job;
+    const { kind } = data;
 
-    // Update sync progress.
-    const syncStatus = await this._indexer.updateSyncStatusIndexedBlock(blockHash, blockNumber);
+    switch (kind) {
+      case JOB_KIND_INDEX:
+        this._handleIndexingComplete(data);
+        break;
 
-    // Create pruning job if required.
-    if (syncStatus && syncStatus.latestIndexedBlockNumber > (syncStatus.latestCanonicalBlockNumber + MAX_REORG_DEPTH)) {
-      // Create a job to prune at block height (latestCanonicalBlockNumber + 1)
-      const pruneBlockHeight = syncStatus.latestCanonicalBlockNumber + 1;
-      // TODO: Move this to the block processing queue to run pruning jobs at a higher priority than block processing jobs.
-      await this._jobQueue.pushJob(QUEUE_CHAIN_PRUNING, { pruneBlockHeight });
-    }
+      case JOB_KIND_PRUNE:
+        this._handlePruningComplete(data);
+        break;
 
-    // Publish block progress event.
-    const blockProgress = await this._indexer.getBlockProgress(blockHash);
-    if (blockProgress) {
-      await this.publishBlockProgressToSubscribers(blockProgress);
-    }
+      default:
+        throw new Error(`Invalid Job kind ${kind} in complete handler of QUEUE_BLOCK_PROCESSING.`);
+    }
   }
 
@@ -82,19 +79,6 @@ export class EventWatcher {
     return dbEvent;
   }
 
-  async chainPruningCompleteHandler (job:any): Promise<void> {
-    const { data: { request: { data: { pruneBlockHeight } } } } = job;
-    log(`Job onComplete chain pruning ${pruneBlockHeight}`);
-
-    const blocks = await this._indexer.getBlocksAtHeight(pruneBlockHeight, false);
-
-    // Only one canonical (not pruned) block should exist at the pruned height.
-    assert(blocks.length === 1);
-    const [block] = blocks;
-
-    await this._indexer.updateSyncStatusCanonicalBlock(block.blockHash, block.blockNumber);
-  }
-
   async publishBlockProgressToSubscribers (blockProgress: BlockProgressInterface): Promise<void> {
     const { blockHash, blockNumber, numEvents, numProcessedEvents, isComplete } = blockProgress;
 
@@ -110,6 +94,38 @@ export class EventWatcher {
     });
   }
 
+  async _handleIndexingComplete (jobData: any): Promise<void> {
+    const { blockHash, blockNumber, priority } = jobData;
+    log(`Job onComplete indexing block ${blockHash} ${blockNumber}`);
+
+    // Update sync progress.
+    const syncStatus = await this._indexer.updateSyncStatusIndexedBlock(blockHash, blockNumber);
+
+    // Create pruning job if required.
+    if (syncStatus && syncStatus.latestIndexedBlockNumber > (syncStatus.latestCanonicalBlockNumber + MAX_REORG_DEPTH)) {
+      await createPruningJob(this._jobQueue, syncStatus.latestCanonicalBlockNumber, priority);
+    }
+
+    // Publish block progress event.
+    const blockProgress = await this._indexer.getBlockProgress(blockHash);
+    if (blockProgress) {
+      await this.publishBlockProgressToSubscribers(blockProgress);
+    }
+  }
+
+  async _handlePruningComplete (jobData: any): Promise<void> {
+    const { pruneBlockHeight } = jobData;
+    log(`Job onComplete pruning at height ${pruneBlockHeight}`);
+
+    const blocks = await this._indexer.getBlocksAtHeight(pruneBlockHeight, false);
+
+    // Only one canonical (not pruned) block should exist at the pruned height.
+    assert(blocks.length === 1);
+    const [block] = blocks;
+
+    await this._indexer.updateSyncStatusCanonicalBlock(block.blockHash, block.blockNumber);
+  }
+
   async stop (): Promise<void> {
     if (this._subscription) {
       log('Stopped watching upstream blocks');
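A worked example of the trigger condition in the new `_handleIndexingComplete` (block numbers invented for illustration): with MAX_REORG_DEPTH = 16, pruning is first scheduled once the indexed head passes canonical + 16.

```ts
// Worked example of the pruning trigger in _handleIndexingComplete.
const MAX_REORG_DEPTH = 16;

const syncStatus = { latestIndexedBlockNumber: 117, latestCanonicalBlockNumber: 100 };

// 117 > 100 + 16, so a prune job is created...
const shouldPrune = syncStatus.latestIndexedBlockNumber > (syncStatus.latestCanonicalBlockNumber + MAX_REORG_DEPTH);

// ...for height latestCanonicalBlockNumber + 1 = 101, which still satisfies the
// _pruneChain assertion latestIndexedBlockNumber >= pruneBlockHeight + MAX_REORG_DEPTH.
const pruneBlockHeight = syncStatus.latestCanonicalBlockNumber + 1;

console.log(shouldPrune, pruneBlockHeight); // true 101
```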
@@ -7,7 +7,7 @@ import debug from 'debug';
 import { EthClient } from '@vulcanize/ipld-eth-client';
 
 import { JobQueue } from './job-queue';
-import { QUEUE_BLOCK_PROCESSING } from './constants';
+import { JOB_KIND_INDEX, QUEUE_BLOCK_PROCESSING } from './constants';
 import { EventWatcherInterface, IndexerInterface } from './types';
 
 const log = debug('vulcanize:fill');
@@ -35,7 +35,7 @@ export const fillBlocks = async (
     if (blockProgress) {
       log(`Block number ${blockNumber}, block hash ${blockHash} already known, skip filling`);
     } else {
-      await jobQueue.pushJob(QUEUE_BLOCK_PROCESSING, { blockHash, blockNumber, parentHash, timestamp });
+      await jobQueue.pushJob(QUEUE_BLOCK_PROCESSING, { kind: JOB_KIND_INDEX, blockHash, blockNumber, parentHash, timestamp });
     }
   }
 }
@@ -5,11 +5,12 @@
 import assert from 'assert';
 import debug from 'debug';
 import { wait } from '.';
+import { createPruningJob } from './common';
 
 import { JobQueueConfig } from './config';
-import { MAX_REORG_DEPTH, QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING } from './constants';
+import { JOB_KIND_INDEX, JOB_KIND_PRUNE, MAX_REORG_DEPTH, QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING } from './constants';
 import { JobQueue } from './job-queue';
-import { EventInterface, IndexerInterface } from './types';
+import { EventInterface, IndexerInterface, SyncStatusInterface } from './types';
 
 const log = debug('vulcanize:job-runner');
 
@@ -25,62 +26,23 @@ export class JobRunner {
   }
 
   async processBlock (job: any): Promise<void> {
-    const { data: { blockHash, blockNumber, parentHash, priority, timestamp } } = job;
+    const { data: { kind } } = job;
 
-    log(`Processing block number ${blockNumber} hash ${blockHash} `);
+    const syncStatus = await this._indexer.getSyncStatus();
+    assert(syncStatus);
 
-    // Init sync status record if none exists.
-    let syncStatus = await this._indexer.getSyncStatus();
-    if (!syncStatus) {
-      syncStatus = await this._indexer.updateSyncStatusChainHead(blockHash, blockNumber);
-    }
+    switch (kind) {
+      case JOB_KIND_INDEX:
+        await this._indexBlock(job, syncStatus);
+        break;
 
-    // Check if parent block has been processed yet, if not, push a high priority job to process that first and abort.
-    // However, don't go beyond the `latestCanonicalBlockHash` from SyncStatus as we have to assume the reorg can't be that deep.
-    if (blockHash !== syncStatus.latestCanonicalBlockHash) {
-      const parent = await this._indexer.getBlockProgress(parentHash);
-      if (!parent) {
-        const { number: parentBlockNumber, parent: { hash: grandparentHash }, timestamp: parentTimestamp } = await this._indexer.getBlock(parentHash);
+      case JOB_KIND_PRUNE:
+        await this._pruneChain(job, syncStatus);
+        break;
 
-        // Create a higher priority job to index parent block and then abort.
-        // We don't have to worry about aborting as this job will get retried later.
-        const newPriority = (priority || 0) + 1;
-        await this._jobQueue.pushJob(QUEUE_BLOCK_PROCESSING, {
-          blockHash: parentHash,
-          blockNumber: parentBlockNumber,
-          parentHash: grandparentHash,
-          timestamp: parentTimestamp,
-          priority: newPriority
-        }, { priority: newPriority });
-
-        const message = `Parent block number ${parentBlockNumber} hash ${parentHash} of block number ${blockNumber} hash ${blockHash} not fetched yet, aborting`;
-        log(message);
-
-        throw new Error(message);
-      }
-
-      if (parentHash !== syncStatus.latestCanonicalBlockHash && !parent.isComplete) {
-        // Parent block indexing needs to finish before this block can be indexed.
-        const message = `Indexing incomplete for parent block number ${parent.blockNumber} hash ${parentHash} of block number ${blockNumber} hash ${blockHash}, aborting`;
-        log(message);
-
-        throw new Error(message);
-      }
-    }
-
-    // Check if block is being already processed.
-    const blockProgress = await this._indexer.getBlockProgress(blockHash);
-
-    if (!blockProgress) {
-      const { jobDelayInMilliSecs = 0 } = this._jobQueueConfig;
-
-      // Delay required to process block.
-      await wait(jobDelayInMilliSecs);
-      const events = await this._indexer.getOrFetchBlockEvents({ blockHash, blockNumber, parentHash, blockTimestamp: timestamp });
-
-      for (let ei = 0; ei < events.length; ei++) {
-        await this._jobQueue.pushJob(QUEUE_EVENT_PROCESSING, { id: events[ei].id, publish: true });
-      }
+      default:
+        log(`Invalid Job kind ${kind} in QUEUE_BLOCK_PROCESSING.`);
+        break;
     }
   }
 
@@ -122,14 +84,12 @@
     return event;
   }
 
-  async pruneChain (job: any): Promise<void> {
-    const pruneBlockHeight: number = job.data.pruneBlockHeight;
+  async _pruneChain (job: any, syncStatus: SyncStatusInterface): Promise<void> {
+    const { pruneBlockHeight } = job.data;
 
     log(`Processing chain pruning at ${pruneBlockHeight}`);
 
     // Assert we're at a depth where pruning is safe.
-    const syncStatus = await this._indexer.getSyncStatus();
-    assert(syncStatus);
     assert(syncStatus.latestIndexedBlockNumber >= (pruneBlockHeight + MAX_REORG_DEPTH));
 
     // Check that we haven't already pruned at this depth.
@@ -159,4 +119,68 @@
       }
     }
   }
+
+  async _indexBlock (job: any, syncStatus: SyncStatusInterface): Promise<void> {
+    const { data: { blockHash, blockNumber, parentHash, priority, timestamp } } = job;
+    log(`Processing block number ${blockNumber} hash ${blockHash} `);
+
+    // Check if chain pruning is caught up.
+    if ((syncStatus.latestIndexedBlockNumber - syncStatus.latestCanonicalBlockNumber) > MAX_REORG_DEPTH) {
+      await createPruningJob(this._jobQueue, syncStatus.latestCanonicalBlockNumber, priority);
+
+      const message = `Chain pruning not caught up yet, latest canonical block number ${syncStatus.latestCanonicalBlockNumber} and latest indexed block number ${syncStatus.latestIndexedBlockNumber}`;
+      log(message);
+      throw new Error(message);
+    }
+
+    // Check if parent block has been processed yet, if not, push a high priority job to process that first and abort.
+    // However, don't go beyond the `latestCanonicalBlockHash` from SyncStatus as we have to assume the reorg can't be that deep.
+    if (blockHash !== syncStatus.latestCanonicalBlockHash) {
+      const parent = await this._indexer.getBlockProgress(parentHash);
+
+      if (!parent) {
+        const { number: parentBlockNumber, parent: { hash: grandparentHash }, timestamp: parentTimestamp } = await this._indexer.getBlock(parentHash);
+
+        // Create a higher priority job to index parent block and then abort.
+        // We don't have to worry about aborting as this job will get retried later.
+        const newPriority = (priority || 0) + 1;
+        await this._jobQueue.pushJob(QUEUE_BLOCK_PROCESSING, {
+          kind: JOB_KIND_INDEX,
+          blockHash: parentHash,
+          blockNumber: parentBlockNumber,
+          parentHash: grandparentHash,
+          timestamp: parentTimestamp,
+          priority: newPriority
+        }, { priority: newPriority });
+
+        const message = `Parent block number ${parentBlockNumber} hash ${parentHash} of block number ${blockNumber} hash ${blockHash} not fetched yet, aborting`;
+        log(message);
+
+        throw new Error(message);
+      }
+
+      if (parentHash !== syncStatus.latestCanonicalBlockHash && !parent.isComplete) {
+        // Parent block indexing needs to finish before this block can be indexed.
+        const message = `Indexing incomplete for parent block number ${parent.blockNumber} hash ${parentHash} of block number ${blockNumber} hash ${blockHash}, aborting`;
+        log(message);
+
+        throw new Error(message);
+      }
+    }
+
+    // Check if block is being already processed.
+    const blockProgress = await this._indexer.getBlockProgress(blockHash);
+
+    if (!blockProgress) {
+      const { jobDelayInMilliSecs = 0 } = this._jobQueueConfig;
+
+      // Delay required to process block.
+      await wait(jobDelayInMilliSecs);
+      const events = await this._indexer.getOrFetchBlockEvents({ blockHash, blockNumber, parentHash, blockTimestamp: timestamp });
+
+      for (let ei = 0; ei < events.length; ei++) {
+        await this._jobQueue.pushJob(QUEUE_EVENT_PROCESSING, { id: events[ei].id, publish: true });
+      }
+    }
+  }
 }