Deprecate postgraphile usage (#122)

* Deprecate postgraphile usage

* Change endpoint in watcher config

* Remove subscription queries from eth-client

* Remove postgraphile config and client from watchers

Co-authored-by: nabarun <nabarun@deepstacksoft.com>
This commit is contained in:
prathamesh0 2022-06-08 12:13:52 +05:30 committed by GitHub
parent e4b9596be7
commit 71d34331e9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
90 changed files with 207 additions and 321 deletions

View File

@ -55,7 +55,6 @@ The default config files used by the watchers assume the following services are
* `vulcanize/go-ethereum` on port 8545
* `vulcanize/ipld-eth-server` with native GQL API enabled, on port 8082
* `postgraphile` on the `vulcanize/ipld-eth-server` database, on port 5000
### Note

View File

@ -20,7 +20,6 @@
* [vulcanize/go-ethereum](https://github.com/vulcanize/go-ethereum) ([v1.10.17-statediff-3.2.0](https://github.com/vulcanize/go-ethereum/releases/tag/v1.10.17-statediff-3.2.0)) on port 8545.
* [vulcanize/ipld-eth-server](https://github.com/vulcanize/ipld-eth-server) ([v3.0.0](https://github.com/vulcanize/ipld-eth-server/releases/tag/v3.0.0)) with native GQL API enabled on port 8082 and RPC API enabled on port 8081.
* [postgraphile](https://github.com/vulcanize/postgraphile) ([v1.1.1](https://github.com/vulcanize/postgraphile/releases/tag/v1.1.1)) on the `vulcanize/ipld-eth-server` database, on port 5000
* Deploy `Example` contract:

View File

@ -34,7 +34,7 @@ createdb address-watcher
Update `environments/local.toml` with database connection settings for both the databases.
Update the `upstream` config in `environments/local.toml` and provide the `ipld-eth-server` GQL API, the `indexer-db` postgraphile and the tracing API (`debug_traceTransaction` RPC provider) endpoints.
Update the `upstream` config in `environments/local.toml` and provide the `ipld-eth-server` GQL API and the tracing API (`debug_traceTransaction` RPC provider) endpoints.
## Run

View File

@ -17,7 +17,6 @@
[upstream.ethServer]
gqlApiEndpoint = "http://127.0.0.1:8082/graphql"
gqlPostgraphileEndpoint = "http://127.0.0.1:5000/graphql"
[upstream.cache]
name = "requests"

View File

@ -55,14 +55,13 @@ export const main = async (): Promise<any> => {
await db.init();
assert(upstream, 'Missing upstream config');
const { ethServer: { gqlPostgraphileEndpoint }, traceProviderEndpoint, cache: cacheConfig } = upstream;
assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint');
const { ethServer: { gqlApiEndpoint }, traceProviderEndpoint, cache: cacheConfig } = upstream;
assert(gqlApiEndpoint, 'Missing upstream ethServer.gqlApiEndpoint');
assert(traceProviderEndpoint, 'Missing upstream traceProviderEndpoint');
const cache = await getCache(cacheConfig);
const ethClient = new EthClient({
gqlEndpoint: gqlPostgraphileEndpoint,
gqlSubscriptionEndpoint: gqlPostgraphileEndpoint,
gqlEndpoint: gqlApiEndpoint,
cache
});

View File

@ -42,15 +42,13 @@ export const main = async (): Promise<any> => {
await db.init();
assert(upstream, 'Missing upstream config');
const { ethServer: { gqlApiEndpoint, gqlPostgraphileEndpoint }, traceProviderEndpoint, cache: cacheConfig } = upstream;
const { ethServer: { gqlApiEndpoint }, traceProviderEndpoint, cache: cacheConfig } = upstream;
assert(gqlApiEndpoint, 'Missing upstream ethServer.gqlApiEndpoint');
assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint');
assert(traceProviderEndpoint, 'Missing upstream traceProviderEndpoint');
const cache = await getCache(cacheConfig);
const ethClient = new EthClient({
gqlEndpoint: gqlApiEndpoint,
gqlSubscriptionEndpoint: gqlPostgraphileEndpoint,
cache
});

View File

@ -50,15 +50,13 @@ export const main = async (): Promise<any> => {
await db.init();
assert(upstream, 'Missing upstream config');
const { ethServer: { gqlApiEndpoint, gqlPostgraphileEndpoint }, traceProviderEndpoint, cache: cacheConfig } = upstream;
const { ethServer: { gqlApiEndpoint }, traceProviderEndpoint, cache: cacheConfig } = upstream;
assert(gqlApiEndpoint, 'Missing upstream ethServer.gqlApiEndpoint');
assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint');
assert(traceProviderEndpoint, 'Missing upstream traceProviderEndpoint');
const cache = await getCache(cacheConfig);
const ethClient = new EthClient({
gqlEndpoint: gqlApiEndpoint,
gqlSubscriptionEndpoint: gqlPostgraphileEndpoint,
cache
});

View File

@ -67,11 +67,12 @@ export class TxWatcher {
}
});
this._watchTxSubscription = await this._ethClient.watchTransactions(async (value) => {
const { txHash, ethHeaderCidByHeaderId: { blockHash, blockNumber } } = _.get(value, 'data.listen.relatedNode');
log('watchTransaction', JSON.stringify({ txHash, blockHash, blockNumber }, null, 2));
await this._jobQueue.pushJob(QUEUE_TX_TRACING, { txHash, blockHash, publish: true });
});
// TODO: Update to pull based watcher.
// this._watchTxSubscription = await this._ethClient.watchTransactions(async (value) => {
// const { txHash, ethHeaderCidByHeaderId: { blockHash, blockNumber } } = _.get(value, 'data.listen.relatedNode');
// log('watchTransaction', JSON.stringify({ txHash, blockHash, blockNumber }, null, 2));
// await this._jobQueue.pushJob(QUEUE_TX_TRACING, { txHash, blockHash, publish: true });
// });
}
async publishAddressEventToSubscribers (txHash: string, timeElapsedInSeconds: number): Promise<void> {

View File

@ -41,7 +41,7 @@ const main = async (): Promise<void> => {
}).argv;
const config: Config = await getConfig(argv.configFile);
const { ethClient, postgraphileClient, ethProvider } = await initClients(config);
const { ethClient, ethProvider } = await initClients(config);
const db = new Database(config.database);
await db.init();
@ -49,7 +49,7 @@ const main = async (): Promise<void> => {
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*'));
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server);
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);
const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config');
@ -60,7 +60,7 @@ const main = async (): Promise<void> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher);
const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue, graphWatcher);
await indexer.init();
graphWatcher.setIndexer(indexer);

View File

@ -30,7 +30,6 @@
[upstream]
[upstream.ethServer]
gqlApiEndpoint = "http://127.0.0.1:8082/graphql"
gqlPostgraphileEndpoint = "http://127.0.0.1:5000/graphql"
rpcProviderEndpoint = "http://127.0.0.1:8081"
blockDelayInMilliSecs = 2000

View File

@ -32,7 +32,7 @@ export class EventWatcher implements EventWatcherInterface {
_pubsub: PubSub
_jobQueue: JobQueue
constructor (upstreamConfig: UpstreamConfig, ethClient: EthClient, postgraphileClient: EthClient, indexer: Indexer, pubsub: PubSub, jobQueue: JobQueue) {
constructor (upstreamConfig: UpstreamConfig, ethClient: EthClient, indexer: Indexer, pubsub: PubSub, jobQueue: JobQueue) {
assert(ethClient);
assert(indexer);
@ -40,7 +40,7 @@ export class EventWatcher implements EventWatcherInterface {
this._indexer = indexer;
this._pubsub = pubsub;
this._jobQueue = jobQueue;
this._baseEventWatcher = new BaseEventWatcher(upstreamConfig, this._ethClient, postgraphileClient, this._indexer, this._pubsub, this._jobQueue);
this._baseEventWatcher = new BaseEventWatcher(upstreamConfig, this._ethClient, this._indexer, this._pubsub, this._jobQueue);
}
getEventIterator (): AsyncIterator<any> {

View File

@ -38,7 +38,7 @@ const main = async (): Promise<void> => {
}).argv;
const config: Config = await getConfig(argv.configFile);
const { ethClient, postgraphileClient, ethProvider } = await initClients(config);
const { ethClient, ethProvider } = await initClients(config);
const db = new Database(config.database);
await db.init();
@ -46,7 +46,7 @@ const main = async (): Promise<void> => {
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*'));
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server);
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);
const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config');
@ -57,7 +57,7 @@ const main = async (): Promise<void> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher);
const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue, graphWatcher);
await indexer.init();
graphWatcher.setIndexer(indexer);

View File

@ -53,7 +53,7 @@ export const main = async (): Promise<any> => {
}).argv;
const config: Config = await getConfig(argv.configFile);
const { ethClient, postgraphileClient, ethProvider } = await initClients(config);
const { ethClient, ethProvider } = await initClients(config);
const db = new Database(config.database);
await db.init();
@ -61,7 +61,7 @@ export const main = async (): Promise<any> => {
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*'));
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server);
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);
const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config');
@ -72,7 +72,7 @@ export const main = async (): Promise<any> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher);
const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue, graphWatcher);
await indexer.init();
graphWatcher.setIndexer(indexer);
@ -84,7 +84,7 @@ export const main = async (): Promise<any> => {
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
const pubsub = new PubSub();
const eventWatcher = new EventWatcher(config.upstream, ethClient, postgraphileClient, indexer, pubsub, jobQueue);
const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue);
await fillBlocks(jobQueue, indexer, eventWatcher, config.upstream.ethServer.blockDelayInMilliSecs, argv);
};

View File

@ -42,7 +42,7 @@ export const main = async (): Promise<any> => {
}).argv;
const config: Config = await getConfig(argv.configFile);
const { ethClient, postgraphileClient, ethProvider } = await initClients(config);
const { ethClient, ethProvider } = await initClients(config);
const db = new Database(config.database);
await db.init();
@ -50,7 +50,7 @@ export const main = async (): Promise<any> => {
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*'));
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server);
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);
// Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
@ -65,7 +65,7 @@ export const main = async (): Promise<any> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher);
const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue, graphWatcher);
await indexer.init();
graphWatcher.setIndexer(indexer);
@ -73,7 +73,7 @@ export const main = async (): Promise<any> => {
await graphWatcher.init();
{{/if}}
const eventWatcher = new EventWatcher(config.upstream, ethClient, postgraphileClient, indexer, pubsub, jobQueue);
const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue);
// Import data.
const importFilePath = path.resolve(argv.importFile);

View File

@ -99,7 +99,6 @@ export class Indexer implements IPLDIndexerInterface {
_db: Database
_ethClient: EthClient
_ethProvider: BaseProvider
_postgraphileClient: EthClient
_baseIndexer: BaseIndexer
_serverConfig: ServerConfig
_graphWatcher: GraphWatcher;
@ -113,18 +112,16 @@ export class Indexer implements IPLDIndexerInterface {
_entityTypesMap: Map<string, { [key: string]: string }>
_relationsMap: Map<any, { [key: string]: any }>
constructor (serverConfig: ServerConfig, db: Database, ethClient: EthClient, postgraphileClient: EthClient, ethProvider: BaseProvider, jobQueue: JobQueue, graphWatcher: GraphWatcher) {
constructor (serverConfig: ServerConfig, db: Database, ethClient: EthClient, ethProvider: BaseProvider, jobQueue: JobQueue, graphWatcher: GraphWatcher) {
assert(db);
assert(ethClient);
assert(postgraphileClient);
this._db = db;
this._ethClient = ethClient;
this._postgraphileClient = postgraphileClient;
this._ethProvider = ethProvider;
this._serverConfig = serverConfig;
this._ipfsClient = new IPFSClient(this._serverConfig.ipfsApiAddr);
this._baseIndexer = new BaseIndexer(this._serverConfig, this._db, this._ethClient, this._postgraphileClient, this._ethProvider, jobQueue, this._ipfsClient);
this._baseIndexer = new BaseIndexer(this._serverConfig, this._db, this._ethClient, this._ethProvider, jobQueue, this._ipfsClient);
this._graphWatcher = graphWatcher;
this._abiMap = new Map();
@ -707,7 +704,7 @@ export class Indexer implements IPLDIndexerInterface {
assert(blockHash);
const logsPromise = this._ethClient.getLogs({ blockHash });
const transactionsPromise = this._postgraphileClient.getBlockWithTransactions({ blockHash });
const transactionsPromise = this._ethClient.getBlockWithTransactions({ blockHash });
let [
{ block, logs },

View File

@ -38,7 +38,7 @@ const main = async (): Promise<void> => {
}).argv;
const config: Config = await getConfig(argv.configFile);
const { ethClient, postgraphileClient, ethProvider } = await initClients(config);
const { ethClient, ethProvider } = await initClients(config);
const db = new Database(config.database);
await db.init();
@ -46,7 +46,7 @@ const main = async (): Promise<void> => {
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*'));
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server);
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);
const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config');
@ -57,7 +57,7 @@ const main = async (): Promise<void> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher);
const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue, graphWatcher);
await indexer.init();
graphWatcher.setIndexer(indexer);

View File

@ -248,7 +248,7 @@ export const main = async (): Promise<any> => {
.argv;
const config: Config = await getConfig(argv.f);
const { ethClient, postgraphileClient, ethProvider } = await initClients(config);
const { ethClient, ethProvider } = await initClients(config);
const db = new Database(config.database);
await db.init();
@ -256,7 +256,7 @@ export const main = async (): Promise<any> => {
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*'));
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server);
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);
const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config');
@ -267,7 +267,7 @@ export const main = async (): Promise<any> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher);
const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue, graphWatcher);
await indexer.init();
graphWatcher.setIndexer(indexer);

View File

@ -45,7 +45,7 @@
* Update the database connection settings.
* Update the `upstream` config and provide the `ipld-eth-server` GQL API and the `indexer-db` postgraphile endpoints.
* Update the `upstream` config and provide the `ipld-eth-server` GQL API endpoint.
* Update the `server` config with state checkpoint settings and provide the IPFS API address.

View File

@ -33,7 +33,7 @@ export const builder = {
export const handler = async (argv: any): Promise<void> => {
const config = await getConfig(argv.configFile);
await resetJobs(config);
const { ethClient, postgraphileClient, ethProvider } = await initClients(config);
const { ethClient, ethProvider } = await initClients(config);
// Initialize database.
const db = new Database(config.database);
@ -42,7 +42,7 @@ export const handler = async (argv: any): Promise<void> => {
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*'));
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server);
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);
const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config');
@ -53,7 +53,7 @@ export const handler = async (argv: any): Promise<void> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher);
const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue, graphWatcher);
await indexer.init();
graphWatcher.setIndexer(indexer);

View File

@ -36,7 +36,7 @@ export const main = async (): Promise<any> => {
.argv;
const config: Config = await getConfig(argv.f);
const { ethClient, postgraphileClient, ethProvider } = await initClients(config);
const { ethClient, ethProvider } = await initClients(config);
const { host, port, kind: watcherKind } = config.server;
@ -46,7 +46,7 @@ export const main = async (): Promise<any> => {
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*'));
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server);
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);
// Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
@ -60,7 +60,7 @@ export const main = async (): Promise<any> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher);
const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue, graphWatcher);
await indexer.init();
graphWatcher.setIndexer(indexer);
@ -68,7 +68,7 @@ export const main = async (): Promise<any> => {
await graphWatcher.init();
{{/if}}
const eventWatcher = new EventWatcher(config.upstream, ethClient, postgraphileClient, indexer, pubsub, jobQueue);
const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue);
if (watcherKind === KIND_ACTIVE) {
await jobQueue.start();

View File

@ -54,7 +54,7 @@ const main = async (): Promise<void> => {
}).argv;
const config: Config = await getConfig(argv.configFile);
const { ethClient, postgraphileClient, ethProvider } = await initClients(config);
const { ethClient, ethProvider } = await initClients(config);
const db = new Database(config.database);
await db.init();
@ -62,7 +62,7 @@ const main = async (): Promise<void> => {
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*'));
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server);
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);
const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config');
@ -73,7 +73,7 @@ const main = async (): Promise<void> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher);
const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue, graphWatcher);
await indexer.init();
graphWatcher.setIndexer(indexer);

View File

@ -45,7 +45,7 @@
* Update the database connection settings.
* Update the `upstream` config and provide the `ipld-eth-server` GQL API and the `indexer-db` postgraphile endpoints.
* Update the `upstream` config and provide the `ipld-eth-server` GQL API endpoint.
* Update the `server` config with state checkpoint settings and provide the IPFS API address.

View File

@ -28,7 +28,6 @@
[upstream]
[upstream.ethServer]
gqlApiEndpoint = "http://127.0.0.1:8082/graphql"
gqlPostgraphileEndpoint = "http://127.0.0.1:5000/graphql"
rpcProviderEndpoint = "http://127.0.0.1:8081"
blockDelayInMilliSecs = 2000

View File

@ -41,7 +41,7 @@ const main = async (): Promise<void> => {
}).argv;
const config: Config = await getConfig(argv.configFile);
const { ethClient, postgraphileClient, ethProvider } = await initClients(config);
const { ethClient, ethProvider } = await initClients(config);
const db = new Database(config.database);
await db.init();
@ -49,7 +49,7 @@ const main = async (): Promise<void> => {
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*'));
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server);
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);
const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config');
@ -60,7 +60,7 @@ const main = async (): Promise<void> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher);
const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue, graphWatcher);
await indexer.init();
graphWatcher.setIndexer(indexer);

View File

@ -38,7 +38,7 @@ const main = async (): Promise<void> => {
}).argv;
const config: Config = await getConfig(argv.configFile);
const { ethClient, postgraphileClient, ethProvider } = await initClients(config);
const { ethClient, ethProvider } = await initClients(config);
const db = new Database(config.database);
await db.init();
@ -46,7 +46,7 @@ const main = async (): Promise<void> => {
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*'));
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server);
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);
const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config');
@ -57,7 +57,7 @@ const main = async (): Promise<void> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher);
const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue, graphWatcher);
await indexer.init();
graphWatcher.setIndexer(indexer);

View File

@ -42,7 +42,7 @@ export const main = async (): Promise<any> => {
}).argv;
const config: Config = await getConfig(argv.configFile);
const { ethClient, postgraphileClient, ethProvider } = await initClients(config);
const { ethClient, ethProvider } = await initClients(config);
const db = new Database(config.database);
await db.init();
@ -50,7 +50,7 @@ export const main = async (): Promise<any> => {
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*'));
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server);
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);
// Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
@ -65,13 +65,13 @@ export const main = async (): Promise<any> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher);
const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue, graphWatcher);
await indexer.init();
graphWatcher.setIndexer(indexer);
await graphWatcher.init();
const eventWatcher = new EventWatcher(config.upstream, ethClient, postgraphileClient, indexer, pubsub, jobQueue);
const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue);
// Import data.
const importFilePath = path.resolve(argv.importFile);

View File

@ -38,7 +38,7 @@ const main = async (): Promise<void> => {
}).argv;
const config: Config = await getConfig(argv.configFile);
const { ethClient, postgraphileClient, ethProvider } = await initClients(config);
const { ethClient, ethProvider } = await initClients(config);
const db = new Database(config.database);
await db.init();
@ -46,7 +46,7 @@ const main = async (): Promise<void> => {
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*'));
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server);
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);
const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config');
@ -57,7 +57,7 @@ const main = async (): Promise<void> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher);
const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue, graphWatcher);
await indexer.init();
graphWatcher.setIndexer(indexer);

View File

@ -47,7 +47,7 @@ export const builder = {
export const handler = async (argv: any): Promise<void> => {
const config = await getConfig(argv.configFile);
await resetJobs(config);
const { ethClient, postgraphileClient, ethProvider } = await initClients(config);
const { ethClient, ethProvider } = await initClients(config);
// Initialize database.
const db = new Database(config.database);
@ -56,7 +56,7 @@ export const handler = async (argv: any): Promise<void> => {
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*'));
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server);
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);
const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config');
@ -67,7 +67,7 @@ export const handler = async (argv: any): Promise<void> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher);
const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue, graphWatcher);
await indexer.init();
graphWatcher.setIndexer(indexer);

View File

@ -54,7 +54,7 @@ const main = async (): Promise<void> => {
}).argv;
const config: Config = await getConfig(argv.configFile);
const { ethClient, postgraphileClient, ethProvider } = await initClients(config);
const { ethClient, ethProvider } = await initClients(config);
const db = new Database(config.database);
await db.init();
@ -62,7 +62,7 @@ const main = async (): Promise<void> => {
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*'));
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server);
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);
const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config');
@ -73,7 +73,7 @@ const main = async (): Promise<void> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher);
const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue, graphWatcher);
await indexer.init();
graphWatcher.setIndexer(indexer);

View File

@ -32,7 +32,7 @@ export class EventWatcher implements EventWatcherInterface {
_pubsub: PubSub
_jobQueue: JobQueue
constructor (upstreamConfig: UpstreamConfig, ethClient: EthClient, postgraphileClient: EthClient, indexer: Indexer, pubsub: PubSub, jobQueue: JobQueue) {
constructor (upstreamConfig: UpstreamConfig, ethClient: EthClient, indexer: Indexer, pubsub: PubSub, jobQueue: JobQueue) {
assert(ethClient);
assert(indexer);
@ -40,7 +40,7 @@ export class EventWatcher implements EventWatcherInterface {
this._indexer = indexer;
this._pubsub = pubsub;
this._jobQueue = jobQueue;
this._baseEventWatcher = new BaseEventWatcher(upstreamConfig, this._ethClient, postgraphileClient, this._indexer, this._pubsub, this._jobQueue);
this._baseEventWatcher = new BaseEventWatcher(upstreamConfig, this._ethClient, this._indexer, this._pubsub, this._jobQueue);
}
getEventIterator (): AsyncIterator<any> {

View File

@ -53,7 +53,7 @@ export const main = async (): Promise<any> => {
}).argv;
const config: Config = await getConfig(argv.configFile);
const { ethClient, postgraphileClient, ethProvider } = await initClients(config);
const { ethClient, ethProvider } = await initClients(config);
const db = new Database(config.database);
await db.init();
@ -61,7 +61,7 @@ export const main = async (): Promise<any> => {
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*'));
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server);
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);
const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config');
@ -72,7 +72,7 @@ export const main = async (): Promise<any> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher);
const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue, graphWatcher);
await indexer.init();
graphWatcher.setIndexer(indexer);
@ -82,7 +82,7 @@ export const main = async (): Promise<any> => {
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
const pubsub = new PubSub();
const eventWatcher = new EventWatcher(config.upstream, ethClient, postgraphileClient, indexer, pubsub, jobQueue);
const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue);
await fillBlocks(jobQueue, indexer, eventWatcher, config.upstream.ethServer.blockDelayInMilliSecs, argv);
};

View File

@ -130,7 +130,6 @@ export class Indexer implements IPLDIndexerInterface {
_db: Database
_ethClient: EthClient
_ethProvider: BaseProvider
_postgraphileClient: EthClient
_baseIndexer: BaseIndexer
_serverConfig: ServerConfig
_graphWatcher: GraphWatcher;
@ -144,18 +143,16 @@ export class Indexer implements IPLDIndexerInterface {
_entityTypesMap: Map<string, { [key: string]: string }>
_relationsMap: Map<any, { [key: string]: any }>
constructor (serverConfig: ServerConfig, db: Database, ethClient: EthClient, postgraphileClient: EthClient, ethProvider: BaseProvider, jobQueue: JobQueue, graphWatcher: GraphWatcher) {
constructor (serverConfig: ServerConfig, db: Database, ethClient: EthClient, ethProvider: BaseProvider, jobQueue: JobQueue, graphWatcher: GraphWatcher) {
assert(db);
assert(ethClient);
assert(postgraphileClient);
this._db = db;
this._ethClient = ethClient;
this._postgraphileClient = postgraphileClient;
this._ethProvider = ethProvider;
this._serverConfig = serverConfig;
this._ipfsClient = new IPFSClient(this._serverConfig.ipfsApiAddr);
this._baseIndexer = new BaseIndexer(this._serverConfig, this._db, this._ethClient, this._postgraphileClient, this._ethProvider, jobQueue, this._ipfsClient);
this._baseIndexer = new BaseIndexer(this._serverConfig, this._db, this._ethClient, this._ethProvider, jobQueue, this._ipfsClient);
this._graphWatcher = graphWatcher;
this._abiMap = new Map();
@ -1362,7 +1359,7 @@ export class Indexer implements IPLDIndexerInterface {
assert(blockHash);
const logsPromise = this._ethClient.getLogs({ blockHash });
const transactionsPromise = this._postgraphileClient.getBlockWithTransactions({ blockHash });
const transactionsPromise = this._ethClient.getBlockWithTransactions({ blockHash });
let [
{ block, logs },

View File

@ -248,7 +248,7 @@ export const main = async (): Promise<any> => {
.argv;
const config: Config = await getConfig(argv.f);
const { ethClient, postgraphileClient, ethProvider } = await initClients(config);
const { ethClient, ethProvider } = await initClients(config);
const db = new Database(config.database);
await db.init();
@ -256,7 +256,7 @@ export const main = async (): Promise<any> => {
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*'));
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server);
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);
const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config');
@ -267,7 +267,7 @@ export const main = async (): Promise<any> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher);
const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue, graphWatcher);
await indexer.init();
graphWatcher.setIndexer(indexer);

View File

@ -36,7 +36,7 @@ export const main = async (): Promise<any> => {
.argv;
const config: Config = await getConfig(argv.f);
const { ethClient, postgraphileClient, ethProvider } = await initClients(config);
const { ethClient, ethProvider } = await initClients(config);
const { host, port, kind: watcherKind } = config.server;
@ -46,7 +46,7 @@ export const main = async (): Promise<any> => {
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*'));
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server);
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);
// Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
@ -60,13 +60,13 @@ export const main = async (): Promise<any> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher);
const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue, graphWatcher);
await indexer.init();
graphWatcher.setIndexer(indexer);
await graphWatcher.init();
const eventWatcher = new EventWatcher(config.upstream, ethClient, postgraphileClient, indexer, pubsub, jobQueue);
const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue);
if (watcherKind === KIND_ACTIVE) {
await jobQueue.start();

View File

@ -46,12 +46,11 @@ Update `environments/local.toml` with database connection settings for both the
dbConnectionString = "postgres://postgres:postgres@localhost/erc20-watcher-job-queue"
```
Update the `upstream` config in `environments/local.toml`. Provide the `ipld-eth-server` GQL and RPC API and the `indexer-db` postgraphile endpoints.
Update the `upstream` config in `environments/local.toml`. Provide the `ipld-eth-server` GQL and RPC API endpoints.
```toml
[upstream]
[upstream.ethServer]
gqlApiEndpoint = "http://127.0.0.1:8082/graphql"
gqlPostgraphileEndpoint = "http://127.0.0.1:5000/graphql"
rpcProviderEndpoint = "http://127.0.0.1:8081"
```

View File

@ -17,7 +17,6 @@
[upstream]
[upstream.ethServer]
gqlApiEndpoint = "http://127.0.0.1:8082/graphql"
gqlPostgraphileEndpoint = "http://127.0.0.1:5000/graphql"
rpcProviderEndpoint = "http://127.0.0.1:8081"
blockDelayInMilliSecs = 2000

View File

@ -29,7 +29,7 @@ export const builder = {
export const handler = async (argv: any): Promise<void> => {
const config = await getConfig(argv.configFile);
await resetJobs(config);
const { ethClient, postgraphileClient, ethProvider } = await initClients(config);
const { ethClient, ethProvider } = await initClients(config);
// Initialize database.
const db = new Database(config.database);
@ -43,7 +43,7 @@ export const handler = async (argv: any): Promise<void> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, jobQueue, config.server.mode);
const indexer = new Indexer(db, ethClient, ethProvider, jobQueue, config.server.mode);
const syncStatus = await indexer.getSyncStatus();
assert(syncStatus, 'Missing syncStatus');

View File

@ -44,7 +44,7 @@ import { CONTRACT_KIND } from '../utils/index';
const config: Config = await getConfig(argv.configFile);
const { database: dbConfig, server: { mode }, jobQueue: jobQueueConfig } = config;
const { ethClient, postgraphileClient, ethProvider } = await initClients(config);
const { ethClient, ethProvider } = await initClients(config);
assert(dbConfig);
@ -59,7 +59,7 @@ import { CONTRACT_KIND } from '../utils/index';
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, jobQueue, mode);
const indexer = new Indexer(db, ethClient, ethProvider, jobQueue, mode);
await indexer.watchContract(argv.address, CONTRACT_KIND, argv.checkpoint, argv.startingBlock);

View File

@ -31,7 +31,7 @@ export class EventWatcher {
_pubsub: PubSub
_jobQueue: JobQueue
constructor (upstreamConfig: UpstreamConfig, ethClient: EthClient, postgraphileClient: EthClient, indexer: Indexer, pubsub: PubSub, jobQueue: JobQueue) {
constructor (upstreamConfig: UpstreamConfig, ethClient: EthClient, indexer: Indexer, pubsub: PubSub, jobQueue: JobQueue) {
assert(ethClient);
assert(indexer);
@ -39,7 +39,7 @@ export class EventWatcher {
this._indexer = indexer;
this._pubsub = pubsub;
this._jobQueue = jobQueue;
this._baseEventWatcher = new BaseEventWatcher(upstreamConfig, this._ethClient, postgraphileClient, this._indexer, this._pubsub, this._jobQueue);
this._baseEventWatcher = new BaseEventWatcher(upstreamConfig, this._ethClient, this._indexer, this._pubsub, this._jobQueue);
}
getEventIterator (): AsyncIterator<any> {

View File

@ -54,7 +54,7 @@ export const main = async (): Promise<any> => {
}).argv;
const config: Config = await getConfig(argv.configFile);
const { ethClient, postgraphileClient, ethProvider } = await initClients(config);
const { ethClient, ethProvider } = await initClients(config);
const db = new Database(config.database);
await db.init();
@ -72,9 +72,9 @@ export const main = async (): Promise<any> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, jobQueue, config.server.mode);
const indexer = new Indexer(db, ethClient, ethProvider, jobQueue, config.server.mode);
const eventWatcher = new EventWatcher(config.upstream, ethClient, postgraphileClient, indexer, pubsub, jobQueue);
const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue);
await fillBlocks(jobQueue, indexer, eventWatcher, config.upstream.ethServer.blockDelayInMilliSecs, argv);
};

View File

@ -44,7 +44,6 @@ interface EventResult {
export class Indexer implements IndexerInterface {
_db: Database
_ethClient: EthClient
_postgraphileClient: EthClient
_ethProvider: BaseProvider
_baseIndexer: BaseIndexer
@ -53,17 +52,15 @@ export class Indexer implements IndexerInterface {
_contract: ethers.utils.Interface
_serverMode: string
constructor (db: Database, ethClient: EthClient, postgraphileClient: EthClient, ethProvider: BaseProvider, jobQueue: JobQueue, serverMode: string) {
constructor (db: Database, ethClient: EthClient, ethProvider: BaseProvider, jobQueue: JobQueue, serverMode: string) {
assert(db);
assert(ethClient);
assert(postgraphileClient);
this._db = db;
this._ethClient = ethClient;
this._postgraphileClient = postgraphileClient;
this._ethProvider = ethProvider;
this._serverMode = serverMode;
this._baseIndexer = new BaseIndexer(this._db, this._ethClient, this._postgraphileClient, this._ethProvider, jobQueue);
this._baseIndexer = new BaseIndexer(this._db, this._ethClient, this._ethProvider, jobQueue);
const { abi, storageLayout } = artifacts;

View File

@ -68,7 +68,7 @@ export const main = async (): Promise<any> => {
.argv;
const config: Config = await getConfig(argv.f);
const { ethClient, postgraphileClient, ethProvider } = await initClients(config);
const { ethClient, ethProvider } = await initClients(config);
const db = new Database(config.database);
await db.init();
@ -82,7 +82,7 @@ export const main = async (): Promise<any> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, jobQueue, config.server.mode);
const indexer = new Indexer(db, ethClient, ethProvider, jobQueue, config.server.mode);
await indexer.init();
const jobRunner = new JobRunner(jobQueueConfig, indexer, jobQueue);

View File

@ -36,7 +36,7 @@ export const main = async (): Promise<any> => {
.argv;
const config: Config = await getConfig(argv.f);
const { ethClient, postgraphileClient, ethProvider } = await initClients(config);
const { ethClient, ethProvider } = await initClients(config);
const { host, port, mode, kind: watcherKind } = config.server;
@ -55,10 +55,10 @@ export const main = async (): Promise<any> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, jobQueue, mode);
const indexer = new Indexer(db, ethClient, ethProvider, jobQueue, mode);
await indexer.init();
const eventWatcher = new EventWatcher(config.upstream, ethClient, postgraphileClient, indexer, pubsub, jobQueue);
const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue);
if (watcherKind === KIND_ACTIVE) {
await jobQueue.start();

View File

@ -28,7 +28,7 @@ interface DataSource {
export class GraphWatcher {
_database: Database;
_indexer?: IndexerInterface;
_postgraphileClient: EthClient;
_ethClient: EthClient;
_ethProvider: providers.BaseProvider;
_subgraphPath: string;
_wasmRestartBlocksInterval: number;
@ -38,9 +38,9 @@ export class GraphWatcher {
_context: Context = {};
constructor (database: Database, postgraphileClient: EthClient, ethProvider: providers.BaseProvider, serverConfig: ServerConfig) {
constructor (database: Database, ethClient: EthClient, ethProvider: providers.BaseProvider, serverConfig: ServerConfig) {
this._database = database;
this._postgraphileClient = postgraphileClient;
this._ethClient = ethClient;
this._ethProvider = ethProvider;
this._subgraphPath = serverConfig.subgraphPath;
this._wasmRestartBlocksInterval = serverConfig.wasmRestartBlocksInterval;
@ -124,7 +124,7 @@ export class GraphWatcher {
const { contract, event, eventSignature, block, tx: { hash: txHash }, eventIndex } = eventData;
if (!this._context.block) {
this._context.block = await getFullBlock(this._postgraphileClient, this._ethProvider, block.hash);
this._context.block = await getFullBlock(this._ethClient, this._ethProvider, block.hash);
}
const blockData = this._context.block;
@ -184,7 +184,7 @@ export class GraphWatcher {
}
async handleBlock (blockHash: string) {
const blockData = await getFullBlock(this._postgraphileClient, this._ethProvider, blockHash);
const blockData = await getFullBlock(this._ethClient, this._ethProvider, blockHash);
this._context.block = blockData;
@ -306,7 +306,7 @@ export class GraphWatcher {
return transaction;
}
transaction = await getFullTransaction(this._postgraphileClient, txHash);
transaction = await getFullTransaction(this._ethClient, txHash);
assert(transaction);
this._transactionsMap.set(txHash, transaction);

View File

@ -6,7 +6,7 @@ dataSources:
name: Example1
network: mainnet
source:
address: "0xD5567AFC3C6c1325698F27d97b74D9ea9c44295e"
address: "0x1ca7c995f8eF0A2989BbcE08D5B7Efe50A584aa1"
abi: Example1
startBlock: 10
mapping:

View File

@ -45,7 +45,7 @@
* Update the database connection settings.
* Update the `upstream` config and provide the `ipld-eth-server` GQL API and the `indexer-db` postgraphile endpoints.
* Update the `upstream` config and provide the `ipld-eth-server` GQL API endpoint.
* Update the `server` config with state checkpoint settings and provide the IPFS API address.

View File

@ -28,7 +28,6 @@
[upstream]
[upstream.ethServer]
gqlApiEndpoint = "http://127.0.0.1:8082/graphql"
gqlPostgraphileEndpoint = "http://127.0.0.1:5000/graphql"
rpcProviderEndpoint = "http://127.0.0.1:8081"
blockDelayInMilliSecs = 2000

View File

@ -41,7 +41,7 @@ const main = async (): Promise<void> => {
}).argv;
const config: Config = await getConfig(argv.configFile);
const { ethClient, postgraphileClient, ethProvider } = await initClients(config);
const { ethClient, ethProvider } = await initClients(config);
const db = new Database(config.database);
await db.init();
@ -49,7 +49,7 @@ const main = async (): Promise<void> => {
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*'));
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server);
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);
const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config');
@ -60,7 +60,7 @@ const main = async (): Promise<void> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher);
const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue, graphWatcher);
await indexer.init();
graphWatcher.setIndexer(indexer);

View File

@ -38,7 +38,7 @@ const main = async (): Promise<void> => {
}).argv;
const config: Config = await getConfig(argv.configFile);
const { ethClient, postgraphileClient, ethProvider } = await initClients(config);
const { ethClient, ethProvider } = await initClients(config);
const db = new Database(config.database);
await db.init();
@ -46,7 +46,7 @@ const main = async (): Promise<void> => {
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*'));
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server);
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);
const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config');
@ -57,7 +57,7 @@ const main = async (): Promise<void> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher);
const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue, graphWatcher);
await indexer.init();
graphWatcher.setIndexer(indexer);

View File

@ -42,7 +42,7 @@ export const main = async (): Promise<any> => {
}).argv;
const config: Config = await getConfig(argv.configFile);
const { ethClient, postgraphileClient, ethProvider } = await initClients(config);
const { ethClient, ethProvider } = await initClients(config);
const db = new Database(config.database);
await db.init();
@ -50,7 +50,7 @@ export const main = async (): Promise<any> => {
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*'));
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server);
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);
// Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
@ -65,13 +65,13 @@ export const main = async (): Promise<any> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher);
const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue, graphWatcher);
await indexer.init();
graphWatcher.setIndexer(indexer);
await graphWatcher.init();
const eventWatcher = new EventWatcher(config.upstream, ethClient, postgraphileClient, indexer, pubsub, jobQueue);
const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue);
// Import data.
const importFilePath = path.resolve(argv.importFile);

View File

@ -38,7 +38,7 @@ const main = async (): Promise<void> => {
}).argv;
const config: Config = await getConfig(argv.configFile);
const { ethClient, postgraphileClient, ethProvider } = await initClients(config);
const { ethClient, ethProvider } = await initClients(config);
const db = new Database(config.database);
await db.init();
@ -46,7 +46,7 @@ const main = async (): Promise<void> => {
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*'));
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server);
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);
const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config');
@ -57,7 +57,7 @@ const main = async (): Promise<void> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher);
const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue, graphWatcher);
await indexer.init();
graphWatcher.setIndexer(indexer);

View File

@ -35,7 +35,7 @@ export const builder = {
export const handler = async (argv: any): Promise<void> => {
const config = await getConfig(argv.configFile);
await resetJobs(config);
const { ethClient, postgraphileClient, ethProvider } = await initClients(config);
const { ethClient, ethProvider } = await initClients(config);
// Initialize database.
const db = new Database(config.database);
@ -44,7 +44,7 @@ export const handler = async (argv: any): Promise<void> => {
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*'));
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server);
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);
const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config');
@ -55,7 +55,7 @@ export const handler = async (argv: any): Promise<void> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher);
const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue, graphWatcher);
await indexer.init();
graphWatcher.setIndexer(indexer);

View File

@ -54,7 +54,7 @@ const main = async (): Promise<void> => {
}).argv;
const config: Config = await getConfig(argv.configFile);
const { ethClient, postgraphileClient, ethProvider } = await initClients(config);
const { ethClient, ethProvider } = await initClients(config);
const db = new Database(config.database);
await db.init();
@ -62,7 +62,7 @@ const main = async (): Promise<void> => {
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*'));
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server);
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);
const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config');
@ -73,7 +73,7 @@ const main = async (): Promise<void> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher);
const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue, graphWatcher);
await indexer.init();
graphWatcher.setIndexer(indexer);

View File

@ -32,7 +32,7 @@ export class EventWatcher implements EventWatcherInterface {
_pubsub: PubSub
_jobQueue: JobQueue
constructor (upstreamConfig: UpstreamConfig, ethClient: EthClient, postgraphileClient: EthClient, indexer: Indexer, pubsub: PubSub, jobQueue: JobQueue) {
constructor (upstreamConfig: UpstreamConfig, ethClient: EthClient, indexer: Indexer, pubsub: PubSub, jobQueue: JobQueue) {
assert(ethClient);
assert(indexer);
@ -40,7 +40,7 @@ export class EventWatcher implements EventWatcherInterface {
this._indexer = indexer;
this._pubsub = pubsub;
this._jobQueue = jobQueue;
this._baseEventWatcher = new BaseEventWatcher(upstreamConfig, this._ethClient, postgraphileClient, this._indexer, this._pubsub, this._jobQueue);
this._baseEventWatcher = new BaseEventWatcher(upstreamConfig, this._ethClient, this._indexer, this._pubsub, this._jobQueue);
}
getEventIterator (): AsyncIterator<any> {

View File

@ -53,7 +53,7 @@ export const main = async (): Promise<any> => {
}).argv;
const config: Config = await getConfig(argv.configFile);
const { ethClient, postgraphileClient, ethProvider } = await initClients(config);
const { ethClient, ethProvider } = await initClients(config);
const db = new Database(config.database);
await db.init();
@ -61,7 +61,7 @@ export const main = async (): Promise<any> => {
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*'));
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server);
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);
const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config');
@ -72,7 +72,7 @@ export const main = async (): Promise<any> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher);
const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue, graphWatcher);
await indexer.init();
graphWatcher.setIndexer(indexer);
@ -82,7 +82,7 @@ export const main = async (): Promise<any> => {
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
const pubsub = new PubSub();
const eventWatcher = new EventWatcher(config.upstream, ethClient, postgraphileClient, indexer, pubsub, jobQueue);
const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue);
await fillBlocks(jobQueue, indexer, eventWatcher, config.upstream.ethServer.blockDelayInMilliSecs, argv);
};

View File

@ -97,7 +97,6 @@ export class Indexer implements IPLDIndexerInterface {
_db: Database
_ethClient: EthClient
_ethProvider: BaseProvider
_postgraphileClient: EthClient
_baseIndexer: BaseIndexer
_serverConfig: ServerConfig
_graphWatcher: GraphWatcher;
@ -111,18 +110,16 @@ export class Indexer implements IPLDIndexerInterface {
_entityTypesMap: Map<string, { [key: string]: string }>
_relationsMap: Map<any, { [key: string]: any }>
constructor (serverConfig: ServerConfig, db: Database, ethClient: EthClient, postgraphileClient: EthClient, ethProvider: BaseProvider, jobQueue: JobQueue, graphWatcher: GraphWatcher) {
constructor (serverConfig: ServerConfig, db: Database, ethClient: EthClient, ethProvider: BaseProvider, jobQueue: JobQueue, graphWatcher: GraphWatcher) {
assert(db);
assert(ethClient);
assert(postgraphileClient);
this._db = db;
this._ethClient = ethClient;
this._postgraphileClient = postgraphileClient;
this._ethProvider = ethProvider;
this._serverConfig = serverConfig;
this._ipfsClient = new IPFSClient(this._serverConfig.ipfsApiAddr);
this._baseIndexer = new BaseIndexer(this._serverConfig, this._db, this._ethClient, this._postgraphileClient, this._ethProvider, jobQueue, this._ipfsClient);
this._baseIndexer = new BaseIndexer(this._serverConfig, this._db, this._ethClient, this._ethProvider, jobQueue, this._ipfsClient);
this._graphWatcher = graphWatcher;
this._abiMap = new Map();
@ -759,7 +756,7 @@ export class Indexer implements IPLDIndexerInterface {
assert(blockHash);
const logsPromise = this._ethClient.getLogs({ blockHash });
const transactionsPromise = this._postgraphileClient.getBlockWithTransactions({ blockHash });
const transactionsPromise = this._ethClient.getBlockWithTransactions({ blockHash });
let [
{ block, logs },

View File

@ -248,7 +248,7 @@ export const main = async (): Promise<any> => {
.argv;
const config: Config = await getConfig(argv.f);
const { ethClient, postgraphileClient, ethProvider } = await initClients(config);
const { ethClient, ethProvider } = await initClients(config);
const db = new Database(config.database);
await db.init();
@ -256,7 +256,7 @@ export const main = async (): Promise<any> => {
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*'));
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server);
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);
const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config');
@ -267,7 +267,7 @@ export const main = async (): Promise<any> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher);
const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue, graphWatcher);
await indexer.init();
graphWatcher.setIndexer(indexer);

View File

@ -36,7 +36,7 @@ export const main = async (): Promise<any> => {
.argv;
const config: Config = await getConfig(argv.f);
const { ethClient, postgraphileClient, ethProvider } = await initClients(config);
const { ethClient, ethProvider } = await initClients(config);
const { host, port, kind: watcherKind } = config.server;
@ -46,7 +46,7 @@ export const main = async (): Promise<any> => {
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*'));
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server);
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);
// Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
@ -60,13 +60,13 @@ export const main = async (): Promise<any> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher);
const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue, graphWatcher);
await indexer.init();
graphWatcher.setIndexer(indexer);
await graphWatcher.init();
const eventWatcher = new EventWatcher(config.upstream, ethClient, postgraphileClient, indexer, pubsub, jobQueue);
const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue);
if (watcherKind === KIND_ACTIVE) {
await jobQueue.start();

View File

@ -76,7 +76,7 @@ export class EthClient {
return this._graphqlClient.query(
ethQueries.getBlocks,
{
blockNumber,
blockNumber: blockNumber?.toString(),
blockHash
}
);
@ -86,7 +86,7 @@ export class EthClient {
return this._graphqlClient.query(
ethQueries.getFullBlocks,
{
blockNumber,
blockNumber: blockNumber?.toString(),
blockHash
}
);
@ -132,14 +132,6 @@ export class EthClient {
return { logs, block };
}
async watchBlocks (onNext: (value: any) => void): Promise<ZenObservable.Subscription> {
return this._graphqlClient.subscribe(ethQueries.subscribeBlocks, onNext);
}
async watchTransactions (onNext: (value: any) => void): Promise<ZenObservable.Subscription> {
return this._graphqlClient.subscribe(ethQueries.subscribeTransactions, onNext);
}
async _getCachedOrFetch (queryName: keyof typeof ethQueries, vars: Vars): Promise<any> {
const keyObj = {
queryName,

View File

@ -134,39 +134,6 @@ query block($blockHash: Bytes32) {
}
`;
export const subscribeBlocks = gql`
subscription {
listen(topic: "header_cids") {
relatedNode {
... on EthHeaderCid {
cid
blockHash
blockNumber
parentHash
timestamp
}
}
}
}
`;
export const subscribeTransactions = gql`
subscription SubscriptionHeader {
listen(topic: "transaction_cids") {
relatedNode {
... on EthTransactionCid {
txHash
ethHeaderCidByHeaderId {
blockHash
blockNumber
parentHash
}
}
}
}
}
`;
export default {
getStorageAt,
getLogs,
@ -174,7 +141,5 @@ export default {
getBlocks,
getFullBlocks,
getFullTransaction,
getBlockByHash,
subscribeBlocks,
subscribeTransactions
getBlockByHash
};

View File

@ -8,7 +8,6 @@
[upstream]
[upstream.ethServer]
gqlApiEndpoint = "http://127.0.0.1:8082/graphql"
gqlPostgraphileEndpoint = "http://127.0.0.1:5000/graphql"
[upstream.cache]
name = "requests"

View File

@ -38,16 +38,18 @@ export class EventWatcher {
async watchBlocksAtChainHead (): Promise<void> {
log('Started watching upstream blocks...');
this._subscription = await this._ethClient.watchBlocks(async (value) => {
const { blockHash, blockNumber } = _.get(value, 'data.listen.relatedNode');
log('watchBlock', blockHash, blockNumber);
const events = await this._indexer.getOrFetchBlockEvents(blockHash);
// TODO: Update to pull based watcher.
// this._subscription = await this._ethClient.watchBlocks(async (value) => {
// const { blockHash, blockNumber } = _.get(value, 'data.listen.relatedNode');
// log('watchBlock', blockHash, blockNumber);
for (let ei = 0; ei < events.length; ei++) {
await this.publishLighthouseEventToSubscribers(events[ei]);
}
});
// const events = await this._indexer.getOrFetchBlockEvents(blockHash);
// for (let ei = 0; ei < events.length; ei++) {
// await this.publishLighthouseEventToSubscribers(events[ei]);
// }
// });
}
async publishLighthouseEventToSubscribers (resultEvent: ResultEvent): Promise<void> {

View File

@ -37,15 +37,13 @@ export interface Config extends BaseConfig {
export class Indexer {
_config: Config
_ethClient: EthClient
_postgraphileClient: EthClient
_lighthouseContract: ethers.utils.Interface
constructor (config: Config, ethClient: EthClient, postgraphileClient: EthClient) {
constructor (config: Config, ethClient: EthClient) {
assert(config.watch);
this._config = config;
this._ethClient = ethClient;
this._postgraphileClient = postgraphileClient;
this._lighthouseContract = new ethers.utils.Interface(lighthouseABI);
}
@ -96,7 +94,7 @@ export class Indexer {
}
]
}
} = await this._postgraphileClient.getBlockWithTransactions({ blockHash });
} = await this._ethClient.getBlockWithTransactions({ blockHash });
const transactionMap = transactions.reduce((acc: {[key: string]: any}, transaction: {[key: string]: any}) => {
acc[transaction.txHash] = transaction;

View File

@ -43,23 +43,16 @@ export const main = async (): Promise<any> => {
const { upstream } = config;
assert(upstream, 'Missing upstream config');
const { ethServer: { gqlApiEndpoint, gqlPostgraphileEndpoint }, cache: cacheConfig } = upstream;
const { ethServer: { gqlApiEndpoint }, cache: cacheConfig } = upstream;
assert(gqlApiEndpoint, 'Missing upstream ethServer.gqlApiEndpoint');
assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint');
const cache = await getCache(cacheConfig);
const ethClient = new EthClient({
gqlEndpoint: gqlApiEndpoint,
gqlSubscriptionEndpoint: gqlPostgraphileEndpoint,
cache
});
const postgraphileClient = new EthClient({
gqlEndpoint: gqlPostgraphileEndpoint,
cache
});
const indexer = new Indexer(config, ethClient, postgraphileClient);
const indexer = new Indexer(config, ethClient);
// Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries

View File

@ -18,7 +18,6 @@
[upstream]
[upstream.ethServer]
gqlApiEndpoint = "http://127.0.0.1:8082/graphql"
gqlPostgraphileEndpoint = "http://127.0.0.1:5000/graphql"
rpcProviderEndpoint = "http://127.0.0.1:8081"
blockDelayInMilliSecs = 2000

View File

@ -18,7 +18,6 @@
[upstream]
[upstream.ethServer]
gqlApiEndpoint = "http://127.0.0.1:8082/graphql"
gqlPostgraphileEndpoint = "http://127.0.0.1:5000/graphql"
rpcProviderEndpoint = "http://127.0.0.1:8545"
blockDelayInMilliSecs = 2000

View File

@ -39,7 +39,7 @@ export const handler = async (argv: any): Promise<void> => {
const config = await getConfig(argv.configFile);
await resetJobs(config);
const { jobQueue: jobQueueConfig } = config;
const { ethClient, postgraphileClient, ethProvider } = await initClients(config);
const { ethClient, ethProvider } = await initClients(config);
// Initialize database.
const db = new Database(config.database);
@ -61,7 +61,7 @@ export const handler = async (argv: any): Promise<void> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(db, uniClient, erc20Client, ethClient, postgraphileClient, ethProvider, jobQueue, config.server.mode);
const indexer = new Indexer(db, uniClient, erc20Client, ethClient, ethProvider, jobQueue, config.server.mode);
const syncStatus = await indexer.getSyncStatus();
assert(syncStatus, 'Missing syncStatus');

View File

@ -125,12 +125,12 @@ export class EventWatcher implements EventWatcherInterface {
_jobQueue: JobQueue
_baseEventWatcher: BaseEventWatcher
constructor (upstreamConfig: UpstreamConfig, ethClient: EthClient, postgraphileClient: EthClient, indexer: Indexer, pubsub: PubSub, jobQueue: JobQueue) {
constructor (upstreamConfig: UpstreamConfig, ethClient: EthClient, indexer: Indexer, pubsub: PubSub, jobQueue: JobQueue) {
this._ethClient = ethClient;
this._indexer = indexer;
this._pubsub = pubsub;
this._jobQueue = jobQueue;
this._baseEventWatcher = new BaseEventWatcher(upstreamConfig, this._ethClient, postgraphileClient, this._indexer, this._pubsub, this._jobQueue);
this._baseEventWatcher = new BaseEventWatcher(upstreamConfig, this._ethClient, this._indexer, this._pubsub, this._jobQueue);
}
getBlockProgressEventIterator (): AsyncIterator<any> {

View File

@ -56,7 +56,7 @@ export const main = async (): Promise<any> => {
}).argv;
const config: Config = await getConfig(argv.configFile);
const { ethClient, postgraphileClient, ethProvider } = await initClients(config);
const { ethClient, ethProvider } = await initClients(config);
const db = new Database(config.database);
await db.init();
@ -79,9 +79,9 @@ export const main = async (): Promise<any> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(db, uniClient, erc20Client, ethClient, postgraphileClient, ethProvider, jobQueue, config.server.mode);
const indexer = new Indexer(db, uniClient, erc20Client, ethClient, ethProvider, jobQueue, config.server.mode);
const eventWatcher = new EventWatcher(config.upstream, ethClient, postgraphileClient, indexer, pubsub, jobQueue);
const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue);
await fillBlocks(jobQueue, indexer, eventWatcher, blockDelayInMilliSecs, argv);
};

View File

@ -44,23 +44,20 @@ export class Indexer implements IndexerInterface {
_uniClient: UniClient
_erc20Client: ERC20Client
_ethClient: EthClient
_postgraphileClient: EthClient
_baseIndexer: BaseIndexer
_isDemo: boolean
constructor (db: Database, uniClient: UniClient, erc20Client: ERC20Client, ethClient: EthClient, postgraphileClient: EthClient, ethProvider: providers.BaseProvider, jobQueue: JobQueue, mode: string) {
constructor (db: Database, uniClient: UniClient, erc20Client: ERC20Client, ethClient: EthClient, ethProvider: providers.BaseProvider, jobQueue: JobQueue, mode: string) {
assert(db);
assert(uniClient);
assert(erc20Client);
assert(ethClient);
assert(postgraphileClient);
this._db = db;
this._uniClient = uniClient;
this._erc20Client = erc20Client;
this._ethClient = ethClient;
this._postgraphileClient = postgraphileClient;
this._baseIndexer = new BaseIndexer(this._db, this._ethClient, this._postgraphileClient, ethProvider, jobQueue);
this._baseIndexer = new BaseIndexer(this._db, this._ethClient, ethProvider, jobQueue);
this._isDemo = mode === 'demo';
}

View File

@ -73,7 +73,7 @@ export const main = async (): Promise<any> => {
.argv;
const config: Config = await getConfig(argv.f);
const { ethClient, postgraphileClient, ethProvider } = await initClients(config);
const { ethClient, ethProvider } = await initClients(config);
const db = new Database(config.database);
await db.init();
@ -95,7 +95,7 @@ export const main = async (): Promise<any> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(db, uniClient, erc20Client, ethClient, postgraphileClient, ethProvider, jobQueue, config.server.mode);
const indexer = new Indexer(db, uniClient, erc20Client, ethClient, ethProvider, jobQueue, config.server.mode);
const jobRunner = new JobRunner(jobQueueConfig, indexer, jobQueue);
await jobRunner.start();

View File

@ -38,7 +38,7 @@ export const main = async (): Promise<any> => {
.argv;
const config: Config = await getConfig(argv.f);
const { ethClient, postgraphileClient } = await initClients(config);
const { ethClient } = await initClients(config);
const { host, port, mode } = config.server;
@ -60,10 +60,10 @@ export const main = async (): Promise<any> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(db, uniClient, erc20Client, ethClient, postgraphileClient, ethProvider, jobQueue, mode);
const indexer = new Indexer(db, uniClient, erc20Client, ethClient, ethProvider, jobQueue, mode);
const pubSub = new PubSub();
const eventWatcher = new EventWatcher(config.upstream, ethClient, postgraphileClient, indexer, pubSub, jobQueue);
const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubSub, jobQueue);
await eventWatcher.start();
const resolvers = process.env.MOCK ? await createMockResolvers() : await createResolvers(indexer, eventWatcher);

View File

@ -15,7 +15,6 @@
[upstream]
[upstream.ethServer]
gqlApiEndpoint = "http://127.0.0.1:8082/graphql"
gqlPostgraphileEndpoint = "http://127.0.0.1:5000/graphql"
rpcProviderEndpoint = "http://127.0.0.1:8081"
blockDelayInMilliSecs = 2000

View File

@ -15,7 +15,6 @@
[upstream]
[upstream.ethServer]
gqlApiEndpoint = "http://127.0.0.1:8082/graphql"
gqlPostgraphileEndpoint = "http://127.0.0.1:5000/graphql"
rpcProviderEndpoint = "http://127.0.0.1:8545"
blockDelayInMilliSecs = 2000

View File

@ -45,19 +45,12 @@ describe('chain pruning', () => {
// Create an Indexer object.
assert(upstream, 'Missing upstream config');
const { ethServer: { gqlApiEndpoint, gqlPostgraphileEndpoint, rpcProviderEndpoint }, cache: cacheConfig } = upstream;
const { ethServer: { gqlApiEndpoint, rpcProviderEndpoint }, cache: cacheConfig } = upstream;
assert(gqlApiEndpoint, 'Missing upstream ethServer.gqlApiEndpoint');
assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint');
const cache = await getCache(cacheConfig);
const ethClient = new EthClient({
gqlEndpoint: gqlApiEndpoint,
gqlSubscriptionEndpoint: gqlPostgraphileEndpoint,
cache
});
const postgraphileClient = new EthClient({
gqlEndpoint: gqlPostgraphileEndpoint,
cache
});
@ -68,7 +61,7 @@ describe('chain pruning', () => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, jobQueue);
indexer = new Indexer(db, ethClient, ethProvider, jobQueue);
assert(indexer, 'Could not create indexer object.');
jobRunner = new JobRunner(jobQueueConfig, indexer, jobQueue);

View File

@ -27,7 +27,7 @@ export const builder = {
export const handler = async (argv: any): Promise<void> => {
const config = await getConfig(argv.configFile);
await resetJobs(config);
const { ethClient, postgraphileClient, ethProvider } = await initClients(config);
const { ethClient, ethProvider } = await initClients(config);
// Initialize database.
const db = new Database(config.database);
@ -40,7 +40,7 @@ export const handler = async (argv: any): Promise<void> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, jobQueue);
const indexer = new Indexer(db, ethClient, ethProvider, jobQueue);
const syncStatus = await indexer.getSyncStatus();
assert(syncStatus, 'Missing syncStatus');

View File

@ -49,7 +49,7 @@ import { Indexer } from '../indexer';
const config: Config = await getConfig(argv.configFile);
const { database: dbConfig, jobQueue: jobQueueConfig } = config;
const { ethClient, postgraphileClient, ethProvider } = await initClients(config);
const { ethClient, ethProvider } = await initClients(config);
assert(dbConfig);
@ -64,7 +64,7 @@ import { Indexer } from '../indexer';
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, jobQueue);
const indexer = new Indexer(db, ethClient, ethProvider, jobQueue);
await indexer.init();
await indexer.watchContract(argv.address, argv.kind, argv.checkpoint, argv.startingBlock);

View File

@ -31,12 +31,12 @@ export class EventWatcher implements EventWatcherInterface {
_jobQueue: JobQueue
_baseEventWatcher: BaseEventWatcher
constructor (upstreamConfig: UpstreamConfig, ethClient: EthClient, postgraphileClient: EthClient, indexer: Indexer, pubsub: PubSub, jobQueue: JobQueue) {
constructor (upstreamConfig: UpstreamConfig, ethClient: EthClient, indexer: Indexer, pubsub: PubSub, jobQueue: JobQueue) {
this._ethClient = ethClient;
this._indexer = indexer;
this._pubsub = pubsub;
this._jobQueue = jobQueue;
this._baseEventWatcher = new BaseEventWatcher(upstreamConfig, this._ethClient, postgraphileClient, this._indexer, this._pubsub, this._jobQueue);
this._baseEventWatcher = new BaseEventWatcher(upstreamConfig, this._ethClient, this._indexer, this._pubsub, this._jobQueue);
}
getEventIterator (): AsyncIterator<any> {

View File

@ -54,7 +54,7 @@ export const main = async (): Promise<any> => {
}).argv;
const config: Config = await getConfig(argv.configFile);
const { ethClient, postgraphileClient, ethProvider } = await initClients(config);
const { ethClient, ethProvider } = await initClients(config);
const db = new Database(config.database);
await db.init();
@ -72,10 +72,10 @@ export const main = async (): Promise<any> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, jobQueue);
const indexer = new Indexer(db, ethClient, ethProvider, jobQueue);
await indexer.init();
const eventWatcher = new EventWatcher(config.upstream, ethClient, postgraphileClient, indexer, pubsub, jobQueue);
const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue);
await fillBlocks(jobQueue, indexer, eventWatcher, config.upstream.ethServer.blockDelayInMilliSecs, argv);
};

View File

@ -38,7 +38,6 @@ type ResultEvent = {
export class Indexer implements IndexerInterface {
_db: Database
_ethClient: EthClient
_postgraphileClient: EthClient
_baseIndexer: BaseIndexer
_ethProvider: ethers.providers.BaseProvider
@ -46,12 +45,11 @@ export class Indexer implements IndexerInterface {
_poolContract: ethers.utils.Interface
_nfpmContract: ethers.utils.Interface
constructor (db: Database, ethClient: EthClient, postgraphileClient: EthClient, ethProvider: ethers.providers.BaseProvider, jobQueue: JobQueue) {
constructor (db: Database, ethClient: EthClient, ethProvider: ethers.providers.BaseProvider, jobQueue: JobQueue) {
this._db = db;
this._ethClient = ethClient;
this._postgraphileClient = postgraphileClient;
this._ethProvider = ethProvider;
this._baseIndexer = new BaseIndexer(this._db, this._ethClient, this._postgraphileClient, this._ethProvider, jobQueue);
this._baseIndexer = new BaseIndexer(this._db, this._ethClient, this._ethProvider, jobQueue);
this._factoryContract = new ethers.utils.Interface(factoryABI);
this._poolContract = new ethers.utils.Interface(poolABI);
@ -433,7 +431,7 @@ export class Indexer implements IndexerInterface {
assert(blockHash);
const logsPromise = this._ethClient.getLogs({ blockHash });
const transactionsPromise = this._postgraphileClient.getBlockWithTransactions({ blockHash });
const transactionsPromise = this._ethClient.getBlockWithTransactions({ blockHash });
let [
{ block, logs },

View File

@ -70,7 +70,7 @@ export const main = async (): Promise<any> => {
.argv;
const config: Config = await getConfig(argv.f);
const { ethClient, postgraphileClient, ethProvider } = await initClients(config);
const { ethClient, ethProvider } = await initClients(config);
const db = new Database(config.database);
await db.init();
@ -84,7 +84,7 @@ export const main = async (): Promise<any> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, jobQueue);
const indexer = new Indexer(db, ethClient, ethProvider, jobQueue);
await indexer.init();
const jobRunner = new JobRunner(jobQueueConfig, indexer, jobQueue);

View File

@ -36,7 +36,7 @@ export const main = async (): Promise<any> => {
.argv;
const config: Config = await getConfig(argv.f);
const { ethClient, postgraphileClient, ethProvider } = await initClients(config);
const { ethClient, ethProvider } = await initClients(config);
const { host, port } = config.server;
@ -56,10 +56,10 @@ export const main = async (): Promise<any> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, jobQueue);
const indexer = new Indexer(db, ethClient, ethProvider, jobQueue);
await indexer.init();
const eventWatcher = new EventWatcher(config.upstream, ethClient, postgraphileClient, indexer, pubsub, jobQueue);
const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue);
await eventWatcher.start();
const resolvers = process.env.MOCK ? await createMockResolvers() : await createResolvers(indexer, eventWatcher);

View File

@ -64,7 +64,6 @@ describe('uni-watcher', () => {
let db: Database;
let uniClient: UniClient;
let ethClient: EthClient;
let postgraphileClient: EthClient;
let ethProvider: ethers.providers.JsonRpcProvider;
let jobQueue: JobQueue;
let signer: Signer;
@ -80,9 +79,8 @@ describe('uni-watcher', () => {
assert(host, 'Missing host.');
assert(port, 'Missing port.');
const { ethServer: { gqlApiEndpoint, gqlPostgraphileEndpoint, rpcProviderEndpoint }, cache: cacheConfig } = upstream;
const { ethServer: { gqlApiEndpoint, rpcProviderEndpoint }, cache: cacheConfig } = upstream;
assert(gqlApiEndpoint, 'Missing upstream ethServer.gqlApiEndpoint.');
assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint.');
assert(rpcProviderEndpoint, 'Missing upstream ethServer.rpcProviderEndpoint.');
assert(cacheConfig, 'Missing dbConfig.');
@ -92,12 +90,6 @@ describe('uni-watcher', () => {
const cache = await getCache(cacheConfig);
ethClient = new EthClient({
gqlEndpoint: gqlApiEndpoint,
gqlSubscriptionEndpoint: gqlPostgraphileEndpoint,
cache
});
postgraphileClient = new EthClient({
gqlEndpoint: gqlPostgraphileEndpoint,
cache
});
@ -135,7 +127,7 @@ describe('uni-watcher', () => {
factory = new Contract(factoryContract.address, FACTORY_ABI, signer);
// Verifying with the db.
const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, jobQueue);
const indexer = new Indexer(db, ethClient, ethProvider, jobQueue);
await indexer.init();
assert(await indexer.isWatchedContract(factory.address), 'Factory contract not added to the database.');
});
@ -271,7 +263,7 @@ describe('uni-watcher', () => {
nfpm = new Contract(nfpmContract.address, NFPM_ABI, signer);
// Verifying with the db.
const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, jobQueue);
const indexer = new Indexer(db, ethClient, ethProvider, jobQueue);
await indexer.init();
assert(await indexer.isWatchedContract(nfpm.address), 'NFPM contract not added to the database.');
});

View File

@ -57,7 +57,7 @@ const main = async () => {
assert(host, 'Missing host.');
assert(port, 'Missing port.');
const { ethClient, postgraphileClient, ethProvider } = await initClients(config);
const { ethClient, ethProvider } = await initClients(config);
// Initialize uniClient.
const endpoint = `http://${host}:${port}/graphql`;
@ -81,7 +81,7 @@ const main = async () => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, jobQueue);
const indexer = new Indexer(db, ethClient, ethProvider, jobQueue);
let factory: Contract;
// Checking whether factory is deployed.

View File

@ -63,9 +63,9 @@ export const processBlockByNumber = async (
});
if (!blocks.length) {
console.time('time:common#processBlockByNumber-postgraphile');
console.time('time:common#processBlockByNumber-ipld-eth-server');
blocks = await indexer.getBlocks({ blockNumber });
console.timeEnd('time:common#processBlockByNumber-postgraphile');
console.timeEnd('time:common#processBlockByNumber-ipld-eth-server');
}
if (blocks.length) {

View File

@ -40,7 +40,6 @@ export interface UpstreamConfig {
cache: CacheConfig,
ethServer: {
gqlApiEndpoint: string;
gqlPostgraphileEndpoint: string;
rpcProviderEndpoint: string;
blockDelayInMilliSecs: number;
}
@ -77,7 +76,6 @@ export const getConfig = async (configFile: string): Promise<Config> => {
export const initClients = async (config: Config): Promise<{
ethClient: EthClient,
postgraphileClient: EthClient,
ethProvider: BaseProvider
}> => {
const { database: dbConfig, upstream: upstreamConfig, server: serverConfig } = config;
@ -86,22 +84,15 @@ export const initClients = async (config: Config): Promise<{
assert(dbConfig, 'Missing database config');
assert(upstreamConfig, 'Missing upstream config');
const { ethServer: { gqlApiEndpoint, gqlPostgraphileEndpoint, rpcProviderEndpoint }, cache: cacheConfig } = upstreamConfig;
const { ethServer: { gqlApiEndpoint, rpcProviderEndpoint }, cache: cacheConfig } = upstreamConfig;
assert(gqlApiEndpoint, 'Missing upstream ethServer.gqlApiEndpoint');
assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint');
assert(rpcProviderEndpoint, 'Missing upstream ethServer.rpcProviderEndpoint');
const cache = await getCache(cacheConfig);
const ethClient = new EthClient({
gqlEndpoint: gqlApiEndpoint,
gqlSubscriptionEndpoint: gqlPostgraphileEndpoint,
cache
});
const postgraphileClient = new EthClient({
gqlEndpoint: gqlPostgraphileEndpoint,
cache
});
@ -109,7 +100,6 @@ export const initClients = async (config: Config): Promise<{
return {
ethClient,
postgraphileClient,
ethProvider
};
};

View File

@ -21,17 +21,15 @@ export const BlockProgressEvent = 'block-progress-event';
export class EventWatcher {
_ethClient: EthClient
_postgraphileClient: EthClient
_indexer: IndexerInterface
_subscription?: ZenObservable.Subscription
_pubsub: PubSub
_jobQueue: JobQueue
_upstreamConfig: UpstreamConfig
constructor (upstreamConfig: UpstreamConfig, ethClient: EthClient, postgraphileClient: EthClient, indexer: IndexerInterface, pubsub: PubSub, jobQueue: JobQueue) {
constructor (upstreamConfig: UpstreamConfig, ethClient: EthClient, indexer: IndexerInterface, pubsub: PubSub, jobQueue: JobQueue) {
this._upstreamConfig = upstreamConfig;
this._ethClient = ethClient;
this._postgraphileClient = postgraphileClient;
this._indexer = indexer;
this._pubsub = pubsub;
this._jobQueue = jobQueue;

View File

@ -29,17 +29,15 @@ export interface ValueResult {
export class Indexer {
_db: DatabaseInterface;
_ethClient: EthClient;
_postgraphileClient: EthClient;
_getStorageAt: GetStorageAt;
_ethProvider: ethers.providers.BaseProvider;
_jobQueue: JobQueue;
_watchedContracts: { [key: string]: ContractInterface } = {};
constructor (db: DatabaseInterface, ethClient: EthClient, postgraphileClient: EthClient, ethProvider: ethers.providers.BaseProvider, jobQueue: JobQueue) {
constructor (db: DatabaseInterface, ethClient: EthClient, ethProvider: ethers.providers.BaseProvider, jobQueue: JobQueue) {
this._db = db;
this._ethClient = ethClient;
this._postgraphileClient = postgraphileClient;
this._ethProvider = ethProvider;
this._jobQueue = jobQueue;
this._getStorageAt = this._ethClient.getStorageAt.bind(this._ethClient);
@ -127,7 +125,7 @@ export class Indexer {
async getBlocks (blockFilter: { blockNumber?: number, blockHash?: string }): Promise<any> {
assert(blockFilter.blockHash || blockFilter.blockNumber);
const result = await this._postgraphileClient.getBlocks(blockFilter);
const result = await this._ethClient.getBlocks(blockFilter);
const { allEthHeaderCids: { nodes: blocks } } = result;
if (!blocks.length) {

View File

@ -41,12 +41,11 @@ export class IPLDIndexer extends Indexer {
serverConfig: ServerConfig,
ipldDb: IPLDDatabaseInterface,
ethClient: EthClient,
postgraphileClient: EthClient,
ethProvider: ethers.providers.BaseProvider,
jobQueue: JobQueue,
ipfsClient: IPFSClient
) {
super(ipldDb, ethClient, postgraphileClient, ethProvider, jobQueue);
super(ipldDb, ethClient, ethProvider, jobQueue);
this._serverConfig = serverConfig;
this._ipldDb = ipldDb;

View File

@ -188,7 +188,7 @@ export const getFullBlock = async (ethClient: EthClient, ethProvider: providers.
const header = EthDecoder.decodeHeader(EthDecoder.decodeData(fullBlock.blockByMhKey.data));
assert(header);
// TODO: Calculate size from rlp encoded data provided by postgraphile.
// TODO: Calculate size from rlp encoded data.
// Get block info from JSON RPC API provided by ipld-eth-server.
const provider = ethProvider as providers.JsonRpcProvider;
const { size } = await provider.send('eth_getBlockByHash', [blockHash, false]);