Block filler for uni-watcher. (#140)

This commit is contained in:
Ashwin Phatak 2021-07-15 13:10:07 +05:30 committed by GitHub
parent 7151521c3b
commit 70e88b1004
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 177 additions and 47 deletions

View File

@ -22,10 +22,12 @@
subscribersDir = "src/subscriber"
[upstream]
gqlEndpoint = "http://127.0.0.1:5000/graphql"
gqlSubscriptionEndpoint = "http://127.0.0.1:5000/graphql"
traceProviderEndpoint = "http://127.0.0.1:8545"
[upstream.ethServer]
gqlApiEndpoint = "http://127.0.0.1:8082/graphql"
gqlPostgraphileEndpoint = "http://127.0.0.1:5000/graphql"
[upstream.cache]
name = "requests"
enabled = false

View File

@ -50,14 +50,16 @@ export const main = async (): Promise<any> => {
await db.init();
assert(upstream, 'Missing upstream config');
const { gqlEndpoint, gqlSubscriptionEndpoint, traceProviderEndpoint, cache: cacheConfig } = upstream;
assert(gqlEndpoint, 'Missing upstream gqlEndpoint');
assert(gqlSubscriptionEndpoint, 'Missing upstream gqlSubscriptionEndpoint');
const { ethServer: { gqlPostgraphileEndpoint }, traceProviderEndpoint, cache: cacheConfig } = upstream;
assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint');
assert(traceProviderEndpoint, 'Missing upstream traceProviderEndpoint');
const cache = await getCache(cacheConfig);
const ethClient = new EthClient({ gqlEndpoint, gqlSubscriptionEndpoint, cache });
const ethClient = new EthClient({
gqlEndpoint: gqlPostgraphileEndpoint,
gqlSubscriptionEndpoint: gqlPostgraphileEndpoint,
cache
});
assert(jobQueueConfig, 'Missing job queue config');

View File

@ -37,14 +37,17 @@ export const main = async (): Promise<any> => {
await db.init();
assert(upstream, 'Missing upstream config');
const { gqlEndpoint, gqlSubscriptionEndpoint, traceProviderEndpoint, cache: cacheConfig } = upstream;
assert(gqlEndpoint, 'Missing upstream gqlEndpoint');
assert(gqlSubscriptionEndpoint, 'Missing upstream gqlSubscriptionEndpoint');
const { ethServer: { gqlApiEndpoint, gqlPostgraphileEndpoint }, traceProviderEndpoint, cache: cacheConfig } = upstream;
assert(gqlApiEndpoint, 'Missing upstream ethServer.gqlApiEndpoint');
assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint');
assert(traceProviderEndpoint, 'Missing upstream traceProviderEndpoint');
const cache = await getCache(cacheConfig);
const ethClient = new EthClient({ gqlEndpoint, gqlSubscriptionEndpoint, cache });
const ethClient = new EthClient({
gqlEndpoint: gqlApiEndpoint,
gqlSubscriptionEndpoint: gqlPostgraphileEndpoint,
cache
});
const tracingClient = new TracingClient(traceProviderEndpoint);

View File

@ -45,14 +45,17 @@ export const main = async (): Promise<any> => {
await db.init();
assert(upstream, 'Missing upstream config');
const { gqlEndpoint, gqlSubscriptionEndpoint, traceProviderEndpoint, cache: cacheConfig } = upstream;
assert(gqlEndpoint, 'Missing upstream gqlEndpoint');
assert(gqlSubscriptionEndpoint, 'Missing upstream gqlSubscriptionEndpoint');
const { ethServer: { gqlApiEndpoint, gqlPostgraphileEndpoint }, traceProviderEndpoint, cache: cacheConfig } = upstream;
assert(gqlApiEndpoint, 'Missing upstream ethServer.gqlApiEndpoint');
assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint');
assert(traceProviderEndpoint, 'Missing upstream traceProviderEndpoint');
const cache = await getCache(cacheConfig);
const ethClient = new EthClient({ gqlEndpoint, gqlSubscriptionEndpoint, cache });
const ethClient = new EthClient({
gqlEndpoint: gqlApiEndpoint,
gqlSubscriptionEndpoint: gqlPostgraphileEndpoint,
cache
});
const tracingClient = new TracingClient(traceProviderEndpoint);

View File

@ -22,8 +22,9 @@
subscribersDir = "src/subscriber"
[upstream]
gqlEndpoint = "http://127.0.0.1:8082/graphql"
gqlSubscriptionEndpoint = "http://127.0.0.1:5000/graphql"
[upstream.ethServer]
gqlApiEndpoint = "http://127.0.0.1:8082/graphql"
gqlPostgraphileEndpoint = "http://127.0.0.1:5000/graphql"
[upstream.cache]
name = "requests"

View File

@ -47,13 +47,16 @@ export const main = async (): Promise<any> => {
await db.init();
assert(upstream, 'Missing upstream config');
const { gqlEndpoint, gqlSubscriptionEndpoint, cache: cacheConfig } = upstream;
assert(gqlEndpoint, 'Missing upstream gqlEndpoint');
assert(gqlSubscriptionEndpoint, 'Missing upstream gqlSubscriptionEndpoint');
const { ethServer: { gqlApiEndpoint, gqlPostgraphileEndpoint }, cache: cacheConfig } = upstream;
assert(gqlApiEndpoint, 'Missing upstream ethServer.gqlApiEndpoint');
assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint');
const cache = await getCache(cacheConfig);
const ethClient = new EthClient({ gqlEndpoint, gqlSubscriptionEndpoint, cache });
const ethClient = new EthClient({
gqlEndpoint: gqlApiEndpoint,
gqlSubscriptionEndpoint: gqlPostgraphileEndpoint,
cache
});
// Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries

View File

@ -67,7 +67,7 @@ export class EthClient {
const result = await this._getCachedOrFetch('getLogs', vars);
const { getLogs: resultLogs, block: { number: blockNumHex, timestamp: timestampHex } } = result;
const block = { hash: vars.blockHash, number: parseInt(blockNumHex, 16), timestamp: parseInt(timestampHex, 16) };
const logs = resultLogs.map((logEntry: any) => _.merge({}, logEntry, { transaction: { block }}));
const logs = resultLogs.map((logEntry: any) => _.merge({}, logEntry, { transaction: { block } }));
return { logs, block };
}

View File

@ -22,8 +22,9 @@
subscribersDir = "src/subscriber"
[upstream]
gqlEndpoint = "http://127.0.0.1:8082/graphql"
gqlSubscriptionEndpoint = "http://127.0.0.1:5000/graphql"
[upstream.ethServer]
gqlApiEndpoint = "http://127.0.0.1:8082/graphql"
gqlPostgraphileEndpoint = "http://127.0.0.1:5000/graphql"
[upstream.cache]
name = "requests"

View File

@ -48,13 +48,26 @@ export const main = async (): Promise<any> => {
await db.init();
assert(upstream, 'Missing upstream config');
const { gqlEndpoint, gqlSubscriptionEndpoint, cache: cacheConfig, uniWatcher, tokenWatcher } = upstream;
assert(gqlEndpoint, 'Missing upstream gqlEndpoint');
assert(gqlSubscriptionEndpoint, 'Missing upstream gqlSubscriptionEndpoint');
const {
ethServer: {
gqlApiEndpoint,
gqlPostgraphileEndpoint
},
cache: cacheConfig,
uniWatcher,
tokenWatcher
} = upstream;
assert(gqlApiEndpoint, 'Missing upstream ethServer.gqlApiEndpoint');
assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint');
const cache = await getCache(cacheConfig);
const ethClient = new EthClient({
gqlEndpoint: gqlApiEndpoint,
gqlSubscriptionEndpoint: gqlPostgraphileEndpoint,
cache
});
const ethClient = new EthClient({ gqlEndpoint, gqlSubscriptionEndpoint, cache });
const uniClient = new UniClient(uniWatcher);
// Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.

View File

@ -22,8 +22,9 @@
subscribersDir = "src/subscriber"
[upstream]
gqlEndpoint = "http://127.0.0.1:8082/graphql"
gqlSubscriptionEndpoint = "http://127.0.0.1:5000/graphql"
[upstream.ethServer]
gqlApiEndpoint = "http://127.0.0.1:8082/graphql"
gqlPostgraphileEndpoint = "http://127.0.0.1:5000/graphql"
[upstream.cache]
name = "requests"

View File

@ -0,0 +1,93 @@
import assert from 'assert';
import 'reflect-metadata';
import yargs from 'yargs';
import { hideBin } from 'yargs/helpers';
import debug from 'debug';
import { getCache } from '@vulcanize/cache';
import { EthClient } from '@vulcanize/ipld-eth-client';
import { getConfig, JobQueue } from '@vulcanize/util';
import { Database } from './database';
import { QUEUE_BLOCK_PROCESSING } from './events';
const log = debug('vulcanize:server');
export const main = async (): Promise<any> => {
const argv = await yargs(hideBin(process.argv)).parserConfiguration({
'parse-numbers': false
}).options({
configFile: {
alias: 'f',
type: 'string',
require: true,
demandOption: true,
describe: 'configuration file path (toml)'
},
startBlock: {
type: 'number',
require: true,
demandOption: true,
describe: 'Block number to start processing at'
},
endBlock: {
type: 'number',
require: true,
demandOption: true,
describe: 'Block number to stop processing at'
}
}).argv;
const config = await getConfig(argv.configFile);
assert(config.server, 'Missing server config');
const { upstream, database: dbConfig, jobQueue: jobQueueConfig } = config;
assert(dbConfig, 'Missing database config');
const db = new Database(dbConfig);
await db.init();
assert(upstream, 'Missing upstream config');
const { ethServer: { gqlPostgraphileEndpoint }, cache: cacheConfig } = upstream;
assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint');
const cache = await getCache(cacheConfig);
const ethClient = new EthClient({
gqlEndpoint: gqlPostgraphileEndpoint,
gqlSubscriptionEndpoint: gqlPostgraphileEndpoint,
cache
});
assert(jobQueueConfig, 'Missing job queue config');
const { dbConnectionString, maxCompletionLag } = jobQueueConfig;
assert(dbConnectionString, 'Missing job queue db connection string');
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag });
await jobQueue.start();
for (let blockNumber = argv.startBlock; blockNumber <= argv.endBlock; blockNumber++) {
log(`Fill block ${blockNumber}`);
// TODO: Add pause between requests so as to not overwhelm the upsteam server.
const result = await ethClient.getBlockWithTransactions(blockNumber.toString());
const { allEthHeaderCids: { nodes: blockNodes } } = result;
for (let bi = 0; bi < blockNodes.length; bi++) {
const { blockHash, blockNumber } = blockNodes[bi];
const blockProgress = await db.getBlockProgress(blockHash);
if (blockProgress) {
log(`Block number ${blockNumber}, block hash ${blockHash} already known, skip filling`);
} else {
await jobQueue.pushJob(QUEUE_BLOCK_PROCESSING, { blockHash, blockNumber });
}
}
}
};
main().then(() => {
process.exit();
}).catch(err => {
log(err);
});

View File

@ -37,13 +37,16 @@ export const main = async (): Promise<any> => {
await db.init();
assert(upstream, 'Missing upstream config');
const { gqlEndpoint, gqlSubscriptionEndpoint, cache: cacheConfig } = upstream;
assert(gqlEndpoint, 'Missing upstream gqlEndpoint');
assert(gqlSubscriptionEndpoint, 'Missing upstream gqlSubscriptionEndpoint');
const { ethServer: { gqlApiEndpoint, gqlPostgraphileEndpoint }, cache: cacheConfig } = upstream;
assert(gqlApiEndpoint, 'Missing upstream ethServer.gqlApiEndpoint');
assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint');
const cache = await getCache(cacheConfig);
const ethClient = new EthClient({ gqlEndpoint, gqlSubscriptionEndpoint, cache });
const ethClient = new EthClient({
gqlEndpoint: gqlApiEndpoint,
gqlSubscriptionEndpoint: gqlPostgraphileEndpoint,
cache
});
const indexer = new Indexer(config, db, ethClient);

View File

@ -78,7 +78,7 @@ export const createResolvers = async (indexer: Indexer, eventWatcher: EventWatch
const events = await indexer.getEventsInRange(fromBlockNumber, toBlockNumber);
return events.map(event => indexer.getResultEvent(event));
},
}
}
};
};

View File

@ -46,13 +46,16 @@ export const main = async (): Promise<any> => {
await db.init();
assert(upstream, 'Missing upstream config');
const { gqlEndpoint, gqlSubscriptionEndpoint, cache: cacheConfig } = upstream;
assert(gqlEndpoint, 'Missing upstream gqlEndpoint');
assert(gqlSubscriptionEndpoint, 'Missing upstream gqlSubscriptionEndpoint');
const { ethServer: { gqlApiEndpoint, gqlPostgraphileEndpoint }, cache: cacheConfig } = upstream;
assert(gqlApiEndpoint, 'Missing upstream ethServer.gqlApiEndpoint');
assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint');
const cache = await getCache(cacheConfig);
const ethClient = new EthClient({ gqlEndpoint, gqlSubscriptionEndpoint, cache });
const ethClient = new EthClient({
gqlEndpoint: gqlApiEndpoint,
gqlSubscriptionEndpoint: gqlPostgraphileEndpoint,
cache
});
// Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries

View File

@ -15,10 +15,12 @@ export interface Config {
};
database: ConnectionOptions;
upstream: {
gqlEndpoint: string;
gqlSubscriptionEndpoint: string;
traceProviderEndpoint: string;
cache: CacheConfig,
ethServer: {
gqlApiEndpoint: string;
gqlPostgraphileEndpoint: string;
}
traceProviderEndpoint: string;
uniWatcher: {
gqlEndpoint: string;
gqlSubscriptionEndpoint: string;