From aba0c665f33e7285562d06557379ff83f9587f18 Mon Sep 17 00:00:00 2001 From: prathamesh0 <42446521+prathamesh0@users.noreply.github.com> Date: Thu, 24 Nov 2022 03:58:38 -0600 Subject: [PATCH] Refactor fill and fill-state CLIs to cli package (#257) * Refactor fill CLI to cli package * Refactor method to fill-state to graph-node * Refactor fill-state CLI to cli package * Move subgraph state utils to a separate file * Refactor subgraph state helper methods to graph-node * Update mock indexer * Move watcher job-runner to util * Remove mock server and data from erc20-watcher * Import watcher job-runner from util --- packages/cli/src/fill.ts | 159 +++++++++++++ packages/cli/src/index.ts | 1 + packages/cli/src/job-runner.ts | 45 +--- .../templates/database-template.handlebars | 4 + .../src/templates/indexer-template.handlebars | 35 ++- packages/eden-watcher/src/database.ts | 4 + packages/eden-watcher/src/fill-state.ts | 112 ---------- packages/eden-watcher/src/fill.ts | 97 ++------ packages/eden-watcher/src/indexer.ts | 31 +-- packages/eden-watcher/src/job-runner.ts | 3 +- packages/erc20-watcher/README.md | 12 - packages/erc20-watcher/package.json | 2 - packages/erc20-watcher/src/database.ts | 4 + packages/erc20-watcher/src/fill.ts | 71 +----- packages/erc20-watcher/src/indexer.ts | 18 +- packages/erc20-watcher/src/job-runner.ts | 3 +- packages/erc20-watcher/src/mock/data.ts | 50 ----- packages/erc20-watcher/src/mock/resolvers.ts | 90 -------- .../erc20-watcher/src/mock/server.test.ts | 173 --------------- packages/erc20-watcher/src/server.ts | 3 +- packages/erc721-watcher/src/database.ts | 4 + packages/erc721-watcher/src/fill.ts | 69 +----- packages/erc721-watcher/src/indexer.ts | 8 + packages/erc721-watcher/src/job-runner.ts | 3 +- packages/graph-node/src/index.ts | 3 +- packages/graph-node/src/loader.ts | 4 +- packages/graph-node/src/state-utils.ts | 208 ++++++++++++++++++ packages/graph-node/src/utils.ts | 75 +------ packages/graph-node/src/watcher.ts | 3 +- packages/graph-node/test/utils/indexer.ts | 14 +- packages/graph-test-watcher/src/database.ts | 4 + packages/graph-test-watcher/src/fill-state.ts | 112 ---------- packages/graph-test-watcher/src/fill.ts | 95 +------- packages/graph-test-watcher/src/indexer.ts | 31 +-- packages/graph-test-watcher/src/job-runner.ts | 3 +- packages/mobymask-watcher/src/database.ts | 4 + packages/mobymask-watcher/src/fill.ts | 69 +----- packages/mobymask-watcher/src/indexer.ts | 8 + packages/mobymask-watcher/src/job-runner.ts | 3 +- packages/util/src/database.ts | 12 + packages/util/src/job-runner.ts | 42 +++- packages/util/src/types.ts | 6 + 42 files changed, 597 insertions(+), 1100 deletions(-) create mode 100644 packages/cli/src/fill.ts delete mode 100644 packages/eden-watcher/src/fill-state.ts delete mode 100644 packages/erc20-watcher/src/mock/data.ts delete mode 100644 packages/erc20-watcher/src/mock/resolvers.ts delete mode 100644 packages/erc20-watcher/src/mock/server.test.ts create mode 100644 packages/graph-node/src/state-utils.ts delete mode 100644 packages/graph-test-watcher/src/fill-state.ts diff --git a/packages/cli/src/fill.ts b/packages/cli/src/fill.ts new file mode 100644 index 00000000..791854ca --- /dev/null +++ b/packages/cli/src/fill.ts @@ -0,0 +1,159 @@ +// +// Copyright 2022 Vulcanize, Inc. +// + +import assert from 'assert'; +import 'reflect-metadata'; +import debug from 'debug'; +import yargs from 'yargs'; +import { hideBin } from 'yargs/helpers'; +import { ConnectionOptions } from 'typeorm'; +import { PubSub } from 'graphql-subscriptions'; + +import { JsonRpcProvider } from '@ethersproject/providers'; +import { GraphWatcher, fillState } from '@cerc-io/graph-node'; +import { EthClient } from '@cerc-io/ipld-eth-client'; +import { + DEFAULT_CONFIG_PATH, + JobQueue, + DatabaseInterface, + IndexerInterface, + ServerConfig, + Clients, + EventWatcherInterface, + fillBlocks +} from '@cerc-io/util'; + +import { BaseCmd } from './base'; + +const log = debug('vulcanize:fill'); + +interface Arguments { + configFile: string; + startBlock: number; + endBlock: number; + prefetch: boolean; + batchBlocks: number; + state: boolean; +} + +export class FillCmd { + _argv?: Arguments + _baseCmd: BaseCmd; + + constructor () { + this._baseCmd = new BaseCmd(); + } + + get indexer (): IndexerInterface | undefined { + return this._baseCmd.indexer; + } + + async initConfig (): Promise { + this._argv = this._getArgv(); + assert(this._argv); + + return this._baseCmd.initConfig(this._argv.configFile); + } + + async init ( + Database: new (config: ConnectionOptions, + serverConfig?: ServerConfig + ) => DatabaseInterface, + Indexer: new ( + serverConfig: ServerConfig, + db: DatabaseInterface, + clients: Clients, + ethProvider: JsonRpcProvider, + jobQueue: JobQueue, + graphWatcher?: GraphWatcher + ) => IndexerInterface, + EventWatcher: new( + ethClient: EthClient, + indexer: IndexerInterface, + pubsub: PubSub, + jobQueue: JobQueue + ) => EventWatcherInterface, + clients: { [key: string]: any } = {}, + entityQueryTypeMap?: Map, + entityToLatestEntityMap?: Map + ): Promise { + await this.initConfig(); + + await this._baseCmd.init(Database, Indexer, clients, entityQueryTypeMap, entityToLatestEntityMap); + await this._baseCmd.initEventWatcher(EventWatcher); + } + + async exec (contractEntitiesMap: Map = new Map()): Promise { + assert(this._argv); + + const config = this._baseCmd.config; + const jobQueue = this._baseCmd.jobQueue; + const database = this._baseCmd.database; + const indexer = this._baseCmd.indexer; + const eventWatcher = this._baseCmd.eventWatcher; + + assert(config); + assert(jobQueue); + assert(database); + assert(indexer); + assert(eventWatcher); + + if (this._argv.state) { + assert(config.server.enableState, 'State creation disabled'); + + const { startBlock, endBlock } = this._argv; + + // NOTE: Assuming all blocks in the given range are in the pruned region + log(`Filling state for subgraph entities in range: [${startBlock}, ${endBlock}]`); + await fillState(indexer, contractEntitiesMap, this._argv); + log(`Filled state for subgraph entities in range: [${startBlock}, ${endBlock}]`); + } else { + await fillBlocks(jobQueue, indexer, eventWatcher, config.jobQueue.blockDelayInMilliSecs, this._argv); + } + + await database.close(); + } + + _getArgv (): any { + return yargs(hideBin(process.argv)).parserConfiguration({ + 'parse-numbers': false + }).env( + 'FILL' + ).options({ + configFile: { + alias: 'f', + type: 'string', + require: true, + demandOption: true, + describe: 'Configuration file path (toml)', + default: DEFAULT_CONFIG_PATH + }, + startBlock: { + type: 'number', + demandOption: true, + describe: 'Block number to start processing at' + }, + endBlock: { + type: 'number', + demandOption: true, + describe: 'Block number to stop processing at' + }, + prefetch: { + type: 'boolean', + default: false, + describe: 'Block and events prefetch mode' + }, + batchBlocks: { + type: 'number', + default: 10, + describe: 'Number of blocks prefetched in batch' + }, + state: { + type: 'boolean', + default: false, + describe: 'Fill state for subgraph entities' + } + }).argv; + } +} diff --git a/packages/cli/src/index.ts b/packages/cli/src/index.ts index 2ce4d508..95b637d6 100644 --- a/packages/cli/src/index.ts +++ b/packages/cli/src/index.ts @@ -13,3 +13,4 @@ export * from './export-state'; export * from './server'; export * from './job-runner'; export * from './index-block'; +export * from './fill'; diff --git a/packages/cli/src/job-runner.ts b/packages/cli/src/job-runner.ts index b03a94a3..9e69598b 100644 --- a/packages/cli/src/job-runner.ts +++ b/packages/cli/src/job-runner.ts @@ -17,12 +17,7 @@ import { IndexerInterface, ServerConfig, Clients, - JobRunner as BaseJobRunner, - JobQueueConfig, - QUEUE_BLOCK_PROCESSING, - QUEUE_EVENT_PROCESSING, - QUEUE_BLOCK_CHECKPOINT, - QUEUE_HOOKS, + WatcherJobRunner as JobRunner, startMetricsServer } from '@cerc-io/util'; @@ -112,41 +107,3 @@ export class JobRunnerCmd { .argv; } } - -export class JobRunner { - jobQueue: JobQueue - baseJobRunner: BaseJobRunner - _indexer: IndexerInterface - _jobQueueConfig: JobQueueConfig - - constructor (jobQueueConfig: JobQueueConfig, indexer: IndexerInterface, jobQueue: JobQueue) { - this._jobQueueConfig = jobQueueConfig; - this._indexer = indexer; - this.jobQueue = jobQueue; - this.baseJobRunner = new BaseJobRunner(this._jobQueueConfig, this._indexer, this.jobQueue); - } - - async subscribeBlockProcessingQueue (): Promise { - await this.jobQueue.subscribe(QUEUE_BLOCK_PROCESSING, async (job) => { - await this.baseJobRunner.processBlock(job); - }); - } - - async subscribeEventProcessingQueue (): Promise { - await this.jobQueue.subscribe(QUEUE_EVENT_PROCESSING, async (job) => { - await this.baseJobRunner.processEvent(job); - }); - } - - async subscribeHooksQueue (): Promise { - await this.jobQueue.subscribe(QUEUE_HOOKS, async (job) => { - await this.baseJobRunner.processHooks(job); - }); - } - - async subscribeBlockCheckpointQueue (): Promise { - await this.jobQueue.subscribe(QUEUE_BLOCK_CHECKPOINT, async (job) => { - await this.baseJobRunner.processCheckpoint(job); - }); - } -} diff --git a/packages/codegen/src/templates/database-template.handlebars b/packages/codegen/src/templates/database-template.handlebars index 304f0c90..34032d0b 100644 --- a/packages/codegen/src/templates/database-template.handlebars +++ b/packages/codegen/src/templates/database-template.handlebars @@ -286,6 +286,10 @@ export class Database implements DatabaseInterface { return this._baseDatabase.getBlockProgressEntities(repo, where, options); } + async getEntitiesForBlock (blockHash: string, tableName: string): Promise { + return this._baseDatabase.getEntitiesForBlock(blockHash, tableName); + } + async updateBlockProgress (queryRunner: QueryRunner, block: BlockProgress, lastProcessedEventIndex: number): Promise { const repo = queryRunner.manager.getRepository(BlockProgress); diff --git a/packages/codegen/src/templates/indexer-template.handlebars b/packages/codegen/src/templates/indexer-template.handlebars index 0f084222..13981992 100644 --- a/packages/codegen/src/templates/indexer-template.handlebars +++ b/packages/codegen/src/templates/indexer-template.handlebars @@ -7,7 +7,6 @@ import debug from 'debug'; import { DeepPartial, FindConditions, FindManyOptions } from 'typeorm'; import JSONbig from 'json-bigint'; import { ethers } from 'ethers'; -import _ from 'lodash'; {{#if (subgraphPath)}} import { SelectionNode } from 'graphql'; {{/if}} @@ -35,7 +34,7 @@ import { getResultEvent } from '@cerc-io/util'; {{#if (subgraphPath)}} -import { GraphWatcher } from '@cerc-io/graph-node'; +import { GraphWatcher, updateSubgraphState, dumpSubgraphState } from '@cerc-io/graph-node'; {{/if}} {{#each contracts as | contract |}} @@ -143,6 +142,12 @@ export class Indexer implements IndexerInterface { return this._storageLayoutMap; } + {{#if (subgraphPath)}} + get graphWatcher (): GraphWatcher { + return this._graphWatcher; + } + + {{/if}} async init (): Promise { await this._baseIndexer.fetchContracts(); await this._baseIndexer.fetchStateStatus(); @@ -248,6 +253,10 @@ export class Indexer implements IndexerInterface { ); } + async getEntitiesForBlock (blockHash: string, tableName: string): Promise { + return this._db.getEntitiesForBlock(blockHash, tableName); + } + async processInitialState (contractAddress: string, blockHash: string): Promise { // Call initial state hook. return createInitialState(this, contractAddress, blockHash); @@ -341,12 +350,14 @@ export class Indexer implements IndexerInterface { return this._baseIndexer.createCheckpoint(this, contractAddress, block); } + {{#if (subgraphPath)}} // Method to be used by fill-state CLI. async createInit (blockHash: string, blockNumber: number): Promise { // Create initial state for contracts. await this._baseIndexer.createInit(this, blockHash, blockNumber); } + {{/if}} async saveOrUpdateState (state: State): Promise { return this._baseIndexer.saveOrUpdateState(state); } @@ -636,27 +647,11 @@ export class Indexer implements IndexerInterface { } updateSubgraphState (contractAddress: string, data: any): void { - // Update the subgraph state for a given contract. - const oldData = this._subgraphStateMap.get(contractAddress); - const updatedData = _.merge(oldData, data); - this._subgraphStateMap.set(contractAddress, updatedData); + return updateSubgraphState(this._subgraphStateMap, contractAddress, data); } async dumpSubgraphState (blockHash: string, isStateFinalized = false): Promise { - // Create a diff for each contract in the subgraph state map. - const createDiffPromises = Array.from(this._subgraphStateMap.entries()) - .map(([contractAddress, data]): Promise => { - if (isStateFinalized) { - return this.createDiff(contractAddress, blockHash, data); - } - - return this.createDiffStaged(contractAddress, blockHash, data); - }); - - await Promise.all(createDiffPromises); - - // Reset the subgraph state map. - this._subgraphStateMap.clear(); + return dumpSubgraphState(this, this._subgraphStateMap, blockHash, isStateFinalized); } _populateEntityTypesMap (): void { diff --git a/packages/eden-watcher/src/database.ts b/packages/eden-watcher/src/database.ts index a7ad3965..fdefa693 100644 --- a/packages/eden-watcher/src/database.ts +++ b/packages/eden-watcher/src/database.ts @@ -242,6 +242,10 @@ export class Database implements DatabaseInterface { return this._baseDatabase.getBlockProgressEntities(repo, where, options); } + async getEntitiesForBlock (blockHash: string, tableName: string): Promise { + return this._baseDatabase.getEntitiesForBlock(blockHash, tableName); + } + async updateBlockProgress (queryRunner: QueryRunner, block: BlockProgress, lastProcessedEventIndex: number): Promise { const repo = queryRunner.manager.getRepository(BlockProgress); diff --git a/packages/eden-watcher/src/fill-state.ts b/packages/eden-watcher/src/fill-state.ts deleted file mode 100644 index e1994321..00000000 --- a/packages/eden-watcher/src/fill-state.ts +++ /dev/null @@ -1,112 +0,0 @@ -// -// Copyright 2022 Vulcanize, Inc. -// - -import 'reflect-metadata'; -import debug from 'debug'; -import { Between } from 'typeorm'; - -import { Database as GraphDatabase, prepareEntityState } from '@cerc-io/graph-node'; - -import { Indexer } from './indexer'; - -const log = debug('vulcanize:fill-state'); - -export const fillState = async ( - indexer: Indexer, - graphDb: GraphDatabase, - dataSources: any[], - argv: { - startBlock: number, - endBlock: number - } -): Promise => { - const { startBlock, endBlock } = argv; - if (startBlock > endBlock) { - log('endBlock should be greater than or equal to startBlock'); - process.exit(1); - } - - // NOTE: Assuming all blocks in the given range are in the pruned region - log(`Filling state for subgraph entities in range: [${startBlock}, ${endBlock}]`); - - // Check that there are no existing diffs in this range - const existingStates = await indexer.getStates({ block: { blockNumber: Between(startBlock, endBlock) } }); - if (existingStates.length > 0) { - log('found existing state(s) in the given range'); - process.exit(1); - } - - // Map: contractAddress -> entities updated - const contractEntitiesMap: Map = new Map(); - - // Populate contractEntitiesMap using data sources from subgraph - // NOTE: Assuming each entity type is only mapped to a single contract - // This is true for eden subgraph; may not be the case for other subgraphs - dataSources.forEach((dataSource: any) => { - const { source: { address: contractAddress }, mapping: { entities } } = dataSource; - contractEntitiesMap.set(contractAddress, entities as string[]); - }); - - console.time('time:fill-state'); - - // Fill state for blocks in the given range - for (let blockNumber = startBlock; blockNumber <= endBlock; blockNumber++) { - console.time(`time:fill-state-${blockNumber}`); - - // Get the canonical block hash at current height - const blocks = await indexer.getBlocksAtHeight(blockNumber, false); - - if (blocks.length === 0) { - log(`block not found at height ${blockNumber}`); - process.exit(1); - } else if (blocks.length > 1) { - log(`found more than one non-pruned block at height ${blockNumber}`); - process.exit(1); - } - - const blockHash = blocks[0].blockHash; - - // Create initial state for contracts - await indexer.createInit(blockHash, blockNumber); - - // Fill state for each contract in contractEntitiesMap - const contractStatePromises = Array.from(contractEntitiesMap.entries()) - .map(async ([contractAddress, entities]): Promise => { - // Get all the updated entities at this block - const updatedEntitiesListPromises = entities.map(async (entity): Promise => { - return graphDb.getEntitiesForBlock(blockHash, entity); - }); - const updatedEntitiesList = await Promise.all(updatedEntitiesListPromises); - - // Populate state with all the updated entities of each entity type - updatedEntitiesList.forEach((updatedEntities, index) => { - const entityName = entities[index]; - - updatedEntities.forEach((updatedEntity) => { - // Prepare diff data for the entity update - const diffData = prepareEntityState(updatedEntity, entityName, indexer.getRelationsMap()); - - // Update the in-memory subgraph state - indexer.updateSubgraphState(contractAddress, diffData); - }); - }); - }); - - await Promise.all(contractStatePromises); - - // Persist subgraph state to the DB - await indexer.dumpSubgraphState(blockHash, true); - await indexer.updateStateSyncStatusIndexedBlock(blockNumber); - - // Create checkpoints - await indexer.processCheckpoint(blockHash); - await indexer.updateStateSyncStatusCheckpointBlock(blockNumber); - - console.timeEnd(`time:fill-state-${blockNumber}`); - } - - console.timeEnd('time:fill-state'); - - log(`Filled state for subgraph entities in range: [${startBlock}, ${endBlock}]`); -}; diff --git a/packages/eden-watcher/src/fill.ts b/packages/eden-watcher/src/fill.ts index fd027258..1bd9d2f2 100644 --- a/packages/eden-watcher/src/fill.ts +++ b/packages/eden-watcher/src/fill.ts @@ -4,101 +4,30 @@ import assert from 'assert'; import 'reflect-metadata'; -import yargs from 'yargs'; -import { hideBin } from 'yargs/helpers'; import debug from 'debug'; -import { PubSub } from 'graphql-subscriptions'; -import { Config, getConfig, fillBlocks, JobQueue, DEFAULT_CONFIG_PATH, initClients } from '@cerc-io/util'; -import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node'; +import { FillCmd } from '@cerc-io/cli'; +import { getContractEntitiesMap } from '@cerc-io/graph-node'; -import { Database, ENTITY_TO_LATEST_ENTITY_MAP } from './database'; +import { Database, ENTITY_QUERY_TYPE_MAP, ENTITY_TO_LATEST_ENTITY_MAP } from './database'; import { Indexer } from './indexer'; import { EventWatcher } from './events'; -import { fillState } from './fill-state'; -const log = debug('vulcanize:server'); +const log = debug('vulcanize:fill'); export const main = async (): Promise => { - const argv = await yargs(hideBin(process.argv)).parserConfiguration({ - 'parse-numbers': false - }).env( - 'FILL' - ).options({ - configFile: { - alias: 'f', - type: 'string', - demandOption: true, - describe: 'configuration file path (toml)', - default: DEFAULT_CONFIG_PATH - }, - startBlock: { - type: 'number', - demandOption: true, - describe: 'Block number to start processing at' - }, - endBlock: { - type: 'number', - demandOption: true, - describe: 'Block number to stop processing at' - }, - prefetch: { - type: 'boolean', - default: false, - describe: 'Block and events prefetch mode' - }, - batchBlocks: { - type: 'number', - default: 10, - describe: 'Number of blocks prefetched in batch' - }, - state: { - type: 'boolean', - default: false, - describe: 'Fill state for subgraph entities' - } - }).argv; + const fillCmd = new FillCmd(); + await fillCmd.init(Database, Indexer, EventWatcher, {}, ENTITY_QUERY_TYPE_MAP, ENTITY_TO_LATEST_ENTITY_MAP); - const config: Config = await getConfig(argv.configFile); - const { ethClient, ethProvider } = await initClients(config); + const indexer = fillCmd.indexer as Indexer; + assert(indexer); - const db = new Database(config.database); - await db.init(); + // Get contractEntitiesMap required for fill-state + // NOTE: Assuming each entity type is only mapped to a single contract + // This is true for eden subgraph; may not be the case for other subgraphs + const contractEntitiesMap = getContractEntitiesMap(indexer.graphWatcher.dataSources); - const graphDb = new GraphDatabase(config.server, db.baseDatabase, ENTITY_TO_LATEST_ENTITY_MAP); - await graphDb.init(); - - const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server); - - const jobQueueConfig = config.jobQueue; - assert(jobQueueConfig, 'Missing job queue config'); - - const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; - assert(dbConnectionString, 'Missing job queue db connection string'); - - const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); - await jobQueue.start(); - - const indexer = new Indexer(config.server, db, { ethClient }, ethProvider, jobQueue, graphWatcher); - await indexer.init(); - - graphWatcher.setIndexer(indexer); - await graphWatcher.init(); - - if (argv.state) { - assert(config.server.enableState, 'State creation disabled'); - await fillState(indexer, graphDb, graphWatcher.dataSources, argv); - - return; - } - - // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. - // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries - const pubsub = new PubSub(); - - const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue); - - await fillBlocks(jobQueue, indexer, eventWatcher, jobQueueConfig.blockDelayInMilliSecs, argv); + await fillCmd.exec(contractEntitiesMap); }; main().catch(err => { diff --git a/packages/eden-watcher/src/indexer.ts b/packages/eden-watcher/src/indexer.ts index c90a3b2b..da7eb6ac 100644 --- a/packages/eden-watcher/src/indexer.ts +++ b/packages/eden-watcher/src/indexer.ts @@ -5,7 +5,6 @@ import assert from 'assert'; import { DeepPartial, FindConditions, FindManyOptions } from 'typeorm'; import { ethers } from 'ethers'; -import _ from 'lodash'; import { SelectionNode } from 'graphql'; import { JsonFragment } from '@ethersproject/abi'; @@ -28,7 +27,7 @@ import { DatabaseInterface, Clients } from '@cerc-io/util'; -import { GraphWatcher } from '@cerc-io/graph-node'; +import { GraphWatcher, updateSubgraphState, dumpSubgraphState } from '@cerc-io/graph-node'; import { Database, ENTITIES, SUBGRAPH_ENTITIES } from './database'; import { Contract } from './entity/Contract'; @@ -139,6 +138,10 @@ export class Indexer implements IndexerInterface { return this._storageLayoutMap; } + get graphWatcher (): GraphWatcher { + return this._graphWatcher; + } + async init (): Promise { await this._baseIndexer.fetchContracts(); await this._baseIndexer.fetchStateStatus(); @@ -158,6 +161,10 @@ export class Indexer implements IndexerInterface { ); } + async getEntitiesForBlock (blockHash: string, tableName: string): Promise { + return this._db.getEntitiesForBlock(blockHash, tableName); + } + async processInitialState (contractAddress: string, blockHash: string): Promise { // Call initial state hook. return createInitialState(this, contractAddress, blockHash); @@ -521,27 +528,11 @@ export class Indexer implements IndexerInterface { } updateSubgraphState (contractAddress: string, data: any): void { - // Update the subgraph state for a given contract. - const oldData = this._subgraphStateMap.get(contractAddress); - const updatedData = _.merge(oldData, data); - this._subgraphStateMap.set(contractAddress, updatedData); + return updateSubgraphState(this._subgraphStateMap, contractAddress, data); } async dumpSubgraphState (blockHash: string, isStateFinalized = false): Promise { - // Create a diff for each contract in the subgraph state map. - const createDiffPromises = Array.from(this._subgraphStateMap.entries()) - .map(([contractAddress, data]): Promise => { - if (isStateFinalized) { - return this.createDiff(contractAddress, blockHash, data); - } - - return this.createDiffStaged(contractAddress, blockHash, data); - }); - - await Promise.all(createDiffPromises); - - // Reset the subgraph state map. - this._subgraphStateMap.clear(); + return dumpSubgraphState(this, this._subgraphStateMap, blockHash, isStateFinalized); } async resetWatcherToBlock (blockNumber: number): Promise { diff --git a/packages/eden-watcher/src/job-runner.ts b/packages/eden-watcher/src/job-runner.ts index d8d8794f..a3c3c585 100644 --- a/packages/eden-watcher/src/job-runner.ts +++ b/packages/eden-watcher/src/job-runner.ts @@ -4,7 +4,8 @@ import debug from 'debug'; -import { JobRunner, JobRunnerCmd } from '@cerc-io/cli'; +import { JobRunnerCmd } from '@cerc-io/cli'; +import { WatcherJobRunner as JobRunner } from '@cerc-io/util'; import { Indexer } from './indexer'; import { Database, ENTITY_QUERY_TYPE_MAP, ENTITY_TO_LATEST_ENTITY_MAP } from './database'; diff --git a/packages/erc20-watcher/README.md b/packages/erc20-watcher/README.md index 9633d62d..a78f6b66 100644 --- a/packages/erc20-watcher/README.md +++ b/packages/erc20-watcher/README.md @@ -189,15 +189,3 @@ $ yarn fill --startBlock 1000 --endBlock 2000 } ``` - -## Test - -To run tests (GQL queries) against the mock server: - -``` -yarn run server:mock -``` - -```bash -yarn test -``` diff --git a/packages/erc20-watcher/package.json b/packages/erc20-watcher/package.json index 7241bc29..b863358b 100644 --- a/packages/erc20-watcher/package.json +++ b/packages/erc20-watcher/package.json @@ -6,11 +6,9 @@ "main": "dist/index.js", "scripts": { "lint": "eslint .", - "test": "mocha -r ts-node/register src/**/*.test.ts", "build": "tsc", "server": "DEBUG=vulcanize:* node --enable-source-maps dist/server.js", "server:dev": "DEBUG=vulcanize:* nodemon --watch src src/server.ts", - "server:mock": "MOCK=1 nodemon src/server.ts", "job-runner": "DEBUG=vulcanize:* YARN_CHILD_PROCESS=true node --enable-source-maps dist/job-runner.js", "job-runner:dev": "DEBUG=vulcanize:* YARN_CHILD_PROCESS=true ts-node src/job-runner.ts", "watch:contract": "node --enable-source-maps dist/cli/watch-contract.js", diff --git a/packages/erc20-watcher/src/database.ts b/packages/erc20-watcher/src/database.ts index 4fce6350..ac371f8b 100644 --- a/packages/erc20-watcher/src/database.ts +++ b/packages/erc20-watcher/src/database.ts @@ -252,6 +252,10 @@ export class Database implements DatabaseInterface { return this._baseDatabase.getBlockProgressEntities(repo, where, options); } + async getEntitiesForBlock (blockHash: string, tableName: string): Promise { + return this._baseDatabase.getEntitiesForBlock(blockHash, tableName); + } + async saveBlockProgress (queryRunner: QueryRunner, block: DeepPartial): Promise { const repo = queryRunner.manager.getRepository(BlockProgress); diff --git a/packages/erc20-watcher/src/fill.ts b/packages/erc20-watcher/src/fill.ts index 305b892c..c37485a1 100644 --- a/packages/erc20-watcher/src/fill.ts +++ b/packages/erc20-watcher/src/fill.ts @@ -2,83 +2,22 @@ // Copyright 2021 Vulcanize, Inc. // -import assert from 'assert'; import 'reflect-metadata'; -import yargs from 'yargs'; -import { hideBin } from 'yargs/helpers'; import debug from 'debug'; -import { PubSub } from 'graphql-subscriptions'; -import { Config, getConfig, fillBlocks, JobQueue, DEFAULT_CONFIG_PATH, initClients } from '@cerc-io/util'; +import { FillCmd } from '@cerc-io/cli'; import { Database } from './database'; import { Indexer } from './indexer'; import { EventWatcher } from './events'; -const log = debug('vulcanize:server'); +const log = debug('vulcanize:fill'); export const main = async (): Promise => { - const argv = await yargs(hideBin(process.argv)).parserConfiguration({ - 'parse-numbers': false - }).env( - 'FILL' - ).options({ - configFile: { - alias: 'f', - type: 'string', - require: true, - demandOption: true, - describe: 'configuration file path (toml)', - default: DEFAULT_CONFIG_PATH - }, - startBlock: { - type: 'number', - require: true, - demandOption: true, - describe: 'Block number to start processing at' - }, - endBlock: { - type: 'number', - require: true, - demandOption: true, - describe: 'Block number to stop processing at' - }, - prefetch: { - type: 'boolean', - default: false, - describe: 'Block and events prefetch mode' - }, - batchBlocks: { - type: 'number', - default: 10, - describe: 'Number of blocks prefetched in batch' - } - }).argv; + const fillCmd = new FillCmd(); + await fillCmd.init(Database, Indexer, EventWatcher); - const config: Config = await getConfig(argv.configFile); - const { ethClient, ethProvider } = await initClients(config); - - const db = new Database(config.database); - await db.init(); - - // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. - // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries - const pubsub = new PubSub(); - - const jobQueueConfig = config.jobQueue; - assert(jobQueueConfig, 'Missing job queue config'); - - const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; - assert(dbConnectionString, 'Missing job queue db connection string'); - - const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); - await jobQueue.start(); - - const indexer = new Indexer(config.server, db, { ethClient }, ethProvider, jobQueue); - - const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue); - - await fillBlocks(jobQueue, indexer, eventWatcher, jobQueueConfig.blockDelayInMilliSecs, argv); + await fillCmd.exec(); }; main().catch(err => { diff --git a/packages/erc20-watcher/src/indexer.ts b/packages/erc20-watcher/src/indexer.ts index 6c8cd282..3c9f7365 100644 --- a/packages/erc20-watcher/src/indexer.ts +++ b/packages/erc20-watcher/src/indexer.ts @@ -262,6 +262,10 @@ export class Indexer implements IndexerInterface { ); } + async getEntitiesForBlock (blockHash: string, tableName: string): Promise { + return this._db.getEntitiesForBlock(blockHash, tableName); + } + async processCanonicalBlock (blockHash: string, blockNumber: number): Promise { // TODO Implement } @@ -295,7 +299,19 @@ export class Indexer implements IndexerInterface { return undefined; } - // Method to be used by export-state CLI. + async getStates (where: FindConditions): Promise { + // TODO Implement + return []; + } + + async createDiffStaged (contractAddress: string, blockHash: string, data: any): Promise { + // TODO Implement + } + + async createDiff (contractAddress: string, blockHash: string, data: any): Promise { + // TODO Implement + } + async createCheckpoint (contractAddress: string, blockHash: string): Promise { // TODO Implement return undefined; diff --git a/packages/erc20-watcher/src/job-runner.ts b/packages/erc20-watcher/src/job-runner.ts index 4eebcb42..2a336d94 100644 --- a/packages/erc20-watcher/src/job-runner.ts +++ b/packages/erc20-watcher/src/job-runner.ts @@ -4,7 +4,8 @@ import debug from 'debug'; -import { JobRunner, JobRunnerCmd } from '@cerc-io/cli'; +import { JobRunnerCmd } from '@cerc-io/cli'; +import { WatcherJobRunner as JobRunner } from '@cerc-io/util'; import { Indexer } from './indexer'; import { Database } from './database'; diff --git a/packages/erc20-watcher/src/mock/data.ts b/packages/erc20-watcher/src/mock/data.ts deleted file mode 100644 index 0160e0fa..00000000 --- a/packages/erc20-watcher/src/mock/data.ts +++ /dev/null @@ -1,50 +0,0 @@ -// -// Copyright 2021 Vulcanize, Inc. -// - -// TODO: Pull mock data for 5 tokens from rinkeby. - -export const tokens: {[address: string]: {[variable: string]: string}} = { - '0xd87fea54f506972e3267239ec8e159548892074a': { - name: 'ChainLink Token', - symbol: 'LINK', - decimals: '18', - totalSupply: '1000000' - } -}; - -export const blocks: {[blockHash: string]: {[address: string]: any}} = { - // Block hash. - '0x77b5479a5856dd8ec63df6aabf9ce0913071a6dda3a3d54f3c9c940574bcb8ab': { - - // ERC20 token address. - '0xd87fea54f506972e3267239ec8e159548892074a': { - ...tokens['0xd87fea54f506972e3267239ec8e159548892074a'], - - balanceOf: { - '0xDC7d7A8920C8Eecc098da5B7522a5F31509b5Bfc': '10000', - '0xCA6D29232D1435D8198E3E5302495417dD073d61': '500' - }, - allowance: { - '0xDC7d7A8920C8Eecc098da5B7522a5F31509b5Bfc': { - '0xCA6D29232D1435D8198E3E5302495417dD073d61': '100', - '0x9273D9437B0bf2F1b7999d8dB72960d6379564d1': '200' - } - }, - events: [ - { - name: 'Transfer', - from: '0xDC7d7A8920C8Eecc098da5B7522a5F31509b5Bfc', - to: '0xCA6D29232D1435D8198E3E5302495417dD073d61', - value: '500' - }, - { - name: 'Approval', - owner: '0xDC7d7A8920C8Eecc098da5B7522a5F31509b5Bfc', - spender: '0xCA6D29232D1435D8198E3E5302495417dD073d61', - value: '100' - } - ] - } - } -}; diff --git a/packages/erc20-watcher/src/mock/resolvers.ts b/packages/erc20-watcher/src/mock/resolvers.ts deleted file mode 100644 index 4bfee4b2..00000000 --- a/packages/erc20-watcher/src/mock/resolvers.ts +++ /dev/null @@ -1,90 +0,0 @@ -// -// Copyright 2021 Vulcanize, Inc. -// - -import debug from 'debug'; -import BigInt from 'apollo-type-bigint'; - -import { blocks } from './data'; - -const log = debug('test'); - -export const createResolvers = async (): Promise => { - return { - BigInt: new BigInt('bigInt'), - - TokenEvent: { - __resolveType: (obj: any) => { - if (obj.owner) { - return 'ApprovalEvent'; - } - - return 'TransferEvent'; - } - }, - - Query: { - - totalSupply: (_: any, { blockHash, token }: { blockHash: string, token: string }) => { - log('totalSupply', blockHash, token); - - return { - value: blocks[blockHash][token].totalSupply, - proof: { data: '' } - }; - }, - - balanceOf: (_: any, { blockHash, token, owner }: { blockHash: string, token: string, owner: string }) => { - log('balanceOf', blockHash, token, owner); - - return { - value: blocks[blockHash][token].balanceOf[owner], - proof: { data: '' } - }; - }, - - allowance: (_: any, { blockHash, token, owner, spender }: { blockHash: string, token: string, owner: string, spender: string }) => { - log('allowance', blockHash, token, owner, spender); - - return { - value: blocks[blockHash][token].allowance[owner][spender], - proof: { data: '' } - }; - }, - - name: (_: any, { blockHash, token }: { blockHash: string, token: string }) => { - log('name', blockHash, token); - - return { - value: blocks[blockHash][token].name, - proof: { data: '' } - }; - }, - - symbol: (_: any, { blockHash, token }: { blockHash: string, token: string }) => { - log('symbol', blockHash, token); - - return { - value: blocks[blockHash][token].symbol, - proof: { data: '' } - }; - }, - - decimals: (_: any, { blockHash, token }: { blockHash: string, token: string }) => { - log('decimals', blockHash, token); - - return { - value: blocks[blockHash][token].decimals, - proof: { data: '' } - }; - }, - - events: (_: any, { blockHash, token, name }: { blockHash: string, token: string, name: string }) => { - log('events', blockHash, token, name); - return blocks[blockHash][token].events - .filter((e: any) => !name || name === e.name) - .map((e: any) => ({ event: e })); - } - } - }; -}; diff --git a/packages/erc20-watcher/src/mock/server.test.ts b/packages/erc20-watcher/src/mock/server.test.ts deleted file mode 100644 index 7e6a8152..00000000 --- a/packages/erc20-watcher/src/mock/server.test.ts +++ /dev/null @@ -1,173 +0,0 @@ -// -// Copyright 2021 Vulcanize, Inc. -// - -import 'mocha'; -import { expect } from 'chai'; -import _ from 'lodash'; - -import { GraphQLClient } from 'graphql-request'; - -import { - queryName, - querySymbol, - queryDecimals, - queryTotalSupply, - queryBalanceOf, - queryAllowance, - queryEvents -} from '../queries'; - -import { blocks, tokens as tokenInfo } from './data'; - -const testCases: { - balanceOf: any[], - allowance: any[], - events: any[], - tokens: any[] -} = { - balanceOf: [], - allowance: [], - events: [], - tokens: [] -}; - -const blockHashes = _.keys(blocks); -blockHashes.forEach(blockHash => { - const block = blocks[blockHash]; - const tokens = _.keys(block); - tokens.forEach(token => { - const tokenObj = block[token]; - - // Token info test cases. - testCases.tokens.push({ - blockHash, - token, - info: tokenInfo[token] - }); - - // Event test cases. - testCases.events.push({ - blockHash, - token, - events: tokenObj.events - }); - - // Balance test cases. - const balanceOfOwners = _.keys(tokenObj.balanceOf); - balanceOfOwners.forEach(owner => { - testCases.balanceOf.push({ - blockHash, - token, - owner, - balance: tokenObj.balanceOf[owner] - }); - }); - - // Allowance test cases. - const allowanceOwners = _.keys(tokenObj.allowance); - allowanceOwners.forEach(owner => { - const allowanceObj = tokenObj.allowance[owner]; - const spenders = _.keys(allowanceObj); - spenders.forEach(spender => { - testCases.allowance.push({ - blockHash, - token, - owner, - spender, - allowance: allowanceObj[spender] - }); - }); - }); - }); -}); - -describe('server', () => { - const client = new GraphQLClient('http://localhost:3001/graphql'); - - it('query token info', async () => { - const tests = testCases.tokens; - expect(tests.length).to.be.greaterThan(0); - - for (let i = 0; i < tests.length; i++) { - const testCase = tests[i]; - - // Token totalSupply. - let result = await client.request(queryTotalSupply, testCase); - expect(result.totalSupply.value).to.equal(testCase.info.totalSupply); - expect(result.totalSupply.proof.data).to.equal(''); - - // Token name. - result = await client.request(queryName, testCase); - expect(result.name.value).to.equal(testCase.info.name); - expect(result.name.proof.data).to.equal(''); - - // Token symbol. - result = await client.request(querySymbol, testCase); - expect(result.symbol.value).to.equal(testCase.info.symbol); - expect(result.symbol.proof.data).to.equal(''); - - // Token decimals. - result = await client.request(queryDecimals, testCase); - expect(result.decimals.value).to.equal(testCase.info.decimals); - expect(result.decimals.proof.data).to.equal(''); - } - }); - - it('query balanceOf', async () => { - const tests = testCases.balanceOf; - expect(tests.length).to.be.greaterThan(0); - - for (let i = 0; i < tests.length; i++) { - const testCase = tests[i]; - const result = await client.request(queryBalanceOf, testCase); - - expect(result.balanceOf.value).to.equal(testCase.balance); - - // TODO: Check proof. - expect(result.balanceOf.proof.data).to.equal(''); - } - }); - - it('query allowance', async () => { - const tests = testCases.allowance; - expect(tests.length).to.be.greaterThan(0); - - for (let i = 0; i < tests.length; i++) { - const testCase = tests[i]; - const result = await client.request(queryAllowance, testCase); - - expect(result.allowance.value).to.equal(testCase.allowance); - - // TODO: Check proof. - expect(result.allowance.proof.data).to.equal(''); - } - }); - - it('query events', async () => { - const tests = testCases.events; - expect(tests.length).to.be.greaterThan(0); - - for (let i = 0; i < tests.length; i++) { - const testCase = tests[i]; - const result = await client.request(queryEvents, testCase); - - const resultEvents = result.events.map((record: any) => record.event); - expect(resultEvents.length).to.equal(testCase.events.length); - - resultEvents.forEach((resultEvent: any, index: number) => { - const { name, ...testCaseEvent } = testCase.events[index]; - - if (name === 'Transfer') { - expect(resultEvent.__typename).to.equal('TransferEvent'); - } else if (name === 'Approval') { - expect(resultEvent.__typename).to.equal('ApprovalEvent'); - } - - expect(resultEvent).to.include(testCaseEvent); - }); - - // TODO: Check proof. - } - }); -}); diff --git a/packages/erc20-watcher/src/server.ts b/packages/erc20-watcher/src/server.ts index ee0cb27e..fe9f6d11 100644 --- a/packages/erc20-watcher/src/server.ts +++ b/packages/erc20-watcher/src/server.ts @@ -8,7 +8,6 @@ import 'graphql-import-node'; import { ServerCmd } from '@cerc-io/cli'; import typeDefs from './schema'; -import { createResolvers as createMockResolvers } from './mock/resolvers'; import { createResolvers } from './resolvers'; import { Indexer } from './indexer'; import { Database } from './database'; @@ -20,7 +19,7 @@ export const main = async (): Promise => { const serverCmd = new ServerCmd(); await serverCmd.init(Database, Indexer, EventWatcher); - return process.env.MOCK ? serverCmd.exec(createMockResolvers, typeDefs) : serverCmd.exec(createResolvers, typeDefs); + return serverCmd.exec(createResolvers, typeDefs); }; main().then(() => { diff --git a/packages/erc721-watcher/src/database.ts b/packages/erc721-watcher/src/database.ts index a6982a31..37b763fa 100644 --- a/packages/erc721-watcher/src/database.ts +++ b/packages/erc721-watcher/src/database.ts @@ -482,6 +482,10 @@ export class Database implements DatabaseInterface { return this._baseDatabase.getBlockProgressEntities(repo, where, options); } + async getEntitiesForBlock (blockHash: string, tableName: string): Promise { + return this._baseDatabase.getEntitiesForBlock(blockHash, tableName); + } + async updateBlockProgress (queryRunner: QueryRunner, block: BlockProgress, lastProcessedEventIndex: number): Promise { const repo = queryRunner.manager.getRepository(BlockProgress); diff --git a/packages/erc721-watcher/src/fill.ts b/packages/erc721-watcher/src/fill.ts index aaf25f36..75d2c508 100644 --- a/packages/erc721-watcher/src/fill.ts +++ b/packages/erc721-watcher/src/fill.ts @@ -2,81 +2,22 @@ // Copyright 2021 Vulcanize, Inc. // -import assert from 'assert'; import 'reflect-metadata'; -import yargs from 'yargs'; -import { hideBin } from 'yargs/helpers'; import debug from 'debug'; -import { PubSub } from 'graphql-subscriptions'; -import { Config, getConfig, fillBlocks, JobQueue, DEFAULT_CONFIG_PATH, initClients } from '@cerc-io/util'; +import { FillCmd } from '@cerc-io/cli'; import { Database } from './database'; import { Indexer } from './indexer'; import { EventWatcher } from './events'; -const log = debug('vulcanize:server'); +const log = debug('vulcanize:fill'); export const main = async (): Promise => { - const argv = await yargs(hideBin(process.argv)).parserConfiguration({ - 'parse-numbers': false - }).env( - 'FILL' - ).options({ - configFile: { - alias: 'f', - type: 'string', - demandOption: true, - describe: 'configuration file path (toml)', - default: DEFAULT_CONFIG_PATH - }, - startBlock: { - type: 'number', - demandOption: true, - describe: 'Block number to start processing at' - }, - endBlock: { - type: 'number', - demandOption: true, - describe: 'Block number to stop processing at' - }, - prefetch: { - type: 'boolean', - default: false, - describe: 'Block and events prefetch mode' - }, - batchBlocks: { - type: 'number', - default: 10, - describe: 'Number of blocks prefetched in batch' - } - }).argv; + const fillCmd = new FillCmd(); + await fillCmd.init(Database, Indexer, EventWatcher); - const config: Config = await getConfig(argv.configFile); - const { ethClient, ethProvider } = await initClients(config); - - const db = new Database(config.database); - await db.init(); - - const jobQueueConfig = config.jobQueue; - assert(jobQueueConfig, 'Missing job queue config'); - - const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; - assert(dbConnectionString, 'Missing job queue db connection string'); - - const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); - await jobQueue.start(); - - const indexer = new Indexer(config.server, db, { ethClient }, ethProvider, jobQueue); - await indexer.init(); - - // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. - // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries - const pubsub = new PubSub(); - - const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue); - - await fillBlocks(jobQueue, indexer, eventWatcher, jobQueueConfig.blockDelayInMilliSecs, argv); + await fillCmd.exec(); }; main().catch(err => { diff --git a/packages/erc721-watcher/src/indexer.ts b/packages/erc721-watcher/src/indexer.ts index 62cd858d..19024d72 100644 --- a/packages/erc721-watcher/src/indexer.ts +++ b/packages/erc721-watcher/src/indexer.ts @@ -596,6 +596,10 @@ export class Indexer implements IndexerInterface { ); } + async getEntitiesForBlock (blockHash: string, tableName: string): Promise { + return this._db.getEntitiesForBlock(blockHash, tableName); + } + async processInitialState (contractAddress: string, blockHash: string): Promise { // Call initial state hook. return createInitialState(this, contractAddress, blockHash); @@ -646,6 +650,10 @@ export class Indexer implements IndexerInterface { return this._baseIndexer.getStateByCID(cid); } + async getStates (where: FindConditions): Promise { + return this._db.getStates(where); + } + getStateData (state: State): any { return this._baseIndexer.getStateData(state); } diff --git a/packages/erc721-watcher/src/job-runner.ts b/packages/erc721-watcher/src/job-runner.ts index b2446ef0..84ff73cd 100644 --- a/packages/erc721-watcher/src/job-runner.ts +++ b/packages/erc721-watcher/src/job-runner.ts @@ -4,7 +4,8 @@ import debug from 'debug'; -import { JobRunner, JobRunnerCmd } from '@cerc-io/cli'; +import { JobRunnerCmd } from '@cerc-io/cli'; +import { WatcherJobRunner as JobRunner } from '@cerc-io/util'; import { Indexer } from './indexer'; import { Database } from './database'; diff --git a/packages/graph-node/src/index.ts b/packages/graph-node/src/index.ts index b2d9b33d..4103d004 100644 --- a/packages/graph-node/src/index.ts +++ b/packages/graph-node/src/index.ts @@ -1,8 +1,7 @@ export * from './watcher'; export * from './database'; export { - prepareEntityState, - updateEntitiesFromState, resolveEntityFieldConflicts, afterEntityInsertOrUpdate } from './utils'; +export * from './state-utils'; diff --git a/packages/graph-node/src/loader.ts b/packages/graph-node/src/loader.ts index 80f6bb69..70ef1c14 100644 --- a/packages/graph-node/src/loader.ts +++ b/packages/graph-node/src/loader.ts @@ -24,9 +24,9 @@ import { toEthereumValue, getEthereumTypes, jsonFromBytes, - getStorageValueType, - prepareEntityState + getStorageValueType } from './utils'; +import { prepareEntityState } from './state-utils'; import { Database } from './database'; // Endianness of BN used in bigInt store host API. diff --git a/packages/graph-node/src/state-utils.ts b/packages/graph-node/src/state-utils.ts new file mode 100644 index 00000000..1746bf38 --- /dev/null +++ b/packages/graph-node/src/state-utils.ts @@ -0,0 +1,208 @@ +// +// Copyright 2022 Vulcanize, Inc. +// + +import assert from 'assert'; +import debug from 'debug'; +import _ from 'lodash'; + +import { Between } from 'typeorm'; +import { IndexerInterface, jsonBigIntStringReplacer, StateInterface } from '@cerc-io/util'; + +import { Database } from './database'; +import { resolveEntityFieldConflicts } from './utils'; + +const log = debug('vulcanize:state-utils'); + +export const prepareEntityState = (updatedEntity: any, entityName: string, relationsMap: Map): any => { + // Resolve any field name conflicts in the dbData for auto-diff. + updatedEntity = resolveEntityFieldConflicts(updatedEntity); + + // Prepare the diff data. + const diffData: any = { state: {} }; + + const result = Array.from(relationsMap.entries()) + .find(([key]) => key.name === entityName); + + if (result) { + // Update entity data if relations exist. + const [_, relations] = result; + + // Update relation fields for diff data to be similar to GQL query entities. + Object.entries(relations).forEach(([relation, { isArray, isDerived }]) => { + if (isDerived || !updatedEntity[relation]) { + // Field is not present in dbData for derived relations + return; + } + + if (isArray) { + updatedEntity[relation] = updatedEntity[relation].map((id: string) => ({ id })); + } else { + updatedEntity[relation] = { id: updatedEntity[relation] }; + } + }); + } + + // JSON stringify and parse data for handling unknown types when encoding. + // For example, decimal.js values are converted to string in the diff data. + diffData.state[entityName] = { + // Using custom replacer to store bigints as string values to be encoded by IPLD dag-cbor. + // TODO: Parse and store as native bigint by using Type encoders in IPLD dag-cbor encode. + // https://github.com/rvagg/cborg#type-encoders + [updatedEntity.id]: JSON.parse(JSON.stringify(updatedEntity, jsonBigIntStringReplacer)) + }; + + return diffData; +}; + +export const updateEntitiesFromState = async (database: Database, indexer: IndexerInterface, state: StateInterface) => { + const data = indexer.getStateData(state); + + // Get relations for subgraph entity + assert(indexer.getRelationsMap); + const relationsMap = indexer.getRelationsMap(); + + for (const [entityName, entities] of Object.entries(data.state)) { + const result = Array.from(relationsMap.entries()) + .find(([key]) => key.name === entityName); + + const relations = result ? result[1] : {}; + + log(`Updating entities from State for entity ${entityName}`); + console.time(`time:watcher#GraphWatcher-updateEntitiesFromState-update-entity-${entityName}`); + for (const [id, entityData] of Object.entries(entities as any)) { + const dbData = database.fromState(state.block, entityName, entityData, relations); + await database.saveEntity(entityName, dbData); + } + console.timeEnd(`time:watcher#GraphWatcher-updateEntitiesFromState-update-entity-${entityName}`); + } +}; + +export const updateSubgraphState = (subgraphStateMap: Map, contractAddress: string, data: any): void => { + // Update the subgraph state for a given contract. + const oldData = subgraphStateMap.get(contractAddress); + const updatedData = _.merge(oldData, data); + subgraphStateMap.set(contractAddress, updatedData); +}; + +export const dumpSubgraphState = async ( + indexer: IndexerInterface, + subgraphStateMap: Map, + blockHash: string, + isStateFinalized = false +): Promise => { + // Create a diff for each contract in the subgraph state map. + const createDiffPromises = Array.from(subgraphStateMap.entries()) + .map(([contractAddress, data]): Promise => { + if (isStateFinalized) { + return indexer.createDiff(contractAddress, blockHash, data); + } + + return indexer.createDiffStaged(contractAddress, blockHash, data); + }); + + await Promise.all(createDiffPromises); + + // Reset the subgraph state map. + subgraphStateMap.clear(); +}; + +export const getContractEntitiesMap = (dataSources: any[]): Map => { + // Map: contractAddress -> entities updated + const contractEntitiesMap: Map = new Map(); + + // Populate contractEntitiesMap using data sources from subgraph + dataSources.forEach((dataSource: any) => { + const { source: { address: contractAddress }, mapping: { entities } } = dataSource; + contractEntitiesMap.set(contractAddress, entities as string[]); + }); + + return contractEntitiesMap; +}; + +export const fillState = async ( + indexer: IndexerInterface, + contractEntitiesMap: Map, + argv: { + startBlock: number, + endBlock: number + } +): Promise => { + const { startBlock, endBlock } = argv; + if (startBlock > endBlock) { + log('endBlock should be greater than or equal to startBlock'); + process.exit(1); + } + + // Check that there are no existing diffs in this range + const existingStates = await indexer.getStates({ block: { blockNumber: Between(startBlock, endBlock) } }); + if (existingStates.length > 0) { + log('found existing state(s) in the given range'); + process.exit(1); + } + + console.time('time:fill-state'); + + // Fill state for blocks in the given range + for (let blockNumber = startBlock; blockNumber <= endBlock; blockNumber++) { + console.time(`time:fill-state-${blockNumber}`); + + // Get the canonical block hash at current height + const blocks = await indexer.getBlocksAtHeight(blockNumber, false); + + if (blocks.length === 0) { + log(`block not found at height ${blockNumber}`); + process.exit(1); + } else if (blocks.length > 1) { + log(`found more than one non-pruned block at height ${blockNumber}`); + process.exit(1); + } + + const blockHash = blocks[0].blockHash; + + // Create initial state for contracts + assert(indexer.createInit); + await indexer.createInit(blockHash, blockNumber); + + // Fill state for each contract in contractEntitiesMap + const contractStatePromises = Array.from(contractEntitiesMap.entries()) + .map(async ([contractAddress, entities]): Promise => { + // Get all the updated entities at this block + const updatedEntitiesListPromises = entities.map(async (entity): Promise => { + return indexer.getEntitiesForBlock(blockHash, entity); + }); + const updatedEntitiesList = await Promise.all(updatedEntitiesListPromises); + + // Populate state with all the updated entities of each entity type + updatedEntitiesList.forEach((updatedEntities, index) => { + const entityName = entities[index]; + + updatedEntities.forEach((updatedEntity) => { + assert(indexer.getRelationsMap); + assert(indexer.updateSubgraphState); + + // Prepare diff data for the entity update + const diffData = prepareEntityState(updatedEntity, entityName, indexer.getRelationsMap()); + + // Update the in-memory subgraph state + indexer.updateSubgraphState(contractAddress, diffData); + }); + }); + }); + + await Promise.all(contractStatePromises); + + // Persist subgraph state to the DB + assert(indexer.dumpSubgraphState); + await indexer.dumpSubgraphState(blockHash, true); + await indexer.updateStateSyncStatusIndexedBlock(blockNumber); + + // Create checkpoints + await indexer.processCheckpoint(blockHash); + await indexer.updateStateSyncStatusCheckpointBlock(blockNumber); + + console.timeEnd(`time:fill-state-${blockNumber}`); + } + + console.timeEnd('time:fill-state'); +}; diff --git a/packages/graph-node/src/utils.ts b/packages/graph-node/src/utils.ts index 67890128..4480435f 100644 --- a/packages/graph-node/src/utils.ts +++ b/packages/graph-node/src/utils.ts @@ -1,3 +1,7 @@ +// +// Copyright 2022 Vulcanize, Inc. +// + import { BigNumber, utils } from 'ethers'; import path from 'path'; import fs from 'fs-extra'; @@ -8,11 +12,10 @@ import { ColumnMetadata } from 'typeorm/metadata/ColumnMetadata'; import assert from 'assert'; import _ from 'lodash'; -import { GraphDecimal, IndexerInterface, jsonBigIntStringReplacer, StateInterface } from '@cerc-io/util'; +import { GraphDecimal } from '@cerc-io/util'; import { MappingKey, StorageLayout } from '@cerc-io/solidity-mapper'; import { TypeId, EthereumValueKind, ValueKind } from './types'; -import { Database } from './database'; const log = debug('vulcanize:utils'); @@ -802,47 +805,6 @@ const getEthereumType = (storageTypes: StorageLayout['types'], type: string, map return utils.ParamType.from(label); }; -export const prepareEntityState = (updatedEntity: any, entityName: string, relationsMap: Map): any => { - // Resolve any field name conflicts in the dbData for auto-diff. - updatedEntity = resolveEntityFieldConflicts(updatedEntity); - - // Prepare the diff data. - const diffData: any = { state: {} }; - - const result = Array.from(relationsMap.entries()) - .find(([key]) => key.name === entityName); - - if (result) { - // Update entity data if relations exist. - const [_, relations] = result; - - // Update relation fields for diff data to be similar to GQL query entities. - Object.entries(relations).forEach(([relation, { isArray, isDerived }]) => { - if (isDerived || !updatedEntity[relation]) { - // Field is not present in dbData for derived relations - return; - } - - if (isArray) { - updatedEntity[relation] = updatedEntity[relation].map((id: string) => ({ id })); - } else { - updatedEntity[relation] = { id: updatedEntity[relation] }; - } - }); - } - - // JSON stringify and parse data for handling unknown types when encoding. - // For example, decimal.js values are converted to string in the diff data. - diffData.state[entityName] = { - // Using custom replacer to store bigints as string values to be encoded by IPLD dag-cbor. - // TODO: Parse and store as native bigint by using Type encoders in IPLD dag-cbor encode. - // https://github.com/rvagg/cborg#type-encoders - [updatedEntity.id]: JSON.parse(JSON.stringify(updatedEntity, jsonBigIntStringReplacer)) - }; - - return diffData; -}; - export const fromStateEntityValues = ( stateEntity: any, propertyName: string, @@ -876,29 +838,6 @@ export const fromStateEntityValues = ( return stateEntity[propertyName]; }; -export const updateEntitiesFromState = async (database: Database, indexer: IndexerInterface, state: StateInterface) => { - const data = indexer.getStateData(state); - - // Get relations for subgraph entity - assert(indexer.getRelationsMap); - const relationsMap = indexer.getRelationsMap(); - - for (const [entityName, entities] of Object.entries(data.state)) { - const result = Array.from(relationsMap.entries()) - .find(([key]) => key.name === entityName); - - const relations = result ? result[1] : {}; - - log(`Updating entities from State for entity ${entityName}`); - console.time(`time:watcher#GraphWatcher-updateEntitiesFromState-update-entity-${entityName}`); - for (const [id, entityData] of Object.entries(entities as any)) { - const dbData = database.fromState(state.block, entityName, entityData, relations); - await database.saveEntity(entityName, dbData); - } - console.timeEnd(`time:watcher#GraphWatcher-updateEntitiesFromState-update-entity-${entityName}`); - } -}; - export const afterEntityInsertOrUpdate = async ( frothyEntityType: EntityTarget, entities: Set, @@ -953,7 +892,7 @@ export const afterEntityInsertOrUpdate = async ( .execute(); }; -export function getLatestEntityFromEntity (latestEntityRepo: Repository, entity: any): Entity { +export const getLatestEntityFromEntity = (latestEntityRepo: Repository, entity: any): Entity => { const latestEntityFields = latestEntityRepo.metadata.columns.map(column => column.propertyName); return latestEntityRepo.create(_.pick(entity, latestEntityFields) as DeepPartial); -} +}; diff --git a/packages/graph-node/src/watcher.ts b/packages/graph-node/src/watcher.ts index 66c5cd94..958311ce 100644 --- a/packages/graph-node/src/watcher.ts +++ b/packages/graph-node/src/watcher.ts @@ -14,7 +14,8 @@ import { ResultObject } from '@vulcanize/assemblyscript/lib/loader'; import { EthClient } from '@cerc-io/ipld-eth-client'; import { getFullBlock, BlockHeight, ServerConfig, getFullTransaction, QueryOptions, StateInterface, IndexerInterface, BlockProgressInterface } from '@cerc-io/util'; -import { createBlock, createEvent, getSubgraphConfig, resolveEntityFieldConflicts, Transaction, updateEntitiesFromState } from './utils'; +import { createBlock, createEvent, getSubgraphConfig, resolveEntityFieldConflicts, Transaction } from './utils'; +import { updateEntitiesFromState } from './state-utils'; import { Context, GraphData, instantiate } from './loader'; import { Database, DEFAULT_LIMIT } from './database'; diff --git a/packages/graph-node/test/utils/indexer.ts b/packages/graph-node/test/utils/indexer.ts index 53a5fdb6..993602c2 100644 --- a/packages/graph-node/test/utils/indexer.ts +++ b/packages/graph-node/test/utils/indexer.ts @@ -58,6 +58,10 @@ export class Indexer implements IndexerInterface { ); } + async getEntitiesForBlock (blockHash: string, tableName: string): Promise { + return []; + } + async getBlockProgress (blockHash: string): Promise { assert(blockHash); @@ -217,6 +221,14 @@ export class Indexer implements IndexerInterface { return undefined; } + async getStates (where: FindConditions): Promise { + return []; + } + + async createDiff (contractAddress: string, blockHash: string, data: any): Promise { + return undefined; + } + async createCheckpoint (contractAddress: string, blockHash: string): Promise { return undefined; } @@ -230,7 +242,7 @@ export class Indexer implements IndexerInterface { } async removeStates (blockNumber: number, kind: StateKind): Promise { - // TODO Implement + return undefined; } getStateData (state: StateInterface): any { diff --git a/packages/graph-test-watcher/src/database.ts b/packages/graph-test-watcher/src/database.ts index 89d8afce..c0b53be2 100644 --- a/packages/graph-test-watcher/src/database.ts +++ b/packages/graph-test-watcher/src/database.ts @@ -262,6 +262,10 @@ export class Database implements DatabaseInterface { return this._baseDatabase.getBlockProgressEntities(repo, where, options); } + async getEntitiesForBlock (blockHash: string, tableName: string): Promise { + return this._baseDatabase.getEntitiesForBlock(blockHash, tableName); + } + async updateBlockProgress (queryRunner: QueryRunner, block: BlockProgress, lastProcessedEventIndex: number): Promise { const repo = queryRunner.manager.getRepository(BlockProgress); diff --git a/packages/graph-test-watcher/src/fill-state.ts b/packages/graph-test-watcher/src/fill-state.ts deleted file mode 100644 index e1994321..00000000 --- a/packages/graph-test-watcher/src/fill-state.ts +++ /dev/null @@ -1,112 +0,0 @@ -// -// Copyright 2022 Vulcanize, Inc. -// - -import 'reflect-metadata'; -import debug from 'debug'; -import { Between } from 'typeorm'; - -import { Database as GraphDatabase, prepareEntityState } from '@cerc-io/graph-node'; - -import { Indexer } from './indexer'; - -const log = debug('vulcanize:fill-state'); - -export const fillState = async ( - indexer: Indexer, - graphDb: GraphDatabase, - dataSources: any[], - argv: { - startBlock: number, - endBlock: number - } -): Promise => { - const { startBlock, endBlock } = argv; - if (startBlock > endBlock) { - log('endBlock should be greater than or equal to startBlock'); - process.exit(1); - } - - // NOTE: Assuming all blocks in the given range are in the pruned region - log(`Filling state for subgraph entities in range: [${startBlock}, ${endBlock}]`); - - // Check that there are no existing diffs in this range - const existingStates = await indexer.getStates({ block: { blockNumber: Between(startBlock, endBlock) } }); - if (existingStates.length > 0) { - log('found existing state(s) in the given range'); - process.exit(1); - } - - // Map: contractAddress -> entities updated - const contractEntitiesMap: Map = new Map(); - - // Populate contractEntitiesMap using data sources from subgraph - // NOTE: Assuming each entity type is only mapped to a single contract - // This is true for eden subgraph; may not be the case for other subgraphs - dataSources.forEach((dataSource: any) => { - const { source: { address: contractAddress }, mapping: { entities } } = dataSource; - contractEntitiesMap.set(contractAddress, entities as string[]); - }); - - console.time('time:fill-state'); - - // Fill state for blocks in the given range - for (let blockNumber = startBlock; blockNumber <= endBlock; blockNumber++) { - console.time(`time:fill-state-${blockNumber}`); - - // Get the canonical block hash at current height - const blocks = await indexer.getBlocksAtHeight(blockNumber, false); - - if (blocks.length === 0) { - log(`block not found at height ${blockNumber}`); - process.exit(1); - } else if (blocks.length > 1) { - log(`found more than one non-pruned block at height ${blockNumber}`); - process.exit(1); - } - - const blockHash = blocks[0].blockHash; - - // Create initial state for contracts - await indexer.createInit(blockHash, blockNumber); - - // Fill state for each contract in contractEntitiesMap - const contractStatePromises = Array.from(contractEntitiesMap.entries()) - .map(async ([contractAddress, entities]): Promise => { - // Get all the updated entities at this block - const updatedEntitiesListPromises = entities.map(async (entity): Promise => { - return graphDb.getEntitiesForBlock(blockHash, entity); - }); - const updatedEntitiesList = await Promise.all(updatedEntitiesListPromises); - - // Populate state with all the updated entities of each entity type - updatedEntitiesList.forEach((updatedEntities, index) => { - const entityName = entities[index]; - - updatedEntities.forEach((updatedEntity) => { - // Prepare diff data for the entity update - const diffData = prepareEntityState(updatedEntity, entityName, indexer.getRelationsMap()); - - // Update the in-memory subgraph state - indexer.updateSubgraphState(contractAddress, diffData); - }); - }); - }); - - await Promise.all(contractStatePromises); - - // Persist subgraph state to the DB - await indexer.dumpSubgraphState(blockHash, true); - await indexer.updateStateSyncStatusIndexedBlock(blockNumber); - - // Create checkpoints - await indexer.processCheckpoint(blockHash); - await indexer.updateStateSyncStatusCheckpointBlock(blockNumber); - - console.timeEnd(`time:fill-state-${blockNumber}`); - } - - console.timeEnd('time:fill-state'); - - log(`Filled state for subgraph entities in range: [${startBlock}, ${endBlock}]`); -}; diff --git a/packages/graph-test-watcher/src/fill.ts b/packages/graph-test-watcher/src/fill.ts index fd027258..b4691cae 100644 --- a/packages/graph-test-watcher/src/fill.ts +++ b/packages/graph-test-watcher/src/fill.ts @@ -4,101 +4,28 @@ import assert from 'assert'; import 'reflect-metadata'; -import yargs from 'yargs'; -import { hideBin } from 'yargs/helpers'; import debug from 'debug'; -import { PubSub } from 'graphql-subscriptions'; -import { Config, getConfig, fillBlocks, JobQueue, DEFAULT_CONFIG_PATH, initClients } from '@cerc-io/util'; -import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node'; +import { FillCmd } from '@cerc-io/cli'; +import { getContractEntitiesMap } from '@cerc-io/graph-node'; -import { Database, ENTITY_TO_LATEST_ENTITY_MAP } from './database'; +import { Database, ENTITY_QUERY_TYPE_MAP, ENTITY_TO_LATEST_ENTITY_MAP } from './database'; import { Indexer } from './indexer'; import { EventWatcher } from './events'; -import { fillState } from './fill-state'; -const log = debug('vulcanize:server'); +const log = debug('vulcanize:fill'); export const main = async (): Promise => { - const argv = await yargs(hideBin(process.argv)).parserConfiguration({ - 'parse-numbers': false - }).env( - 'FILL' - ).options({ - configFile: { - alias: 'f', - type: 'string', - demandOption: true, - describe: 'configuration file path (toml)', - default: DEFAULT_CONFIG_PATH - }, - startBlock: { - type: 'number', - demandOption: true, - describe: 'Block number to start processing at' - }, - endBlock: { - type: 'number', - demandOption: true, - describe: 'Block number to stop processing at' - }, - prefetch: { - type: 'boolean', - default: false, - describe: 'Block and events prefetch mode' - }, - batchBlocks: { - type: 'number', - default: 10, - describe: 'Number of blocks prefetched in batch' - }, - state: { - type: 'boolean', - default: false, - describe: 'Fill state for subgraph entities' - } - }).argv; + const fillCmd = new FillCmd(); + await fillCmd.init(Database, Indexer, EventWatcher, {}, ENTITY_QUERY_TYPE_MAP, ENTITY_TO_LATEST_ENTITY_MAP); - const config: Config = await getConfig(argv.configFile); - const { ethClient, ethProvider } = await initClients(config); + const indexer = fillCmd.indexer as Indexer; + assert(indexer); - const db = new Database(config.database); - await db.init(); + // Get contractEntitiesMap required for fill-state + const contractEntitiesMap = getContractEntitiesMap(indexer.graphWatcher.dataSources); - const graphDb = new GraphDatabase(config.server, db.baseDatabase, ENTITY_TO_LATEST_ENTITY_MAP); - await graphDb.init(); - - const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server); - - const jobQueueConfig = config.jobQueue; - assert(jobQueueConfig, 'Missing job queue config'); - - const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; - assert(dbConnectionString, 'Missing job queue db connection string'); - - const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); - await jobQueue.start(); - - const indexer = new Indexer(config.server, db, { ethClient }, ethProvider, jobQueue, graphWatcher); - await indexer.init(); - - graphWatcher.setIndexer(indexer); - await graphWatcher.init(); - - if (argv.state) { - assert(config.server.enableState, 'State creation disabled'); - await fillState(indexer, graphDb, graphWatcher.dataSources, argv); - - return; - } - - // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. - // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries - const pubsub = new PubSub(); - - const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue); - - await fillBlocks(jobQueue, indexer, eventWatcher, jobQueueConfig.blockDelayInMilliSecs, argv); + await fillCmd.exec(contractEntitiesMap); }; main().catch(err => { diff --git a/packages/graph-test-watcher/src/indexer.ts b/packages/graph-test-watcher/src/indexer.ts index 194d8088..1f39d271 100644 --- a/packages/graph-test-watcher/src/indexer.ts +++ b/packages/graph-test-watcher/src/indexer.ts @@ -7,7 +7,6 @@ import debug from 'debug'; import { DeepPartial, FindConditions, FindManyOptions } from 'typeorm'; import JSONbig from 'json-bigint'; import { ethers } from 'ethers'; -import _ from 'lodash'; import { SelectionNode } from 'graphql'; import { JsonFragment } from '@ethersproject/abi'; @@ -31,7 +30,7 @@ import { DatabaseInterface, Clients } from '@cerc-io/util'; -import { GraphWatcher } from '@cerc-io/graph-node'; +import { GraphWatcher, updateSubgraphState, dumpSubgraphState } from '@cerc-io/graph-node'; import { Database, ENTITIES, SUBGRAPH_ENTITIES } from './database'; import { Contract } from './entity/Contract'; @@ -114,6 +113,10 @@ export class Indexer implements IndexerInterface { return this._storageLayoutMap; } + get graphWatcher (): GraphWatcher { + return this._graphWatcher; + } + async init (): Promise { await this._baseIndexer.fetchContracts(); await this._baseIndexer.fetchStateStatus(); @@ -198,6 +201,10 @@ export class Indexer implements IndexerInterface { ); } + async getEntitiesForBlock (blockHash: string, tableName: string): Promise { + return this._db.getEntitiesForBlock(blockHash, tableName); + } + async processInitialState (contractAddress: string, blockHash: string): Promise { // Call initial state hook. return createInitialState(this, contractAddress, blockHash); @@ -554,27 +561,11 @@ export class Indexer implements IndexerInterface { } updateSubgraphState (contractAddress: string, data: any): void { - // Update the subgraph state for a given contract. - const oldData = this._subgraphStateMap.get(contractAddress); - const updatedData = _.merge(oldData, data); - this._subgraphStateMap.set(contractAddress, updatedData); + return updateSubgraphState(this._subgraphStateMap, contractAddress, data); } async dumpSubgraphState (blockHash: string, isStateFinalized = false): Promise { - // Create a diff for each contract in the subgraph state map. - const createDiffPromises = Array.from(this._subgraphStateMap.entries()) - .map(([contractAddress, data]): Promise => { - if (isStateFinalized) { - return this.createDiff(contractAddress, blockHash, data); - } - - return this.createDiffStaged(contractAddress, blockHash, data); - }); - - await Promise.all(createDiffPromises); - - // Reset the subgraph state map. - this._subgraphStateMap.clear(); + return dumpSubgraphState(this, this._subgraphStateMap, blockHash, isStateFinalized); } async resetWatcherToBlock (blockNumber: number): Promise { diff --git a/packages/graph-test-watcher/src/job-runner.ts b/packages/graph-test-watcher/src/job-runner.ts index b2446ef0..84ff73cd 100644 --- a/packages/graph-test-watcher/src/job-runner.ts +++ b/packages/graph-test-watcher/src/job-runner.ts @@ -4,7 +4,8 @@ import debug from 'debug'; -import { JobRunner, JobRunnerCmd } from '@cerc-io/cli'; +import { JobRunnerCmd } from '@cerc-io/cli'; +import { WatcherJobRunner as JobRunner } from '@cerc-io/util'; import { Indexer } from './indexer'; import { Database } from './database'; diff --git a/packages/mobymask-watcher/src/database.ts b/packages/mobymask-watcher/src/database.ts index 26709140..ca233bb4 100644 --- a/packages/mobymask-watcher/src/database.ts +++ b/packages/mobymask-watcher/src/database.ts @@ -314,6 +314,10 @@ export class Database implements DatabaseInterface { return this._baseDatabase.getBlockProgressEntities(repo, where, options); } + async getEntitiesForBlock (blockHash: string, tableName: string): Promise { + return this._baseDatabase.getEntitiesForBlock(blockHash, tableName); + } + async updateBlockProgress (queryRunner: QueryRunner, block: BlockProgress, lastProcessedEventIndex: number): Promise { const repo = queryRunner.manager.getRepository(BlockProgress); diff --git a/packages/mobymask-watcher/src/fill.ts b/packages/mobymask-watcher/src/fill.ts index aaf25f36..75d2c508 100644 --- a/packages/mobymask-watcher/src/fill.ts +++ b/packages/mobymask-watcher/src/fill.ts @@ -2,81 +2,22 @@ // Copyright 2021 Vulcanize, Inc. // -import assert from 'assert'; import 'reflect-metadata'; -import yargs from 'yargs'; -import { hideBin } from 'yargs/helpers'; import debug from 'debug'; -import { PubSub } from 'graphql-subscriptions'; -import { Config, getConfig, fillBlocks, JobQueue, DEFAULT_CONFIG_PATH, initClients } from '@cerc-io/util'; +import { FillCmd } from '@cerc-io/cli'; import { Database } from './database'; import { Indexer } from './indexer'; import { EventWatcher } from './events'; -const log = debug('vulcanize:server'); +const log = debug('vulcanize:fill'); export const main = async (): Promise => { - const argv = await yargs(hideBin(process.argv)).parserConfiguration({ - 'parse-numbers': false - }).env( - 'FILL' - ).options({ - configFile: { - alias: 'f', - type: 'string', - demandOption: true, - describe: 'configuration file path (toml)', - default: DEFAULT_CONFIG_PATH - }, - startBlock: { - type: 'number', - demandOption: true, - describe: 'Block number to start processing at' - }, - endBlock: { - type: 'number', - demandOption: true, - describe: 'Block number to stop processing at' - }, - prefetch: { - type: 'boolean', - default: false, - describe: 'Block and events prefetch mode' - }, - batchBlocks: { - type: 'number', - default: 10, - describe: 'Number of blocks prefetched in batch' - } - }).argv; + const fillCmd = new FillCmd(); + await fillCmd.init(Database, Indexer, EventWatcher); - const config: Config = await getConfig(argv.configFile); - const { ethClient, ethProvider } = await initClients(config); - - const db = new Database(config.database); - await db.init(); - - const jobQueueConfig = config.jobQueue; - assert(jobQueueConfig, 'Missing job queue config'); - - const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; - assert(dbConnectionString, 'Missing job queue db connection string'); - - const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); - await jobQueue.start(); - - const indexer = new Indexer(config.server, db, { ethClient }, ethProvider, jobQueue); - await indexer.init(); - - // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. - // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries - const pubsub = new PubSub(); - - const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue); - - await fillBlocks(jobQueue, indexer, eventWatcher, jobQueueConfig.blockDelayInMilliSecs, argv); + await fillCmd.exec(); }; main().catch(err => { diff --git a/packages/mobymask-watcher/src/indexer.ts b/packages/mobymask-watcher/src/indexer.ts index c105fa2a..05c8fd2f 100644 --- a/packages/mobymask-watcher/src/indexer.ts +++ b/packages/mobymask-watcher/src/indexer.ts @@ -323,6 +323,10 @@ export class Indexer implements IndexerInterface { ); } + async getEntitiesForBlock (blockHash: string, tableName: string): Promise { + return this._db.getEntitiesForBlock(blockHash, tableName); + } + async processInitialState (contractAddress: string, blockHash: string): Promise { // Call initial state hook. return createInitialState(this, contractAddress, blockHash); @@ -373,6 +377,10 @@ export class Indexer implements IndexerInterface { return this._baseIndexer.getStateByCID(cid); } + async getStates (where: FindConditions): Promise { + return this._db.getStates(where); + } + getStateData (state: State): any { return this._baseIndexer.getStateData(state); } diff --git a/packages/mobymask-watcher/src/job-runner.ts b/packages/mobymask-watcher/src/job-runner.ts index b2446ef0..84ff73cd 100644 --- a/packages/mobymask-watcher/src/job-runner.ts +++ b/packages/mobymask-watcher/src/job-runner.ts @@ -4,7 +4,8 @@ import debug from 'debug'; -import { JobRunner, JobRunnerCmd } from '@cerc-io/cli'; +import { JobRunnerCmd } from '@cerc-io/cli'; +import { WatcherJobRunner as JobRunner } from '@cerc-io/util'; import { Indexer } from './indexer'; import { Database } from './database'; diff --git a/packages/util/src/database.ts b/packages/util/src/database.ts index 631cba29..5552600b 100644 --- a/packages/util/src/database.ts +++ b/packages/util/src/database.ts @@ -305,6 +305,18 @@ export class Database { eventCount.inc(knownEvents); } + async getEntitiesForBlock (blockHash: string, tableName: string): Promise { + const repo = this._conn.getRepository(tableName); + + const entities = await repo.find({ + where: { + blockHash + } + }); + + return entities; + } + async getEntities (queryRunner: QueryRunner, entity: new () => Entity, findConditions?: FindManyOptions): Promise { const repo = queryRunner.manager.getRepository(entity); diff --git a/packages/util/src/job-runner.ts b/packages/util/src/job-runner.ts index 913576b1..f6a0af3b 100644 --- a/packages/util/src/job-runner.ts +++ b/packages/util/src/job-runner.ts @@ -14,7 +14,9 @@ import { JOB_KIND_CONTRACT, MAX_REORG_DEPTH, QUEUE_BLOCK_PROCESSING, - QUEUE_EVENT_PROCESSING + QUEUE_EVENT_PROCESSING, + QUEUE_BLOCK_CHECKPOINT, + QUEUE_HOOKS } from './constants'; import { JobQueue } from './job-queue'; import { EventInterface, IndexerInterface } from './types'; @@ -427,3 +429,41 @@ export class JobRunner { this._indexer.updateStateStatusMap(contract.address, {}); } } + +export class WatcherJobRunner { + jobQueue: JobQueue + baseJobRunner: JobRunner + _indexer: IndexerInterface + _jobQueueConfig: JobQueueConfig + + constructor (jobQueueConfig: JobQueueConfig, indexer: IndexerInterface, jobQueue: JobQueue) { + this._jobQueueConfig = jobQueueConfig; + this._indexer = indexer; + this.jobQueue = jobQueue; + this.baseJobRunner = new JobRunner(this._jobQueueConfig, this._indexer, this.jobQueue); + } + + async subscribeBlockProcessingQueue (): Promise { + await this.jobQueue.subscribe(QUEUE_BLOCK_PROCESSING, async (job) => { + await this.baseJobRunner.processBlock(job); + }); + } + + async subscribeEventProcessingQueue (): Promise { + await this.jobQueue.subscribe(QUEUE_EVENT_PROCESSING, async (job) => { + await this.baseJobRunner.processEvent(job); + }); + } + + async subscribeHooksQueue (): Promise { + await this.jobQueue.subscribe(QUEUE_HOOKS, async (job) => { + await this.baseJobRunner.processHooks(job); + }); + } + + async subscribeBlockCheckpointQueue (): Promise { + await this.jobQueue.subscribe(QUEUE_BLOCK_CHECKPOINT, async (job) => { + await this.baseJobRunner.processCheckpoint(job); + }); + } +} diff --git a/packages/util/src/types.ts b/packages/util/src/types.ts index bc6d7466..5ea4f27f 100644 --- a/packages/util/src/types.ts +++ b/packages/util/src/types.ts @@ -86,6 +86,7 @@ export interface IndexerInterface { init (): Promise getBlockProgress (blockHash: string): Promise getBlockProgressEntities (where: FindConditions, options: FindManyOptions): Promise + getEntitiesForBlock (blockHash: string, tableName: string): Promise getEvent (id: string): Promise getSyncStatus (): Promise getStateSyncStatus (): Promise @@ -121,12 +122,17 @@ export interface IndexerInterface { processCanonicalBlock (blockHash: string, blockNumber: number): Promise processCheckpoint (blockHash: string): Promise processCLICheckpoint (contractAddress: string, blockHash?: string): Promise + createDiffStaged (contractAddress: string, blockHash: string, data: any): Promise + createDiff (contractAddress: string, blockHash: string, data: any): Promise createCheckpoint (contractAddress: string, blockHash: string): Promise + createInit? (blockHash: string, blockNumber: number): Promise getStorageValue (storageLayout: StorageLayout, blockHash: string, contractAddress: string, variable: string, ...mappingKeys: MappingKey[]): Promise updateSubgraphState?: (contractAddress: string, data: any) => void + dumpSubgraphState?: (blockHash: string, isStateFinalized?: boolean) => Promise updateStateStatusMap (address: string, stateStatus: StateStatus): void getStateData (state: StateInterface): any getStateByCID (cid: string): Promise + getStates (where: FindConditions): Promise getLatestState (contractAddress: string, kind: StateKind | null, blockNumber?: number): Promise saveOrUpdateState (state: StateInterface): Promise removeStates (blockNumber: number, kind: StateKind): Promise