Ensuring chronological execution of hooks and checkpointing (#264)

* Subscribe to hooks queue in existing watchers

* Change naming strategy for generated get and save functions

* Push checkpointing job after post-block hook job completed

* Using hooks status to ensure their chronological execution

* Add default indices to IPLDBlock table

* Add kind parameter to getState GQL API

* Add checkpoint CLI

* Add blockHash arg to checkpoint CLI and update codegen docs

* Print out block hash for checkpoint CLI

* Use log from debug for logging

* Filter using contract at start in hierarchical query

* Make kind argument to prepare IPLDBlock required
This commit is contained in:
prathamesh0 2021-10-14 16:08:45 +05:30 committed by nabarun
parent 51b200709b
commit 421e7498d3
28 changed files with 425 additions and 122 deletions

View File

@ -73,6 +73,8 @@
* Create the databases configured in `environments/local.toml`.
* Update the derived state checkpoint settings in `environments/local.toml`.
### Customize
* Indexing on an event:
@ -81,6 +83,8 @@
* Edit the custom hook function `handleBlock` (triggered on a block) in `src/hooks.ts` to save `IPLDBlock`s using the `Indexer` object.
* Edit the custom hook function `genesisHook` (triggered on watch-contract) in `src/hooks.ts` to save a genesis checkpoint `IPLDBlock` using the `Indexer` object.
* The existing example hooks in `src/hooks.ts` are for an `ERC20` contract.
### Run
@ -114,7 +118,13 @@
* To fill a block range:
```bash
yarn fill --startBlock <from-block> --endBlock <to-block>
yarn fill --start-block <from-block> --end-block <to-block>
```
* To create a checkpoint for a contract:
```bash
yarn checkpoint --address <contract-address> --block-hash [block-hash]
```
## Known Issues

View File

@ -0,0 +1,21 @@
//
// Copyright 2021 Vulcanize, Inc.
//
import fs from 'fs';
import path from 'path';
import Handlebars from 'handlebars';
import { Writable } from 'stream';
const TEMPLATE_FILE = './templates/checkpoint-template.handlebars';
/**
* Writes the checkpoint file generated from a template to a stream.
* @param outStream A writable output stream to write the checkpoint file to.
*/
export function exportCheckpoint (outStream: Writable): void {
const templateString = fs.readFileSync(path.resolve(__dirname, TEMPLATE_FILE)).toString();
const template = Handlebars.compile(templateString);
const checkpoint = template({});
outStream.write(checkpoint);
}

View File

@ -39,10 +39,15 @@ export class Client {
const queryObject = {
name,
getQueryName: '',
params: _.cloneDeep(params),
returnType
};
queryObject.getQueryName = (name.charAt(0) === '_')
? `_get${name.charAt(1).toUpperCase()}${name.slice(2)}`
: `get${name.charAt(0).toUpperCase()}${name.slice(1)}`;
queryObject.params = queryObject.params.map((param) => {
const tsParamType = getTsForSol(param.type);
assert(tsParamType);

View File

@ -0,0 +1,13 @@
className: HooksStatus
indexOn: []
columns:
- name: latestProcessedBlockNumber
pgType: integer
tsType: number
columnType: Column
imports:
- toImport:
- Entity
- PrimaryGeneratedColumn
- Column
from: typeorm

View File

@ -1,8 +1,16 @@
className: IPLDBlock
indexOn:
- columns:
- cid
unique: true
- columns:
- block
- contractAddress
- columns:
- block
- contractAddress
- kind
unique: true
columns:
- name: block
tsType: BlockProgress

View File

@ -38,15 +38,23 @@ export class Database {
const queryObject = {
name,
entityName: '',
getQueryName: '',
saveQueryName: '',
params: _.cloneDeep(params),
returnType
};
// eth_call mode: Capitalize first letter of entity name (balanceOf -> BalanceOf).
// storage mode: Capiltalize second letter of entity name (_balances -> _Balances).
queryObject.entityName = (name.charAt(0) === '_')
? `_${name.charAt(1).toUpperCase()}${name.slice(2)}`
: `${name.charAt(0).toUpperCase()}${name.slice(1)}`;
// eth_call mode: Capitalize first letter of entity name (balanceOf -> BalanceOf, getBalanceOf, saveBalanceOf).
// storage mode: Capiltalize second letter of entity name (_balances -> _Balances, _getBalances, _saveBalances).
if (name.charAt(0) === '_') {
queryObject.entityName = `_${name.charAt(1).toUpperCase()}${name.slice(2)}`;
queryObject.getQueryName = `_get${name.charAt(1).toUpperCase()}${name.slice(2)}`;
queryObject.saveQueryName = `_save${name.charAt(1).toUpperCase()}${name.slice(2)}`;
} else {
queryObject.entityName = `${name.charAt(0).toUpperCase()}${name.slice(1)}`;
queryObject.getQueryName = `get${name.charAt(0).toUpperCase()}${name.slice(1)}`;
queryObject.saveQueryName = `save${name.charAt(0).toUpperCase()}${name.slice(1)}`;
}
queryObject.params = queryObject.params.map((param) => {
const tsParamType = getTsForSol(param.type);

View File

@ -188,6 +188,7 @@ export class Entity {
this._addContractEntity();
this._addBlockProgressEntity();
this._addIPLDBlockEntity();
this._addHooksStatusEntity();
const template = Handlebars.compile(this._templateString);
this._entities.forEach(entityObj => {
@ -223,4 +224,9 @@ export class Entity {
const entity = yaml.load(fs.readFileSync(path.resolve(__dirname, TABLES_DIR, 'IPLDBlock.yaml'), 'utf8'));
this._entities.push(entity);
}
_addHooksStatusEntity (): void {
const entity = yaml.load(fs.readFileSync(path.resolve(__dirname, TABLES_DIR, 'HooksStatus.yaml'), 'utf8'));
this._entities.push(entity);
}
}

View File

@ -27,6 +27,7 @@ import { exportLint } from './lint';
import { registerHandlebarHelpers } from './utils/handlebar-helpers';
import { exportHooks } from './hooks';
import { exportFill } from './fill';
import { exportCheckpoint } from './checkpoint';
const main = async (): Promise<void> => {
const argv = await yargs(hideBin(process.argv))
@ -208,6 +209,11 @@ function generateWatcher (data: string, visitor: Visitor, argv: any) {
: process.stdout;
exportWatchContract(outStream);
outStream = outputDir
? fs.createWriteStream(path.join(outputDir, 'src/cli/checkpoint.ts'))
: process.stdout;
exportCheckpoint(outStream);
let hooksOutStream;
if (outputDir) {
hooksOutStream = fs.createWriteStream(path.join(outputDir, 'src/hooks.ts'));

View File

@ -41,11 +41,21 @@ export class Indexer {
const queryObject = {
name,
getQueryName: '',
saveQueryName: '',
params: _.cloneDeep(params),
returnType,
mode
};
if (name.charAt(0) === '_') {
queryObject.getQueryName = `_get${name.charAt(1).toUpperCase()}${name.slice(2)}`;
queryObject.saveQueryName = `_save${name.charAt(1).toUpperCase()}${name.slice(2)}`;
} else {
queryObject.getQueryName = `get${name.charAt(0).toUpperCase()}${name.slice(1)}`;
queryObject.saveQueryName = `save${name.charAt(0).toUpperCase()}${name.slice(1)}`;
}
queryObject.params = queryObject.params.map((param) => {
const tsParamType = getTsForSol(param.type);
assert(tsParamType);

View File

@ -266,7 +266,8 @@ export class Schema {
type: this._composer.getOTC('ResultIPLDBlock'),
args: {
blockHash: 'String!',
contractAddress: 'String!'
contractAddress: 'String!',
kind: 'String'
}
}
});

View File

@ -0,0 +1,88 @@
//
// Copyright 2021 Vulcanize, Inc.
//
import assert from 'assert';
import yargs from 'yargs';
import 'reflect-metadata';
import debug from 'debug';
import { getDefaultProvider } from 'ethers';
import { Config, DEFAULT_CONFIG_PATH, getConfig } from '@vulcanize/util';
import { getCache } from '@vulcanize/cache';
import { EthClient } from '@vulcanize/ipld-eth-client';
import { Database } from '../database';
import { Indexer } from '../indexer';
const log = debug('vulcanize:checkpoint');
const main = async (): Promise<void> => {
const argv = await yargs.parserConfiguration({
'parse-numbers': false
}).options({
configFile: {
alias: 'f',
type: 'string',
require: true,
demandOption: true,
describe: 'Configuration file path (toml)',
default: DEFAULT_CONFIG_PATH
},
address: {
type: 'string',
require: true,
demandOption: true,
describe: 'Contract address to create the checkpoint for.'
},
blockHash: {
type: 'string',
describe: 'Blockhash at which to create the checkpoint.'
}
}).argv;
const config: Config = await getConfig(argv.configFile);
const { upstream, database: dbConfig, server: serverConfig } = config;
assert(upstream, 'Missing upstream config');
assert(dbConfig, 'Missing database config');
assert(serverConfig, 'Missing server config');
const db = new Database(dbConfig);
await db.init();
const { ethServer: { gqlApiEndpoint, gqlPostgraphileEndpoint, rpcProviderEndpoint }, cache: cacheConfig } = upstream;
assert(gqlApiEndpoint, 'Missing upstream ethServer.gqlApiEndpoint');
assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint');
const cache = await getCache(cacheConfig);
const ethClient = new EthClient({
gqlEndpoint: gqlApiEndpoint,
gqlSubscriptionEndpoint: gqlPostgraphileEndpoint,
cache
});
const postgraphileClient = new EthClient({
gqlEndpoint: gqlPostgraphileEndpoint,
cache
});
const ethProvider = getDefaultProvider(rpcProviderEndpoint);
const indexer = new Indexer(serverConfig, db, ethClient, postgraphileClient, ethProvider);
const blockHash = await indexer.createCheckpoint(argv.address, argv.blockHash);
log(`Created a checkpoint for contract ${argv.address} at block-hash ${blockHash}`);
await db.close();
};
main()
.catch(err => {
log(err);
})
.finally(() => {
process.exit(0);
});

View File

@ -18,8 +18,7 @@ export class Client {
}
{{#each queries as | query |}}
// eslint-disable-next-line camelcase
async get{{capitalize query.name tillIndex=1}} (blockHash: string, contractAddress: string
async {{query.getQueryName}} (blockHash: string, contractAddress: string
{{~#each query.params}}, {{this.name}}: {{this.type~}} {{/each}}): Promise<any> {
const { {{query.name}} } = await this._client.query(
gql(queries.{{query.name}}),

View File

@ -11,6 +11,7 @@ import { Database as BaseDatabase, QueryOptions, Where, MAX_REORG_DEPTH } from '
import { Contract } from './entity/Contract';
import { Event } from './entity/Event';
import { SyncStatus } from './entity/SyncStatus';
import { HooksStatus } from './entity/HooksStatus';
import { BlockProgress } from './entity/BlockProgress';
import { IPLDBlock } from './entity/IPLDBlock';
@ -46,11 +47,10 @@ export class Database {
}
{{#each queries as | query |}}
// eslint-disable-next-line camelcase
{{#if (reservedNameCheck (capitalize query.name tillIndex=1)) }}
{{#if (reservedNameCheck query.entityName) }}
// eslint-disable-next-line @typescript-eslint/ban-types
{{/if}}
async get{{capitalize query.name tillIndex=1}} ({ blockHash, contractAddress
async {{query.getQueryName}} ({ blockHash, contractAddress
{{~#each query.params}}, {{this.name~}} {{/each}} }: { blockHash: string, contractAddress: string
{{~#each query.params}}, {{this.name~}}: {{this.type~}} {{/each}} }): Promise<{{query.entityName}} | undefined> {
return this._conn.getRepository({{query.entityName}})
@ -66,11 +66,10 @@ export class Database {
{{/each}}
{{~#each queries as | query |}}
// eslint-disable-next-line camelcase
{{#if (reservedNameCheck (capitalize query.name tillIndex=1)) }}
{{#if (reservedNameCheck query.entityName) }}
// eslint-disable-next-line @typescript-eslint/ban-types
{{/if}}
async save{{capitalize query.name tillIndex=1}} ({ blockHash, contractAddress
async {{query.saveQueryName}} ({ blockHash, contractAddress
{{~#each query.params}}, {{this.name~}} {{/each}}, value, proof }: DeepPartial<{{query.entityName}}>): Promise<{{query.entityName}}> {
const repo = this._conn.getRepository({{query.entityName}});
const entity = repo.create({ blockHash, contractAddress
@ -84,22 +83,20 @@ export class Database {
return repo.find({ where, relations: ['block'] });
}
async getLatestCheckpoints (queryRunner: QueryRunner): Promise<IPLDBlock[]> {
// Get the latest checkpoints for all the contracts.
async getLatestCheckpoint (queryRunner: QueryRunner, contractAddress: string): Promise<IPLDBlock | undefined> {
// Get the latest checkpoints for a contract.
const result = await queryRunner.manager.createQueryBuilder(IPLDBlock, 'ipld_block')
.distinctOn(['contract_address'])
.orderBy('contract_address')
.innerJoinAndSelect(Contract, 'contract', 'contract_address = contract.address')
.leftJoinAndSelect('ipld_block.block', 'block')
.where('block.is_pruned = false')
.andWhere('ipld_block.contractAddress = :contractAddress', { contractAddress })
.andWhere('ipld_block.kind = :kind', { kind: 'checkpoint' })
.addOrderBy('ipld_block.block_id', 'DESC')
.getMany();
.orderBy('ipld_block.block_id', 'DESC')
.getOne();
return result;
}
async getPrevIPLDBlock (queryRunner: QueryRunner, blockHash: string, contractAddress: string): Promise<IPLDBlock | undefined> {
async getPrevIPLDBlock (queryRunner: QueryRunner, blockHash: string, contractAddress: string, kind?: string): Promise<IPLDBlock | undefined> {
const heirerchicalQuery = `
WITH RECURSIVE cte_query AS
(
@ -108,11 +105,13 @@ export class Database {
b.block_number,
b.parent_hash,
1 as depth,
i.id
i.id,
i.kind
FROM
block_progress b
LEFT JOIN
ipld_block i ON i.block_id = b.id
AND i.contract_address = $2
WHERE
b.block_hash = $1
UNION ALL
@ -121,7 +120,8 @@ export class Database {
b.block_number,
b.parent_hash,
c.depth + 1,
i.id
i.id,
i.kind
FROM
block_progress b
LEFT JOIN
@ -131,35 +131,42 @@ export class Database {
INNER JOIN
cte_query c ON c.parent_hash = b.block_hash
WHERE
c.id IS NULL AND c.depth < $3
c.depth < $3
)
SELECT
block_number, id
block_number, id, kind
FROM
cte_query
ORDER BY block_number ASC
LIMIT 1;
ORDER BY block_number DESC, id DESC
`;
// Fetching block and id for previous IPLDBlock in frothy region.
const [{ block_number: blockNumber, id }] = await queryRunner.query(heirerchicalQuery, [blockHash, contractAddress, MAX_REORG_DEPTH]);
const queryResult = await queryRunner.query(heirerchicalQuery, [blockHash, contractAddress, MAX_REORG_DEPTH]);
const latestRequiredResult = kind
? queryResult.find((obj: any) => obj.kind === kind)
: queryResult.find((obj: any) => obj.id);
let result: IPLDBlock | undefined;
if (id) {
result = await queryRunner.manager.findOne(IPLDBlock, { id }, { relations: ['block'] });
if (latestRequiredResult) {
result = await queryRunner.manager.findOne(IPLDBlock, { id: latestRequiredResult.id }, { relations: ['block'] });
} else {
// If IPLDBlock not found in frothy region get latest IPLDBlock in the pruned region.
// Filter out IPLDBlocks from pruned blocks.
const canonicalBlockNumber = blockNumber + 1;
const canonicalBlockNumber = queryResult.pop().block_number + 1;
result = await queryRunner.manager.createQueryBuilder(IPLDBlock, 'ipld_block')
let queryBuilder = queryRunner.manager.createQueryBuilder(IPLDBlock, 'ipld_block')
.leftJoinAndSelect('ipld_block.block', 'block')
.where('block.is_pruned = false')
.andWhere('ipld_block.contract_address = :contractAddress', { contractAddress })
.andWhere('block.block_number <= :canonicalBlockNumber', { canonicalBlockNumber })
.orderBy('block.block_number', 'DESC')
.limit(1)
.getOne();
.orderBy('block.block_number', 'DESC');
// Filter using kind if specified else order by id to give preference to checkpoint.
queryBuilder = kind
? queryBuilder.andWhere('ipld_block.kind = :kind', { kind })
: queryBuilder.addOrderBy('ipld_block.id', 'DESC');
result = await queryBuilder.getOne();
}
return result;
@ -179,6 +186,7 @@ export class Database {
block_progress b
LEFT JOIN
ipld_block i ON i.block_id = b.id
AND i.contract_address = $2
WHERE
b.block_hash = $1
UNION ALL
@ -232,6 +240,34 @@ export class Database {
return repo.save(ipldBlock);
}
async getHooksStatus (queryRunner: QueryRunner): Promise<HooksStatus | undefined> {
const repo = queryRunner.manager.getRepository(HooksStatus);
return repo.findOne();
}
async updateHooksStatusProcessedBlock (queryRunner: QueryRunner, blockNumber: number): Promise<HooksStatus> {
const repo = queryRunner.manager.getRepository(HooksStatus);
let entity = await repo.findOne();
if (!entity) {
entity = repo.create({
latestProcessedBlockNumber: blockNumber
});
}
if (blockNumber > entity.latestProcessedBlockNumber) {
entity.latestProcessedBlockNumber = blockNumber;
}
return repo.save(entity);
}
async getContracts (where: FindConditions<Contract>): Promise<Contract[]> {
const repo = this._conn.getRepository(Contract);
return repo.find({ where });
}
async getContract (address: string): Promise<Contract | undefined> {
const repo = this._conn.getRepository(Contract);

View File

@ -57,6 +57,7 @@ export class EventWatcher {
await this.initBlockProcessingOnCompleteHandler();
await this.initEventProcessingOnCompleteHandler();
await this.initHooksOnCompleteHandler();
this._baseEventWatcher.startBlockProcessing();
}
@ -91,16 +92,9 @@ export class EventWatcher {
if (dbEvents.length > 0) {
const dbEvent = dbEvents[0];
// If the block is marked as complete:
// a. Push a post-block hook job.
// b. Push a block checkpointing job.
// If the block is marked as complete, push a post-block hook job.
if (dbEvent.block.isComplete) {
await this._jobQueue.pushJob(QUEUE_HOOKS, { blockHash: dbEvent.block.blockHash });
// Push checkpointing job if checkpointing is on.
if (this._indexer._serverConfig.checkpointing) {
await this._jobQueue.pushJob(QUEUE_BLOCK_CHECKPOINT, { blockHash: dbEvent.block.blockHash, blockNumber: dbEvent.block.blockNumber });
}
await this._jobQueue.pushJob(QUEUE_HOOKS, { blockHash: dbEvent.block.blockHash, blockNumber: dbEvent.block.blockNumber });
}
}
@ -124,6 +118,19 @@ export class EventWatcher {
});
}
async initHooksOnCompleteHandler (): Promise<void> {
this._jobQueue.onComplete(QUEUE_HOOKS, async (job) => {
const { data: { request: { data: { blockHash, blockNumber } } } } = job;
await this._indexer.updateHooksStatusProcessedBlock(blockNumber);
// Push checkpointing job only after post-block hook job is marked complete and checkpointing is on.
if (this._indexer._serverConfig.checkpointing) {
this._jobQueue.pushJob(QUEUE_BLOCK_CHECKPOINT, { blockHash });
}
});
}
async publishEventToSubscribers (dbEvent: Event, timeElapsedInSeconds: number): Promise<void> {
if (dbEvent && dbEvent.eventName !== UNKNOWN_EVENT_NAME) {
const resultEvent = this._indexer.getResultEvent(dbEvent);

View File

@ -96,7 +96,7 @@ export async function postBlockHook (indexer: Indexer, blockHash: string): Promi
}
}
const ipldBlock = await indexer.prepareIPLDBlock(block, contractAddress, ipldBlockData);
const ipldBlock = await indexer.prepareIPLDBlock(block, contractAddress, ipldBlockData, 'diff');
await indexer.saveOrUpdateIPLDBlock(ipldBlock);
}
}

View File

@ -22,6 +22,7 @@ import { Database } from './database';
import { Contract } from './entity/Contract';
import { Event } from './entity/Event';
import { SyncStatus } from './entity/SyncStatus';
import { HooksStatus } from './entity/HooksStatus';
import { BlockProgress } from './entity/BlockProgress';
import { IPLDBlock } from './entity/IPLDBlock';
import artifacts from './artifacts/{{inputFileName}}.json';
@ -158,7 +159,7 @@ export class Indexer {
{{#each queries as | query |}}
async {{query.name}} (blockHash: string, contractAddress: string
{{~#each query.params}}, {{this.name~}}: {{this.type~}} {{/each}}): Promise<ValueResult> {
const entity = await this._db.get{{capitalize query.name tillIndex=1}}({ blockHash, contractAddress
const entity = await this._db.{{query.getQueryName}}({ blockHash, contractAddress
{{~#each query.params}}, {{this.name~}} {{~/each}} });
if (entity) {
log('{{query.name}}: db hit.');
@ -199,7 +200,7 @@ export class Indexer {
);
{{/if}}
await this._db.save{{capitalize query.name tillIndex=1}}({ blockHash, contractAddress
await this._db.{{query.saveQueryName}}({ blockHash, contractAddress
{{~#each query.params}}, {{this.name~}} {{/each}}, value: result.value, proof: JSONbig.stringify(result.proof) });
return result;
@ -220,23 +221,42 @@ export class Indexer {
const checkpointInterval = this._serverConfig.checkpointInterval;
if (checkpointInterval <= 0) return;
const { data: { blockNumber: currentBlockNumber, blockHash: currentBlockHash } } = job;
const { data: { blockHash: currentBlockHash } } = job;
// Get latest checkpoints for all the contracts.
// Assuming checkPointInterval > MAX_REORG_DEPTH.
const latestCheckpointBlocks = await this.getLatestCheckpoints();
// Get all the contracts.
const contracts = await this._db.getContracts({});
const contractAddresses = contracts.map(contract => contract.address);
// For each contractAddress, merge the diff till now.
for (const checkpointBlock of latestCheckpointBlocks) {
// Check if it is time for a new checkpoint.
if (checkpointBlock.block.blockNumber > currentBlockNumber - checkpointInterval) {
continue;
// For each contractAddress, merge the diff till now to create a checkpoint.
for (const contractAddress of contractAddresses) {
await this.createCheckpoint(contractAddress, currentBlockHash, checkpointInterval);
}
}
const { contractAddress, block: { blockNumber: checkpointBlockNumber } } = checkpointBlock;
async createCheckpoint (contractAddress: string, currentBlockHash?: string, checkpointInterval?: number): Promise<string | undefined> {
// Getting the current block.
let currentBlock;
if (currentBlockHash) {
currentBlock = await this.getBlockProgress(currentBlockHash);
} else {
currentBlock = await this._db.getLatestBlockProgress();
}
assert(currentBlock);
// Fetching the latest checkpoint for a contract.
// Assuming checkPointInterval > MAX_REORG_DEPTH.
const checkpointBlock = await this.getLatestCheckpoint(contractAddress);
assert(checkpointBlock);
// Check if it is time for a new checkpoint.
if (checkpointInterval && checkpointBlock.block.blockNumber > (currentBlock.blockNumber - checkpointInterval)) {
return;
}
const { block: { blockNumber: checkpointBlockNumber } } = checkpointBlock;
// Fetching all diff blocks after checkpoint.
const diffBlocks = await this.getPrevIPLDBlocksAfterCheckpoint(currentBlockHash, checkpointBlockNumber, contractAddress);
const diffBlocks = await this.getPrevIPLDBlocksAfterCheckpoint(currentBlock.blockHash, checkpointBlockNumber, contractAddress);
let checkPoint = codec.decode(Buffer.from(checkpointBlock.data)) as any;
@ -245,22 +265,19 @@ export class Indexer {
checkPoint = _.merge(checkPoint, diff);
}
// Getting the current block.
const block = await this.getBlockProgress(currentBlockHash);
assert(block);
const ipldBlock = await this.prepareIPLDBlock(block, contractAddress, checkPoint, 'checkpoint');
const ipldBlock = await this.prepareIPLDBlock(currentBlock, contractAddress, checkPoint, 'checkpoint');
await this.saveOrUpdateIPLDBlock(ipldBlock);
}
return currentBlock.blockHash;
}
async getLatestCheckpoints (): Promise<IPLDBlock[]> {
// Get the latest checkpoints for all the contracts.
async getLatestCheckpoint (contractAddress: string): Promise<IPLDBlock | undefined> {
// Get the latest checkpoints for a contract.
const dbTx = await this._db.createTransactionRunner();
let res;
try {
res = await this._db.getLatestCheckpoints(dbTx);
res = await this._db.getLatestCheckpoint(dbTx, contractAddress);
await dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
@ -271,13 +288,10 @@ export class Indexer {
return res;
}
async getIPLDBlock (block: BlockProgress, contractAddress: string): Promise<IPLDBlock | undefined> {
const ipldBlocks = await this._db.getIPLDBlocks({ block, contractAddress });
async getIPLDBlocks (block: BlockProgress, contractAddress: string, kind?: string): Promise<IPLDBlock[]> {
const ipldBlocks = await this._db.getIPLDBlocks({ block, contractAddress, kind });
// There can be only one IPLDBlock for a { block, contractAddress } combination.
assert(ipldBlocks.length <= 1);
return ipldBlocks[0];
return ipldBlocks;
}
async getIPLDBlockByCid (cid: string): Promise<IPLDBlock | undefined> {
@ -289,12 +303,12 @@ export class Indexer {
return ipldBlocks[0];
}
async getPrevIPLDBlock (blockHash: string, contractAddress: string): Promise<IPLDBlock | undefined> {
async getPrevIPLDBlock (blockHash: string, contractAddress: string, kind?: string): Promise<IPLDBlock | undefined> {
const dbTx = await this._db.createTransactionRunner();
let res;
try {
res = await this._db.getPrevIPLDBlock(dbTx, blockHash, contractAddress);
res = await this._db.getPrevIPLDBlock(dbTx, blockHash, contractAddress, kind);
await dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
@ -326,15 +340,24 @@ export class Indexer {
return this._db.saveOrUpdateIPLDBlock(ipldBlock);
}
async prepareIPLDBlock (block: BlockProgress, contractAddress: string, data: any, kind?: string):Promise<any> {
async prepareIPLDBlock (block: BlockProgress, contractAddress: string, data: any, kind: string):Promise<any> {
// Get an existing IPLDBlock for current block and contractAddress.
const currentIPLDBlock = await this.getIPLDBlock(block, contractAddress);
const currentIPLDBlocks = await this.getIPLDBlocks(block, contractAddress, 'diff');
// There can be only one IPLDBlock for a (block, contractAddress, 'diff') combination.
assert(currentIPLDBlocks.length <= 1);
const currentIPLDBlock = currentIPLDBlocks[0];
// If an IPLDBlock for { block, contractAddress } already exists, update the data field.
if (currentIPLDBlock) {
// Update currentIPLDBlock if it exists and is of same kind.
let ipldBlock;
if (currentIPLDBlock && currentIPLDBlock.kind === kind) {
ipldBlock = currentIPLDBlock;
// Update the data field.
const oldData = codec.decode(Buffer.from(currentIPLDBlock.data));
data = _.merge(oldData, data);
} else {
ipldBlock = new IPLDBlock();
// Fetch the parent IPLDBlock.
const parentIPLDBlock = await this.getPrevIPLDBlock(block.blockHash, contractAddress);
@ -363,7 +386,7 @@ export class Indexer {
// Calculating the CID: v1, code: dag-json, hash.
const cid = CID.create(1, codec.code, hash);
let ipldBlock = currentIPLDBlock || new IPLDBlock();
// Update ipldBlock with new data.
ipldBlock = Object.assign(ipldBlock, {
block,
contractAddress,
@ -432,6 +455,40 @@ export class Indexer {
return true;
}
async getHooksStatus (): Promise<HooksStatus | undefined> {
const dbTx = await this._db.createTransactionRunner();
let res;
try {
res = await this._db.getHooksStatus(dbTx);
await dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
throw error;
} finally {
await dbTx.release();
}
return res;
}
async updateHooksStatusProcessedBlock (blockNumber: number): Promise<HooksStatus> {
const dbTx = await this._db.createTransactionRunner();
let res;
try {
res = await this._db.updateHooksStatusProcessedBlock(dbTx, blockNumber);
await dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
throw error;
} finally {
await dbTx.release();
}
return res;
}
async getEventsByFilter (blockHash: string, contract?: string, name?: string): Promise<Array<Event>> {
return this._baseIndexer.getEventsByFilter(blockHash, contract, name);
}

View File

@ -72,18 +72,28 @@ export class JobRunner {
});
}
async subscribeBlockCheckpointQueue (): Promise<void> {
await this._jobQueue.subscribe(QUEUE_BLOCK_CHECKPOINT, async (job) => {
await this._indexer.processCheckpoint(job);
async subscribeHooksQueue (): Promise<void> {
await this._jobQueue.subscribe(QUEUE_HOOKS, async (job) => {
const { data: { blockNumber } } = job;
const hooksStatus = await this._indexer.getHooksStatus();
if (hooksStatus && hooksStatus.latestProcessedBlockNumber !== blockNumber - 1) {
const message = `Hooks for blockNumber ${blockNumber - 1} not processed yet, aborting`;
log(message);
throw new Error(message);
}
await this._indexer.processBlock(job);
await this._jobQueue.markComplete(job);
});
}
// TODO: Make sure the hooks run in order.
async subscribeHooksQueue (): Promise<void> {
await this._jobQueue.subscribe(QUEUE_HOOKS, async (job) => {
await this._indexer.processBlock(job);
async subscribeBlockCheckpointQueue (): Promise<void> {
await this._jobQueue.subscribe(QUEUE_BLOCK_CHECKPOINT, async (job) => {
await this._indexer.processCheckpoint(job);
await this._jobQueue.markComplete(job);
});

View File

@ -10,7 +10,8 @@
"server": "DEBUG=vulcanize:* ts-node src/server.ts",
"job-runner": "DEBUG=vulcanize:* ts-node src/job-runner.ts",
"watch:contract": "DEBUG=vulcanize:* ts-node src/cli/watch-contract.ts",
"fill": "DEBUG=vulcanize:* ts-node src/fill.ts"
"fill": "DEBUG=vulcanize:* ts-node src/fill.ts",
"checkpoint": "DEBUG=vulcanize:* ts-node src/cli/checkpoint.ts"
},
"repository": {
"type": "git",

View File

@ -39,6 +39,8 @@
* Update the `upstream` config in the [config file](./environments/local.toml) and provide the `ipld-eth-server` GQL API and the `indexer-db` postgraphile endpoints.
* Update the [config](./environments/local.toml) with derived state checkpoint settings.
## Customize
* Indexing on an event:
@ -47,6 +49,8 @@
* Edit the custom hook function `handleBlock` (triggered on a block) in [hooks.ts](./src/hooks.ts) to save `IPLDBlock`s using the `Indexer` object.
* Edit the custom hook function `genesisHook` (triggered on watch-contract) in [hooks.ts](./src/hooks.ts) to save a genesis checkpoint `IPLDBlock` using the `Indexer` object.
* The existing example hooks in [hooks.ts](./src/hooks.ts) are for an `ERC20` contract.
## Run
@ -76,5 +80,11 @@ GQL console: http://localhost:3008/graphql
* To fill a block range:
```bash
yarn fill --startBlock <from-block> --endBlock <to-block>
yarn fill --start-block <from-block> --end-block <to-block>
```
* To create a checkpoint for a contract:
```bash
yarn checkpoint --address <contract-address> --block-hash [block-hash]
```

View File

@ -84,10 +84,10 @@ export const createResolvers = async (indexer: Indexer, eventWatcher: EventWatch
return ipldBlock && ipldBlock.block.isComplete ? indexer.getResultIPLDBlock(ipldBlock) : undefined;
},
getState: async (_: any, { blockHash, contractAddress }: { blockHash: string, contractAddress: string }) => {
log('getState', blockHash, contractAddress);
getState: async (_: any, { blockHash, contractAddress, kind = 'diff' }: { blockHash: string, contractAddress: string, kind: string }) => {
log('getState', blockHash, contractAddress, kind);
const ipldBlock = await indexer.getPrevIPLDBlock(blockHash, contractAddress);
const ipldBlock = await indexer.getPrevIPLDBlock(blockHash, contractAddress, kind);
return ipldBlock && ipldBlock.block.isComplete ? indexer.getResultIPLDBlock(ipldBlock) : undefined;
}

View File

@ -5,6 +5,7 @@
import assert from 'assert';
import yargs from 'yargs';
import 'reflect-metadata';
import debug from 'debug';
import { getDefaultProvider } from 'ethers';
import { Config, DEFAULT_CONFIG_PATH, getConfig } from '@vulcanize/util';
@ -14,7 +15,9 @@ import { EthClient } from '@vulcanize/ipld-eth-client';
import { Database } from '../database';
import { Indexer } from '../indexer';
(async () => {
const log = debug('vulcanize:watch-contract');
const main = async (): Promise<void> => {
const argv = await yargs.parserConfiguration({
'parse-numbers': false
}).options({
@ -23,7 +26,7 @@ import { Indexer } from '../indexer';
type: 'string',
require: true,
demandOption: true,
describe: 'configuration file path (toml)',
describe: 'Configuration file path (toml)',
default: DEFAULT_CONFIG_PATH
},
address: {
@ -79,4 +82,12 @@ import { Indexer } from '../indexer';
await indexer.watchContract(argv.address, argv.kind, argv.startingBlock);
await db.close();
})();
};
main()
.catch(err => {
log(err);
})
.finally(() => {
process.exit(0);
});

View File

@ -11,7 +11,7 @@ const TEMPLATE_FILE = './templates/watch-contract-template.handlebars';
/**
* Writes the watch-contract file generated from a template to a stream.
* @param outStream A writable output stream to write the events file to.
* @param outStream A writable output stream to write the watch-contract file to.
*/
export function exportWatchContract (outStream: Writable): void {
const templateString = fs.readFileSync(path.resolve(__dirname, TEMPLATE_FILE)).toString();

View File

@ -257,9 +257,9 @@ export class Indexer implements IndexerInterface {
await this.triggerIndexingOnEvent(event);
}
async processBlock (blockHash: string): Promise<void> {
async processBlock (job: any): Promise<void> {
// Empty post-block method.
assert(blockHash);
assert(job);
}
parseEventNameAndArgs (kind: string, logObj: any): any {

View File

@ -46,6 +46,7 @@ export class JobRunner {
async start (): Promise<void> {
await this.subscribeBlockProcessingQueue();
await this.subscribeEventProcessingQueue();
await this.subscribeHooksQueue();
}
async subscribeBlockProcessingQueue (): Promise<void> {

View File

@ -48,6 +48,7 @@ export class JobRunner {
async start (): Promise<void> {
await this.subscribeBlockProcessingQueue();
await this.subscribeEventProcessingQueue();
await this.subscribeHooksQueue();
}
async subscribeBlockProcessingQueue (): Promise<void> {

View File

@ -122,9 +122,9 @@ export class Indexer implements IndexerInterface {
}
}
async processBlock (blockHash: string): Promise<void> {
async processBlock (job: any): Promise<void> {
// Empty post-block method.
assert(blockHash);
assert(job);
}
parseEventNameAndArgs (kind: string, logObj: any): any {

View File

@ -46,6 +46,7 @@ export class JobRunner {
async start (): Promise<void> {
await this.subscribeBlockProcessingQueue();
await this.subscribeEventProcessingQueue();
await this.subscribeHooksQueue();
}
async subscribeBlockProcessingQueue (): Promise<void> {

View File

@ -16,7 +16,6 @@ import {
UNKNOWN_EVENT_NAME,
QUEUE_BLOCK_PROCESSING,
QUEUE_EVENT_PROCESSING,
QUEUE_BLOCK_CHECKPOINT,
QUEUE_HOOKS
} from './constants';
import { JobQueue } from './job-queue';
@ -238,14 +237,8 @@ export class JobRunner {
if (!blockProgress.numEvents) {
// Push post-block hook and checkpointing jobs if there are no events as the block is already marked as complete.
await this._jobQueue.pushJob(QUEUE_HOOKS, { blockHash });
// Push checkpointing job only if checkpointing is on.
if (this._serverConfig.checkpointing) {
await this._jobQueue.pushJob(QUEUE_BLOCK_CHECKPOINT, { blockHash, blockNumber });
await this._jobQueue.pushJob(QUEUE_HOOKS, { blockHash, blockNumber });
}
}
const indexBlockDuration = new Date().getTime() - indexBlockStartTime.getTime();
log(`time:job-runner#_indexBlock: ${indexBlockDuration}ms`);