diff --git a/packages/codegen/src/data/entities/BlockProgress.yaml b/packages/codegen/src/data/entities/BlockProgress.yaml index 8990c146..8ee7ffb0 100644 --- a/packages/codegen/src/data/entities/BlockProgress.yaml +++ b/packages/codegen/src/data/entities/BlockProgress.yaml @@ -61,12 +61,16 @@ columns: columnOptions: - option: default value: false + - name: createdAt + tsType: Date + columnType: CreateDateColumn imports: - toImport: - Entity - PrimaryGeneratedColumn - Column - Index + - CreateDateColumn from: typeorm - toImport: - BlockProgressInterface diff --git a/packages/codegen/src/templates/config-template.handlebars b/packages/codegen/src/templates/config-template.handlebars index e160a141..4d5622a7 100644 --- a/packages/codegen/src/templates/config-template.handlebars +++ b/packages/codegen/src/templates/config-template.handlebars @@ -11,7 +11,7 @@ # IPFS API address (can be taken from the output on running the IPFS daemon). ipfsApiAddr = "/ip4/127.0.0.1/tcp/5001" - + {{#if subgraphPath}} subgraphPath = "{{subgraphPath}}" {{/if}} @@ -42,3 +42,4 @@ dbConnectionString = "postgres://postgres:postgres@localhost/{{folderName}}-job-queue" maxCompletionLagInSecs = 300 jobDelayInMilliSecs = 100 + eventsInBatch = 50 diff --git a/packages/codegen/src/templates/database-template.handlebars b/packages/codegen/src/templates/database-template.handlebars index 26db6bd8..954e4af8 100644 --- a/packages/codegen/src/templates/database-template.handlebars +++ b/packages/codegen/src/templates/database-template.handlebars @@ -230,9 +230,10 @@ export class Database implements DatabaseInterface { return repo.save(entity); } - async getContracts (where: FindConditions): Promise { + async getContracts (): Promise { const repo = this._conn.getRepository(Contract); - return repo.find({ where }); + + return this._baseDatabase.getContracts(repo); } async getContract (address: string): Promise { @@ -265,7 +266,6 @@ export class Database implements DatabaseInterface { async getBlockEvents (blockHash: string, where: Where, queryOptions: QueryOptions): Promise { const repo = this._conn.getRepository(Event); - return this._baseDatabase.getBlockEvents(repo, blockHash, options); return this._baseDatabase.getBlockEvents(repo, blockHash, where, queryOptions); } @@ -276,12 +276,10 @@ export class Database implements DatabaseInterface { return this._baseDatabase.saveEvents(blockRepo, eventRepo, block, events); } - async saveContract (address: string, kind: string, checkpoint: boolean, startingBlock: number): Promise { - await this._conn.transaction(async (tx) => { - const repo = tx.getRepository(Contract); + async saveContract (address: string, kind: string, checkpoint: boolean, startingBlock: number): Promise { + const repo = queryRunner.manager.getRepository(Contract); - return this._baseDatabase.saveContract(repo, address, kind, checkpoint, startingBlock); - }); + return this._baseDatabase.saveContract(repo, address, kind, checkpoint, startingBlock); } async updateSyncStatusIndexedBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number, force = false): Promise { @@ -296,10 +294,10 @@ export class Database implements DatabaseInterface { return this._baseDatabase.updateSyncStatusCanonicalBlock(repo, blockHash, blockNumber, force); } - async updateSyncStatusChainHead (queryRunner: QueryRunner, blockHash: string, blockNumber: number): Promise { + async updateSyncStatusChainHead (queryRunner: QueryRunner, blockHash: string, blockNumber: number, force = false): Promise { const repo = queryRunner.manager.getRepository(SyncStatus); - return this._baseDatabase.updateSyncStatusChainHead(repo, blockHash, blockNumber); + return this._baseDatabase.updateSyncStatusChainHead(repo, blockHash, blockNumber, force); } async getSyncStatus (queryRunner: QueryRunner): Promise { @@ -337,10 +335,10 @@ export class Database implements DatabaseInterface { return this._baseDatabase.getBlockProgressEntities(repo, where, options); } - async updateBlockProgress (queryRunner: QueryRunner, blockHash: string, lastProcessedEventIndex: number): Promise { + async updateBlockProgress (queryRunner: QueryRunner, block: BlockProgress, lastProcessedEventIndex: number): Promise { const repo = queryRunner.manager.getRepository(BlockProgress); - return this._baseDatabase.updateBlockProgress(repo, blockHash, lastProcessedEventIndex); + return this._baseDatabase.updateBlockProgress(repo, block, lastProcessedEventIndex); } async removeEntities (queryRunner: QueryRunner, entity: new () => Entity, findConditions?: FindManyOptions | FindConditions): Promise { diff --git a/packages/codegen/src/templates/fill-template.handlebars b/packages/codegen/src/templates/fill-template.handlebars index 1c6b6ee6..e015ceb9 100644 --- a/packages/codegen/src/templates/fill-template.handlebars +++ b/packages/codegen/src/templates/fill-template.handlebars @@ -39,6 +39,16 @@ export const main = async (): Promise => { type: 'number', demandOption: true, describe: 'Block number to stop processing at' + }, + prefetch: { + type: 'boolean', + default: false, + describe: 'Block and events prefetch mode' + }, + batchBlocks: { + type: 'number', + default: 10, + describe: 'Number of blocks prefetched in batch' } }).argv; @@ -56,10 +66,6 @@ export const main = async (): Promise => { // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries const pubsub = new PubSub(); - const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, graphWatcher); - - graphWatcher.setIndexer(indexer); - await graphWatcher.init(); const jobQueueConfig = config.jobQueue; assert(jobQueueConfig, 'Missing job queue config'); @@ -70,9 +76,15 @@ export const main = async (): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); + const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher); + await indexer.init(); + + graphWatcher.setIndexer(indexer); + await graphWatcher.init(); + const eventWatcher = new EventWatcher(config.upstream, ethClient, postgraphileClient, indexer, pubsub, jobQueue); - await fillBlocks(jobQueue, indexer, postgraphileClient, eventWatcher, config.upstream.ethServer.blockDelayInMilliSecs, argv); + await fillBlocks(jobQueue, indexer, eventWatcher, config.upstream.ethServer.blockDelayInMilliSecs, argv); }; main().catch(err => { @@ -80,3 +92,8 @@ main().catch(err => { }).finally(() => { process.exit(); }); + +process.on('SIGINT', () => { + log(`Exiting process ${process.pid} with code 0`); + process.exit(0); +}); diff --git a/packages/codegen/src/templates/indexer-template.handlebars b/packages/codegen/src/templates/indexer-template.handlebars index b6bafee0..e9bc2d1d 100644 --- a/packages/codegen/src/templates/indexer-template.handlebars +++ b/packages/codegen/src/templates/indexer-template.handlebars @@ -4,7 +4,7 @@ import assert from 'assert'; import debug from 'debug'; -import { DeepPartial, FindConditions } from 'typeorm'; +import { DeepPartial, FindConditions, FindManyOptions } from 'typeorm'; import JSONbig from 'json-bigint'; import { ethers } from 'ethers'; import { sha256 } from 'multiformats/hashes/sha2'; @@ -16,7 +16,7 @@ import { BaseProvider } from '@ethersproject/providers'; import * as codec from '@ipld/dag-cbor'; import { EthClient } from '@vulcanize/ipld-eth-client'; import { StorageLayout } from '@vulcanize/solidity-mapper'; -import { Indexer as BaseIndexer, IndexerInterface, ValueResult, UNKNOWN_EVENT_NAME, ServerConfig, Where, QueryOptions, updateStateForElementaryType, updateStateForMappingType } from '@vulcanize/util'; +import { Indexer as BaseIndexer, IndexerInterface, ValueResult, UNKNOWN_EVENT_NAME, ServerConfig, Where, QueryOptions, updateStateForElementaryType, updateStateForMappingType, JobQueue } from '@vulcanize/util'; import { GraphWatcher } from '@vulcanize/graph-node'; import { Database } from './database'; @@ -93,7 +93,7 @@ export class Indexer implements IndexerInterface { _ipfsClient: IPFSClient - constructor (serverConfig: ServerConfig, db: Database, ethClient: EthClient, postgraphileClient: EthClient, ethProvider: BaseProvider, graphWatcher: GraphWatcher) { + constructor (serverConfig: ServerConfig, db: Database, ethClient: EthClient, postgraphileClient: EthClient, ethProvider: BaseProvider, jobQueue: JobQueue, graphWatcher: GraphWatcher) { assert(db); assert(ethClient); assert(postgraphileClient); @@ -103,7 +103,7 @@ export class Indexer implements IndexerInterface { this._postgraphileClient = postgraphileClient; this._ethProvider = ethProvider; this._serverConfig = serverConfig; - this._baseIndexer = new BaseIndexer(this._db, this._ethClient, this._postgraphileClient, this._ethProvider); + this._baseIndexer = new BaseIndexer(this._db, this._ethClient, this._postgraphileClient, this._ethProvider, jobQueue); this._graphWatcher = graphWatcher; const { abi, storageLayout } = artifacts; @@ -119,6 +119,10 @@ export class Indexer implements IndexerInterface { this._ipfsClient = new IPFSClient(this._serverConfig.ipfsApiAddr); } + async init (): Promise { + await this._baseIndexer.fetchContracts(); + } + getResultEvent (event: Event): ResultEvent { const block = event.block; const eventFields = JSONbig.parse(event.eventInfo); @@ -615,22 +619,14 @@ export class Indexer implements IndexerInterface { }; } - async watchContract (address: string, kind: string, checkpoint: boolean, startingBlock?: number): Promise { - // Use the checksum address (https://docs.ethers.io/v5/api/utils/address/#utils-getAddress) if input to address is a contract address. - // If a contract identifier is passed as address instead, no need to convert to checksum address. - // Customize: use the kind input to filter out non-contract-address input to address. - const formattedAddress = (kind === '__protocol__') ? address : ethers.utils.getAddress(address); + async watchContract (address: string, kind: string, checkpoint: boolean, startingBlock: number): Promise { + this._baseIndexer.updateIPLDStatusMap(address, {}); - if (!startingBlock) { - const syncStatus = await this.getSyncStatus(); - assert(syncStatus); + return this._baseIndexer.watchContract(address, kind, checkpoint, startingBlock); + } - startingBlock = syncStatus.latestIndexedBlockNumber; - } - - await this._db.saveContract(formattedAddress, kind, checkpoint, startingBlock); - - return true; + cacheContract (contract: Contract): void { + return this._baseIndexer.cacheContract(contract); } async getHookStatus (): Promise { @@ -698,8 +694,8 @@ export class Indexer implements IndexerInterface { return this._baseIndexer.updateSyncStatusIndexedBlock(blockHash, blockNumber, force); } - async updateSyncStatusChainHead (blockHash: string, blockNumber: number): Promise { - return this._baseIndexer.updateSyncStatusChainHead(blockHash, blockNumber); + async updateSyncStatusChainHead (blockHash: string, blockNumber: number, force = false): Promise { + return this._baseIndexer.updateSyncStatusChainHead(blockHash, blockNumber, force); } async updateSyncStatusCanonicalBlock (blockHash: string, blockNumber: number, force = false): Promise { @@ -742,8 +738,8 @@ export class Indexer implements IndexerInterface { return this._baseIndexer.markBlocksAsPruned(blocks); } - async updateBlockProgress (blockHash: string, lastProcessedEventIndex: number): Promise { - return this._baseIndexer.updateBlockProgress(blockHash, lastProcessedEventIndex); + async updateBlockProgress (block: BlockProgress, lastProcessedEventIndex: number): Promise { + return this._baseIndexer.updateBlockProgress(block, lastProcessedEventIndex); } async getAncestorAtDepth (blockHash: string, depth: number): Promise { @@ -752,19 +748,24 @@ export class Indexer implements IndexerInterface { async _fetchAndSaveEvents ({ cid: blockCid, blockHash }: DeepPartial): Promise { assert(blockHash); - let { block, logs } = await this._ethClient.getLogs({ blockHash }); - const { - allEthHeaderCids: { - nodes: [ - { - ethTransactionCidsByHeaderId: { - nodes: transactions + const logsPromise = this._ethClient.getLogs({ blockHash }); + const transactionsPromise = this._postgraphileClient.getBlockWithTransactions({ blockHash }); + + let [ + { block, logs }, + { + allEthHeaderCids: { + nodes: [ + { + ethTransactionCidsByHeaderId: { + nodes: transactions + } } - } - ] + ] + } } - } = await this._postgraphileClient.getBlockWithTransactions({ blockHash }); + ] = await Promise.all([logsPromise, transactionsPromise]); const transactionMap = transactions.reduce((acc: {[key: string]: any}, transaction: {[key: string]: any}) => { acc[transaction.txHash] = transaction; diff --git a/packages/codegen/src/templates/job-runner-template.handlebars b/packages/codegen/src/templates/job-runner-template.handlebars index a0231d5d..5ab3a2f6 100644 --- a/packages/codegen/src/templates/job-runner-template.handlebars +++ b/packages/codegen/src/templates/job-runner-template.handlebars @@ -56,22 +56,12 @@ export class JobRunner { // TODO Call pre-block hook here (Directly or indirectly (Like done through indexer.processEvent for events)). await this._baseJobRunner.processBlock(job); - - await this._jobQueue.markComplete(job); }); } async subscribeEventProcessingQueue (): Promise { await this._jobQueue.subscribe(QUEUE_EVENT_PROCESSING, async (job) => { - const event = await this._baseJobRunner.processEvent(job); - - const watchedContract = await this._indexer.isWatchedContract(event.contract); - if (watchedContract) { - await this._indexer.processEvent(event); - } - - await this._indexer.updateBlockProgress(event.block.blockHash, event.index); - await this._jobQueue.markComplete(job); + await this._baseJobRunner.processEvent(job); }); } @@ -135,11 +125,6 @@ export const main = async (): Promise => { const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server.subgraphPath); - const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, graphWatcher); - - graphWatcher.setIndexer(indexer); - await graphWatcher.init(); - const jobQueueConfig = config.jobQueue; assert(jobQueueConfig, 'Missing job queue config'); @@ -149,6 +134,15 @@ export const main = async (): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); + const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher); + await indexer.init(); + + graphWatcher.setIndexer(indexer); + await graphWatcher.init(); + + // Watching all the contracts in the subgraph. + await graphWatcher.addContracts(); + const jobRunner = new JobRunner(jobQueueConfig, indexer, jobQueue); await jobRunner.start(); }; @@ -162,3 +156,8 @@ main().then(() => { process.on('uncaughtException', err => { log('uncaughtException', err); }); + +process.on('SIGINT', () => { + log(`Exiting process ${process.pid} with code 0`); + process.exit(0); +}); diff --git a/packages/codegen/src/templates/server-template.handlebars b/packages/codegen/src/templates/server-template.handlebars index cd277f19..5638a345 100644 --- a/packages/codegen/src/templates/server-template.handlebars +++ b/packages/codegen/src/templates/server-template.handlebars @@ -51,10 +51,6 @@ export const main = async (): Promise => { // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries const pubsub = new PubSub(); - const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, graphWatcher); - - graphWatcher.setIndexer(indexer); - await graphWatcher.init(); const jobQueueConfig = config.jobQueue; assert(jobQueueConfig, 'Missing job queue config'); @@ -64,6 +60,12 @@ export const main = async (): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); + const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher); + await indexer.init(); + + graphWatcher.setIndexer(indexer); + await graphWatcher.init(); + const eventWatcher = new EventWatcher(config.upstream, ethClient, postgraphileClient, indexer, pubsub, jobQueue); if (watcherKind === KIND_ACTIVE) { @@ -98,3 +100,8 @@ main().then(() => { }).catch(err => { log(err); }); + +process.on('SIGINT', () => { + log(`Exiting process ${process.pid} with code 0`); + process.exit(0); +}); diff --git a/packages/codegen/src/templates/watch-contract-template.handlebars b/packages/codegen/src/templates/watch-contract-template.handlebars index 1402d587..62e0addb 100644 --- a/packages/codegen/src/templates/watch-contract-template.handlebars +++ b/packages/codegen/src/templates/watch-contract-template.handlebars @@ -82,6 +82,8 @@ const main = async (): Promise => { await indexer.watchContract(argv.address, argv.kind, argv.checkpoint, argv.startingBlock); await db.close(); + await jobQueue.stop(); + process.exit(); }; main().catch(err => {