Implement changes in codegen for performance improvements (#84)

nikugogoi 2021-12-22 11:55:39 +05:30 committed by nabarun
parent 97b529f3f2
commit 1e7d84879b
8 changed files with 99 additions and 70 deletions

View File

@@ -61,12 +61,16 @@ columns:
     columnOptions:
       - option: default
         value: false
+  - name: createdAt
+    tsType: Date
+    columnType: CreateDateColumn
 imports:
   - toImport:
       - Entity
       - PrimaryGeneratedColumn
       - Column
       - Index
+      - CreateDateColumn
     from: typeorm
   - toImport:
       - BlockProgressInterface
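The new `createdAt` column maps to TypeORM's `CreateDateColumn` decorator, which the template now also imports. A minimal sketch of the property this kind of template entry would generate (the surrounding class is illustrative, not taken from this repository):

```ts
import { Entity, PrimaryGeneratedColumn, Column, CreateDateColumn } from 'typeorm';

@Entity()
export class ExampleEntity {
  @PrimaryGeneratedColumn()
  id!: number;

  // Corresponds to the existing `columnOptions` entry with `default: false`.
  @Column('boolean', { default: false })
  isComplete!: boolean;

  // Generated from the new template entry: timestamp set automatically on insert.
  @CreateDateColumn()
  createdAt!: Date;
}
```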

View File

@@ -11,7 +11,7 @@
 # IPFS API address (can be taken from the output on running the IPFS daemon).
 ipfsApiAddr = "/ip4/127.0.0.1/tcp/5001"
 {{#if subgraphPath}}
 subgraphPath = "{{subgraphPath}}"
 {{/if}}
@@ -42,3 +42,4 @@
 dbConnectionString = "postgres://postgres:postgres@localhost/{{folderName}}-job-queue"
 maxCompletionLagInSecs = 300
 jobDelayInMilliSecs = 100
+eventsInBatch = 50
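The new `eventsInBatch` setting presumably caps how many events are pulled from the database per query while a block's events are processed, instead of loading them all at once. A hedged sketch of that kind of batched fetch (function and repository shapes here are illustrative, not the watcher's actual API):

```ts
// Illustrative only: page through a block's events in chunks of `eventsInBatch`.
async function * fetchEventsInBatches (
  repo: { find (options: any): Promise<any[]> },
  blockHash: string,
  eventsInBatch = 50
): AsyncGenerator<any[]> {
  let skip = 0;

  while (true) {
    // One bounded query per iteration instead of a single unbounded one.
    const events = await repo.find({ where: { blockHash }, order: { index: 'ASC' }, skip, take: eventsInBatch });
    if (!events.length) break;

    yield events;
    skip += events.length;
  }
}
```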

View File

@@ -230,9 +230,10 @@ export class Database implements DatabaseInterface {
     return repo.save(entity);
   }

-  async getContracts (where: FindConditions<Contract>): Promise<Contract[]> {
+  async getContracts (): Promise<Contract[]> {
     const repo = this._conn.getRepository(Contract);
-    return repo.find({ where });
+
+    return this._baseDatabase.getContracts(repo);
   }

   async getContract (address: string): Promise<Contract | undefined> {
@@ -265,7 +266,6 @@ export class Database implements DatabaseInterface {
   async getBlockEvents (blockHash: string, where: Where, queryOptions: QueryOptions): Promise<Event[]> {
     const repo = this._conn.getRepository(Event);
-    return this._baseDatabase.getBlockEvents(repo, blockHash, options);

     return this._baseDatabase.getBlockEvents(repo, blockHash, where, queryOptions);
   }
@@ -276,12 +276,10 @@ export class Database implements DatabaseInterface {
     return this._baseDatabase.saveEvents(blockRepo, eventRepo, block, events);
   }

-  async saveContract (address: string, kind: string, checkpoint: boolean, startingBlock: number): Promise<void> {
-    await this._conn.transaction(async (tx) => {
-      const repo = tx.getRepository(Contract);
+  async saveContract (address: string, kind: string, checkpoint: boolean, startingBlock: number): Promise<Contract> {
+    const repo = queryRunner.manager.getRepository(Contract);

     return this._baseDatabase.saveContract(repo, address, kind, checkpoint, startingBlock);
-    });
   }

   async updateSyncStatusIndexedBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number, force = false): Promise<SyncStatus> {
@@ -296,10 +294,10 @@ export class Database implements DatabaseInterface {
     return this._baseDatabase.updateSyncStatusCanonicalBlock(repo, blockHash, blockNumber, force);
   }

-  async updateSyncStatusChainHead (queryRunner: QueryRunner, blockHash: string, blockNumber: number): Promise<SyncStatus> {
+  async updateSyncStatusChainHead (queryRunner: QueryRunner, blockHash: string, blockNumber: number, force = false): Promise<SyncStatus> {
     const repo = queryRunner.manager.getRepository(SyncStatus);

-    return this._baseDatabase.updateSyncStatusChainHead(repo, blockHash, blockNumber);
+    return this._baseDatabase.updateSyncStatusChainHead(repo, blockHash, blockNumber, force);
   }

   async getSyncStatus (queryRunner: QueryRunner): Promise<SyncStatus | undefined> {
@@ -337,10 +335,10 @@ export class Database implements DatabaseInterface {
     return this._baseDatabase.getBlockProgressEntities(repo, where, options);
   }

-  async updateBlockProgress (queryRunner: QueryRunner, blockHash: string, lastProcessedEventIndex: number): Promise<void> {
+  async updateBlockProgress (queryRunner: QueryRunner, block: BlockProgress, lastProcessedEventIndex: number): Promise<BlockProgress> {
     const repo = queryRunner.manager.getRepository(BlockProgress);

-    return this._baseDatabase.updateBlockProgress(repo, blockHash, lastProcessedEventIndex);
+    return this._baseDatabase.updateBlockProgress(repo, block, lastProcessedEventIndex);
   }

   async removeEntities<Entity> (queryRunner: QueryRunner, entity: new () => Entity, findConditions?: FindManyOptions<Entity> | FindConditions<Entity>): Promise<void> {
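Several of these signature changes follow the same pattern: the caller passes in the entity (or a `queryRunner`) it already holds instead of the method re-fetching by hash. A rough sketch of the round trip this removes (illustrative, not the actual base database code):

```ts
// Before (illustrative): each call pays an extra SELECT to resolve the hash.
async function updateBlockProgressByHash (repo: any, blockHash: string, lastProcessedEventIndex: number): Promise<void> {
  const block = await repo.findOne({ where: { blockHash } }); // redundant round trip
  block.lastProcessedEventIndex = lastProcessedEventIndex;
  await repo.save(block);
}

// After (illustrative): the caller already has the entity, so only the UPDATE remains.
async function updateBlockProgress (repo: any, block: any, lastProcessedEventIndex: number): Promise<any> {
  block.lastProcessedEventIndex = lastProcessedEventIndex;
  return repo.save(block);
}
```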

View File

@@ -39,6 +39,16 @@ export const main = async (): Promise<any> => {
       type: 'number',
       demandOption: true,
       describe: 'Block number to stop processing at'
+    },
+    prefetch: {
+      type: 'boolean',
+      default: false,
+      describe: 'Block and events prefetch mode'
+    },
+    batchBlocks: {
+      type: 'number',
+      default: 10,
+      describe: 'Number of blocks prefetched in batch'
     }
   }).argv;
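The `prefetch` and `batchBlocks` flags suggest a fill mode where data for several blocks is fetched concurrently before processing. A minimal sketch of such batched prefetching (the `fetchBlock` helper and loop structure are hypothetical):

```ts
// Illustrative only: fetch `batchBlocks` blocks per round instead of one request per block.
async function prefetchBlocks (
  startBlock: number,
  endBlock: number,
  batchBlocks: number,
  fetchBlock: (blockNumber: number) => Promise<unknown>
): Promise<void> {
  for (let from = startBlock; from <= endBlock; from += batchBlocks) {
    const to = Math.min(from + batchBlocks - 1, endBlock);
    const batch = Array.from({ length: to - from + 1 }, (_, i) => from + i);

    // One round of concurrent upstream requests per batch.
    await Promise.all(batch.map(fetchBlock));
  }
}
```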
@@ -56,10 +66,6 @@ export const main = async (): Promise<any> => {
   // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
   // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
   const pubsub = new PubSub();

-  const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, graphWatcher);
-  graphWatcher.setIndexer(indexer);
-  await graphWatcher.init();
-
   const jobQueueConfig = config.jobQueue;
   assert(jobQueueConfig, 'Missing job queue config');
@@ -70,9 +76,15 @@ export const main = async (): Promise<any> => {
   const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
   await jobQueue.start();

+  const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher);
+  await indexer.init();
+
+  graphWatcher.setIndexer(indexer);
+  await graphWatcher.init();
+
   const eventWatcher = new EventWatcher(config.upstream, ethClient, postgraphileClient, indexer, pubsub, jobQueue);

-  await fillBlocks(jobQueue, indexer, postgraphileClient, eventWatcher, config.upstream.ethServer.blockDelayInMilliSecs, argv);
+  await fillBlocks(jobQueue, indexer, eventWatcher, config.upstream.ethServer.blockDelayInMilliSecs, argv);
 };

 main().catch(err => {
@@ -80,3 +92,8 @@ main().catch(err => {
 }).finally(() => {
   process.exit();
 });
+
+process.on('SIGINT', () => {
+  log(`Exiting process ${process.pid} with code 0`);
+  process.exit(0);
+});

View File

@@ -4,7 +4,7 @@
 import assert from 'assert';
 import debug from 'debug';
-import { DeepPartial, FindConditions } from 'typeorm';
+import { DeepPartial, FindConditions, FindManyOptions } from 'typeorm';
 import JSONbig from 'json-bigint';
 import { ethers } from 'ethers';
 import { sha256 } from 'multiformats/hashes/sha2';
@@ -16,7 +16,7 @@ import { BaseProvider } from '@ethersproject/providers';
 import * as codec from '@ipld/dag-cbor';
 import { EthClient } from '@vulcanize/ipld-eth-client';
 import { StorageLayout } from '@vulcanize/solidity-mapper';
-import { Indexer as BaseIndexer, IndexerInterface, ValueResult, UNKNOWN_EVENT_NAME, ServerConfig, Where, QueryOptions, updateStateForElementaryType, updateStateForMappingType } from '@vulcanize/util';
+import { Indexer as BaseIndexer, IndexerInterface, ValueResult, UNKNOWN_EVENT_NAME, ServerConfig, Where, QueryOptions, updateStateForElementaryType, updateStateForMappingType, JobQueue } from '@vulcanize/util';
 import { GraphWatcher } from '@vulcanize/graph-node';

 import { Database } from './database';
@@ -93,7 +93,7 @@ export class Indexer implements IndexerInterface {
   _ipfsClient: IPFSClient

-  constructor (serverConfig: ServerConfig, db: Database, ethClient: EthClient, postgraphileClient: EthClient, ethProvider: BaseProvider, graphWatcher: GraphWatcher) {
+  constructor (serverConfig: ServerConfig, db: Database, ethClient: EthClient, postgraphileClient: EthClient, ethProvider: BaseProvider, jobQueue: JobQueue, graphWatcher: GraphWatcher) {
     assert(db);
     assert(ethClient);
     assert(postgraphileClient);
@@ -103,7 +103,7 @@ export class Indexer implements IndexerInterface {
     this._postgraphileClient = postgraphileClient;
     this._ethProvider = ethProvider;
     this._serverConfig = serverConfig;
-    this._baseIndexer = new BaseIndexer(this._db, this._ethClient, this._postgraphileClient, this._ethProvider);
+    this._baseIndexer = new BaseIndexer(this._db, this._ethClient, this._postgraphileClient, this._ethProvider, jobQueue);
     this._graphWatcher = graphWatcher;

     const { abi, storageLayout } = artifacts;
@@ -119,6 +119,10 @@ export class Indexer implements IndexerInterface {
     this._ipfsClient = new IPFSClient(this._serverConfig.ipfsApiAddr);
   }

+  async init (): Promise<void> {
+    await this._baseIndexer.fetchContracts();
+  }
+
   getResultEvent (event: Event): ResultEvent {
     const block = event.block;
     const eventFields = JSONbig.parse(event.eventInfo);
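The new `init()` hook, which calls `fetchContracts()` on the base indexer, points at the main optimization in this commit: watched contracts are loaded once at startup and kept in memory, so they don't have to be queried from the database per event. A rough sketch of that kind of cache (illustrative; the real implementation lives in `@vulcanize/util`):

```ts
// Illustrative in-memory cache of watched contracts, keyed by address.
class ContractCache {
  private _contracts = new Map<string, { address: string; kind: string; startingBlock: number }>();

  // Load all watched contracts once, e.g. during indexer.init().
  async fetchContracts (loadAll: () => Promise<{ address: string; kind: string; startingBlock: number }[]>): Promise<void> {
    const contracts = await loadAll();
    contracts.forEach(contract => this._contracts.set(contract.address, contract));
  }

  // Later lookups become synchronous map hits instead of database queries.
  isWatchedContract (address: string): boolean {
    return this._contracts.has(address);
  }
}
```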
@@ -615,22 +619,14 @@ export class Indexer implements IndexerInterface {
     };
   }

-  async watchContract (address: string, kind: string, checkpoint: boolean, startingBlock?: number): Promise<boolean> {
-    // Use the checksum address (https://docs.ethers.io/v5/api/utils/address/#utils-getAddress) if input to address is a contract address.
-    // If a contract identifier is passed as address instead, no need to convert to checksum address.
-    // Customize: use the kind input to filter out non-contract-address input to address.
-    const formattedAddress = (kind === '__protocol__') ? address : ethers.utils.getAddress(address);
-
-    if (!startingBlock) {
-      const syncStatus = await this.getSyncStatus();
-      assert(syncStatus);
-
-      startingBlock = syncStatus.latestIndexedBlockNumber;
-    }
-
-    await this._db.saveContract(formattedAddress, kind, checkpoint, startingBlock);
-
-    return true;
+  async watchContract (address: string, kind: string, checkpoint: boolean, startingBlock: number): Promise<void> {
+    this._baseIndexer.updateIPLDStatusMap(address, {});
+
+    return this._baseIndexer.watchContract(address, kind, checkpoint, startingBlock);
+  }
+
+  cacheContract (contract: Contract): void {
+    return this._baseIndexer.cacheContract(contract);
   }

   async getHookStatus (): Promise<HookStatus | undefined> {
@@ -698,8 +694,8 @@ export class Indexer implements IndexerInterface {
     return this._baseIndexer.updateSyncStatusIndexedBlock(blockHash, blockNumber, force);
   }

-  async updateSyncStatusChainHead (blockHash: string, blockNumber: number): Promise<SyncStatus> {
-    return this._baseIndexer.updateSyncStatusChainHead(blockHash, blockNumber);
+  async updateSyncStatusChainHead (blockHash: string, blockNumber: number, force = false): Promise<SyncStatus> {
+    return this._baseIndexer.updateSyncStatusChainHead(blockHash, blockNumber, force);
   }

   async updateSyncStatusCanonicalBlock (blockHash: string, blockNumber: number, force = false): Promise<SyncStatus> {
@@ -742,8 +738,8 @@ export class Indexer implements IndexerInterface {
     return this._baseIndexer.markBlocksAsPruned(blocks);
   }

-  async updateBlockProgress (blockHash: string, lastProcessedEventIndex: number): Promise<void> {
-    return this._baseIndexer.updateBlockProgress(blockHash, lastProcessedEventIndex);
+  async updateBlockProgress (block: BlockProgress, lastProcessedEventIndex: number): Promise<BlockProgress> {
+    return this._baseIndexer.updateBlockProgress(block, lastProcessedEventIndex);
   }

   async getAncestorAtDepth (blockHash: string, depth: number): Promise<string> {
@@ -752,19 +748,24 @@ export class Indexer implements IndexerInterface {
   async _fetchAndSaveEvents ({ cid: blockCid, blockHash }: DeepPartial<BlockProgress>): Promise<BlockProgress> {
     assert(blockHash);
-    let { block, logs } = await this._ethClient.getLogs({ blockHash });

-    const {
-      allEthHeaderCids: {
-        nodes: [
-          {
-            ethTransactionCidsByHeaderId: {
-              nodes: transactions
-            }
-          }
-        ]
-      }
-    } = await this._postgraphileClient.getBlockWithTransactions({ blockHash });
+    const logsPromise = this._ethClient.getLogs({ blockHash });
+    const transactionsPromise = this._postgraphileClient.getBlockWithTransactions({ blockHash });
+
+    let [
+      { block, logs },
+      {
+        allEthHeaderCids: {
+          nodes: [
+            {
+              ethTransactionCidsByHeaderId: {
+                nodes: transactions
+              }
+            }
+          ]
+        }
+      }
+    ] = await Promise.all([logsPromise, transactionsPromise]);

     const transactionMap = transactions.reduce((acc: {[key: string]: any}, transaction: {[key: string]: any}) => {
       acc[transaction.txHash] = transaction;
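The `_fetchAndSaveEvents` change is a straightforward latency win: the logs query and the block-with-transactions query are independent, so issuing them together with `Promise.all` costs roughly the slower of the two rather than their sum. In miniature (client shapes are illustrative):

```ts
// Illustrative: run the two independent upstream queries concurrently.
async function fetchBlockData (
  ethClient: { getLogs (arg: { blockHash: string }): Promise<unknown> },
  postgraphileClient: { getBlockWithTransactions (arg: { blockHash: string }): Promise<unknown> },
  blockHash: string
): Promise<[unknown, unknown]> {
  // Sequential awaits would cost t(logs) + t(transactions);
  // Promise.all costs roughly max(t(logs), t(transactions)).
  return Promise.all([
    ethClient.getLogs({ blockHash }),
    postgraphileClient.getBlockWithTransactions({ blockHash })
  ]);
}
```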

View File

@@ -56,22 +56,12 @@ export class JobRunner {
       // TODO Call pre-block hook here (Directly or indirectly (Like done through indexer.processEvent for events)).

       await this._baseJobRunner.processBlock(job);
-
-      await this._jobQueue.markComplete(job);
     });
   }

   async subscribeEventProcessingQueue (): Promise<void> {
     await this._jobQueue.subscribe(QUEUE_EVENT_PROCESSING, async (job) => {
-      const event = await this._baseJobRunner.processEvent(job);
-
-      const watchedContract = await this._indexer.isWatchedContract(event.contract);
-      if (watchedContract) {
-        await this._indexer.processEvent(event);
-      }
-
-      await this._indexer.updateBlockProgress(event.block.blockHash, event.index);
-
-      await this._jobQueue.markComplete(job);
+      await this._baseJobRunner.processEvent(job);
     });
   }
@@ -135,11 +125,6 @@ export const main = async (): Promise<any> => {
   const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server.subgraphPath);

-  const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, graphWatcher);
-
-  graphWatcher.setIndexer(indexer);
-  await graphWatcher.init();
-
   const jobQueueConfig = config.jobQueue;
   assert(jobQueueConfig, 'Missing job queue config');
@@ -149,6 +134,15 @@ export const main = async (): Promise<any> => {
   const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
   await jobQueue.start();

+  const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher);
+  await indexer.init();
+
+  graphWatcher.setIndexer(indexer);
+  await graphWatcher.init();
+
+  // Watching all the contracts in the subgraph.
+  await graphWatcher.addContracts();
+
   const jobRunner = new JobRunner(jobQueueConfig, indexer, jobQueue);
   await jobRunner.start();
 };
@@ -162,3 +156,8 @@ main().then(() => {
 process.on('uncaughtException', err => {
   log('uncaughtException', err);
 });
+
+process.on('SIGINT', () => {
+  log(`Exiting process ${process.pid} with code 0`);
+  process.exit(0);
+});

View File

@@ -51,10 +51,6 @@ export const main = async (): Promise<any> => {
   // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
   // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
   const pubsub = new PubSub();

-  const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, graphWatcher);
-  graphWatcher.setIndexer(indexer);
-  await graphWatcher.init();
-
   const jobQueueConfig = config.jobQueue;
   assert(jobQueueConfig, 'Missing job queue config');
@@ -64,6 +60,12 @@ export const main = async (): Promise<any> => {
   const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });

+  const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher);
+  await indexer.init();
+
+  graphWatcher.setIndexer(indexer);
+  await graphWatcher.init();
+
   const eventWatcher = new EventWatcher(config.upstream, ethClient, postgraphileClient, indexer, pubsub, jobQueue);

   if (watcherKind === KIND_ACTIVE) {
@@ -98,3 +100,8 @@ main().then(() => {
 }).catch(err => {
   log(err);
 });
+
+process.on('SIGINT', () => {
+  log(`Exiting process ${process.pid} with code 0`);
+  process.exit(0);
+});

View File

@@ -82,6 +82,8 @@ const main = async (): Promise<void> => {
   await indexer.watchContract(argv.address, argv.kind, argv.checkpoint, argv.startingBlock);

   await db.close();
+  await jobQueue.stop();
+  process.exit();
 };

 main().catch(err => {