Fill task queue for block range. (#90)

Ashwin Phatak 2021-06-25 17:56:58 +05:30 committed by GitHub
parent 07805b6ae9
commit e9880693ad
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 103 additions and 3 deletions

package.json View File

@@ -6,6 +6,7 @@
   "scripts": {
     "server": "DEBUG=vulcanize:* nodemon src/server.ts -f environments/local.toml",
     "job-runner": "DEBUG=vulcanize:* nodemon src/job-runner.ts -f environments/local.toml",
+    "fill": "DEBUG=vulcanize:* ts-node src/fill.ts -f environments/local.toml",
     "test": "mocha -r ts-node/register src/**/*.spec.ts",
     "lint": "eslint .",
     "build": "tsc"

src/fill.ts View File

@@ -0,0 +1,96 @@
import assert from 'assert';
import 'reflect-metadata';
import yargs from 'yargs';
import { hideBin } from 'yargs/helpers';
import debug from 'debug';

import { getCache } from '@vulcanize/cache';
import { EthClient } from '@vulcanize/ipld-eth-client';

import { Database } from './database';
import { getConfig } from './config';
import { JobQueue } from './job-queue';
import { QUEUE_TX_TRACING } from './tx-watcher';

const log = debug('vulcanize:server');

export const main = async (): Promise<any> => {
  const argv = await yargs(hideBin(process.argv)).parserConfiguration({
    'parse-numbers': false
  }).options({
    configFile: {
      alias: 'f',
      type: 'string',
      require: true,
      demandOption: true,
      describe: 'configuration file path (toml)'
    },
    startBlock: {
      type: 'number',
      require: true,
      demandOption: true,
      describe: 'Block number to start processing at'
    },
    endBlock: {
      type: 'number',
      require: true,
      demandOption: true,
      describe: 'Block number to stop processing at'
    }
  }).argv;

  const config = await getConfig(argv.configFile);

  assert(config.server, 'Missing server config');

  const { upstream, database: dbConfig, jobQueue: jobQueueConfig } = config;

  assert(dbConfig, 'Missing database config');

  const db = new Database(dbConfig);
  await db.init();

  assert(upstream, 'Missing upstream config');
  const { gqlEndpoint, gqlSubscriptionEndpoint, traceProviderEndpoint, cache: cacheConfig } = upstream;
  assert(gqlEndpoint, 'Missing upstream gqlEndpoint');
  assert(gqlSubscriptionEndpoint, 'Missing upstream gqlSubscriptionEndpoint');
  assert(traceProviderEndpoint, 'Missing upstream traceProviderEndpoint');

  const cache = await getCache(cacheConfig);
  const ethClient = new EthClient({ gqlEndpoint, gqlSubscriptionEndpoint, cache });

  assert(jobQueueConfig, 'Missing job queue config');

  const { dbConnectionString, maxCompletionLag } = jobQueueConfig;
  assert(dbConnectionString, 'Missing job queue db connection string');

  const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag });
  await jobQueue.start();

  for (let blockNumber = argv.startBlock; blockNumber <= argv.endBlock; blockNumber++) {
    log(`Fill block ${blockNumber}`);

    // TODO: Add a pause between requests so as not to overwhelm the upstream server.
    const result = await ethClient.getBlockWithTransactions(blockNumber.toString());
    const { allEthHeaderCids: { nodes: blockNodes } } = result;
    for (let bi = 0; bi < blockNodes.length; bi++) {
      const { blockHash, ethTransactionCidsByHeaderId: { nodes: txNodes } } = blockNodes[bi];
      for (let ti = 0; ti < txNodes.length; ti++) {
        const { txHash } = txNodes[ti];
        log(`Filling block number ${blockNumber}, block hash ${blockHash}, tx hash ${txHash}`);

        // Never push appearances from fill jobs to GQL subscribers, as this command can be run
        // multiple times for the same block range, and/or process the same block in multiple
        // runs spread over a period of time. Also, the txs are probably too old for publishing anyway.
        await jobQueue.pushJob(QUEUE_TX_TRACING, { txHash, publish: false });
      }
    }
  }
};

main().then(() => {
  process.exit();
}).catch(err => {
  log(err);
});
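The TODO above about pausing between upstream requests could be met with a small promise-based delay. A minimal sketch, not code from this commit; the helper name and the 100 ms figure are assumptions:

// Hypothetical throttle helper; not part of this commit.
const delay = (ms: number): Promise<void> =>
  new Promise<void>(resolve => setTimeout(resolve, ms));

// Inside the block loop, before each ethClient.getBlockWithTransactions call:
//   await delay(100); // Tune to the upstream server's capacity.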

src/job-queue.ts View File

@@ -38,7 +38,8 @@ export class JobQueue {
   async onComplete (queue: string, callback: JobCallback): Promise<string> {
     return await this._boss.onComplete(queue, async (job: any) => {
-      log(`Job complete for queue ${queue} job ${job.id}...`);
+      const { id, data: { failed, createdOn } } = job;
+      log(`Job onComplete for queue ${queue} job ${id} created ${createdOn} success ${!failed}`);
       await callback(job);
     });
   }
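For orientation, the fields destructured here and in TxWatcher imply a completion-job payload roughly like the following. This interface is inferred from usage; it is not a type defined in this commit, and the name is an assumption:

// Inferred shape of a tracing completion job; names are assumptions.
interface TracingCompletionJob {
  id: string;
  data: {
    request: { data: { txHash: string; publish?: boolean } };
    failed: boolean;
    state: string;      // e.g. 'completed'
    createdOn: string;  // parsed with Date.parse() in TxWatcher
  };
}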

src/tx-watcher.ts View File

@@ -39,7 +39,8 @@ export class TxWatcher {
     this._jobQueue.onComplete(QUEUE_TX_TRACING, async (job) => {
       const { data: { request, failed, state, createdOn } } = job;
       const timeElapsedInSeconds = (Date.now() - Date.parse(createdOn)) / 1000;
-      if (!failed && state === 'completed') {
+      log(`Job onComplete tx ${request.data.txHash} publish ${!!request.data.publish}`);
+      if (!failed && state === 'completed' && request.data.publish) {
         // Check for max acceptable lag time between tracing request and sending results to live subscribers.
         if (timeElapsedInSeconds <= this._jobQueue.maxCompletionLag) {
           return await this.publishAddressEventToSubscribers(request.data.txHash, timeElapsedInSeconds);
@@ -52,7 +53,7 @@ export class TxWatcher {
     this._watchTxSubscription = await this._ethClient.watchTransactions(async (value) => {
       const { txHash, ethHeaderCidByHeaderId: { blockHash, blockNumber } } = _.get(value, 'data.listen.relatedNode');
       log('watchTransaction', JSON.stringify({ txHash, blockHash, blockNumber }, null, 2));
-      this._jobQueue.pushJob(QUEUE_TX_TRACING, { txHash });
+      await this._jobQueue.pushJob(QUEUE_TX_TRACING, { txHash, publish: true });
     });
   }
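Taken together with fill.ts, the publish flag now separates live transactions (pushed here with publish: true) from backfilled ones (pushed by the fill command with publish: false); subscribers are notified only when the flag is set, the job succeeded, and it completed within maxCompletionLag. A condensed sketch of that gate, with an illustrative function name not found in the commit:

// Hypothetical consolidation of the onComplete publish gate.
const shouldNotifySubscribers = (
  failed: boolean,
  state: string,
  publish: boolean,
  elapsedSeconds: number,
  maxCompletionLag: number
): boolean =>
  !failed && state === 'completed' && publish && elapsedSeconds <= maxCompletionLag;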

View File

@@ -31,6 +31,7 @@ query allEthHeaderCids($blockNumber: BigInt) {
       cid
       blockNumber
       blockHash
+      timestamp
       ethTransactionCidsByHeaderId {
         nodes {
           cid
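This appears to be the query behind EthClient.getBlockWithTransactions as called from fill.ts: the BigInt $blockNumber variable matches the loop passing blockNumber.toString(), and the nesting mirrors the destructuring there. An inferred, not source-defined, sketch of the response shape fill.ts consumes (txHash is presumably selected further down the query):

// Assumed response shape; an inference from the query excerpt and fill.ts.
type BlockWithTransactions = {
  allEthHeaderCids: {
    nodes: Array<{
      blockHash: string;
      ethTransactionCidsByHeaderId: {
        nodes: Array<{ txHash: string }>;
      };
    }>;
  };
};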