Extract block filler into common package, implement in uni-info-watcher (#220)

* Implement block filler for uni-info-watcher.

* Pull common code for Block filler to util.

* Watch for job events in block filler command.

Co-authored-by: nabarun <nabarun@deepstacksoft.com>
This commit is contained in:
Ashwin Phatak 2021-08-18 15:50:44 +05:30 committed by GitHub
parent 23bfcc02dc
commit b8b216ea5b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 489 additions and 177 deletions

View File

@ -24,7 +24,8 @@
"generate:schema": "get-graphql-schema https://api.thegraph.com/subgraphs/name/ianlapham/uniswap-v3-alt > docs/analysis/schema/full-schema.graphql", "generate:schema": "get-graphql-schema https://api.thegraph.com/subgraphs/name/ianlapham/uniswap-v3-alt > docs/analysis/schema/full-schema.graphql",
"lint:schema": "graphql-schema-linter", "lint:schema": "graphql-schema-linter",
"smoke-test": "mocha src/smoke.test.ts", "smoke-test": "mocha src/smoke.test.ts",
"test:gpev": "mocha src/get-prev-entity.test.ts" "test:gpev": "mocha src/get-prev-entity.test.ts",
"fill": "DEBUG=vulcanize:* ts-node src/fill.ts -f environments/local.toml"
}, },
"devDependencies": { "devDependencies": {
"@types/chance": "^1.1.2", "@types/chance": "^1.1.2",

View File

@ -691,11 +691,11 @@ export class Database {
.getMany(); .getMany();
} }
async saveEvents (queryRunner: QueryRunner, block: Block, events: DeepPartial<Event>[]): Promise<void> { async saveEvents (queryRunner: QueryRunner, block: DeepPartial<BlockProgress>, events: DeepPartial<Event>[]): Promise<void> {
const { const {
hash: blockHash, blockHash,
number: blockNumber, blockNumber,
timestamp: blockTimestamp, blockTimestamp,
parentHash parentHash
} = block; } = block;

View File

@ -4,11 +4,13 @@
import { Entity, PrimaryGeneratedColumn, Column, Index } from 'typeorm'; import { Entity, PrimaryGeneratedColumn, Column, Index } from 'typeorm';
import { BlockProgressInterface } from '@vulcanize/util';
@Entity() @Entity()
@Index(['blockHash'], { unique: true }) @Index(['blockHash'], { unique: true })
@Index(['blockNumber']) @Index(['blockNumber'])
@Index(['parentHash']) @Index(['parentHash'])
export class BlockProgress { export class BlockProgress implements BlockProgressInterface {
@PrimaryGeneratedColumn() @PrimaryGeneratedColumn()
id!: number; id!: number;
@ -35,4 +37,7 @@ export class BlockProgress {
@Column('boolean') @Column('boolean')
isComplete!: boolean isComplete!: boolean
@Column('boolean', { default: false })
isPruned!: boolean
} }

View File

@ -5,9 +5,11 @@
import assert from 'assert'; import assert from 'assert';
import debug from 'debug'; import debug from 'debug';
import _ from 'lodash'; import _ from 'lodash';
import { EthClient } from '@vulcanize/ipld-eth-client'; import { PubSub } from 'apollo-server-express';
import { EthClient } from '@vulcanize/ipld-eth-client';
import { EventWatcher as BaseEventWatcher, EventWatcherInterface, JobQueue, QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING } from '@vulcanize/util';
import { JobQueue } from '../../util';
import { Indexer } from './indexer'; import { Indexer } from './indexer';
const log = debug('vulcanize:events'); const log = debug('vulcanize:events');
@ -115,28 +117,33 @@ export interface ResultEvent {
} }
} }
export const QUEUE_EVENT_PROCESSING = 'event-processing'; export class EventWatcher implements EventWatcherInterface {
export const QUEUE_BLOCK_PROCESSING = 'block-processing';
export class EventWatcher {
_subscription?: ZenObservable.Subscription
_ethClient: EthClient _ethClient: EthClient
_jobQueue: JobQueue
_indexer: Indexer _indexer: Indexer
_subscription?: ZenObservable.Subscription
_pubsub: PubSub
_jobQueue: JobQueue
_eventWatcher: BaseEventWatcher
constructor (indexer: Indexer, ethClient: EthClient, jobQueue: JobQueue) { constructor (ethClient: EthClient, indexer: Indexer, pubsub: PubSub, jobQueue: JobQueue) {
this._ethClient = ethClient; this._ethClient = ethClient;
this._jobQueue = jobQueue;
this._indexer = indexer; this._indexer = indexer;
this._pubsub = pubsub;
this._jobQueue = jobQueue;
this._eventWatcher = new BaseEventWatcher(this._ethClient, this._indexer, this._pubsub, this._jobQueue);
}
getBlockProgressEventIterator (): AsyncIterator<any> {
return this._eventWatcher.getBlockProgressEventIterator();
} }
async start (): Promise<void> { async start (): Promise<void> {
assert(!this._subscription, 'subscription already started'); assert(!this._subscription, 'subscription already started');
log('Started watching upstream events...'); log('Started watching upstream events...');
await this._initBlockProcessingOnCompleteHandler(); await this.initBlockProcessingOnCompleteHandler();
await this._initEventProcessingOnCompleteHandler(); await this.initEventProcessingOnCompleteHandler();
await this._watchBlocksAtChainHead(); await this.watchBlocksAtChainHead();
} }
async stop (): Promise<void> { async stop (): Promise<void> {
@ -146,7 +153,26 @@ export class EventWatcher {
} }
} }
async _watchBlocksAtChainHead (): Promise<void> { async initBlockProcessingOnCompleteHandler (): Promise<void> {
await this._jobQueue.onComplete(QUEUE_BLOCK_PROCESSING, async (job) => {
const { data: { request: { data: { blockHash, blockNumber } } } } = job;
log(`Job onComplete block ${blockHash} ${blockNumber}`);
// Publish block progress event.
const blockProgress = await this._indexer.getBlockProgress(blockHash);
if (blockProgress) {
await this._eventWatcher.publishBlockProgressToSubscribers(blockProgress);
}
});
}
async initEventProcessingOnCompleteHandler (): Promise<void> {
await this._jobQueue.onComplete(QUEUE_EVENT_PROCESSING, async (job) => {
await this._eventWatcher.eventProcessingCompleteHandler(job);
});
}
async watchBlocksAtChainHead (): Promise<void> {
log('Started watching upstream blocks...'); log('Started watching upstream blocks...');
this._subscription = await this._ethClient.watchBlocks(async (value) => { this._subscription = await this._ethClient.watchBlocks(async (value) => {
const { blockHash, blockNumber, parentHash, timestamp } = _.get(value, 'data.listen.relatedNode'); const { blockHash, blockNumber, parentHash, timestamp } = _.get(value, 'data.listen.relatedNode');
@ -155,34 +181,7 @@ export class EventWatcher {
log('watchBlock', blockHash, blockNumber); log('watchBlock', blockHash, blockNumber);
const block = { await this._jobQueue.pushJob(QUEUE_BLOCK_PROCESSING, { blockHash, blockNumber, parentHash, timestamp });
hash: blockHash,
number: blockNumber,
parentHash,
timestamp
};
await this._jobQueue.pushJob(QUEUE_BLOCK_PROCESSING, { block });
});
}
async _initBlockProcessingOnCompleteHandler (): Promise<void> {
this._jobQueue.onComplete(QUEUE_BLOCK_PROCESSING, async (job) => {
const { data: { request: { data: { block } } } } = job;
log(`Job onComplete block ${block.hash} ${block.number}`);
});
}
async _initEventProcessingOnCompleteHandler (): Promise<void> {
this._jobQueue.onComplete(QUEUE_EVENT_PROCESSING, async (job) => {
const { data: { request } } = job;
const dbEvent = await this._indexer.getEvent(request.data.id);
assert(dbEvent);
await this._indexer.updateBlockProgress(dbEvent.block.blockHash, dbEvent.index);
log(`Job onComplete event ${request.data.id}`);
}); });
} }
} }

View File

@ -0,0 +1,95 @@
//
// Copyright 2021 Vulcanize, Inc.
//
import assert from 'assert';
import 'reflect-metadata';
import yargs from 'yargs';
import { hideBin } from 'yargs/helpers';
import debug from 'debug';
import { getCache } from '@vulcanize/cache';
import { EthClient } from '@vulcanize/ipld-eth-client';
import { getConfig, fillBlocks, JobQueue } from '@vulcanize/util';
import { Client as UniClient } from '@vulcanize/uni-watcher';
import { Client as ERC20Client } from '@vulcanize/erc20-watcher';
import { Database } from './database';
import { PubSub } from 'apollo-server-express';
import { Indexer } from './indexer';
import { EventWatcher } from './events';
const log = debug('vulcanize:server');
export const main = async (): Promise<any> => {
const argv = await yargs(hideBin(process.argv)).parserConfiguration({
'parse-numbers': false
}).options({
configFile: {
alias: 'f',
type: 'string',
require: true,
demandOption: true,
describe: 'configuration file path (toml)'
},
startBlock: {
type: 'number',
require: true,
demandOption: true,
describe: 'Block number to start processing at'
},
endBlock: {
type: 'number',
require: true,
demandOption: true,
describe: 'Block number to stop processing at'
}
}).argv;
const config = await getConfig(argv.configFile);
assert(config.server, 'Missing server config');
const { upstream, database: dbConfig, jobQueue: jobQueueConfig } = config;
assert(dbConfig, 'Missing database config');
const db = new Database(dbConfig);
await db.init();
assert(upstream, 'Missing upstream config');
const { ethServer: { gqlPostgraphileEndpoint }, cache: cacheConfig, uniWatcher, tokenWatcher } = upstream;
assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint');
const cache = await getCache(cacheConfig);
const ethClient = new EthClient({
gqlEndpoint: gqlPostgraphileEndpoint,
gqlSubscriptionEndpoint: gqlPostgraphileEndpoint,
cache
});
const uniClient = new UniClient(uniWatcher);
const erc20Client = new ERC20Client(tokenWatcher);
// Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
const pubsub = new PubSub();
const indexer = new Indexer(db, uniClient, erc20Client, ethClient);
assert(jobQueueConfig, 'Missing job queue config');
const { dbConnectionString, maxCompletionLag } = jobQueueConfig;
assert(dbConnectionString, 'Missing job queue db connection string');
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag });
await jobQueue.start();
const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue);
await fillBlocks(jobQueue, indexer, ethClient, eventWatcher, argv);
};
main().then(() => {
process.exit();
}).catch(err => {
log(err);
});

View File

@ -7,9 +7,11 @@ import debug from 'debug';
import { DeepPartial, QueryRunner } from 'typeorm'; import { DeepPartial, QueryRunner } from 'typeorm';
import JSONbig from 'json-bigint'; import JSONbig from 'json-bigint';
import { utils } from 'ethers'; import { utils } from 'ethers';
import { Client as UniClient } from '@vulcanize/uni-watcher'; import { Client as UniClient } from '@vulcanize/uni-watcher';
import { Client as ERC20Client } from '@vulcanize/erc20-watcher'; import { Client as ERC20Client } from '@vulcanize/erc20-watcher';
import { EthClient } from '@vulcanize/ipld-eth-client'; import { EthClient } from '@vulcanize/ipld-eth-client';
import { IndexerInterface } from '@vulcanize/util';
import { findEthPerToken, getEthPriceInUSD, getTrackedAmountUSD, sqrtPriceX96ToTokenPrices, WHITELIST_TOKENS } from './utils/pricing'; import { findEthPerToken, getEthPriceInUSD, getTrackedAmountUSD, sqrtPriceX96ToTokenPrices, WHITELIST_TOKENS } from './utils/pricing';
import { updatePoolDayData, updatePoolHourData, updateTokenDayData, updateTokenHourData, updateUniswapDayData } from './utils/interval-updates'; import { updatePoolDayData, updatePoolHourData, updateTokenDayData, updateTokenHourData, updateUniswapDayData } from './utils/interval-updates';
@ -42,7 +44,7 @@ export interface ValueResult {
export { OrderDirection, BlockHeight }; export { OrderDirection, BlockHeight };
export class Indexer { export class Indexer implements IndexerInterface {
_db: Database _db: Database
_uniClient: UniClient _uniClient: UniClient
_erc20Client: ERC20Client _erc20Client: ERC20Client
@ -87,8 +89,9 @@ export class Indexer {
} }
// Note: Some event names might be unknown at this point, as earlier events might not yet be processed. // Note: Some event names might be unknown at this point, as earlier events might not yet be processed.
async getOrFetchBlockEvents (block: Block): Promise<Array<Event>> { async getOrFetchBlockEvents (block: DeepPartial<BlockProgress>): Promise<Array<Event>> {
const blockProgress = await this._db.getBlockProgress(block.hash); assert(block.blockHash);
const blockProgress = await this._db.getBlockProgress(block.blockHash);
if (!blockProgress) { if (!blockProgress) {
// Fetch and save events first and make a note in the event sync progress table. // Fetch and save events first and make a note in the event sync progress table.
@ -96,7 +99,7 @@ export class Indexer {
log('getBlockEvents: db miss, fetching from upstream server'); log('getBlockEvents: db miss, fetching from upstream server');
} }
const events = await this._db.getBlockEvents(block.hash); const events = await this._db.getBlockEvents(block.blockHash);
log(`getBlockEvents: db hit, num events: ${events.length}`); log(`getBlockEvents: db hit, num events: ${events.length}`);
return events; return events;
@ -352,8 +355,9 @@ export class Indexer {
return res; return res;
} }
async _fetchAndSaveEvents (block: Block): Promise<void> { async _fetchAndSaveEvents (block: DeepPartial<BlockProgress>): Promise<void> {
const events = await this._uniClient.getEvents(block.hash); assert(block.blockHash);
const events = await this._uniClient.getEvents(block.blockHash);
const dbEvents: Array<DeepPartial<Event>> = []; const dbEvents: Array<DeepPartial<Event>> = [];
for (let i = 0; i < events.length; i++) { for (let i = 0; i < events.length; i++) {

View File

@ -10,13 +10,12 @@ import debug from 'debug';
import { Client as ERC20Client } from '@vulcanize/erc20-watcher'; import { Client as ERC20Client } from '@vulcanize/erc20-watcher';
import { Client as UniClient } from '@vulcanize/uni-watcher'; import { Client as UniClient } from '@vulcanize/uni-watcher';
import { getConfig, JobQueue, wait } from '@vulcanize/util'; import { getConfig, JobQueue, wait, QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING } from '@vulcanize/util';
import { getCache } from '@vulcanize/cache'; import { getCache } from '@vulcanize/cache';
import { EthClient } from '@vulcanize/ipld-eth-client'; import { EthClient } from '@vulcanize/ipld-eth-client';
import { Indexer } from './indexer'; import { Indexer } from './indexer';
import { Database } from './database'; import { Database } from './database';
import { QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING } from './events';
import { Event } from './entity/Event'; import { Event } from './entity/Event';
const log = debug('vulcanize:job-runner'); const log = debug('vulcanize:job-runner');
@ -73,43 +72,42 @@ export const main = async (): Promise<any> => {
await jobQueue.start(); await jobQueue.start();
await jobQueue.subscribe(QUEUE_BLOCK_PROCESSING, async (job) => { await jobQueue.subscribe(QUEUE_BLOCK_PROCESSING, async (job) => {
const { data: { block, priority } } = job; const { data: { blockHash, blockNumber, parentHash, timestamp, priority } } = job;
log(`Processing block hash ${block.hash} number ${block.number}`); log(`Processing block number ${blockNumber} hash ${blockHash} `);
// Init sync status record if none exists.
let syncStatus = await indexer.getSyncStatus();
if (!syncStatus) {
syncStatus = await indexer.updateSyncStatus(blockHash, blockNumber);
}
// Check if parent block has been processed yet, if not, push a high priority job to process that first and abort. // Check if parent block has been processed yet, if not, push a high priority job to process that first and abort.
// However, don't go beyond the `latestCanonicalBlockHash` from SyncStatus as we have to assume the reorg can't be that deep. // However, don't go beyond the `latestCanonicalBlockHash` from SyncStatus as we have to assume the reorg can't be that deep.
let syncStatus = await indexer.getSyncStatus(); if (blockHash !== syncStatus.latestCanonicalBlockHash) {
if (!syncStatus) { const parent = await indexer.getBlockProgress(parentHash);
syncStatus = await indexer.updateSyncStatus(block.hash, block.number);
}
if (block.hash !== syncStatus.latestCanonicalBlockHash) {
const parent = await indexer.getBlockProgress(block.parentHash);
if (!parent) { if (!parent) {
const { number: parentBlockNumber, parent: { hash: grandparentHash }, timestamp: parentTimestamp } = await indexer.getBlock(block.parentHash); const { number: parentBlockNumber, parent: { hash: grandparentHash }, timestamp: parentTimestamp } = await indexer.getBlock(parentHash);
// Create a higher priority job to index parent block and then abort. // Create a higher priority job to index parent block and then abort.
// We don't have to worry about aborting as this job will get retried later. // We don't have to worry about aborting as this job will get retried later.
const newPriority = (priority || 0) + 1; const newPriority = (priority || 0) + 1;
await jobQueue.pushJob(QUEUE_BLOCK_PROCESSING, { await jobQueue.pushJob(QUEUE_BLOCK_PROCESSING, {
block: { blockHash: parentHash,
hash: block.parentHash, blockNumber: parentBlockNumber,
number: parentBlockNumber, parentHash: grandparentHash,
parentHash: grandparentHash, timestamp: parentTimestamp,
timestamp: parentTimestamp
},
priority: newPriority priority: newPriority
}, { priority: newPriority }); }, { priority: newPriority });
const message = `Parent block number ${parentBlockNumber} hash ${block.parentHash} of block number ${block.number} hash ${block.hash} not fetched yet, aborting`; const message = `Parent block number ${parentBlockNumber} hash ${parentHash} of block number ${blockNumber} hash ${blockHash} not fetched yet, aborting`;
log(message); log(message);
throw new Error(message); throw new Error(message);
} }
if (block.parentHash !== syncStatus.latestCanonicalBlockHash && !parent.isComplete) { if (parentHash !== syncStatus.latestCanonicalBlockHash && !parent.isComplete) {
// Parent block indexing needs to finish before this block can be indexed. // Parent block indexing needs to finish before this block can be indexed.
const message = `Indexing incomplete for parent block number ${parent.blockNumber} hash ${block.parentHash} of block number ${block.number} hash ${block.hash}, aborting`; const message = `Indexing incomplete for parent block number ${parent.blockNumber} hash ${parentHash} of block number ${blockNumber} hash ${blockHash}, aborting`;
log(message); log(message);
throw new Error(message); throw new Error(message);
@ -117,12 +115,12 @@ export const main = async (): Promise<any> => {
} }
// Check if block is being already processed. // Check if block is being already processed.
const blockProgress = await indexer.getBlockProgress(block.hash); const blockProgress = await indexer.getBlockProgress(blockHash);
if (!blockProgress) { if (!blockProgress) {
// Delay to allow uni-watcher to process block. // Delay to allow uni-watcher to process block.
await wait(jobDelay); await wait(jobDelay);
const events = await indexer.getOrFetchBlockEvents(block); const events = await indexer.getOrFetchBlockEvents({ blockHash, blockNumber, parentHash, blockTimestamp: timestamp });
for (let ei = 0; ei < events.length; ei++) { for (let ei = 0; ei < events.length; ei++) {
const { id } = events[ei]; const { id } = events[ei];

View File

@ -21,17 +21,24 @@ import { TokenHourData } from './entity/TokenHourData';
import { Transaction } from './entity/Transaction'; import { Transaction } from './entity/Transaction';
import { UniswapDayData } from './entity/UniswapDayData'; import { UniswapDayData } from './entity/UniswapDayData';
import { Position } from './entity/Position'; import { Position } from './entity/Position';
import { EventWatcher } from './events';
const log = debug('vulcanize:resolver'); const log = debug('vulcanize:resolver');
export { BlockHeight }; export { BlockHeight };
export const createResolvers = async (indexer: Indexer): Promise<any> => { export const createResolvers = async (indexer: Indexer, eventWatcher: EventWatcher): Promise<any> => {
assert(indexer); assert(indexer);
return { return {
BigInt: new BigInt('bigInt'), BigInt: new BigInt('bigInt'),
Subscription: {
onBlockProgressEvent: {
subscribe: () => eventWatcher.getBlockProgressEventIterator()
}
},
Query: { Query: {
bundle: async (_: any, { id, block = {} }: { id: string, block: BlockHeight }) => { bundle: async (_: any, { id, block = {} }: { id: string, block: BlockHeight }) => {
log('bundle', id, block); log('bundle', id, block);

View File

@ -166,6 +166,14 @@ type Block {
timestamp: Int! timestamp: Int!
} }
type BlockProgressEvent {
blockNumber: Int!
blockHash: String!
numEvents: Int!
numProcessedEvents: Int!
isComplete: Boolean!
}
enum OrderDirection { enum OrderDirection {
asc asc
desc desc
@ -436,4 +444,12 @@ type Query {
where: Block_filter where: Block_filter
): [Block!]! ): [Block!]!
} }
#
# Subscriptions
#
type Subscription {
# Watch for block progress events from filler process.
onBlockProgressEvent: BlockProgressEvent!
}
`; `;

View File

@ -5,7 +5,7 @@
import assert from 'assert'; import assert from 'assert';
import 'reflect-metadata'; import 'reflect-metadata';
import express, { Application } from 'express'; import express, { Application } from 'express';
import { ApolloServer } from 'apollo-server-express'; import { ApolloServer, PubSub } from 'apollo-server-express';
import yargs from 'yargs'; import yargs from 'yargs';
import { hideBin } from 'yargs/helpers'; import { hideBin } from 'yargs/helpers';
import debug from 'debug'; import debug from 'debug';
@ -84,10 +84,11 @@ export const main = async (): Promise<any> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag }); const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag });
await jobQueue.start(); await jobQueue.start();
const eventWatcher = new EventWatcher(indexer, ethClient, jobQueue); const pubSub = new PubSub();
const eventWatcher = new EventWatcher(ethClient, indexer, pubSub, jobQueue);
await eventWatcher.start(); await eventWatcher.start();
const resolvers = process.env.MOCK ? await createMockResolvers() : await createResolvers(indexer); const resolvers = process.env.MOCK ? await createMockResolvers() : await createResolvers(indexer, eventWatcher);
const app: Application = express(); const app: Application = express();
const server = new ApolloServer({ const server = new ApolloServer({

View File

@ -7,6 +7,7 @@ import { ethers } from 'ethers';
import { request } from 'graphql-request'; import { request } from 'graphql-request';
import Decimal from 'decimal.js'; import Decimal from 'decimal.js';
import _ from 'lodash'; import _ from 'lodash';
import { DeepPartial } from 'typeorm';
import { import {
queryFactory, queryFactory,
@ -19,9 +20,10 @@ import {
queryTokenHourData, queryTokenHourData,
queryTransactions queryTransactions
} from '../test/queries'; } from '../test/queries';
import { TestDatabase } from './test-db';
import { Block } from '../src/events'; import { Block } from '../src/events';
import { Token } from '../src/entity/Token'; import { Token } from '../src/entity/Token';
import { BlockProgress } from '../src/entity/BlockProgress';
import { TestDatabase } from './test-db';
export const checkUniswapDayData = async (endpoint: string): Promise<void> => { export const checkUniswapDayData = async (endpoint: string): Promise<void> => {
// Checked values: date, tvlUSD. // Checked values: date, tvlUSD.
@ -179,17 +181,23 @@ export const insertDummyBlock = async (db: TestDatabase, parentBlock: Block): Pr
const parentHash = parentBlock.hash; const parentHash = parentBlock.hash;
const blockNumber = parentBlock.number + 1; const blockNumber = parentBlock.number + 1;
const block: Block = { const block: DeepPartial<BlockProgress> = {
number: blockNumber, blockNumber,
hash: blockHash, blockHash,
timestamp: blockTimestamp, blockTimestamp,
parentHash parentHash
}; };
await db.updateSyncStatus(dbTx, blockHash, blockNumber); await db.updateSyncStatus(dbTx, blockHash, blockNumber);
await db.saveEvents(dbTx, block, []); await db.saveEvents(dbTx, block, []);
await dbTx.commitTransaction(); await dbTx.commitTransaction();
return block;
return {
number: blockNumber,
hash: blockHash,
timestamp: blockTimestamp,
parentHash
};
} catch (error) { } catch (error) {
await dbTx.rollbackTransaction(); await dbTx.rollbackTransaction();
throw error; throw error;

View File

@ -79,14 +79,12 @@ export class Database {
return events; return events;
} }
async saveEvents (queryRunner: QueryRunner, block: any, events: DeepPartial<Event>[]): Promise<void> { async saveEvents (queryRunner: QueryRunner, block: DeepPartial<BlockProgress>, events: DeepPartial<Event>[]): Promise<void> {
const { const {
hash: blockHash, blockHash,
number: blockNumber, blockNumber,
timestamp: blockTimestamp, blockTimestamp,
parent: { parentHash
hash: parentHash
}
} = block; } = block;
assert(blockHash); assert(blockHash);

View File

@ -4,11 +4,13 @@
import { Entity, PrimaryGeneratedColumn, Column, Index } from 'typeorm'; import { Entity, PrimaryGeneratedColumn, Column, Index } from 'typeorm';
import { BlockProgressInterface } from '@vulcanize/util';
@Entity() @Entity()
@Index(['blockHash'], { unique: true }) @Index(['blockHash'], { unique: true })
@Index(['blockNumber']) @Index(['blockNumber'])
@Index(['parentHash']) @Index(['parentHash'])
export class BlockProgress { export class BlockProgress implements BlockProgressInterface {
@PrimaryGeneratedColumn() @PrimaryGeneratedColumn()
id!: number; id!: number;

View File

@ -8,32 +8,37 @@ import _ from 'lodash';
import { PubSub } from 'apollo-server-express'; import { PubSub } from 'apollo-server-express';
import { EthClient } from '@vulcanize/ipld-eth-client'; import { EthClient } from '@vulcanize/ipld-eth-client';
import { JobQueue, MAX_REORG_DEPTH } from '@vulcanize/util'; import {
JobQueue,
EventWatcher as BaseEventWatcher,
MAX_REORG_DEPTH,
QUEUE_BLOCK_PROCESSING,
QUEUE_EVENT_PROCESSING,
QUEUE_CHAIN_PRUNING,
EventWatcherInterface
} from '@vulcanize/util';
import { Indexer } from './indexer'; import { Indexer } from './indexer';
import { BlockProgress } from './entity/BlockProgress';
import { Event, UNKNOWN_EVENT_NAME } from './entity/Event'; import { Event, UNKNOWN_EVENT_NAME } from './entity/Event';
const log = debug('vulcanize:events'); const log = debug('vulcanize:events');
export const UniswapEvent = 'uniswap-event'; export const UniswapEvent = 'uniswap-event';
export const BlockProgressEvent = 'block-progress-event';
export const QUEUE_EVENT_PROCESSING = 'event-processing';
export const QUEUE_BLOCK_PROCESSING = 'block-processing';
export const QUEUE_CHAIN_PRUNING = 'chain-pruning';
export class EventWatcher { export class EventWatcher implements EventWatcherInterface {
_ethClient: EthClient _ethClient: EthClient
_indexer: Indexer _indexer: Indexer
_subscription: ZenObservable.Subscription | undefined _subscription?: ZenObservable.Subscription
_pubsub: PubSub _pubsub: PubSub
_jobQueue: JobQueue _jobQueue: JobQueue
_eventWatcher: BaseEventWatcher
constructor (ethClient: EthClient, indexer: Indexer, pubsub: PubSub, jobQueue: JobQueue) { constructor (ethClient: EthClient, indexer: Indexer, pubsub: PubSub, jobQueue: JobQueue) {
this._ethClient = ethClient; this._ethClient = ethClient;
this._indexer = indexer; this._indexer = indexer;
this._pubsub = pubsub; this._pubsub = pubsub;
this._jobQueue = jobQueue; this._jobQueue = jobQueue;
this._eventWatcher = new BaseEventWatcher(this._ethClient, this._indexer, this._pubsub, this._jobQueue);
} }
getEventIterator (): AsyncIterator<any> { getEventIterator (): AsyncIterator<any> {
@ -41,7 +46,7 @@ export class EventWatcher {
} }
getBlockProgressEventIterator (): AsyncIterator<any> { getBlockProgressEventIterator (): AsyncIterator<any> {
return this._pubsub.asyncIterator([BlockProgressEvent]); return this._eventWatcher.getBlockProgressEventIterator();
} }
async start (): Promise<void> { async start (): Promise<void> {
@ -56,12 +61,13 @@ export class EventWatcher {
async watchBlocksAtChainHead (): Promise<void> { async watchBlocksAtChainHead (): Promise<void> {
log('Started watching upstream blocks...'); log('Started watching upstream blocks...');
this._subscription = await this._ethClient.watchBlocks(async (value) => { this._subscription = await this._ethClient.watchBlocks(async (value) => {
const { blockHash, blockNumber, parentHash } = _.get(value, 'data.listen.relatedNode'); const { blockHash, blockNumber, parentHash, timestamp } = _.get(value, 'data.listen.relatedNode');
await this._indexer.updateSyncStatusChainHead(blockHash, blockNumber); await this._indexer.updateSyncStatusChainHead(blockHash, blockNumber);
log('watchBlock', blockHash, blockNumber); log('watchBlock', blockHash, blockNumber);
await this._jobQueue.pushJob(QUEUE_BLOCK_PROCESSING, { blockHash, blockNumber, parentHash });
await this._jobQueue.pushJob(QUEUE_BLOCK_PROCESSING, { blockHash, blockNumber, parentHash, timestamp });
}); });
} }
@ -84,30 +90,23 @@ export class EventWatcher {
// Publish block progress event. // Publish block progress event.
const blockProgress = await this._indexer.getBlockProgress(blockHash); const blockProgress = await this._indexer.getBlockProgress(blockHash);
if (blockProgress) { if (blockProgress) {
await this.publishBlockProgressToSubscribers(blockProgress); await this._eventWatcher.publishBlockProgressToSubscribers(blockProgress);
} }
}); });
} }
async initEventProcessingOnCompleteHandler (): Promise<void> { async initEventProcessingOnCompleteHandler (): Promise<void> {
this._jobQueue.onComplete(QUEUE_EVENT_PROCESSING, async (job) => { await this._jobQueue.onComplete(QUEUE_EVENT_PROCESSING, async (job) => {
const dbEvent = await this._eventWatcher.eventProcessingCompleteHandler(job);
const { data: { request, failed, state, createdOn } } = job; const { data: { request, failed, state, createdOn } } = job;
const dbEvent = await this._indexer.getEvent(request.data.id);
assert(dbEvent);
await this._indexer.updateBlockProgress(dbEvent.block.blockHash, dbEvent.index);
const blockProgress = await this._indexer.getBlockProgress(dbEvent.block.blockHash);
if (blockProgress) {
await this.publishBlockProgressToSubscribers(blockProgress);
}
const timeElapsedInSeconds = (Date.now() - Date.parse(createdOn)) / 1000; const timeElapsedInSeconds = (Date.now() - Date.parse(createdOn)) / 1000;
log(`Job onComplete event ${request.data.id} publish ${!!request.data.publish}`); log(`Job onComplete event ${request.data.id} publish ${!!request.data.publish}`);
if (!failed && state === 'completed' && request.data.publish) { if (!failed && state === 'completed' && request.data.publish) {
// Check for max acceptable lag time between request and sending results to live subscribers. // Check for max acceptable lag time between request and sending results to live subscribers.
if (timeElapsedInSeconds <= this._jobQueue.maxCompletionLag) { if (timeElapsedInSeconds <= this._jobQueue.maxCompletionLag) {
return await this.publishUniswapEventToSubscribers(dbEvent, timeElapsedInSeconds); await this.publishUniswapEventToSubscribers(dbEvent, timeElapsedInSeconds);
} else { } else {
log(`event ${request.data.id} is too old (${timeElapsedInSeconds}s), not broadcasting to live subscribers`); log(`event ${request.data.id} is too old (${timeElapsedInSeconds}s), not broadcasting to live subscribers`);
} }
@ -142,26 +141,4 @@ export class EventWatcher {
}); });
} }
} }
async publishBlockProgressToSubscribers (blockProgress: BlockProgress): Promise<void> {
const { blockHash, blockNumber, numEvents, numProcessedEvents, isComplete } = blockProgress;
// Publishing the event here will result in pushing the payload to GQL subscribers for `onAddressEvent(address)`.
await this._pubsub.publish(BlockProgressEvent, {
onBlockProgressEvent: {
blockHash,
blockNumber,
numEvents,
numProcessedEvents,
isComplete
}
});
}
async stop (): Promise<void> {
if (this._subscription) {
log('Stopped watching upstream blocks');
this._subscription.unsubscribe();
}
}
} }

View File

@ -7,13 +7,15 @@ import 'reflect-metadata';
import yargs from 'yargs'; import yargs from 'yargs';
import { hideBin } from 'yargs/helpers'; import { hideBin } from 'yargs/helpers';
import debug from 'debug'; import debug from 'debug';
import { PubSub } from 'apollo-server-express';
import { getCache } from '@vulcanize/cache'; import { getCache } from '@vulcanize/cache';
import { EthClient } from '@vulcanize/ipld-eth-client'; import { EthClient } from '@vulcanize/ipld-eth-client';
import { getConfig, JobQueue } from '@vulcanize/util'; import { getConfig, fillBlocks, JobQueue } from '@vulcanize/util';
import { Database } from './database'; import { Database } from './database';
import { QUEUE_BLOCK_PROCESSING } from './events'; import { Indexer } from './indexer';
import { EventWatcher } from './events';
const log = debug('vulcanize:server'); const log = debug('vulcanize:server');
@ -64,7 +66,15 @@ export const main = async (): Promise<any> => {
cache cache
}); });
assert(jobQueueConfig, 'Missing job queue config'); const postgraphileClient = new EthClient({
gqlEndpoint: gqlPostgraphileEndpoint,
cache
});
// Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
const pubsub = new PubSub();
const indexer = new Indexer(config, db, ethClient, postgraphileClient);
const { dbConnectionString, maxCompletionLag } = jobQueueConfig; const { dbConnectionString, maxCompletionLag } = jobQueueConfig;
assert(dbConnectionString, 'Missing job queue db connection string'); assert(dbConnectionString, 'Missing job queue db connection string');
@ -72,22 +82,11 @@ export const main = async (): Promise<any> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag }); const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag });
await jobQueue.start(); await jobQueue.start();
for (let blockNumber = argv.startBlock; blockNumber <= argv.endBlock; blockNumber++) { const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue);
log(`Fill block ${blockNumber}`);
// TODO: Add pause between requests so as to not overwhelm the upsteam server. assert(jobQueueConfig, 'Missing job queue config');
const result = await ethClient.getBlockWithTransactions({ blockNumber });
const { allEthHeaderCids: { nodes: blockNodes } } = result; await fillBlocks(jobQueue, indexer, ethClient, eventWatcher, argv);
for (let bi = 0; bi < blockNodes.length; bi++) {
const { blockHash, blockNumber, parentHash } = blockNodes[bi];
const blockProgress = await db.getBlockProgress(blockHash);
if (blockProgress) {
log(`Block number ${blockNumber}, block hash ${blockHash} already known, skip filling`);
} else {
await jobQueue.pushJob(QUEUE_BLOCK_PROCESSING, { blockHash, blockNumber, parentHash });
}
}
}
}; };
main().then(() => { main().then(() => {

View File

@ -16,11 +16,11 @@ import { Database } from './database';
import { Event, UNKNOWN_EVENT_NAME } from './entity/Event'; import { Event, UNKNOWN_EVENT_NAME } from './entity/Event';
import { BlockProgress } from './entity/BlockProgress'; import { BlockProgress } from './entity/BlockProgress';
import { Contract, KIND_FACTORY, KIND_POOL, KIND_NFPM } from './entity/Contract'; import { Contract, KIND_FACTORY, KIND_POOL, KIND_NFPM } from './entity/Contract';
import { SyncStatus } from './entity/SyncStatus';
import { abi as factoryABI, storageLayout as factoryStorageLayout } from './artifacts/factory.json'; import { abi as factoryABI, storageLayout as factoryStorageLayout } from './artifacts/factory.json';
import { abi as nfpmABI, storageLayout as nfpmStorageLayout } from './artifacts/NonfungiblePositionManager.json'; import { abi as nfpmABI, storageLayout as nfpmStorageLayout } from './artifacts/NonfungiblePositionManager.json';
import poolABI from './artifacts/pool.json'; import poolABI from './artifacts/pool.json';
import { SyncStatus } from './entity/SyncStatus';
// TODO: Move to config. // TODO: Move to config.
const MAX_EVENTS_BLOCK_RANGE = 1000; const MAX_EVENTS_BLOCK_RANGE = 1000;
@ -103,16 +103,17 @@ export class Indexer {
} }
// Note: Some event names might be unknown at this point, as earlier events might not yet be processed. // Note: Some event names might be unknown at this point, as earlier events might not yet be processed.
async getOrFetchBlockEvents (blockHash: string): Promise<Array<Event>> { async getOrFetchBlockEvents (block: DeepPartial<BlockProgress>): Promise<Array<Event>> {
const blockProgress = await this._db.getBlockProgress(blockHash); assert(block.blockHash);
const blockProgress = await this._db.getBlockProgress(block.blockHash);
if (!blockProgress) { if (!blockProgress) {
// Fetch and save events first and make a note in the event sync progress table. // Fetch and save events first and make a note in the event sync progress table.
log(`getBlockEvents: db miss, fetching from upstream server ${blockHash}`); log(`getBlockEvents: db miss, fetching from upstream server ${block.blockHash}`);
await this.fetchAndSaveEvents(blockHash); await this.fetchAndSaveEvents(block);
} }
const events = await this._db.getBlockEvents(blockHash); const events = await this._db.getBlockEvents(block.blockHash);
log(`getBlockEvents: db hit, ${blockHash} num events: ${events.length}`); log(`getBlockEvents: db hit, ${block.blockHash} num events: ${events.length}`);
return events; return events;
} }
@ -314,8 +315,9 @@ export class Indexer {
return { eventName, eventInfo }; return { eventName, eventInfo };
} }
async fetchAndSaveEvents (blockHash: string): Promise<void> { async fetchAndSaveEvents ({ blockHash }: DeepPartial<BlockProgress>): Promise<void> {
const { block, logs } = await this._ethClient.getLogs({ blockHash }); assert(blockHash);
let { block, logs } = await this._ethClient.getLogs({ blockHash });
const { const {
allEthHeaderCids: { allEthHeaderCids: {
@ -388,6 +390,13 @@ export class Indexer {
const dbTx = await this._db.createTransactionRunner(); const dbTx = await this._db.createTransactionRunner();
try { try {
block = {
blockHash,
blockNumber: block.number,
blockTimestamp: block.timestamp,
parentHash: block.parent.hash
};
await this._db.saveEvents(dbTx, block, dbEvents); await this._db.saveEvents(dbTx, block, dbEvents);
await dbTx.commitTransaction(); await dbTx.commitTransaction();
} catch (error) { } catch (error) {

View File

@ -10,12 +10,11 @@ import debug from 'debug';
import { getCache } from '@vulcanize/cache'; import { getCache } from '@vulcanize/cache';
import { EthClient } from '@vulcanize/ipld-eth-client'; import { EthClient } from '@vulcanize/ipld-eth-client';
import { getConfig, JobQueue, MAX_REORG_DEPTH } from '@vulcanize/util'; import { getConfig, JobQueue, MAX_REORG_DEPTH, QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING, QUEUE_CHAIN_PRUNING } from '@vulcanize/util';
import { Indexer } from './indexer'; import { Indexer } from './indexer';
import { Database } from './database'; import { Database } from './database';
import { UNKNOWN_EVENT_NAME, Event } from './entity/Event'; import { UNKNOWN_EVENT_NAME, Event } from './entity/Event';
import { QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING, QUEUE_CHAIN_PRUNING } from './events';
const log = debug('vulcanize:job-runner'); const log = debug('vulcanize:job-runner');
@ -36,7 +35,7 @@ export class JobRunner {
async subscribeBlockProcessingQueue (): Promise<void> { async subscribeBlockProcessingQueue (): Promise<void> {
await this._jobQueue.subscribe(QUEUE_BLOCK_PROCESSING, async (job) => { await this._jobQueue.subscribe(QUEUE_BLOCK_PROCESSING, async (job) => {
const { data: { blockHash, blockNumber, parentHash, priority } } = job; const { data: { blockHash, blockNumber, parentHash, timestamp, priority } } = job;
log(`Processing block number ${blockNumber} hash ${blockHash} `); log(`Processing block number ${blockNumber} hash ${blockHash} `);
@ -51,7 +50,7 @@ export class JobRunner {
if (blockHash !== syncStatus.latestCanonicalBlockHash) { if (blockHash !== syncStatus.latestCanonicalBlockHash) {
const parent = await this._indexer.getBlockProgress(parentHash); const parent = await this._indexer.getBlockProgress(parentHash);
if (!parent) { if (!parent) {
const { number: parentBlockNumber, parent: { hash: grandparentHash } } = await this._indexer.getBlock(parentHash); const { number: parentBlockNumber, parent: { hash: grandparentHash }, timestamp: parentTimestamp } = await this._indexer.getBlock(parentHash);
// Create a higher priority job to index parent block and then abort. // Create a higher priority job to index parent block and then abort.
// We don't have to worry about aborting as this job will get retried later. // We don't have to worry about aborting as this job will get retried later.
@ -60,6 +59,7 @@ export class JobRunner {
blockHash: parentHash, blockHash: parentHash,
blockNumber: parentBlockNumber, blockNumber: parentBlockNumber,
parentHash: grandparentHash, parentHash: grandparentHash,
timestamp: parentTimestamp,
priority: newPriority priority: newPriority
}, { priority: newPriority }); }, { priority: newPriority });
@ -78,7 +78,7 @@ export class JobRunner {
} }
} }
const events = await this._indexer.getOrFetchBlockEvents(blockHash); const events = await this._indexer.getOrFetchBlockEvents({ blockHash, blockNumber, parentHash, blockTimestamp: timestamp });
for (let ei = 0; ei < events.length; ei++) { for (let ei = 0; ei < events.length; ei++) {
await this._jobQueue.pushJob(QUEUE_EVENT_PROCESSING, { id: events[ei].id, publish: true }); await this._jobQueue.pushJob(QUEUE_EVENT_PROCESSING, { id: events[ei].id, publish: true });
} }

View File

@ -1,5 +1,12 @@
//
// Copyright 2021 Vulcanize, Inc.
//
export * from './src/config'; export * from './src/config';
export * from './src/database'; export * from './src/database';
export * from './src/job-queue'; export * from './src/job-queue';
export * from './src/constants'; export * from './src/constants';
export * from './src/index'; export * from './src/index';
export * from './src/fill';
export * from './src/events';
export * from './src/types';

View File

@ -12,6 +12,12 @@ import { Config as CacheConfig } from '@vulcanize/cache';
const log = debug('vulcanize:config'); const log = debug('vulcanize:config');
export interface JobQueueConfig {
dbConnectionString: string;
maxCompletionLag: number;
jobDelay?: number;
}
export interface Config { export interface Config {
server: { server: {
host: string; host: string;
@ -36,11 +42,7 @@ export interface Config {
gqlSubscriptionEndpoint: string; gqlSubscriptionEndpoint: string;
} }
}, },
jobQueue: { jobQueue: JobQueueConfig
dbConnectionString: string;
maxCompletionLag: number;
jobDelay?: number;
}
} }
export const getConfig = async (configFile: string): Promise<Config> => { export const getConfig = async (configFile: string): Promise<Config> => {

View File

@ -3,3 +3,7 @@
// //
export const MAX_REORG_DEPTH = 16; export const MAX_REORG_DEPTH = 16;
export const QUEUE_BLOCK_PROCESSING = 'block-processing';
export const QUEUE_EVENT_PROCESSING = 'event-processing';
export const QUEUE_CHAIN_PRUNING = 'chain-pruning';

View File

@ -0,0 +1,72 @@
//
// Copyright 2021 Vulcanize, Inc.
//
import assert from 'assert';
import debug from 'debug';
import { PubSub } from 'apollo-server-express';
import { EthClient } from '@vulcanize/ipld-eth-client';
import { JobQueue } from './job-queue';
import { BlockProgressInterface, EventInterface, IndexerInterface } from './types';
const log = debug('vulcanize:events');
export const BlockProgressEvent = 'block-progress-event';
export class EventWatcher {
_ethClient: EthClient
_indexer: IndexerInterface
_subscription?: ZenObservable.Subscription
_pubsub: PubSub
_jobQueue: JobQueue
constructor (ethClient: EthClient, indexer: IndexerInterface, pubsub: PubSub, jobQueue: JobQueue) {
this._ethClient = ethClient;
this._indexer = indexer;
this._pubsub = pubsub;
this._jobQueue = jobQueue;
}
getBlockProgressEventIterator (): AsyncIterator<any> {
return this._pubsub.asyncIterator([BlockProgressEvent]);
}
async publishBlockProgressToSubscribers (blockProgress: BlockProgressInterface): Promise<void> {
const { blockHash, blockNumber, numEvents, numProcessedEvents, isComplete } = blockProgress;
// Publishing the event here will result in pushing the payload to GQL subscribers for `onAddressEvent(address)`.
await this._pubsub.publish(BlockProgressEvent, {
onBlockProgressEvent: {
blockHash,
blockNumber,
numEvents,
numProcessedEvents,
isComplete
}
});
}
async stop (): Promise<void> {
if (this._subscription) {
log('Stopped watching upstream blocks');
this._subscription.unsubscribe();
}
}
async eventProcessingCompleteHandler (job: any): Promise<EventInterface> {
const { data: { request } } = job;
const dbEvent = await this._indexer.getEvent(request.data.id);
assert(dbEvent);
await this._indexer.updateBlockProgress(dbEvent.block.blockHash, dbEvent.index);
const blockProgress = await this._indexer.getBlockProgress(dbEvent.block.blockHash);
if (blockProgress) {
await this.publishBlockProgressToSubscribers(blockProgress);
}
return dbEvent;
}
}

60
packages/util/src/fill.ts Normal file
View File

@ -0,0 +1,60 @@
//
// Copyright 2021 Vulcanize, Inc.
//
import debug from 'debug';
import { EthClient } from '@vulcanize/ipld-eth-client';
import { JobQueue } from './job-queue';
import { QUEUE_BLOCK_PROCESSING } from './constants';
import { EventWatcherInterface, IndexerInterface } from './types';
const log = debug('vulcanize:fill');
export const fillBlocks = async (
jobQueue: JobQueue,
indexer: IndexerInterface,
ethClient: EthClient,
eventWatcher: EventWatcherInterface,
{ startBlock, endBlock }: { startBlock: number, endBlock: number}
): Promise<any> => {
await eventWatcher.initBlockProcessingOnCompleteHandler();
await eventWatcher.initEventProcessingOnCompleteHandler();
for (let blockNumber = startBlock; blockNumber <= endBlock; blockNumber++) {
log(`Fill block ${blockNumber}`);
// TODO: Add pause between requests so as to not overwhelm the upsteam server.
const result = await ethClient.getBlockWithTransactions({ blockNumber });
const { allEthHeaderCids: { nodes: blockNodes } } = result;
for (let bi = 0; bi < blockNodes.length; bi++) {
const { blockHash, blockNumber, parentHash, timestamp } = blockNodes[bi];
const blockProgress = await indexer.getBlockProgress(blockHash);
if (blockProgress) {
log(`Block number ${blockNumber}, block hash ${blockHash} already known, skip filling`);
} else {
await jobQueue.pushJob(QUEUE_BLOCK_PROCESSING, { blockHash, blockNumber, parentHash, timestamp });
}
}
}
// Creating an AsyncIterable from AsyncIterator to iterate over the values.
// https://www.codementor.io/@tiagolopesferreira/asynchronous-iterators-in-javascript-jl1yg8la1#for-wait-of
const blockProgressEventIterable = {
// getBlockProgressEventIterator returns an AsyncIterator which can be used to listen to BlockProgress events.
[Symbol.asyncIterator]: eventWatcher.getBlockProgressEventIterator.bind(eventWatcher)
};
// Iterate over async iterable.
// https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Statements/for-await...of
for await (const data of blockProgressEventIterable) {
const { onBlockProgressEvent: { blockNumber, isComplete } } = data;
if (blockNumber >= endBlock && isComplete) {
// Break the async loop if blockProgress event is for the endBlock and processing is complete.
break;
}
}
};

View File

@ -0,0 +1,48 @@
//
// Copyright 2021 Vulcanize, Inc.
//
export interface BlockProgressInterface {
id: number;
blockHash: string;
parentHash: string;
blockNumber: number;
blockTimestamp: number;
numEvents: number;
numProcessedEvents: number;
lastProcessedEventIndex: number;
isComplete: boolean;
isPruned: boolean;
}
export interface SyncStatusInterface {
id: number;
chainHeadBlockHash: string;
chainHeadBlockNumber: number;
latestCanonicalBlockHash: string;
latestCanonicalBlockNumber: number;
}
export interface EventInterface {
id: number;
block: BlockProgressInterface;
txHash: string;
index: number;
contract: string;
eventName: string;
eventInfo: string;
extraInfo: string;
proof: string;
}
export interface IndexerInterface {
getBlockProgress (blockHash: string): Promise<BlockProgressInterface | undefined>
getEvent (id: string): Promise<EventInterface | undefined>
updateBlockProgress (blockHash: string, lastProcessedEventIndex: number): Promise<void>
}
export interface EventWatcherInterface {
getBlockProgressEventIterator (): AsyncIterator<any>
initBlockProcessingOnCompleteHandler (): Promise<void>
initEventProcessingOnCompleteHandler (): Promise<void>
}