mirror of
https://github.com/cerc-io/watcher-ts
synced 2025-02-08 11:02:52 +00:00
Implement fetch and save of filtered event logs and required blocks (#445)
* Fix async block size caching for missing blocks in historical processing * Start historical block processing only if filter logs is set to true * Fetch filtered logs by topics and save required blocks * Fix realtime processing start block after historical processing * Avoid publishing events and blocks in historical processing * Add new method to graph-node test indexer * Get full block data for subgraph block handler only if configured * Add useBlockRanges flag for switching between historical and realtime processing
This commit is contained in:
parent
1b6ca6edeb
commit
9fb51e89f6
@ -128,12 +128,13 @@ export class BaseCmd {
|
||||
|
||||
async initEventWatcher (): Promise<void> {
|
||||
assert(this._clients?.ethClient);
|
||||
assert(this._config);
|
||||
assert(this._indexer);
|
||||
assert(this._jobQueue);
|
||||
|
||||
// Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
|
||||
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
|
||||
const pubsub = new PubSub();
|
||||
this._eventWatcher = new EventWatcher(this._clients.ethClient, this._indexer, pubsub, this._jobQueue);
|
||||
this._eventWatcher = new EventWatcher(this._config.server, this._clients.ethClient, this._indexer, pubsub, this._jobQueue);
|
||||
}
|
||||
}
|
||||
|
@ -29,6 +29,11 @@
|
||||
# Boolean to filter logs by topics.
|
||||
filterLogsByTopics = false
|
||||
|
||||
# Boolean to switch between modes of processing events when starting the server.
|
||||
# Setting to true will fetch filtered events and required blocks in a range of blocks and then process them
|
||||
# Setting to false will fetch blocks consecutively with its events and then process them (Behaviour is followed in realtime processing near head)
|
||||
useBlockRanges = true
|
||||
|
||||
# Max block range for which to return events in eventsInRange GQL query.
|
||||
# Use -1 for skipping check on block range.
|
||||
maxEventsBlockRange = 1000
|
||||
|
@ -277,12 +277,6 @@ export class Database implements DatabaseInterface {
|
||||
return this._baseDatabase.getBlocksAtHeight(repo, height, isPruned);
|
||||
}
|
||||
|
||||
async getLatestProcessedBlockProgress (isPruned: boolean): Promise<BlockProgress | undefined> {
|
||||
const repo = this._conn.getRepository(BlockProgress);
|
||||
|
||||
return this._baseDatabase.getLatestProcessedBlockProgress(repo, isPruned);
|
||||
}
|
||||
|
||||
async markBlocksAsPruned (queryRunner: QueryRunner, blocks: BlockProgress[]): Promise<void> {
|
||||
const repo = queryRunner.manager.getRepository(BlockProgress);
|
||||
|
||||
|
@ -657,10 +657,6 @@ export class Indexer implements IndexerInterface {
|
||||
return this._baseIndexer.getBlocksAtHeight(height, isPruned);
|
||||
}
|
||||
|
||||
async getLatestProcessedBlockProgress (isPruned: boolean): Promise<BlockProgress | undefined> {
|
||||
return this._db.getLatestProcessedBlockProgress(isPruned);
|
||||
}
|
||||
|
||||
async fetchEventsAndSaveBlocks (blocks: DeepPartial<BlockProgress>[]): Promise<{ blockProgress: BlockProgress, events: DeepPartial<Event>[] }[]> {
|
||||
return this._baseIndexer.fetchEventsAndSaveBlocks(blocks, this._eventSignaturesMap, this.parseEventNameAndArgs.bind(this));
|
||||
}
|
||||
|
@ -215,21 +215,13 @@ export class GraphWatcher {
|
||||
}
|
||||
|
||||
async handleBlock (blockHash: string, blockNumber: number) {
|
||||
// Check if block data is already fetched in handleEvent method for the same block.
|
||||
if (!this._context.block || this._context.block.blockHash !== blockHash) {
|
||||
this._context.block = await getFullBlock(this._ethClient, this._ethProvider, blockHash, blockNumber);
|
||||
}
|
||||
|
||||
const blockData = this._context.block;
|
||||
assert(blockData);
|
||||
|
||||
// Clear transactions map on handling new block.
|
||||
this._transactionsMap.clear();
|
||||
|
||||
// Call block handler(s) for each contract.
|
||||
for (const dataSource of this._dataSources) {
|
||||
// Reinstantiate WASM after every N blocks.
|
||||
if (Number(blockData.blockNumber) % this._wasmRestartBlocksInterval === 0) {
|
||||
if (Number(blockNumber) % this._wasmRestartBlocksInterval === 0) {
|
||||
// The WASM instance allocates memory as required and the limit is 4GB.
|
||||
// https://stackoverflow.com/a/40453962
|
||||
// https://github.com/AssemblyScript/assemblyscript/pull/1268#issue-618411291
|
||||
@ -242,6 +234,14 @@ export class GraphWatcher {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Check if block data is already fetched in handleEvent method for the same block.
|
||||
if (!this._context.block || this._context.block.blockHash !== blockHash) {
|
||||
this._context.block = await getFullBlock(this._ethClient, this._ethProvider, blockHash, blockNumber);
|
||||
}
|
||||
|
||||
const blockData = this._context.block;
|
||||
assert(blockData);
|
||||
|
||||
const { instance } = this._dataSourceMap[dataSource.name];
|
||||
assert(instance);
|
||||
const { exports: instanceExports } = instance;
|
||||
|
@ -93,12 +93,6 @@ export class Indexer implements IndexerInterface {
|
||||
return [];
|
||||
}
|
||||
|
||||
async getLatestProcessedBlockProgress (isPruned: boolean): Promise<BlockProgressInterface | undefined> {
|
||||
assert(isPruned);
|
||||
|
||||
return undefined;
|
||||
}
|
||||
|
||||
async getBlockEvents (blockHash: string): Promise<Array<EventInterface>> {
|
||||
assert(blockHash);
|
||||
|
||||
@ -118,6 +112,13 @@ export class Indexer implements IndexerInterface {
|
||||
return [];
|
||||
}
|
||||
|
||||
async fetchAndSaveFilteredEventsAndBlocks (startBlock: number, endBlock: number): Promise<{ blockProgress: BlockProgressInterface, events: DeepPartial<EventInterface>[] }[]> {
|
||||
assert(startBlock);
|
||||
assert(endBlock);
|
||||
|
||||
return [];
|
||||
}
|
||||
|
||||
async saveBlockAndFetchEvents (block: BlockProgressInterface): Promise<[BlockProgressInterface, DeepPartial<EventInterface>[]]> {
|
||||
return [block, []];
|
||||
}
|
||||
|
@ -41,7 +41,7 @@ const cacheBlockSizesAsync = async (provider: providers.JsonRpcProvider, blockNu
|
||||
}
|
||||
|
||||
if (endBlockHeight > blockSizeMapLatestHeight) {
|
||||
const startBlockHeight = blockSizeMapLatestHeight + 1;
|
||||
const startBlockHeight = Math.max(blockNumber, blockSizeMapLatestHeight + 1);
|
||||
blockSizeMapLatestHeight = endBlockHeight;
|
||||
|
||||
// Start prefetching blocks after latest height in blockSizeMap.
|
||||
|
@ -138,6 +138,32 @@ export const fetchBlocksAtHeight = async (
|
||||
return blocksToBeIndexed;
|
||||
};
|
||||
|
||||
/**
|
||||
* Method to fetch and save filtered logs and blocks in a given range.
|
||||
* @param indexer
|
||||
* @param blockAndEventsMap
|
||||
* @param startBlock
|
||||
* @param endBlock
|
||||
*/
|
||||
export const fetchAndSaveFilteredLogsAndBlocks = async (
|
||||
indexer: IndexerInterface,
|
||||
blockAndEventsMap: Map<string, PrefetchedBlock>,
|
||||
startBlock: number,
|
||||
endBlock: number
|
||||
): Promise<BlockProgressInterface[]> => {
|
||||
// Fetch filtered logs and required blocks
|
||||
console.time('time:common#fetchAndSaveFilteredLogsAndBlocks-fetchAndSaveFilteredEventsAndBlocks');
|
||||
const blocksWithEvents = await indexer.fetchAndSaveFilteredEventsAndBlocks(startBlock, endBlock);
|
||||
console.timeEnd('time:common#fetchAndSaveFilteredLogsAndBlocks-fetchAndSaveFilteredEventsAndBlocks');
|
||||
|
||||
// Set blocks with events in blockAndEventsMap cache
|
||||
blocksWithEvents.forEach(({ blockProgress, events }) => {
|
||||
blockAndEventsMap.set(blockProgress.blockHash, { block: blockProgress, events });
|
||||
});
|
||||
|
||||
return blocksWithEvents.map(({ blockProgress }) => blockProgress);
|
||||
};
|
||||
|
||||
export const _prefetchBlocks = async (
|
||||
blockNumber: number,
|
||||
indexer: IndexerInterface,
|
||||
@ -182,9 +208,6 @@ export const _fetchBatchBlocks = async (
|
||||
while (true) {
|
||||
console.time('time:common#fetchBatchBlocks-getBlocks');
|
||||
|
||||
// TODO: Fetch logs by filter before fetching blocks
|
||||
// TODO: Fetch only blocks needed for returned logs
|
||||
// TODO: Save blocks and logs to DB
|
||||
const blockPromises = blockNumbers.map(async blockNumber => indexer.getBlocks({ blockNumber }));
|
||||
const settledResults = await Promise.allSettled(blockPromises);
|
||||
|
||||
|
@ -205,6 +205,11 @@ export interface ServerConfig {
|
||||
maxEventsBlockRange: number;
|
||||
clearEntitiesCacheInterval: number;
|
||||
|
||||
// Boolean to switch between modes of processing events when starting the server.
|
||||
// Setting to true will fetch filtered events and required blocks in a range of blocks and then process them
|
||||
// Setting to false will fetch blocks consecutively with its events and then process them (Behaviour is followed in realtime processing near head)
|
||||
useBlockRanges: boolean;
|
||||
|
||||
// Boolean to skip updating entity fields required in state creation and not required in the frontend.
|
||||
skipStateFieldsUpdate: boolean;
|
||||
|
||||
|
@ -205,14 +205,6 @@ export class Database {
|
||||
.getMany();
|
||||
}
|
||||
|
||||
async getLatestProcessedBlockProgress (repo: Repository<BlockProgressInterface>, isPruned: boolean): Promise<BlockProgressInterface | undefined> {
|
||||
return repo.createQueryBuilder('block_progress')
|
||||
.where('is_pruned = :isPruned AND is_complete = :isComplete', { isPruned, isComplete: true })
|
||||
.orderBy('block_number', 'DESC')
|
||||
.limit(1)
|
||||
.getOne();
|
||||
}
|
||||
|
||||
async saveBlockProgress (repo: Repository<BlockProgressInterface>, block: DeepPartial<BlockProgressInterface>): Promise<BlockProgressInterface> {
|
||||
blockProgressCount.inc(1);
|
||||
|
||||
|
@ -14,6 +14,7 @@ import { MAX_REORG_DEPTH, JOB_KIND_PRUNE, JOB_KIND_INDEX, UNKNOWN_EVENT_NAME, JO
|
||||
import { createPruningJob, processBlockByNumber } from './common';
|
||||
import { OrderDirection } from './database';
|
||||
import { HISTORICAL_BLOCKS_BATCH_SIZE, HistoricalJobData } from './job-runner';
|
||||
import { ServerConfig } from './config';
|
||||
|
||||
const EVENT = 'event';
|
||||
|
||||
@ -22,6 +23,7 @@ const log = debug('vulcanize:events');
|
||||
export const BlockProgressEvent = 'block-progress-event';
|
||||
|
||||
export class EventWatcher {
|
||||
_serverConfig: ServerConfig;
|
||||
_ethClient: EthClient;
|
||||
_indexer: IndexerInterface;
|
||||
_pubsub: PubSub;
|
||||
@ -31,7 +33,8 @@ export class EventWatcher {
|
||||
_signalCount = 0;
|
||||
_historicalProcessingEndBlockNumber = 0;
|
||||
|
||||
constructor (ethClient: EthClient, indexer: IndexerInterface, pubsub: PubSub, jobQueue: JobQueue) {
|
||||
constructor (serverConfig: ServerConfig, ethClient: EthClient, indexer: IndexerInterface, pubsub: PubSub, jobQueue: JobQueue) {
|
||||
this._serverConfig = serverConfig;
|
||||
this._ethClient = ethClient;
|
||||
this._indexer = indexer;
|
||||
this._pubsub = pubsub;
|
||||
@ -88,8 +91,9 @@ export class EventWatcher {
|
||||
startBlockNumber = syncStatus.chainHeadBlockNumber + 1;
|
||||
}
|
||||
|
||||
// Check if filter for logs is enabled
|
||||
// Check if starting block for watcher is before latest canonical block
|
||||
if (startBlockNumber < latestCanonicalBlockNumber) {
|
||||
if (this._serverConfig.useBlockRanges && startBlockNumber < latestCanonicalBlockNumber) {
|
||||
await this.startHistoricalBlockProcessing(startBlockNumber, latestCanonicalBlockNumber);
|
||||
|
||||
return;
|
||||
@ -106,7 +110,8 @@ export class EventWatcher {
|
||||
await this._jobQueue.pushJob(
|
||||
QUEUE_HISTORICAL_PROCESSING,
|
||||
{
|
||||
blockNumber: startBlockNumber
|
||||
blockNumber: startBlockNumber,
|
||||
processingEndBlockNumber: this._historicalProcessingEndBlockNumber
|
||||
}
|
||||
);
|
||||
}
|
||||
@ -179,7 +184,7 @@ export class EventWatcher {
|
||||
|
||||
async historicalProcessingCompleteHandler (job: PgBoss.Job<any>): Promise<void> {
|
||||
const { id, data: { failed, request: { data } } } = job;
|
||||
const { blockNumber }: HistoricalJobData = data;
|
||||
const { blockNumber, processingEndBlockNumber }: HistoricalJobData = data;
|
||||
|
||||
if (failed) {
|
||||
log(`Job ${id} for queue ${QUEUE_HISTORICAL_PROCESSING} failed`);
|
||||
@ -187,41 +192,17 @@ export class EventWatcher {
|
||||
}
|
||||
|
||||
// TODO: Get batch size from config
|
||||
const nextBatchStartBlockNumber = blockNumber + HISTORICAL_BLOCKS_BATCH_SIZE + 1;
|
||||
log(`Historical block processing completed for block range: ${blockNumber} to ${nextBatchStartBlockNumber}`);
|
||||
const batchEndBlockNumber = Math.min(blockNumber + HISTORICAL_BLOCKS_BATCH_SIZE, processingEndBlockNumber);
|
||||
const nextBatchStartBlockNumber = batchEndBlockNumber + 1;
|
||||
log(`Historical block processing completed for block range: ${blockNumber} to ${batchEndBlockNumber}`);
|
||||
|
||||
// Check if historical processing endBlock / latest canonical block is reached
|
||||
if (nextBatchStartBlockNumber > this._historicalProcessingEndBlockNumber) {
|
||||
let newSyncStatusBlock: {
|
||||
blockNumber: number;
|
||||
blockHash: string;
|
||||
} | undefined;
|
||||
|
||||
// Fetch latest processed block from DB
|
||||
const latestProcessedBlock = await this._indexer.getLatestProcessedBlockProgress(false);
|
||||
|
||||
if (latestProcessedBlock) {
|
||||
if (latestProcessedBlock.blockNumber > this._historicalProcessingEndBlockNumber) {
|
||||
// Set new sync status to latest processed block
|
||||
newSyncStatusBlock = {
|
||||
blockHash: latestProcessedBlock.blockHash,
|
||||
blockNumber: latestProcessedBlock.blockNumber
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
if (!newSyncStatusBlock) {
|
||||
const [block] = await this._indexer.getBlocks({ blockNumber: this._historicalProcessingEndBlockNumber });
|
||||
|
||||
newSyncStatusBlock = {
|
||||
// At latestCanonicalBlockNumber height null block might be returned in case of FEVM
|
||||
blockHash: block ? block.blockHash : constants.AddressZero,
|
||||
blockNumber: this._historicalProcessingEndBlockNumber
|
||||
};
|
||||
}
|
||||
const [block] = await this._indexer.getBlocks({ blockNumber: this._historicalProcessingEndBlockNumber });
|
||||
const historicalProcessingEndBlockHash = block ? block.blockHash : constants.AddressZero;
|
||||
|
||||
// Update sync status to max of latest processed block or latest canonical block
|
||||
const syncStatus = await this._indexer.forceUpdateSyncStatus(newSyncStatusBlock.blockHash, newSyncStatusBlock.blockNumber);
|
||||
const syncStatus = await this._indexer.forceUpdateSyncStatus(historicalProcessingEndBlockHash, this._historicalProcessingEndBlockNumber);
|
||||
log(`Sync status canonical block updated to ${syncStatus.latestCanonicalBlockNumber}`);
|
||||
// Start realtime processing
|
||||
this.startBlockProcessing();
|
||||
@ -233,7 +214,8 @@ export class EventWatcher {
|
||||
await this._jobQueue.pushJob(
|
||||
QUEUE_HISTORICAL_PROCESSING,
|
||||
{
|
||||
blockNumber: nextBatchStartBlockNumber
|
||||
blockNumber: nextBatchStartBlockNumber,
|
||||
processingEndBlockNumber
|
||||
}
|
||||
);
|
||||
}
|
||||
@ -246,47 +228,52 @@ export class EventWatcher {
|
||||
return;
|
||||
}
|
||||
|
||||
const { data: { kind, blockHash } } = request;
|
||||
const { data: { kind, blockHash, publish } } = request;
|
||||
|
||||
// Ignore jobs other than JOB_KIND_EVENTS
|
||||
if (kind !== JOB_KIND_EVENTS) {
|
||||
return;
|
||||
}
|
||||
|
||||
assert(blockHash);
|
||||
// Check if publish is set to true
|
||||
// Events and blocks are not published in historical processing
|
||||
// GQL subscription events will not be triggered if publish is set to false
|
||||
if (publish) {
|
||||
assert(blockHash);
|
||||
|
||||
const blockProgress = await this._indexer.getBlockProgress(blockHash);
|
||||
assert(blockProgress);
|
||||
const blockProgress = await this._indexer.getBlockProgress(blockHash);
|
||||
assert(blockProgress);
|
||||
|
||||
await this.publishBlockProgressToSubscribers(blockProgress);
|
||||
await this.publishBlockProgressToSubscribers(blockProgress);
|
||||
|
||||
const dbEvents = await this._indexer.getBlockEvents(
|
||||
blockProgress.blockHash,
|
||||
{
|
||||
eventName: [
|
||||
{ value: UNKNOWN_EVENT_NAME, not: true, operator: 'equals' }
|
||||
]
|
||||
},
|
||||
{
|
||||
orderBy: 'index',
|
||||
orderDirection: OrderDirection.asc
|
||||
}
|
||||
);
|
||||
const dbEvents = await this._indexer.getBlockEvents(
|
||||
blockProgress.blockHash,
|
||||
{
|
||||
eventName: [
|
||||
{ value: UNKNOWN_EVENT_NAME, not: true, operator: 'equals' }
|
||||
]
|
||||
},
|
||||
{
|
||||
orderBy: 'index',
|
||||
orderDirection: OrderDirection.asc
|
||||
}
|
||||
);
|
||||
|
||||
const timeElapsedInSeconds = (Date.now() - Date.parse(createdOn)) / 1000;
|
||||
const timeElapsedInSeconds = (Date.now() - Date.parse(createdOn)) / 1000;
|
||||
|
||||
// Cannot publish individual event as they are processed together in a single job.
|
||||
// TODO: Use a different pubsub to publish event from job-runner.
|
||||
// https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
|
||||
for (const dbEvent of dbEvents) {
|
||||
log(`Job onComplete event ${dbEvent.id} publish ${!!request.data.publish}`);
|
||||
// Cannot publish individual event as they are processed together in a single job.
|
||||
// TODO: Use a different pubsub to publish event from job-runner.
|
||||
// https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
|
||||
for (const dbEvent of dbEvents) {
|
||||
log(`Job onComplete event ${dbEvent.id} publish ${!!request.data.publish}`);
|
||||
|
||||
if (!failed && state === 'completed' && request.data.publish) {
|
||||
// Check for max acceptable lag time between request and sending results to live subscribers.
|
||||
if (timeElapsedInSeconds <= this._jobQueue.maxCompletionLag) {
|
||||
await this.publishEventToSubscribers(dbEvent, timeElapsedInSeconds);
|
||||
} else {
|
||||
log(`event ${dbEvent.id} is too old (${timeElapsedInSeconds}s), not broadcasting to live subscribers`);
|
||||
if (!failed && state === 'completed') {
|
||||
// Check for max acceptable lag time between request and sending results to live subscribers.
|
||||
if (timeElapsedInSeconds <= this._jobQueue.maxCompletionLag) {
|
||||
await this.publishEventToSubscribers(dbEvent, timeElapsedInSeconds);
|
||||
} else {
|
||||
log(`event ${dbEvent.id} is too old (${timeElapsedInSeconds}s), not broadcasting to live subscribers`);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -334,17 +334,7 @@ export class Indexer {
|
||||
}
|
||||
|
||||
// Sort logs according to blockhash
|
||||
const logsMap: Map<string, any> = new Map();
|
||||
logs.forEach((log: any) => {
|
||||
const { blockHash: logBlockHash } = log;
|
||||
assert(typeof logBlockHash === 'string');
|
||||
|
||||
if (!logsMap.has(logBlockHash)) {
|
||||
logsMap.set(logBlockHash, []);
|
||||
}
|
||||
|
||||
logsMap.get(logBlockHash).push(log);
|
||||
});
|
||||
const blockLogsMap = this._reduceLogsToBlockLogsMap(logs);
|
||||
|
||||
// Fetch transactions for given blocks
|
||||
const transactionsMap: Map<string, any> = new Map();
|
||||
@ -352,7 +342,7 @@ export class Indexer {
|
||||
assert(block.blockHash);
|
||||
|
||||
// Skip fetching txs if no relevant logs found in this block
|
||||
if (!logsMap.has(block.blockHash)) {
|
||||
if (!blockLogsMap.has(block.blockHash)) {
|
||||
return;
|
||||
}
|
||||
|
||||
@ -380,7 +370,7 @@ export class Indexer {
|
||||
const blockHash = block.blockHash;
|
||||
assert(blockHash);
|
||||
|
||||
const logs = logsMap.get(blockHash) || [];
|
||||
const logs = blockLogsMap.get(blockHash) || [];
|
||||
const transactions = transactionsMap.get(blockHash) || [];
|
||||
|
||||
const dbEvents = this.createDbEventsFromLogsAndTxs(blockHash, logs, transactions, parseEventNameAndArgs);
|
||||
@ -390,6 +380,88 @@ export class Indexer {
|
||||
return dbEventsMap;
|
||||
}
|
||||
|
||||
async fetchAndSaveFilteredEventsAndBlocks (
|
||||
fromBlock: number,
|
||||
toBlock: number,
|
||||
eventSignaturesMap: Map<string, string[]>,
|
||||
parseEventNameAndArgs: (
|
||||
kind: string,
|
||||
logObj: { topics: string[]; data: string }
|
||||
) => { eventName: string; eventInfo: {[key: string]: any}; eventSignature: string }
|
||||
): Promise<{ blockProgress: BlockProgressInterface, events: DeepPartial<EventInterface>[] }[]> {
|
||||
assert(this._ethClient.getLogsForBlockRange, 'getLogsForBlockRange() not implemented in ethClient');
|
||||
|
||||
const { addresses, topics } = this._createLogsFilters(eventSignaturesMap);
|
||||
|
||||
const { logs } = await this._ethClient.getLogsForBlockRange({
|
||||
fromBlock,
|
||||
toBlock,
|
||||
addresses,
|
||||
topics
|
||||
});
|
||||
|
||||
const blockLogsMap = this._reduceLogsToBlockLogsMap(logs);
|
||||
|
||||
// Fetch blocks with transactions for the logs returned
|
||||
console.time(`time:indexer#fetchAndSaveFilteredEventsAndBlocks-fetch-blocks-txs-${fromBlock}-${toBlock}`);
|
||||
const blocksWithTxPromises = Array.from(blockLogsMap.keys()).map(async (blockHash) => {
|
||||
const result = await this._ethClient.getBlockWithTransactions({ blockHash });
|
||||
|
||||
const {
|
||||
allEthHeaderCids: {
|
||||
nodes: [
|
||||
{
|
||||
ethTransactionCidsByHeaderId: {
|
||||
nodes: transactions
|
||||
},
|
||||
...block
|
||||
}
|
||||
]
|
||||
}
|
||||
} = result;
|
||||
|
||||
block.blockTimestamp = Number(block.timestamp);
|
||||
block.blockNumber = Number(block.blockNumber);
|
||||
|
||||
return { block, transactions } as { block: DeepPartial<BlockProgressInterface>; transactions: any[] };
|
||||
});
|
||||
|
||||
const blockWithTxs = await Promise.all(blocksWithTxPromises);
|
||||
console.timeEnd(`time:indexer#fetchAndSaveFilteredEventsAndBlocks-fetch-blocks-txs-${fromBlock}-${toBlock}`);
|
||||
|
||||
// Map db ready events according to blockhash
|
||||
console.time(`time:indexer#fetchAndSaveFilteredEventsAndBlocks-db-save-blocks-events-${fromBlock}-${toBlock}`);
|
||||
const blockWithDbEventsPromises = blockWithTxs.map(async ({ block, transactions }) => {
|
||||
const blockHash = block.blockHash;
|
||||
assert(blockHash);
|
||||
const logs = blockLogsMap.get(blockHash) || [];
|
||||
|
||||
const events = this.createDbEventsFromLogsAndTxs(blockHash, logs, transactions, parseEventNameAndArgs);
|
||||
const [blockProgress] = await this.saveBlockWithEvents(block, events);
|
||||
|
||||
return { blockProgress, events: [] };
|
||||
});
|
||||
|
||||
const blocksWithDbEvents = await Promise.all(blockWithDbEventsPromises);
|
||||
console.timeEnd(`time:indexer#fetchAndSaveFilteredEventsAndBlocks-db-save-blocks-events-${fromBlock}-${toBlock}`);
|
||||
|
||||
return blocksWithDbEvents;
|
||||
}
|
||||
|
||||
_reduceLogsToBlockLogsMap (logs: any[]): Map<string, any> {
|
||||
return logs.reduce((acc: Map<string, any>, log: any) => {
|
||||
const { blockHash: logBlockHash } = log;
|
||||
assert(typeof logBlockHash === 'string');
|
||||
|
||||
if (!acc.has(logBlockHash)) {
|
||||
acc.set(logBlockHash, []);
|
||||
}
|
||||
|
||||
acc.get(logBlockHash).push(log);
|
||||
return acc;
|
||||
}, new Map());
|
||||
}
|
||||
|
||||
// Fetch events (to be saved to db) for a particular block
|
||||
async fetchEvents (blockHash: string, blockNumber: number, eventSignaturesMap: Map<string, string[]>, parseEventNameAndArgs: (kind: string, logObj: any) => any): Promise<DeepPartial<EventInterface>[]> {
|
||||
const { addresses, topics } = this._createLogsFilters(eventSignaturesMap);
|
||||
@ -1224,8 +1296,8 @@ export class Indexer {
|
||||
return { addresses, topics: eventSignatures && [eventSignatures] };
|
||||
}
|
||||
|
||||
parseEvent (logDescription: ethers.utils.LogDescription): { eventName: string, eventInfo: any, eventSignature: string } {
|
||||
const eventInfo = logDescription.eventFragment.inputs.reduce((acc: any, input, index) => {
|
||||
parseEvent (logDescription: ethers.utils.LogDescription): { eventName: string, eventInfo: {[key: string]: any}, eventSignature: string } {
|
||||
const eventInfo = logDescription.eventFragment.inputs.reduce((acc: {[key: string]: any}, input, index) => {
|
||||
acc[input.name] = this._parseLogArg(input, logDescription.args[index]);
|
||||
|
||||
return acc;
|
||||
|
@ -30,7 +30,8 @@ import {
|
||||
createCheckpointJob,
|
||||
processBatchEvents,
|
||||
PrefetchedBlock,
|
||||
fetchBlocksAtHeight
|
||||
fetchBlocksAtHeight,
|
||||
fetchAndSaveFilteredLogsAndBlocks
|
||||
} from './common';
|
||||
import { lastBlockNumEvents, lastBlockProcessDuration, lastProcessedBlockNumber } from './metrics';
|
||||
|
||||
@ -44,6 +45,7 @@ export const HISTORICAL_BLOCKS_BATCH_SIZE = 100;
|
||||
|
||||
export interface HistoricalJobData {
|
||||
blockNumber: number;
|
||||
processingEndBlockNumber: number;
|
||||
}
|
||||
|
||||
export class JobRunner {
|
||||
@ -52,6 +54,7 @@ export class JobRunner {
|
||||
_jobQueueConfig: JobQueueConfig;
|
||||
_blockProcessStartTime?: Date;
|
||||
_endBlockProcessTimer?: () => void;
|
||||
// TODO: Check and remove events (always set to empty list as fetched from DB) from map structure
|
||||
_blockAndEventsMap: Map<string, PrefetchedBlock> = new Map();
|
||||
|
||||
_shutDown = false;
|
||||
@ -148,12 +151,16 @@ export class JobRunner {
|
||||
}
|
||||
|
||||
async processHistoricalBlocks (job: PgBoss.JobWithDoneCallback<HistoricalJobData, HistoricalJobData>): Promise<void> {
|
||||
const { data: { blockNumber: startBlock } } = job;
|
||||
const endBlock = startBlock + HISTORICAL_BLOCKS_BATCH_SIZE;
|
||||
|
||||
const { data: { blockNumber: startBlock, processingEndBlockNumber } } = job;
|
||||
const endBlock = Math.min(startBlock + HISTORICAL_BLOCKS_BATCH_SIZE, processingEndBlockNumber);
|
||||
log(`Processing historical blocks from ${startBlock} to ${endBlock}`);
|
||||
// TODO: Use method from common.ts to fetch and save filtered logs and blocks
|
||||
const blocks: BlockProgressInterface[] = [];
|
||||
|
||||
const blocks = await fetchAndSaveFilteredLogsAndBlocks(
|
||||
this._indexer,
|
||||
this._blockAndEventsMap,
|
||||
startBlock,
|
||||
endBlock
|
||||
);
|
||||
|
||||
// Push event processing job for each block
|
||||
const pushJobForBlockPromises = blocks.map(async block => this.jobQueue.pushJob(
|
||||
@ -161,7 +168,9 @@ export class JobRunner {
|
||||
{
|
||||
kind: JOB_KIND_EVENTS,
|
||||
blockHash: block.blockHash,
|
||||
publish: true
|
||||
// Avoid publishing GQL subscription event in historical processing
|
||||
// Publishing when realtime processing is listening to events will cause problems
|
||||
publish: false
|
||||
}
|
||||
));
|
||||
|
||||
@ -543,11 +552,7 @@ export class JobRunner {
|
||||
await wait(EVENTS_PROCESSING_RETRY_WAIT);
|
||||
await this.jobQueue.pushJob(
|
||||
QUEUE_EVENT_PROCESSING,
|
||||
{
|
||||
kind: JOB_KIND_EVENTS,
|
||||
blockHash: blockHash,
|
||||
publish: true
|
||||
},
|
||||
job.data,
|
||||
{
|
||||
priority: 1
|
||||
}
|
||||
|
@ -91,13 +91,13 @@ export interface IndexerInterface {
|
||||
getStateSyncStatus (): Promise<StateSyncStatusInterface | undefined>
|
||||
getBlocks (blockFilter: { blockHash?: string, blockNumber?: number }): Promise<any>
|
||||
getBlocksAtHeight (height: number, isPruned: boolean): Promise<BlockProgressInterface[]>
|
||||
getLatestProcessedBlockProgress (isPruned: boolean): Promise<BlockProgressInterface | undefined>
|
||||
getLatestCanonicalBlock (): Promise<BlockProgressInterface | undefined>
|
||||
getLatestStateIndexedBlock (): Promise<BlockProgressInterface>
|
||||
getBlockEvents (blockHash: string, where: Where, queryOptions: QueryOptions): Promise<Array<EventInterface>>
|
||||
getAncestorAtDepth (blockHash: string, depth: number): Promise<string>
|
||||
fetchEventsAndSaveBlocks (blocks: DeepPartial<BlockProgressInterface>[]): Promise<{ blockProgress: BlockProgressInterface, events: DeepPartial<EventInterface>[] }[]>
|
||||
saveBlockAndFetchEvents (block: DeepPartial<BlockProgressInterface>): Promise<[BlockProgressInterface, DeepPartial<EventInterface>[]]>
|
||||
fetchAndSaveFilteredEventsAndBlocks (startBlock: number, endBlock: number): Promise<{ blockProgress: BlockProgressInterface, events: DeepPartial<EventInterface>[] }[]>
|
||||
removeUnknownEvents (block: BlockProgressInterface): Promise<void>
|
||||
updateBlockProgress (block: BlockProgressInterface, lastProcessedEventIndex: number): Promise<BlockProgressInterface>
|
||||
updateSyncStatusChainHead (blockHash: string, blockNumber: number, force?: boolean): Promise<SyncStatusInterface>
|
||||
|
Loading…
Reference in New Issue
Block a user