mirror of
https://github.com/cerc-io/watcher-ts
synced 2025-01-08 12:28:05 +00:00
Avoid updating StateSyncStatus table when enableState
flag is set to false (#440)
* Handle zero hash canonical block incase of FEVM null block * Fix json-bigint parse in processBatchEvents * Avoid updating SyncStatus table when enableState is false
This commit is contained in:
parent
445d5a9293
commit
43463af1f2
@ -8,7 +8,7 @@ import debug from 'debug';
|
||||
{{#if queries}}
|
||||
import JSONbig from 'json-bigint';
|
||||
{{/if}}
|
||||
import { ethers } from 'ethers';
|
||||
import { ethers, constants } from 'ethers';
|
||||
{{#if (subgraphPath)}}
|
||||
import { SelectionNode } from 'graphql';
|
||||
{{/if}}
|
||||
@ -493,7 +493,11 @@ export class Indexer implements IndexerInterface {
|
||||
return this._db.getStateSyncStatus();
|
||||
}
|
||||
|
||||
async updateStateSyncStatusIndexedBlock (blockNumber: number, force?: boolean): Promise<StateSyncStatus> {
|
||||
async updateStateSyncStatusIndexedBlock (blockNumber: number, force?: boolean): Promise<StateSyncStatus | undefined> {
|
||||
if (!this._serverConfig.enableState) {
|
||||
return;
|
||||
}
|
||||
|
||||
const dbTx = await this._db.createTransactionRunner();
|
||||
let res;
|
||||
|
||||
@ -527,10 +531,14 @@ export class Indexer implements IndexerInterface {
|
||||
return res;
|
||||
}
|
||||
|
||||
async getLatestCanonicalBlock (): Promise<BlockProgress> {
|
||||
async getLatestCanonicalBlock (): Promise<BlockProgress | undefined> {
|
||||
const syncStatus = await this.getSyncStatus();
|
||||
assert(syncStatus);
|
||||
|
||||
if (syncStatus.latestCanonicalBlockHash === constants.HashZero) {
|
||||
return;
|
||||
}
|
||||
|
||||
const latestCanonicalBlock = await this.getBlockProgress(syncStatus.latestCanonicalBlockHash);
|
||||
assert(latestCanonicalBlock);
|
||||
|
||||
|
@ -189,16 +189,16 @@ export class Indexer implements IndexerInterface {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
async updateStateSyncStatusIndexedBlock (blockNumber: number, force?: boolean): Promise<StateSyncStatusInterface> {
|
||||
return {} as StateSyncStatusInterface;
|
||||
async updateStateSyncStatusIndexedBlock (blockNumber: number, force?: boolean): Promise<StateSyncStatusInterface | undefined> {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
async updateStateSyncStatusCheckpointBlock (blockNumber: number, force?: boolean): Promise<StateSyncStatusInterface> {
|
||||
return {} as StateSyncStatusInterface;
|
||||
}
|
||||
|
||||
async getLatestCanonicalBlock (): Promise<BlockProgressInterface> {
|
||||
return {} as BlockProgressInterface;
|
||||
async getLatestCanonicalBlock (): Promise<BlockProgressInterface | undefined> {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
isWatchedContract (address : string): ContractInterface | undefined {
|
||||
|
@ -321,7 +321,7 @@ export const processEvents = async (indexer: IndexerInterface, block: BlockProgr
|
||||
// We might not have parsed this event yet. This can happen if the contract was added
|
||||
// as a result of a previous event in the same block.
|
||||
if (event.eventName === UNKNOWN_EVENT_NAME) {
|
||||
const logObj = JSON.parse(event.extraInfo);
|
||||
const logObj = JSONbigNative.parse(event.extraInfo);
|
||||
|
||||
assert(indexer.parseEventNameAndArgs);
|
||||
assert(typeof watchedContract !== 'boolean');
|
||||
@ -422,7 +422,7 @@ export const processEventsInSubgraphOrder = async (indexer: IndexerInterface, bl
|
||||
// We might not have parsed this event yet. This can happen if the contract was added
|
||||
// as a result of a previous event in the same block.
|
||||
if (event.eventName === UNKNOWN_EVENT_NAME) {
|
||||
const logObj = JSON.parse(event.extraInfo);
|
||||
const logObj = JSONbigNative.parse(event.extraInfo);
|
||||
|
||||
assert(indexer.parseEventNameAndArgs);
|
||||
assert(typeof watchedContract !== 'boolean');
|
||||
@ -481,12 +481,11 @@ export const createPruningJob = async (jobQueue: JobQueue, latestCanonicalBlockN
|
||||
* @param blockHash
|
||||
* @param blockNumber
|
||||
*/
|
||||
export const createHooksJob = async (jobQueue: JobQueue, blockHash: string, blockNumber: number): Promise<void> => {
|
||||
export const createHooksJob = async (jobQueue: JobQueue, blockHash: string): Promise<void> => {
|
||||
await jobQueue.pushJob(
|
||||
QUEUE_HOOKS,
|
||||
{
|
||||
blockHash,
|
||||
blockNumber
|
||||
blockHash
|
||||
}
|
||||
);
|
||||
};
|
||||
|
@ -112,7 +112,12 @@ export class JobRunner {
|
||||
|
||||
// Create a hooks job for parent block of latestCanonicalBlock pruning for first block is skipped as it is assumed to be a canonical block.
|
||||
const latestCanonicalBlock = await this._indexer.getLatestCanonicalBlock();
|
||||
await createHooksJob(this.jobQueue, latestCanonicalBlock.parentHash, latestCanonicalBlock.blockNumber - 1);
|
||||
|
||||
// Check if latestCanonicalBlock is undefined incase of null block in FEVM
|
||||
if (latestCanonicalBlock) {
|
||||
await createHooksJob(this.jobQueue, latestCanonicalBlock.parentHash);
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
@ -145,16 +150,19 @@ export class JobRunner {
|
||||
}
|
||||
|
||||
async processHooks (job: any): Promise<void> {
|
||||
const { data: { blockHash, blockNumber } } = job;
|
||||
// Get the block and current stateSyncStatus.
|
||||
const [blockProgress, stateSyncStatus] = await Promise.all([
|
||||
this._indexer.getBlockProgress(job.data.blockHash),
|
||||
this._indexer.getStateSyncStatus()
|
||||
]);
|
||||
|
||||
// Get the current stateSyncStatus.
|
||||
const stateSyncStatus = await this._indexer.getStateSyncStatus();
|
||||
assert(blockProgress);
|
||||
const { blockHash, blockNumber, parentHash } = blockProgress;
|
||||
|
||||
if (stateSyncStatus) {
|
||||
if (stateSyncStatus.latestIndexedBlockNumber < (blockNumber - 1)) {
|
||||
// Create hooks job for parent block.
|
||||
const [parentBlock] = await this._indexer.getBlocksAtHeight(blockNumber - 1, false);
|
||||
await createHooksJob(this.jobQueue, parentBlock.blockHash, parentBlock.blockNumber);
|
||||
await createHooksJob(this.jobQueue, parentHash);
|
||||
|
||||
const message = `State for blockNumber ${blockNumber - 1} not indexed yet, aborting`;
|
||||
log(message);
|
||||
@ -186,33 +194,34 @@ export class JobRunner {
|
||||
|
||||
// Get the current stateSyncStatus.
|
||||
const stateSyncStatus = await this._indexer.getStateSyncStatus();
|
||||
assert(stateSyncStatus);
|
||||
|
||||
if (stateSyncStatus.latestCheckpointBlockNumber >= 0) {
|
||||
if (stateSyncStatus.latestCheckpointBlockNumber < (blockNumber - 1)) {
|
||||
// Create a checkpoint job for parent block.
|
||||
const [parentBlock] = await this._indexer.getBlocksAtHeight(blockNumber - 1, false);
|
||||
await createCheckpointJob(this.jobQueue, parentBlock.blockHash, parentBlock.blockNumber);
|
||||
if (stateSyncStatus) {
|
||||
if (stateSyncStatus.latestCheckpointBlockNumber >= 0) {
|
||||
if (stateSyncStatus.latestCheckpointBlockNumber < (blockNumber - 1)) {
|
||||
// Create a checkpoint job for parent block.
|
||||
const [parentBlock] = await this._indexer.getBlocksAtHeight(blockNumber - 1, false);
|
||||
await createCheckpointJob(this.jobQueue, parentBlock.blockHash, parentBlock.blockNumber);
|
||||
|
||||
const message = `Checkpoints for blockNumber ${blockNumber - 1} not processed yet, aborting`;
|
||||
log(message);
|
||||
const message = `Checkpoints for blockNumber ${blockNumber - 1} not processed yet, aborting`;
|
||||
log(message);
|
||||
|
||||
throw new Error(message);
|
||||
throw new Error(message);
|
||||
}
|
||||
|
||||
if (stateSyncStatus.latestCheckpointBlockNumber > (blockNumber - 1)) {
|
||||
log(`Checkpoints for blockNumber ${blockNumber} already processed`);
|
||||
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if (stateSyncStatus.latestCheckpointBlockNumber > (blockNumber - 1)) {
|
||||
log(`Checkpoints for blockNumber ${blockNumber} already processed`);
|
||||
// Process checkpoints for the given block.
|
||||
await this._indexer.processCheckpoint(blockHash);
|
||||
|
||||
return;
|
||||
}
|
||||
// Update the stateSyncStatus.
|
||||
await this._indexer.updateStateSyncStatusCheckpointBlock(blockNumber);
|
||||
}
|
||||
|
||||
// Process checkpoints for the given block.
|
||||
await this._indexer.processCheckpoint(blockHash);
|
||||
|
||||
// Update the stateSyncStatus.
|
||||
await this._indexer.updateStateSyncStatusCheckpointBlock(blockNumber);
|
||||
|
||||
await this.jobQueue.markComplete(job);
|
||||
}
|
||||
|
||||
|
@ -91,7 +91,7 @@ export interface IndexerInterface {
|
||||
getStateSyncStatus (): Promise<StateSyncStatusInterface | undefined>
|
||||
getBlocks (blockFilter: { blockHash?: string, blockNumber?: number }): Promise<any>
|
||||
getBlocksAtHeight (height: number, isPruned: boolean): Promise<BlockProgressInterface[]>
|
||||
getLatestCanonicalBlock (): Promise<BlockProgressInterface>
|
||||
getLatestCanonicalBlock (): Promise<BlockProgressInterface | undefined>
|
||||
getLatestStateIndexedBlock (): Promise<BlockProgressInterface>
|
||||
getBlockEvents (blockHash: string, where: Where, queryOptions: QueryOptions): Promise<Array<EventInterface>>
|
||||
getAncestorAtDepth (blockHash: string, depth: number): Promise<string>
|
||||
@ -102,7 +102,7 @@ export interface IndexerInterface {
|
||||
updateSyncStatusChainHead (blockHash: string, blockNumber: number, force?: boolean): Promise<SyncStatusInterface>
|
||||
updateSyncStatusIndexedBlock (blockHash: string, blockNumber: number, force?: boolean): Promise<SyncStatusInterface>
|
||||
updateSyncStatusCanonicalBlock (blockHash: string, blockNumber: number, force?: boolean): Promise<SyncStatusInterface>
|
||||
updateStateSyncStatusIndexedBlock (blockNumber: number, force?: boolean): Promise<StateSyncStatusInterface>
|
||||
updateStateSyncStatusIndexedBlock (blockNumber: number, force?: boolean): Promise<StateSyncStatusInterface | undefined>
|
||||
updateStateSyncStatusCheckpointBlock (blockNumber: number, force?: boolean): Promise<StateSyncStatusInterface>
|
||||
markBlocksAsPruned (blocks: BlockProgressInterface[]): Promise<void>
|
||||
saveEventEntity (dbEvent: EventInterface): Promise<EventInterface>
|
||||
|
Loading…
Reference in New Issue
Block a user