diff --git a/packages/codegen/src/generate-code.ts b/packages/codegen/src/generate-code.ts index d2766a36..f7489763 100644 --- a/packages/codegen/src/generate-code.ts +++ b/packages/codegen/src/generate-code.ts @@ -36,6 +36,7 @@ import { exportState } from './export-state'; import { importState } from './import-state'; import { exportInspectCID } from './inspect-cid'; import { getSubgraphConfig } from './utils/subgraph'; +import { exportIndexBlock } from './index-block'; const main = async (): Promise => { const argv = await yargs(hideBin(process.argv)) @@ -166,7 +167,7 @@ function generateWatcher (visitor: Visitor, contracts: any[], config: any) { }); // Register the handlebar helpers to be used in the templates. - registerHandlebarHelpers(); + registerHandlebarHelpers(config); visitor.visitSubgraph(config.subgraphPath); @@ -300,6 +301,11 @@ function generateWatcher (visitor: Visitor, contracts: any[], config: any) { ? fs.createWriteStream(path.join(outputDir, 'src/cli/inspect-cid.ts')) : process.stdout; exportInspectCID(outStream, config.subgraphPath); + + outStream = outputDir + ? fs.createWriteStream(path.join(outputDir, 'src/cli/index-block.ts')) + : process.stdout; + exportIndexBlock(outStream); } function getConfig (configFile: string): any { diff --git a/packages/codegen/src/index-block.ts b/packages/codegen/src/index-block.ts new file mode 100644 index 00000000..f30c72fa --- /dev/null +++ b/packages/codegen/src/index-block.ts @@ -0,0 +1,21 @@ +// +// Copyright 2022 Vulcanize, Inc. +// + +import fs from 'fs'; +import path from 'path'; +import Handlebars from 'handlebars'; +import { Writable } from 'stream'; + +const TEMPLATE_FILE = './templates/index-block-template.handlebars'; + +/** + * Writes the index-block file generated from a template to a stream. + * @param outStream A writable output stream to write the index-block file to. + */ +export function exportIndexBlock (outStream: Writable): void { + const templateString = fs.readFileSync(path.resolve(__dirname, TEMPLATE_FILE)).toString(); + const template = Handlebars.compile(templateString); + const indexBlock = template({}); + outStream.write(indexBlock); +} diff --git a/packages/codegen/src/templates/checkpoint-template.handlebars b/packages/codegen/src/templates/checkpoint-template.handlebars index 84563a9d..e82beffa 100644 --- a/packages/codegen/src/templates/checkpoint-template.handlebars +++ b/packages/codegen/src/templates/checkpoint-template.handlebars @@ -2,14 +2,18 @@ // Copyright 2021 Vulcanize, Inc. // +{{#if (subgraphPath)}} import path from 'path'; +{{/if}} import yargs from 'yargs'; import 'reflect-metadata'; import debug from 'debug'; import assert from 'assert'; import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients, JobQueue } from '@vulcanize/util'; +{{#if (subgraphPath)}} import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node'; +{{/if}} import { Database } from '../database'; import { Indexer } from '../indexer'; @@ -45,11 +49,13 @@ const main = async (): Promise => { const db = new Database(config.database); await db.init(); - + {{#if (subgraphPath)}} + const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*')); await graphDb.init(); - + const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server); + {{/if}} const jobQueueConfig = config.jobQueue; assert(jobQueueConfig, 'Missing job queue config'); @@ -60,11 +66,11 @@ const main = async (): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); - const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue, graphWatcher); + const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue{{#if (subgraphPath)}}, graphWatcher{{/if}}); await indexer.init(); + {{#if (subgraphPath)}} graphWatcher.setIndexer(indexer); - {{#if subgraphPath}} await graphWatcher.init(); {{/if}} diff --git a/packages/codegen/src/templates/config-template.handlebars b/packages/codegen/src/templates/config-template.handlebars index 525428ce..94fe582c 100644 --- a/packages/codegen/src/templates/config-template.handlebars +++ b/packages/codegen/src/templates/config-template.handlebars @@ -12,10 +12,17 @@ # IPFS API address (can be taken from the output on running the IPFS daemon). ipfsApiAddr = "/ip4/127.0.0.1/tcp/5001" - {{#if subgraphPath}} + {{#if (subgraphPath)}} subgraphPath = "{{subgraphPath}}" wasmRestartBlocksInterval = 20 + {{/if}} + # Boolean to filter logs by contract. + filterLogs = false + + # Max block range for which to return events in eventsInRange GQL query. + # Use -1 for skipping check on block range. + maxEventsBlockRange = 1000 [database] type = "postgres" diff --git a/packages/codegen/src/templates/export-state-template.handlebars b/packages/codegen/src/templates/export-state-template.handlebars index cc62d56c..35d75a60 100644 --- a/packages/codegen/src/templates/export-state-template.handlebars +++ b/packages/codegen/src/templates/export-state-template.handlebars @@ -10,7 +10,9 @@ import fs from 'fs'; import path from 'path'; import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients, JobQueue, StateKind } from '@vulcanize/util'; +{{#if (subgraphPath)}} import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node'; +{{/if}} import * as codec from '@ipld/dag-cbor'; import { Database } from '../database'; @@ -42,11 +44,13 @@ const main = async (): Promise => { const db = new Database(config.database); await db.init(); + {{#if (subgraphPath)}} const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*')); await graphDb.init(); const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server); + {{/if}} const jobQueueConfig = config.jobQueue; assert(jobQueueConfig, 'Missing job queue config'); @@ -57,11 +61,11 @@ const main = async (): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); - const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue, graphWatcher); + const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue{{#if (subgraphPath)}}, graphWatcher{{/if}}); await indexer.init(); + {{#if (subgraphPath)}} graphWatcher.setIndexer(indexer); - {{#if subgraphPath}} await graphWatcher.init(); {{/if}} diff --git a/packages/codegen/src/templates/fill-template.handlebars b/packages/codegen/src/templates/fill-template.handlebars index 71f0b920..1760aa69 100644 --- a/packages/codegen/src/templates/fill-template.handlebars +++ b/packages/codegen/src/templates/fill-template.handlebars @@ -2,7 +2,9 @@ // Copyright 2021 Vulcanize, Inc. // +{{#if (subgraphPath)}} import path from 'path'; +{{/if}} import assert from 'assert'; import 'reflect-metadata'; import yargs from 'yargs'; @@ -11,7 +13,9 @@ import debug from 'debug'; import { PubSub } from 'apollo-server-express'; import { Config, getConfig, fillBlocks, JobQueue, DEFAULT_CONFIG_PATH, initClients } from '@vulcanize/util'; +{{#if (subgraphPath)}} import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node'; +{{/if}} import { Database } from './database'; import { Indexer } from './indexer'; @@ -57,11 +61,13 @@ export const main = async (): Promise => { const db = new Database(config.database); await db.init(); - + {{#if (subgraphPath)}} + const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*')); await graphDb.init(); - + const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server); + {{/if}} const jobQueueConfig = config.jobQueue; assert(jobQueueConfig, 'Missing job queue config'); @@ -72,11 +78,11 @@ export const main = async (): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); - const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue, graphWatcher); + const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue{{#if (subgraphPath)}}, graphWatcher{{/if}}); await indexer.init(); + {{#if (subgraphPath)}} graphWatcher.setIndexer(indexer); - {{#if subgraphPath}} await graphWatcher.init(); {{/if}} diff --git a/packages/codegen/src/templates/import-state-template.handlebars b/packages/codegen/src/templates/import-state-template.handlebars index 8d077011..670f3cee 100644 --- a/packages/codegen/src/templates/import-state-template.handlebars +++ b/packages/codegen/src/templates/import-state-template.handlebars @@ -12,7 +12,9 @@ import fs from 'fs'; import path from 'path'; import { getConfig, fillBlocks, JobQueue, DEFAULT_CONFIG_PATH, Config, initClients, StateKind } from '@vulcanize/util'; +{{#if (subgraphPath)}} import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node'; +{{/if}} import * as codec from '@ipld/dag-cbor'; import { Database } from '../database'; @@ -46,11 +48,13 @@ export const main = async (): Promise => { const db = new Database(config.database); await db.init(); + {{#if (subgraphPath)}} const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*')); await graphDb.init(); const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server); + {{/if}} // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries @@ -65,11 +69,11 @@ export const main = async (): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); - const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue, graphWatcher); + const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue{{#if (subgraphPath)}}, graphWatcher{{/if}}); await indexer.init(); + {{#if (subgraphPath)}} graphWatcher.setIndexer(indexer); - {{#if subgraphPath}} await graphWatcher.init(); {{/if}} diff --git a/packages/codegen/src/templates/index-block-template.handlebars b/packages/codegen/src/templates/index-block-template.handlebars new file mode 100644 index 00000000..56e12c2c --- /dev/null +++ b/packages/codegen/src/templates/index-block-template.handlebars @@ -0,0 +1,81 @@ +// +// Copyright 2022 Vulcanize, Inc. +// + +{{#if (subgraphPath)}} +import path from 'path'; +{{/if}} +import yargs from 'yargs'; +import 'reflect-metadata'; +import debug from 'debug'; +import assert from 'assert'; + +import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients, JobQueue, indexBlock } from '@vulcanize/util'; +{{#if (subgraphPath)}} +import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node'; +{{/if}} + +import { Database } from '../database'; +import { Indexer } from '../indexer'; + +const log = debug('vulcanize:index-block'); + +const main = async (): Promise => { + const argv = await yargs.parserConfiguration({ + 'parse-numbers': false + }).options({ + configFile: { + alias: 'f', + type: 'string', + require: true, + demandOption: true, + describe: 'Configuration file path (toml)', + default: DEFAULT_CONFIG_PATH + }, + block: { + type: 'number', + require: true, + demandOption: true, + describe: 'Block number to index' + } + }).argv; + + const config: Config = await getConfig(argv.configFile); + const { ethClient, ethProvider } = await initClients(config); + + const db = new Database(config.database); + await db.init(); + {{#if (subgraphPath)}} + + const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*')); + await graphDb.init(); + + const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server); + {{/if}} + + const jobQueueConfig = config.jobQueue; + assert(jobQueueConfig, 'Missing job queue config'); + + const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; + assert(dbConnectionString, 'Missing job queue db connection string'); + + const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); + + const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue{{#if (subgraphPath)}}, graphWatcher{{/if}}); + await indexer.init(); + {{#if (subgraphPath)}} + + graphWatcher.setIndexer(indexer); + await graphWatcher.init(); + {{/if}} + + await indexBlock(indexer, jobQueueConfig.eventsInBatch, argv); + + await db.close(); +}; + +main().catch(err => { + log(err); +}).finally(() => { + process.exit(0); +}); diff --git a/packages/codegen/src/templates/indexer-template.handlebars b/packages/codegen/src/templates/indexer-template.handlebars index f3602097..7887bf4f 100644 --- a/packages/codegen/src/templates/indexer-template.handlebars +++ b/packages/codegen/src/templates/indexer-template.handlebars @@ -24,12 +24,16 @@ import { QueryOptions, updateStateForElementaryType, updateStateForMappingType, + {{#if (subgraphPath)}} BlockHeight, + {{/if}} IPFSClient, StateKind, IpldStatus as IpldStatusInterface } from '@vulcanize/util'; +{{#if (subgraphPath)}} import { GraphWatcher } from '@vulcanize/graph-node'; +{{/if}} {{#each contracts as | contract |}} import {{contract.contractName}}Artifacts from './artifacts/{{contract.contractName}}.json'; @@ -101,7 +105,9 @@ export class Indexer implements IPLDIndexerInterface { _ethProvider: BaseProvider _baseIndexer: BaseIndexer _serverConfig: ServerConfig + {{#if (subgraphPath)}} _graphWatcher: GraphWatcher; + {{/if}} _abiMap: Map _storageLayoutMap: Map @@ -109,10 +115,12 @@ export class Indexer implements IPLDIndexerInterface { _ipfsClient: IPFSClient + {{#if (subgraphPath)}} _entityTypesMap: Map _relationsMap: Map - - constructor (serverConfig: ServerConfig, db: Database, ethClient: EthClient, ethProvider: BaseProvider, jobQueue: JobQueue, graphWatcher: GraphWatcher) { + + {{/if}} + constructor (serverConfig: ServerConfig, db: Database, ethClient: EthClient, ethProvider: BaseProvider, jobQueue: JobQueue{{#if (subgraphPath)}}, graphWatcher: GraphWatcher{{/if}}) { assert(db); assert(ethClient); @@ -122,7 +130,9 @@ export class Indexer implements IPLDIndexerInterface { this._serverConfig = serverConfig; this._ipfsClient = new IPFSClient(this._serverConfig.ipfsApiAddr); this._baseIndexer = new BaseIndexer(this._serverConfig, this._db, this._ethClient, this._ethProvider, jobQueue, this._ipfsClient); + {{#if (subgraphPath)}} this._graphWatcher = graphWatcher; + {{/if}} this._abiMap = new Map(); this._storageLayoutMap = new Map(); @@ -147,14 +157,16 @@ export class Indexer implements IPLDIndexerInterface { this._contractMap.set(KIND_{{capitalize contract.contractName}}, new ethers.utils.Interface({{contract.contractName}}ABI)); {{/each}} + {{#if (subgraphPath)}} this._entityTypesMap = new Map(); this._populateEntityTypesMap(); this._relationsMap = new Map(); this._populateRelationsMap(); + {{/if}} } - get serverConfig () { + get serverConfig (): ServerConfig { return this._serverConfig; } @@ -399,6 +411,7 @@ export class Indexer implements IPLDIndexerInterface { await this._baseIndexer.removeIPLDBlocks(blockNumber, kind); } + {{#if (subgraphPath)}} async getSubgraphEntity (entity: new () => Entity, id: string, block?: BlockHeight): Promise { const relations = this._relationsMap.get(entity) || {}; @@ -407,12 +420,16 @@ export class Indexer implements IPLDIndexerInterface { return data; } + {{/if}} + async triggerIndexingOnEvent (event: Event): Promise { const resultEvent = this.getResultEvent(event); + {{#if (subgraphPath)}} // Call subgraph handler for event. await this._graphWatcher.handleEvent(resultEvent); + {{/if}} // Call custom hook function for indexing on event. await handleEvent(this, resultEvent); } @@ -425,9 +442,11 @@ export class Indexer implements IPLDIndexerInterface { async processBlock (blockHash: string, blockNumber: number): Promise { // Call a function to create initial state for contracts. await this._baseIndexer.createInit(this, blockHash, blockNumber); + {{#if (subgraphPath)}} // Call subgraph handler for block. await this._graphWatcher.handleBlock(blockHash); + {{/if}} } parseEventNameAndArgs (kind: string, logObj: any): any { @@ -601,7 +620,7 @@ export class Indexer implements IPLDIndexerInterface { } async getEventsInRange (fromBlockNumber: number, toBlockNumber: number): Promise> { - return this._baseIndexer.getEventsInRange(fromBlockNumber, toBlockNumber); + return this._baseIndexer.getEventsInRange(fromBlockNumber, toBlockNumber, this._serverConfig.maxEventsBlockRange); } async getSyncStatus (): Promise { @@ -664,10 +683,14 @@ export class Indexer implements IPLDIndexerInterface { return this._baseIndexer.getAncestorAtDepth(blockHash, depth); } + {{#if (subgraphPath)}} getEntityTypesMap (): Map { return this._entityTypesMap; } + {{/if}} + + {{#if (subgraphPath)}} _populateEntityTypesMap (): void { {{#each subgraphEntities as | subgraphEntity |}} this._entityTypesMap.set('{{subgraphEntity.className}}', { @@ -682,7 +705,9 @@ export class Indexer implements IPLDIndexerInterface { }); {{/each}} } + {{/if}} + {{#if (subgraphPath)}} _populateRelationsMap (): void { {{#each subgraphEntities as | subgraphEntity |}} {{#if subgraphEntity.relations}} @@ -705,16 +730,39 @@ export class Indexer implements IPLDIndexerInterface { {{/if}} {{/each}} } + {{/if}} async _fetchAndSaveEvents ({ cid: blockCid, blockHash }: DeepPartial): Promise { assert(blockHash); - - const logsPromise = this._ethClient.getLogs({ blockHash }); const transactionsPromise = this._ethClient.getBlockWithTransactions({ blockHash }); + const blockPromise = this._ethClient.getBlockByHash(blockHash); + let logs: any[]; + + if (this._serverConfig.filterLogs) { + const watchedContracts = this._baseIndexer.getWatchedContracts(); + + // TODO: Query logs by multiple contracts. + const contractlogsPromises = watchedContracts.map((watchedContract): Promise => this._ethClient.getLogs({ + blockHash, + contract: watchedContract.address + })); + + const contractlogs = await Promise.all(contractlogsPromises); + + // Flatten logs by contract and sort by index. + logs = contractlogs.map(data => { + return data.logs; + }).flat() + .sort((a, b) => { + return a.index - b.index; + }); + } else { + ({ logs } = await this._ethClient.getLogs({ blockHash })); + } let [ - { block, logs }, - { + { block }, + { allEthHeaderCids: { nodes: [ { @@ -725,7 +773,7 @@ export class Indexer implements IPLDIndexerInterface { ] } } - ] = await Promise.all([logsPromise, transactionsPromise]); + ] = await Promise.all([blockPromise, transactionsPromise]); const transactionMap = transactions.reduce((acc: {[key: string]: any}, transaction: {[key: string]: any}) => { acc[transaction.txHash] = transaction; diff --git a/packages/codegen/src/templates/inspect-cid-template.handlebars b/packages/codegen/src/templates/inspect-cid-template.handlebars index 299383f7..8bcbdf31 100644 --- a/packages/codegen/src/templates/inspect-cid-template.handlebars +++ b/packages/codegen/src/templates/inspect-cid-template.handlebars @@ -2,7 +2,9 @@ // Copyright 2021 Vulcanize, Inc. // +{{#if (subgraphPath)}} import path from 'path'; +{{/if}} import assert from 'assert'; import yargs from 'yargs'; import 'reflect-metadata'; @@ -10,7 +12,9 @@ import debug from 'debug'; import util from 'util'; import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients, JobQueue } from '@vulcanize/util'; +{{#if (subgraphPath)}} import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node'; +{{/if}} import { Database } from '../database'; import { Indexer } from '../indexer'; @@ -42,11 +46,13 @@ const main = async (): Promise => { const db = new Database(config.database); await db.init(); - + {{#if (subgraphPath)}} + const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*')); await graphDb.init(); - + const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server); + {{/if}} const jobQueueConfig = config.jobQueue; assert(jobQueueConfig, 'Missing job queue config'); @@ -57,11 +63,11 @@ const main = async (): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); - const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue, graphWatcher); + const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue{{#if (subgraphPath)}}, graphWatcher{{/if}}); await indexer.init(); + {{#if (subgraphPath)}} graphWatcher.setIndexer(indexer); - {{#if subgraphPath}} await graphWatcher.init(); {{/if}} diff --git a/packages/codegen/src/templates/job-runner-template.handlebars b/packages/codegen/src/templates/job-runner-template.handlebars index a422f540..ebbaa2b0 100644 --- a/packages/codegen/src/templates/job-runner-template.handlebars +++ b/packages/codegen/src/templates/job-runner-template.handlebars @@ -2,7 +2,9 @@ // Copyright 2021 Vulcanize, Inc. // +{{#if (subgraphPath)}} import path from 'path'; +{{/if}} import assert from 'assert'; import 'reflect-metadata'; import yargs from 'yargs'; @@ -24,7 +26,9 @@ import { DEFAULT_CONFIG_PATH, initClients } from '@vulcanize/util'; +{{#if (subgraphPath)}} import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node'; +{{/if}} import { Indexer } from './indexer'; import { Database } from './database'; @@ -252,11 +256,13 @@ export const main = async (): Promise => { const db = new Database(config.database); await db.init(); - + {{#if (subgraphPath)}} + const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*')); await graphDb.init(); - + const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server); + {{/if}} const jobQueueConfig = config.jobQueue; assert(jobQueueConfig, 'Missing job queue config'); @@ -267,11 +273,11 @@ export const main = async (): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); - const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue, graphWatcher); + const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue{{#if (subgraphPath)}}, graphWatcher{{/if}}); await indexer.init(); + {{#if (subgraphPath)}} graphWatcher.setIndexer(indexer); - {{#if subgraphPath}} await graphWatcher.init(); // Watching all the contracts in the subgraph. diff --git a/packages/codegen/src/templates/package-template.handlebars b/packages/codegen/src/templates/package-template.handlebars index df800681..213d038e 100644 --- a/packages/codegen/src/templates/package-template.handlebars +++ b/packages/codegen/src/templates/package-template.handlebars @@ -15,7 +15,8 @@ "checkpoint": "DEBUG=vulcanize:* ts-node src/cli/checkpoint.ts", "export-state": "DEBUG=vulcanize:* ts-node src/cli/export-state.ts", "import-state": "DEBUG=vulcanize:* ts-node src/cli/import-state.ts", - "inspect-cid": "DEBUG=vulcanize:* ts-node src/cli/inspect-cid.ts" + "inspect-cid": "DEBUG=vulcanize:* ts-node src/cli/inspect-cid.ts", + "index-block": "DEBUG=vulcanize:* ts-node src/cli/index-block.ts" }, "repository": { "type": "git", @@ -34,7 +35,9 @@ "@vulcanize/ipld-eth-client": "^0.1.0", "@vulcanize/solidity-mapper": "^0.1.0", "@vulcanize/util": "^0.1.0", + {{#if (subgraphPath)}} "@vulcanize/graph-node": "^0.1.0", + {{/if}} "apollo-server-express": "^2.25.0", "apollo-type-bigint": "^0.1.3", "debug": "^4.3.1", diff --git a/packages/codegen/src/templates/reset-state-template.handlebars b/packages/codegen/src/templates/reset-state-template.handlebars index 009d1768..38eb9ac7 100644 --- a/packages/codegen/src/templates/reset-state-template.handlebars +++ b/packages/codegen/src/templates/reset-state-template.handlebars @@ -2,13 +2,17 @@ // Copyright 2021 Vulcanize, Inc. // +{{#if (subgraphPath)}} import path from 'path'; +{{/if}} import debug from 'debug'; import { MoreThan } from 'typeorm'; import assert from 'assert'; import { getConfig, initClients, resetJobs, JobQueue } from '@vulcanize/util'; +{{#if (subgraphPath)}} import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node'; +{{/if}} import { Database } from '../../database'; import { Indexer } from '../../indexer'; @@ -38,11 +42,13 @@ export const handler = async (argv: any): Promise => { // Initialize database. const db = new Database(config.database); await db.init(); - + {{#if (subgraphPath)}} + const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*')); await graphDb.init(); - + const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server); + {{/if}} const jobQueueConfig = config.jobQueue; assert(jobQueueConfig, 'Missing job queue config'); @@ -53,11 +59,11 @@ export const handler = async (argv: any): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); - const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue, graphWatcher); + const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue{{#if (subgraphPath)}}, graphWatcher{{/if}}); await indexer.init(); + {{#if (subgraphPath)}} graphWatcher.setIndexer(indexer); - {{#if subgraphPath}} await graphWatcher.init(); {{/if}} diff --git a/packages/codegen/src/templates/server-template.handlebars b/packages/codegen/src/templates/server-template.handlebars index c28aac29..8f496d8b 100644 --- a/packages/codegen/src/templates/server-template.handlebars +++ b/packages/codegen/src/templates/server-template.handlebars @@ -15,7 +15,9 @@ import 'graphql-import-node'; import { createServer } from 'http'; import { DEFAULT_CONFIG_PATH, getConfig, Config, JobQueue, KIND_ACTIVE, initClients } from '@vulcanize/util'; +{{#if (subgraphPath)}} import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node'; +{{/if}} import { createResolvers } from './resolvers'; import { Indexer } from './indexer'; @@ -42,11 +44,13 @@ export const main = async (): Promise => { const db = new Database(config.database); await db.init(); - + {{#if (subgraphPath)}} + const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*')); await graphDb.init(); - + const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server); + {{/if}} // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries @@ -60,11 +64,11 @@ export const main = async (): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); - const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue, graphWatcher); + const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue{{#if (subgraphPath)}}, graphWatcher{{/if}}); await indexer.init(); + {{#if (subgraphPath)}} graphWatcher.setIndexer(indexer); - {{#if subgraphPath}} await graphWatcher.init(); {{/if}} diff --git a/packages/codegen/src/templates/tsconfig-template.handlebars b/packages/codegen/src/templates/tsconfig-template.handlebars index 99712bdf..b30e1629 100644 --- a/packages/codegen/src/templates/tsconfig-template.handlebars +++ b/packages/codegen/src/templates/tsconfig-template.handlebars @@ -6,7 +6,7 @@ // "incremental": true, /* Enable incremental compilation */ "target": "es5", /* Specify ECMAScript target version: 'ES3' (default), 'ES5', 'ES2015', 'ES2016', 'ES2017', 'ES2018', 'ES2019', 'ES2020', 'ES2021', or 'ESNEXT'. */ "module": "commonjs", /* Specify module code generation: 'none', 'commonjs', 'amd', 'system', 'umd', 'es2015', 'es2020', or 'ESNext'. */ - // "lib": [], /* Specify library files to be included in the compilation. */ + "lib": ["es2019"], /* Specify library files to be included in the compilation. */ // "allowJs": true, /* Allow javascript files to be compiled. */ // "checkJs": true, /* Report errors in .js files. */ // "jsx": "preserve", /* Specify JSX code generation: 'preserve', 'react-native', 'react', 'react-jsx' or 'react-jsxdev'. */ diff --git a/packages/codegen/src/templates/watch-contract-template.handlebars b/packages/codegen/src/templates/watch-contract-template.handlebars index c19093cd..f10815a2 100644 --- a/packages/codegen/src/templates/watch-contract-template.handlebars +++ b/packages/codegen/src/templates/watch-contract-template.handlebars @@ -2,14 +2,18 @@ // Copyright 2021 Vulcanize, Inc. // +{{#if (subgraphPath)}} import path from 'path'; +{{/if}} import yargs from 'yargs'; import 'reflect-metadata'; import debug from 'debug'; import assert from 'assert'; import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients, JobQueue } from '@vulcanize/util'; +{{#if (subgraphPath)}} import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node'; +{{/if}} import { Database } from '../database'; import { Indexer } from '../indexer'; @@ -58,11 +62,13 @@ const main = async (): Promise => { const db = new Database(config.database); await db.init(); - + {{#if (subgraphPath)}} + const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*')); await graphDb.init(); - + const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server); + {{/if}} const jobQueueConfig = config.jobQueue; assert(jobQueueConfig, 'Missing job queue config'); @@ -73,11 +79,11 @@ const main = async (): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); - const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue, graphWatcher); + const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue{{#if (subgraphPath)}}, graphWatcher{{/if}}); await indexer.init(); + {{#if (subgraphPath)}} graphWatcher.setIndexer(indexer); - {{#if subgraphPath}} await graphWatcher.init(); {{/if}} diff --git a/packages/codegen/src/utils/handlebar-helpers.ts b/packages/codegen/src/utils/handlebar-helpers.ts index 7059476b..0ea75951 100644 --- a/packages/codegen/src/utils/handlebar-helpers.ts +++ b/packages/codegen/src/utils/handlebar-helpers.ts @@ -7,10 +7,11 @@ import Handlebars from 'handlebars'; import { reservedNames } from './types'; -export function registerHandlebarHelpers (): void { +export function registerHandlebarHelpers (config: any): void { Handlebars.registerHelper('compare', compareHelper); Handlebars.registerHelper('capitalize', capitalizeHelper); Handlebars.registerHelper('reservedNameCheck', reservedNameCheckHelper); + Handlebars.registerHelper('subgraphPath', () => config.subgraphPath); } /** diff --git a/packages/codegen/src/visitor.ts b/packages/codegen/src/visitor.ts index 6d5dc8cb..ad92c0d6 100644 --- a/packages/codegen/src/visitor.ts +++ b/packages/codegen/src/visitor.ts @@ -93,9 +93,13 @@ export class Visitor { const variable = node.variables[0]; const name: string = variable.name; const stateVariableType: string = variable.typeName.type; - const params: Param[] = []; + if (variable.isImmutable) { + // Skip in case variable is immutable. + return; + } + let typeName = variable.typeName; let numParams = 0; diff --git a/packages/eden-watcher/environments/local.toml b/packages/eden-watcher/environments/local.toml index a3f7000a..2e4cd841 100644 --- a/packages/eden-watcher/environments/local.toml +++ b/packages/eden-watcher/environments/local.toml @@ -15,6 +15,13 @@ subgraphPath = "../graph-node/test/subgraph/eden" wasmRestartBlocksInterval = 20 + # Boolean to filter logs by contract. + filterLogs = false + + # Max block range for which to return events in eventsInRange GQL query. + # Use -1 for skipping check on block range. + maxEventsBlockRange = 1000 + [database] type = "postgres" host = "localhost" diff --git a/packages/eden-watcher/package.json b/packages/eden-watcher/package.json index 8e729545..1c94060a 100644 --- a/packages/eden-watcher/package.json +++ b/packages/eden-watcher/package.json @@ -15,7 +15,8 @@ "checkpoint": "DEBUG=vulcanize:* ts-node src/cli/checkpoint.ts", "export-state": "DEBUG=vulcanize:* ts-node src/cli/export-state.ts", "import-state": "DEBUG=vulcanize:* ts-node src/cli/import-state.ts", - "inspect-cid": "DEBUG=vulcanize:* ts-node src/cli/inspect-cid.ts" + "inspect-cid": "DEBUG=vulcanize:* ts-node src/cli/inspect-cid.ts", + "index-block": "DEBUG=vulcanize:* ts-node src/cli/index-block.ts" }, "repository": { "type": "git", diff --git a/packages/eden-watcher/src/cli/index-block.ts b/packages/eden-watcher/src/cli/index-block.ts new file mode 100644 index 00000000..02cd13b3 --- /dev/null +++ b/packages/eden-watcher/src/cli/index-block.ts @@ -0,0 +1,73 @@ +// +// Copyright 2022 Vulcanize, Inc. +// + +import path from 'path'; +import yargs from 'yargs'; +import 'reflect-metadata'; +import debug from 'debug'; +import assert from 'assert'; + +import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients, JobQueue, indexBlock } from '@vulcanize/util'; +import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node'; + +import { Database } from '../database'; +import { Indexer } from '../indexer'; + +const log = debug('vulcanize:index-block'); + +const main = async (): Promise => { + const argv = await yargs.parserConfiguration({ + 'parse-numbers': false + }).options({ + configFile: { + alias: 'f', + type: 'string', + require: true, + demandOption: true, + describe: 'Configuration file path (toml)', + default: DEFAULT_CONFIG_PATH + }, + block: { + type: 'number', + require: true, + demandOption: true, + describe: 'Block number to index' + } + }).argv; + + const config: Config = await getConfig(argv.configFile); + const { ethClient, ethProvider } = await initClients(config); + + const db = new Database(config.database); + await db.init(); + + const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*')); + await graphDb.init(); + + const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server); + + const jobQueueConfig = config.jobQueue; + assert(jobQueueConfig, 'Missing job queue config'); + + const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; + assert(dbConnectionString, 'Missing job queue db connection string'); + + const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); + + const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue, graphWatcher); + await indexer.init(); + + graphWatcher.setIndexer(indexer); + await graphWatcher.init(); + + await indexBlock(indexer, jobQueueConfig.eventsInBatch, argv); + + await db.close(); +}; + +main().catch(err => { + log(err); +}).finally(() => { + process.exit(0); +}); diff --git a/packages/eden-watcher/src/indexer.ts b/packages/eden-watcher/src/indexer.ts index 5091feb6..a75daa6e 100644 --- a/packages/eden-watcher/src/indexer.ts +++ b/packages/eden-watcher/src/indexer.ts @@ -191,7 +191,7 @@ export class Indexer implements IPLDIndexerInterface { this._populateRelationsMap(); } - get serverConfig () { + get serverConfig (): ServerConfig { return this._serverConfig; } @@ -1361,12 +1361,34 @@ export class Indexer implements IPLDIndexerInterface { async _fetchAndSaveEvents ({ cid: blockCid, blockHash }: DeepPartial): Promise { assert(blockHash); - - const logsPromise = this._ethClient.getLogs({ blockHash }); const transactionsPromise = this._ethClient.getBlockWithTransactions({ blockHash }); + const blockPromise = this._ethClient.getBlockByHash(blockHash); + let logs: any[]; + + if (this._serverConfig.filterLogs) { + const watchedContracts = this._baseIndexer.getWatchedContracts(); + + // TODO: Query logs by multiple contracts. + const contractlogsPromises = watchedContracts.map((watchedContract): Promise => this._ethClient.getLogs({ + blockHash, + contract: watchedContract.address + })); + + const contractlogs = await Promise.all(contractlogsPromises); + + // Flatten logs by contract and sort by index. + logs = contractlogs.map(data => { + return data.logs; + }).flat() + .sort((a, b) => { + return a.index - b.index; + }); + } else { + ({ logs } = await this._ethClient.getLogs({ blockHash })); + } let [ - { block, logs }, + { block }, { allEthHeaderCids: { nodes: [ @@ -1378,7 +1400,7 @@ export class Indexer implements IPLDIndexerInterface { ] } } - ] = await Promise.all([logsPromise, transactionsPromise]); + ] = await Promise.all([blockPromise, transactionsPromise]); const transactionMap = transactions.reduce((acc: {[key: string]: any}, transaction: {[key: string]: any}) => { acc[transaction.txHash] = transaction; diff --git a/packages/eden-watcher/tsconfig.json b/packages/eden-watcher/tsconfig.json index 99712bdf..b30e1629 100644 --- a/packages/eden-watcher/tsconfig.json +++ b/packages/eden-watcher/tsconfig.json @@ -6,7 +6,7 @@ // "incremental": true, /* Enable incremental compilation */ "target": "es5", /* Specify ECMAScript target version: 'ES3' (default), 'ES5', 'ES2015', 'ES2016', 'ES2017', 'ES2018', 'ES2019', 'ES2020', 'ES2021', or 'ESNEXT'. */ "module": "commonjs", /* Specify module code generation: 'none', 'commonjs', 'amd', 'system', 'umd', 'es2015', 'es2020', or 'ESNext'. */ - // "lib": [], /* Specify library files to be included in the compilation. */ + "lib": ["es2019"], /* Specify library files to be included in the compilation. */ // "allowJs": true, /* Allow javascript files to be compiled. */ // "checkJs": true, /* Report errors in .js files. */ // "jsx": "preserve", /* Specify JSX code generation: 'preserve', 'react-native', 'react', 'react-jsx' or 'react-jsxdev'. */ diff --git a/packages/erc20-watcher/src/indexer.ts b/packages/erc20-watcher/src/indexer.ts index cf878e69..dfd3adbf 100644 --- a/packages/erc20-watcher/src/indexer.ts +++ b/packages/erc20-watcher/src/indexer.ts @@ -75,7 +75,7 @@ export class Indexer implements IndexerInterface { this._contract = new ethers.utils.Interface(this._abi); } - get serverConfig () { + get serverConfig (): ServerConfig { return this._serverConfig; } @@ -388,7 +388,10 @@ export class Indexer implements IndexerInterface { async _fetchAndSaveEvents ({ cid: blockCid, blockHash }: DeepPartial): Promise { assert(blockHash); - let { block, logs } = await this._ethClient.getLogs({ blockHash }); + let [{ block }, { logs }] = await Promise.all([ + this._ethClient.getBlockByHash(blockHash), + this._ethClient.getLogs({ blockHash }) + ]); const dbEvents: Array> = []; diff --git a/packages/erc721-watcher/environments/local.toml b/packages/erc721-watcher/environments/local.toml index 1e7fe5dd..711e735c 100644 --- a/packages/erc721-watcher/environments/local.toml +++ b/packages/erc721-watcher/environments/local.toml @@ -12,6 +12,12 @@ # IPFS API address (can be taken from the output on running the IPFS daemon). ipfsApiAddr = "/ip4/127.0.0.1/tcp/5001" + # Boolean to filter logs by contract. + filterLogs = false + + # Max block range for which to return events in eventsInRange GQL query. + # Use -1 for skipping check on block range. + maxEventsBlockRange = 1000 [database] type = "postgres" diff --git a/packages/erc721-watcher/package.json b/packages/erc721-watcher/package.json index 2f846f91..528ccda4 100644 --- a/packages/erc721-watcher/package.json +++ b/packages/erc721-watcher/package.json @@ -16,6 +16,7 @@ "export-state": "DEBUG=vulcanize:* ts-node src/cli/export-state.ts", "import-state": "DEBUG=vulcanize:* ts-node src/cli/import-state.ts", "inspect-cid": "DEBUG=vulcanize:* ts-node src/cli/inspect-cid.ts", + "index-block": "DEBUG=vulcanize:* ts-node src/cli/index-block.ts", "nft:deploy": "hardhat --network localhost nft-deploy", "nft:mint": "hardhat --network localhost nft-mint", "nft:transfer": "hardhat --network localhost nft-transfer", @@ -39,7 +40,6 @@ "@vulcanize/ipld-eth-client": "^0.1.0", "@vulcanize/solidity-mapper": "^0.1.0", "@vulcanize/util": "^0.1.0", - "@vulcanize/graph-node": "^0.1.0", "apollo-server-express": "^2.25.0", "apollo-type-bigint": "^0.1.3", "debug": "^4.3.1", diff --git a/packages/erc721-watcher/src/cli/checkpoint.ts b/packages/erc721-watcher/src/cli/checkpoint.ts index 84230167..990d64f0 100644 --- a/packages/erc721-watcher/src/cli/checkpoint.ts +++ b/packages/erc721-watcher/src/cli/checkpoint.ts @@ -2,14 +2,12 @@ // Copyright 2021 Vulcanize, Inc. // -import path from 'path'; import yargs from 'yargs'; import 'reflect-metadata'; import debug from 'debug'; import assert from 'assert'; import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients, JobQueue } from '@vulcanize/util'; -import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node'; import { Database } from '../database'; import { Indexer } from '../indexer'; @@ -46,11 +44,6 @@ const main = async (): Promise => { const db = new Database(config.database); await db.init(); - const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*')); - await graphDb.init(); - - const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server); - const jobQueueConfig = config.jobQueue; assert(jobQueueConfig, 'Missing job queue config'); @@ -60,11 +53,9 @@ const main = async (): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); - const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue, graphWatcher); + const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue); await indexer.init(); - graphWatcher.setIndexer(indexer); - const blockHash = await indexer.processCLICheckpoint(argv.address, argv.blockHash); log(`Created a checkpoint for contract ${argv.address} at block-hash ${blockHash}`); diff --git a/packages/erc721-watcher/src/cli/export-state.ts b/packages/erc721-watcher/src/cli/export-state.ts index a42c6aa3..31ac9028 100644 --- a/packages/erc721-watcher/src/cli/export-state.ts +++ b/packages/erc721-watcher/src/cli/export-state.ts @@ -10,7 +10,6 @@ import fs from 'fs'; import path from 'path'; import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients, JobQueue, StateKind } from '@vulcanize/util'; -import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node'; import * as codec from '@ipld/dag-cbor'; import { Database } from '../database'; @@ -43,11 +42,6 @@ const main = async (): Promise => { const db = new Database(config.database); await db.init(); - const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*')); - await graphDb.init(); - - const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server); - const jobQueueConfig = config.jobQueue; assert(jobQueueConfig, 'Missing job queue config'); @@ -57,11 +51,9 @@ const main = async (): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); - const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue, graphWatcher); + const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue); await indexer.init(); - graphWatcher.setIndexer(indexer); - const exportData: any = { snapshotBlock: {}, contracts: [], diff --git a/packages/erc721-watcher/src/cli/import-state.ts b/packages/erc721-watcher/src/cli/import-state.ts index f3fa932f..4f3aa5d8 100644 --- a/packages/erc721-watcher/src/cli/import-state.ts +++ b/packages/erc721-watcher/src/cli/import-state.ts @@ -12,7 +12,6 @@ import fs from 'fs'; import path from 'path'; import { getConfig, fillBlocks, JobQueue, DEFAULT_CONFIG_PATH, Config, initClients, StateKind } from '@vulcanize/util'; -import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node'; import * as codec from '@ipld/dag-cbor'; import { Database } from '../database'; @@ -47,11 +46,6 @@ export const main = async (): Promise => { const db = new Database(config.database); await db.init(); - const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*')); - await graphDb.init(); - - const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server); - // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries const pubsub = new PubSub(); @@ -65,11 +59,9 @@ export const main = async (): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); - const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue, graphWatcher); + const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue); await indexer.init(); - graphWatcher.setIndexer(indexer); - const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue); // Import data. diff --git a/packages/erc721-watcher/src/cli/index-block.ts b/packages/erc721-watcher/src/cli/index-block.ts new file mode 100644 index 00000000..7b81a9e4 --- /dev/null +++ b/packages/erc721-watcher/src/cli/index-block.ts @@ -0,0 +1,63 @@ +// +// Copyright 2022 Vulcanize, Inc. +// + +import yargs from 'yargs'; +import 'reflect-metadata'; +import debug from 'debug'; +import assert from 'assert'; + +import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients, JobQueue, indexBlock } from '@vulcanize/util'; + +import { Database } from '../database'; +import { Indexer } from '../indexer'; + +const log = debug('vulcanize:index-block'); + +const main = async (): Promise => { + const argv = await yargs.parserConfiguration({ + 'parse-numbers': false + }).options({ + configFile: { + alias: 'f', + type: 'string', + require: true, + demandOption: true, + describe: 'Configuration file path (toml)', + default: DEFAULT_CONFIG_PATH + }, + block: { + type: 'number', + require: true, + demandOption: true, + describe: 'Block number to index' + } + }).argv; + + const config: Config = await getConfig(argv.configFile); + const { ethClient, ethProvider } = await initClients(config); + + const db = new Database(config.database); + await db.init(); + + const jobQueueConfig = config.jobQueue; + assert(jobQueueConfig, 'Missing job queue config'); + + const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; + assert(dbConnectionString, 'Missing job queue db connection string'); + + const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); + + const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue); + await indexer.init(); + + await indexBlock(indexer, jobQueueConfig.eventsInBatch, argv); + + await db.close(); +}; + +main().catch(err => { + log(err); +}).finally(() => { + process.exit(0); +}); diff --git a/packages/erc721-watcher/src/cli/inspect-cid.ts b/packages/erc721-watcher/src/cli/inspect-cid.ts index 4abc919b..d6c1acee 100644 --- a/packages/erc721-watcher/src/cli/inspect-cid.ts +++ b/packages/erc721-watcher/src/cli/inspect-cid.ts @@ -2,7 +2,6 @@ // Copyright 2021 Vulcanize, Inc. // -import path from 'path'; import assert from 'assert'; import yargs from 'yargs'; import 'reflect-metadata'; @@ -10,7 +9,6 @@ import debug from 'debug'; import util from 'util'; import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients, JobQueue } from '@vulcanize/util'; -import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node'; import { Database } from '../database'; import { Indexer } from '../indexer'; @@ -43,11 +41,6 @@ const main = async (): Promise => { const db = new Database(config.database); await db.init(); - const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*')); - await graphDb.init(); - - const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server); - const jobQueueConfig = config.jobQueue; assert(jobQueueConfig, 'Missing job queue config'); @@ -57,11 +50,9 @@ const main = async (): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); - const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue, graphWatcher); + const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue); await indexer.init(); - graphWatcher.setIndexer(indexer); - const ipldBlock = await indexer.getIPLDBlockByCid(argv.cid); assert(ipldBlock, 'IPLDBlock for the provided CID doesn\'t exist.'); diff --git a/packages/erc721-watcher/src/cli/reset-cmds/state.ts b/packages/erc721-watcher/src/cli/reset-cmds/state.ts index f711b770..8ca609f0 100644 --- a/packages/erc721-watcher/src/cli/reset-cmds/state.ts +++ b/packages/erc721-watcher/src/cli/reset-cmds/state.ts @@ -2,13 +2,11 @@ // Copyright 2021 Vulcanize, Inc. // -import path from 'path'; import debug from 'debug'; import { MoreThan } from 'typeorm'; import assert from 'assert'; import { getConfig, initClients, resetJobs, JobQueue } from '@vulcanize/util'; -import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node'; import { Database } from '../../database'; import { Indexer } from '../../indexer'; @@ -50,11 +48,6 @@ export const handler = async (argv: any): Promise => { const db = new Database(config.database); await db.init(); - const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*')); - await graphDb.init(); - - const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server); - const jobQueueConfig = config.jobQueue; assert(jobQueueConfig, 'Missing job queue config'); @@ -64,11 +57,9 @@ export const handler = async (argv: any): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); - const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue, graphWatcher); + const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue); await indexer.init(); - graphWatcher.setIndexer(indexer); - const blockProgresses = await indexer.getBlocksAtHeight(argv.blockNumber, false); assert(blockProgresses.length, `No blocks at specified block number ${argv.blockNumber}`); assert(!blockProgresses.some(block => !block.isComplete), `Incomplete block at block number ${argv.blockNumber} with unprocessed events`); diff --git a/packages/erc721-watcher/src/cli/watch-contract.ts b/packages/erc721-watcher/src/cli/watch-contract.ts index 63bf09d3..b22c49c7 100644 --- a/packages/erc721-watcher/src/cli/watch-contract.ts +++ b/packages/erc721-watcher/src/cli/watch-contract.ts @@ -2,14 +2,12 @@ // Copyright 2021 Vulcanize, Inc. // -import path from 'path'; import yargs from 'yargs'; import 'reflect-metadata'; import debug from 'debug'; import assert from 'assert'; import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients, JobQueue } from '@vulcanize/util'; -import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node'; import { Database } from '../database'; import { Indexer } from '../indexer'; @@ -59,11 +57,6 @@ const main = async (): Promise => { const db = new Database(config.database); await db.init(); - const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*')); - await graphDb.init(); - - const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server); - const jobQueueConfig = config.jobQueue; assert(jobQueueConfig, 'Missing job queue config'); @@ -73,11 +66,9 @@ const main = async (): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); - const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue, graphWatcher); + const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue); await indexer.init(); - graphWatcher.setIndexer(indexer); - await indexer.watchContract(argv.address, argv.kind, argv.checkpoint, argv.startingBlock); await db.close(); diff --git a/packages/erc721-watcher/src/entity/TransferCount.ts b/packages/erc721-watcher/src/entity/TransferCount.ts index 681ee91a..01d07ba2 100644 --- a/packages/erc721-watcher/src/entity/TransferCount.ts +++ b/packages/erc721-watcher/src/entity/TransferCount.ts @@ -2,7 +2,7 @@ // Copyright 2022 Vulcanize, Inc. // -import { Entity, PrimaryColumn, Column, Index } from 'typeorm'; +import { Entity, PrimaryColumn, Column } from 'typeorm'; @Entity() export class TransferCount { diff --git a/packages/erc721-watcher/src/fill.ts b/packages/erc721-watcher/src/fill.ts index 4a95f247..8db23cb7 100644 --- a/packages/erc721-watcher/src/fill.ts +++ b/packages/erc721-watcher/src/fill.ts @@ -2,7 +2,6 @@ // Copyright 2021 Vulcanize, Inc. // -import path from 'path'; import assert from 'assert'; import 'reflect-metadata'; import yargs from 'yargs'; @@ -11,7 +10,6 @@ import debug from 'debug'; import { PubSub } from 'apollo-server-express'; import { Config, getConfig, fillBlocks, JobQueue, DEFAULT_CONFIG_PATH, initClients } from '@vulcanize/util'; -import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node'; import { Database } from './database'; import { Indexer } from './indexer'; @@ -58,11 +56,6 @@ export const main = async (): Promise => { const db = new Database(config.database); await db.init(); - const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*')); - await graphDb.init(); - - const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server); - const jobQueueConfig = config.jobQueue; assert(jobQueueConfig, 'Missing job queue config'); @@ -72,11 +65,9 @@ export const main = async (): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); - const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue, graphWatcher); + const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue); await indexer.init(); - graphWatcher.setIndexer(indexer); - // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries const pubsub = new PubSub(); diff --git a/packages/erc721-watcher/src/hooks.ts b/packages/erc721-watcher/src/hooks.ts index 6567696b..296b9f9f 100644 --- a/packages/erc721-watcher/src/hooks.ts +++ b/packages/erc721-watcher/src/hooks.ts @@ -4,7 +4,7 @@ import assert from 'assert'; -import { updateStateForMappingType, updateStateForElementaryType } from '@vulcanize/util'; +import { updateStateForElementaryType } from '@vulcanize/util'; import { Indexer, ResultEvent } from './indexer'; import { TransferCount } from './entity/TransferCount'; diff --git a/packages/erc721-watcher/src/indexer.ts b/packages/erc721-watcher/src/indexer.ts index 883b0071..0021144e 100644 --- a/packages/erc721-watcher/src/indexer.ts +++ b/packages/erc721-watcher/src/indexer.ts @@ -29,7 +29,6 @@ import { StateKind, IpldStatus as IpldStatusInterface } from '@vulcanize/util'; -import { GraphWatcher } from '@vulcanize/graph-node'; import ERC721Artifacts from './artifacts/ERC721.json'; import { Database } from './database'; @@ -94,7 +93,6 @@ export class Indexer implements IPLDIndexerInterface { _ethProvider: BaseProvider _baseIndexer: BaseIndexer _serverConfig: ServerConfig - _graphWatcher: GraphWatcher; _abiMap: Map _storageLayoutMap: Map @@ -102,10 +100,7 @@ export class Indexer implements IPLDIndexerInterface { _ipfsClient: IPFSClient - _entityTypesMap: Map - _relationsMap: Map - - constructor (serverConfig: ServerConfig, db: Database, ethClient: EthClient, ethProvider: BaseProvider, jobQueue: JobQueue, graphWatcher: GraphWatcher) { + constructor (serverConfig: ServerConfig, db: Database, ethClient: EthClient, ethProvider: BaseProvider, jobQueue: JobQueue) { assert(db); assert(ethClient); @@ -115,7 +110,6 @@ export class Indexer implements IPLDIndexerInterface { this._serverConfig = serverConfig; this._ipfsClient = new IPFSClient(this._serverConfig.ipfsApiAddr); this._baseIndexer = new BaseIndexer(this._serverConfig, this._db, this._ethClient, this._ethProvider, jobQueue, this._ipfsClient); - this._graphWatcher = graphWatcher; this._abiMap = new Map(); this._storageLayoutMap = new Map(); @@ -131,13 +125,9 @@ export class Indexer implements IPLDIndexerInterface { assert(ERC721StorageLayout); this._storageLayoutMap.set(KIND_ERC721, ERC721StorageLayout); this._contractMap.set(KIND_ERC721, new ethers.utils.Interface(ERC721ABI)); - - this._entityTypesMap = new Map(); - - this._relationsMap = new Map(); } - get serverConfig () { + get serverConfig (): ServerConfig { return this._serverConfig; } @@ -452,9 +442,8 @@ export class Indexer implements IPLDIndexerInterface { return res; } - async saveOrUpdateTransferCount (transferCount: TransferCount) { + async saveOrUpdateTransferCount (transferCount: TransferCount): Promise { const dbTx = await this._db.createTransactionRunner(); - let res; try { await this._db.saveTransferCount(dbTx, transferCount); @@ -464,8 +453,6 @@ export class Indexer implements IPLDIndexerInterface { } finally { await dbTx.release(); } - - return res; } async _name (blockHash: string, contractAddress: string, diff = false): Promise { @@ -784,20 +771,9 @@ export class Indexer implements IPLDIndexerInterface { await this._baseIndexer.removeIPLDBlocks(blockNumber, kind); } - async getSubgraphEntity (entity: new () => Entity, id: string, block?: BlockHeight): Promise { - const relations = this._relationsMap.get(entity) || {}; - - const data = await this._graphWatcher.getEntity(entity, id, relations, block); - - return data; - } - async triggerIndexingOnEvent (event: Event): Promise { const resultEvent = this.getResultEvent(event); - // Call subgraph handler for event. - await this._graphWatcher.handleEvent(resultEvent); - // Call custom hook function for indexing on event. await handleEvent(this, resultEvent); } @@ -810,9 +786,6 @@ export class Indexer implements IPLDIndexerInterface { async processBlock (blockHash: string, blockNumber: number): Promise { // Call a function to create initial state for contracts. await this._baseIndexer.createInit(this, blockHash, blockNumber); - - // Call subgraph handler for block. - await this._graphWatcher.handleBlock(blockHash); } parseEventNameAndArgs (kind: string, logObj: any): any { @@ -848,7 +821,7 @@ export class Indexer implements IPLDIndexerInterface { switch (logDescription.name) { case APPROVAL_EVENT: { eventName = logDescription.name; - const { owner, approved, tokenId } = logDescription.args; + const [owner, approved, tokenId] = logDescription.args; eventInfo = { owner, approved, @@ -859,7 +832,7 @@ export class Indexer implements IPLDIndexerInterface { } case APPROVALFORALL_EVENT: { eventName = logDescription.name; - const { owner, operator, approved } = logDescription.args; + const [owner, operator, approved] = logDescription.args; eventInfo = { owner, operator, @@ -870,7 +843,7 @@ export class Indexer implements IPLDIndexerInterface { } case TRANSFER_EVENT: { eventName = logDescription.name; - const { from, to, tokenId } = logDescription.args; + const [from, to, tokenId] = logDescription.args; eventInfo = { from, to, @@ -1054,18 +1027,36 @@ export class Indexer implements IPLDIndexerInterface { return this._baseIndexer.getAncestorAtDepth(blockHash, depth); } - getEntityTypesMap (): Map { - return this._entityTypesMap; - } - async _fetchAndSaveEvents ({ cid: blockCid, blockHash }: DeepPartial): Promise { assert(blockHash); - - const logsPromise = this._ethClient.getLogs({ blockHash }); const transactionsPromise = this._ethClient.getBlockWithTransactions({ blockHash }); + const blockPromise = this._ethClient.getBlockByHash(blockHash); + let logs: any[]; + + if (this._serverConfig.filterLogs) { + const watchedContracts = this._baseIndexer.getWatchedContracts(); + + // TODO: Query logs by multiple contracts. + const contractlogsPromises = watchedContracts.map((watchedContract): Promise => this._ethClient.getLogs({ + blockHash, + contract: watchedContract.address + })); + + const contractlogs = await Promise.all(contractlogsPromises); + + // Flatten logs by contract and sort by index. + logs = contractlogs.map(data => { + return data.logs; + }).flat() + .sort((a, b) => { + return a.index - b.index; + }); + } else { + ({ logs } = await this._ethClient.getLogs({ blockHash })); + } let [ - { block, logs }, + { block }, { allEthHeaderCids: { nodes: [ @@ -1077,7 +1068,7 @@ export class Indexer implements IPLDIndexerInterface { ] } } - ] = await Promise.all([logsPromise, transactionsPromise]); + ] = await Promise.all([blockPromise, transactionsPromise]); const transactionMap = transactions.reduce((acc: {[key: string]: any}, transaction: {[key: string]: any}) => { acc[transaction.txHash] = transaction; diff --git a/packages/erc721-watcher/src/job-runner.ts b/packages/erc721-watcher/src/job-runner.ts index 6e934621..16c8926b 100644 --- a/packages/erc721-watcher/src/job-runner.ts +++ b/packages/erc721-watcher/src/job-runner.ts @@ -2,7 +2,6 @@ // Copyright 2021 Vulcanize, Inc. // -import path from 'path'; import assert from 'assert'; import 'reflect-metadata'; import yargs from 'yargs'; @@ -24,7 +23,6 @@ import { DEFAULT_CONFIG_PATH, initClients } from '@vulcanize/util'; -import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node'; import { Indexer } from './indexer'; import { Database } from './database'; @@ -253,11 +251,6 @@ export const main = async (): Promise => { const db = new Database(config.database); await db.init(); - const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*')); - await graphDb.init(); - - const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server); - const jobQueueConfig = config.jobQueue; assert(jobQueueConfig, 'Missing job queue config'); @@ -267,11 +260,9 @@ export const main = async (): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); - const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue, graphWatcher); + const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue); await indexer.init(); - graphWatcher.setIndexer(indexer); - const jobRunner = new JobRunner(jobQueueConfig, indexer, jobQueue); await jobRunner.start(); }; diff --git a/packages/erc721-watcher/src/resolvers.ts b/packages/erc721-watcher/src/resolvers.ts index 8f72a2ad..f912e3e7 100644 --- a/packages/erc721-watcher/src/resolvers.ts +++ b/packages/erc721-watcher/src/resolvers.ts @@ -12,7 +12,6 @@ import { ValueResult, BlockHeight, StateKind } from '@vulcanize/util'; import { Indexer } from './indexer'; import { EventWatcher } from './events'; -import { TransferCount } from './entity/TransferCount'; const log = debug('vulcanize:resolver'); diff --git a/packages/erc721-watcher/src/server.ts b/packages/erc721-watcher/src/server.ts index 1dce3dbe..6d60c429 100644 --- a/packages/erc721-watcher/src/server.ts +++ b/packages/erc721-watcher/src/server.ts @@ -15,7 +15,6 @@ import 'graphql-import-node'; import { createServer } from 'http'; import { DEFAULT_CONFIG_PATH, getConfig, Config, JobQueue, KIND_ACTIVE, initClients } from '@vulcanize/util'; -import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node'; import { createResolvers } from './resolvers'; import { Indexer } from './indexer'; @@ -43,11 +42,6 @@ export const main = async (): Promise => { const db = new Database(config.database); await db.init(); - const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*')); - await graphDb.init(); - - const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server); - // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries const pubsub = new PubSub(); @@ -60,11 +54,9 @@ export const main = async (): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); - const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue, graphWatcher); + const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue); await indexer.init(); - graphWatcher.setIndexer(indexer); - const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue); if (watcherKind === KIND_ACTIVE) { diff --git a/packages/erc721-watcher/tsconfig.json b/packages/erc721-watcher/tsconfig.json index 99712bdf..b30e1629 100644 --- a/packages/erc721-watcher/tsconfig.json +++ b/packages/erc721-watcher/tsconfig.json @@ -6,7 +6,7 @@ // "incremental": true, /* Enable incremental compilation */ "target": "es5", /* Specify ECMAScript target version: 'ES3' (default), 'ES5', 'ES2015', 'ES2016', 'ES2017', 'ES2018', 'ES2019', 'ES2020', 'ES2021', or 'ESNEXT'. */ "module": "commonjs", /* Specify module code generation: 'none', 'commonjs', 'amd', 'system', 'umd', 'es2015', 'es2020', or 'ESNext'. */ - // "lib": [], /* Specify library files to be included in the compilation. */ + "lib": ["es2019"], /* Specify library files to be included in the compilation. */ // "allowJs": true, /* Allow javascript files to be compiled. */ // "checkJs": true, /* Report errors in .js files. */ // "jsx": "preserve", /* Specify JSX code generation: 'preserve', 'react-native', 'react', 'react-jsx' or 'react-jsxdev'. */ diff --git a/packages/graph-node/test/utils/indexer.ts b/packages/graph-node/test/utils/indexer.ts index 402b84e6..02927dd9 100644 --- a/packages/graph-node/test/utils/indexer.ts +++ b/packages/graph-node/test/utils/indexer.ts @@ -161,6 +161,7 @@ class ServerConfig implements ServerConfigInterface { subgraphPath: string; wasmRestartBlocksInterval: number; filterLogs: boolean; + maxEventsBlockRange: number; constructor () { this.host = ''; @@ -173,5 +174,6 @@ class ServerConfig implements ServerConfigInterface { this.subgraphPath = ''; this.wasmRestartBlocksInterval = 0; this.filterLogs = false; + this.maxEventsBlockRange = 0; } } diff --git a/packages/graph-test-watcher/environments/local.toml b/packages/graph-test-watcher/environments/local.toml index e0138233..4bc0d780 100644 --- a/packages/graph-test-watcher/environments/local.toml +++ b/packages/graph-test-watcher/environments/local.toml @@ -15,6 +15,13 @@ subgraphPath = "../graph-node/test/subgraph/example1/build" wasmRestartBlocksInterval = 20 + # Boolean to filter logs by contract. + filterLogs = false + + # Max block range for which to return events in eventsInRange GQL query. + # Use -1 for skipping check on block range. + maxEventsBlockRange = 1000 + [database] type = "postgres" host = "localhost" diff --git a/packages/graph-test-watcher/package.json b/packages/graph-test-watcher/package.json index 3a9e21c9..b27d31dd 100644 --- a/packages/graph-test-watcher/package.json +++ b/packages/graph-test-watcher/package.json @@ -15,7 +15,8 @@ "checkpoint": "DEBUG=vulcanize:* ts-node src/cli/checkpoint.ts", "export-state": "DEBUG=vulcanize:* ts-node src/cli/export-state.ts", "import-state": "DEBUG=vulcanize:* ts-node src/cli/import-state.ts", - "inspect-cid": "DEBUG=vulcanize:* ts-node src/cli/inspect-cid.ts" + "inspect-cid": "DEBUG=vulcanize:* ts-node src/cli/inspect-cid.ts", + "index-block": "DEBUG=vulcanize:* ts-node src/cli/index-block.ts" }, "repository": { "type": "git", diff --git a/packages/graph-test-watcher/src/cli/index-block.ts b/packages/graph-test-watcher/src/cli/index-block.ts new file mode 100644 index 00000000..02cd13b3 --- /dev/null +++ b/packages/graph-test-watcher/src/cli/index-block.ts @@ -0,0 +1,73 @@ +// +// Copyright 2022 Vulcanize, Inc. +// + +import path from 'path'; +import yargs from 'yargs'; +import 'reflect-metadata'; +import debug from 'debug'; +import assert from 'assert'; + +import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients, JobQueue, indexBlock } from '@vulcanize/util'; +import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node'; + +import { Database } from '../database'; +import { Indexer } from '../indexer'; + +const log = debug('vulcanize:index-block'); + +const main = async (): Promise => { + const argv = await yargs.parserConfiguration({ + 'parse-numbers': false + }).options({ + configFile: { + alias: 'f', + type: 'string', + require: true, + demandOption: true, + describe: 'Configuration file path (toml)', + default: DEFAULT_CONFIG_PATH + }, + block: { + type: 'number', + require: true, + demandOption: true, + describe: 'Block number to index' + } + }).argv; + + const config: Config = await getConfig(argv.configFile); + const { ethClient, ethProvider } = await initClients(config); + + const db = new Database(config.database); + await db.init(); + + const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*')); + await graphDb.init(); + + const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server); + + const jobQueueConfig = config.jobQueue; + assert(jobQueueConfig, 'Missing job queue config'); + + const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; + assert(dbConnectionString, 'Missing job queue db connection string'); + + const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); + + const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue, graphWatcher); + await indexer.init(); + + graphWatcher.setIndexer(indexer); + await graphWatcher.init(); + + await indexBlock(indexer, jobQueueConfig.eventsInBatch, argv); + + await db.close(); +}; + +main().catch(err => { + log(err); +}).finally(() => { + process.exit(0); +}); diff --git a/packages/graph-test-watcher/src/indexer.ts b/packages/graph-test-watcher/src/indexer.ts index 9c8c38e7..d89a955f 100644 --- a/packages/graph-test-watcher/src/indexer.ts +++ b/packages/graph-test-watcher/src/indexer.ts @@ -160,7 +160,7 @@ export class Indexer implements IPLDIndexerInterface { this._populateRelationsMap(); } - get serverConfig () { + get serverConfig (): ServerConfig { return this._serverConfig; } @@ -758,12 +758,34 @@ export class Indexer implements IPLDIndexerInterface { async _fetchAndSaveEvents ({ cid: blockCid, blockHash }: DeepPartial): Promise { assert(blockHash); - - const logsPromise = this._ethClient.getLogs({ blockHash }); const transactionsPromise = this._ethClient.getBlockWithTransactions({ blockHash }); + const blockPromise = this._ethClient.getBlockByHash(blockHash); + let logs: any[]; + + if (this._serverConfig.filterLogs) { + const watchedContracts = this._baseIndexer.getWatchedContracts(); + + // TODO: Query logs by multiple contracts. + const contractlogsPromises = watchedContracts.map((watchedContract): Promise => this._ethClient.getLogs({ + blockHash, + contract: watchedContract.address + })); + + const contractlogs = await Promise.all(contractlogsPromises); + + // Flatten logs by contract and sort by index. + logs = contractlogs.map(data => { + return data.logs; + }).flat() + .sort((a, b) => { + return a.index - b.index; + }); + } else { + ({ logs } = await this._ethClient.getLogs({ blockHash })); + } let [ - { block, logs }, + { block }, { allEthHeaderCids: { nodes: [ @@ -775,7 +797,7 @@ export class Indexer implements IPLDIndexerInterface { ] } } - ] = await Promise.all([logsPromise, transactionsPromise]); + ] = await Promise.all([blockPromise, transactionsPromise]); const transactionMap = transactions.reduce((acc: {[key: string]: any}, transaction: {[key: string]: any}) => { acc[transaction.txHash] = transaction; diff --git a/packages/graph-test-watcher/tsconfig.json b/packages/graph-test-watcher/tsconfig.json index 99712bdf..b30e1629 100644 --- a/packages/graph-test-watcher/tsconfig.json +++ b/packages/graph-test-watcher/tsconfig.json @@ -6,7 +6,7 @@ // "incremental": true, /* Enable incremental compilation */ "target": "es5", /* Specify ECMAScript target version: 'ES3' (default), 'ES5', 'ES2015', 'ES2016', 'ES2017', 'ES2018', 'ES2019', 'ES2020', 'ES2021', or 'ESNEXT'. */ "module": "commonjs", /* Specify module code generation: 'none', 'commonjs', 'amd', 'system', 'umd', 'es2015', 'es2020', or 'ESNext'. */ - // "lib": [], /* Specify library files to be included in the compilation. */ + "lib": ["es2019"], /* Specify library files to be included in the compilation. */ // "allowJs": true, /* Allow javascript files to be compiled. */ // "checkJs": true, /* Report errors in .js files. */ // "jsx": "preserve", /* Specify JSX code generation: 'preserve', 'react-native', 'react', 'react-jsx' or 'react-jsxdev'. */ diff --git a/packages/ipld-eth-client/src/eth-client.ts b/packages/ipld-eth-client/src/eth-client.ts index de5e99e8..0f644a47 100644 --- a/packages/ipld-eth-client/src/eth-client.ts +++ b/packages/ipld-eth-client/src/eth-client.ts @@ -3,7 +3,6 @@ // import assert from 'assert'; -import _ from 'lodash'; import { Cache } from '@vulcanize/cache'; @@ -116,24 +115,10 @@ export class EthClient { async getLogs (vars: Vars): Promise { const result = await this._getCachedOrFetch('getLogs', vars); const { - getLogs: resultLogs, - block: { - number: blockNumHex, - timestamp: timestampHex, - parent - } + getLogs } = result; - const block = { - hash: vars.blockHash, - number: parseInt(blockNumHex, 16), - timestamp: parseInt(timestampHex, 16), - parent - }; - - const logs = resultLogs.map((logEntry: any) => _.merge({}, logEntry, { transaction: { block } })); - - return { logs, block }; + return { logs: getLogs }; } async _getCachedOrFetch (queryName: keyof typeof ethQueries, vars: Vars): Promise { diff --git a/packages/ipld-eth-client/src/eth-queries.ts b/packages/ipld-eth-client/src/eth-queries.ts index 7e0091c4..257fcb76 100644 --- a/packages/ipld-eth-client/src/eth-queries.ts +++ b/packages/ipld-eth-client/src/eth-queries.ts @@ -31,13 +31,6 @@ query getLogs($blockHash: Bytes32!, $contract: Address) { receiptCID status } - block(hash: $blockHash) { - number - timestamp - parent { - hash - } - } } `; diff --git a/packages/lighthouse-watcher/src/indexer.ts b/packages/lighthouse-watcher/src/indexer.ts index 4875a295..9d48ffce 100644 --- a/packages/lighthouse-watcher/src/indexer.ts +++ b/packages/lighthouse-watcher/src/indexer.ts @@ -82,7 +82,11 @@ export class Indexer { async fetchEvents (blockHash: string): Promise> { assert(this._config.watch); const contract = this._config.watch.lighthouse; - const { logs, block } = await this._ethClient.getLogs({ blockHash, contract }); + + const [{ logs }, { block }] = await Promise.all([ + this._ethClient.getLogs({ blockHash, contract }), + this._ethClient.getBlockByHash(blockHash) + ]); const { allEthHeaderCids: { diff --git a/packages/mobymask-watcher/environments/local.toml b/packages/mobymask-watcher/environments/local.toml index ec60fe7b..b3a165e7 100644 --- a/packages/mobymask-watcher/environments/local.toml +++ b/packages/mobymask-watcher/environments/local.toml @@ -15,6 +15,10 @@ # Boolean to filter logs by contract. filterLogs = true + # Max block range for which to return events in eventsInRange GQL query. + # Use -1 for skipping check on block range. + maxEventsBlockRange = -1 + [database] type = "postgres" host = "localhost" diff --git a/packages/mobymask-watcher/src/cli/index-block.ts b/packages/mobymask-watcher/src/cli/index-block.ts index 3c7dc4bd..7b81a9e4 100644 --- a/packages/mobymask-watcher/src/cli/index-block.ts +++ b/packages/mobymask-watcher/src/cli/index-block.ts @@ -7,16 +7,12 @@ import 'reflect-metadata'; import debug from 'debug'; import assert from 'assert'; -import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients, JobQueue, OrderDirection, UNKNOWN_EVENT_NAME } from '@vulcanize/util'; +import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients, JobQueue, indexBlock } from '@vulcanize/util'; import { Database } from '../database'; import { Indexer } from '../indexer'; -import { BlockProgress } from '../entity/BlockProgress'; -import { Event } from '../entity/Event'; -const DEFAULT_EVENTS_IN_BATCH = 50; - -const log = debug('vulcanize:watch-contract'); +const log = debug('vulcanize:index-block'); const main = async (): Promise => { const argv = await yargs.parserConfiguration({ @@ -55,70 +51,7 @@ const main = async (): Promise => { const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue); await indexer.init(); - let blockProgressEntities: Partial[] = await indexer.getBlocksAtHeight(argv.block, false); - - if (!blockProgressEntities.length) { - console.time('time:index-block#getBlocks-ipld-eth-server'); - const blocks = await indexer.getBlocks({ blockNumber: argv.block }); - - blockProgressEntities = blocks.map((block: any): Partial => { - block.blockTimestamp = block.timestamp; - - return block; - }); - - console.timeEnd('time:index-block#getBlocks-ipld-eth-server'); - } - - assert(blockProgressEntities.length, `No blocks fetched for block number ${argv.block}.`); - - for (let blockProgress of blockProgressEntities) { - // Check if blockProgress fetched from database. - if (!blockProgress.id) { - blockProgress = await indexer.fetchBlockEvents(blockProgress); - } - - assert(blockProgress instanceof BlockProgress); - assert(indexer.processBlock); - await indexer.processBlock(blockProgress.blockHash, blockProgress.blockNumber); - - // Check if block has unprocessed events. - if (blockProgress.numProcessedEvents < blockProgress.numEvents) { - while (!blockProgress.isComplete) { - console.time('time:index-block#fetching_events_batch'); - - // Fetch events in batches - const events = await indexer.getBlockEvents( - blockProgress.blockHash, - { - index: [ - { value: blockProgress.lastProcessedEventIndex + 1, operator: 'gte', not: false } - ] - }, - { - limit: jobQueueConfig.eventsInBatch || DEFAULT_EVENTS_IN_BATCH, - orderBy: 'index', - orderDirection: OrderDirection.asc - } - ); - - console.timeEnd('time:index-block#fetching_events_batch'); - - if (events.length) { - log(`Processing events batch from index ${events[0].index} to ${events[0].index + events.length - 1}`); - } - - console.time('time:index-block#processEvents-processing_events_batch'); - - for (const event of events) { - // Process events in loop - await processEvent(indexer, blockProgress, event); - } - - console.timeEnd('time:index-block#processEvents-processing_events_batch'); - } - } - } + await indexBlock(indexer, jobQueueConfig.eventsInBatch, argv); await db.close(); }; @@ -128,57 +61,3 @@ main().catch(err => { }).finally(() => { process.exit(0); }); - -/** - * Process individual event from database. - * @param indexer - * @param block - * @param event - */ -const processEvent = async (indexer: Indexer, block: BlockProgress, event: Event) => { - const eventIndex = event.index; - - // Check that events are processed in order. - if (eventIndex <= block.lastProcessedEventIndex) { - throw new Error(`Events received out of order for block number ${block.blockNumber} hash ${block.blockHash}, got event index ${eventIndex} and lastProcessedEventIndex ${block.lastProcessedEventIndex}, aborting`); - } - - // Check if previous event in block has been processed exactly before this and abort if not. - // Skip check if logs fetched are filtered by contract address. - if (!indexer.serverConfig.filterLogs) { - const prevIndex = eventIndex - 1; - - if (prevIndex !== block.lastProcessedEventIndex) { - throw new Error(`Events received out of order for block number ${block.blockNumber} hash ${block.blockHash},` + - ` prev event index ${prevIndex}, got event index ${event.index} and lastProcessedEventIndex ${block.lastProcessedEventIndex}, aborting`); - } - } - - let watchedContract; - - if (!indexer.isWatchedContract) { - watchedContract = true; - } else { - watchedContract = await indexer.isWatchedContract(event.contract); - } - - if (watchedContract) { - // We might not have parsed this event yet. This can happen if the contract was added - // as a result of a previous event in the same block. - if (event.eventName === UNKNOWN_EVENT_NAME) { - const logObj = JSON.parse(event.extraInfo); - - assert(indexer.parseEventNameAndArgs); - assert(typeof watchedContract !== 'boolean'); - const { eventName, eventInfo } = indexer.parseEventNameAndArgs(watchedContract.kind, logObj); - - event.eventName = eventName; - event.eventInfo = JSON.stringify(eventInfo); - event = await indexer.saveEventEntity(event); - } - - await indexer.processEvent(event); - } - - block = await indexer.updateBlockProgress(block, event.index); -}; diff --git a/packages/mobymask-watcher/src/cli/reset-cmds/state.ts b/packages/mobymask-watcher/src/cli/reset-cmds/state.ts index bf695b67..c0c29be4 100644 --- a/packages/mobymask-watcher/src/cli/reset-cmds/state.ts +++ b/packages/mobymask-watcher/src/cli/reset-cmds/state.ts @@ -12,7 +12,6 @@ import { Database } from '../../database'; import { Indexer } from '../../indexer'; import { BlockProgress } from '../../entity/BlockProgress'; -import { DomainHash } from '../../entity/DomainHash'; import { MultiNonce } from '../../entity/MultiNonce'; import { _Owner } from '../../entity/_Owner'; import { IsRevoked } from '../../entity/IsRevoked'; @@ -60,7 +59,7 @@ export const handler = async (argv: any): Promise => { const dbTx = await db.createTransactionRunner(); try { - const entities = [BlockProgress, DomainHash, MultiNonce, _Owner, IsRevoked, IsPhisher, IsMember]; + const entities = [BlockProgress, MultiNonce, _Owner, IsRevoked, IsPhisher, IsMember]; const removeEntitiesPromise = entities.map(async entityClass => { return db.removeEntities(dbTx, entityClass, { blockNumber: MoreThan(argv.blockNumber) }); diff --git a/packages/mobymask-watcher/src/client.ts b/packages/mobymask-watcher/src/client.ts index 1d0ecdeb..c747c9bb 100644 --- a/packages/mobymask-watcher/src/client.ts +++ b/packages/mobymask-watcher/src/client.ts @@ -17,15 +17,6 @@ export class Client { this._client = new GraphQLClient(config); } - async getDomainHash (blockHash: string, contractAddress: string): Promise { - const { domainHash } = await this._client.query( - gql(queries.domainHash), - { blockHash, contractAddress } - ); - - return domainHash; - } - async getMultiNonce (blockHash: string, contractAddress: string, key0: string, key1: bigint): Promise { const { multiNonce } = await this._client.query( gql(queries.multiNonce), diff --git a/packages/mobymask-watcher/src/database.ts b/packages/mobymask-watcher/src/database.ts index 5144a4a2..2309093b 100644 --- a/packages/mobymask-watcher/src/database.ts +++ b/packages/mobymask-watcher/src/database.ts @@ -14,7 +14,6 @@ import { SyncStatus } from './entity/SyncStatus'; import { IpldStatus } from './entity/IpldStatus'; import { BlockProgress } from './entity/BlockProgress'; import { IPLDBlock } from './entity/IPLDBlock'; -import { DomainHash } from './entity/DomainHash'; import { MultiNonce } from './entity/MultiNonce'; import { _Owner } from './entity/_Owner'; import { IsRevoked } from './entity/IsRevoked'; @@ -48,14 +47,6 @@ export class Database implements IPLDDatabaseInterface { return this._baseDatabase.close(); } - async getDomainHash ({ blockHash, contractAddress }: { blockHash: string, contractAddress: string }): Promise { - return this._conn.getRepository(DomainHash) - .findOne({ - blockHash, - contractAddress - }); - } - async getMultiNonce ({ blockHash, contractAddress, key0, key1 }: { blockHash: string, contractAddress: string, key0: string, key1: bigint }): Promise { return this._conn.getRepository(MultiNonce) .findOne({ @@ -111,12 +102,6 @@ export class Database implements IPLDDatabaseInterface { }); } - async saveDomainHash ({ blockHash, blockNumber, contractAddress, value, proof }: DeepPartial): Promise { - const repo = this._conn.getRepository(DomainHash); - const entity = repo.create({ blockHash, blockNumber, contractAddress, value, proof }); - return repo.save(entity); - } - async saveMultiNonce ({ blockHash, blockNumber, contractAddress, key0, key1, value, proof }: DeepPartial): Promise { const repo = this._conn.getRepository(MultiNonce); const entity = repo.create({ blockHash, blockNumber, contractAddress, key0, key1, value, proof }); @@ -332,7 +317,6 @@ export class Database implements IPLDDatabaseInterface { } _setPropColMaps (): void { - this._propColMaps.DomainHash = this._getPropertyColumnMapForEntity('DomainHash'); this._propColMaps.MultiNonce = this._getPropertyColumnMapForEntity('MultiNonce'); this._propColMaps._Owner = this._getPropertyColumnMapForEntity('_Owner'); this._propColMaps.IsRevoked = this._getPropertyColumnMapForEntity('IsRevoked'); diff --git a/packages/mobymask-watcher/src/entity/DomainHash.ts b/packages/mobymask-watcher/src/entity/DomainHash.ts deleted file mode 100644 index c04fdb45..00000000 --- a/packages/mobymask-watcher/src/entity/DomainHash.ts +++ /dev/null @@ -1,27 +0,0 @@ -// -// Copyright 2021 Vulcanize, Inc. -// - -import { Entity, PrimaryGeneratedColumn, Column, Index } from 'typeorm'; - -@Entity() -@Index(['blockHash', 'contractAddress'], { unique: true }) -export class DomainHash { - @PrimaryGeneratedColumn() - id!: number; - - @Column('varchar', { length: 66 }) - blockHash!: string; - - @Column('integer') - blockNumber!: number; - - @Column('varchar', { length: 42 }) - contractAddress!: string; - - @Column('varchar') - value!: string; - - @Column('text', { nullable: true }) - proof!: string; -} diff --git a/packages/mobymask-watcher/src/gql/queries/domainHash.gql b/packages/mobymask-watcher/src/gql/queries/domainHash.gql deleted file mode 100644 index 99826420..00000000 --- a/packages/mobymask-watcher/src/gql/queries/domainHash.gql +++ /dev/null @@ -1,8 +0,0 @@ -query domainHash($blockHash: String!, $contractAddress: String!){ - domainHash(blockHash: $blockHash, contractAddress: $contractAddress){ - value - proof{ - data - } - } -} \ No newline at end of file diff --git a/packages/mobymask-watcher/src/gql/queries/index.ts b/packages/mobymask-watcher/src/gql/queries/index.ts index 389f74c5..bdcdde02 100644 --- a/packages/mobymask-watcher/src/gql/queries/index.ts +++ b/packages/mobymask-watcher/src/gql/queries/index.ts @@ -3,7 +3,6 @@ import path from 'path'; export const events = fs.readFileSync(path.join(__dirname, 'events.gql'), 'utf8'); export const eventsInRange = fs.readFileSync(path.join(__dirname, 'eventsInRange.gql'), 'utf8'); -export const domainHash = fs.readFileSync(path.join(__dirname, 'domainHash.gql'), 'utf8'); export const multiNonce = fs.readFileSync(path.join(__dirname, 'multiNonce.gql'), 'utf8'); export const _owner = fs.readFileSync(path.join(__dirname, '_owner.gql'), 'utf8'); export const isRevoked = fs.readFileSync(path.join(__dirname, 'isRevoked.gql'), 'utf8'); diff --git a/packages/mobymask-watcher/src/indexer.ts b/packages/mobymask-watcher/src/indexer.ts index 608f8031..0f42c295 100644 --- a/packages/mobymask-watcher/src/indexer.ts +++ b/packages/mobymask-watcher/src/indexer.ts @@ -45,7 +45,6 @@ import { IsPhisher } from './entity/IsPhisher'; import { IsRevoked } from './entity/IsRevoked'; import { _Owner } from './entity/_Owner'; import { MultiNonce } from './entity/MultiNonce'; -import { DomainHash } from './entity/DomainHash'; const log = debug('vulcanize:indexer'); @@ -56,8 +55,6 @@ const MEMBERSTATUSUPDATED_EVENT = 'MemberStatusUpdated'; const OWNERSHIPTRANSFERRED_EVENT = 'OwnershipTransferred'; const PHISHERSTATUSUPDATED_EVENT = 'PhisherStatusUpdated'; -const MAX_EVENTS_BLOCK_RANGE = -1; - export type ResultEvent = { block: { cid: string; @@ -109,9 +106,6 @@ export class Indexer implements IPLDIndexerInterface { _ipfsClient: IPFSClient - _entityTypesMap: Map - _relationsMap: Map - constructor (serverConfig: ServerConfig, db: Database, ethClient: EthClient, ethProvider: JsonRpcProvider, jobQueue: JobQueue) { assert(db); assert(ethClient); @@ -137,13 +131,9 @@ export class Indexer implements IPLDIndexerInterface { assert(PhisherRegistryStorageLayout); this._storageLayoutMap.set(KIND_PHISHERREGISTRY, PhisherRegistryStorageLayout); this._contractMap.set(KIND_PHISHERREGISTRY, new ethers.utils.Interface(PhisherRegistryABI)); - - this._entityTypesMap = new Map(); - - this._relationsMap = new Map(); } - get serverConfig () { + get serverConfig (): ServerConfig { return this._serverConfig; } @@ -207,37 +197,6 @@ export class Indexer implements IPLDIndexerInterface { }; } - async domainHash (blockHash: string, contractAddress: string, diff = false): Promise { - let entity = await this._db.getDomainHash({ blockHash, contractAddress }); - - if (entity) { - log('domainHash: db hit.'); - } else { - log('domainHash: db miss, fetching from upstream server'); - - entity = await this._getStorageEntity( - blockHash, - contractAddress, - DomainHash, - 'domainHash', - {}, - '' - ); - - await this._db.saveDomainHash(entity); - - if (diff) { - const stateUpdate = updateStateForElementaryType({}, 'domainHash', entity.value.toString()); - await this.createDiffStaged(contractAddress, blockHash, stateUpdate); - } - } - - return { - value: entity.value, - proof: JSON.parse(entity.proof) - }; - } - async multiNonce (blockHash: string, contractAddress: string, key0: string, key1: bigint, diff = false): Promise { let entity = await this._db.getMultiNonce({ blockHash, contractAddress, key0, key1 }); @@ -740,7 +699,7 @@ export class Indexer implements IPLDIndexerInterface { } async getEventsInRange (fromBlockNumber: number, toBlockNumber: number): Promise> { - return this._baseIndexer.getEventsInRange(fromBlockNumber, toBlockNumber, MAX_EVENTS_BLOCK_RANGE); + return this._baseIndexer.getEventsInRange(fromBlockNumber, toBlockNumber, this._serverConfig.maxEventsBlockRange); } async getSyncStatus (): Promise { @@ -820,49 +779,48 @@ export class Indexer implements IPLDIndexerInterface { return this._contractMap.get(kind); } - getEntityTypesMap (): Map { - return this._entityTypesMap; - } - async _fetchAndSaveEvents ({ cid: blockCid, blockHash }: DeepPartial): Promise { assert(blockHash); - let block: any, logs: any[]; + const transactionsPromise = this._ethClient.getBlockWithTransactions({ blockHash }); + const blockPromise = this._ethClient.getBlockByHash(blockHash); + let logs: any[]; if (this._serverConfig.filterLogs) { const watchedContracts = this._baseIndexer.getWatchedContracts(); // TODO: Query logs by multiple contracts. - const contractlogsWithBlockPromises = watchedContracts.map((watchedContract): Promise => this._ethClient.getLogs({ + const contractlogsPromises = watchedContracts.map((watchedContract): Promise => this._ethClient.getLogs({ blockHash, contract: watchedContract.address })); - const contractlogsWithBlock = await Promise.all(contractlogsWithBlockPromises); + const contractlogs = await Promise.all(contractlogsPromises); // Flatten logs by contract and sort by index. - logs = contractlogsWithBlock.map(data => { + logs = contractlogs.map(data => { return data.logs; }).flat() .sort((a, b) => { return a.index - b.index; }); - - ({ block } = await this._ethClient.getBlockByHash(blockHash)); } else { - ({ block, logs } = await this._ethClient.getLogs({ blockHash })); + ({ logs } = await this._ethClient.getLogs({ blockHash })); } - const { - allEthHeaderCids: { - nodes: [ - { - ethTransactionCidsByHeaderId: { - nodes: transactions + let [ + { block }, + { + allEthHeaderCids: { + nodes: [ + { + ethTransactionCidsByHeaderId: { + nodes: transactions + } } - } - ] + ] + } } - } = await this._ethClient.getBlockWithTransactions({ blockHash }); + ] = await Promise.all([blockPromise, transactionsPromise]); const transactionMap = transactions.reduce((acc: {[key: string]: any}, transaction: {[key: string]: any}) => { acc[transaction.txHash] = transaction; diff --git a/packages/mobymask-watcher/src/resolvers.ts b/packages/mobymask-watcher/src/resolvers.ts index 2b0a354b..102f6775 100644 --- a/packages/mobymask-watcher/src/resolvers.ts +++ b/packages/mobymask-watcher/src/resolvers.ts @@ -8,7 +8,7 @@ import debug from 'debug'; import Decimal from 'decimal.js'; import { GraphQLScalarType } from 'graphql'; -import { ValueResult, BlockHeight, StateKind } from '@vulcanize/util'; +import { ValueResult, StateKind } from '@vulcanize/util'; import { Indexer } from './indexer'; import { EventWatcher } from './events'; @@ -58,11 +58,6 @@ export const createResolvers = async (indexer: Indexer, eventWatcher: EventWatch }, Query: { - domainHash: (_: any, { blockHash, contractAddress }: { blockHash: string, contractAddress: string }): Promise => { - log('domainHash', blockHash, contractAddress); - return indexer.domainHash(blockHash, contractAddress); - }, - multiNonce: (_: any, { blockHash, contractAddress, key0, key1 }: { blockHash: string, contractAddress: string, key0: string, key1: bigint }): Promise => { log('multiNonce', blockHash, contractAddress, key0, key1); return indexer.multiNonce(blockHash, contractAddress, key0, key1); diff --git a/packages/mobymask-watcher/src/schema.gql b/packages/mobymask-watcher/src/schema.gql index 4906a73f..3afa168a 100644 --- a/packages/mobymask-watcher/src/schema.gql +++ b/packages/mobymask-watcher/src/schema.gql @@ -90,7 +90,6 @@ type ResultIPLDBlock { type Query { events(blockHash: String!, contractAddress: String!, name: String): [ResultEvent!] eventsInRange(fromBlockNumber: Int!, toBlockNumber: Int!): [ResultEvent!] - domainHash(blockHash: String!, contractAddress: String!): ResultString! multiNonce(blockHash: String!, contractAddress: String!, key0: String!, key1: BigInt!): ResultBigInt! _owner(blockHash: String!, contractAddress: String!): ResultString! isRevoked(blockHash: String!, contractAddress: String!, key0: String!): ResultBoolean! diff --git a/packages/uni-info-watcher/src/indexer.ts b/packages/uni-info-watcher/src/indexer.ts index 75e6c05c..23e90bd4 100644 --- a/packages/uni-info-watcher/src/indexer.ts +++ b/packages/uni-info-watcher/src/indexer.ts @@ -63,7 +63,7 @@ export class Indexer implements IndexerInterface { this._isDemo = serverConfig.mode === 'demo'; } - get serverConfig () { + get serverConfig (): ServerConfig { return this._serverConfig; } diff --git a/packages/uni-info-watcher/src/server.ts b/packages/uni-info-watcher/src/server.ts index 81676a80..dcdcc267 100644 --- a/packages/uni-info-watcher/src/server.ts +++ b/packages/uni-info-watcher/src/server.ts @@ -40,7 +40,7 @@ export const main = async (): Promise => { const config: Config = await getConfig(argv.f); const { ethClient } = await initClients(config); - const { host, port, mode } = config.server; + const { host, port } = config.server; const db = new Database(config.database); await db.init(); diff --git a/packages/uni-watcher/src/indexer.ts b/packages/uni-watcher/src/indexer.ts index 3fb57b86..cc1222ed 100644 --- a/packages/uni-watcher/src/indexer.ts +++ b/packages/uni-watcher/src/indexer.ts @@ -3,7 +3,7 @@ // import debug from 'debug'; -import { DeepPartial, FindConditions, FindManyOptions, QueryRunner, Server } from 'typeorm'; +import { DeepPartial, FindConditions, FindManyOptions, QueryRunner } from 'typeorm'; import JSONbig from 'json-bigint'; import { ethers } from 'ethers'; import assert from 'assert'; @@ -58,7 +58,7 @@ export class Indexer implements IndexerInterface { this._nfpmContract = new ethers.utils.Interface(nfpmABI); } - get serverConfig () { + get serverConfig (): ServerConfig { return this._serverConfig; } @@ -435,12 +435,34 @@ export class Indexer implements IndexerInterface { async _fetchAndSaveEvents ({ cid: blockCid, blockHash }: DeepPartial): Promise { assert(blockHash); - - const logsPromise = this._ethClient.getLogs({ blockHash }); const transactionsPromise = this._ethClient.getBlockWithTransactions({ blockHash }); + const blockPromise = this._ethClient.getBlockByHash(blockHash); + let logs: any[]; + + if (this._serverConfig.filterLogs) { + const watchedContracts = this._baseIndexer.getWatchedContracts(); + + // TODO: Query logs by multiple contracts. + const contractlogsPromises = watchedContracts.map((watchedContract): Promise => this._ethClient.getLogs({ + blockHash, + contract: watchedContract.address + })); + + const contractlogs = await Promise.all(contractlogsPromises); + + // Flatten logs by contract and sort by index. + logs = contractlogs.map(data => { + return data.logs; + }).flat() + .sort((a, b) => { + return a.index - b.index; + }); + } else { + ({ logs } = await this._ethClient.getLogs({ blockHash })); + } let [ - { block, logs }, + { block }, { allEthHeaderCids: { nodes: [ @@ -452,7 +474,7 @@ export class Indexer implements IndexerInterface { ] } } - ] = await Promise.all([logsPromise, transactionsPromise]); + ] = await Promise.all([blockPromise, transactionsPromise]); const transactionMap = transactions.reduce((acc: {[key: string]: any}, transaction: {[key: string]: any}) => { acc[transaction.txHash] = transaction; diff --git a/packages/util/index.ts b/packages/util/index.ts index 4d567d40..64bf288a 100644 --- a/packages/util/index.ts +++ b/packages/util/index.ts @@ -17,3 +17,4 @@ export * from './src/graph-decimal'; export * from './src/ipld-indexer'; export * from './src/ipld-database'; export * from './src/ipfs'; +export * from './src/index-block'; diff --git a/packages/util/src/common.ts b/packages/util/src/common.ts index e126a769..2b601595 100644 --- a/packages/util/src/common.ts +++ b/packages/util/src/common.ts @@ -1,9 +1,13 @@ import debug from 'debug'; +import assert from 'assert'; -import { JOB_KIND_PRUNE, QUEUE_BLOCK_PROCESSING, JOB_KIND_INDEX } from './constants'; +import { JOB_KIND_PRUNE, QUEUE_BLOCK_PROCESSING, JOB_KIND_INDEX, UNKNOWN_EVENT_NAME } from './constants'; import { JobQueue } from './job-queue'; -import { IndexerInterface } from './types'; +import { BlockProgressInterface, IndexerInterface } from './types'; import { wait } from './misc'; +import { OrderDirection } from './database'; + +const DEFAULT_EVENTS_IN_BATCH = 50; const log = debug('vulcanize:common'); @@ -98,3 +102,93 @@ export const processBlockByNumber = async ( await wait(blockDelayInMilliSecs); } }; + +/** + * Process events in batches for a block. + * @param indexer + * @param block + * @param eventsInBatch + */ +export const processBatchEvents = async (indexer: IndexerInterface, block: BlockProgressInterface, eventsInBatch: number): Promise => { + // Check if block processing is complete. + while (!block.isComplete) { + console.time('time:common#processBacthEvents-fetching_events_batch'); + + // Fetch events in batches + const events = await indexer.getBlockEvents( + block.blockHash, + { + index: [ + { value: block.lastProcessedEventIndex + 1, operator: 'gte', not: false } + ] + }, + { + limit: eventsInBatch || DEFAULT_EVENTS_IN_BATCH, + orderBy: 'index', + orderDirection: OrderDirection.asc + } + ); + + console.timeEnd('time:common#processBacthEvents-fetching_events_batch'); + + if (events.length) { + log(`Processing events batch from index ${events[0].index} to ${events[0].index + events.length - 1}`); + } + + console.time('time:common#processBacthEvents-processing_events_batch'); + + for (let event of events) { + // Process events in loop + + const eventIndex = event.index; + // log(`Processing event ${event.id} index ${eventIndex}`); + + // Check that events are processed in order. + if (eventIndex <= block.lastProcessedEventIndex) { + throw new Error(`Events received out of order for block number ${block.blockNumber} hash ${block.blockHash}, got event index ${eventIndex} and lastProcessedEventIndex ${block.lastProcessedEventIndex}, aborting`); + } + + // Check if previous event in block has been processed exactly before this and abort if not. + // Skip check if logs fetched are filtered by contract address. + if (!indexer.serverConfig.filterLogs) { + const prevIndex = eventIndex - 1; + + if (prevIndex !== block.lastProcessedEventIndex) { + throw new Error(`Events received out of order for block number ${block.blockNumber} hash ${block.blockHash},` + + ` prev event index ${prevIndex}, got event index ${event.index} and lastProcessedEventIndex ${block.lastProcessedEventIndex}, aborting`); + } + } + + let watchedContract; + + if (!indexer.isWatchedContract) { + // uni-info-watcher indexer doesn't have watched contracts implementation. + watchedContract = true; + } else { + watchedContract = await indexer.isWatchedContract(event.contract); + } + + if (watchedContract) { + // We might not have parsed this event yet. This can happen if the contract was added + // as a result of a previous event in the same block. + if (event.eventName === UNKNOWN_EVENT_NAME) { + const logObj = JSON.parse(event.extraInfo); + + assert(indexer.parseEventNameAndArgs); + assert(typeof watchedContract !== 'boolean'); + const { eventName, eventInfo } = indexer.parseEventNameAndArgs(watchedContract.kind, logObj); + + event.eventName = eventName; + event.eventInfo = JSON.stringify(eventInfo); + event = await indexer.saveEventEntity(event); + } + + await indexer.processEvent(event); + } + + block = await indexer.updateBlockProgress(block, event.index); + } + + console.timeEnd('time:common#processBacthEvents-processing_events_batch'); + } +}; diff --git a/packages/util/src/config.ts b/packages/util/src/config.ts index 552503ec..e3ee9b1f 100644 --- a/packages/util/src/config.ts +++ b/packages/util/src/config.ts @@ -11,7 +11,7 @@ import { ConnectionOptions } from 'typeorm'; import { Config as CacheConfig, getCache } from '@vulcanize/cache'; import { EthClient } from '@vulcanize/ipld-eth-client'; -import { BaseProvider, JsonRpcProvider } from '@ethersproject/providers'; +import { JsonRpcProvider } from '@ethersproject/providers'; import { getCustomProvider } from './misc'; @@ -35,6 +35,7 @@ export interface ServerConfig { subgraphPath: string; wasmRestartBlocksInterval: number; filterLogs: boolean; + maxEventsBlockRange: number; } export interface UpstreamConfig { diff --git a/packages/util/src/index-block.ts b/packages/util/src/index-block.ts new file mode 100644 index 00000000..de6941cc --- /dev/null +++ b/packages/util/src/index-block.ts @@ -0,0 +1,49 @@ +// +// Copyright 2022 Vulcanize, Inc. +// + +import assert from 'assert'; + +import { BlockProgressInterface, IndexerInterface } from './types'; +import { processBatchEvents } from './common'; + +export const indexBlock = async ( + indexer: IndexerInterface, + eventsInBatch: number, + argv: { + block: number, + } +): Promise => { + let blockProgressEntities: Partial[] = await indexer.getBlocksAtHeight(argv.block, false); + + if (!blockProgressEntities.length) { + console.time('time:index-block#getBlocks-ipld-eth-server'); + const blocks = await indexer.getBlocks({ blockNumber: argv.block }); + + blockProgressEntities = blocks.map((block: any): Partial => { + block.blockTimestamp = block.timestamp; + + return block; + }); + + console.timeEnd('time:index-block#getBlocks-ipld-eth-server'); + } + + assert(blockProgressEntities.length, `No blocks fetched for block number ${argv.block}.`); + + for (const partialblockProgress of blockProgressEntities) { + let blockProgress: BlockProgressInterface; + + // Check if blockProgress fetched from database. + if (!partialblockProgress.id) { + blockProgress = await indexer.fetchBlockEvents(partialblockProgress); + } else { + blockProgress = partialblockProgress as BlockProgressInterface; + } + + assert(indexer.processBlock); + await indexer.processBlock(blockProgress.blockHash, blockProgress.blockNumber); + + await processBatchEvents(indexer, blockProgress, eventsInBatch); + } +}; diff --git a/packages/util/src/job-runner.ts b/packages/util/src/job-runner.ts index 3c5356fc..8403610a 100644 --- a/packages/util/src/job-runner.ts +++ b/packages/util/src/job-runner.ts @@ -13,17 +13,13 @@ import { JOB_KIND_EVENTS, JOB_KIND_CONTRACT, MAX_REORG_DEPTH, - UNKNOWN_EVENT_NAME, QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING } from './constants'; import { JobQueue } from './job-queue'; import { EventInterface, IndexerInterface, IPLDIndexerInterface, SyncStatusInterface } from './types'; import { wait } from './misc'; -import { createPruningJob } from './common'; -import { OrderDirection } from './database'; - -const DEFAULT_EVENTS_IN_BATCH = 50; +import { createPruningJob, processBatchEvents } from './common'; const log = debug('vulcanize:job-runner'); @@ -241,92 +237,13 @@ export class JobRunner { const { blockHash } = job.data; console.time('time:job-runner#_processEvents-get-block-progress'); - let block = await this._indexer.getBlockProgress(blockHash); + const block = await this._indexer.getBlockProgress(blockHash); console.timeEnd('time:job-runner#_processEvents-get-block-progress'); assert(block); console.time('time:job-runner#_processEvents-events'); - while (!block.isComplete) { - console.time('time:job-runner#_processEvents-fetching_events_batch'); - - // Fetch events in batches - const events: EventInterface[] = await this._indexer.getBlockEvents( - blockHash, - { - index: [ - { value: block.lastProcessedEventIndex + 1, operator: 'gte', not: false } - ] - }, - { - limit: this._jobQueueConfig.eventsInBatch || DEFAULT_EVENTS_IN_BATCH, - orderBy: 'index', - orderDirection: OrderDirection.asc - } - ); - - console.timeEnd('time:job-runner#_processEvents-fetching_events_batch'); - - if (events.length) { - log(`Processing events batch from index ${events[0].index} to ${events[0].index + events.length - 1}`); - } - - console.time('time:job-runner#_processEvents-processing_events_batch'); - - for (let event of events) { - // Process events in loop - - const eventIndex = event.index; - // log(`Processing event ${event.id} index ${eventIndex}`); - - // Check that events are processed in order. - if (eventIndex <= block.lastProcessedEventIndex) { - throw new Error(`Events received out of order for block number ${block.blockNumber} hash ${block.blockHash}, got event index ${eventIndex} and lastProcessedEventIndex ${block.lastProcessedEventIndex}, aborting`); - } - - // Check if previous event in block has been processed exactly before this and abort if not. - // Skip check if logs fetched are filtered by contract address. - if (!this._indexer.serverConfig.filterLogs) { - const prevIndex = eventIndex - 1; - - if (prevIndex !== block.lastProcessedEventIndex) { - throw new Error(`Events received out of order for block number ${block.blockNumber} hash ${block.blockHash},` + - ` prev event index ${prevIndex}, got event index ${event.index} and lastProcessedEventIndex ${block.lastProcessedEventIndex}, aborting`); - } - } - - let watchedContract; - - if (!this._indexer.isWatchedContract) { - // uni-info-watcher indexer doesn't have watched contracts implementation. - watchedContract = true; - } else { - watchedContract = await this._indexer.isWatchedContract(event.contract); - } - - if (watchedContract) { - // We might not have parsed this event yet. This can happen if the contract was added - // as a result of a previous event in the same block. - if (event.eventName === UNKNOWN_EVENT_NAME) { - const logObj = JSON.parse(event.extraInfo); - - assert(this._indexer.parseEventNameAndArgs); - assert(typeof watchedContract !== 'boolean'); - const { eventName, eventInfo } = this._indexer.parseEventNameAndArgs(watchedContract.kind, logObj); - - event.eventName = eventName; - event.eventInfo = JSON.stringify(eventInfo); - event = await this._indexer.saveEventEntity(event); - } - - await this._indexer.processEvent(event); - } - - block = await this._indexer.updateBlockProgress(block, event.index); - } - - console.timeEnd('time:job-runner#_processEvents-processing_events_batch'); - } + await processBatchEvents(this._indexer, block, this._jobQueueConfig.eventsInBatch); console.timeEnd('time:job-runner#_processEvents-events'); }