Performance improvements for fill CLI (#314)

* Fix fill cli to work with watcher server

* Add createdAt column and insert events in batches

* Implement prefetch to fill block and events in parallel

* Fix getPrevEntity and increase fill prefetch default batch size

* Fix watcher creating mulitple jobs for a block
This commit is contained in:
nikugogoi 2021-12-16 17:16:48 +05:30 committed by GitHub
parent b06d931054
commit 105b26d6a3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 202 additions and 71 deletions

View File

@ -2,7 +2,7 @@
// Copyright 2021 Vulcanize, Inc.
//
import { Entity, PrimaryGeneratedColumn, Column, Index } from 'typeorm';
import { Entity, PrimaryGeneratedColumn, Column, Index, CreateDateColumn } from 'typeorm';
import { BlockProgressInterface } from '@vulcanize/util';
@ -40,4 +40,7 @@ export class BlockProgress implements BlockProgressInterface {
@Column('boolean', { default: false })
isPruned!: boolean
@CreateDateColumn()
createdAt!: Date;
}

View File

@ -42,6 +42,16 @@ export const main = async (): Promise<any> => {
require: true,
demandOption: true,
describe: 'Block number to stop processing at'
},
prefetch: {
type: 'boolean',
default: false,
describe: 'Block and events prefetch mode'
},
batchBlocks: {
type: 'number',
default: 10,
describe: 'Number of blocks prefetched in batch'
}
}).argv;
@ -90,7 +100,7 @@ export const main = async (): Promise<any> => {
assert(jobQueueConfig, 'Missing job queue config');
await fillBlocks(jobQueue, indexer, postgraphileClient, eventWatcher, blockDelayInMilliSecs, argv);
await fillBlocks(jobQueue, indexer, eventWatcher, blockDelayInMilliSecs, argv);
};
main().catch(err => {

View File

@ -2,7 +2,7 @@
// Copyright 2021 Vulcanize, Inc.
//
import { Entity, PrimaryGeneratedColumn, Column, Index } from 'typeorm';
import { Entity, PrimaryGeneratedColumn, Column, Index, CreateDateColumn } from 'typeorm';
import { BlockProgressInterface } from '@vulcanize/util';
@ -40,4 +40,7 @@ export class BlockProgress implements BlockProgressInterface {
@Column('boolean', { default: false })
isPruned!: boolean
@CreateDateColumn()
createdAt!: Date;
}

View File

@ -44,6 +44,16 @@ export const main = async (): Promise<any> => {
require: true,
demandOption: true,
describe: 'Block number to stop processing at'
},
prefetch: {
type: 'boolean',
default: false,
describe: 'Block and events prefetch mode'
},
batchBlocks: {
type: 'number',
default: 10,
describe: 'Number of blocks prefetched in batch'
}
}).argv;
@ -94,7 +104,7 @@ export const main = async (): Promise<any> => {
const eventWatcher = new EventWatcher(upstream, ethClient, postgraphileClient, indexer, pubsub, jobQueue);
await fillBlocks(jobQueue, indexer, postgraphileClient, eventWatcher, blockDelayInMilliSecs, argv);
await fillBlocks(jobQueue, indexer, eventWatcher, blockDelayInMilliSecs, argv);
};
main().catch(err => {

View File

@ -363,9 +363,7 @@ export class Indexer implements IndexerInterface {
async _fetchAndSaveEvents (block: DeepPartial<BlockProgress>): Promise<BlockProgress> {
assert(block.blockHash);
console.time('time:indexer#_fetchAndSaveEvents-uni_watcher');
const events = await this._uniClient.getEvents(block.blockHash);
console.timeEnd('time:indexer#_fetchAndSaveEvents-uni_watcher');
const dbEvents: Array<DeepPartial<Event>> = [];
@ -1106,7 +1104,7 @@ export class Indexer implements IndexerInterface {
if (modulo === BigInt(0)) {
// Current tick is initialized and needs to be updated.
this._loadTickUpdateFeeVarsAndSave(dbTx, Number(newTick), block, contractAddress);
await this._loadTickUpdateFeeVarsAndSave(dbTx, Number(newTick), block, contractAddress);
}
const numIters = BigInt(
@ -1126,13 +1124,13 @@ export class Indexer implements IndexerInterface {
const firstInitialized = oldTick + tickSpacing - modulo;
for (let i = firstInitialized; i < newTick; i = i + tickSpacing) {
this._loadTickUpdateFeeVarsAndSave(dbTx, Number(i), block, contractAddress);
await this._loadTickUpdateFeeVarsAndSave(dbTx, Number(i), block, contractAddress);
}
} else if (newTick < oldTick) {
const firstInitialized = oldTick - modulo;
for (let i = firstInitialized; i >= newTick; i = i - tickSpacing) {
this._loadTickUpdateFeeVarsAndSave(dbTx, Number(i), block, contractAddress);
await this._loadTickUpdateFeeVarsAndSave(dbTx, Number(i), block, contractAddress);
}
}

View File

@ -2,7 +2,7 @@
// Copyright 2021 Vulcanize, Inc.
//
import { Entity, PrimaryGeneratedColumn, Column, Index } from 'typeorm';
import { Entity, PrimaryGeneratedColumn, Column, Index, CreateDateColumn } from 'typeorm';
import { BlockProgressInterface } from '@vulcanize/util';
@ -40,4 +40,7 @@ export class BlockProgress implements BlockProgressInterface {
@Column('boolean', { default: false })
isPruned!: boolean
@CreateDateColumn()
createdAt!: Date;
}

View File

@ -42,6 +42,16 @@ export const main = async (): Promise<any> => {
require: true,
demandOption: true,
describe: 'Block number to stop processing at'
},
prefetch: {
type: 'boolean',
default: false,
describe: 'Block and events prefetch mode'
},
batchBlocks: {
type: 'number',
default: 10,
describe: 'Number of blocks prefetched in batch'
}
}).argv;
@ -91,7 +101,7 @@ export const main = async (): Promise<any> => {
assert(jobQueueConfig, 'Missing job queue config');
await fillBlocks(jobQueue, indexer, postgraphileClient, eventWatcher, blockDelayInMilliSecs, argv);
await fillBlocks(jobQueue, indexer, eventWatcher, blockDelayInMilliSecs, argv);
};
main().catch(err => {

View File

@ -431,7 +431,6 @@ export class Indexer implements IndexerInterface {
async _fetchAndSaveEvents ({ blockHash }: DeepPartial<BlockProgress>): Promise<BlockProgress> {
assert(blockHash);
console.time('time:indexer#_fetchAndSaveEvents-logs_txs');
const logsPromise = this._ethClient.getLogs({ blockHash });
const transactionsPromise = this._postgraphileClient.getBlockWithTransactions({ blockHash });
@ -450,8 +449,6 @@ export class Indexer implements IndexerInterface {
}
] = await Promise.all([logsPromise, transactionsPromise]);
console.timeEnd('time:indexer#_fetchAndSaveEvents-logs_txs');
const transactionMap = transactions.reduce((acc: {[key: string]: any}, transaction: {[key: string]: any}) => {
acc[transaction.txHash] = transaction;
return acc;

View File

@ -10,7 +10,8 @@
"ethers": "^5.2.0",
"fs-extra": "^10.0.0",
"pg-boss": "^6.1.0",
"toml": "^3.0.0"
"toml": "^3.0.0",
"lodash": "^4.17.21"
},
"devDependencies": {
"@types/fs-extra": "^9.0.11",
@ -28,7 +29,6 @@
"eslint-plugin-promise": "^5.1.0",
"eslint-plugin-standard": "^5.0.0",
"hardhat": "^2.3.0",
"lodash": "^4.17.21",
"typeorm": "^0.2.32",
"typeorm-naming-strategies": "^2.0.0"
},

View File

@ -47,22 +47,30 @@ export const processBlockByNumber = async (
log(`Process block ${blockNumber}`);
while (true) {
console.time('time:common#processBlockByNumber-postgraphile');
const blockProgressEntities = await indexer.getBlocksAtHeight(blockNumber, false);
const blocks = await indexer.getBlocks({ blockNumber });
let blocks = blockProgressEntities.map((block: any) => {
block.timestamp = block.blockTimestamp;
console.timeEnd('time:common#processBlockByNumber-postgraphile');
return block;
});
if (!blocks.length) {
console.time('time:common#processBlockByNumber-postgraphile');
blocks = await indexer.getBlocks({ blockNumber });
console.timeEnd('time:common#processBlockByNumber-postgraphile');
}
if (blocks.length) {
for (let bi = 0; bi < blocks.length; bi++) {
const { blockHash, blockNumber, parentHash, timestamp } = blocks[bi];
const blockProgress = await indexer.getBlockProgress(blockHash);
if (blockProgress) {
log(`Block number ${blockNumber}, block hash ${blockHash} already processed`);
} else {
await indexer.updateSyncStatusChainHead(blockHash, blockNumber);
console.time('time:common#processBlockByNumber-updateSyncStatusChainHead');
const syncStatus = await indexer.updateSyncStatusChainHead(blockHash, blockNumber);
console.timeEnd('time:common#processBlockByNumber-updateSyncStatusChainHead');
// Stop old blocks from getting pushed to job queue. They are already retried after fail.
if (syncStatus.latestIndexedBlockNumber < blockNumber) {
await jobQueue.pushJob(QUEUE_BLOCK_PROCESSING, { kind: JOB_KIND_INDEX, blockHash, blockNumber, parentHash, timestamp });
}
}

View File

@ -37,6 +37,8 @@ const OPERATOR_MAP = {
ends: 'LIKE'
};
const INSERT_EVENTS_BATCH = 100;
export interface BlockHeight {
number?: number;
hash?: string;
@ -249,11 +251,17 @@ export class Database {
event.block = blockProgress;
});
await eventRepo.createQueryBuilder()
.insert()
.values(events)
.updateEntity(false)
.execute();
const eventBatches = _.chunk(events, INSERT_EVENTS_BATCH);
const insertPromises = eventBatches.map(async events => {
await eventRepo.createQueryBuilder()
.insert()
.values(events)
.updateEntity(false)
.execute();
});
await Promise.all(insertPromises);
return blockProgress;
}
@ -424,7 +432,9 @@ export class Database {
FROM
block_progress b
LEFT JOIN
${repo.metadata.tableName} e ON e.block_hash = b.block_hash
${repo.metadata.tableName} e
ON e.block_hash = b.block_hash
AND e.id = $2
WHERE
b.block_hash = $1
UNION ALL

View File

@ -146,19 +146,22 @@ export class EventWatcher {
const { blockHash, blockNumber, priority } = jobData;
log(`Job onComplete indexing block ${blockHash} ${blockNumber}`);
// Update sync progress.
const syncStatus = await this._indexer.updateSyncStatusIndexedBlock(blockHash, blockNumber);
const [blockProgress, syncStatus] = await Promise.all([
this._indexer.getBlockProgress(blockHash),
// Update sync progress.
this._indexer.updateSyncStatusIndexedBlock(blockHash, blockNumber)
]);
// Publish block progress event if no events exist.
// Event for blocks with events will be pusblished from eventProcessingCompleteHandler.
if (blockProgress && blockProgress.numEvents === 0) {
await this.publishBlockProgressToSubscribers(blockProgress);
}
// Create pruning job if required.
if (syncStatus && syncStatus.latestIndexedBlockNumber > (syncStatus.latestCanonicalBlockNumber + MAX_REORG_DEPTH)) {
await createPruningJob(this._jobQueue, syncStatus.latestCanonicalBlockNumber, priority);
}
// Publish block progress event.
const blockProgress = await this._indexer.getBlockProgress(blockHash);
if (blockProgress) {
await this.publishBlockProgressToSubscribers(blockProgress);
}
}
async _handlePruningComplete (jobData: any): Promise<void> {

View File

@ -3,40 +3,56 @@
//
import assert from 'assert';
import { EthClient } from '@vulcanize/ipld-eth-client';
import debug from 'debug';
import { JobQueue } from './job-queue';
import { EventWatcherInterface, IndexerInterface } from './types';
import { wait } from './misc';
import { processBlockByNumber } from './common';
const log = debug('vulcanize:fill');
export const fillBlocks = async (
jobQueue: JobQueue,
indexer: IndexerInterface,
ethClient: EthClient,
eventWatcher: EventWatcherInterface,
blockDelayInMilliSecs: number,
{ startBlock, endBlock }: { startBlock: number, endBlock: number}
argv: {
startBlock: number,
endBlock: number,
prefetch: boolean,
batchBlocks: number,
}
): Promise<any> => {
let { startBlock, endBlock, prefetch, batchBlocks } = argv;
assert(startBlock < endBlock, 'endBlock should be greater than startBlock');
const syncStatus = await indexer.getSyncStatus();
if (prefetch) {
if (syncStatus) {
startBlock = syncStatus.chainHeadBlockNumber + 1;
}
await prefetchBlocks(indexer, blockDelayInMilliSecs, { startBlock, endBlock, batchBlocks });
return;
}
if (syncStatus && syncStatus.latestIndexedBlockNumber > -1) {
if (startBlock > syncStatus.latestIndexedBlockNumber + 1) {
throw new Error(`Missing blocks between startBlock ${startBlock} and latestIndexedBlockNumber ${syncStatus.latestIndexedBlockNumber}`);
}
startBlock = syncStatus.latestIndexedBlockNumber + 1;
}
await eventWatcher.initBlockProcessingOnCompleteHandler();
await eventWatcher.initEventProcessingOnCompleteHandler();
let currentBlockNumber = startBlock;
const syncStatus = await indexer.getSyncStatus();
const numberOfBlocks = endBlock - startBlock + 1;
console.time(`time:fill#fillBlocks-process_block_${startBlock}`);
if (syncStatus) {
if (currentBlockNumber > syncStatus.latestIndexedBlockNumber + 1) {
throw new Error(`Missing blocks between startBlock ${currentBlockNumber} and latestIndexedBlockNumber ${syncStatus.latestIndexedBlockNumber}`);
}
currentBlockNumber = syncStatus.latestIndexedBlockNumber + 1;
}
console.time(`time:fill#fillBlocks-process_block_${currentBlockNumber}`);
processBlockByNumber(jobQueue, indexer, blockDelayInMilliSecs, currentBlockNumber);
processBlockByNumber(jobQueue, indexer, blockDelayInMilliSecs, startBlock);
// Creating an AsyncIterable from AsyncIterator to iterate over the values.
// https://www.codementor.io/@tiagolopesferreira/asynchronous-iterators-in-javascript-jl1yg8la1#for-wait-of
@ -52,21 +68,69 @@ export const fillBlocks = async (
for await (const data of blockProgressEventIterable) {
const { onBlockProgressEvent: { blockNumber, isComplete } } = data;
if (blockNumber === currentBlockNumber && isComplete) {
console.timeEnd(`time:fill#fillBlocks-process_block_${currentBlockNumber}`);
if (isComplete) {
console.timeEnd(`time:fill#fillBlocks-process_block_${blockNumber}`);
if (blockNumber >= endBlock) {
console.time(`time:fill#fillBlocks-process_block_${blockNumber + 1}`);
const blocksProcessed = blockNumber - startBlock + 1;
const completePercentage = Math.round(blocksProcessed / numberOfBlocks * 100);
log(`Processed ${blocksProcessed} of ${numberOfBlocks} blocks (${completePercentage}%)`);
await processBlockByNumber(jobQueue, indexer, blockDelayInMilliSecs, blockNumber + 1);
if (blockNumber + 1 >= endBlock) {
// Break the async loop when blockProgress event is for the endBlock and processing is complete.
break;
}
currentBlockNumber++;
console.time(`time:fill#fillBlocks-process_block_${currentBlockNumber}`);
processBlockByNumber(jobQueue, indexer, blockDelayInMilliSecs, currentBlockNumber);
}
}
log('Processed all blocks (100%)');
console.timeEnd('time:fill#fillBlocks-process_blocks');
};
const prefetchBlocks = async (
indexer: IndexerInterface,
blockDelayInMilliSecs: number,
{ startBlock, endBlock, batchBlocks }: {
startBlock: number,
endBlock: number,
batchBlocks: number,
}
) => {
for (let i = startBlock; i <= endBlock; i = i + batchBlocks) {
const batchEndBlock = Math.min(i + batchBlocks, endBlock + 1);
let blockNumbers = [...Array(batchEndBlock - i).keys()].map(n => n + i);
log('Fetching blockNumbers:', blockNumbers);
let blocks = [];
// Fetch blocks again if there are missing blocks.
while (true) {
const blockPromises = blockNumbers.map(async blockNumber => indexer.getBlocks({ blockNumber }));
const res = await Promise.all(blockPromises);
const missingIndex = res.findIndex(blocks => blocks.length === 0);
if (missingIndex < 0) {
blocks = res.flat();
break;
}
blockNumbers = blockNumbers.slice(missingIndex);
await wait(blockDelayInMilliSecs);
}
const fetchBlockPromises = blocks.map(async block => {
const { blockHash, blockNumber, parentHash, timestamp } = block;
const blockProgress = await indexer.getBlockProgress(blockHash);
if (!blockProgress) {
await indexer.fetchBlockEvents({ blockHash, blockNumber, parentHash, blockTimestamp: timestamp });
}
});
await Promise.all(fetchBlockPromises);
}
};

View File

@ -141,7 +141,7 @@ export class Indexer {
throw error;
}
log('Block not found. Fetching block after eth_call.');
log('Block not found. Fetching block after RPC call.');
}
}

View File

@ -109,6 +109,7 @@ export class JobRunner {
}
async _indexBlock (job: any, syncStatus: SyncStatusInterface): Promise<void> {
console.time('time:job-runner#_indexBlock');
const { data: { blockHash, blockNumber, parentHash, priority, timestamp } } = job;
log(`Processing block number ${blockNumber} hash ${blockHash} `);
@ -172,26 +173,32 @@ export class JobRunner {
throw new Error(message);
}
} else {
blockProgress = parentBlock;
}
// Check if block is being already processed.
if (!blockProgress) {
const { jobDelayInMilliSecs = 0 } = this._jobQueueConfig;
// Delay required to process block.
await wait(jobDelayInMilliSecs);
blockProgress = await this._indexer.fetchBlockEvents({ blockHash, blockNumber, parentHash, blockTimestamp: timestamp });
if (blockProgress.numEvents) {
await this._jobQueue.pushJob(QUEUE_EVENT_PROCESSING, { kind: JOB_KIND_EVENTS, blockHash: blockProgress.blockHash, publish: true });
}
}
// Check if block is being already processed.
if (blockProgress.numProcessedEvents === 0 && blockProgress.numEvents) {
await this._jobQueue.pushJob(QUEUE_EVENT_PROCESSING, { kind: JOB_KIND_EVENTS, blockHash: blockProgress.blockHash, publish: true });
}
console.timeEnd('time:job-runner#_indexBlock');
}
async _processEvents (job: any): Promise<void> {
const { blockHash } = job.data;
console.time('time:job-runner#_processEvents-get-block-process');
let block = await this._indexer.getBlockProgress(blockHash);
console.timeEnd('time:job-runner#_processEvents-get-block-process');
assert(block);
console.time('time:job-runner#_processEvents-events');
@ -216,13 +223,17 @@ export class JobRunner {
console.timeEnd('time:job-runner#_processEvents-fetching_events_batch');
if (events.length) {
log(`Processing events batch from index ${events[0].index} to ${events[0].index + events.length - 1}`);
}
console.time('time:job-runner#_processEvents-processing_events_batch');
for (let event of events) {
// Process events in loop
const eventIndex = event.index;
log(`Processing event ${event.id} index ${eventIndex}`);
// log(`Processing event ${event.id} index ${eventIndex}`);
// Check if previous event in block has been processed exactly before this and abort if not.
if (eventIndex > 0) { // Skip the first event in the block.

View File

@ -17,6 +17,7 @@ export interface BlockProgressInterface {
lastProcessedEventIndex: number;
isComplete: boolean;
isPruned: boolean;
createdAt: Date;
}
export interface SyncStatusInterface {