Wait for events processing to complete before continuing historical blocks processing (#450)

* Wait for events queue to be empty before continuing historical processing

* Make historicalLogsBlockRange and historicalMaxFetchAhead configurable

* Perform single RPC request for multiple addresses
This commit is contained in:
Nabarun Gogoi 2023-11-06 11:34:48 +05:30 committed by GitHub
parent 546af92638
commit 8a720ef175
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 115 additions and 61 deletions

View File

@ -135,6 +135,12 @@ export class BaseCmd {
// Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
const pubsub = new PubSub();
this._eventWatcher = new EventWatcher(this._config.server, this._clients.ethClient, this._indexer, pubsub, this._jobQueue);
const config = {
server: this._config.server,
jobQueue: this._config.jobQueue
};
this._eventWatcher = new EventWatcher(config, this._clients.ethClient, this._indexer, pubsub, this._jobQueue);
}
}

View File

@ -108,7 +108,7 @@ export class JobRunnerCmd {
const jobRunner = new JobRunner(config.jobQueue, indexer, jobQueue);
await jobRunner.jobQueue.deleteAllJobs();
await jobRunner.jobQueue.deleteAllJobs('completed');
await jobRunner.resetToPrevIndexedBlock();
await startJobRunner(jobRunner);

View File

@ -281,8 +281,8 @@ export class ServerCmd {
assert(eventWatcher);
if (config.server.kind === KIND_ACTIVE) {
// Delete jobs to prevent creating jobs after completion of processing previous block.
await jobQueue.deleteAllJobs();
// Delete jobs before completed state to prevent creating jobs after completion of processing previous block.
await jobQueue.deleteAllJobs('completed');
await eventWatcher.start();
}

View File

@ -92,3 +92,10 @@
blockDelayInMilliSecs = 2000
prefetchBlocksInMem = true
prefetchBlockCount = 10
# Block range in which logs are fetched during historical blocks processing
historicalLogsBlockRange = 2000
# Max block range of historical processing after which it waits for completion of events processing
# If set to -1 historical processing does not wait for events processing and completes till latest canonical block
historicalMaxFetchAhead = 10000

View File

@ -4,7 +4,6 @@
import assert from 'assert';
import { errors, providers, utils } from 'ethers';
import { TransactionReceipt } from '@ethersproject/abstract-provider';
import { Cache } from '@cerc-io/cache';
import { encodeHeader, escapeHexString, getRawTransaction, EthClient as EthClientInterface } from '@cerc-io/util';
@ -80,7 +79,7 @@ export class EthClient implements EthClientInterface {
nodes: result.transactions.map((transaction) => ({
txHash: transaction.hash,
// Transactions with block should be of type TransactionReceipt
index: (transaction as unknown as TransactionReceipt).transactionIndex,
index: (transaction as unknown as providers.TransactionReceipt).transactionIndex,
src: transaction.from,
dst: transaction.to
}))
@ -239,12 +238,9 @@ export class EthClient implements EthClientInterface {
addresses?: string[],
topics?: string[][]
}): Promise<any> {
const blockNumber = Number(vars.blockNumber);
console.time(`time:eth-client#getLogs-${JSON.stringify(vars)}`);
const result = await this._getLogs({
fromBlock: blockNumber,
toBlock: blockNumber,
blockHash: vars.blockHash,
addresses: vars.addresses,
topics: vars.topics
});
@ -273,36 +269,35 @@ export class EthClient implements EthClientInterface {
// TODO: Implement return type
async _getLogs (vars: {
blockHash?: string,
fromBlock?: number,
toBlock?: number,
addresses?: string[],
topics?: string[][]
}): Promise<any> {
const { fromBlock, toBlock, addresses = [], topics } = vars;
const { blockHash, fromBlock, toBlock, addresses = [], topics } = vars;
const result = await this._getCachedOrFetch(
'getLogs',
vars,
async () => {
const logsByAddressPromises = addresses?.map(address => this._provider.getLogs({
fromBlock,
toBlock,
address,
topics
}));
const logsByAddress = await Promise.all(logsByAddressPromises);
let logs = logsByAddress.flat();
// If no addresses provided to filter
if (!addresses.length) {
logs = await this._provider.getLogs({
fromBlock,
toBlock,
const ethLogs = await this._provider.send(
'eth_getLogs',
[{
address: addresses.map(address => address.toLowerCase()),
fromBlock: fromBlock && utils.hexlify(fromBlock),
toBlock: toBlock && utils.hexlify(toBlock),
blockHash,
topics
});
}
}]
);
return logs.map(log => {
// Format raw eth_getLogs response
const logs: providers.Log[] = providers.Formatter.arrayOf(
this._provider.formatter.filterLog.bind(this._provider.formatter)
)(ethLogs);
return logs.map((log) => {
log.address = log.address.toLowerCase();
return log;
});

View File

@ -24,6 +24,11 @@ export interface JobQueueConfig {
blockDelayInMilliSecs: number;
prefetchBlocksInMem: boolean;
prefetchBlockCount: number;
// Block range in which logs are fetched during historical blocks processing
historicalLogsBlockRange?: number;
// Max block range of historical processing after which it waits for completion of events processing
// If set to -1 historical processing does not wait for events processing and completes till latest canonical block
historicalMaxFetchAhead?: number;
}
export interface GQLCacheConfig {

View File

@ -13,20 +13,27 @@ import { BlockProgressInterface, EventInterface, IndexerInterface, EthClient } f
import { MAX_REORG_DEPTH, JOB_KIND_PRUNE, JOB_KIND_INDEX, UNKNOWN_EVENT_NAME, JOB_KIND_EVENTS, QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING, QUEUE_HISTORICAL_PROCESSING } from './constants';
import { createPruningJob, processBlockByNumber } from './common';
import { OrderDirection } from './database';
import { HISTORICAL_BLOCKS_BATCH_SIZE, HistoricalJobData } from './job-runner';
import { ServerConfig } from './config';
import { HistoricalJobData, HistoricalJobResponseData } from './job-runner';
import { JobQueueConfig, ServerConfig } from './config';
import { wait } from './misc';
const EVENT = 'event';
// TODO: Make configurable
const HISTORICAL_MAX_FETCH_AHEAD = 20_000;
// Time to wait for events queue to be empty
const EMPTY_EVENTS_QUEUE_WAIT_TIME = 5000;
const DEFAULT_HISTORICAL_MAX_FETCH_AHEAD = 20_000;
const log = debug('vulcanize:events');
export const BlockProgressEvent = 'block-progress-event';
// Subset of the watcher configuration consumed by EventWatcher
interface Config {
// Server settings; useBlockRanges is read to decide whether historical block processing is started
server: ServerConfig;
// Job queue settings; historicalMaxFetchAhead is read to cap the range of a historical processing run
jobQueue: JobQueueConfig;
}
export class EventWatcher {
_serverConfig: ServerConfig;
_config: Config;
_ethClient: EthClient;
_indexer: IndexerInterface;
_pubsub: PubSub;
@ -36,8 +43,8 @@ export class EventWatcher {
_signalCount = 0;
_historicalProcessingEndBlockNumber = 0;
constructor (serverConfig: ServerConfig, ethClient: EthClient, indexer: IndexerInterface, pubsub: PubSub, jobQueue: JobQueue) {
this._serverConfig = serverConfig;
constructor (config: Config, ethClient: EthClient, indexer: IndexerInterface, pubsub: PubSub, jobQueue: JobQueue) {
this._config = config;
this._ethClient = ethClient;
this._indexer = indexer;
this._pubsub = pubsub;
@ -96,11 +103,12 @@ export class EventWatcher {
// Check if filter for logs is enabled
// Check if starting block for watcher is before latest canonical block
if (this._serverConfig.useBlockRanges && startBlockNumber < latestCanonicalBlockNumber) {
if (this._config.server.useBlockRanges && startBlockNumber < latestCanonicalBlockNumber) {
let endBlockNumber = latestCanonicalBlockNumber;
const historicalMaxFetchAhead = this._config.jobQueue.historicalMaxFetchAhead ?? DEFAULT_HISTORICAL_MAX_FETCH_AHEAD;
if (HISTORICAL_MAX_FETCH_AHEAD > 0) {
endBlockNumber = Math.min(startBlockNumber + HISTORICAL_MAX_FETCH_AHEAD, endBlockNumber);
if (historicalMaxFetchAhead > 0) {
endBlockNumber = Math.min(startBlockNumber + historicalMaxFetchAhead, endBlockNumber);
}
await this.startHistoricalBlockProcessing(startBlockNumber, endBlockNumber);
@ -112,10 +120,11 @@ export class EventWatcher {
}
async startHistoricalBlockProcessing (startBlockNumber: number, endBlockNumber: number): Promise<void> {
// TODO: Wait for events job queue to be empty so that historical processing does not move far ahead
// Wait for events job queue to be empty so that historical processing does not move far ahead
await this._waitForEmptyEventsQueue();
this._historicalProcessingEndBlockNumber = endBlockNumber;
log(`Starting historical block processing up to block ${this._historicalProcessingEndBlockNumber}`);
log(`Starting historical block processing in batches from ${startBlockNumber} up to block ${this._historicalProcessingEndBlockNumber}`);
// Push job for historical block processing
await this._jobQueue.pushJob(
@ -127,6 +136,19 @@ export class EventWatcher {
);
}
async _waitForEmptyEventsQueue (): Promise<void> {
  // Poll the events queue (active and pending jobs) until nothing is left in it,
  // sleeping for EMPTY_EVENTS_QUEUE_WAIT_TIME ms between checks
  let remainingJobs = await this._jobQueue.getQueueSize(QUEUE_EVENT_PROCESSING, 'completed');

  while (remainingJobs !== 0) {
    await wait(EMPTY_EVENTS_QUEUE_WAIT_TIME);
    remainingJobs = await this._jobQueue.getQueueSize(QUEUE_EVENT_PROCESSING, 'completed');
  }
}
async startRealtimeBlockProcessing (startBlockNumber: number): Promise<void> {
log(`Starting realtime block processing from block ${startBlockNumber}`);
await processBlockByNumber(this._jobQueue, startBlockNumber);
@ -194,20 +216,22 @@ export class EventWatcher {
}
async historicalProcessingCompleteHandler (job: PgBoss.Job<any>): Promise<void> {
const { id, data: { failed, request: { data } } } = job;
const { blockNumber, isComplete }: HistoricalJobData = data;
const { id, data: { failed, request: { data }, response } } = job;
const { blockNumber }: HistoricalJobData = data;
const { isComplete, endBlock: batchEndBlockNumber }: HistoricalJobResponseData = response;
if (failed || isComplete) {
if (failed || !isComplete) {
log(`Job ${id} for queue ${QUEUE_HISTORICAL_PROCESSING} failed`);
return;
}
// TODO: Get batch size from config
const batchEndBlockNumber = Math.min(blockNumber + HISTORICAL_BLOCKS_BATCH_SIZE, this._historicalProcessingEndBlockNumber);
// endBlock exists if isComplete is true
assert(batchEndBlockNumber);
const nextBatchStartBlockNumber = batchEndBlockNumber + 1;
log(`Historical block processing completed for block range: ${blockNumber} to ${batchEndBlockNumber}`);
// Check if historical processing endBlock / latest canonical block is reached
// Check if historical processing end block is reached
if (nextBatchStartBlockNumber > this._historicalProcessingEndBlockNumber) {
const [block] = await this._indexer.getBlocks({ blockNumber: this._historicalProcessingEndBlockNumber });
const historicalProcessingEndBlockHash = block ? block.blockHash : constants.AddressZero;

View File

@ -129,7 +129,7 @@ export class JobQueue {
}
async markComplete (job: PgBoss.Job, data: object = {}): Promise<void> {
this._boss.complete(job.id, { ...job.data, ...data });
await this._boss.complete(job.id, data);
}
async pushJob (queue: string, job: any, options: PgBoss.PublishOptions = {}): Promise<void> {
@ -139,7 +139,19 @@ export class JobQueue {
log(`Created job in queue ${queue}: ${jobId}`);
}
async deleteAllJobs (): Promise<void> {
await this._boss.deleteAllQueues();
async deleteAllJobs (before: PgBoss.Subscription['state'] = 'active'): Promise<void> {
// Workaround for incorrect type of pg-boss deleteAllQueues method
const deleteAllQueues = this._boss.deleteAllQueues.bind(this._boss) as (options: { before: PgBoss.Subscription['state'] }) => Promise<void>;
await deleteAllQueues({ before });
}
async deleteJobs (name: string, before: PgBoss.Subscription['state'] = 'active'): Promise<void> {
  // Workaround for incorrect pg-boss typing of deleteQueue:
  // cast the bound method to the signature that accepts the `before` option
  type DeleteQueueWithOptions = (name: string, options: { before: PgBoss.Subscription['state'] }) => Promise<void>;
  const boundDeleteQueue = this._boss.deleteQueue.bind(this._boss) as DeleteQueueWithOptions;

  // Delete jobs of the named queue that are in a state before the given one
  const options = { before };
  await boundDeleteQueue(name, options);
}
async getQueueSize (name: string, before: PgBoss.Subscription['state'] = 'active'): Promise<number> {
  // Delegate to pg-boss, counting jobs of the named queue that are in a state before the given one
  const sizeOptions = { before };

  return this._boss.getQueueSize(name, sizeOptions);
}
}

View File

@ -40,13 +40,16 @@ const log = debug('vulcanize:job-runner');
// Wait time for retrying events processing on error (in ms)
const EVENTS_PROCESSING_RETRY_WAIT = 2000;
// TODO: Get batch size from config
export const HISTORICAL_BLOCKS_BATCH_SIZE = 2000;
const DEFAULT_HISTORICAL_LOGS_BLOCK_RANGE = 2000;
export interface HistoricalJobData {
blockNumber: number;
processingEndBlockNumber: number;
isComplete?: boolean;
}
// Data passed when marking a historical processing job complete; read by the completion handler as the job response
export interface HistoricalJobResponseData {
// True when the batch of blocks was processed; false when the job was marked complete without processing its batch
isComplete: boolean;
// Last block number of the processed batch; set when isComplete is true
endBlock?: number;
}
export class JobRunner {
@ -154,12 +157,12 @@ export class JobRunner {
await this.jobQueue.markComplete(job);
}
async processHistoricalBlocks (job: PgBoss.JobWithDoneCallback<HistoricalJobData, HistoricalJobData>): Promise<void> {
async processHistoricalBlocks (job: PgBoss.JobWithDoneCallback<HistoricalJobData, HistoricalJobResponseData>): Promise<void> {
const { data: { blockNumber: startBlock, processingEndBlockNumber } } = job;
if (this._historicalProcessingCompletedUpto) {
if (startBlock < this._historicalProcessingCompletedUpto) {
await this.jobQueue.deleteAllJobs();
await this.jobQueue.deleteJobs(QUEUE_HISTORICAL_PROCESSING);
// Remove all watcher blocks and events data if startBlock is less than this._historicalProcessingCompletedUpto
// This occurs when new contract is added (with filterLogsByAddresses set to true) and historical processing is restarted from a previous block
@ -168,10 +171,10 @@ export class JobRunner {
} else {
// Check that startBlock is one greater than previous batch end block
if (startBlock - 1 !== this._historicalProcessingCompletedUpto) {
// TODO: Debug jobQueue deleteAllJobs not working
// TODO: Debug jobQueue deleteJobs for historical processing not working
await this.jobQueue.markComplete(
job,
{ isComplete: true }
{ isComplete: false }
);
return;
@ -180,7 +183,8 @@ export class JobRunner {
}
this._lastHistoricalProcessingEndBlockNumber = processingEndBlockNumber;
const endBlock = Math.min(startBlock + HISTORICAL_BLOCKS_BATCH_SIZE, processingEndBlockNumber);
const logsBlockRange = this._jobQueueConfig.historicalLogsBlockRange ?? DEFAULT_HISTORICAL_LOGS_BLOCK_RANGE;
const endBlock = Math.min(startBlock + logsBlockRange, processingEndBlockNumber);
log(`Processing historical blocks from ${startBlock} to ${endBlock}`);
const blocks = await fetchAndSaveFilteredLogsAndBlocks(
@ -207,7 +211,7 @@ export class JobRunner {
await this.jobQueue.markComplete(
job,
{ isComplete: true }
{ isComplete: true, endBlock }
);
}
@ -570,8 +574,8 @@ export class JobRunner {
// Check if new contract was added and filterLogsByAddresses is set to true
if (isNewContractWatched && this._indexer.serverConfig.filterLogsByAddresses) {
// Delete jobs for any pending events and blocks processing
await this.jobQueue.deleteAllJobs();
// Delete jobs for any pending events processing
await this.jobQueue.deleteJobs(QUEUE_EVENT_PROCESSING);
// Check if historical processing is running and that the block currently being processed was triggered by historical processing
if (this._historicalProcessingCompletedUpto && this._historicalProcessingCompletedUpto > block.blockNumber) {
@ -610,6 +614,7 @@ export class JobRunner {
// Catch event processing error and push to job queue after some time with higher priority
log(`Retrying event processing after ${EVENTS_PROCESSING_RETRY_WAIT} ms`);
await wait(EVENTS_PROCESSING_RETRY_WAIT);
// TODO: Stop next job in queue from processing next
await this.jobQueue.pushJob(
QUEUE_EVENT_PROCESSING,
job.data,

View File

@ -132,7 +132,7 @@ export const resetJobs = async (config: Config): Promise<void> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
await jobQueue.deleteAllJobs();
await jobQueue.deleteAllJobs('completed');
};
export const getResetYargs = (): yargs.Argv => {