Change block processing to be pull based (#288)

* Implement pull based watcher for uni-watcher

* Fix same block processed multiple times

* Implement wait time for fetching block from config

* Use blockProgress event to fetch and process next block

* Rename utils index to misc
This commit is contained in:
nikugogoi 2021-10-26 17:36:21 +05:30 committed by GitHub
parent 137f7d1a21
commit 9d95e49ec9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
33 changed files with 180 additions and 132 deletions

View File

@ -31,8 +31,8 @@ export class Indexer {
this._db = db; this._db = db;
this._ethClient = ethClient; this._ethClient = ethClient;
this._getStorageAt = this._ethClient.getStorageAt.bind(this._ethClient);
this._tracingClient = tracingClient; this._tracingClient = tracingClient;
this._getStorageAt = this._ethClient.getStorageAt.bind(this._ethClient);
} }
async isWatchedAddress (address : string): Promise<boolean> { async isWatchedAddress (address : string): Promise<boolean> {

View File

@ -18,6 +18,7 @@
gqlApiEndpoint = "http://127.0.0.1:8082/graphql" gqlApiEndpoint = "http://127.0.0.1:8082/graphql"
gqlPostgraphileEndpoint = "http://127.0.0.1:5000/graphql" gqlPostgraphileEndpoint = "http://127.0.0.1:5000/graphql"
rpcProviderEndpoint = "http://127.0.0.1:8081" rpcProviderEndpoint = "http://127.0.0.1:8081"
blockDelayInMilliSecs = 2000
[upstream.cache] [upstream.cache]
name = "requests" name = "requests"

View File

@ -12,7 +12,8 @@ import {
EventWatcher as BaseEventWatcher, EventWatcher as BaseEventWatcher,
QUEUE_BLOCK_PROCESSING, QUEUE_BLOCK_PROCESSING,
QUEUE_EVENT_PROCESSING, QUEUE_EVENT_PROCESSING,
UNKNOWN_EVENT_NAME UNKNOWN_EVENT_NAME,
UpstreamConfig
} from '@vulcanize/util'; } from '@vulcanize/util';
import { Indexer } from './indexer'; import { Indexer } from './indexer';
@ -30,7 +31,7 @@ export class EventWatcher {
_pubsub: PubSub _pubsub: PubSub
_jobQueue: JobQueue _jobQueue: JobQueue
constructor (ethClient: EthClient, indexer: Indexer, pubsub: PubSub, jobQueue: JobQueue) { constructor (upstreamConfig: UpstreamConfig, ethClient: EthClient, postgraphileClient: EthClient, indexer: Indexer, pubsub: PubSub, jobQueue: JobQueue) {
assert(ethClient); assert(ethClient);
assert(indexer); assert(indexer);
@ -38,7 +39,7 @@ export class EventWatcher {
this._indexer = indexer; this._indexer = indexer;
this._pubsub = pubsub; this._pubsub = pubsub;
this._jobQueue = jobQueue; this._jobQueue = jobQueue;
this._baseEventWatcher = new BaseEventWatcher(this._ethClient, this._indexer, this._pubsub, this._jobQueue); this._baseEventWatcher = new BaseEventWatcher(upstreamConfig, this._ethClient, postgraphileClient, this._indexer, this._pubsub, this._jobQueue);
} }
getEventIterator (): AsyncIterator<any> { getEventIterator (): AsyncIterator<any> {
@ -52,22 +53,15 @@ export class EventWatcher {
async start (): Promise<void> { async start (): Promise<void> {
assert(!this._subscription, 'subscription already started'); assert(!this._subscription, 'subscription already started');
await this.watchBlocksAtChainHead();
await this.initBlockProcessingOnCompleteHandler(); await this.initBlockProcessingOnCompleteHandler();
await this.initEventProcessingOnCompleteHandler(); await this.initEventProcessingOnCompleteHandler();
this._baseEventWatcher.startBlockProcessing();
} }
async stop (): Promise<void> { async stop (): Promise<void> {
this._baseEventWatcher.stop(); this._baseEventWatcher.stop();
} }
async watchBlocksAtChainHead (): Promise<void> {
log('Started watching upstream blocks...');
this._subscription = await this._ethClient.watchBlocks(async (value) => {
await this._baseEventWatcher.blocksHandler(value);
});
}
async initBlockProcessingOnCompleteHandler (): Promise<void> { async initBlockProcessingOnCompleteHandler (): Promise<void> {
this._jobQueue.onComplete(QUEUE_BLOCK_PROCESSING, async (job) => { this._jobQueue.onComplete(QUEUE_BLOCK_PROCESSING, async (job) => {
const { id, data: { failed } } = job; const { id, data: { failed } } = job;

View File

@ -54,13 +54,13 @@ export const main = async (): Promise<any> => {
await db.init(); await db.init();
assert(upstream, 'Missing upstream config'); assert(upstream, 'Missing upstream config');
const { ethServer: { gqlPostgraphileEndpoint, rpcProviderEndpoint }, cache: cacheConfig } = upstream; const { ethServer: { gqlApiEndpoint, gqlPostgraphileEndpoint, rpcProviderEndpoint, blockDelayInMilliSecs }, cache: cacheConfig } = upstream;
assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint'); assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint');
const cache = await getCache(cacheConfig); const cache = await getCache(cacheConfig);
const ethClient = new EthClient({ const ethClient = new EthClient({
gqlEndpoint: gqlPostgraphileEndpoint, gqlEndpoint: gqlApiEndpoint,
gqlSubscriptionEndpoint: gqlPostgraphileEndpoint, gqlSubscriptionEndpoint: gqlPostgraphileEndpoint,
cache cache
}); });
@ -83,11 +83,11 @@ export const main = async (): Promise<any> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start(); await jobQueue.start();
const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue); const eventWatcher = new EventWatcher(upstream, ethClient, postgraphileClient, indexer, pubsub, jobQueue);
assert(jobQueueConfig, 'Missing job queue config'); assert(jobQueueConfig, 'Missing job queue config');
await fillBlocks(jobQueue, indexer, ethClient, eventWatcher, argv); await fillBlocks(jobQueue, indexer, postgraphileClient, eventWatcher, blockDelayInMilliSecs, argv);
}; };
main().catch(err => { main().catch(err => {

View File

@ -67,8 +67,8 @@ export class Indexer {
this._db = db; this._db = db;
this._ethClient = ethClient; this._ethClient = ethClient;
this._ethProvider = ethProvider;
this._postgraphileClient = postgraphileClient; this._postgraphileClient = postgraphileClient;
this._ethProvider = ethProvider;
this._baseIndexer = new BaseIndexer(this._db, this._ethClient, this._ethProvider); this._baseIndexer = new BaseIndexer(this._db, this._ethClient, this._ethProvider);
const { abi, storageLayout } = artifacts; const { abi, storageLayout } = artifacts;

View File

@ -33,9 +33,9 @@ export class JobRunner {
_jobQueueConfig: JobQueueConfig _jobQueueConfig: JobQueueConfig
constructor (jobQueueConfig: JobQueueConfig, indexer: Indexer, jobQueue: JobQueue) { constructor (jobQueueConfig: JobQueueConfig, indexer: Indexer, jobQueue: JobQueue) {
this._jobQueueConfig = jobQueueConfig;
this._indexer = indexer; this._indexer = indexer;
this._jobQueue = jobQueue; this._jobQueue = jobQueue;
this._jobQueueConfig = jobQueueConfig;
this._baseJobRunner = new BaseJobRunner(this._jobQueueConfig, this._indexer, this._jobQueue); this._baseJobRunner = new BaseJobRunner(this._jobQueueConfig, this._indexer, this._jobQueue);
} }

View File

@ -81,7 +81,7 @@ export const main = async (): Promise<any> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue); const eventWatcher = new EventWatcher(upstream, ethClient, postgraphileClient, indexer, pubsub, jobQueue);
if (watcherKind === KIND_ACTIVE) { if (watcherKind === KIND_ACTIVE) {
await jobQueue.start(); await jobQueue.start();

View File

@ -19,6 +19,7 @@
gqlApiEndpoint = "http://127.0.0.1:8082/graphql" gqlApiEndpoint = "http://127.0.0.1:8082/graphql"
gqlPostgraphileEndpoint = "http://127.0.0.1:5000/graphql" gqlPostgraphileEndpoint = "http://127.0.0.1:5000/graphql"
rpcProviderEndpoint = "http://127.0.0.1:8081" rpcProviderEndpoint = "http://127.0.0.1:8081"
blockDelayInMilliSecs = 2000
[upstream.cache] [upstream.cache]
name = "requests" name = "requests"

View File

@ -12,7 +12,8 @@ import {
EventWatcher as BaseEventWatcher, EventWatcher as BaseEventWatcher,
QUEUE_BLOCK_PROCESSING, QUEUE_BLOCK_PROCESSING,
QUEUE_EVENT_PROCESSING, QUEUE_EVENT_PROCESSING,
UNKNOWN_EVENT_NAME UNKNOWN_EVENT_NAME,
UpstreamConfig
} from '@vulcanize/util'; } from '@vulcanize/util';
import { Indexer } from './indexer'; import { Indexer } from './indexer';
@ -30,7 +31,7 @@ export class EventWatcher {
_pubsub: PubSub _pubsub: PubSub
_jobQueue: JobQueue _jobQueue: JobQueue
constructor (ethClient: EthClient, indexer: Indexer, pubsub: PubSub, jobQueue: JobQueue) { constructor (upstreamConfig: UpstreamConfig, ethClient: EthClient, postgraphileClient: EthClient, indexer: Indexer, pubsub: PubSub, jobQueue: JobQueue) {
assert(ethClient); assert(ethClient);
assert(indexer); assert(indexer);
@ -38,7 +39,7 @@ export class EventWatcher {
this._indexer = indexer; this._indexer = indexer;
this._pubsub = pubsub; this._pubsub = pubsub;
this._jobQueue = jobQueue; this._jobQueue = jobQueue;
this._baseEventWatcher = new BaseEventWatcher(this._ethClient, this._indexer, this._pubsub, this._jobQueue); this._baseEventWatcher = new BaseEventWatcher(upstreamConfig, this._ethClient, postgraphileClient, this._indexer, this._pubsub, this._jobQueue);
} }
getEventIterator (): AsyncIterator<any> { getEventIterator (): AsyncIterator<any> {
@ -52,22 +53,15 @@ export class EventWatcher {
async start (): Promise<void> { async start (): Promise<void> {
assert(!this._subscription, 'subscription already started'); assert(!this._subscription, 'subscription already started');
await this.watchBlocksAtChainHead();
await this.initBlockProcessingOnCompleteHandler(); await this.initBlockProcessingOnCompleteHandler();
await this.initEventProcessingOnCompleteHandler(); await this.initEventProcessingOnCompleteHandler();
this._baseEventWatcher.startBlockProcessing();
} }
async stop (): Promise<void> { async stop (): Promise<void> {
this._baseEventWatcher.stop(); this._baseEventWatcher.stop();
} }
async watchBlocksAtChainHead (): Promise<void> {
log('Started watching upstream blocks...');
this._subscription = await this._ethClient.watchBlocks(async (value) => {
await this._baseEventWatcher.blocksHandler(value);
});
}
async initBlockProcessingOnCompleteHandler (): Promise<void> { async initBlockProcessingOnCompleteHandler (): Promise<void> {
this._jobQueue.onComplete(QUEUE_BLOCK_PROCESSING, async (job) => { this._jobQueue.onComplete(QUEUE_BLOCK_PROCESSING, async (job) => {
const { id, data: { failed } } = job; const { id, data: { failed } } = job;

View File

@ -57,16 +57,21 @@ export const main = async (): Promise<any> => {
await db.init(); await db.init();
assert(upstream, 'Missing upstream config'); assert(upstream, 'Missing upstream config');
const { ethServer: { gqlPostgraphileEndpoint, rpcProviderEndpoint }, cache: cacheConfig } = upstream; const { ethServer: { gqlApiEndpoint, gqlPostgraphileEndpoint, rpcProviderEndpoint, blockDelayInMilliSecs }, cache: cacheConfig } = upstream;
assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint'); assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint');
const cache = await getCache(cacheConfig); const cache = await getCache(cacheConfig);
const ethClient = new EthClient({ const ethClient = new EthClient({
gqlEndpoint: gqlPostgraphileEndpoint, gqlEndpoint: gqlApiEndpoint,
gqlSubscriptionEndpoint: gqlPostgraphileEndpoint, gqlSubscriptionEndpoint: gqlPostgraphileEndpoint,
cache cache
}); });
const postgraphileClient = new EthClient({
gqlEndpoint: gqlPostgraphileEndpoint,
cache
});
const ethProvider = getCustomProvider(rpcProviderEndpoint); const ethProvider = getCustomProvider(rpcProviderEndpoint);
// Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
@ -80,11 +85,11 @@ export const main = async (): Promise<any> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start(); await jobQueue.start();
const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue); const eventWatcher = new EventWatcher(upstream, ethClient, postgraphileClient, indexer, pubsub, jobQueue);
assert(jobQueueConfig, 'Missing job queue config'); assert(jobQueueConfig, 'Missing job queue config');
await fillBlocks(jobQueue, indexer, ethClient, eventWatcher, argv); await fillBlocks(jobQueue, indexer, postgraphileClient, eventWatcher, blockDelayInMilliSecs, argv);
}; };
main().catch(err => { main().catch(err => {

View File

@ -33,9 +33,9 @@ export class JobRunner {
_jobQueueConfig: JobQueueConfig _jobQueueConfig: JobQueueConfig
constructor (jobQueueConfig: JobQueueConfig, indexer: Indexer, jobQueue: JobQueue) { constructor (jobQueueConfig: JobQueueConfig, indexer: Indexer, jobQueue: JobQueue) {
this._jobQueueConfig = jobQueueConfig;
this._indexer = indexer; this._indexer = indexer;
this._jobQueue = jobQueue; this._jobQueue = jobQueue;
this._jobQueueConfig = jobQueueConfig;
this._baseJobRunner = new BaseJobRunner(this._jobQueueConfig, this._indexer, this._jobQueue); this._baseJobRunner = new BaseJobRunner(this._jobQueueConfig, this._indexer, this._jobQueue);
} }

View File

@ -62,6 +62,11 @@ export const main = async (): Promise<any> => {
cache cache
}); });
const postgraphileClient = new EthClient({
gqlEndpoint: gqlPostgraphileEndpoint,
cache
});
const ethProvider = getCustomProvider(rpcProviderEndpoint); const ethProvider = getCustomProvider(rpcProviderEndpoint);
// Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
@ -75,7 +80,7 @@ export const main = async (): Promise<any> => {
assert(dbConnectionString, 'Missing job queue db connection string'); assert(dbConnectionString, 'Missing job queue db connection string');
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue); const eventWatcher = new EventWatcher(upstream, ethClient, postgraphileClient, indexer, pubsub, jobQueue);
if (watcherKind === KIND_ACTIVE) { if (watcherKind === KIND_ACTIVE) {
await jobQueue.start(); await jobQueue.start();

View File

@ -76,7 +76,7 @@ export class EthClient {
return this._graphqlClient.query(ethQueries.getBlocksByNumber, { blockNumber }); return this._graphqlClient.query(ethQueries.getBlocksByNumber, { blockNumber });
} }
async getBlockByHash (blockHash: string): Promise<any> { async getBlockByHash (blockHash?: string): Promise<any> {
const { block } = await this._graphqlClient.query(ethQueries.getBlockByHash, { blockHash }); const { block } = await this._graphqlClient.query(ethQueries.getBlockByHash, { blockHash });
block.number = parseInt(block.number, 16); block.number = parseInt(block.number, 16);
block.timestamp = parseInt(block.timestamp, 16); block.timestamp = parseInt(block.timestamp, 16);

View File

@ -20,6 +20,7 @@
gqlApiEndpoint = "http://127.0.0.1:8082/graphql" gqlApiEndpoint = "http://127.0.0.1:8082/graphql"
gqlPostgraphileEndpoint = "http://127.0.0.1:5000/graphql" gqlPostgraphileEndpoint = "http://127.0.0.1:5000/graphql"
rpcProviderEndpoint = "http://127.0.0.1:8081" rpcProviderEndpoint = "http://127.0.0.1:8081"
blockDelayInMilliSecs = 2000
[upstream.cache] [upstream.cache]
name = "requests" name = "requests"

View File

@ -20,6 +20,7 @@
gqlApiEndpoint = "http://127.0.0.1:8082/graphql" gqlApiEndpoint = "http://127.0.0.1:8082/graphql"
gqlPostgraphileEndpoint = "http://127.0.0.1:5000/graphql" gqlPostgraphileEndpoint = "http://127.0.0.1:5000/graphql"
rpcProviderEndpoint = "http://127.0.0.1:8545" rpcProviderEndpoint = "http://127.0.0.1:8545"
blockDelayInMilliSecs = 2000
[upstream.cache] [upstream.cache]
name = "requests" name = "requests"

View File

@ -7,7 +7,7 @@ import debug from 'debug';
import { PubSub } from 'apollo-server-express'; import { PubSub } from 'apollo-server-express';
import { EthClient } from '@vulcanize/ipld-eth-client'; import { EthClient } from '@vulcanize/ipld-eth-client';
import { EventWatcher as BaseEventWatcher, EventWatcherInterface, JobQueue, QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING } from '@vulcanize/util'; import { EventWatcher as BaseEventWatcher, EventWatcherInterface, JobQueue, QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING, UpstreamConfig } from '@vulcanize/util';
import { Indexer } from './indexer'; import { Indexer } from './indexer';
@ -124,12 +124,12 @@ export class EventWatcher implements EventWatcherInterface {
_jobQueue: JobQueue _jobQueue: JobQueue
_baseEventWatcher: BaseEventWatcher _baseEventWatcher: BaseEventWatcher
constructor (ethClient: EthClient, indexer: Indexer, pubsub: PubSub, jobQueue: JobQueue) { constructor (upstreamConfig: UpstreamConfig, ethClient: EthClient, postgraphileClient: EthClient, indexer: Indexer, pubsub: PubSub, jobQueue: JobQueue) {
this._ethClient = ethClient; this._ethClient = ethClient;
this._indexer = indexer; this._indexer = indexer;
this._pubsub = pubsub; this._pubsub = pubsub;
this._jobQueue = jobQueue; this._jobQueue = jobQueue;
this._baseEventWatcher = new BaseEventWatcher(this._ethClient, this._indexer, this._pubsub, this._jobQueue); this._baseEventWatcher = new BaseEventWatcher(upstreamConfig, this._ethClient, postgraphileClient, this._indexer, this._pubsub, this._jobQueue);
} }
getBlockProgressEventIterator (): AsyncIterator<any> { getBlockProgressEventIterator (): AsyncIterator<any> {
@ -140,21 +140,15 @@ export class EventWatcher implements EventWatcherInterface {
assert(!this._subscription, 'subscription already started'); assert(!this._subscription, 'subscription already started');
log('Started watching upstream events...'); log('Started watching upstream events...');
await this.watchBlocksAtChainHead();
await this.initBlockProcessingOnCompleteHandler(); await this.initBlockProcessingOnCompleteHandler();
await this.initEventProcessingOnCompleteHandler(); await this.initEventProcessingOnCompleteHandler();
this._baseEventWatcher.startBlockProcessing();
} }
async stop (): Promise<void> { async stop (): Promise<void> {
this._baseEventWatcher.stop(); this._baseEventWatcher.stop();
} }
async watchBlocksAtChainHead (): Promise<void> {
this._subscription = await this._ethClient.watchBlocks(async (value) => {
await this._baseEventWatcher.blocksHandler(value);
});
}
async initBlockProcessingOnCompleteHandler (): Promise<void> { async initBlockProcessingOnCompleteHandler (): Promise<void> {
await this._jobQueue.onComplete(QUEUE_BLOCK_PROCESSING, async (job) => { await this._jobQueue.onComplete(QUEUE_BLOCK_PROCESSING, async (job) => {
const { id, data: { failed } } = job; const { id, data: { failed } } = job;

View File

@ -59,16 +59,21 @@ export const main = async (): Promise<any> => {
await db.init(); await db.init();
assert(upstream, 'Missing upstream config'); assert(upstream, 'Missing upstream config');
const { ethServer: { gqlPostgraphileEndpoint, rpcProviderEndpoint }, cache: cacheConfig, uniWatcher, tokenWatcher } = upstream; const { ethServer: { gqlApiEndpoint, gqlPostgraphileEndpoint, rpcProviderEndpoint, blockDelayInMilliSecs }, cache: cacheConfig, uniWatcher, tokenWatcher } = upstream;
assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint'); assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint');
const cache = await getCache(cacheConfig); const cache = await getCache(cacheConfig);
const ethClient = new EthClient({ const ethClient = new EthClient({
gqlEndpoint: gqlPostgraphileEndpoint, gqlEndpoint: gqlApiEndpoint,
gqlSubscriptionEndpoint: gqlPostgraphileEndpoint, gqlSubscriptionEndpoint: gqlPostgraphileEndpoint,
cache cache
}); });
const postgraphileClient = new EthClient({
gqlEndpoint: gqlPostgraphileEndpoint,
cache
});
const uniClient = new UniClient(uniWatcher); const uniClient = new UniClient(uniWatcher);
const erc20Client = new ERC20Client(tokenWatcher); const erc20Client = new ERC20Client(tokenWatcher);
const ethProvider = getCustomProvider(rpcProviderEndpoint); const ethProvider = getCustomProvider(rpcProviderEndpoint);
@ -85,9 +90,9 @@ export const main = async (): Promise<any> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start(); await jobQueue.start();
const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue); const eventWatcher = new EventWatcher(upstream, ethClient, postgraphileClient, indexer, pubsub, jobQueue);
await fillBlocks(jobQueue, indexer, ethClient, eventWatcher, argv); await fillBlocks(jobQueue, indexer, postgraphileClient, eventWatcher, blockDelayInMilliSecs, argv);
}; };
main().catch(err => { main().catch(err => {

View File

@ -35,9 +35,9 @@ export class JobRunner {
_jobQueueConfig: JobQueueConfig _jobQueueConfig: JobQueueConfig
constructor (jobQueueConfig: JobQueueConfig, indexer: Indexer, jobQueue: JobQueue) { constructor (jobQueueConfig: JobQueueConfig, indexer: Indexer, jobQueue: JobQueue) {
this._jobQueueConfig = jobQueueConfig;
this._indexer = indexer; this._indexer = indexer;
this._jobQueue = jobQueue; this._jobQueue = jobQueue;
this._jobQueueConfig = jobQueueConfig;
this._baseJobRunner = new BaseJobRunner(this._jobQueueConfig, this._indexer, this._jobQueue); this._baseJobRunner = new BaseJobRunner(this._jobQueueConfig, this._indexer, this._jobQueue);
} }

View File

@ -74,6 +74,11 @@ export const main = async (): Promise<any> => {
cache cache
}); });
const postgraphileClient = new EthClient({
gqlEndpoint: gqlPostgraphileEndpoint,
cache
});
const uniClient = new UniClient(uniWatcher); const uniClient = new UniClient(uniWatcher);
const erc20Client = new ERC20Client(tokenWatcher); const erc20Client = new ERC20Client(tokenWatcher);
const ethProvider = getCustomProvider(rpcProviderEndpoint); const ethProvider = getCustomProvider(rpcProviderEndpoint);
@ -88,7 +93,7 @@ export const main = async (): Promise<any> => {
await jobQueue.start(); await jobQueue.start();
const pubSub = new PubSub(); const pubSub = new PubSub();
const eventWatcher = new EventWatcher(ethClient, indexer, pubSub, jobQueue); const eventWatcher = new EventWatcher(upstream, ethClient, postgraphileClient, indexer, pubSub, jobQueue);
await eventWatcher.start(); await eventWatcher.start();
const resolvers = process.env.MOCK ? await createMockResolvers() : await createResolvers(indexer, eventWatcher); const resolvers = process.env.MOCK ? await createMockResolvers() : await createResolvers(indexer, eventWatcher);

View File

@ -17,6 +17,7 @@
gqlApiEndpoint = "http://127.0.0.1:8082/graphql" gqlApiEndpoint = "http://127.0.0.1:8082/graphql"
gqlPostgraphileEndpoint = "http://127.0.0.1:5000/graphql" gqlPostgraphileEndpoint = "http://127.0.0.1:5000/graphql"
rpcProviderEndpoint = "http://127.0.0.1:8081" rpcProviderEndpoint = "http://127.0.0.1:8081"
blockDelayInMilliSecs = 2000
[upstream.cache] [upstream.cache]
name = "requests" name = "requests"

View File

@ -17,6 +17,7 @@
gqlApiEndpoint = "http://127.0.0.1:8082/graphql" gqlApiEndpoint = "http://127.0.0.1:8082/graphql"
gqlPostgraphileEndpoint = "http://127.0.0.1:5000/graphql" gqlPostgraphileEndpoint = "http://127.0.0.1:5000/graphql"
rpcProviderEndpoint = "http://127.0.0.1:8545" rpcProviderEndpoint = "http://127.0.0.1:8545"
blockDelayInMilliSecs = 2000
[upstream.cache] [upstream.cache]
name = "requests" name = "requests"

View File

@ -12,7 +12,8 @@ import {
EventWatcher as BaseEventWatcher, EventWatcher as BaseEventWatcher,
QUEUE_BLOCK_PROCESSING, QUEUE_BLOCK_PROCESSING,
QUEUE_EVENT_PROCESSING, QUEUE_EVENT_PROCESSING,
EventWatcherInterface EventWatcherInterface,
UpstreamConfig
} from '@vulcanize/util'; } from '@vulcanize/util';
import { Indexer } from './indexer'; import { Indexer } from './indexer';
@ -30,12 +31,12 @@ export class EventWatcher implements EventWatcherInterface {
_jobQueue: JobQueue _jobQueue: JobQueue
_baseEventWatcher: BaseEventWatcher _baseEventWatcher: BaseEventWatcher
constructor (ethClient: EthClient, indexer: Indexer, pubsub: PubSub, jobQueue: JobQueue) { constructor (upstreamConfig: UpstreamConfig, ethClient: EthClient, postgraphileClient: EthClient, indexer: Indexer, pubsub: PubSub, jobQueue: JobQueue) {
this._ethClient = ethClient; this._ethClient = ethClient;
this._indexer = indexer; this._indexer = indexer;
this._pubsub = pubsub; this._pubsub = pubsub;
this._jobQueue = jobQueue; this._jobQueue = jobQueue;
this._baseEventWatcher = new BaseEventWatcher(this._ethClient, this._indexer, this._pubsub, this._jobQueue); this._baseEventWatcher = new BaseEventWatcher(upstreamConfig, this._ethClient, postgraphileClient, this._indexer, this._pubsub, this._jobQueue);
} }
getEventIterator (): AsyncIterator<any> { getEventIterator (): AsyncIterator<any> {
@ -49,22 +50,15 @@ export class EventWatcher implements EventWatcherInterface {
async start (): Promise<void> { async start (): Promise<void> {
assert(!this._subscription, 'subscription already started'); assert(!this._subscription, 'subscription already started');
await this.watchBlocksAtChainHead();
await this.initBlockProcessingOnCompleteHandler(); await this.initBlockProcessingOnCompleteHandler();
await this.initEventProcessingOnCompleteHandler(); await this.initEventProcessingOnCompleteHandler();
this._baseEventWatcher.startBlockProcessing();
} }
async stop (): Promise<void> { async stop (): Promise<void> {
this._baseEventWatcher.stop(); this._baseEventWatcher.stop();
} }
async watchBlocksAtChainHead (): Promise<void> {
log('Started watching upstream blocks...');
this._subscription = await this._ethClient.watchBlocks(async (value) => {
await this._baseEventWatcher.blocksHandler(value);
});
}
async initBlockProcessingOnCompleteHandler (): Promise<void> { async initBlockProcessingOnCompleteHandler (): Promise<void> {
this._jobQueue.onComplete(QUEUE_BLOCK_PROCESSING, async (job) => { this._jobQueue.onComplete(QUEUE_BLOCK_PROCESSING, async (job) => {
const { id, data: { failed } } = job; const { id, data: { failed } } = job;

View File

@ -57,12 +57,12 @@ export const main = async (): Promise<any> => {
await db.init(); await db.init();
assert(upstream, 'Missing upstream config'); assert(upstream, 'Missing upstream config');
const { ethServer: { gqlPostgraphileEndpoint, rpcProviderEndpoint }, cache: cacheConfig } = upstream; const { ethServer: { gqlApiEndpoint, gqlPostgraphileEndpoint, rpcProviderEndpoint, blockDelayInMilliSecs }, cache: cacheConfig } = upstream;
assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint'); assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint');
const cache = await getCache(cacheConfig); const cache = await getCache(cacheConfig);
const ethClient = new EthClient({ const ethClient = new EthClient({
gqlEndpoint: gqlPostgraphileEndpoint, gqlEndpoint: gqlApiEndpoint,
gqlSubscriptionEndpoint: gqlPostgraphileEndpoint, gqlSubscriptionEndpoint: gqlPostgraphileEndpoint,
cache cache
}); });
@ -85,11 +85,11 @@ export const main = async (): Promise<any> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start(); await jobQueue.start();
const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue); const eventWatcher = new EventWatcher(upstream, ethClient, postgraphileClient, indexer, pubsub, jobQueue);
assert(jobQueueConfig, 'Missing job queue config'); assert(jobQueueConfig, 'Missing job queue config');
await fillBlocks(jobQueue, indexer, ethClient, eventWatcher, argv); await fillBlocks(jobQueue, indexer, postgraphileClient, eventWatcher, blockDelayInMilliSecs, argv);
}; };
main().catch(err => { main().catch(err => {

View File

@ -34,9 +34,9 @@ export class JobRunner {
_jobQueueConfig: JobQueueConfig _jobQueueConfig: JobQueueConfig
constructor (jobQueueConfig: JobQueueConfig, indexer: Indexer, jobQueue: JobQueue) { constructor (jobQueueConfig: JobQueueConfig, indexer: Indexer, jobQueue: JobQueue) {
this._jobQueueConfig = jobQueueConfig;
this._indexer = indexer; this._indexer = indexer;
this._jobQueue = jobQueue; this._jobQueue = jobQueue;
this._jobQueueConfig = jobQueueConfig;
this._baseJobRunner = new BaseJobRunner(this._jobQueueConfig, this._indexer, this._jobQueue); this._baseJobRunner = new BaseJobRunner(this._jobQueueConfig, this._indexer, this._jobQueue);
} }

View File

@ -82,7 +82,7 @@ export const main = async (): Promise<any> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start(); await jobQueue.start();
const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue); const eventWatcher = new EventWatcher(upstream, ethClient, postgraphileClient, indexer, pubsub, jobQueue);
await eventWatcher.start(); await eventWatcher.start();
const resolvers = process.env.MOCK ? await createMockResolvers() : await createResolvers(indexer, eventWatcher); const resolvers = process.env.MOCK ? await createMockResolvers() : await createResolvers(indexer, eventWatcher);

View File

@ -6,7 +6,7 @@ export * from './src/config';
export * from './src/database'; export * from './src/database';
export * from './src/job-queue'; export * from './src/job-queue';
export * from './src/constants'; export * from './src/constants';
export * from './src/index'; export * from './src/misc';
export * from './src/fill'; export * from './src/fill';
export * from './src/events'; export * from './src/events';
export * from './src/types'; export * from './src/types';

View File

@ -1,5 +1,13 @@
import { JOB_KIND_PRUNE, QUEUE_BLOCK_PROCESSING } from './constants'; import debug from 'debug';
import { EthClient } from '@vulcanize/ipld-eth-client';
import { JOB_KIND_PRUNE, QUEUE_BLOCK_PROCESSING, JOB_KIND_INDEX } from './constants';
import { JobQueue } from './job-queue'; import { JobQueue } from './job-queue';
import { IndexerInterface } from './types';
import { wait } from './misc';
const log = debug('vulcanize:common');
/** /**
* Create pruning job in QUEUE_BLOCK_PROCESSING. * Create pruning job in QUEUE_BLOCK_PROCESSING.
@ -24,3 +32,46 @@ export const createPruningJob = async (jobQueue: JobQueue, latestCanonicalBlockN
} }
); );
}; };
/**
* Method to fetch block by number and push to job queue.
* @param jobQueue
* @param indexer
* @param ethClient
* @param blockNumber
*/
export const processBlockByNumber = async (
jobQueue: JobQueue,
indexer: IndexerInterface,
ethClient: EthClient,
blockDelayInMilliSecs: number,
blockNumber: number
): Promise<void> => {
log(`Process block ${blockNumber}`);
while (true) {
const result = await ethClient.getBlocksByNumber(blockNumber);
const { allEthHeaderCids: { nodes: blockNodes } } = result;
if (blockNodes.length) {
for (let bi = 0; bi < blockNodes.length; bi++) {
const { blockHash, blockNumber, parentHash, timestamp } = blockNodes[bi];
const blockProgress = await indexer.getBlockProgress(blockHash);
if (blockProgress) {
log(`Block number ${blockNumber}, block hash ${blockHash} already processed`);
} else {
await indexer.updateSyncStatusChainHead(blockHash, blockNumber);
await jobQueue.pushJob(QUEUE_BLOCK_PROCESSING, { kind: JOB_KIND_INDEX, blockHash, blockNumber, parentHash, timestamp });
}
}
return;
}
log(`No blocks fetched for block number ${blockNumber}, retrying after ${blockDelayInMilliSecs} ms delay.`);
await wait(blockDelayInMilliSecs);
}
};

View File

@ -13,7 +13,7 @@ import { BaseProvider } from '@ethersproject/providers';
import { Config as CacheConfig, getCache } from '@vulcanize/cache'; import { Config as CacheConfig, getCache } from '@vulcanize/cache';
import { EthClient } from '@vulcanize/ipld-eth-client'; import { EthClient } from '@vulcanize/ipld-eth-client';
import { getCustomProvider } from './index'; import { getCustomProvider } from './misc';
const log = debug('vulcanize:config'); const log = debug('vulcanize:config');
@ -30,12 +30,13 @@ interface ServerConfig {
kind: string; kind: string;
} }
interface UpstreamConfig { export interface UpstreamConfig {
cache: CacheConfig, cache: CacheConfig,
ethServer: { ethServer: {
gqlApiEndpoint: string; gqlApiEndpoint: string;
gqlPostgraphileEndpoint: string; gqlPostgraphileEndpoint: string;
rpcProviderEndpoint: string rpcProviderEndpoint: string;
blockDelayInMilliSecs: number;
} }
traceProviderEndpoint: string; traceProviderEndpoint: string;
uniWatcher: { uniWatcher: {

View File

@ -5,14 +5,14 @@
import assert from 'assert'; import assert from 'assert';
import debug from 'debug'; import debug from 'debug';
import { PubSub } from 'apollo-server-express'; import { PubSub } from 'apollo-server-express';
import _ from 'lodash';
import { EthClient } from '@vulcanize/ipld-eth-client'; import { EthClient } from '@vulcanize/ipld-eth-client';
import { JobQueue } from './job-queue'; import { JobQueue } from './job-queue';
import { BlockProgressInterface, EventInterface, IndexerInterface } from './types'; import { BlockProgressInterface, EventInterface, IndexerInterface } from './types';
import { QUEUE_BLOCK_PROCESSING, MAX_REORG_DEPTH, JOB_KIND_PRUNE, JOB_KIND_INDEX } from './constants'; import { MAX_REORG_DEPTH, JOB_KIND_PRUNE, JOB_KIND_INDEX } from './constants';
import { createPruningJob } from './common'; import { createPruningJob, processBlockByNumber } from './common';
import { UpstreamConfig } from './config';
const log = debug('vulcanize:events'); const log = debug('vulcanize:events');
@ -20,13 +20,17 @@ export const BlockProgressEvent = 'block-progress-event';
export class EventWatcher { export class EventWatcher {
_ethClient: EthClient _ethClient: EthClient
_postgraphileClient: EthClient
_indexer: IndexerInterface _indexer: IndexerInterface
_subscription?: ZenObservable.Subscription _subscription?: ZenObservable.Subscription
_pubsub: PubSub _pubsub: PubSub
_jobQueue: JobQueue _jobQueue: JobQueue
_upstreamConfig: UpstreamConfig
constructor (ethClient: EthClient, indexer: IndexerInterface, pubsub: PubSub, jobQueue: JobQueue) { constructor (upstreamConfig: UpstreamConfig, ethClient: EthClient, postgraphileClient: EthClient, indexer: IndexerInterface, pubsub: PubSub, jobQueue: JobQueue) {
this._upstreamConfig = upstreamConfig;
this._ethClient = ethClient; this._ethClient = ethClient;
this._postgraphileClient = postgraphileClient;
this._indexer = indexer; this._indexer = indexer;
this._pubsub = pubsub; this._pubsub = pubsub;
this._jobQueue = jobQueue; this._jobQueue = jobQueue;
@ -36,14 +40,45 @@ export class EventWatcher {
return this._pubsub.asyncIterator([BlockProgressEvent]); return this._pubsub.asyncIterator([BlockProgressEvent]);
} }
async blocksHandler (value: any): Promise<void> { async stop (): Promise<void> {
const { blockHash, blockNumber, parentHash, timestamp } = _.get(value, 'data.listen.relatedNode'); if (this._subscription) {
log('Stopped watching upstream blocks');
this._subscription.unsubscribe();
}
}
await this._indexer.updateSyncStatusChainHead(blockHash, blockNumber); async startBlockProcessing (): Promise<void> {
const syncStatus = await this._indexer.getSyncStatus();
let blockNumber;
log('watchBlock', blockHash, blockNumber); if (!syncStatus) {
// Get latest block in chain.
const { block: currentBlock } = await this._ethClient.getBlockByHash();
blockNumber = currentBlock.number + 1;
} else {
blockNumber = syncStatus.latestIndexedBlockNumber + 1;
}
await this._jobQueue.pushJob(QUEUE_BLOCK_PROCESSING, { kind: JOB_KIND_INDEX, blockHash, blockNumber, parentHash, timestamp }); const { ethServer: { blockDelayInMilliSecs } } = this._upstreamConfig;
processBlockByNumber(this._jobQueue, this._indexer, this._postgraphileClient, blockDelayInMilliSecs, blockNumber + 1);
// Creating an AsyncIterable from AsyncIterator to iterate over the values.
// https://www.codementor.io/@tiagolopesferreira/asynchronous-iterators-in-javascript-jl1yg8la1#for-wait-of
const blockProgressEventIterable = {
// getBlockProgressEventIterator returns an AsyncIterator which can be used to listen to BlockProgress events.
[Symbol.asyncIterator]: this.getBlockProgressEventIterator.bind(this)
};
// Iterate over async iterable.
// https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Statements/for-await...of
for await (const data of blockProgressEventIterable) {
const { onBlockProgressEvent: { blockNumber, isComplete } } = data;
if (isComplete) {
processBlockByNumber(this._jobQueue, this._indexer, this._postgraphileClient, blockDelayInMilliSecs, blockNumber + 1);
}
}
} }
async blockProcessingCompleteHandler (job: any): Promise<void> { async blockProcessingCompleteHandler (job: any): Promise<void> {
@ -130,11 +165,4 @@ export class EventWatcher {
await this._indexer.updateSyncStatusCanonicalBlock(block.blockHash, block.blockNumber); await this._indexer.updateSyncStatusCanonicalBlock(block.blockHash, block.blockNumber);
} }
async stop (): Promise<void> {
if (this._subscription) {
log('Stopped watching upstream blocks');
this._subscription.unsubscribe();
}
}
} }

View File

@ -2,22 +2,20 @@
// Copyright 2021 Vulcanize, Inc. // Copyright 2021 Vulcanize, Inc.
// //
import debug from 'debug';
import assert from 'assert'; import assert from 'assert';
import { EthClient } from '@vulcanize/ipld-eth-client'; import { EthClient } from '@vulcanize/ipld-eth-client';
import { JobQueue } from './job-queue'; import { JobQueue } from './job-queue';
import { JOB_KIND_INDEX, QUEUE_BLOCK_PROCESSING } from './constants';
import { EventWatcherInterface, IndexerInterface } from './types'; import { EventWatcherInterface, IndexerInterface } from './types';
import { processBlockByNumber } from './common';
const log = debug('vulcanize:fill');
export const fillBlocks = async ( export const fillBlocks = async (
jobQueue: JobQueue, jobQueue: JobQueue,
indexer: IndexerInterface, indexer: IndexerInterface,
ethClient: EthClient, ethClient: EthClient,
eventWatcher: EventWatcherInterface, eventWatcher: EventWatcherInterface,
blockDelayInMilliSecs: number,
{ startBlock, endBlock }: { startBlock: number, endBlock: number} { startBlock, endBlock }: { startBlock: number, endBlock: number}
): Promise<any> => { ): Promise<any> => {
assert(startBlock < endBlock, 'endBlock should be greater than startBlock'); assert(startBlock < endBlock, 'endBlock should be greater than startBlock');
@ -36,7 +34,7 @@ export const fillBlocks = async (
currentBlockNumber = syncStatus.latestIndexedBlockNumber + 1; currentBlockNumber = syncStatus.latestIndexedBlockNumber + 1;
} }
processBlockByNumber(jobQueue, indexer, ethClient, currentBlockNumber); processBlockByNumber(jobQueue, indexer, ethClient, blockDelayInMilliSecs, currentBlockNumber);
// Creating an AsyncIterable from AsyncIterator to iterate over the values. // Creating an AsyncIterable from AsyncIterator to iterate over the values.
// https://www.codementor.io/@tiagolopesferreira/asynchronous-iterators-in-javascript-jl1yg8la1#for-wait-of // https://www.codementor.io/@tiagolopesferreira/asynchronous-iterators-in-javascript-jl1yg8la1#for-wait-of
@ -57,39 +55,7 @@ export const fillBlocks = async (
} }
currentBlockNumber++; currentBlockNumber++;
processBlockByNumber(jobQueue, indexer, ethClient, currentBlockNumber); processBlockByNumber(jobQueue, indexer, ethClient, blockDelayInMilliSecs, currentBlockNumber);
}
}
};
/**
* Method to fetch block by number and push to job queue.
* @param jobQueue
* @param indexer
* @param ethClient
* @param blockNumber
*/
const processBlockByNumber = async (
jobQueue: JobQueue,
indexer: IndexerInterface,
ethClient: EthClient,
blockNumber: number
) => {
log(`Fill block ${blockNumber}`);
const result = await ethClient.getBlocksByNumber(blockNumber);
const { allEthHeaderCids: { nodes: blockNodes } } = result;
for (let bi = 0; bi < blockNodes.length; bi++) {
const { blockHash, blockNumber, parentHash, timestamp } = blockNodes[bi];
const blockProgress = await indexer.getBlockProgress(blockHash);
if (blockProgress) {
log(`Block number ${blockNumber}, block hash ${blockHash} already known, skip filling`);
} else {
await indexer.updateSyncStatusChainHead(blockHash, blockNumber);
await jobQueue.pushJob(QUEUE_BLOCK_PROCESSING, { kind: JOB_KIND_INDEX, blockHash, blockNumber, parentHash, timestamp });
} }
} }
}; };

View File

@ -34,8 +34,8 @@ export class Indexer {
constructor (db: DatabaseInterface, ethClient: EthClient, ethProvider: ethers.providers.BaseProvider) { constructor (db: DatabaseInterface, ethClient: EthClient, ethProvider: ethers.providers.BaseProvider) {
this._db = db; this._db = db;
this._ethClient = ethClient; this._ethClient = ethClient;
this._getStorageAt = this._ethClient.getStorageAt.bind(this._ethClient);
this._ethProvider = ethProvider; this._ethProvider = ethProvider;
this._getStorageAt = this._ethClient.getStorageAt.bind(this._ethClient);
} }
async getSyncStatus (): Promise<SyncStatusInterface | undefined> { async getSyncStatus (): Promise<SyncStatusInterface | undefined> {

View File

@ -4,7 +4,7 @@
import assert from 'assert'; import assert from 'assert';
import debug from 'debug'; import debug from 'debug';
import { wait } from '.'; import { wait } from './misc';
import { createPruningJob } from './common'; import { createPruningJob } from './common';
import { JobQueueConfig } from './config'; import { JobQueueConfig } from './config';
@ -20,9 +20,9 @@ export class JobRunner {
_jobQueueConfig: JobQueueConfig _jobQueueConfig: JobQueueConfig
constructor (jobQueueConfig: JobQueueConfig, indexer: IndexerInterface, jobQueue: JobQueue) { constructor (jobQueueConfig: JobQueueConfig, indexer: IndexerInterface, jobQueue: JobQueue) {
this._jobQueueConfig = jobQueueConfig;
this._indexer = indexer; this._indexer = indexer;
this._jobQueue = jobQueue; this._jobQueue = jobQueue;
this._jobQueueConfig = jobQueueConfig;
} }
async processBlock (job: any): Promise<void> { async processBlock (job: any): Promise<void> {