Use postgraphile client to fetch blocks

This commit is contained in:
Prathamesh Musale 2021-11-01 15:35:02 +05:30 committed by nabarun
parent bb1345c696
commit 62521d7ccc
9 changed files with 13 additions and 7 deletions

View File

@ -17,7 +17,7 @@ import {
QUEUE_HOOKS, QUEUE_HOOKS,
QUEUE_IPFS, QUEUE_IPFS,
UNKNOWN_EVENT_NAME, UNKNOWN_EVENT_NAME,
UpstreamConfig UpstreamConfig,
JOB_KIND_PRUNE JOB_KIND_PRUNE
} from '@vulcanize/util'; } from '@vulcanize/util';

View File

@ -62,7 +62,7 @@ export const main = async (): Promise<any> => {
const eventWatcher = new EventWatcher(config.upstream, ethClient, postgraphileClient, indexer, pubsub, jobQueue); const eventWatcher = new EventWatcher(config.upstream, ethClient, postgraphileClient, indexer, pubsub, jobQueue);
await fillBlocks(jobQueue, indexer, postgraphileClient, eventWatcher, argv); await fillBlocks(jobQueue, indexer, postgraphileClient, eventWatcher, config.upstream.ethServer.blockDelayInMilliSecs, argv);
}; };
main().catch(err => { main().catch(err => {

View File

@ -60,7 +60,7 @@ export const main = async (): Promise<any> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start(); await jobQueue.start();
const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue); const eventWatcher = new EventWatcher(config.upstream, ethClient, postgraphileClient, indexer, pubsub, jobQueue);
// Import data. // Import data.
const importFilePath = path.resolve(argv.importFile); const importFilePath = path.resolve(argv.importFile);
@ -71,8 +71,9 @@ export const main = async (): Promise<any> => {
await fillBlocks( await fillBlocks(
jobQueue, jobQueue,
indexer, indexer,
ethClient, postgraphileClient,
eventWatcher, eventWatcher,
config.upstream.ethServer.blockDelayInMilliSecs,
{ {
startBlock: importData.snapshotBlock.blockNumber, startBlock: importData.snapshotBlock.blockNumber,
endBlock: importData.snapshotBlock.blockNumber endBlock: importData.snapshotBlock.blockNumber

View File

@ -89,11 +89,13 @@ export class Indexer implements IndexerInterface {
constructor (serverConfig: ServerConfig, db: Database, ethClient: EthClient, postgraphileClient: EthClient, ethProvider: BaseProvider) { constructor (serverConfig: ServerConfig, db: Database, ethClient: EthClient, postgraphileClient: EthClient, ethProvider: BaseProvider) {
assert(db); assert(db);
assert(ethClient); assert(ethClient);
assert(postgraphileClient);
this._db = db; this._db = db;
this._ethClient = ethClient; this._ethClient = ethClient;
this._postgraphileClient = postgraphileClient; this._postgraphileClient = postgraphileClient;
this._ethProvider = ethProvider; this._ethProvider = ethProvider;
this._serverConfig = serverConfig;
this._baseIndexer = new BaseIndexer(this._db, this._ethClient, this._postgraphileClient, this._ethProvider); this._baseIndexer = new BaseIndexer(this._db, this._ethClient, this._postgraphileClient, this._ethProvider);
const { abi, storageLayout } = artifacts; const { abi, storageLayout } = artifacts;

View File

@ -138,7 +138,7 @@ export const main = async (): Promise<any> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start(); await jobQueue.start();
const jobRunner = new JobRunner(jobQueueConfig, config.server, indexer, jobQueue); const jobRunner = new JobRunner(jobQueueConfig, indexer, jobQueue);
await jobRunner.start(); await jobRunner.start();
}; };

View File

@ -45,7 +45,7 @@ export const main = async (): Promise<any> => {
// Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
const pubsub = new PubSub(); const pubsub = new PubSub();
const indexer = new Indexer(config.server, db, ethClient, ethProvider); const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider);
const jobQueueConfig = config.jobQueue; const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config'); assert(jobQueueConfig, 'Missing job queue config');

View File

@ -58,6 +58,7 @@ export class Indexer implements IndexerInterface {
constructor (db: Database, ethClient: EthClient, postgraphileClient: EthClient, ethProvider: BaseProvider, jobQueue: JobQueue, serverMode: string) { constructor (db: Database, ethClient: EthClient, postgraphileClient: EthClient, ethProvider: BaseProvider, jobQueue: JobQueue, serverMode: string) {
assert(db); assert(db);
assert(ethClient); assert(ethClient);
assert(postgraphileClient);
this._db = db; this._db = db;
this._ethClient = ethClient; this._ethClient = ethClient;

View File

@ -52,6 +52,7 @@ export class Indexer implements IndexerInterface {
assert(db); assert(db);
assert(uniClient); assert(uniClient);
assert(erc20Client); assert(erc20Client);
assert(ethClient);
assert(postgraphileClient); assert(postgraphileClient);
this._db = db; this._db = db;

View File

@ -70,7 +70,7 @@ export const processBlockByNumber = async (
if (blocks.length) { if (blocks.length) {
for (let bi = 0; bi < blocks.length; bi++) { for (let bi = 0; bi < blocks.length; bi++) {
const { blockHash, blockNumber, parentHash, timestamp } = blocks[bi]; const { cid, blockHash, blockNumber, parentHash, timestamp } = blocks[bi];
// Stop blocks already pushed to job queue. They are already retried after fail. // Stop blocks already pushed to job queue. They are already retried after fail.
if (!syncStatus || syncStatus.chainHeadBlockNumber < blockNumber) { if (!syncStatus || syncStatus.chainHeadBlockNumber < blockNumber) {
@ -79,6 +79,7 @@ export const processBlockByNumber = async (
{ {
kind: JOB_KIND_INDEX, kind: JOB_KIND_INDEX,
blockNumber: Number(blockNumber), blockNumber: Number(blockNumber),
cid,
blockHash, blockHash,
parentHash, parentHash,
timestamp timestamp