Parse batched transactions for mobymask-watcher events (#141)

* Remove using graph-node from mobymask-watcher

* Parse batched transactions for MobyMask events

* Update to use same ethers package version

* Add CLI for processing block out of order

* Fix job-runner for already processed blocks out of order
This commit is contained in:
nikugogoi 2022-07-11 11:29:33 +05:30 committed by GitHub
parent 74f798b5c4
commit 3cee10607e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
35 changed files with 948 additions and 1118 deletions

View File

@ -34,7 +34,7 @@
"@vulcanize/util": "^0.1.0",
"apollo-server-express": "^2.25.0",
"debug": "^4.3.1",
"ethers": "^5.2.0",
"ethers": "^5.4.4",
"express": "^4.17.1",
"graphql": "^15.5.0",
"lodash": "^4.17.21",

View File

@ -22,7 +22,7 @@
"dependencies": {
"canonical-json": "^0.0.4",
"debug": "^4.3.1",
"ethers": "^5.2.0",
"ethers": "^5.4.4",
"fs-extra": "^10.0.0",
"level": "^7.0.0"
},

View File

@ -29,7 +29,7 @@
"homepage": "https://github.com/vulcanize/watcher-ts#readme",
"dependencies": {
"@apollo/client": "^3.3.19",
"@ethersproject/providers": "5.3.0",
"@ethersproject/providers": "^5.4.4",
"@ipld/dag-cbor": "^6.0.12",
"@vulcanize/ipld-eth-client": "^0.1.0",
"@vulcanize/solidity-mapper": "^0.1.0",
@ -38,7 +38,7 @@
"apollo-server-express": "^2.25.0",
"apollo-type-bigint": "^0.1.3",
"debug": "^4.3.1",
"ethers": "^5.2.0",
"ethers": "^5.4.4",
"express": "^4.17.1",
"graphql": "^15.5.0",
"graphql-import-node": "^0.0.4",

View File

@ -29,7 +29,7 @@
"homepage": "https://github.com/vulcanize/watcher-ts#readme",
"dependencies": {
"@apollo/client": "^3.3.19",
"@ethersproject/providers": "5.3.0",
"@ethersproject/providers": "^5.4.4",
"@ipld/dag-cbor": "^6.0.12",
"@vulcanize/ipld-eth-client": "^0.1.0",
"@vulcanize/solidity-mapper": "^0.1.0",
@ -38,7 +38,7 @@
"apollo-server-express": "^2.25.0",
"apollo-type-bigint": "^0.1.3",
"debug": "^4.3.1",
"ethers": "^5.2.0",
"ethers": "^5.4.4",
"express": "^4.17.1",
"graphql": "^15.5.0",
"graphql-import-node": "^0.0.4",

View File

@ -42,7 +42,7 @@
"homepage": "https://github.com/vulcanize/watcher-ts#readme",
"dependencies": {
"@apollo/client": "^3.3.19",
"@ethersproject/providers": "5.3.0",
"@ethersproject/providers": "^5.4.4",
"@types/lodash": "^4.14.168",
"@vulcanize/cache": "^0.1.0",
"@vulcanize/ipld-eth-client": "^0.1.0",
@ -51,7 +51,7 @@
"apollo-server-express": "^2.25.0",
"apollo-type-bigint": "^0.1.3",
"debug": "^4.3.1",
"ethers": "^5.2.0",
"ethers": "^5.4.4",
"express": "^4.17.1",
"graphql": "^15.5.0",
"graphql-import-node": "^0.0.4",

View File

@ -34,7 +34,7 @@
"homepage": "https://github.com/vulcanize/watcher-ts#readme",
"dependencies": {
"@apollo/client": "^3.3.19",
"@ethersproject/providers": "5.3.0",
"@ethersproject/providers": "^5.4.4",
"@ipld/dag-cbor": "^6.0.12",
"@vulcanize/ipld-eth-client": "^0.1.0",
"@vulcanize/solidity-mapper": "^0.1.0",
@ -43,7 +43,7 @@
"apollo-server-express": "^2.25.0",
"apollo-type-bigint": "^0.1.3",
"debug": "^4.3.1",
"ethers": "^5.2.0",
"ethers": "^5.4.4",
"express": "^4.17.1",
"graphql": "^15.5.0",
"graphql-import-node": "^0.0.4",

View File

@ -21,7 +21,7 @@
"eslint-plugin-node": "^11.1.0",
"eslint-plugin-promise": "^5.1.0",
"eslint-plugin-standard": "^5.0.0",
"ethers": "^5.2.0",
"ethers": "^5.4.4",
"hardhat": "^2.3.0",
"mocha": "^8.4.0",
"nodemon": "^2.0.7",

View File

@ -29,7 +29,7 @@
"homepage": "https://github.com/vulcanize/watcher-ts#readme",
"dependencies": {
"@apollo/client": "^3.3.19",
"@ethersproject/providers": "5.3.0",
"@ethersproject/providers": "^5.4.4",
"@ipld/dag-cbor": "^6.0.12",
"@vulcanize/cache": "^0.1.0",
"@vulcanize/graph-node": "^0.1.0",
@ -40,7 +40,7 @@
"apollo-type-bigint": "^0.1.3",
"debug": "^4.3.1",
"decimal.js": "^10.3.1",
"ethers": "^5.2.0",
"ethers": "^5.4.4",
"express": "^4.17.1",
"graphql": "^15.5.0",
"graphql-import-node": "^0.0.4",

View File

@ -23,7 +23,7 @@
"@apollo/client": "^3.3.19",
"@vulcanize/cache": "^0.1.0",
"cross-fetch": "^3.1.4",
"ethers": "^5.2.0",
"ethers": "^5.4.4",
"graphql": "^15.5.0",
"left-pad": "^1.3.0",
"subscriptions-transport-ws": "^0.9.19",

View File

@ -22,7 +22,7 @@
"apollo-server-express": "^2.25.0",
"apollo-type-bigint": "^0.1.3",
"debug": "^4.3.1",
"ethers": "^5.2.0",
"ethers": "^5.4.4",
"express": "^4.17.1",
"json-bigint": "^1.0.0",
"lodash": "^4.17.21",

View File

@ -15,7 +15,8 @@
"checkpoint": "DEBUG=vulcanize:* ts-node src/cli/checkpoint.ts",
"export-state": "DEBUG=vulcanize:* ts-node src/cli/export-state.ts",
"import-state": "DEBUG=vulcanize:* ts-node src/cli/import-state.ts",
"inspect-cid": "DEBUG=vulcanize:* ts-node src/cli/inspect-cid.ts"
"inspect-cid": "DEBUG=vulcanize:* ts-node src/cli/inspect-cid.ts",
"index-block": "DEBUG=vulcanize:* ts-node src/cli/index-block.ts"
},
"repository": {
"type": "git",
@ -29,16 +30,15 @@
"homepage": "https://github.com/vulcanize/watcher-ts#readme",
"dependencies": {
"@apollo/client": "^3.3.19",
"@ethersproject/providers": "5.3.0",
"@ethersproject/providers": "^5.4.4",
"@ipld/dag-cbor": "^6.0.12",
"@vulcanize/ipld-eth-client": "^0.1.0",
"@vulcanize/solidity-mapper": "^0.1.0",
"@vulcanize/util": "^0.1.0",
"@vulcanize/graph-node": "^0.1.0",
"apollo-server-express": "^2.25.0",
"apollo-type-bigint": "^0.1.3",
"debug": "^4.3.1",
"ethers": "^5.2.0",
"ethers": "^5.4.4",
"express": "^4.17.1",
"graphql": "^15.5.0",
"graphql-import-node": "^0.0.4",

View File

@ -2,14 +2,12 @@
// Copyright 2021 Vulcanize, Inc.
//
import path from 'path';
import yargs from 'yargs';
import 'reflect-metadata';
import debug from 'debug';
import assert from 'assert';
import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients, JobQueue } from '@vulcanize/util';
import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node';
import { Database } from '../database';
import { Indexer } from '../indexer';
@ -46,11 +44,6 @@ const main = async (): Promise<void> => {
const db = new Database(config.database);
await db.init();
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*'));
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);
const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config');
@ -60,11 +53,9 @@ const main = async (): Promise<void> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue, graphWatcher);
const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue);
await indexer.init();
graphWatcher.setIndexer(indexer);
const blockHash = await indexer.processCLICheckpoint(argv.address, argv.blockHash);
log(`Created a checkpoint for contract ${argv.address} at block-hash ${blockHash}`);

View File

@ -10,7 +10,6 @@ import fs from 'fs';
import path from 'path';
import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients, JobQueue, StateKind } from '@vulcanize/util';
import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node';
import * as codec from '@ipld/dag-cbor';
import { Database } from '../database';
@ -43,11 +42,6 @@ const main = async (): Promise<void> => {
const db = new Database(config.database);
await db.init();
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*'));
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);
const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config');
@ -57,11 +51,9 @@ const main = async (): Promise<void> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue, graphWatcher);
const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue);
await indexer.init();
graphWatcher.setIndexer(indexer);
const exportData: any = {
snapshotBlock: {},
contracts: [],

View File

@ -12,7 +12,6 @@ import fs from 'fs';
import path from 'path';
import { getConfig, fillBlocks, JobQueue, DEFAULT_CONFIG_PATH, Config, initClients, StateKind } from '@vulcanize/util';
import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node';
import * as codec from '@ipld/dag-cbor';
import { Database } from '../database';
@ -47,11 +46,6 @@ export const main = async (): Promise<any> => {
const db = new Database(config.database);
await db.init();
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*'));
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);
// Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
const pubsub = new PubSub();
@ -65,11 +59,9 @@ export const main = async (): Promise<any> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue, graphWatcher);
const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue);
await indexer.init();
graphWatcher.setIndexer(indexer);
const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue);
// Import data.

View File

@ -0,0 +1,178 @@
//
// Copyright 2022 Vulcanize, Inc.
//
import yargs from 'yargs';
import 'reflect-metadata';
import debug from 'debug';
import assert from 'assert';
import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients, JobQueue, OrderDirection, UNKNOWN_EVENT_NAME } from '@vulcanize/util';
import { Database } from '../database';
import { Indexer } from '../indexer';
import { BlockProgress } from '../entity/BlockProgress';
import { Event } from '../entity/Event';
// Batch size used when fetching events if jobQueue.eventsInBatch is not configured.
const DEFAULT_EVENTS_IN_BATCH = 50;

// Debug logger. NOTE(review): namespace 'vulcanize:watch-contract' appears
// copy-pasted from the watch-contract CLI; an 'index-block' namespace would be
// clearer — confirm before changing, as DEBUG filters may rely on it.
const log = debug('vulcanize:watch-contract');
/**
 * CLI to index a single block (by block number) out of order, bypassing the
 * regular fill / job-runner pipeline.
 *
 * Flow: load block(s) at the given height from the database, or fetch them
 * from upstream if absent; run block processing; then process the block's
 * events in batches until the block is marked complete.
 */
const main = async (): Promise<void> => {
  const argv = await yargs.parserConfiguration({
    // Keep numeric-looking CLI values as strings; 'block' is still coerced via type: 'number'.
    'parse-numbers': false
  }).options({
    configFile: {
      alias: 'f',
      type: 'string',
      require: true,
      demandOption: true,
      describe: 'Configuration file path (toml)',
      default: DEFAULT_CONFIG_PATH
    },
    block: {
      type: 'number',
      require: true,
      demandOption: true,
      describe: 'Block number to index'
    }
  }).argv;

  const config: Config = await getConfig(argv.configFile);
  const { ethClient, ethProvider } = await initClients(config);

  const db = new Database(config.database);
  await db.init();

  const jobQueueConfig = config.jobQueue;
  assert(jobQueueConfig, 'Missing job queue config');

  const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
  assert(dbConnectionString, 'Missing job queue db connection string');

  // The job queue is constructed but never started here — it is only needed
  // as a constructor dependency of Indexer.
  const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });

  const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue);
  await indexer.init();

  // Prefer block entries already present in the database at this height.
  let blockProgressEntities: Partial<BlockProgress>[] = await indexer.getBlocksAtHeight(argv.block, false);

  if (!blockProgressEntities.length) {
    // Not in the database yet — fetch block(s) at this height from upstream.
    console.time('time:index-block#getBlocks-ipld-eth-server');
    const blocks = await indexer.getBlocks({ blockNumber: argv.block });

    blockProgressEntities = blocks.map((block: any): Partial<BlockProgress> => {
      // Map the upstream 'timestamp' field onto the BlockProgress property name.
      block.blockTimestamp = block.timestamp;

      return block;
    });
    console.timeEnd('time:index-block#getBlocks-ipld-eth-server');
  }

  assert(blockProgressEntities.length, `No blocks fetched for block number ${argv.block}.`);

  // Multiple blocks can exist at one height (forks); handle each.
  for (let blockProgress of blockProgressEntities) {
    // Check if blockProgress fetched from database.
    if (!blockProgress.id) {
      // No id — this came from upstream; fetch & persist its events to get a saved entity.
      blockProgress = await indexer.fetchBlockEvents(blockProgress);
    }

    assert(blockProgress instanceof BlockProgress);
    assert(indexer.processBlock);
    await indexer.processBlock(blockProgress.blockHash, blockProgress.blockNumber);

    // Check if block has unprocessed events.
    if (blockProgress.numProcessedEvents < blockProgress.numEvents) {
      // NOTE(review): termination of this loop relies on
      // indexer.updateBlockProgress (invoked inside processEvent) mutating
      // this blockProgress entity's isComplete / lastProcessedEventIndex in
      // place — confirm; otherwise this loops forever refetching the same batch.
      while (!blockProgress.isComplete) {
        console.time('time:index-block#fetching_events_batch');

        // Fetch events in batches
        const events = await indexer.getBlockEvents(
          blockProgress.blockHash,
          {
            index: [
              { value: blockProgress.lastProcessedEventIndex + 1, operator: 'gte', not: false }
            ]
          },
          {
            limit: jobQueueConfig.eventsInBatch || DEFAULT_EVENTS_IN_BATCH,
            orderBy: 'index',
            orderDirection: OrderDirection.asc
          }
        );

        console.timeEnd('time:index-block#fetching_events_batch');

        if (events.length) {
          log(`Processing events batch from index ${events[0].index} to ${events[0].index + events.length - 1}`);
        }

        console.time('time:index-block#processEvents-processing_events_batch');

        for (const event of events) {
          // Process events in loop
          await processEvent(indexer, blockProgress, event);
        }

        console.timeEnd('time:index-block#processEvents-processing_events_batch');
      }
    }
  }

  await db.close();
};
// Run the CLI: log any failure, and always exit explicitly (open DB/queue
// handles would otherwise keep the process alive).
main()
  .catch((err) => log(err))
  .finally(() => process.exit(0));
/**
 * Process individual event from database.
 *
 * Enforces strict in-order processing within the block, lazily parses events
 * whose contract was only discovered earlier in the same block, and advances
 * the block's progress afterwards.
 *
 * @param indexer
 * @param block
 * @param event
 */
const processEvent = async (indexer: Indexer, block: BlockProgress, event: Event) => {
  const eventIndex = event.index;

  // Guard: every event except the first in the block must immediately follow
  // the last processed one; abort on any gap or reordering.
  if (eventIndex > 0) {
    const prevIndex = eventIndex - 1;

    if (prevIndex !== block.lastProcessedEventIndex) {
      throw new Error(`Events received out of order for block number ${block.blockNumber} hash ${block.blockHash},` +
        ` prev event index ${prevIndex}, got event index ${event.index} and lastProcessedEventIndex ${block.lastProcessedEventIndex}, aborting`);
    }
  }

  // If the indexer exposes no isWatchedContract check, treat every contract as watched.
  const watchedContract = !indexer.isWatchedContract
    ? true
    : await indexer.isWatchedContract(event.contract);

  if (watchedContract) {
    // We might not have parsed this event yet. This can happen if the contract was added
    // as a result of a previous event in the same block.
    if (event.eventName === UNKNOWN_EVENT_NAME) {
      const logObj = JSON.parse(event.extraInfo);

      assert(indexer.parseEventNameAndArgs);
      assert(typeof watchedContract !== 'boolean');
      const { eventName, eventInfo } = indexer.parseEventNameAndArgs(watchedContract.kind, logObj);

      event.eventName = eventName;
      event.eventInfo = JSON.stringify(eventInfo);
      event = await indexer.saveEventEntity(event);
    }

    await indexer.processEvent(event);
  }

  // Record progress; the returned (updated) entity is not needed by this caller.
  await indexer.updateBlockProgress(block, event.index);
};

View File

@ -2,7 +2,6 @@
// Copyright 2021 Vulcanize, Inc.
//
import path from 'path';
import assert from 'assert';
import yargs from 'yargs';
import 'reflect-metadata';
@ -10,7 +9,6 @@ import debug from 'debug';
import util from 'util';
import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients, JobQueue } from '@vulcanize/util';
import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node';
import { Database } from '../database';
import { Indexer } from '../indexer';
@ -43,11 +41,6 @@ const main = async (): Promise<void> => {
const db = new Database(config.database);
await db.init();
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*'));
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);
const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config');
@ -57,11 +50,9 @@ const main = async (): Promise<void> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue, graphWatcher);
const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue);
await indexer.init();
graphWatcher.setIndexer(indexer);
const ipldBlock = await indexer.getIPLDBlockByCid(argv.cid);
assert(ipldBlock, 'IPLDBlock for the provided CID doesn\'t exist.');

View File

@ -2,13 +2,11 @@
// Copyright 2021 Vulcanize, Inc.
//
import path from 'path';
import debug from 'debug';
import { MoreThan } from 'typeorm';
import assert from 'assert';
import { getConfig, initClients, resetJobs, JobQueue } from '@vulcanize/util';
import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node';
import { Database } from '../../database';
import { Indexer } from '../../indexer';
@ -42,11 +40,6 @@ export const handler = async (argv: any): Promise<void> => {
const db = new Database(config.database);
await db.init();
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*'));
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);
const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config');
@ -56,11 +49,9 @@ export const handler = async (argv: any): Promise<void> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue, graphWatcher);
const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue);
await indexer.init();
graphWatcher.setIndexer(indexer);
const blockProgresses = await indexer.getBlocksAtHeight(argv.blockNumber, false);
assert(blockProgresses.length, `No blocks at specified block number ${argv.blockNumber}`);
assert(!blockProgresses.some(block => !block.isComplete), `Incomplete block at block number ${argv.blockNumber} with unprocessed events`);

View File

@ -2,14 +2,12 @@
// Copyright 2021 Vulcanize, Inc.
//
import path from 'path';
import yargs from 'yargs';
import 'reflect-metadata';
import debug from 'debug';
import assert from 'assert';
import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients, JobQueue } from '@vulcanize/util';
import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node';
import { Database } from '../database';
import { Indexer } from '../indexer';
@ -59,11 +57,6 @@ const main = async (): Promise<void> => {
const db = new Database(config.database);
await db.init();
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*'));
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);
const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config');
@ -73,11 +66,9 @@ const main = async (): Promise<void> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue, graphWatcher);
const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue);
await indexer.init();
graphWatcher.setIndexer(indexer);
await indexer.watchContract(argv.address, argv.kind, argv.checkpoint, argv.startingBlock);
await db.close();

View File

@ -2,7 +2,6 @@
// Copyright 2021 Vulcanize, Inc.
//
import path from 'path';
import assert from 'assert';
import 'reflect-metadata';
import yargs from 'yargs';
@ -11,7 +10,6 @@ import debug from 'debug';
import { PubSub } from 'apollo-server-express';
import { Config, getConfig, fillBlocks, JobQueue, DEFAULT_CONFIG_PATH, initClients } from '@vulcanize/util';
import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node';
import { Database } from './database';
import { Indexer } from './indexer';
@ -58,11 +56,6 @@ export const main = async (): Promise<any> => {
const db = new Database(config.database);
await db.init();
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*'));
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);
const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config');
@ -72,11 +65,9 @@ export const main = async (): Promise<any> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue, graphWatcher);
const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue);
await indexer.init();
graphWatcher.setIndexer(indexer);
// Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
const pubsub = new PubSub();

View File

@ -9,6 +9,10 @@ import { utils } from 'ethers';
import { Indexer, KIND_PHISHERREGISTRY, ResultEvent } from './indexer';
const INVOKE_SIGNATURE = 'invoke(((((address,uint256,bytes),((address,bytes32,(address,bytes)[]),bytes)[])[],(uint256,uint256)),bytes)[])';
const CLAIM_IF_MEMBER_SIGNATURE = 'claimIfMember(string,bool)';
const CLAIM_IF_PHISHER_SIGNATURE = 'claimIfPhisher(string,bool)';
/**
* Hook function to store an initial state.
* @param indexer Indexer instance.
@ -76,34 +80,50 @@ export async function handleEvent (indexer: Indexer, eventData: ResultEvent): Pr
assert(indexer);
assert(eventData);
// Perform indexing based on the type of event.
switch (eventData.event.__typename) {
// In case of PhisherRegistry 'PhisherStatusUpdated' event.
case 'PhisherStatusUpdatedEvent': {
const txArgs = await getTxArgs(indexer, KIND_PHISHERREGISTRY, eventData.tx.hash);
// Perform indexing for PhisherStatusUpdated and MemberStatusUpdated.
if (['PhisherStatusUpdatedEvent', 'MemberStatusUpdatedEvent'].includes(eventData.event.__typename)) {
const txData = await indexer.getFullTransaction(eventData.tx.hash);
const tx = getTx(indexer, KIND_PHISHERREGISTRY, txData.input);
let txs = [tx];
// Update isPhisher entry for the identifier in database.
await indexer.isPhisher(eventData.block.hash, eventData.contract, txArgs.identifier, true);
if (tx.signature === INVOKE_SIGNATURE) {
// Parse transactions from batches if it is an invoke method in Delegatable contract.
txs = tx.args.signedInvocations
.reduce((txs: utils.TransactionDescription[], signedInvocation: any) => {
// Get transactions from signed invocations batch.
const batchTxs = signedInvocation.invocations.batch.map((invocation: any) => {
return getTx(indexer, KIND_PHISHERREGISTRY, invocation.transaction.data);
});
break;
txs.push(...batchTxs);
return txs;
}, []);
}
// In case of PhisherRegistry 'MemberStatusUpdated' event.
case 'MemberStatusUpdatedEvent': {
const txArgs = await getTxArgs(indexer, KIND_PHISHERREGISTRY, eventData.tx.hash);
// Update isPhisher entry for the identifier in database.
await indexer.isMember(eventData.block.hash, eventData.contract, txArgs.identifier, true);
// Filter transactions for claimIfMember and claimIfPhisher methods.
txs = txs.filter((tx: utils.TransactionDescription) => {
return [CLAIM_IF_MEMBER_SIGNATURE, CLAIM_IF_PHISHER_SIGNATURE].includes(tx.signature);
});
break;
for (const tx of txs) {
switch (tx.signature) {
case CLAIM_IF_MEMBER_SIGNATURE:
// Update isMember entry for the identifier in database.
await indexer.isMember(eventData.block.hash, eventData.contract, tx.args.identifier, true);
break;
case CLAIM_IF_PHISHER_SIGNATURE:
// Update isPhisher entry for the identifier in database.
await indexer.isPhisher(eventData.block.hash, eventData.contract, tx.args.identifier, true);
break;
}
}
}
}
// Get transaction arguments for specified txHash.
const getTxArgs = async (indexer: Indexer, contractKind: string, txHash: string): Promise<utils.Result> => {
const tx = await indexer.getFullTransaction(txHash);
const contractInterface = await indexer.getContractInterface(contractKind);
// Get transaction details from input data.
const getTx = (indexer: Indexer, contractKind: string, data: string): utils.TransactionDescription => {
const contractInterface = indexer.getContractInterface(contractKind);
assert(contractInterface);
const txDescription = contractInterface.parseTransaction({ data: tx.input });
return txDescription.args;
return contractInterface.parseTransaction({ data });
};

View File

@ -30,7 +30,6 @@ import {
IpldStatus as IpldStatusInterface,
getFullTransaction
} from '@vulcanize/util';
import { GraphWatcher } from '@vulcanize/graph-node';
import PhisherRegistryArtifacts from './artifacts/PhisherRegistry.json';
import { Database } from './database';
@ -95,7 +94,6 @@ export class Indexer implements IPLDIndexerInterface {
_ethProvider: BaseProvider
_baseIndexer: BaseIndexer
_serverConfig: ServerConfig
_graphWatcher: GraphWatcher;
_abiMap: Map<string, JsonFragment[]>
_storageLayoutMap: Map<string, StorageLayout>
@ -106,7 +104,7 @@ export class Indexer implements IPLDIndexerInterface {
_entityTypesMap: Map<string, { [key: string]: string }>
_relationsMap: Map<any, { [key: string]: any }>
constructor (serverConfig: ServerConfig, db: Database, ethClient: EthClient, ethProvider: BaseProvider, jobQueue: JobQueue, graphWatcher: GraphWatcher) {
constructor (serverConfig: ServerConfig, db: Database, ethClient: EthClient, ethProvider: BaseProvider, jobQueue: JobQueue) {
assert(db);
assert(ethClient);
@ -116,7 +114,6 @@ export class Indexer implements IPLDIndexerInterface {
this._serverConfig = serverConfig;
this._ipfsClient = new IPFSClient(this._serverConfig.ipfsApiAddr);
this._baseIndexer = new BaseIndexer(this._serverConfig, this._db, this._ethClient, this._ethProvider, jobQueue, this._ipfsClient);
this._graphWatcher = graphWatcher;
this._abiMap = new Map();
this._storageLayoutMap = new Map();
@ -514,20 +511,9 @@ export class Indexer implements IPLDIndexerInterface {
await this._baseIndexer.removeIPLDBlocks(blockNumber, kind);
}
async getSubgraphEntity<Entity> (entity: new () => Entity, id: string, block?: BlockHeight): Promise<any> {
const relations = this._relationsMap.get(entity) || {};
const data = await this._graphWatcher.getEntity(entity, id, relations, block);
return data;
}
async triggerIndexingOnEvent (event: Event): Promise<void> {
const resultEvent = this.getResultEvent(event);
// Call subgraph handler for event.
await this._graphWatcher.handleEvent(resultEvent);
// Call custom hook function for indexing on event.
await handleEvent(this, resultEvent);
}
@ -540,9 +526,6 @@ export class Indexer implements IPLDIndexerInterface {
async processBlock (blockHash: string, blockNumber: number): Promise<void> {
// Call a function to create initial state for contracts.
await this._baseIndexer.createInit(this, blockHash, blockNumber);
// Call subgraph handler for block.
await this._graphWatcher.handleBlock(blockHash);
}
parseEventNameAndArgs (kind: string, logObj: any): any {
@ -804,7 +787,7 @@ export class Indexer implements IPLDIndexerInterface {
}
// Get contract interface for specified contract kind.
async getContractInterface (kind: string): Promise<ethers.utils.Interface | undefined> {
getContractInterface (kind: string): ethers.utils.Interface | undefined {
return this._contractMap.get(kind);
}

View File

@ -2,7 +2,6 @@
// Copyright 2021 Vulcanize, Inc.
//
import path from 'path';
import assert from 'assert';
import 'reflect-metadata';
import yargs from 'yargs';
@ -24,7 +23,6 @@ import {
DEFAULT_CONFIG_PATH,
initClients
} from '@vulcanize/util';
import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node';
import { Indexer } from './indexer';
import { Database } from './database';
@ -253,11 +251,6 @@ export const main = async (): Promise<any> => {
const db = new Database(config.database);
await db.init();
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*'));
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);
const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config');
@ -267,11 +260,9 @@ export const main = async (): Promise<any> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue, graphWatcher);
const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue);
await indexer.init();
graphWatcher.setIndexer(indexer);
const jobRunner = new JobRunner(jobQueueConfig, indexer, jobQueue);
await jobRunner.start();
};

View File

@ -15,7 +15,6 @@ import 'graphql-import-node';
import { createServer } from 'http';
import { DEFAULT_CONFIG_PATH, getConfig, Config, JobQueue, KIND_ACTIVE, initClients } from '@vulcanize/util';
import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node';
import { createResolvers } from './resolvers';
import { Indexer } from './indexer';
@ -43,11 +42,6 @@ export const main = async (): Promise<any> => {
const db = new Database(config.database);
await db.init();
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*'));
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);
// Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
const pubsub = new PubSub();
@ -60,11 +54,9 @@ export const main = async (): Promise<any> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue, graphWatcher);
const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue);
await indexer.init();
graphWatcher.setIndexer(indexer);
const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue);
if (watcherKind === KIND_ACTIVE) {

View File

@ -5,7 +5,6 @@
"license": "AGPL-3.0",
"devDependencies": {
"@ethersproject/abi": "^5.3.0",
"@ethersproject/contracts": "^5.3.0",
"@nomiclabs/hardhat-ethers": "^2.0.2",
"@nomiclabs/hardhat-waffle": "^2.0.1",
"@types/chai": "^4.2.18",
@ -21,8 +20,8 @@
"eslint-plugin-node": "^11.1.0",
"eslint-plugin-promise": "^5.1.0",
"eslint-plugin-standard": "^5.0.0",
"ethereum-waffle": "^3.3.0",
"ethers": "^5.2.0",
"ethereum-waffle": "^3.1.1",
"ethers": "^5.4.4",
"hardhat": "^2.3.0",
"typescript": "^4.3.2",
"lodash": "^4.17.21"

View File

@ -3,11 +3,10 @@
//
/* eslint-disable @typescript-eslint/no-explicit-any */
import { Contract } from '@ethersproject/contracts';
import { expect } from 'chai';
import '@nomiclabs/hardhat-ethers';
import { ethers } from 'hardhat';
import { ContractTransaction } from 'ethers';
import { ContractTransaction, Contract } from 'ethers';
import { EthClient } from '@vulcanize/ipld-eth-client';

View File

@ -4,7 +4,7 @@
/* eslint-disable no-unused-expressions */
/* eslint-disable @typescript-eslint/no-explicit-any */
import { ContractInterface } from '@ethersproject/contracts';
import { ContractInterface } from 'ethers';
import '@nomiclabs/hardhat-ethers';
import { artifacts, ethers } from 'hardhat';
import { CompilerOutput, CompilerOutputBytecode } from 'hardhat/types';

View File

@ -20,7 +20,7 @@
},
"homepage": "https://github.com/vulcanize/watcher-ts#readme",
"dependencies": {
"ethers": "^5.3.1",
"ethers": "^5.4.4",
"yargs": "^17.0.1"
},
"devDependencies": {

View File

@ -61,7 +61,7 @@
"eslint-plugin-node": "^11.1.0",
"eslint-plugin-promise": "^5.1.0",
"eslint-plugin-standard": "^5.0.0",
"ethers": "^5.2.0",
"ethers": "^5.4.4",
"get-graphql-schema": "^2.1.2",
"graphql-schema-linter": "^2.0.1",
"lodash": "^4.17.21",

View File

@ -46,7 +46,7 @@
"apollo-server-express": "^2.25.0",
"apollo-type-bigint": "^0.1.3",
"debug": "^4.3.1",
"ethers": "^5.2.0",
"ethers": "^5.4.4",
"express": "^4.17.1",
"graphql": "^15.5.0",
"graphql-import-node": "^0.0.4",

View File

@ -7,7 +7,7 @@
"@vulcanize/solidity-mapper": "^0.1.0",
"debug": "^4.3.1",
"decimal.js": "^10.3.1",
"ethers": "^5.2.0",
"ethers": "^5.4.4",
"fs-extra": "^10.0.0",
"lodash": "^4.17.21",
"multiformats": "^9.4.8",

View File

@ -55,19 +55,3 @@ export function decodeHeader (rlp : Uint8Array): any {
export function decodeData (hexLiteral: string): Uint8Array {
return Uint8Array.from(Buffer.from(hexLiteral.slice(2), 'hex'));
}
export function decodeTransaction (rlp : Uint8Array): any {
try {
const data = utils.RLP.decode(rlp);
return {
GasPrice: decodeInteger(data[1], BigInt(0)),
GasLimit: decodeInteger(data[2], BigInt(0)),
Amount: decodeInteger(data[4], BigInt(0)),
Data: data[5]
};
} catch (error: any) {
log(error);
return undefined;
}
}

View File

@ -154,12 +154,6 @@ export class EventWatcher {
if (syncStatus && syncStatus.latestIndexedBlockNumber > (syncStatus.latestCanonicalBlockNumber + MAX_REORG_DEPTH)) {
await createPruningJob(this._jobQueue, syncStatus.latestCanonicalBlockNumber, priority);
}
// Publish block progress event if no events exist.
// Event for blocks with events will be published from eventProcessingCompleteHandler.
if (blockProgress.numEvents === 0) {
await this.publishBlockProgressToSubscribers(blockProgress);
}
} else {
log(`block not indexed for ${blockHash} ${blockNumber}`);
}

View File

@ -229,10 +229,9 @@ export class JobRunner {
await this._indexer.processBlock(blockHash, blockNumber);
}
// Check if block has unprocessed events.
if (blockProgress.numProcessedEvents < blockProgress.numEvents) {
await this._jobQueue.pushJob(QUEUE_EVENT_PROCESSING, { kind: JOB_KIND_EVENTS, blockHash: blockProgress.blockHash, publish: true });
}
// Push job to event processing queue.
// Block with all events processed or no events will not be processed again due to check in _processEvents.
await this._jobQueue.pushJob(QUEUE_EVENT_PROCESSING, { kind: JOB_KIND_EVENTS, blockHash: blockProgress.blockHash, publish: true });
const indexBlockDuration = new Date().getTime() - indexBlockStartTime.getTime();
log(`time:job-runner#_indexBlock: ${indexBlockDuration}ms`);

View File

@ -221,17 +221,17 @@ export const getFullTransaction = async (ethClient: EthClient, txHash: string):
assert(fullTx.blockByMhKey);
// Decode the transaction data.
const extraData = EthDecoder.decodeTransaction(EthDecoder.decodeData(fullTx.blockByMhKey.data));
assert(extraData);
const txData = utils.parseTransaction(EthDecoder.decodeData(fullTx.blockByMhKey.data));
assert(txData);
return {
hash: txHash,
from: fullTx.src,
to: fullTx.dst,
index: fullTx.index,
value: extraData.Amount.toString(),
gasLimit: extraData.GasLimit.toString(),
gasPrice: extraData.GasPrice.toString(),
input: extraData.Data
value: txData.value.toString(),
gasLimit: txData.gasLimit.toString(),
gasPrice: txData.gasPrice?.toString(),
input: txData.data
};
};

1614
yarn.lock

File diff suppressed because it is too large Load Diff