Use Promise all while fetching events and watch contracts only once

This commit is contained in:
Prathamesh Musale 2021-12-13 16:26:01 +05:30 committed by nabarun
parent ba890e7d9a
commit 768a4d0818
23 changed files with 68 additions and 23 deletions

View File

@ -40,3 +40,4 @@
dbConnectionString = "postgres://postgres:postgres@localhost/eden-watcher-job-queue"
maxCompletionLagInSecs = 300
jobDelayInMilliSecs = 100
eventsInBatch = 50

View File

@ -61,6 +61,7 @@ const main = async (): Promise<void> => {
await jobQueue.start();
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher);
await indexer.init();
graphWatcher.setIndexer(indexer);
await graphWatcher.init();

View File

@ -58,6 +58,7 @@ const main = async (): Promise<void> => {
await jobQueue.start();
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher);
await indexer.init();
graphWatcher.setIndexer(indexer);
await graphWatcher.init();

View File

@ -66,6 +66,7 @@ export const main = async (): Promise<any> => {
await jobQueue.start();
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher);
await indexer.init();
graphWatcher.setIndexer(indexer);
await graphWatcher.init();

View File

@ -58,6 +58,7 @@ const main = async (): Promise<void> => {
await jobQueue.start();
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher);
await indexer.init();
graphWatcher.setIndexer(indexer);
await graphWatcher.init();

View File

@ -68,6 +68,7 @@ export const handler = async (argv: any): Promise<void> => {
await jobQueue.start();
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher);
await indexer.init();
graphWatcher.setIndexer(indexer);
await graphWatcher.init();

View File

@ -74,6 +74,7 @@ const main = async (): Promise<void> => {
await jobQueue.start();
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher);
await indexer.init();
graphWatcher.setIndexer(indexer);
await graphWatcher.init();

View File

@ -63,6 +63,7 @@ export const main = async (): Promise<any> => {
await jobQueue.start();
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher);
await indexer.init();
graphWatcher.setIndexer(indexer);
await graphWatcher.init();

View File

@ -191,6 +191,10 @@ export class Indexer implements IndexerInterface {
this._populateRelationsMap();
}
async init (): Promise<void> {
await this._baseIndexer.fetchContracts();
}
getResultEvent (event: Event): ResultEvent {
const block = event.block;
const eventFields = JSONbig.parse(event.eventInfo);
@ -1246,19 +1250,24 @@ export class Indexer implements IndexerInterface {
async _fetchAndSaveEvents ({ cid: blockCid, blockHash }: DeepPartial<BlockProgress>): Promise<void> {
assert(blockHash);
let { block, logs } = await this._ethClient.getLogs({ blockHash });
const {
allEthHeaderCids: {
nodes: [
{
ethTransactionCidsByHeaderId: {
nodes: transactions
const logsPromise = this._ethClient.getLogs({ blockHash });
const transactionsPromise = this._postgraphileClient.getBlockWithTransactions({ blockHash });
let [
{ block, logs },
{
allEthHeaderCids: {
nodes: [
{
ethTransactionCidsByHeaderId: {
nodes: transactions
}
}
}
]
]
}
}
} = await this._postgraphileClient.getBlockWithTransactions({ blockHash });
] = await Promise.all([logsPromise, transactionsPromise]);
const transactionMap = transactions.reduce((acc: {[key: string]: any}, transaction: {[key: string]: any}) => {
acc[transaction.txHash] = transaction;

View File

@ -142,6 +142,7 @@ export const main = async (): Promise<any> => {
await jobQueue.start();
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher);
await indexer.init();
graphWatcher.setIndexer(indexer);
await graphWatcher.init();

View File

@ -61,6 +61,7 @@ export const main = async (): Promise<any> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher);
await indexer.init();
graphWatcher.setIndexer(indexer);
await graphWatcher.init();

View File

@ -98,12 +98,19 @@ export class GraphWatcher {
}
async addContracts () {
assert(this._indexer?.watchContract);
assert(this._indexer);
assert(this._indexer.watchContract);
assert(this._indexer.isWatchedContract);
// Watching the contract(s).
// Watching the contract(s) if not watched already.
for (const dataSource of this._dataSources) {
const { source: { address, startBlock }, name } = dataSource;
await this._indexer.watchContract(address, name, true, startBlock);
const watchedContract = await this._indexer.isWatchedContract(address);
if (!watchedContract) {
await this._indexer.watchContract(address, name, true, startBlock);
}
}
}

View File

@ -40,3 +40,4 @@
dbConnectionString = "postgres://postgres:postgres@localhost/graph-test-watcher-job-queue"
maxCompletionLagInSecs = 300
jobDelayInMilliSecs = 100
eventsInBatch = 50

View File

@ -61,6 +61,7 @@ const main = async (): Promise<void> => {
await jobQueue.start();
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher);
await indexer.init();
graphWatcher.setIndexer(indexer);
await graphWatcher.init();

View File

@ -58,6 +58,7 @@ const main = async (): Promise<void> => {
await jobQueue.start();
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher);
await indexer.init();
graphWatcher.setIndexer(indexer);
await graphWatcher.init();

View File

@ -66,6 +66,7 @@ export const main = async (): Promise<any> => {
await jobQueue.start();
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher);
await indexer.init();
graphWatcher.setIndexer(indexer);
await graphWatcher.init();

View File

@ -58,6 +58,7 @@ const main = async (): Promise<void> => {
await jobQueue.start();
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher);
await indexer.init();
graphWatcher.setIndexer(indexer);
await graphWatcher.init();

View File

@ -56,6 +56,7 @@ export const handler = async (argv: any): Promise<void> => {
await jobQueue.start();
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher);
await indexer.init();
graphWatcher.setIndexer(indexer);
await graphWatcher.init();

View File

@ -74,6 +74,7 @@ const main = async (): Promise<void> => {
await jobQueue.start();
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher);
await indexer.init();
graphWatcher.setIndexer(indexer);
await graphWatcher.init();

View File

@ -63,6 +63,7 @@ export const main = async (): Promise<any> => {
await jobQueue.start();
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher);
await indexer.init();
graphWatcher.setIndexer(indexer);
await graphWatcher.init();

View File

@ -131,6 +131,10 @@ export class Indexer implements IndexerInterface {
this._populateRelationsMap();
}
async init (): Promise<void> {
await this._baseIndexer.fetchContracts();
}
getResultEvent (event: Event): ResultEvent {
const block = event.block;
const eventFields = JSONbig.parse(event.eventInfo);
@ -572,19 +576,24 @@ export class Indexer implements IndexerInterface {
async _fetchAndSaveEvents ({ cid: blockCid, blockHash }: DeepPartial<BlockProgress>): Promise<void> {
assert(blockHash);
let { block, logs } = await this._ethClient.getLogs({ blockHash });
const {
allEthHeaderCids: {
nodes: [
{
ethTransactionCidsByHeaderId: {
nodes: transactions
const logsPromise = this._ethClient.getLogs({ blockHash });
const transactionsPromise = this._postgraphileClient.getBlockWithTransactions({ blockHash });
let [
{ block, logs },
{
allEthHeaderCids: {
nodes: [
{
ethTransactionCidsByHeaderId: {
nodes: transactions
}
}
}
]
]
}
}
} = await this._postgraphileClient.getBlockWithTransactions({ blockHash });
] = await Promise.all([logsPromise, transactionsPromise]);
const transactionMap = transactions.reduce((acc: {[key: string]: any}, transaction: {[key: string]: any}) => {
acc[transaction.txHash] = transaction;

View File

@ -142,6 +142,7 @@ export const main = async (): Promise<any> => {
await jobQueue.start();
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher);
await indexer.init();
graphWatcher.setIndexer(indexer);
await graphWatcher.init();

View File

@ -61,6 +61,7 @@ export const main = async (): Promise<any> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher);
await indexer.init();
graphWatcher.setIndexer(indexer);
await graphWatcher.init();