Support GQL subscriptions, fill and custom hook for indexing on event (#255)

* Custom hook support for indexing on events.

* Add fill support.

* Process GQL subscriptions.

* Add hooks example.

* Update hooks example.
This commit is contained in:
prathamesh0 2021-09-29 10:04:09 +05:30 committed by GitHub
parent 8e3093c684
commit 40574cf3d9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 287 additions and 28 deletions

View File

@ -0,0 +1,21 @@
//
// Copyright 2021 Vulcanize, Inc.
//
import fs from 'fs';
import path from 'path';
import Handlebars from 'handlebars';
import { Writable } from 'stream';
const TEMPLATE_FILE = './templates/fill-template.handlebars';
/**
 * Generates the fill file from its Handlebars template and writes the
 * result to the given stream.
 * @param outStream Writable stream that receives the generated fill file.
 */
export function exportFill (outStream: Writable): void {
  const templatePath = path.resolve(__dirname, TEMPLATE_FILE);
  const template = Handlebars.compile(fs.readFileSync(templatePath).toString());

  // The fill template takes no substitution data.
  outStream.write(template({}));
}

View File

@ -25,6 +25,8 @@ import { exportJobRunner } from './job-runner';
import { exportWatchContract } from './watch-contract';
import { exportLint } from './lint';
import { registerHandlebarHelpers } from './utils/handlebar-helpers';
import { exportHooks } from './hooks';
import { exportFill } from './fill';
const main = async (): Promise<void> => {
const argv = await yargs(hideBin(process.argv))
@ -206,6 +208,22 @@ function generateWatcher (data: string, visitor: Visitor, argv: any) {
: process.stdout;
exportWatchContract(outStream);
let hooksOutStream;
let exampleOutStream;
if (outputDir) {
hooksOutStream = fs.createWriteStream(path.join(outputDir, 'src/hooks.ts'));
exampleOutStream = fs.createWriteStream(path.join(outputDir, 'src/hooks.example.ts'));
} else {
hooksOutStream = process.stdout;
exampleOutStream = process.stdout;
}
exportHooks(hooksOutStream, exampleOutStream);
outStream = outputDir
? fs.createWriteStream(path.join(outputDir, 'src/fill.ts'))
: process.stdout;
exportFill(outStream);
let rcOutStream;
let ignoreOutStream;
if (outputDir) {

View File

@ -0,0 +1,30 @@
//
// Copyright 2021 Vulcanize, Inc.
//
import fs from 'fs';
import path from 'path';
import Handlebars from 'handlebars';
import { Writable } from 'stream';
const HOOKS_TEMPLATE_FILE = './templates/hooks-template.handlebars';
const EXAMPLE_TEMPLATE_FILE = './templates/hooks-example-template.handlebars';
/**
* Writes the hooks and hooks.example files generated from templates to a stream.
* @param outStream A writable output stream to write the hooks file to.
* @param exampleOutStream A writable output stream to write the hooks.example file to.
*/
export function exportHooks (hooksOutStream: Writable, exampleOutStream: Writable): void {
const hooksTemplateString = fs.readFileSync(path.resolve(__dirname, HOOKS_TEMPLATE_FILE)).toString();
const exampleTemplateString = fs.readFileSync(path.resolve(__dirname, EXAMPLE_TEMPLATE_FILE)).toString();
const hooksTemplate = Handlebars.compile(hooksTemplateString);
const exampleTemplate = Handlebars.compile(exampleTemplateString);
const hooks = hooksTemplate({});
const example = exampleTemplate({});
hooksOutStream.write(hooks);
exampleOutStream.write(example);
}

View File

@ -95,18 +95,13 @@ export class EventWatcher {
async publishEventToSubscribers (dbEvent: Event, timeElapsedInSeconds: number): Promise<void> {
if (dbEvent && dbEvent.eventName !== UNKNOWN_EVENT_NAME) {
const { block: { blockHash }, contract: contractAddress } = dbEvent;
const resultEvent = this._indexer.getResultEvent(dbEvent);
log(`pushing event to GQL subscribers (${timeElapsedInSeconds}s elapsed): ${resultEvent.event.__typename}`);
// Publishing the event here will result in pushing the payload to GQL subscribers for `onEvent`.
await this._pubsub.publish(EVENT, {
onEvent: {
blockHash,
contractAddress,
event: resultEvent
}
onEvent: resultEvent
});
}
}

View File

@ -0,0 +1,92 @@
//
// Copyright 2021 Vulcanize, Inc.
//
import assert from 'assert';
import 'reflect-metadata';
import yargs from 'yargs';
import { hideBin } from 'yargs/helpers';
import debug from 'debug';
import { PubSub } from 'apollo-server-express';
import { getDefaultProvider } from 'ethers';
import { getCache } from '@vulcanize/cache';
import { EthClient } from '@vulcanize/ipld-eth-client';
import { getConfig, fillBlocks, JobQueue, DEFAULT_CONFIG_PATH } from '@vulcanize/util';
import { Database } from './database';
import { Indexer } from './indexer';
import { EventWatcher } from './events';
const log = debug('vulcanize:server');
/**
 * Entry point of the fill CLI: connects to the database and upstream
 * endpoints, then processes events for the given block range via the job
 * queue using `fillBlocks`.
 * @returns Resolves once the requested block range has been filled.
 */
export const main = async (): Promise<any> => {
  const argv = await yargs(hideBin(process.argv)).parserConfiguration({
    'parse-numbers': false
  }).options({
    configFile: {
      alias: 'f',
      type: 'string',
      demandOption: true,
      describe: 'configuration file path (toml)',
      default: DEFAULT_CONFIG_PATH
    },
    startBlock: {
      type: 'number',
      demandOption: true,
      describe: 'Block number to start processing at'
    },
    endBlock: {
      type: 'number',
      demandOption: true,
      describe: 'Block number to stop processing at'
    }
  }).argv;

  const config = await getConfig(argv.configFile);

  assert(config.server, 'Missing server config');

  const { upstream, database: dbConfig, jobQueue: jobQueueConfig } = config;

  assert(dbConfig, 'Missing database config');
  // Validate the job queue config up front, BEFORE its fields are
  // destructured below — otherwise a missing section surfaces as a raw
  // TypeError instead of this assertion message.
  assert(jobQueueConfig, 'Missing job queue config');

  const db = new Database(dbConfig);
  await db.init();

  assert(upstream, 'Missing upstream config');
  const { ethServer: { gqlPostgraphileEndpoint, rpcProviderEndpoint }, cache: cacheConfig } = upstream;
  assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint');

  const cache = await getCache(cacheConfig);

  const ethClient = new EthClient({
    gqlEndpoint: gqlPostgraphileEndpoint,
    gqlSubscriptionEndpoint: gqlPostgraphileEndpoint,
    cache
  });

  const ethProvider = getDefaultProvider(rpcProviderEndpoint);

  // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
  // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
  const pubsub = new PubSub();

  const indexer = new Indexer(db, ethClient, ethProvider);

  const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
  assert(dbConnectionString, 'Missing job queue db connection string');

  const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
  await jobQueue.start();

  const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue);

  await fillBlocks(jobQueue, indexer, ethClient, eventWatcher, argv);
};
main().then(() => {
  process.exit();
}).catch(err => {
  log(err);
  // Exit with a non-zero status so shells and scripts can detect failure;
  // without this the process could hang on open handles or exit 0 on error.
  process.exit(1);
});

View File

@ -0,0 +1,51 @@
//
// Copyright 2021 Vulcanize, Inc.
//
import assert from 'assert';
import { Indexer, ResultEvent } from './indexer';
/**
 * Custom hook invoked for each watched contract event; performs indexing
 * based on the event type. This example targets an ERC20 contract.
 * @param indexer Indexer instance with methods to fetch and update contract values in the database.
 * @param eventData ResultEvent object containing block, tx, contract and event information.
 */
export async function handleEvent (indexer: Indexer, eventData: ResultEvent): Promise<void> {
  assert(indexer);
  assert(eventData);

  // Dispatch on the event type to run the matching indexing logic.
  const eventName = eventData.event.__typename;

  if (eventName === 'TransferEvent') {
    // ERC20 'Transfer' event.
    // A transfer changes the balances of both parties, so trigger indexing
    // for the sender as well as the receiver.

    // Get the event fields from eventData.
    // const { from, to } = eventData.event;

    // Update the sender's balance entry in the database.
    // await indexer.balanceOf(eventData.block.hash, eventData.contract, from);

    // Update the receiver's balance entry in the database.
    // await indexer.balanceOf(eventData.block.hash, eventData.contract, to);
  } else if (eventName === 'ApprovalEvent') {
    // ERC20 'Approval' event.
    // An approval changes the allowance for the (owner, spender) combination.

    // Get the event fields from eventData.
    // const { owner, spender } = eventData.event;

    // Update the allowance entry for (owner, spender) in the database.
    // await indexer.allowance(eventData.block.hash, eventData.contract, owner, spender);
  }
}

View File

@ -0,0 +1,19 @@
//
// Copyright 2021 Vulcanize, Inc.
//
import assert from 'assert';
import { Indexer, ResultEvent } from './indexer';
/**
 * Custom hook invoked for each watched contract event; add indexing logic
 * here based on the event type.
 * @param indexer Indexer instance with methods to fetch and update contract values in the database.
 * @param eventData ResultEvent object containing block, tx, contract and event information.
 */
export async function handleEvent (indexer: Indexer, eventData: ResultEvent): Promise<void> {
  assert(indexer);
  assert(eventData);

  // Perform indexing based on the type of event (eventData.event.__typename).
}

View File

@ -20,6 +20,7 @@ import { Event } from './entity/Event';
import { SyncStatus } from './entity/SyncStatus';
import { BlockProgress } from './entity/BlockProgress';
import artifacts from './artifacts/{{inputFileName}}.json';
import { handleEvent } from './hooks';
const log = debug('vulcanize:indexer');
@ -27,9 +28,16 @@ const log = debug('vulcanize:indexer');
const {{capitalize event.name}}_EVENT = '{{event.name}}';
{{/each}}
interface ResultEvent {
block: any;
tx: any;
export type ResultEvent = {
block: {
hash: string;
number: number;
timestamp: number;
parentHash: string;
};
tx: {
hash: string;
};
contract: string;
@ -153,8 +161,10 @@ export class Indexer {
{{/each}}
async triggerIndexingOnEvent (event: Event): Promise<void> {
// TODO: Implement custom hooks.
assert(event);
const resultEvent = this.getResultEvent(event);
// Call custom hook function for indexing on event.
await handleEvent(this, resultEvent);
}
async processEvent (event: Event): Promise<void> {

View File

@ -9,7 +9,8 @@
"build": "tsc",
"server": "DEBUG=vulcanize:* ts-node src/server.ts",
"job-runner": "DEBUG=vulcanize:* ts-node src/job-runner.ts",
"watch:contract": "DEBUG=vulcanize:* ts-node src/cli/watch-contract.ts"
"watch:contract": "DEBUG=vulcanize:* ts-node src/cli/watch-contract.ts",
"fill": "DEBUG=vulcanize:* ts-node src/fill.ts"
},
"repository": {
"type": "git",

View File

@ -35,9 +35,17 @@
{{folderName}}-job-queue=# exit
```
* Update `environments/local.toml` with database connection settings.
* Update the [config](./environments/local.toml) with database connection settings.
* Update the `upstream` config in `environments/local.toml` and provide the `ipld-eth-server` GQL API and the `indexer-db` postgraphile endpoints.
* Update the `upstream` config in the [config file](./environments/local.toml) and provide the `ipld-eth-server` GQL API and the `indexer-db` postgraphile endpoints.
## Customize
* Indexing on an event:
* Edit the custom hook function `handleEvent` (triggered on an event) in [hooks.ts](./src/hooks.ts) to perform corresponding indexing using the `Indexer` object.
* Refer to [hooks.example.ts](./src/hooks.example.ts) for an example hook function for events in an ERC20 contract.
## Run
@ -60,5 +68,11 @@ GQL console: http://localhost:3008/graphql
* To watch a contract:
```bash
yarn watch:contract --address CONTRACT_ADDRESS --kind {{contractName}} --starting-block BLOCK_NUMBER
yarn watch:contract --address <contract-address> --kind {{contractName}} --starting-block [block-number]
```
* To fill a block range:
```bash
yarn fill --startBlock <from-block> --endBlock <to-block>
```

View File

@ -9,10 +9,11 @@ import debug from 'debug';
import { ValueResult } from '@vulcanize/util';
import { Indexer } from './indexer';
import { EventWatcher } from './events';
const log = debug('vulcanize:resolver');
export const createResolvers = async (indexer: Indexer): Promise<any> => {
export const createResolvers = async (indexer: Indexer, eventWatcher: EventWatcher): Promise<any> => {
assert(indexer);
return {
@ -26,6 +27,12 @@ export const createResolvers = async (indexer: Indexer): Promise<any> => {
}
},
Subscription: {
onEvent: {
subscribe: () => eventWatcher.getEventIterator()
}
},
Query: {
{{#each queries}}
{{this.name}}: (_: any, { blockHash, contractAddress

View File

@ -66,23 +66,24 @@ export const main = async (): Promise<any> => {
const indexer = new Indexer(db, ethClient, ethProvider);
// Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
const pubsub = new PubSub();
assert(jobQueueConfig, 'Missing job queue config');
const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
assert(dbConnectionString, 'Missing job queue db connection string');
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue);
if (watcherKind === KIND_ACTIVE) {
// Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
const pubsub = new PubSub();
assert(jobQueueConfig, 'Missing job queue config');
const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
assert(dbConnectionString, 'Missing job queue db connection string');
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue);
await eventWatcher.start();
}
const resolvers = await createResolvers(indexer);
const resolvers = await createResolvers(indexer, eventWatcher);
const app: Application = express();
const typeDefs = fs.readFileSync(path.join(__dirname, 'schema.gql')).toString();