From 306bbb73ca86e6145e579c31b9015a1cd2e61722 Mon Sep 17 00:00:00 2001 From: prathamesh0 <42446521+prathamesh0@users.noreply.github.com> Date: Thu, 20 Oct 2022 08:16:56 -0500 Subject: [PATCH] Use prefetching of blocks with events in watchers and codegen (#206) * Avoid refetching block while fetching events * Prefetch a batch of blocks with events while indexing * Update mock indexer used in graph-node testing * Process available blocks while prefetching * Refactor events fetching to a method in util * Move method to get GQL event query result to util --- .../src/templates/config-template.handlebars | 4 +- .../src/templates/fill-template.handlebars | 2 +- .../src/templates/hooks-template.handlebars | 4 +- .../import-state-template.handlebars | 2 +- .../src/templates/indexer-template.handlebars | 189 ++--------------- packages/eden-watcher/environments/local.toml | 4 +- packages/eden-watcher/src/cli/import-state.ts | 2 +- packages/eden-watcher/src/fill.ts | 2 +- packages/eden-watcher/src/hooks.ts | 4 +- packages/eden-watcher/src/indexer.ts | 198 ++---------------- .../erc20-watcher/environments/local.toml | 4 +- packages/erc20-watcher/src/fill.ts | 2 +- packages/erc20-watcher/src/indexer.ts | 86 ++------ .../erc721-watcher/environments/local.toml | 4 +- .../erc721-watcher/src/cli/import-state.ts | 2 +- packages/erc721-watcher/src/fill.ts | 2 +- packages/erc721-watcher/src/hooks.ts | 4 +- packages/erc721-watcher/src/indexer.ts | 193 ++--------------- packages/graph-node/test/utils/indexer.ts | 6 +- .../environments/local.toml | 4 +- .../src/cli/import-state.ts | 2 +- packages/graph-test-watcher/src/fill.ts | 2 +- packages/graph-test-watcher/src/hooks.ts | 4 +- packages/graph-test-watcher/src/indexer.ts | 193 ++--------------- .../mobymask-watcher/environments/local.toml | 4 +- .../mobymask-watcher/src/cli/import-state.ts | 2 +- packages/mobymask-watcher/src/fill.ts | 2 +- packages/mobymask-watcher/src/hooks.ts | 4 +- packages/mobymask-watcher/src/indexer.ts | 193 ++--------------- packages/util/src/common.ts | 100 ++------- packages/util/src/config.ts | 3 +- packages/util/src/events.ts | 35 ++-- packages/util/src/fill.ts | 8 +- packages/util/src/index-block.ts | 2 +- packages/util/src/indexer.ts | 146 +++++++++++-- packages/util/src/job-runner.ts | 79 +++++-- packages/util/src/misc.ts | 41 +++- packages/util/src/types.ts | 3 +- 38 files changed, 436 insertions(+), 1105 deletions(-) diff --git a/packages/codegen/src/templates/config-template.handlebars b/packages/codegen/src/templates/config-template.handlebars index 621d698a..87ec8e45 100644 --- a/packages/codegen/src/templates/config-template.handlebars +++ b/packages/codegen/src/templates/config-template.handlebars @@ -50,7 +50,6 @@ [upstream.ethServer] gqlApiEndpoint = "http://127.0.0.1:8082/graphql" rpcProviderEndpoint = "http://127.0.0.1:8081" - blockDelayInMilliSecs = 2000 [upstream.cache] name = "requests" @@ -62,3 +61,6 @@ maxCompletionLagInSecs = 300 jobDelayInMilliSecs = 100 eventsInBatch = 50 + blockDelayInMilliSecs = 2000 + prefetchBlocksInMem = true + prefetchBlockCount = 10 diff --git a/packages/codegen/src/templates/fill-template.handlebars b/packages/codegen/src/templates/fill-template.handlebars index 9ffa4490..b8c2a990 100644 --- a/packages/codegen/src/templates/fill-template.handlebars +++ b/packages/codegen/src/templates/fill-template.handlebars @@ -109,7 +109,7 @@ export const main = async (): Promise => { const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue); - await fillBlocks(jobQueue, indexer, eventWatcher, config.upstream.ethServer.blockDelayInMilliSecs, argv); + await fillBlocks(jobQueue, indexer, eventWatcher, jobQueueConfig.blockDelayInMilliSecs, argv); }; main().catch(err => { diff --git a/packages/codegen/src/templates/hooks-template.handlebars b/packages/codegen/src/templates/hooks-template.handlebars index 1971beac..7e43a634 100644 --- a/packages/codegen/src/templates/hooks-template.handlebars +++ b/packages/codegen/src/templates/hooks-template.handlebars @@ -4,9 +4,9 @@ import assert from 'assert'; -// import { updateStateForMappingType, updateStateForElementaryType } from '@cerc-io/util'; +import { updateStateForMappingType, updateStateForElementaryType, ResultEvent } from '@cerc-io/util'; -import { Indexer, ResultEvent } from './indexer'; +import { Indexer } from './indexer'; /** * Hook function to store an initial state. diff --git a/packages/codegen/src/templates/import-state-template.handlebars b/packages/codegen/src/templates/import-state-template.handlebars index 60cef1cc..4824fc57 100644 --- a/packages/codegen/src/templates/import-state-template.handlebars +++ b/packages/codegen/src/templates/import-state-template.handlebars @@ -89,7 +89,7 @@ export const main = async (): Promise => { jobQueue, indexer, eventWatcher, - config.upstream.ethServer.blockDelayInMilliSecs, + jobQueueConfig.blockDelayInMilliSecs, { prefetch: true, startBlock: importData.snapshotBlock.blockNumber, diff --git a/packages/codegen/src/templates/indexer-template.handlebars b/packages/codegen/src/templates/indexer-template.handlebars index a9c83f5c..957137fc 100644 --- a/packages/codegen/src/templates/indexer-template.handlebars +++ b/packages/codegen/src/templates/indexer-template.handlebars @@ -20,7 +20,6 @@ import { Indexer as BaseIndexer, IndexerInterface, ValueResult, - UNKNOWN_EVENT_NAME, ServerConfig, JobQueue, Where, @@ -31,7 +30,9 @@ import { BlockHeight, {{/if}} StateKind, - StateStatus + StateStatus, + ResultEvent, + getResultEvent } from '@cerc-io/util'; {{#if (subgraphPath)}} import { GraphWatcher } from '@cerc-io/graph-node'; @@ -64,30 +65,6 @@ const KIND_{{capitalize contract.contractName}} = '{{contract.contractKind}}'; const {{capitalize event}}_EVENT = '{{event}}'; {{/each}} -export type ResultEvent = { - block: { - cid: string; - hash: string; - number: number; - timestamp: number; - parentHash: string; - }; - tx: { - hash: string; - from: string; - to: string; - index: number; - }; - - contract: string; - - eventIndex: number; - eventSignature: string; - event: any; - - proof: string; -}; - export class Indexer implements IndexerInterface { _db: Database _ethClient: EthClient @@ -170,38 +147,7 @@ export class Indexer implements IndexerInterface { } getResultEvent (event: Event): ResultEvent { - const block = event.block; - const eventFields = JSONbigNative.parse(event.eventInfo); - const { tx, eventSignature } = JSONbigNative.parse(event.extraInfo); - - return { - block: { - cid: block.cid, - hash: block.blockHash, - number: block.blockNumber, - timestamp: block.blockTimestamp, - parentHash: block.parentHash - }, - - tx: { - hash: event.txHash, - from: tx.src, - to: tx.dst, - index: tx.index - }, - - contract: event.contract, - - eventIndex: event.index, - eventSignature, - event: { - __typename: `${event.eventName}Event`, - ...eventFields - }, - - // TODO: Return proof only if requested. - proof: JSON.parse(event.proof) - }; + return getResultEvent(event); } {{#each queries as | query |}} @@ -458,12 +404,12 @@ export class Indexer implements IndexerInterface { const logDescription = contract.parseLog({ data, topics }); - const { eventName, eventInfo } = this._baseIndexer.parseEvent(logDescription); + const { eventName, eventInfo, eventSignature } = this._baseIndexer.parseEvent(logDescription); return { eventName, eventInfo, - eventSignature: logDescription.signature + eventSignature }; } @@ -687,126 +633,33 @@ export class Indexer implements IndexerInterface { } {{/if}} - async _fetchAndSaveEvents ({ cid: blockCid, blockHash }: DeepPartial): Promise { + async _saveBlockAndFetchEvents ({ + cid: blockCid, + blockHash, + blockNumber, + blockTimestamp, + parentHash + }: DeepPartial): Promise<[BlockProgress, DeepPartial[]]> { assert(blockHash); - const transactionsPromise = this._ethClient.getBlockWithTransactions({ blockHash }); - const blockPromise = this._ethClient.getBlockByHash(blockHash); - let logs: any[]; - console.time('time:indexer#_fetchAndSaveEvents-fetch-logs'); - if (this._serverConfig.filterLogs) { - const watchedContracts = this._baseIndexer.getWatchedContracts(); - const addresses = watchedContracts.map((watchedContract): string => { - return watchedContract.address; - }); - - const logsResult = await this._ethClient.getLogs({ - blockHash, - addresses - }); - - logs = logsResult.logs; - } else { - ({ logs } = await this._ethClient.getLogs({ blockHash })); - } - console.timeEnd('time:indexer#_fetchAndSaveEvents-fetch-logs'); - - let [ - { block }, - { - allEthHeaderCids: { - nodes: [ - { - ethTransactionCidsByHeaderId: { - nodes: transactions - } - } - ] - } - } - ] = await Promise.all([blockPromise, transactionsPromise]); - - const transactionMap = transactions.reduce((acc: {[key: string]: any}, transaction: {[key: string]: any}) => { - acc[transaction.txHash] = transaction; - return acc; - }, {}); - - const dbEvents: Array> = []; - - for (let li = 0; li < logs.length; li++) { - const logObj = logs[li]; - const { - topics, - data, - index: logIndex, - cid, - ipldBlock, - account: { - address - }, - transaction: { - hash: txHash - }, - receiptCID, - status - } = logObj; - - if (status) { - let eventName = UNKNOWN_EVENT_NAME; - let eventInfo = {}; - const tx = transactionMap[txHash]; - const extraInfo: { [key: string]: any } = { topics, data, tx }; - - const contract = ethers.utils.getAddress(address); - const watchedContract = await this.isWatchedContract(contract); - - if (watchedContract) { - const eventDetails = this.parseEventNameAndArgs(watchedContract.kind, logObj); - eventName = eventDetails.eventName; - eventInfo = eventDetails.eventInfo; - extraInfo.eventSignature = eventDetails.eventSignature; - } - - dbEvents.push({ - index: logIndex, - txHash, - contract, - eventName, - eventInfo: JSONbigNative.stringify(eventInfo), - extraInfo: JSONbigNative.stringify(extraInfo), - proof: JSONbigNative.stringify({ - data: JSONbigNative.stringify({ - blockHash, - receiptCID, - log: { - cid, - ipldBlock - } - }) - }) - }); - } else { - log(`Skipping event for receipt ${receiptCID} due to failed transaction.`); - } - } + const dbEvents = await this._baseIndexer.fetchEvents(blockHash, this.parseEventNameAndArgs.bind(this)); const dbTx = await this._db.createTransactionRunner(); - try { - block = { + const block = { cid: blockCid, blockHash, - blockNumber: block.number, - blockTimestamp: block.timestamp, - parentHash: block.parent.hash + blockNumber, + blockTimestamp, + parentHash }; - console.time('time:indexer#_fetchAndSaveEvents-save-block-events'); + console.time(`time:indexer#_saveBlockAndFetchEvents-db-save-${blockNumber}`); const blockProgress = await this._db.saveBlockWithEvents(dbTx, block, dbEvents); await dbTx.commitTransaction(); - console.timeEnd('time:indexer#_fetchAndSaveEvents-save-block-events'); + console.timeEnd(`time:indexer#_saveBlockAndFetchEvents-db-save-${blockNumber}`); - return blockProgress; + return [blockProgress, []]; } catch (error) { await dbTx.rollbackTransaction(); throw error; diff --git a/packages/eden-watcher/environments/local.toml b/packages/eden-watcher/environments/local.toml index 970eef70..c74299c6 100644 --- a/packages/eden-watcher/environments/local.toml +++ b/packages/eden-watcher/environments/local.toml @@ -48,7 +48,6 @@ [upstream.ethServer] gqlApiEndpoint = "http://127.0.0.1:8083/graphql" rpcProviderEndpoint = "http://127.0.0.1:8082" - blockDelayInMilliSecs = 2000 [upstream.cache] name = "requests" @@ -60,3 +59,6 @@ maxCompletionLagInSecs = 300 jobDelayInMilliSecs = 100 eventsInBatch = 50 + blockDelayInMilliSecs = 2000 + prefetchBlocksInMem = true + prefetchBlockCount = 10 \ No newline at end of file diff --git a/packages/eden-watcher/src/cli/import-state.ts b/packages/eden-watcher/src/cli/import-state.ts index ebb96f44..64063e0e 100644 --- a/packages/eden-watcher/src/cli/import-state.ts +++ b/packages/eden-watcher/src/cli/import-state.ts @@ -83,7 +83,7 @@ export const main = async (): Promise => { jobQueue, indexer, eventWatcher, - config.upstream.ethServer.blockDelayInMilliSecs, + jobQueueConfig.blockDelayInMilliSecs, { prefetch: true, startBlock: importData.snapshotBlock.blockNumber, diff --git a/packages/eden-watcher/src/fill.ts b/packages/eden-watcher/src/fill.ts index fd69d654..57ebbfe1 100644 --- a/packages/eden-watcher/src/fill.ts +++ b/packages/eden-watcher/src/fill.ts @@ -97,7 +97,7 @@ export const main = async (): Promise => { const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue); - await fillBlocks(jobQueue, indexer, eventWatcher, config.upstream.ethServer.blockDelayInMilliSecs, argv); + await fillBlocks(jobQueue, indexer, eventWatcher, jobQueueConfig.blockDelayInMilliSecs, argv); }; main().catch(err => { diff --git a/packages/eden-watcher/src/hooks.ts b/packages/eden-watcher/src/hooks.ts index 3dc50420..0d8a2876 100644 --- a/packages/eden-watcher/src/hooks.ts +++ b/packages/eden-watcher/src/hooks.ts @@ -4,7 +4,9 @@ import assert from 'assert'; -import { Indexer, ResultEvent } from './indexer'; +import { ResultEvent } from '@cerc-io/util'; + +import { Indexer } from './indexer'; /** * Hook function to store an initial state. diff --git a/packages/eden-watcher/src/indexer.ts b/packages/eden-watcher/src/indexer.ts index da69f04f..d2546bc3 100644 --- a/packages/eden-watcher/src/indexer.ts +++ b/packages/eden-watcher/src/indexer.ts @@ -3,9 +3,7 @@ // import assert from 'assert'; -import debug from 'debug'; import { DeepPartial, FindConditions, FindManyOptions } from 'typeorm'; -import JSONbig from 'json-bigint'; import { ethers } from 'ethers'; import _ from 'lodash'; import { SelectionNode } from 'graphql'; @@ -16,7 +14,6 @@ import { EthClient } from '@cerc-io/ipld-eth-client'; import { MappingKey, StorageLayout } from '@cerc-io/solidity-mapper'; import { Indexer as BaseIndexer, - UNKNOWN_EVENT_NAME, ServerConfig, JobQueue, Where, @@ -25,7 +22,9 @@ import { StateKind, IndexerInterface, StateStatus, - ValueResult + ValueResult, + ResultEvent, + getResultEvent } from '@cerc-io/util'; import { GraphWatcher } from '@cerc-io/graph-node'; @@ -57,37 +56,10 @@ import { Claim } from './entity/Claim'; import { Account } from './entity/Account'; import { Slash } from './entity/Slash'; -const log = debug('vulcanize:indexer'); -const JSONbigNative = JSONbig({ useNativeBigInt: true }); - const KIND_EDENNETWORK = 'EdenNetwork'; const KIND_MERKLEDISTRIBUTOR = 'EdenNetworkDistribution'; const KIND_DISTRIBUTORGOVERNANCE = 'EdenNetworkGovernance'; -export type ResultEvent = { - block: { - cid: string; - hash: string; - number: number; - timestamp: number; - parentHash: string; - }; - tx: { - hash: string; - from: string; - to: string; - index: number; - }; - - contract: string; - - eventIndex: number; - eventSignature: string; - event: any; - - proof: string; -}; - export class Indexer implements IndexerInterface { _db: Database _ethClient: EthClient @@ -168,38 +140,7 @@ export class Indexer implements IndexerInterface { } getResultEvent (event: Event): ResultEvent { - const block = event.block; - const eventFields = JSONbigNative.parse(event.eventInfo); - const { tx, eventSignature } = JSONbigNative.parse(event.extraInfo); - - return { - block: { - cid: block.cid, - hash: block.blockHash, - number: block.blockNumber, - timestamp: block.blockTimestamp, - parentHash: block.parentHash - }, - - tx: { - hash: event.txHash, - from: tx.src, - to: tx.dst, - index: tx.index - }, - - contract: event.contract, - - eventIndex: event.index, - eventSignature, - event: { - __typename: `${event.eventName}Event`, - ...eventFields - }, - - // TODO: Return proof only if requested. - proof: JSON.parse(event.proof) - }; + return getResultEvent(event); } async getStorageValue (storageLayout: StorageLayout, blockHash: string, contractAddress: string, variable: string, ...mappingKeys: MappingKey[]): Promise { @@ -393,12 +334,12 @@ export class Indexer implements IndexerInterface { const logDescription = contract.parseLog({ data, topics }); - const { eventName, eventInfo } = this._baseIndexer.parseEvent(logDescription); + const { eventName, eventInfo, eventSignature } = this._baseIndexer.parseEvent(logDescription); return { eventName, eventInfo, - eventSignature: logDescription.signature + eventSignature }; } @@ -522,8 +463,8 @@ export class Indexer implements IndexerInterface { return this._baseIndexer.getBlocksAtHeight(height, isPruned); } - async fetchBlockWithEvents (block: DeepPartial): Promise { - return this._baseIndexer.fetchBlockWithEvents(block, this._fetchAndSaveEvents.bind(this)); + async saveBlockAndFetchEvents (block: DeepPartial): Promise<[BlockProgress, DeepPartial[]]> { + return this._baseIndexer.saveBlockAndFetchEvents(block, this._saveBlockAndFetchEvents.bind(this)); } async getBlockEvents (blockHash: string, where: Where, queryOptions: QueryOptions): Promise> { @@ -939,126 +880,33 @@ export class Indexer implements IndexerInterface { }); } - async _fetchAndSaveEvents ({ cid: blockCid, blockHash }: DeepPartial): Promise { + async _saveBlockAndFetchEvents ({ + cid: blockCid, + blockHash, + blockNumber, + blockTimestamp, + parentHash + }: DeepPartial): Promise<[BlockProgress, DeepPartial[]]> { assert(blockHash); - const transactionsPromise = this._ethClient.getBlockWithTransactions({ blockHash }); - const blockPromise = this._ethClient.getBlockByHash(blockHash); - let logs: any[]; - console.time('time:indexer#_fetchAndSaveEvents-fetch-logs'); - if (this._serverConfig.filterLogs) { - const watchedContracts = this._baseIndexer.getWatchedContracts(); - const addresses = watchedContracts.map((watchedContract): string => { - return watchedContract.address; - }); - - const logsResult = await this._ethClient.getLogs({ - blockHash, - addresses - }); - - logs = logsResult.logs; - } else { - ({ logs } = await this._ethClient.getLogs({ blockHash })); - } - console.timeEnd('time:indexer#_fetchAndSaveEvents-fetch-logs'); - - let [ - { block }, - { - allEthHeaderCids: { - nodes: [ - { - ethTransactionCidsByHeaderId: { - nodes: transactions - } - } - ] - } - } - ] = await Promise.all([blockPromise, transactionsPromise]); - - const transactionMap = transactions.reduce((acc: {[key: string]: any}, transaction: {[key: string]: any}) => { - acc[transaction.txHash] = transaction; - return acc; - }, {}); - - const dbEvents: Array> = []; - - for (let li = 0; li < logs.length; li++) { - const logObj = logs[li]; - const { - topics, - data, - index: logIndex, - cid, - ipldBlock, - account: { - address - }, - transaction: { - hash: txHash - }, - receiptCID, - status - } = logObj; - - if (status) { - let eventName = UNKNOWN_EVENT_NAME; - let eventInfo = {}; - const tx = transactionMap[txHash]; - const extraInfo: { [key: string]: any } = { topics, data, tx }; - - const contract = ethers.utils.getAddress(address); - const watchedContract = await this.isWatchedContract(contract); - - if (watchedContract) { - const eventDetails = this.parseEventNameAndArgs(watchedContract.kind, logObj); - eventName = eventDetails.eventName; - eventInfo = eventDetails.eventInfo; - extraInfo.eventSignature = eventDetails.eventSignature; - } - - dbEvents.push({ - index: logIndex, - txHash, - contract, - eventName, - eventInfo: JSONbigNative.stringify(eventInfo), - extraInfo: JSONbigNative.stringify(extraInfo), - proof: JSONbigNative.stringify({ - data: JSONbigNative.stringify({ - blockHash, - receiptCID, - log: { - cid, - ipldBlock - } - }) - }) - }); - } else { - log(`Skipping event for receipt ${receiptCID} due to failed transaction.`); - } - } + const dbEvents = await this._baseIndexer.fetchEvents(blockHash, this.parseEventNameAndArgs.bind(this)); const dbTx = await this._db.createTransactionRunner(); - try { - block = { + const block = { cid: blockCid, blockHash, - blockNumber: block.number, - blockTimestamp: block.timestamp, - parentHash: block.parent.hash + blockNumber, + blockTimestamp, + parentHash }; - console.time('time:indexer#_fetchAndSaveEvents-save-block-events'); + console.time(`time:indexer#_saveBlockAndFetchEvents-db-save-${blockNumber}`); const blockProgress = await this._db.saveBlockWithEvents(dbTx, block, dbEvents); await dbTx.commitTransaction(); - console.timeEnd('time:indexer#_fetchAndSaveEvents-save-block-events'); + console.timeEnd(`time:indexer#_saveBlockAndFetchEvents-db-save-${blockNumber}`); - return blockProgress; + return [blockProgress, []]; } catch (error) { await dbTx.rollbackTransaction(); throw error; diff --git a/packages/erc20-watcher/environments/local.toml b/packages/erc20-watcher/environments/local.toml index df252c34..68b44472 100644 --- a/packages/erc20-watcher/environments/local.toml +++ b/packages/erc20-watcher/environments/local.toml @@ -22,7 +22,6 @@ [upstream.ethServer] gqlApiEndpoint = "http://127.0.0.1:8082/graphql" rpcProviderEndpoint = "http://127.0.0.1:8081" - blockDelayInMilliSecs = 2000 [upstream.cache] name = "requests" @@ -34,3 +33,6 @@ maxCompletionLagInSecs = 300 jobDelayInMilliSecs = 100 eventsInBatch = 50 + blockDelayInMilliSecs = 2000 + prefetchBlocksInMem = true + prefetchBlockCount = 10 diff --git a/packages/erc20-watcher/src/fill.ts b/packages/erc20-watcher/src/fill.ts index 23061b4b..03250034 100644 --- a/packages/erc20-watcher/src/fill.ts +++ b/packages/erc20-watcher/src/fill.ts @@ -78,7 +78,7 @@ export const main = async (): Promise => { const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue); - await fillBlocks(jobQueue, indexer, eventWatcher, config.upstream.ethServer.blockDelayInMilliSecs, argv); + await fillBlocks(jobQueue, indexer, eventWatcher, jobQueueConfig.blockDelayInMilliSecs, argv); }; main().catch(err => { diff --git a/packages/erc20-watcher/src/indexer.ts b/packages/erc20-watcher/src/indexer.ts index 62558464..989eb837 100644 --- a/packages/erc20-watcher/src/indexer.ts +++ b/packages/erc20-watcher/src/indexer.ts @@ -12,7 +12,7 @@ import { BaseProvider } from '@ethersproject/providers'; import { EthClient } from '@cerc-io/ipld-eth-client'; import { MappingKey, StorageLayout } from '@cerc-io/solidity-mapper'; -import { IndexerInterface, Indexer as BaseIndexer, ValueResult, UNKNOWN_EVENT_NAME, JobQueue, Where, QueryOptions, ServerConfig, StateStatus } from '@cerc-io/util'; +import { IndexerInterface, Indexer as BaseIndexer, ValueResult, JobQueue, Where, QueryOptions, ServerConfig, StateStatus } from '@cerc-io/util'; import { Database } from './database'; import { Event } from './entity/Event'; @@ -398,8 +398,8 @@ export class Indexer implements IndexerInterface { return this._baseIndexer.getBlocksAtHeight(height, isPruned); } - async fetchBlockWithEvents (block: DeepPartial): Promise { - return this._baseIndexer.fetchBlockWithEvents(block, this._fetchAndSaveEvents.bind(this)); + async saveBlockAndFetchEvents (block: DeepPartial): Promise<[BlockProgress, DeepPartial[]]> { + return this._baseIndexer.saveBlockAndFetchEvents(block, this._saveBlockAndFetchEvents.bind(this)); } async getBlockEvents (blockHash: string, where: Where, queryOptions: QueryOptions): Promise> { @@ -422,79 +422,25 @@ export class Indexer implements IndexerInterface { return this._baseIndexer.getAncestorAtDepth(blockHash, depth); } - async _fetchAndSaveEvents ({ cid: blockCid, blockHash }: DeepPartial): Promise { + async _saveBlockAndFetchEvents ({ + cid: blockCid, + blockHash, + blockNumber, + blockTimestamp, + parentHash + }: DeepPartial): Promise<[BlockProgress, DeepPartial[]]> { assert(blockHash); - let [{ block }, { logs }] = await Promise.all([ - this._ethClient.getBlockByHash(blockHash), - this._ethClient.getLogs({ blockHash }) - ]); - const dbEvents: Array> = []; - - for (let li = 0; li < logs.length; li++) { - const logObj = logs[li]; - const { - topics, - data, - index: logIndex, - cid, - ipldBlock, - account: { - address - }, - transaction: { - hash: txHash - }, - receiptCID, - status - } = logObj; - - if (status) { - let eventName = UNKNOWN_EVENT_NAME; - let eventInfo = {}; - const extraInfo = { topics, data }; - - const contract = ethers.utils.getAddress(address); - const watchedContract = await this.isWatchedContract(contract); - - if (watchedContract) { - const eventDetails = this.parseEventNameAndArgs(watchedContract.kind, logObj); - eventName = eventDetails.eventName; - eventInfo = eventDetails.eventInfo; - } - - dbEvents.push({ - index: logIndex, - txHash, - contract, - eventName, - eventInfo: JSONbigNative.stringify(eventInfo), - extraInfo: JSONbigNative.stringify(extraInfo), - proof: JSONbigNative.stringify({ - data: JSONbigNative.stringify({ - blockHash, - receiptCID, - log: { - cid, - ipldBlock - } - }) - }) - }); - } else { - log(`Skipping event for receipt ${receiptCID} due to failed transaction.`); - } - } + const dbEvents = await this._baseIndexer.fetchEvents(blockHash, this.parseEventNameAndArgs.bind(this)); const dbTx = await this._db.createTransactionRunner(); - try { - block = { + const block = { cid: blockCid, blockHash, - blockNumber: block.number, - blockTimestamp: block.timestamp, - parentHash: block.parent.hash + blockNumber, + blockTimestamp, + parentHash }; console.time('time:indexer#_fetchAndSaveEvents-save-block-events'); @@ -502,7 +448,7 @@ export class Indexer implements IndexerInterface { await dbTx.commitTransaction(); console.timeEnd('time:indexer#_fetchAndSaveEvents-save-block-events'); - return blockProgress; + return [blockProgress, []]; } catch (error) { await dbTx.rollbackTransaction(); throw error; diff --git a/packages/erc721-watcher/environments/local.toml b/packages/erc721-watcher/environments/local.toml index 6af96571..837c5ae4 100644 --- a/packages/erc721-watcher/environments/local.toml +++ b/packages/erc721-watcher/environments/local.toml @@ -34,7 +34,6 @@ [upstream.ethServer] gqlApiEndpoint = "http://127.0.0.1:8082/graphql" rpcProviderEndpoint = "http://127.0.0.1:8081" - blockDelayInMilliSecs = 2000 [upstream.cache] name = "requests" @@ -46,3 +45,6 @@ maxCompletionLagInSecs = 300 jobDelayInMilliSecs = 100 eventsInBatch = 50 + blockDelayInMilliSecs = 2000 + prefetchBlocksInMem = true + prefetchBlockCount = 10 diff --git a/packages/erc721-watcher/src/cli/import-state.ts b/packages/erc721-watcher/src/cli/import-state.ts index 250d09bd..729d3027 100644 --- a/packages/erc721-watcher/src/cli/import-state.ts +++ b/packages/erc721-watcher/src/cli/import-state.ts @@ -74,7 +74,7 @@ export const main = async (): Promise => { jobQueue, indexer, eventWatcher, - config.upstream.ethServer.blockDelayInMilliSecs, + jobQueueConfig.blockDelayInMilliSecs, { prefetch: true, startBlock: importData.snapshotBlock.blockNumber, diff --git a/packages/erc721-watcher/src/fill.ts b/packages/erc721-watcher/src/fill.ts index b916f4f4..f20037c7 100644 --- a/packages/erc721-watcher/src/fill.ts +++ b/packages/erc721-watcher/src/fill.ts @@ -76,7 +76,7 @@ export const main = async (): Promise => { const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue); - await fillBlocks(jobQueue, indexer, eventWatcher, config.upstream.ethServer.blockDelayInMilliSecs, argv); + await fillBlocks(jobQueue, indexer, eventWatcher, jobQueueConfig.blockDelayInMilliSecs, argv); }; main().catch(err => { diff --git a/packages/erc721-watcher/src/hooks.ts b/packages/erc721-watcher/src/hooks.ts index a8a03e92..322a6ab6 100644 --- a/packages/erc721-watcher/src/hooks.ts +++ b/packages/erc721-watcher/src/hooks.ts @@ -4,9 +4,9 @@ import assert from 'assert'; -import { updateStateForElementaryType } from '@cerc-io/util'; +import { updateStateForElementaryType, ResultEvent } from '@cerc-io/util'; -import { Indexer, ResultEvent } from './indexer'; +import { Indexer } from './indexer'; import { TransferCount } from './entity/TransferCount'; /** diff --git a/packages/erc721-watcher/src/indexer.ts b/packages/erc721-watcher/src/indexer.ts index f73c7287..dde7de9e 100644 --- a/packages/erc721-watcher/src/indexer.ts +++ b/packages/erc721-watcher/src/indexer.ts @@ -16,7 +16,6 @@ import { Indexer as BaseIndexer, IndexerInterface, ValueResult, - UNKNOWN_EVENT_NAME, ServerConfig, JobQueue, Where, @@ -25,7 +24,9 @@ import { updateStateForMappingType, BlockHeight, StateKind, - StateStatus + StateStatus, + ResultEvent, + getResultEvent } from '@cerc-io/util'; import ERC721Artifacts from './artifacts/ERC721.json'; @@ -44,30 +45,6 @@ const JSONbigNative = JSONbig({ useNativeBigInt: true }); const KIND_ERC721 = 'ERC721'; -export type ResultEvent = { - block: { - cid: string; - hash: string; - number: number; - timestamp: number; - parentHash: string; - }; - tx: { - hash: string; - from: string; - to: string; - index: number; - }; - - contract: string; - - eventIndex: number; - eventSignature: string; - event: any; - - proof: string; -}; - export class Indexer implements IndexerInterface { _db: Database _ethClient: EthClient @@ -119,38 +96,7 @@ export class Indexer implements IndexerInterface { } getResultEvent (event: Event): ResultEvent { - const block = event.block; - const eventFields = JSONbigNative.parse(event.eventInfo); - const { tx, eventSignature } = JSONbigNative.parse(event.extraInfo); - - return { - block: { - cid: block.cid, - hash: block.blockHash, - number: block.blockNumber, - timestamp: block.blockTimestamp, - parentHash: block.parentHash - }, - - tx: { - hash: event.txHash, - from: tx.src, - to: tx.dst, - index: tx.index - }, - - contract: event.contract, - - eventIndex: event.index, - eventSignature, - event: { - __typename: `${event.eventName}Event`, - ...eventFields - }, - - // TODO: Return proof only if requested. - proof: JSON.parse(event.proof) - }; + return getResultEvent(event); } async supportsInterface (blockHash: string, contractAddress: string, interfaceId: string): Promise { @@ -760,12 +706,12 @@ export class Indexer implements IndexerInterface { const logDescription = contract.parseLog({ data, topics }); - const { eventName, eventInfo } = this._baseIndexer.parseEvent(logDescription); + const { eventName, eventInfo, eventSignature } = this._baseIndexer.parseEvent(logDescription); return { eventName, eventInfo, - eventSignature: logDescription.signature + eventSignature }; } @@ -893,8 +839,8 @@ export class Indexer implements IndexerInterface { return this._baseIndexer.getBlocksAtHeight(height, isPruned); } - async fetchBlockWithEvents (block: DeepPartial): Promise { - return this._baseIndexer.fetchBlockWithEvents(block, this._fetchAndSaveEvents.bind(this)); + async saveBlockAndFetchEvents (block: DeepPartial): Promise<[BlockProgress, DeepPartial[]]> { + return this._baseIndexer.saveBlockAndFetchEvents(block, this._saveBlockAndFetchEvents.bind(this)); } async getBlockEvents (blockHash: string, where: Where, queryOptions: QueryOptions): Promise> { @@ -917,126 +863,33 @@ export class Indexer implements IndexerInterface { return this._baseIndexer.getAncestorAtDepth(blockHash, depth); } - async _fetchAndSaveEvents ({ cid: blockCid, blockHash }: DeepPartial): Promise { + async _saveBlockAndFetchEvents ({ + cid: blockCid, + blockHash, + blockNumber, + blockTimestamp, + parentHash + }: DeepPartial): Promise<[BlockProgress, DeepPartial[]]> { assert(blockHash); - const transactionsPromise = this._ethClient.getBlockWithTransactions({ blockHash }); - const blockPromise = this._ethClient.getBlockByHash(blockHash); - let logs: any[]; - console.time('time:indexer#_fetchAndSaveEvents-fetch-logs'); - if (this._serverConfig.filterLogs) { - const watchedContracts = this._baseIndexer.getWatchedContracts(); - const addresses = watchedContracts.map((watchedContract): string => { - return watchedContract.address; - }); - - const logsResult = await this._ethClient.getLogs({ - blockHash, - addresses - }); - - logs = logsResult.logs; - } else { - ({ logs } = await this._ethClient.getLogs({ blockHash })); - } - console.timeEnd('time:indexer#_fetchAndSaveEvents-fetch-logs'); - - let [ - { block }, - { - allEthHeaderCids: { - nodes: [ - { - ethTransactionCidsByHeaderId: { - nodes: transactions - } - } - ] - } - } - ] = await Promise.all([blockPromise, transactionsPromise]); - - const transactionMap = transactions.reduce((acc: {[key: string]: any}, transaction: {[key: string]: any}) => { - acc[transaction.txHash] = transaction; - return acc; - }, {}); - - const dbEvents: Array> = []; - - for (let li = 0; li < logs.length; li++) { - const logObj = logs[li]; - const { - topics, - data, - index: logIndex, - cid, - ipldBlock, - account: { - address - }, - transaction: { - hash: txHash - }, - receiptCID, - status - } = logObj; - - if (status) { - let eventName = UNKNOWN_EVENT_NAME; - let eventInfo = {}; - const tx = transactionMap[txHash]; - const extraInfo: { [key: string]: any } = { topics, data, tx }; - - const contract = ethers.utils.getAddress(address); - const watchedContract = await this.isWatchedContract(contract); - - if (watchedContract) { - const eventDetails = this.parseEventNameAndArgs(watchedContract.kind, logObj); - eventName = eventDetails.eventName; - eventInfo = eventDetails.eventInfo; - extraInfo.eventSignature = eventDetails.eventSignature; - } - - dbEvents.push({ - index: logIndex, - txHash, - contract, - eventName, - eventInfo: JSONbigNative.stringify(eventInfo), - extraInfo: JSONbigNative.stringify(extraInfo), - proof: JSONbigNative.stringify({ - data: JSONbigNative.stringify({ - blockHash, - receiptCID, - log: { - cid, - ipldBlock - } - }) - }) - }); - } else { - log(`Skipping event for receipt ${receiptCID} due to failed transaction.`); - } - } + const dbEvents = await this._baseIndexer.fetchEvents(blockHash, this.parseEventNameAndArgs.bind(this)); const dbTx = await this._db.createTransactionRunner(); - try { - block = { + const block = { cid: blockCid, blockHash, - blockNumber: block.number, - blockTimestamp: block.timestamp, - parentHash: block.parent.hash + blockNumber, + blockTimestamp, + parentHash }; - console.time('time:indexer#_fetchAndSaveEvents-save-block-events'); + console.time(`time:indexer#_saveBlockAndFetchEvents-db-save-${blockNumber}`); const blockProgress = await this._db.saveBlockWithEvents(dbTx, block, dbEvents); await dbTx.commitTransaction(); - console.timeEnd('time:indexer#_fetchAndSaveEvents-save-block-events'); + console.timeEnd(`time:indexer#_saveBlockAndFetchEvents-db-save-${blockNumber}`); - return blockProgress; + return [blockProgress, []]; } catch (error) { await dbTx.rollbackTransaction(); throw error; diff --git a/packages/graph-node/test/utils/indexer.ts b/packages/graph-node/test/utils/indexer.ts index 82dba32a..45205889 100644 --- a/packages/graph-node/test/utils/indexer.ts +++ b/packages/graph-node/test/utils/indexer.ts @@ -1,5 +1,5 @@ import assert from 'assert'; -import { FindConditions, FindManyOptions } from 'typeorm'; +import { DeepPartial, FindConditions, FindManyOptions } from 'typeorm'; import { IndexerInterface, @@ -89,8 +89,8 @@ export class Indexer implements IndexerInterface { return ''; } - async fetchBlockWithEvents (block: BlockProgressInterface): Promise { - return block; + async saveBlockAndFetchEvents (block: BlockProgressInterface): Promise<[BlockProgressInterface, DeepPartial[]]> { + return [block, []]; } async removeUnknownEvents (block: BlockProgressInterface): Promise { diff --git a/packages/graph-test-watcher/environments/local.toml b/packages/graph-test-watcher/environments/local.toml index 24cba88e..17a472f4 100644 --- a/packages/graph-test-watcher/environments/local.toml +++ b/packages/graph-test-watcher/environments/local.toml @@ -40,7 +40,6 @@ [upstream.ethServer] gqlApiEndpoint = "http://127.0.0.1:8082/graphql" rpcProviderEndpoint = "http://127.0.0.1:8081" - blockDelayInMilliSecs = 2000 [upstream.cache] name = "requests" @@ -52,3 +51,6 @@ maxCompletionLagInSecs = 300 jobDelayInMilliSecs = 100 eventsInBatch = 50 + blockDelayInMilliSecs = 2000 + prefetchBlocksInMem = true + prefetchBlockCount = 10 diff --git a/packages/graph-test-watcher/src/cli/import-state.ts b/packages/graph-test-watcher/src/cli/import-state.ts index ebb96f44..64063e0e 100644 --- a/packages/graph-test-watcher/src/cli/import-state.ts +++ b/packages/graph-test-watcher/src/cli/import-state.ts @@ -83,7 +83,7 @@ export const main = async (): Promise => { jobQueue, indexer, eventWatcher, - config.upstream.ethServer.blockDelayInMilliSecs, + jobQueueConfig.blockDelayInMilliSecs, { prefetch: true, startBlock: importData.snapshotBlock.blockNumber, diff --git a/packages/graph-test-watcher/src/fill.ts b/packages/graph-test-watcher/src/fill.ts index 19d015f4..f24b42e4 100644 --- a/packages/graph-test-watcher/src/fill.ts +++ b/packages/graph-test-watcher/src/fill.ts @@ -86,7 +86,7 @@ export const main = async (): Promise => { const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue); - await fillBlocks(jobQueue, indexer, eventWatcher, config.upstream.ethServer.blockDelayInMilliSecs, argv); + await fillBlocks(jobQueue, indexer, eventWatcher, jobQueueConfig.blockDelayInMilliSecs, argv); }; main().catch(err => { diff --git a/packages/graph-test-watcher/src/hooks.ts b/packages/graph-test-watcher/src/hooks.ts index 3e61d0a3..e727296e 100644 --- a/packages/graph-test-watcher/src/hooks.ts +++ b/packages/graph-test-watcher/src/hooks.ts @@ -4,7 +4,9 @@ import assert from 'assert'; -import { Indexer, ResultEvent } from './indexer'; +import { ResultEvent } from '@cerc-io/util'; + +import { Indexer } from './indexer'; /** * Hook function to store an initial state. diff --git a/packages/graph-test-watcher/src/indexer.ts b/packages/graph-test-watcher/src/indexer.ts index 874c6831..a8f989e4 100644 --- a/packages/graph-test-watcher/src/indexer.ts +++ b/packages/graph-test-watcher/src/indexer.ts @@ -17,7 +17,6 @@ import { StorageLayout, MappingKey } from '@cerc-io/solidity-mapper'; import { Indexer as BaseIndexer, ValueResult, - UNKNOWN_EVENT_NAME, ServerConfig, updateStateForElementaryType, JobQueue, @@ -26,7 +25,9 @@ import { BlockHeight, StateKind, IndexerInterface, - StateStatus + StateStatus, + ResultEvent, + getResultEvent } from '@cerc-io/util'; import { GraphWatcher } from '@cerc-io/graph-node'; @@ -48,30 +49,6 @@ const JSONbigNative = JSONbig({ useNativeBigInt: true }); const KIND_EXAMPLE1 = 'Example1'; -export type ResultEvent = { - block: { - cid: string; - hash: string; - number: number; - timestamp: number; - parentHash: string; - }; - tx: { - hash: string; - from: string; - to: string; - index: number; - }; - - contract: string; - - eventIndex: number; - eventSignature: string; - event: any; - - proof: string; -}; - export class Indexer implements IndexerInterface { _db: Database _ethClient: EthClient @@ -138,38 +115,7 @@ export class Indexer implements IndexerInterface { } getResultEvent (event: Event): ResultEvent { - const block = event.block; - const eventFields = JSONbigNative.parse(event.eventInfo); - const { tx, eventSignature } = JSONbigNative.parse(event.extraInfo); - - return { - block: { - cid: block.cid, - hash: block.blockHash, - number: block.blockNumber, - timestamp: block.blockTimestamp, - parentHash: block.parentHash - }, - - tx: { - hash: event.txHash, - from: tx.src, - to: tx.dst, - index: tx.index - }, - - contract: event.contract, - - eventIndex: event.index, - eventSignature, - event: { - __typename: `${event.eventName}Event`, - ...eventFields - }, - - // TODO: Return proof only if requested. - proof: JSON.parse(event.proof) - }; + return getResultEvent(event); } async getMethod (blockHash: string, contractAddress: string): Promise { @@ -389,12 +335,12 @@ export class Indexer implements IndexerInterface { const logDescription = contract.parseLog({ data, topics }); - const { eventName, eventInfo } = this._baseIndexer.parseEvent(logDescription); + const { eventName, eventInfo, eventSignature } = this._baseIndexer.parseEvent(logDescription); return { eventName, eventInfo, - eventSignature: logDescription.signature + eventSignature }; } @@ -522,8 +468,8 @@ export class Indexer implements IndexerInterface { return this._baseIndexer.getBlocksAtHeight(height, isPruned); } - async fetchBlockWithEvents (block: DeepPartial): Promise { - return this._baseIndexer.fetchBlockWithEvents(block, this._fetchAndSaveEvents.bind(this)); + async saveBlockAndFetchEvents (block: DeepPartial): Promise<[BlockProgress, DeepPartial[]]> { + return this._baseIndexer.saveBlockAndFetchEvents(block, this._saveBlockAndFetchEvents.bind(this)); } async getBlockEvents (blockHash: string, where: Where, queryOptions: QueryOptions): Promise> { @@ -639,126 +585,33 @@ export class Indexer implements IndexerInterface { }); } - async _fetchAndSaveEvents ({ cid: blockCid, blockHash }: DeepPartial): Promise { + async _saveBlockAndFetchEvents ({ + cid: blockCid, + blockHash, + blockNumber, + blockTimestamp, + parentHash + }: DeepPartial): Promise<[BlockProgress, DeepPartial[]]> { assert(blockHash); - const transactionsPromise = this._ethClient.getBlockWithTransactions({ blockHash }); - const blockPromise = this._ethClient.getBlockByHash(blockHash); - let logs: any[]; - console.time('time:indexer#_fetchAndSaveEvents-fetch-logs'); - if (this._serverConfig.filterLogs) { - const watchedContracts = this._baseIndexer.getWatchedContracts(); - const addresses = watchedContracts.map((watchedContract): string => { - return watchedContract.address; - }); - - const logsResult = await this._ethClient.getLogs({ - blockHash, - addresses - }); - - logs = logsResult.logs; - } else { - ({ logs } = await this._ethClient.getLogs({ blockHash })); - } - console.timeEnd('time:indexer#_fetchAndSaveEvents-fetch-logs'); - - let [ - { block }, - { - allEthHeaderCids: { - nodes: [ - { - ethTransactionCidsByHeaderId: { - nodes: transactions - } - } - ] - } - } - ] = await Promise.all([blockPromise, transactionsPromise]); - - const transactionMap = transactions.reduce((acc: {[key: string]: any}, transaction: {[key: string]: any}) => { - acc[transaction.txHash] = transaction; - return acc; - }, {}); - - const dbEvents: Array> = []; - - for (let li = 0; li < logs.length; li++) { - const logObj = logs[li]; - const { - topics, - data, - index: logIndex, - cid, - ipldBlock, - account: { - address - }, - transaction: { - hash: txHash - }, - receiptCID, - status - } = logObj; - - if (status) { - let eventName = UNKNOWN_EVENT_NAME; - let eventInfo = {}; - const tx = transactionMap[txHash]; - const extraInfo: { [key: string]: any } = { topics, data, tx }; - - const contract = ethers.utils.getAddress(address); - const watchedContract = await this.isWatchedContract(contract); - - if (watchedContract) { - const eventDetails = this.parseEventNameAndArgs(watchedContract.kind, logObj); - eventName = eventDetails.eventName; - eventInfo = eventDetails.eventInfo; - extraInfo.eventSignature = eventDetails.eventSignature; - } - - dbEvents.push({ - index: logIndex, - txHash, - contract, - eventName, - eventInfo: JSONbigNative.stringify(eventInfo), - extraInfo: JSONbigNative.stringify(extraInfo), - proof: JSONbigNative.stringify({ - data: JSONbigNative.stringify({ - blockHash, - receiptCID, - log: { - cid, - ipldBlock - } - }) - }) - }); - } else { - log(`Skipping event for receipt ${receiptCID} due to failed transaction.`); - } - } + const dbEvents = await this._baseIndexer.fetchEvents(blockHash, this.parseEventNameAndArgs.bind(this)); const dbTx = await this._db.createTransactionRunner(); - try { - block = { + const block = { cid: blockCid, blockHash, - blockNumber: block.number, - blockTimestamp: block.timestamp, - parentHash: block.parent.hash + blockNumber, + blockTimestamp, + parentHash }; - console.time('time:indexer#_fetchAndSaveEvents-save-block-events'); + console.time(`time:indexer#_saveBlockAndFetchEvents-db-save-${blockNumber}`); const blockProgress = await this._db.saveBlockWithEvents(dbTx, block, dbEvents); await dbTx.commitTransaction(); - console.timeEnd('time:indexer#_fetchAndSaveEvents-save-block-events'); + console.timeEnd(`time:indexer#_saveBlockAndFetchEvents-db-save-${blockNumber}`); - return blockProgress; + return [blockProgress, []]; } catch (error) { await dbTx.rollbackTransaction(); throw error; diff --git a/packages/mobymask-watcher/environments/local.toml b/packages/mobymask-watcher/environments/local.toml index 3a4e2910..27b59f26 100644 --- a/packages/mobymask-watcher/environments/local.toml +++ b/packages/mobymask-watcher/environments/local.toml @@ -36,7 +36,6 @@ [upstream.ethServer] gqlApiEndpoint = "http://127.0.0.1:8082/graphql" rpcProviderEndpoint = "http://127.0.0.1:8081" - blockDelayInMilliSecs = 60000 [upstream.cache] name = "requests" @@ -48,3 +47,6 @@ maxCompletionLagInSecs = 300 jobDelayInMilliSecs = 100 eventsInBatch = 50 + blockDelayInMilliSecs = 60000 + prefetchBlocksInMem = true + prefetchBlockCount = 10 diff --git a/packages/mobymask-watcher/src/cli/import-state.ts b/packages/mobymask-watcher/src/cli/import-state.ts index 250d09bd..729d3027 100644 --- a/packages/mobymask-watcher/src/cli/import-state.ts +++ b/packages/mobymask-watcher/src/cli/import-state.ts @@ -74,7 +74,7 @@ export const main = async (): Promise => { jobQueue, indexer, eventWatcher, - config.upstream.ethServer.blockDelayInMilliSecs, + jobQueueConfig.blockDelayInMilliSecs, { prefetch: true, startBlock: importData.snapshotBlock.blockNumber, diff --git a/packages/mobymask-watcher/src/fill.ts b/packages/mobymask-watcher/src/fill.ts index b916f4f4..f20037c7 100644 --- a/packages/mobymask-watcher/src/fill.ts +++ b/packages/mobymask-watcher/src/fill.ts @@ -76,7 +76,7 @@ export const main = async (): Promise => { const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue); - await fillBlocks(jobQueue, indexer, eventWatcher, config.upstream.ethServer.blockDelayInMilliSecs, argv); + await fillBlocks(jobQueue, indexer, eventWatcher, jobQueueConfig.blockDelayInMilliSecs, argv); }; main().catch(err => { diff --git a/packages/mobymask-watcher/src/hooks.ts b/packages/mobymask-watcher/src/hooks.ts index 2166a765..0d56df69 100644 --- a/packages/mobymask-watcher/src/hooks.ts +++ b/packages/mobymask-watcher/src/hooks.ts @@ -5,9 +5,9 @@ import assert from 'assert'; import { utils } from 'ethers'; -// import { updateStateForMappingType, updateStateForElementaryType } from '@cerc-io/util'; +import { ResultEvent } from '@cerc-io/util'; -import { Indexer, KIND_PHISHERREGISTRY, ResultEvent } from './indexer'; +import { Indexer, KIND_PHISHERREGISTRY } from './indexer'; const INVOKE_SIGNATURE = 'invoke(((((address,uint256,bytes),((address,bytes32,(address,bytes)[]),bytes)[])[],(uint256,uint256)),bytes)[])'; const CLAIM_IF_MEMBER_SIGNATURE = 'claimIfMember(string,bool)'; diff --git a/packages/mobymask-watcher/src/indexer.ts b/packages/mobymask-watcher/src/indexer.ts index 743cbfe4..6c2ecc0d 100644 --- a/packages/mobymask-watcher/src/indexer.ts +++ b/packages/mobymask-watcher/src/indexer.ts @@ -16,7 +16,6 @@ import { Indexer as BaseIndexer, IndexerInterface, ValueResult, - UNKNOWN_EVENT_NAME, ServerConfig, JobQueue, Where, @@ -26,7 +25,9 @@ import { BlockHeight, StateKind, StateStatus, - getFullTransaction + getFullTransaction, + ResultEvent, + getResultEvent } from '@cerc-io/util'; import PhisherRegistryArtifacts from './artifacts/PhisherRegistry.json'; @@ -49,30 +50,6 @@ const JSONbigNative = JSONbig({ useNativeBigInt: true }); export const KIND_PHISHERREGISTRY = 'PhisherRegistry'; -export type ResultEvent = { - block: { - cid: string; - hash: string; - number: number; - timestamp: number; - parentHash: string; - }; - tx: { - hash: string; - from: string; - to: string; - index: number; - }; - - contract: string; - - eventIndex: number; - eventSignature: string; - event: any; - - proof: string; -}; - export class Indexer implements IndexerInterface { _db: Database _ethClient: EthClient @@ -124,38 +101,7 @@ export class Indexer implements IndexerInterface { } getResultEvent (event: Event): ResultEvent { - const block = event.block; - const eventFields = JSONbigNative.parse(event.eventInfo); - const { tx, eventSignature } = JSONbigNative.parse(event.extraInfo); - - return { - block: { - cid: block.cid, - hash: block.blockHash, - number: block.blockNumber, - timestamp: block.blockTimestamp, - parentHash: block.parentHash - }, - - tx: { - hash: event.txHash, - from: tx.src, - to: tx.dst, - index: tx.index - }, - - contract: event.contract, - - eventIndex: event.index, - eventSignature, - event: { - __typename: `${event.eventName}Event`, - ...eventFields - }, - - // TODO: Return proof only if requested. - proof: JSON.parse(event.proof) - }; + return getResultEvent(event); } async multiNonce (blockHash: string, contractAddress: string, key0: string, key1: bigint, diff = false): Promise { @@ -487,12 +433,12 @@ export class Indexer implements IndexerInterface { const logDescription = contract.parseLog({ data, topics }); - const { eventName, eventInfo } = this._baseIndexer.parseEvent(logDescription); + const { eventName, eventInfo, eventSignature } = this._baseIndexer.parseEvent(logDescription); return { eventName, eventInfo, - eventSignature: logDescription.signature + eventSignature }; } @@ -620,8 +566,8 @@ export class Indexer implements IndexerInterface { return this._baseIndexer.getBlocksAtHeight(height, isPruned); } - async fetchBlockWithEvents (block: DeepPartial): Promise { - return this._baseIndexer.fetchBlockWithEvents(block, this._fetchAndSaveEvents.bind(this)); + async saveBlockAndFetchEvents (block: DeepPartial): Promise<[BlockProgress, DeepPartial[]]> { + return this._baseIndexer.saveBlockAndFetchEvents(block, this._saveBlockAndFetchEvents.bind(this)); } async getBlockEvents (blockHash: string, where: Where, queryOptions: QueryOptions): Promise> { @@ -661,126 +607,33 @@ export class Indexer implements IndexerInterface { return this._contractMap.get(kind); } - async _fetchAndSaveEvents ({ cid: blockCid, blockHash }: DeepPartial): Promise { + async _saveBlockAndFetchEvents ({ + cid: blockCid, + blockHash, + blockNumber, + blockTimestamp, + parentHash + }: DeepPartial): Promise<[BlockProgress, DeepPartial[]]> { assert(blockHash); - const transactionsPromise = this._ethClient.getBlockWithTransactions({ blockHash }); - const blockPromise = this._ethClient.getBlockByHash(blockHash); - let logs: any[]; - console.time('time:indexer#_fetchAndSaveEvents-fetch-logs'); - if (this._serverConfig.filterLogs) { - const watchedContracts = this._baseIndexer.getWatchedContracts(); - const addresses = watchedContracts.map((watchedContract): string => { - return watchedContract.address; - }); - - const logsResult = await this._ethClient.getLogs({ - blockHash, - addresses - }); - - logs = logsResult.logs; - } else { - ({ logs } = await this._ethClient.getLogs({ blockHash })); - } - console.timeEnd('time:indexer#_fetchAndSaveEvents-fetch-logs'); - - let [ - { block }, - { - allEthHeaderCids: { - nodes: [ - { - ethTransactionCidsByHeaderId: { - nodes: transactions - } - } - ] - } - } - ] = await Promise.all([blockPromise, transactionsPromise]); - - const transactionMap = transactions.reduce((acc: {[key: string]: any}, transaction: {[key: string]: any}) => { - acc[transaction.txHash] = transaction; - return acc; - }, {}); - - const dbEvents: Array> = []; - - for (let li = 0; li < logs.length; li++) { - const logObj = logs[li]; - const { - topics, - data, - index: logIndex, - cid, - ipldBlock, - account: { - address - }, - transaction: { - hash: txHash - }, - receiptCID, - status - } = logObj; - - if (status) { - let eventName = UNKNOWN_EVENT_NAME; - let eventInfo = {}; - const tx = transactionMap[txHash]; - const extraInfo: { [key: string]: any } = { topics, data, tx }; - - const contract = ethers.utils.getAddress(address); - const watchedContract = await this.isWatchedContract(contract); - - if (watchedContract) { - const eventDetails = this.parseEventNameAndArgs(watchedContract.kind, logObj); - eventName = eventDetails.eventName; - eventInfo = eventDetails.eventInfo; - extraInfo.eventSignature = eventDetails.eventSignature; - } - - dbEvents.push({ - index: logIndex, - txHash, - contract, - eventName, - eventInfo: JSONbigNative.stringify(eventInfo), - extraInfo: JSONbigNative.stringify(extraInfo), - proof: JSONbigNative.stringify({ - data: JSONbigNative.stringify({ - blockHash, - receiptCID, - log: { - cid, - ipldBlock - } - }) - }) - }); - } else { - log(`Skipping event for receipt ${receiptCID} due to failed transaction.`); - } - } + const dbEvents = await this._baseIndexer.fetchEvents(blockHash, this.parseEventNameAndArgs.bind(this)); const dbTx = await this._db.createTransactionRunner(); - try { - block = { + const block = { cid: blockCid, blockHash, - blockNumber: block.number, - blockTimestamp: block.timestamp, - parentHash: block.parent.hash + blockNumber, + blockTimestamp, + parentHash }; - console.time('time:indexer#_fetchAndSaveEvents-save-block-events'); + console.time(`time:indexer#_saveBlockAndFetchEvents-db-save-${blockNumber}`); const blockProgress = await this._db.saveBlockWithEvents(dbTx, block, dbEvents); await dbTx.commitTransaction(); - console.timeEnd('time:indexer#_fetchAndSaveEvents-save-block-events'); + console.timeEnd(`time:indexer#_saveBlockAndFetchEvents-db-save-${blockNumber}`); - return blockProgress; + return [blockProgress, []]; } catch (error) { await dbTx.rollbackTransaction(); throw error; diff --git a/packages/util/src/common.ts b/packages/util/src/common.ts index 03bd8606..01dbba53 100644 --- a/packages/util/src/common.ts +++ b/packages/util/src/common.ts @@ -21,76 +21,10 @@ const DEFAULT_EVENTS_IN_BATCH = 50; const log = debug('vulcanize:common'); export interface PrefetchedBlock { - block: any; + block: BlockProgressInterface; events: DeepPartial[]; } -/** - * Method to fetch block by number and push to job queue. - * @param jobQueue - * @param indexer - * @param blockDelayInMilliSecs - * @param blockNumber - */ -export const processBlockByNumber = async ( - jobQueue: JobQueue, - indexer: IndexerInterface, - blockDelayInMilliSecs: number, - blockNumber: number -): Promise => { - log(`Process block ${blockNumber}`); - - console.time('time:common#processBlockByNumber-get-blockProgress-syncStatus'); - - const [blockProgressEntities, syncStatus] = await Promise.all([ - indexer.getBlocksAtHeight(blockNumber, false), - indexer.getSyncStatus() - ]); - - console.timeEnd('time:common#processBlockByNumber-get-blockProgress-syncStatus'); - - while (true) { - let blocks = blockProgressEntities.map((block: any) => { - block.timestamp = block.blockTimestamp; - - return block; - }); - - if (!blocks.length) { - blocks = await indexer.getBlocks({ blockNumber }); - } - - if (blocks.length) { - for (let bi = 0; bi < blocks.length; bi++) { - const { cid, blockHash, blockNumber, parentHash, timestamp } = blocks[bi]; - - // Stop blocks already pushed to job queue. They are already retried after fail. - if (!syncStatus || syncStatus.chainHeadBlockNumber < blockNumber) { - await jobQueue.pushJob( - QUEUE_BLOCK_PROCESSING, - { - kind: JOB_KIND_INDEX, - blockNumber: Number(blockNumber), - cid, - blockHash, - parentHash, - timestamp - } - ); - } - } - - await indexer.updateSyncStatusChainHead(blocks[0].blockHash, Number(blocks[0].blockNumber)); - - return; - } - - log(`No blocks fetched for block number ${blockNumber}, retrying after ${blockDelayInMilliSecs} ms delay.`); - - await wait(blockDelayInMilliSecs); - } -}; - /** * Create a processing job in QUEUE_BLOCK_PROCESSING. * @param jobQueue @@ -166,7 +100,6 @@ export const fetchBlocksAtHeight = async ( if (!blocks.length) { log(`No blocks fetched for block number ${blockNumber}, retrying after ${jobQueueConfig.blockDelayInMilliSecs} ms delay.`); - assert(jobQueueConfig.blockDelayInMilliSecs); await wait(jobQueueConfig.blockDelayInMilliSecs); } } @@ -205,8 +138,8 @@ export const _prefetchBlocks = async ( blockNumber + jobQueueConfig.prefetchBlockCount ); - blocksWithEvents.forEach(({ block, events }) => { - prefetchedBlocksMap.set(block.blockHash, { block, events }); + blocksWithEvents.forEach(({ blockProgress, events }) => { + prefetchedBlocksMap.set(blockProgress.blockHash, { block: blockProgress, events }); }); }; @@ -218,7 +151,7 @@ export const _prefetchBlocks = async ( * @param endBlock */ export const _fetchBatchBlocks = async (indexer: IndexerInterface, jobQueueConfig: JobQueueConfig, startBlock: number, endBlock: number): Promise => { - let blockNumbers = [...Array(endBlock - startBlock).keys()].map(n => n + startBlock); + const blockNumbers = [...Array(endBlock - startBlock).keys()].map(n => n + startBlock); let blocks = []; // Fetch blocks again if there are missing blocks. @@ -228,20 +161,17 @@ export const _fetchBatchBlocks = async (indexer: IndexerInterface, jobQueueConfi const res = await Promise.all(blockPromises); console.timeEnd('time:common#fetchBatchBlocks-getBlocks'); - const missingIndex = res.findIndex(blocks => blocks.length === 0); + const firstMissingBlockIndex = res.findIndex(blocks => blocks.length === 0); - // TODO Continue to process available blocks instead of retrying for whole range. - if (missingIndex < 0) { - blocks = blocks.concat(res); + if (firstMissingBlockIndex === -1) { + blocks = res; + break; + } else if (firstMissingBlockIndex > 0) { + blocks = res.slice(0, firstMissingBlockIndex); break; } - log('missing block number:', blockNumbers[missingIndex]); - - blocks.push(res.slice(0, missingIndex)); - blockNumbers = blockNumbers.slice(missingIndex); - - assert(jobQueueConfig.blockDelayInMilliSecs); + log(`No blocks fetched for block number ${blockNumbers[0]}, retrying after ${jobQueueConfig.blockDelayInMilliSecs} ms delay.`); await wait(jobQueueConfig.blockDelayInMilliSecs); } @@ -254,11 +184,9 @@ export const _fetchBatchBlocks = async (indexer: IndexerInterface, jobQueueConfi // TODO Catch errors and continue to process available events instead of retrying for whole range because of an error. const blockAndEventPromises = blocks.map(async block => { block.blockTimestamp = block.timestamp; + const [blockProgress, events] = await indexer.saveBlockAndFetchEvents(block); - assert(indexer.fetchBlockEvents); - const events = await indexer.fetchBlockEvents(block); - - return { block, events }; + return { blockProgress, events }; }); return Promise.all(blockAndEventPromises); @@ -326,7 +254,7 @@ export const processBatchEvents = async (indexer: IndexerInterface, block: Block // uni-info-watcher indexer doesn't have watched contracts implementation. watchedContract = true; } else { - watchedContract = await indexer.isWatchedContract(event.contract); + watchedContract = indexer.isWatchedContract(event.contract); } if (watchedContract) { diff --git a/packages/util/src/config.ts b/packages/util/src/config.ts index 3d9061e8..220e05e3 100644 --- a/packages/util/src/config.ts +++ b/packages/util/src/config.ts @@ -24,7 +24,7 @@ export interface JobQueueConfig { eventsInBatch: number; lazyUpdateBlockProgress?: boolean; subgraphEventsOrder: boolean; - blockDelayInMilliSecs?: number; + blockDelayInMilliSecs: number; prefetchBlocksInMem: boolean; prefetchBlockCount: number; } @@ -49,7 +49,6 @@ export interface UpstreamConfig { ethServer: { gqlApiEndpoint: string; rpcProviderEndpoint: string; - blockDelayInMilliSecs: number; } traceProviderEndpoint: string; uniWatcher: { diff --git a/packages/util/src/events.ts b/packages/util/src/events.ts index d2c8d87d..e1147145 100644 --- a/packages/util/src/events.ts +++ b/packages/util/src/events.ts @@ -11,7 +11,7 @@ import { EthClient } from '@cerc-io/ipld-eth-client'; import { JobQueue } from './job-queue'; import { BlockProgressInterface, EventInterface, IndexerInterface } from './types'; import { MAX_REORG_DEPTH, JOB_KIND_PRUNE, JOB_KIND_INDEX, UNKNOWN_EVENT_NAME } from './constants'; -import { createPruningJob, processBlockByNumber } from './common'; +import { createPruningJob, processBlockByNumberWithCache } from './common'; import { UpstreamConfig } from './config'; import { OrderDirection } from './database'; @@ -58,10 +58,8 @@ export class EventWatcher { startBlockNumber = syncStatus.chainHeadBlockNumber + 1; } - const { ethServer: { blockDelayInMilliSecs } } = this._upstreamConfig; - // Wait for block processing as blockProgress event might process the same block. - await processBlockByNumber(this._jobQueue, this._indexer, blockDelayInMilliSecs, startBlockNumber); + await processBlockByNumberWithCache(this._jobQueue, startBlockNumber); // Creating an AsyncIterable from AsyncIterator to iterate over the values. // https://www.codementor.io/@tiagolopesferreira/asynchronous-iterators-in-javascript-jl1yg8la1#for-wait-of @@ -76,7 +74,7 @@ export class EventWatcher { const { onBlockProgressEvent: { blockNumber, isComplete } } = data; if (isComplete) { - await processBlockByNumber(this._jobQueue, this._indexer, blockDelayInMilliSecs, blockNumber + 1); + await processBlockByNumberWithCache(this._jobQueue, blockNumber + 1); } } } @@ -139,23 +137,22 @@ export class EventWatcher { } async _handleIndexingComplete (jobData: any): Promise { - const { blockHash, blockNumber, priority } = jobData; + const { blockNumber, priority } = jobData; - const [blockProgress, syncStatus] = await Promise.all([ - this._indexer.getBlockProgress(blockHash), - // Update sync progress. - this._indexer.updateSyncStatusIndexedBlock(blockHash, blockNumber) - ]); + const blockProgressEntities = await this._indexer.getBlocksAtHeight(Number(blockNumber), false); - if (blockProgress) { - log(`Job onComplete indexing block ${blockHash} ${blockNumber}`); + // Log a warning and return if block entries not found. + if (blockProgressEntities.length === 0) { + log(`block not indexed at height ${blockNumber}`); + return; + } - // Create pruning job if required. - if (syncStatus && syncStatus.latestIndexedBlockNumber > (syncStatus.latestCanonicalBlockNumber + MAX_REORG_DEPTH)) { - await createPruningJob(this._jobQueue, syncStatus.latestCanonicalBlockNumber, priority); - } - } else { - log(`block not indexed for ${blockHash} ${blockNumber}`); + const syncStatus = await this._indexer.updateSyncStatusIndexedBlock(blockProgressEntities[0].blockHash, Number(blockNumber)); + log(`Job onComplete indexing block ${blockProgressEntities[0].blockHash} ${blockNumber}`); + + // Create pruning job if required. + if (syncStatus && syncStatus.latestIndexedBlockNumber > (syncStatus.latestCanonicalBlockNumber + MAX_REORG_DEPTH)) { + await createPruningJob(this._jobQueue, syncStatus.latestCanonicalBlockNumber, priority); } } diff --git a/packages/util/src/fill.ts b/packages/util/src/fill.ts index e85faba0..e72a4011 100644 --- a/packages/util/src/fill.ts +++ b/packages/util/src/fill.ts @@ -7,7 +7,7 @@ import debug from 'debug'; import { JobQueue } from './job-queue'; import { EventWatcherInterface, IndexerInterface } from './types'; import { wait } from './misc'; -import { processBlockByNumber } from './common'; +import { processBlockByNumberWithCache } from './common'; const log = debug('vulcanize:fill'); @@ -59,7 +59,7 @@ export const fillBlocks = async ( const numberOfBlocks = endBlock - startBlock + 1; - processBlockByNumber(jobQueue, indexer, blockDelayInMilliSecs, startBlock); + processBlockByNumberWithCache(jobQueue, startBlock); // Creating an AsyncIterable from AsyncIterator to iterate over the values. // https://www.codementor.io/@tiagolopesferreira/asynchronous-iterators-in-javascript-jl1yg8la1#for-wait-of @@ -80,7 +80,7 @@ export const fillBlocks = async ( const completePercentage = Math.round(blocksProcessed / numberOfBlocks * 100); log(`Processed ${blocksProcessed} of ${numberOfBlocks} blocks (${completePercentage}%)`); - await processBlockByNumber(jobQueue, indexer, blockDelayInMilliSecs, blockNumber + 1); + await processBlockByNumberWithCache(jobQueue, blockNumber + 1); if (blockNumber + 1 >= endBlock) { // Break the async loop when blockProgress event is for the endBlock and processing is complete. @@ -130,7 +130,7 @@ const prefetchBlocks = async ( const blockProgress = await indexer.getBlockProgress(blockHash); if (!blockProgress) { - await indexer.fetchBlockWithEvents({ cid, blockHash, blockNumber, parentHash, blockTimestamp: timestamp }); + await indexer.saveBlockAndFetchEvents({ cid, blockHash, blockNumber, parentHash, blockTimestamp: timestamp }); } }); diff --git a/packages/util/src/index-block.ts b/packages/util/src/index-block.ts index 31375a5c..de593732 100644 --- a/packages/util/src/index-block.ts +++ b/packages/util/src/index-block.ts @@ -36,7 +36,7 @@ export const indexBlock = async ( // Check if blockProgress fetched from database. if (!partialblockProgress.id) { - blockProgress = await indexer.fetchBlockWithEvents(partialblockProgress); + [blockProgress] = await indexer.saveBlockAndFetchEvents(partialblockProgress); } else { blockProgress = partialblockProgress as BlockProgressInterface; } diff --git a/packages/util/src/indexer.ts b/packages/util/src/indexer.ts index 82cdedb8..33e5606c 100644 --- a/packages/util/src/indexer.ts +++ b/packages/util/src/indexer.ts @@ -5,6 +5,7 @@ import assert from 'assert'; import { DeepPartial, FindConditions, FindManyOptions } from 'typeorm'; import debug from 'debug'; +import JSONbig from 'json-bigint'; import { ethers } from 'ethers'; import _ from 'lodash'; import { sha256 } from 'multiformats/hashes/sha2'; @@ -32,6 +33,7 @@ import { ServerConfig } from './config'; const DEFAULT_MAX_EVENTS_BLOCK_RANGE = 1000; const log = debug('vulcanize:indexer'); +const JSONbigNative = JSONbig({ useNativeBigInt: true }); export interface ValueResult { value: any; @@ -62,6 +64,30 @@ export type ResultState = { data: string; }; +export type ResultEvent = { + block: { + cid: string; + hash: string; + number: number; + timestamp: number; + parentHash: string; + }; + tx: { + hash: string; + from: string; + to: string; + index: number; + }; + + contract: string; + + eventIndex: number; + eventSignature: string; + event: any; + + proof: string; +}; + export class Indexer { _serverConfig: ServerConfig; _db: DatabaseInterface; @@ -237,26 +263,115 @@ export class Indexer { return this._db.getEvent(id); } - async fetchBlockWithEvents (block: DeepPartial, fetchAndSaveEvents: (block: DeepPartial) => Promise): Promise { + async saveBlockAndFetchEvents (block: DeepPartial, saveBlockAndFetchEvents: (block: DeepPartial) => Promise<[BlockProgressInterface, DeepPartial[]]>): Promise<[BlockProgressInterface, DeepPartial[]]> { assert(block.blockHash); log(`getBlockEvents: fetching from upstream server ${block.blockHash}`); - const blockProgress = await fetchAndSaveEvents(block); + const [blockProgress, events] = await saveBlockAndFetchEvents(block); log(`getBlockEvents: fetched for block: ${blockProgress.blockHash} num events: ${blockProgress.numEvents}`); - return blockProgress; + return [blockProgress, events]; } - async fetchBlockEvents (block: DeepPartial, fetchEvents: (block: DeepPartial) => Promise[]>): Promise[]> { - assert(block.blockHash); + async fetchEvents (blockHash: string, parseEventNameAndArgs: (kind: string, logObj: any) => any): Promise[]> { + let logsPromise: Promise; - log(`getBlockEvents: fetching from upstream server ${block.blockHash}`); - console.time(`time:indexer#fetchBlockEvents-fetchAndSaveEvents-${block.blockHash}`); - const events = await fetchEvents(block); - console.timeEnd(`time:indexer#fetchBlockEvents-fetchAndSaveEvents-${block.blockHash}`); - log(`getBlockEvents: fetched for block: ${block.blockHash} num events: ${events.length}`); + if (this._serverConfig.filterLogs) { + const watchedContracts = this.getWatchedContracts(); + const addresses = watchedContracts.map((watchedContract): string => { + return watchedContract.address; + }); - return events; + logsPromise = this._ethClient.getLogs({ + blockHash, + addresses + }); + } else { + logsPromise = this._ethClient.getLogs({ blockHash }); + } + + const transactionsPromise = this._ethClient.getBlockWithTransactions({ blockHash }); + + const [ + { logs }, + { + allEthHeaderCids: { + nodes: [ + { + ethTransactionCidsByHeaderId: { + nodes: transactions + } + } + ] + } + } + ] = await Promise.all([logsPromise, transactionsPromise]); + + const transactionMap = transactions.reduce((acc: {[key: string]: any}, transaction: {[key: string]: any}) => { + acc[transaction.txHash] = transaction; + return acc; + }, {}); + + const dbEvents: Array> = []; + + for (let li = 0; li < logs.length; li++) { + const logObj = logs[li]; + const { + topics, + data, + index: logIndex, + cid, + ipldBlock, + account: { + address + }, + transaction: { + hash: txHash + }, + receiptCID, + status + } = logObj; + + if (status) { + let eventName = UNKNOWN_EVENT_NAME; + let eventInfo = {}; + const tx = transactionMap[txHash]; + const extraInfo: { [key: string]: any } = { topics, data, tx }; + + const contract = ethers.utils.getAddress(address); + const watchedContract = this.isWatchedContract(contract); + + if (watchedContract) { + const eventDetails = parseEventNameAndArgs(watchedContract.kind, logObj); + eventName = eventDetails.eventName; + eventInfo = eventDetails.eventInfo; + extraInfo.eventSignature = eventDetails.eventSignature; + } + + dbEvents.push({ + index: logIndex, + txHash, + contract, + eventName, + eventInfo: JSONbigNative.stringify(eventInfo), + extraInfo: JSONbigNative.stringify(extraInfo), + proof: JSONbigNative.stringify({ + data: JSONbigNative.stringify({ + blockHash, + receiptCID, + log: { + cid, + ipldBlock + } + }) + }) + }); + } else { + log(`Skipping event for receipt ${receiptCID} due to failed transaction.`); + } + } + + return dbEvents; } async saveBlockProgress (block: DeepPartial): Promise { @@ -872,9 +987,7 @@ export class Indexer { this._stateStatusMap[address] = _.merge(oldStateStatus, stateStatus); } - parseEvent (logDescription: ethers.utils.LogDescription): { eventName: string, eventInfo: any } { - const eventName = logDescription.name; - + parseEvent (logDescription: ethers.utils.LogDescription): { eventName: string, eventInfo: any, eventSignature: string } { const eventInfo = logDescription.eventFragment.inputs.reduce((acc: any, input, index) => { acc[input.name] = this._parseLogArg(input, logDescription.args[index]); @@ -882,8 +995,9 @@ export class Indexer { }, {}); return { - eventName, - eventInfo + eventName: logDescription.name, + eventInfo, + eventSignature: logDescription.signature }; } diff --git a/packages/util/src/job-runner.ts b/packages/util/src/job-runner.ts index ee01e26e..f180355f 100644 --- a/packages/util/src/job-runner.ts +++ b/packages/util/src/job-runner.ts @@ -17,13 +17,15 @@ import { QUEUE_EVENT_PROCESSING } from './constants'; import { JobQueue } from './job-queue'; -import { EventInterface, IndexerInterface, SyncStatusInterface } from './types'; +import { EventInterface, IndexerInterface } from './types'; import { wait } from './misc'; import { createPruningJob, createHooksJob, createCheckpointJob, - processBatchEvents + processBatchEvents, + PrefetchedBlock, + fetchBlocksAtHeight } from './common'; import { lastBlockNumEvents, lastBlockProcessDuration, lastProcessedBlockNumber } from './metrics'; @@ -37,6 +39,7 @@ export class JobRunner { _endBlockProcessTimer?: () => void _shutDown = false _signalCount = 0 + _prefetchedBlocksMap: Map = new Map() constructor (jobQueueConfig: JobQueueConfig, indexer: IndexerInterface, jobQueue: JobQueue) { this._indexer = indexer; @@ -47,16 +50,21 @@ export class JobRunner { async processBlock (job: any): Promise { const { data: { kind } } = job; - const syncStatus = await this._indexer.getSyncStatus(); - assert(syncStatus); - switch (kind) { - case JOB_KIND_INDEX: - await this._indexBlock(job, syncStatus); + case JOB_KIND_INDEX: { + const blocksToBeIndexed = await fetchBlocksAtHeight( + job, + this._indexer, + this._jobQueueConfig, + this._prefetchedBlocksMap + ); + const indexBlockPromises = blocksToBeIndexed.map(blockToBeIndexed => this._indexBlock(job, blockToBeIndexed)); + await Promise.all(indexBlockPromises); break; + } case JOB_KIND_PRUNE: { - await this._pruneChain(job, syncStatus); + await this._pruneChain(job); // Create a hooks job for parent block of latestCanonicalBlock pruning for first block is skipped as it is assumed to be a canonical block. const latestCanonicalBlock = await this._indexer.getLatestCanonicalBlock(); @@ -180,8 +188,12 @@ export class JobRunner { } } - async _pruneChain (job: any, syncStatus: SyncStatusInterface): Promise { + async _pruneChain (job: any): Promise { console.time('time:job-runner#_pruneChain'); + + const syncStatus = await this._indexer.getSyncStatus(); + assert(syncStatus); + const { pruneBlockHeight } = job.data; log(`Processing chain pruning at ${pruneBlockHeight}`); @@ -226,8 +238,12 @@ export class JobRunner { console.timeEnd('time:job-runner#_pruneChain'); } - async _indexBlock (job: any, syncStatus: SyncStatusInterface): Promise { - const { data: { cid, blockHash, blockNumber, parentHash, priority, timestamp } } = job; + async _indexBlock (job: any, blockToBeIndexed: any): Promise { + const syncStatus = await this._indexer.getSyncStatus(); + assert(syncStatus); + + const { data: { priority } } = job; + const { cid, blockHash, blockNumber, parentHash, blockTimestamp } = blockToBeIndexed; const indexBlockStartTime = new Date(); @@ -325,13 +341,21 @@ export class JobRunner { } if (!blockProgress) { - const { jobDelayInMilliSecs = 0 } = this._jobQueueConfig; + const prefetchedBlock = this._prefetchedBlocksMap.get(blockHash); - // Delay required to process block. - await wait(jobDelayInMilliSecs); - console.time('time:job-runner#_indexBlock-fetch-block-events'); - blockProgress = await this._indexer.fetchBlockWithEvents({ cid, blockHash, blockNumber, parentHash, blockTimestamp: timestamp }); - console.timeEnd('time:job-runner#_indexBlock-fetch-block-events'); + if (prefetchedBlock) { + ({ block: blockProgress } = prefetchedBlock); + } else { + // Delay required to process block. + const { jobDelayInMilliSecs = 0 } = this._jobQueueConfig; + await wait(jobDelayInMilliSecs); + + console.time('time:job-runner#_indexBlock-saveBlockAndFetchEvents'); + [blockProgress] = await this._indexer.saveBlockAndFetchEvents({ cid, blockHash, blockNumber, parentHash, blockTimestamp }); + console.timeEnd('time:job-runner#_indexBlock-saveBlockAndFetchEvents'); + + this._prefetchedBlocksMap.set(blockHash, { block: blockProgress, events: [] }); + } } await this._indexer.processBlock(blockProgress); @@ -347,21 +371,30 @@ export class JobRunner { async _processEvents (job: any): Promise { const { blockHash } = job.data; - console.time('time:job-runner#_processEvents-get-block-progress'); - const block = await this._indexer.getBlockProgress(blockHash); - console.timeEnd('time:job-runner#_processEvents-get-block-progress'); - assert(block); + if (!this._prefetchedBlocksMap.has(blockHash)) { + console.time('time:job-runner#_processEvents-get-block-progress'); + const block = await this._indexer.getBlockProgress(blockHash); + console.timeEnd('time:job-runner#_processEvents-get-block-progress'); + + assert(block); + this._prefetchedBlocksMap.set(blockHash, { block, events: [] }); + } + + const prefetchedBlock = this._prefetchedBlocksMap.get(blockHash); + assert(prefetchedBlock); + + const { block } = prefetchedBlock; console.time('time:job-runner#_processEvents-events'); - await processBatchEvents(this._indexer, block, this._jobQueueConfig.eventsInBatch); - console.timeEnd('time:job-runner#_processEvents-events'); // Update metrics lastProcessedBlockNumber.set(block.blockNumber); lastBlockNumEvents.set(block.numEvents); + this._prefetchedBlocksMap.delete(block.blockHash); + if (this._endBlockProcessTimer) { this._endBlockProcessTimer(); } diff --git a/packages/util/src/misc.ts b/packages/util/src/misc.ts index 36d032b8..c40a92d5 100644 --- a/packages/util/src/misc.ts +++ b/packages/util/src/misc.ts @@ -7,8 +7,8 @@ import { ValueTransformer } from 'typeorm'; import yargs from 'yargs'; import { hideBin } from 'yargs/helpers'; import { utils, providers } from 'ethers'; +import JSONbig from 'json-bigint'; import Decimal from 'decimal.js'; -import debug from 'debug'; import { EthClient } from '@cerc-io/ipld-eth-client'; @@ -18,6 +18,10 @@ import { JobQueue } from './job-queue'; import { GraphDecimal } from './graph-decimal'; import * as EthDecoder from './eth'; import { getCachedBlockSize } from './block-size-cache'; +import { ResultEvent } from './indexer'; +import { EventInterface } from './types'; + +const JSONbigNative = JSONbig({ useNativeBigInt: true }); /** * Method to wait for specified time. @@ -248,3 +252,38 @@ export const jsonBigIntStringReplacer = (_: string, value: any): any => { return value; }; + +export const getResultEvent = (event: EventInterface): ResultEvent => { + const block = event.block; + const eventFields = JSONbigNative.parse(event.eventInfo); + const { tx, eventSignature } = JSONbigNative.parse(event.extraInfo); + + return { + block: { + cid: block.cid, + hash: block.blockHash, + number: block.blockNumber, + timestamp: block.blockTimestamp, + parentHash: block.parentHash + }, + + tx: { + hash: event.txHash, + from: tx.src, + to: tx.dst, + index: tx.index + }, + + contract: event.contract, + + eventIndex: event.index, + eventSignature, + event: { + __typename: `${event.eventName}Event`, + ...eventFields + }, + + // TODO: Return proof only if requested. + proof: JSON.parse(event.proof) + }; +}; diff --git a/packages/util/src/types.ts b/packages/util/src/types.ts index e5011ad7..243f582c 100644 --- a/packages/util/src/types.ts +++ b/packages/util/src/types.ts @@ -91,8 +91,7 @@ export interface IndexerInterface { getLatestCanonicalBlock (): Promise getBlockEvents (blockHash: string, where: Where, queryOptions: QueryOptions): Promise> getAncestorAtDepth (blockHash: string, depth: number): Promise - fetchBlockWithEvents (block: DeepPartial): Promise - fetchBlockEvents?: (block: DeepPartial) => Promise[]> + saveBlockAndFetchEvents (block: DeepPartial): Promise<[BlockProgressInterface, DeepPartial[]]> removeUnknownEvents (block: BlockProgressInterface): Promise updateBlockProgress (block: BlockProgressInterface, lastProcessedEventIndex: number): Promise updateSyncStatusChainHead (blockHash: string, blockNumber: number, force?: boolean): Promise