Generate IPLD blocks table and related GQL API (#260)

* Add ipld-blocks entity generation

* Populate ipld-blocks table

* Rename ipld-block entity and update after each event

* Move ipld-hook to hooks.ts

* Change IPLD block structure

* Add cid field in blocks

* Fetch prev. IPLDBlock for a contract

* GQL API to query IPLDBlock by CID

* Save cid in blocks in existing watchers

* Update codegen docs

* GQL API for getting last derived state (#3)

* GQL API for getting last derived state

* Rename query to getState

* Change query names to getState and getStateByCid

* Save BigInt as string

* Move function to prepare IPLDBlock to indexer

* Refactor IPLDBlock hook

* Add genesis hook

* Call post-block hook after a block is marked as complete

* Add IPLDBlock checkpointing

* Use queryRunner instead of a new repo for queries

* Add a query to get block in ipld-eth-client

* Get latest checkpoints for all contracts for checkpointing.

* Call post-block hook in a queue

* Pass server config to Indexer in watch-contract cli

Co-authored-by: nikugogoi <nabarun@deepstacksoft.com>
This commit is contained in:
prathamesh0 2021-10-12 16:02:56 +05:30 committed by nabarun
parent 7b6f6e468f
commit 51b200709b
44 changed files with 884 additions and 152 deletions

View File

@ -53,13 +53,13 @@
yarn codegen --input-file ./test/examples/contracts/ERC721.sol --contract-name ERC721 --output-folder ../my-erc721-watcher --mode storage --kind lazy
```
Generate code for `ERC721` contract in both `eth_call` and `storage` mode, `active` kind:
Generate code for `ERC20` contract in both `eth_call` and `storage` mode, `active` kind:
```bash
yarn codegen --input-file ../../node_modules/@openzeppelin/contracts/token/ERC721/ERC721.sol --contract-name ERC721 --output-folder ../demo-erc721-watcher --mode all --kind active
yarn codegen --input-file ../../node_modules/@openzeppelin/contracts/token/ERC20/ERC20.sol --contract-name ERC20 --output-folder ../demo-erc20-watcher --mode all --kind active
```
This will create a folder called `demo-erc721-watcher` containing the generated code at the specified path. Follow the steps in [Run Generated Watcher](#run-generated-watcher) to setup and run the generated watcher.
This will create a folder called `demo-erc20-watcher` containing the generated code at the specified path. Follow the steps in [Run Generated Watcher](#run-generated-watcher) to setup and run the generated watcher.
## Run Generated Watcher
@ -79,7 +79,9 @@
* Edit the custom hook function `handleEvent` (triggered on an event) in `src/hooks.ts` to perform corresponding indexing using the `Indexer` object.
* Refer to `src/hooks.example.ts` for an example hook function for events in an ERC20 contract.
* Edit the custom hook function `handleBlock` (triggered on a block) in `src/hooks.ts` to save `IPLDBlock`s using the `Indexer` object.
* The existing example hooks in `src/hooks.ts` are for an `ERC20` contract.
### Run
@ -106,7 +108,7 @@
* To watch a contract:
```bash
yarn watch:contract --address <contract-address> --kind ERC721 --starting-block [block-number]
yarn watch:contract --address <contract-address> --kind <contract-kind> --starting-block [block-number]
```
* To fill a block range:

View File

@ -21,6 +21,7 @@
"dependencies": {
"@poanet/solidity-flattener": "https://github.com/vulcanize/solidity-flattener.git",
"@solidity-parser/parser": "^0.13.2",
"@vulcanize/util": "^0.1.0",
"gql-generator": "https://github.com/vulcanize/gql-generator.git",
"graphql": "^15.5.0",
"graphql-compose": "^9.0.3",

View File

@ -9,6 +9,10 @@ indexOn:
- columns:
- parentHash
columns:
- name: cid
pgType: varchar
tsType: string
columnType: Column
- name: blockHash
pgType: varchar
tsType: string

View File

@ -0,0 +1,41 @@
className: IPLDBlock
indexOn:
- columns:
- block
- contractAddress
columns:
- name: block
tsType: BlockProgress
columnType: ManyToOne
lhs: ()
rhs: BlockProgress
- name: contractAddress
pgType: varchar
tsType: string
columnType: Column
columnOptions:
- option: length
value: 42
- name: cid
pgType: varchar
tsType: string
columnType: Column
- name: kind
pgType: varchar
tsType: string
columnType: Column
- name: data
pgType: text
tsType: string
columnType: Column
imports:
- toImport:
- Entity
- PrimaryGeneratedColumn
- Column
- Index
- ManyToOne
from: typeorm
- toImport:
- BlockProgress
from: ./BlockProgress

View File

@ -37,7 +37,6 @@ export class Entity {
}
const entityObject: any = {
// Capitalize the first letter of name.
className: '',
indexOn: [],
columns: [],
@ -188,6 +187,7 @@ export class Entity {
this._addSyncStatusEntity();
this._addContractEntity();
this._addBlockProgressEntity();
this._addIPLDBlockEntity();
const template = Handlebars.compile(this._templateString);
this._entities.forEach(entityObj => {
@ -218,4 +218,9 @@ export class Entity {
const entity = yaml.load(fs.readFileSync(path.resolve(__dirname, TABLES_DIR, 'BlockProgress.yaml'), 'utf8'));
this._entities.push(entity);
}
_addIPLDBlockEntity (): void {
const entity = yaml.load(fs.readFileSync(path.resolve(__dirname, TABLES_DIR, 'IPLDBlock.yaml'), 'utf8'));
this._entities.push(entity);
}
}

View File

@ -7,8 +7,8 @@ import fetch from 'node-fetch';
import path from 'path';
import yargs from 'yargs';
import { hideBin } from 'yargs/helpers';
import { flatten } from '@poanet/solidity-flattener';
import { flatten } from '@poanet/solidity-flattener';
import { parse, visit } from '@solidity-parser/parser';
import { KIND_ACTIVE, KIND_LAZY } from '@vulcanize/util';
@ -209,15 +209,12 @@ function generateWatcher (data: string, visitor: Visitor, argv: any) {
exportWatchContract(outStream);
let hooksOutStream;
let exampleOutStream;
if (outputDir) {
hooksOutStream = fs.createWriteStream(path.join(outputDir, 'src/hooks.ts'));
exampleOutStream = fs.createWriteStream(path.join(outputDir, 'src/hooks.example.ts'));
} else {
hooksOutStream = process.stdout;
exampleOutStream = process.stdout;
}
exportHooks(hooksOutStream, exampleOutStream);
exportHooks(hooksOutStream);
outStream = outputDir
? fs.createWriteStream(path.join(outputDir, 'src/fill.ts'))

View File

@ -8,23 +8,14 @@ import Handlebars from 'handlebars';
import { Writable } from 'stream';
const HOOKS_TEMPLATE_FILE = './templates/hooks-template.handlebars';
const EXAMPLE_TEMPLATE_FILE = './templates/hooks-example-template.handlebars';
/**
* Writes the hooks and hooks.example files generated from templates to a stream.
* Writes the hooks file generated from template to a stream.
* @param outStream A writable output stream to write the hooks file to.
* @param exampleOutStream A writable output stream to write the hooks.example file to.
*/
export function exportHooks (hooksOutStream: Writable, exampleOutStream: Writable): void {
export function exportHooks (hooksOutStream: Writable): void {
const hooksTemplateString = fs.readFileSync(path.resolve(__dirname, HOOKS_TEMPLATE_FILE)).toString();
const exampleTemplateString = fs.readFileSync(path.resolve(__dirname, EXAMPLE_TEMPLATE_FILE)).toString();
const hooksTemplate = Handlebars.compile(hooksTemplateString);
const exampleTemplate = Handlebars.compile(exampleTemplateString);
const hooks = hooksTemplate({});
const example = exampleTemplate({});
hooksOutStream.write(hooks);
exampleOutStream.write(example);
}

View File

@ -97,6 +97,9 @@ export class Schema {
// Add a mutation for watching a contract.
this._addWatchContractMutation();
this._addIPLDType();
this._addIPLDQuery();
return this._composer.buildSchema();
}
@ -173,6 +176,7 @@ export class Schema {
this._composer.createObjectTC({
name: blockName,
fields: {
cid: 'String!',
hash: 'String!',
number: 'Int!',
timestamp: 'Int!',
@ -234,6 +238,40 @@ export class Schema {
});
}
_addIPLDType (): void {
this._composer.createObjectTC({
name: 'ResultIPLDBlock',
fields: {
block: () => this._composer.getOTC('Block').NonNull,
contractAddress: 'String!',
cid: 'String!',
kind: 'String!',
data: 'String!'
}
});
}
_addIPLDQuery (): void {
this._composer.Query.addFields({
getStateByCID: {
type: this._composer.getOTC('ResultIPLDBlock'),
args: {
cid: 'String!'
}
}
});
this._composer.Query.addFields({
getState: {
type: this._composer.getOTC('ResultIPLDBlock'),
args: {
blockHash: 'String!',
contractAddress: 'String!'
}
}
});
}
/**
* Adds an event subscription to the schema.
*/
@ -254,6 +292,7 @@ export class Schema {
type: 'Boolean!',
args: {
contractAddress: 'String!',
kind: 'String!',
startingBlock: 'Int'
}
}

View File

@ -3,7 +3,6 @@
//
import { gql } from '@apollo/client/core';
import { GraphQLClient, GraphQLConfig } from '@vulcanize/ipld-eth-client';
import { queries, mutations, subscriptions } from './gql';

View File

@ -3,6 +3,12 @@
port = 3008
kind = "{{watcherKind}}"
# Checkpointing derived state.
checkpointing = true
# Checkpoint interval in number of blocks.
checkpointInterval = 2000
[database]
type = "postgres"
host = "localhost"

View File

@ -3,15 +3,16 @@
//
import assert from 'assert';
import { Connection, ConnectionOptions, DeepPartial, FindConditions, QueryRunner, FindManyOptions } from 'typeorm';
import { Connection, ConnectionOptions, DeepPartial, FindConditions, QueryRunner, FindManyOptions, In, Between } from 'typeorm';
import path from 'path';
import { Database as BaseDatabase, QueryOptions, Where } from '@vulcanize/util';
import { Database as BaseDatabase, QueryOptions, Where, MAX_REORG_DEPTH } from '@vulcanize/util';
import { Contract } from './entity/Contract';
import { Event } from './entity/Event';
import { SyncStatus } from './entity/SyncStatus';
import { BlockProgress } from './entity/BlockProgress';
import { IPLDBlock } from './entity/IPLDBlock';
{{#each queries as | query |}}
import { {{query.entityName}} } from './entity/{{query.entityName}}';
@ -78,6 +79,159 @@ export class Database {
}
{{/each}}
async getIPLDBlocks (where: FindConditions<IPLDBlock>): Promise<IPLDBlock[]> {
const repo = this._conn.getRepository(IPLDBlock);
return repo.find({ where, relations: ['block'] });
}
async getLatestCheckpoints (queryRunner: QueryRunner): Promise<IPLDBlock[]> {
// Get the latest checkpoints for all the contracts.
const result = await queryRunner.manager.createQueryBuilder(IPLDBlock, 'ipld_block')
.distinctOn(['contract_address'])
.orderBy('contract_address')
.innerJoinAndSelect(Contract, 'contract', 'contract_address = contract.address')
.leftJoinAndSelect('ipld_block.block', 'block')
.where('block.is_pruned = false')
.andWhere('ipld_block.kind = :kind', { kind: 'checkpoint' })
.addOrderBy('ipld_block.block_id', 'DESC')
.getMany();
return result;
}
async getPrevIPLDBlock (queryRunner: QueryRunner, blockHash: string, contractAddress: string): Promise<IPLDBlock | undefined> {
const heirerchicalQuery = `
WITH RECURSIVE cte_query AS
(
SELECT
b.block_hash,
b.block_number,
b.parent_hash,
1 as depth,
i.id
FROM
block_progress b
LEFT JOIN
ipld_block i ON i.block_id = b.id
WHERE
b.block_hash = $1
UNION ALL
SELECT
b.block_hash,
b.block_number,
b.parent_hash,
c.depth + 1,
i.id
FROM
block_progress b
LEFT JOIN
ipld_block i
ON i.block_id = b.id
AND i.contract_address = $2
INNER JOIN
cte_query c ON c.parent_hash = b.block_hash
WHERE
c.id IS NULL AND c.depth < $3
)
SELECT
block_number, id
FROM
cte_query
ORDER BY block_number ASC
LIMIT 1;
`;
// Fetching block and id for previous IPLDBlock in frothy region.
const [{ block_number: blockNumber, id }] = await queryRunner.query(heirerchicalQuery, [blockHash, contractAddress, MAX_REORG_DEPTH]);
let result: IPLDBlock | undefined;
if (id) {
result = await queryRunner.manager.findOne(IPLDBlock, { id }, { relations: ['block'] });
} else {
// If IPLDBlock not found in frothy region get latest IPLDBlock in the pruned region.
// Filter out IPLDBlocks from pruned blocks.
const canonicalBlockNumber = blockNumber + 1;
result = await queryRunner.manager.createQueryBuilder(IPLDBlock, 'ipld_block')
.leftJoinAndSelect('ipld_block.block', 'block')
.where('block.is_pruned = false')
.andWhere('ipld_block.contract_address = :contractAddress', { contractAddress })
.andWhere('block.block_number <= :canonicalBlockNumber', { canonicalBlockNumber })
.orderBy('block.block_number', 'DESC')
.limit(1)
.getOne();
}
return result;
}
async getPrevIPLDBlocksAfterCheckpoint (queryRunner: QueryRunner, blockHash: string, checkpointBlockNumber: number, contractAddress: string): Promise<IPLDBlock[]> {
const heirerchicalQuery = `
WITH RECURSIVE cte_query AS
(
SELECT
b.block_hash,
b.block_number,
b.parent_hash,
1 as depth,
i.id
FROM
block_progress b
LEFT JOIN
ipld_block i ON i.block_id = b.id
WHERE
b.block_hash = $1
UNION ALL
SELECT
b.block_hash,
b.block_number,
b.parent_hash,
c.depth + 1,
i.id
FROM
block_progress b
LEFT JOIN
ipld_block i
ON i.block_id = b.id
AND i.contract_address = $2
INNER JOIN
cte_query c ON c.parent_hash = b.block_hash
WHERE
c.depth < $3
)
SELECT
block_number, id
FROM
cte_query
ORDER BY block_number ASC
`;
// Fetching ids for previous IPLDBlocks in the frothy region.
const queryResult = await queryRunner.query(heirerchicalQuery, [blockHash, contractAddress, MAX_REORG_DEPTH]);
let frothyIds = queryResult.map((obj: any) => obj.id);
frothyIds = frothyIds.filter((id: any) => id !== null);
const frothyBlockNumber = queryResult[0].block_number;
// Fetching all diff blocks after checkpoint till current blockNumber.
const ipldBlocks = await queryRunner.manager.find(IPLDBlock, {
relations: ['block'],
where: [
{ contractAddress, block: { isPruned: false, blockNumber: Between(checkpointBlockNumber + 1, frothyBlockNumber - 1) } },
{ id: In(frothyIds) }
],
order: { block: 'ASC' }
});
return ipldBlocks;
}
async saveOrUpdateIPLDBlock (ipldBlock: IPLDBlock): Promise<IPLDBlock> {
const repo = this._conn.getRepository(IPLDBlock);
return repo.save(ipldBlock);
}
async getContract (address: string): Promise<Contract | undefined> {
const repo = this._conn.getRepository(Contract);
@ -180,6 +334,11 @@ export class Database {
return this._baseDatabase.getBlockProgressEntities(repo, where, options);
}
async getLatestBlockProgress (): Promise<BlockProgress | undefined> {
const repo = this._conn.getRepository(BlockProgress);
return repo.findOne({ order: { blockNumber: 'DESC' } });
}
async updateBlockProgress (queryRunner: QueryRunner, blockHash: string, lastProcessedEventIndex: number): Promise<void> {
const repo = queryRunner.manager.getRepository(BlockProgress);

View File

@ -12,6 +12,8 @@ import {
EventWatcher as BaseEventWatcher,
QUEUE_BLOCK_PROCESSING,
QUEUE_EVENT_PROCESSING,
QUEUE_BLOCK_CHECKPOINT,
QUEUE_HOOKS,
UNKNOWN_EVENT_NAME,
UpstreamConfig
} from '@vulcanize/util';
@ -85,6 +87,23 @@ export class EventWatcher {
}
const dbEvents = await this._baseEventWatcher.eventProcessingCompleteHandler(job);
if (dbEvents.length > 0) {
const dbEvent = dbEvents[0];
// If the block is marked as complete:
// a. Push a post-block hook job.
// b. Push a block checkpointing job.
if (dbEvent.block.isComplete) {
await this._jobQueue.pushJob(QUEUE_HOOKS, { blockHash: dbEvent.block.blockHash });
// Push checkpointing job if checkpointing is on.
if (this._indexer._serverConfig.checkpointing) {
await this._jobQueue.pushJob(QUEUE_BLOCK_CHECKPOINT, { blockHash: dbEvent.block.blockHash, blockNumber: dbEvent.block.blockNumber });
}
}
}
const timeElapsedInSeconds = (Date.now() - Date.parse(createdOn)) / 1000;
// Cannot publish individual event as they are processed together in a single job.

View File

@ -44,11 +44,11 @@ export const main = async (): Promise<any> => {
const config = await getConfig(argv.configFile);
assert(config.server, 'Missing server config');
const { upstream, database: dbConfig, jobQueue: jobQueueConfig } = config;
const { upstream, database: dbConfig, jobQueue: jobQueueConfig, server: serverConfig } = config;
assert(upstream, 'Missing upstream config');
assert(dbConfig, 'Missing database config');
assert(serverConfig, 'Missing server config');
const db = new Database(dbConfig);
await db.init();
@ -75,7 +75,7 @@ export const main = async (): Promise<any> => {
// Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
const pubsub = new PubSub();
const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider);
const indexer = new Indexer(serverConfig, db, ethClient, postgraphileClient, ethProvider);
const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
assert(dbConnectionString, 'Missing job queue db connection string');

View File

@ -1,51 +0,0 @@
//
// Copyright 2021 Vulcanize, Inc.
//
import assert from 'assert';
import { Indexer, ResultEvent } from './indexer';
/**
* Event hook function.
* @param indexer Indexer instance that contains methods to fetch and update the contract values in the database.
* @param eventData ResultEvent object containing necessary information.
*/
export async function handleEvent (indexer: Indexer, eventData: ResultEvent): Promise<void> {
assert(indexer);
assert(eventData);
// The following code is for ERC20 contract implementation.
// Perform indexing based on the type of event.
switch (eventData.event.__typename) {
// In case of ERC20 'Transfer' event.
case 'TransferEvent': {
// On a transfer, balances for both parties change.
// Therefore, trigger indexing for both sender and receiver.
// Get event fields from eventData.
// const { from, to } = eventData.event;
// Update balance entry for sender in the database.
// await indexer.balanceOf(eventData.block.hash, eventData.contract, from);
// Update balance entry for receiver in the database.
// await indexer.balanceOf(eventData.block.hash, eventData.contract, to);
break;
}
// In case of ERC20 'Approval' event.
case 'ApprovalEvent': {
// On an approval, allowance for (owner, spender) combination changes.
// Get event fields from eventData.
// const { owner, spender } = eventData.event;
// Update allowance entry for (owner, spender) combination in the database.
// await indexer.allowance(eventData.block.hash, eventData.contract, owner, spender);
break;
}
}
}

View File

@ -3,8 +3,103 @@
//
import assert from 'assert';
import _ from 'lodash';
import { UNKNOWN_EVENT_NAME } from '@vulcanize/util';
import { Indexer, ResultEvent } from './indexer';
import { BlockProgress } from './entity/BlockProgress';
const ACCOUNTS = [
'0xDC7d7A8920C8Eecc098da5B7522a5F31509b5Bfc'
];
/**
* Genesis hook function.
* @param indexer Indexer instance.
* @param block Concerned block.
* @param contractAddress Address of the concerned contract.
*/
export async function genesisHook (indexer: Indexer, block: BlockProgress, contractAddress: string): Promise<void> {
// Store the genesis state values in an IPLDBlock.
const ipldBlockData: any = {};
// Setting the initial balances of accounts.
for (const account of ACCOUNTS) {
const balance = await indexer._balances(block.blockHash, contractAddress, account);
_.set(ipldBlockData, `state._balances[${account}]`, balance.value.toString());
}
const ipldBlock = await indexer.prepareIPLDBlock(block, contractAddress, ipldBlockData, 'checkpoint');
await indexer.saveOrUpdateIPLDBlock(ipldBlock);
}
/**
* Post-block hook function.
* @param indexer Indexer instance that contains methods to fetch the contract varaiable values.
* @param blockHash Block hash of the concerned block.
*/
export async function postBlockHook (indexer: Indexer, blockHash: string): Promise<void> {
// Get events for current block and make an entry of updated values in IPLDBlock.
const events = await indexer.getEventsByFilter(blockHash);
// No IPLDBlock entry if there are no events.
if (!events) {
return;
}
for (const event of events) {
if (event.eventName === UNKNOWN_EVENT_NAME) {
continue;
}
const block = event.block;
const contractAddress = event.contract;
const eventData = indexer.getResultEvent(event);
const ipldBlockData: any = {};
switch (event.eventName) {
case 'Transfer': {
const { from, to } = eventData.event;
const fromBalance = await indexer._balances(blockHash, contractAddress, from);
const toBalance = await indexer._balances(blockHash, contractAddress, to);
// {
// "_balances": {
// "0xCA6D29232D1435D8198E3E5302495417dD073d61": "100",
// "0xDC7d7A8920C8Eecc098da5B7522a5F31509b5Bfc": "999999999999999999900"
// }
// }
_.set(ipldBlockData, `state._balances[${from}]`, fromBalance.value.toString());
_.set(ipldBlockData, `state._balances[${to}]`, toBalance.value.toString());
break;
}
case 'Approval': {
const { owner, spender } = eventData.event;
const allowance = await indexer._allowances(blockHash, contractAddress, owner, spender);
// {
// "_allowances": {
// "0xDC7d7A8920C8Eecc098da5B7522a5F31509b5Bfc": {
// "0xCA6D29232D1435D8198E3E5302495417dD073d61": "10"
// }
// }
// }
_.set(ipldBlockData, `state._allowances[${owner}][${spender}]`, allowance.value.toString());
break;
}
}
const ipldBlock = await indexer.prepareIPLDBlock(block, contractAddress, ipldBlockData);
await indexer.saveOrUpdateIPLDBlock(ipldBlock);
}
}
/**
* Event hook function.
@ -15,5 +110,37 @@ export async function handleEvent (indexer: Indexer, eventData: ResultEvent): Pr
assert(indexer);
assert(eventData);
// The following code is for ERC20 contract implementation.
// Perform indexing based on the type of event.
switch (eventData.event.__typename) {
// In case of ERC20 'Transfer' event.
case 'TransferEvent': {
// On a transfer, balances for both parties change.
// Therefore, trigger indexing for both sender and receiver.
// Get event fields from eventData.
// const { from, to } = eventData.event;
// Update balance entry for sender in the database.
// await indexer.balanceOf(eventData.block.hash, eventData.contract, from);
// Update balance entry for receiver in the database.
// await indexer.balanceOf(eventData.block.hash, eventData.contract, to);
break;
}
// In case of ERC20 'Approval' event.
case 'ApprovalEvent': {
// On an approval, allowance for (owner, spender) combination changes.
// Get event fields from eventData.
// const { owner, spender } = eventData.event;
// Update allowance entry for (owner, spender) combination in the database.
// await indexer.allowance(eventData.block.hash, eventData.contract, owner, spender);
break;
}
}
}

View File

@ -4,23 +4,28 @@
import assert from 'assert';
import debug from 'debug';
import { JsonFragment } from '@ethersproject/abi';
import { DeepPartial, FindConditions } from 'typeorm';
import JSONbig from 'json-bigint';
import { ethers } from 'ethers';
import { BaseProvider } from '@ethersproject/providers';
import { sha256 } from 'multiformats/hashes/sha2';
import { CID } from 'multiformats/cid';
import _ from 'lodash';
import { JsonFragment } from '@ethersproject/abi';
import { BaseProvider } from '@ethersproject/providers';
import * as codec from '@ipld/dag-json';
import { EthClient } from '@vulcanize/ipld-eth-client';
import { StorageLayout } from '@vulcanize/solidity-mapper';
import { EventInterface, Indexer as BaseIndexer, ValueResult, UNKNOWN_EVENT_NAME, Where, QueryOptions } from '@vulcanize/util';
import { Indexer as BaseIndexer, ValueResult, UNKNOWN_EVENT_NAME, ServerConfig, Where, QueryOptions } from '@vulcanize/util';
import { Database } from './database';
import { Contract } from './entity/Contract';
import { Event } from './entity/Event';
import { SyncStatus } from './entity/SyncStatus';
import { BlockProgress } from './entity/BlockProgress';
import { IPLDBlock } from './entity/IPLDBlock';
import artifacts from './artifacts/{{inputFileName}}.json';
import { handleEvent } from './hooks';
import { genesisHook, handleEvent, postBlockHook } from './hooks';
const log = debug('vulcanize:indexer');
@ -30,6 +35,7 @@ const {{capitalize event.name}}_EVENT = '{{event.name}}';
export type ResultEvent = {
block: {
cid: string;
hash: string;
number: number;
timestamp: number;
@ -48,7 +54,21 @@ export type ResultEvent = {
event: any;
proof: string;
}
};
export type ResultIPLDBlock = {
block: {
cid: string;
hash: string;
number: number;
timestamp: number;
parentHash: string;
};
contractAddress: string;
cid: string;
kind: string;
data: string;
};
export class Indexer {
_db: Database
@ -56,12 +76,13 @@ export class Indexer {
_ethProvider: BaseProvider
_postgraphileClient: EthClient;
_baseIndexer: BaseIndexer
_serverConfig: ServerConfig
_abi: JsonFragment[]
_storageLayout: StorageLayout
_contract: ethers.utils.Interface
constructor (db: Database, ethClient: EthClient, postgraphileClient: EthClient, ethProvider: BaseProvider) {
constructor (serverConfig: ServerConfig, db: Database, ethClient: EthClient, postgraphileClient: EthClient, ethProvider: BaseProvider) {
assert(db);
assert(ethClient);
@ -89,6 +110,7 @@ export class Indexer {
return {
block: {
cid: block.cid,
hash: block.blockHash,
number: block.blockNumber,
timestamp: block.blockTimestamp,
@ -115,6 +137,24 @@ export class Indexer {
};
}
getResultIPLDBlock (ipldBlock: IPLDBlock): ResultIPLDBlock {
const block = ipldBlock.block;
return {
block: {
cid: block.cid,
hash: block.blockHash,
number: block.blockNumber,
timestamp: block.blockTimestamp,
parentHash: block.parentHash
},
contractAddress: ipldBlock.contractAddress,
cid: ipldBlock.cid,
kind: ipldBlock.kind,
data: ipldBlock.data
};
}
{{#each queries as | query |}}
async {{query.name}} (blockHash: string, contractAddress: string
{{~#each query.params}}, {{this.name~}}: {{this.type~}} {{/each}}): Promise<ValueResult> {
@ -166,6 +206,175 @@ export class Indexer {
}
{{/each}}
async processBlock (job: any): Promise<void> {
const { data: { blockHash } } = job;
// Call custom post-block hook.
await postBlockHook(this, blockHash);
}
async processCheckpoint (job: any): Promise<void> {
// Create a checkpoint IPLDBlock for contracts that were checkpointed checkPointInterval blocks before.
// Return if checkpointInterval is <= 0.
const checkpointInterval = this._serverConfig.checkpointInterval;
if (checkpointInterval <= 0) return;
const { data: { blockNumber: currentBlockNumber, blockHash: currentBlockHash } } = job;
// Get latest checkpoints for all the contracts.
// Assuming checkPointInterval > MAX_REORG_DEPTH.
const latestCheckpointBlocks = await this.getLatestCheckpoints();
// For each contractAddress, merge the diff till now.
for (const checkpointBlock of latestCheckpointBlocks) {
// Check if it is time for a new checkpoint.
if (checkpointBlock.block.blockNumber > currentBlockNumber - checkpointInterval) {
continue;
}
const { contractAddress, block: { blockNumber: checkpointBlockNumber } } = checkpointBlock;
// Fetching all diff blocks after checkpoint.
const diffBlocks = await this.getPrevIPLDBlocksAfterCheckpoint(currentBlockHash, checkpointBlockNumber, contractAddress);
let checkPoint = codec.decode(Buffer.from(checkpointBlock.data)) as any;
for (const diffBlock of diffBlocks) {
const diff = codec.decode(Buffer.from(diffBlock.data));
checkPoint = _.merge(checkPoint, diff);
}
// Getting the current block.
const block = await this.getBlockProgress(currentBlockHash);
assert(block);
const ipldBlock = await this.prepareIPLDBlock(block, contractAddress, checkPoint, 'checkpoint');
await this.saveOrUpdateIPLDBlock(ipldBlock);
}
}
async getLatestCheckpoints (): Promise<IPLDBlock[]> {
// Get the latest checkpoints for all the contracts.
const dbTx = await this._db.createTransactionRunner();
let res;
try {
res = await this._db.getLatestCheckpoints(dbTx);
await dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
throw error;
} finally {
await dbTx.release();
}
return res;
}
async getIPLDBlock (block: BlockProgress, contractAddress: string): Promise<IPLDBlock | undefined> {
const ipldBlocks = await this._db.getIPLDBlocks({ block, contractAddress });
// There can be only one IPLDBlock for a { block, contractAddress } combination.
assert(ipldBlocks.length <= 1);
return ipldBlocks[0];
}
async getIPLDBlockByCid (cid: string): Promise<IPLDBlock | undefined> {
const ipldBlocks = await this._db.getIPLDBlocks({ cid });
// There can be only one IPLDBlock with a particular cid.
assert(ipldBlocks.length <= 1);
return ipldBlocks[0];
}
async getPrevIPLDBlock (blockHash: string, contractAddress: string): Promise<IPLDBlock | undefined> {
const dbTx = await this._db.createTransactionRunner();
let res;
try {
res = await this._db.getPrevIPLDBlock(dbTx, blockHash, contractAddress);
await dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
throw error;
} finally {
await dbTx.release();
}
return res;
}
async getPrevIPLDBlocksAfterCheckpoint (blockHash: string, checkpointBlockNumber: number, contractAddress: string): Promise<IPLDBlock[]> {
const dbTx = await this._db.createTransactionRunner();
let res;
try {
res = await this._db.getPrevIPLDBlocksAfterCheckpoint(dbTx, blockHash, checkpointBlockNumber, contractAddress);
await dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
throw error;
} finally {
await dbTx.release();
}
return res;
}
async saveOrUpdateIPLDBlock (ipldBlock: IPLDBlock): Promise<IPLDBlock> {
return this._db.saveOrUpdateIPLDBlock(ipldBlock);
}
async prepareIPLDBlock (block: BlockProgress, contractAddress: string, data: any, kind?: string):Promise<any> {
// Get an existing IPLDBlock for current block and contractAddress.
const currentIPLDBlock = await this.getIPLDBlock(block, contractAddress);
// If an IPLDBlock for { block, contractAddress } already exists, update the data field.
if (currentIPLDBlock) {
const oldData = codec.decode(Buffer.from(currentIPLDBlock.data));
data = _.merge(oldData, data);
} else {
// Fetch the parent IPLDBlock.
const parentIPLDBlock = await this.getPrevIPLDBlock(block.blockHash, contractAddress);
// Setting the meta-data for an IPLDBlock (done only once per block).
data.meta = {
id: contractAddress,
kind: kind || 'diff',
parent: {
'/': parentIPLDBlock ? parentIPLDBlock.cid : null
},
ethBlock: {
cid: {
'/': block.cid
},
num: block.blockNumber
}
};
}
// Encoding the data using dag-json codec.
const bytes = codec.encode(data);
// Calculating sha256 (multi)hash of the encoded data.
const hash = await sha256.digest(bytes);
// Calculating the CID: v1, code: dag-json, hash.
const cid = CID.create(1, codec.code, hash);
let ipldBlock = currentIPLDBlock || new IPLDBlock();
ipldBlock = Object.assign(ipldBlock, {
block,
contractAddress,
cid: cid.toString(),
kind: data.meta.kind,
data: bytes
});
return ipldBlock;
}
async triggerIndexingOnEvent (event: Event): Promise<void> {
const resultEvent = this.getResultEvent(event);
@ -209,14 +418,21 @@ export class Indexer {
return { eventName, eventInfo };
}
async watchContract (address: string, startingBlock: number): Promise<boolean> {
async watchContract (address: string, kind: string, startingBlock: number): Promise<boolean> {
// Always use the checksum address (https://docs.ethers.io/v5/api/utils/address/#utils-getAddress).
await this._db.saveContract(ethers.utils.getAddress(address), '{{contractName}}', startingBlock);
await this._db.saveContract(ethers.utils.getAddress(address), kind, startingBlock);
// Getting the current block.
const currentBlock = await this._db.getLatestBlockProgress();
assert(currentBlock);
// Call custom genesis hook.
await genesisHook(this, currentBlock, address);
return true;
}
async getEventsByFilter (blockHash: string, contract: string, name: string | null): Promise<Array<Event>> {
async getEventsByFilter (blockHash: string, contract?: string, name?: string): Promise<Array<Event>> {
return this._baseIndexer.getEventsByFilter(blockHash, contract, name);
}
@ -292,7 +508,7 @@ export class Indexer {
return this._baseIndexer.getAncestorAtDepth(blockHash, depth);
}
async _fetchAndSaveEvents ({ blockHash }: DeepPartial<BlockProgress>): Promise<BlockProgress> {
async _fetchAndSaveEvents ({ cid: blockCid, blockHash }: DeepPartial<BlockProgress>): Promise<BlockProgress> {
assert(blockHash);
let { block, logs } = await this._ethClient.getLogs({ blockHash });
@ -375,6 +591,7 @@ export class Indexer {
try {
block = {
cid: blockCid,
blockHash,
blockNumber: block.number,
blockTimestamp: block.timestamp,

View File

@ -16,6 +16,8 @@ import {
JobRunner as BaseJobRunner,
QUEUE_BLOCK_PROCESSING,
QUEUE_EVENT_PROCESSING,
QUEUE_BLOCK_CHECKPOINT,
QUEUE_HOOKS,
JobQueueConfig,
DEFAULT_CONFIG_PATH,
getCustomProvider
@ -42,10 +44,14 @@ export class JobRunner {
async start (): Promise<void> {
await this.subscribeBlockProcessingQueue();
await this.subscribeEventProcessingQueue();
await this.subscribeBlockCheckpointQueue();
await this.subscribeHooksQueue();
}
async subscribeBlockProcessingQueue (): Promise<void> {
await this._jobQueue.subscribe(QUEUE_BLOCK_PROCESSING, async (job) => {
// TODO Call pre-block hook here (Directly or indirectly (Like done through indexer.processEvent for events)).
await this._baseJobRunner.processBlock(job);
await this._jobQueue.markComplete(job);
@ -65,6 +71,23 @@ export class JobRunner {
await this._jobQueue.markComplete(job);
});
}
async subscribeBlockCheckpointQueue (): Promise<void> {
await this._jobQueue.subscribe(QUEUE_BLOCK_CHECKPOINT, async (job) => {
await this._indexer.processCheckpoint(job);
await this._jobQueue.markComplete(job);
});
}
// TODO: Make sure the hooks run in order.
async subscribeHooksQueue (): Promise<void> {
await this._jobQueue.subscribe(QUEUE_HOOKS, async (job) => {
await this._indexer.processBlock(job);
await this._jobQueue.markComplete(job);
});
}
}
export const main = async (): Promise<any> => {
@ -80,16 +103,15 @@ export const main = async (): Promise<any> => {
const config = await getConfig(argv.f);
assert(config.server, 'Missing server config');
const { upstream, database: dbConfig, jobQueue: jobQueueConfig } = config;
const { upstream, database: dbConfig, jobQueue: jobQueueConfig, server: serverConfig } = config;
assert(upstream, 'Missing upstream config');
assert(dbConfig, 'Missing database config');
assert(serverConfig, 'Missing server config');
const db = new Database(dbConfig);
await db.init();
assert(upstream, 'Missing upstream config');
const { ethServer: { gqlApiEndpoint, gqlPostgraphileEndpoint, rpcProviderEndpoint }, cache: cacheConfig } = upstream;
assert(gqlApiEndpoint, 'Missing upstream ethServer.gqlApiEndpoint');
assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint');
@ -118,7 +140,7 @@ export const main = async (): Promise<any> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const jobRunner = new JobRunner(jobQueueConfig, indexer, jobQueue);
const jobRunner = new JobRunner(jobQueueConfig, serverConfig, indexer, jobQueue);
await jobRunner.start();
};

View File

@ -23,7 +23,9 @@
},
"homepage": "https://github.com/vulcanize/watcher-ts#readme",
"dependencies": {
"@apollo/client": "^3.3.19",
"@ethersproject/providers": "5.3.0",
"@ipld/dag-json": "^8.0.1",
"@vulcanize/cache": "^0.1.0",
"@vulcanize/ipld-eth-client": "^0.1.0",
"@vulcanize/solidity-mapper": "^0.1.0",
@ -36,6 +38,8 @@
"graphql": "^15.5.0",
"graphql-import-node": "^0.0.4",
"json-bigint": "^1.0.0",
"lodash": "^4.17.21",
"multiformats": "^9.4.8",
"reflect-metadata": "^0.1.13",
"typeorm": "^0.2.32",
"yargs": "^17.0.1"

View File

@ -44,8 +44,10 @@
* Indexing on an event:
* Edit the custom hook function `handleEvent` (triggered on an event) in [hooks.ts](./src/hooks.ts) to perform corresponding indexing using the `Indexer` object.
* Refer to [hooks.example.ts](./src/hooks.example.ts) for an example hook function for events in an ERC20 contract.
* Edit the custom hook function `handleBlock` (triggered on a block) in [hooks.ts](./src/hooks.ts) to save `IPLDBlock`s using the `Indexer` object.
* The existing example hooks in [hooks.ts](./src/hooks.ts) are for an `ERC20` contract.
## Run

View File

@ -34,9 +34,9 @@ export const createResolvers = async (indexer: Indexer, eventWatcher: EventWatch
},
Mutation: {
watchContract: (_: any, { contractAddress, startingBlock = 1 }: { contractAddress: string, startingBlock: number }): Promise<boolean> => {
log('watchContract', contractAddress, startingBlock);
return indexer.watchContract(contractAddress, startingBlock);
watchContract: (_: any, { contractAddress, kind, startingBlock = 1 }: { contractAddress: string, kind: string, startingBlock: number }): Promise<boolean> => {
log('watchContract', contractAddress, kind, startingBlock);
return indexer.watchContract(contractAddress, kind, startingBlock);
}
},
@ -52,8 +52,8 @@ export const createResolvers = async (indexer: Indexer, eventWatcher: EventWatch
},
{{/each}}
events: async (_: any, { blockHash, contractAddress, name }: { blockHash: string, contractAddress: string, name: string }) => {
log('events', blockHash, contractAddress, name || '');
events: async (_: any, { blockHash, contractAddress, name }: { blockHash: string, contractAddress: string, name?: string }) => {
log('events', blockHash, contractAddress, name);
const block = await indexer.getBlockProgress(blockHash);
if (!block || !block.isComplete) {
@ -74,6 +74,22 @@ export const createResolvers = async (indexer: Indexer, eventWatcher: EventWatch
const events = await indexer.getEventsInRange(fromBlockNumber, toBlockNumber);
return events.map(event => indexer.getResultEvent(event));
},
getStateByCID: async (_: any, { cid }: { cid: string }) => {
log('getStateByCID', cid);
const ipldBlock = await indexer.getIPLDBlockByCid(cid);
return ipldBlock && ipldBlock.block.isComplete ? indexer.getResultIPLDBlock(ipldBlock) : undefined;
},
getState: async (_: any, { blockHash, contractAddress }: { blockHash: string, contractAddress: string }) => {
log('getState', blockHash, contractAddress);
const ipldBlock = await indexer.getPrevIPLDBlock(blockHash, contractAddress);
return ipldBlock && ipldBlock.block.isComplete ? indexer.getResultIPLDBlock(ipldBlock) : undefined;
}
}
};

View File

@ -38,18 +38,17 @@ export const main = async (): Promise<any> => {
const config = await getConfig(argv.f);
assert(config.server, 'Missing server config');
const { host, port, kind: watcherKind } = config.server;
const { upstream, database: dbConfig, jobQueue: jobQueueConfig } = config;
const { upstream, database: dbConfig, jobQueue: jobQueueConfig, server: serverConfig } = config;
assert(upstream, 'Missing upstream config');
assert(dbConfig, 'Missing database config');
assert(serverConfig, 'Missing server config');
const { host, port, kind: watcherKind } = serverConfig;
const db = new Database(dbConfig);
await db.init();
assert(upstream, 'Missing upstream config');
const { ethServer: { gqlApiEndpoint, gqlPostgraphileEndpoint, rpcProviderEndpoint }, cache: cacheConfig } = upstream;
assert(gqlApiEndpoint, 'Missing upstream ethServer.gqlApiEndpoint');
assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint');
@ -69,7 +68,7 @@ export const main = async (): Promise<any> => {
const ethProvider = getCustomProvider(rpcProviderEndpoint);
const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider);
const indexer = new Indexer(serverConfig, db, ethClient, postgraphileClient, ethProvider);
// Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries

View File

@ -5,10 +5,14 @@
import assert from 'assert';
import yargs from 'yargs';
import 'reflect-metadata';
import { getDefaultProvider } from 'ethers';
import { Config, DEFAULT_CONFIG_PATH, getConfig } from '@vulcanize/util';
import { getCache } from '@vulcanize/cache';
import { EthClient } from '@vulcanize/ipld-eth-client';
import { Database } from '../database';
import { Indexer } from '../indexer';
(async () => {
const argv = await yargs.parserConfiguration({
@ -42,14 +46,37 @@ import { Database } from '../database';
}).argv;
const config: Config = await getConfig(argv.configFile);
const { database: dbConfig } = config;
assert(dbConfig);
const { upstream, database: dbConfig, server: serverConfig } = config;
assert(upstream, 'Missing upstream config');
assert(dbConfig, 'Missing database config');
assert(serverConfig, 'Missing server config');
const db = new Database(dbConfig);
await db.init();
await db.saveContract(argv.address, argv.kind, argv.startingBlock);
const { ethServer: { gqlApiEndpoint, gqlPostgraphileEndpoint, rpcProviderEndpoint }, cache: cacheConfig } = upstream;
assert(gqlApiEndpoint, 'Missing upstream ethServer.gqlApiEndpoint');
assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint');
const cache = await getCache(cacheConfig);
const ethClient = new EthClient({
gqlEndpoint: gqlApiEndpoint,
gqlSubscriptionEndpoint: gqlPostgraphileEndpoint,
cache
});
const postgraphileClient = new EthClient({
gqlEndpoint: gqlPostgraphileEndpoint,
cache
});
const ethProvider = getDefaultProvider(rpcProviderEndpoint);
const indexer = new Indexer(serverConfig, db, ethClient, postgraphileClient, ethProvider);
await indexer.watchContract(argv.address, argv.kind, argv.startingBlock);
await db.close();
})();

View File

@ -14,6 +14,9 @@ export class BlockProgress implements BlockProgressInterface {
@PrimaryGeneratedColumn()
id!: number;
@Column('varchar')
cid!: string;
@Column('varchar', { length: 66 })
blockHash!: string;

View File

@ -71,6 +71,7 @@ export const main = async (): Promise<any> => {
assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint');
const cache = await getCache(cacheConfig);
const ethClient = new EthClient({
gqlEndpoint: gqlApiEndpoint,
gqlSubscriptionEndpoint: gqlPostgraphileEndpoint,

View File

@ -12,7 +12,7 @@ import { BaseProvider } from '@ethersproject/providers';
import { EthClient } from '@vulcanize/ipld-eth-client';
import { StorageLayout } from '@vulcanize/solidity-mapper';
import { Indexer as BaseIndexer, ValueResult, UNKNOWN_EVENT_NAME, JobQueue, Where, QueryOptions } from '@vulcanize/util';
import { IndexerInterface, Indexer as BaseIndexer, ValueResult, UNKNOWN_EVENT_NAME, JobQueue, Where, QueryOptions } from '@vulcanize/util';
import { Database } from './database';
import { Event } from './entity/Event';
@ -43,7 +43,7 @@ interface EventResult {
proof?: string;
}
export class Indexer {
export class Indexer implements IndexerInterface {
_db: Database
_ethClient: EthClient
_postgraphileClient: EthClient
@ -257,6 +257,11 @@ export class Indexer {
await this.triggerIndexingOnEvent(event);
}
async processBlock (blockHash: string): Promise<void> {
// Empty post-block method.
assert(blockHash);
}
parseEventNameAndArgs (kind: string, logObj: any): any {
let eventName = UNKNOWN_EVENT_NAME;
let eventInfo = {};
@ -292,7 +297,7 @@ export class Indexer {
return { eventName, eventInfo };
}
async getEventsByFilter (blockHash: string, contract: string, name: string | null): Promise<Array<Event>> {
async getEventsByFilter (blockHash: string, contract: string, name?: string): Promise<Array<Event>> {
return this._baseIndexer.getEventsByFilter(blockHash, contract, name);
}
@ -376,7 +381,7 @@ export class Indexer {
return this._baseIndexer.getAncestorAtDepth(blockHash, depth);
}
async _fetchAndSaveEvents ({ blockHash }: DeepPartial<BlockProgress>): Promise<BlockProgress> {
async _fetchAndSaveEvents ({ cid: blockCid, blockHash }: DeepPartial<BlockProgress>): Promise<BlockProgress> {
assert(blockHash);
let { block, logs } = await this._ethClient.getLogs({ blockHash });
@ -441,6 +446,7 @@ export class Indexer {
try {
block = {
cid: blockCid,
blockHash,
blockNumber: block.number,
blockTimestamp: block.timestamp,

View File

@ -16,9 +16,11 @@ import {
JobRunner as BaseJobRunner,
QUEUE_BLOCK_PROCESSING,
QUEUE_EVENT_PROCESSING,
QUEUE_HOOKS,
JobQueueConfig,
DEFAULT_CONFIG_PATH,
getCustomProvider
getCustomProvider,
ServerConfig
} from '@vulcanize/util';
import { Indexer } from './indexer';
@ -31,12 +33,14 @@ export class JobRunner {
_jobQueue: JobQueue
_baseJobRunner: BaseJobRunner
_jobQueueConfig: JobQueueConfig
_serverConfig: ServerConfig
constructor (jobQueueConfig: JobQueueConfig, indexer: Indexer, jobQueue: JobQueue) {
constructor (jobQueueConfig: JobQueueConfig, serverConfig: ServerConfig, indexer: Indexer, jobQueue: JobQueue) {
this._jobQueueConfig = jobQueueConfig;
this._indexer = indexer;
this._jobQueue = jobQueue;
this._baseJobRunner = new BaseJobRunner(this._jobQueueConfig, this._indexer, this._jobQueue);
this._serverConfig = serverConfig;
this._baseJobRunner = new BaseJobRunner(this._jobQueueConfig, this._serverConfig, this._indexer, this._jobQueue);
}
async start (): Promise<void> {
@ -55,6 +59,14 @@ export class JobRunner {
await this._baseJobRunner.processEvent(job);
});
}
async subscribeHooksQueue (): Promise<void> {
await this._jobQueue.subscribe(QUEUE_HOOKS, async (job) => {
await this._indexer.processBlock(job);
await this._jobQueue.markComplete(job);
});
}
}
export const main = async (): Promise<any> => {
@ -70,21 +82,21 @@ export const main = async (): Promise<any> => {
const config = await getConfig(argv.f);
assert(config.server, 'Missing server config');
const { upstream, database: dbConfig, jobQueue: jobQueueConfig, server: { mode } } = config;
const { upstream, database: dbConfig, jobQueue: jobQueueConfig, server: serverConfig } = config;
assert(upstream, 'Missing upstream config');
assert(dbConfig, 'Missing database config');
assert(serverConfig, 'Missing server config');
const db = new Database(dbConfig);
await db.init();
assert(upstream, 'Missing upstream config');
const { ethServer: { gqlApiEndpoint, gqlPostgraphileEndpoint, rpcProviderEndpoint }, cache: cacheConfig } = upstream;
assert(gqlApiEndpoint, 'Missing upstream ethServer.gqlApiEndpoint');
assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint');
const cache = await getCache(cacheConfig);
const ethClient = new EthClient({
gqlEndpoint: gqlApiEndpoint,
gqlSubscriptionEndpoint: gqlPostgraphileEndpoint,
@ -106,9 +118,9 @@ export const main = async (): Promise<any> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, jobQueue, mode);
const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, jobQueue, serverConfig.mode);
const jobRunner = new JobRunner(jobQueueConfig, indexer, jobQueue);
const jobRunner = new JobRunner(jobQueueConfig, serverConfig, indexer, jobQueue);
await jobRunner.start();
};

View File

@ -56,6 +56,7 @@ export const main = async (): Promise<any> => {
assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint');
const cache = await getCache(cacheConfig);
const ethClient = new EthClient({
gqlEndpoint: gqlApiEndpoint,
gqlSubscriptionEndpoint: gqlPostgraphileEndpoint,

View File

@ -100,6 +100,7 @@ subscription {
listen(topic: "header_cids") {
relatedNode {
... on EthHeaderCid {
cid
blockHash
blockNumber
parentHash

View File

@ -14,6 +14,9 @@ export class BlockProgress implements BlockProgressInterface {
@PrimaryGeneratedColumn()
id!: number;
@Column('varchar')
cid!: string;
@Column('varchar', { length: 66 })
blockHash!: string;

View File

@ -92,6 +92,7 @@ export interface TransferEvent {
}
export interface Block {
cid: string;
number: number;
hash: string;
timestamp: number;

View File

@ -70,6 +70,7 @@ export class Indexer implements IndexerInterface {
return {
block: {
cid: block.cid,
hash: block.blockHash,
number: block.blockNumber,
timestamp: block.blockTimestamp,
@ -152,6 +153,11 @@ export class Indexer implements IndexerInterface {
console.timeEnd('time:indexer#processEvent-mapping_code');
}
async processBlock (blockHash: string): Promise<void> {
// Empty post-block method.
assert(blockHash);
}
async getBlockEntities (where: { [key: string]: any } = {}, queryOptions: QueryOptions): Promise<any> {
if (where.timestamp_gt) {
where.blockTimestamp_gt = where.timestamp_gt;

View File

@ -17,10 +17,12 @@ import {
JobQueue,
QUEUE_BLOCK_PROCESSING,
QUEUE_EVENT_PROCESSING,
QUEUE_HOOKS,
JobRunner as BaseJobRunner,
JobQueueConfig,
DEFAULT_CONFIG_PATH,
getCustomProvider
getCustomProvider,
ServerConfig
} from '@vulcanize/util';
import { Indexer } from './indexer';
@ -33,12 +35,14 @@ export class JobRunner {
_jobQueue: JobQueue
_baseJobRunner: BaseJobRunner
_jobQueueConfig: JobQueueConfig
_serverConfig: ServerConfig
constructor (jobQueueConfig: JobQueueConfig, indexer: Indexer, jobQueue: JobQueue) {
constructor (jobQueueConfig: JobQueueConfig, serverConfig: ServerConfig, indexer: Indexer, jobQueue: JobQueue) {
this._jobQueueConfig = jobQueueConfig;
this._indexer = indexer;
this._jobQueue = jobQueue;
this._baseJobRunner = new BaseJobRunner(this._jobQueueConfig, this._indexer, this._jobQueue);
this._serverConfig = serverConfig;
this._baseJobRunner = new BaseJobRunner(this._jobQueueConfig, this._serverConfig, this._indexer, this._jobQueue);
}
async start (): Promise<void> {
@ -57,6 +61,14 @@ export class JobRunner {
await this._baseJobRunner.processEvent(job);
});
}
async subscribeHooksQueue (): Promise<void> {
await this._jobQueue.subscribe(QUEUE_HOOKS, async (job) => {
await this._indexer.processBlock(job);
await this._jobQueue.markComplete(job);
});
}
}
export const main = async (): Promise<any> => {
@ -72,11 +84,11 @@ export const main = async (): Promise<any> => {
const config = await getConfig(argv.f);
assert(config.server, 'Missing server config');
const { upstream, database: dbConfig, jobQueue: jobQueueConfig, server: { mode } } = config;
const { upstream, database: dbConfig, jobQueue: jobQueueConfig, server: serverConfig } = config;
assert(upstream, 'Missing upstream config');
assert(dbConfig, 'Missing database config');
assert(serverConfig, 'Missing server config');
const db = new Database(dbConfig);
await db.init();
@ -130,9 +142,9 @@ export const main = async (): Promise<any> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(db, uniClient, erc20Client, ethClient, postgraphileClient, ethProvider, jobQueue, mode);
const indexer = new Indexer(db, uniClient, erc20Client, ethClient, postgraphileClient, ethProvider, jobQueue, serverConfig.mode);
const jobRunner = new JobRunner(jobQueueConfig, indexer, jobQueue);
const jobRunner = new JobRunner(jobQueueConfig, serverConfig, indexer, jobQueue);
await jobRunner.start();
};

View File

@ -68,6 +68,7 @@ export const main = async (): Promise<any> => {
assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint');
const cache = await getCache(cacheConfig);
const ethClient = new EthClient({
gqlEndpoint: gqlApiEndpoint,
gqlSubscriptionEndpoint: gqlPostgraphileEndpoint,

View File

@ -14,6 +14,9 @@ export class BlockProgress implements BlockProgressInterface {
@PrimaryGeneratedColumn()
id!: number;
@Column('varchar')
cid!: string;
@Column('varchar', { length: 66 })
blockHash!: string;

View File

@ -69,6 +69,7 @@ export class Indexer implements IndexerInterface {
return {
block: {
cid: block.cid,
hash: block.blockHash,
number: block.blockNumber,
timestamp: block.blockTimestamp,
@ -121,6 +122,11 @@ export class Indexer implements IndexerInterface {
}
}
async processBlock (blockHash: string): Promise<void> {
// Empty post-block method.
assert(blockHash);
}
parseEventNameAndArgs (kind: string, logObj: any): any {
let eventName = UNKNOWN_EVENT_NAME;
let eventInfo = {};
@ -339,7 +345,7 @@ export class Indexer implements IndexerInterface {
return contract;
}
async getEventsByFilter (blockHash: string, contract: string, name: string | null): Promise<Array<Event>> {
async getEventsByFilter (blockHash: string, contract: string, name?: string): Promise<Array<Event>> {
return this._baseIndexer.getEventsByFilter(blockHash, contract, name);
}
@ -428,7 +434,7 @@ export class Indexer implements IndexerInterface {
return this._baseIndexer.getAncestorAtDepth(blockHash, depth);
}
async _fetchAndSaveEvents ({ blockHash }: DeepPartial<BlockProgress>): Promise<BlockProgress> {
async _fetchAndSaveEvents ({ cid: blockCid, blockHash }: DeepPartial<BlockProgress>): Promise<BlockProgress> {
assert(blockHash);
const logsPromise = this._ethClient.getLogs({ blockHash });
@ -516,6 +522,7 @@ export class Indexer implements IndexerInterface {
try {
block = {
cid: blockCid,
blockHash,
blockNumber: block.number,
blockTimestamp: block.timestamp,

View File

@ -16,9 +16,11 @@ import {
JobRunner as BaseJobRunner,
QUEUE_BLOCK_PROCESSING,
QUEUE_EVENT_PROCESSING,
QUEUE_HOOKS,
JobQueueConfig,
DEFAULT_CONFIG_PATH,
getCustomProvider
getCustomProvider,
ServerConfig
} from '@vulcanize/util';
import { Indexer } from './indexer';
@ -31,12 +33,14 @@ export class JobRunner {
_jobQueue: JobQueue
_baseJobRunner: BaseJobRunner
_jobQueueConfig: JobQueueConfig
_serverConfig: ServerConfig
constructor (jobQueueConfig: JobQueueConfig, indexer: Indexer, jobQueue: JobQueue) {
constructor (jobQueueConfig: JobQueueConfig, serverConfig: ServerConfig, indexer: Indexer, jobQueue: JobQueue) {
this._jobQueueConfig = jobQueueConfig;
this._indexer = indexer;
this._jobQueue = jobQueue;
this._baseJobRunner = new BaseJobRunner(this._jobQueueConfig, this._indexer, this._jobQueue);
this._serverConfig = serverConfig;
this._baseJobRunner = new BaseJobRunner(this._jobQueueConfig, this._serverConfig, this._indexer, this._jobQueue);
}
async start (): Promise<void> {
@ -55,6 +59,14 @@ export class JobRunner {
await this._baseJobRunner.processEvent(job);
});
}
async subscribeHooksQueue (): Promise<void> {
await this._jobQueue.subscribe(QUEUE_HOOKS, async (job) => {
await this._indexer.processBlock(job);
await this._jobQueue.markComplete(job);
});
}
}
export const main = async (): Promise<any> => {
@ -70,11 +82,11 @@ export const main = async (): Promise<any> => {
const config = await getConfig(argv.f);
assert(config.server, 'Missing server config');
const { upstream, database: dbConfig, jobQueue: jobQueueConfig } = config;
const { upstream, database: dbConfig, jobQueue: jobQueueConfig, server: serverConfig } = config;
assert(upstream, 'Missing upstream config');
assert(dbConfig, 'Missing database config');
assert(serverConfig, 'Missing server config');
const db = new Database(dbConfig);
await db.init();
@ -109,7 +121,7 @@ export const main = async (): Promise<any> => {
const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, jobQueue);
await indexer.init();
const jobRunner = new JobRunner(jobQueueConfig, indexer, jobQueue);
const jobRunner = new JobRunner(jobQueueConfig, serverConfig, indexer, jobQueue);
await jobRunner.start();
};

View File

@ -184,6 +184,7 @@ union Event = TransferEvent | PoolCreatedEvent | IncreaseLiquidityEvent | Decrea
# Ethereum types
type Block {
cid: String!
hash: String!
number: Int!
timestamp: Int!

View File

@ -24,11 +24,13 @@ export interface JobQueueConfig {
eventsInBatch: number;
}
interface ServerConfig {
export interface ServerConfig {
host: string;
port: number;
mode: string;
kind: string;
checkpointing: boolean;
checkpointInterval: number;
}
export interface UpstreamConfig {

View File

@ -7,6 +7,8 @@ export const MAX_REORG_DEPTH = 16;
export const QUEUE_BLOCK_PROCESSING = 'block-processing';
export const QUEUE_EVENT_PROCESSING = 'event-processing';
export const QUEUE_CHAIN_PRUNING = 'chain-pruning';
export const QUEUE_BLOCK_CHECKPOINT = 'block-checkpoint';
export const QUEUE_HOOKS = 'hooks';
export const JOB_KIND_INDEX = 'index';
export const JOB_KIND_PRUNE = 'prune';

View File

@ -216,6 +216,7 @@ export class Database {
async saveEvents (blockRepo: Repository<BlockProgressInterface>, eventRepo: Repository<EventInterface>, block: DeepPartial<BlockProgressInterface>, events: DeepPartial<EventInterface>[]): Promise<BlockProgressInterface> {
const {
cid,
blockHash,
blockNumber,
blockTimestamp,
@ -234,6 +235,7 @@ export class Database {
const numEvents = events.length;
const entity = blockRepo.create({
cid,
blockHash,
parentHash,
blockNumber,
@ -473,6 +475,7 @@ export class Database {
// If entity not found in frothy region get latest entity in the pruned region.
// Filter out entities from pruned blocks.
const canonicalBlockNumber = blockNumber + 1;
const entityInPrunedRegion:any = await repo.createQueryBuilder('entity')
.innerJoinAndSelect('block_progress', 'block', 'block.block_hash = entity.block_hash')
.where('block.is_pruned = false')

View File

@ -124,11 +124,12 @@ export class EventWatcher {
}
async publishBlockProgressToSubscribers (blockProgress: BlockProgressInterface): Promise<void> {
const { blockHash, blockNumber, numEvents, numProcessedEvents, isComplete } = blockProgress;
const { cid, blockHash, blockNumber, numEvents, numProcessedEvents, isComplete } = blockProgress;
// Publishing the event here will result in pushing the payload to GQL subscribers for `onAddressEvent(address)`.
await this._pubsub.publish(BlockProgressEvent, {
onBlockProgressEvent: {
cid,
blockHash,
blockNumber,
numEvents,

View File

@ -208,7 +208,7 @@ export class Indexer {
return this._db.getBlockEvents(blockHash, where, queryOptions);
}
async getEventsByFilter (blockHash: string, contract: string, name: string | null): Promise<Array<EventInterface>> {
async getEventsByFilter (blockHash: string, contract?: string, name?: string): Promise<Array<EventInterface>> {
if (contract) {
const watchedContract = await this.isWatchedContract(contract);
if (!watchedContract) {

View File

@ -6,8 +6,19 @@ import assert from 'assert';
import debug from 'debug';
import { In } from 'typeorm';
import { JobQueueConfig } from './config';
import { JOB_KIND_INDEX, JOB_KIND_PRUNE, JOB_KIND_EVENTS, JOB_KIND_CONTRACT, MAX_REORG_DEPTH, QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING, UNKNOWN_EVENT_NAME } from './constants';
import { JobQueueConfig, ServerConfig } from './config';
import {
JOB_KIND_INDEX,
JOB_KIND_PRUNE,
JOB_KIND_EVENTS,
JOB_KIND_CONTRACT,
MAX_REORG_DEPTH,
UNKNOWN_EVENT_NAME,
QUEUE_BLOCK_PROCESSING,
QUEUE_EVENT_PROCESSING,
QUEUE_BLOCK_CHECKPOINT,
QUEUE_HOOKS
} from './constants';
import { JobQueue } from './job-queue';
import { EventInterface, IndexerInterface, SyncStatusInterface } from './types';
import { wait } from './misc';
@ -23,11 +34,13 @@ export class JobRunner {
_jobQueue: JobQueue
_jobQueueConfig: JobQueueConfig
_blockProcessStartTime?: Date
_serverConfig: ServerConfig
constructor (jobQueueConfig: JobQueueConfig, indexer: IndexerInterface, jobQueue: JobQueue) {
this._jobQueueConfig = jobQueueConfig;
constructor (jobQueueConfig: JobQueueConfig, serverConfig: ServerConfig, indexer: IndexerInterface, jobQueue: JobQueue) {
this._indexer = indexer;
this._jobQueue = jobQueue;
this._jobQueueConfig = jobQueueConfig;
this._serverConfig = serverConfig;
}
async processBlock (job: any): Promise<void> {
@ -117,7 +130,7 @@ export class JobRunner {
}
async _indexBlock (job: any, syncStatus: SyncStatusInterface): Promise<void> {
const { data: { blockHash, blockNumber, parentHash, priority, timestamp } } = job;
const { data: { cid, blockHash, blockNumber, parentHash, priority, timestamp } } = job;
const indexBlockStartTime = new Date();
@ -168,10 +181,11 @@ export class JobRunner {
throw new Error(message);
}
const [{ blockNumber: parentBlockNumber, parentHash: grandparentHash, timestamp: parentTimestamp }] = blocks;
const [{ cid: parentCid, blockNumber: parentBlockNumber, parentHash: grandparentHash, timestamp: parentTimestamp }] = blocks;
await this._jobQueue.pushJob(QUEUE_BLOCK_PROCESSING, {
kind: JOB_KIND_INDEX,
cid: parentCid,
blockHash: parentHash,
blockNumber: parentBlockNumber,
parentHash: grandparentHash,
@ -192,6 +206,7 @@ export class JobRunner {
await this._jobQueue.pushJob(QUEUE_BLOCK_PROCESSING, {
kind: JOB_KIND_INDEX,
cid: parentBlock.cid,
blockHash: parentHash,
blockNumber: parentBlock.blockNumber,
parentHash: parentBlock.parentHash,
@ -213,7 +228,7 @@ export class JobRunner {
// Delay required to process block.
await wait(jobDelayInMilliSecs);
blockProgress = await this._indexer.fetchBlockEvents({ blockHash, blockNumber, parentHash, blockTimestamp: timestamp });
blockProgress = await this._indexer.fetchBlockEvents({ cid, blockHash, blockNumber, parentHash, blockTimestamp: timestamp });
}
// Check if block has unprocessed events.
@ -221,6 +236,17 @@ export class JobRunner {
await this._jobQueue.pushJob(QUEUE_EVENT_PROCESSING, { kind: JOB_KIND_EVENTS, blockHash: blockProgress.blockHash, publish: true });
}
if (!blockProgress.numEvents) {
// Push post-block hook and checkpointing jobs if there are no events as the block is already marked as complete.
await this._jobQueue.pushJob(QUEUE_HOOKS, { blockHash });
// Push checkpointing job only if checkpointing is on.
if (this._serverConfig.checkpointing) {
await this._jobQueue.pushJob(QUEUE_BLOCK_CHECKPOINT, { blockHash, blockNumber });
}
}
const indexBlockDuration = new Date().getTime() - indexBlockStartTime.getTime();
log(`time:job-runner#_indexBlock: ${indexBlockDuration}ms`);
}

View File

@ -8,6 +8,7 @@ import { Where, QueryOptions } from './database';
export interface BlockProgressInterface {
id: number;
cid: string;
blockHash: string;
parentHash: string;
blockNumber: number;
@ -70,6 +71,7 @@ export interface IndexerInterface {
parseEventNameAndArgs?: (kind: string, logObj: any) => any;
isWatchedContract?: (address: string) => Promise<ContractInterface | undefined>;
cacheContract?: (contract: ContractInterface) => void;
processBlock(blockHash: string): Promise<void>;
}
export interface EventWatcherInterface {