Accommodate GQL optimizations in graph-test-watcher (#249)

* Use separate subgraph entities list

* Implement changes in graph-test-watcher

* Reset latest entity tables and use cache directives in schema GQL
This commit is contained in:
nikugogoi 2022-11-22 15:20:44 +05:30 committed by GitHub
parent 570640d4bc
commit 6622d0874e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 291 additions and 41 deletions

View File

@ -32,8 +32,8 @@ import { Slot } from './entity/Slot';
import { SlotClaim } from './entity/SlotClaim'; import { SlotClaim } from './entity/SlotClaim';
import { Staker } from './entity/Staker'; import { Staker } from './entity/Staker';
export const ENTITIES = new Set([Account, Claim, Distribution, Distributor, Epoch, Network, Producer, ProducerEpoch, ProducerRewardCollectorChange, ProducerSet, ProducerSetChange, RewardSchedule, RewardScheduleEntry, Slash, Slot, SlotClaim, Staker]); export const SUBGRAPH_ENTITIES = new Set([Account, Claim, Distribution, Distributor, Epoch, Network, Producer, ProducerEpoch, ProducerRewardCollectorChange, ProducerSet, ProducerSetChange, RewardSchedule, RewardScheduleEntry, Slash, Slot, SlotClaim, Staker]);
export const ENTITIES = [...SUBGRAPH_ENTITIES];
export const ENTITY_TO_LATEST_ENTITY_MAP: Map<any, any> = new Map(); export const ENTITY_TO_LATEST_ENTITY_MAP: Map<any, any> = new Map();
export class Database implements DatabaseInterface { export class Database implements DatabaseInterface {

View File

@ -7,15 +7,15 @@ import { EventSubscriber, EntitySubscriberInterface, InsertEvent, UpdateEvent }
import { afterEntityInsertOrUpdate } from '@cerc-io/graph-node'; import { afterEntityInsertOrUpdate } from '@cerc-io/graph-node';
import { FrothyEntity } from './FrothyEntity'; import { FrothyEntity } from './FrothyEntity';
import { ENTITIES, ENTITY_TO_LATEST_ENTITY_MAP } from '../database'; import { ENTITY_TO_LATEST_ENTITY_MAP, SUBGRAPH_ENTITIES } from '../database';
@EventSubscriber() @EventSubscriber()
export class EntitySubscriber implements EntitySubscriberInterface { export class EntitySubscriber implements EntitySubscriberInterface {
async afterInsert (event: InsertEvent<any>): Promise<void> { async afterInsert (event: InsertEvent<any>): Promise<void> {
await afterEntityInsertOrUpdate(FrothyEntity, ENTITIES, event, ENTITY_TO_LATEST_ENTITY_MAP); await afterEntityInsertOrUpdate(FrothyEntity, SUBGRAPH_ENTITIES, event, ENTITY_TO_LATEST_ENTITY_MAP);
} }
async afterUpdate (event: UpdateEvent<any>): Promise<void> { async afterUpdate (event: UpdateEvent<any>): Promise<void> {
await afterEntityInsertOrUpdate(FrothyEntity, ENTITIES, event, ENTITY_TO_LATEST_ENTITY_MAP); await afterEntityInsertOrUpdate(FrothyEntity, SUBGRAPH_ENTITIES, event, ENTITY_TO_LATEST_ENTITY_MAP);
} }
} }

View File

@ -30,7 +30,7 @@ import {
} from '@cerc-io/util'; } from '@cerc-io/util';
import { GraphWatcher } from '@cerc-io/graph-node'; import { GraphWatcher } from '@cerc-io/graph-node';
import { Database, ENTITIES } from './database'; import { Database, ENTITIES, SUBGRAPH_ENTITIES } from './database';
import { Contract } from './entity/Contract'; import { Contract } from './entity/Contract';
import { Event } from './entity/Event'; import { Event } from './entity/Event';
import { SyncStatus } from './entity/SyncStatus'; import { SyncStatus } from './entity/SyncStatus';
@ -486,7 +486,7 @@ export class Indexer implements IndexerInterface {
async markBlocksAsPruned (blocks: BlockProgress[]): Promise<void> { async markBlocksAsPruned (blocks: BlockProgress[]): Promise<void> {
await this._baseIndexer.markBlocksAsPruned(blocks); await this._baseIndexer.markBlocksAsPruned(blocks);
await this._graphWatcher.pruneEntities(FrothyEntity, blocks, ENTITIES); await this._graphWatcher.pruneEntities(FrothyEntity, blocks, SUBGRAPH_ENTITIES);
} }
async pruneFrothyEntities (blockNumber: number): Promise<void> { async pruneFrothyEntities (blockNumber: number): Promise<void> {
@ -495,6 +495,8 @@ export class Indexer implements IndexerInterface {
async resetLatestEntities (blockNumber: number): Promise<void> { async resetLatestEntities (blockNumber: number): Promise<void> {
await this._graphWatcher.resetLatestEntities(blockNumber); await this._graphWatcher.resetLatestEntities(blockNumber);
await this.resetLatestEntities(blockNumber);
} }
async updateBlockProgress (block: BlockProgress, lastProcessedEventIndex: number): Promise<BlockProgress> { async updateBlockProgress (block: BlockProgress, lastProcessedEventIndex: number): Promise<BlockProgress> {

View File

@ -53,7 +53,7 @@
* Generating state: * Generating state:
* Edit the custom hook function `createInitialCheckpoint` (triggered on watch-contract, checkpoint: `true`) in [hooks.ts](./src/hooks.ts) to save an initial checkpoint `State` using the `Indexer` object. * Edit the custom hook function `createInitialState` (triggered if the watcher passes the start block, checkpoint: `true`) in [hooks.ts](./src/hooks.ts) to save an initial `State` using the `Indexer` object.
* Edit the custom hook function `createStateDiff` (triggered on a block) in [hooks.ts](./src/hooks.ts) to save the state in a `diff` `State` using the `Indexer` object. The default state (if exists) is updated. * Edit the custom hook function `createStateDiff` (triggered on a block) in [hooks.ts](./src/hooks.ts) to save the state in a `diff` `State` using the `Indexer` object. The default state (if exists) is updated.
@ -144,6 +144,12 @@ GQL console: http://localhost:3008/graphql
yarn reset job-queue --block-number <previous-block-number> yarn reset job-queue --block-number <previous-block-number>
``` ```
* Reset state:
```bash
yarn reset state --block-number <previous-block-number>
```
* `block-number`: Block number to which to reset the watcher. * `block-number`: Block number to which to reset the watcher.
* To export and import the watcher state: * To export and import the watcher state:

View File

@ -25,9 +25,22 @@
# Interval in number of blocks at which to clear entities cache. # Interval in number of blocks at which to clear entities cache.
clearEntitiesCacheInterval = 1000 clearEntitiesCacheInterval = 1000
# GQL cache settings
[server.gqlCache]
enabled = true
# Max in-memory cache size (in bytes) (default 8 MB)
# maxCacheSize
# GQL cache-control max-age settings (in seconds)
maxAge = 15
timeTravelMaxAge = 86400 # 1 day
[metrics] [metrics]
host = "127.0.0.1" host = "127.0.0.1"
port = 9000 port = 9000
[metrics.gql]
port = 9001
[database] [database]
type = "postgres" type = "postgres"

View File

@ -38,7 +38,6 @@
"homepage": "https://github.com/cerc-io/watcher-ts#readme", "homepage": "https://github.com/cerc-io/watcher-ts#readme",
"dependencies": { "dependencies": {
"@apollo/client": "^3.3.19", "@apollo/client": "^3.3.19",
"@cerc-io/cache": "^0.2.13",
"@cerc-io/graph-node": "^0.2.13", "@cerc-io/graph-node": "^0.2.13",
"@cerc-io/ipld-eth-client": "^0.2.13", "@cerc-io/ipld-eth-client": "^0.2.13",
"@cerc-io/solidity-mapper": "^0.2.13", "@cerc-io/solidity-mapper": "^0.2.13",
@ -56,13 +55,13 @@
"graphql-subscriptions": "^2.0.0", "graphql-subscriptions": "^2.0.0",
"json-bigint": "^1.0.0", "json-bigint": "^1.0.0",
"lodash": "^4.17.21", "lodash": "^4.17.21",
"multiformats": "^9.4.8",
"reflect-metadata": "^0.1.13", "reflect-metadata": "^0.1.13",
"typeorm": "^0.2.32", "typeorm": "^0.2.32",
"yargs": "^17.0.1" "yargs": "^17.0.1"
}, },
"devDependencies": { "devDependencies": {
"@ethersproject/abi": "^5.3.0", "@ethersproject/abi": "^5.3.0",
"@types/express": "^4.17.14",
"@types/yargs": "^17.0.0", "@types/yargs": "^17.0.0",
"@typescript-eslint/eslint-plugin": "^4.25.0", "@typescript-eslint/eslint-plugin": "^4.25.0",
"@typescript-eslint/parser": "^4.25.0", "@typescript-eslint/parser": "^4.25.0",

View File

@ -8,7 +8,7 @@ import assert from 'assert';
import { getConfig, initClients, JobQueue, Config, verifyCheckpointData } from '@cerc-io/util'; import { getConfig, initClients, JobQueue, Config, verifyCheckpointData } from '@cerc-io/util';
import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node'; import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node';
import { Database } from '../../database'; import { Database, ENTITY_TO_LATEST_ENTITY_MAP } from '../../database';
import { Indexer } from '../../indexer'; import { Indexer } from '../../indexer';
const log = debug('vulcanize:checkpoint-verify'); const log = debug('vulcanize:checkpoint-verify');
@ -33,7 +33,7 @@ export const handler = async (argv: any): Promise<void> => {
const db = new Database(config.database); const db = new Database(config.database);
await db.init(); await db.init();
const graphDb = new GraphDatabase(config.server, db.baseDatabase); const graphDb = new GraphDatabase(config.server, db.baseDatabase, ENTITY_TO_LATEST_ENTITY_MAP);
await graphDb.init(); await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server); const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);

View File

@ -13,7 +13,7 @@ import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients, JobQueue, StateKin
import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node'; import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node';
import * as codec from '@ipld/dag-cbor'; import * as codec from '@ipld/dag-cbor';
import { Database } from '../database'; import { Database, ENTITY_TO_LATEST_ENTITY_MAP } from '../database';
import { Indexer } from '../indexer'; import { Indexer } from '../indexer';
const log = debug('vulcanize:export-state'); const log = debug('vulcanize:export-state');
@ -47,7 +47,7 @@ const main = async (): Promise<void> => {
const db = new Database(config.database); const db = new Database(config.database);
await db.init(); await db.init();
const graphDb = new GraphDatabase(config.server, db.baseDatabase); const graphDb = new GraphDatabase(config.server, db.baseDatabase, ENTITY_TO_LATEST_ENTITY_MAP);
await graphDb.init(); await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server); const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);

View File

@ -15,7 +15,7 @@ import { getConfig, fillBlocks, JobQueue, DEFAULT_CONFIG_PATH, Config, initClien
import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node'; import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node';
import * as codec from '@ipld/dag-cbor'; import * as codec from '@ipld/dag-cbor';
import { Database } from '../database'; import { Database, ENTITY_TO_LATEST_ENTITY_MAP } from '../database';
import { Indexer } from '../indexer'; import { Indexer } from '../indexer';
import { EventWatcher } from '../events'; import { EventWatcher } from '../events';
import { State } from '../entity/State'; import { State } from '../entity/State';
@ -47,7 +47,7 @@ export const main = async (): Promise<any> => {
const db = new Database(config.database); const db = new Database(config.database);
await db.init(); await db.init();
const graphDb = new GraphDatabase(config.server, db.baseDatabase); const graphDb = new GraphDatabase(config.server, db.baseDatabase, ENTITY_TO_LATEST_ENTITY_MAP);
await graphDb.init(); await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server); const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);

View File

@ -10,7 +10,7 @@ import assert from 'assert';
import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients, JobQueue, indexBlock } from '@cerc-io/util'; import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients, JobQueue, indexBlock } from '@cerc-io/util';
import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node'; import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node';
import { Database } from '../database'; import { Database, ENTITY_TO_LATEST_ENTITY_MAP } from '../database';
import { Indexer } from '../indexer'; import { Indexer } from '../indexer';
const log = debug('vulcanize:index-block'); const log = debug('vulcanize:index-block');
@ -41,7 +41,7 @@ const main = async (): Promise<void> => {
const db = new Database(config.database); const db = new Database(config.database);
await db.init(); await db.init();
const graphDb = new GraphDatabase(config.server, db.baseDatabase); const graphDb = new GraphDatabase(config.server, db.baseDatabase, ENTITY_TO_LATEST_ENTITY_MAP);
await graphDb.init(); await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server); const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);

View File

@ -0,0 +1,24 @@
//
// Copyright 2022 Vulcanize, Inc.
//
import { ResetStateCmd } from '@cerc-io/cli';
import { Database } from '../../database';
export const command = 'state';
export const desc = 'Reset State to a given block number';
export const builder = {
blockNumber: {
type: 'number'
}
};
export const handler = async (argv: any): Promise<void> => {
const resetStateCmd = new ResetStateCmd();
await resetStateCmd.init(argv, Database);
await resetStateCmd.exec();
};

View File

@ -3,7 +3,7 @@
// //
import assert from 'assert'; import assert from 'assert';
import { Connection, ConnectionOptions, DeepPartial, FindConditions, QueryRunner, FindManyOptions } from 'typeorm'; import { Connection, ConnectionOptions, DeepPartial, FindConditions, QueryRunner, FindManyOptions, EntityTarget } from 'typeorm';
import path from 'path'; import path from 'path';
import { Database as BaseDatabase, DatabaseInterface, QueryOptions, StateKind, Where } from '@cerc-io/util'; import { Database as BaseDatabase, DatabaseInterface, QueryOptions, StateKind, Where } from '@cerc-io/util';
@ -21,7 +21,9 @@ import { Author } from './entity/Author';
import { Blog } from './entity/Blog'; import { Blog } from './entity/Blog';
import { Category } from './entity/Category'; import { Category } from './entity/Category';
export const ENTITIES = new Set([_Test, Author, Blog, Category, GetMethod]); export const SUBGRAPH_ENTITIES = new Set([Author, Blog, Category]);
export const ENTITIES = [_Test, GetMethod, ...SUBGRAPH_ENTITIES];
export const ENTITY_TO_LATEST_ENTITY_MAP: Map<any, any> = new Map();
export class Database implements DatabaseInterface { export class Database implements DatabaseInterface {
_config: ConnectionOptions; _config: ConnectionOptions;
@ -34,7 +36,8 @@ export class Database implements DatabaseInterface {
this._config = { this._config = {
...config, ...config,
entities: [path.join(__dirname, 'entity/*')] entities: [path.join(__dirname, 'entity/*')],
subscribers: [path.join(__dirname, 'entity/Subscriber.*')]
}; };
this._baseDatabase = new BaseDatabase(this._config); this._baseDatabase = new BaseDatabase(this._config);
@ -123,6 +126,12 @@ export class Database implements DatabaseInterface {
await this._baseDatabase.removeStates(repo, blockNumber, kind); await this._baseDatabase.removeStates(repo, blockNumber, kind);
} }
async removeStatesAfterBlock (dbTx: QueryRunner, blockNumber: number): Promise<void> {
const repo = dbTx.manager.getRepository(State);
await this._baseDatabase.removeStatesAfterBlock(repo, blockNumber);
}
async getStateSyncStatus (): Promise<StateSyncStatus | undefined> { async getStateSyncStatus (): Promise<StateSyncStatus | undefined> {
const repo = this._conn.getRepository(StateSyncStatus); const repo = this._conn.getRepository(StateSyncStatus);
@ -262,7 +271,7 @@ export class Database implements DatabaseInterface {
return this._baseDatabase.removeEntities(queryRunner, entity, findConditions); return this._baseDatabase.removeEntities(queryRunner, entity, findConditions);
} }
async deleteEntitiesByConditions<Entity> (queryRunner: QueryRunner, entity: new () => Entity, findConditions: FindConditions<Entity>): Promise<void> { async deleteEntitiesByConditions<Entity> (queryRunner: QueryRunner, entity: EntityTarget<Entity>, findConditions: FindConditions<Entity>): Promise<void> {
await this._baseDatabase.deleteEntitiesByConditions(queryRunner, entity, findConditions); await this._baseDatabase.deleteEntitiesByConditions(queryRunner, entity, findConditions);
} }

View File

@ -36,4 +36,7 @@ export class Author {
@Column('numeric', { default: 0, transformer: decimalTransformer }) @Column('numeric', { default: 0, transformer: decimalTransformer })
rating!: Decimal; rating!: Decimal;
@Column('boolean', { default: false })
isPruned!: boolean;
} }

View File

@ -40,5 +40,8 @@ export class Blog {
author!: string; author!: string;
@Column('varchar', { array: true }) @Column('varchar', { array: true })
categories!: string[] categories!: string[];
@Column('boolean', { default: false })
isPruned!: boolean;
} }

View File

@ -23,4 +23,7 @@ export class Category {
@Column('varchar') @Column('varchar')
name!: string; name!: string;
@Column('boolean', { default: false })
isPruned!: boolean;
} }

View File

@ -4,17 +4,18 @@
import { EventSubscriber, EntitySubscriberInterface, InsertEvent, UpdateEvent } from 'typeorm'; import { EventSubscriber, EntitySubscriberInterface, InsertEvent, UpdateEvent } from 'typeorm';
import { FrothyEntity } from './FrothyEntity';
import { ENTITIES } from '../database';
import { afterEntityInsertOrUpdate } from '@cerc-io/graph-node'; import { afterEntityInsertOrUpdate } from '@cerc-io/graph-node';
import { FrothyEntity } from './FrothyEntity';
import { ENTITY_TO_LATEST_ENTITY_MAP, SUBGRAPH_ENTITIES } from '../database';
@EventSubscriber() @EventSubscriber()
export class EntitySubscriber implements EntitySubscriberInterface { export class EntitySubscriber implements EntitySubscriberInterface {
async afterInsert (event: InsertEvent<any>): Promise<void> { async afterInsert (event: InsertEvent<any>): Promise<void> {
await afterEntityInsertOrUpdate(FrothyEntity, ENTITIES, event); await afterEntityInsertOrUpdate(FrothyEntity, SUBGRAPH_ENTITIES, event, ENTITY_TO_LATEST_ENTITY_MAP);
} }
async afterUpdate (event: UpdateEvent<any>): Promise<void> { async afterUpdate (event: UpdateEvent<any>): Promise<void> {
await afterEntityInsertOrUpdate(FrothyEntity, ENTITIES, event); await afterEntityInsertOrUpdate(FrothyEntity, SUBGRAPH_ENTITIES, event, ENTITY_TO_LATEST_ENTITY_MAP);
} }
} }

View File

@ -0,0 +1,112 @@
//
// Copyright 2022 Vulcanize, Inc.
//
import 'reflect-metadata';
import debug from 'debug';
import { Between } from 'typeorm';
import { Database as GraphDatabase, prepareEntityState } from '@cerc-io/graph-node';
import { Indexer } from './indexer';
const log = debug('vulcanize:fill-state');
export const fillState = async (
indexer: Indexer,
graphDb: GraphDatabase,
dataSources: any[],
argv: {
startBlock: number,
endBlock: number
}
): Promise<void> => {
const { startBlock, endBlock } = argv;
if (startBlock > endBlock) {
log('endBlock should be greater than or equal to startBlock');
process.exit(1);
}
// NOTE: Assuming all blocks in the given range are in the pruned region
log(`Filling state for subgraph entities in range: [${startBlock}, ${endBlock}]`);
// Check that there are no existing diffs in this range
const existingStates = await indexer.getStates({ block: { blockNumber: Between(startBlock, endBlock) } });
if (existingStates.length > 0) {
log('found existing state(s) in the given range');
process.exit(1);
}
// Map: contractAddress -> entities updated
const contractEntitiesMap: Map<string, string[]> = new Map();
// Populate contractEntitiesMap using data sources from subgraph
// NOTE: Assuming each entity type is only mapped to a single contract
// This is true for eden subgraph; may not be the case for other subgraphs
dataSources.forEach((dataSource: any) => {
const { source: { address: contractAddress }, mapping: { entities } } = dataSource;
contractEntitiesMap.set(contractAddress, entities as string[]);
});
console.time('time:fill-state');
// Fill state for blocks in the given range
for (let blockNumber = startBlock; blockNumber <= endBlock; blockNumber++) {
console.time(`time:fill-state-${blockNumber}`);
// Get the canonical block hash at current height
const blocks = await indexer.getBlocksAtHeight(blockNumber, false);
if (blocks.length === 0) {
log(`block not found at height ${blockNumber}`);
process.exit(1);
} else if (blocks.length > 1) {
log(`found more than one non-pruned block at height ${blockNumber}`);
process.exit(1);
}
const blockHash = blocks[0].blockHash;
// Create initial state for contracts
await indexer.createInit(blockHash, blockNumber);
// Fill state for each contract in contractEntitiesMap
const contractStatePromises = Array.from(contractEntitiesMap.entries())
.map(async ([contractAddress, entities]): Promise<void> => {
// Get all the updated entities at this block
const updatedEntitiesListPromises = entities.map(async (entity): Promise<any[]> => {
return graphDb.getEntitiesForBlock(blockHash, entity);
});
const updatedEntitiesList = await Promise.all(updatedEntitiesListPromises);
// Populate state with all the updated entities of each entity type
updatedEntitiesList.forEach((updatedEntities, index) => {
const entityName = entities[index];
updatedEntities.forEach((updatedEntity) => {
// Prepare diff data for the entity update
const diffData = prepareEntityState(updatedEntity, entityName, indexer.getRelationsMap());
// Update the in-memory subgraph state
indexer.updateSubgraphState(contractAddress, diffData);
});
});
});
await Promise.all(contractStatePromises);
// Persist subgraph state to the DB
await indexer.dumpSubgraphState(blockHash, true);
await indexer.updateStateSyncStatusIndexedBlock(blockNumber);
// Create checkpoints
await indexer.processCheckpoint(blockHash);
await indexer.updateStateSyncStatusCheckpointBlock(blockNumber);
console.timeEnd(`time:fill-state-${blockNumber}`);
}
console.timeEnd('time:fill-state');
log(`Filled state for subgraph entities in range: [${startBlock}, ${endBlock}]`);
};

View File

@ -12,9 +12,10 @@ import { PubSub } from 'graphql-subscriptions';
import { Config, getConfig, fillBlocks, JobQueue, DEFAULT_CONFIG_PATH, initClients } from '@cerc-io/util'; import { Config, getConfig, fillBlocks, JobQueue, DEFAULT_CONFIG_PATH, initClients } from '@cerc-io/util';
import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node'; import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node';
import { Database } from './database'; import { Database, ENTITY_TO_LATEST_ENTITY_MAP } from './database';
import { Indexer } from './indexer'; import { Indexer } from './indexer';
import { EventWatcher } from './events'; import { EventWatcher } from './events';
import { fillState } from './fill-state';
const log = debug('vulcanize:server'); const log = debug('vulcanize:server');
@ -50,6 +51,11 @@ export const main = async (): Promise<any> => {
type: 'number', type: 'number',
default: 10, default: 10,
describe: 'Number of blocks prefetched in batch' describe: 'Number of blocks prefetched in batch'
},
state: {
type: 'boolean',
default: false,
describe: 'Fill state for subgraph entities'
} }
}).argv; }).argv;
@ -59,7 +65,7 @@ export const main = async (): Promise<any> => {
const db = new Database(config.database); const db = new Database(config.database);
await db.init(); await db.init();
const graphDb = new GraphDatabase(config.server, db.baseDatabase); const graphDb = new GraphDatabase(config.server, db.baseDatabase, ENTITY_TO_LATEST_ENTITY_MAP);
await graphDb.init(); await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server); const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);
@ -79,6 +85,13 @@ export const main = async (): Promise<any> => {
graphWatcher.setIndexer(indexer); graphWatcher.setIndexer(indexer);
await graphWatcher.init(); await graphWatcher.init();
if (argv.state) {
assert(config.server.enableState, 'State creation disabled');
await fillState(indexer, graphDb, graphWatcher.dataSources, argv);
return;
}
// Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
const pubsub = new PubSub(); const pubsub = new PubSub();

View File

@ -33,7 +33,7 @@ import {
} from '@cerc-io/util'; } from '@cerc-io/util';
import { GraphWatcher } from '@cerc-io/graph-node'; import { GraphWatcher } from '@cerc-io/graph-node';
import { Database, ENTITIES } from './database'; import { Database, ENTITIES, SUBGRAPH_ENTITIES } from './database';
import { Contract } from './entity/Contract'; import { Contract } from './entity/Contract';
import { Event } from './entity/Event'; import { Event } from './entity/Event';
import { SyncStatus } from './entity/SyncStatus'; import { SyncStatus } from './entity/SyncStatus';
@ -209,8 +209,10 @@ export class Indexer implements IndexerInterface {
} }
async processCanonicalBlock (blockHash: string, blockNumber: number): Promise<void> { async processCanonicalBlock (blockHash: string, blockNumber: number): Promise<void> {
console.time('time:indexer#processCanonicalBlock-finalize_auto_diffs');
// Finalize staged diff blocks if any. // Finalize staged diff blocks if any.
await this._baseIndexer.finalizeDiffStaged(blockHash); await this._baseIndexer.finalizeDiffStaged(blockHash);
console.timeEnd('time:indexer#processCanonicalBlock-finalize_auto_diffs');
// Call custom stateDiff hook. // Call custom stateDiff hook.
await createStateDiff(this, blockHash); await createStateDiff(this, blockHash);
@ -223,7 +225,9 @@ export class Indexer implements IndexerInterface {
const checkpointInterval = this._serverConfig.checkpointInterval; const checkpointInterval = this._serverConfig.checkpointInterval;
if (checkpointInterval <= 0) return; if (checkpointInterval <= 0) return;
console.time('time:indexer#processCheckpoint-checkpoint');
await this._baseIndexer.processCheckpoint(this, blockHash, checkpointInterval); await this._baseIndexer.processCheckpoint(this, blockHash, checkpointInterval);
console.timeEnd('time:indexer#processCheckpoint-checkpoint');
} }
async processCLICheckpoint (contractAddress: string, blockHash?: string): Promise<string | undefined> { async processCLICheckpoint (contractAddress: string, blockHash?: string): Promise<string | undefined> {
@ -256,7 +260,9 @@ export class Indexer implements IndexerInterface {
// Method used to create auto diffs (diff_staged). // Method used to create auto diffs (diff_staged).
async createDiffStaged (contractAddress: string, blockHash: string, data: any): Promise<void> { async createDiffStaged (contractAddress: string, blockHash: string, data: any): Promise<void> {
console.time('time:indexer#createDiffStaged-auto_diff');
await this._baseIndexer.createDiffStaged(contractAddress, blockHash, data); await this._baseIndexer.createDiffStaged(contractAddress, blockHash, data);
console.timeEnd('time:indexer#createDiffStaged-auto_diff');
} }
// Method to be used by createStateDiff hook. // Method to be used by createStateDiff hook.
@ -275,7 +281,7 @@ export class Indexer implements IndexerInterface {
return this._baseIndexer.createStateCheckpoint(contractAddress, block, data); return this._baseIndexer.createStateCheckpoint(contractAddress, block, data);
} }
// Method to be used by checkpoint CLI. // Method to be used by export-state CLI.
async createCheckpoint (contractAddress: string, blockHash: string): Promise<string | undefined> { async createCheckpoint (contractAddress: string, blockHash: string): Promise<string | undefined> {
const block = await this.getBlockProgress(blockHash); const block = await this.getBlockProgress(blockHash);
assert(block); assert(block);
@ -283,6 +289,12 @@ export class Indexer implements IndexerInterface {
return this._baseIndexer.createCheckpoint(this, contractAddress, block); return this._baseIndexer.createCheckpoint(this, contractAddress, block);
} }
// Method to be used by fill-state CLI.
async createInit (blockHash: string, blockNumber: number): Promise<void> {
// Create initial state for contracts.
await this._baseIndexer.createInit(this, blockHash, blockNumber);
}
async saveOrUpdateState (state: State): Promise<State> { async saveOrUpdateState (state: State): Promise<State> {
return this._baseIndexer.saveOrUpdateState(state); return this._baseIndexer.saveOrUpdateState(state);
} }
@ -302,11 +314,23 @@ export class Indexer implements IndexerInterface {
return data; return data;
} }
async getSubgraphEntities<Entity> (
entity: new () => Entity,
block: BlockHeight,
where: { [key: string]: any } = {},
queryOptions: QueryOptions = {},
selections: ReadonlyArray<SelectionNode> = []
): Promise<any[]> {
return this._graphWatcher.getEntities(entity, this._relationsMap, block, where, queryOptions, selections);
}
async triggerIndexingOnEvent (event: Event): Promise<void> { async triggerIndexingOnEvent (event: Event): Promise<void> {
const resultEvent = this.getResultEvent(event); const resultEvent = this.getResultEvent(event);
console.time('time:indexer#processEvent-mapping_code');
// Call subgraph handler for event. // Call subgraph handler for event.
await this._graphWatcher.handleEvent(resultEvent); await this._graphWatcher.handleEvent(resultEvent);
console.timeEnd('time:indexer#processEvent-mapping_code');
// Call custom hook function for indexing on event. // Call custom hook function for indexing on event.
await handleEvent(this, resultEvent); await handleEvent(this, resultEvent);
@ -318,18 +342,24 @@ export class Indexer implements IndexerInterface {
} }
async processBlock (blockProgress: BlockProgress): Promise<void> { async processBlock (blockProgress: BlockProgress): Promise<void> {
console.time('time:indexer#processBlock-init_state');
// Call a function to create initial state for contracts. // Call a function to create initial state for contracts.
await this._baseIndexer.createInit(this, blockProgress.blockHash, blockProgress.blockNumber); await this._baseIndexer.createInit(this, blockProgress.blockHash, blockProgress.blockNumber);
console.timeEnd('time:indexer#processBlock-init_state');
this._graphWatcher.updateEntityCacheFrothyBlocks(blockProgress); this._graphWatcher.updateEntityCacheFrothyBlocks(blockProgress);
} }
async processBlockAfterEvents (blockHash: string, blockNumber: number): Promise<void> { async processBlockAfterEvents (blockHash: string, blockNumber: number): Promise<void> {
console.time('time:indexer#processBlockAfterEvents-mapping_code');
// Call subgraph handler for block. // Call subgraph handler for block.
await this._graphWatcher.handleBlock(blockHash, blockNumber); await this._graphWatcher.handleBlock(blockHash, blockNumber);
console.timeEnd('time:indexer#processBlockAfterEvents-mapping_code');
console.time('time:indexer#processBlockAfterEvents-dump_subgraph_state');
// Persist subgraph state to the DB. // Persist subgraph state to the DB.
await this.dumpSubgraphState(blockHash); await this.dumpSubgraphState(blockHash);
console.timeEnd('time:indexer#processBlockAfterEvents-dump_subgraph_state');
} }
parseEventNameAndArgs (kind: string, logObj: any): any { parseEventNameAndArgs (kind: string, logObj: any): any {
@ -489,13 +519,19 @@ export class Indexer implements IndexerInterface {
} }
async markBlocksAsPruned (blocks: BlockProgress[]): Promise<void> { async markBlocksAsPruned (blocks: BlockProgress[]): Promise<void> {
return this._baseIndexer.markBlocksAsPruned(blocks); await this._baseIndexer.markBlocksAsPruned(blocks);
await this._graphWatcher.pruneEntities(FrothyEntity, blocks, SUBGRAPH_ENTITIES);
} }
async pruneFrothyEntities (blockNumber: number): Promise<void> { async pruneFrothyEntities (blockNumber: number): Promise<void> {
await this._graphWatcher.pruneFrothyEntities(FrothyEntity, blockNumber); await this._graphWatcher.pruneFrothyEntities(FrothyEntity, blockNumber);
} }
async resetLatestEntities (blockNumber: number): Promise<void> {
await this._graphWatcher.resetLatestEntities(blockNumber);
}
async updateBlockProgress (block: BlockProgress, lastProcessedEventIndex: number): Promise<BlockProgress> { async updateBlockProgress (block: BlockProgress, lastProcessedEventIndex: number): Promise<BlockProgress> {
return this._baseIndexer.updateBlockProgress(block, lastProcessedEventIndex); return this._baseIndexer.updateBlockProgress(block, lastProcessedEventIndex);
} }
@ -539,6 +575,8 @@ export class Indexer implements IndexerInterface {
async resetWatcherToBlock (blockNumber: number): Promise<void> { async resetWatcherToBlock (blockNumber: number): Promise<void> {
const entities = [...ENTITIES, FrothyEntity]; const entities = [...ENTITIES, FrothyEntity];
await this._baseIndexer.resetWatcherToBlock(blockNumber, entities); await this._baseIndexer.resetWatcherToBlock(blockNumber, entities);
await this.resetLatestEntities(blockNumber);
} }
_populateEntityTypesMap (): void { _populateEntityTypesMap (): void {

View File

@ -25,7 +25,7 @@ import {
import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node'; import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node';
import { Indexer } from './indexer'; import { Indexer } from './indexer';
import { Database } from './database'; import { Database, ENTITY_TO_LATEST_ENTITY_MAP } from './database';
const log = debug('vulcanize:job-runner'); const log = debug('vulcanize:job-runner');
@ -94,7 +94,7 @@ export const main = async (): Promise<any> => {
const db = new Database(config.database); const db = new Database(config.database);
await db.init(); await db.init();
const graphDb = new GraphDatabase(config.server, db.baseDatabase); const graphDb = new GraphDatabase(config.server, db.baseDatabase, ENTITY_TO_LATEST_ENTITY_MAP);
await graphDb.init(); await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server); const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);

View File

@ -8,7 +8,7 @@ import debug from 'debug';
import Decimal from 'decimal.js'; import Decimal from 'decimal.js';
import { GraphQLResolveInfo, GraphQLScalarType } from 'graphql'; import { GraphQLResolveInfo, GraphQLScalarType } from 'graphql';
import { ValueResult, BlockHeight, gqlTotalQueryCount, gqlQueryCount, jsonBigIntStringReplacer, getResultState } from '@cerc-io/util'; import { ValueResult, BlockHeight, gqlTotalQueryCount, gqlQueryCount, jsonBigIntStringReplacer, getResultState, setGQLCacheHints } from '@cerc-io/util';
import { Indexer } from './indexer'; import { Indexer } from './indexer';
import { EventWatcher } from './events'; import { EventWatcher } from './events';
@ -22,6 +22,8 @@ const log = debug('vulcanize:resolver');
export const createResolvers = async (indexer: Indexer, eventWatcher: EventWatcher): Promise<any> => { export const createResolvers = async (indexer: Indexer, eventWatcher: EventWatcher): Promise<any> => {
assert(indexer); assert(indexer);
const gqlCacheConfig = indexer.serverConfig.gqlCache;
return { return {
BigInt: new BigInt('bigInt'), BigInt: new BigInt('bigInt'),
@ -89,6 +91,9 @@ export const createResolvers = async (indexer: Indexer, eventWatcher: EventWatch
gqlQueryCount.labels('blog').inc(1); gqlQueryCount.labels('blog').inc(1);
assert(info.fieldNodes[0].selectionSet); assert(info.fieldNodes[0].selectionSet);
// Set cache-control hints
setGQLCacheHints(info, block, gqlCacheConfig);
return indexer.getSubgraphEntity(Blog, id, block, info.fieldNodes[0].selectionSet.selections); return indexer.getSubgraphEntity(Blog, id, block, info.fieldNodes[0].selectionSet.selections);
}, },
@ -103,6 +108,9 @@ export const createResolvers = async (indexer: Indexer, eventWatcher: EventWatch
gqlQueryCount.labels('category').inc(1); gqlQueryCount.labels('category').inc(1);
assert(info.fieldNodes[0].selectionSet); assert(info.fieldNodes[0].selectionSet);
// Set cache-control hints
setGQLCacheHints(info, block, gqlCacheConfig);
return indexer.getSubgraphEntity(Category, id, block, info.fieldNodes[0].selectionSet.selections); return indexer.getSubgraphEntity(Category, id, block, info.fieldNodes[0].selectionSet.selections);
}, },
@ -117,6 +125,9 @@ export const createResolvers = async (indexer: Indexer, eventWatcher: EventWatch
gqlQueryCount.labels('author').inc(1); gqlQueryCount.labels('author').inc(1);
assert(info.fieldNodes[0].selectionSet); assert(info.fieldNodes[0].selectionSet);
// Set cache-control hints
setGQLCacheHints(info, block, gqlCacheConfig);
return indexer.getSubgraphEntity(Author, id, block, info.fieldNodes[0].selectionSet.selections); return indexer.getSubgraphEntity(Author, id, block, info.fieldNodes[0].selectionSet.selections);
}, },

View File

@ -1,3 +1,14 @@
enum CacheControlScope {
PUBLIC
PRIVATE
}
directive @cacheControl(
maxAge: Int
scope: CacheControlScope
inheritMaxAge: Boolean
) on FIELD_DEFINITION | OBJECT | INTERFACE | UNION
scalar BigInt scalar BigInt
scalar Bytes scalar Bytes
@ -103,8 +114,8 @@ type Blog {
kind: BlogKind! kind: BlogKind!
isActive: Boolean! isActive: Boolean!
reviews: [BigInt!]! reviews: [BigInt!]!
author: Author! author: Author! @cacheControl(inheritMaxAge: true)
categories: [Category!]! categories: [Category!]! @cacheControl(inheritMaxAge: true)
} }
type Category { type Category {
@ -121,7 +132,7 @@ type Author {
paramInt: Int! paramInt: Int!
paramBigInt: BigInt! paramBigInt: BigInt!
paramBytes: Bytes! paramBytes: Bytes!
blogs: [Blog!]! blogs: [Blog!]! @cacheControl(inheritMaxAge: true)
} }
type Mutation { type Mutation {

View File

@ -13,12 +13,12 @@ import { hideBin } from 'yargs/helpers';
import debug from 'debug'; import debug from 'debug';
import 'graphql-import-node'; import 'graphql-import-node';
import { DEFAULT_CONFIG_PATH, getConfig, Config, JobQueue, KIND_ACTIVE, initClients, createAndStartServer } from '@cerc-io/util'; import { DEFAULT_CONFIG_PATH, getConfig, Config, JobQueue, KIND_ACTIVE, initClients, startGQLMetricsServer, createAndStartServer } from '@cerc-io/util';
import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node'; import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node';
import { createResolvers } from './resolvers'; import { createResolvers } from './resolvers';
import { Indexer } from './indexer'; import { Indexer } from './indexer';
import { Database } from './database'; import { Database, ENTITY_TO_LATEST_ENTITY_MAP } from './database';
import { EventWatcher } from './events'; import { EventWatcher } from './events';
const log = debug('vulcanize:server'); const log = debug('vulcanize:server');
@ -42,7 +42,7 @@ export const main = async (): Promise<any> => {
const db = new Database(config.database); const db = new Database(config.database);
await db.init(); await db.init();
const graphDb = new GraphDatabase(config.server, db.baseDatabase); const graphDb = new GraphDatabase(config.server, db.baseDatabase, ENTITY_TO_LATEST_ENTITY_MAP);
await graphDb.init(); await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server); const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);
@ -81,6 +81,8 @@ export const main = async (): Promise<any> => {
const app: Application = express(); const app: Application = express();
const server = createAndStartServer(app, typeDefs, resolvers, config.server); const server = createAndStartServer(app, typeDefs, resolvers, config.server);
startGQLMetricsServer(config);
return { app, server }; return { app, server };
}; };