Add reset CLI generation

This commit is contained in:
Prathamesh Musale 2021-10-21 13:02:23 +05:30 committed by nabarun
parent 4ddb8c4af6
commit c349b23f29
14 changed files with 270 additions and 34 deletions

View File

@ -13,6 +13,9 @@ columns:
columnType: ManyToOne
lhs: ()
rhs: BlockProgress
columnOptions:
- option: onDelete
value: "'CASCADE'"
- name: txHash
pgType: varchar
tsType: string

View File

@ -17,6 +17,9 @@ columns:
columnType: ManyToOne
lhs: ()
rhs: BlockProgress
columnOptions:
- option: onDelete
value: "'CASCADE'"
- name: contractAddress
pgType: varchar
tsType: string

View File

@ -79,6 +79,12 @@ export class Entity {
}
]
});
entityObject.columns.push({
name: 'blockNumber',
pgType: 'integer',
tsType: 'number',
columnType: 'Column'
});
entityObject.columns.push({
name: 'contractAddress',
pgType: 'varchar',

View File

@ -126,8 +126,8 @@ function generateWatcher (data: string, visitor: Visitor, argv: any) {
const entitiesFolder = path.join(outputDir, 'src/entity');
if (!fs.existsSync(entitiesFolder)) fs.mkdirSync(entitiesFolder, { recursive: true });
const cliFolder = path.join(outputDir, 'src/cli');
if (!fs.existsSync(cliFolder)) fs.mkdirSync(cliFolder, { recursive: true });
const resetCmdsFolder = path.join(outputDir, 'src/cli/reset-cmds');
if (!fs.existsSync(resetCmdsFolder)) fs.mkdirSync(resetCmdsFolder, { recursive: true });
}
const inputFileName = path.basename(argv['input-file'], '.sol');
@ -214,21 +214,17 @@ function generateWatcher (data: string, visitor: Visitor, argv: any) {
: process.stdout;
exportCheckpoint(outStream);
let hooksOutStream;
if (outputDir) {
hooksOutStream = fs.createWriteStream(path.join(outputDir, 'src/hooks.ts'));
} else {
hooksOutStream = process.stdout;
}
exportHooks(hooksOutStream);
outStream = outputDir
? fs.createWriteStream(path.join(outputDir, 'src/hooks.ts'))
: process.stdout;
exportHooks(outStream);
outStream = outputDir
? fs.createWriteStream(path.join(outputDir, 'src/fill.ts'))
: process.stdout;
exportFill(outStream);
let rcOutStream;
let ignoreOutStream;
let rcOutStream, ignoreOutStream;
if (outputDir) {
rcOutStream = fs.createWriteStream(path.join(outputDir, '.eslintrc.json'));
ignoreOutStream = fs.createWriteStream(path.join(outputDir, '.eslintignore'));
@ -242,6 +238,18 @@ function generateWatcher (data: string, visitor: Visitor, argv: any) {
? fs.createWriteStream(path.join(outputDir, 'src/client.ts'))
: process.stdout;
visitor.exportClient(outStream, schemaContent, path.join(outputDir, 'src/gql'));
let resetOutStream, resetJQOutStream, resetStateOutStream;
if (outputDir) {
resetOutStream = fs.createWriteStream(path.join(outputDir, 'src/cli/reset.ts'));
resetJQOutStream = fs.createWriteStream(path.join(outputDir, 'src/cli/reset-cmds/job-queue.ts'));
resetStateOutStream = fs.createWriteStream(path.join(outputDir, 'src/cli/reset-cmds/state.ts'));
} else {
resetOutStream = process.stdout;
resetJQOutStream = process.stdout;
resetStateOutStream = process.stdout;
}
visitor.exportReset(resetOutStream, resetJQOutStream, resetStateOutStream);
}
main().catch(err => {

View File

@ -0,0 +1,80 @@
//
// Copyright 2021 Vulcanize, Inc.
//
import fs from 'fs';
import path from 'path';
import Handlebars from 'handlebars';
import { Writable } from 'stream';
const RESET_TEMPLATE_FILE = './templates/reset-template.handlebars';
const RESET_JQ_TEMPLATE_FILE = './templates/reset-job-queue-template.handlebars';
const RESET_STATE_TEMPLATE_FILE = './templates/reset-state-template.handlebars';
export class Reset {
_queries: Array<any>;
_resetTemplateString: string;
_resetJQTemplateString: string;
_resetStateTemplateString: string;
constructor () {
this._queries = [];
this._resetTemplateString = fs.readFileSync(path.resolve(__dirname, RESET_TEMPLATE_FILE)).toString();
this._resetJQTemplateString = fs.readFileSync(path.resolve(__dirname, RESET_JQ_TEMPLATE_FILE)).toString();
this._resetStateTemplateString = fs.readFileSync(path.resolve(__dirname, RESET_STATE_TEMPLATE_FILE)).toString();
}
/**
* Stores the query to be passed to the template.
* @param name Name of the query.
* @param params Parameters to the query.
* @param returnType Return type for the query.
*/
addQuery (name: string): void {
// Check if the query is already added.
if (this._queries.some(query => query.name === name)) {
return;
}
const queryObject = {
name,
entityName: ''
};
// eth_call mode: Capitalize first letter of entity name (balanceOf -> BalanceOf).
// storage mode: Capiltalize second letter of entity name (_balances -> _Balances).
queryObject.entityName = (name.charAt(0) === '_')
? `_${name.charAt(1).toUpperCase()}${name.slice(2)}`
: `${name.charAt(0).toUpperCase()}${name.slice(1)}`;
this._queries.push(queryObject);
}
/**
* Writes the reset.ts, job-queue.ts, state.ts files generated from templates to respective streams.
* @param outStream A writable output stream to write the database file to.
*/
/**
* Writes the reset.ts, job-queue.ts, state.ts files generated from templates to respective streams.
* @param resetOutStream A writable output stream to write the reset file to.
* @param resetJQOutStream A writable output stream to write the reset job-queue file to.
* @param resetStateOutStream A writable output stream to write the reset state file to.
*/
exportReset (resetOutStream: Writable, resetJQOutStream: Writable, resetStateOutStream: Writable): void {
const resetTemplate = Handlebars.compile(this._resetTemplateString);
const resetString = resetTemplate({});
resetOutStream.write(resetString);
const resetJQTemplate = Handlebars.compile(this._resetJQTemplateString);
const resetJQString = resetJQTemplate({});
resetJQOutStream.write(resetJQString);
const resetStateTemplate = Handlebars.compile(this._resetStateTemplateString);
const obj = {
queries: this._queries
};
const resetState = resetStateTemplate(obj);
resetStateOutStream.write(resetState);
}
}

View File

@ -69,10 +69,10 @@ export class Database {
{{#if (reservedNameCheck query.entityName) }}
// eslint-disable-next-line @typescript-eslint/ban-types
{{/if}}
async {{query.saveQueryName}} ({ blockHash, contractAddress
async {{query.saveQueryName}} ({ blockHash, blockNumber, contractAddress
{{~#each query.params}}, {{this.name~}} {{/each}}, value, proof }: DeepPartial<{{query.entityName}}>): Promise<{{query.entityName}}> {
const repo = this._conn.getRepository({{query.entityName}});
const entity = repo.create({ blockHash, contractAddress
const entity = repo.create({ blockHash, blockNumber, contractAddress
{{~#each query.params}}, {{this.name~}} {{/each}}, value, proof });
return repo.save(entity);
}
@ -317,16 +317,16 @@ export class Database {
});
}
async updateSyncStatusIndexedBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number): Promise<SyncStatus> {
async updateSyncStatusIndexedBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number, force = false): Promise<SyncStatus> {
const repo = queryRunner.manager.getRepository(SyncStatus);
return this._baseDatabase.updateSyncStatusIndexedBlock(repo, blockHash, blockNumber);
return this._baseDatabase.updateSyncStatusIndexedBlock(repo, blockHash, blockNumber, force);
}
async updateSyncStatusCanonicalBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number): Promise<SyncStatus> {
async updateSyncStatusCanonicalBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number, force = false): Promise<SyncStatus> {
const repo = queryRunner.manager.getRepository(SyncStatus);
return this._baseDatabase.updateSyncStatusCanonicalBlock(repo, blockHash, blockNumber);
return this._baseDatabase.updateSyncStatusCanonicalBlock(repo, blockHash, blockNumber, force);
}
async updateSyncStatusChainHead (queryRunner: QueryRunner, blockHash: string, blockNumber: number): Promise<SyncStatus> {

View File

@ -27,7 +27,7 @@ export class {{className}} {{~#if implements}} implements {{implements}} {{~/if}
@{{column.columnType}}('{{column.pgType}}'
{{~/if}}
{{~#if column.columnOptions}}, {
{{~#each column.columnOptions}} {{this.option}}: {{this.value}}
{{~#each column.columnOptions}} {{this.option}}: {{{this.value}}}
{{~#unless @last}}, {{/unless}}
{{~/each}} }
{{~/if}})

View File

@ -88,16 +88,6 @@ export class EventWatcher {
}
const dbEvents = await this._baseEventWatcher.eventProcessingCompleteHandler(job);
if (dbEvents.length > 0) {
const dbEvent = dbEvents[0];
// If the block is marked as complete, push a post-block hook job.
if (dbEvent.block.isComplete) {
await this._jobQueue.pushJob(QUEUE_HOOKS, { blockHash: dbEvent.block.blockHash, blockNumber: dbEvent.block.blockNumber });
}
}
const timeElapsedInSeconds = (Date.now() - Date.parse(createdOn)) / 1000;
// Cannot publish individual event as they are processed together in a single job.

View File

@ -179,9 +179,11 @@ export class Indexer {
log('{{query.name}}: db miss, fetching from upstream server');
const { block: { number } } = await this._ethClient.getBlockByHash(blockHash);
const blockNumber = ethers.BigNumber.from(number).toNumber();
{{#if (compare query.mode @root.constants.MODE_ETH_CALL)}}
const contract = new ethers.Contract(contractAddress, this._abi, this._ethProvider);
{{#if (compare query.returnType 'bigint')}}
let value = await contract.{{query.name}}(
{{~#each query.params}}{{this.name}}, {{/each}}{ blockTag: blockHash });
@ -207,7 +209,7 @@ export class Indexer {
);
{{/if}}
await this._db.{{query.saveQueryName}}({ blockHash, contractAddress
await this._db.{{query.saveQueryName}}({ blockHash, blockNumber, contractAddress
{{~#each query.params}}, {{this.name~}} {{/each}}, value: result.value, proof: JSONbig.stringify(result.proof) });
{{#if query.stateVariableType}}
@ -563,16 +565,16 @@ export class Indexer {
return this._baseIndexer.getSyncStatus();
}
async updateSyncStatusIndexedBlock (blockHash: string, blockNumber: number): Promise<SyncStatus> {
return this._baseIndexer.updateSyncStatusIndexedBlock(blockHash, blockNumber);
async updateSyncStatusIndexedBlock (blockHash: string, blockNumber: number, force = false): Promise<SyncStatus> {
return this._baseIndexer.updateSyncStatusIndexedBlock(blockHash, blockNumber, force);
}
async updateSyncStatusChainHead (blockHash: string, blockNumber: number): Promise<SyncStatus> {
return this._baseIndexer.updateSyncStatusChainHead(blockHash, blockNumber);
}
async updateSyncStatusCanonicalBlock (blockHash: string, blockNumber: number): Promise<SyncStatus> {
return this._baseIndexer.updateSyncStatusCanonicalBlock(blockHash, blockNumber);
async updateSyncStatusCanonicalBlock (blockHash: string, blockNumber: number, force = false): Promise<SyncStatus> {
return this._baseIndexer.updateSyncStatusCanonicalBlock(blockHash, blockNumber, force);
}
async getBlocks (blockFilter: { blockNumber?: number, blockHash?: string }): Promise<any> {

View File

@ -11,6 +11,7 @@
"job-runner": "DEBUG=vulcanize:* ts-node src/job-runner.ts",
"watch:contract": "DEBUG=vulcanize:* ts-node src/cli/watch-contract.ts",
"fill": "DEBUG=vulcanize:* ts-node src/fill.ts",
"reset": "DEBUG=vulcanize:* ts-node src/cli/reset.ts",
"checkpoint": "DEBUG=vulcanize:* ts-node src/cli/checkpoint.ts"
},
"repository": {

View File

@ -0,0 +1,22 @@
//
// Copyright 2021 Vulcanize, Inc.
//
import debug from 'debug';
import { getConfig, resetJobs } from '@vulcanize/util';
const log = debug('vulcanize:reset-job-queue');
export const command = 'job-queue';
export const desc = 'Reset job queue';
export const builder = {};
export const handler = async (argv: any): Promise<void> => {
const config = await getConfig(argv.configFile);
await resetJobs(config);
log('Job queue reset successfully');
};

View File

@ -0,0 +1,82 @@
//
// Copyright 2021 Vulcanize, Inc.
//
import debug from 'debug';
import { MoreThan } from 'typeorm';
import assert from 'assert';
import { getConfig, getResetConfig, resetJobs } from '@vulcanize/util';
import { Database } from '../../database';
import { Indexer } from '../../indexer';
import { BlockProgress } from '../../entity/BlockProgress';
{{#each queries as | query |}}
import { {{query.entityName}} } from '../../entity/{{query.entityName}}';
{{/each}}
const log = debug('vulcanize:reset-state');
export const command = 'state';
export const desc = 'Reset state to block number';
export const builder = {
blockNumber: {
type: 'number'
}
};
export const handler = async (argv: any): Promise<void> => {
const config = await getConfig(argv.configFile);
await resetJobs(config);
const { dbConfig, serverConfig, ethClient, postgraphileClient, ethProvider } = await getResetConfig(config);
// Initialize database.
const db = new Database(dbConfig);
await db.init();
const indexer = new Indexer(serverConfig, db, ethClient, postgraphileClient, ethProvider);
const syncStatus = await indexer.getSyncStatus();
assert(syncStatus, 'Missing syncStatus');
const blockProgresses = await indexer.getBlocksAtHeight(argv.blockNumber, false);
assert(blockProgresses.length, `No blocks at specified block number ${argv.blockNumber}`);
assert(!blockProgresses.some(block => !block.isComplete), `Incomplete block at block number ${argv.blockNumber} with unprocessed events`);
const [blockProgress] = blockProgresses;
const dbTx = await db.createTransactionRunner();
try {
const entities = [BlockProgress
{{~#each queries as | query |~}}
, {{query.entityName}}
{{~/each~}}
];
const removeEntitiesPromise = entities.map(async entityClass => {
return db.removeEntities<any>(dbTx, entityClass, { blockNumber: MoreThan(argv.blockNumber) });
});
await Promise.all(removeEntitiesPromise);
if (syncStatus.latestIndexedBlockNumber > blockProgress.blockNumber) {
await indexer.updateSyncStatusIndexedBlock(blockProgress.blockHash, blockProgress.blockNumber, true);
}
if (syncStatus.latestCanonicalBlockNumber > blockProgress.blockNumber) {
await indexer.updateSyncStatusCanonicalBlock(blockProgress.blockHash, blockProgress.blockNumber, true);
}
dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
throw error;
} finally {
await dbTx.release();
}
log('Reset state successfully');
};

View File

@ -0,0 +1,24 @@
//
// Copyright 2021 Vulcanize, Inc.
//
import 'reflect-metadata';
import debug from 'debug';
import { getResetYargs } from '@vulcanize/util';
const log = debug('vulcanize:reset');
const main = async () => {
return getResetYargs()
.commandDir('reset-cmds', { extensions: ['ts', 'js'], exclude: /([a-zA-Z0-9\s_\\.\-:])+(.d.ts)$/ })
.demandCommand(1)
.help()
.argv;
};
main().then(() => {
process.exit();
}).catch(err => {
log(err);
});

View File

@ -12,6 +12,7 @@ import { Indexer } from './indexer';
import { Resolvers } from './resolvers';
import { Schema } from './schema';
import { Client } from './client';
import { Reset } from './reset';
export class Visitor {
_schema: Schema;
@ -20,6 +21,7 @@ export class Visitor {
_entity: Entity;
_database: Database;
_client: Client;
_reset: Reset;
constructor () {
this._schema = new Schema();
@ -28,6 +30,7 @@ export class Visitor {
this._entity = new Entity();
this._database = new Database();
this._client = new Client();
this._reset = new Reset();
}
/**
@ -50,6 +53,7 @@ export class Visitor {
this._entity.addQuery(name, params, returnType);
this._database.addQuery(name, params, returnType);
this._client.addQuery(name, params, returnType);
this._reset.addQuery(name);
}
}
@ -85,6 +89,7 @@ export class Visitor {
this._entity.addQuery(name, params, returnType);
this._database.addQuery(name, params, returnType);
this._client.addQuery(name, params, returnType);
this._reset.addQuery(name);
}
/**
@ -152,4 +157,14 @@ export class Visitor {
exportClient (outStream: Writable, schemaContent: string, gqlDir: string): void {
this._client.exportClient(outStream, schemaContent, gqlDir);
}
/**
* Writes the reset.ts, job-queue.ts, state.ts files generated from templates to respective streams.
* @param resetOutStream A writable output stream to write the reset file to.
* @param resetJQOutStream A writable output stream to write the reset job-queue file to.
* @param resetStateOutStream A writable output stream to write the reset state file to.
*/
exportReset (resetOutStream: Writable, resetJQOutStream: Writable, resetStateOutStream: Writable): void {
this._reset.exportReset(resetOutStream, resetJQOutStream, resetStateOutStream);
}
}