diff --git a/packages/graph-node/src/loader.ts b/packages/graph-node/src/loader.ts index a70dd7ea..6401749a 100644 --- a/packages/graph-node/src/loader.ts +++ b/packages/graph-node/src/loader.ts @@ -74,9 +74,7 @@ export const instantiate = async ( const entityId = __getString(id); assert(context.block); - console.time(`time:loader#index.store.get-db-${entityName}`); const entityData = await database.getEntity(entityName, entityId, context.block.blockHash); - console.timeEnd(`time:loader#index.store.get-db-${entityName}`); if (!entityData) { return null; @@ -97,9 +95,7 @@ export const instantiate = async ( assert(context.block); let dbData = await database.fromGraphEntity(instanceExports, context.block, entityName, entityInstance); - console.time(`time:loader#index.store.set-db-${entityName}`); await database.saveEntity(entityName, dbData); - console.timeEnd(`time:loader#index.store.set-db-${entityName}`); // Resolve any field name conflicts in the dbData for auto-diff. dbData = resolveEntityFieldConflicts(dbData); diff --git a/packages/graph-node/src/watcher.ts b/packages/graph-node/src/watcher.ts index 2f1088c1..0e8863c8 100644 --- a/packages/graph-node/src/watcher.ts +++ b/packages/graph-node/src/watcher.ts @@ -124,6 +124,7 @@ export class GraphWatcher { async handleEvent (eventData: any) { const { contract, event, eventSignature, block, tx: { hash: txHash }, eventIndex } = eventData; + // Check if block data is already fetched by a previous event in the same block. if (!this._context.block || this._context.block.blockHash !== block.hash) { this._context.block = await getFullBlock(this._ethClient, this._ethProvider, block.hash); } @@ -185,9 +186,13 @@ export class GraphWatcher { } async handleBlock (blockHash: string) { - const blockData = await getFullBlock(this._ethClient, this._ethProvider, blockHash); + // Check if block data is already fetched in handleEvent method for the same block. + if (!this._context.block || this._context.block.blockHash !== blockHash) { + this._context.block = await getFullBlock(this._ethClient, this._ethProvider, blockHash); + } - this._context.block = blockData; + const blockData = this._context.block; + assert(blockData); // Clear transactions map on handling new block. this._transactionsMap.clear(); @@ -195,7 +200,7 @@ export class GraphWatcher { // Call block handler(s) for each contract. for (const dataSource of this._dataSources) { // Reinstantiate WASM after every N blocks. - if (blockData.blockNumber % this._wasmRestartBlocksInterval === 0) { + if (Number(blockData.blockNumber) % this._wasmRestartBlocksInterval === 0) { // The WASM instance allocates memory as required and the limit is 4GB. // https://stackoverflow.com/a/40453962 // https://github.com/AssemblyScript/assemblyscript/pull/1268#issue-618411291 @@ -227,7 +232,7 @@ export class GraphWatcher { assert(this._indexer?.getContractsByKind); const watchedContracts = this._indexer.getContractsByKind(dataSource.name); - contractAddressList = watchedContracts.filter(contract => blockData.blockNumber >= contract.startingBlock) + contractAddressList = watchedContracts.filter(contract => Number(blockData.blockNumber) >= contract.startingBlock) .map(contract => contract.address); } diff --git a/packages/util/src/block-size-cache.ts b/packages/util/src/block-size-cache.ts new file mode 100644 index 00000000..50df9d3d --- /dev/null +++ b/packages/util/src/block-size-cache.ts @@ -0,0 +1,63 @@ +// +// Copyright 2022 Vulcanize, Inc. +// + +import { utils, providers } from 'ethers'; +import debug from 'debug'; + +const log = debug('vulcanize:block-size-cache'); + +// Number of blocks to cache after current block being processed. +const BLOCK_SIZE_CACHE_BUFFER = 10; +// Block height interval at which blockSizeMap is cleared. +// If the block being processed is divisible by BLOCK_SIZE_MAP_CLEAR_HEIGHT_INTERVAL then blocks below that height are removed from the map. +const BLOCK_SIZE_MAP_CLEAR_HEIGHT_INTERVAL = 50; + +const blockSizeMap: Map = new Map(); +let blockSizeMapLatestHeight = -1; + +export const getCachedBlockSize = async (provider: providers.JsonRpcProvider, blockHash: string, blockNumber: number): Promise => { + const block = blockSizeMap.get(blockHash); + cacheBlockSizesAsync(provider, blockNumber); + + if (!block) { + console.time(`time:misc#getCachedBlockSize-eth_getBlockByHash-${blockNumber}`); + const { size } = await provider.send('eth_getBlockByHash', [blockHash, false]); + console.timeEnd(`time:misc#getCachedBlockSize-eth_getBlockByHash-${blockNumber}`); + + return size; + } + + return block.size; +}; + +const cacheBlockSizesAsync = async (provider: providers.JsonRpcProvider, blockNumber: number): Promise => { + const endBlockHeight = blockNumber + BLOCK_SIZE_CACHE_BUFFER; + + if (blockSizeMapLatestHeight < 0) { + blockSizeMapLatestHeight = blockNumber; + } + + if (endBlockHeight > blockSizeMapLatestHeight) { + const startBlockHeight = blockSizeMapLatestHeight + 1; + blockSizeMapLatestHeight = endBlockHeight; + + // Start prefetching blocks after latest height in blockSizeMap. + for (let i = startBlockHeight; i <= endBlockHeight; i++) { + console.time(`time:misc#cacheBlockSizesAsync-eth_getBlockByNumber-${i}`); + const { size, hash } = await provider.send('eth_getBlockByNumber', [utils.hexlify(i), false]); + console.timeEnd(`time:misc#cacheBlockSizesAsync-eth_getBlockByNumber-${i}`); + blockSizeMap.set(hash, { size, blockNumber: i }); + } + } + + // At interval clear previous blocks below height blockNumber from map. + if (blockNumber % BLOCK_SIZE_MAP_CLEAR_HEIGHT_INTERVAL === 0) { + log(`cacheBlockSizesAsync-clear-map-below-${blockNumber}`); + const previousBlockHashes = Array.from(blockSizeMap.entries()) + .filter(([_, value]) => value.blockNumber <= blockNumber) + .map(([blockHash]) => blockHash); + + previousBlockHashes.forEach(blockHash => blockSizeMap.delete(blockHash)); + } +}; diff --git a/packages/util/src/misc.ts b/packages/util/src/misc.ts index 241e6852..db873fa9 100644 --- a/packages/util/src/misc.ts +++ b/packages/util/src/misc.ts @@ -8,6 +8,7 @@ import yargs from 'yargs'; import { hideBin } from 'yargs/helpers'; import { utils, providers } from 'ethers'; import Decimal from 'decimal.js'; +import debug from 'debug'; import { EthClient } from '@vulcanize/ipld-eth-client'; @@ -16,6 +17,9 @@ import { Config } from './config'; import { JobQueue } from './job-queue'; import { GraphDecimal } from './graph-decimal'; import * as EthDecoder from './eth'; +import { getCachedBlockSize } from './block-size-cache'; + +const log = debug('vulcanize:misc'); /** * Method to wait for specified time. @@ -184,16 +188,14 @@ export const getFullBlock = async (ethClient: EthClient, ethProvider: providers. assert(fullBlock.blockByMhKey); - // Deecode the header data. + // Decode the header data. const header = EthDecoder.decodeHeader(EthDecoder.decodeData(fullBlock.blockByMhKey.data)); assert(header); // TODO: Calculate size from rlp encoded data. // Get block info from JSON RPC API provided by ipld-eth-server. const provider = ethProvider as providers.JsonRpcProvider; - console.time('time:misc#getFullBlock-eth_getBlockByHash'); - const { size } = await provider.send('eth_getBlockByHash', [blockHash, false]); - console.timeEnd('time:misc#getFullBlock-eth_getBlockByHash'); + const size = await getCachedBlockSize(provider, blockHash, Number(fullBlock.blockNumber)); return { headerId: fullBlock.id,