Watch tx to trigger indexing. (#80)

This commit is contained in:
Ashwin Phatak 2021-06-21 18:55:13 +05:30 committed by GitHub
parent eea69fe4d4
commit 38a189d74a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 134 additions and 24 deletions

View File

@ -10,7 +10,7 @@
username = "postgres"
password = "postgres"
synchronize = true
logging = true
logging = false
entities = [ "src/entity/**/*.ts" ]
migrations = [ "src/migration/**/*.ts" ]
@ -24,7 +24,7 @@
[upstream]
gqlEndpoint = "http://127.0.0.1:8083/graphql"
gqlSubscriptionEndpoint = "http://127.0.0.1:5000/graphql"
traceProviderEndpoint = "http://127.0.0.1:9545"
traceProviderEndpoint = "http://127.0.0.1:8545"
[upstream.cache]
name = "requests"

View File

@ -63,7 +63,7 @@ export class Indexer {
return true;
}
async traceTx (txHash: string): Promise<any> {
async traceTxAndIndexAppearances (txHash: string): Promise<any> {
let entity = await this._db.getTrace(txHash);
if (entity) {
log('traceTx: db hit');
@ -100,8 +100,10 @@ export class Indexer {
const addresses = _.uniq(_.compact(_.flattenDeep(addressesIn(traceObj)))).sort();
trace.accounts = _.map(addresses, address => {
assert(address);
const account = new Account();
account.address = address || '';
account.address = ethers.utils.getAddress(address);
account.startingBlock = trace.blockNumber;
return account;

View File

@ -41,7 +41,7 @@ export const createResolvers = async (indexer: Indexer): Promise<any> => {
traceTx: async (_: any, { txHash }: { txHash: string }): Promise<any> => {
log('traceTx', txHash);
return indexer.traceTx(txHash);
return indexer.traceTxAndIndexAppearances(txHash);
}
}
};

View File

@ -18,6 +18,7 @@ import { createResolvers } from './resolvers';
import { Indexer } from './indexer';
import { Database } from './database';
import { getConfig } from './config';
import { TxWatcher } from './tx-watcher';
const log = debug('vulcanize:server');
@ -61,6 +62,9 @@ export const main = async (): Promise<any> => {
const pubsub = new PubSub();
const indexer = new Indexer(db, ethClient, pubsub, tracingClient);
const txWatcher = new TxWatcher(ethClient, indexer);
await txWatcher.start();
const resolvers = await createResolvers(indexer);
const app: Application = express();

View File

@ -0,0 +1,42 @@
import assert from 'assert';
import debug from 'debug';
import _ from 'lodash';
import { EthClient } from '@vulcanize/ipld-eth-client';
import { Indexer } from './indexer';
const log = debug('vulcanize:tx-watcher');
export class TxWatcher {
_ethClient: EthClient
_indexer: Indexer
_subscription: ZenObservable.Subscription | undefined
constructor (ethClient: EthClient, indexer: Indexer) {
assert(ethClient);
assert(indexer);
this._ethClient = ethClient;
this._indexer = indexer;
}
async start (): Promise<void> {
assert(!this._subscription, 'subscription already started');
log('Started watching upstream tx...');
this._subscription = await this._ethClient.watchTransactions(async (value) => {
const { txHash, ethHeaderCidByHeaderId: { blockHash, blockNumber } } = _.get(value, 'data.listen.relatedNode');
log('watchTransaction', JSON.stringify({ txHash, blockHash, blockNumber }, null, 2));
await this._indexer.traceTxAndIndexAppearances(txHash);
});
}
async stop (): Promise<void> {
if (this._subscription) {
log('Stopped watching upstream tx');
this._subscription.unsubscribe();
}
}
}

View File

@ -125,6 +125,18 @@ export class EthClient {
});
}
async watchTransactions (onNext: (value: any) => void): Promise<ZenObservable.Subscription> {
const observable = await this._client.subscribe({
query: ethQueries.subscribeTransactions
});
return observable.subscribe({
next (data) {
onNext(data);
}
});
}
async _getCachedOrFetch (queryName: keyof typeof ethQueries, vars: Vars): Promise<any> {
const keyObj = {
queryName,

View File

@ -47,8 +47,25 @@ subscription SubscriptionReceipt {
}
`;
export const subscribeTransactions = gql`
subscription SubscriptionHeader {
listen(topic: "transaction_cids") {
relatedNode {
... on EthTransactionCid {
txHash
ethHeaderCidByHeaderId {
blockHash
blockNumber
}
}
}
}
}
`;
export default {
getStorageAt,
getLogs,
subscribeLogs
subscribeLogs,
subscribeTransactions
};

View File

@ -0,0 +1,7 @@
# Don't lint node_modules.
node_modules
# Don't lint build output.
dist
src/tracers

View File

@ -0,0 +1,27 @@
{
"env": {
"browser": true,
"es2021": true
},
"extends": [
"semistandard",
"plugin:@typescript-eslint/recommended"
],
"parser": "@typescript-eslint/parser",
"parserOptions": {
"ecmaVersion": 12,
"sourceType": "module"
},
"plugins": [
"@typescript-eslint"
],
"rules": {
"@typescript-eslint/no-explicit-any": "off",
"@typescript-eslint/explicit-module-boundary-types": [
"warn",
{
"allowArgumentsExplicitlyTypedAsAny": true
}
]
}
}

View File

@ -17,7 +17,7 @@ interface StructLog {
}
}).argv;
const { structLogs }: { structLogs: StructLog[] } = JSON.parse(fs.readFileSync(argv.traceFile).toString("utf-8"));
const { structLogs }: { structLogs: StructLog[] } = JSON.parse(fs.readFileSync(argv.traceFile).toString('utf-8'));
const addressMap: any = {};
@ -30,7 +30,7 @@ interface StructLog {
// Address are 40 bytes.
// Example: 000000000000000000000000ca6d29232d1435d8198e3e5302495417dd073d61
if (!maybeAddress.startsWith("000000000000000000000000")) {
if (!maybeAddress.startsWith('000000000000000000000000')) {
return;
}
@ -38,7 +38,7 @@ interface StructLog {
return;
}
maybeAddress = maybeAddress.substr("000000000000000000000000".length);
maybeAddress = maybeAddress.substr('000000000000000000000000'.length);
if (!ethers.utils.isAddress(maybeAddress)) {
return;

View File

@ -15,10 +15,10 @@ import { TracingClient } from '../tracing';
describe: 'ETH JSON-RPC provider URL'
},
block: {
type: 'string',
require: true,
demandOption: true,
describe: 'Block hash or number'
type: 'string',
require: true,
demandOption: true,
describe: 'Block hash or number'
},
txFile: {
type: 'string',
@ -32,7 +32,7 @@ import { TracingClient } from '../tracing';
},
tracerFile: {
type: 'string',
"describe": 'File with custom tracing JS code'
describe: 'File with custom tracing JS code'
}
}).argv;
@ -40,10 +40,10 @@ import { TracingClient } from '../tracing';
const tracerFile = argv.tracerFile;
if (tracerFile) {
tracer = fs.readFileSync(tracerFile).toString("utf-8");
tracer = fs.readFileSync(tracerFile).toString('utf-8');
}
const txData = JSON.parse(fs.readFileSync(argv.txFile).toString("utf-8"));
const txData = JSON.parse(fs.readFileSync(argv.txFile).toString('utf-8'));
const tracingClient = new TracingClient(argv.providerUrl);
const result = await tracingClient.getCallTrace(argv.block, txData, tracer);

View File

@ -30,7 +30,7 @@ import { TracingClient } from '../tracing';
},
timeout: {
type: 'string',
default: "10s",
default: '10s',
describe: 'Trace execution timeout'
}
}).argv;
@ -39,7 +39,7 @@ import { TracingClient } from '../tracing';
const tracerFile = argv.tracerFile;
if (tracerFile) {
tracer = fs.readFileSync(tracerFile).toString("utf-8");
tracer = fs.readFileSync(tracerFile).toString('utf-8');
}
const tracingClient = new TracingClient(argv.providerUrl);

View File

@ -3,14 +3,13 @@ import path from 'path';
import fs from 'fs';
import { ethers } from 'ethers';
const callTracerWithAddresses = fs.readFileSync(path.join(__dirname, 'tracers', 'call_address_tracer.js')).toString("utf-8");
const callTracerWithAddresses = fs.readFileSync(path.join(__dirname, 'tracers', 'call_address_tracer.js')).toString('utf-8');
export class TracingClient {
_providerUrl: string;
_provider: ethers.providers.JsonRpcProvider;
constructor(providerUrl: string) {
constructor (providerUrl: string) {
assert(providerUrl);
this._providerUrl = providerUrl;
@ -27,9 +26,9 @@ export class TracingClient {
}
return this._provider.send('debug_traceTransaction', [txHash, { tracer, timeout }]);
};
}
async getCallTrace (block: string, txData: any, tracer: string | undefined): Promise<any> {
return this._provider.send('debug_traceCall', [ txData, block, { tracer }]);
};
return this._provider.send('debug_traceCall', [txData, block, { tracer }]);
}
}