Implement active watcher. (#252)

Co-authored-by: nabarun <nabarun@deepstacksoft.com>
This commit is contained in:
Ashwin Phatak 2021-09-27 15:24:17 +05:30 committed by GitHub
parent 482be8cfaa
commit 11d16b9870
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 313 additions and 41 deletions

View File

@ -20,6 +20,8 @@ import { exportPackage } from './package';
import { exportTSConfig } from './tsconfig';
import { exportReadme } from './readme';
import { exportEvents } from './events';
import { exportJobRunner } from './job-runner';
import { exportWatchContract } from './watch-contract';
import { registerHandlebarHelpers } from './utils/handlebar-helpers';
const main = async (): Promise<void> => {
@ -111,6 +113,9 @@ function generateWatcher (data: string, visitor: Visitor, argv: any) {
const entitiesFolder = path.join(outputDir, 'src/entity');
if (!fs.existsSync(entitiesFolder)) fs.mkdirSync(entitiesFolder, { recursive: true });
const cliFolder = path.join(outputDir, 'src/cli');
if (!fs.existsSync(cliFolder)) fs.mkdirSync(cliFolder, { recursive: true });
}
const inputFileName = path.basename(argv['input-file'], '.sol');
@ -181,6 +186,16 @@ function generateWatcher (data: string, visitor: Visitor, argv: any) {
? fs.createWriteStream(path.join(outputDir, 'src/events.ts'))
: process.stdout;
exportEvents(outStream);
outStream = outputDir
? fs.createWriteStream(path.join(outputDir, 'src/job-runner.ts'))
: process.stdout;
exportJobRunner(outStream);
outStream = outputDir
? fs.createWriteStream(path.join(outputDir, 'src/cli/watch-contract.ts'))
: process.stdout;
exportWatchContract(outStream);
}
main().catch(err => {

View File

@ -66,10 +66,19 @@ export class Indexer {
return;
}
this._events.push({
const eventObject = {
name,
params
params: _.cloneDeep(params)
};
eventObject.params = eventObject.params.map((param) => {
const tsParamType = getTsForSol(param.type);
assert(tsParamType);
param.type = tsParamType;
return param;
});
this._events.push(eventObject);
}
/**

View File

@ -0,0 +1,21 @@
//
// Copyright 2021 Vulcanize, Inc.
//
import fs from 'fs';
import path from 'path';
import Handlebars from 'handlebars';
import { Writable } from 'stream';
const TEMPLATE_FILE = './templates/job-runner-template.handlebars';
/**
* Writes the job-runner file generated from a template to a stream.
* @param outStream A writable output stream to write the events file to.
*/
export function exportJobRunner (outStream: Writable): void {
const templateString = fs.readFileSync(path.resolve(__dirname, TEMPLATE_FILE)).toString();
const template = Handlebars.compile(templateString);
const events = template({});
outStream.write(events);
}

View File

@ -22,3 +22,8 @@
name = "requests"
enabled = false
deleteOnStart = false
[jobQueue]
dbConnectionString = "postgres://postgres:postgres@localhost/{{folderName}}-job-queue"
maxCompletionLagInSecs = 300
jobDelayInMilliSecs = 100

View File

@ -42,7 +42,6 @@ interface ResultEvent {
export class Indexer {
_db: Database
_ethClient: EthClient
_getStorageAt: GetStorageAt
_ethProvider: BaseProvider
_baseIndexer: BaseIndexer
@ -57,7 +56,6 @@ export class Indexer {
this._db = db;
this._ethClient = ethClient;
this._ethProvider = ethProvider;
this._getStorageAt = this._ethClient.getStorageAt.bind(this._ethClient);
this._baseIndexer = new BaseIndexer(this._db, this._ethClient);
const { abi, storageLayout } = artifacts;
@ -73,8 +71,7 @@ export class Indexer {
getResultEvent (event: Event): ResultEvent {
const block = event.block;
const eventFields = JSON.parse(event.eventInfo);
const { tx } = JSON.parse(event.extraInfo);
const eventFields = JSONbig.parse(event.eventInfo);
return {
block: {
@ -85,10 +82,7 @@ export class Indexer {
},
tx: {
hash: event.txHash,
from: tx.src,
to: tx.dst,
index: tx.index
hash: event.txHash
},
contract: event.contract,
@ -140,7 +134,7 @@ export class Indexer {
{{~#if (compare query.mode @root.constants.MODE_STORAGE)}}
const result = await this._getStorageValue(
const result = await this._baseIndexer.getStorageValue(
this._storageLayout,
blockHash,
contractAddress,
@ -158,15 +152,14 @@ export class Indexer {
}
{{/each}}
async _getStorageValue (storageLayout: StorageLayout, blockHash: string, contractAddress: string, variable: string, ...mappingKeys: any[]): Promise<ValueResult> {
return getStorageValue(
storageLayout,
this._getStorageAt,
blockHash,
contractAddress,
variable,
...mappingKeys
);
async triggerIndexingOnEvent (event: Event): Promise<void> {
// TODO: Implement custom hooks.
}
async processEvent (event: Event): Promise<void> {
// Trigger indexing of data based on the event.
await this.triggerIndexingOnEvent(event);
}
parseEventNameAndArgs (kind: string, logObj: any): any {
@ -183,7 +176,12 @@ export class Indexer {
const { {{#each event.params~}} {{this.name}} {{~#unless @last}}, {{/unless}} {{~/each}} } = logDescription.args;
eventInfo = {
{{#each event.params}}
{{this.name}} {{~#unless @last}},{{/unless}}
{{#if (compare this.type 'bigint')}}
{{this.name}}: BigInt(ethers.BigNumber.from({{this.name}}).toString())
{{else}}
{{this.name}}
{{~/if}}
{{~#unless @last}},{{/unless}}
{{/each}}
};

View File

@ -0,0 +1,126 @@
//
// Copyright 2021 Vulcanize, Inc.
//
import assert from 'assert';
import 'reflect-metadata';
import yargs from 'yargs';
import { hideBin } from 'yargs/helpers';
import debug from 'debug';
import { getDefaultProvider } from 'ethers';
import { getCache } from '@vulcanize/cache';
import { EthClient } from '@vulcanize/ipld-eth-client';
import {
getConfig,
JobQueue,
JobRunner as BaseJobRunner,
QUEUE_BLOCK_PROCESSING,
QUEUE_EVENT_PROCESSING,
JobQueueConfig,
DEFAULT_CONFIG_PATH
} from '@vulcanize/util';
import { Indexer } from './indexer';
import { Database } from './database';
const log = debug('vulcanize:job-runner');
export class JobRunner {
_indexer: Indexer
_jobQueue: JobQueue
_baseJobRunner: BaseJobRunner
_jobQueueConfig: JobQueueConfig
constructor (jobQueueConfig: JobQueueConfig, indexer: Indexer, jobQueue: JobQueue) {
this._indexer = indexer;
this._jobQueue = jobQueue;
this._jobQueueConfig = jobQueueConfig;
this._baseJobRunner = new BaseJobRunner(this._jobQueueConfig, this._indexer, this._jobQueue);
}
async start (): Promise<void> {
await this.subscribeBlockProcessingQueue();
await this.subscribeEventProcessingQueue();
}
async subscribeBlockProcessingQueue (): Promise<void> {
await this._jobQueue.subscribe(QUEUE_BLOCK_PROCESSING, async (job) => {
await this._baseJobRunner.processBlock(job);
await this._jobQueue.markComplete(job);
});
}
async subscribeEventProcessingQueue (): Promise<void> {
await this._jobQueue.subscribe(QUEUE_EVENT_PROCESSING, async (job) => {
const event = await this._baseJobRunner.processEvent(job);
const watchedContract = await this._indexer.isWatchedContract(event.contract);
if (watchedContract) {
await this._indexer.processEvent(event);
}
await this._jobQueue.markComplete(job);
});
}
}
export const main = async (): Promise<any> => {
const argv = await yargs(hideBin(process.argv))
.option('f', {
alias: 'config-file',
demandOption: true,
describe: 'configuration file path (toml)',
type: 'string',
default: DEFAULT_CONFIG_PATH
})
.argv;
const config = await getConfig(argv.f);
assert(config.server, 'Missing server config');
const { upstream, database: dbConfig, jobQueue: jobQueueConfig } = config;
assert(dbConfig, 'Missing database config');
const db = new Database(dbConfig);
await db.init();
assert(upstream, 'Missing upstream config');
const { ethServer: { gqlApiEndpoint, gqlPostgraphileEndpoint, rpcProviderEndpoint }, cache: cacheConfig } = upstream;
assert(gqlApiEndpoint, 'Missing upstream ethServer.gqlApiEndpoint');
assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint');
const cache = await getCache(cacheConfig);
const ethClient = new EthClient({
gqlEndpoint: gqlApiEndpoint,
gqlSubscriptionEndpoint: gqlPostgraphileEndpoint,
cache
});
const ethProvider = getDefaultProvider(rpcProviderEndpoint);
const indexer = new Indexer(db, ethClient, ethProvider);
assert(jobQueueConfig, 'Missing job queue config');
const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
assert(dbConnectionString, 'Missing job queue db connection string');
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const jobRunner = new JobRunner(jobQueueConfig, indexer, jobQueue);
await jobRunner.start();
};
main().then(() => {
log('Starting job runner...');
}).catch(err => {
log(err);
});
process.on('uncaughtException', err => {
log('uncaughtException', err);
});

View File

@ -6,7 +6,9 @@
"main": "dist/index.js",
"scripts": {
"build": "tsc",
"server": "DEBUG=vulcanize:* ts-node src/server.ts"
"server": "DEBUG=vulcanize:* ts-node src/server.ts",
"job-runner": "DEBUG=vulcanize:* ts-node src/job-runner.ts",
"watch:contract": "DEBUG=vulcanize:* ts-node src/cli/watch-contract.ts"
},
"repository": {
"type": "git",

View File

@ -15,6 +15,24 @@
createdb {{folderName}}
```
* Create database for the job queue and enable the `pgcrypto` extension on them (https://github.com/timgit/pg-boss/blob/master/docs/usage.md#intro):
```
createdb {{folderName}}-job-queue
```
```
postgres@tesla:~$ psql -U postgres -h localhost {{folderName}}-job-queue
Password for user postgres:
psql (12.7 (Ubuntu 12.7-1.pgdg18.04+1))
SSL connection (protocol: TLSv1.3, cipher: TLS_AES_256_GCM_SHA384, bits: 256, compression: off)
Type "help" for help.
{{folderName}}-job-queue=# CREATE EXTENSION pgcrypto;
CREATE EXTENSION
{{folderName}}-job-queue=# exit
```
* Update `environments/local.toml` with database connection settings.
* Update the `upstream` config in `environments/local.toml` and provide the `ipld-eth-server` GQL API and the `indexer-db` postgraphile endpoints.
@ -27,25 +45,16 @@
yarn server
```
* Run the job-runner:
```bash
yarn job-runner
```
GQL console: http://localhost:3008/graphql
## Demo
* Install required packages:
* To watch a contract:
```bash
yarn
```
* Create the database:
```bash
sudo su - postgres
createdb {{folderName}}
```
* Run the watcher:
```bash
yarn server
yarn watch:contract --address CONTRACT_ADDRESS --kind {{contractName}} --starting-block BLOCK_NUMBER
```

View File

@ -17,11 +17,12 @@ import { getDefaultProvider } from 'ethers';
import { getCache } from '@vulcanize/cache';
import { EthClient } from '@vulcanize/ipld-eth-client';
import { DEFAULT_CONFIG_PATH, getConfig } from '@vulcanize/util';
import { DEFAULT_CONFIG_PATH, getConfig, JobQueue } from '@vulcanize/util';
import { createResolvers } from './resolvers';
import { Indexer } from './indexer';
import { Database } from './database';
import { EventWatcher } from './events';
const log = debug('vulcanize:server');
@ -42,7 +43,7 @@ export const main = async (): Promise<any> => {
const { host, port } = config.server;
const { upstream, database: dbConfig } = config;
const { upstream, database: dbConfig, jobQueue: jobQueueConfig } = config;
assert(dbConfig, 'Missing database config');
@ -68,6 +69,16 @@ export const main = async (): Promise<any> => {
const pubsub = new PubSub();
const indexer = new Indexer(db, ethClient, ethProvider);
assert(jobQueueConfig, 'Missing job queue config');
const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
assert(dbConnectionString, 'Missing job queue db connection string');
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue);
await eventWatcher.start();
const resolvers = await createResolvers(indexer);
const app: Application = express();

View File

@ -0,0 +1,55 @@
//
// Copyright 2021 Vulcanize, Inc.
//
import assert from 'assert';
import yargs from 'yargs';
import 'reflect-metadata';
import { Config, DEFAULT_CONFIG_PATH, getConfig } from '@vulcanize/util';
import { Database } from '../database';
(async () => {
const argv = await yargs.parserConfiguration({
'parse-numbers': false
}).options({
configFile: {
alias: 'f',
type: 'string',
require: true,
demandOption: true,
describe: 'configuration file path (toml)',
default: DEFAULT_CONFIG_PATH
},
address: {
type: 'string',
require: true,
demandOption: true,
describe: 'Address of the deployed contract'
},
kind: {
type: 'string',
require: true,
demandOption: true,
describe: 'Kind of contract'
},
startingBlock: {
type: 'number',
default: 1,
describe: 'Starting block'
}
}).argv;
const config: Config = await getConfig(argv.configFile);
const { database: dbConfig } = config;
assert(dbConfig);
const db = new Database(dbConfig);
await db.init();
await db.saveContract(argv.address, argv.kind, argv.startingBlock);
await db.close();
})();

View File

@ -0,0 +1,21 @@
//
// Copyright 2021 Vulcanize, Inc.
//
import fs from 'fs';
import path from 'path';
import Handlebars from 'handlebars';
import { Writable } from 'stream';
const TEMPLATE_FILE = './templates/watch-contract-template.handlebars';
/**
* Writes the watch-contract file generated from a template to a stream.
* @param outStream A writable output stream to write the events file to.
*/
export function exportWatchContract (outStream: Writable): void {
const templateString = fs.readFileSync(path.resolve(__dirname, TEMPLATE_FILE)).toString();
const template = Handlebars.compile(templateString);
const events = template({});
outStream.write(events);
}