From 6622d0874e88dd3327c6e6ba0350addb1f36a53d Mon Sep 17 00:00:00 2001 From: nikugogoi Date: Tue, 22 Nov 2022 15:20:44 +0530 Subject: [PATCH] Accommodate GQL optimizations in graph-test-watcher (#249) * Use separate subgraph entities list * Implement changes in graph-test-watcher * Reset latest entity tables and use cache directives in schema GQL --- packages/eden-watcher/src/database.ts | 4 +- .../eden-watcher/src/entity/Subscriber.ts | 6 +- packages/eden-watcher/src/indexer.ts | 6 +- packages/graph-test-watcher/README.md | 8 +- .../environments/local.toml | 13 ++ packages/graph-test-watcher/package.json | 3 +- .../src/cli/checkpoint-cmds/verify.ts | 4 +- .../src/cli/export-state.ts | 4 +- .../src/cli/import-state.ts | 4 +- .../graph-test-watcher/src/cli/index-block.ts | 4 +- .../src/cli/reset-cmds/state.ts | 24 ++++ packages/graph-test-watcher/src/database.ts | 17 ++- .../graph-test-watcher/src/entity/Author.ts | 3 + .../graph-test-watcher/src/entity/Blog.ts | 5 +- .../graph-test-watcher/src/entity/Category.ts | 3 + .../src/entity/Subscriber.ts | 9 +- packages/graph-test-watcher/src/fill-state.ts | 112 ++++++++++++++++++ packages/graph-test-watcher/src/fill.ts | 17 ++- packages/graph-test-watcher/src/indexer.ts | 44 ++++++- packages/graph-test-watcher/src/job-runner.ts | 4 +- packages/graph-test-watcher/src/resolvers.ts | 13 +- packages/graph-test-watcher/src/schema.gql | 17 ++- packages/graph-test-watcher/src/server.ts | 8 +- 23 files changed, 291 insertions(+), 41 deletions(-) create mode 100644 packages/graph-test-watcher/src/cli/reset-cmds/state.ts create mode 100644 packages/graph-test-watcher/src/fill-state.ts diff --git a/packages/eden-watcher/src/database.ts b/packages/eden-watcher/src/database.ts index 02476e4a..e1cc786d 100644 --- a/packages/eden-watcher/src/database.ts +++ b/packages/eden-watcher/src/database.ts @@ -32,8 +32,8 @@ import { Slot } from './entity/Slot'; import { SlotClaim } from './entity/SlotClaim'; import { Staker } from './entity/Staker'; -export const ENTITIES = new Set([Account, Claim, Distribution, Distributor, Epoch, Network, Producer, ProducerEpoch, ProducerRewardCollectorChange, ProducerSet, ProducerSetChange, RewardSchedule, RewardScheduleEntry, Slash, Slot, SlotClaim, Staker]); - +export const SUBGRAPH_ENTITIES = new Set([Account, Claim, Distribution, Distributor, Epoch, Network, Producer, ProducerEpoch, ProducerRewardCollectorChange, ProducerSet, ProducerSetChange, RewardSchedule, RewardScheduleEntry, Slash, Slot, SlotClaim, Staker]); +export const ENTITIES = [...SUBGRAPH_ENTITIES]; export const ENTITY_TO_LATEST_ENTITY_MAP: Map = new Map(); export class Database implements DatabaseInterface { diff --git a/packages/eden-watcher/src/entity/Subscriber.ts b/packages/eden-watcher/src/entity/Subscriber.ts index d9b29c9b..8a685533 100644 --- a/packages/eden-watcher/src/entity/Subscriber.ts +++ b/packages/eden-watcher/src/entity/Subscriber.ts @@ -7,15 +7,15 @@ import { EventSubscriber, EntitySubscriberInterface, InsertEvent, UpdateEvent } import { afterEntityInsertOrUpdate } from '@cerc-io/graph-node'; import { FrothyEntity } from './FrothyEntity'; -import { ENTITIES, ENTITY_TO_LATEST_ENTITY_MAP } from '../database'; +import { ENTITY_TO_LATEST_ENTITY_MAP, SUBGRAPH_ENTITIES } from '../database'; @EventSubscriber() export class EntitySubscriber implements EntitySubscriberInterface { async afterInsert (event: InsertEvent): Promise { - await afterEntityInsertOrUpdate(FrothyEntity, ENTITIES, event, ENTITY_TO_LATEST_ENTITY_MAP); + await afterEntityInsertOrUpdate(FrothyEntity, SUBGRAPH_ENTITIES, event, ENTITY_TO_LATEST_ENTITY_MAP); } async afterUpdate (event: UpdateEvent): Promise { - await afterEntityInsertOrUpdate(FrothyEntity, ENTITIES, event, ENTITY_TO_LATEST_ENTITY_MAP); + await afterEntityInsertOrUpdate(FrothyEntity, SUBGRAPH_ENTITIES, event, ENTITY_TO_LATEST_ENTITY_MAP); } } diff --git a/packages/eden-watcher/src/indexer.ts b/packages/eden-watcher/src/indexer.ts index 3cd74e47..6064ca60 100644 --- a/packages/eden-watcher/src/indexer.ts +++ b/packages/eden-watcher/src/indexer.ts @@ -30,7 +30,7 @@ import { } from '@cerc-io/util'; import { GraphWatcher } from '@cerc-io/graph-node'; -import { Database, ENTITIES } from './database'; +import { Database, ENTITIES, SUBGRAPH_ENTITIES } from './database'; import { Contract } from './entity/Contract'; import { Event } from './entity/Event'; import { SyncStatus } from './entity/SyncStatus'; @@ -486,7 +486,7 @@ export class Indexer implements IndexerInterface { async markBlocksAsPruned (blocks: BlockProgress[]): Promise { await this._baseIndexer.markBlocksAsPruned(blocks); - await this._graphWatcher.pruneEntities(FrothyEntity, blocks, ENTITIES); + await this._graphWatcher.pruneEntities(FrothyEntity, blocks, SUBGRAPH_ENTITIES); } async pruneFrothyEntities (blockNumber: number): Promise { @@ -495,6 +495,8 @@ export class Indexer implements IndexerInterface { async resetLatestEntities (blockNumber: number): Promise { await this._graphWatcher.resetLatestEntities(blockNumber); + + await this.resetLatestEntities(blockNumber); } async updateBlockProgress (block: BlockProgress, lastProcessedEventIndex: number): Promise { diff --git a/packages/graph-test-watcher/README.md b/packages/graph-test-watcher/README.md index eef1f12c..648569f1 100644 --- a/packages/graph-test-watcher/README.md +++ b/packages/graph-test-watcher/README.md @@ -53,7 +53,7 @@ * Generating state: - * Edit the custom hook function `createInitialCheckpoint` (triggered on watch-contract, checkpoint: `true`) in [hooks.ts](./src/hooks.ts) to save an initial checkpoint `State` using the `Indexer` object. + * Edit the custom hook function `createInitialState` (triggered if the watcher passes the start block, checkpoint: `true`) in [hooks.ts](./src/hooks.ts) to save an initial `State` using the `Indexer` object. * Edit the custom hook function `createStateDiff` (triggered on a block) in [hooks.ts](./src/hooks.ts) to save the state in a `diff` `State` using the `Indexer` object. The default state (if exists) is updated. @@ -144,6 +144,12 @@ GQL console: http://localhost:3008/graphql yarn reset job-queue --block-number ``` + * Reset state: + + ```bash + yarn reset state --block-number + ``` + * `block-number`: Block number to which to reset the watcher. * To export and import the watcher state: diff --git a/packages/graph-test-watcher/environments/local.toml b/packages/graph-test-watcher/environments/local.toml index 5397e56a..1ac82113 100644 --- a/packages/graph-test-watcher/environments/local.toml +++ b/packages/graph-test-watcher/environments/local.toml @@ -25,9 +25,22 @@ # Interval in number of blocks at which to clear entities cache. clearEntitiesCacheInterval = 1000 + # GQL cache settings + [server.gqlCache] + enabled = true + + # Max in-memory cache size (in bytes) (default 8 MB) + # maxCacheSize + + # GQL cache-control max-age settings (in seconds) + maxAge = 15 + timeTravelMaxAge = 86400 # 1 day + [metrics] host = "127.0.0.1" port = 9000 + [metrics.gql] + port = 9001 [database] type = "postgres" diff --git a/packages/graph-test-watcher/package.json b/packages/graph-test-watcher/package.json index fecf4b84..b91eafbb 100644 --- a/packages/graph-test-watcher/package.json +++ b/packages/graph-test-watcher/package.json @@ -38,7 +38,6 @@ "homepage": "https://github.com/cerc-io/watcher-ts#readme", "dependencies": { "@apollo/client": "^3.3.19", - "@cerc-io/cache": "^0.2.13", "@cerc-io/graph-node": "^0.2.13", "@cerc-io/ipld-eth-client": "^0.2.13", "@cerc-io/solidity-mapper": "^0.2.13", @@ -56,13 +55,13 @@ "graphql-subscriptions": "^2.0.0", "json-bigint": "^1.0.0", "lodash": "^4.17.21", - "multiformats": "^9.4.8", "reflect-metadata": "^0.1.13", "typeorm": "^0.2.32", "yargs": "^17.0.1" }, "devDependencies": { "@ethersproject/abi": "^5.3.0", + "@types/express": "^4.17.14", "@types/yargs": "^17.0.0", "@typescript-eslint/eslint-plugin": "^4.25.0", "@typescript-eslint/parser": "^4.25.0", diff --git a/packages/graph-test-watcher/src/cli/checkpoint-cmds/verify.ts b/packages/graph-test-watcher/src/cli/checkpoint-cmds/verify.ts index 3f85e148..84419586 100644 --- a/packages/graph-test-watcher/src/cli/checkpoint-cmds/verify.ts +++ b/packages/graph-test-watcher/src/cli/checkpoint-cmds/verify.ts @@ -8,7 +8,7 @@ import assert from 'assert'; import { getConfig, initClients, JobQueue, Config, verifyCheckpointData } from '@cerc-io/util'; import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node'; -import { Database } from '../../database'; +import { Database, ENTITY_TO_LATEST_ENTITY_MAP } from '../../database'; import { Indexer } from '../../indexer'; const log = debug('vulcanize:checkpoint-verify'); @@ -33,7 +33,7 @@ export const handler = async (argv: any): Promise => { const db = new Database(config.database); await db.init(); - const graphDb = new GraphDatabase(config.server, db.baseDatabase); + const graphDb = new GraphDatabase(config.server, db.baseDatabase, ENTITY_TO_LATEST_ENTITY_MAP); await graphDb.init(); const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server); diff --git a/packages/graph-test-watcher/src/cli/export-state.ts b/packages/graph-test-watcher/src/cli/export-state.ts index b36b7e8f..33f7ff27 100644 --- a/packages/graph-test-watcher/src/cli/export-state.ts +++ b/packages/graph-test-watcher/src/cli/export-state.ts @@ -13,7 +13,7 @@ import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients, JobQueue, StateKin import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node'; import * as codec from '@ipld/dag-cbor'; -import { Database } from '../database'; +import { Database, ENTITY_TO_LATEST_ENTITY_MAP } from '../database'; import { Indexer } from '../indexer'; const log = debug('vulcanize:export-state'); @@ -47,7 +47,7 @@ const main = async (): Promise => { const db = new Database(config.database); await db.init(); - const graphDb = new GraphDatabase(config.server, db.baseDatabase); + const graphDb = new GraphDatabase(config.server, db.baseDatabase, ENTITY_TO_LATEST_ENTITY_MAP); await graphDb.init(); const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server); diff --git a/packages/graph-test-watcher/src/cli/import-state.ts b/packages/graph-test-watcher/src/cli/import-state.ts index 9620b466..1182abb2 100644 --- a/packages/graph-test-watcher/src/cli/import-state.ts +++ b/packages/graph-test-watcher/src/cli/import-state.ts @@ -15,7 +15,7 @@ import { getConfig, fillBlocks, JobQueue, DEFAULT_CONFIG_PATH, Config, initClien import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node'; import * as codec from '@ipld/dag-cbor'; -import { Database } from '../database'; +import { Database, ENTITY_TO_LATEST_ENTITY_MAP } from '../database'; import { Indexer } from '../indexer'; import { EventWatcher } from '../events'; import { State } from '../entity/State'; @@ -47,7 +47,7 @@ export const main = async (): Promise => { const db = new Database(config.database); await db.init(); - const graphDb = new GraphDatabase(config.server, db.baseDatabase); + const graphDb = new GraphDatabase(config.server, db.baseDatabase, ENTITY_TO_LATEST_ENTITY_MAP); await graphDb.init(); const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server); diff --git a/packages/graph-test-watcher/src/cli/index-block.ts b/packages/graph-test-watcher/src/cli/index-block.ts index f2d8ca94..9c54bbc8 100644 --- a/packages/graph-test-watcher/src/cli/index-block.ts +++ b/packages/graph-test-watcher/src/cli/index-block.ts @@ -10,7 +10,7 @@ import assert from 'assert'; import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients, JobQueue, indexBlock } from '@cerc-io/util'; import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node'; -import { Database } from '../database'; +import { Database, ENTITY_TO_LATEST_ENTITY_MAP } from '../database'; import { Indexer } from '../indexer'; const log = debug('vulcanize:index-block'); @@ -41,7 +41,7 @@ const main = async (): Promise => { const db = new Database(config.database); await db.init(); - const graphDb = new GraphDatabase(config.server, db.baseDatabase); + const graphDb = new GraphDatabase(config.server, db.baseDatabase, ENTITY_TO_LATEST_ENTITY_MAP); await graphDb.init(); const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server); diff --git a/packages/graph-test-watcher/src/cli/reset-cmds/state.ts b/packages/graph-test-watcher/src/cli/reset-cmds/state.ts new file mode 100644 index 00000000..33211d6e --- /dev/null +++ b/packages/graph-test-watcher/src/cli/reset-cmds/state.ts @@ -0,0 +1,24 @@ +// +// Copyright 2022 Vulcanize, Inc. +// + +import { ResetStateCmd } from '@cerc-io/cli'; + +import { Database } from '../../database'; + +export const command = 'state'; + +export const desc = 'Reset State to a given block number'; + +export const builder = { + blockNumber: { + type: 'number' + } +}; + +export const handler = async (argv: any): Promise => { + const resetStateCmd = new ResetStateCmd(); + await resetStateCmd.init(argv, Database); + + await resetStateCmd.exec(); +}; diff --git a/packages/graph-test-watcher/src/database.ts b/packages/graph-test-watcher/src/database.ts index 9948a2b5..ec7d49a5 100644 --- a/packages/graph-test-watcher/src/database.ts +++ b/packages/graph-test-watcher/src/database.ts @@ -3,7 +3,7 @@ // import assert from 'assert'; -import { Connection, ConnectionOptions, DeepPartial, FindConditions, QueryRunner, FindManyOptions } from 'typeorm'; +import { Connection, ConnectionOptions, DeepPartial, FindConditions, QueryRunner, FindManyOptions, EntityTarget } from 'typeorm'; import path from 'path'; import { Database as BaseDatabase, DatabaseInterface, QueryOptions, StateKind, Where } from '@cerc-io/util'; @@ -21,7 +21,9 @@ import { Author } from './entity/Author'; import { Blog } from './entity/Blog'; import { Category } from './entity/Category'; -export const ENTITIES = new Set([_Test, Author, Blog, Category, GetMethod]); +export const SUBGRAPH_ENTITIES = new Set([Author, Blog, Category]); +export const ENTITIES = [_Test, GetMethod, ...SUBGRAPH_ENTITIES]; +export const ENTITY_TO_LATEST_ENTITY_MAP: Map = new Map(); export class Database implements DatabaseInterface { _config: ConnectionOptions; @@ -34,7 +36,8 @@ export class Database implements DatabaseInterface { this._config = { ...config, - entities: [path.join(__dirname, 'entity/*')] + entities: [path.join(__dirname, 'entity/*')], + subscribers: [path.join(__dirname, 'entity/Subscriber.*')] }; this._baseDatabase = new BaseDatabase(this._config); @@ -123,6 +126,12 @@ export class Database implements DatabaseInterface { await this._baseDatabase.removeStates(repo, blockNumber, kind); } + async removeStatesAfterBlock (dbTx: QueryRunner, blockNumber: number): Promise { + const repo = dbTx.manager.getRepository(State); + + await this._baseDatabase.removeStatesAfterBlock(repo, blockNumber); + } + async getStateSyncStatus (): Promise { const repo = this._conn.getRepository(StateSyncStatus); @@ -262,7 +271,7 @@ export class Database implements DatabaseInterface { return this._baseDatabase.removeEntities(queryRunner, entity, findConditions); } - async deleteEntitiesByConditions (queryRunner: QueryRunner, entity: new () => Entity, findConditions: FindConditions): Promise { + async deleteEntitiesByConditions (queryRunner: QueryRunner, entity: EntityTarget, findConditions: FindConditions): Promise { await this._baseDatabase.deleteEntitiesByConditions(queryRunner, entity, findConditions); } diff --git a/packages/graph-test-watcher/src/entity/Author.ts b/packages/graph-test-watcher/src/entity/Author.ts index aa19feb1..832bc3f6 100644 --- a/packages/graph-test-watcher/src/entity/Author.ts +++ b/packages/graph-test-watcher/src/entity/Author.ts @@ -36,4 +36,7 @@ export class Author { @Column('numeric', { default: 0, transformer: decimalTransformer }) rating!: Decimal; + + @Column('boolean', { default: false }) + isPruned!: boolean; } diff --git a/packages/graph-test-watcher/src/entity/Blog.ts b/packages/graph-test-watcher/src/entity/Blog.ts index 6d7ff5c0..6dd508de 100644 --- a/packages/graph-test-watcher/src/entity/Blog.ts +++ b/packages/graph-test-watcher/src/entity/Blog.ts @@ -40,5 +40,8 @@ export class Blog { author!: string; @Column('varchar', { array: true }) - categories!: string[] + categories!: string[]; + + @Column('boolean', { default: false }) + isPruned!: boolean; } diff --git a/packages/graph-test-watcher/src/entity/Category.ts b/packages/graph-test-watcher/src/entity/Category.ts index f6a39ebb..4a685fb5 100644 --- a/packages/graph-test-watcher/src/entity/Category.ts +++ b/packages/graph-test-watcher/src/entity/Category.ts @@ -23,4 +23,7 @@ export class Category { @Column('varchar') name!: string; + + @Column('boolean', { default: false }) + isPruned!: boolean; } diff --git a/packages/graph-test-watcher/src/entity/Subscriber.ts b/packages/graph-test-watcher/src/entity/Subscriber.ts index fc1127d5..8a685533 100644 --- a/packages/graph-test-watcher/src/entity/Subscriber.ts +++ b/packages/graph-test-watcher/src/entity/Subscriber.ts @@ -4,17 +4,18 @@ import { EventSubscriber, EntitySubscriberInterface, InsertEvent, UpdateEvent } from 'typeorm'; -import { FrothyEntity } from './FrothyEntity'; -import { ENTITIES } from '../database'; import { afterEntityInsertOrUpdate } from '@cerc-io/graph-node'; +import { FrothyEntity } from './FrothyEntity'; +import { ENTITY_TO_LATEST_ENTITY_MAP, SUBGRAPH_ENTITIES } from '../database'; + @EventSubscriber() export class EntitySubscriber implements EntitySubscriberInterface { async afterInsert (event: InsertEvent): Promise { - await afterEntityInsertOrUpdate(FrothyEntity, ENTITIES, event); + await afterEntityInsertOrUpdate(FrothyEntity, SUBGRAPH_ENTITIES, event, ENTITY_TO_LATEST_ENTITY_MAP); } async afterUpdate (event: UpdateEvent): Promise { - await afterEntityInsertOrUpdate(FrothyEntity, ENTITIES, event); + await afterEntityInsertOrUpdate(FrothyEntity, SUBGRAPH_ENTITIES, event, ENTITY_TO_LATEST_ENTITY_MAP); } } diff --git a/packages/graph-test-watcher/src/fill-state.ts b/packages/graph-test-watcher/src/fill-state.ts new file mode 100644 index 00000000..e1994321 --- /dev/null +++ b/packages/graph-test-watcher/src/fill-state.ts @@ -0,0 +1,112 @@ +// +// Copyright 2022 Vulcanize, Inc. +// + +import 'reflect-metadata'; +import debug from 'debug'; +import { Between } from 'typeorm'; + +import { Database as GraphDatabase, prepareEntityState } from '@cerc-io/graph-node'; + +import { Indexer } from './indexer'; + +const log = debug('vulcanize:fill-state'); + +export const fillState = async ( + indexer: Indexer, + graphDb: GraphDatabase, + dataSources: any[], + argv: { + startBlock: number, + endBlock: number + } +): Promise => { + const { startBlock, endBlock } = argv; + if (startBlock > endBlock) { + log('endBlock should be greater than or equal to startBlock'); + process.exit(1); + } + + // NOTE: Assuming all blocks in the given range are in the pruned region + log(`Filling state for subgraph entities in range: [${startBlock}, ${endBlock}]`); + + // Check that there are no existing diffs in this range + const existingStates = await indexer.getStates({ block: { blockNumber: Between(startBlock, endBlock) } }); + if (existingStates.length > 0) { + log('found existing state(s) in the given range'); + process.exit(1); + } + + // Map: contractAddress -> entities updated + const contractEntitiesMap: Map = new Map(); + + // Populate contractEntitiesMap using data sources from subgraph + // NOTE: Assuming each entity type is only mapped to a single contract + // This is true for eden subgraph; may not be the case for other subgraphs + dataSources.forEach((dataSource: any) => { + const { source: { address: contractAddress }, mapping: { entities } } = dataSource; + contractEntitiesMap.set(contractAddress, entities as string[]); + }); + + console.time('time:fill-state'); + + // Fill state for blocks in the given range + for (let blockNumber = startBlock; blockNumber <= endBlock; blockNumber++) { + console.time(`time:fill-state-${blockNumber}`); + + // Get the canonical block hash at current height + const blocks = await indexer.getBlocksAtHeight(blockNumber, false); + + if (blocks.length === 0) { + log(`block not found at height ${blockNumber}`); + process.exit(1); + } else if (blocks.length > 1) { + log(`found more than one non-pruned block at height ${blockNumber}`); + process.exit(1); + } + + const blockHash = blocks[0].blockHash; + + // Create initial state for contracts + await indexer.createInit(blockHash, blockNumber); + + // Fill state for each contract in contractEntitiesMap + const contractStatePromises = Array.from(contractEntitiesMap.entries()) + .map(async ([contractAddress, entities]): Promise => { + // Get all the updated entities at this block + const updatedEntitiesListPromises = entities.map(async (entity): Promise => { + return graphDb.getEntitiesForBlock(blockHash, entity); + }); + const updatedEntitiesList = await Promise.all(updatedEntitiesListPromises); + + // Populate state with all the updated entities of each entity type + updatedEntitiesList.forEach((updatedEntities, index) => { + const entityName = entities[index]; + + updatedEntities.forEach((updatedEntity) => { + // Prepare diff data for the entity update + const diffData = prepareEntityState(updatedEntity, entityName, indexer.getRelationsMap()); + + // Update the in-memory subgraph state + indexer.updateSubgraphState(contractAddress, diffData); + }); + }); + }); + + await Promise.all(contractStatePromises); + + // Persist subgraph state to the DB + await indexer.dumpSubgraphState(blockHash, true); + await indexer.updateStateSyncStatusIndexedBlock(blockNumber); + + // Create checkpoints + await indexer.processCheckpoint(blockHash); + await indexer.updateStateSyncStatusCheckpointBlock(blockNumber); + + console.timeEnd(`time:fill-state-${blockNumber}`); + } + + console.timeEnd('time:fill-state'); + + log(`Filled state for subgraph entities in range: [${startBlock}, ${endBlock}]`); +}; diff --git a/packages/graph-test-watcher/src/fill.ts b/packages/graph-test-watcher/src/fill.ts index 754b0c75..09b25ff6 100644 --- a/packages/graph-test-watcher/src/fill.ts +++ b/packages/graph-test-watcher/src/fill.ts @@ -12,9 +12,10 @@ import { PubSub } from 'graphql-subscriptions'; import { Config, getConfig, fillBlocks, JobQueue, DEFAULT_CONFIG_PATH, initClients } from '@cerc-io/util'; import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node'; -import { Database } from './database'; +import { Database, ENTITY_TO_LATEST_ENTITY_MAP } from './database'; import { Indexer } from './indexer'; import { EventWatcher } from './events'; +import { fillState } from './fill-state'; const log = debug('vulcanize:server'); @@ -50,6 +51,11 @@ export const main = async (): Promise => { type: 'number', default: 10, describe: 'Number of blocks prefetched in batch' + }, + state: { + type: 'boolean', + default: false, + describe: 'Fill state for subgraph entities' } }).argv; @@ -59,7 +65,7 @@ export const main = async (): Promise => { const db = new Database(config.database); await db.init(); - const graphDb = new GraphDatabase(config.server, db.baseDatabase); + const graphDb = new GraphDatabase(config.server, db.baseDatabase, ENTITY_TO_LATEST_ENTITY_MAP); await graphDb.init(); const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server); @@ -79,6 +85,13 @@ export const main = async (): Promise => { graphWatcher.setIndexer(indexer); await graphWatcher.init(); + if (argv.state) { + assert(config.server.enableState, 'State creation disabled'); + await fillState(indexer, graphDb, graphWatcher.dataSources, argv); + + return; + } + // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries const pubsub = new PubSub(); diff --git a/packages/graph-test-watcher/src/indexer.ts b/packages/graph-test-watcher/src/indexer.ts index 333f2479..dc632a74 100644 --- a/packages/graph-test-watcher/src/indexer.ts +++ b/packages/graph-test-watcher/src/indexer.ts @@ -33,7 +33,7 @@ import { } from '@cerc-io/util'; import { GraphWatcher } from '@cerc-io/graph-node'; -import { Database, ENTITIES } from './database'; +import { Database, ENTITIES, SUBGRAPH_ENTITIES } from './database'; import { Contract } from './entity/Contract'; import { Event } from './entity/Event'; import { SyncStatus } from './entity/SyncStatus'; @@ -209,8 +209,10 @@ export class Indexer implements IndexerInterface { } async processCanonicalBlock (blockHash: string, blockNumber: number): Promise { + console.time('time:indexer#processCanonicalBlock-finalize_auto_diffs'); // Finalize staged diff blocks if any. await this._baseIndexer.finalizeDiffStaged(blockHash); + console.timeEnd('time:indexer#processCanonicalBlock-finalize_auto_diffs'); // Call custom stateDiff hook. await createStateDiff(this, blockHash); @@ -223,7 +225,9 @@ export class Indexer implements IndexerInterface { const checkpointInterval = this._serverConfig.checkpointInterval; if (checkpointInterval <= 0) return; + console.time('time:indexer#processCheckpoint-checkpoint'); await this._baseIndexer.processCheckpoint(this, blockHash, checkpointInterval); + console.timeEnd('time:indexer#processCheckpoint-checkpoint'); } async processCLICheckpoint (contractAddress: string, blockHash?: string): Promise { @@ -256,7 +260,9 @@ export class Indexer implements IndexerInterface { // Method used to create auto diffs (diff_staged). async createDiffStaged (contractAddress: string, blockHash: string, data: any): Promise { + console.time('time:indexer#createDiffStaged-auto_diff'); await this._baseIndexer.createDiffStaged(contractAddress, blockHash, data); + console.timeEnd('time:indexer#createDiffStaged-auto_diff'); } // Method to be used by createStateDiff hook. @@ -275,7 +281,7 @@ export class Indexer implements IndexerInterface { return this._baseIndexer.createStateCheckpoint(contractAddress, block, data); } - // Method to be used by checkpoint CLI. + // Method to be used by export-state CLI. async createCheckpoint (contractAddress: string, blockHash: string): Promise { const block = await this.getBlockProgress(blockHash); assert(block); @@ -283,6 +289,12 @@ export class Indexer implements IndexerInterface { return this._baseIndexer.createCheckpoint(this, contractAddress, block); } + // Method to be used by fill-state CLI. + async createInit (blockHash: string, blockNumber: number): Promise { + // Create initial state for contracts. + await this._baseIndexer.createInit(this, blockHash, blockNumber); + } + async saveOrUpdateState (state: State): Promise { return this._baseIndexer.saveOrUpdateState(state); } @@ -302,11 +314,23 @@ export class Indexer implements IndexerInterface { return data; } + async getSubgraphEntities ( + entity: new () => Entity, + block: BlockHeight, + where: { [key: string]: any } = {}, + queryOptions: QueryOptions = {}, + selections: ReadonlyArray = [] + ): Promise { + return this._graphWatcher.getEntities(entity, this._relationsMap, block, where, queryOptions, selections); + } + async triggerIndexingOnEvent (event: Event): Promise { const resultEvent = this.getResultEvent(event); + console.time('time:indexer#processEvent-mapping_code'); // Call subgraph handler for event. await this._graphWatcher.handleEvent(resultEvent); + console.timeEnd('time:indexer#processEvent-mapping_code'); // Call custom hook function for indexing on event. await handleEvent(this, resultEvent); @@ -318,18 +342,24 @@ export class Indexer implements IndexerInterface { } async processBlock (blockProgress: BlockProgress): Promise { + console.time('time:indexer#processBlock-init_state'); // Call a function to create initial state for contracts. await this._baseIndexer.createInit(this, blockProgress.blockHash, blockProgress.blockNumber); + console.timeEnd('time:indexer#processBlock-init_state'); this._graphWatcher.updateEntityCacheFrothyBlocks(blockProgress); } async processBlockAfterEvents (blockHash: string, blockNumber: number): Promise { + console.time('time:indexer#processBlockAfterEvents-mapping_code'); // Call subgraph handler for block. await this._graphWatcher.handleBlock(blockHash, blockNumber); + console.timeEnd('time:indexer#processBlockAfterEvents-mapping_code'); + console.time('time:indexer#processBlockAfterEvents-dump_subgraph_state'); // Persist subgraph state to the DB. await this.dumpSubgraphState(blockHash); + console.timeEnd('time:indexer#processBlockAfterEvents-dump_subgraph_state'); } parseEventNameAndArgs (kind: string, logObj: any): any { @@ -489,13 +519,19 @@ export class Indexer implements IndexerInterface { } async markBlocksAsPruned (blocks: BlockProgress[]): Promise { - return this._baseIndexer.markBlocksAsPruned(blocks); + await this._baseIndexer.markBlocksAsPruned(blocks); + + await this._graphWatcher.pruneEntities(FrothyEntity, blocks, SUBGRAPH_ENTITIES); } async pruneFrothyEntities (blockNumber: number): Promise { await this._graphWatcher.pruneFrothyEntities(FrothyEntity, blockNumber); } + async resetLatestEntities (blockNumber: number): Promise { + await this._graphWatcher.resetLatestEntities(blockNumber); + } + async updateBlockProgress (block: BlockProgress, lastProcessedEventIndex: number): Promise { return this._baseIndexer.updateBlockProgress(block, lastProcessedEventIndex); } @@ -539,6 +575,8 @@ export class Indexer implements IndexerInterface { async resetWatcherToBlock (blockNumber: number): Promise { const entities = [...ENTITIES, FrothyEntity]; await this._baseIndexer.resetWatcherToBlock(blockNumber, entities); + + await this.resetLatestEntities(blockNumber); } _populateEntityTypesMap (): void { diff --git a/packages/graph-test-watcher/src/job-runner.ts b/packages/graph-test-watcher/src/job-runner.ts index 67f5338f..52fccec6 100644 --- a/packages/graph-test-watcher/src/job-runner.ts +++ b/packages/graph-test-watcher/src/job-runner.ts @@ -25,7 +25,7 @@ import { import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node'; import { Indexer } from './indexer'; -import { Database } from './database'; +import { Database, ENTITY_TO_LATEST_ENTITY_MAP } from './database'; const log = debug('vulcanize:job-runner'); @@ -94,7 +94,7 @@ export const main = async (): Promise => { const db = new Database(config.database); await db.init(); - const graphDb = new GraphDatabase(config.server, db.baseDatabase); + const graphDb = new GraphDatabase(config.server, db.baseDatabase, ENTITY_TO_LATEST_ENTITY_MAP); await graphDb.init(); const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server); diff --git a/packages/graph-test-watcher/src/resolvers.ts b/packages/graph-test-watcher/src/resolvers.ts index 4ea13311..403b7a62 100644 --- a/packages/graph-test-watcher/src/resolvers.ts +++ b/packages/graph-test-watcher/src/resolvers.ts @@ -8,7 +8,7 @@ import debug from 'debug'; import Decimal from 'decimal.js'; import { GraphQLResolveInfo, GraphQLScalarType } from 'graphql'; -import { ValueResult, BlockHeight, gqlTotalQueryCount, gqlQueryCount, jsonBigIntStringReplacer, getResultState } from '@cerc-io/util'; +import { ValueResult, BlockHeight, gqlTotalQueryCount, gqlQueryCount, jsonBigIntStringReplacer, getResultState, setGQLCacheHints } from '@cerc-io/util'; import { Indexer } from './indexer'; import { EventWatcher } from './events'; @@ -22,6 +22,8 @@ const log = debug('vulcanize:resolver'); export const createResolvers = async (indexer: Indexer, eventWatcher: EventWatcher): Promise => { assert(indexer); + const gqlCacheConfig = indexer.serverConfig.gqlCache; + return { BigInt: new BigInt('bigInt'), @@ -89,6 +91,9 @@ export const createResolvers = async (indexer: Indexer, eventWatcher: EventWatch gqlQueryCount.labels('blog').inc(1); assert(info.fieldNodes[0].selectionSet); + // Set cache-control hints + setGQLCacheHints(info, block, gqlCacheConfig); + return indexer.getSubgraphEntity(Blog, id, block, info.fieldNodes[0].selectionSet.selections); }, @@ -103,6 +108,9 @@ export const createResolvers = async (indexer: Indexer, eventWatcher: EventWatch gqlQueryCount.labels('category').inc(1); assert(info.fieldNodes[0].selectionSet); + // Set cache-control hints + setGQLCacheHints(info, block, gqlCacheConfig); + return indexer.getSubgraphEntity(Category, id, block, info.fieldNodes[0].selectionSet.selections); }, @@ -117,6 +125,9 @@ export const createResolvers = async (indexer: Indexer, eventWatcher: EventWatch gqlQueryCount.labels('author').inc(1); assert(info.fieldNodes[0].selectionSet); + // Set cache-control hints + setGQLCacheHints(info, block, gqlCacheConfig); + return indexer.getSubgraphEntity(Author, id, block, info.fieldNodes[0].selectionSet.selections); }, diff --git a/packages/graph-test-watcher/src/schema.gql b/packages/graph-test-watcher/src/schema.gql index 337bd259..6cdb1581 100644 --- a/packages/graph-test-watcher/src/schema.gql +++ b/packages/graph-test-watcher/src/schema.gql @@ -1,3 +1,14 @@ +enum CacheControlScope { + PUBLIC + PRIVATE +} + +directive @cacheControl( + maxAge: Int + scope: CacheControlScope + inheritMaxAge: Boolean +) on FIELD_DEFINITION | OBJECT | INTERFACE | UNION + scalar BigInt scalar Bytes @@ -103,8 +114,8 @@ type Blog { kind: BlogKind! isActive: Boolean! reviews: [BigInt!]! - author: Author! - categories: [Category!]! + author: Author! @cacheControl(inheritMaxAge: true) + categories: [Category!]! @cacheControl(inheritMaxAge: true) } type Category { @@ -121,7 +132,7 @@ type Author { paramInt: Int! paramBigInt: BigInt! paramBytes: Bytes! - blogs: [Blog!]! + blogs: [Blog!]! @cacheControl(inheritMaxAge: true) } type Mutation { diff --git a/packages/graph-test-watcher/src/server.ts b/packages/graph-test-watcher/src/server.ts index bd1fb4f9..29c81a9f 100644 --- a/packages/graph-test-watcher/src/server.ts +++ b/packages/graph-test-watcher/src/server.ts @@ -13,12 +13,12 @@ import { hideBin } from 'yargs/helpers'; import debug from 'debug'; import 'graphql-import-node'; -import { DEFAULT_CONFIG_PATH, getConfig, Config, JobQueue, KIND_ACTIVE, initClients, createAndStartServer } from '@cerc-io/util'; +import { DEFAULT_CONFIG_PATH, getConfig, Config, JobQueue, KIND_ACTIVE, initClients, startGQLMetricsServer, createAndStartServer } from '@cerc-io/util'; import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node'; import { createResolvers } from './resolvers'; import { Indexer } from './indexer'; -import { Database } from './database'; +import { Database, ENTITY_TO_LATEST_ENTITY_MAP } from './database'; import { EventWatcher } from './events'; const log = debug('vulcanize:server'); @@ -42,7 +42,7 @@ export const main = async (): Promise => { const db = new Database(config.database); await db.init(); - const graphDb = new GraphDatabase(config.server, db.baseDatabase); + const graphDb = new GraphDatabase(config.server, db.baseDatabase, ENTITY_TO_LATEST_ENTITY_MAP); await graphDb.init(); const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server); @@ -81,6 +81,8 @@ export const main = async (): Promise => { const app: Application = express(); const server = createAndStartServer(app, typeDefs, resolvers, config.server); + startGQLMetricsServer(config); + return { app, server }; };