Add debug logs for job-runner

This commit is contained in:
nabarun 2022-07-14 14:11:53 +05:30
parent 129b9e71f0
commit ae76520e86
3 changed files with 36 additions and 14 deletions

View File

@ -108,9 +108,15 @@ export class GraphQLClient {
}
async query (query: DocumentNode | TypedDocumentNode, variables: { [key: string]: any }): Promise<any> {
try {
const { data: result } = await this._client.query({ query, variables });
return result;
} catch (error) {
log(`Error for query: ${query}`);
log(error);
throw error;
}
}
async mutate (mutation: DocumentNode | TypedDocumentNode, variables: { [key: string]: any }): Promise<any> {

View File

@ -799,13 +799,16 @@ export class Indexer implements IPLDIndexerInterface {
async _fetchAndSaveEvents ({ cid: blockCid, blockHash }: DeepPartial<BlockProgress>): Promise<BlockProgress> {
assert(blockHash);
log(`DEBUG: Fetching for blockHash: ${blockHash}`);
const logsPromise = this._ethClient.getLogs({ blockHash });
const transactionsPromise = this._ethClient.getBlockWithTransactions({ blockHash });
let [
{ block, logs },
{
log('DEBUG: Fetching logs');
let { block, logs } = await logsPromise;
log(`DEBUG: Fetched for block: ${block.number} logs count: ${logs.length}`);
log('DEBUG: Fetching txs');
const {
allEthHeaderCids: {
nodes: [
{
@ -815,8 +818,8 @@ export class Indexer implements IPLDIndexerInterface {
}
]
}
}
] = await Promise.all([logsPromise, transactionsPromise]);
} = await transactionsPromise;
log(`DEBUG: Fetched txs: ${transactions.length}`);
const transactionMap = transactions.reduce((acc: {[key: string]: any}, transaction: {[key: string]: any}) => {
acc[transaction.txHash] = transaction;

View File

@ -140,6 +140,7 @@ export class JobRunner {
// Check if chain pruning is caught up.
if ((syncStatus.latestIndexedBlockNumber - syncStatus.latestCanonicalBlockNumber) > MAX_REORG_DEPTH) {
log('DEBUG: Create pruning job');
await createPruningJob(this._jobQueue, syncStatus.latestCanonicalBlockNumber, priority);
const message = `Chain pruning not caught up yet, latest canonical block number ${syncStatus.latestCanonicalBlockNumber} and latest indexed block number ${syncStatus.latestIndexedBlockNumber}`;
@ -147,6 +148,7 @@ export class JobRunner {
throw new Error(message);
}
log(`DEBUG: Getting blockProgress from DB for parentHash: ${parentHash} and blockHash: ${blockHash}`);
let [parentBlock, blockProgress] = await this._indexer.getBlockProgressEntities(
{
blockHash: In([parentHash, blockHash])
@ -158,15 +160,20 @@ export class JobRunner {
}
);
log(`DEBUG: Fetched parentBlock: ${parentBlock?.blockNumber} and blockProgress: ${blockProgress?.blockNumber}`);
// Check if parent block has been processed yet, if not, push a high priority job to process that first and abort.
// However, don't go beyond the `latestCanonicalBlockHash` from SyncStatus as we have to assume the reorg can't be that deep.
log(`DEBUG: Check latest canonical - blockHash: ${blockHash} and latestCanonicalBlockHash: ${syncStatus.latestCanonicalBlockHash}`);
if (blockHash !== syncStatus.latestCanonicalBlockHash) {
// Create a higher priority job to index parent block and then abort.
// We don't have to worry about aborting as this job will get retried later.
const newPriority = (priority || 0) + 1;
if (!parentBlock || parentBlock.blockHash !== parentHash) {
log(`DEBUG: Parent block not processed: ${parentBlock?.blockHash}`);
const blocks = await this._indexer.getBlocks({ blockHash: parentHash });
log(`DEBUG: Fetched parent blocks count: ${blocks.length}`);
if (!blocks.length) {
const message = `No blocks at parentHash ${parentHash}, aborting`;
@ -193,6 +200,7 @@ export class JobRunner {
return;
}
log(`DEBUG: parentBlock.isComplete: ${parentBlock.isComplete}`);
if (!parentBlock.isComplete) {
// Parent block indexing needs to finish before this block can be indexed.
const message = `Indexing incomplete for parent block number ${parentBlock.blockNumber} hash ${parentHash} of block number ${blockNumber} hash ${blockHash}, aborting`;
@ -212,20 +220,25 @@ export class JobRunner {
} else {
// Remove the unknown events of the parent block if it is marked complete.
await this._indexer.removeUnknownEvents(parentBlock);
log('DEBUG: removed unknown events');
}
} else {
blockProgress = parentBlock;
}
log(`DEBUG: blockProgress: ${blockProgress?.blockNumber}`);
if (!blockProgress) {
const { jobDelayInMilliSecs = 0 } = this._jobQueueConfig;
// Delay required to process block.
await wait(jobDelayInMilliSecs);
log(`DEBUG: Fetching blockEvents for blockHash: ${blockHash}`);
blockProgress = await this._indexer.fetchBlockEvents({ cid, blockHash, blockNumber, parentHash, blockTimestamp: timestamp });
log(`DEBUG: Fetched blockEvents and saved blockProgress: ${blockProgress.blockHash}`);
}
if (this._indexer.processBlock) {
log(`DEBUG: processing block for blockProgress: ${blockProgress.blockHash}`);
await this._indexer.processBlock(blockHash, blockNumber);
}