From b8d6eeaba990495f13cf8f823c19b4d295078b97 Mon Sep 17 00:00:00 2001 From: prathamesh0 Date: Fri, 16 Sep 2022 19:58:41 +0530 Subject: [PATCH] Add a CLI to validate state for a given range --- packages/eden-watcher/package.json | 1 + .../eden-watcher/src/cli/validate-state.ts | 167 ++++++++++++++++++ packages/eden-watcher/src/fill-state.ts | 81 +++++---- 3 files changed, 217 insertions(+), 32 deletions(-) create mode 100644 packages/eden-watcher/src/cli/validate-state.ts diff --git a/packages/eden-watcher/package.json b/packages/eden-watcher/package.json index 8732e516..2a4dffe7 100644 --- a/packages/eden-watcher/package.json +++ b/packages/eden-watcher/package.json @@ -20,6 +20,7 @@ "checkpoint": "DEBUG=vulcanize:* ts-node src/cli/checkpoint.ts", "export-state": "DEBUG=vulcanize:* ts-node src/cli/export-state.ts", "import-state": "DEBUG=vulcanize:* ts-node src/cli/import-state.ts", + "validate-state": "DEBUG=vulcanize:* ts-node src/cli/validate-state.ts", "inspect-cid": "DEBUG=vulcanize:* ts-node src/cli/inspect-cid.ts", "index-block": "DEBUG=vulcanize:* ts-node src/cli/index-block.ts" }, diff --git a/packages/eden-watcher/src/cli/validate-state.ts b/packages/eden-watcher/src/cli/validate-state.ts new file mode 100644 index 00000000..1d45c1cb --- /dev/null +++ b/packages/eden-watcher/src/cli/validate-state.ts @@ -0,0 +1,167 @@ +// +// Copyright 2022 Vulcanize, Inc. +// + +import path from 'path'; +import util from 'util'; +import assert from 'assert'; +import 'reflect-metadata'; +import debug from 'debug'; +import yargs from 'yargs'; +import { hideBin } from 'yargs/helpers'; +import { diffString, diff } from 'json-diff'; + +import { Config, getConfig, JobQueue, DEFAULT_CONFIG_PATH, initClients, StateKind } from '@cerc-io/util'; +import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node'; + +import { Database } from '../database'; +import { Indexer } from '../indexer'; +import { getContractEntitiesMap, createStateFromUpdatedEntities } from '../fill-state'; + +const log = debug('vulcanize:validate-state'); + +export const main = async (): Promise => { + const argv = await yargs(hideBin(process.argv)).parserConfiguration({ + 'parse-numbers': false + }).env( + 'VALIDATE' + ).options({ + configFile: { + alias: 'f', + type: 'string', + demandOption: true, + describe: 'configuration file path (toml)', + default: DEFAULT_CONFIG_PATH + }, + startBlock: { + type: 'number', + demandOption: true, + describe: 'Block number to start state valildation at' + }, + endBlock: { + type: 'number', + demandOption: true, + describe: 'Block number to stop state valildation at' + } + }).argv; + + const config: Config = await getConfig(argv.configFile); + const { ethClient, ethProvider } = await initClients(config); + + const db = new Database(config.database); + await db.init(); + + const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*')); + await graphDb.init(); + + const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server); + + const jobQueueConfig = config.jobQueue; + assert(jobQueueConfig, 'Missing job queue config'); + + const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; + assert(dbConnectionString, 'Missing job queue db connection string'); + + const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); + + const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue, graphWatcher); + await indexer.init(); + + graphWatcher.setIndexer(indexer); + await graphWatcher.init(); + + await validateState(indexer, graphDb, graphWatcher.dataSources, argv); +}; + +export const validateState = async ( + indexer: Indexer, + graphDb: GraphDatabase, + dataSources: any[], + argv: { + startBlock: number, + endBlock: number + } +): Promise => { + const { startBlock, endBlock } = argv; + if (startBlock > endBlock) { + log('endBlock should be greater than or equal to startBlock'); + process.exit(1); + } + + log(`Validating state for subgraph entities in range: [${startBlock}, ${endBlock}]`); + + // Map: contractAddress -> entities updated + const contractEntitiesMap = getContractEntitiesMap(dataSources); + + console.time('time:validate-state'); + + let diffFound = false; + // Validate state for blocks in the given range + for (let blockNumber = startBlock; blockNumber <= endBlock && !diffFound; blockNumber++) { + console.time(`time:validate-state-${blockNumber}`); + + // Get the canonical block hash at current height + const blocks = await indexer.getBlocksAtHeight(blockNumber, false); + + if (blocks.length === 0) { + log(`block not found at height ${blockNumber}`); + process.exit(1); + } else if (blocks.length > 1) { + log(`found more than one non-pruned block at height ${blockNumber}`); + process.exit(1); + } + + const blockHash = blocks[0].blockHash; + + // Create state from entities of each contract in contractEntitiesMap + createStateFromUpdatedEntities(indexer, graphDb, blockHash, contractEntitiesMap); + + const comparisonPromises = Array.from(indexer._subgraphStateMap.entries()) + .map(async ([contractAddress, data]): Promise => { + const ipldBlock = await indexer.getLatestIPLDBlock(contractAddress, StateKind.Diff, blockNumber); + const ipldState = ipldBlock ? indexer.getIPLDData(ipldBlock).data.state : {}; + + return compareObjects(data, ipldState, false); + }); + + const comparisonResults = await Promise.all(comparisonPromises); + + comparisonResults.forEach(resultDiff => { + if (resultDiff) { + log('Results mismatch:', resultDiff); + diffFound = true; + } else { + log('Results match.'); + } + }); + + console.timeEnd(`time:validate-state-${blockNumber}`); + } + + console.timeEnd('time:validate-state'); + + log(`Validated state for subgraph entities in range: [${startBlock}, ${endBlock}]`); +}; + +main().catch(err => { + log(err); +}).finally(() => { + process.exit(); +}); + +// obj1: expected +// obj2: actual +const compareObjects = (obj1: any, obj2: any, rawJson: boolean): string => { + if (rawJson) { + const diffObj = diff(obj1, obj2); + + if (diffObj) { + // Use util.inspect to extend depth limit in the output. + return util.inspect(diffObj, false, null); + } + + return ''; + } else { + return diffString(obj1, obj2); + } +}; diff --git a/packages/eden-watcher/src/fill-state.ts b/packages/eden-watcher/src/fill-state.ts index 2e6639e8..b4b29441 100644 --- a/packages/eden-watcher/src/fill-state.ts +++ b/packages/eden-watcher/src/fill-state.ts @@ -38,15 +38,7 @@ export const fillState = async ( } // Map: contractAddress -> entities updated - const contractEntitiesMap: Map = new Map(); - - // Populate contractEntitiesMap using data sources from subgraph - // NOTE: Assuming each entity type is only mapped to a single contract - // This is true for eden subgraph; may not be the case for other subgraphs - dataSources.forEach((dataSource: any) => { - const { source: { address: contractAddress }, mapping: { entities } } = dataSource; - contractEntitiesMap.set(contractAddress, entities as string[]); - }); + const contractEntitiesMap = getContractEntitiesMap(dataSources); console.time('time:fill-state'); @@ -71,29 +63,7 @@ export const fillState = async ( await indexer.createInit(blockHash, blockNumber); // Fill state for each contract in contractEntitiesMap - const contractStatePromises = Array.from(contractEntitiesMap.entries()) - .map(async ([contractAddress, entities]): Promise => { - // Get all the updated entities at this block - const updatedEntitiesListPromises = entities.map(async (entity): Promise => { - return graphDb.getEntitiesForBlock(blockHash, entity); - }); - const updatedEntitiesList = await Promise.all(updatedEntitiesListPromises); - - // Populate state with all the updated entities of each entity type - updatedEntitiesList.forEach((updatedEntities, index) => { - const entityName = entities[index]; - - updatedEntities.forEach((updatedEntity) => { - // Prepare diff data for the entity update - const diffData = prepareEntityState(updatedEntity, entityName, indexer.getRelationsMap()); - - // Update the in-memory subgraph state - indexer.updateSubgraphState(contractAddress, diffData); - }); - }); - }); - - await Promise.all(contractStatePromises); + createStateFromUpdatedEntities(indexer, graphDb, blockHash, contractEntitiesMap); // Persist subgraph state to the DB await indexer.dumpSubgraphState(blockHash, true); @@ -110,3 +80,50 @@ export const fillState = async ( log(`Filled state for subgraph entities in range: [${startBlock}, ${endBlock}]`); }; + +export const getContractEntitiesMap = (dataSources: any[]): Map => { + // Map: contractAddress -> entities updated + const contractEntitiesMap: Map = new Map(); + + // Populate contractEntitiesMap using data sources from subgraph + // NOTE: Assuming each entity type is only mapped to a single contract + // This is true for eden subgraph; may not be the case for other subgraphs + dataSources.forEach((dataSource: any) => { + const { source: { address: contractAddress }, mapping: { entities } } = dataSource; + contractEntitiesMap.set(contractAddress, entities as string[]); + }); + + return contractEntitiesMap; +}; + +export const createStateFromUpdatedEntities = async ( + indexer: Indexer, + graphDb: GraphDatabase, + blockHash: string, + contractEntitiesMap: Map +): Promise => { + // Create state for each contract in contractEntitiesMap + const contractStatePromises = Array.from(contractEntitiesMap.entries()) + .map(async ([contractAddress, entities]): Promise => { + // Get all the updated entities at this block + const updatedEntitiesListPromises = entities.map(async (entity): Promise => { + return graphDb.getEntitiesForBlock(blockHash, entity); + }); + const updatedEntitiesList = await Promise.all(updatedEntitiesListPromises); + + // Populate state with all the updated entities of each entity type + updatedEntitiesList.forEach((updatedEntities, index) => { + const entityName = entities[index]; + + updatedEntities.forEach((updatedEntity) => { + // Prepare diff data for the entity update + const diffData = prepareEntityState(updatedEntity, entityName, indexer.getRelationsMap()); + + // Update the in-memory subgraph state + indexer.updateSubgraphState(contractAddress, diffData); + }); + }); + }); + + await Promise.all(contractStatePromises); +};