From e30af92901af054875685c304bce8a63d28533b1 Mon Sep 17 00:00:00 2001 From: prathamesh0 <42446521+prathamesh0@users.noreply.github.com> Date: Fri, 9 Sep 2022 16:23:41 +0530 Subject: [PATCH] Add a CLI in eden-watcher to fill state for a given range (#176) * Add a CLI to fill state for a given range * Refactor code * Add a CLI to reset IPLD state * Replace ORDER BY clause in the query to get latest IPLD block * Optimize delete query in CLI to reset IPLD state * Add an option to decouple subgraph state creation from mapping code * Use a raw SQL query to delete IPLD blocks in a block range * Accommodate changes in codegen --- packages/codegen/src/client.ts | 1 - packages/codegen/src/fill.ts | 19 +++- packages/codegen/src/generate-code.ts | 22 +++- packages/codegen/src/indexer.ts | 2 +- packages/codegen/src/reset.ts | 14 ++- .../src/templates/config-template.handlebars | 6 + .../templates/database-template.handlebars | 6 + .../templates/fill-state-template.handlebars | 96 ++++++++++++++++ .../src/templates/fill-template.handlebars | 15 +++ .../src/templates/indexer-template.handlebars | 36 +++--- .../src/templates/package-template.handlebars | 3 + .../reset-ipld-state-template.handlebars | 55 +++++++++ packages/codegen/src/visitor.ts | 4 +- packages/eden-watcher/environments/local.toml | 6 + packages/eden-watcher/package.json | 1 + .../src/cli/reset-cmds/ipld-state.ts | 57 ++++++++++ packages/eden-watcher/src/database.ts | 6 + packages/eden-watcher/src/fill-state.ts | 104 ++++++++++++++++++ packages/eden-watcher/src/fill.ts | 11 ++ packages/eden-watcher/src/indexer.ts | 36 +++--- packages/graph-node/src/database.ts | 12 ++ packages/graph-node/src/index.ts | 1 + packages/graph-node/src/loader.ts | 56 ++-------- packages/graph-node/src/utils.ts | 45 +++++++- packages/graph-node/src/watcher.ts | 4 + packages/graph-node/test/utils/indexer.ts | 2 + packages/test/src/get-storage-at.ts | 4 +- packages/util/src/config.ts | 1 + packages/util/src/ipld-database.ts | 43 +++++++- 29 
files changed, 569 insertions(+), 99 deletions(-) create mode 100644 packages/codegen/src/templates/fill-state-template.handlebars create mode 100644 packages/codegen/src/templates/reset-ipld-state-template.handlebars create mode 100644 packages/eden-watcher/src/cli/reset-cmds/ipld-state.ts create mode 100644 packages/eden-watcher/src/fill-state.ts diff --git a/packages/codegen/src/client.ts b/packages/codegen/src/client.ts index f06f5ea3..027a7753 100644 --- a/packages/codegen/src/client.ts +++ b/packages/codegen/src/client.ts @@ -26,7 +26,6 @@ export class Client { /** * Stores the query to be passed to the template. - * @param mode Code generation mode. * @param name Name of the query. * @param params Parameters to the query. * @param returnType Return type for the query. diff --git a/packages/codegen/src/fill.ts b/packages/codegen/src/fill.ts index 049a79b9..2dc254e2 100644 --- a/packages/codegen/src/fill.ts +++ b/packages/codegen/src/fill.ts @@ -7,15 +7,24 @@ import path from 'path'; import Handlebars from 'handlebars'; import { Writable } from 'stream'; -const TEMPLATE_FILE = './templates/fill-template.handlebars'; +const FILL_TEMPLATE_FILE = './templates/fill-template.handlebars'; +const FILL_STATE_TEMPLATE_FILE = './templates/fill-state-template.handlebars'; /** * Writes the fill file generated from a template to a stream. - * @param outStream A writable output stream to write the fill file to. + * @param fillOutStream A writable output stream to write the fill file to. + * @param fillStateOutStream A writable output stream to write the fill state file to. 
*/ -export function exportFill (outStream: Writable, subgraphPath: string): void { - const templateString = fs.readFileSync(path.resolve(__dirname, TEMPLATE_FILE)).toString(); +export function exportFill (fillOutStream: Writable, fillStateOutStream: Writable | undefined, subgraphPath: string): void { + const templateString = fs.readFileSync(path.resolve(__dirname, FILL_TEMPLATE_FILE)).toString(); const template = Handlebars.compile(templateString); const fill = template({ subgraphPath }); - outStream.write(fill); + fillOutStream.write(fill); + + if (fillStateOutStream) { + const templateString = fs.readFileSync(path.resolve(__dirname, FILL_STATE_TEMPLATE_FILE)).toString(); + const template = Handlebars.compile(templateString); + const fillState = template({}); + fillStateOutStream.write(fillState); + } } diff --git a/packages/codegen/src/generate-code.ts b/packages/codegen/src/generate-code.ts index f7489763..041752c7 100644 --- a/packages/codegen/src/generate-code.ts +++ b/packages/codegen/src/generate-code.ts @@ -246,10 +246,18 @@ function generateWatcher (visitor: Visitor, contracts: any[], config: any) { : process.stdout; exportHooks(outStream); - outStream = outputDir + const fillOutStream = outputDir ? fs.createWriteStream(path.join(outputDir, 'src/fill.ts')) : process.stdout; - exportFill(outStream, config.subgraphPath); + + let fillStateOutStream; + if (config.subgraphPath) { + fillStateOutStream = outputDir + ? fs.createWriteStream(path.join(outputDir, 'src/fill-state.ts')) + : process.stdout; + } + + exportFill(fillOutStream, fillStateOutStream, config.subgraphPath); outStream = outputDir ? 
fs.createWriteStream(path.join(outputDir, 'src/types.ts')) @@ -273,19 +281,25 @@ function generateWatcher (visitor: Visitor, contracts: any[], config: any) { : process.stdout; visitor.exportClient(outStream, schemaContent, path.join(outputDir, 'src/gql')); - let resetOutStream, resetJQOutStream, resetStateOutStream; + let resetOutStream, resetJQOutStream, resetStateOutStream, resetIPLDStateOutStream; if (outputDir) { resetOutStream = fs.createWriteStream(path.join(outputDir, 'src/cli/reset.ts')); resetJQOutStream = fs.createWriteStream(path.join(outputDir, 'src/cli/reset-cmds/job-queue.ts')); resetStateOutStream = fs.createWriteStream(path.join(outputDir, 'src/cli/reset-cmds/state.ts')); + if (config.subgraphPath) { + resetIPLDStateOutStream = fs.createWriteStream(path.join(outputDir, 'src/cli/reset-cmds/ipld-state.ts')); + } } else { resetOutStream = process.stdout; resetJQOutStream = process.stdout; resetStateOutStream = process.stdout; + if (config.subgraphPath) { + resetIPLDStateOutStream = process.stdout; + } } - visitor.exportReset(resetOutStream, resetJQOutStream, resetStateOutStream, config.subgraphPath); + visitor.exportReset(resetOutStream, resetJQOutStream, resetStateOutStream, resetIPLDStateOutStream, config.subgraphPath); outStream = outputDir ? fs.createWriteStream(path.join(outputDir, 'src/cli/export-state.ts')) diff --git a/packages/codegen/src/indexer.ts b/packages/codegen/src/indexer.ts index eeea106d..cae33f5d 100644 --- a/packages/codegen/src/indexer.ts +++ b/packages/codegen/src/indexer.ts @@ -35,7 +35,7 @@ export class Indexer { * @param name Name of the query. * @param params Parameters to the query. * @param returnType Return type for the query. - * @param stateVariableTypeName Type of the state variable in case of state variable query. + * @param stateVariableType Type of the state variable in case of state variable query. 
*/ addQuery (contract: string, mode: string, name: string, params: Array, returnType: string, stateVariableType?: string): void { // Check if the query is already added. diff --git a/packages/codegen/src/reset.ts b/packages/codegen/src/reset.ts index 7f173e22..39f17a99 100644 --- a/packages/codegen/src/reset.ts +++ b/packages/codegen/src/reset.ts @@ -10,25 +10,26 @@ import { Writable } from 'stream'; const RESET_TEMPLATE_FILE = './templates/reset-template.handlebars'; const RESET_JQ_TEMPLATE_FILE = './templates/reset-job-queue-template.handlebars'; const RESET_STATE_TEMPLATE_FILE = './templates/reset-state-template.handlebars'; +const RESET_IPLD_STATE_TEMPLATE_FILE = './templates/reset-ipld-state-template.handlebars'; export class Reset { _queries: Array; _resetTemplateString: string; _resetJQTemplateString: string; _resetStateTemplateString: string; + _resetIPLDStateTemplateString: string; constructor () { this._queries = []; this._resetTemplateString = fs.readFileSync(path.resolve(__dirname, RESET_TEMPLATE_FILE)).toString(); this._resetJQTemplateString = fs.readFileSync(path.resolve(__dirname, RESET_JQ_TEMPLATE_FILE)).toString(); this._resetStateTemplateString = fs.readFileSync(path.resolve(__dirname, RESET_STATE_TEMPLATE_FILE)).toString(); + this._resetIPLDStateTemplateString = fs.readFileSync(path.resolve(__dirname, RESET_IPLD_STATE_TEMPLATE_FILE)).toString(); } /** * Stores the query to be passed to the template. * @param name Name of the query. - * @param params Parameters to the query. - * @param returnType Return type for the query. */ addQuery (name: string): void { // Check if the query is already added. @@ -74,8 +75,9 @@ export class Reset { * @param resetOutStream A writable output stream to write the reset file to. * @param resetJQOutStream A writable output stream to write the reset job-queue file to. * @param resetStateOutStream A writable output stream to write the reset state file to. 
+ * @param resetIPLDStateOutStream A writable output stream to write the reset IPLD state file to. */ - exportReset (resetOutStream: Writable, resetJQOutStream: Writable, resetStateOutStream: Writable, subgraphPath: string): void { + exportReset (resetOutStream: Writable, resetJQOutStream: Writable, resetStateOutStream: Writable, resetIPLDStateOutStream: Writable | undefined, subgraphPath: string): void { const resetTemplate = Handlebars.compile(this._resetTemplateString); const resetString = resetTemplate({}); resetOutStream.write(resetString); @@ -91,5 +93,11 @@ export class Reset { }; const resetState = resetStateTemplate(obj); resetStateOutStream.write(resetState); + + if (resetIPLDStateOutStream) { + const resetIPLDStateTemplate = Handlebars.compile(this._resetIPLDStateTemplateString); + const resetIPLDStateString = resetIPLDStateTemplate({}); + resetIPLDStateOutStream.write(resetIPLDStateString); + } } } diff --git a/packages/codegen/src/templates/config-template.handlebars b/packages/codegen/src/templates/config-template.handlebars index e9558cc2..17ad623a 100644 --- a/packages/codegen/src/templates/config-template.handlebars +++ b/packages/codegen/src/templates/config-template.handlebars @@ -14,6 +14,12 @@ {{#if (subgraphPath)}} subgraphPath = "{{subgraphPath}}" + + # Disable creation of state from subgraph entity updates + # CAUTION: Disable only if subgraph state is not desired or can be filled subsequently + disableSubgraphState = false + + # Interval to restart wasm instance periodically wasmRestartBlocksInterval = 20 {{/if}} diff --git a/packages/codegen/src/templates/database-template.handlebars b/packages/codegen/src/templates/database-template.handlebars index 4022807c..49de41f2 100644 --- a/packages/codegen/src/templates/database-template.handlebars +++ b/packages/codegen/src/templates/database-template.handlebars @@ -120,6 +120,12 @@ export class Database implements IPLDDatabaseInterface { await this._baseDatabase.removeIPLDBlocks(repo, 
blockNumber, kind); } + async removeIPLDBlocksInRange (dbTx: QueryRunner, startBlock: number, endBlock: number): Promise { + const repo = dbTx.manager.getRepository(IPLDBlock); + + await this._baseDatabase.removeIPLDBlocksInRange(repo, startBlock, endBlock); + } + async getIPLDStatus (): Promise { const repo = this._conn.getRepository(IpldStatus); diff --git a/packages/codegen/src/templates/fill-state-template.handlebars b/packages/codegen/src/templates/fill-state-template.handlebars new file mode 100644 index 00000000..089079d3 --- /dev/null +++ b/packages/codegen/src/templates/fill-state-template.handlebars @@ -0,0 +1,96 @@ +// +// Copyright 2022 Vulcanize, Inc. +// + +import 'reflect-metadata'; +import debug from 'debug'; +import { Between } from 'typeorm'; + +import { Database as GraphDatabase, prepareEntityState } from '@vulcanize/graph-node'; + +import { Indexer } from './indexer'; + +const log = debug('vulcanize:fill-state'); + +export const fillState = async ( + indexer: Indexer, + graphDb: GraphDatabase, + dataSources: any[], + argv: { + startBlock: number, + endBlock: number + } +): Promise => { + const { startBlock, endBlock } = argv; + if (startBlock > endBlock) { + log('endBlock should be greater than or equal to startBlock'); + process.exit(1); + } + + // NOTE: Assuming all blocks in the given range are in the pruned region + log(`Filling state for subgraph entities in range: [${startBlock}, ${endBlock}]`); + + // Check that there are no existing diffs in this range + const existingStates = await indexer.getIPLDBlocks({ block: { blockNumber: Between(startBlock, endBlock) } }); + if (existingStates.length > 0) { + log('found existing state(s) in the given range'); + process.exit(1); + } + + // Map: contractAddress -> entities updated + const contractEntitiesMap: Map = new Map(); + + // Populate contractEntitiesMap using data sources from subgraph + // NOTE: Assuming each entity type is only mapped to a single contract + // This is true for eden 
subgraph; may not be the case for other subgraphs + dataSources.forEach((dataSource: any) => { + const { source: { address: contractAddress }, mapping: { entities } } = dataSource; + contractEntitiesMap.set(contractAddress, entities as string[]); + }); + + // Fill state for blocks in the given range + for (let blockNumber = startBlock; blockNumber <= endBlock; blockNumber++) { + // Get the canonical block hash at current height + const blocks = await indexer.getBlocksAtHeight(blockNumber, false); + + if (blocks.length === 0) { + log(`block not found at height ${blockNumber}`); + process.exit(1); + } else if (blocks.length > 1) { + log(`found more than one non-pruned block at height ${blockNumber}`); + process.exit(1); + } + + const blockHash = blocks[0].blockHash; + + // Fill state for each contract in contractEntitiesMap + const contractStatePromises = Array.from(contractEntitiesMap.entries()) + .map(async ([contractAddress, entities]): Promise => { + // Get all the updated entities at this block + const updatedEntitiesListPromises = entities.map(async (entity): Promise => { + return graphDb.getEntitiesForBlock(blockHash, entity); + }); + const updatedEntitiesList = await Promise.all(updatedEntitiesListPromises); + + // Populate state with all the updated entities of each entity type + updatedEntitiesList.forEach((updatedEntities, index) => { + const entityName = entities[index]; + + updatedEntities.forEach((updatedEntity) => { + // Prepare diff data for the entity update + const diffData = prepareEntityState(updatedEntity, entityName, indexer.getRelationsMap()); + + // Update the in-memory subgraph state + indexer.updateSubgraphState(contractAddress, diffData); + }); + }); + }); + + await Promise.all(contractStatePromises); + + // Persist subgraph state to the DB + await indexer.dumpSubgraphState(blockHash, true); + } + + log(`Filled state for subgraph entities in range: [${startBlock}, ${endBlock}]`); +}; diff --git 
a/packages/codegen/src/templates/fill-template.handlebars b/packages/codegen/src/templates/fill-template.handlebars index c5ee710d..fb1d1e2b 100644 --- a/packages/codegen/src/templates/fill-template.handlebars +++ b/packages/codegen/src/templates/fill-template.handlebars @@ -20,6 +20,9 @@ import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node'; import { Database } from './database'; import { Indexer } from './indexer'; import { EventWatcher } from './events'; +{{#if (subgraphPath)}} +import { fillState } from './fill-state'; +{{/if}} const log = debug('vulcanize:server'); @@ -41,6 +44,13 @@ export const main = async (): Promise => { demandOption: true, describe: 'Block number to start processing at' }, + {{#if (subgraphPath)}} + state: { + type: 'boolean', + default: false, + describe: 'Fill state for subgraph entities' + }, + {{/if}} endBlock: { type: 'number', demandOption: true, @@ -86,6 +96,11 @@ export const main = async (): Promise => { graphWatcher.setIndexer(indexer); await graphWatcher.init(); + + if (argv.state) { + await fillState(indexer, graphDb, graphWatcher.dataSources, argv); + return; + } {{/if}} // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. 
diff --git a/packages/codegen/src/templates/indexer-template.handlebars b/packages/codegen/src/templates/indexer-template.handlebars index d5d960bd..d9762955 100644 --- a/packages/codegen/src/templates/indexer-template.handlebars +++ b/packages/codegen/src/templates/indexer-template.handlebars @@ -386,6 +386,10 @@ export class Indexer implements IPLDIndexerInterface { return this._baseIndexer.getIPLDBlockByCid(cid); } + async getIPLDBlocks (where: FindConditions): Promise { + return this._db.getIPLDBlocks(where); + } + getIPLDData (ipldBlock: IPLDBlock): any { return this._baseIndexer.getIPLDData(ipldBlock); } @@ -469,7 +473,7 @@ export class Indexer implements IPLDIndexerInterface { await this._graphWatcher.handleBlock(blockHash); // Persist subgraph state to the DB. - await this._dumpSubgraphState(blockHash); + await this.dumpSubgraphState(blockHash); } {{/if}} @@ -673,6 +677,23 @@ export class Indexer implements IPLDIndexerInterface { this._subgraphStateMap.set(contractAddress, updatedData); } + async dumpSubgraphState (blockHash: string, isStateFinalized = false): Promise { + // Create a diff for each contract in the subgraph state map. + const createDiffPromises = Array.from(this._subgraphStateMap.entries()) + .map(([contractAddress, data]): Promise => { + if (isStateFinalized) { + return this.createDiff(contractAddress, blockHash, data); + } + + return this.createDiffStaged(contractAddress, blockHash, data); + }); + + await Promise.all(createDiffPromises); + + // Reset the subgraph state map. + this._subgraphStateMap.clear(); + } + _populateEntityTypesMap (): void { {{#each subgraphEntities as | subgraphEntity |}} this._entityTypesMap.set('{{subgraphEntity.className}}', { @@ -710,19 +731,6 @@ export class Indexer implements IPLDIndexerInterface { {{/if}} {{/each}} } - - async _dumpSubgraphState (blockHash: string): Promise { - // Create a diff for each contract in the subgraph state map. 
- const createDiffPromises = Array.from(this._subgraphStateMap.entries()) - .map(([contractAddress, data]): Promise => { - return this.createDiffStaged(contractAddress, blockHash, data); - }); - - await Promise.all(createDiffPromises); - - // Reset the subgraph state map. - this._subgraphStateMap.clear(); - } {{/if}} async _fetchAndSaveEvents ({ cid: blockCid, blockHash }: DeepPartial): Promise { diff --git a/packages/codegen/src/templates/package-template.handlebars b/packages/codegen/src/templates/package-template.handlebars index c449e9f3..adb83838 100644 --- a/packages/codegen/src/templates/package-template.handlebars +++ b/packages/codegen/src/templates/package-template.handlebars @@ -15,6 +15,9 @@ "job-runner:dev": "DEBUG=vulcanize:* ts-node src/job-runner.ts", "watch:contract": "DEBUG=vulcanize:* ts-node src/cli/watch-contract.ts", "fill": "DEBUG=vulcanize:* ts-node src/fill.ts", + {{#if (subgraphPath)}} + "fill:state": "DEBUG=vulcanize:* ts-node src/fill.ts --state", + {{/if}} "reset": "DEBUG=vulcanize:* ts-node src/cli/reset.ts", "checkpoint": "DEBUG=vulcanize:* ts-node src/cli/checkpoint.ts", "export-state": "DEBUG=vulcanize:* ts-node src/cli/export-state.ts", diff --git a/packages/codegen/src/templates/reset-ipld-state-template.handlebars b/packages/codegen/src/templates/reset-ipld-state-template.handlebars new file mode 100644 index 00000000..ef04ad32 --- /dev/null +++ b/packages/codegen/src/templates/reset-ipld-state-template.handlebars @@ -0,0 +1,55 @@ +// +// Copyright 2022 Vulcanize, Inc. 
+// + +import debug from 'debug'; + +import { getConfig } from '@vulcanize/util'; + +import { Database } from '../../database'; + +const log = debug('vulcanize:reset-ipld-state'); + +export const command = 'ipld-state'; + +export const desc = 'Reset IPLD state in the given range'; + +export const builder = { + startBlock: { + type: 'number' + }, + endBlock: { + type: 'number' + } +}; + +export const handler = async (argv: any): Promise => { + const { startBlock, endBlock } = argv; + if (startBlock > endBlock) { + log('endBlock should be greater than or equal to startBlock'); + process.exit(1); + } + + const config = await getConfig(argv.configFile); + + // Initialize database + const db = new Database(config.database); + await db.init(); + + // Create a DB transaction + const dbTx = await db.createTransactionRunner(); + + try { + // Delete all IPLDBlock entries in the given range + await db.removeIPLDBlocksInRange(dbTx, startBlock, endBlock); + + dbTx.commitTransaction(); + } catch (error) { + await dbTx.rollbackTransaction(); + throw error; + } finally { + await dbTx.release(); + } + + log(`Reset ipld-state successfully for range [${startBlock}, ${endBlock}]`); +}; diff --git a/packages/codegen/src/visitor.ts b/packages/codegen/src/visitor.ts index 9ab83324..812d5eca 100644 --- a/packages/codegen/src/visitor.ts +++ b/packages/codegen/src/visitor.ts @@ -215,8 +215,8 @@ export class Visitor { * @param resetJQOutStream A writable output stream to write the reset job-queue file to. * @param resetStateOutStream A writable output stream to write the reset state file to. 
*/ - exportReset (resetOutStream: Writable, resetJQOutStream: Writable, resetStateOutStream: Writable, subgraphPath: string): void { - this._reset.exportReset(resetOutStream, resetJQOutStream, resetStateOutStream, subgraphPath); + exportReset (resetOutStream: Writable, resetJQOutStream: Writable, resetStateOutStream: Writable, resetIPLDStateOutStream: Writable | undefined, subgraphPath: string): void { + this._reset.exportReset(resetOutStream, resetJQOutStream, resetStateOutStream, resetIPLDStateOutStream, subgraphPath); } /** diff --git a/packages/eden-watcher/environments/local.toml b/packages/eden-watcher/environments/local.toml index 498c1285..a2c9e3a0 100644 --- a/packages/eden-watcher/environments/local.toml +++ b/packages/eden-watcher/environments/local.toml @@ -13,6 +13,12 @@ # ipfsApiAddr = "/ip4/127.0.0.1/tcp/5001" subgraphPath = "../graph-node/test/subgraph/eden" + + # Disable creation of state from subgraph entity updates + # CAUTION: Disable only if subgraph state is not desired or can be filled subsequently + disableSubgraphState = false + + # Interval to restart wasm instance periodically wasmRestartBlocksInterval = 20 # Boolean to filter logs by contract. 
diff --git a/packages/eden-watcher/package.json b/packages/eden-watcher/package.json index 7f9caa8d..dbb06faf 100644 --- a/packages/eden-watcher/package.json +++ b/packages/eden-watcher/package.json @@ -15,6 +15,7 @@ "job-runner:dev": "DEBUG=vulcanize:* YARN_CHILD_PROCESS=true ts-node src/job-runner.ts", "watch:contract": "DEBUG=vulcanize:* ts-node src/cli/watch-contract.ts", "fill": "DEBUG=vulcanize:* ts-node src/fill.ts", + "fill:state": "DEBUG=vulcanize:* ts-node src/fill.ts --state", "reset": "DEBUG=vulcanize:* ts-node src/cli/reset.ts", "checkpoint": "DEBUG=vulcanize:* ts-node src/cli/checkpoint.ts", "export-state": "DEBUG=vulcanize:* ts-node src/cli/export-state.ts", diff --git a/packages/eden-watcher/src/cli/reset-cmds/ipld-state.ts b/packages/eden-watcher/src/cli/reset-cmds/ipld-state.ts new file mode 100644 index 00000000..f7de0f2a --- /dev/null +++ b/packages/eden-watcher/src/cli/reset-cmds/ipld-state.ts @@ -0,0 +1,57 @@ +// +// Copyright 2022 Vulcanize, Inc. +// + +import debug from 'debug'; + +import { getConfig } from '@vulcanize/util'; + +import { Database } from '../../database'; + +const log = debug('vulcanize:reset-ipld-state'); + +export const command = 'ipld-state'; + +export const desc = 'Reset IPLD state in the given range'; + +export const builder = { + startBlock: { + type: 'number' + }, + endBlock: { + type: 'number' + } +}; + +export const handler = async (argv: any): Promise => { + const { startBlock, endBlock } = argv; + if (startBlock > endBlock) { + log('endBlock should be greater than or equal to startBlock'); + process.exit(1); + } + + const config = await getConfig(argv.configFile); + + // Initialize database + const db = new Database(config.database); + await db.init(); + + // Create a DB transaction + const dbTx = await db.createTransactionRunner(); + + console.time('time:reset-ipld-state'); + try { + // Delete all IPLDBlock entries in the given range + await db.removeIPLDBlocksInRange(dbTx, startBlock, endBlock); + + 
dbTx.commitTransaction(); + } catch (error) { + await dbTx.rollbackTransaction(); + throw error; + } finally { + await dbTx.release(); + } + console.timeEnd('time:reset-ipld-state'); + + log(`Reset ipld-state successfully for range [${startBlock}, ${endBlock}]`); +}; diff --git a/packages/eden-watcher/src/database.ts b/packages/eden-watcher/src/database.ts index 36cbd3d7..67d46f88 100644 --- a/packages/eden-watcher/src/database.ts +++ b/packages/eden-watcher/src/database.ts @@ -80,6 +80,12 @@ export class Database implements IPLDDatabaseInterface { await this._baseDatabase.removeIPLDBlocks(repo, blockNumber, kind); } + async removeIPLDBlocksInRange (dbTx: QueryRunner, startBlock: number, endBlock: number): Promise { + const repo = dbTx.manager.getRepository(IPLDBlock); + + await this._baseDatabase.removeIPLDBlocksInRange(repo, startBlock, endBlock); + } + async getIPLDStatus (): Promise { const repo = this._conn.getRepository(IpldStatus); diff --git a/packages/eden-watcher/src/fill-state.ts b/packages/eden-watcher/src/fill-state.ts new file mode 100644 index 00000000..c56bef80 --- /dev/null +++ b/packages/eden-watcher/src/fill-state.ts @@ -0,0 +1,104 @@ +// +// Copyright 2022 Vulcanize, Inc. 
+// + +import 'reflect-metadata'; +import debug from 'debug'; +import { Between } from 'typeorm'; + +import { Database as GraphDatabase, prepareEntityState } from '@vulcanize/graph-node'; + +import { Indexer } from './indexer'; + +const log = debug('vulcanize:fill-state'); + +export const fillState = async ( + indexer: Indexer, + graphDb: GraphDatabase, + dataSources: any[], + argv: { + startBlock: number, + endBlock: number + } +): Promise => { + const { startBlock, endBlock } = argv; + if (startBlock > endBlock) { + log('endBlock should be greater than or equal to startBlock'); + process.exit(1); + } + + // NOTE: Assuming all blocks in the given range are in the pruned region + log(`Filling state for subgraph entities in range: [${startBlock}, ${endBlock}]`); + + // Check that there are no existing diffs in this range + const existingStates = await indexer.getIPLDBlocks({ block: { blockNumber: Between(startBlock, endBlock) } }); + if (existingStates.length > 0) { + log('found existing state(s) in the given range'); + process.exit(1); + } + + // Map: contractAddress -> entities updated + const contractEntitiesMap: Map = new Map(); + + // Populate contractEntitiesMap using data sources from subgraph + // NOTE: Assuming each entity type is only mapped to a single contract + // This is true for eden subgraph; may not be the case for other subgraphs + dataSources.forEach((dataSource: any) => { + const { source: { address: contractAddress }, mapping: { entities } } = dataSource; + contractEntitiesMap.set(contractAddress, entities as string[]); + }); + + console.time('time:fill-state'); + + // Fill state for blocks in the given range + for (let blockNumber = startBlock; blockNumber <= endBlock; blockNumber++) { + console.time(`time:fill-state-${blockNumber}`); + + // Get the canonical block hash at current height + const blocks = await indexer.getBlocksAtHeight(blockNumber, false); + + if (blocks.length === 0) { + log(`block not found at height ${blockNumber}`); + 
process.exit(1); + } else if (blocks.length > 1) { + log(`found more than one non-pruned block at height ${blockNumber}`); + process.exit(1); + } + + const blockHash = blocks[0].blockHash; + + // Fill state for each contract in contractEntitiesMap + const contractStatePromises = Array.from(contractEntitiesMap.entries()) + .map(async ([contractAddress, entities]): Promise => { + // Get all the updated entities at this block + const updatedEntitiesListPromises = entities.map(async (entity): Promise => { + return graphDb.getEntitiesForBlock(blockHash, entity); + }); + const updatedEntitiesList = await Promise.all(updatedEntitiesListPromises); + + // Populate state with all the updated entities of each entity type + updatedEntitiesList.forEach((updatedEntities, index) => { + const entityName = entities[index]; + + updatedEntities.forEach((updatedEntity) => { + // Prepare diff data for the entity update + const diffData = prepareEntityState(updatedEntity, entityName, indexer.getRelationsMap()); + + // Update the in-memory subgraph state + indexer.updateSubgraphState(contractAddress, diffData); + }); + }); + }); + + await Promise.all(contractStatePromises); + + // Persist subgraph state to the DB + await indexer.dumpSubgraphState(blockHash, true); + + console.timeEnd(`time:fill-state-${blockNumber}`); + } + + console.timeEnd('time:fill-state'); + + log(`Filled state for subgraph entities in range: [${startBlock}, ${endBlock}]`); +}; diff --git a/packages/eden-watcher/src/fill.ts b/packages/eden-watcher/src/fill.ts index af2e61a4..43a1fc68 100644 --- a/packages/eden-watcher/src/fill.ts +++ b/packages/eden-watcher/src/fill.ts @@ -16,6 +16,7 @@ import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node'; import { Database } from './database'; import { Indexer } from './indexer'; import { EventWatcher } from './events'; +import { fillState } from './fill-state'; const log = debug('vulcanize:server'); @@ -51,6 +52,11 @@ export const main = async (): 
Promise => { type: 'number', default: 10, describe: 'Number of blocks prefetched in batch' + }, + state: { + type: 'boolean', + default: false, + describe: 'Fill state for subgraph entities' } }).argv; @@ -80,6 +86,11 @@ export const main = async (): Promise => { graphWatcher.setIndexer(indexer); await graphWatcher.init(); + if (argv.state) { + await fillState(indexer, graphDb, graphWatcher.dataSources, argv); + return; + } + // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries const pubsub = new PubSub(); diff --git a/packages/eden-watcher/src/indexer.ts b/packages/eden-watcher/src/indexer.ts index b5c25526..65060432 100644 --- a/packages/eden-watcher/src/indexer.ts +++ b/packages/eden-watcher/src/indexer.ts @@ -308,6 +308,10 @@ export class Indexer implements IPLDIndexerInterface { return this._baseIndexer.getIPLDBlockByCid(cid); } + async getIPLDBlocks (where: FindConditions): Promise { + return this._db.getIPLDBlocks(where); + } + async getDiffIPLDBlocksInRange (contractAddress: string, startBlock: number, endBlock: number): Promise { return this._db.getDiffIPLDBlocksInRange(contractAddress, startBlock, endBlock); } @@ -410,7 +414,7 @@ export class Indexer implements IPLDIndexerInterface { console.time('time:indexer#processBlockAfterEvents-dump_subgraph_state'); // Persist subgraph state to the DB. - await this._dumpSubgraphState(blockHash); + await this.dumpSubgraphState(blockHash); console.timeEnd('time:indexer#processBlockAfterEvents-dump_subgraph_state'); } @@ -610,6 +614,23 @@ export class Indexer implements IPLDIndexerInterface { this._subgraphStateMap.set(contractAddress, updatedData); } + async dumpSubgraphState (blockHash: string, isStateFinalized = false): Promise { + // Create a diff for each contract in the subgraph state map. 
+ const createDiffPromises = Array.from(this._subgraphStateMap.entries()) + .map(([contractAddress, data]): Promise => { + if (isStateFinalized) { + return this.createDiff(contractAddress, blockHash, data); + } + + return this.createDiffStaged(contractAddress, blockHash, data); + }); + + await Promise.all(createDiffPromises); + + // Reset the subgraph state map. + this._subgraphStateMap.clear(); + } + _populateEntityTypesMap (): void { this._entityTypesMap.set( 'Producer', @@ -971,19 +992,6 @@ export class Indexer implements IPLDIndexerInterface { }); } - async _dumpSubgraphState (blockHash: string): Promise { - // Create a diff for each contract in the subgraph state map. - const createDiffPromises = Array.from(this._subgraphStateMap.entries()) - .map(([contractAddress, data]): Promise => { - return this.createDiffStaged(contractAddress, blockHash, data); - }); - - await Promise.all(createDiffPromises); - - // Reset the subgraph state map. - this._subgraphStateMap.clear(); - } - async _fetchAndSaveEvents ({ cid: blockCid, blockHash }: DeepPartial): Promise { assert(blockHash); const transactionsPromise = this._ethClient.getBlockWithTransactions({ blockHash }); diff --git a/packages/graph-node/src/database.ts b/packages/graph-node/src/database.ts index 8d6e417b..b549fe02 100644 --- a/packages/graph-node/src/database.ts +++ b/packages/graph-node/src/database.ts @@ -82,6 +82,18 @@ export class Database { } } + async getEntitiesForBlock (blockHash: string, tableName: string): Promise { + const repo = this._conn.getRepository(tableName); + + const entities = await repo.find({ + where: { + blockHash + } + }); + + return entities; + } + async getEntityIdsAtBlockNumber (blockNumber: number, tableName: string): Promise { const repo = this._conn.getRepository(tableName); diff --git a/packages/graph-node/src/index.ts b/packages/graph-node/src/index.ts index e24d7051..351f260a 100644 --- a/packages/graph-node/src/index.ts +++ b/packages/graph-node/src/index.ts @@ -1,2 +1,3 @@ 
export * from './watcher'; export * from './database'; +export { prepareEntityState } from './utils'; diff --git a/packages/graph-node/src/loader.ts b/packages/graph-node/src/loader.ts index 6401749a..34caab59 100644 --- a/packages/graph-node/src/loader.ts +++ b/packages/graph-node/src/loader.ts @@ -22,10 +22,10 @@ import { Block, fromEthereumValue, toEthereumValue, - resolveEntityFieldConflicts, getEthereumTypes, jsonFromBytes, - getStorageValueType + getStorageValueType, + prepareEntityState } from './utils'; import { Database } from './database'; @@ -94,53 +94,19 @@ export const instantiate = async ( const entityInstance = await Entity.wrap(data); assert(context.block); - let dbData = await database.fromGraphEntity(instanceExports, context.block, entityName, entityInstance); + const dbData = await database.fromGraphEntity(instanceExports, context.block, entityName, entityInstance); await database.saveEntity(entityName, dbData); - // Resolve any field name conflicts in the dbData for auto-diff. - dbData = resolveEntityFieldConflicts(dbData); + // Update the in-memory subgraph state if not disabled. + if (!indexer.serverConfig.disableSubgraphState) { + // Prepare diff data for the entity update + assert(indexer.getRelationsMap); + const diffData = prepareEntityState(dbData, entityName, indexer.getRelationsMap()); - // Prepare the diff data. - const diffData: any = { state: {} }; - assert(indexer.getRelationsMap); - - const result = Array.from(indexer.getRelationsMap().entries()) - .find(([key]) => key.name === entityName); - - if (result) { - // Update dbData if relations exist. - const [_, relations] = result; - - // Update relation fields for diff data to be similar to GQL query entities. 
- Object.entries(relations).forEach(([relation, { isArray, isDerived }]) => { - if (isDerived || !dbData[relation]) { - // Field is not present in dbData for derived relations - return; - } - - if (isArray) { - dbData[relation] = dbData[relation] - .map((id: string) => ({ id })) - .sort((a: any, b: any) => a.id.localeCompare(b.id)); - } else { - dbData[relation] = { id: dbData[relation] }; - } - }); + assert(indexer.updateSubgraphState); + assert(context.contractAddress); + indexer.updateSubgraphState(context.contractAddress, diffData); } - - // JSON stringify and parse data for handling unknown types when encoding. - // For example, decimal.js values are converted to string in the diff data. - diffData.state[entityName] = { - // Using custom replacer to store bigints as string values to be encoded by IPLD dag-cbor. - // TODO: Parse and store as native bigint by using Type encoders in IPLD dag-cbor encode. - // https://github.com/rvagg/cborg#type-encoders - [dbData.id]: JSON.parse(JSON.stringify(dbData, jsonBigIntStringReplacer)) - }; - - // Update the in-memory subgraph state. 
- assert(indexer.updateSubgraphState); - assert(context.contractAddress); - indexer.updateSubgraphState(context.contractAddress, diffData); }, 'log.log': (level: number, msg: number) => { diff --git a/packages/graph-node/src/utils.ts b/packages/graph-node/src/utils.ts index ede17b6c..54a0cb47 100644 --- a/packages/graph-node/src/utils.ts +++ b/packages/graph-node/src/utils.ts @@ -6,7 +6,7 @@ import yaml from 'js-yaml'; import { ColumnMetadata } from 'typeorm/metadata/ColumnMetadata'; import assert from 'assert'; -import { GraphDecimal } from '@vulcanize/util'; +import { GraphDecimal, jsonBigIntStringReplacer } from '@vulcanize/util'; import { TypeId, EthereumValueKind, ValueKind } from './types'; import { MappingKey, StorageLayout } from '@vulcanize/solidity-mapper'; @@ -798,3 +798,46 @@ const getEthereumType = (storageTypes: StorageLayout['types'], type: string, map return utils.ParamType.from(label); }; + +export const prepareEntityState = (updatedEntity: any, entityName: string, relationsMap: Map): any => { + // Resolve any field name conflicts in the dbData for auto-diff. + updatedEntity = resolveEntityFieldConflicts(updatedEntity); + + // Prepare the diff data. + const diffData: any = { state: {} }; + + const result = Array.from(relationsMap.entries()) + .find(([key]) => key.name === entityName); + + if (result) { + // Update entity data if relations exist. + const [_, relations] = result; + + // Update relation fields for diff data to be similar to GQL query entities. 
+ Object.entries(relations).forEach(([relation, { isArray, isDerived }]) => { + if (isDerived || !updatedEntity[relation]) { + // Field is not present in dbData for derived relations + return; + } + + if (isArray) { + updatedEntity[relation] = updatedEntity[relation] + .map((id: string) => ({ id })) + .sort((a: any, b: any) => a.id.localeCompare(b.id)); + } else { + updatedEntity[relation] = { id: updatedEntity[relation] }; + } + }); + } + + // JSON stringify and parse data for handling unknown types when encoding. + // For example, decimal.js values are converted to string in the diff data. + diffData.state[entityName] = { + // Using custom replacer to store bigints as string values to be encoded by IPLD dag-cbor. + // TODO: Parse and store as native bigint by using Type encoders in IPLD dag-cbor encode. + // https://github.com/rvagg/cborg#type-encoders + [updatedEntity.id]: JSON.parse(JSON.stringify(updatedEntity, jsonBigIntStringReplacer)) + }; + + return diffData; +}; diff --git a/packages/graph-node/src/watcher.ts b/packages/graph-node/src/watcher.ts index 0e8863c8..a7c521dc 100644 --- a/packages/graph-node/src/watcher.ts +++ b/packages/graph-node/src/watcher.ts @@ -101,6 +101,10 @@ export class GraphWatcher { }, {}); } + get dataSources (): any[] { + return this._dataSources; + } + async addContracts () { assert(this._indexer); assert(this._indexer.watchContract); diff --git a/packages/graph-node/test/utils/indexer.ts b/packages/graph-node/test/utils/indexer.ts index c813cdcb..fe1a20a2 100644 --- a/packages/graph-node/test/utils/indexer.ts +++ b/packages/graph-node/test/utils/indexer.ts @@ -188,6 +188,7 @@ class ServerConfig implements ServerConfigInterface { checkpointInterval: number; ipfsApiAddr: string; subgraphPath: string; + disableSubgraphState: boolean; wasmRestartBlocksInterval: number; filterLogs: boolean; maxEventsBlockRange: number; @@ -201,6 +202,7 @@ class ServerConfig implements ServerConfigInterface { this.checkpointInterval = 0; 
this.ipfsApiAddr = ''; this.subgraphPath = ''; + this.disableSubgraphState = false; this.wasmRestartBlocksInterval = 0; this.filterLogs = false; this.maxEventsBlockRange = 0; diff --git a/packages/test/src/get-storage-at.ts b/packages/test/src/get-storage-at.ts index 0a4dcdea..ad81b2c1 100644 --- a/packages/test/src/get-storage-at.ts +++ b/packages/test/src/get-storage-at.ts @@ -3,11 +3,9 @@ // import yargs from 'yargs'; -import { ethers, providers } from 'ethers'; +import { providers } from 'ethers'; import debug from 'debug'; -import { readAbi } from './common'; - const log = debug('vulcanize:test'); const main = async (): Promise => { diff --git a/packages/util/src/config.ts b/packages/util/src/config.ts index e0ff9f23..6854cbe4 100644 --- a/packages/util/src/config.ts +++ b/packages/util/src/config.ts @@ -33,6 +33,7 @@ export interface ServerConfig { checkpointInterval: number; ipfsApiAddr: string; subgraphPath: string; + disableSubgraphState: boolean; wasmRestartBlocksInterval: number; filterLogs: boolean; maxEventsBlockRange: number; diff --git a/packages/util/src/ipld-database.ts b/packages/util/src/ipld-database.ts index bd806d20..db31c7b4 100644 --- a/packages/util/src/ipld-database.ts +++ b/packages/util/src/ipld-database.ts @@ -22,16 +22,33 @@ export class IPLDDatabase extends Database { queryBuilder.andWhere('block.block_number <= :blockNumber', { blockNumber }); } - // Filter using kind if specified else order by id to give preference to checkpoint. + // Filter using kind if specified else avoid diff_staged block. queryBuilder = kind ? queryBuilder.andWhere('ipld_block.kind = :kind', { kind }) - : queryBuilder.andWhere('ipld_block.kind != :kind', { kind: StateKind.DiffStaged }) - .addOrderBy('ipld_block.id', 'DESC'); + : queryBuilder.andWhere('ipld_block.kind != :kind', { kind: StateKind.DiffStaged }); - // Get the first entry. - queryBuilder.limit(1); + // Get the first two entries. 
+ queryBuilder.limit(2); - return queryBuilder.getOne(); + const results = await queryBuilder.getMany(); + + switch (results.length) { + case 0: + // No result found. + return; + case 1: + // Return the only IPLD block entry found. + return results[0]; + case 2: + // If there are two entries in the result and both are at the same block number, give preference to checkpoint kind. + if (results[0].block.blockNumber === results[1].block.blockNumber) { + return (results[1].kind === StateKind.Checkpoint) ? results[1] : results[0]; + } else { + return results[0]; + } + default: + throw new Error(`Unexpected results length ${results.length}`); + } } async getPrevIPLDBlock (repo: Repository, blockHash: string, contractAddress: string, kind?: string): Promise { @@ -148,6 +165,20 @@ export class IPLDDatabase extends Database { } } + async removeIPLDBlocksInRange (repo: Repository, startBlock: number, endBlock: number): Promise { + // Use raw SQL as TypeORM currently doesn't support delete via 'join' or 'using' + const deleteQuery = ` + DELETE FROM + ipld_block + USING block_progress + WHERE + ipld_block.block_id = block_progress.id + AND block_progress.block_number BETWEEN $1 AND $2; + `; + + await repo.query(deleteQuery, [startBlock, endBlock]); + } + async getIPLDStatus (repo: Repository): Promise { return repo.findOne(); }