From 105b26d6a3b0c861a13d81c0d7e1bd8ce471c6ac Mon Sep 17 00:00:00 2001
From: nikugogoi
Date: Thu, 16 Dec 2021 17:16:48 +0530
Subject: [PATCH] Performance improvements for fill CLI (#314)

* Fix fill CLI to work with watcher server
* Add createdAt column and insert events in batches
* Implement prefetch to fill block and events in parallel
* Fix getPrevEntity and increase fill prefetch default batch size
* Fix watcher creating multiple jobs for a block
---
 .../erc20-watcher/src/entity/BlockProgress.ts |   5 +-
 packages/erc20-watcher/src/fill.ts            |  12 +-
 .../src/entity/BlockProgress.ts               |   5 +-
 packages/uni-info-watcher/src/fill.ts         |  12 +-
 packages/uni-info-watcher/src/indexer.ts      |   8 +-
 .../uni-watcher/src/entity/BlockProgress.ts   |   5 +-
 packages/uni-watcher/src/fill.ts              |  12 +-
 packages/uni-watcher/src/indexer.ts           |   3 -
 packages/util/package.json                    |   4 +-
 packages/util/src/common.ts                   |  24 ++--
 packages/util/src/database.ts                 |  22 +++-
 packages/util/src/events.ts                   |  19 +--
 packages/util/src/fill.ts                     | 116 ++++++++++++++----
 packages/util/src/indexer.ts                  |   2 +-
 packages/util/src/job-runner.ts               |  23 +++-
 packages/util/src/types.ts                    |   1 +
 16 files changed, 202 insertions(+), 71 deletions(-)

diff --git a/packages/erc20-watcher/src/entity/BlockProgress.ts b/packages/erc20-watcher/src/entity/BlockProgress.ts
index e67cf2dd..0eee8c5f 100644
--- a/packages/erc20-watcher/src/entity/BlockProgress.ts
+++ b/packages/erc20-watcher/src/entity/BlockProgress.ts
@@ -2,7 +2,7 @@
 // Copyright 2021 Vulcanize, Inc.
 //
 
-import { Entity, PrimaryGeneratedColumn, Column, Index } from 'typeorm';
+import { Entity, PrimaryGeneratedColumn, Column, Index, CreateDateColumn } from 'typeorm';
 
 import { BlockProgressInterface } from '@vulcanize/util';
 
@@ -40,4 +40,7 @@ export class BlockProgress implements BlockProgressInterface {
 
   @Column('boolean', { default: false })
   isPruned!: boolean
+
+  @CreateDateColumn()
+  createdAt!: Date;
 }
diff --git a/packages/erc20-watcher/src/fill.ts b/packages/erc20-watcher/src/fill.ts
index 6643d2c6..6a521a27 100644
--- a/packages/erc20-watcher/src/fill.ts
+++ b/packages/erc20-watcher/src/fill.ts
@@ -42,6 +42,16 @@ export const main = async (): Promise<void> => {
       require: true,
       demandOption: true,
       describe: 'Block number to stop processing at'
+    },
+    prefetch: {
+      type: 'boolean',
+      default: false,
+      describe: 'Block and events prefetch mode'
+    },
+    batchBlocks: {
+      type: 'number',
+      default: 10,
+      describe: 'Number of blocks prefetched in batch'
     }
   }).argv;
 
@@ -90,7 +100,7 @@ export const main = async (): Promise<void> => {
 
   assert(jobQueueConfig, 'Missing job queue config');
 
-  await fillBlocks(jobQueue, indexer, postgraphileClient, eventWatcher, blockDelayInMilliSecs, argv);
+  await fillBlocks(jobQueue, indexer, eventWatcher, blockDelayInMilliSecs, argv);
 };
 
 main().catch(err => {
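The new `createdAt` column on `BlockProgress` relies on TypeORM's `@CreateDateColumn()`, which the ORM fills in automatically when a row is inserted; no watcher code has to set it. Below is a minimal sketch of that behaviour — the entity name and the `id` field are illustrative, not taken from this patch.

```ts
// Minimal sketch of a TypeORM entity using @CreateDateColumn (illustrative only).
import { Entity, PrimaryGeneratedColumn, CreateDateColumn } from 'typeorm';

@Entity()
export class ExampleBlockProgress {
  @PrimaryGeneratedColumn()
  id!: number;

  // TypeORM populates this at insert time, so each block_progress row carries
  // an insertion timestamp that can later be used to measure fill throughput.
  @CreateDateColumn()
  createdAt!: Date;
}
```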
diff --git a/packages/uni-info-watcher/src/entity/BlockProgress.ts b/packages/uni-info-watcher/src/entity/BlockProgress.ts
index b6adf6f4..10c83e98 100644
--- a/packages/uni-info-watcher/src/entity/BlockProgress.ts
+++ b/packages/uni-info-watcher/src/entity/BlockProgress.ts
@@ -2,7 +2,7 @@
 // Copyright 2021 Vulcanize, Inc.
 //
 
-import { Entity, PrimaryGeneratedColumn, Column, Index } from 'typeorm';
+import { Entity, PrimaryGeneratedColumn, Column, Index, CreateDateColumn } from 'typeorm';
 
 import { BlockProgressInterface } from '@vulcanize/util';
 
@@ -40,4 +40,7 @@ export class BlockProgress implements BlockProgressInterface {
 
   @Column('boolean', { default: false })
   isPruned!: boolean
+
+  @CreateDateColumn()
+  createdAt!: Date;
 }
diff --git a/packages/uni-info-watcher/src/fill.ts b/packages/uni-info-watcher/src/fill.ts
index c4401b33..0f46f700 100644
--- a/packages/uni-info-watcher/src/fill.ts
+++ b/packages/uni-info-watcher/src/fill.ts
@@ -44,6 +44,16 @@ export const main = async (): Promise<void> => {
       require: true,
       demandOption: true,
       describe: 'Block number to stop processing at'
+    },
+    prefetch: {
+      type: 'boolean',
+      default: false,
+      describe: 'Block and events prefetch mode'
+    },
+    batchBlocks: {
+      type: 'number',
+      default: 10,
+      describe: 'Number of blocks prefetched in batch'
     }
   }).argv;
 
@@ -94,7 +104,7 @@ export const main = async (): Promise<void> => {
 
   const eventWatcher = new EventWatcher(upstream, ethClient, postgraphileClient, indexer, pubsub, jobQueue);
 
-  await fillBlocks(jobQueue, indexer, postgraphileClient, eventWatcher, blockDelayInMilliSecs, argv);
+  await fillBlocks(jobQueue, indexer, eventWatcher, blockDelayInMilliSecs, argv);
 };
 
 main().catch(err => {
diff --git a/packages/uni-info-watcher/src/indexer.ts b/packages/uni-info-watcher/src/indexer.ts
index bcd8099d..c278632b 100644
--- a/packages/uni-info-watcher/src/indexer.ts
+++ b/packages/uni-info-watcher/src/indexer.ts
@@ -363,9 +363,7 @@ export class Indexer implements IndexerInterface {
   async _fetchAndSaveEvents (block: DeepPartial<BlockProgress>): Promise<void> {
     assert(block.blockHash);
 
-    console.time('time:indexer#_fetchAndSaveEvents-uni_watcher');
     const events = await this._uniClient.getEvents(block.blockHash);
-    console.timeEnd('time:indexer#_fetchAndSaveEvents-uni_watcher');
 
     const dbEvents: Array<DeepPartial<Event>> = [];
 
@@ -1106,7 +1104,7 @@ export class Indexer implements IndexerInterface {
 
     if (modulo === BigInt(0)) {
       // Current tick is initialized and needs to be updated.
-      this._loadTickUpdateFeeVarsAndSave(dbTx, Number(newTick), block, contractAddress);
+      await this._loadTickUpdateFeeVarsAndSave(dbTx, Number(newTick), block, contractAddress);
     }
 
     const numIters = BigInt(
@@ -1126,13 +1124,13 @@ export class Indexer implements IndexerInterface {
       const firstInitialized = oldTick + tickSpacing - modulo;
 
       for (let i = firstInitialized; i < newTick; i = i + tickSpacing) {
-        this._loadTickUpdateFeeVarsAndSave(dbTx, Number(i), block, contractAddress);
+        await this._loadTickUpdateFeeVarsAndSave(dbTx, Number(i), block, contractAddress);
       }
     } else if (newTick < oldTick) {
       const firstInitialized = oldTick - modulo;
 
       for (let i = firstInitialized; i >= newTick; i = i - tickSpacing) {
-        this._loadTickUpdateFeeVarsAndSave(dbTx, Number(i), block, contractAddress);
+        await this._loadTickUpdateFeeVarsAndSave(dbTx, Number(i), block, contractAddress);
       }
     }
 
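The added `await`s above matter because `_loadTickUpdateFeeVarsAndSave` writes through the open database transaction: if the calls are not awaited, the transaction can commit before the tick writes finish and any rejection is left unhandled. A hedged sketch of the pattern, with a generic `saveTick` callback standing in for the watcher method:

```ts
// Illustrative sketch, not from this patch: await per-tick writes so the
// surrounding transaction only commits after every write has completed.
async function updateTicksInRange (
  saveTick: (tick: number) => Promise<void>, // stand-in for _loadTickUpdateFeeVarsAndSave bound to a dbTx
  firstInitialized: number,
  newTick: number,
  tickSpacing: number
): Promise<void> {
  for (let i = firstInitialized; i < newTick; i += tickSpacing) {
    // Without the await these promises would be fire-and-forget and failures
    // would surface (if at all) as unhandled rejections after the commit.
    await saveTick(i);
  }
}
```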
diff --git a/packages/uni-watcher/src/entity/BlockProgress.ts b/packages/uni-watcher/src/entity/BlockProgress.ts
index b6adf6f4..10c83e98 100644
--- a/packages/uni-watcher/src/entity/BlockProgress.ts
+++ b/packages/uni-watcher/src/entity/BlockProgress.ts
@@ -2,7 +2,7 @@
 // Copyright 2021 Vulcanize, Inc.
 //
 
-import { Entity, PrimaryGeneratedColumn, Column, Index } from 'typeorm';
+import { Entity, PrimaryGeneratedColumn, Column, Index, CreateDateColumn } from 'typeorm';
 
 import { BlockProgressInterface } from '@vulcanize/util';
 
@@ -40,4 +40,7 @@ export class BlockProgress implements BlockProgressInterface {
 
   @Column('boolean', { default: false })
   isPruned!: boolean
+
+  @CreateDateColumn()
+  createdAt!: Date;
 }
diff --git a/packages/uni-watcher/src/fill.ts b/packages/uni-watcher/src/fill.ts
index c141809c..2141fe27 100644
--- a/packages/uni-watcher/src/fill.ts
+++ b/packages/uni-watcher/src/fill.ts
@@ -42,6 +42,16 @@ export const main = async (): Promise<void> => {
       require: true,
       demandOption: true,
       describe: 'Block number to stop processing at'
+    },
+    prefetch: {
+      type: 'boolean',
+      default: false,
+      describe: 'Block and events prefetch mode'
+    },
+    batchBlocks: {
+      type: 'number',
+      default: 10,
+      describe: 'Number of blocks prefetched in batch'
     }
   }).argv;
 
@@ -91,7 +101,7 @@ export const main = async (): Promise<void> => {
 
   assert(jobQueueConfig, 'Missing job queue config');
 
-  await fillBlocks(jobQueue, indexer, postgraphileClient, eventWatcher, blockDelayInMilliSecs, argv);
+  await fillBlocks(jobQueue, indexer, eventWatcher, blockDelayInMilliSecs, argv);
 };
 
 main().catch(err => {
diff --git a/packages/uni-watcher/src/indexer.ts b/packages/uni-watcher/src/indexer.ts
index 5668a260..2372018f 100644
--- a/packages/uni-watcher/src/indexer.ts
+++ b/packages/uni-watcher/src/indexer.ts
@@ -431,7 +431,6 @@ export class Indexer implements IndexerInterface {
   async _fetchAndSaveEvents ({ blockHash }: DeepPartial<BlockProgress>): Promise<void> {
     assert(blockHash);
 
-    console.time('time:indexer#_fetchAndSaveEvents-logs_txs');
     const logsPromise = this._ethClient.getLogs({ blockHash });
     const transactionsPromise = this._postgraphileClient.getBlockWithTransactions({ blockHash });
 
@@ -450,8 +449,6 @@ export class Indexer implements IndexerInterface {
       }
     ] = await Promise.all([logsPromise, transactionsPromise]);
 
-    console.timeEnd('time:indexer#_fetchAndSaveEvents-logs_txs');
-
     const transactionMap = transactions.reduce((acc: {[key: string]: any}, transaction: {[key: string]: any}) => {
       acc[transaction.txHash] = transaction;
       return acc;
diff --git a/packages/util/package.json b/packages/util/package.json
index 8647301c..2845c27c 100644
--- a/packages/util/package.json
+++ b/packages/util/package.json
@@ -10,7 +10,8 @@
     "ethers": "^5.2.0",
     "fs-extra": "^10.0.0",
     "pg-boss": "^6.1.0",
-    "toml": "^3.0.0"
+    "toml": "^3.0.0",
+    "lodash": "^4.17.21"
   },
   "devDependencies": {
     "@types/fs-extra": "^9.0.11",
@@ -28,7 +29,6 @@
     "eslint-plugin-promise": "^5.1.0",
     "eslint-plugin-standard": "^5.0.0",
     "hardhat": "^2.3.0",
-    "lodash": "^4.17.21",
     "typeorm": "^0.2.32",
     "typeorm-naming-strategies": "^2.0.0"
   },
diff --git a/packages/util/src/common.ts b/packages/util/src/common.ts
index ceb9d789..ae140639 100644
--- a/packages/util/src/common.ts
+++ b/packages/util/src/common.ts
@@ -47,22 +47,30 @@ export const processBlockByNumber = async (
   log(`Process block ${blockNumber}`);
 
   while (true) {
-    console.time('time:common#processBlockByNumber-postgraphile');
+    const blockProgressEntities = await indexer.getBlocksAtHeight(blockNumber, false);
 
-    const blocks = await indexer.getBlocks({ blockNumber });
+    let blocks = blockProgressEntities.map((block: any) => {
+      block.timestamp = block.blockTimestamp;
 
-    console.timeEnd('time:common#processBlockByNumber-postgraphile');
+      return block;
+    });
+
+    if (!blocks.length) {
+      console.time('time:common#processBlockByNumber-postgraphile');
+      blocks = await indexer.getBlocks({ blockNumber });
+      console.timeEnd('time:common#processBlockByNumber-postgraphile');
+    }
 
     if (blocks.length) {
       for (let bi = 0; bi < blocks.length; bi++) {
         const { blockHash, blockNumber, parentHash, timestamp } = blocks[bi];
-        const blockProgress = await indexer.getBlockProgress(blockHash);
 
-        if (blockProgress) {
-          log(`Block number ${blockNumber}, block hash ${blockHash} already processed`);
-        } else {
-          await indexer.updateSyncStatusChainHead(blockHash, blockNumber);
+        console.time('time:common#processBlockByNumber-updateSyncStatusChainHead');
+        const syncStatus = await indexer.updateSyncStatusChainHead(blockHash, blockNumber);
+        console.timeEnd('time:common#processBlockByNumber-updateSyncStatusChainHead');
 
+        // Stop old blocks from getting pushed to job queue. They are already retried after fail.
+        if (syncStatus.latestIndexedBlockNumber < blockNumber) {
           await jobQueue.pushJob(QUEUE_BLOCK_PROCESSING, { kind: JOB_KIND_INDEX, blockHash, blockNumber, parentHash, timestamp });
         }
       }
diff --git a/packages/util/src/database.ts b/packages/util/src/database.ts
index 6089d818..444d031a 100644
--- a/packages/util/src/database.ts
+++ b/packages/util/src/database.ts
@@ -37,6 +37,8 @@ const OPERATOR_MAP = {
   ends: 'LIKE'
 };
 
+const INSERT_EVENTS_BATCH = 100;
+
 export interface BlockHeight {
   number?: number;
   hash?: string;
@@ -249,11 +251,17 @@ export class Database {
       event.block = blockProgress;
     });
 
-    await eventRepo.createQueryBuilder()
-      .insert()
-      .values(events)
-      .updateEntity(false)
-      .execute();
+    const eventBatches = _.chunk(events, INSERT_EVENTS_BATCH);
+
+    const insertPromises = eventBatches.map(async events => {
+      await eventRepo.createQueryBuilder()
+        .insert()
+        .values(events)
+        .updateEntity(false)
+        .execute();
+    });
+
+    await Promise.all(insertPromises);
 
     return blockProgress;
   }
@@ -424,7 +432,9 @@ export class Database {
       FROM
         block_progress b
         LEFT JOIN
-          ${repo.metadata.tableName} e ON e.block_hash = b.block_hash
+          ${repo.metadata.tableName} e
+            ON e.block_hash = b.block_hash
+            AND e.id = $2
       WHERE
         b.block_hash = $1
       UNION ALL
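The event insert above is now split into chunks of `INSERT_EVENTS_BATCH` (100) rows using lodash's `_.chunk`, which keeps each multi-row `INSERT` bounded instead of sending one very large statement per block. A generic sketch of the same idea, with a hypothetical `insert` callback in place of the TypeORM query builder:

```ts
// Illustrative sketch, not from this patch: chunked inserts with lodash.
import _ from 'lodash';

const INSERT_BATCH = 100;

async function insertInBatches<T> (
  rows: T[],
  insert: (batch: T[]) => Promise<void> // e.g. a query-builder .insert().values(batch).execute()
): Promise<void> {
  const batches = _.chunk(rows, INSERT_BATCH);

  // The patch fires the batches concurrently with Promise.all; awaiting them
  // sequentially is an alternative if insert ordering matters.
  await Promise.all(batches.map(async batch => insert(batch)));
}
```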
diff --git a/packages/util/src/events.ts b/packages/util/src/events.ts
index a96c0843..e4ff1a8e 100644
--- a/packages/util/src/events.ts
+++ b/packages/util/src/events.ts
@@ -146,19 +146,22 @@ export class EventWatcher {
     const { blockHash, blockNumber, priority } = jobData;
     log(`Job onComplete indexing block ${blockHash} ${blockNumber}`);
 
-    // Update sync progress.
-    const syncStatus = await this._indexer.updateSyncStatusIndexedBlock(blockHash, blockNumber);
+    const [blockProgress, syncStatus] = await Promise.all([
+      this._indexer.getBlockProgress(blockHash),
+      // Update sync progress.
+      this._indexer.updateSyncStatusIndexedBlock(blockHash, blockNumber)
+    ]);
+
+    // Publish block progress event if no events exist.
+    // Event for blocks with events will be published from eventProcessingCompleteHandler.
+    if (blockProgress && blockProgress.numEvents === 0) {
+      await this.publishBlockProgressToSubscribers(blockProgress);
+    }
 
     // Create pruning job if required.
     if (syncStatus && syncStatus.latestIndexedBlockNumber > (syncStatus.latestCanonicalBlockNumber + MAX_REORG_DEPTH)) {
       await createPruningJob(this._jobQueue, syncStatus.latestCanonicalBlockNumber, priority);
     }
-
-    // Publish block progress event.
-    const blockProgress = await this._indexer.getBlockProgress(blockHash);
-    if (blockProgress) {
-      await this.publishBlockProgressToSubscribers(blockProgress);
-    }
   }
 
   async _handlePruningComplete (jobData: any): Promise<void> {
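The handler above now publishes block progress only for blocks with zero events; blocks that do have events are published later, from the event-processing complete handler, so subscribers receive one notification per block rather than two. A small sketch of that guard (the parameter type is illustrative):

```ts
// Illustrative sketch, not from this patch: decide whether the indexing
// onComplete handler should publish block progress for a block.
function shouldPublishFromIndexingHandler (blockProgress?: { numEvents: number }): boolean {
  // Empty blocks never reach event processing, so this is their only chance
  // to be published; non-empty blocks are published after their events run.
  return Boolean(blockProgress && blockProgress.numEvents === 0);
}
```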
diff --git a/packages/util/src/fill.ts b/packages/util/src/fill.ts
index 93e8d141..04ba5023 100644
--- a/packages/util/src/fill.ts
+++ b/packages/util/src/fill.ts
@@ -3,40 +3,56 @@
 //
 
 import assert from 'assert';
-
-import { EthClient } from '@vulcanize/ipld-eth-client';
+import debug from 'debug';
 
 import { JobQueue } from './job-queue';
 import { EventWatcherInterface, IndexerInterface } from './types';
+import { wait } from './misc';
 
 import { processBlockByNumber } from './common';
 
+const log = debug('vulcanize:fill');
+
 export const fillBlocks = async (
   jobQueue: JobQueue,
   indexer: IndexerInterface,
-  ethClient: EthClient,
   eventWatcher: EventWatcherInterface,
   blockDelayInMilliSecs: number,
-  { startBlock, endBlock }: { startBlock: number, endBlock: number}
+  argv: {
+    startBlock: number,
+    endBlock: number,
+    prefetch: boolean,
+    batchBlocks: number,
+  }
 ): Promise<void> => {
+  let { startBlock, endBlock, prefetch, batchBlocks } = argv;
   assert(startBlock < endBlock, 'endBlock should be greater than startBlock');
 
+  const syncStatus = await indexer.getSyncStatus();
+
+  if (prefetch) {
+    if (syncStatus) {
+      startBlock = syncStatus.chainHeadBlockNumber + 1;
+    }
+
+    await prefetchBlocks(indexer, blockDelayInMilliSecs, { startBlock, endBlock, batchBlocks });
+    return;
+  }
+
+  if (syncStatus && syncStatus.latestIndexedBlockNumber > -1) {
+    if (startBlock > syncStatus.latestIndexedBlockNumber + 1) {
+      throw new Error(`Missing blocks between startBlock ${startBlock} and latestIndexedBlockNumber ${syncStatus.latestIndexedBlockNumber}`);
+    }
+
+    startBlock = syncStatus.latestIndexedBlockNumber + 1;
+  }
+
   await eventWatcher.initBlockProcessingOnCompleteHandler();
   await eventWatcher.initEventProcessingOnCompleteHandler();
 
-  let currentBlockNumber = startBlock;
-  const syncStatus = await indexer.getSyncStatus();
+  const numberOfBlocks = endBlock - startBlock + 1;
+  console.time(`time:fill#fillBlocks-process_block_${startBlock}`);
 
-  if (syncStatus) {
-    if (currentBlockNumber > syncStatus.latestIndexedBlockNumber + 1) {
-      throw new Error(`Missing blocks between startBlock ${currentBlockNumber} and latestIndexedBlockNumber ${syncStatus.latestIndexedBlockNumber}`);
-    }
-
-    currentBlockNumber = syncStatus.latestIndexedBlockNumber + 1;
-  }
-
-  console.time(`time:fill#fillBlocks-process_block_${currentBlockNumber}`);
-
-  processBlockByNumber(jobQueue, indexer, blockDelayInMilliSecs, currentBlockNumber);
+  processBlockByNumber(jobQueue, indexer, blockDelayInMilliSecs, startBlock);
 
   // Creating an AsyncIterable from AsyncIterator to iterate over the values.
   // https://www.codementor.io/@tiagolopesferreira/asynchronous-iterators-in-javascript-jl1yg8la1#for-wait-of
@@ -52,21 +68,69 @@ export const fillBlocks = async (
   for await (const data of blockProgressEventIterable) {
     const { onBlockProgressEvent: { blockNumber, isComplete } } = data;
 
-    if (blockNumber === currentBlockNumber && isComplete) {
-      console.timeEnd(`time:fill#fillBlocks-process_block_${currentBlockNumber}`);
+    if (isComplete) {
+      console.timeEnd(`time:fill#fillBlocks-process_block_${blockNumber}`);
 
-      if (blockNumber >= endBlock) {
+      console.time(`time:fill#fillBlocks-process_block_${blockNumber + 1}`);
+
+      const blocksProcessed = blockNumber - startBlock + 1;
+      const completePercentage = Math.round(blocksProcessed / numberOfBlocks * 100);
+      log(`Processed ${blocksProcessed} of ${numberOfBlocks} blocks (${completePercentage}%)`);
+
+      await processBlockByNumber(jobQueue, indexer, blockDelayInMilliSecs, blockNumber + 1);
+
+      if (blockNumber + 1 >= endBlock) {
         // Break the async loop when blockProgress event is for the endBlock and processing is complete.
         break;
       }
-
-      currentBlockNumber++;
-
-      console.time(`time:fill#fillBlocks-process_block_${currentBlockNumber}`);
-
-      processBlockByNumber(jobQueue, indexer, blockDelayInMilliSecs, currentBlockNumber);
     }
   }
 
+  log('Processed all blocks (100%)');
   console.timeEnd('time:fill#fillBlocks-process_blocks');
 };
+
+const prefetchBlocks = async (
+  indexer: IndexerInterface,
+  blockDelayInMilliSecs: number,
+  { startBlock, endBlock, batchBlocks }: {
+    startBlock: number,
+    endBlock: number,
+    batchBlocks: number,
+  }
+) => {
+  for (let i = startBlock; i <= endBlock; i = i + batchBlocks) {
+    const batchEndBlock = Math.min(i + batchBlocks, endBlock + 1);
+    let blockNumbers = [...Array(batchEndBlock - i).keys()].map(n => n + i);
+    log('Fetching blockNumbers:', blockNumbers);
+
+    let blocks = [];
+
+    // Fetch blocks again if there are missing blocks.
+    while (true) {
+      const blockPromises = blockNumbers.map(async blockNumber => indexer.getBlocks({ blockNumber }));
+      const res = await Promise.all(blockPromises);
+
+      const missingIndex = res.findIndex(blocks => blocks.length === 0);
+
+      if (missingIndex < 0) {
+        blocks = res.flat();
+        break;
+      }
+
+      blockNumbers = blockNumbers.slice(missingIndex);
+      await wait(blockDelayInMilliSecs);
+    }
+
+    const fetchBlockPromises = blocks.map(async block => {
+      const { blockHash, blockNumber, parentHash, timestamp } = block;
+      const blockProgress = await indexer.getBlockProgress(blockHash);
+
+      if (!blockProgress) {
+        await indexer.fetchBlockEvents({ blockHash, blockNumber, parentHash, blockTimestamp: timestamp });
+      }
+    });
+
+    await Promise.all(fetchBlockPromises);
+  }
+};
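In prefetch mode `fillBlocks` returns early and `prefetchBlocks` walks the range in batches of `batchBlocks`, fetching block headers and events in parallel without creating indexing jobs; a second, non-prefetch run then indexes the already-fetched data. The helper below reproduces just the batch-splitting arithmetic from the loop above — for `startBlock = 100`, `endBlock = 105`, `batchBlocks = 3` it yields `[[100, 101, 102], [103, 104, 105]]`.

```ts
// Illustrative sketch, not from this patch: split a block range into batches
// of `batchBlocks` consecutive block numbers, mirroring prefetchBlocks above.
function batchBlockNumbers (startBlock: number, endBlock: number, batchBlocks: number): number[][] {
  const batches: number[][] = [];

  for (let i = startBlock; i <= endBlock; i += batchBlocks) {
    const batchEndBlock = Math.min(i + batchBlocks, endBlock + 1);
    batches.push([...Array(batchEndBlock - i).keys()].map(n => n + i));
  }

  return batches;
}
```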
diff --git a/packages/util/src/indexer.ts b/packages/util/src/indexer.ts
index 7aa59e57..a1eec4d3 100644
--- a/packages/util/src/indexer.ts
+++ b/packages/util/src/indexer.ts
@@ -141,7 +141,7 @@ export class Indexer {
 
       throw error;
     }
 
-    log('Block not found. Fetching block after eth_call.');
+    log('Block not found. Fetching block after RPC call.');
   }
 }
diff --git a/packages/util/src/job-runner.ts b/packages/util/src/job-runner.ts
index 5e42e755..42117079 100644
--- a/packages/util/src/job-runner.ts
+++ b/packages/util/src/job-runner.ts
@@ -109,6 +109,7 @@ export class JobRunner {
   }
 
   async _indexBlock (job: any, syncStatus: SyncStatusInterface): Promise<void> {
+    console.time('time:job-runner#_indexBlock');
     const { data: { blockHash, blockNumber, parentHash, priority, timestamp } } = job;
 
     log(`Processing block number ${blockNumber} hash ${blockHash} `);
@@ -172,26 +173,32 @@ export class JobRunner {
 
         throw new Error(message);
       }
+    } else {
+      blockProgress = parentBlock;
     }
 
-    // Check if block is being already processed.
     if (!blockProgress) {
       const { jobDelayInMilliSecs = 0 } = this._jobQueueConfig;
 
       // Delay required to process block.
       await wait(jobDelayInMilliSecs);
       blockProgress = await this._indexer.fetchBlockEvents({ blockHash, blockNumber, parentHash, blockTimestamp: timestamp });
-
-      if (blockProgress.numEvents) {
-        await this._jobQueue.pushJob(QUEUE_EVENT_PROCESSING, { kind: JOB_KIND_EVENTS, blockHash: blockProgress.blockHash, publish: true });
-      }
     }
+
+    // Check if block is being already processed.
+    if (blockProgress.numProcessedEvents === 0 && blockProgress.numEvents) {
+      await this._jobQueue.pushJob(QUEUE_EVENT_PROCESSING, { kind: JOB_KIND_EVENTS, blockHash: blockProgress.blockHash, publish: true });
+    }
+
+    console.timeEnd('time:job-runner#_indexBlock');
   }
 
   async _processEvents (job: any): Promise<void> {
     const { blockHash } = job.data;
+    console.time('time:job-runner#_processEvents-get-block-process');
     let block = await this._indexer.getBlockProgress(blockHash);
+    console.timeEnd('time:job-runner#_processEvents-get-block-process');
     assert(block);
 
     console.time('time:job-runner#_processEvents-events');
@@ -216,13 +223,17 @@ export class JobRunner {
 
       console.timeEnd('time:job-runner#_processEvents-fetching_events_batch');
 
+      if (events.length) {
+        log(`Processing events batch from index ${events[0].index} to ${events[0].index + events.length - 1}`);
+      }
+
       console.time('time:job-runner#_processEvents-processing_events_batch');
 
       for (let event of events) {
         // Process events in loop
 
         const eventIndex = event.index;
-        log(`Processing event ${event.id} index ${eventIndex}`);
+        // log(`Processing event ${event.id} index ${eventIndex}`);
 
         // Check if previous event in block has been processed exactly before this and abort if not.
         if (eventIndex > 0) { // Skip the first event in the block.
diff --git a/packages/util/src/types.ts b/packages/util/src/types.ts
index 93547d16..ef391a17 100644
--- a/packages/util/src/types.ts
+++ b/packages/util/src/types.ts
@@ -17,6 +17,7 @@ export interface BlockProgressInterface {
   lastProcessedEventIndex: number;
   isComplete: boolean;
   isPruned: boolean;
+  createdAt: Date;
 }
 
 export interface SyncStatusInterface {
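The reworked `_indexBlock` is what the "watcher creating multiple jobs for a block" item in the commit message refers to: an event-processing job is queued only when the block has events and none of them have been processed yet, so re-running the index job for the same block does not enqueue duplicate work. A minimal sketch of that condition (the inline parameter type mirrors fields from `BlockProgressInterface`):

```ts
// Illustrative sketch, not from this patch: only blocks with unprocessed
// events need an event-processing job; empty or already-processed blocks do not.
function needsEventProcessingJob (blockProgress: { numEvents: number, numProcessedEvents: number }): boolean {
  return blockProgress.numProcessedEvents === 0 && blockProgress.numEvents > 0;
}
```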