diff --git a/packages/codegen/README.md b/packages/codegen/README.md index bfdebb6d..cf872532 100644 --- a/packages/codegen/README.md +++ b/packages/codegen/README.md @@ -72,10 +72,18 @@ yarn ``` +* Run the IPFS (go-ipfs version 0.9.0) daemon: + + ```bash + ipfs daemon + ``` + * Create the databases configured in `environments/local.toml`. * Update the state checkpoint settings in `environments/local.toml`. +* Update the IPFS API address in `environments/local.toml`. + ### Customize * Indexing on an event: @@ -88,7 +96,9 @@ * Edit the custom hook function `createInitialCheckpoint` (triggered on watch-contract, checkpoint: `true`) in `src/hooks.ts` to save an initial checkpoint `IPLDBlock` using the `Indexer` object. - * Edit the custom hook function `createStateDiff` (triggered on a block) in `src/hooks.ts` to save the state in an `IPLDBlock` using the `Indexer` object. The default state (if exists) is updated. + * Edit the custom hook function `createStateDiff` (triggered on a block) in `src/hooks.ts` to save the state in a `diff` `IPLDBlock` using the `Indexer` object. The default state (if exists) is updated. + + * Edit the custom hook function `createStateCheckpoint` (triggered just before default and CLI checkpoint) in `src/hooks.ts` to save the state in a `checkpoint` `IPLDBlock` using the `Indexer` object. * The existing example hooks in `src/hooks.ts` are for an `ERC20` contract. @@ -146,6 +156,24 @@ yarn reset job-queue --block-number ``` + * To export the watcher state: + + ```bash + yarn export-state --export-file [export-file-path] + ``` + + * To import the watcher state: + + ```bash + yarn import-state --import-file + ``` + + * To inspect a CID: + + ```bash + yarn inspect-cid --cid + ``` + ## Known Issues * Currently, `node-fetch v2.6.2` is being used to fetch from URLs as `v3.0.0` is an [ESM-only module](https://www.npmjs.com/package/node-fetch#loading-and-configuring-the-module) and `ts-node` transpiles to import it using `require`. diff --git a/packages/codegen/src/data/entities/IPLDBlock.yaml b/packages/codegen/src/data/entities/IPLDBlock.yaml index 1c902561..15bad581 100644 --- a/packages/codegen/src/data/entities/IPLDBlock.yaml +++ b/packages/codegen/src/data/entities/IPLDBlock.yaml @@ -36,8 +36,8 @@ columns: tsType: string columnType: Column - name: data - pgType: text - tsType: string + pgType: bytea + tsType: Buffer columnType: Column imports: - toImport: diff --git a/packages/codegen/src/export-state.ts b/packages/codegen/src/export-state.ts new file mode 100644 index 00000000..5d95f505 --- /dev/null +++ b/packages/codegen/src/export-state.ts @@ -0,0 +1,21 @@ +// +// Copyright 2021 Vulcanize, Inc. +// + +import fs from 'fs'; +import path from 'path'; +import Handlebars from 'handlebars'; +import { Writable } from 'stream'; + +const TEMPLATE_FILE = './templates/export-state-template.handlebars'; + +/** + * Writes the export-state file generated from a template to a stream. + * @param outStream A writable output stream to write the export-state file to. 
+ */ +export function exportState (outStream: Writable): void { + const templateString = fs.readFileSync(path.resolve(__dirname, TEMPLATE_FILE)).toString(); + const template = Handlebars.compile(templateString); + const exportState = template({}); + outStream.write(exportState); +} diff --git a/packages/codegen/src/generate-code.ts b/packages/codegen/src/generate-code.ts index 5c7e7e82..bfbf6c52 100644 --- a/packages/codegen/src/generate-code.ts +++ b/packages/codegen/src/generate-code.ts @@ -28,6 +28,10 @@ import { registerHandlebarHelpers } from './utils/handlebar-helpers'; import { exportHooks } from './hooks'; import { exportFill } from './fill'; import { exportCheckpoint } from './checkpoint'; +import { exportState } from './export-state'; +import { importState } from './import-state'; +import { exportIPFS } from './ipfs'; +import { exportInspectCID } from './inspect-cid'; const main = async (): Promise => { const argv = await yargs(hideBin(process.argv)) @@ -256,6 +260,26 @@ function generateWatcher (data: string, visitor: Visitor, argv: any) { resetStateOutStream = process.stdout; } visitor.exportReset(resetOutStream, resetJQOutStream, resetStateOutStream); + + outStream = outputDir + ? fs.createWriteStream(path.join(outputDir, 'src/cli/export-state.ts')) + : process.stdout; + exportState(outStream); + + outStream = outputDir + ? fs.createWriteStream(path.join(outputDir, 'src/cli/import-state.ts')) + : process.stdout; + importState(outStream); + + outStream = outputDir + ? fs.createWriteStream(path.join(outputDir, 'src/ipfs.ts')) + : process.stdout; + exportIPFS(outStream); + + outStream = outputDir + ? fs.createWriteStream(path.join(outputDir, 'src/cli/inspect-cid.ts')) + : process.stdout; + exportInspectCID(outStream); } main().catch(err => { diff --git a/packages/codegen/src/import-state.ts b/packages/codegen/src/import-state.ts new file mode 100644 index 00000000..00019e29 --- /dev/null +++ b/packages/codegen/src/import-state.ts @@ -0,0 +1,21 @@ +// +// Copyright 2021 Vulcanize, Inc. +// + +import fs from 'fs'; +import path from 'path'; +import Handlebars from 'handlebars'; +import { Writable } from 'stream'; + +const TEMPLATE_FILE = './templates/import-state-template.handlebars'; + +/** + * Writes the import-state file generated from a template to a stream. + * @param outStream A writable output stream to write the import-state file to. + */ +export function importState (outStream: Writable): void { + const templateString = fs.readFileSync(path.resolve(__dirname, TEMPLATE_FILE)).toString(); + const template = Handlebars.compile(templateString); + const importState = template({}); + outStream.write(importState); +} diff --git a/packages/codegen/src/inspect-cid.ts b/packages/codegen/src/inspect-cid.ts new file mode 100644 index 00000000..63083cce --- /dev/null +++ b/packages/codegen/src/inspect-cid.ts @@ -0,0 +1,21 @@ +// +// Copyright 2021 Vulcanize, Inc. +// + +import fs from 'fs'; +import path from 'path'; +import Handlebars from 'handlebars'; +import { Writable } from 'stream'; + +const TEMPLATE_FILE = './templates/inspect-cid-template.handlebars'; + +/** + * Writes the inspect-cid file generated from a template to a stream. + * @param outStream A writable output stream to write the inspect-cid file to. 
+ */ +export function exportInspectCID (outStream: Writable): void { + const templateString = fs.readFileSync(path.resolve(__dirname, TEMPLATE_FILE)).toString(); + const template = Handlebars.compile(templateString); + const inspectCid = template({}); + outStream.write(inspectCid); +} diff --git a/packages/codegen/src/ipfs.ts b/packages/codegen/src/ipfs.ts new file mode 100644 index 00000000..11efedfc --- /dev/null +++ b/packages/codegen/src/ipfs.ts @@ -0,0 +1,21 @@ +// +// Copyright 2021 Vulcanize, Inc. +// + +import fs from 'fs'; +import path from 'path'; +import Handlebars from 'handlebars'; +import { Writable } from 'stream'; + +const TEMPLATE_FILE = './templates/ipfs-template.handlebars'; + +/** + * Writes the ipfs.ts file generated from a template to a stream. + * @param outStream A writable output stream to write the ipfs.ts file to. + */ +export function exportIPFS (outStream: Writable): void { + const templateString = fs.readFileSync(path.resolve(__dirname, TEMPLATE_FILE)).toString(); + const template = Handlebars.compile(templateString); + const ipfsString = template({}); + outStream.write(ipfsString); +} diff --git a/packages/codegen/src/templates/checkpoint-template.handlebars b/packages/codegen/src/templates/checkpoint-template.handlebars index 17294840..3bb49d97 100644 --- a/packages/codegen/src/templates/checkpoint-template.handlebars +++ b/packages/codegen/src/templates/checkpoint-template.handlebars @@ -72,17 +72,15 @@ const main = async (): Promise => { const ethProvider = getDefaultProvider(rpcProviderEndpoint); const indexer = new Indexer(serverConfig, db, ethClient, postgraphileClient, ethProvider); - const blockHash = await indexer.createCheckpoint(argv.address, argv.blockHash); + const blockHash = await indexer.processCLICheckpoint(argv.address, argv.blockHash); log(`Created a checkpoint for contract ${argv.address} at block-hash ${blockHash}`); await db.close(); }; -main() - .catch(err => { - log(err); - }) - .finally(() => { - process.exit(0); - }); +main().catch(err => { + log(err); +}).finally(() => { + process.exit(0); +}); diff --git a/packages/codegen/src/templates/config-template.handlebars b/packages/codegen/src/templates/config-template.handlebars index 44340922..f54b70f2 100644 --- a/packages/codegen/src/templates/config-template.handlebars +++ b/packages/codegen/src/templates/config-template.handlebars @@ -9,6 +9,9 @@ # Checkpoint interval in number of blocks. checkpointInterval = 2000 + # IPFS API address (can be taken from the output on running the IPFS daemon). 
+ ipfsApiAddr = "/ip4/127.0.0.1/tcp/5001" + [database] type = "postgres" host = "localhost" diff --git a/packages/codegen/src/templates/database-template.handlebars b/packages/codegen/src/templates/database-template.handlebars index c2c1ae35..26db6bd8 100644 --- a/packages/codegen/src/templates/database-template.handlebars +++ b/packages/codegen/src/templates/database-template.handlebars @@ -6,7 +6,7 @@ import assert from 'assert'; import { Connection, ConnectionOptions, DeepPartial, FindConditions, QueryRunner, FindManyOptions, MoreThan } from 'typeorm'; import path from 'path'; -import { Database as BaseDatabase, QueryOptions, Where, MAX_REORG_DEPTH } from '@vulcanize/util'; +import { Database as BaseDatabase, QueryOptions, Where, MAX_REORG_DEPTH, DatabaseInterface } from '@vulcanize/util'; import { Contract } from './entity/Contract'; import { Event } from './entity/Event'; @@ -19,7 +19,7 @@ import { IPLDBlock } from './entity/IPLDBlock'; import { {{query.entityName}} } from './entity/{{query.entityName}}'; {{/each}} -export class Database { +export class Database implements DatabaseInterface { _config: ConnectionOptions; _conn!: Connection; _baseDatabase: BaseDatabase; @@ -83,7 +83,7 @@ export class Database { return repo.find({ where, relations: ['block'] }); } - async getLastIPLDBlock (contractAddress: string, kind?: string): Promise { + async getLatestIPLDBlock (contractAddress: string, kind: string | null, blockNumber?: number): Promise { const repo = this._conn.getRepository(IPLDBlock); let queryBuilder = repo.createQueryBuilder('ipld_block') @@ -92,6 +92,11 @@ export class Database { .andWhere('ipld_block.contract_address = :contractAddress', { contractAddress }) .orderBy('block.block_number', 'DESC'); + // Filter out blocks after the provided block number. + if (blockNumber) { + queryBuilder.andWhere('block.block_number <= :blockNumber', { blockNumber }); + } + // Filter using kind if specified else order by id to give preference to checkpoint. queryBuilder = kind ? queryBuilder.andWhere('ipld_block.kind = :kind', { kind }) @@ -177,7 +182,8 @@ export class Database { return result; } - async getPrevIPLDBlocksAfterCheckpoint (contractAddress: string, checkpointBlockNumber: number): Promise { + // Fetch all diff IPLDBlocks after the specified checkpoint. 
+ async getDiffIPLDBlocksByCheckpoint (contractAddress: string, checkpointBlockNumber: number): Promise { const repo = this._conn.getRepository(IPLDBlock); return repo.find({ @@ -207,7 +213,7 @@ export class Database { return repo.findOne(); } - async updateHookStatusProcessedBlock (queryRunner: QueryRunner, blockNumber: number): Promise { + async updateHookStatusProcessedBlock (queryRunner: QueryRunner, blockNumber: number, force?: boolean): Promise { const repo = queryRunner.manager.getRepository(HookStatus); let entity = await repo.findOne(); @@ -217,7 +223,7 @@ export class Database { }); } - if (blockNumber > entity.latestProcessedBlockNumber) { + if (force || blockNumber > entity.latestProcessedBlockNumber) { entity.latestProcessedBlockNumber = blockNumber; } @@ -229,11 +235,6 @@ export class Database { return repo.find({ where }); } - async getLastCompleteBlock (): Promise { - const repo = this._conn.getRepository(BlockProgress); - return repo.findOne({ where: { isComplete: true }, order: { blockNumber: 'DESC' } }); - } - async getContract (address: string): Promise { const repo = this._conn.getRepository(Contract); diff --git a/packages/codegen/src/templates/events-template.handlebars b/packages/codegen/src/templates/events-template.handlebars index a28b3ac0..28ffb99d 100644 --- a/packages/codegen/src/templates/events-template.handlebars +++ b/packages/codegen/src/templates/events-template.handlebars @@ -10,10 +10,12 @@ import { EthClient } from '@vulcanize/ipld-eth-client'; import { JobQueue, EventWatcher as BaseEventWatcher, + EventWatcherInterface, QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING, QUEUE_BLOCK_CHECKPOINT, QUEUE_HOOKS, + QUEUE_IPFS, UNKNOWN_EVENT_NAME, UpstreamConfig JOB_KIND_PRUNE @@ -26,7 +28,7 @@ const EVENT = 'event'; const log = debug('vulcanize:events'); -export class EventWatcher { +export class EventWatcher implements EventWatcherInterface { _ethClient: EthClient _indexer: Indexer _subscription: ZenObservable.Subscription | undefined @@ -59,6 +61,7 @@ export class EventWatcher { await this.initBlockProcessingOnCompleteHandler(); await this.initEventProcessingOnCompleteHandler(); await this.initHooksOnCompleteHandler(); + await this.initBlockCheckpointOnCompleteHandler(); this._baseEventWatcher.startBlockProcessing(); } @@ -68,7 +71,7 @@ export class EventWatcher { async initBlockProcessingOnCompleteHandler (): Promise { this._jobQueue.onComplete(QUEUE_BLOCK_PROCESSING, async (job) => { - const { id, data: { failed } } = job; + const { id, data: { failed, request: { data: { kind } } } } = job; if (failed) { log(`Job ${id} for queue ${QUEUE_BLOCK_PROCESSING} failed`); @@ -77,22 +80,7 @@ export class EventWatcher { await this._baseEventWatcher.blockProcessingCompleteHandler(job); - const { data: { request: { data: { kind } } } } = job; - - // If it's a pruning job: - // Create a hook job for the latest canonical block. - if (kind === JOB_KIND_PRUNE) { - const syncStatus = await this._indexer.getSyncStatus(); - assert(syncStatus); - - this._jobQueue.pushJob( - QUEUE_HOOKS, - { - blockHash: syncStatus.latestCanonicalBlockHash, - blockNumber: syncStatus.latestCanonicalBlockNumber - } - ); - } + await this.createHooksJob(kind); }); } @@ -133,13 +121,17 @@ export class EventWatcher { await this._indexer.updateHookStatusProcessedBlock(blockNumber); // Create a checkpoint job after completion of a hook job. 
- this._jobQueue.pushJob( - QUEUE_BLOCK_CHECKPOINT, - { - blockHash, - blockNumber - } - ); + await this.createCheckpointJob(blockHash, blockNumber); + }); + } + + async initBlockCheckpointOnCompleteHandler (): Promise { + this._jobQueue.onComplete(QUEUE_BLOCK_CHECKPOINT, async (job) => { + const { data: { request: { data: { blockHash } } } } = job; + + if (this._indexer.isIPFSConfigured()) { + await this.createIPFSPutJob(blockHash); + } }); } @@ -155,4 +147,40 @@ export class EventWatcher { }); } } + + async createHooksJob (kind: string): Promise { + // If it's a pruning job: Create a hook job for the latest canonical block. + if (kind === JOB_KIND_PRUNE) { + const latestCanonicalBlock = await this._indexer.getLatestCanonicalBlock(); + assert(latestCanonicalBlock); + + await this._jobQueue.pushJob( + QUEUE_HOOKS, + { + blockHash: latestCanonicalBlock.blockHash, + blockNumber: latestCanonicalBlock.blockNumber + } + ); + } + } + + async createCheckpointJob (blockHash: string, blockNumber: number): Promise { + await this._jobQueue.pushJob( + QUEUE_BLOCK_CHECKPOINT, + { + blockHash, + blockNumber + } + ); + } + + async createIPFSPutJob (blockHash: string): Promise { + const ipldBlocks = await this._indexer.getIPLDBlocksByHash(blockHash); + + for (const ipldBlock of ipldBlocks) { + const data = this._indexer.getIPLDData(ipldBlock); + + await this._jobQueue.pushJob(QUEUE_IPFS, { data }); + } + } } diff --git a/packages/codegen/src/templates/export-state-template.handlebars b/packages/codegen/src/templates/export-state-template.handlebars new file mode 100644 index 00000000..070103d2 --- /dev/null +++ b/packages/codegen/src/templates/export-state-template.handlebars @@ -0,0 +1,137 @@ +// +// Copyright 2021 Vulcanize, Inc. +// + +import assert from 'assert'; +import yargs from 'yargs'; +import 'reflect-metadata'; +import debug from 'debug'; +import { getDefaultProvider } from 'ethers'; +import fs from 'fs'; +import path from 'path'; + +import { Config, DEFAULT_CONFIG_PATH, getConfig } from '@vulcanize/util'; +import { getCache } from '@vulcanize/cache'; +import { EthClient } from '@vulcanize/ipld-eth-client'; +import * as codec from '@ipld/dag-cbor'; + +import { Database } from '../database'; +import { Indexer } from '../indexer'; + +const log = debug('vulcanize:export-state'); + +const main = async (): Promise => { + const argv = await yargs.parserConfiguration({ + 'parse-numbers': false + }).options({ + configFile: { + alias: 'f', + type: 'string', + require: true, + demandOption: true, + describe: 'Configuration file path (toml)', + default: DEFAULT_CONFIG_PATH + }, + exportFile: { + alias: 'o', + type: 'string', + describe: 'Export file path' + } + }).argv; + + const config: Config = await getConfig(argv.configFile); + + const { upstream, database: dbConfig, server: serverConfig } = config; + + assert(upstream, 'Missing upstream config'); + assert(dbConfig, 'Missing database config'); + assert(serverConfig, 'Missing server config'); + + const db = new Database(dbConfig); + await db.init(); + + const { ethServer: { gqlApiEndpoint, gqlPostgraphileEndpoint, rpcProviderEndpoint }, cache: cacheConfig } = upstream; + assert(gqlApiEndpoint, 'Missing upstream ethServer.gqlApiEndpoint'); + assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint'); + + const cache = await getCache(cacheConfig); + + const ethClient = new EthClient({ + gqlEndpoint: gqlApiEndpoint, + gqlSubscriptionEndpoint: gqlPostgraphileEndpoint, + cache + }); + + const postgraphileClient = new EthClient({ + 
gqlEndpoint: gqlPostgraphileEndpoint, + cache + }); + + const ethProvider = getDefaultProvider(rpcProviderEndpoint); + + const indexer = new Indexer(serverConfig, db, ethClient, postgraphileClient, ethProvider); + + const exportData: any = { + snapshotBlock: {}, + contracts: [], + ipldCheckpoints: [] + }; + + const contracts = await db.getContracts({}); + + // Get latest canonical block. + const block = await indexer.getLatestCanonicalBlock(); + assert(block); + + // Export snapshot block. + exportData.snapshotBlock = { + blockNumber: block.blockNumber, + blockHash: block.blockHash + }; + + // Export contracts and checkpoints. + for (const contract of contracts) { + exportData.contracts.push({ + address: contract.address, + kind: contract.kind, + checkpoint: contract.checkpoint, + startingBlock: block.blockNumber + }); + + // Create and export checkpoint if checkpointing is on for the contract. + if (contract.checkpoint) { + await indexer.createCheckpoint(contract.address, block.blockHash); + + const ipldBlock = await indexer.getLatestIPLDBlock(contract.address, 'checkpoint', block.blockNumber); + assert(ipldBlock); + + const data = codec.decode(Buffer.from(ipldBlock.data)) as any; + + exportData.ipldCheckpoints.push({ + contractAddress: ipldBlock.contractAddress, + cid: ipldBlock.cid, + kind: ipldBlock.kind, + data + }); + } + } + + if (argv.exportFile) { + const encodedExportData = codec.encode(exportData); + + const filePath = path.resolve(argv.exportFile); + const fileDir = path.dirname(filePath); + + if (!fs.existsSync(fileDir)) fs.mkdirSync(fileDir, { recursive: true }); + + fs.writeFileSync(filePath, encodedExportData); + } else { + log(exportData); + } +}; + +main().catch(err => { + log(err); +}).finally(() => { + process.exit(0); +}); diff --git a/packages/codegen/src/templates/hooks-template.handlebars b/packages/codegen/src/templates/hooks-template.handlebars index 0b7d6d50..f13ffb91 100644 --- a/packages/codegen/src/templates/hooks-template.handlebars +++ b/packages/codegen/src/templates/hooks-template.handlebars @@ -9,7 +9,8 @@ import { UNKNOWN_EVENT_NAME, updateStateForMappingType, updateStateForElementary import { Indexer, ResultEvent } from './indexer'; const ACCOUNTS = [ - '0xDC7d7A8920C8Eecc098da5B7522a5F31509b5Bfc' + '0xDC7d7A8920C8Eecc098da5B7522a5F31509b5Bfc', + '0xCA6D29232D1435D8198E3E5302495417dD073d61' ]; /** @@ -36,7 +37,7 @@ export async function createInitialCheckpoint (indexer: Indexer, contractAddress } /** - * Hook function to create and store state diffs. + * Hook function to create state diffs. * @param indexer Indexer instance that contains methods to fetch the contract varaiable values. * @param blockHash Block hash of the concerned block. */ @@ -103,6 +104,31 @@ export async function createStateDiff (indexer: Indexer, blockHash: string): Pro } } +/** + * Hook function to create state checkpoint + * @param indexer Indexer instance. + * @param contractAddress Address of the concerned contract. + * @param blockHash Block hash of the concerned block. + * @returns Whether to disable default checkpoint. If false, the state from this hook is updated with that from default checkpoint. + */ +export async function createStateCheckpoint (indexer: Indexer, contractAddress: string, blockHash: string): Promise { + assert(indexer); + assert(blockHash); + assert(contractAddress); + + let ipldBlockData: any = {}; + + // Setting the balances of accounts. 
+ for (const account of ACCOUNTS) { + const balance = await indexer._balances(blockHash, contractAddress, account); + ipldBlockData = updateStateForMappingType(ipldBlockData, '_balances', [account], balance.value.toString()); + } + + await indexer.createCheckpoint(contractAddress, blockHash, ipldBlockData); + + return false; +} + /** * Event hook function. * @param indexer Indexer instance that contains methods to fetch and update the contract values in the database. diff --git a/packages/codegen/src/templates/import-state-template.handlebars b/packages/codegen/src/templates/import-state-template.handlebars new file mode 100644 index 00000000..6127f610 --- /dev/null +++ b/packages/codegen/src/templates/import-state-template.handlebars @@ -0,0 +1,134 @@ +// +// Copyright 2021 Vulcanize, Inc. +// + +import assert from 'assert'; +import 'reflect-metadata'; +import yargs from 'yargs'; +import { hideBin } from 'yargs/helpers'; +import debug from 'debug'; +import { PubSub } from 'apollo-server-express'; +import { getDefaultProvider } from 'ethers'; +import fs from 'fs'; +import path from 'path'; + +import { getCache } from '@vulcanize/cache'; +import { EthClient } from '@vulcanize/ipld-eth-client'; +import { getConfig, fillBlocks, JobQueue, DEFAULT_CONFIG_PATH } from '@vulcanize/util'; +import * as codec from '@ipld/dag-cbor'; + +import { Database } from '../database'; +import { Indexer } from '../indexer'; +import { EventWatcher } from '../events'; +import { IPLDBlock } from '../entity/IPLDBlock'; + +const log = debug('vulcanize:import-state'); + +export const main = async (): Promise => { + const argv = await yargs(hideBin(process.argv)).parserConfiguration({ + 'parse-numbers': false + }).options({ + configFile: { + alias: 'f', + type: 'string', + demandOption: true, + describe: 'configuration file path (toml)', + default: DEFAULT_CONFIG_PATH + }, + importFile: { + alias: 'i', + type: 'string', + demandOption: true, + describe: 'Import file path (JSON)' + } + }).argv; + + const config = await getConfig(argv.configFile); + + const { upstream, database: dbConfig, jobQueue: jobQueueConfig, server: serverConfig } = config; + + assert(upstream, 'Missing upstream config'); + assert(dbConfig, 'Missing database config'); + assert(serverConfig, 'Missing server config'); + + const db = new Database(dbConfig); + await db.init(); + + assert(upstream, 'Missing upstream config'); + const { ethServer: { gqlPostgraphileEndpoint, rpcProviderEndpoint }, cache: cacheConfig } = upstream; + assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint'); + + const cache = await getCache(cacheConfig); + + const ethClient = new EthClient({ + gqlEndpoint: gqlPostgraphileEndpoint, + gqlSubscriptionEndpoint: gqlPostgraphileEndpoint, + cache + }); + + const postgraphileClient = new EthClient({ + gqlEndpoint: gqlPostgraphileEndpoint, + cache + }); + + const ethProvider = getDefaultProvider(rpcProviderEndpoint); + + // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. 
+ // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries + const pubsub = new PubSub(); + const indexer = new Indexer(serverConfig, db, ethClient, postgraphileClient, ethProvider); + + const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; + assert(dbConnectionString, 'Missing job queue db connection string'); + + const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); + await jobQueue.start(); + + const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue); + + assert(jobQueueConfig, 'Missing job queue config'); + + // Import data. + const importFilePath = path.resolve(argv.importFile); + const encodedImportData = fs.readFileSync(importFilePath); + const importData = codec.decode(Buffer.from(encodedImportData)) as any; + + // Fill the snapshot block. + await fillBlocks( + jobQueue, + indexer, + ethClient, + eventWatcher, + { + startBlock: importData.snapshotBlock.blockNumber, + endBlock: importData.snapshotBlock.blockNumber + } + ); + + // Fill the Contracts. + for (const contract of importData.contracts) { + await db.saveContract(contract.address, contract.kind, contract.checkpoint, contract.startingBlock); + } + + // Get the snapshot block. + const block = await indexer.getBlockProgress(importData.snapshotBlock.blockHash); + assert(block); + + // Fill the IPLDBlocks. + for (const checkpoint of importData.ipldCheckpoints) { + let ipldBlock = new IPLDBlock(); + + ipldBlock = Object.assign(ipldBlock, checkpoint); + ipldBlock.block = block; + + ipldBlock.data = Buffer.from(codec.encode(ipldBlock.data)); + + await db.saveOrUpdateIPLDBlock(ipldBlock); + } +}; + +main().catch(err => { + log(err); +}).finally(() => { + process.exit(0); +}); diff --git a/packages/codegen/src/templates/indexer-template.handlebars b/packages/codegen/src/templates/indexer-template.handlebars index 17a28c3e..9500c84d 100644 --- a/packages/codegen/src/templates/indexer-template.handlebars +++ b/packages/codegen/src/templates/indexer-template.handlebars @@ -13,10 +13,10 @@ import _ from 'lodash'; import { JsonFragment } from '@ethersproject/abi'; import { BaseProvider } from '@ethersproject/providers'; -import * as codec from '@ipld/dag-json'; +import * as codec from '@ipld/dag-cbor'; import { EthClient } from '@vulcanize/ipld-eth-client'; import { StorageLayout } from '@vulcanize/solidity-mapper'; -import { Indexer as BaseIndexer, ValueResult, UNKNOWN_EVENT_NAME, ServerConfig, Where, QueryOptions, updateStateForElementaryType, updateStateForMappingType } from '@vulcanize/util'; +import { Indexer as BaseIndexer, IndexerInterface, ValueResult, UNKNOWN_EVENT_NAME, ServerConfig, Where, QueryOptions, updateStateForElementaryType, updateStateForMappingType } from '@vulcanize/util'; import { Database } from './database'; import { Contract } from './entity/Contract'; @@ -26,7 +26,8 @@ import { HookStatus } from './entity/HookStatus'; import { BlockProgress } from './entity/BlockProgress'; import { IPLDBlock } from './entity/IPLDBlock'; import artifacts from './artifacts/{{inputFileName}}.json'; -import { createInitialCheckpoint, handleEvent, createStateDiff } from './hooks'; +import { createInitialCheckpoint, handleEvent, createStateDiff, createStateCheckpoint } from './hooks'; +import { IPFSClient } from './ipfs'; const log = debug('vulcanize:indexer'); @@ -71,7 +72,7 @@ export type ResultIPLDBlock = { data: string; }; -export class Indexer { +export class Indexer implements IndexerInterface { _db: 
Database _ethClient: EthClient _ethProvider: BaseProvider @@ -83,6 +84,8 @@ export class Indexer { _storageLayout: StorageLayout _contract: ethers.utils.Interface + _ipfsClient: IPFSClient + constructor (serverConfig: ServerConfig, db: Database, ethClient: EthClient, postgraphileClient: EthClient, ethProvider: BaseProvider) { assert(db); assert(ethClient); @@ -102,6 +105,8 @@ export class Indexer { this._storageLayout = storageLayout; this._contract = new ethers.utils.Interface(this._abi); + + this._ipfsClient = new IPFSClient(this._serverConfig.ipfsApiAddr); } getResultEvent (event: Event): ResultEvent { @@ -141,6 +146,8 @@ export class Indexer { getResultIPLDBlock (ipldBlock: IPLDBlock): ResultIPLDBlock { const block = ipldBlock.block; + const data = codec.decode(Buffer.from(ipldBlock.data)) as any; + return { block: { cid: block.cid, @@ -152,7 +159,7 @@ export class Indexer { contractAddress: ipldBlock.contractAddress, cid: ipldBlock.cid, kind: ipldBlock.kind, - data: ipldBlock.data + data: JSON.stringify(data) }; } @@ -277,9 +284,10 @@ export class Indexer { assert(block); // Fetch the latest checkpoint for the contract. - const checkpoint = await this.getLastIPLDBlock(contractAddress, 'checkpoint'); + const checkpoint = await this.getLatestIPLDBlock(contractAddress, 'checkpoint'); // There should be an initial checkpoint at least. + // Assumption: There should be no events for the contract at the starting block. assert(checkpoint, 'Initial checkpoint doesn\'t exist'); // Check if the latest checkpoint is in the same block. @@ -304,19 +312,38 @@ export class Indexer { // Check if contract has checkpointing on. if (contract.checkpoint) { // If a checkpoint doesn't already exist and blockNumber is equal to startingBlock, create an initial checkpoint. - const checkpointBlock = await this.getLastIPLDBlock(contract.address, 'checkpoint'); + const checkpointBlock = await this.getLatestIPLDBlock(contract.address, 'checkpoint'); if (!checkpointBlock) { if (blockNumber === contract.startingBlock) { + // Call initial checkpoint hook. await createInitialCheckpoint(this, contract.address, blockHash); } } else { - await this.createCheckpoint(contract.address, blockHash, undefined, checkpointInterval); + await this.createCheckpoint(contract.address, blockHash, null, checkpointInterval); } } } } + async processCLICheckpoint (contractAddress: string, blockHash?: string): Promise { + const checkpointBlockHash = await this.createCheckpoint(contractAddress, blockHash); + assert(checkpointBlockHash); + + const block = await this.getBlockProgress(checkpointBlockHash); + const checkpointIPLDBlocks = await this._db.getIPLDBlocks({ block, contractAddress, kind: 'checkpoint' }); + + // There can be at most one IPLDBlock for a (block, contractAddress, kind) combination. + assert(checkpointIPLDBlocks.length <= 1); + const checkpointIPLDBlock = checkpointIPLDBlocks[0]; + + const checkpointData = this.getIPLDData(checkpointIPLDBlock); + + await this.pushToIPFS(checkpointData); + + return checkpointBlockHash; + } + async createCheckpoint (contractAddress: string, blockHash?: string, data?: any, checkpointInterval?: number): Promise { const syncStatus = await this.getSyncStatus(); assert(syncStatus); @@ -333,8 +360,8 @@ export class Indexer { assert(currentBlock); - // Data is passed in case of initial checkpoint. - // Assuming there will be no events for the contract in this block. + // Data is passed in case of initial checkpoint and checkpoint hook. 
+ // Assumption: There should be no events for the contract at the starting block. if (data) { const ipldBlock = await this.prepareIPLDBlock(currentBlock, contractAddress, data, 'checkpoint'); await this.saveOrUpdateIPLDBlock(ipldBlock); @@ -351,7 +378,7 @@ export class Indexer { assert(currentBlock.blockNumber <= syncStatus.latestCanonicalBlockNumber, 'Block for a checkpoint should be in the pruned region'); // Fetch the latest checkpoint for the contract. - const checkpointBlock = await this.getLastIPLDBlock(contractAddress, 'checkpoint'); + const checkpointBlock = await this.getLatestIPLDBlock(contractAddress, 'checkpoint', currentBlock.blockNumber); assert(checkpointBlock); // Check (only if checkpointInterval is passed) if it is time for a new checkpoint. @@ -359,16 +386,28 @@ export class Indexer { return; } + // Call state checkpoint hook and check if default checkpoint is disabled. + const disableDefaultCheckpoint = await createStateCheckpoint(this, contractAddress, currentBlock.blockHash); + + if (disableDefaultCheckpoint) { + // Return if default checkpoint is disabled. + // Return block hash for checkpoint CLI. + return currentBlock.blockHash; + } + const { block: { blockNumber: checkpointBlockNumber } } = checkpointBlock; // Fetching all diff blocks after checkpoint. - const diffBlocks = await this.getPrevIPLDBlocksAfterCheckpoint(contractAddress, checkpointBlockNumber); + const diffBlocks = await this.getDiffIPLDBlocksByCheckpoint(contractAddress, checkpointBlockNumber); - data = codec.decode(Buffer.from(checkpointBlock.data)) as any; + const checkpointBlockData = codec.decode(Buffer.from(checkpointBlock.data)) as any; + data = { + state: checkpointBlockData.state + }; for (const diffBlock of diffBlocks) { - const diff = codec.decode(Buffer.from(diffBlock.data)); - data = _.merge(data, diff); + const diff = codec.decode(Buffer.from(diffBlock.data)) as any; + data.state = _.merge(data.state, diff.state); } const ipldBlock = await this.prepareIPLDBlock(currentBlock, contractAddress, data, 'checkpoint'); @@ -377,6 +416,17 @@ export class Indexer { return currentBlock.blockHash; } + getIPLDData (ipldBlock: IPLDBlock): any { + return codec.decode(Buffer.from(ipldBlock.data)); + } + + async getIPLDBlocksByHash (blockHash: string): Promise { + const block = await this.getBlockProgress(blockHash); + assert(block); + + return this._db.getIPLDBlocks({ block }); + } + async getIPLDBlockByCid (cid: string): Promise { const ipldBlocks = await this._db.getIPLDBlocks({ cid }); @@ -386,8 +436,8 @@ export class Indexer { return ipldBlocks[0]; } - async getLastIPLDBlock (contractAddress: string, kind?: string): Promise { - return this._db.getLastIPLDBlock(contractAddress, kind); + async getLatestIPLDBlock (contractAddress: string, kind: string | null, blockNumber?: number): Promise { + return this._db.getLatestIPLDBlock(contractAddress, kind, blockNumber); } async getPrevIPLDBlock (blockHash: string, contractAddress: string, kind?: string): Promise { @@ -406,20 +456,17 @@ export class Indexer { return res; } - async getPrevIPLDBlocksAfterCheckpoint (contractAddress: string, checkpointBlockNumber: number): Promise { - return this._db.getPrevIPLDBlocksAfterCheckpoint(contractAddress, checkpointBlockNumber); + async getDiffIPLDBlocksByCheckpoint (contractAddress: string, checkpointBlockNumber: number): Promise { + return this._db.getDiffIPLDBlocksByCheckpoint(contractAddress, checkpointBlockNumber); } async prepareIPLDBlock (block: BlockProgress, contractAddress: string, data: any, kind: 
string):Promise { assert(_.includes(['diff', 'checkpoint', 'diff_staged'], kind)); - // Get an existing 'diff' | 'diff_staged' IPLDBlock for current block, contractAddress. - let currentIPLDBlocks: IPLDBlock[] = []; - if (kind !== 'checkpoint') { - currentIPLDBlocks = await this._db.getIPLDBlocks({ block, contractAddress, kind }); - } + // Get an existing 'diff' | 'diff_staged' | 'checkpoint' IPLDBlock for current block, contractAddress. + const currentIPLDBlocks = await this._db.getIPLDBlocks({ block, contractAddress, kind }); - // There can be only one IPLDBlock for a (block, contractAddress, kind) combination. + // There can be at most one IPLDBlock for a (block, contractAddress, kind) combination. assert(currentIPLDBlocks.length <= 1); const currentIPLDBlock = currentIPLDBlocks[0]; @@ -435,7 +482,7 @@ export class Indexer { ipldBlock = new IPLDBlock(); // Fetch the parent IPLDBlock. - const parentIPLDBlock = await this.getLastIPLDBlock(contractAddress); + const parentIPLDBlock = await this.getLatestIPLDBlock(contractAddress, null, block.blockNumber); // Setting the meta-data for an IPLDBlock (done only once per block). data.meta = { @@ -453,13 +500,13 @@ export class Indexer { }; } - // Encoding the data using dag-json codec. + // Encoding the data using dag-cbor codec. const bytes = codec.encode(data); // Calculating sha256 (multi)hash of the encoded data. const hash = await sha256.digest(bytes); - // Calculating the CID: v1, code: dag-json, hash. + // Calculating the CID: v1, code: dag-cbor, hash. const cid = CID.create(1, codec.code, hash); // Update ipldBlock with new data. @@ -468,7 +515,7 @@ export class Indexer { contractAddress, cid: cid.toString(), kind: data.meta.kind, - data: bytes + data: Buffer.from(bytes) }); return ipldBlock; @@ -492,6 +539,17 @@ export class Indexer { } } + async pushToIPFS (data: any): Promise { + await this._ipfsClient.push(data); + } + + isIPFSConfigured (): boolean { + const ipfsAddr = this._serverConfig.ipfsApiAddr; + + // Return false if ipfsAddr is undefined | null | empty string. + return (ipfsAddr !== undefined && ipfsAddr !== null && ipfsAddr !== ''); + } + async triggerIndexingOnEvent (event: Event): Promise { const resultEvent = this.getResultEvent(event); @@ -570,12 +628,12 @@ export class Indexer { return res; } - async updateHookStatusProcessedBlock (blockNumber: number): Promise { + async updateHookStatusProcessedBlock (blockNumber: number, force?: boolean): Promise { const dbTx = await this._db.createTransactionRunner(); let res; try { - res = await this._db.updateHookStatusProcessedBlock(dbTx, blockNumber); + res = await this._db.updateHookStatusProcessedBlock(dbTx, blockNumber, force); await dbTx.commitTransaction(); } catch (error) { await dbTx.rollbackTransaction(); @@ -587,6 +645,13 @@ export class Indexer { return res; } + async getLatestCanonicalBlock (): Promise { + const syncStatus = await this.getSyncStatus(); + assert(syncStatus); + + return this.getBlockProgress(syncStatus.latestCanonicalBlockHash); + } + async getEventsByFilter (blockHash: string, contract?: string, name?: string): Promise> { return this._baseIndexer.getEventsByFilter(blockHash, contract, name); } diff --git a/packages/codegen/src/templates/inspect-cid-template.handlebars b/packages/codegen/src/templates/inspect-cid-template.handlebars new file mode 100644 index 00000000..4ac727c7 --- /dev/null +++ b/packages/codegen/src/templates/inspect-cid-template.handlebars @@ -0,0 +1,85 @@ +// +// Copyright 2021 Vulcanize, Inc. 
+// + +import assert from 'assert'; +import yargs from 'yargs'; +import 'reflect-metadata'; +import debug from 'debug'; +import { getDefaultProvider } from 'ethers'; +import util from 'util'; + +import { Config, DEFAULT_CONFIG_PATH, getConfig } from '@vulcanize/util'; +import { getCache } from '@vulcanize/cache'; +import { EthClient } from '@vulcanize/ipld-eth-client'; + +import { Database } from '../database'; +import { Indexer } from '../indexer'; + +const log = debug('vulcanize:inspect-cid'); + +const main = async (): Promise => { + const argv = await yargs.parserConfiguration({ + 'parse-numbers': false + }).options({ + configFile: { + alias: 'f', + type: 'string', + require: true, + demandOption: true, + describe: 'Configuration file path (toml)', + default: DEFAULT_CONFIG_PATH + }, + cid: { + alias: 'c', + type: 'string', + demandOption: true, + describe: 'CID to be inspected' + } + }).argv; + + const config: Config = await getConfig(argv.configFile); + + const { upstream, database: dbConfig, server: serverConfig } = config; + + assert(upstream, 'Missing upstream config'); + assert(dbConfig, 'Missing database config'); + assert(serverConfig, 'Missing server config'); + + const db = new Database(dbConfig); + await db.init(); + + const { ethServer: { gqlApiEndpoint, gqlPostgraphileEndpoint, rpcProviderEndpoint }, cache: cacheConfig } = upstream; + assert(gqlApiEndpoint, 'Missing upstream ethServer.gqlApiEndpoint'); + assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint'); + + const cache = await getCache(cacheConfig); + + const ethClient = new EthClient({ + gqlEndpoint: gqlApiEndpoint, + gqlSubscriptionEndpoint: gqlPostgraphileEndpoint, + cache + }); + + const postgraphileClient = new EthClient({ + gqlEndpoint: gqlPostgraphileEndpoint, + cache + }); + + const ethProvider = getDefaultProvider(rpcProviderEndpoint); + + const indexer = new Indexer(serverConfig, db, ethClient, postgraphileClient, ethProvider); + + const ipldBlock = await indexer.getIPLDBlockByCid(argv.cid); + assert(ipldBlock, 'IPLDBlock for the provided CID doesn\'t exist.'); + + const ipldData = await indexer.getIPLDData(ipldBlock); + + log(util.inspect(ipldData, false, null)); +}; + +main().catch(err => { + log(err); +}).finally(() => { + process.exit(0); +}); diff --git a/packages/codegen/src/templates/ipfs-template.handlebars b/packages/codegen/src/templates/ipfs-template.handlebars new file mode 100644 index 00000000..3c92443d --- /dev/null +++ b/packages/codegen/src/templates/ipfs-template.handlebars @@ -0,0 +1,17 @@ +// +// Copyright 2021 Vulcanize, Inc. 
+// + +import { create, IPFSHTTPClient } from 'ipfs-http-client'; + +export class IPFSClient { + _client: IPFSHTTPClient + + constructor (url: string) { + this._client = create({ url }); + } + + async push (data: any): Promise { + await this._client.dag.put(data, { format: 'dag-cbor', hashAlg: 'sha2-256' }); + } +} diff --git a/packages/codegen/src/templates/job-runner-template.handlebars b/packages/codegen/src/templates/job-runner-template.handlebars index 8db8f5ce..ed87d6f8 100644 --- a/packages/codegen/src/templates/job-runner-template.handlebars +++ b/packages/codegen/src/templates/job-runner-template.handlebars @@ -18,6 +18,7 @@ import { QUEUE_EVENT_PROCESSING, QUEUE_BLOCK_CHECKPOINT, QUEUE_HOOKS, + QUEUE_IPFS, JobQueueConfig, DEFAULT_CONFIG_PATH, getCustomProvider @@ -46,6 +47,7 @@ export class JobRunner { await this.subscribeEventProcessingQueue(); await this.subscribeBlockCheckpointQueue(); await this.subscribeHooksQueue(); + await this.subscribeIPFSQueue(); } async subscribeBlockProcessingQueue (): Promise { @@ -98,6 +100,16 @@ export class JobRunner { await this._jobQueue.markComplete(job); }); } + + async subscribeIPFSQueue (): Promise { + await this._jobQueue.subscribe(QUEUE_IPFS, async (job) => { + const { data: { data } } = job; + + await this._indexer.pushToIPFS(data); + + await this._jobQueue.markComplete(job); + }); + } } export const main = async (): Promise => { diff --git a/packages/codegen/src/templates/package-template.handlebars b/packages/codegen/src/templates/package-template.handlebars index c003389d..799d35dc 100644 --- a/packages/codegen/src/templates/package-template.handlebars +++ b/packages/codegen/src/templates/package-template.handlebars @@ -12,7 +12,10 @@ "watch:contract": "DEBUG=vulcanize:* ts-node src/cli/watch-contract.ts", "fill": "DEBUG=vulcanize:* ts-node src/fill.ts", "reset": "DEBUG=vulcanize:* ts-node src/cli/reset.ts", - "checkpoint": "DEBUG=vulcanize:* ts-node src/cli/checkpoint.ts" + "checkpoint": "DEBUG=vulcanize:* ts-node src/cli/checkpoint.ts", + "export-state": "DEBUG=vulcanize:* ts-node src/cli/export-state.ts", + "import-state": "DEBUG=vulcanize:* ts-node src/cli/import-state.ts", + "inspect-cid": "DEBUG=vulcanize:* ts-node src/cli/inspect-cid.ts" }, "repository": { "type": "git", @@ -27,7 +30,7 @@ "dependencies": { "@apollo/client": "^3.3.19", "@ethersproject/providers": "5.3.0", - "@ipld/dag-json": "^8.0.1", + "@ipld/dag-cbor": "^6.0.12", "@vulcanize/cache": "^0.1.0", "@vulcanize/ipld-eth-client": "^0.1.0", "@vulcanize/solidity-mapper": "^0.1.0", @@ -39,6 +42,7 @@ "express": "^4.17.1", "graphql": "^15.5.0", "graphql-import-node": "^0.0.4", + "ipfs-http-client": "^53.0.1", "json-bigint": "^1.0.0", "lodash": "^4.17.21", "multiformats": "^9.4.8", diff --git a/packages/codegen/src/templates/readme-template.handlebars b/packages/codegen/src/templates/readme-template.handlebars index c1c0cbe8..847167b2 100644 --- a/packages/codegen/src/templates/readme-template.handlebars +++ b/packages/codegen/src/templates/readme-template.handlebars @@ -8,6 +8,12 @@ yarn ``` +* Run the IPFS (go-ipfs version 0.9.0) daemon: + + ```bash + ipfs daemon + ``` + * Create a postgres12 database for the watcher: ```bash @@ -39,7 +45,7 @@ * Update the `upstream` config in the [config file](./environments/local.toml) and provide the `ipld-eth-server` GQL API and the `indexer-db` postgraphile endpoints. -* Update the [config](./environments/local.toml) with state checkpoint settings. 
+* Update the `server` config in the [config file](./environments/local.toml) with state checkpoint settings and provide the IPFS API address. ## Customize @@ -53,7 +59,9 @@ * Edit the custom hook function `createInitialCheckpoint` (triggered on watch-contract, checkpoint: `true`) in [hooks.ts](./src/hooks.ts) to save an initial checkpoint `IPLDBlock` using the `Indexer` object. - * Edit the custom hook function `createStateDiff` (triggered on a block) in [hooks.ts](./src/hooks.ts) to save the state in an `IPLDBlock` using the `Indexer` object. The default state (if exists) is updated. + * Edit the custom hook function `createStateDiff` (triggered on a block) in [hooks.ts](./src/hooks.ts) to save the state in a `diff` `IPLDBlock` using the `Indexer` object. The default state (if exists) is updated. + + * Edit the custom hook function `createStateCheckpoint` (triggered just before default and CLI checkpoint) in [hooks.ts](./src/hooks.ts) to save the state in a `checkpoint` `IPLDBlock` using the `Indexer` object. * The existing example hooks in [hooks.ts](./src/hooks.ts) are for an `ERC20` contract. @@ -132,4 +140,50 @@ GQL console: http://localhost:3008/graphql yarn reset job-queue --block-number ``` - * `block-number`: Block number to reset the watcher to. + * `block-number`: Block number to which to reset the watcher. + + * To export and import the watcher state: + + * In source watcher, export watcher state: + + ```bash + yarn export-state --export-file [export-file-path] + ``` + + * `export-file`: Path of JSON file to which to export the watcher data. + + * In target watcher, run job-runner: + + ```bash + yarn job-runner + ``` + + * Import watcher state: + + ```bash + yarn import-state --import-file + ``` + + * `import-file`: Path of JSON file from which to import the watcher data. + + * Run fill: + + ```bash + yarn fill --start-block --end-block + ``` + + * `snapshot-block`: Block number at which the watcher state was exported. + + * Run server: + + ```bash + yarn server + ``` + + * To inspect a CID: + + ```bash + yarn inspect-cid --cid + ``` + + * `cid`: CID to be inspected. 
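The export file written by `yarn export-state` above is not plain JSON: as in `export-state-template.handlebars`, the collected state is encoded with `@ipld/dag-cbor` before it is written, and `yarn import-state` decodes it the same way. A minimal sketch of that round trip, assuming the `{ snapshotBlock, contracts, ipldCheckpoints }` layout built by the export CLI; the values and file path below are placeholders, not part of the generated watcher:

```typescript
import fs from 'fs';
import * as codec from '@ipld/dag-cbor';

// Shape produced by export-state (placeholder values).
const exportData = {
  snapshotBlock: { blockNumber: 100, blockHash: '0xabc...' },
  contracts: [
    { address: '0xDC7d7A8920C8Eecc098da5B7522a5F31509b5Bfc', kind: 'ERC20', checkpoint: true, startingBlock: 100 }
  ],
  ipldCheckpoints: []
};

// export-state: encode with dag-cbor and write the raw bytes to the export file.
fs.writeFileSync('./state-export', codec.encode(exportData));

// import-state: read the bytes back and decode before filling the database.
const importData = codec.decode(fs.readFileSync('./state-export')) as any;
console.log(importData.snapshotBlock.blockNumber); // 100
```

The same codec backs the `IPLDBlock.data` column (now `bytea`), which is why `import-state` re-encodes each imported checkpoint's `data` with `codec.encode` before saving it.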
diff --git a/packages/codegen/src/templates/reset-state-template.handlebars b/packages/codegen/src/templates/reset-state-template.handlebars index 6cd93846..480ea63c 100644 --- a/packages/codegen/src/templates/reset-state-template.handlebars +++ b/packages/codegen/src/templates/reset-state-template.handlebars @@ -42,6 +42,9 @@ export const handler = async (argv: any): Promise => { const syncStatus = await indexer.getSyncStatus(); assert(syncStatus, 'Missing syncStatus'); + const hooksStatus = await indexer.getHookStatus(); + assert(hooksStatus, 'Missing hooksStatus'); + const blockProgresses = await indexer.getBlocksAtHeight(argv.blockNumber, false); assert(blockProgresses.length, `No blocks at specified block number ${argv.blockNumber}`); assert(!blockProgresses.some(block => !block.isComplete), `Incomplete block at block number ${argv.blockNumber} with unprocessed events`); @@ -70,6 +73,10 @@ export const handler = async (argv: any): Promise => { await indexer.updateSyncStatusCanonicalBlock(blockProgress.blockHash, blockProgress.blockNumber, true); } + if (hooksStatus.latestProcessedBlockNumber > blockProgress.blockNumber) { + await indexer.updateHookStatusProcessedBlock(blockProgress.blockNumber, true); + } + dbTx.commitTransaction(); } catch (error) { await dbTx.rollbackTransaction(); diff --git a/packages/codegen/src/templates/watch-contract-template.handlebars b/packages/codegen/src/templates/watch-contract-template.handlebars index 26866188..c57d703c 100644 --- a/packages/codegen/src/templates/watch-contract-template.handlebars +++ b/packages/codegen/src/templates/watch-contract-template.handlebars @@ -89,10 +89,8 @@ const main = async (): Promise => { await db.close(); }; -main() - .catch(err => { - log(err); - }) - .finally(() => { - process.exit(0); - }); +main().catch(err => { + log(err); +}).finally(() => { + process.exit(0); +}); diff --git a/packages/erc20-watcher/src/indexer.ts b/packages/erc20-watcher/src/indexer.ts index 8753705b..9b775d0a 100644 --- a/packages/erc20-watcher/src/indexer.ts +++ b/packages/erc20-watcher/src/indexer.ts @@ -257,11 +257,6 @@ export class Indexer implements IndexerInterface { await this.triggerIndexingOnEvent(event); } - async processBlock (job: any): Promise { - // Empty post-block method. 
- assert(job); - } - parseEventNameAndArgs (kind: string, logObj: any): any { let eventName = UNKNOWN_EVENT_NAME; let eventInfo = {}; diff --git a/packages/erc20-watcher/src/job-runner.ts b/packages/erc20-watcher/src/job-runner.ts index 619ea9d4..ad1ae113 100644 --- a/packages/erc20-watcher/src/job-runner.ts +++ b/packages/erc20-watcher/src/job-runner.ts @@ -16,7 +16,6 @@ import { JobRunner as BaseJobRunner, QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING, - QUEUE_HOOKS, JobQueueConfig, DEFAULT_CONFIG_PATH, getCustomProvider, @@ -46,7 +45,6 @@ export class JobRunner { async start (): Promise { await this.subscribeBlockProcessingQueue(); await this.subscribeEventProcessingQueue(); - await this.subscribeHooksQueue(); } async subscribeBlockProcessingQueue (): Promise { @@ -60,14 +58,6 @@ export class JobRunner { await this._baseJobRunner.processEvent(job); }); } - - async subscribeHooksQueue (): Promise { - await this._jobQueue.subscribe(QUEUE_HOOKS, async (job) => { - await this._indexer.processBlock(job); - - await this._jobQueue.markComplete(job); - }); - } } export const main = async (): Promise => { diff --git a/packages/uni-info-watcher/src/job-runner.ts b/packages/uni-info-watcher/src/job-runner.ts index 293c6161..fc964e17 100644 --- a/packages/uni-info-watcher/src/job-runner.ts +++ b/packages/uni-info-watcher/src/job-runner.ts @@ -17,7 +17,6 @@ import { JobQueue, QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING, - QUEUE_HOOKS, JobRunner as BaseJobRunner, JobQueueConfig, DEFAULT_CONFIG_PATH, @@ -48,7 +47,6 @@ export class JobRunner { async start (): Promise { await this.subscribeBlockProcessingQueue(); await this.subscribeEventProcessingQueue(); - await this.subscribeHooksQueue(); } async subscribeBlockProcessingQueue (): Promise { @@ -62,14 +60,6 @@ export class JobRunner { await this._baseJobRunner.processEvent(job); }); } - - async subscribeHooksQueue (): Promise { - await this._jobQueue.subscribe(QUEUE_HOOKS, async (job) => { - await this._indexer.processBlock(job); - - await this._jobQueue.markComplete(job); - }); - } } export const main = async (): Promise => { diff --git a/packages/uni-watcher/src/indexer.ts b/packages/uni-watcher/src/indexer.ts index 03402e4d..bdb4649a 100644 --- a/packages/uni-watcher/src/indexer.ts +++ b/packages/uni-watcher/src/indexer.ts @@ -122,11 +122,6 @@ export class Indexer implements IndexerInterface { } } - async processBlock (job: any): Promise { - // Empty post-block method. 
- assert(job); - } - parseEventNameAndArgs (kind: string, logObj: any): any { let eventName = UNKNOWN_EVENT_NAME; let eventInfo = {}; diff --git a/packages/uni-watcher/src/job-runner.ts b/packages/uni-watcher/src/job-runner.ts index 23a56215..770908ac 100644 --- a/packages/uni-watcher/src/job-runner.ts +++ b/packages/uni-watcher/src/job-runner.ts @@ -16,7 +16,6 @@ import { JobRunner as BaseJobRunner, QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING, - QUEUE_HOOKS, JobQueueConfig, DEFAULT_CONFIG_PATH, getCustomProvider, @@ -46,7 +45,6 @@ export class JobRunner { async start (): Promise { await this.subscribeBlockProcessingQueue(); await this.subscribeEventProcessingQueue(); - await this.subscribeHooksQueue(); } async subscribeBlockProcessingQueue (): Promise { @@ -60,14 +58,6 @@ export class JobRunner { await this._baseJobRunner.processEvent(job); }); } - - async subscribeHooksQueue (): Promise { - await this._jobQueue.subscribe(QUEUE_HOOKS, async (job) => { - await this._indexer.processBlock(job); - - await this._jobQueue.markComplete(job); - }); - } } export const main = async (): Promise => { diff --git a/packages/util/src/config.ts b/packages/util/src/config.ts index f2cca1e7..98521c1b 100644 --- a/packages/util/src/config.ts +++ b/packages/util/src/config.ts @@ -31,6 +31,7 @@ export interface ServerConfig { kind: string; checkpointing: boolean; checkpointInterval: number; + ipfsApiAddr: string; } export interface UpstreamConfig { diff --git a/packages/util/src/constants.ts b/packages/util/src/constants.ts index 62fbfa37..f4479d7f 100644 --- a/packages/util/src/constants.ts +++ b/packages/util/src/constants.ts @@ -9,6 +9,7 @@ export const QUEUE_EVENT_PROCESSING = 'event-processing'; export const QUEUE_CHAIN_PRUNING = 'chain-pruning'; export const QUEUE_BLOCK_CHECKPOINT = 'block-checkpoint'; export const QUEUE_HOOKS = 'hooks'; +export const QUEUE_IPFS = 'ipfs'; export const JOB_KIND_INDEX = 'index'; export const JOB_KIND_PRUNE = 'prune'; diff --git a/packages/util/src/fill.ts b/packages/util/src/fill.ts index 0d3cbfc7..38269abd 100644 --- a/packages/util/src/fill.ts +++ b/packages/util/src/fill.ts @@ -25,8 +25,7 @@ export const fillBlocks = async ( } ): Promise => { let { startBlock, endBlock, prefetch, batchBlocks } = argv; - assert(startBlock < endBlock, 'endBlock should be greater than startBlock'); - + assert(startBlock <= endBlock, 'endBlock should be greater than or equal to startBlock'); const syncStatus = await indexer.getSyncStatus(); if (syncStatus) { @@ -45,6 +44,14 @@ export const fillBlocks = async ( await eventWatcher.initBlockProcessingOnCompleteHandler(); await eventWatcher.initEventProcessingOnCompleteHandler(); + if (eventWatcher.initHooksOnCompleteHandler) { + await eventWatcher.initHooksOnCompleteHandler(); + } + + if (eventWatcher.initBlockCheckpointOnCompleteHandler) { + await eventWatcher.initBlockCheckpointOnCompleteHandler(); + } + const numberOfBlocks = endBlock - startBlock + 1; processBlockByNumber(jobQueue, indexer, blockDelayInMilliSecs, startBlock); diff --git a/packages/util/src/types.ts b/packages/util/src/types.ts index 6b04f69f..6cf7a5f7 100644 --- a/packages/util/src/types.ts +++ b/packages/util/src/types.ts @@ -78,6 +78,8 @@ export interface EventWatcherInterface { getBlockProgressEventIterator (): AsyncIterator initBlockProcessingOnCompleteHandler (): Promise initEventProcessingOnCompleteHandler (): Promise + initHooksOnCompleteHandler?: () => Promise + initBlockCheckpointOnCompleteHandler?: () => Promise } export interface DatabaseInterface {
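Taken together, the new IPFS pieces above form one path: when a block-checkpoint job completes, `EventWatcher.createIPFSPutJob` decodes the block's `IPLDBlock` entries and queues them on `QUEUE_IPFS`, the job runner's `subscribeIPFSQueue` handler passes each payload to `Indexer.pushToIPFS`, and `IPFSClient` performs the `dag.put`. A minimal sketch of that final call, assuming a local go-ipfs daemon at the default `ipfsApiAddr` from `config-template.handlebars`; the state object is a placeholder:

```typescript
import { create } from 'ipfs-http-client';

async function pushCheckpointData (): Promise<void> {
  // Same construction as IPFSClient in ipfs-template.handlebars.
  const client = create({ url: '/ip4/127.0.0.1/tcp/5001' });

  // The QUEUE_IPFS subscriber hands over decoded IPLDBlock data roughly shaped like this.
  const data = {
    meta: { kind: 'checkpoint' },
    state: { _balances: { '0xDC7d7A8920C8Eecc098da5B7522a5F31509b5Bfc': '1000' } }
  };

  // Same options as IPFSClient.push(): dag-cbor encoding, sha2-256 multihash.
  const cid = await client.dag.put(data, { format: 'dag-cbor', hashAlg: 'sha2-256' });
  console.log(`Pushed checkpoint state to IPFS at ${cid.toString()}`);
}

pushCheckpointData().catch(err => {
  console.error(err);
});
```

Because `EventWatcher` checks `indexer.isIPFSConfigured()` before queueing, the job is only created when `ipfsApiAddr` is set in the `[server]` config, so watchers without an IPFS daemon are unaffected.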