From 76522afa8da120172a3d775bb1ead617db157c13 Mon Sep 17 00:00:00 2001 From: Nabarun Gogoi Date: Mon, 15 May 2023 12:10:27 +0530 Subject: [PATCH] Implement CLI to create checkpoint state from existing GQL endpoint (#382) * Add util method for preparing state from GQL response entity * Refactor method to create or update state data * Fix typeorm bigint transformer and convert to checksum contract address * Skip resetting to previous block in job-runner if isComplete set to true * Fix creating subgraph event with struct params * Use quotes for table names in custom queries * Fix indexer prepareStateEntry method * Fix toEthereumValue method when used with ethereum.decode * Add CLI for creating snapshot checkpoint state from GQL endpoint * Skip import-state if block is already indexed * Review changes --- packages/cli/src/create-state-gql.ts | 289 +++++++++++++++++++++++++ packages/cli/src/import-state.ts | 11 +- packages/cli/src/index.ts | 1 + packages/util/src/database.ts | 4 +- packages/util/src/graph/state-utils.ts | 39 +++- packages/util/src/graph/utils.ts | 8 +- packages/util/src/indexer.ts | 21 +- packages/util/src/job-runner.ts | 20 +- packages/util/src/misc.ts | 12 +- packages/util/src/state-helper.ts | 44 +++- 10 files changed, 420 insertions(+), 29 deletions(-) create mode 100644 packages/cli/src/create-state-gql.ts diff --git a/packages/cli/src/create-state-gql.ts b/packages/cli/src/create-state-gql.ts new file mode 100644 index 00000000..6cd17c1a --- /dev/null +++ b/packages/cli/src/create-state-gql.ts @@ -0,0 +1,289 @@ +// +// Copyright 2023 Vulcanize, Inc. +// + +import yargs from 'yargs'; +import { hideBin } from 'yargs/helpers'; +import assert from 'assert'; +import { ConnectionOptions } from 'typeorm'; +import debug from 'debug'; +import pluralize from 'pluralize'; +import { merge } from 'lodash'; +import path from 'path'; +import fs from 'fs'; +import { ethers } from 'ethers'; + +import { JsonRpcProvider } from '@ethersproject/providers'; +import { + DEFAULT_CONFIG_PATH, + JobQueue, + DatabaseInterface, + IndexerInterface, + ServerConfig, + Clients, + GraphWatcherInterface, + Config, + BlockProgressInterface, + StateKind, + createOrUpdateStateData, + getContractEntitiesMap, + prepareEntityStateFromGQLResponse +} from '@cerc-io/util'; +import { GraphQLClient } from '@cerc-io/ipld-eth-client'; + +import { BaseCmd } from './base'; + +const log = debug('vulcanize:create-state-gql'); + +const ENTITIES_QUERY_LIMIT = 1000; + +interface Arguments { + configFile: string; + snapshotBlockHash: string; + output: string; + gqlEndpoint: string; +} + +export class CreateStateFromGQLCmd { + _argv?: Arguments; + _gqlClient?: GraphQLClient; + _baseCmd: BaseCmd; + _queries: { [key: string]: string }; + + constructor (queries: { [key: string]: string }) { + this._baseCmd = new BaseCmd(); + this._queries = queries; + } + + get config (): Config { + return this._baseCmd.config; + } + + get clients (): Clients { + return this._baseCmd.clients; + } + + get ethProvider (): JsonRpcProvider { + return this._baseCmd.ethProvider; + } + + get database (): DatabaseInterface { + return this._baseCmd.database; + } + + get indexer (): IndexerInterface { + return this._baseCmd.indexer; + } + + async initConfig (): Promise { + this._argv = this._getArgv(); + assert(this._argv); + this._gqlClient = new GraphQLClient({ gqlEndpoint: this._argv.gqlEndpoint }); + + return this._baseCmd.initConfig(this._argv.configFile); + } + + async init ( + Database: new ( + config: ConnectionOptions, + serverConfig?: 
ServerConfig + ) => DatabaseInterface, + clients: { [key: string]: any } = {} + ): Promise { + await this.initConfig(); + + await this._baseCmd.init(Database, clients); + } + + async initIndexer ( + Indexer: new ( + serverConfig: ServerConfig, + db: DatabaseInterface, + clients: Clients, + ethProvider: JsonRpcProvider, + jobQueue: JobQueue, + graphWatcher?: GraphWatcherInterface + ) => IndexerInterface, + graphWatcher?: GraphWatcherInterface + ): Promise { + return this._baseCmd.initIndexer(Indexer, graphWatcher); + } + + async exec (dataSources: any[]): Promise { + const indexer = this._baseCmd.indexer; + const database = this._baseCmd.database; + + assert(indexer); + assert(database); + assert(this._argv); + + const [block] = await indexer.getBlocks({ blockHash: this._argv.snapshotBlockHash }); + + if (!block) { + log(`No blocks fetched for block hash ${this._argv.snapshotBlockHash}, use an existing block`); + return; + } + + const blockProgress: Partial = { + ...block, + blockNumber: Number(block.blockNumber) + }; + + // Get watched contracts using subgraph dataSources + const watchedContracts = dataSources.map(dataSource => { + const { source: { address, startBlock }, name } = dataSource; + + return { + address: ethers.utils.getAddress(address), + kind: name, + checkpoint: true, + startingBlock: startBlock + }; + }); + + const exportData: any = { + snapshotBlock: { + blockNumber: blockProgress.blockNumber, + blockHash: blockProgress.blockHash + }, + contracts: watchedContracts, + stateCheckpoints: [] + }; + + // Get contractEntitiesMap + // NOTE: Assuming each entity type is only mapped to a single contract + // TODO: Decouple subgraph entities and contracts in watcher state + const contractEntitiesMap = getContractEntitiesMap(dataSources); + + // Create state checkpoint for each contract in contractEntitiesMap + const contractStatePromises = Array.from(contractEntitiesMap.entries()) + .map(async ([contractAddress, entities]): Promise => { + // Get all the updated entities at this block + const updatedGQLEntitiesListPromises = entities.map(async (entity): Promise> => { + assert(this._argv); + + // Get entities for block from GQL query + return this._getGQLEntitiesForSnapshotBlock(entity); + }); + + const updatedGQLEntitiesList = await Promise.all(updatedGQLEntitiesListPromises); + + let checkpointData = { state: {} }; + + // Populate checkpoint state with all the updated entities of each entity type + updatedGQLEntitiesList.forEach((updatedEntities, index) => { + const entityName = entities[index]; + + updatedEntities.forEach((updatedEntity) => { + assert(indexer.getRelationsMap); + + // Prepare diff data for the entity update + const diffData = prepareEntityStateFromGQLResponse(updatedEntity, entityName, indexer.getRelationsMap()); + + // Merge diffData for each entity + checkpointData = merge(checkpointData, diffData); + }); + }); + + assert(blockProgress.cid); + assert(blockProgress.blockNumber); + + const stateDataMeta = { + id: contractAddress, + kind: StateKind.Checkpoint, + parent: { + '/': null + }, + ethBlock: { + cid: { + '/': blockProgress.cid + }, + num: blockProgress.blockNumber + } + }; + + const { cid, data } = await createOrUpdateStateData( + checkpointData, + stateDataMeta + ); + + assert(data.meta); + + exportData.stateCheckpoints.push({ + contractAddress, + cid: cid.toString(), + kind: data.meta.kind, + data + }); + }); + + await Promise.all(contractStatePromises); + + if (this._argv.output) { + const codec = await import('@ipld/dag-cbor'); + const encodedExportData 
= codec.encode(exportData); + + const filePath = path.resolve(this._argv.output); + const fileDir = path.dirname(filePath); + + if (!fs.existsSync(fileDir)) fs.mkdirSync(fileDir, { recursive: true }); + + fs.writeFileSync(filePath, encodedExportData); + } else { + log(exportData); + } + + log(`Snapshot checkpoint state created at height ${blockProgress.blockNumber}`); + await database.close(); + } + + _getGQLEntitiesForSnapshotBlock = async (entityName: string): Promise> => { + const queryName = pluralize(`${entityName.charAt(0).toLowerCase().concat(entityName.slice(1))}`); + const gqlQuery = this._queries[queryName]; + + assert(this._argv); + assert(this._gqlClient); + + const block = { + hash: this._argv.snapshotBlockHash + }; + + const { gql } = await import('@apollo/client/core/index.js'); + + // TODO: Get all entity data using pagination as query limit is 1000 entities + const data = await this._gqlClient.query( + gql(gqlQuery), + { + block, + first: ENTITIES_QUERY_LIMIT + } + ); + + return data[queryName]; + }; + + _getArgv (): any { + return yargs(hideBin(process.argv)) + .option('configFile', { + alias: 'f', + demandOption: true, + describe: 'configuration file path (toml)', + type: 'string', + default: DEFAULT_CONFIG_PATH + }) + .option('output', { + alias: 'o', + type: 'string', + describe: 'Output file path of created checkpoint state' + }) + .option('snapshotBlockHash', { + type: 'string', + describe: 'Block hash to create snapshot at' + }) + .option('gqlEndpoint', { + type: 'string', + describe: 'GQL endpoint to fetch entities from' + }) + .argv; + } +} diff --git a/packages/cli/src/import-state.ts b/packages/cli/src/import-state.ts index 65877fbb..3f536479 100644 --- a/packages/cli/src/import-state.ts +++ b/packages/cli/src/import-state.ts @@ -114,6 +114,15 @@ export class ImportStateCmd { const codec = await import('@ipld/dag-cbor'); const importData = codec.decode(Buffer.from(encodedImportData)) as any; + let block = await indexer.getBlockProgress(importData.snapshotBlock.blockHash); + + // Check if block already present in DB + if (block) { + // Exit CLI if it already exists + log(`block ${block.blockHash} is already indexed. Exiting import-state CLI.`); + return; + } + // Fill the snapshot block. await fillBlocks( jobQueue, @@ -133,7 +142,7 @@ export class ImportStateCmd { } // Get the snapshot block. - const block = await indexer.getBlockProgress(importData.snapshotBlock.blockHash); + block = await indexer.getBlockProgress(importData.snapshotBlock.blockHash); assert(block); // Fill the States. 
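// ---------------------------------------------------------------------------
// Usage sketch (commentary, not part of the patch): how a watcher might wire up
// the new CreateStateFromGQLCmd defined above and then feed its output to
// import-state. The imports `Database`, `Indexer`, `queries` and `dataSources`
// are assumptions about the generated watcher layout, not code from this change.
// ---------------------------------------------------------------------------
import 'reflect-metadata';
import debug from 'debug';

import { CreateStateFromGQLCmd } from '@cerc-io/cli';

import { Database } from '../database';
import { Indexer } from '../indexer';
import { queries } from './gql-queries';          // pluralized query name -> GQL query string
import { dataSources } from './subgraph-config';  // dataSources parsed from subgraph.yaml

const log = debug('vulcanize:create-state-gql');

const main = async (): Promise<void> => {
  const createStateCmd = new CreateStateFromGQLCmd(queries);
  await createStateCmd.init(Database);

  // A graph-based watcher would also construct and pass its GraphWatcher here.
  await createStateCmd.initIndexer(Indexer);

  // Fetches entities at --snapshotBlockHash from --gqlEndpoint and writes the
  // dag-cbor encoded checkpoint state to --output, which import-state consumes.
  await createStateCmd.exec(dataSources);
};

main().catch(err => {
  log(err);
}).finally(() => {
  process.exit(0);
});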
diff --git a/packages/cli/src/index.ts b/packages/cli/src/index.ts index b4caf7d0..5b381481 100644 --- a/packages/cli/src/index.ts +++ b/packages/cli/src/index.ts @@ -14,5 +14,6 @@ export * from './server'; export * from './job-runner'; export * from './index-block'; export * from './fill'; +export * from './create-state-gql'; export * from './peer'; export * from './utils'; diff --git a/packages/util/src/database.ts b/packages/util/src/database.ts index 0f8138c3..135882d1 100644 --- a/packages/util/src/database.ts +++ b/packages/util/src/database.ts @@ -442,7 +442,7 @@ export class Database { FROM block_progress b LEFT JOIN - ${repo.metadata.tableName} e + "${repo.metadata.tableName}" e ON e.block_hash = b.block_hash AND e.id = $2 WHERE @@ -457,7 +457,7 @@ export class Database { FROM block_progress b LEFT JOIN - ${repo.metadata.tableName} e + "${repo.metadata.tableName}" e ON e.block_hash = b.block_hash AND e.id = $2 INNER JOIN diff --git a/packages/util/src/graph/state-utils.ts b/packages/util/src/graph/state-utils.ts index cdde0dd8..ba56a5e9 100644 --- a/packages/util/src/graph/state-utils.ts +++ b/packages/util/src/graph/state-utils.ts @@ -6,6 +6,7 @@ import assert from 'assert'; import debug from 'debug'; import _ from 'lodash'; import { Between, ValueTransformer } from 'typeorm'; +import { ethers } from 'ethers'; import { jsonBigIntStringReplacer } from '../misc'; import { IndexerInterface, StateInterface } from '../types'; @@ -55,6 +56,42 @@ export const prepareEntityState = (updatedEntity: any, entityName: string, relat return diffData; }; +export const prepareEntityStateFromGQLResponse = (entity: any, entityName: string, relationsMap: Map): any => { + // Prepare the diff data. + const diffData: any = { state: {} }; + + const result = Array.from(relationsMap.entries()) + .find(([key]) => key.name === entityName); + + if (result) { + // Update entity data if relations exist. + const [, relations] = result; + + // Update relation fields for diff data to be similar to GQL query entities. + Object.entries(relations).forEach(([relation, { isArray, isDerived }]) => { + if (isDerived || !entity[relation]) { + // Field is not present in dbData for derived relations + return; + } + + if (isArray) { + entity[relation] = entity[relation].map(({ id }: { id: string }) => ({ id })); + } else { + entity[relation] = { id: entity[relation].id }; + } + }); + } + + // Remove typename field included in GQL response + delete entity.__typename; + + diffData.state[entityName] = { + [entity.id]: entity + }; + + return diffData; +}; + export const updateEntitiesFromState = async (database: GraphDatabase, indexer: IndexerInterface, state: StateInterface): Promise => { const data = indexer.getStateData(state); @@ -114,7 +151,7 @@ export const getContractEntitiesMap = (dataSources: any[]): Map { const { source: { address: contractAddress }, mapping: { entities } } = dataSource; - contractEntitiesMap.set(contractAddress, entities as string[]); + contractEntitiesMap.set(ethers.utils.getAddress(contractAddress), entities as string[]); }); return contractEntitiesMap; diff --git a/packages/util/src/graph/utils.ts b/packages/util/src/graph/utils.ts index 60282b3f..1035834c 100644 --- a/packages/util/src/graph/utils.ts +++ b/packages/util/src/graph/utils.ts @@ -274,10 +274,14 @@ export const toEthereumValue = async (instanceExports: any, output: utils.ParamT // Get values for struct elements. 
const ethereumValuePromises = output.components .map( - async (component: utils.ParamType, index) => toEthereumValue( + async (component: utils.ParamType, index: number) => toEthereumValue( instanceExports, component, - value[index] + // Event data passed from watcher is not an array + // When method is used for ethereum.decode component.name can be null + // TODO: Pass event data as ethers.js Result interface + // https://docs.ethers.org/v5/api/utils/abi/interface/#Result + value[component.name] ?? value[index] ) ); diff --git a/packages/util/src/indexer.ts b/packages/util/src/indexer.ts index e0cc2c03..69f0a5a7 100644 --- a/packages/util/src/indexer.ts +++ b/packages/util/src/indexer.ts @@ -8,8 +8,6 @@ import debug from 'debug'; import JSONbig from 'json-bigint'; import { ethers } from 'ethers'; import _ from 'lodash'; -import { sha256 } from 'multiformats/hashes/sha2'; -import { CID } from 'multiformats/cid'; import * as codec from '@ipld/dag-cbor'; import { EthClient } from '@cerc-io/ipld-eth-client'; @@ -29,6 +27,7 @@ import { UNKNOWN_EVENT_NAME, JOB_KIND_CONTRACT, QUEUE_EVENT_PROCESSING, DIFF_MER import { JobQueue } from './job-queue'; import { Where, QueryOptions } from './database'; import { ServerConfig } from './config'; +import { createOrUpdateStateData, StateDataMeta } from './state-helper'; const DEFAULT_MAX_EVENTS_BLOCK_RANGE = 1000; @@ -873,6 +872,8 @@ export class Indexer { currentState = currentStates[0]; } + let stateDataMeta: StateDataMeta | undefined; + if (currentState) { // Update current State of same kind if it exists. stateEntry = currentState; @@ -888,7 +889,7 @@ export class Indexer { const parentState = await this._db.getLatestState(contractAddress, null, block.blockNumber); // Setting the meta-data for a State entry (done only once per State entry). - data.meta = { + stateDataMeta = { id: contractAddress, kind, parent: { @@ -903,21 +904,19 @@ export class Indexer { }; } - // Encoding the data using dag-cbor codec. - const bytes = codec.encode(data); + const { cid, data: { meta }, bytes } = await createOrUpdateStateData( + data, + stateDataMeta + ); - // Calculating sha256 (multi)hash of the encoded data. - const hash = await sha256.digest(bytes); - - // Calculating the CID: v1, code: dag-cbor, hash. - const cid = CID.create(1, codec.code, hash); + assert(meta); // Update stateEntry with new data. stateEntry = Object.assign(stateEntry, { block, contractAddress, cid: cid.toString(), - kind: data.meta.kind, + kind: meta.kind, data: Buffer.from(bytes) }); diff --git a/packages/util/src/job-runner.ts b/packages/util/src/job-runner.ts index e1d60dee..142811f7 100644 --- a/packages/util/src/job-runner.ts +++ b/packages/util/src/job-runner.ts @@ -218,12 +218,22 @@ export class JobRunner { async resetToPrevIndexedBlock (): Promise { const syncStatus = await this._indexer.getSyncStatus(); - if (syncStatus) { - // Resetting to block before latest indexed as all events might not be processed in latest indexed block. - // Reprocessing of events in subgraph watchers is not possible as DB transaction is not implemented. - // TODO: Check updating latestIndexedBlock after blockProgress.isComplete is set to true. 
- await this._indexer.resetWatcherToBlock(syncStatus.latestIndexedBlockNumber - 1); + // Watcher running for first time if syncStatus does not exist + if (!syncStatus) { + return; } + + const blockProgress = await this._indexer.getBlockProgress(syncStatus.latestIndexedBlockHash); + assert(blockProgress); + + // Don't reset to previous block if block is complete (all events processed) + if (blockProgress.isComplete) { + return; + } + + // Resetting to block before latest indexed block as all events should be processed in the previous block. + // Reprocessing of events in subgraph watchers is not possible as DB transaction is not implemented. + await this._indexer.resetWatcherToBlock(syncStatus.latestIndexedBlockNumber - 1); } handleShutdown (): void { diff --git a/packages/util/src/misc.ts b/packages/util/src/misc.ts index 7395b9b3..cc5dff92 100644 --- a/packages/util/src/misc.ts +++ b/packages/util/src/misc.ts @@ -37,14 +37,14 @@ export const wait = async (time: number): Promise => new Promise(resolve = */ export const graphDecimalTransformer: ValueTransformer = { to: (value?: GraphDecimal) => { - if (value !== undefined) { + if (value !== undefined && value !== null) { return value.toFixed(); } return value; }, from: (value?: string) => { - if (value !== undefined) { + if (value !== undefined && value !== null) { return new GraphDecimal(value); } @@ -57,14 +57,14 @@ export const graphDecimalTransformer: ValueTransformer = { */ export const decimalTransformer: ValueTransformer = { to: (value?: Decimal) => { - if (value !== undefined) { + if (value !== undefined && value !== null) { return value.toString(); } return value; }, from: (value?: string) => { - if (value !== undefined) { + if (value !== undefined && value !== null) { return new Decimal(value); } @@ -77,14 +77,14 @@ export const decimalTransformer: ValueTransformer = { */ export const bigintTransformer: ValueTransformer = { to: (value?: bigint) => { - if (value !== undefined) { + if (value !== undefined && value !== null) { return value.toString(); } return value; }, from: (value?: string) => { - if (value !== undefined) { + if (value !== undefined && value !== null) { return BigInt(value); } diff --git a/packages/util/src/state-helper.ts b/packages/util/src/state-helper.ts index 35ef2b46..6d98ebd2 100644 --- a/packages/util/src/state-helper.ts +++ b/packages/util/src/state-helper.ts @@ -1,13 +1,35 @@ import _ from 'lodash'; import debug from 'debug'; +import { sha256 } from 'multiformats/hashes/sha2'; +import { CID } from 'multiformats/cid'; + import * as codec from '@ipld/dag-cbor'; -import { BlockProgressInterface, GraphDatabaseInterface, StateInterface } from './types'; +import { BlockProgressInterface, GraphDatabaseInterface, StateInterface, StateKind } from './types'; import { jsonBigIntStringReplacer } from './misc'; import { ResultState } from './indexer'; const log = debug('vulcanize:state-helper'); +export interface StateDataMeta { + id: string + kind: StateKind + parent: { + '/': string | null + }, + ethBlock: { + cid: { + '/': string + }, + num: number + } +} + +interface StateData { + meta?: StateDataMeta; + state: any +} + export const updateStateForElementaryType = (initialObject: any, stateVariable: string, value: any): any => { const object = _.cloneDeep(initialObject); const path = ['state', stateVariable]; @@ -90,3 +112,23 @@ export const getResultState = (state: StateInterface): ResultState => { data: JSON.stringify(data) }; }; + +export const createOrUpdateStateData = async ( + data: StateData, + meta?: 
StateDataMeta
+): Promise<{ cid: CID, data: StateData, bytes: codec.ByteView<StateData> }> => {
+  if (meta) {
+    data.meta = meta;
+  }
+
+  // Encoding the data using dag-cbor codec.
+  const bytes = codec.encode(data);
+
+  // Calculating sha256 (multi)hash of the encoded data.
+  const hash = await sha256.digest(bytes);
+
+  // Calculating the CID: v1, code: dag-cbor, hash.
+  const cid = CID.create(1, codec.code, hash);
+
+  return { cid, data, bytes };
+};
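// ---------------------------------------------------------------------------
// Usage sketch (commentary, not part of the patch): createOrUpdateStateData is
// the helper now shared by Indexer.prepareStateEntry and the create-state-gql
// CLI. The contract address and block CID/number below are placeholders, and it
// is assumed that StateDataMeta and StateKind are re-exported from @cerc-io/util
// alongside createOrUpdateStateData.
// ---------------------------------------------------------------------------
import { createOrUpdateStateData, StateDataMeta, StateKind } from '@cerc-io/util';

const buildCheckpointEntry = async (): Promise<void> => {
  // Diff/checkpoint data keyed by entity name and entity id.
  const checkpointData = {
    state: {
      Token: {
        '0x0000000000000000000000000000000000000001': {
          id: '0x0000000000000000000000000000000000000001',
          totalSupply: '1000000000000000000'
        }
      }
    }
  };

  const meta: StateDataMeta = {
    id: '0x0000000000000000000000000000000000000001', // contract address (placeholder)
    kind: StateKind.Checkpoint,
    parent: { '/': null },
    ethBlock: {
      cid: { '/': 'bafyreigh2akiscaild...' }, // placeholder block CID
      num: 17000000
    }
  };

  // Encodes the state with dag-cbor, hashes it with sha2-256 and derives a CIDv1,
  // the same steps the indexer performed inline before this change.
  const { cid, data, bytes } = await createOrUpdateStateData(checkpointData, meta);

  console.log(cid.toString(), data.meta?.kind, bytes.length);
};

buildCheckpointEntry().catch(console.error);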