mirror of
https://github.com/cerc-io/watcher-ts
synced 2025-01-22 19:19:05 +00:00
Asynchronous prefetching of block size with eth_getBlockByNumber
(#173)
* Prefetch block size from eth_blockByHash * Fix updating of blockSizeMapLatestHeight * Move block size caching to separate file * Remove timer logs from graph-node store set and get
This commit is contained in:
parent
f3091dee3d
commit
73ca225779
@ -74,9 +74,7 @@ export const instantiate = async (
|
|||||||
const entityId = __getString(id);
|
const entityId = __getString(id);
|
||||||
|
|
||||||
assert(context.block);
|
assert(context.block);
|
||||||
console.time(`time:loader#index.store.get-db-${entityName}`);
|
|
||||||
const entityData = await database.getEntity(entityName, entityId, context.block.blockHash);
|
const entityData = await database.getEntity(entityName, entityId, context.block.blockHash);
|
||||||
console.timeEnd(`time:loader#index.store.get-db-${entityName}`);
|
|
||||||
|
|
||||||
if (!entityData) {
|
if (!entityData) {
|
||||||
return null;
|
return null;
|
||||||
@ -97,9 +95,7 @@ export const instantiate = async (
|
|||||||
|
|
||||||
assert(context.block);
|
assert(context.block);
|
||||||
let dbData = await database.fromGraphEntity(instanceExports, context.block, entityName, entityInstance);
|
let dbData = await database.fromGraphEntity(instanceExports, context.block, entityName, entityInstance);
|
||||||
console.time(`time:loader#index.store.set-db-${entityName}`);
|
|
||||||
await database.saveEntity(entityName, dbData);
|
await database.saveEntity(entityName, dbData);
|
||||||
console.timeEnd(`time:loader#index.store.set-db-${entityName}`);
|
|
||||||
|
|
||||||
// Resolve any field name conflicts in the dbData for auto-diff.
|
// Resolve any field name conflicts in the dbData for auto-diff.
|
||||||
dbData = resolveEntityFieldConflicts(dbData);
|
dbData = resolveEntityFieldConflicts(dbData);
|
||||||
|
@ -124,6 +124,7 @@ export class GraphWatcher {
|
|||||||
async handleEvent (eventData: any) {
|
async handleEvent (eventData: any) {
|
||||||
const { contract, event, eventSignature, block, tx: { hash: txHash }, eventIndex } = eventData;
|
const { contract, event, eventSignature, block, tx: { hash: txHash }, eventIndex } = eventData;
|
||||||
|
|
||||||
|
// Check if block data is already fetched by a previous event in the same block.
|
||||||
if (!this._context.block || this._context.block.blockHash !== block.hash) {
|
if (!this._context.block || this._context.block.blockHash !== block.hash) {
|
||||||
this._context.block = await getFullBlock(this._ethClient, this._ethProvider, block.hash);
|
this._context.block = await getFullBlock(this._ethClient, this._ethProvider, block.hash);
|
||||||
}
|
}
|
||||||
@ -185,9 +186,13 @@ export class GraphWatcher {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async handleBlock (blockHash: string) {
|
async handleBlock (blockHash: string) {
|
||||||
const blockData = await getFullBlock(this._ethClient, this._ethProvider, blockHash);
|
// Check if block data is already fetched in handleEvent method for the same block.
|
||||||
|
if (!this._context.block || this._context.block.blockHash !== blockHash) {
|
||||||
|
this._context.block = await getFullBlock(this._ethClient, this._ethProvider, blockHash);
|
||||||
|
}
|
||||||
|
|
||||||
this._context.block = blockData;
|
const blockData = this._context.block;
|
||||||
|
assert(blockData);
|
||||||
|
|
||||||
// Clear transactions map on handling new block.
|
// Clear transactions map on handling new block.
|
||||||
this._transactionsMap.clear();
|
this._transactionsMap.clear();
|
||||||
@ -195,7 +200,7 @@ export class GraphWatcher {
|
|||||||
// Call block handler(s) for each contract.
|
// Call block handler(s) for each contract.
|
||||||
for (const dataSource of this._dataSources) {
|
for (const dataSource of this._dataSources) {
|
||||||
// Reinstantiate WASM after every N blocks.
|
// Reinstantiate WASM after every N blocks.
|
||||||
if (blockData.blockNumber % this._wasmRestartBlocksInterval === 0) {
|
if (Number(blockData.blockNumber) % this._wasmRestartBlocksInterval === 0) {
|
||||||
// The WASM instance allocates memory as required and the limit is 4GB.
|
// The WASM instance allocates memory as required and the limit is 4GB.
|
||||||
// https://stackoverflow.com/a/40453962
|
// https://stackoverflow.com/a/40453962
|
||||||
// https://github.com/AssemblyScript/assemblyscript/pull/1268#issue-618411291
|
// https://github.com/AssemblyScript/assemblyscript/pull/1268#issue-618411291
|
||||||
@ -227,7 +232,7 @@ export class GraphWatcher {
|
|||||||
assert(this._indexer?.getContractsByKind);
|
assert(this._indexer?.getContractsByKind);
|
||||||
const watchedContracts = this._indexer.getContractsByKind(dataSource.name);
|
const watchedContracts = this._indexer.getContractsByKind(dataSource.name);
|
||||||
|
|
||||||
contractAddressList = watchedContracts.filter(contract => blockData.blockNumber >= contract.startingBlock)
|
contractAddressList = watchedContracts.filter(contract => Number(blockData.blockNumber) >= contract.startingBlock)
|
||||||
.map(contract => contract.address);
|
.map(contract => contract.address);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
63
packages/util/src/block-size-cache.ts
Normal file
63
packages/util/src/block-size-cache.ts
Normal file
@ -0,0 +1,63 @@
|
|||||||
|
//
|
||||||
|
// Copyright 2022 Vulcanize, Inc.
|
||||||
|
//
|
||||||
|
|
||||||
|
import { utils, providers } from 'ethers';
|
||||||
|
import debug from 'debug';
|
||||||
|
|
||||||
|
const log = debug('vulcanize:block-size-cache');
|
||||||
|
|
||||||
|
// Number of blocks to cache after current block being processed.
|
||||||
|
const BLOCK_SIZE_CACHE_BUFFER = 10;
|
||||||
|
// Block height interval at which blockSizeMap is cleared.
|
||||||
|
// If the block being processed is divisible by BLOCK_SIZE_MAP_CLEAR_HEIGHT_INTERVAL then blocks below that height are removed from the map.
|
||||||
|
const BLOCK_SIZE_MAP_CLEAR_HEIGHT_INTERVAL = 50;
|
||||||
|
|
||||||
|
const blockSizeMap: Map<string, { size: string, blockNumber: number }> = new Map();
|
||||||
|
let blockSizeMapLatestHeight = -1;
|
||||||
|
|
||||||
|
export const getCachedBlockSize = async (provider: providers.JsonRpcProvider, blockHash: string, blockNumber: number): Promise<string> => {
|
||||||
|
const block = blockSizeMap.get(blockHash);
|
||||||
|
cacheBlockSizesAsync(provider, blockNumber);
|
||||||
|
|
||||||
|
if (!block) {
|
||||||
|
console.time(`time:misc#getCachedBlockSize-eth_getBlockByHash-${blockNumber}`);
|
||||||
|
const { size } = await provider.send('eth_getBlockByHash', [blockHash, false]);
|
||||||
|
console.timeEnd(`time:misc#getCachedBlockSize-eth_getBlockByHash-${blockNumber}`);
|
||||||
|
|
||||||
|
return size;
|
||||||
|
}
|
||||||
|
|
||||||
|
return block.size;
|
||||||
|
};
|
||||||
|
|
||||||
|
const cacheBlockSizesAsync = async (provider: providers.JsonRpcProvider, blockNumber: number): Promise<void> => {
|
||||||
|
const endBlockHeight = blockNumber + BLOCK_SIZE_CACHE_BUFFER;
|
||||||
|
|
||||||
|
if (blockSizeMapLatestHeight < 0) {
|
||||||
|
blockSizeMapLatestHeight = blockNumber;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (endBlockHeight > blockSizeMapLatestHeight) {
|
||||||
|
const startBlockHeight = blockSizeMapLatestHeight + 1;
|
||||||
|
blockSizeMapLatestHeight = endBlockHeight;
|
||||||
|
|
||||||
|
// Start prefetching blocks after latest height in blockSizeMap.
|
||||||
|
for (let i = startBlockHeight; i <= endBlockHeight; i++) {
|
||||||
|
console.time(`time:misc#cacheBlockSizesAsync-eth_getBlockByNumber-${i}`);
|
||||||
|
const { size, hash } = await provider.send('eth_getBlockByNumber', [utils.hexlify(i), false]);
|
||||||
|
console.timeEnd(`time:misc#cacheBlockSizesAsync-eth_getBlockByNumber-${i}`);
|
||||||
|
blockSizeMap.set(hash, { size, blockNumber: i });
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// At interval clear previous blocks below height blockNumber from map.
|
||||||
|
if (blockNumber % BLOCK_SIZE_MAP_CLEAR_HEIGHT_INTERVAL === 0) {
|
||||||
|
log(`cacheBlockSizesAsync-clear-map-below-${blockNumber}`);
|
||||||
|
const previousBlockHashes = Array.from(blockSizeMap.entries())
|
||||||
|
.filter(([_, value]) => value.blockNumber <= blockNumber)
|
||||||
|
.map(([blockHash]) => blockHash);
|
||||||
|
|
||||||
|
previousBlockHashes.forEach(blockHash => blockSizeMap.delete(blockHash));
|
||||||
|
}
|
||||||
|
};
|
@ -8,6 +8,7 @@ import yargs from 'yargs';
|
|||||||
import { hideBin } from 'yargs/helpers';
|
import { hideBin } from 'yargs/helpers';
|
||||||
import { utils, providers } from 'ethers';
|
import { utils, providers } from 'ethers';
|
||||||
import Decimal from 'decimal.js';
|
import Decimal from 'decimal.js';
|
||||||
|
import debug from 'debug';
|
||||||
|
|
||||||
import { EthClient } from '@vulcanize/ipld-eth-client';
|
import { EthClient } from '@vulcanize/ipld-eth-client';
|
||||||
|
|
||||||
@ -16,6 +17,9 @@ import { Config } from './config';
|
|||||||
import { JobQueue } from './job-queue';
|
import { JobQueue } from './job-queue';
|
||||||
import { GraphDecimal } from './graph-decimal';
|
import { GraphDecimal } from './graph-decimal';
|
||||||
import * as EthDecoder from './eth';
|
import * as EthDecoder from './eth';
|
||||||
|
import { getCachedBlockSize } from './block-size-cache';
|
||||||
|
|
||||||
|
const log = debug('vulcanize:misc');
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Method to wait for specified time.
|
* Method to wait for specified time.
|
||||||
@ -184,16 +188,14 @@ export const getFullBlock = async (ethClient: EthClient, ethProvider: providers.
|
|||||||
|
|
||||||
assert(fullBlock.blockByMhKey);
|
assert(fullBlock.blockByMhKey);
|
||||||
|
|
||||||
// Deecode the header data.
|
// Decode the header data.
|
||||||
const header = EthDecoder.decodeHeader(EthDecoder.decodeData(fullBlock.blockByMhKey.data));
|
const header = EthDecoder.decodeHeader(EthDecoder.decodeData(fullBlock.blockByMhKey.data));
|
||||||
assert(header);
|
assert(header);
|
||||||
|
|
||||||
// TODO: Calculate size from rlp encoded data.
|
// TODO: Calculate size from rlp encoded data.
|
||||||
// Get block info from JSON RPC API provided by ipld-eth-server.
|
// Get block info from JSON RPC API provided by ipld-eth-server.
|
||||||
const provider = ethProvider as providers.JsonRpcProvider;
|
const provider = ethProvider as providers.JsonRpcProvider;
|
||||||
console.time('time:misc#getFullBlock-eth_getBlockByHash');
|
const size = await getCachedBlockSize(provider, blockHash, Number(fullBlock.blockNumber));
|
||||||
const { size } = await provider.send('eth_getBlockByHash', [blockHash, false]);
|
|
||||||
console.timeEnd('time:misc#getFullBlock-eth_getBlockByHash');
|
|
||||||
|
|
||||||
return {
|
return {
|
||||||
headerId: fullBlock.id,
|
headerId: fullBlock.id,
|
||||||
|
Loading…
Reference in New Issue
Block a user