Refactor fill and fill-state CLIs to cli package (#257)

* Refactor fill CLI to cli package

* Refactor fill-state method to graph-node

* Refactor fill-state CLI to cli package

* Move subgraph state utils to a separate file

* Refactor subgraph state helper methods to graph-node

* Update mock indexer

* Move watcher job-runner to util

* Remove mock server and data from erc20-watcher

* Import watcher job-runner from util
This commit is contained in:
prathamesh0 2022-11-24 03:58:38 -06:00 committed by GitHub
parent 7717601408
commit aba0c665f3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
42 changed files with 597 additions and 1100 deletions

159
packages/cli/src/fill.ts Normal file
View File

@ -0,0 +1,159 @@
//
// Copyright 2022 Vulcanize, Inc.
//
import assert from 'assert';
import 'reflect-metadata';
import debug from 'debug';
import yargs from 'yargs';
import { hideBin } from 'yargs/helpers';
import { ConnectionOptions } from 'typeorm';
import { PubSub } from 'graphql-subscriptions';
import { JsonRpcProvider } from '@ethersproject/providers';
import { GraphWatcher, fillState } from '@cerc-io/graph-node';
import { EthClient } from '@cerc-io/ipld-eth-client';
import {
DEFAULT_CONFIG_PATH,
JobQueue,
DatabaseInterface,
IndexerInterface,
ServerConfig,
Clients,
EventWatcherInterface,
fillBlocks
} from '@cerc-io/util';
import { BaseCmd } from './base';
const log = debug('vulcanize:fill');
// CLI arguments accepted by the fill command (parsed by yargs in FillCmd._getArgv).
interface Arguments {
  configFile: string; // Configuration file path (toml)
  startBlock: number; // Block number to start processing at (inclusive)
  endBlock: number; // Block number to stop processing at (inclusive)
  prefetch: boolean; // Block and events prefetch mode
  batchBlocks: number; // Number of blocks prefetched in batch
  state: boolean; // Fill state for subgraph entities instead of filling blocks
}
// Reusable fill CLI: parses arguments, wires up the watcher via BaseCmd and
// then either fills blocks/events or fills subgraph-entity state.
export class FillCmd {
  _argv?: Arguments
  _baseCmd: BaseCmd;

  constructor () {
    this._baseCmd = new BaseCmd();
  }

  // Indexer created by init(); undefined before init() completes.
  get indexer (): IndexerInterface | undefined {
    return this._baseCmd.indexer;
  }

  // Parse CLI arguments and load the toml config file they point at.
  async initConfig<ConfigType> (): Promise<ConfigType> {
    const parsedArgs = this._getArgv();
    this._argv = parsedArgs;
    assert(parsedArgs);

    return this._baseCmd.initConfig(parsedArgs.configFile);
  }

  // Construct database, indexer and event watcher from the watcher-specific
  // constructors supplied by the caller.
  async init (
    Database: new (config: ConnectionOptions,
      serverConfig?: ServerConfig
    ) => DatabaseInterface,
    Indexer: new (
      serverConfig: ServerConfig,
      db: DatabaseInterface,
      clients: Clients,
      ethProvider: JsonRpcProvider,
      jobQueue: JobQueue,
      graphWatcher?: GraphWatcher
    ) => IndexerInterface,
    EventWatcher: new(
      ethClient: EthClient,
      indexer: IndexerInterface,
      pubsub: PubSub,
      jobQueue: JobQueue
    ) => EventWatcherInterface,
    clients: { [key: string]: any } = {},
    entityQueryTypeMap?: Map<any, any>,
    entityToLatestEntityMap?: Map<any, any>
  ): Promise<void> {
    await this.initConfig();
    await this._baseCmd.init(Database, Indexer, clients, entityQueryTypeMap, entityToLatestEntityMap);
    await this._baseCmd.initEventWatcher(EventWatcher);
  }

  // Run the fill. With --state, fills subgraph-entity state over the block
  // range; otherwise fills blocks/events via fillBlocks. Closes the DB after.
  async exec (contractEntitiesMap: Map<string, string[]> = new Map()): Promise<void> {
    assert(this._argv);

    const { config, jobQueue, database, indexer, eventWatcher } = this._baseCmd;
    assert(config);
    assert(jobQueue);
    assert(database);
    assert(indexer);
    assert(eventWatcher);

    if (this._argv.state) {
      assert(config.server.enableState, 'State creation disabled');

      const { startBlock, endBlock } = this._argv;
      // NOTE: Assuming all blocks in the given range are in the pruned region
      log(`Filling state for subgraph entities in range: [${startBlock}, ${endBlock}]`);
      await fillState(indexer, contractEntitiesMap, this._argv);
      log(`Filled state for subgraph entities in range: [${startBlock}, ${endBlock}]`);
    } else {
      await fillBlocks(jobQueue, indexer, eventWatcher, config.jobQueue.blockDelayInMilliSecs, this._argv);
    }

    await database.close();
  }

  // Build the yargs parser: automatic number coercion is disabled and
  // FILL_-prefixed environment variables are read as options.
  _getArgv (): any {
    const parser = yargs(hideBin(process.argv))
      .parserConfiguration({ 'parse-numbers': false })
      .env('FILL')
      .options({
        configFile: {
          alias: 'f',
          type: 'string',
          require: true,
          demandOption: true,
          describe: 'Configuration file path (toml)',
          default: DEFAULT_CONFIG_PATH
        },
        startBlock: {
          type: 'number',
          demandOption: true,
          describe: 'Block number to start processing at'
        },
        endBlock: {
          type: 'number',
          demandOption: true,
          describe: 'Block number to stop processing at'
        },
        prefetch: {
          type: 'boolean',
          default: false,
          describe: 'Block and events prefetch mode'
        },
        batchBlocks: {
          type: 'number',
          default: 10,
          describe: 'Number of blocks prefetched in batch'
        },
        state: {
          type: 'boolean',
          default: false,
          describe: 'Fill state for subgraph entities'
        }
      });

    return parser.argv;
  }
}

View File

@ -13,3 +13,4 @@ export * from './export-state';
export * from './server';
export * from './job-runner';
export * from './index-block';
export * from './fill';

View File

@ -17,12 +17,7 @@ import {
IndexerInterface,
ServerConfig,
Clients,
JobRunner as BaseJobRunner,
JobQueueConfig,
QUEUE_BLOCK_PROCESSING,
QUEUE_EVENT_PROCESSING,
QUEUE_BLOCK_CHECKPOINT,
QUEUE_HOOKS,
WatcherJobRunner as JobRunner,
startMetricsServer
} from '@cerc-io/util';
@ -112,41 +107,3 @@ export class JobRunnerCmd {
.argv;
}
}
// Watcher job runner: subscribes each watcher queue on the job queue and
// delegates every job to the base JobRunner from @cerc-io/util.
export class JobRunner {
  jobQueue: JobQueue
  baseJobRunner: BaseJobRunner
  _indexer: IndexerInterface
  _jobQueueConfig: JobQueueConfig

  constructor (jobQueueConfig: JobQueueConfig, indexer: IndexerInterface, jobQueue: JobQueue) {
    this.jobQueue = jobQueue;
    this._indexer = indexer;
    this._jobQueueConfig = jobQueueConfig;
    // Base runner holds the actual per-job processing logic.
    this.baseJobRunner = new BaseJobRunner(this._jobQueueConfig, this._indexer, this.jobQueue);
  }

  // Subscribe block-processing jobs to the base runner.
  async subscribeBlockProcessingQueue (): Promise<void> {
    await this.jobQueue.subscribe(QUEUE_BLOCK_PROCESSING, (job) => this.baseJobRunner.processBlock(job));
  }

  // Subscribe event-processing jobs to the base runner.
  async subscribeEventProcessingQueue (): Promise<void> {
    await this.jobQueue.subscribe(QUEUE_EVENT_PROCESSING, (job) => this.baseJobRunner.processEvent(job));
  }

  // Subscribe hook jobs to the base runner.
  async subscribeHooksQueue (): Promise<void> {
    await this.jobQueue.subscribe(QUEUE_HOOKS, (job) => this.baseJobRunner.processHooks(job));
  }

  // Subscribe checkpoint jobs to the base runner.
  async subscribeBlockCheckpointQueue (): Promise<void> {
    await this.jobQueue.subscribe(QUEUE_BLOCK_CHECKPOINT, (job) => this.baseJobRunner.processCheckpoint(job));
  }
}

View File

@ -286,6 +286,10 @@ export class Database implements DatabaseInterface {
return this._baseDatabase.getBlockProgressEntities(repo, where, options);
}
async getEntitiesForBlock (blockHash: string, tableName: string): Promise<any[]> {
return this._baseDatabase.getEntitiesForBlock(blockHash, tableName);
}
async updateBlockProgress (queryRunner: QueryRunner, block: BlockProgress, lastProcessedEventIndex: number): Promise<BlockProgress> {
const repo = queryRunner.manager.getRepository(BlockProgress);

View File

@ -7,7 +7,6 @@ import debug from 'debug';
import { DeepPartial, FindConditions, FindManyOptions } from 'typeorm';
import JSONbig from 'json-bigint';
import { ethers } from 'ethers';
import _ from 'lodash';
{{#if (subgraphPath)}}
import { SelectionNode } from 'graphql';
{{/if}}
@ -35,7 +34,7 @@ import {
getResultEvent
} from '@cerc-io/util';
{{#if (subgraphPath)}}
import { GraphWatcher } from '@cerc-io/graph-node';
import { GraphWatcher, updateSubgraphState, dumpSubgraphState } from '@cerc-io/graph-node';
{{/if}}
{{#each contracts as | contract |}}
@ -143,6 +142,12 @@ export class Indexer implements IndexerInterface {
return this._storageLayoutMap;
}
{{#if (subgraphPath)}}
get graphWatcher (): GraphWatcher {
return this._graphWatcher;
}
{{/if}}
async init (): Promise<void> {
await this._baseIndexer.fetchContracts();
await this._baseIndexer.fetchStateStatus();
@ -248,6 +253,10 @@ export class Indexer implements IndexerInterface {
);
}
async getEntitiesForBlock (blockHash: string, tableName: string): Promise<any[]> {
return this._db.getEntitiesForBlock(blockHash, tableName);
}
async processInitialState (contractAddress: string, blockHash: string): Promise<any> {
// Call initial state hook.
return createInitialState(this, contractAddress, blockHash);
@ -341,12 +350,14 @@ export class Indexer implements IndexerInterface {
return this._baseIndexer.createCheckpoint(this, contractAddress, block);
}
{{#if (subgraphPath)}}
// Method to be used by fill-state CLI.
async createInit (blockHash: string, blockNumber: number): Promise<void> {
// Create initial state for contracts.
await this._baseIndexer.createInit(this, blockHash, blockNumber);
}
{{/if}}
async saveOrUpdateState (state: State): Promise<State> {
return this._baseIndexer.saveOrUpdateState(state);
}
@ -636,27 +647,11 @@ export class Indexer implements IndexerInterface {
}
updateSubgraphState (contractAddress: string, data: any): void {
// Update the subgraph state for a given contract.
const oldData = this._subgraphStateMap.get(contractAddress);
const updatedData = _.merge(oldData, data);
this._subgraphStateMap.set(contractAddress, updatedData);
return updateSubgraphState(this._subgraphStateMap, contractAddress, data);
}
async dumpSubgraphState (blockHash: string, isStateFinalized = false): Promise<void> {
// Create a diff for each contract in the subgraph state map.
const createDiffPromises = Array.from(this._subgraphStateMap.entries())
.map(([contractAddress, data]): Promise<void> => {
if (isStateFinalized) {
return this.createDiff(contractAddress, blockHash, data);
}
return this.createDiffStaged(contractAddress, blockHash, data);
});
await Promise.all(createDiffPromises);
// Reset the subgraph state map.
this._subgraphStateMap.clear();
return dumpSubgraphState(this, this._subgraphStateMap, blockHash, isStateFinalized);
}
_populateEntityTypesMap (): void {

View File

@ -242,6 +242,10 @@ export class Database implements DatabaseInterface {
return this._baseDatabase.getBlockProgressEntities(repo, where, options);
}
async getEntitiesForBlock (blockHash: string, tableName: string): Promise<any[]> {
return this._baseDatabase.getEntitiesForBlock(blockHash, tableName);
}
async updateBlockProgress (queryRunner: QueryRunner, block: BlockProgress, lastProcessedEventIndex: number): Promise<BlockProgress> {
const repo = queryRunner.manager.getRepository(BlockProgress);

View File

@ -1,112 +0,0 @@
//
// Copyright 2022 Vulcanize, Inc.
//
import 'reflect-metadata';
import debug from 'debug';
import { Between } from 'typeorm';
import { Database as GraphDatabase, prepareEntityState } from '@cerc-io/graph-node';
import { Indexer } from './indexer';
const log = debug('vulcanize:fill-state');
/**
 * Fill subgraph-entity state for every canonical block in [startBlock, endBlock].
 *
 * Exits the process (code 1) when the range is inverted, when state already
 * exists in the range, or when a height has zero or multiple non-pruned blocks.
 *
 * @param indexer watcher indexer used to read blocks and persist state
 * @param graphDb graph-node database used to read per-block entity updates
 * @param dataSources subgraph data sources (contract address + mapped entities)
 * @param argv inclusive block range to process
 */
export const fillState = async (
  indexer: Indexer,
  graphDb: GraphDatabase,
  dataSources: any[],
  argv: {
    startBlock: number,
    endBlock: number
  }
): Promise<void> => {
  const { startBlock, endBlock } = argv;
  if (startBlock > endBlock) {
    log('endBlock should be greater than or equal to startBlock');
    process.exit(1);
  }

  // NOTE: Assuming all blocks in the given range are in the pruned region
  log(`Filling state for subgraph entities in range: [${startBlock}, ${endBlock}]`);

  // Check that there are no existing diffs in this range
  const existingStates = await indexer.getStates({ block: { blockNumber: Between(startBlock, endBlock) } });
  if (existingStates.length > 0) {
    log('found existing state(s) in the given range');
    process.exit(1);
  }

  // Map: contractAddress -> entities updated
  const contractEntitiesMap: Map<string, string[]> = new Map();

  // Populate contractEntitiesMap using data sources from subgraph
  // NOTE: Assuming each entity type is only mapped to a single contract
  // This is true for eden subgraph; may not be the case for other subgraphs
  dataSources.forEach((dataSource: any) => {
    const { source: { address: contractAddress }, mapping: { entities } } = dataSource;
    contractEntitiesMap.set(contractAddress, entities as string[]);
  });

  console.time('time:fill-state');

  // Fill state for blocks in the given range
  for (let blockNumber = startBlock; blockNumber <= endBlock; blockNumber++) {
    console.time(`time:fill-state-${blockNumber}`);

    // Get the canonical block hash at current height
    const blocks = await indexer.getBlocksAtHeight(blockNumber, false);
    if (blocks.length === 0) {
      log(`block not found at height ${blockNumber}`);
      process.exit(1);
    } else if (blocks.length > 1) {
      log(`found more than one non-pruned block at height ${blockNumber}`);
      process.exit(1);
    }

    const blockHash = blocks[0].blockHash;

    // Create initial state for contracts
    await indexer.createInit(blockHash, blockNumber);

    // Fill state for each contract in contractEntitiesMap
    const contractStatePromises = Array.from(contractEntitiesMap.entries())
      .map(async ([contractAddress, entities]): Promise<void> => {
        // Get all the updated entities at this block
        const updatedEntitiesListPromises = entities.map(async (entity): Promise<any[]> => {
          return graphDb.getEntitiesForBlock(blockHash, entity);
        });
        const updatedEntitiesList = await Promise.all(updatedEntitiesListPromises);

        // Populate state with all the updated entities of each entity type
        updatedEntitiesList.forEach((updatedEntities, index) => {
          const entityName = entities[index];

          updatedEntities.forEach((updatedEntity) => {
            // Prepare diff data for the entity update
            const diffData = prepareEntityState(updatedEntity, entityName, indexer.getRelationsMap());

            // Update the in-memory subgraph state
            indexer.updateSubgraphState(contractAddress, diffData);
          });
        });
      });

    await Promise.all(contractStatePromises);

    // Persist subgraph state to the DB
    // (isStateFinalized = true: state in this range is assumed pruned/final)
    await indexer.dumpSubgraphState(blockHash, true);
    await indexer.updateStateSyncStatusIndexedBlock(blockNumber);

    // Create checkpoints
    await indexer.processCheckpoint(blockHash);
    await indexer.updateStateSyncStatusCheckpointBlock(blockNumber);

    console.timeEnd(`time:fill-state-${blockNumber}`);
  }

  console.timeEnd('time:fill-state');
  log(`Filled state for subgraph entities in range: [${startBlock}, ${endBlock}]`);
};

View File

@ -4,101 +4,30 @@
import assert from 'assert';
import 'reflect-metadata';
import yargs from 'yargs';
import { hideBin } from 'yargs/helpers';
import debug from 'debug';
import { PubSub } from 'graphql-subscriptions';
import { Config, getConfig, fillBlocks, JobQueue, DEFAULT_CONFIG_PATH, initClients } from '@cerc-io/util';
import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node';
import { FillCmd } from '@cerc-io/cli';
import { getContractEntitiesMap } from '@cerc-io/graph-node';
import { Database, ENTITY_TO_LATEST_ENTITY_MAP } from './database';
import { Database, ENTITY_QUERY_TYPE_MAP, ENTITY_TO_LATEST_ENTITY_MAP } from './database';
import { Indexer } from './indexer';
import { EventWatcher } from './events';
import { fillState } from './fill-state';
const log = debug('vulcanize:server');
const log = debug('vulcanize:fill');
export const main = async (): Promise<any> => {
const argv = await yargs(hideBin(process.argv)).parserConfiguration({
'parse-numbers': false
}).env(
'FILL'
).options({
configFile: {
alias: 'f',
type: 'string',
demandOption: true,
describe: 'configuration file path (toml)',
default: DEFAULT_CONFIG_PATH
},
startBlock: {
type: 'number',
demandOption: true,
describe: 'Block number to start processing at'
},
endBlock: {
type: 'number',
demandOption: true,
describe: 'Block number to stop processing at'
},
prefetch: {
type: 'boolean',
default: false,
describe: 'Block and events prefetch mode'
},
batchBlocks: {
type: 'number',
default: 10,
describe: 'Number of blocks prefetched in batch'
},
state: {
type: 'boolean',
default: false,
describe: 'Fill state for subgraph entities'
}
}).argv;
const fillCmd = new FillCmd();
await fillCmd.init(Database, Indexer, EventWatcher, {}, ENTITY_QUERY_TYPE_MAP, ENTITY_TO_LATEST_ENTITY_MAP);
const config: Config = await getConfig(argv.configFile);
const { ethClient, ethProvider } = await initClients(config);
const indexer = fillCmd.indexer as Indexer;
assert(indexer);
const db = new Database(config.database);
await db.init();
// Get contractEntitiesMap required for fill-state
// NOTE: Assuming each entity type is only mapped to a single contract
// This is true for eden subgraph; may not be the case for other subgraphs
const contractEntitiesMap = getContractEntitiesMap(indexer.graphWatcher.dataSources);
const graphDb = new GraphDatabase(config.server, db.baseDatabase, ENTITY_TO_LATEST_ENTITY_MAP);
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);
const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config');
const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
assert(dbConnectionString, 'Missing job queue db connection string');
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(config.server, db, { ethClient }, ethProvider, jobQueue, graphWatcher);
await indexer.init();
graphWatcher.setIndexer(indexer);
await graphWatcher.init();
if (argv.state) {
assert(config.server.enableState, 'State creation disabled');
await fillState(indexer, graphDb, graphWatcher.dataSources, argv);
return;
}
// Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
const pubsub = new PubSub();
const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue);
await fillBlocks(jobQueue, indexer, eventWatcher, jobQueueConfig.blockDelayInMilliSecs, argv);
await fillCmd.exec(contractEntitiesMap);
};
main().catch(err => {

View File

@ -5,7 +5,6 @@
import assert from 'assert';
import { DeepPartial, FindConditions, FindManyOptions } from 'typeorm';
import { ethers } from 'ethers';
import _ from 'lodash';
import { SelectionNode } from 'graphql';
import { JsonFragment } from '@ethersproject/abi';
@ -28,7 +27,7 @@ import {
DatabaseInterface,
Clients
} from '@cerc-io/util';
import { GraphWatcher } from '@cerc-io/graph-node';
import { GraphWatcher, updateSubgraphState, dumpSubgraphState } from '@cerc-io/graph-node';
import { Database, ENTITIES, SUBGRAPH_ENTITIES } from './database';
import { Contract } from './entity/Contract';
@ -139,6 +138,10 @@ export class Indexer implements IndexerInterface {
return this._storageLayoutMap;
}
get graphWatcher (): GraphWatcher {
return this._graphWatcher;
}
async init (): Promise<void> {
await this._baseIndexer.fetchContracts();
await this._baseIndexer.fetchStateStatus();
@ -158,6 +161,10 @@ export class Indexer implements IndexerInterface {
);
}
async getEntitiesForBlock (blockHash: string, tableName: string): Promise<any[]> {
return this._db.getEntitiesForBlock(blockHash, tableName);
}
async processInitialState (contractAddress: string, blockHash: string): Promise<any> {
// Call initial state hook.
return createInitialState(this, contractAddress, blockHash);
@ -521,27 +528,11 @@ export class Indexer implements IndexerInterface {
}
updateSubgraphState (contractAddress: string, data: any): void {
// Update the subgraph state for a given contract.
const oldData = this._subgraphStateMap.get(contractAddress);
const updatedData = _.merge(oldData, data);
this._subgraphStateMap.set(contractAddress, updatedData);
return updateSubgraphState(this._subgraphStateMap, contractAddress, data);
}
async dumpSubgraphState (blockHash: string, isStateFinalized = false): Promise<void> {
// Create a diff for each contract in the subgraph state map.
const createDiffPromises = Array.from(this._subgraphStateMap.entries())
.map(([contractAddress, data]): Promise<void> => {
if (isStateFinalized) {
return this.createDiff(contractAddress, blockHash, data);
}
return this.createDiffStaged(contractAddress, blockHash, data);
});
await Promise.all(createDiffPromises);
// Reset the subgraph state map.
this._subgraphStateMap.clear();
return dumpSubgraphState(this, this._subgraphStateMap, blockHash, isStateFinalized);
}
async resetWatcherToBlock (blockNumber: number): Promise<void> {

View File

@ -4,7 +4,8 @@
import debug from 'debug';
import { JobRunner, JobRunnerCmd } from '@cerc-io/cli';
import { JobRunnerCmd } from '@cerc-io/cli';
import { WatcherJobRunner as JobRunner } from '@cerc-io/util';
import { Indexer } from './indexer';
import { Database, ENTITY_QUERY_TYPE_MAP, ENTITY_TO_LATEST_ENTITY_MAP } from './database';

View File

@ -189,15 +189,3 @@ $ yarn fill --startBlock 1000 --endBlock 2000
}
```
## Test
To run tests (GQL queries) against the mock server:
```
yarn run server:mock
```
```bash
yarn test
```

View File

@ -6,11 +6,9 @@
"main": "dist/index.js",
"scripts": {
"lint": "eslint .",
"test": "mocha -r ts-node/register src/**/*.test.ts",
"build": "tsc",
"server": "DEBUG=vulcanize:* node --enable-source-maps dist/server.js",
"server:dev": "DEBUG=vulcanize:* nodemon --watch src src/server.ts",
"server:mock": "MOCK=1 nodemon src/server.ts",
"job-runner": "DEBUG=vulcanize:* YARN_CHILD_PROCESS=true node --enable-source-maps dist/job-runner.js",
"job-runner:dev": "DEBUG=vulcanize:* YARN_CHILD_PROCESS=true ts-node src/job-runner.ts",
"watch:contract": "node --enable-source-maps dist/cli/watch-contract.js",

View File

@ -252,6 +252,10 @@ export class Database implements DatabaseInterface {
return this._baseDatabase.getBlockProgressEntities(repo, where, options);
}
async getEntitiesForBlock (blockHash: string, tableName: string): Promise<any[]> {
return this._baseDatabase.getEntitiesForBlock(blockHash, tableName);
}
async saveBlockProgress (queryRunner: QueryRunner, block: DeepPartial<BlockProgress>): Promise<BlockProgress> {
const repo = queryRunner.manager.getRepository(BlockProgress);

View File

@ -2,83 +2,22 @@
// Copyright 2021 Vulcanize, Inc.
//
import assert from 'assert';
import 'reflect-metadata';
import yargs from 'yargs';
import { hideBin } from 'yargs/helpers';
import debug from 'debug';
import { PubSub } from 'graphql-subscriptions';
import { Config, getConfig, fillBlocks, JobQueue, DEFAULT_CONFIG_PATH, initClients } from '@cerc-io/util';
import { FillCmd } from '@cerc-io/cli';
import { Database } from './database';
import { Indexer } from './indexer';
import { EventWatcher } from './events';
const log = debug('vulcanize:server');
const log = debug('vulcanize:fill');
export const main = async (): Promise<any> => {
const argv = await yargs(hideBin(process.argv)).parserConfiguration({
'parse-numbers': false
}).env(
'FILL'
).options({
configFile: {
alias: 'f',
type: 'string',
require: true,
demandOption: true,
describe: 'configuration file path (toml)',
default: DEFAULT_CONFIG_PATH
},
startBlock: {
type: 'number',
require: true,
demandOption: true,
describe: 'Block number to start processing at'
},
endBlock: {
type: 'number',
require: true,
demandOption: true,
describe: 'Block number to stop processing at'
},
prefetch: {
type: 'boolean',
default: false,
describe: 'Block and events prefetch mode'
},
batchBlocks: {
type: 'number',
default: 10,
describe: 'Number of blocks prefetched in batch'
}
}).argv;
const fillCmd = new FillCmd();
await fillCmd.init(Database, Indexer, EventWatcher);
const config: Config = await getConfig(argv.configFile);
const { ethClient, ethProvider } = await initClients(config);
const db = new Database(config.database);
await db.init();
// Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
const pubsub = new PubSub();
const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config');
const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
assert(dbConnectionString, 'Missing job queue db connection string');
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(config.server, db, { ethClient }, ethProvider, jobQueue);
const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue);
await fillBlocks(jobQueue, indexer, eventWatcher, jobQueueConfig.blockDelayInMilliSecs, argv);
await fillCmd.exec();
};
main().catch(err => {

View File

@ -262,6 +262,10 @@ export class Indexer implements IndexerInterface {
);
}
async getEntitiesForBlock (blockHash: string, tableName: string): Promise<any[]> {
return this._db.getEntitiesForBlock(blockHash, tableName);
}
async processCanonicalBlock (blockHash: string, blockNumber: number): Promise<void> {
// TODO Implement
}
@ -295,7 +299,19 @@ export class Indexer implements IndexerInterface {
return undefined;
}
// Method to be used by export-state CLI.
async getStates (where: FindConditions<State>): Promise<State[]> {
// TODO Implement
return [];
}
async createDiffStaged (contractAddress: string, blockHash: string, data: any): Promise<void> {
// TODO Implement
}
async createDiff (contractAddress: string, blockHash: string, data: any): Promise<void> {
// TODO Implement
}
async createCheckpoint (contractAddress: string, blockHash: string): Promise<string | undefined> {
// TODO Implement
return undefined;

View File

@ -4,7 +4,8 @@
import debug from 'debug';
import { JobRunner, JobRunnerCmd } from '@cerc-io/cli';
import { JobRunnerCmd } from '@cerc-io/cli';
import { WatcherJobRunner as JobRunner } from '@cerc-io/util';
import { Indexer } from './indexer';
import { Database } from './database';

View File

@ -1,50 +0,0 @@
//
// Copyright 2021 Vulcanize, Inc.
//
// TODO: Pull mock data for 5 tokens from rinkeby.
// Static metadata for mock ERC20 tokens, keyed by contract address.
export const tokens: {[address: string]: {[variable: string]: string}} = {
  '0xd87fea54f506972e3267239ec8e159548892074a': {
    name: 'ChainLink Token',
    symbol: 'LINK',
    decimals: '18',
    totalSupply: '1000000'
  }
};

// Per-block mock token state, keyed by block hash and then token address.
export const blocks: {[blockHash: string]: {[address: string]: any}} = {
  // Block hash.
  '0x77b5479a5856dd8ec63df6aabf9ce0913071a6dda3a3d54f3c9c940574bcb8ab': {
    // ERC20 token address.
    '0xd87fea54f506972e3267239ec8e159548892074a': {
      ...tokens['0xd87fea54f506972e3267239ec8e159548892074a'],
      // owner address -> balance
      balanceOf: {
        '0xDC7d7A8920C8Eecc098da5B7522a5F31509b5Bfc': '10000',
        '0xCA6D29232D1435D8198E3E5302495417dD073d61': '500'
      },
      // owner address -> spender address -> allowance
      allowance: {
        '0xDC7d7A8920C8Eecc098da5B7522a5F31509b5Bfc': {
          '0xCA6D29232D1435D8198E3E5302495417dD073d61': '100',
          '0x9273D9437B0bf2F1b7999d8dB72960d6379564d1': '200'
        }
      },
      // Events recorded for this token at this block.
      events: [
        {
          name: 'Transfer',
          from: '0xDC7d7A8920C8Eecc098da5B7522a5F31509b5Bfc',
          to: '0xCA6D29232D1435D8198E3E5302495417dD073d61',
          value: '500'
        },
        {
          name: 'Approval',
          owner: '0xDC7d7A8920C8Eecc098da5B7522a5F31509b5Bfc',
          spender: '0xCA6D29232D1435D8198E3E5302495417dD073d61',
          value: '100'
        }
      ]
    }
  }
};

View File

@ -1,90 +0,0 @@
//
// Copyright 2021 Vulcanize, Inc.
//
import debug from 'debug';
import BigInt from 'apollo-type-bigint';
import { blocks } from './data';
const log = debug('test');
// Build mock GraphQL resolvers backed by the static `blocks` fixture.
// Every value resolver returns `{ value, proof }` with an empty proof.
export const createResolvers = async (): Promise<any> => {
  return {
    BigInt: new BigInt('bigInt'),

    TokenEvent: {
      // Approval events carry an `owner` field; everything else is a Transfer.
      __resolveType: (obj: any) => (obj.owner ? 'ApprovalEvent' : 'TransferEvent')
    },

    Query: {
      totalSupply: (_: any, { blockHash, token }: { blockHash: string, token: string }) => {
        log('totalSupply', blockHash, token);
        const tokenData = blocks[blockHash][token];

        return {
          value: tokenData.totalSupply,
          proof: { data: '' }
        };
      },

      balanceOf: (_: any, { blockHash, token, owner }: { blockHash: string, token: string, owner: string }) => {
        log('balanceOf', blockHash, token, owner);
        const tokenData = blocks[blockHash][token];

        return {
          value: tokenData.balanceOf[owner],
          proof: { data: '' }
        };
      },

      allowance: (_: any, { blockHash, token, owner, spender }: { blockHash: string, token: string, owner: string, spender: string }) => {
        log('allowance', blockHash, token, owner, spender);
        const tokenData = blocks[blockHash][token];

        return {
          value: tokenData.allowance[owner][spender],
          proof: { data: '' }
        };
      },

      name: (_: any, { blockHash, token }: { blockHash: string, token: string }) => {
        log('name', blockHash, token);
        const tokenData = blocks[blockHash][token];

        return {
          value: tokenData.name,
          proof: { data: '' }
        };
      },

      symbol: (_: any, { blockHash, token }: { blockHash: string, token: string }) => {
        log('symbol', blockHash, token);
        const tokenData = blocks[blockHash][token];

        return {
          value: tokenData.symbol,
          proof: { data: '' }
        };
      },

      decimals: (_: any, { blockHash, token }: { blockHash: string, token: string }) => {
        log('decimals', blockHash, token);
        const tokenData = blocks[blockHash][token];

        return {
          value: tokenData.decimals,
          proof: { data: '' }
        };
      },

      events: (_: any, { blockHash, token, name }: { blockHash: string, token: string, name: string }) => {
        log('events', blockHash, token, name);
        const tokenEvents = blocks[blockHash][token].events;
        // No name filter => all events; otherwise only events with that name.
        const matching = tokenEvents.filter((e: any) => !name || name === e.name);

        return matching.map((e: any) => ({ event: e }));
      }
    }
  };
};

View File

@ -1,173 +0,0 @@
//
// Copyright 2021 Vulcanize, Inc.
//
import 'mocha';
import { expect } from 'chai';
import _ from 'lodash';
import { GraphQLClient } from 'graphql-request';
import {
queryName,
querySymbol,
queryDecimals,
queryTotalSupply,
queryBalanceOf,
queryAllowance,
queryEvents
} from '../queries';
import { blocks, tokens as tokenInfo } from './data';
// Test cases derived from the mock fixture: one entry per block/token pair
// for token info and events, plus per-owner balance cases and
// per-owner/spender allowance cases.
const testCases: {
  balanceOf: any[],
  allowance: any[],
  events: any[],
  tokens: any[]
} = {
  balanceOf: [],
  allowance: [],
  events: [],
  tokens: []
};

const blockHashes = _.keys(blocks);

blockHashes.forEach(blockHash => {
  const block = blocks[blockHash];
  const tokens = _.keys(block);

  tokens.forEach(token => {
    const tokenObj = block[token];

    // Token info test cases.
    testCases.tokens.push({
      blockHash,
      token,
      info: tokenInfo[token]
    });

    // Event test cases.
    testCases.events.push({
      blockHash,
      token,
      events: tokenObj.events
    });

    // Balance test cases.
    const balanceOfOwners = _.keys(tokenObj.balanceOf);
    balanceOfOwners.forEach(owner => {
      testCases.balanceOf.push({
        blockHash,
        token,
        owner,
        balance: tokenObj.balanceOf[owner]
      });
    });

    // Allowance test cases.
    const allowanceOwners = _.keys(tokenObj.allowance);
    allowanceOwners.forEach(owner => {
      const allowanceObj = tokenObj.allowance[owner];
      const spenders = _.keys(allowanceObj);

      spenders.forEach(spender => {
        testCases.allowance.push({
          blockHash,
          token,
          owner,
          spender,
          allowance: allowanceObj[spender]
        });
      });
    });
  });
});
// GQL query tests run against a server expected at localhost:3001
// (the mock server; see the package's server:mock script).
describe('server', () => {
  const client = new GraphQLClient('http://localhost:3001/graphql');

  it('query token info', async () => {
    const tests = testCases.tokens;
    expect(tests.length).to.be.greaterThan(0);

    for (let i = 0; i < tests.length; i++) {
      const testCase = tests[i];

      // Token totalSupply.
      let result = await client.request(queryTotalSupply, testCase);
      expect(result.totalSupply.value).to.equal(testCase.info.totalSupply);
      expect(result.totalSupply.proof.data).to.equal('');

      // Token name.
      result = await client.request(queryName, testCase);
      expect(result.name.value).to.equal(testCase.info.name);
      expect(result.name.proof.data).to.equal('');

      // Token symbol.
      result = await client.request(querySymbol, testCase);
      expect(result.symbol.value).to.equal(testCase.info.symbol);
      expect(result.symbol.proof.data).to.equal('');

      // Token decimals.
      result = await client.request(queryDecimals, testCase);
      expect(result.decimals.value).to.equal(testCase.info.decimals);
      expect(result.decimals.proof.data).to.equal('');
    }
  });

  it('query balanceOf', async () => {
    const tests = testCases.balanceOf;
    expect(tests.length).to.be.greaterThan(0);

    for (let i = 0; i < tests.length; i++) {
      const testCase = tests[i];

      const result = await client.request(queryBalanceOf, testCase);
      expect(result.balanceOf.value).to.equal(testCase.balance);

      // TODO: Check proof.
      expect(result.balanceOf.proof.data).to.equal('');
    }
  });

  it('query allowance', async () => {
    const tests = testCases.allowance;
    expect(tests.length).to.be.greaterThan(0);

    for (let i = 0; i < tests.length; i++) {
      const testCase = tests[i];

      const result = await client.request(queryAllowance, testCase);
      expect(result.allowance.value).to.equal(testCase.allowance);

      // TODO: Check proof.
      expect(result.allowance.proof.data).to.equal('');
    }
  });

  it('query events', async () => {
    const tests = testCases.events;
    expect(tests.length).to.be.greaterThan(0);

    for (let i = 0; i < tests.length; i++) {
      const testCase = tests[i];

      const result = await client.request(queryEvents, testCase);
      const resultEvents = result.events.map((record: any) => record.event);
      expect(resultEvents.length).to.equal(testCase.events.length);

      // Each result event's typename and fields must match the fixture event.
      resultEvents.forEach((resultEvent: any, index: number) => {
        const { name, ...testCaseEvent } = testCase.events[index];

        if (name === 'Transfer') {
          expect(resultEvent.__typename).to.equal('TransferEvent');
        } else if (name === 'Approval') {
          expect(resultEvent.__typename).to.equal('ApprovalEvent');
        }

        expect(resultEvent).to.include(testCaseEvent);
      });

      // TODO: Check proof.
    }
  });
});

View File

@ -8,7 +8,6 @@ import 'graphql-import-node';
import { ServerCmd } from '@cerc-io/cli';
import typeDefs from './schema';
import { createResolvers as createMockResolvers } from './mock/resolvers';
import { createResolvers } from './resolvers';
import { Indexer } from './indexer';
import { Database } from './database';
@ -20,7 +19,7 @@ export const main = async (): Promise<any> => {
const serverCmd = new ServerCmd();
await serverCmd.init(Database, Indexer, EventWatcher);
return process.env.MOCK ? serverCmd.exec(createMockResolvers, typeDefs) : serverCmd.exec(createResolvers, typeDefs);
return serverCmd.exec(createResolvers, typeDefs);
};
main().then(() => {

View File

@ -482,6 +482,10 @@ export class Database implements DatabaseInterface {
return this._baseDatabase.getBlockProgressEntities(repo, where, options);
}
async getEntitiesForBlock (blockHash: string, tableName: string): Promise<any[]> {
return this._baseDatabase.getEntitiesForBlock(blockHash, tableName);
}
async updateBlockProgress (queryRunner: QueryRunner, block: BlockProgress, lastProcessedEventIndex: number): Promise<BlockProgress> {
const repo = queryRunner.manager.getRepository(BlockProgress);

View File

@ -2,81 +2,22 @@
// Copyright 2021 Vulcanize, Inc.
//
import assert from 'assert';
import 'reflect-metadata';
import yargs from 'yargs';
import { hideBin } from 'yargs/helpers';
import debug from 'debug';
import { PubSub } from 'graphql-subscriptions';
import { Config, getConfig, fillBlocks, JobQueue, DEFAULT_CONFIG_PATH, initClients } from '@cerc-io/util';
import { FillCmd } from '@cerc-io/cli';
import { Database } from './database';
import { Indexer } from './indexer';
import { EventWatcher } from './events';
const log = debug('vulcanize:server');
const log = debug('vulcanize:fill');
export const main = async (): Promise<any> => {
const argv = await yargs(hideBin(process.argv)).parserConfiguration({
'parse-numbers': false
}).env(
'FILL'
).options({
configFile: {
alias: 'f',
type: 'string',
demandOption: true,
describe: 'configuration file path (toml)',
default: DEFAULT_CONFIG_PATH
},
startBlock: {
type: 'number',
demandOption: true,
describe: 'Block number to start processing at'
},
endBlock: {
type: 'number',
demandOption: true,
describe: 'Block number to stop processing at'
},
prefetch: {
type: 'boolean',
default: false,
describe: 'Block and events prefetch mode'
},
batchBlocks: {
type: 'number',
default: 10,
describe: 'Number of blocks prefetched in batch'
}
}).argv;
const fillCmd = new FillCmd();
await fillCmd.init(Database, Indexer, EventWatcher);
const config: Config = await getConfig(argv.configFile);
const { ethClient, ethProvider } = await initClients(config);
const db = new Database(config.database);
await db.init();
const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config');
const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
assert(dbConnectionString, 'Missing job queue db connection string');
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(config.server, db, { ethClient }, ethProvider, jobQueue);
await indexer.init();
// Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
const pubsub = new PubSub();
const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue);
await fillBlocks(jobQueue, indexer, eventWatcher, jobQueueConfig.blockDelayInMilliSecs, argv);
await fillCmd.exec();
};
main().catch(err => {

View File

@ -596,6 +596,10 @@ export class Indexer implements IndexerInterface {
);
}
async getEntitiesForBlock (blockHash: string, tableName: string): Promise<any[]> {
return this._db.getEntitiesForBlock(blockHash, tableName);
}
async processInitialState (contractAddress: string, blockHash: string): Promise<any> {
// Call initial state hook.
return createInitialState(this, contractAddress, blockHash);
@ -646,6 +650,10 @@ export class Indexer implements IndexerInterface {
return this._baseIndexer.getStateByCID(cid);
}
async getStates (where: FindConditions<State>): Promise<State[]> {
return this._db.getStates(where);
}
getStateData (state: State): any {
return this._baseIndexer.getStateData(state);
}

View File

@ -4,7 +4,8 @@
import debug from 'debug';
import { JobRunner, JobRunnerCmd } from '@cerc-io/cli';
import { JobRunnerCmd } from '@cerc-io/cli';
import { WatcherJobRunner as JobRunner } from '@cerc-io/util';
import { Indexer } from './indexer';
import { Database } from './database';

View File

@ -1,8 +1,7 @@
export * from './watcher';
export * from './database';
export {
prepareEntityState,
updateEntitiesFromState,
resolveEntityFieldConflicts,
afterEntityInsertOrUpdate
} from './utils';
export * from './state-utils';

View File

@ -24,9 +24,9 @@ import {
toEthereumValue,
getEthereumTypes,
jsonFromBytes,
getStorageValueType,
prepareEntityState
getStorageValueType
} from './utils';
import { prepareEntityState } from './state-utils';
import { Database } from './database';
// Endianness of BN used in bigInt store host API.

View File

@ -0,0 +1,208 @@
//
// Copyright 2022 Vulcanize, Inc.
//
import assert from 'assert';
import debug from 'debug';
import _ from 'lodash';
import { Between } from 'typeorm';
import { IndexerInterface, jsonBigIntStringReplacer, StateInterface } from '@cerc-io/util';
import { Database } from './database';
import { resolveEntityFieldConflicts } from './utils';
const log = debug('vulcanize:state-utils');
/**
 * Build the diff-state payload for a single updated entity.
 *
 * Resolves DB field-name conflicts, rewrites relation fields so the stored
 * diff mirrors the shape of GQL query results, and serializes the entity with
 * bigints as strings so it can be encoded by IPLD dag-cbor.
 *
 * @param updatedEntity Entity row fetched from the database.
 * @param entityName Name of the subgraph entity type.
 * @param relationsMap Map of entity classes to their relation field configs.
 * @returns Diff data of shape { state: { [entityName]: { [id]: entityData } } }.
 */
export const prepareEntityState = (updatedEntity: any, entityName: string, relationsMap: Map<any, { [key: string]: any }>): any => {
  // Resolve any field name conflicts in the dbData for auto-diff.
  updatedEntity = resolveEntityFieldConflicts(updatedEntity);

  // Prepare the diff data.
  const diffData: any = { state: {} };

  const result = Array.from(relationsMap.entries())
    .find(([key]) => key.name === entityName);

  if (result) {
    // Update entity data if relations exist.
    // Use array elision here instead of `const [_, relations]` so the unused
    // binding does not shadow the lodash `_` import in this module.
    const [, relations] = result;

    // Update relation fields for diff data to be similar to GQL query entities.
    Object.entries(relations).forEach(([relation, { isArray, isDerived }]) => {
      if (isDerived || !updatedEntity[relation]) {
        // Field is not present in dbData for derived relations
        return;
      }

      if (isArray) {
        updatedEntity[relation] = updatedEntity[relation].map((id: string) => ({ id }));
      } else {
        updatedEntity[relation] = { id: updatedEntity[relation] };
      }
    });
  }

  // JSON stringify and parse data for handling unknown types when encoding.
  // For example, decimal.js values are converted to string in the diff data.
  diffData.state[entityName] = {
    // Using custom replacer to store bigints as string values to be encoded by IPLD dag-cbor.
    // TODO: Parse and store as native bigint by using Type encoders in IPLD dag-cbor encode.
    // https://github.com/rvagg/cborg#type-encoders
    [updatedEntity.id]: JSON.parse(JSON.stringify(updatedEntity, jsonBigIntStringReplacer))
  };

  return diffData;
};
/**
 * Restore subgraph entities into the database from a persisted State record.
 *
 * Decodes the state data, then for each entity type saves every entity row it
 * contains at the state's block.
 *
 * @param database Graph database used to convert and persist entities.
 * @param indexer Indexer providing state decoding and the relations map.
 * @param state Persisted state record to restore from.
 */
export const updateEntitiesFromState = async (database: Database, indexer: IndexerInterface, state: StateInterface) => {
  const data = indexer.getStateData(state);

  // Get relations for subgraph entity
  assert(indexer.getRelationsMap);
  const relationsMap = indexer.getRelationsMap();

  for (const [entityName, entities] of Object.entries(data.state)) {
    const result = Array.from(relationsMap.entries())
      .find(([key]) => key.name === entityName);

    const relations = result ? result[1] : {};

    log(`Updating entities from State for entity ${entityName}`);
    console.time(`time:watcher#GraphWatcher-updateEntitiesFromState-update-entity-${entityName}`);

    // The entity id is already embedded in each entityData value, so iterate
    // values directly instead of destructuring an unused `id` key.
    // Save sequentially to avoid concurrent writes to the same entity table.
    for (const entityData of Object.values(entities as any)) {
      const dbData = database.fromState(state.block, entityName, entityData, relations);
      await database.saveEntity(entityName, dbData);
    }

    console.timeEnd(`time:watcher#GraphWatcher-updateEntitiesFromState-update-entity-${entityName}`);
  }
};
/**
 * Deep-merge new data into the in-memory subgraph state for a contract.
 *
 * @param subgraphStateMap Per-contract accumulated state.
 * @param contractAddress Contract whose state is being updated.
 * @param data New diff data to merge in.
 */
export const updateSubgraphState = (subgraphStateMap: Map<string, any>, contractAddress: string, data: any): void => {
  const existingData = subgraphStateMap.get(contractAddress);
  subgraphStateMap.set(contractAddress, _.merge(existingData, data));
};
/**
 * Persist the accumulated in-memory subgraph state as state diffs.
 *
 * Creates one diff per contract (finalized or staged, per the flag) and then
 * clears the in-memory map so accumulation restarts for the next block.
 *
 * @param indexer Indexer used to create the diffs.
 * @param subgraphStateMap Per-contract accumulated state to persist.
 * @param blockHash Block at which the diffs are recorded.
 * @param isStateFinalized When true create final diffs, otherwise staged diffs.
 */
export const dumpSubgraphState = async (
  indexer: IndexerInterface,
  subgraphStateMap: Map<string, any>,
  blockHash: string,
  isStateFinalized = false
): Promise<void> => {
  const createDiffPromises: Promise<void>[] = [];

  for (const [contractAddress, data] of subgraphStateMap.entries()) {
    const diffPromise = isStateFinalized
      ? indexer.createDiff(contractAddress, blockHash, data)
      : indexer.createDiffStaged(contractAddress, blockHash, data);

    createDiffPromises.push(diffPromise);
  }

  await Promise.all(createDiffPromises);

  // Reset the subgraph state map.
  subgraphStateMap.clear();
};
/**
 * Build a map from each subgraph data source's contract address to the entity
 * names its mappings can update.
 *
 * @param dataSources Data source definitions from the subgraph manifest.
 * @returns Map: contractAddress -> entity names.
 */
export const getContractEntitiesMap = (dataSources: any[]): Map<string, string[]> => {
  const contractEntitiesMap = new Map<string, string[]>();

  for (const dataSource of dataSources) {
    const contractAddress: string = dataSource.source.address;
    const entities: string[] = dataSource.mapping.entities;
    contractEntitiesMap.set(contractAddress, entities);
  }

  return contractEntitiesMap;
};
/**
 * Fill state for subgraph entities over a block range [startBlock, endBlock].
 *
 * For each block in the range: resolves the single canonical block hash,
 * creates initial contract state, builds entity diffs for every contract in
 * contractEntitiesMap, persists the accumulated state, and checkpoints.
 *
 * NOTE: exits the process (exit code 1) on invalid range, pre-existing state
 * in the range, a missing block, or multiple non-pruned blocks at a height.
 *
 * @param indexer Indexer providing block, state and checkpoint operations.
 * @param contractEntitiesMap Map: contractAddress -> entity names to fill.
 * @param argv Block range; endBlock must be >= startBlock.
 */
export const fillState = async (
  indexer: IndexerInterface,
  contractEntitiesMap: Map<string, string[]>,
  argv: {
    startBlock: number,
    endBlock: number
  }
): Promise<void> => {
  const { startBlock, endBlock } = argv;
  if (startBlock > endBlock) {
    log('endBlock should be greater than or equal to startBlock');
    process.exit(1);
  }

  // Check that there are no existing diffs in this range
  const existingStates = await indexer.getStates({ block: { blockNumber: Between(startBlock, endBlock) } });
  if (existingStates.length > 0) {
    log('found existing state(s) in the given range');
    process.exit(1);
  }

  console.time('time:fill-state');

  // Fill state for blocks in the given range
  for (let blockNumber = startBlock; blockNumber <= endBlock; blockNumber++) {
    console.time(`time:fill-state-${blockNumber}`);

    // Get the canonical block hash at current height
    // (excluding pruned blocks; exactly one block is expected).
    const blocks = await indexer.getBlocksAtHeight(blockNumber, false);

    if (blocks.length === 0) {
      log(`block not found at height ${blockNumber}`);
      process.exit(1);
    } else if (blocks.length > 1) {
      log(`found more than one non-pruned block at height ${blockNumber}`);
      process.exit(1);
    }

    const blockHash = blocks[0].blockHash;

    // Create initial state for contracts
    assert(indexer.createInit);
    await indexer.createInit(blockHash, blockNumber);

    // Fill state for each contract in contractEntitiesMap
    // (contracts are processed concurrently; entity updates for a contract
    // accumulate in the indexer's in-memory subgraph state).
    const contractStatePromises = Array.from(contractEntitiesMap.entries())
      .map(async ([contractAddress, entities]): Promise<void> => {
        // Get all the updated entities at this block
        const updatedEntitiesListPromises = entities.map(async (entity): Promise<any[]> => {
          return indexer.getEntitiesForBlock(blockHash, entity);
        });
        const updatedEntitiesList = await Promise.all(updatedEntitiesListPromises);

        // Populate state with all the updated entities of each entity type
        updatedEntitiesList.forEach((updatedEntities, index) => {
          const entityName = entities[index];

          updatedEntities.forEach((updatedEntity) => {
            // These indexer methods are optional on the interface; fail fast
            // if the concrete indexer does not implement them.
            assert(indexer.getRelationsMap);
            assert(indexer.updateSubgraphState);

            // Prepare diff data for the entity update
            const diffData = prepareEntityState(updatedEntity, entityName, indexer.getRelationsMap());

            // Update the in-memory subgraph state
            indexer.updateSubgraphState(contractAddress, diffData);
          });
        });
      });

    await Promise.all(contractStatePromises);

    // Persist subgraph state to the DB
    // (isStateFinalized = true: create final diffs, not staged ones).
    assert(indexer.dumpSubgraphState);
    await indexer.dumpSubgraphState(blockHash, true);
    await indexer.updateStateSyncStatusIndexedBlock(blockNumber);

    // Create checkpoints
    await indexer.processCheckpoint(blockHash);
    await indexer.updateStateSyncStatusCheckpointBlock(blockNumber);

    console.timeEnd(`time:fill-state-${blockNumber}`);
  }

  console.timeEnd('time:fill-state');
};

View File

@ -1,3 +1,7 @@
//
// Copyright 2022 Vulcanize, Inc.
//
import { BigNumber, utils } from 'ethers';
import path from 'path';
import fs from 'fs-extra';
@ -8,11 +12,10 @@ import { ColumnMetadata } from 'typeorm/metadata/ColumnMetadata';
import assert from 'assert';
import _ from 'lodash';
import { GraphDecimal, IndexerInterface, jsonBigIntStringReplacer, StateInterface } from '@cerc-io/util';
import { GraphDecimal } from '@cerc-io/util';
import { MappingKey, StorageLayout } from '@cerc-io/solidity-mapper';
import { TypeId, EthereumValueKind, ValueKind } from './types';
import { Database } from './database';
const log = debug('vulcanize:utils');
@ -802,47 +805,6 @@ const getEthereumType = (storageTypes: StorageLayout['types'], type: string, map
return utils.ParamType.from(label);
};
export const prepareEntityState = (updatedEntity: any, entityName: string, relationsMap: Map<any, { [key: string]: any }>): any => {
// Resolve any field name conflicts in the dbData for auto-diff.
updatedEntity = resolveEntityFieldConflicts(updatedEntity);
// Prepare the diff data.
const diffData: any = { state: {} };
const result = Array.from(relationsMap.entries())
.find(([key]) => key.name === entityName);
if (result) {
// Update entity data if relations exist.
const [_, relations] = result;
// Update relation fields for diff data to be similar to GQL query entities.
Object.entries(relations).forEach(([relation, { isArray, isDerived }]) => {
if (isDerived || !updatedEntity[relation]) {
// Field is not present in dbData for derived relations
return;
}
if (isArray) {
updatedEntity[relation] = updatedEntity[relation].map((id: string) => ({ id }));
} else {
updatedEntity[relation] = { id: updatedEntity[relation] };
}
});
}
// JSON stringify and parse data for handling unknown types when encoding.
// For example, decimal.js values are converted to string in the diff data.
diffData.state[entityName] = {
// Using custom replacer to store bigints as string values to be encoded by IPLD dag-cbor.
// TODO: Parse and store as native bigint by using Type encoders in IPLD dag-cbor encode.
// https://github.com/rvagg/cborg#type-encoders
[updatedEntity.id]: JSON.parse(JSON.stringify(updatedEntity, jsonBigIntStringReplacer))
};
return diffData;
};
export const fromStateEntityValues = (
stateEntity: any,
propertyName: string,
@ -876,29 +838,6 @@ export const fromStateEntityValues = (
return stateEntity[propertyName];
};
export const updateEntitiesFromState = async (database: Database, indexer: IndexerInterface, state: StateInterface) => {
const data = indexer.getStateData(state);
// Get relations for subgraph entity
assert(indexer.getRelationsMap);
const relationsMap = indexer.getRelationsMap();
for (const [entityName, entities] of Object.entries(data.state)) {
const result = Array.from(relationsMap.entries())
.find(([key]) => key.name === entityName);
const relations = result ? result[1] : {};
log(`Updating entities from State for entity ${entityName}`);
console.time(`time:watcher#GraphWatcher-updateEntitiesFromState-update-entity-${entityName}`);
for (const [id, entityData] of Object.entries(entities as any)) {
const dbData = database.fromState(state.block, entityName, entityData, relations);
await database.saveEntity(entityName, dbData);
}
console.timeEnd(`time:watcher#GraphWatcher-updateEntitiesFromState-update-entity-${entityName}`);
}
};
export const afterEntityInsertOrUpdate = async<Entity> (
frothyEntityType: EntityTarget<Entity>,
entities: Set<any>,
@ -953,7 +892,7 @@ export const afterEntityInsertOrUpdate = async<Entity> (
.execute();
};
export function getLatestEntityFromEntity<Entity> (latestEntityRepo: Repository<Entity>, entity: any): Entity {
export const getLatestEntityFromEntity = <Entity> (latestEntityRepo: Repository<Entity>, entity: any): Entity => {
const latestEntityFields = latestEntityRepo.metadata.columns.map(column => column.propertyName);
return latestEntityRepo.create(_.pick(entity, latestEntityFields) as DeepPartial<Entity>);
}
};

View File

@ -14,7 +14,8 @@ import { ResultObject } from '@vulcanize/assemblyscript/lib/loader';
import { EthClient } from '@cerc-io/ipld-eth-client';
import { getFullBlock, BlockHeight, ServerConfig, getFullTransaction, QueryOptions, StateInterface, IndexerInterface, BlockProgressInterface } from '@cerc-io/util';
import { createBlock, createEvent, getSubgraphConfig, resolveEntityFieldConflicts, Transaction, updateEntitiesFromState } from './utils';
import { createBlock, createEvent, getSubgraphConfig, resolveEntityFieldConflicts, Transaction } from './utils';
import { updateEntitiesFromState } from './state-utils';
import { Context, GraphData, instantiate } from './loader';
import { Database, DEFAULT_LIMIT } from './database';

View File

@ -58,6 +58,10 @@ export class Indexer implements IndexerInterface {
);
}
async getEntitiesForBlock (blockHash: string, tableName: string): Promise<any[]> {
return [];
}
async getBlockProgress (blockHash: string): Promise<BlockProgressInterface | undefined> {
assert(blockHash);
@ -217,6 +221,14 @@ export class Indexer implements IndexerInterface {
return undefined;
}
async getStates (where: FindConditions<StateInterface>): Promise<StateInterface[]> {
return [];
}
async createDiff (contractAddress: string, blockHash: string, data: any): Promise<void> {
return undefined;
}
async createCheckpoint (contractAddress: string, blockHash: string): Promise<string | undefined> {
return undefined;
}
@ -230,7 +242,7 @@ export class Indexer implements IndexerInterface {
}
async removeStates (blockNumber: number, kind: StateKind): Promise<void> {
// TODO Implement
return undefined;
}
getStateData (state: StateInterface): any {

View File

@ -262,6 +262,10 @@ export class Database implements DatabaseInterface {
return this._baseDatabase.getBlockProgressEntities(repo, where, options);
}
async getEntitiesForBlock (blockHash: string, tableName: string): Promise<any[]> {
return this._baseDatabase.getEntitiesForBlock(blockHash, tableName);
}
async updateBlockProgress (queryRunner: QueryRunner, block: BlockProgress, lastProcessedEventIndex: number): Promise<BlockProgress> {
const repo = queryRunner.manager.getRepository(BlockProgress);

View File

@ -1,112 +0,0 @@
//
// Copyright 2022 Vulcanize, Inc.
//
import 'reflect-metadata';
import debug from 'debug';
import { Between } from 'typeorm';
import { Database as GraphDatabase, prepareEntityState } from '@cerc-io/graph-node';
import { Indexer } from './indexer';
const log = debug('vulcanize:fill-state');
export const fillState = async (
indexer: Indexer,
graphDb: GraphDatabase,
dataSources: any[],
argv: {
startBlock: number,
endBlock: number
}
): Promise<void> => {
const { startBlock, endBlock } = argv;
if (startBlock > endBlock) {
log('endBlock should be greater than or equal to startBlock');
process.exit(1);
}
// NOTE: Assuming all blocks in the given range are in the pruned region
log(`Filling state for subgraph entities in range: [${startBlock}, ${endBlock}]`);
// Check that there are no existing diffs in this range
const existingStates = await indexer.getStates({ block: { blockNumber: Between(startBlock, endBlock) } });
if (existingStates.length > 0) {
log('found existing state(s) in the given range');
process.exit(1);
}
// Map: contractAddress -> entities updated
const contractEntitiesMap: Map<string, string[]> = new Map();
// Populate contractEntitiesMap using data sources from subgraph
// NOTE: Assuming each entity type is only mapped to a single contract
// This is true for eden subgraph; may not be the case for other subgraphs
dataSources.forEach((dataSource: any) => {
const { source: { address: contractAddress }, mapping: { entities } } = dataSource;
contractEntitiesMap.set(contractAddress, entities as string[]);
});
console.time('time:fill-state');
// Fill state for blocks in the given range
for (let blockNumber = startBlock; blockNumber <= endBlock; blockNumber++) {
console.time(`time:fill-state-${blockNumber}`);
// Get the canonical block hash at current height
const blocks = await indexer.getBlocksAtHeight(blockNumber, false);
if (blocks.length === 0) {
log(`block not found at height ${blockNumber}`);
process.exit(1);
} else if (blocks.length > 1) {
log(`found more than one non-pruned block at height ${blockNumber}`);
process.exit(1);
}
const blockHash = blocks[0].blockHash;
// Create initial state for contracts
await indexer.createInit(blockHash, blockNumber);
// Fill state for each contract in contractEntitiesMap
const contractStatePromises = Array.from(contractEntitiesMap.entries())
.map(async ([contractAddress, entities]): Promise<void> => {
// Get all the updated entities at this block
const updatedEntitiesListPromises = entities.map(async (entity): Promise<any[]> => {
return graphDb.getEntitiesForBlock(blockHash, entity);
});
const updatedEntitiesList = await Promise.all(updatedEntitiesListPromises);
// Populate state with all the updated entities of each entity type
updatedEntitiesList.forEach((updatedEntities, index) => {
const entityName = entities[index];
updatedEntities.forEach((updatedEntity) => {
// Prepare diff data for the entity update
const diffData = prepareEntityState(updatedEntity, entityName, indexer.getRelationsMap());
// Update the in-memory subgraph state
indexer.updateSubgraphState(contractAddress, diffData);
});
});
});
await Promise.all(contractStatePromises);
// Persist subgraph state to the DB
await indexer.dumpSubgraphState(blockHash, true);
await indexer.updateStateSyncStatusIndexedBlock(blockNumber);
// Create checkpoints
await indexer.processCheckpoint(blockHash);
await indexer.updateStateSyncStatusCheckpointBlock(blockNumber);
console.timeEnd(`time:fill-state-${blockNumber}`);
}
console.timeEnd('time:fill-state');
log(`Filled state for subgraph entities in range: [${startBlock}, ${endBlock}]`);
};

View File

@ -4,101 +4,28 @@
import assert from 'assert';
import 'reflect-metadata';
import yargs from 'yargs';
import { hideBin } from 'yargs/helpers';
import debug from 'debug';
import { PubSub } from 'graphql-subscriptions';
import { Config, getConfig, fillBlocks, JobQueue, DEFAULT_CONFIG_PATH, initClients } from '@cerc-io/util';
import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node';
import { FillCmd } from '@cerc-io/cli';
import { getContractEntitiesMap } from '@cerc-io/graph-node';
import { Database, ENTITY_TO_LATEST_ENTITY_MAP } from './database';
import { Database, ENTITY_QUERY_TYPE_MAP, ENTITY_TO_LATEST_ENTITY_MAP } from './database';
import { Indexer } from './indexer';
import { EventWatcher } from './events';
import { fillState } from './fill-state';
const log = debug('vulcanize:server');
const log = debug('vulcanize:fill');
export const main = async (): Promise<any> => {
const argv = await yargs(hideBin(process.argv)).parserConfiguration({
'parse-numbers': false
}).env(
'FILL'
).options({
configFile: {
alias: 'f',
type: 'string',
demandOption: true,
describe: 'configuration file path (toml)',
default: DEFAULT_CONFIG_PATH
},
startBlock: {
type: 'number',
demandOption: true,
describe: 'Block number to start processing at'
},
endBlock: {
type: 'number',
demandOption: true,
describe: 'Block number to stop processing at'
},
prefetch: {
type: 'boolean',
default: false,
describe: 'Block and events prefetch mode'
},
batchBlocks: {
type: 'number',
default: 10,
describe: 'Number of blocks prefetched in batch'
},
state: {
type: 'boolean',
default: false,
describe: 'Fill state for subgraph entities'
}
}).argv;
const fillCmd = new FillCmd();
await fillCmd.init(Database, Indexer, EventWatcher, {}, ENTITY_QUERY_TYPE_MAP, ENTITY_TO_LATEST_ENTITY_MAP);
const config: Config = await getConfig(argv.configFile);
const { ethClient, ethProvider } = await initClients(config);
const indexer = fillCmd.indexer as Indexer;
assert(indexer);
const db = new Database(config.database);
await db.init();
// Get contractEntitiesMap required for fill-state
const contractEntitiesMap = getContractEntitiesMap(indexer.graphWatcher.dataSources);
const graphDb = new GraphDatabase(config.server, db.baseDatabase, ENTITY_TO_LATEST_ENTITY_MAP);
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);
const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config');
const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
assert(dbConnectionString, 'Missing job queue db connection string');
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(config.server, db, { ethClient }, ethProvider, jobQueue, graphWatcher);
await indexer.init();
graphWatcher.setIndexer(indexer);
await graphWatcher.init();
if (argv.state) {
assert(config.server.enableState, 'State creation disabled');
await fillState(indexer, graphDb, graphWatcher.dataSources, argv);
return;
}
// Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
const pubsub = new PubSub();
const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue);
await fillBlocks(jobQueue, indexer, eventWatcher, jobQueueConfig.blockDelayInMilliSecs, argv);
await fillCmd.exec(contractEntitiesMap);
};
main().catch(err => {

View File

@ -7,7 +7,6 @@ import debug from 'debug';
import { DeepPartial, FindConditions, FindManyOptions } from 'typeorm';
import JSONbig from 'json-bigint';
import { ethers } from 'ethers';
import _ from 'lodash';
import { SelectionNode } from 'graphql';
import { JsonFragment } from '@ethersproject/abi';
@ -31,7 +30,7 @@ import {
DatabaseInterface,
Clients
} from '@cerc-io/util';
import { GraphWatcher } from '@cerc-io/graph-node';
import { GraphWatcher, updateSubgraphState, dumpSubgraphState } from '@cerc-io/graph-node';
import { Database, ENTITIES, SUBGRAPH_ENTITIES } from './database';
import { Contract } from './entity/Contract';
@ -114,6 +113,10 @@ export class Indexer implements IndexerInterface {
return this._storageLayoutMap;
}
get graphWatcher (): GraphWatcher {
return this._graphWatcher;
}
async init (): Promise<void> {
await this._baseIndexer.fetchContracts();
await this._baseIndexer.fetchStateStatus();
@ -198,6 +201,10 @@ export class Indexer implements IndexerInterface {
);
}
async getEntitiesForBlock (blockHash: string, tableName: string): Promise<any[]> {
return this._db.getEntitiesForBlock(blockHash, tableName);
}
async processInitialState (contractAddress: string, blockHash: string): Promise<any> {
// Call initial state hook.
return createInitialState(this, contractAddress, blockHash);
@ -554,27 +561,11 @@ export class Indexer implements IndexerInterface {
}
updateSubgraphState (contractAddress: string, data: any): void {
// Update the subgraph state for a given contract.
const oldData = this._subgraphStateMap.get(contractAddress);
const updatedData = _.merge(oldData, data);
this._subgraphStateMap.set(contractAddress, updatedData);
return updateSubgraphState(this._subgraphStateMap, contractAddress, data);
}
async dumpSubgraphState (blockHash: string, isStateFinalized = false): Promise<void> {
// Create a diff for each contract in the subgraph state map.
const createDiffPromises = Array.from(this._subgraphStateMap.entries())
.map(([contractAddress, data]): Promise<void> => {
if (isStateFinalized) {
return this.createDiff(contractAddress, blockHash, data);
}
return this.createDiffStaged(contractAddress, blockHash, data);
});
await Promise.all(createDiffPromises);
// Reset the subgraph state map.
this._subgraphStateMap.clear();
return dumpSubgraphState(this, this._subgraphStateMap, blockHash, isStateFinalized);
}
async resetWatcherToBlock (blockNumber: number): Promise<void> {

View File

@ -4,7 +4,8 @@
import debug from 'debug';
import { JobRunner, JobRunnerCmd } from '@cerc-io/cli';
import { JobRunnerCmd } from '@cerc-io/cli';
import { WatcherJobRunner as JobRunner } from '@cerc-io/util';
import { Indexer } from './indexer';
import { Database } from './database';

View File

@ -314,6 +314,10 @@ export class Database implements DatabaseInterface {
return this._baseDatabase.getBlockProgressEntities(repo, where, options);
}
async getEntitiesForBlock (blockHash: string, tableName: string): Promise<any[]> {
return this._baseDatabase.getEntitiesForBlock(blockHash, tableName);
}
async updateBlockProgress (queryRunner: QueryRunner, block: BlockProgress, lastProcessedEventIndex: number): Promise<BlockProgress> {
const repo = queryRunner.manager.getRepository(BlockProgress);

View File

@ -2,81 +2,22 @@
// Copyright 2021 Vulcanize, Inc.
//
import assert from 'assert';
import 'reflect-metadata';
import yargs from 'yargs';
import { hideBin } from 'yargs/helpers';
import debug from 'debug';
import { PubSub } from 'graphql-subscriptions';
import { Config, getConfig, fillBlocks, JobQueue, DEFAULT_CONFIG_PATH, initClients } from '@cerc-io/util';
import { FillCmd } from '@cerc-io/cli';
import { Database } from './database';
import { Indexer } from './indexer';
import { EventWatcher } from './events';
const log = debug('vulcanize:server');
const log = debug('vulcanize:fill');
export const main = async (): Promise<any> => {
const argv = await yargs(hideBin(process.argv)).parserConfiguration({
'parse-numbers': false
}).env(
'FILL'
).options({
configFile: {
alias: 'f',
type: 'string',
demandOption: true,
describe: 'configuration file path (toml)',
default: DEFAULT_CONFIG_PATH
},
startBlock: {
type: 'number',
demandOption: true,
describe: 'Block number to start processing at'
},
endBlock: {
type: 'number',
demandOption: true,
describe: 'Block number to stop processing at'
},
prefetch: {
type: 'boolean',
default: false,
describe: 'Block and events prefetch mode'
},
batchBlocks: {
type: 'number',
default: 10,
describe: 'Number of blocks prefetched in batch'
}
}).argv;
const fillCmd = new FillCmd();
await fillCmd.init(Database, Indexer, EventWatcher);
const config: Config = await getConfig(argv.configFile);
const { ethClient, ethProvider } = await initClients(config);
const db = new Database(config.database);
await db.init();
const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config');
const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
assert(dbConnectionString, 'Missing job queue db connection string');
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(config.server, db, { ethClient }, ethProvider, jobQueue);
await indexer.init();
// Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
const pubsub = new PubSub();
const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue);
await fillBlocks(jobQueue, indexer, eventWatcher, jobQueueConfig.blockDelayInMilliSecs, argv);
await fillCmd.exec();
};
main().catch(err => {

View File

@ -323,6 +323,10 @@ export class Indexer implements IndexerInterface {
);
}
async getEntitiesForBlock (blockHash: string, tableName: string): Promise<any[]> {
return this._db.getEntitiesForBlock(blockHash, tableName);
}
async processInitialState (contractAddress: string, blockHash: string): Promise<any> {
// Call initial state hook.
return createInitialState(this, contractAddress, blockHash);
@ -373,6 +377,10 @@ export class Indexer implements IndexerInterface {
return this._baseIndexer.getStateByCID(cid);
}
// Load State entities matching the given find conditions.
// Delegates straight to the database layer.
async getStates (where: FindConditions<State>): Promise<State[]> {
  const states = await this._db.getStates(where);

  return states;
}
// Decode and return the data payload carried by a State entity.
// Delegates to the shared base indexer implementation.
getStateData (state: State): any {
  const data = this._baseIndexer.getStateData(state);

  return data;
}

View File

@ -4,7 +4,8 @@
import debug from 'debug';
import { JobRunner, JobRunnerCmd } from '@cerc-io/cli';
import { JobRunnerCmd } from '@cerc-io/cli';
import { WatcherJobRunner as JobRunner } from '@cerc-io/util';
import { Indexer } from './indexer';
import { Database } from './database';

View File

@ -305,6 +305,18 @@ export class Database {
eventCount.inc(knownEvents);
}
// Return every row of the given table whose blockHash column matches the
// given block hash. tableName is resolved dynamically via TypeORM, so the
// result is untyped (any[]).
async getEntitiesForBlock (blockHash: string, tableName: string): Promise<any[]> {
  const repository = this._conn.getRepository(tableName);

  return repository.find({ where: { blockHash } });
}
async getEntities<Entity> (queryRunner: QueryRunner, entity: new () => Entity, findConditions?: FindManyOptions<Entity>): Promise<Entity[]> {
const repo = queryRunner.manager.getRepository(entity);

View File

@ -14,7 +14,9 @@ import {
JOB_KIND_CONTRACT,
MAX_REORG_DEPTH,
QUEUE_BLOCK_PROCESSING,
QUEUE_EVENT_PROCESSING
QUEUE_EVENT_PROCESSING,
QUEUE_BLOCK_CHECKPOINT,
QUEUE_HOOKS
} from './constants';
import { JobQueue } from './job-queue';
import { EventInterface, IndexerInterface } from './types';
@ -427,3 +429,41 @@ export class JobRunner {
this._indexer.updateStateStatusMap(contract.address, {});
}
}
// Watcher-side job runner: wires the shared base JobRunner's processing
// methods to their respective pg-boss job queues. Each subscribe* method
// registers a handler on one queue and forwards jobs to the base runner.
export class WatcherJobRunner {
  jobQueue: JobQueue
  baseJobRunner: JobRunner
  _indexer: IndexerInterface
  _jobQueueConfig: JobQueueConfig

  constructor (jobQueueConfig: JobQueueConfig, indexer: IndexerInterface, jobQueue: JobQueue) {
    this._jobQueueConfig = jobQueueConfig;
    this._indexer = indexer;
    this.jobQueue = jobQueue;

    // Delegate the actual job processing to the shared base implementation.
    this.baseJobRunner = new JobRunner(jobQueueConfig, indexer, jobQueue);
  }

  // Process block jobs (header sync / block indexing).
  async subscribeBlockProcessingQueue (): Promise<void> {
    await this.jobQueue.subscribe(QUEUE_BLOCK_PROCESSING, async (queuedJob) => {
      await this.baseJobRunner.processBlock(queuedJob);
    });
  }

  // Process contract event jobs.
  async subscribeEventProcessingQueue (): Promise<void> {
    await this.jobQueue.subscribe(QUEUE_EVENT_PROCESSING, async (queuedJob) => {
      await this.baseJobRunner.processEvent(queuedJob);
    });
  }

  // Process post-block hook jobs.
  async subscribeHooksQueue (): Promise<void> {
    await this.jobQueue.subscribe(QUEUE_HOOKS, async (queuedJob) => {
      await this.baseJobRunner.processHooks(queuedJob);
    });
  }

  // Process checkpoint creation jobs.
  async subscribeBlockCheckpointQueue (): Promise<void> {
    await this.jobQueue.subscribe(QUEUE_BLOCK_CHECKPOINT, async (queuedJob) => {
      await this.baseJobRunner.processCheckpoint(queuedJob);
    });
  }
}

View File

@ -86,6 +86,7 @@ export interface IndexerInterface {
init (): Promise<void>
getBlockProgress (blockHash: string): Promise<BlockProgressInterface | undefined>
getBlockProgressEntities (where: FindConditions<BlockProgressInterface>, options: FindManyOptions<BlockProgressInterface>): Promise<BlockProgressInterface[]>
getEntitiesForBlock (blockHash: string, tableName: string): Promise<any[]>
getEvent (id: string): Promise<EventInterface | undefined>
getSyncStatus (): Promise<SyncStatusInterface | undefined>
getStateSyncStatus (): Promise<StateSyncStatusInterface | undefined>
@ -121,12 +122,17 @@ export interface IndexerInterface {
processCanonicalBlock (blockHash: string, blockNumber: number): Promise<void>
processCheckpoint (blockHash: string): Promise<void>
processCLICheckpoint (contractAddress: string, blockHash?: string): Promise<string | undefined>
createDiffStaged (contractAddress: string, blockHash: string, data: any): Promise<void>
createDiff (contractAddress: string, blockHash: string, data: any): Promise<void>
createCheckpoint (contractAddress: string, blockHash: string): Promise<string | undefined>
createInit? (blockHash: string, blockNumber: number): Promise<void>
getStorageValue (storageLayout: StorageLayout, blockHash: string, contractAddress: string, variable: string, ...mappingKeys: MappingKey[]): Promise<ValueResult>
updateSubgraphState?: (contractAddress: string, data: any) => void
dumpSubgraphState?: (blockHash: string, isStateFinalized?: boolean) => Promise<void>
updateStateStatusMap (address: string, stateStatus: StateStatus): void
getStateData (state: StateInterface): any
getStateByCID (cid: string): Promise<StateInterface | undefined>
getStates (where: FindConditions<StateInterface>): Promise<StateInterface[]>
getLatestState (contractAddress: string, kind: StateKind | null, blockNumber?: number): Promise<StateInterface | undefined>
saveOrUpdateState (state: StateInterface): Promise<StateInterface>
removeStates (blockNumber: number, kind: StateKind): Promise<void>