mirror of
https://github.com/cerc-io/watcher-ts
synced 2025-07-27 10:42:06 +00:00
Merge 0ae926224c
into 2217cd3ffb
This commit is contained in:
commit
39e3686648
@ -500,11 +500,11 @@ export class Indexer implements IndexerInterface {
|
|||||||
await this.triggerIndexingOnEvent(event, extraData);
|
await this.triggerIndexingOnEvent(event, extraData);
|
||||||
}
|
}
|
||||||
|
|
||||||
async processBlock (blockProgress: BlockProgress): Promise<void> {
|
async preEventsBlockProcessing (blockProgress: BlockProgress): Promise<void> {
|
||||||
console.time('time:indexer#processBlock-init_state');
|
console.time('time:indexer#preEventsBlockProcessing-init_state');
|
||||||
// Call a function to create initial state for contracts.
|
// Call a function to create initial state for contracts.
|
||||||
await this._baseIndexer.createInit(this, blockProgress.blockHash, blockProgress.blockNumber);
|
await this._baseIndexer.createInit(this, blockProgress.blockHash, blockProgress.blockNumber);
|
||||||
console.timeEnd('time:indexer#processBlock-init_state');
|
console.timeEnd('time:indexer#preEventsBlockProcessing-init_state');
|
||||||
{{#if (subgraphPath)}}
|
{{#if (subgraphPath)}}
|
||||||
|
|
||||||
this._graphWatcher.updateEntityCacheFrothyBlocks(blockProgress);
|
this._graphWatcher.updateEntityCacheFrothyBlocks(blockProgress);
|
||||||
@ -512,16 +512,16 @@ export class Indexer implements IndexerInterface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
{{#if (subgraphPath)}}
|
{{#if (subgraphPath)}}
|
||||||
async processBlockAfterEvents (blockHash: string, blockNumber: number, extraData: ExtraEventData): Promise<void> {
|
async postEventsBlockProcessing (blockHash: string, blockNumber: number, extraData: ExtraEventData): Promise<void> {
|
||||||
console.time('time:indexer#processBlockAfterEvents-mapping_code');
|
console.time('time:indexer#postEventsBlockProcessing-mapping_code');
|
||||||
// Call subgraph handler for block.
|
// Call subgraph handler for block.
|
||||||
await this._graphWatcher.handleBlock(blockHash, blockNumber, extraData);
|
await this._graphWatcher.handleBlock(blockHash, blockNumber, extraData);
|
||||||
console.timeEnd('time:indexer#processBlockAfterEvents-mapping_code');
|
console.timeEnd('time:indexer#postEventsBlockProcessing-mapping_code');
|
||||||
|
|
||||||
console.time('time:indexer#processBlockAfterEvents-dump_subgraph_state');
|
console.time('time:indexer#postEventsBlockProcessing-dump_subgraph_state');
|
||||||
// Persist subgraph state to the DB.
|
// Persist subgraph state to the DB.
|
||||||
await this.dumpSubgraphState(blockHash);
|
await this.dumpSubgraphState(blockHash);
|
||||||
console.timeEnd('time:indexer#processBlockAfterEvents-dump_subgraph_state');
|
console.timeEnd('time:indexer#postEventsBlockProcessing-dump_subgraph_state');
|
||||||
}
|
}
|
||||||
|
|
||||||
{{/if}}
|
{{/if}}
|
||||||
|
@ -260,7 +260,7 @@ export class Indexer implements IndexerInterface {
|
|||||||
return undefined;
|
return undefined;
|
||||||
}
|
}
|
||||||
|
|
||||||
async processBlock (blockProgress: BlockProgressInterface): Promise<void> {
|
async preEventsBlockProcessing (blockProgress: BlockProgressInterface): Promise<void> {
|
||||||
return undefined;
|
return undefined;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -176,8 +176,15 @@ export const processBatchEvents = async (
|
|||||||
subgraphEventsOrder: boolean;
|
subgraphEventsOrder: boolean;
|
||||||
}
|
}
|
||||||
): Promise<boolean> => {
|
): Promise<boolean> => {
|
||||||
let dbBlock: BlockProgressInterface, updatedDbEvents: EventInterface[];
|
let updatedDbEvents: EventInterface[];
|
||||||
let isNewContractWatched = false;
|
let isNewContractWatched = false;
|
||||||
|
let { block: dbBlock } = data;
|
||||||
|
|
||||||
|
// Perform any operations before processing events for this block
|
||||||
|
// (if this block hasn't already been processed)
|
||||||
|
if (!dbBlock.isComplete) {
|
||||||
|
await indexer.preEventsBlockProcessing(data.block);
|
||||||
|
}
|
||||||
|
|
||||||
if (subgraphEventsOrder) {
|
if (subgraphEventsOrder) {
|
||||||
({ dbBlock, updatedDbEvents, isNewContractWatched } = await _processEventsInSubgraphOrder(indexer, data, eventsInBatch || DEFAULT_EVENTS_IN_BATCH));
|
({ dbBlock, updatedDbEvents, isNewContractWatched } = await _processEventsInSubgraphOrder(indexer, data, eventsInBatch || DEFAULT_EVENTS_IN_BATCH));
|
||||||
@ -185,10 +192,9 @@ export const processBatchEvents = async (
|
|||||||
({ dbBlock, updatedDbEvents } = await _processEvents(indexer, data, eventsInBatch || DEFAULT_EVENTS_IN_BATCH));
|
({ dbBlock, updatedDbEvents } = await _processEvents(indexer, data, eventsInBatch || DEFAULT_EVENTS_IN_BATCH));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (indexer.processBlockAfterEvents) {
|
// Perform any operations after processing events for this block
|
||||||
if (!dbBlock.isComplete) {
|
if (indexer.postEventsBlockProcessing && !dbBlock.isComplete) {
|
||||||
await indexer.processBlockAfterEvents(dbBlock.blockHash, dbBlock.blockNumber, data);
|
await indexer.postEventsBlockProcessing(dbBlock.blockHash, dbBlock.blockNumber, data);
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
dbBlock.isComplete = true;
|
dbBlock.isComplete = true;
|
||||||
@ -369,6 +375,13 @@ const _processEventsInSubgraphOrder = async (
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (isNewContractWatched) {
|
||||||
|
// Create init states for newly watched contracts
|
||||||
|
// (needs to be done before we start processsing their events)
|
||||||
|
assert(indexer.createInit);
|
||||||
|
await indexer.createInit(block.blockHash, block.blockNumber);
|
||||||
|
}
|
||||||
|
|
||||||
console.time('time:common#processEventsInSubgraphOrder-processing_initially_unwatched_events');
|
console.time('time:common#processEventsInSubgraphOrder-processing_initially_unwatched_events');
|
||||||
// In the end process events of newly watched contracts
|
// In the end process events of newly watched contracts
|
||||||
for (const updatedDbEvent of updatedDbEvents) {
|
for (const updatedDbEvent of updatedDbEvents) {
|
||||||
|
@ -46,9 +46,6 @@ export const indexBlock = async (
|
|||||||
blockProgress = partialblockProgress as BlockProgressInterface;
|
blockProgress = partialblockProgress as BlockProgressInterface;
|
||||||
}
|
}
|
||||||
|
|
||||||
assert(indexer.processBlock);
|
|
||||||
await indexer.processBlock(blockProgress);
|
|
||||||
|
|
||||||
await processBatchEvents(
|
await processBatchEvents(
|
||||||
indexer,
|
indexer,
|
||||||
{
|
{
|
||||||
|
@ -394,6 +394,22 @@ export class Indexer {
|
|||||||
return blocks;
|
return blocks;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async getBlockWithFullBlock (blockFilter: { blockNumber?: number, blockHash?: string }): Promise<{ block: DeepPartial<BlockProgressInterface>, fullBlock: EthFullBlock }> {
|
||||||
|
const [fullBlock] = await this.getBlocks(blockFilter);
|
||||||
|
assert(fullBlock);
|
||||||
|
|
||||||
|
const block = {
|
||||||
|
...fullBlock,
|
||||||
|
blockTimestamp: Number(fullBlock.timestamp),
|
||||||
|
blockNumber: Number(fullBlock.blockNumber)
|
||||||
|
};
|
||||||
|
|
||||||
|
return {
|
||||||
|
block: block as DeepPartial<BlockProgressInterface>,
|
||||||
|
fullBlock
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
async getBlockProgress (blockHash: string): Promise<BlockProgressInterface | undefined> {
|
async getBlockProgress (blockHash: string): Promise<BlockProgressInterface | undefined> {
|
||||||
return this._db.getBlockProgress(blockHash);
|
return this._db.getBlockProgress(blockHash);
|
||||||
}
|
}
|
||||||
@ -458,6 +474,17 @@ export class Indexer {
|
|||||||
|
|
||||||
const { addresses, topics } = this._createLogsFilters(eventSignaturesMap);
|
const { addresses, topics } = this._createLogsFilters(eventSignaturesMap);
|
||||||
|
|
||||||
|
// Create a set of starting blocks of watched contracts in range [fromBlock, toBlock]
|
||||||
|
// TODO: Optimize (avoid template contracts)
|
||||||
|
const watchedBlockNumbers = Object.keys(this._watchedContracts).reduce((acc: Set<number>, address: string) => {
|
||||||
|
const startBlock = this._watchedContracts[address].startingBlock;
|
||||||
|
if (startBlock >= fromBlock && startBlock <= toBlock) {
|
||||||
|
acc.add(startBlock);
|
||||||
|
}
|
||||||
|
|
||||||
|
return acc;
|
||||||
|
}, new Set());
|
||||||
|
|
||||||
const { logs } = await this._ethClient.getLogsForBlockRange({
|
const { logs } = await this._ethClient.getLogsForBlockRange({
|
||||||
fromBlock,
|
fromBlock,
|
||||||
toBlock,
|
toBlock,
|
||||||
@ -474,19 +501,13 @@ export class Indexer {
|
|||||||
// Fetch blocks with transactions for the logs returned
|
// Fetch blocks with transactions for the logs returned
|
||||||
console.time(`time:indexer#fetchAndSaveFilteredEventsAndBlocks-fetch-blocks-txs-${fromBlock}-${toBlock}`);
|
console.time(`time:indexer#fetchAndSaveFilteredEventsAndBlocks-fetch-blocks-txs-${fromBlock}-${toBlock}`);
|
||||||
const blocksPromises = Array.from(blockLogsMap.keys()).map(async (blockHash) => {
|
const blocksPromises = Array.from(blockLogsMap.keys()).map(async (blockHash) => {
|
||||||
const [fullBlock] = await this._ethClient.getFullBlocks({ blockHash });
|
const { block, fullBlock } = await this.getBlockWithFullBlock({ blockHash });
|
||||||
assert(fullBlock);
|
|
||||||
|
|
||||||
const block = {
|
// Remove this block from watchedBlockNumbers set as it's already fetched
|
||||||
...fullBlock,
|
assert(block.blockNumber);
|
||||||
blockTimestamp: Number(fullBlock.timestamp),
|
watchedBlockNumbers.delete(block.blockNumber);
|
||||||
blockNumber: Number(fullBlock.blockNumber)
|
|
||||||
};
|
|
||||||
|
|
||||||
return {
|
return { block, fullBlock };
|
||||||
block: block as DeepPartial<BlockProgressInterface>,
|
|
||||||
fullBlock
|
|
||||||
};
|
|
||||||
});
|
});
|
||||||
|
|
||||||
const ethFullTxPromises = txHashes.map(async txHash => {
|
const ethFullTxPromises = txHashes.map(async txHash => {
|
||||||
@ -496,6 +517,19 @@ export class Indexer {
|
|||||||
const blocks = await Promise.all(blocksPromises);
|
const blocks = await Promise.all(blocksPromises);
|
||||||
const ethFullTxs = await Promise.all(ethFullTxPromises);
|
const ethFullTxs = await Promise.all(ethFullTxPromises);
|
||||||
|
|
||||||
|
// Fetch starting blocks for watched contracts
|
||||||
|
const watchedBlocks = await Promise.all(
|
||||||
|
Array.from(watchedBlockNumbers).map(async (blockNumber) => this.getBlockWithFullBlock({ blockNumber }))
|
||||||
|
);
|
||||||
|
|
||||||
|
// Merge and sort the two block lists
|
||||||
|
const sortedBlocks = [...blocks, ...watchedBlocks].sort((b1, b2) => {
|
||||||
|
assert(b1.block.blockNumber);
|
||||||
|
assert(b2.block.blockNumber);
|
||||||
|
|
||||||
|
return b1.block.blockNumber - b2.block.blockNumber;
|
||||||
|
});
|
||||||
|
|
||||||
const ethFullTxsMap = ethFullTxs.reduce((acc: Map<string, EthFullTransaction>, ethFullTx) => {
|
const ethFullTxsMap = ethFullTxs.reduce((acc: Map<string, EthFullTransaction>, ethFullTx) => {
|
||||||
acc.set(ethFullTx.ethTransactionCidByTxHash.txHash, ethFullTx);
|
acc.set(ethFullTx.ethTransactionCidByTxHash.txHash, ethFullTx);
|
||||||
return acc;
|
return acc;
|
||||||
@ -505,7 +539,7 @@ export class Indexer {
|
|||||||
|
|
||||||
// Map db ready events according to blockhash
|
// Map db ready events according to blockhash
|
||||||
console.time(`time:indexer#fetchAndSaveFilteredEventsAndBlocks-db-save-blocks-events-${fromBlock}-${toBlock}`);
|
console.time(`time:indexer#fetchAndSaveFilteredEventsAndBlocks-db-save-blocks-events-${fromBlock}-${toBlock}`);
|
||||||
const blockWithDbEventsPromises = blocks.map(async ({ block, fullBlock }) => {
|
const blockWithDbEventsPromises = sortedBlocks.map(async ({ block, fullBlock }) => {
|
||||||
const blockHash = block.blockHash;
|
const blockHash = block.blockHash;
|
||||||
assert(blockHash);
|
assert(blockHash);
|
||||||
const logs = blockLogsMap.get(blockHash) || [];
|
const logs = blockLogsMap.get(blockHash) || [];
|
||||||
@ -528,7 +562,6 @@ export class Indexer {
|
|||||||
blockProgress,
|
blockProgress,
|
||||||
ethFullBlock: fullBlock,
|
ethFullBlock: fullBlock,
|
||||||
ethFullTransactions: blockEthFullTxs,
|
ethFullTransactions: blockEthFullTxs,
|
||||||
block,
|
|
||||||
events: []
|
events: []
|
||||||
};
|
};
|
||||||
});
|
});
|
||||||
|
@ -681,10 +681,6 @@ export class JobRunner {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!blockProgress.isComplete) {
|
|
||||||
await this._indexer.processBlock(blockProgress);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Push job to event processing queue.
|
// Push job to event processing queue.
|
||||||
// Block with all events processed or no events will not be processed again due to check in _processEvents.
|
// Block with all events processed or no events will not be processed again due to check in _processEvents.
|
||||||
await this._pushEventProcessingJobsForBlocks([blockProgress], true);
|
await this._pushEventProcessingJobsForBlocks([blockProgress], true);
|
||||||
|
@ -213,8 +213,8 @@ export interface IndexerInterface {
|
|||||||
getRelationsMap?: () => Map<any, { [key: string]: any }>
|
getRelationsMap?: () => Map<any, { [key: string]: any }>
|
||||||
processInitialState: (contractAddress: string, blockHash: string) => Promise<any>
|
processInitialState: (contractAddress: string, blockHash: string) => Promise<any>
|
||||||
processStateCheckpoint: (contractAddress: string, blockHash: string) => Promise<boolean>
|
processStateCheckpoint: (contractAddress: string, blockHash: string) => Promise<boolean>
|
||||||
processBlock: (blockProgres: BlockProgressInterface) => Promise<void>
|
preEventsBlockProcessing: (blockProgres: BlockProgressInterface) => Promise<void>
|
||||||
processBlockAfterEvents?: (blockHash: string, blockNumber: number, data: ExtraEventData) => Promise<void>
|
postEventsBlockProcessing?: (blockHash: string, blockNumber: number, data: ExtraEventData) => Promise<void>
|
||||||
processCanonicalBlock (blockHash: string, blockNumber: number): Promise<void>
|
processCanonicalBlock (blockHash: string, blockNumber: number): Promise<void>
|
||||||
processCheckpoint (blockHash: string): Promise<void>
|
processCheckpoint (blockHash: string): Promise<void>
|
||||||
processCLICheckpoint (contractAddress: string, blockHash?: string): Promise<string | undefined>
|
processCLICheckpoint (contractAddress: string, blockHash?: string): Promise<string | undefined>
|
||||||
|
Loading…
Reference in New Issue
Block a user