diff --git a/packages/codegen/src/templates/events-template.handlebars b/packages/codegen/src/templates/events-template.handlebars index 28ffb99d..0bae9fe6 100644 --- a/packages/codegen/src/templates/events-template.handlebars +++ b/packages/codegen/src/templates/events-template.handlebars @@ -17,7 +17,7 @@ import { QUEUE_HOOKS, QUEUE_IPFS, UNKNOWN_EVENT_NAME, - UpstreamConfig + UpstreamConfig, JOB_KIND_PRUNE } from '@vulcanize/util'; diff --git a/packages/codegen/src/templates/fill-template.handlebars b/packages/codegen/src/templates/fill-template.handlebars index 60a5c481..d21cd915 100644 --- a/packages/codegen/src/templates/fill-template.handlebars +++ b/packages/codegen/src/templates/fill-template.handlebars @@ -62,7 +62,7 @@ export const main = async (): Promise => { const eventWatcher = new EventWatcher(config.upstream, ethClient, postgraphileClient, indexer, pubsub, jobQueue); - await fillBlocks(jobQueue, indexer, postgraphileClient, eventWatcher, argv); + await fillBlocks(jobQueue, indexer, postgraphileClient, eventWatcher, config.upstream.ethServer.blockDelayInMilliSecs, argv); }; main().catch(err => { diff --git a/packages/codegen/src/templates/import-state-template.handlebars b/packages/codegen/src/templates/import-state-template.handlebars index 26eace38..7c985f24 100644 --- a/packages/codegen/src/templates/import-state-template.handlebars +++ b/packages/codegen/src/templates/import-state-template.handlebars @@ -60,7 +60,7 @@ export const main = async (): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); - const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue); + const eventWatcher = new EventWatcher(config.upstream, ethClient, postgraphileClient, indexer, pubsub, jobQueue); // Import data. const importFilePath = path.resolve(argv.importFile); @@ -71,8 +71,9 @@ export const main = async (): Promise => { await fillBlocks( jobQueue, indexer, - ethClient, + postgraphileClient, eventWatcher, + config.upstream.ethServer.blockDelayInMilliSecs, { startBlock: importData.snapshotBlock.blockNumber, endBlock: importData.snapshotBlock.blockNumber diff --git a/packages/codegen/src/templates/indexer-template.handlebars b/packages/codegen/src/templates/indexer-template.handlebars index 9500c84d..261ce85f 100644 --- a/packages/codegen/src/templates/indexer-template.handlebars +++ b/packages/codegen/src/templates/indexer-template.handlebars @@ -89,11 +89,13 @@ export class Indexer implements IndexerInterface { constructor (serverConfig: ServerConfig, db: Database, ethClient: EthClient, postgraphileClient: EthClient, ethProvider: BaseProvider) { assert(db); assert(ethClient); + assert(postgraphileClient); this._db = db; this._ethClient = ethClient; this._postgraphileClient = postgraphileClient; this._ethProvider = ethProvider; + this._serverConfig = serverConfig; this._baseIndexer = new BaseIndexer(this._db, this._ethClient, this._postgraphileClient, this._ethProvider); const { abi, storageLayout } = artifacts; diff --git a/packages/codegen/src/templates/job-runner-template.handlebars b/packages/codegen/src/templates/job-runner-template.handlebars index 3c795e1b..f0be25d6 100644 --- a/packages/codegen/src/templates/job-runner-template.handlebars +++ b/packages/codegen/src/templates/job-runner-template.handlebars @@ -138,7 +138,7 @@ export const main = async (): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); - const jobRunner = new JobRunner(jobQueueConfig, config.server, indexer, jobQueue); + const jobRunner = new JobRunner(jobQueueConfig, indexer, jobQueue); await jobRunner.start(); }; diff --git a/packages/codegen/src/templates/server-template.handlebars b/packages/codegen/src/templates/server-template.handlebars index 66148502..d51449cf 100644 --- a/packages/codegen/src/templates/server-template.handlebars +++ b/packages/codegen/src/templates/server-template.handlebars @@ -45,7 +45,7 @@ export const main = async (): Promise => { // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries const pubsub = new PubSub(); - const indexer = new Indexer(config.server, db, ethClient, ethProvider); + const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider); const jobQueueConfig = config.jobQueue; assert(jobQueueConfig, 'Missing job queue config'); diff --git a/packages/erc20-watcher/src/indexer.ts b/packages/erc20-watcher/src/indexer.ts index 9b775d0a..a21dead5 100644 --- a/packages/erc20-watcher/src/indexer.ts +++ b/packages/erc20-watcher/src/indexer.ts @@ -58,6 +58,7 @@ export class Indexer implements IndexerInterface { constructor (db: Database, ethClient: EthClient, postgraphileClient: EthClient, ethProvider: BaseProvider, jobQueue: JobQueue, serverMode: string) { assert(db); assert(ethClient); + assert(postgraphileClient); this._db = db; this._ethClient = ethClient; diff --git a/packages/uni-info-watcher/src/indexer.ts b/packages/uni-info-watcher/src/indexer.ts index f65507e9..3a0edd27 100644 --- a/packages/uni-info-watcher/src/indexer.ts +++ b/packages/uni-info-watcher/src/indexer.ts @@ -52,6 +52,7 @@ export class Indexer implements IndexerInterface { assert(db); assert(uniClient); assert(erc20Client); + assert(ethClient); assert(postgraphileClient); this._db = db; diff --git a/packages/util/src/common.ts b/packages/util/src/common.ts index 58275c49..0ee2d263 100644 --- a/packages/util/src/common.ts +++ b/packages/util/src/common.ts @@ -70,7 +70,7 @@ export const processBlockByNumber = async ( if (blocks.length) { for (let bi = 0; bi < blocks.length; bi++) { - const { blockHash, blockNumber, parentHash, timestamp } = blocks[bi]; + const { cid, blockHash, blockNumber, parentHash, timestamp } = blocks[bi]; // Stop blocks already pushed to job queue. They are already retried after fail. if (!syncStatus || syncStatus.chainHeadBlockNumber < blockNumber) { @@ -79,6 +79,7 @@ export const processBlockByNumber = async ( { kind: JOB_KIND_INDEX, blockNumber: Number(blockNumber), + cid, blockHash, parentHash, timestamp