diff --git a/packages/codegen/src/generate-code.ts b/packages/codegen/src/generate-code.ts
index 6649d24e..babd361d 100644
--- a/packages/codegen/src/generate-code.ts
+++ b/packages/codegen/src/generate-code.ts
@@ -20,6 +20,8 @@ import { exportPackage } from './package';
 import { exportTSConfig } from './tsconfig';
 import { exportReadme } from './readme';
 import { exportEvents } from './events';
+import { exportJobRunner } from './job-runner';
+import { exportWatchContract } from './watch-contract';
 import { registerHandlebarHelpers } from './utils/handlebar-helpers';
 
 const main = async (): Promise<void> => {
@@ -111,6 +113,9 @@ function generateWatcher (data: string, visitor: Visitor, argv: any) {
 
     const entitiesFolder = path.join(outputDir, 'src/entity');
     if (!fs.existsSync(entitiesFolder)) fs.mkdirSync(entitiesFolder, { recursive: true });
+
+    const cliFolder = path.join(outputDir, 'src/cli');
+    if (!fs.existsSync(cliFolder)) fs.mkdirSync(cliFolder, { recursive: true });
   }
 
   const inputFileName = path.basename(argv['input-file'], '.sol');
@@ -181,6 +186,16 @@ function generateWatcher (data: string, visitor: Visitor, argv: any) {
     ? fs.createWriteStream(path.join(outputDir, 'src/events.ts'))
     : process.stdout;
   exportEvents(outStream);
+
+  outStream = outputDir
+    ? fs.createWriteStream(path.join(outputDir, 'src/job-runner.ts'))
+    : process.stdout;
+  exportJobRunner(outStream);
+
+  outStream = outputDir
+    ? fs.createWriteStream(path.join(outputDir, 'src/cli/watch-contract.ts'))
+    : process.stdout;
+  exportWatchContract(outStream);
 }
 
 main().catch(err => {
diff --git a/packages/codegen/src/indexer.ts b/packages/codegen/src/indexer.ts
index b67a33c3..7f309c7e 100644
--- a/packages/codegen/src/indexer.ts
+++ b/packages/codegen/src/indexer.ts
@@ -66,10 +66,19 @@ export class Indexer {
       return;
     }
 
-    this._events.push({
+    const eventObject = {
       name,
-      params
+      params: _.cloneDeep(params)
+    };
+
+    eventObject.params = eventObject.params.map((param) => {
+      const tsParamType = getTsForSol(param.type);
+      assert(tsParamType);
+      param.type = tsParamType;
+      return param;
     });
+
+    this._events.push(eventObject);
   }
 
   /**
diff --git a/packages/codegen/src/job-runner.ts b/packages/codegen/src/job-runner.ts
new file mode 100644
index 00000000..fc4f0a30
--- /dev/null
+++ b/packages/codegen/src/job-runner.ts
@@ -0,0 +1,21 @@
+//
+// Copyright 2021 Vulcanize, Inc.
+//
+
+import fs from 'fs';
+import path from 'path';
+import Handlebars from 'handlebars';
+import { Writable } from 'stream';
+
+const TEMPLATE_FILE = './templates/job-runner-template.handlebars';
+
+/**
+ * Writes the job-runner file generated from a template to a stream.
+ * @param outStream A writable output stream to write the job-runner file to.
+ */
+export function exportJobRunner (outStream: Writable): void {
+  const templateString = fs.readFileSync(path.resolve(__dirname, TEMPLATE_FILE)).toString();
+  const template = Handlebars.compile(templateString);
+  const events = template({});
+  outStream.write(events);
+}
diff --git a/packages/codegen/src/templates/config-template.handlebars b/packages/codegen/src/templates/config-template.handlebars
index b556d1f9..2bd8211e 100644
--- a/packages/codegen/src/templates/config-template.handlebars
+++ b/packages/codegen/src/templates/config-template.handlebars
@@ -22,3 +22,8 @@
   name = "requests"
   enabled = false
   deleteOnStart = false
+
+[jobQueue]
+  dbConnectionString = "postgres://postgres:postgres@localhost/{{folderName}}-job-queue"
+  maxCompletionLagInSecs = 300
+  jobDelayInMilliSecs = 100
diff --git a/packages/codegen/src/templates/indexer-template.handlebars b/packages/codegen/src/templates/indexer-template.handlebars
index 8bbd7da4..820c0bf2 100644
--- a/packages/codegen/src/templates/indexer-template.handlebars
+++ b/packages/codegen/src/templates/indexer-template.handlebars
@@ -42,7 +42,6 @@ interface ResultEvent {
 export class Indexer {
   _db: Database
   _ethClient: EthClient
-  _getStorageAt: GetStorageAt
   _ethProvider: BaseProvider
   _baseIndexer: BaseIndexer
 
@@ -57,7 +56,6 @@ export class Indexer {
     this._db = db;
     this._ethClient = ethClient;
     this._ethProvider = ethProvider;
-    this._getStorageAt = this._ethClient.getStorageAt.bind(this._ethClient);
    this._baseIndexer = new BaseIndexer(this._db, this._ethClient);
 
     const { abi, storageLayout } = artifacts;
@@ -73,8 +71,7 @@
 
   getResultEvent (event: Event): ResultEvent {
     const block = event.block;
-    const eventFields = JSON.parse(event.eventInfo);
-    const { tx } = JSON.parse(event.extraInfo);
+    const eventFields = JSONbig.parse(event.eventInfo);
 
     return {
       block: {
@@ -85,10 +82,7 @@
       },
 
       tx: {
-        hash: event.txHash,
-        from: tx.src,
-        to: tx.dst,
-        index: tx.index
+        hash: event.txHash
       },
 
       contract: event.contract,
@@ -140,7 +134,7 @@
 
     {{~#if (compare query.mode @root.constants.MODE_STORAGE)}}
 
-    const result = await this._getStorageValue(
+    const result = await this._baseIndexer.getStorageValue(
       this._storageLayout,
       blockHash,
       contractAddress,
@@ -158,15 +152,14 @@
   }
 
   {{/each}}
-  async _getStorageValue (storageLayout: StorageLayout, blockHash: string, contractAddress: string, variable: string, ...mappingKeys: any[]): Promise<ValueResult> {
-    return getStorageValue(
-      storageLayout,
-      this._getStorageAt,
-      blockHash,
-      contractAddress,
-      variable,
-      ...mappingKeys
-    );
+
+  async triggerIndexingOnEvent (event: Event): Promise<void> {
+    // TODO: Implement custom hooks.
+  }
+
+  async processEvent (event: Event): Promise<void> {
+    // Trigger indexing of data based on the event.
+    await this.triggerIndexingOnEvent(event);
   }
 
   parseEventNameAndArgs (kind: string, logObj: any): any {
@@ -183,7 +176,12 @@
       const { {{#each event.params~}} {{this.name}} {{~#unless @last}}, {{/unless}} {{~/each}} } = logDescription.args;
       eventInfo = {
         {{#each event.params}}
-        {{this.name}} {{~#unless @last}},{{/unless}}
+        {{#if (compare this.type 'bigint')}}
+        {{this.name}}: BigInt(ethers.BigNumber.from({{this.name}}).toString())
+        {{else}}
+        {{this.name}}
+        {{~/if}}
+        {{~#unless @last}},{{/unless}}
         {{/each}}
       };
 
diff --git a/packages/codegen/src/templates/job-runner-template.handlebars b/packages/codegen/src/templates/job-runner-template.handlebars
new file mode 100644
index 00000000..5f101336
--- /dev/null
+++ b/packages/codegen/src/templates/job-runner-template.handlebars
@@ -0,0 +1,126 @@
+//
+// Copyright 2021 Vulcanize, Inc.
+//
+
+import assert from 'assert';
+import 'reflect-metadata';
+import yargs from 'yargs';
+import { hideBin } from 'yargs/helpers';
+import debug from 'debug';
+import { getDefaultProvider } from 'ethers';
+
+import { getCache } from '@vulcanize/cache';
+import { EthClient } from '@vulcanize/ipld-eth-client';
+import {
+  getConfig,
+  JobQueue,
+  JobRunner as BaseJobRunner,
+  QUEUE_BLOCK_PROCESSING,
+  QUEUE_EVENT_PROCESSING,
+  JobQueueConfig,
+  DEFAULT_CONFIG_PATH
+} from '@vulcanize/util';
+
+import { Indexer } from './indexer';
+import { Database } from './database';
+
+const log = debug('vulcanize:job-runner');
+
+export class JobRunner {
+  _indexer: Indexer
+  _jobQueue: JobQueue
+  _baseJobRunner: BaseJobRunner
+  _jobQueueConfig: JobQueueConfig
+
+  constructor (jobQueueConfig: JobQueueConfig, indexer: Indexer, jobQueue: JobQueue) {
+    this._indexer = indexer;
+    this._jobQueue = jobQueue;
+    this._jobQueueConfig = jobQueueConfig;
+    this._baseJobRunner = new BaseJobRunner(this._jobQueueConfig, this._indexer, this._jobQueue);
+  }
+
+  async start (): Promise<void> {
+    await this.subscribeBlockProcessingQueue();
+    await this.subscribeEventProcessingQueue();
+  }
+
+  async subscribeBlockProcessingQueue (): Promise<void> {
+    await this._jobQueue.subscribe(QUEUE_BLOCK_PROCESSING, async (job) => {
+      await this._baseJobRunner.processBlock(job);
+
+      await this._jobQueue.markComplete(job);
+    });
+  }
+
+  async subscribeEventProcessingQueue (): Promise<void> {
+    await this._jobQueue.subscribe(QUEUE_EVENT_PROCESSING, async (job) => {
+      const event = await this._baseJobRunner.processEvent(job);
+
+      const watchedContract = await this._indexer.isWatchedContract(event.contract);
+      if (watchedContract) {
+        await this._indexer.processEvent(event);
+      }
+
+      await this._jobQueue.markComplete(job);
+    });
+  }
+}
+
+export const main = async (): Promise<void> => {
+  const argv = await yargs(hideBin(process.argv))
+    .option('f', {
+      alias: 'config-file',
+      demandOption: true,
+      describe: 'configuration file path (toml)',
+      type: 'string',
+      default: DEFAULT_CONFIG_PATH
+    })
+    .argv;
+
+  const config = await getConfig(argv.f);
+
+  assert(config.server, 'Missing server config');
+
+  const { upstream, database: dbConfig, jobQueue: jobQueueConfig } = config;
+
+  assert(dbConfig, 'Missing database config');
+
+  const db = new Database(dbConfig);
+  await db.init();
+
+  assert(upstream, 'Missing upstream config');
+  const { ethServer: { gqlApiEndpoint, gqlPostgraphileEndpoint, rpcProviderEndpoint }, cache: cacheConfig } = upstream;
+  assert(gqlApiEndpoint, 'Missing upstream ethServer.gqlApiEndpoint');
+  assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint');
+
+  const cache = await getCache(cacheConfig);
+  const ethClient = new EthClient({
+    gqlEndpoint: gqlApiEndpoint,
+    gqlSubscriptionEndpoint: gqlPostgraphileEndpoint,
+    cache
+  });
+
+  const ethProvider = getDefaultProvider(rpcProviderEndpoint);
+  const indexer = new Indexer(db, ethClient, ethProvider);
+
+  assert(jobQueueConfig, 'Missing job queue config');
+
+  const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
+  assert(dbConnectionString, 'Missing job queue db connection string');
+
+  const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
+  await jobQueue.start();
+
+  const jobRunner = new JobRunner(jobQueueConfig, indexer, jobQueue);
+  await jobRunner.start();
+};
+
+main().then(() => {
+  log('Starting job runner...');
+}).catch(err => {
+  log(err);
+});
+
+process.on('uncaughtException', err => {
+  log('uncaughtException', err);
+});
diff --git a/packages/codegen/src/templates/package-template.handlebars b/packages/codegen/src/templates/package-template.handlebars
index 42cb817c..fbc7247a 100644
--- a/packages/codegen/src/templates/package-template.handlebars
+++ b/packages/codegen/src/templates/package-template.handlebars
@@ -6,7 +6,9 @@
   "main": "dist/index.js",
   "scripts": {
     "build": "tsc",
-    "server": "DEBUG=vulcanize:* ts-node src/server.ts"
+    "server": "DEBUG=vulcanize:* ts-node src/server.ts",
+    "job-runner": "DEBUG=vulcanize:* ts-node src/job-runner.ts",
+    "watch:contract": "DEBUG=vulcanize:* ts-node src/cli/watch-contract.ts"
   },
   "repository": {
     "type": "git",
diff --git a/packages/codegen/src/templates/readme-template.handlebars b/packages/codegen/src/templates/readme-template.handlebars
index d8ba4722..b4edb4bd 100644
--- a/packages/codegen/src/templates/readme-template.handlebars
+++ b/packages/codegen/src/templates/readme-template.handlebars
@@ -15,6 +15,24 @@
   createdb {{folderName}}
   ```
 
+* Create a database for the job queue and enable the `pgcrypto` extension on it (https://github.com/timgit/pg-boss/blob/master/docs/usage.md#intro):
+
+  ```
+  createdb {{folderName}}-job-queue
+  ```
+
+  ```
+  postgres@tesla:~$ psql -U postgres -h localhost {{folderName}}-job-queue
+  Password for user postgres:
+  psql (12.7 (Ubuntu 12.7-1.pgdg18.04+1))
+  SSL connection (protocol: TLSv1.3, cipher: TLS_AES_256_GCM_SHA384, bits: 256, compression: off)
+  Type "help" for help.
+
+  {{folderName}}-job-queue=# CREATE EXTENSION pgcrypto;
+  CREATE EXTENSION
+  {{folderName}}-job-queue=# exit
+  ```
+
 * Update `environments/local.toml` with database connection settings.
 
 * Update the `upstream` config in `environments/local.toml` and provide the `ipld-eth-server` GQL API and the `indexer-db` postgraphile endpoints.
@@ -27,25 +45,16 @@
   yarn server
   ```
 
+* Run the job-runner:
+
+  ```bash
+  yarn job-runner
+  ```
+
 GQL console: http://localhost:3008/graphql
 
-## Demo
-
-* Install required packages:
+* To watch a contract:
 
   ```bash
-  yarn
-  ```
-
-* Create the database:
-
-  ```bash
-  sudo su - postgres
-  createdb {{folderName}}
-  ```
-
-* Run the watcher:
-
-  ```bash
-  yarn server
+  yarn watch:contract --address CONTRACT_ADDRESS --kind {{contractName}} --starting-block BLOCK_NUMBER
   ```
diff --git a/packages/codegen/src/templates/server-template.handlebars b/packages/codegen/src/templates/server-template.handlebars
index a51af18a..9c135acf 100644
--- a/packages/codegen/src/templates/server-template.handlebars
+++ b/packages/codegen/src/templates/server-template.handlebars
@@ -17,11 +17,12 @@
 import { getDefaultProvider } from 'ethers';
 
 import { getCache } from '@vulcanize/cache';
 import { EthClient } from '@vulcanize/ipld-eth-client';
-import { DEFAULT_CONFIG_PATH, getConfig } from '@vulcanize/util';
+import { DEFAULT_CONFIG_PATH, getConfig, JobQueue } from '@vulcanize/util';
 
 import { createResolvers } from './resolvers';
 import { Indexer } from './indexer';
 import { Database } from './database';
+import { EventWatcher } from './events';
 
 const log = debug('vulcanize:server');
@@ -42,7 +43,7 @@ export const main = async (): Promise<void> => {
 
   const { host, port } = config.server;
 
-  const { upstream, database: dbConfig } = config;
+  const { upstream, database: dbConfig, jobQueue: jobQueueConfig } = config;
 
   assert(dbConfig, 'Missing database config');
 
@@ -68,6 +69,16 @@ export const main = async (): Promise<void> => {
   const pubsub = new PubSub();
   const indexer = new Indexer(db, ethClient, ethProvider);
 
+  assert(jobQueueConfig, 'Missing job queue config');
+  const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
+  assert(dbConnectionString, 'Missing job queue db connection string');
+
+  const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
+  await jobQueue.start();
+
+  const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue);
+  await eventWatcher.start();
+
   const resolvers = await createResolvers(indexer);
 
   const app: Application = express();
diff --git a/packages/codegen/src/templates/watch-contract-template.handlebars b/packages/codegen/src/templates/watch-contract-template.handlebars
new file mode 100644
index 00000000..a9b9640e
--- /dev/null
+++ b/packages/codegen/src/templates/watch-contract-template.handlebars
@@ -0,0 +1,55 @@
+//
+// Copyright 2021 Vulcanize, Inc.
+//
+
+import assert from 'assert';
+import yargs from 'yargs';
+import 'reflect-metadata';
+
+import { Config, DEFAULT_CONFIG_PATH, getConfig } from '@vulcanize/util';
+
+import { Database } from '../database';
+
+(async () => {
+  const argv = await yargs.parserConfiguration({
+    'parse-numbers': false
+  }).options({
+    configFile: {
+      alias: 'f',
+      type: 'string',
+      require: true,
+      demandOption: true,
+      describe: 'configuration file path (toml)',
+      default: DEFAULT_CONFIG_PATH
+    },
+    address: {
+      type: 'string',
+      require: true,
+      demandOption: true,
+      describe: 'Address of the deployed contract'
+    },
+    kind: {
+      type: 'string',
+      require: true,
+      demandOption: true,
+      describe: 'Kind of contract'
+    },
+    startingBlock: {
+      type: 'number',
+      default: 1,
+      describe: 'Starting block'
+    }
+  }).argv;
+
+  const config: Config = await getConfig(argv.configFile);
+  const { database: dbConfig } = config;
+
+  assert(dbConfig);
+
+  const db = new Database(dbConfig);
+  await db.init();
+
+  await db.saveContract(argv.address, argv.kind, argv.startingBlock);
+
+  await db.close();
+})();
diff --git a/packages/codegen/src/watch-contract.ts b/packages/codegen/src/watch-contract.ts
new file mode 100644
index 00000000..3b056ed6
--- /dev/null
+++ b/packages/codegen/src/watch-contract.ts
@@ -0,0 +1,21 @@
+//
+// Copyright 2021 Vulcanize, Inc.
+//
+
+import fs from 'fs';
+import path from 'path';
+import Handlebars from 'handlebars';
+import { Writable } from 'stream';
+
+const TEMPLATE_FILE = './templates/watch-contract-template.handlebars';
+
+/**
+ * Writes the watch-contract file generated from a template to a stream.
+ * @param outStream A writable output stream to write the watch-contract file to.
+ */
+export function exportWatchContract (outStream: Writable): void {
+  const templateString = fs.readFileSync(path.resolve(__dirname, TEMPLATE_FILE)).toString();
+  const template = Handlebars.compile(templateString);
+  const events = template({});
+  outStream.write(events);
+}