From ae76520e86f536f05cb280616d886ea6522c2a07 Mon Sep 17 00:00:00 2001 From: nabarun Date: Thu, 14 Jul 2022 14:11:53 +0530 Subject: [PATCH] Add debug logs for job-runner --- .../ipld-eth-client/src/graphql-client.ts | 10 +++++-- packages/mobymask-watcher/src/indexer.ts | 27 ++++++++++--------- packages/util/src/job-runner.ts | 13 +++++++++ 3 files changed, 36 insertions(+), 14 deletions(-) diff --git a/packages/ipld-eth-client/src/graphql-client.ts b/packages/ipld-eth-client/src/graphql-client.ts index 0fa807e8..877546c4 100644 --- a/packages/ipld-eth-client/src/graphql-client.ts +++ b/packages/ipld-eth-client/src/graphql-client.ts @@ -108,9 +108,15 @@ export class GraphQLClient { } async query (query: DocumentNode | TypedDocumentNode, variables: { [key: string]: any }): Promise { - const { data: result } = await this._client.query({ query, variables }); + try { + const { data: result } = await this._client.query({ query, variables }); - return result; + return result; + } catch (error) { + log(`Error for query: ${query}`); + log(error); + throw error; + } } async mutate (mutation: DocumentNode | TypedDocumentNode, variables: { [key: string]: any }): Promise { diff --git a/packages/mobymask-watcher/src/indexer.ts b/packages/mobymask-watcher/src/indexer.ts index f3f5e03d..236d574b 100644 --- a/packages/mobymask-watcher/src/indexer.ts +++ b/packages/mobymask-watcher/src/indexer.ts @@ -799,24 +799,27 @@ export class Indexer implements IPLDIndexerInterface { async _fetchAndSaveEvents ({ cid: blockCid, blockHash }: DeepPartial): Promise { assert(blockHash); + log(`DEBUG: Fetching for blockHash: ${blockHash}`); const logsPromise = this._ethClient.getLogs({ blockHash }); const transactionsPromise = this._ethClient.getBlockWithTransactions({ blockHash }); - let [ - { block, logs }, - { - allEthHeaderCids: { - nodes: [ - { - ethTransactionCidsByHeaderId: { - nodes: transactions - } + log('DEBUG: Fetching logs'); + let { block, logs } = await logsPromise; + log(`DEBUG: Fetched for block: ${block.number} logs count: ${logs.length}`); + log('DEBUG: Fetching txs'); + const { + allEthHeaderCids: { + nodes: [ + { + ethTransactionCidsByHeaderId: { + nodes: transactions } - ] - } + } + ] } - ] = await Promise.all([logsPromise, transactionsPromise]); + } = await transactionsPromise; + log(`DEBUG: Fetched txs: ${transactions.length}`); const transactionMap = transactions.reduce((acc: {[key: string]: any}, transaction: {[key: string]: any}) => { acc[transaction.txHash] = transaction; diff --git a/packages/util/src/job-runner.ts b/packages/util/src/job-runner.ts index 75276795..3ed89d70 100644 --- a/packages/util/src/job-runner.ts +++ b/packages/util/src/job-runner.ts @@ -140,6 +140,7 @@ export class JobRunner { // Check if chain pruning is caught up. if ((syncStatus.latestIndexedBlockNumber - syncStatus.latestCanonicalBlockNumber) > MAX_REORG_DEPTH) { + log('DEBUG: Create pruning job'); await createPruningJob(this._jobQueue, syncStatus.latestCanonicalBlockNumber, priority); const message = `Chain pruning not caught up yet, latest canonical block number ${syncStatus.latestCanonicalBlockNumber} and latest indexed block number ${syncStatus.latestIndexedBlockNumber}`; @@ -147,6 +148,7 @@ export class JobRunner { throw new Error(message); } + log(`DEBUG: Getting blockProgress from DB for parentHash: ${parentHash} and blockHash: ${blockHash}`); let [parentBlock, blockProgress] = await this._indexer.getBlockProgressEntities( { blockHash: In([parentHash, blockHash]) @@ -158,15 +160,20 @@ export class JobRunner { } ); + log(`DEBUG: Fetched parentBlock: ${parentBlock?.blockNumber} and blockProgress: ${blockProgress?.blockNumber}`); + // Check if parent block has been processed yet, if not, push a high priority job to process that first and abort. // However, don't go beyond the `latestCanonicalBlockHash` from SyncStatus as we have to assume the reorg can't be that deep. + log(`DEBUG: Check latest canonical - blockHash: ${blockHash} and latestCanonicalBlockHash: ${syncStatus.latestCanonicalBlockHash}`); if (blockHash !== syncStatus.latestCanonicalBlockHash) { // Create a higher priority job to index parent block and then abort. // We don't have to worry about aborting as this job will get retried later. const newPriority = (priority || 0) + 1; if (!parentBlock || parentBlock.blockHash !== parentHash) { + log(`DEBUG: Parent block not processed: ${parentBlock?.blockHash}`); const blocks = await this._indexer.getBlocks({ blockHash: parentHash }); + log(`DEBUG: Fetched parent blocks count: ${blocks.length}`); if (!blocks.length) { const message = `No blocks at parentHash ${parentHash}, aborting`; @@ -193,6 +200,7 @@ export class JobRunner { return; } + log(`DEBUG: parentBlock.isComplete: ${parentBlock.isComplete}`); if (!parentBlock.isComplete) { // Parent block indexing needs to finish before this block can be indexed. const message = `Indexing incomplete for parent block number ${parentBlock.blockNumber} hash ${parentHash} of block number ${blockNumber} hash ${blockHash}, aborting`; @@ -212,20 +220,25 @@ export class JobRunner { } else { // Remove the unknown events of the parent block if it is marked complete. await this._indexer.removeUnknownEvents(parentBlock); + log('DEBUG: removed unknown events'); } } else { blockProgress = parentBlock; } + log(`DEBUG: blockProgress: ${blockProgress?.blockNumber}`); if (!blockProgress) { const { jobDelayInMilliSecs = 0 } = this._jobQueueConfig; // Delay required to process block. await wait(jobDelayInMilliSecs); + log(`DEBUG: Fetching blockEvents for blockHash: ${blockHash}`); blockProgress = await this._indexer.fetchBlockEvents({ cid, blockHash, blockNumber, parentHash, blockTimestamp: timestamp }); + log(`DEBUG: Fetched blockEvents and saved blockProgress: ${blockProgress.blockHash}`); } if (this._indexer.processBlock) { + log(`DEBUG: processing block for blockProgress: ${blockProgress.blockHash}`); await this._indexer.processBlock(blockHash, blockNumber); }